Commit fed03a3c by yunpeng.song

优化

parent 4ecb19da
ficus_router.exe
package config
import (
"ficus/mission"
)
type SCOMMAND uint32
const (
......@@ -47,16 +43,6 @@ type Config_Redis struct {
Time int64
}
type T_CMsg struct {
RouteName string //"路由主机"
Mis mission.Message
PushCount uint32 //"push count 重试次数,1 为发送一次,依此类推"
PushStatus MSG_STATUS //"发送状态"
//Bfeedback bool "false:客户端未成功接收消息,true 客户端成功接收消息"
Lastupdatetime int64 //"laset dispatch time"
Rpcid string //"追踪使用"
}
type OCTOPUS_CONFIG struct {
Traceid string `json:"traceid"` //"路由的唯一,追踪的标示"
Project string `json:"project"` //"项目的唯一标示"
......@@ -64,11 +50,12 @@ type OCTOPUS_CONFIG struct {
//redis key name
type REDISNAME_CONFIG struct {
MsgName_prefix string `json:"msgname"` //"消息的前缀"
ClientName_prefix string `json:"clientname"` //"客户端的前缀"
ClientServer_prefix string `json:"clientserver"` //""
MsgStatusListName string `json:"msgstatusname"` //"未执行消息的list,只带消息id的"
RotuteStatuslistName string `json:"routestatusname"` //"在线路由的列表"
MsgName string `json:"msg"` // 消息的前缀
ClientName string `json:"clientname"` // 客户端的前缀
ClientServer string `json:"clientserver"` // 客户端连接的server
MsgStatusName string `json:"msgstatus"` // 未执行消息的list,只带消息id的
RotuteStatusName string `json:"routestatusname"` // 在线路由的列表
RouterName string `json:"routername"` // 路由映射列表
}
//路由状态表
......
......@@ -25,29 +25,31 @@ func newCfg() *CFG {
return cfg
}
func GetSrverId() string {
func GetRouterId() string {
return OctopusConfig.Traceid
}
func GetSrverName() string {
return OctopusConfig.Project
func GetRouterName() string {
return OctopusConfig.Project + OctopusConfig.Traceid
}
func initOhterCfg() {
OctopusConfig = Ctrl_Config_.OctopusCfg
OctopusRedisConfig.MsgName_prefix = "Msg"
OctopusRedisConfig.ClientName_prefix = "Client"
OctopusRedisConfig.ClientServer_prefix = "ClientServer"
OctopusRedisConfig.MsgStatusListName = OctopusConfig.Traceid + ".MsgStatsus"
OctopusRedisConfig.RotuteStatuslistName = "RouteStatus"
OctopusRedisConfig.MsgName = "Msg"
OctopusRedisConfig.ClientName = "Client"
OctopusRedisConfig.ClientServer = "ClientServer"
OctopusRedisConfig.MsgStatusName = "MsgStatus"
OctopusRedisConfig.RotuteStatusName = "RouteStatus"
OctopusRedisConfig.RouterName = "Router"
log.WithFields(log.Fields{
"Traceid": Ctrl_Config_.OctopusCfg.Traceid,
"Project": Ctrl_Config_.OctopusCfg.Project,
"MsgName_prefix": OctopusRedisConfig.MsgName_prefix,
"ClientName_prefix": OctopusRedisConfig.ClientName_prefix,
"ClientServer_prefix": OctopusRedisConfig.ClientServer_prefix,
"MsgStatusListName": OctopusRedisConfig.MsgStatusListName,
"RotuteStatuslistName": OctopusRedisConfig.RotuteStatuslistName,
"Traceid": Ctrl_Config_.OctopusCfg.Traceid,
"Project": Ctrl_Config_.OctopusCfg.Project,
"MsgName": OctopusRedisConfig.MsgName,
"ClientName": OctopusRedisConfig.ClientName,
"ClientServer": OctopusRedisConfig.ClientServer,
"MsgStatusName": OctopusRedisConfig.MsgStatusName,
"RotuteStatusName": OctopusRedisConfig.RotuteStatusName,
"RouterName": OctopusRedisConfig.RouterName,
}).Info()
}
......
......@@ -2,30 +2,37 @@ package main
import (
"ficus_router/config"
"ficus_router/model"
"ficus_router/mqcontrol"
"ficus_router/msgcontrol"
"ficus_router/work"
"log"
"fmt"
_ "net/http/pprof"
"os"
"runtime/debug"
"time"
log "github.com/sirupsen/logrus"
)
const HttpState1 = ":38888"
var quitFlag chan int
var QuitFlag chan int
var MsgControl *msgcontrol.MsgControl
var ServerMap *msgcontrol.Ctrl_Srvermap
var RetryWork *msgcontrol.CtrlRetry
func main() {
defer TryE()
InitLog()
InitCfg()
InitGlobleVar()
//管道阻塞主协程,防止主协程退出
quitFlag = make(chan int)
//服务程序 上线Map
ctr_mapSer := model.NewServermap()
QuitFlag = make(chan int)
//Msg map 存储 Msg 状态的map
go work.StartClientServer(ctr_mapSer)
go work.StartClientServer(ServerMap, RetryWork)
time.Sleep(5 * time.Second)
go work.Start(ctr_mapSer)
<-quitFlag
go work.StartRecvMqserver(MsgControl, ServerMap)
<-QuitFlag
log.Println("退出")
}
......@@ -35,3 +42,39 @@ func InitCfg() {
log.Fatalln(err)
}
}
func InitGlobleVar() {
sendMsgChan := make(chan *msgcontrol.Msg, 1000)
retryMsgChan := make(chan *msgcontrol.Msg, 1000)
psend := mqcontrol.NewProducer("myPusher", "myQueue")
msgMap := msgcontrol.NewMsgmap()
ServerMap = msgcontrol.NewServermap(sendMsgChan, retryMsgChan)
MsgControl = msgcontrol.NewMsgControl(msgMap, psend, retryMsgChan)
RetryWork = msgcontrol.NewCtrlRetry(MsgControl, retryMsgChan, sendMsgChan, 10)
}
func TryE() {
errs := recover()
if errs == nil {
return
}
exeName := os.Args[0] //获取程序名称
now := time.Now() //获取当前时间
pid := os.Getpid() //获取进程ID
time_str := now.Format("20060102150405") //设定时间格式
fname := fmt.Sprintf("%s-%d-%s-dump.log", exeName, pid, time_str) //保存错误信息文件名:程序名-进程ID-当前时间(年月日时分秒)
log.Println("dump to file ", fname)
f, err := os.Create(fname)
if err != nil {
return
}
defer f.Close()
f.WriteString(fmt.Sprintf("%v\r\n", errs)) //输出panic信息
f.WriteString("========\r\n")
f.WriteString(string(debug.Stack())) //输出堆栈信息
}
package model
import "fmt"
type FmError struct {
itype int
errMsg string
}
func NewMyErr(itype int, errMsg string) *FmError {
return &FmError{itype, errMsg}
}
//实现Error接口
func (e *FmError) Error() string {
return fmt.Sprintf("%d - %s", e.itype, e.errMsg)
}
package model
import (
"encoding/json"
"ficus/mission"
"ficus_router/config"
"ficus_router/mqcontrol"
"ficus_router/redisctrl"
"fmt"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
/*
* 所有的消息集合,这里属于维护集合
*/
type CtrlMsgMap struct {
Timeout int64 // 超时时间
Lock sync.RWMutex
msgMap map[string]config.T_CMsg // 所有到达路由的消息
chan_msgMap chan mission.Message // 写入map的缓冲管道
Ctrl_Retry *CtrlRetry
ctrlRedisClient *redisctrl.CtrlRedisMsg
ctrl_SrverMap *Ctrl_Srvermap
ctrl_SendMq *mqcontrol.Producer
ctrl_RMsgStatus *redisctrl.RMsgStatus
}
func NewMsgmap(ctrl_SendMq *mqcontrol.Producer, ctrl_SrverMap *Ctrl_Srvermap, Ctrl_Retry *CtrlRetry) *CtrlMsgMap {
p := &CtrlMsgMap{}
p.Timeout = 10
p.chan_msgMap = make(chan mission.Message, 10000)
p.Ctrl_Retry = Ctrl_Retry
p.ctrl_SrverMap = ctrl_SrverMap
p.ctrl_SendMq = ctrl_SendMq
p.ctrlRedisClient = redisctrl.NewCtrlRedisMsg()
p.msgMap = make(map[string]config.T_CMsg)
//p.chan_TimeOutMsg=make(chan config.T_CMsg,10000)
p.ctrl_RMsgStatus = redisctrl.NewRMsgStatus()
//启动 缓存map的读取
go p.work_Read_chan_msgMap()
//启动超时检测
go p.TimeOutTimerMsg()
return p
}
func (p *CtrlMsgMap) Write_chan_msgMap(mis mission.Message) {
go func() {
p.chan_msgMap <- mis
}()
}
func (p *CtrlMsgMap) work_Read_chan_msgMap() {
for {
mis := <-p.chan_msgMap
//写入自身的 - map
p.set(mis)
//同时写入 redis 待执行目录
p.ctrl_RMsgStatus.StatusWork(mis.ID, config.MSG_WAITSUCCESS)
}
}
//超时检测协成
func (p *CtrlMsgMap) TimeOutTimerMsg() {
for {
log.Println("------------执行一次超时检测--------------")
p.Lock.Lock()
for k, v := range p.msgMap {
log.WithFields(log.Fields{"func": "msgkey"}).Info(k)
msg := p.ctrlRedisClient.GetMsg(k)
fmt.Println("1111111111111111111")
if msg.PushStatus == config.MSG_WAITSUCCESS {
//获取 客户端 连接到的 服务 Traceid
serName, err := p.ctrlRedisClient.GetClientConnSerTraceid(msg.Mis.Agent)
if err != nil {
log.Println("未找到对应server", err)
continue
}
if serName == "" {
log.Println(msg.Mis.ID, "----", msg.Mis.Agent, "-- 客户端未上线")
//未获取到客户端上线标识,证明 此机器并未上线,则忽略此消息
continue
}
fmt.Println("222222222222")
if p.ctrl_SrverMap.IsServer(serName) {
//超时写入重试管道
ic := time.Now().Unix() - v.Lastupdatetime
if ic > p.Timeout {
//超时,写入重试接口
var rm SerMsg
rm.SerName = v.RouteName
rm.MsgId = v.Mis.ID
rm.TimeSecond = v.Lastupdatetime
p.Ctrl_Retry.Write_chan_retryChan(rm)
fmt.Println("33333333333333")
}
} else {
//客户端上线,但是不在连接的列表中,则丢给MQ , 并且删除redis的缓存
jsonstr, _ := json.Marshal(msg.Mis)
//发送给mq
p.ctrl_SendMq.Push(jsonstr)
fmt.Println("444444444444")
//从 map 中删除 这个key
delete(p.msgMap, k)
//从redis中删除这个key
p.ctrl_RMsgStatus.Del(msg.Mis.ID)
}
} else if msg.PushStatus == config.MSG_SUCCESSED {
//从 map 中删除 这个key
fmt.Println("55555555555555555555555")
delete(p.msgMap, k)
p.ctrl_RMsgStatus.StatusWork(msg.Mis.ID, config.MSG_SUCCESSED)
}
}
p.Lock.Unlock()
time.Sleep(30 * time.Second)
}
}
func (p *CtrlMsgMap) getMsg(k string) config.T_CMsg {
p.Lock.RLock()
defer p.Lock.RUnlock()
return p.msgMap[k]
}
func (p *CtrlMsgMap) SetMsg(msg config.T_CMsg) bool {
p.Lock.RLock()
defer p.Lock.RUnlock()
p.msgMap[msg.Mis.ID] = msg
return true
}
func (p *CtrlMsgMap) Get(k string) mission.Message {
p.Lock.RLock()
var mis mission.Message
mis.ID = ""
mis.Agent = ""
defer p.Lock.RUnlock()
if _, ok := p.msgMap[k]; ok {
msg := p.msgMap[k]
return msg.Mis
}
return mis
}
func (p *CtrlMsgMap) set(mis mission.Message) {
p.Lock.Lock()
defer p.Lock.Unlock()
if v, ok := p.msgMap[mis.ID]; ok {
//存在,更新消息内容,并且更新消息内容
v.PushStatus = config.MSG_WAITSUCCESS
v.Lastupdatetime = time.Now().Unix()
v.PushCount++
v.Mis = mis
p.msgMap[mis.ID] = v
} else {
var m config.T_CMsg
m.PushCount = 1
m.Mis = mis
m.Lastupdatetime = time.Now().Unix()
m.PushStatus = config.MSG_WAITSUCCESS
p.msgMap[mis.ID] = m
}
}
func (p *CtrlMsgMap) AddCount(k string) {
p.Lock.Lock()
defer p.Lock.Unlock()
if v, ok := p.msgMap[k]; ok {
v.PushCount++
p.msgMap[k] = v
}
}
func (p *CtrlMsgMap) UpdateStatus(k string, status config.MSG_STATUS) error {
p.Lock.Lock()
defer p.Lock.Unlock()
if v, ok := p.msgMap[k]; ok {
//存在,更新一下狀態
v.PushStatus = status
p.msgMap[k] = v
} else {
return NewMyErr(1, "map values is nil ,key:"+k)
}
return nil
}
package model
import (
"encoding/json"
"ficus_router/config"
"ficus_router/mqcontrol"
"ficus_router/redisctrl"
"fmt"
"time"
)
/*
* 重试
* 消息失败后 丢入重试管道
* 重试次数,最小间隔 10 秒才能丢入推送客户端
* 重试 一定次数认为重试失败
*/
type CtrlRetry struct {
ctrser *Ctrl_Srvermap
ctrlRedisClient *redisctrl.CtrlRedisMsg
psend *mqcontrol.Producer
Timeout int64 // 超时时间
retryChan chan SerMsg // 重试管道
}
func NewCtrlRetry(producer *mqcontrol.Producer, ctrser *Ctrl_Srvermap, Timeout int64) *CtrlRetry {
p := &CtrlRetry{}
p.ctrser = ctrser
p.Timeout = Timeout
p.retryChan = make(chan SerMsg, 10000)
p.ctrlRedisClient = redisctrl.NewCtrlRedisMsg()
p.psend = producer
go p.Read_chan_retryChan()
return p
}
func (p *CtrlRetry) Set(mis SerMsg) {
go func() {
p.Write_chan_retryChan(mis)
}()
}
func (p *CtrlRetry) Write_chan_retryChan(rm SerMsg) {
p.retryChan <- rm
}
func (p *CtrlRetry) Read_chan_retryChan() {
fmt.Println("Read_chan_retryChan")
for {
rm := <-p.retryChan
ic := time.Now().Unix() - rm.TimeSecond
if ic > p.Timeout {
//大于超时时间
//1. 获取当前的状态
//2. 未完成,写入管道,完成 => 直接丢弃
msg := p.ctrlRedisClient.GetMsg(rm.MsgId)
//SerName := p.ctrlRedisClient.GetClientConnSerTraceid(msg.Mis.Agent)
//判断SerName 是否连接到本路由,如果未连接
ser := p.ctrser.Get(rm.SerName)
if ser.SerName != "" {
//判断服务是否连接了本路由,并且同时超时
if msg.PushStatus == config.MSG_WAITSUCCESS {
//写入待执行管道
p.ctrser.Write_Chan_Server(rm)
}
} else {
// 服务 没有上线到本路由,则重新丢入MQ
// 返回 rabbitmq
json, _ := json.Marshal(msg.Mis)
p.psend.Push(json)
fmt.Println("retry push")
}
} else {
//小于超时时间,则不检测是否成功! 直接丢入自身的 检测重试 管道
p.Write_chan_retryChan(rm)
}
/*
SerName := p.ctrlRedisClient.GetClientTraceid(mis.Agent)
//客户端标示
if SerName != "" {
var rm SerMsg
rm.SerName=SerName
rm.MsgId=mis.ID
//写入待执行的管道中
if len(p.retryChan) > 5000 {
p.ctrser.Write_Chan_Server(rm)
}else{
time.Sleep(1 * time.Second)
p.ctrser.Write_Chan_Server(rm)
}
}else{
//没有查询到上线的机器信息,
//需要查询再次上线的路由
//name:=fmt.Sprintf("Server.%s",mis.Agent);
SerName := p.ctrlRedisClient.GetClientConnSerTraceid(mis.Agent)
//判断SerName 是否连接到本路由,如果未连接
ser:=p.ctrser.Get(SerName)
if ser.SerName != "" {
//服务 连接到了本路由 则写入 重试管道
//则重新丢入重试管道,等待下次重试
p.Write_chan_retryChan(mis)
}else{
// 服务 没有上线到本路由,则重新丢入MQ
// 返回 rabbitmq
json,_:=json.Marshal(mis)
p.psend.Send(string(json))
}
}
*/
}
}
package msgcontrol
import (
"encoding/json"
"ficus/mission"
"ficus_router/config"
"ficus_router/redisctrl"
log "github.com/sirupsen/logrus"
"github.com/go-redis/redis"
)
//redis 写入 默认列表
type CtrlRedisMsg struct {
//redis
redisClient *redis.Client
chanRedisMsg chan Msg
}
func NewCtrlRedisMsg() *CtrlRedisMsg {
p := &CtrlRedisMsg{}
p.chanRedisMsg = make(chan Msg, 10000)
p.redisClient = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "", // no password set
DB: 0, // use default DB
})
return p
}
func (p *CtrlRedisMsg) setMsgToRedis(msg *Msg) {
jsonstr, _ := json.Marshal(msg)
kname := config.OctopusRedisConfig.MsgStatusName
log.WithFields(log.Fields{"func": "setRedisMsg", "kname": kname, "msgId": msg.MsgId}).Info(string(jsonstr))
redisctrl.HSet(p.redisClient, kname, msg.MsgId, jsonstr)
}
func (p *CtrlRedisMsg) deleteMsgToRedis(msgId string) {
kname := config.OctopusRedisConfig.MsgStatusName
log.WithFields(log.Fields{"func": "deleteMsgToRedis", "kname": kname, "msgId": msgId}).Info()
redisctrl.HDel(p.redisClient, kname, msgId)
}
func (p *CtrlRedisMsg) setMissionToRedis(msg *mission.Message) {
jsonstr, _ := json.Marshal(msg)
kname := config.OctopusRedisConfig.MsgName
log.WithFields(log.Fields{"func": "setmissiontoredis", "kname": kname, "msgId": msg.ID}).Info(string(jsonstr))
redisctrl.HSet(p.redisClient, kname, msg.ID, jsonstr)
}
func (p *CtrlRedisMsg) deleteMissionToRedis(msgId string) {
kname := config.OctopusRedisConfig.MsgName
log.WithFields(log.Fields{"func": "deleteMissionToRedis", "kname": kname, "msgId": msgId}).Info()
redisctrl.HDel(p.redisClient, kname, msgId)
}
func (p *CtrlRedisMsg) GetRouterTraceid(k string) (string, error) {
kname := config.OctopusRedisConfig.RouterName
routerName, err := redisctrl.HGet(p.redisClient, kname, k)
return routerName, err
}
//客户端连接到的server
func (p *CtrlRedisMsg) GetClientConnSerTraceid(k string) (string, error) {
//ClientServer
kname := config.OctopusRedisConfig.ClientServer
return redisctrl.HGet(p.redisClient, kname, k)
}
func (p *CtrlRedisMsg) getMsg(k, field string) Msg {
jsonstr, _ := redisctrl.HGet(p.redisClient, k, field)
var msg Msg
json.Unmarshal([]byte(jsonstr), &msg)
return msg
}
func (p *CtrlRedisMsg) GetAllMsg(k string) map[string]string {
msgMap, _ := redisctrl.HGetAll(p.redisClient, k)
return msgMap
}
func (p *CtrlRedisMsg) GetMsg(k string) Msg {
kname := config.OctopusRedisConfig.MsgStatusName
log.WithFields(log.Fields{"func": "GetMsg"}).Info(kname)
msg := p.getMsg(kname, k)
return msg
}
func (p *CtrlRedisMsg) GetMessage(k string) mission.Message {
kname := config.OctopusRedisConfig.MsgName
log.WithFields(log.Fields{"func": "GetMessage"}).Info(kname)
jsonstr, _ := redisctrl.HGet(p.redisClient, kname, k)
var msg mission.Message
json.Unmarshal([]byte(jsonstr), &msg)
return msg
}
func (p *CtrlRedisMsg) DeleteMsgFromRedis(k string) {
go func() {
p.deleteMissionToRedis(k)
p.deleteMsgToRedis(k)
}()
}
func (p *CtrlRedisMsg) WriteMsgToRedis(msg *Msg, mis *mission.Message) {
go func(msg *Msg, mis *mission.Message) {
p.setMsgToRedis(msg)
p.setMissionToRedis(mis)
}(msg, mis)
}
package msgcontrol
import (
"encoding/json"
"ficus/mission"
"ficus_router/config"
"ficus_router/mqcontrol"
)
type MsgControl struct {
msgMap *MsgMap // 所有到达路由的消息
ctrlRedisClient *CtrlRedisMsg
ctrl_SendMq *mqcontrol.Producer
retryChan chan *Msg
}
func NewMsgControl(msgMap *MsgMap, ctrl_SendMq *mqcontrol.Producer, retryChan chan *Msg) *MsgControl {
p := &MsgControl{}
p.retryChan = retryChan
p.ctrl_SendMq = ctrl_SendMq
p.ctrlRedisClient = NewCtrlRedisMsg()
p.msgMap = msgMap
//启动超时检测
go p.TimeOutTimerMsg()
return p
}
func (p *MsgControl) TimeOutTimerMsg() {
}
func (p *MsgControl) Init() {
go p.LoadRedisMsg()
}
func (p *MsgControl) LoadRedisMsg() {
kname := config.OctopusRedisConfig.MsgName
tempMap := p.ctrlRedisClient.GetAllMsg(kname)
for _, v := range tempMap {
m := &Msg{}
json.Unmarshal([]byte(v), m)
p.msgMap.addMsg(m)
p.retryChan <- m
}
}
func (p *MsgControl) AddMsgAndMessage(msg *Msg, mis *mission.Message) {
p.ctrlRedisClient.WriteMsgToRedis(msg, mis)
p.addMsg(msg)
}
func (p *MsgControl) GetMsgStatusCode(msgId string) mission.MissionStatusCode {
msg := p.ctrlRedisClient.GetMessage(msgId)
return msg.State.Code
}
func (p *MsgControl) WriteMsgToMap(msg *Msg) {
p.msgMap.addMsg(msg)
}
func (p *MsgControl) getMsg(k string) *Msg {
return p.msgMap.getMsg(k)
//return p.msgMap[k]
}
func (p *MsgControl) SetMsg(msg *Msg) bool {
p.msgMap.SetMsg(msg)
return true
}
func (p *MsgControl) GetMsg(k string) Msg {
return p.ctrlRedisClient.GetMsg(k)
}
func (p *MsgControl) GetClientConnSerTraceid(k string) (string, error) {
return p.ctrlRedisClient.GetClientConnSerTraceid(k)
}
func (p *MsgControl) GetRouterTraceid(k string) (string, error) {
return p.ctrlRedisClient.GetRouterTraceid(k)
}
func (p *MsgControl) addMsg(msg *Msg) {
p.msgMap.addMsg(msg)
}
func (p *MsgControl) AddTryCount(k string) {
p.msgMap.AddTryCount(k)
}
func (p *MsgControl) UpdateStatus(k string, status config.MSG_STATUS) error {
return p.msgMap.UpdateStatus(k, status)
}
func (p *MsgControl) PushMsgToMq(msgId string) {
//消息推送回mq
var msg = p.ctrlRedisClient.GetMessage(msgId)
json, _ := json.Marshal(msg)
p.ctrl_SendMq.Push(json)
}
func (p *MsgControl) DeleteMsg(msgId string) {
p.msgMap.DeleteMsg(msgId)
p.ctrlRedisClient.DeleteMsgFromRedis(msgId)
}
package msgcontrol
import (
"container/list"
"errors"
"ficus_router/config"
"sync"
)
/*
* 所有的消息集合,这里属于维护集合
*/
type (
Msg struct {
lock *sync.RWMutex
SerName string // 消息对应server
MsgId string // 消息 id
Agent string // client id
Tries int32 // 当前重试次数
TriesMax int32 // 最大重试次数
TimeSecond int64 // 消息到达路由时间
Lastupdatetime int64 // 上次更新时间
}
MsgMap struct {
Timeout int64 // 超时时间
Lock *sync.RWMutex
msgMap map[string]*Msg // 所有到达路由的消息
}
MsgList struct {
rwlock *sync.RWMutex
Size int
msgList *list.List
}
)
func (msg *Msg) TriesAddOne() bool {
msg.lock.Lock()
msg.Tries++
msg.lock.Unlock()
return msg.Tries == msg.TriesMax
}
func NewMsgmap() *MsgMap {
p := &MsgMap{}
p.Timeout = 10
p.Lock = new(sync.RWMutex)
p.msgMap = make(map[string]*Msg)
return p
}
func (p *MsgMap) WriteMsgToMap(msg *Msg) {
p.addMsg(msg)
}
func (p *MsgMap) getMsg(k string) *Msg {
p.Lock.RLock()
defer p.Lock.RUnlock()
return p.msgMap[k]
}
func (p *MsgMap) SetMsg(msg *Msg) bool {
p.Lock.Lock()
defer p.Lock.Unlock()
p.msgMap[msg.MsgId] = msg
return true
}
func (p *MsgMap) addMsg(msg *Msg) {
p.Lock.Lock()
defer p.Lock.Unlock()
p.msgMap[msg.MsgId] = msg
}
func (p *MsgMap) AddTryCount(k string) {
p.Lock.Lock()
defer p.Lock.Unlock()
if v, ok := p.msgMap[k]; ok {
v.TriesAddOne()
p.msgMap[k] = v
}
}
func (p *MsgMap) UpdateStatus(k string, status config.MSG_STATUS) error {
p.Lock.Lock()
defer p.Lock.Unlock()
if v, ok := p.msgMap[k]; ok {
//存在,更新一下狀態
p.msgMap[k] = v
} else {
return errors.New("map values is nil ,key:" + k)
}
return nil
}
func (p *MsgMap) DeleteMsg(msgId string) {
p.Lock.Lock()
defer p.Lock.Unlock()
delete(p.msgMap, msgId)
}
func NewMsgList() *MsgList {
msglist := &MsgList{}
msglist.rwlock = new(sync.RWMutex)
msglist.msgList = list.New()
return msglist
}
func (l *MsgList) PushBack(msg *Msg) {
l.rwlock.Lock()
l.msgList.PushBack(msg)
l.Size++
l.rwlock.Unlock()
}
func (l *MsgList) PushFront(msg *Msg) {
l.rwlock.Lock()
l.msgList.PushFront(msg)
l.Size++
l.rwlock.Unlock()
}
func (l *MsgList) Remove(msg *Msg) {
l.rwlock.Lock()
l.msgList.PushFront(msg)
l.Size--
l.rwlock.Unlock()
}
func (l *MsgList) TakeFront() *Msg {
l.rwlock.Lock()
e := l.msgList.Front()
l.rwlock.Unlock()
msg := e.Value.(*Msg)
l.msgList.Remove(e)
l.Size--
return msg
}
package model
package msgcontrol
import (
"encoding/json"
"ficus_router/config"
"ficus_router/redisctrl"
"fmt"
"log"
"strconv"
......@@ -19,14 +20,14 @@ import (
type RRouteStatus struct {
ctrlRedisRouteStatus *redis.Client
hashname string "hash name"
hashname string // "hash name"
timeout int64
ctrl_msgMap *CtrlMsgMap
msgControl *MsgControl
}
func NewRRouteStatus(ctrl_msgMap *CtrlMsgMap, timeout int64) *RRouteStatus {
func NewRRouteStatus(msgControl *MsgControl, timeout int64) *RRouteStatus {
p := &RRouteStatus{}
p.ctrl_msgMap = ctrl_msgMap
p.msgControl = msgControl
p.ctrlRedisRouteStatus = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "", // no password set
......@@ -34,7 +35,7 @@ func NewRRouteStatus(ctrl_msgMap *CtrlMsgMap, timeout int64) *RRouteStatus {
})
p.timeout = timeout
//p.hashname=fmt.Sprint("%s.%s.%s",Config.OctopusConfig.Project,Config.OctopusConfig.Traceid,"RouteStatus")
p.hashname = config.OctopusRedisConfig.RotuteStatuslistName
p.hashname = config.OctopusRedisConfig.RotuteStatusName
log.Println("路由上线 Redis Hash 名称:", p.hashname)
go p.routeWorkStatus()
return p
......@@ -71,16 +72,16 @@ func (p *RRouteStatus) check() {
if ic > (p.timeout * 3) {
//这个客户端下线了,需要加载他所有的 未执行完成的 msg ,添加到自己的列表中
msgkeyMap, _ := p.ctrlRedisRouteStatus.HGetAll(config.OctopusRedisConfig.MsgStatusListName).Result()
msgkeyMap, _ := p.ctrlRedisRouteStatus.HGetAll(config.OctopusRedisConfig.MsgStatusName).Result()
for kk, _ := range msgkeyMap {
key := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.MsgName_prefix, kk)
key := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.MsgName, kk)
jsonstr, err := p.ctrlRedisRouteStatus.Get(key).Result()
if err == nil {
var msg config.T_CMsg
var msg Msg
json.Unmarshal([]byte(jsonstr), &msg)
p.ctrl_msgMap.Write_chan_msgMap(msg.Mis)
p.msgControl.WriteMsgToMap(&msg)
}
p.ctrlRedisRouteStatus.HDel(config.OctopusRedisConfig.MsgStatusListName)
p.ctrlRedisRouteStatus.HDel(config.OctopusRedisConfig.MsgStatusName)
}
//直接跳出,完成后则直接跳出,就不继续遍历了,因为获取也需要很多时间
break
......@@ -90,16 +91,16 @@ func (p *RRouteStatus) check() {
}
func (p *RRouteStatus) Loadhistory() {
msgkeyMap, _ := p.ctrlRedisRouteStatus.HGetAll(config.OctopusRedisConfig.MsgStatusListName).Result()
msgkeyMap, _ := p.ctrlRedisRouteStatus.HGetAll(config.OctopusRedisConfig.MsgStatusName).Result()
for kk, _ := range msgkeyMap {
key := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.MsgName_prefix, kk)
key := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.MsgName, kk)
jsonstr, err := p.ctrlRedisRouteStatus.Get(key).Result()
if err == nil {
var msg config.T_CMsg
var msg Msg
json.Unmarshal([]byte(jsonstr), &msg)
p.ctrl_msgMap.Write_chan_msgMap(msg.Mis)
p.msgControl.WriteMsgToMap(&msg)
}
p.ctrlRedisRouteStatus.HDel(config.OctopusRedisConfig.MsgStatusListName)
p.ctrlRedisRouteStatus.HDel(config.OctopusRedisConfig.MsgStatusName)
}
}
......@@ -119,3 +120,20 @@ func (p *RRouteStatus) get(id string) int64 {
}
return d
}
func (p *RRouteStatus) GetClientTraceid(k string) string {
kname := config.OctopusRedisConfig.ClientName
json, _ := redisctrl.HGet(p.ctrlRedisRouteStatus, kname, k)
return json
}
func (p *RRouteStatus) GetClientConnSerTraceid(k string) (string, error) {
kname := config.OctopusRedisConfig.ClientServer
return redisctrl.HGet(p.ctrlRedisRouteStatus, kname, k)
}
func (p *RRouteStatus) GetRouterTraceid(k string) (string, error) {
kname := config.OctopusRedisConfig.ClientName
json, err := redisctrl.HGet(p.ctrlRedisRouteStatus, kname, k)
return json, err
}
package msgcontrol
import (
"ficus/mission"
"ficus_router/config"
"time"
)
/*
* 重试
* 消息失败后 丢入重试管道
* 重试次数,最小间隔 10 秒才能丢入推送客户端
* 重试 一定次数认为重试失败
*/
type CtrlRetry struct {
msgControl *MsgControl
Timeout int64 // 超时时间
retryChan chan *Msg // 重试管道
sendChan chan *Msg //消息处理管道
msgList *MsgList // 消息缓存
}
func NewCtrlRetry(msgControl *MsgControl, retryChan chan *Msg, sendChan chan *Msg, Timeout int64) *CtrlRetry {
p := &CtrlRetry{}
p.msgControl = msgControl
p.Timeout = Timeout
p.retryChan = retryChan
p.sendChan = sendChan
p.msgList = NewMsgList()
return p
}
func (p *CtrlRetry) StartRetryWork() {
p.ReadMsgRetryChan()
p.HandMsgList()
}
func (p *CtrlRetry) HandMsgList() {
go func() {
for {
time.Sleep(time.Duration(p.Timeout) * time.Second)
for i := 0; i < p.msgList.Size; i++ {
msg := p.msgList.TakeFront()
select {
case p.retryChan <- msg:
break
default:
{
p.msgList.PushBack(msg)
}
}
}
}
}()
}
func (p *CtrlRetry) SendRetryMsg(msg *Msg) {
p.msgList.PushFront(msg)
}
func (p *CtrlRetry) WriteMsgToRetryChan(msg *Msg) {
p.retryChan <- msg
}
func (p *CtrlRetry) WriteMsgToSendChan(msg *Msg) {
p.sendChan <- msg
}
func (p *CtrlRetry) ReadMsgRetryChan() {
go func() {
for {
rm := <-p.retryChan
ic := time.Now().Unix() - rm.TimeSecond
if ic > p.Timeout {
//大于超时时间
//1. 获取当前的状态
//2. 未完成,写入管道,完成 => 直接丢弃
msg := p.msgControl.GetMsg(rm.MsgId)
if msg.MsgId == "" {
continue
}
if msg.TriesAddOne() {
p.msgControl.DeleteMsg(rm.MsgId)
continue
} else {
p.msgControl.SetMsg(&msg)
}
SerName, _ := p.msgControl.GetClientConnSerTraceid(msg.Agent)
*rm = msg
routerName, _ := p.msgControl.GetRouterTraceid(SerName)
//判断路由是否切换
if routerName == config.GetRouterName() {
//判断服务是否连接了本路由,并且同时超时
if p.msgControl.GetMsgStatusCode(rm.MsgId) == mission.MissionStatusCode_DISPATCHED {
//写入待执行管道
p.sendChan <- rm
}
//if p.msgControl.Ge
} else {
// 服务 没有上线到本路由,则重新丢入MQ
p.msgControl.DeleteMsg(rm.MsgId)
p.msgControl.PushMsgToMq(rm.MsgId)
}
} else {
//小于超时时间,则不检测是否成功! 直接丢入自身的 检测重试 管道
p.SendRetryMsg(rm)
}
}
}()
}
package model
package msgcontrol
import (
"ficus_router/config"
"ficus_router/redisctrl"
"log"
"net"
"sync"
"time"
"unsafe"
log "github.com/sirupsen/logrus"
)
type SliceMock struct {
......@@ -24,48 +24,35 @@ type Server struct {
Time int64
}
type SerMsg struct {
SerName string
//Mis ficus.Mission
//Msg Config.T_CMsg
MsgId string
Agent string
TimeSecond int64
}
type Ctrl_Srvermap struct {
SrverMap map[string]Server
Lock sync.RWMutex
chan_SerMsg chan SerMsg //"待发送的消息"
ctrl_Redismsg *redisctrl.CtrlRedisMsg
SrverMap map[string]*Server
Lock sync.RWMutex
sendChan chan *Msg
retryChan chan *Msg
}
func NewServermap() *Ctrl_Srvermap {
func NewServermap(sendChan chan *Msg, retryChan chan *Msg) *Ctrl_Srvermap {
p := &Ctrl_Srvermap{}
//p.ctr_msgMap=ctr_msgMap
p.SrverMap = make(map[string]Server)
p.chan_SerMsg = make(chan SerMsg)
p.ctrl_Redismsg = redisctrl.NewCtrlRedisMsg()
go p.read_Chan_Server()
p.sendChan = sendChan
p.retryChan = retryChan
p.SrverMap = make(map[string]*Server)
go p.writeChanMsgToServer()
return p
}
func (p *Ctrl_Srvermap) AddClient(s Server) {
func (p *Ctrl_Srvermap) AddClient(s *Server) {
p.Lock.Lock()
defer p.Lock.Unlock()
log.Println("Add map SerName", s.SerName)
log.Println("Add map SerName", s.SerName)
s.Time = time.Now().Unix()
p.SrverMap[s.SerName] = s
}
func (p *Ctrl_Srvermap) Get(k string) Server {
func (p *Ctrl_Srvermap) GetServer(k string) (*Server, bool) {
p.Lock.RLock()
defer p.Lock.RUnlock()
rs := p.SrverMap[k]
if _, ok := p.SrverMap[k]; ok {
return rs
}
return rs
rs, ok := p.SrverMap[k]
return rs, ok
}
//判断服务是否存在
......@@ -77,28 +64,23 @@ func (p *Ctrl_Srvermap) IsServer(k string) bool {
}
return false
}
func (p *Ctrl_Srvermap) Write_Chan_Server(rm SerMsg) {
go func() {
p.chan_SerMsg <- rm
}()
}
func (p *Ctrl_Srvermap) read_Chan_Server() {
func (p *Ctrl_Srvermap) writeChanMsgToServer() {
for {
rm := <-p.chan_SerMsg
msg := <-p.sendChan
p.WriteMsgToServer(msg)
}
}
func (p *Ctrl_Srvermap) WriteMsgToServer(msg *Msg) {
go func() {
//获取上线路由
SerName, err := p.ctrl_Redismsg.GetClientConnSerTraceid(rm.Agent)
if err != nil {
log.Println("未找到对应server", err)
continue
}
rm.Agent = SerName
tempSer := p.Get(SerName)
if SerName != "" && tempSer.SerName != "" {
if tempSer, ok := p.GetServer(msg.SerName); ok {
var comm config.SerCommand
comm.Command = config.COMMAND_TASK
copy(comm.Data[:len(rm.MsgId)], rm.MsgId)
comm.Length = uint32(len(rm.MsgId))
copy(comm.Data[:len(msg.MsgId)], msg.MsgId)
comm.Length = uint32(len(msg.MsgId))
log.Println("-1-")
Len := unsafe.Sizeof(comm)
testBytes := &SliceMock{
addr: uintptr(unsafe.Pointer(&comm)),
......@@ -108,26 +90,17 @@ func (p *Ctrl_Srvermap) read_Chan_Server() {
var data = *(*[]byte)(unsafe.Pointer(testBytes))
_, err := tempSer.Conn.Write(data)
if err == nil {
//发送成功
log.Println(rm.MsgId + "start send msg success………………")
log.Println(msg.MsgId + "start send msg success………………")
} else {
//发送失败
p.failOnError(err, "Send SerMsg Error")
log.Println(rm.MsgId + "start send msg fail")
log.Println(msg.MsgId, "start send msg fail", err)
p.retryChan <- msg
}
} else {
//直接丢弃
//发送到重试管道
p.retryChan <- msg
}
}
}
func (p *Ctrl_Srvermap) failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s:%s", msg, err)
panic(msg + err.Error())
}
}()
}
{
"servercfg":{
"traceid":"ser1",
"project":"clientserver"
"traceid":"01",
"project":"router"
},
"prefix":{
"msgname":"",
......
package redisctrl
import (
"encoding/json"
"ficus_router/config"
log "github.com/sirupsen/logrus"
"github.com/go-redis/redis"
)
//redis 写入 默认列表
type CtrlRedisMsg struct {
//redis
ctrlRedisMsg *redis.Client
chanRedisMsg chan config.T_CMsg
}
func NewCtrlRedisMsg() *CtrlRedisMsg {
p := &CtrlRedisMsg{}
p.chanRedisMsg = make(chan config.T_CMsg, 10000)
p.ctrlRedisMsg = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "", // no password set
DB: 0, // use default DB
})
go p.Read_Chan_RedisMsg()
return p
}
func (p *CtrlRedisMsg) setResisMsg(msg config.T_CMsg) {
jsonstr, _ := json.Marshal(msg)
kname := config.OctopusRedisConfig.MsgName_prefix
log.WithFields(log.Fields{"func": "setResisMsg", "kname": kname}).Info(string(jsonstr))
p.hset(kname, msg.Mis.ID, jsonstr)
}
func (p *CtrlRedisMsg) getMsg(k, field string) config.T_CMsg {
jsonstr, _ := p.hget(k, field)
var msg config.T_CMsg
json.Unmarshal([]byte(jsonstr), &msg)
return msg
}
func (p *CtrlRedisMsg) GetClientTraceid(k string) string {
kname := config.OctopusRedisConfig.ClientName_prefix
json, _ := p.hget(kname, k)
return json
}
//客户端连接到的server
func (p *CtrlRedisMsg) GetClientConnSerTraceid(k string) (string, error) {
//ClientServer
kname := config.OctopusRedisConfig.ClientServer_prefix
return p.hget(kname, k)
}
func (p *CtrlRedisMsg) GetMsg(k string) config.T_CMsg {
kname := config.OctopusRedisConfig.MsgName_prefix
log.WithFields(log.Fields{"func": "GetMsg"}).Info(kname)
msg := p.getMsg(kname, k)
return msg
}
func (p *CtrlRedisMsg) Write_Chan_RedisMsg(msg config.T_CMsg) {
go func() {
p.chanRedisMsg <- msg
}()
}
func (p *CtrlRedisMsg) Read_Chan_RedisMsg() {
for {
var msg config.T_CMsg
msg = <-p.chanRedisMsg
go func() {
p.setResisMsg(msg)
}()
}
}
package redisctrl
import (
"ficus_router/config"
"log"
"github.com/go-redis/redis"
)
//redis - hash 集合
//未成功完成的状态列表
//hash集合的名称为 Project.Traceid.MsgStatsu
type RMsgStatus struct {
ctrlRedisMsgStatus *redis.Client
hashname string // "hash name"
}
func NewRMsgStatus() *RMsgStatus {
p := &RMsgStatus{}
p.ctrlRedisMsgStatus = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "", // no password set
DB: 0, // use default DB
})
//p.hashname=fmt.Sprint("%s.%s.%s",Config.OctopusConfig.Project,Config.OctopusConfig.Traceid,"MsgStatsu")
p.hashname = config.OctopusRedisConfig.MsgStatusListName
log.Println("MsgStatus Redis Hash 上线 名称:", p.hashname)
return p
}
func (p *RMsgStatus) StatusWork(msgid string, status config.MSG_STATUS) bool {
if status == config.MSG_WAITSUCCESS {
//写入 -- redis
p.write(msgid)
} else if status == config.MSG_SUCCESSED {
//删除 - redis
p.del(msgid)
} else if status == config.MSG_FAILED {
//不做任何操作
}
return true
}
func (p *RMsgStatus) Del(msgid string) bool {
p.ctrlRedisMsgStatus.HDel(p.hashname, msgid)
return true
}
func (p *RMsgStatus) write(msgid string) bool {
if p.ctrlRedisMsgStatus.HExists(p.hashname, msgid).Val() == false {
p.ctrlRedisMsgStatus.HSet(p.hashname, msgid, config.MSG_WAITSUCCESS)
} else {
//存在直接返回
return true
}
return true
}
func (p *RMsgStatus) del(msgid string) bool {
p.ctrlRedisMsgStatus.HDel(p.hashname, msgid)
return true
}
func (p *RMsgStatus) Get(msgid string) (config.MSG_STATUS, error) {
s, err := p.ctrlRedisMsgStatus.HGet(p.hashname, msgid).Int()
if err == nil {
if s == 0 {
return config.MSG_WAITSUCCESS, nil
} else if s == 1 {
return config.MSG_SUCCESSED, nil
} else if s == 3 {
return config.MSG_FAILED, nil
} else {
return config.MSG_WAITSUCCESS, err
}
}
return config.MSG_WAITSUCCESS, err
}
......@@ -2,25 +2,31 @@ package redisctrl
import (
"time"
"github.com/go-redis/redis"
)
func (p *CtrlRedisMsg) hset(key, field string, v interface{}) error {
return p.ctrlRedisMsg.HSet(key, field, v).Err()
func HSet(r *redis.Client, key, field string, v interface{}) error {
return r.HSet(key, field, v).Err()
}
func (p *CtrlRedisMsg) hget(key, field string) (string, error) {
v, err := p.ctrlRedisMsg.HGet(key, field).Result()
func HGet(r *redis.Client, key, field string) (string, error) {
v, err := r.HGet(key, field).Result()
return v, err
}
func (p *CtrlRedisMsg) hgetAll(key string) (map[string]string, error) {
return p.ctrlRedisMsg.HGetAll(key).Result()
func HGetAll(r *redis.Client, key string) (map[string]string, error) {
return r.HGetAll(key).Result()
}
func HDel(r *redis.Client, k, field string) error {
return r.HDel(k, field).Err()
}
func (p *CtrlRedisMsg) set(k, v string, t int) error {
return p.ctrlRedisMsg.Set(k, v, time.Duration(t)*time.Second).Err()
func Set(r *redis.Client, k, v string, t int) error {
return r.Set(k, v, time.Duration(t)*time.Second).Err()
}
func (p *CtrlRedisMsg) get(k string) (string, error) {
return p.ctrlRedisMsg.Get(k).Result()
func Get(r *redis.Client, k string) (string, error) {
return r.Get(k).Result()
}
package rpcid
import (
"ficus_router/config"
"fmt"
)
func AppendRpcid(rpcid string) string {
var tempstr string
if len(rpcid) == 0 {
tempstr = config.OctopusConfig.Traceid
} else {
tempstr = fmt.Sprintf("%s.%s", rpcid, config.OctopusConfig.Traceid)
}
rpcid = tempstr
return rpcid
}
......@@ -4,7 +4,7 @@ import (
"encoding/json"
"ficus/mission"
"ficus_router/config"
"ficus_router/model"
"ficus_router/msgcontrol"
"net"
"time"
"unsafe"
......@@ -12,9 +12,8 @@ import (
log "github.com/sirupsen/logrus"
)
func StartClientServer(ctrser *model.Ctrl_Srvermap) {
func StartClientServer(ctrser *msgcontrol.Ctrl_Srvermap, retryWork *msgcontrol.CtrlRetry) {
//建立socket,监听端口 第一步:绑定端口
//netListen, err := net.Listen("tcp", "localhost:1024")
netListen, err := net.Listen("tcp", ":38000")
log.Println(err, "tcp")
//defer延迟关闭改资源,以免引起内存泄漏
......@@ -27,17 +26,16 @@ func StartClientServer(ctrser *model.Ctrl_Srvermap) {
//log.Println("error:",err)
continue //出错进行下一次循环
}
//defer conn.Close()
//协成写入连接
go RecvClient(conn, ctrser)
}
retryWork.StartRetryWork()
}
func byteString(p []byte) string {
s := p[:]
return string(s)
}
func RecvClient(conn net.Conn, ctrser *model.Ctrl_Srvermap) {
func RecvClient(conn net.Conn, ctrser *msgcontrol.Ctrl_Srvermap) {
//等待反馈
buffer := make([]byte, config.MQ_MAX_RECV_LEN)
for { //无限循环
......@@ -52,26 +50,26 @@ func RecvClient(conn net.Conn, ctrser *model.Ctrl_Srvermap) {
if pServer.Command == config.COMMAND_LOGIN {
//添加 登录过来的 服务
//sername := *(*string)(unsafe.Pointer(&pServer.Data))
Sername := string(pServer.Data[:pServer.Length])
log.Println(pServer.Command)
log.Println("-----recv_ser:", Sername, "len:", len(Sername))
if pServer.SerType == config.SERTYPE_CONSUME {
//注册的消费者,添加到消费服务列表
var ser model.Server
var ser msgcontrol.Server
ser.SerName = Sername
ser.Conn = conn
ser.Online = true
ser.Time = time.Now().Unix()
ctrser.AddClient(ser)
ctrser.AddClient(&ser)
} else if pServer.SerType == config.SERTYPE_FEEDBACK {
//注册的反馈服务,则添加入反馈者列表
}
log.Println("------")
//返回连接的路由消息
var SendSend config.SerCommand
SendSend.Command = config.COMMAND_LOGIN
temp := "route.1"
temp := config.GetRouterName()
copy(SendSend.Data[:], temp)
SendSend.Length = uint32(len(SendSend.Data))
......
package work
import (
"encoding/json"
"ficus/mission"
"ficus_router/mqcontrol"
"ficus_router/msgcontrol"
"time"
log "github.com/sirupsen/logrus"
)
type SliceMock struct {
addr uintptr
len int
cap int
}
func callback(d mqcontrol.MSG, msgControl *msgcontrol.MsgControl, serverMap *msgcontrol.Ctrl_Srvermap) {
go func() {
body := d.Body
var mis mission.Message
err := json.Unmarshal(body, &mis)
if err != nil {
log.WithFields(log.Fields{
"func": "callback",
"消息内容": string(body),
}).Error("消息解析失败", err)
} else {
log.Println("112")
var msg msgcontrol.Msg
msg.MsgId = mis.ID
msg.Agent = mis.Agent
msg.Tries = 0
msg.TriesMax = mis.TriesMax
msg.TimeSecond = time.Now().Unix()
msg.Lastupdatetime = msg.TimeSecond
log.Println("113")
serName, _ := msgControl.GetClientConnSerTraceid(msg.Agent)
msg.SerName = serName
log.Println("114")
msgControl.AddMsgAndMessage(&msg, &mis)
log.Println("115")
//2.直接写入 执行管道
//写入待发送到ser管道
serverMap.WriteMsgToServer(&msg)
log.Println("11111")
}
}()
}
func StartRecvMqserver(msgControl *msgcontrol.MsgControl, serverMap *msgcontrol.Ctrl_Srvermap) {
//redis 客户端
//启动读写实例
if err := mqcontrol.Init("./mq.json"); err != nil {
panic("mq 初始化失败" + err.Error())
}
{ // 一个路由中 可以有多个 任务线程,上线一个路由线程,现在暂且定为 每个 路由 只上线 1个
//routeMsg := msgcontrol.NewRRouteStatus(msgControl, 30)
//重新启动后 --- 把无完成的任务添加在自己的map中
//routeMsg.Loadhistory()
}
//消费者
precv := mqcontrol.NewConsumer("myPoper", func(d mqcontrol.MSG) {
callback(d, msgControl, serverMap)
})
if err := precv.Pop(); err != nil {
panic("Pop 监听失败")
}
}
package work
import (
"encoding/json"
"ficus/mission"
"ficus_router/config"
"ficus_router/model"
"ficus_router/mqcontrol"
"ficus_router/redisctrl"
"ficus_router/rpcid"
"time"
log "github.com/sirupsen/logrus"
)
type SliceMock struct {
addr uintptr
len int
cap int
}
func callback(d mqcontrol.MSG, ctrser *model.Ctrl_Srvermap, redisctrl *redisctrl.CtrlRedisMsg, ctrl_mapMsg *model.CtrlMsgMap) {
log.Info("callback")
body := d.Body
var mis mission.Message
err := json.Unmarshal(body, &mis)
log.Info(string(body))
failOnError(err, "recv:json.Unmarshal - ficus.Mission")
if err == nil {
//获取 客户端 连接到的 服务 Traceid
//SerName := Ctrl_Redismsg.GetClientConnSerTraceid(mis.Agent)
{ //1.写入 redis -> 接受到的所有消息都需要写入 redis 缓存消息
var msg config.T_CMsg
msg.RouteName = ""
msg.Mis = mis
msg.RouteName = config.OctopusConfig.Traceid //经过的路由器
msg.PushStatus = config.MSG_WAITSUCCESS
msg.Lastupdatetime = time.Now().Unix()
msg.Rpcid = rpcid.AppendRpcid("")
redisctrl.Write_Chan_RedisMsg(msg)
}
{ //2.直接写入 执行管道
//fmt.Println("SerName:",SerName);
var rm model.SerMsg
rm.SerName = ""
rm.MsgId = mis.ID
rm.Agent = mis.Agent
rm.TimeSecond = time.Now().Unix()
//fmt.Println("TimeSecond:",rm.TimeSecond)
//写入待发送到ser管道
ctrser.Write_Chan_Server(rm)
//写消息到map中
ctrl_mapMsg.Write_chan_msgMap(mis)
}
} else {
log.Println(string(body))
}
}
func Start(ctrser *model.Ctrl_Srvermap) {
//redis 客户端
Ctrl_Redismsg := redisctrl.NewCtrlRedisMsg()
//启动读写实例
if err := mqcontrol.Init("./mq.json"); err != nil {
panic("mq 初始化失败" + err.Error())
}
//生产者
psend := mqcontrol.NewProducer("myPusher", "myQueue")
//消息的map表
ctrl_Retry := model.NewCtrlRetry(psend, ctrser, 10)
ctrl_mapMsg := model.NewMsgmap(psend, ctrser, ctrl_Retry)
{ // 一个路由中 可以有多个 任务线程,上线一个路由线程,现在暂且定为 每个 路由 只上线 1个
routeMsg := model.NewRRouteStatus(ctrl_mapMsg, 30)
//重新启动后 --- 把无完成的任务添加在自己的map中
routeMsg.Loadhistory()
}
//消费者
precv := mqcontrol.NewConsumer("myPoper", func(d mqcontrol.MSG) {
callback(d, ctrser, Ctrl_Redismsg, ctrl_mapMsg)
})
if err := precv.Pop(); err != nil {
panic("Pop 监听失败")
}
}
func failOnError(err error, msg string) {
if err != nil {
log.Error("%s:%s", msg, err)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment