Commit 72ac78e7 by yunpeng.song

日志优化,thrift接口更新,thrift请求优化

parent b44171d0
......@@ -88,10 +88,13 @@ func (p *MsgSender) WriteMsgToSendChan(msg *model.Msg) {
}
// AddMsgTries 判断消息的重试次数
func (p *MsgSender) AddMsgTries(msg *model.Msg) bool {
func (p *MsgSender) AddMsgTries(msg *model.Msg, r bool) bool {
if msg.TriesAddOne() {
log.Info("已经达到最大尝试次数", msg.MsgId)
go p.MsgFeedBack(msg)
log.Debug("已经达到最大尝试次数", msg.MsgId)
if !r {
log.Debug("消息回调", msg.MsgId)
go p.MsgFeedBack(msg)
}
return true
}
return false
......@@ -109,6 +112,7 @@ func (p *MsgSender) MsgFeedBack(msg *model.Msg) {
}
}())
}
log.Debug("删除消息", msg.MsgId)
p.msgControl.DeleteMsg(msg.MsgId)
}
......@@ -131,9 +135,11 @@ func (p *MsgSender) DispatchMsg(msg *model.Msg) (r bool, err error) {
push := thriftservice.NewDispatchMsgTrans(client.Trans)
if err = push.Dispatch(context.Background(), msg.Message); err != nil {
p.SendRetryMsg(msg)
return true, nil
} else {
r = true
}
}
log.Debug("消息推送 result ", r, err)
return
}
......@@ -145,14 +151,16 @@ func (p *MsgSender) ReadMsgSendChan() {
//判断消息是否存在
msg, ok := p.msgControl.GetMsgFromMap(msgId)
if !ok {
log.Info("消息ID:", msgId, "不存在")
log.Debug("消息ID:", msgId, "不存在")
return
}
log.Info("消息ID:", msgId, "存在")
log.Debug("消息ID:", msgId, "存在")
//只推送状态未DISPATCHED 的消息 给client
if p.IsMatch(msg) {
p.DispatchMsg(msg)
p.AddMsgTries(msg)
r, _ := p.DispatchMsg(msg)
p.AddMsgTries(msg, r)
} else {
p.msgControl.DeleteMsg(msg.MsgId)
}
}(rm)
}
......
......@@ -73,7 +73,7 @@ func (p *ClientManager) Get(k string) (*FicusClient, error) {
func (p *ClientManager) SetClient(k string, v *FicusClient) {
p.Lock.Lock()
defer p.Lock.Unlock()
log.Println("add client", k)
log.Info("add client", k)
p.mapClient[k] = v
p.AllCount++
}
......@@ -82,7 +82,7 @@ func (p *ClientManager) SetClient(k string, v *FicusClient) {
func (p *ClientManager) AddTimes(k string) bool {
p.Lock.Lock()
defer p.Lock.Unlock()
log.Println(" AddTimes", k)
log.Debug(" AddTimes", k)
v, ok := p.mapClient[k]
if ok {
v.LostTimes++
......@@ -103,7 +103,7 @@ func (p *ClientManager) AddTimes(k string) bool {
func (p *ClientManager) ClearTimes(k string) {
p.Lock.Lock()
defer p.Lock.Unlock()
log.Println(" ClearTimes", k)
log.Debug(" ClearTimes", k)
v, ok := p.mapClient[k]
if ok {
v.LostTimes = 0
......@@ -149,7 +149,7 @@ func (p *ClientManager) IsKey(key string) bool {
func (p *ClientManager) UpdateDevice(key string, device *device.Device, trans thrift.TTransport) bool {
p.Lock.Lock()
defer p.Lock.Unlock()
log.Println(" UpdateDevice", key)
log.Debug(" UpdateDevice", key)
v, ok := p.mapClient[key]
if ok {
v.Device = device
......
......@@ -54,7 +54,7 @@ func (p *MsgControl) LoadRedisMsg() {
if err != nil {
continue
}
log.Println("load msg ", m.MsgId)
log.Debug("load msg ", m.MsgId)
// 增加消息
serverName := m.Message.Proto
if serverName != proto.Type_HEARTBEAT {
......@@ -83,10 +83,10 @@ func (p *MsgControl) HandleMsgList() {
func (p *MsgControl) setTimerCall(msg *Msg) {
t := time.Unix(msg.TimeSecond, int64(0))
d := time.Until(t.AddDate(0, 0, config.GetMsgStoreTime()))
log.Info("时间差", d, msg.TimeSecond, t)
log.Debug("时间差", d, msg.TimeSecond, t)
if d > 0 {
tr := tool.AddCallback(d, func() {
log.Info("删除消息", msg.MsgId)
log.Debug("删除消息", msg.MsgId)
go p.DeleteMsg(msg.MsgId)
})
p.timerMap.Store(msg.MsgId, tr)
......@@ -132,7 +132,7 @@ func (p *MsgControl) GetMsg(k string) *Msg {
// AddMsg 增加Msg
func (p *MsgControl) AddMsg(msg *Msg) {
p.msgMap.addMsg(msg)
p.setTimerCall(msg)
//p.setTimerCall(msg)
p.sendMsg(msg.MsgId)
}
......@@ -157,10 +157,12 @@ func (p *MsgControl) AddMessage(m *mission.Message) {
msg.TimeSecond = time.Now().Unix()
msg.Lastupdatetime = msg.TimeSecond
p.AddMsg(msg)
err := p.redisClient.WriteMsgToRedis(msg)
if err != nil {
log.Info("message write to redis failed ", err)
return
if m.Proto != proto.Type_HEARTBEAT {
err := p.redisClient.WriteMsgToRedis(msg)
if err != nil {
log.Debug("message write to redis failed ", err)
return
}
}
// p.setTimerCall(msg)
// p.sendMsg(msg)
......
......@@ -19,14 +19,14 @@ type RedisClient struct {
func (p *RedisClient) setMsgToRedis(msg *Msg) error {
jsonstr, _ := json.Marshal(msg)
kname := config.RedisKeyConfig.MsgStatusName
log.WithFields(log.Fields{"func": "setRedisMsg", "kname": kname, "msgId": msg.MsgId}).Info(string(jsonstr))
log.WithFields(log.Fields{"func": "setRedisMsg", "kname": kname, "msgId": msg.MsgId}).Debug(string(jsonstr))
return HSet(p.redisClient, kname, msg.MsgId, jsonstr)
}
// 从redis删除msgId 的消息
func (p *RedisClient) deleteMsgToRedis(msgId string) {
kname := config.RedisKeyConfig.MsgStatusName
log.WithFields(log.Fields{"func": "deleteMsgToRedis", "kname": kname, "msgId": msgId}).Info()
log.WithFields(log.Fields{"func": "deleteMsgToRedis", "kname": kname, "msgId": msgId}).Debug()
HDel(p.redisClient, kname, msgId)
}
......@@ -34,14 +34,14 @@ func (p *RedisClient) deleteMsgToRedis(msgId string) {
func (p *RedisClient) setMissionToRedis(msg *mission.Message) {
jsonstr, _ := json.Marshal(msg)
kname := config.RedisKeyConfig.MsgName
log.WithFields(log.Fields{"func": "setmissiontoredis", "kname": kname, "msgId": msg.ID}).Info(string(jsonstr))
log.WithFields(log.Fields{"func": "setmissiontoredis", "kname": kname, "msgId": msg.ID}).Debug(string(jsonstr))
HSet(p.redisClient, kname, msg.ID, jsonstr)
}
// 从redis 删除 message
func (p *RedisClient) deleteMissionToRedis(msgId string) {
kname := config.RedisKeyConfig.MsgName
log.WithFields(log.Fields{"func": "deleteMissionToRedis", "kname": kname, "msgId": msgId}).Info()
log.WithFields(log.Fields{"func": "deleteMissionToRedis", "kname": kname, "msgId": msgId}).Debug()
HDel(p.redisClient, kname, msgId)
}
......@@ -63,7 +63,6 @@ func (p *RedisClient) GetAllMsg(k string) map[string]string {
// 获取Message
func (p *RedisClient) GetMessage(k string) (*mission.Message, error) {
kname := config.RedisKeyConfig.MsgName
log.WithFields(log.Fields{"func": "GetMessage"}).Info(kname)
jsonstr, err := HGet(p.redisClient, kname, k)
if err != nil {
return nil, err
......@@ -108,7 +107,7 @@ func (p *RedisClient) UpdateMsgStatus(mis mission.Message) bool {
func (p *RedisClient) setMessage(msg mission.Message) bool {
jsonstr, _ := json.Marshal(msg)
kname := config.RedisKeyConfig.MsgName
log.WithFields(log.Fields{"func": "setMessage", "kname": kname, "msgId": msg.ID}).Info(string(jsonstr))
log.WithFields(log.Fields{"func": "setMessage", "kname": kname, "msgId": msg.ID}).Debug(string(jsonstr))
HSet(p.redisClient, kname, msg.ID, jsonstr)
return true
}
......@@ -120,7 +119,7 @@ func (p *RedisClient) setMsg(msg *Msg) bool {
fmt.Println("Update Redis Msg Status err:", err)
return false
}
log.WithFields(log.Fields{"func": "setMsg", "kname": kname, "msgId": msg.MsgId}).Info(string(jsonstr))
log.WithFields(log.Fields{"func": "setMsg", "kname": kname, "msgId": msg.MsgId}).Debug(string(jsonstr))
HSet(p.redisClient, kname, msg.MsgId, jsonstr)
return true
}
......@@ -128,7 +127,7 @@ func (p *RedisClient) setMsg(msg *Msg) bool {
func (p *RedisClient) GetMsg(k string) *Msg {
kname := config.RedisKeyConfig.MsgStatusName
jsonstr, _ := HGet(p.redisClient, kname, k)
log.Println(jsonstr)
log.Debug(jsonstr)
var msg Msg
json.Unmarshal([]byte(jsonstr), &msg)
return &msg
......
......@@ -18,8 +18,6 @@ type (
}
)
var t1 int = 0
func NewDeviceHandle(h DeviceHandler) DeviceHandler {
d := &DeviceHandle{h}
return d
......
......@@ -23,8 +23,12 @@ func NewPackageHandle(p PackageHandler) PackageHandler {
return h
}
func (p *PackageHandle) GetSpec(ctx context.Context, pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) {
return p.PackageHandler.GetSpec(ctx, pkgName, ver, partnerId)
func (p *PackageHandle) GetSpec(ctx context.Context, pkgName string, ver string) (r *pkg.PackageSpec, err error) {
return p.PackageHandler.GetSpec(ctx, pkgName, ver)
}
func (p *PackageHandle) GetSpecByPartner(ctx context.Context, pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) {
return p.PackageHandler.GetSpecByPartner(ctx, pkgName, ver, partnerId)
}
func (p *PackageHandle) SetDispatch() (err error) {
......
......@@ -32,11 +32,11 @@ func NewScheduleHandle(m *model.MsgControl, c *model.ClientManager, s ScheduleHa
}
func (s *ScheduleHandle) Schedule(ctx context.Context, request *mission.Message) (r bool, err error) {
log.Info("Schedule start ", request)
log.Info("Schedule", request)
if !s.clientManager.IsKey(request.Device) {
return false, errors.New("机器不在线")
}
go s.msgControl.AddMessage(request)
log.Info("Schedule end ")
log.Debug("Schedule end ")
return true, nil
}
......@@ -53,7 +53,6 @@ func (d *DeviceDispatch) Unbind(ctx context.Context, uuid string) (r bool, err e
// Activate 设备上线
func (d *DeviceDispatch) Activate(ctx context.Context, device *device.Device, ip string) (r bool, err error) {
defer d.tran.Close()
log.Info("dispatch Activate start")
r, err = d.client.Activate(ctx, device, ip)
log.Info("dispatch Activate", err, device, ip)
return
......
......@@ -12,14 +12,12 @@ import (
type (
DeviceHandle struct {
deviceDispatch nethandle.DeviceHandler
//deviceDispatch nethandle.DeviceHandler
}
)
func NewDeviceHandle() *DeviceHandle {
d := DeviceHandle{}
dispatch := NewDeviceDispatch()
d.deviceDispatch = nethandle.NewDeviceHandle(dispatch)
return &d
}
......@@ -27,21 +25,27 @@ func NewDeviceHandle() *DeviceHandle {
func (d *DeviceHandle) Identify(ctx context.Context, uuid string) (r *identity.Organization, err error) {
//TODO 调用设备服务获取绑定的门店信息
log.Info("Identify", uuid)
return d.deviceDispatch.Identify(ctx, uuid)
dispatch := NewDeviceDispatch()
deviceDispatch := nethandle.NewDeviceHandle(dispatch)
return deviceDispatch.Identify(ctx, uuid)
}
// client 绑定门店信息接口
func (d *DeviceHandle) Bind(ctx context.Context, uuid string, device *identity.Organization) (r bool, err error) {
//TODO 调用设备服务绑定/解绑设备
log.Info("Bind", uuid, device)
return d.deviceDispatch.Bind(ctx, uuid, device)
dispatch := NewDeviceDispatch()
deviceDispatch := nethandle.NewDeviceHandle(dispatch)
return deviceDispatch.Bind(ctx, uuid, device)
}
// client 接触绑定接口
func (d *DeviceHandle) Unbind(ctx context.Context, uuid string) (r bool, err error) {
//TODO 解除绑定
log.Info("Unbind", uuid)
return d.deviceDispatch.Unbind(ctx, uuid)
dispatch := NewDeviceDispatch()
deviceDispatch := nethandle.NewDeviceHandle(dispatch)
return deviceDispatch.Unbind(ctx, uuid)
}
func (d *DeviceHandle) Activate(ctx context.Context, device *device.Device, ip string) (r bool, err error) {
......@@ -52,7 +56,9 @@ func (d *DeviceHandle) Activate(ctx context.Context, device *device.Device, ip s
// client 下线接口
func (d *DeviceHandle) Deactivate(ctx context.Context, uuid string, ip string) (r bool, err error) {
log.Info("Deactivate", uuid, ip)
return d.deviceDispatch.Deactivate(ctx, uuid, ip)
dispatch := NewDeviceDispatch()
deviceDispatch := nethandle.NewDeviceHandle(dispatch)
return deviceDispatch.Deactivate(ctx, uuid, ip)
}
func (d *DeviceHandle) Keep(ctx context.Context, device *device.Device, ip string) (r bool, err error) {
//TODO 心跳
......
......@@ -12,33 +12,45 @@ import (
)
type DispatchHandle struct {
dispatchService nethandle.DispatchHandler
//dispatchService nethandle.DispatchHandler
trans thrift.TTransport
clientManager *model.ClientManager
msgControl *model.MsgControl
}
func NewDispatchService(trans thrift.TTransport, clientManager *model.ClientManager, msgControl *model.MsgControl) *DispatchHandle {
p := &DispatchHandle{}
d := NewDispatchMsg(trans, clientManager, msgControl)
p.dispatchService = nethandle.NewDispatchHandle(clientManager, msgControl, d)
p.trans = trans
p.clientManager = clientManager
p.msgControl = msgControl
return p
}
// client心跳接口
func (p *DispatchHandle) Heartbeat(ctx context.Context, whom *device.Device) (r bool, err error) {
return p.dispatchService.Heartbeat(ctx, whom)
d := NewDispatchMsg(p.trans, p.clientManager, p.msgControl)
dispatchService := nethandle.NewDispatchHandle(p.clientManager, p.msgControl, d)
return dispatchService.Heartbeat(ctx, whom)
}
// client消息回调接口
func (p *DispatchHandle) Feedback(ctx context.Context, response *mission.Message) (r bool, err error) {
return p.dispatchService.Feedback(ctx, response)
d := NewDispatchMsg(p.trans, p.clientManager, p.msgControl)
dispatchService := nethandle.NewDispatchHandle(p.clientManager, p.msgControl, d)
return dispatchService.Feedback(ctx, response)
}
// client登录接口
func (p *DispatchHandle) Login(ctx context.Context, device *device.Device) (r bool, err error) {
return p.dispatchService.Login(ctx, device)
d := NewDispatchMsg(p.trans, p.clientManager, p.msgControl)
dispatchService := nethandle.NewDispatchHandle(p.clientManager, p.msgControl, d)
return dispatchService.Login(ctx, device)
}
func (p *DispatchHandle) Dispatch(ctx context.Context, request *mission.Message) (err error) {
return p.dispatchService.Dispatch(ctx, request)
d := NewDispatchMsg(p.trans, p.clientManager, p.msgControl)
dispatchService := nethandle.NewDispatchHandle(p.clientManager, p.msgControl, d)
return dispatchService.Dispatch(ctx, request)
}
type myProcessorFactory struct {
......
......@@ -125,11 +125,10 @@ func (d *DispatchMsg) Login(ctx context.Context, device *device.Device) (r bool,
// Dispatch 消息推送给client
func (p *DispatchMsg) Dispatch(ctx context.Context, request *mission.Message) (err error) {
log.Info("PushClientDispatch start ", request)
buff := thrift.NewTMemoryBuffer()
proto := thrift.NewTBinaryProtocol(buff, true, true)
request.Write(proto)
err = p.callbackClient.Dispatch(ctx, request)
log.Info("PushClientDispatch end ", err)
log.Info("PushClientDispatch end ", err, "request context", request)
return err
}
......@@ -11,7 +11,7 @@ import (
type (
PackageHandle struct {
PackageHandler nethandle.PackageHandler
//PackageHandler nethandle.PackageHandler
}
PackageDispatch struct {
......@@ -20,7 +20,6 @@ type (
func NewPackageHandle() *PackageHandle {
p := &PackageHandle{}
p.PackageHandler = nethandle.NewPackageHandle(NewPackageDispatch())
return p
}
......@@ -30,11 +29,17 @@ func NewPackageDispatch() *PackageDispatch {
}
// 接受消息
func (s *PackageHandle) GetSpec(ctx context.Context, pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) {
return s.PackageHandler.GetSpec(ctx, pkgName, ver, partnerId)
func (s *PackageHandle) GetSpec(ctx context.Context, pkgName string, ver string) (r *pkg.PackageSpec, err error) {
PackageHandler := nethandle.NewPackageHandle(NewPackageDispatch())
return PackageHandler.GetSpec(ctx, pkgName, ver)
}
func (s *PackageDispatch) GetSpec(ctx context.Context, pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) {
func (s *PackageHandle) GetSpecByPartner(ctx context.Context, pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) {
PackageHandler := nethandle.NewPackageHandle(NewPackageDispatch())
return PackageHandler.GetSpecByPartner(ctx, pkgName, ver, partnerId)
}
func (s *PackageDispatch) GetSpec(ctx context.Context, pkgName string, ver string) (r *pkg.PackageSpec, err error) {
log.Info("GetSpec", pkgName, ver)
fcos := FileController.NewOss()
md5, url := fcos.GetFileInfo(pkgName)
......@@ -42,6 +47,19 @@ func (s *PackageDispatch) GetSpec(ctx context.Context, pkgName string, ver strin
r.Pkg = pkg.NewPackage()
r.Pkg.Name = pkgName
r.Pkg.Version = ver
r.Md5 = md5
r.URL = url
return
}
func (s *PackageDispatch) GetSpecByPartner(ctx context.Context, pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) {
log.Info("GetSpecByPartner", pkgName, ver)
fcos := FileController.NewOss()
md5, url := fcos.GetFileInfo(pkgName)
r = pkg.NewPackageSpec()
r.Pkg = pkg.NewPackage()
r.Pkg.Name = pkgName
r.Pkg.Version = ver
r.Pkg.PartnerId = partnerId
r.Md5 = md5
r.URL = url
......
......@@ -24,7 +24,6 @@ func NewScheduleHandle(msgControl *model.MsgControl, clientManager *model.Client
s := &ScheduleHandle{}
s.msgControl = msgControl
s.clientManager = clientManager
s.scheduleHandler = nethandle.NewScheduleHandle(msgControl, clientManager, NewScheduleDispatch())
return s
}
......@@ -36,7 +35,8 @@ func NewScheduleDispatch() *ScheduleDispatch {
// 接受消息
func (s *ScheduleHandle) Schedule(ctx context.Context, request *mission.Message) (r bool, err error) {
log.Info("Schedule", request)
return s.scheduleHandler.Schedule(ctx, request)
scheduleHandler := nethandle.NewScheduleHandle(s.msgControl, s.clientManager, NewScheduleDispatch())
return scheduleHandler.Schedule(ctx, request)
}
func (s *ScheduleDispatch) Schedule(ctx context.Context, request *mission.Message) (r bool, err error) {
......
......@@ -149,44 +149,7 @@ func StartClientHeartWork() {
} else {
log.Info("遍历设备")
for _, v := range res.Devices {
go func(dd *device.Device) {
devicedispatch := thriftservice.NewDeviceDispatch()
if !ClientManager.IsKey(dd.ID) {
if err = devicedispatch.SetDispatch(); err != nil {
log.Info("连接创建失败 ", dd.ID, err)
return
}
if OK, err := devicedispatch.Deactivate(context.Background(), dd.ID, ip); !OK {
log.Info("终端没有上线 id ", dd.ID, err)
}
return
}
if ClientManager.AddTimes(dd.ID) {
if err = devicedispatch.SetDispatch(); err != nil {
log.Info("连接创建失败 ", dd.ID, err)
return
}
if OK, err := devicedispatch.Deactivate(context.Background(), dd.ID, ip); !OK {
log.Info("终端已经掉线 ", dd.ID, err)
}
return
}
log.Info("空闲设备 id ", dd.ID)
m := mission.Message{}
id, _ := uuid.NewV4()
m.ID = id.String()
m.State = &mission.MissionStatus{
Code: mission.MissionStatusCode_DISPATCHED,
Msg: "",
}
m.Priority = 5
m.Tries = 0
m.TriesMax = 1
m.Device = dd.ID
m.Proto = proto.Type_HEARTBEAT
MsgControl.AddMessage(&m)
}(v)
go DispatchMsg(v)
}
}
log.Info(config.GetHeartTimeout())
......@@ -194,6 +157,46 @@ func StartClientHeartWork() {
}
}
func DispatchMsg(dd *device.Device) {
devicedispatch := thriftservice.NewDeviceDispatch()
ip := config.GetAppIp()
if !ClientManager.IsKey(dd.ID) {
if err := devicedispatch.SetDispatch(); err != nil {
log.Error("连接创建失败 ", dd.ID, err)
return
}
if OK, err := devicedispatch.Deactivate(context.Background(), dd.ID, ip); !OK {
log.Error("终端没有上线 id ", dd.ID, err)
}
return
}
if ClientManager.AddTimes(dd.ID) {
if err := devicedispatch.SetDispatch(); err != nil {
log.Error("连接创建失败 ", dd.ID, err)
return
}
if OK, err := devicedispatch.Deactivate(context.Background(), dd.ID, ip); !OK {
log.Error("终端已经掉线 ", dd.ID, err)
}
return
}
log.Info("空闲设备 id ", dd.ID)
m := mission.Message{}
id, _ := uuid.NewV4()
m.ID = id.String()
m.State = &mission.MissionStatus{
Code: mission.MissionStatusCode_DISPATCHED,
Msg: "",
}
m.Priority = 5
m.Tries = 0
m.TriesMax = 1
m.Device = dd.ID
m.Proto = proto.Type_HEARTBEAT
MsgControl.AddMessage(&m)
}
// StartThriftSever1 开启client 的thrift 服务 复用socket实现双向通信
func StartThriftSever1() {
serverTransport, err := thrift.NewTServerSocket(config.GetLoginServerPort())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment