Commit 3a0895be by yunpeng.song

Merge branch 'feature/function_优化' into develop

parents 90c8ecf0 72ac78e7
package control
import (
"context"
"ficus/mission"
"ficus/proto"
"ficus_clientserver/config"
......@@ -87,10 +88,13 @@ func (p *MsgSender) WriteMsgToSendChan(msg *model.Msg) {
}
// AddMsgTries 判断消息的重试次数
func (p *MsgSender) AddMsgTries(msg *model.Msg) bool {
func (p *MsgSender) AddMsgTries(msg *model.Msg, r bool) bool {
if msg.TriesAddOne() {
log.Info("已经达到最大尝试次数", msg.MsgId)
log.Debug("已经达到最大尝试次数", msg.MsgId)
if !r {
log.Debug("消息回调", msg.MsgId)
go p.MsgFeedBack(msg)
}
return true
}
return false
......@@ -103,12 +107,13 @@ func (p *MsgSender) MsgFeedBack(msg *model.Msg) {
tool.Retry(config.GetRetryTimes(), config.GetRetryTime(), func() func() error {
temp := msg
return func() error {
_, err := d.Feedback(temp.Message)
_, err := d.Feedback(context.Background(), temp.Message)
return err
}
}())
p.msgControl.DeleteMsg(msg.MsgId)
}
log.Debug("删除消息", msg.MsgId)
p.msgControl.DeleteMsg(msg.MsgId)
}
// IsMatch 消息是否符合入机条件
......@@ -128,11 +133,13 @@ func (p *MsgSender) DispatchMsg(msg *model.Msg) (r bool, err error) {
p.SendRetryMsg(msg)
} else {
push := thriftservice.NewDispatchMsgTrans(client.Trans)
if err = push.Dispatch(msg.Message); err != nil {
if err = push.Dispatch(context.Background(), msg.Message); err != nil {
p.SendRetryMsg(msg)
return true, nil
} else {
r = true
}
}
log.Debug("消息推送 result ", r, err)
return
}
......@@ -144,14 +151,16 @@ func (p *MsgSender) ReadMsgSendChan() {
//判断消息是否存在
msg, ok := p.msgControl.GetMsgFromMap(msgId)
if !ok {
log.Info("消息ID:", msgId, "不存在")
log.Debug("消息ID:", msgId, "不存在")
return
}
log.Info("消息ID:", msgId, "存在")
log.Debug("消息ID:", msgId, "存在")
//只推送状态未DISPATCHED 的消息 给client
if p.IsMatch(msg) {
p.DispatchMsg(msg)
p.AddMsgTries(msg)
r, _ := p.DispatchMsg(msg)
p.AddMsgTries(msg, r)
} else {
p.msgControl.DeleteMsg(msg.MsgId)
}
}(rm)
}
......
package main
import (
"bufio"
native "ficus/native/service"
"ficus/service"
"ficus_clientserver/config"
"ficus_clientserver/control"
"ficus_clientserver/model"
"ficus_clientserver/mqcontrol"
"ficus_clientserver/nethandle"
"ficus_clientserver/nethandle/thriftservice"
"ficus_clientserver/tool"
"fmt"
"os"
"os/exec"
"runtime/debug"
"strings"
"time"
server "github.com/kardianos/service"
log "github.com/sirupsen/logrus"
"net/http"
_ "net/http/pprof"
"github.com/apache/thrift/lib/go/thrift"
)
type program struct{}
......@@ -61,10 +45,24 @@ var (
)
func main() {
InitService()
//Work()
//InitService()
Work()
}
func Work() {
CatchDump()
//读取配置文件
InitCfg()
InitLog()
InitGlobleVar()
InitTimeTick()
StartThrift()
//go ConsoleIn()
http.ListenAndServe(":38084", nil)
//阻塞主协程
<-QuitFlag
log.Println("退出")
}
func InitService() {
svcConfig := &server.Config{
Name: "FicusSchedule",
......@@ -101,180 +99,3 @@ func InitService() {
log.Println(err)
}
}
// InitCfg 初始化配置文件
func InitCfg() {
if err := config.LoadCfg(); err != nil {
log.Fatalln("InitCfg", err)
}
}
// InitTimeTick 初始化定时任务管理器
func InitTimeTick() {
tool.StartTicks(time.Second)
}
func Work() {
CatchDump()
//读取配置文件
InitCfg()
InitLog()
InitGlobleVar()
InitTimeTick()
StartThrift()
//go ConsoleIn()
http.ListenAndServe(":8080", nil)
//阻塞主协程
<-QuitFlag
log.Println("退出")
}
func CatchDump() {
errs := recover()
if errs == nil {
return
}
exeName := os.Args[0] //获取程序名称
now := time.Now() //获取当前时间
pid := os.Getpid() //获取进程ID
time_str := now.Format("20060102150405") //设定时间格式
fname := fmt.Sprintf("%s-%d-%s-dump.log", exeName, pid, time_str) //保存错误信息文件名:程序名-进程ID-当前时间(年月日时分秒)
fmt.Println("dump to file ", fname)
f, err := os.Create(fname)
if err != nil {
return
}
defer f.Close()
f.WriteString(fmt.Sprintf("%v\r\n", errs)) //输出panic信息
f.WriteString("========\r\n")
f.WriteString(string(debug.Stack())) //输出堆栈信息
os.Exit(0)
}
// ConsoleIn 控制台输入
func ConsoleIn() {
inputReader := bufio.NewReader(os.Stdin) //创建一个读取器,并将其与标准输入绑定。
fmt.Printf("Please enter some input: ")
for {
input, err := inputReader.ReadString('\n') //读取器对象提供一个方法 ReadString(delim byte) ,该方法从输入中读取内容,直到碰到 delim 指定的字符,然后将读取到的内容连同 delim 字符一起放到缓冲区。
if err == nil {
fmt.Printf("The input was: %s ,err:%s", input, err)
}
s := strings.Split(input, "\r\n")
if s[0] == "reload cfg" {
// 重载 配置文件
if err := config.LoadCfg(); err != nil {
fmt.Printf("reload failed %s", err)
}
continue
}
}
}
// InitGlobleVar 初始化全局变量
func InitGlobleVar() {
QuitFlag = make(chan int)
psend := mqcontrol.NewProducer("myPusher", "myQueue")
sendChan := make(chan string, 50000)
msgMap := model.NewMsgmap()
ClientManager = model.GetDefaultClientMap()
RedisClient = model.NewRedisClient(config.GetRedisAddress(), config.GetRedisPWD())
MsgControl = model.NewMsgControl(msgMap, psend, RedisClient, sendChan, 10)
MsgSender = control.NewMsgSender(MsgControl, sendChan, ClientManager, 10)
MsgSender.StartSendWork()
}
// InitEureka 初始化eureka 服务
func InitEureka() {
err := tool.ConnectEureka(getCurrentPath() + "local.gcfg")
if err != nil {
log.Panicln("ConnectEureka", err)
}
err = tool.LoadEurekaInstance(getCurrentPath() + "i.json")
if err != nil {
log.Panicln("LoadEurekaInstance", err)
}
tool.HeartLoop()
}
func getCurrentPath() string {
s, err := exec.LookPath(os.Args[0])
if err != nil {
panic(err)
}
i := strings.LastIndex(s, "\\")
path := string(s[0 : i+1])
return path
}
// StartThrift 开启thrift 服务
func StartThrift() {
go StartThriftSever1()
go StartThriftSever2()
go StartClientHeartWork()
}
// StartClientHeartWork 开始循环心跳工作
func StartClientHeartWork() {
time.Sleep(time.Second * time.Duration(config.GetHeartFirstTime()))
nethandle.HandExpiredDevice(MsgControl, ClientManager, thriftservice.NewDeviceDispatch())
}
// StartThriftSever1 开启client 的thrift 服务 复用socket实现双向通信
func StartThriftSever1() {
serverTransport, err := thrift.NewTServerSocket(config.GetLoginServerPort())
if err != nil {
log.Fatalln("Error!", err)
}
//redis 控制
processorFactory := thriftservice.NewMyTProcessorFactory(ClientManager, MsgControl)
protocolFactory := thrift.NewTCompactProtocolFactory()
transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
server := thrift.NewTSimpleServerFactory4(processorFactory, serverTransport, transportFactory, protocolFactory)
log.Println("thrift server in", config.GetLoginServerPort())
if err := server.Serve(); err != nil {
panic(err)
}
}
// StartThriftSever2 开启client 的thrift 多服务 ,不复用socket
func StartThriftSever2() {
serverTransport, err := thrift.NewTServerSocket(config.GetMultServerPort())
if err != nil {
log.Fatalln(err)
}
TMultiplexedProcessor := thrift.NewTMultiplexedProcessor()
deviceHandle := thriftservice.NewDeviceHandle()
deviceProcessor := service.NewDeviceServiceProcessor(deviceHandle)
dispatchHandle := thriftservice.NewDispatchService(nil, ClientManager, MsgControl)
dispatchProcessor := service.NewDispatchServiceProcessor(dispatchHandle)
scheduleHandle := thriftservice.NewScheduleHandle(MsgControl, ClientManager)
scheduleProcessor := native.NewScheduleServiceProcessor(scheduleHandle)
packageHandle := thriftservice.NewPackageHandle()
packageProcessor := service.NewPackageServiceProcessor(packageHandle)
TMultiplexedProcessor.RegisterProcessor("Identity", deviceProcessor)
TMultiplexedProcessor.RegisterProcessor("Mission", dispatchProcessor)
TMultiplexedProcessor.RegisterProcessor("Schedule", scheduleProcessor)
TMultiplexedProcessor.RegisterProcessor("Package", packageProcessor)
protocolFactory := thrift.NewTCompactProtocolFactory()
transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
server := thrift.NewTSimpleServer4(TMultiplexedProcessor, serverTransport, transportFactory, protocolFactory)
log.Println("thrift server in", config.GetMultServerPort())
if err := server.Serve(); err != nil {
log.Panicln(err)
}
}
......@@ -73,7 +73,7 @@ func (p *ClientManager) Get(k string) (*FicusClient, error) {
func (p *ClientManager) SetClient(k string, v *FicusClient) {
p.Lock.Lock()
defer p.Lock.Unlock()
log.Println("add client", k)
log.Info("add client", k)
p.mapClient[k] = v
p.AllCount++
}
......@@ -82,7 +82,7 @@ func (p *ClientManager) SetClient(k string, v *FicusClient) {
func (p *ClientManager) AddTimes(k string) bool {
p.Lock.Lock()
defer p.Lock.Unlock()
log.Println(" AddTimes", k)
log.Debug(" AddTimes", k)
v, ok := p.mapClient[k]
if ok {
v.LostTimes++
......@@ -90,7 +90,9 @@ func (p *ClientManager) AddTimes(k string) bool {
return false
}
if v.LostTimes == v.MaxLostTimes {
p.DeleteClient(k)
//不调用 DeleteClient ,防止死锁
v.Trans.Close()
delete(p.mapClient, k)
return true
}
return false
......@@ -101,7 +103,7 @@ func (p *ClientManager) AddTimes(k string) bool {
func (p *ClientManager) ClearTimes(k string) {
p.Lock.Lock()
defer p.Lock.Unlock()
log.Println(" ClearTimes", k)
log.Debug(" ClearTimes", k)
v, ok := p.mapClient[k]
if ok {
v.LostTimes = 0
......@@ -111,6 +113,8 @@ func (p *ClientManager) ClearTimes(k string) {
// DeleteClient 删除客户端
func (p *ClientManager) DeleteClient(k string) {
p.Lock.Lock()
defer p.Lock.Unlock()
v, ok := p.mapClient[k]
if ok {
v.Trans.Close()
......@@ -145,7 +149,7 @@ func (p *ClientManager) IsKey(key string) bool {
func (p *ClientManager) UpdateDevice(key string, device *device.Device, trans thrift.TTransport) bool {
p.Lock.Lock()
defer p.Lock.Unlock()
log.Println(" UpdateDevice", key)
log.Debug(" UpdateDevice", key)
v, ok := p.mapClient[key]
if ok {
v.Device = device
......
......@@ -3,9 +3,11 @@ package model
import (
"encoding/json"
"ficus/mission"
"ficus/proto"
"ficus_clientserver/config"
"ficus_clientserver/mqcontrol"
"ficus_clientserver/tool"
"sync"
"time"
log "github.com/sirupsen/logrus"
......@@ -16,7 +18,7 @@ type MsgControl struct {
msgMap *MsgMap // 所有到达路由的消息
redisClient *RedisClient // redis控制器
mqProducer *mqcontrol.Producer // mq控制器
timerMap map[string]*tool.Timer // 消息缓存定时删除器
timerMap sync.Map // 消息缓存定时删除器
sendChan chan string // 发送管道
msgList *MsgList // 消息队列
timeout int64 // 消息队列处理时间间隔
......@@ -31,7 +33,6 @@ func NewMsgControl(msgMap *MsgMap, mqProducer *mqcontrol.Producer, redisClient *
p.sendChan = sendChan
p.timeout = timeout
p.msgList = NewMsgList()
p.timerMap = make(map[string]*tool.Timer)
p.Init()
return p
}
......@@ -53,9 +54,14 @@ func (p *MsgControl) LoadRedisMsg() {
if err != nil {
continue
}
log.Println("load msg ", m.MsgId)
log.Debug("load msg ", m.MsgId)
// 增加消息
serverName := m.Message.Proto
if serverName != proto.Type_HEARTBEAT {
p.AddMsg(m)
} else {
p.DeleteMsg(m.MsgId)
}
}
}
......@@ -77,13 +83,13 @@ func (p *MsgControl) HandleMsgList() {
func (p *MsgControl) setTimerCall(msg *Msg) {
t := time.Unix(msg.TimeSecond, int64(0))
d := time.Until(t.AddDate(0, 0, config.GetMsgStoreTime()))
log.Info("时间差", d, msg.TimeSecond, t)
log.Debug("时间差", d, msg.TimeSecond, t)
if d > 0 {
tr := tool.AddCallback(d, func() {
log.Info("删除消息", msg.MsgId)
log.Debug("删除消息", msg.MsgId)
go p.DeleteMsg(msg.MsgId)
})
p.timerMap[msg.MsgId] = tr
p.timerMap.Store(msg.MsgId, tr)
}
}
......@@ -126,7 +132,7 @@ func (p *MsgControl) GetMsg(k string) *Msg {
// AddMsg 增加Msg
func (p *MsgControl) AddMsg(msg *Msg) {
p.msgMap.addMsg(msg)
p.setTimerCall(msg)
//p.setTimerCall(msg)
p.sendMsg(msg.MsgId)
}
......@@ -151,11 +157,13 @@ func (p *MsgControl) AddMessage(m *mission.Message) {
msg.TimeSecond = time.Now().Unix()
msg.Lastupdatetime = msg.TimeSecond
p.AddMsg(msg)
if m.Proto != proto.Type_HEARTBEAT {
err := p.redisClient.WriteMsgToRedis(msg)
if err != nil {
log.Info("message write to redis failed ", err)
log.Debug("message write to redis failed ", err)
return
}
}
// p.setTimerCall(msg)
// p.sendMsg(msg)
}
......@@ -178,5 +186,5 @@ func (p *MsgControl) PushMsgToMq(msgId string) {
func (p *MsgControl) DeleteMsg(msgId string) {
p.msgMap.DeleteMsg(msgId)
p.redisClient.DeleteMsgFromRedis(msgId)
delete(p.timerMap, msgId)
p.timerMap.Delete(msgId)
}
......@@ -19,14 +19,14 @@ type RedisClient struct {
func (p *RedisClient) setMsgToRedis(msg *Msg) error {
jsonstr, _ := json.Marshal(msg)
kname := config.RedisKeyConfig.MsgStatusName
log.WithFields(log.Fields{"func": "setRedisMsg", "kname": kname, "msgId": msg.MsgId}).Info(string(jsonstr))
log.WithFields(log.Fields{"func": "setRedisMsg", "kname": kname, "msgId": msg.MsgId}).Debug(string(jsonstr))
return HSet(p.redisClient, kname, msg.MsgId, jsonstr)
}
// 从redis删除msgId 的消息
func (p *RedisClient) deleteMsgToRedis(msgId string) {
kname := config.RedisKeyConfig.MsgStatusName
log.WithFields(log.Fields{"func": "deleteMsgToRedis", "kname": kname, "msgId": msgId}).Info()
log.WithFields(log.Fields{"func": "deleteMsgToRedis", "kname": kname, "msgId": msgId}).Debug()
HDel(p.redisClient, kname, msgId)
}
......@@ -34,14 +34,14 @@ func (p *RedisClient) deleteMsgToRedis(msgId string) {
func (p *RedisClient) setMissionToRedis(msg *mission.Message) {
jsonstr, _ := json.Marshal(msg)
kname := config.RedisKeyConfig.MsgName
log.WithFields(log.Fields{"func": "setmissiontoredis", "kname": kname, "msgId": msg.ID}).Info(string(jsonstr))
log.WithFields(log.Fields{"func": "setmissiontoredis", "kname": kname, "msgId": msg.ID}).Debug(string(jsonstr))
HSet(p.redisClient, kname, msg.ID, jsonstr)
}
// 从redis 删除 message
func (p *RedisClient) deleteMissionToRedis(msgId string) {
kname := config.RedisKeyConfig.MsgName
log.WithFields(log.Fields{"func": "deleteMissionToRedis", "kname": kname, "msgId": msgId}).Info()
log.WithFields(log.Fields{"func": "deleteMissionToRedis", "kname": kname, "msgId": msgId}).Debug()
HDel(p.redisClient, kname, msgId)
}
......@@ -63,7 +63,6 @@ func (p *RedisClient) GetAllMsg(k string) map[string]string {
// 获取Message
func (p *RedisClient) GetMessage(k string) (*mission.Message, error) {
kname := config.RedisKeyConfig.MsgName
log.WithFields(log.Fields{"func": "GetMessage"}).Info(kname)
jsonstr, err := HGet(p.redisClient, kname, k)
if err != nil {
return nil, err
......@@ -108,7 +107,7 @@ func (p *RedisClient) UpdateMsgStatus(mis mission.Message) bool {
func (p *RedisClient) setMessage(msg mission.Message) bool {
jsonstr, _ := json.Marshal(msg)
kname := config.RedisKeyConfig.MsgName
log.WithFields(log.Fields{"func": "setMessage", "kname": kname, "msgId": msg.ID}).Info(string(jsonstr))
log.WithFields(log.Fields{"func": "setMessage", "kname": kname, "msgId": msg.ID}).Debug(string(jsonstr))
HSet(p.redisClient, kname, msg.ID, jsonstr)
return true
}
......@@ -120,7 +119,7 @@ func (p *RedisClient) setMsg(msg *Msg) bool {
fmt.Println("Update Redis Msg Status err:", err)
return false
}
log.WithFields(log.Fields{"func": "setMsg", "kname": kname, "msgId": msg.MsgId}).Info(string(jsonstr))
log.WithFields(log.Fields{"func": "setMsg", "kname": kname, "msgId": msg.MsgId}).Debug(string(jsonstr))
HSet(p.redisClient, kname, msg.MsgId, jsonstr)
return true
}
......@@ -128,7 +127,7 @@ func (p *RedisClient) setMsg(msg *Msg) bool {
func (p *RedisClient) GetMsg(k string) *Msg {
kname := config.RedisKeyConfig.MsgStatusName
jsonstr, _ := HGet(p.redisClient, kname, k)
log.Println(jsonstr)
log.Debug(jsonstr)
var msg Msg
json.Unmarshal([]byte(jsonstr), &msg)
return &msg
......
package nethandle
import (
"context"
"ficus/device"
"ficus/identity"
"ficus/mission"
"ficus/proto"
"ficus/service"
"ficus_clientserver/config"
"ficus_clientserver/model"
"time"
uuid "github.com/satori/go.uuid"
log "github.com/sirupsen/logrus"
)
type (
DeviceHandler interface {
Identify(uuid string) (r *identity.Organization, err error)
Bind(uuid string, device *identity.Organization) (r bool, err error)
Unbind(uuid string) (r bool, err error)
Activate(device *device.Device, ip string) (r bool, err error)
Deactivate(uuid string, ip string) (r bool, err error)
Keep(device *device.Device, ip string) (r bool, err error)
GetPeer(uuid string) (r string, err error)
GetDeviceEx(uuid string) (r *device.DeviceEx, err error)
ListByStatusEx(status int32, page int32, pageSize int32) (r *service.DevicesExPage, err error)
ListByOrganizationEx(org *identity.Organization, page int32, pageSize int32) (r *service.DevicesExPage, err error)
ListExpired(seconds int32, ip string) (r *service.DevicesPage, err error)
ListByGroupEx(group string) (r *service.DevicesExPage, err error)
ListByRegionEx(rgn string) (r *service.DevicesExPage, err error)
ListByVersionEx(ver string, page int32, pageSize int32) (r *service.DevicesExPage, err error)
service.DeviceService
SetDispatch() error
}
......@@ -38,8 +18,6 @@ type (
}
)
var t1 int = 0
func NewDeviceHandle(h DeviceHandler) DeviceHandler {
d := &DeviceHandle{h}
return d
......@@ -54,145 +32,83 @@ func (d *DeviceHandle) SetDispatch() error {
}
// client 获取绑定门店信息接口
func (d *DeviceHandle) Identify(uuid string) (r *identity.Organization, err error) {
func (d *DeviceHandle) Identify(ctx context.Context, uuid string) (r *identity.Organization, err error) {
//TODO 调用设备服务获取绑定的门店信息
err = d.SetDispatch()
if err != nil {
return
}
return d.deviceDispatch.Identify(uuid)
return d.deviceDispatch.Identify(ctx, uuid)
}
// client 绑定门店信息接口
func (d *DeviceHandle) Bind(uuid string, device *identity.Organization) (r bool, err error) {
func (d *DeviceHandle) Bind(ctx context.Context, uuid string, device *identity.Organization) (r bool, err error) {
//TODO 调用设备服务绑定/解绑设备
err = d.SetDispatch()
if err != nil {
return
}
return d.deviceDispatch.Bind(uuid, device)
return d.deviceDispatch.Bind(ctx, uuid, device)
}
// client 接触绑定接口
func (d *DeviceHandle) Unbind(uuid string) (r bool, err error) {
func (d *DeviceHandle) Unbind(ctx context.Context, uuid string) (r bool, err error) {
//TODO 解除绑定
err = d.SetDispatch()
if err != nil {
return
}
return d.deviceDispatch.Unbind(uuid)
return d.deviceDispatch.Unbind(ctx, uuid)
}
func (d *DeviceHandle) Activate(device *device.Device, ip string) (r bool, err error) {
func (d *DeviceHandle) Activate(ctx context.Context, device *device.Device, ip string) (r bool, err error) {
//TODO 设备上线
return false, nil
}
// client 下线接口
func (d *DeviceHandle) Deactivate(uuid string, ip string) (r bool, err error) {
func (d *DeviceHandle) Deactivate(ctx context.Context, uuid string, ip string) (r bool, err error) {
err = d.SetDispatch()
if err != nil {
return
}
return d.deviceDispatch.Deactivate(uuid, ip)
return d.deviceDispatch.Deactivate(ctx, uuid, ip)
}
func (d *DeviceHandle) Keep(device *device.Device, ip string) (r bool, err error) {
func (d *DeviceHandle) Keep(ctx context.Context, device *device.Device, ip string) (r bool, err error) {
//TODO 心跳
return true, nil
}
func (d *DeviceHandle) GetPeer(uuid string) (r string, err error) {
func (d *DeviceHandle) GetPeer(ctx context.Context, uuid string) (r string, err error) {
//TODO 获取server ip
return "", nil
}
func (d *DeviceHandle) GetDeviceEx(uuid string) (r *device.DeviceEx, err error) {
func (d *DeviceHandle) GetDeviceEx(ctx context.Context, uuid string) (r *device.DeviceEx, err error) {
return
}
func (d *DeviceHandle) ListByStatusEx(status int32, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
func (d *DeviceHandle) ListByStatusEx(ctx context.Context, status int32, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return nil, nil
}
func (d *DeviceHandle) ListByOrganizationEx(org *identity.Organization, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
func (d *DeviceHandle) ListByOrganizationEx(ctx context.Context, org *identity.Organization, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return nil, nil
}
func (d *DeviceHandle) ListExpired(seconds int32, ip string) (r *service.DevicesPage, err error) {
func (d *DeviceHandle) ListExpired(ctx context.Context, seconds int32, ip string) (r *service.DevicesPage, err error) {
return nil, nil
}
func (d *DeviceHandle) ListByGroupEx(group string) (r *service.DevicesExPage, err error) {
func (d *DeviceHandle) ListByGroupEx(ctx context.Context, group string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return nil, nil
}
func (d *DeviceHandle) ListByRegionEx(rgn string) (r *service.DevicesExPage, err error) {
func (d *DeviceHandle) ListByRegionEx(ctx context.Context, rgn string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return nil, nil
}
func (d *DeviceHandle) ListByVersionEx(ver string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return
}
// 处理空闲设备,实现由上到下的心跳机制
func HandExpiredDevice(msgControl *model.MsgControl, clientManager *model.ClientManager, deviceDispatch DeviceHandler) {
// TODO 获取服务ip
log.Info("HandExpiredDevice start")
for {
err := deviceDispatch.SetDispatch()
if err != nil {
log.Info(config.GetHeartLoopTime(), err)
time.Sleep(time.Duration(config.GetHeartLoopTime()) * time.Second)
continue
}
ip := config.GetAppIp()
res, err := deviceDispatch.ListExpired(int32(config.GetHeartTimeout()), ip)
if err != nil {
log.Error("HandExpiredDevice", err)
} else {
log.Info("遍历设备")
for _, v := range res.Devices {
go func(dd *device.Device) {
if !clientManager.IsKey(dd.ID) {
if err = deviceDispatch.SetDispatch(); err != nil {
log.Info("连接创建失败 ", dd.ID, err)
return
}
if OK, err := deviceDispatch.Deactivate(dd.ID, ip); !OK {
log.Info("终端没有上线 id ", dd.ID, err)
}
func (d *DeviceHandle) ListByVersionEx(ctx context.Context, ver string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return
}
if clientManager.AddTimes(dd.ID) {
if err = deviceDispatch.SetDispatch(); err != nil {
log.Info("连接创建失败 ", dd.ID, err)
return
}
if OK, err := deviceDispatch.Deactivate(dd.ID, ip); !OK {
log.Info("终端已经掉线 ", dd.ID, err)
}
return
}
log.Info("空闲设备 id ", dd.ID)
m := mission.Message{}
id, _ := uuid.NewV4()
m.ID = id.String()
m.State = &mission.MissionStatus{
Code: mission.MissionStatusCode_DISPATCHED,
Msg: "",
}
m.Priority = 5
m.Tries = 0
m.TriesMax = 1
m.Device = dd.ID
m.Proto = proto.Type_HEARTBEAT
msgControl.AddMessage(&m)
}(v)
}
}
log.Info(config.GetHeartTimeout())
time.Sleep(time.Duration(config.GetHeartLoopTime()) * time.Second)
}
}
package nethandle
import (
"context"
"ficus/device"
"ficus/mission"
"ficus/service"
"ficus_clientserver/config"
"ficus_clientserver/model"
"ficus_clientserver/tool"
......@@ -12,10 +14,7 @@ import (
type (
DispatchHandler interface {
Login(device *device.Device) (r bool, err error)
Heartbeat(whom *device.Device) (r bool, err error)
Feedback(response *mission.Message) (r bool, err error)
Dispatch(request *mission.Message) (err error)
service.DispatchService
SetDispatch() error
}
......@@ -44,18 +43,18 @@ func (d *DispatchHandle) SetDispatch() error {
}
// client心跳接口
func (d *DispatchHandle) Heartbeat(whom *device.Device) (r bool, err error) {
func (d *DispatchHandle) Heartbeat(ctx context.Context, whom *device.Device) (r bool, err error) {
log.WithFields(log.Fields{"func": "Heartbeat", "心跳id": whom.ID}).Info(whom)
return d.dispatchService.Heartbeat(whom)
return d.dispatchService.Heartbeat(ctx, whom)
}
// client消息回调接口
func (d *DispatchHandle) Feedback(response *mission.Message) (r bool, err error) {
func (d *DispatchHandle) Feedback(ctx context.Context, response *mission.Message) (r bool, err error) {
log.WithFields(log.Fields{"data": string(response.Data)}).Info(response)
// TODO 更新消息状态
tool.Retry(config.GetRetryTimes(), config.GetRetryTime(), func() error {
// TODO 回调消息
d.dispatchService.Feedback(response)
d.dispatchService.Feedback(ctx, response)
d.msgControl.DeleteMsg(response.ID)
return nil
})
......@@ -64,11 +63,11 @@ func (d *DispatchHandle) Feedback(response *mission.Message) (r bool, err error)
}
// client登录接口
func (d *DispatchHandle) Login(device *device.Device) (r bool, err error) {
func (d *DispatchHandle) Login(ctx context.Context, device *device.Device) (r bool, err error) {
log.WithFields(log.Fields{"func": "Login", "登录id": device.ID}).Info(device)
return d.dispatchService.Login(device)
return d.dispatchService.Login(ctx, device)
}
func (d *DispatchHandle) Dispatch(request *mission.Message) (err error) {
func (d *DispatchHandle) Dispatch(ctx context.Context, request *mission.Message) (err error) {
return nil
}
package nethandle
import "ficus/pkg"
import (
"context"
"ficus/pkg"
"ficus/service"
)
type (
PackageHandler interface {
GetSpec(pkgName string, ver string) (r *pkg.PackageSpec, err error)
service.PackageService
SetDispatch() error
}
PackageHandle struct {
......@@ -19,8 +23,12 @@ func NewPackageHandle(p PackageHandler) PackageHandler {
return h
}
func (p *PackageHandle) GetSpec(pkgName string, ver string) (r *pkg.PackageSpec, err error) {
return p.PackageHandler.GetSpec(pkgName, ver)
func (p *PackageHandle) GetSpec(ctx context.Context, pkgName string, ver string) (r *pkg.PackageSpec, err error) {
return p.PackageHandler.GetSpec(ctx, pkgName, ver)
}
func (p *PackageHandle) GetSpecByPartner(ctx context.Context, pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) {
return p.PackageHandler.GetSpecByPartner(ctx, pkgName, ver, partnerId)
}
func (p *PackageHandle) SetDispatch() (err error) {
......
package nethandle
import (
"context"
"errors"
"ficus/mission"
"ficus/native/service"
"ficus_clientserver/model"
log "github.com/sirupsen/logrus"
......@@ -10,7 +12,7 @@ import (
type (
ScheduleHandler interface {
Schedule(request *mission.Message) (r bool, err error)
service.ScheduleService
}
ScheduleHandle struct {
......@@ -29,12 +31,12 @@ func NewScheduleHandle(m *model.MsgControl, c *model.ClientManager, s ScheduleHa
return h
}
func (s *ScheduleHandle) Schedule(request *mission.Message) (r bool, err error) {
log.Info("Schedule start ", request)
func (s *ScheduleHandle) Schedule(ctx context.Context, request *mission.Message) (r bool, err error) {
log.Info("Schedule", request)
if !s.clientManager.IsKey(request.Device) {
return false, errors.New("机器不在线")
}
go s.msgControl.AddMessage(request)
log.Info("Schedule end ")
log.Debug("Schedule end ")
return true, nil
}
......@@ -27,93 +27,92 @@ func NewDeviceDispatch() *DeviceDispatch {
}
// Identify 获取设备的绑定门店信息
func (d *DeviceDispatch) Identify(uuid string) (r *identity.Organization, err error) {
func (d *DeviceDispatch) Identify(ctx context.Context, uuid string) (r *identity.Organization, err error) {
defer d.tran.Close()
r, err = d.client.Identify(context.Background(), uuid)
r, err = d.client.Identify(ctx, uuid)
log.Info("dispatch Identify", err, uuid)
return
}
// Bind 绑定设备门店信息
func (d *DeviceDispatch) Bind(uuid string, device *identity.Organization) (r bool, err error) {
func (d *DeviceDispatch) Bind(ctx context.Context, uuid string, device *identity.Organization) (r bool, err error) {
defer d.tran.Close()
r, err = d.client.Bind(context.Background(), uuid, device)
r, err = d.client.Bind(ctx, uuid, device)
log.Info("dispatch Bind", err, device)
return
}
// Unbind 绑定设备门店信息
func (d *DeviceDispatch) Unbind(uuid string) (r bool, err error) {
func (d *DeviceDispatch) Unbind(ctx context.Context, uuid string) (r bool, err error) {
defer d.tran.Close()
r, err = d.client.Unbind(context.Background(), uuid)
r, err = d.client.Unbind(ctx, uuid)
log.Info("dispatch Unbind", err, uuid)
return
}
// Activate 设备上线
func (d *DeviceDispatch) Activate(device *device.Device, ip string) (r bool, err error) {
func (d *DeviceDispatch) Activate(ctx context.Context, device *device.Device, ip string) (r bool, err error) {
defer d.tran.Close()
log.Info("dispatch Activate start")
r, err = d.client.Activate(context.Background(), device, ip)
r, err = d.client.Activate(ctx, device, ip)
log.Info("dispatch Activate", err, device, ip)
return
}
// Deactivate 设备下线
func (d *DeviceDispatch) Deactivate(uuid string, ip string) (r bool, err error) {
func (d *DeviceDispatch) Deactivate(ctx context.Context, uuid string, ip string) (r bool, err error) {
defer d.tran.Close()
r, err = d.client.Deactivate(context.Background(), uuid, ip)
r, err = d.client.Deactivate(ctx, uuid, ip)
log.Info("dispatch Deactivate end ", r, err, uuid, ip)
return
}
// Keep 心跳接口
func (d *DeviceDispatch) Keep(device *device.Device, ip string) (r bool, err error) {
func (d *DeviceDispatch) Keep(ctx context.Context, device *device.Device, ip string) (r bool, err error) {
defer d.tran.Close()
r, err = d.client.Keep(context.Background(), device, ip)
r, err = d.client.Keep(ctx, device, ip)
log.Info("dispatch Keep", err, device, ip)
return
}
func (d *DeviceDispatch) GetPeer(uuid string) (r string, err error) {
func (d *DeviceDispatch) GetPeer(ctx context.Context, uuid string) (r string, err error) {
defer d.tran.Close()
return "", nil
}
func (d *DeviceDispatch) GetDeviceEx(uuid string) (r *device.DeviceEx, err error) {
func (d *DeviceDispatch) GetDeviceEx(ctx context.Context, uuid string) (r *device.DeviceEx, err error) {
defer d.tran.Close()
return
}
func (d *DeviceDispatch) ListByStatusEx(status int32, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
func (d *DeviceDispatch) ListByStatusEx(ctx context.Context, status int32, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
defer d.tran.Close()
return nil, nil
}
func (d *DeviceDispatch) ListByOrganizationEx(org *identity.Organization, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
func (d *DeviceDispatch) ListByOrganizationEx(ctx context.Context, org *identity.Organization, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
defer d.tran.Close()
return nil, nil
}
func (d *DeviceDispatch) ListByGroupEx(group string) (r *service.DevicesExPage, err error) {
func (d *DeviceDispatch) ListByGroupEx(ctx context.Context, group string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
defer d.tran.Close()
return nil, nil
}
func (d *DeviceDispatch) ListByRegionEx(rgn string) (r *service.DevicesExPage, err error) {
func (d *DeviceDispatch) ListByRegionEx(ctx context.Context, rgn string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
defer d.tran.Close()
return nil, nil
}
func (d *DeviceDispatch) ListByVersionEx(ver string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
func (d *DeviceDispatch) ListByVersionEx(ctx context.Context, ver string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
defer d.tran.Close()
return
}
// 获取空闲设备
func (d *DeviceDispatch) ListExpired(seconds int32, ip string) (r *service.DevicesPage, err error) {
func (d *DeviceDispatch) ListExpired(ctx context.Context, seconds int32, ip string) (r *service.DevicesPage, err error) {
defer d.tran.Close()
r, err = d.client.ListExpired(context.Background(), seconds, ip)
r, err = d.client.ListExpired(ctx, seconds, ip)
log.Info("dispatch ListExpired", err, seconds, ip)
return
}
......
......@@ -12,14 +12,12 @@ import (
type (
DeviceHandle struct {
deviceDispatch nethandle.DeviceHandler
//deviceDispatch nethandle.DeviceHandler
}
)
func NewDeviceHandle() *DeviceHandle {
d := DeviceHandle{}
dispatch := NewDeviceDispatch()
d.deviceDispatch = nethandle.NewDeviceHandle(dispatch)
return &d
}
......@@ -27,21 +25,27 @@ func NewDeviceHandle() *DeviceHandle {
func (d *DeviceHandle) Identify(ctx context.Context, uuid string) (r *identity.Organization, err error) {
//TODO 调用设备服务获取绑定的门店信息
log.Info("Identify", uuid)
return d.deviceDispatch.Identify(uuid)
dispatch := NewDeviceDispatch()
deviceDispatch := nethandle.NewDeviceHandle(dispatch)
return deviceDispatch.Identify(ctx, uuid)
}
// client 绑定门店信息接口
func (d *DeviceHandle) Bind(ctx context.Context, uuid string, device *identity.Organization) (r bool, err error) {
//TODO 调用设备服务绑定/解绑设备
log.Info("Bind", uuid, device)
return d.deviceDispatch.Bind(uuid, device)
dispatch := NewDeviceDispatch()
deviceDispatch := nethandle.NewDeviceHandle(dispatch)
return deviceDispatch.Bind(ctx, uuid, device)
}
// client 接触绑定接口
func (d *DeviceHandle) Unbind(ctx context.Context, uuid string) (r bool, err error) {
//TODO 解除绑定
log.Info("Unbind", uuid)
return d.deviceDispatch.Unbind(uuid)
dispatch := NewDeviceDispatch()
deviceDispatch := nethandle.NewDeviceHandle(dispatch)
return deviceDispatch.Unbind(ctx, uuid)
}
func (d *DeviceHandle) Activate(ctx context.Context, device *device.Device, ip string) (r bool, err error) {
......@@ -52,7 +56,9 @@ func (d *DeviceHandle) Activate(ctx context.Context, device *device.Device, ip s
// client 下线接口
func (d *DeviceHandle) Deactivate(ctx context.Context, uuid string, ip string) (r bool, err error) {
log.Info("Deactivate", uuid, ip)
return d.deviceDispatch.Deactivate(uuid, ip)
dispatch := NewDeviceDispatch()
deviceDispatch := nethandle.NewDeviceHandle(dispatch)
return deviceDispatch.Deactivate(ctx, uuid, ip)
}
func (d *DeviceHandle) Keep(ctx context.Context, device *device.Device, ip string) (r bool, err error) {
//TODO 心跳
......@@ -78,9 +84,9 @@ func (d *DeviceHandle) ListByOrganizationEx(ctx context.Context, org *identity.O
func (d *DeviceHandle) ListExpired(ctx context.Context, seconds int32, ip string) (r *service.DevicesPage, err error) {
return nil, nil
}
func (d *DeviceHandle) ListByGroupEx(ctx context.Context, group string) (r *service.DevicesExPage, err error) {
func (d *DeviceHandle) ListByGroupEx(ctx context.Context, group string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return nil, nil
}
func (d *DeviceHandle) ListByRegionEx(ctx context.Context, rgn string) (r *service.DevicesExPage, err error) {
func (d *DeviceHandle) ListByRegionEx(ctx context.Context, rgn string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return nil, nil
}
......@@ -12,33 +12,45 @@ import (
)
type DispatchHandle struct {
dispatchService nethandle.DispatchHandler
//dispatchService nethandle.DispatchHandler
trans thrift.TTransport
clientManager *model.ClientManager
msgControl *model.MsgControl
}
func NewDispatchService(trans thrift.TTransport, clientManager *model.ClientManager, msgControl *model.MsgControl) *DispatchHandle {
p := &DispatchHandle{}
d := NewDispatchMsg(trans, clientManager, msgControl)
p.dispatchService = nethandle.NewDispatchHandle(clientManager, msgControl, d)
p.trans = trans
p.clientManager = clientManager
p.msgControl = msgControl
return p
}
// client心跳接口
func (p *DispatchHandle) Heartbeat(ctx context.Context, whom *device.Device) (r bool, err error) {
return p.dispatchService.Heartbeat(whom)
d := NewDispatchMsg(p.trans, p.clientManager, p.msgControl)
dispatchService := nethandle.NewDispatchHandle(p.clientManager, p.msgControl, d)
return dispatchService.Heartbeat(ctx, whom)
}
// client消息回调接口
func (p *DispatchHandle) Feedback(ctx context.Context, response *mission.Message) (r bool, err error) {
return p.dispatchService.Feedback(response)
d := NewDispatchMsg(p.trans, p.clientManager, p.msgControl)
dispatchService := nethandle.NewDispatchHandle(p.clientManager, p.msgControl, d)
return dispatchService.Feedback(ctx, response)
}
// client登录接口
func (p *DispatchHandle) Login(ctx context.Context, device *device.Device) (r bool, err error) {
return p.dispatchService.Login(device)
d := NewDispatchMsg(p.trans, p.clientManager, p.msgControl)
dispatchService := nethandle.NewDispatchHandle(p.clientManager, p.msgControl, d)
return dispatchService.Login(ctx, device)
}
func (p *DispatchHandle) Dispatch(ctx context.Context, request *mission.Message) (err error) {
return p.dispatchService.Dispatch(request)
d := NewDispatchMsg(p.trans, p.clientManager, p.msgControl)
dispatchService := nethandle.NewDispatchHandle(p.clientManager, p.msgControl, d)
return dispatchService.Dispatch(ctx, request)
}
type myProcessorFactory struct {
......
......@@ -19,13 +19,12 @@ type DispatchMsg struct {
clientManager *model.ClientManager
msgControl *model.MsgControl
callbackClient *service.DispatchServiceClient
deviceDispatch *DeviceDispatch
}
func NewDispatchMsgTrans(trans thrift.TTransport) *DispatchMsg {
d := &DispatchMsg{}
d.trans = trans
d.deviceDispatch = NewDeviceDispatch()
d.callbackClient = service.NewDispatchServiceClientFactory(thrift.NewTFramedTransport(trans), thrift.NewTCompactProtocolFactory())
return d
}
......@@ -35,7 +34,6 @@ func NewDispatchMsg(trans thrift.TTransport, clientManager *model.ClientManager,
d.clientManager = clientManager
d.msgControl = msgControl
d.trans = trans
d.deviceDispatch = NewDeviceDispatch()
d.callbackClient = service.NewDispatchServiceClientFactory(thrift.NewTFramedTransport(trans), thrift.NewTCompactProtocolFactory())
return d
}
......@@ -45,22 +43,22 @@ func (p *DispatchMsg) SetDispatch() error {
}
// client心跳接口
func (d *DispatchMsg) Heartbeat(whom *device.Device) (r bool, err error) {
log.WithFields(log.Fields{"func": "Heartbeat", "心跳id": whom.ID}).Info(whom)
func (d *DispatchMsg) Heartbeat(ctx context.Context, whom *device.Device) (r bool, err error) {
key := whom.ID
if d.clientManager.IsKey(key) {
d.clientManager.UpdateHartTime(key)
err = d.deviceDispatch.SetDispatch()
deviceDispatch := NewDeviceDispatch()
err = deviceDispatch.SetDispatch()
if err != nil {
return
}
return d.deviceDispatch.Keep(whom, config.GetAppIp())
return deviceDispatch.Keep(ctx, whom, config.GetAppIp())
}
return true, nil
}
// client消息回调接口
func (d *DispatchMsg) Feedback(response *mission.Message) (r bool, err error) {
func (d *DispatchMsg) Feedback(ctx context.Context, response *mission.Message) (r bool, err error) {
serverName := response.Proto.String()
server, err := config.GetCallBackServerAddress(serverName)
if err != nil {
......@@ -89,7 +87,7 @@ func (d *DispatchMsg) Feedback(response *mission.Message) (r bool, err error) {
}
// client登录接口
func (d *DispatchMsg) Login(device *device.Device) (r bool, err error) {
func (d *DispatchMsg) Login(ctx context.Context, device *device.Device) (r bool, err error) {
//TODO 写路由信息到设备服务 ,待接入 eureka获取ip 和端口
go func() {
//判断是否存在
......@@ -102,8 +100,8 @@ func (d *DispatchMsg) Login(device *device.Device) (r bool, err error) {
//写入客户端Map - 或者更新map 的
d.clientManager.SetClient(key, client)
//TODO 写路由信息到设备服务 ,待接入 eureka获取ip 和端口
err := d.deviceDispatch.SetDispatch()
deviceDispatch := NewDeviceDispatch()
err := deviceDispatch.SetDispatch()
if err != nil {
d.clientManager.DeleteClient(key)
return
......@@ -113,7 +111,7 @@ func (d *DispatchMsg) Login(device *device.Device) (r bool, err error) {
temp := device
ip := config.GetAppIp()
return func() error {
_, err := d.deviceDispatch.Activate(temp, ip)
_, err := deviceDispatch.Activate(ctx, temp, ip)
return err
}
}())
......@@ -126,12 +124,11 @@ func (d *DispatchMsg) Login(device *device.Device) (r bool, err error) {
}
// Dispatch 消息推送给client
func (p *DispatchMsg) Dispatch(request *mission.Message) (err error) {
log.Info("PushClientDispatch start ", request)
func (p *DispatchMsg) Dispatch(ctx context.Context, request *mission.Message) (err error) {
buff := thrift.NewTMemoryBuffer()
proto := thrift.NewTBinaryProtocol(buff, true, true)
request.Write(proto)
err = p.callbackClient.Dispatch(context.Background(), request)
log.Info("PushClientDispatch end ", err)
err = p.callbackClient.Dispatch(ctx, request)
log.Info("PushClientDispatch end ", err, "request context", request)
return err
}
......@@ -11,7 +11,7 @@ import (
type (
PackageHandle struct {
PackageHandler nethandle.PackageHandler
//PackageHandler nethandle.PackageHandler
}
PackageDispatch struct {
......@@ -20,7 +20,6 @@ type (
func NewPackageHandle() *PackageHandle {
p := &PackageHandle{}
p.PackageHandler = nethandle.NewPackageHandle(NewPackageDispatch())
return p
}
......@@ -31,10 +30,16 @@ func NewPackageDispatch() *PackageDispatch {
// 接受消息
func (s *PackageHandle) GetSpec(ctx context.Context, pkgName string, ver string) (r *pkg.PackageSpec, err error) {
return s.PackageHandler.GetSpec(pkgName, ver)
PackageHandler := nethandle.NewPackageHandle(NewPackageDispatch())
return PackageHandler.GetSpec(ctx, pkgName, ver)
}
func (s *PackageDispatch) GetSpec(pkgName string, ver string) (r *pkg.PackageSpec, err error) {
func (s *PackageHandle) GetSpecByPartner(ctx context.Context, pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) {
PackageHandler := nethandle.NewPackageHandle(NewPackageDispatch())
return PackageHandler.GetSpecByPartner(ctx, pkgName, ver, partnerId)
}
func (s *PackageDispatch) GetSpec(ctx context.Context, pkgName string, ver string) (r *pkg.PackageSpec, err error) {
log.Info("GetSpec", pkgName, ver)
fcos := FileController.NewOss()
md5, url := fcos.GetFileInfo(pkgName)
......@@ -47,6 +52,20 @@ func (s *PackageDispatch) GetSpec(pkgName string, ver string) (r *pkg.PackageSpe
return
}
func (s *PackageDispatch) GetSpecByPartner(ctx context.Context, pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) {
log.Info("GetSpecByPartner", pkgName, ver)
fcos := FileController.NewOss()
md5, url := fcos.GetFileInfo(pkgName)
r = pkg.NewPackageSpec()
r.Pkg = pkg.NewPackage()
r.Pkg.Name = pkgName
r.Pkg.Version = ver
r.Pkg.PartnerId = partnerId
r.Md5 = md5
r.URL = url
return
}
func (d *PackageDispatch) SetDispatch() error {
return nil
}
......@@ -24,7 +24,6 @@ func NewScheduleHandle(msgControl *model.MsgControl, clientManager *model.Client
s := &ScheduleHandle{}
s.msgControl = msgControl
s.clientManager = clientManager
s.scheduleHandler = nethandle.NewScheduleHandle(msgControl, clientManager, NewScheduleDispatch())
return s
}
......@@ -36,9 +35,10 @@ func NewScheduleDispatch() *ScheduleDispatch {
// 接受消息
func (s *ScheduleHandle) Schedule(ctx context.Context, request *mission.Message) (r bool, err error) {
log.Info("Schedule", request)
return s.scheduleHandler.Schedule(request)
scheduleHandler := nethandle.NewScheduleHandle(s.msgControl, s.clientManager, NewScheduleDispatch())
return scheduleHandler.Schedule(ctx, request)
}
func (s *ScheduleDispatch) Schedule(request *mission.Message) (r bool, err error) {
func (s *ScheduleDispatch) Schedule(ctx context.Context, request *mission.Message) (r bool, err error) {
return true, nil
}
package main
import (
"bufio"
"context"
"ficus/device"
"ficus/mission"
native "ficus/native/service"
"ficus/proto"
"ficus/service"
"ficus_clientserver/config"
"ficus_clientserver/control"
"ficus_clientserver/model"
"ficus_clientserver/mqcontrol"
"ficus_clientserver/nethandle/thriftservice"
"ficus_clientserver/tool"
"fmt"
"os"
"os/exec"
"runtime/debug"
"strings"
"time"
"github.com/apache/thrift/lib/go/thrift"
uuid "github.com/satori/go.uuid"
log "github.com/sirupsen/logrus"
)
// InitCfg 初始化配置文件
func InitCfg() {
if err := config.LoadCfg(); err != nil {
log.Fatalln("InitCfg", err)
}
}
// InitTimeTick 初始化定时任务管理器
func InitTimeTick() {
tool.StartTicks(time.Second)
}
func CatchDump() {
errs := recover()
if errs == nil {
return
}
exeName := os.Args[0] //获取程序名称
now := time.Now() //获取当前时间
pid := os.Getpid() //获取进程ID
time_str := now.Format("20060102150405") //设定时间格式
fname := fmt.Sprintf("%s-%d-%s-dump.log", exeName, pid, time_str) //保存错误信息文件名:程序名-进程ID-当前时间(年月日时分秒)
fmt.Println("dump to file ", fname)
f, err := os.Create(fname)
if err != nil {
return
}
defer f.Close()
f.WriteString(fmt.Sprintf("%v\r\n", errs)) //输出panic信息
f.WriteString("========\r\n")
f.WriteString(string(debug.Stack())) //输出堆栈信息
os.Exit(0)
}
// ConsoleIn 控制台输入
func ConsoleIn() {
inputReader := bufio.NewReader(os.Stdin) //创建一个读取器,并将其与标准输入绑定。
fmt.Printf("Please enter some input: ")
for {
input, err := inputReader.ReadString('\n') //读取器对象提供一个方法 ReadString(delim byte) ,该方法从输入中读取内容,直到碰到 delim 指定的字符,然后将读取到的内容连同 delim 字符一起放到缓冲区。
if err == nil {
fmt.Printf("The input was: %s ,err:%s", input, err)
}
s := strings.Split(input, "\r\n")
if s[0] == "reload cfg" {
// 重载 配置文件
if err := config.LoadCfg(); err != nil {
fmt.Printf("reload failed %s", err)
}
continue
}
}
}
// InitGlobleVar 初始化全局变量
func InitGlobleVar() {
QuitFlag = make(chan int)
psend := mqcontrol.NewProducer("myPusher", "myQueue")
sendChan := make(chan string, 50000)
msgMap := model.NewMsgmap()
ClientManager = model.GetDefaultClientMap()
RedisClient = model.NewRedisClient(config.GetRedisAddress(), config.GetRedisPWD())
MsgControl = model.NewMsgControl(msgMap, psend, RedisClient, sendChan, 10)
MsgSender = control.NewMsgSender(MsgControl, sendChan, ClientManager, 10)
MsgSender.StartSendWork()
}
// InitEureka 初始化eureka 服务
func InitEureka() {
err := tool.ConnectEureka(getCurrentPath() + "local.gcfg")
if err != nil {
log.Panicln("ConnectEureka", err)
}
err = tool.LoadEurekaInstance(getCurrentPath() + "i.json")
if err != nil {
log.Panicln("LoadEurekaInstance", err)
}
tool.HeartLoop()
}
func getCurrentPath() string {
s, err := exec.LookPath(os.Args[0])
if err != nil {
panic(err)
}
i := strings.LastIndex(s, "\\")
path := string(s[0 : i+1])
return path
}
// StartThrift 开启thrift 服务
func StartThrift() {
go StartThriftSever1()
go StartThriftSever2()
go StartClientHeartWork()
}
// StartClientHeartWork 开始循环心跳工作
func StartClientHeartWork() {
// 处理空闲设备,实现由上到下的心跳机制
time.Sleep(time.Second * time.Duration(config.GetHeartFirstTime()))
log.Info("HandExpiredDevice start")
deviceDispatch := thriftservice.NewDeviceDispatch()
for {
err := deviceDispatch.SetDispatch()
if err != nil {
log.Info(config.GetHeartLoopTime(), err)
time.Sleep(time.Duration(config.GetHeartLoopTime()) * time.Second)
continue
}
ip := config.GetAppIp()
res, err := deviceDispatch.ListExpired(context.Background(), int32(config.GetHeartTimeout()), ip)
if err != nil || res == nil || res.Devices == nil {
log.Error("HandExpiredDevice", err)
} else {
log.Info("遍历设备")
for _, v := range res.Devices {
go DispatchMsg(v)
}
}
log.Info(config.GetHeartTimeout())
time.Sleep(time.Duration(config.GetHeartLoopTime()) * time.Second)
}
}
func DispatchMsg(dd *device.Device) {
devicedispatch := thriftservice.NewDeviceDispatch()
ip := config.GetAppIp()
if !ClientManager.IsKey(dd.ID) {
if err := devicedispatch.SetDispatch(); err != nil {
log.Error("连接创建失败 ", dd.ID, err)
return
}
if OK, err := devicedispatch.Deactivate(context.Background(), dd.ID, ip); !OK {
log.Error("终端没有上线 id ", dd.ID, err)
}
return
}
if ClientManager.AddTimes(dd.ID) {
if err := devicedispatch.SetDispatch(); err != nil {
log.Error("连接创建失败 ", dd.ID, err)
return
}
if OK, err := devicedispatch.Deactivate(context.Background(), dd.ID, ip); !OK {
log.Error("终端已经掉线 ", dd.ID, err)
}
return
}
log.Info("空闲设备 id ", dd.ID)
m := mission.Message{}
id, _ := uuid.NewV4()
m.ID = id.String()
m.State = &mission.MissionStatus{
Code: mission.MissionStatusCode_DISPATCHED,
Msg: "",
}
m.Priority = 5
m.Tries = 0
m.TriesMax = 1
m.Device = dd.ID
m.Proto = proto.Type_HEARTBEAT
MsgControl.AddMessage(&m)
}
// StartThriftSever1 开启client 的thrift 服务 复用socket实现双向通信
func StartThriftSever1() {
serverTransport, err := thrift.NewTServerSocket(config.GetLoginServerPort())
if err != nil {
log.Fatalln("Error!", err)
}
//redis 控制
processorFactory := thriftservice.NewMyTProcessorFactory(ClientManager, MsgControl)
protocolFactory := thrift.NewTCompactProtocolFactory()
transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
server := thrift.NewTSimpleServerFactory4(processorFactory, serverTransport, transportFactory, protocolFactory)
log.Println("thrift server in", config.GetLoginServerPort())
if err := server.Serve(); err != nil {
panic(err)
}
}
// StartThriftSever2 开启client 的thrift 多服务 ,不复用socket
func StartThriftSever2() {
serverTransport, err := thrift.NewTServerSocket(config.GetMultServerPort())
if err != nil {
log.Fatalln(err)
}
TMultiplexedProcessor := thrift.NewTMultiplexedProcessor()
deviceHandle := thriftservice.NewDeviceHandle()
deviceProcessor := service.NewDeviceServiceProcessor(deviceHandle)
dispatchHandle := thriftservice.NewDispatchService(nil, ClientManager, MsgControl)
dispatchProcessor := service.NewDispatchServiceProcessor(dispatchHandle)
scheduleHandle := thriftservice.NewScheduleHandle(MsgControl, ClientManager)
scheduleProcessor := native.NewScheduleServiceProcessor(scheduleHandle)
packageHandle := thriftservice.NewPackageHandle()
packageProcessor := service.NewPackageServiceProcessor(packageHandle)
TMultiplexedProcessor.RegisterProcessor("Identity", deviceProcessor)
TMultiplexedProcessor.RegisterProcessor("Mission", dispatchProcessor)
TMultiplexedProcessor.RegisterProcessor("Schedule", scheduleProcessor)
TMultiplexedProcessor.RegisterProcessor("Package", packageProcessor)
protocolFactory := thrift.NewTCompactProtocolFactory()
transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
server := thrift.NewTSimpleServer4(TMultiplexedProcessor, serverTransport, transportFactory, protocolFactory)
log.Println("thrift server in", config.GetMultServerPort())
if err := server.Serve(); err != nil {
log.Panicln(err)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment