Commit 71fb432a by yunpeng.song

逻辑调整

parent 6ad1c52e
......@@ -40,13 +40,18 @@ type (
}
CFG struct {
RedisKey RedisKey `json:"prefix"`
AppCfg AppConfig `json:"appcfg"`
ThriftPort ServerPortCFG `json:"thriftport"`
EurekaKey EurekaServer `json:"eurekakey"`
HeartTime HeartTime `json:"hearttime"`
MsgStoreTime int `json:"msgStoreTime"` // 消息保存时间
Redis RedisCfg `json:"redis"`
RedisKey RedisKey `json:"prefix"`
AppCfg AppConfig `json:"appcfg"`
ThriftPort ServerPortCFG `json:"thriftport"`
EurekaKey EurekaServer `json:"eurekakey"`
HeartTime HeartTime `json:"hearttime"`
MsgStoreTime int `json:"msgStoreTime"` // 消息保存时间
Redis RedisCfg `json:"redis"`
CallBack CallBackServer `json:"CallBackServer"`
}
CallBackServer struct {
Server map[string]string
}
)
......
......@@ -120,6 +120,14 @@ func GetMultServerPort() string {
return net.JoinHostPort("127.0.0.1", ThriftPort.MultServer)
}
func GetCallBackServerAddress(server string) (string, error) {
address, ok := DefaultConfig.CallBack.Server[server]
if !ok {
return "", errors.New("服务不存在")
}
return address, nil
}
// 获取应用路径
func getCurrentPath() string {
s, err := exec.LookPath(os.Args[0])
......
......@@ -83,12 +83,13 @@ func (p *MsgSender) ReadMsgSendChan() {
code = msg.Message.State.Code
}
if code == mission.MissionStatusCode_DISPATCHED {
push := &thriftcontrol.MyPushMsg{}
client, err := p.clientManager.Get(msg.Message.Device)
if err != nil || !client.OnLine {
p.SendRetryMsg(msg)
} else {
if err := push.PushClientDispatch(client.Trans, msg.Message); err != nil {
push := thriftcontrol.NewMyPushMsg(client.Trans)
if err := push.PushClientDispatch(msg.Message); err != nil {
p.SendRetryMsg(msg)
}
}
......
......@@ -39,27 +39,21 @@ type DeviceDispatchMiddleware struct {
// DeviceDispatch 设备服务client
// ip ,port 分别为设备服务的地址和端口
type DeviceDispatch struct {
IP string
Port string
client *service.DeviceServiceClient
tran *thrift.TFramedTransport
}
// NewDeviceDispatch 生成新的DeviceDispatch
func NewDeviceDispatch(ip, port string) *DeviceDispatch {
func NewDeviceDispatch() *DeviceDispatch {
d := &DeviceDispatch{}
d.IP = ip
d.Port = port
return d
}
// Identify 获取设备的绑定门店信息
func (d *DeviceDispatch) Identify(uuid string) (r *identity.Organization, err error) {
log.Info("Identify start ", uuid)
client, tran, err := d.CreateClient(d.IP, d.Port)
if err != nil {
return nil, err
}
defer tran.Close()
res, err := client.Identify(context.Background(), uuid)
defer d.tran.Close()
res, err := d.client.Identify(context.Background(), uuid)
if err != nil {
log.Error("Identify", err)
return nil, err
......@@ -71,12 +65,8 @@ func (d *DeviceDispatch) Identify(uuid string) (r *identity.Organization, err er
// Bind 绑定设备门店信息
func (d *DeviceDispatch) Bind(uuid string, device *identity.Organization) (r bool, err error) {
log.Info("Bind start ", device)
client, tran, err := d.CreateClient(d.IP, d.Port)
if err != nil {
return false, err
}
defer tran.Close()
res, err := client.Bind(context.Background(), uuid, device)
defer d.tran.Close()
res, err := d.client.Bind(context.Background(), uuid, device)
if err != nil {
log.Error("Bind", err)
return false, err
......@@ -88,12 +78,8 @@ func (d *DeviceDispatch) Bind(uuid string, device *identity.Organization) (r boo
// Unbind 绑定设备门店信息
func (d *DeviceDispatch) Unbind(uuid string) (r bool, err error) {
log.Info("Unbind start ", uuid)
client, tran, err := d.CreateClient(d.IP, d.Port)
if err != nil {
return false, err
}
defer tran.Close()
res, err := client.Unbind(context.Background(), uuid)
defer d.tran.Close()
res, err := d.client.Unbind(context.Background(), uuid)
if err != nil {
log.Error("Unbind", err)
return false, err
......@@ -105,12 +91,8 @@ func (d *DeviceDispatch) Unbind(uuid string) (r bool, err error) {
// Activate 设备上线
func (d *DeviceDispatch) Activate(device *device.Device, ip string) (r bool, err error) {
log.Info("activate start ", device)
client, tran, err := d.CreateClient(d.IP, d.Port)
if err != nil {
return false, err
}
defer tran.Close()
res, err := client.Activate(context.Background(), device, ip)
defer d.tran.Close()
res, err := d.client.Activate(context.Background(), device, ip)
if err != nil {
log.Error("Activate", err)
return false, err
......@@ -122,12 +104,8 @@ func (d *DeviceDispatch) Activate(device *device.Device, ip string) (r bool, err
// Deactivate 设备下线
func (d *DeviceDispatch) Deactivate(uuid string) (r bool, err error) {
log.Info("Deactivate start ", uuid)
client, tran, err := d.CreateClient(d.IP, d.Port)
if err != nil {
return false, err
}
defer tran.Close()
res, err := client.Deactivate(context.Background(), uuid, config.GetAppIp())
defer d.tran.Close()
res, err := d.client.Deactivate(context.Background(), uuid, config.GetAppIp())
if err != nil {
log.Error(err)
return false, err
......@@ -139,12 +117,8 @@ func (d *DeviceDispatch) Deactivate(uuid string) (r bool, err error) {
// Keep 心跳接口
func (d *DeviceDispatch) Keep(v *device.Device) (r bool, err error) {
log.Info("Keep start ", v)
client, tran, err := d.CreateClient(d.IP, d.Port)
if err != nil {
return false, err
}
defer tran.Close()
res, err := client.Keep(context.Background(), v, config.GetAppIp())
defer d.tran.Close()
res, err := d.client.Keep(context.Background(), v, config.GetAppIp())
if err != nil {
log.Error(err)
return false, err
......@@ -173,16 +147,32 @@ func (d *DeviceDispatch) ListByRegion(rgn string) (r *service.DevicesPerPage, er
return nil, nil
}
// 获取空闲设备
func (d *DeviceDispatch) ListExpired(seconds int32) (r *service.DevicesPerPage, err error) {
log.Info("ListExpired start")
defer d.tran.Close()
res, err := d.client.ListExpired(context.Background(), seconds, config.GetAppIp())
if err != nil {
return nil, err
}
log.Info("ListExpired end")
return res, nil
}
// 处理空闲设备,实现由上到下的心跳机制
func HandExpiredDevice(msgControl *model.MsgControl, clientManager *model.ClientManager) {
// TODO 获取服务ip
log.Info("HandExpiredDevice start")
ip, port, err := tool.GetApp(config.GetDeviceServerName())
if err != nil {
return
}
d := NewDeviceDispatch(ip, port)
for {
ip, port, err := tool.GetApp(config.GetDeviceServerName())
if err != nil {
continue
}
d := NewDeviceDispatch()
err = d.CreateClient(ip, port)
if err != nil {
continue
}
res, err := d.ListExpired(int32(config.GetHeartTimeout()))
if err != nil {
log.Error("HandExpiredDevice", err)
......@@ -217,42 +207,26 @@ func HandExpiredDevice(msgControl *model.MsgControl, clientManager *model.Client
}
}
// 获取空闲设备
func (d *DeviceDispatch) ListExpired(seconds int32) (r *service.DevicesPerPage, err error) {
log.Info("ListExpired start")
client, tran, err := d.CreateClient(d.IP, d.Port)
if err != nil {
return nil, err
}
defer tran.Close()
res, err := client.ListExpired(context.Background(), seconds, config.GetAppIp())
if err != nil {
return nil, err
}
log.Info("ListExpired end")
return res, nil
}
// 创建client
func (d *DeviceDispatch) CreateClient(ip, port string) (*service.DeviceServiceClient, *thrift.TFramedTransport, error) {
func (d *DeviceDispatch) CreateClient(ip, port string) error {
//TODO 心跳
socket, err := thrift.NewTSocket(net.JoinHostPort(ip, port))
if err != nil {
log.Error("Error opening socket:", err)
return nil, nil, err
return err
}
transport := thrift.NewTFramedTransport(socket)
// 创建二进制协议
protocolFactory := thrift.NewTCompactProtocolFactory()
// 打开Transport,与服务器进行连接
if err := transport.Open(); err != nil {
log.Error("Error opening socket to "+"localhost"+":"+"9999", err)
return nil, nil, err
return err
}
iprot := protocolFactory.GetProtocol(socket)
oprot := protocolFactory.GetProtocol(socket)
client := service.NewDeviceServiceClient(thrift.NewTStandardClient(iprot, oprot))
return client, transport, nil
d.tran = transport
d.client = client
return nil
}
......@@ -11,45 +11,56 @@ import (
type (
DeviceHandle struct {
deviceDispatch *DeviceDispatch
}
)
func NewDeviceHandle() *DeviceHandle {
d := DeviceHandle{}
d.deviceDispatch = NewDeviceDispatch()
return &d
}
func (d *DeviceHandle) SetDispatch() error {
ip, port, err := tool.GetApp(config.GetDeviceServerName())
if err != nil {
return err
}
err = d.deviceDispatch.CreateClient(ip, port)
if err != nil {
return err
}
return nil
}
// client 获取绑定门店信息接口
func (d *DeviceHandle) Identify(ctx context.Context, uuid string) (r *identity.Organization, err error) {
//TODO 调用设备服务获取绑定的门店信息
ip, port, err := tool.GetApp(config.GetDeviceServerName())
err = d.SetDispatch()
if err != nil {
return nil, err
}
dd := NewDeviceDispatch(ip, port)
return dd.Identify(uuid)
return d.deviceDispatch.Identify(uuid)
}
// client 绑定门店信息接口
func (d *DeviceHandle) Bind(ctx context.Context, uuid string, device *identity.Organization) (r bool, err error) {
//TODO 调用设备服务绑定/解绑设备
ip, port, err := tool.GetApp(config.GetDeviceServerName())
err = d.SetDispatch()
if err != nil {
return false, err
}
dd := NewDeviceDispatch(ip, port)
return dd.Bind(uuid, device)
return d.deviceDispatch.Bind(uuid, device)
}
// client 接触绑定接口
func (d *DeviceHandle) Unbind(ctx context.Context, uuid string) (r bool, err error) {
//TODO 解除绑定
ip, port, err := tool.GetApp(config.GetDeviceServerName())
err = d.SetDispatch()
if err != nil {
return false, err
}
dd := NewDeviceDispatch(ip, port)
return dd.Unbind(uuid)
return d.deviceDispatch.Unbind(uuid)
}
func (d *DeviceHandle) Activate(ctx context.Context, device *device.Device, ip string) (r bool, err error) {
......@@ -59,13 +70,11 @@ func (d *DeviceHandle) Activate(ctx context.Context, device *device.Device, ip s
// client 下线接口
func (d *DeviceHandle) Deactivate(ctx context.Context, uuid string, ip string) (r bool, err error) {
ip, port, err := tool.GetApp(config.GetDeviceServerName())
err = d.SetDispatch()
if err != nil {
return false, err
}
dd := NewDeviceDispatch(ip, port)
return dd.Deactivate(uuid)
return false, nil
return d.deviceDispatch.Deactivate(uuid)
}
func (d *DeviceHandle) Keep(ctx context.Context, device *device.Device, ip string) (r bool, err error) {
//TODO 心跳
......
......@@ -16,32 +16,56 @@ import (
)
type MyDispatchService struct {
trans thrift.TTransport
clientManager *model.ClientManager
msgControl *model.MsgControl
trans thrift.TTransport
clientManager *model.ClientManager
msgControl *model.MsgControl
deviceDispatch *DeviceDispatch
}
func NewMyDispatchService(trans thrift.TTransport, clientManager *model.ClientManager, msgControl *model.MsgControl) *MyDispatchService {
p := &MyDispatchService{trans: trans, clientManager: clientManager, msgControl: msgControl}
p.deviceDispatch = NewDeviceDispatch()
return p
}
func (d *MyDispatchService) SetDispatch() error {
ip, port, err := tool.GetApp(config.GetDeviceServerName())
if err != nil {
return err
}
err = d.deviceDispatch.CreateClient(ip, port)
if err != nil {
return err
}
return nil
}
// client心跳接口
func (p *MyDispatchService) Heartbeat(ctx context.Context, whom *device.Device) (r bool, err error) {
log.WithFields(log.Fields{"func": "Heartbeat", "心跳id": whom.ID}).Info(whom)
ip, port, err := tool.GetApp(config.GetDeviceServerName())
err = p.SetDispatch()
if err != nil {
return true, nil
return false, err
}
d := NewDeviceDispatch(ip, port)
return d.Keep(whom)
return p.deviceDispatch.Keep(whom)
}
// client消息回调接口
func (p *MyDispatchService) Feedback(ctx context.Context, response *mission.Message) (r bool, err error) {
log.WithFields(log.Fields{"data": string(response.Data)}).Info(response)
// TODO 更新消息状态
p.msgControl.UpdateMessage(response)
serverName := response.Proto.String()
server, err := config.GetCallBackServerAddress(serverName)
if err != nil {
log.Errorln(serverName, err)
}
tool.Retry(3, time.Second*2, func() error {
// TODO 回调消息
log.Infoln(server)
p.msgControl.DeleteMsg(response.ID)
return nil
})
//p.msgControl.UpdateMessage(response)
return true, nil
}
......@@ -65,18 +89,17 @@ func (p *MyDispatchService) Login(ctx context.Context, device *device.Device) (r
p.clientManager.SetClient(key, &client)
}
//TODO 写路由信息到设备服务 ,待接入 eureka获取ip 和端口
ip, port, err := tool.GetApp(config.GetDeviceServerName())
err := p.SetDispatch()
if err != nil {
return
}
d := NewDeviceDispatch(ip, port)
log.Info("本机ip", config.GetAppIp())
// 请求失败重试机制
err = tool.Retry(5, time.Second*2, func() func() error {
temp := device
ip := config.GetAppIp()
return func() error {
_, err := d.Activate(temp, ip)
_, err := p.deviceDispatch.Activate(temp, ip)
return err
}
}())
......
......@@ -9,16 +9,23 @@ import (
log "github.com/sirupsen/logrus"
)
type MyPushMsg struct{}
type MyPushMsg struct {
callbackClient *service.DispatchServiceClient
}
func NewMyPushMsg(trans thrift.TTransport) *MyPushMsg {
push := &MyPushMsg{}
push.callbackClient = service.NewDispatchServiceClientFactory(thrift.NewTFramedTransport(trans), thrift.NewTCompactProtocolFactory())
return push
}
// 消息推送给client
func (p *MyPushMsg) PushClientDispatch(trans thrift.TTransport, mis *mission.Message) error {
func (p *MyPushMsg) PushClientDispatch(mis *mission.Message) error {
log.Info("PushClientDispatch start ", mis)
callbackClient := service.NewDispatchServiceClientFactory(thrift.NewTFramedTransport(trans), thrift.NewTCompactProtocolFactory())
buff := thrift.NewTMemoryBuffer()
proto := thrift.NewTBinaryProtocol(buff, true, true)
mis.Write(proto)
err := callbackClient.Dispatch(context.Background(), mis)
err := p.callbackClient.Dispatch(context.Background(), mis)
log.Info("PushClientDispatch end ", err)
return err
}
......@@ -117,7 +117,7 @@ func GetApp(app string) (ip, port string, err error) {
a, err = eurekaConn.GetApp(app)
}
if err != nil {
return "", "", nil
return "", "", err
}
eurekaApp.AddApplication(app, a)
size := len(a.Instances)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment