Commit b44171d0 by yunpeng.song

修复device 接口的transport client 被提前delete 的bug

parent 2519e2dc
package control package control
import ( import (
"context"
"ficus/mission" "ficus/mission"
"ficus/proto" "ficus/proto"
"ficus_clientserver/config" "ficus_clientserver/config"
...@@ -103,7 +104,7 @@ func (p *MsgSender) MsgFeedBack(msg *model.Msg) { ...@@ -103,7 +104,7 @@ func (p *MsgSender) MsgFeedBack(msg *model.Msg) {
tool.Retry(config.GetRetryTimes(), config.GetRetryTime(), func() func() error { tool.Retry(config.GetRetryTimes(), config.GetRetryTime(), func() func() error {
temp := msg temp := msg
return func() error { return func() error {
_, err := d.Feedback(temp.Message) _, err := d.Feedback(context.Background(), temp.Message)
return err return err
} }
}()) }())
...@@ -128,7 +129,7 @@ func (p *MsgSender) DispatchMsg(msg *model.Msg) (r bool, err error) { ...@@ -128,7 +129,7 @@ func (p *MsgSender) DispatchMsg(msg *model.Msg) (r bool, err error) {
p.SendRetryMsg(msg) p.SendRetryMsg(msg)
} else { } else {
push := thriftservice.NewDispatchMsgTrans(client.Trans) push := thriftservice.NewDispatchMsgTrans(client.Trans)
if err = push.Dispatch(msg.Message); err != nil { if err = push.Dispatch(context.Background(), msg.Message); err != nil {
p.SendRetryMsg(msg) p.SendRetryMsg(msg)
return true, nil return true, nil
} }
......
package nethandle package nethandle
import ( import (
"context"
"ficus/device" "ficus/device"
"ficus/identity" "ficus/identity"
"ficus/service" "ficus/service"
...@@ -8,20 +9,7 @@ import ( ...@@ -8,20 +9,7 @@ import (
type ( type (
DeviceHandler interface { DeviceHandler interface {
Identify(uuid string) (r *identity.Organization, err error) service.DeviceService
Bind(uuid string, device *identity.Organization) (r bool, err error)
Unbind(uuid string) (r bool, err error)
Activate(device *device.Device, ip string) (r bool, err error)
Deactivate(uuid string, ip string) (r bool, err error)
Keep(device *device.Device, ip string) (r bool, err error)
GetPeer(uuid string) (r string, err error)
GetDeviceEx(uuid string) (r *device.DeviceEx, err error)
ListByStatusEx(status int32, page int32, pageSize int32) (r *service.DevicesExPage, err error)
ListByOrganizationEx(org *identity.Organization, page int32, pageSize int32) (r *service.DevicesExPage, err error)
ListExpired(seconds int32, ip string) (r *service.DevicesPage, err error)
ListByGroupEx(group string, page int32, pageSize int32) (r *service.DevicesExPage, err error)
ListByRegionEx(rgn string, page int32, pageSize int32) (r *service.DevicesExPage, err error)
ListByVersionEx(ver string, page int32, pageSize int32) (r *service.DevicesExPage, err error)
SetDispatch() error SetDispatch() error
} }
...@@ -46,83 +34,83 @@ func (d *DeviceHandle) SetDispatch() error { ...@@ -46,83 +34,83 @@ func (d *DeviceHandle) SetDispatch() error {
} }
// client 获取绑定门店信息接口 // client 获取绑定门店信息接口
func (d *DeviceHandle) Identify(uuid string) (r *identity.Organization, err error) { func (d *DeviceHandle) Identify(ctx context.Context, uuid string) (r *identity.Organization, err error) {
//TODO 调用设备服务获取绑定的门店信息 //TODO 调用设备服务获取绑定的门店信息
err = d.SetDispatch() err = d.SetDispatch()
if err != nil { if err != nil {
return return
} }
return d.deviceDispatch.Identify(uuid) return d.deviceDispatch.Identify(ctx, uuid)
} }
// client 绑定门店信息接口 // client 绑定门店信息接口
func (d *DeviceHandle) Bind(uuid string, device *identity.Organization) (r bool, err error) { func (d *DeviceHandle) Bind(ctx context.Context, uuid string, device *identity.Organization) (r bool, err error) {
//TODO 调用设备服务绑定/解绑设备 //TODO 调用设备服务绑定/解绑设备
err = d.SetDispatch() err = d.SetDispatch()
if err != nil { if err != nil {
return return
} }
return d.deviceDispatch.Bind(uuid, device) return d.deviceDispatch.Bind(ctx, uuid, device)
} }
// client 接触绑定接口 // client 接触绑定接口
func (d *DeviceHandle) Unbind(uuid string) (r bool, err error) { func (d *DeviceHandle) Unbind(ctx context.Context, uuid string) (r bool, err error) {
//TODO 解除绑定 //TODO 解除绑定
err = d.SetDispatch() err = d.SetDispatch()
if err != nil { if err != nil {
return return
} }
return d.deviceDispatch.Unbind(uuid) return d.deviceDispatch.Unbind(ctx, uuid)
} }
func (d *DeviceHandle) Activate(device *device.Device, ip string) (r bool, err error) { func (d *DeviceHandle) Activate(ctx context.Context, device *device.Device, ip string) (r bool, err error) {
//TODO 设备上线 //TODO 设备上线
return false, nil return false, nil
} }
// client 下线接口 // client 下线接口
func (d *DeviceHandle) Deactivate(uuid string, ip string) (r bool, err error) { func (d *DeviceHandle) Deactivate(ctx context.Context, uuid string, ip string) (r bool, err error) {
err = d.SetDispatch() err = d.SetDispatch()
if err != nil { if err != nil {
return return
} }
return d.deviceDispatch.Deactivate(uuid, ip) return d.deviceDispatch.Deactivate(ctx, uuid, ip)
} }
func (d *DeviceHandle) Keep(device *device.Device, ip string) (r bool, err error) { func (d *DeviceHandle) Keep(ctx context.Context, device *device.Device, ip string) (r bool, err error) {
//TODO 心跳 //TODO 心跳
return true, nil return true, nil
} }
func (d *DeviceHandle) GetPeer(uuid string) (r string, err error) { func (d *DeviceHandle) GetPeer(ctx context.Context, uuid string) (r string, err error) {
//TODO 获取server ip //TODO 获取server ip
return "", nil return "", nil
} }
func (d *DeviceHandle) GetDeviceEx(uuid string) (r *device.DeviceEx, err error) { func (d *DeviceHandle) GetDeviceEx(ctx context.Context, uuid string) (r *device.DeviceEx, err error) {
return return
} }
func (d *DeviceHandle) ListByStatusEx(status int32, page int32, pageSize int32) (r *service.DevicesExPage, err error) { func (d *DeviceHandle) ListByStatusEx(ctx context.Context, status int32, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return nil, nil return nil, nil
} }
func (d *DeviceHandle) ListByOrganizationEx(org *identity.Organization, page int32, pageSize int32) (r *service.DevicesExPage, err error) { func (d *DeviceHandle) ListByOrganizationEx(ctx context.Context, org *identity.Organization, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return nil, nil return nil, nil
} }
func (d *DeviceHandle) ListExpired(seconds int32, ip string) (r *service.DevicesPage, err error) { func (d *DeviceHandle) ListExpired(ctx context.Context, seconds int32, ip string) (r *service.DevicesPage, err error) {
return nil, nil return nil, nil
} }
func (d *DeviceHandle) ListByGroupEx(group string, page int32, pageSize int32) (r *service.DevicesExPage, err error) { func (d *DeviceHandle) ListByGroupEx(ctx context.Context, group string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return nil, nil return nil, nil
} }
func (d *DeviceHandle) ListByRegionEx(rgn string, page int32, pageSize int32) (r *service.DevicesExPage, err error) { func (d *DeviceHandle) ListByRegionEx(ctx context.Context, rgn string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return nil, nil return nil, nil
} }
func (d *DeviceHandle) ListByVersionEx(ver string, page int32, pageSize int32) (r *service.DevicesExPage, err error) { func (d *DeviceHandle) ListByVersionEx(ctx context.Context, ver string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return return
} }
package nethandle package nethandle
import ( import (
"context"
"ficus/device" "ficus/device"
"ficus/mission" "ficus/mission"
"ficus/service"
"ficus_clientserver/config" "ficus_clientserver/config"
"ficus_clientserver/model" "ficus_clientserver/model"
"ficus_clientserver/tool" "ficus_clientserver/tool"
...@@ -12,10 +14,7 @@ import ( ...@@ -12,10 +14,7 @@ import (
type ( type (
DispatchHandler interface { DispatchHandler interface {
Login(device *device.Device) (r bool, err error) service.DispatchService
Heartbeat(whom *device.Device) (r bool, err error)
Feedback(response *mission.Message) (r bool, err error)
Dispatch(request *mission.Message) (err error)
SetDispatch() error SetDispatch() error
} }
...@@ -44,18 +43,18 @@ func (d *DispatchHandle) SetDispatch() error { ...@@ -44,18 +43,18 @@ func (d *DispatchHandle) SetDispatch() error {
} }
// client心跳接口 // client心跳接口
func (d *DispatchHandle) Heartbeat(whom *device.Device) (r bool, err error) { func (d *DispatchHandle) Heartbeat(ctx context.Context, whom *device.Device) (r bool, err error) {
log.WithFields(log.Fields{"func": "Heartbeat", "心跳id": whom.ID}).Info(whom) log.WithFields(log.Fields{"func": "Heartbeat", "心跳id": whom.ID}).Info(whom)
return d.dispatchService.Heartbeat(whom) return d.dispatchService.Heartbeat(ctx, whom)
} }
// client消息回调接口 // client消息回调接口
func (d *DispatchHandle) Feedback(response *mission.Message) (r bool, err error) { func (d *DispatchHandle) Feedback(ctx context.Context, response *mission.Message) (r bool, err error) {
log.WithFields(log.Fields{"data": string(response.Data)}).Info(response) log.WithFields(log.Fields{"data": string(response.Data)}).Info(response)
// TODO 更新消息状态 // TODO 更新消息状态
tool.Retry(config.GetRetryTimes(), config.GetRetryTime(), func() error { tool.Retry(config.GetRetryTimes(), config.GetRetryTime(), func() error {
// TODO 回调消息 // TODO 回调消息
d.dispatchService.Feedback(response) d.dispatchService.Feedback(ctx, response)
d.msgControl.DeleteMsg(response.ID) d.msgControl.DeleteMsg(response.ID)
return nil return nil
}) })
...@@ -64,11 +63,11 @@ func (d *DispatchHandle) Feedback(response *mission.Message) (r bool, err error) ...@@ -64,11 +63,11 @@ func (d *DispatchHandle) Feedback(response *mission.Message) (r bool, err error)
} }
// client登录接口 // client登录接口
func (d *DispatchHandle) Login(device *device.Device) (r bool, err error) { func (d *DispatchHandle) Login(ctx context.Context, device *device.Device) (r bool, err error) {
log.WithFields(log.Fields{"func": "Login", "登录id": device.ID}).Info(device) log.WithFields(log.Fields{"func": "Login", "登录id": device.ID}).Info(device)
return d.dispatchService.Login(device) return d.dispatchService.Login(ctx, device)
} }
func (d *DispatchHandle) Dispatch(request *mission.Message) (err error) { func (d *DispatchHandle) Dispatch(ctx context.Context, request *mission.Message) (err error) {
return nil return nil
} }
package nethandle package nethandle
import "ficus/pkg" import (
"context"
"ficus/pkg"
"ficus/service"
)
type ( type (
PackageHandler interface { PackageHandler interface {
GetSpec(pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) service.PackageService
SetDispatch() error SetDispatch() error
} }
PackageHandle struct { PackageHandle struct {
...@@ -19,8 +23,8 @@ func NewPackageHandle(p PackageHandler) PackageHandler { ...@@ -19,8 +23,8 @@ func NewPackageHandle(p PackageHandler) PackageHandler {
return h return h
} }
func (p *PackageHandle) GetSpec(pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) { func (p *PackageHandle) GetSpec(ctx context.Context, pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) {
return p.PackageHandler.GetSpec(pkgName, ver, partnerId) return p.PackageHandler.GetSpec(ctx, pkgName, ver, partnerId)
} }
func (p *PackageHandle) SetDispatch() (err error) { func (p *PackageHandle) SetDispatch() (err error) {
......
package nethandle package nethandle
import ( import (
"context"
"errors" "errors"
"ficus/mission" "ficus/mission"
"ficus/native/service"
"ficus_clientserver/model" "ficus_clientserver/model"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
...@@ -10,7 +12,7 @@ import ( ...@@ -10,7 +12,7 @@ import (
type ( type (
ScheduleHandler interface { ScheduleHandler interface {
Schedule(request *mission.Message) (r bool, err error) service.ScheduleService
} }
ScheduleHandle struct { ScheduleHandle struct {
...@@ -29,7 +31,7 @@ func NewScheduleHandle(m *model.MsgControl, c *model.ClientManager, s ScheduleHa ...@@ -29,7 +31,7 @@ func NewScheduleHandle(m *model.MsgControl, c *model.ClientManager, s ScheduleHa
return h return h
} }
func (s *ScheduleHandle) Schedule(request *mission.Message) (r bool, err error) { func (s *ScheduleHandle) Schedule(ctx context.Context, request *mission.Message) (r bool, err error) {
log.Info("Schedule start ", request) log.Info("Schedule start ", request)
if !s.clientManager.IsKey(request.Device) { if !s.clientManager.IsKey(request.Device) {
return false, errors.New("机器不在线") return false, errors.New("机器不在线")
......
...@@ -27,93 +27,93 @@ func NewDeviceDispatch() *DeviceDispatch { ...@@ -27,93 +27,93 @@ func NewDeviceDispatch() *DeviceDispatch {
} }
// Identify 获取设备的绑定门店信息 // Identify 获取设备的绑定门店信息
func (d *DeviceDispatch) Identify(uuid string) (r *identity.Organization, err error) { func (d *DeviceDispatch) Identify(ctx context.Context, uuid string) (r *identity.Organization, err error) {
defer d.tran.Close() defer d.tran.Close()
r, err = d.client.Identify(context.Background(), uuid) r, err = d.client.Identify(ctx, uuid)
log.Info("dispatch Identify", err, uuid) log.Info("dispatch Identify", err, uuid)
return return
} }
// Bind 绑定设备门店信息 // Bind 绑定设备门店信息
func (d *DeviceDispatch) Bind(uuid string, device *identity.Organization) (r bool, err error) { func (d *DeviceDispatch) Bind(ctx context.Context, uuid string, device *identity.Organization) (r bool, err error) {
defer d.tran.Close() defer d.tran.Close()
r, err = d.client.Bind(context.Background(), uuid, device) r, err = d.client.Bind(ctx, uuid, device)
log.Info("dispatch Bind", err, device) log.Info("dispatch Bind", err, device)
return return
} }
// Unbind 绑定设备门店信息 // Unbind 绑定设备门店信息
func (d *DeviceDispatch) Unbind(uuid string) (r bool, err error) { func (d *DeviceDispatch) Unbind(ctx context.Context, uuid string) (r bool, err error) {
defer d.tran.Close() defer d.tran.Close()
r, err = d.client.Unbind(context.Background(), uuid) r, err = d.client.Unbind(ctx, uuid)
log.Info("dispatch Unbind", err, uuid) log.Info("dispatch Unbind", err, uuid)
return return
} }
// Activate 设备上线 // Activate 设备上线
func (d *DeviceDispatch) Activate(device *device.Device, ip string) (r bool, err error) { func (d *DeviceDispatch) Activate(ctx context.Context, device *device.Device, ip string) (r bool, err error) {
defer d.tran.Close() defer d.tran.Close()
log.Info("dispatch Activate start") log.Info("dispatch Activate start")
r, err = d.client.Activate(context.Background(), device, ip) r, err = d.client.Activate(ctx, device, ip)
log.Info("dispatch Activate", err, device, ip) log.Info("dispatch Activate", err, device, ip)
return return
} }
// Deactivate 设备下线 // Deactivate 设备下线
func (d *DeviceDispatch) Deactivate(uuid string, ip string) (r bool, err error) { func (d *DeviceDispatch) Deactivate(ctx context.Context, uuid string, ip string) (r bool, err error) {
defer d.tran.Close() defer d.tran.Close()
r, err = d.client.Deactivate(context.Background(), uuid, ip) r, err = d.client.Deactivate(ctx, uuid, ip)
log.Info("dispatch Deactivate end ", r, err, uuid, ip) log.Info("dispatch Deactivate end ", r, err, uuid, ip)
return return
} }
// Keep 心跳接口 // Keep 心跳接口
func (d *DeviceDispatch) Keep(device *device.Device, ip string) (r bool, err error) { func (d *DeviceDispatch) Keep(ctx context.Context, device *device.Device, ip string) (r bool, err error) {
defer d.tran.Close() defer d.tran.Close()
r, err = d.client.Keep(context.Background(), device, ip) r, err = d.client.Keep(ctx, device, ip)
log.Info("dispatch Keep", err, device, ip) log.Info("dispatch Keep", err, device, ip)
return return
} }
func (d *DeviceDispatch) GetPeer(uuid string) (r string, err error) { func (d *DeviceDispatch) GetPeer(ctx context.Context, uuid string) (r string, err error) {
defer d.tran.Close() defer d.tran.Close()
return "", nil return "", nil
} }
func (d *DeviceDispatch) GetDeviceEx(uuid string) (r *device.DeviceEx, err error) { func (d *DeviceDispatch) GetDeviceEx(ctx context.Context, uuid string) (r *device.DeviceEx, err error) {
defer d.tran.Close() defer d.tran.Close()
return return
} }
func (d *DeviceDispatch) ListByStatusEx(status int32, page int32, pageSize int32) (r *service.DevicesExPage, err error) { func (d *DeviceDispatch) ListByStatusEx(ctx context.Context, status int32, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
defer d.tran.Close() defer d.tran.Close()
return nil, nil return nil, nil
} }
func (d *DeviceDispatch) ListByOrganizationEx(org *identity.Organization, page int32, pageSize int32) (r *service.DevicesExPage, err error) { func (d *DeviceDispatch) ListByOrganizationEx(ctx context.Context, org *identity.Organization, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
defer d.tran.Close() defer d.tran.Close()
return nil, nil return nil, nil
} }
func (d *DeviceDispatch) ListByGroupEx(group string, page int32, pageSize int32) (r *service.DevicesExPage, err error) { func (d *DeviceDispatch) ListByGroupEx(ctx context.Context, group string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
defer d.tran.Close() defer d.tran.Close()
return nil, nil return nil, nil
} }
func (d *DeviceDispatch) ListByRegionEx(rgn string, page int32, pageSize int32) (r *service.DevicesExPage, err error) { func (d *DeviceDispatch) ListByRegionEx(ctx context.Context, rgn string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
defer d.tran.Close() defer d.tran.Close()
return nil, nil return nil, nil
} }
func (d *DeviceDispatch) ListByVersionEx(ver string, page int32, pageSize int32) (r *service.DevicesExPage, err error) { func (d *DeviceDispatch) ListByVersionEx(ctx context.Context, ver string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
defer d.tran.Close() defer d.tran.Close()
return return
} }
// 获取空闲设备 // 获取空闲设备
func (d *DeviceDispatch) ListExpired(seconds int32, ip string) (r *service.DevicesPage, err error) { func (d *DeviceDispatch) ListExpired(ctx context.Context, seconds int32, ip string) (r *service.DevicesPage, err error) {
defer d.tran.Close() defer d.tran.Close()
r, err = d.client.ListExpired(context.Background(), seconds, ip) r, err = d.client.ListExpired(ctx, seconds, ip)
log.Info("dispatch ListExpired", err, seconds, ip) log.Info("dispatch ListExpired", err, seconds, ip)
return return
} }
......
...@@ -27,21 +27,21 @@ func NewDeviceHandle() *DeviceHandle { ...@@ -27,21 +27,21 @@ func NewDeviceHandle() *DeviceHandle {
func (d *DeviceHandle) Identify(ctx context.Context, uuid string) (r *identity.Organization, err error) { func (d *DeviceHandle) Identify(ctx context.Context, uuid string) (r *identity.Organization, err error) {
//TODO 调用设备服务获取绑定的门店信息 //TODO 调用设备服务获取绑定的门店信息
log.Info("Identify", uuid) log.Info("Identify", uuid)
return d.deviceDispatch.Identify(uuid) return d.deviceDispatch.Identify(ctx, uuid)
} }
// client 绑定门店信息接口 // client 绑定门店信息接口
func (d *DeviceHandle) Bind(ctx context.Context, uuid string, device *identity.Organization) (r bool, err error) { func (d *DeviceHandle) Bind(ctx context.Context, uuid string, device *identity.Organization) (r bool, err error) {
//TODO 调用设备服务绑定/解绑设备 //TODO 调用设备服务绑定/解绑设备
log.Info("Bind", uuid, device) log.Info("Bind", uuid, device)
return d.deviceDispatch.Bind(uuid, device) return d.deviceDispatch.Bind(ctx, uuid, device)
} }
// client 接触绑定接口 // client 接触绑定接口
func (d *DeviceHandle) Unbind(ctx context.Context, uuid string) (r bool, err error) { func (d *DeviceHandle) Unbind(ctx context.Context, uuid string) (r bool, err error) {
//TODO 解除绑定 //TODO 解除绑定
log.Info("Unbind", uuid) log.Info("Unbind", uuid)
return d.deviceDispatch.Unbind(uuid) return d.deviceDispatch.Unbind(ctx, uuid)
} }
func (d *DeviceHandle) Activate(ctx context.Context, device *device.Device, ip string) (r bool, err error) { func (d *DeviceHandle) Activate(ctx context.Context, device *device.Device, ip string) (r bool, err error) {
...@@ -52,7 +52,7 @@ func (d *DeviceHandle) Activate(ctx context.Context, device *device.Device, ip s ...@@ -52,7 +52,7 @@ func (d *DeviceHandle) Activate(ctx context.Context, device *device.Device, ip s
// client 下线接口 // client 下线接口
func (d *DeviceHandle) Deactivate(ctx context.Context, uuid string, ip string) (r bool, err error) { func (d *DeviceHandle) Deactivate(ctx context.Context, uuid string, ip string) (r bool, err error) {
log.Info("Deactivate", uuid, ip) log.Info("Deactivate", uuid, ip)
return d.deviceDispatch.Deactivate(uuid, ip) return d.deviceDispatch.Deactivate(ctx, uuid, ip)
} }
func (d *DeviceHandle) Keep(ctx context.Context, device *device.Device, ip string) (r bool, err error) { func (d *DeviceHandle) Keep(ctx context.Context, device *device.Device, ip string) (r bool, err error) {
//TODO 心跳 //TODO 心跳
......
...@@ -24,21 +24,21 @@ func NewDispatchService(trans thrift.TTransport, clientManager *model.ClientMana ...@@ -24,21 +24,21 @@ func NewDispatchService(trans thrift.TTransport, clientManager *model.ClientMana
// client心跳接口 // client心跳接口
func (p *DispatchHandle) Heartbeat(ctx context.Context, whom *device.Device) (r bool, err error) { func (p *DispatchHandle) Heartbeat(ctx context.Context, whom *device.Device) (r bool, err error) {
return p.dispatchService.Heartbeat(whom) return p.dispatchService.Heartbeat(ctx, whom)
} }
// client消息回调接口 // client消息回调接口
func (p *DispatchHandle) Feedback(ctx context.Context, response *mission.Message) (r bool, err error) { func (p *DispatchHandle) Feedback(ctx context.Context, response *mission.Message) (r bool, err error) {
return p.dispatchService.Feedback(response) return p.dispatchService.Feedback(ctx, response)
} }
// client登录接口 // client登录接口
func (p *DispatchHandle) Login(ctx context.Context, device *device.Device) (r bool, err error) { func (p *DispatchHandle) Login(ctx context.Context, device *device.Device) (r bool, err error) {
return p.dispatchService.Login(device) return p.dispatchService.Login(ctx, device)
} }
func (p *DispatchHandle) Dispatch(ctx context.Context, request *mission.Message) (err error) { func (p *DispatchHandle) Dispatch(ctx context.Context, request *mission.Message) (err error) {
return p.dispatchService.Dispatch(request) return p.dispatchService.Dispatch(ctx, request)
} }
type myProcessorFactory struct { type myProcessorFactory struct {
......
...@@ -19,13 +19,12 @@ type DispatchMsg struct { ...@@ -19,13 +19,12 @@ type DispatchMsg struct {
clientManager *model.ClientManager clientManager *model.ClientManager
msgControl *model.MsgControl msgControl *model.MsgControl
callbackClient *service.DispatchServiceClient callbackClient *service.DispatchServiceClient
deviceDispatch *DeviceDispatch
} }
func NewDispatchMsgTrans(trans thrift.TTransport) *DispatchMsg { func NewDispatchMsgTrans(trans thrift.TTransport) *DispatchMsg {
d := &DispatchMsg{} d := &DispatchMsg{}
d.trans = trans d.trans = trans
d.deviceDispatch = NewDeviceDispatch()
d.callbackClient = service.NewDispatchServiceClientFactory(thrift.NewTFramedTransport(trans), thrift.NewTCompactProtocolFactory()) d.callbackClient = service.NewDispatchServiceClientFactory(thrift.NewTFramedTransport(trans), thrift.NewTCompactProtocolFactory())
return d return d
} }
...@@ -35,7 +34,6 @@ func NewDispatchMsg(trans thrift.TTransport, clientManager *model.ClientManager, ...@@ -35,7 +34,6 @@ func NewDispatchMsg(trans thrift.TTransport, clientManager *model.ClientManager,
d.clientManager = clientManager d.clientManager = clientManager
d.msgControl = msgControl d.msgControl = msgControl
d.trans = trans d.trans = trans
d.deviceDispatch = NewDeviceDispatch()
d.callbackClient = service.NewDispatchServiceClientFactory(thrift.NewTFramedTransport(trans), thrift.NewTCompactProtocolFactory()) d.callbackClient = service.NewDispatchServiceClientFactory(thrift.NewTFramedTransport(trans), thrift.NewTCompactProtocolFactory())
return d return d
} }
...@@ -45,21 +43,22 @@ func (p *DispatchMsg) SetDispatch() error { ...@@ -45,21 +43,22 @@ func (p *DispatchMsg) SetDispatch() error {
} }
// client心跳接口 // client心跳接口
func (d *DispatchMsg) Heartbeat(whom *device.Device) (r bool, err error) { func (d *DispatchMsg) Heartbeat(ctx context.Context, whom *device.Device) (r bool, err error) {
key := whom.ID key := whom.ID
if d.clientManager.IsKey(key) { if d.clientManager.IsKey(key) {
d.clientManager.UpdateHartTime(key) d.clientManager.UpdateHartTime(key)
err = d.deviceDispatch.SetDispatch() deviceDispatch := NewDeviceDispatch()
err = deviceDispatch.SetDispatch()
if err != nil { if err != nil {
return return
} }
return d.deviceDispatch.Keep(whom, config.GetAppIp()) return deviceDispatch.Keep(ctx, whom, config.GetAppIp())
} }
return true, nil return true, nil
} }
// client消息回调接口 // client消息回调接口
func (d *DispatchMsg) Feedback(response *mission.Message) (r bool, err error) { func (d *DispatchMsg) Feedback(ctx context.Context, response *mission.Message) (r bool, err error) {
serverName := response.Proto.String() serverName := response.Proto.String()
server, err := config.GetCallBackServerAddress(serverName) server, err := config.GetCallBackServerAddress(serverName)
if err != nil { if err != nil {
...@@ -88,7 +87,7 @@ func (d *DispatchMsg) Feedback(response *mission.Message) (r bool, err error) { ...@@ -88,7 +87,7 @@ func (d *DispatchMsg) Feedback(response *mission.Message) (r bool, err error) {
} }
// client登录接口 // client登录接口
func (d *DispatchMsg) Login(device *device.Device) (r bool, err error) { func (d *DispatchMsg) Login(ctx context.Context, device *device.Device) (r bool, err error) {
//TODO 写路由信息到设备服务 ,待接入 eureka获取ip 和端口 //TODO 写路由信息到设备服务 ,待接入 eureka获取ip 和端口
go func() { go func() {
//判断是否存在 //判断是否存在
...@@ -101,8 +100,8 @@ func (d *DispatchMsg) Login(device *device.Device) (r bool, err error) { ...@@ -101,8 +100,8 @@ func (d *DispatchMsg) Login(device *device.Device) (r bool, err error) {
//写入客户端Map - 或者更新map 的 //写入客户端Map - 或者更新map 的
d.clientManager.SetClient(key, client) d.clientManager.SetClient(key, client)
//TODO 写路由信息到设备服务 ,待接入 eureka获取ip 和端口 //TODO 写路由信息到设备服务 ,待接入 eureka获取ip 和端口
deviceDispatch := NewDeviceDispatch()
err := d.deviceDispatch.SetDispatch() err := deviceDispatch.SetDispatch()
if err != nil { if err != nil {
d.clientManager.DeleteClient(key) d.clientManager.DeleteClient(key)
return return
...@@ -112,7 +111,7 @@ func (d *DispatchMsg) Login(device *device.Device) (r bool, err error) { ...@@ -112,7 +111,7 @@ func (d *DispatchMsg) Login(device *device.Device) (r bool, err error) {
temp := device temp := device
ip := config.GetAppIp() ip := config.GetAppIp()
return func() error { return func() error {
_, err := d.deviceDispatch.Activate(temp, ip) _, err := deviceDispatch.Activate(ctx, temp, ip)
return err return err
} }
}()) }())
...@@ -125,12 +124,12 @@ func (d *DispatchMsg) Login(device *device.Device) (r bool, err error) { ...@@ -125,12 +124,12 @@ func (d *DispatchMsg) Login(device *device.Device) (r bool, err error) {
} }
// Dispatch 消息推送给client // Dispatch 消息推送给client
func (p *DispatchMsg) Dispatch(request *mission.Message) (err error) { func (p *DispatchMsg) Dispatch(ctx context.Context, request *mission.Message) (err error) {
log.Info("PushClientDispatch start ", request) log.Info("PushClientDispatch start ", request)
buff := thrift.NewTMemoryBuffer() buff := thrift.NewTMemoryBuffer()
proto := thrift.NewTBinaryProtocol(buff, true, true) proto := thrift.NewTBinaryProtocol(buff, true, true)
request.Write(proto) request.Write(proto)
err = p.callbackClient.Dispatch(context.Background(), request) err = p.callbackClient.Dispatch(ctx, request)
log.Info("PushClientDispatch end ", err) log.Info("PushClientDispatch end ", err)
return err return err
} }
...@@ -31,10 +31,10 @@ func NewPackageDispatch() *PackageDispatch { ...@@ -31,10 +31,10 @@ func NewPackageDispatch() *PackageDispatch {
// 接受消息 // 接受消息
func (s *PackageHandle) GetSpec(ctx context.Context, pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) { func (s *PackageHandle) GetSpec(ctx context.Context, pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) {
return s.PackageHandler.GetSpec(pkgName, ver, partnerId) return s.PackageHandler.GetSpec(ctx, pkgName, ver, partnerId)
} }
func (s *PackageDispatch) GetSpec(pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) { func (s *PackageDispatch) GetSpec(ctx context.Context, pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) {
log.Info("GetSpec", pkgName, ver) log.Info("GetSpec", pkgName, ver)
fcos := FileController.NewOss() fcos := FileController.NewOss()
md5, url := fcos.GetFileInfo(pkgName) md5, url := fcos.GetFileInfo(pkgName)
......
...@@ -36,9 +36,9 @@ func NewScheduleDispatch() *ScheduleDispatch { ...@@ -36,9 +36,9 @@ func NewScheduleDispatch() *ScheduleDispatch {
// 接受消息 // 接受消息
func (s *ScheduleHandle) Schedule(ctx context.Context, request *mission.Message) (r bool, err error) { func (s *ScheduleHandle) Schedule(ctx context.Context, request *mission.Message) (r bool, err error) {
log.Info("Schedule", request) log.Info("Schedule", request)
return s.scheduleHandler.Schedule(request) return s.scheduleHandler.Schedule(ctx, request)
} }
func (s *ScheduleDispatch) Schedule(request *mission.Message) (r bool, err error) { func (s *ScheduleDispatch) Schedule(ctx context.Context, request *mission.Message) (r bool, err error) {
return true, nil return true, nil
} }
...@@ -2,6 +2,7 @@ package main ...@@ -2,6 +2,7 @@ package main
import ( import (
"bufio" "bufio"
"context"
"ficus/device" "ficus/device"
"ficus/mission" "ficus/mission"
native "ficus/native/service" native "ficus/native/service"
...@@ -142,7 +143,7 @@ func StartClientHeartWork() { ...@@ -142,7 +143,7 @@ func StartClientHeartWork() {
continue continue
} }
ip := config.GetAppIp() ip := config.GetAppIp()
res, err := deviceDispatch.ListExpired(int32(config.GetHeartTimeout()), ip) res, err := deviceDispatch.ListExpired(context.Background(), int32(config.GetHeartTimeout()), ip)
if err != nil || res == nil || res.Devices == nil { if err != nil || res == nil || res.Devices == nil {
log.Error("HandExpiredDevice", err) log.Error("HandExpiredDevice", err)
} else { } else {
...@@ -155,7 +156,7 @@ func StartClientHeartWork() { ...@@ -155,7 +156,7 @@ func StartClientHeartWork() {
log.Info("连接创建失败 ", dd.ID, err) log.Info("连接创建失败 ", dd.ID, err)
return return
} }
if OK, err := devicedispatch.Deactivate(dd.ID, ip); !OK { if OK, err := devicedispatch.Deactivate(context.Background(), dd.ID, ip); !OK {
log.Info("终端没有上线 id ", dd.ID, err) log.Info("终端没有上线 id ", dd.ID, err)
} }
return return
...@@ -165,7 +166,7 @@ func StartClientHeartWork() { ...@@ -165,7 +166,7 @@ func StartClientHeartWork() {
log.Info("连接创建失败 ", dd.ID, err) log.Info("连接创建失败 ", dd.ID, err)
return return
} }
if OK, err := devicedispatch.Deactivate(dd.ID, ip); !OK { if OK, err := devicedispatch.Deactivate(context.Background(), dd.ID, ip); !OK {
log.Info("终端已经掉线 ", dd.ID, err) log.Info("终端已经掉线 ", dd.ID, err)
} }
return return
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment