Commit aaecc3ee by yunpeng.song

消息锁改为值引用类型,修改重试工作流初始化方式

parent 588d34fc
...@@ -15,13 +15,10 @@ import ( ...@@ -15,13 +15,10 @@ import (
type CtrlRedisMsg struct { type CtrlRedisMsg struct {
//redis //redis
redisClient *redis.Client redisClient *redis.Client
chanRedisMsg chan Msg
} }
func NewCtrlRedisMsg() *CtrlRedisMsg { func NewCtrlRedisMsg() *CtrlRedisMsg {
p := &CtrlRedisMsg{} p := &CtrlRedisMsg{}
p.chanRedisMsg = make(chan Msg, 10000)
p.redisClient = redis.NewClient(&redis.Options{ p.redisClient = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379", Addr: "127.0.0.1:6379",
Password: "", // no password set Password: "", // no password set
...@@ -68,11 +65,11 @@ func (p *CtrlRedisMsg) GetClientConnSerTraceid(k string) (string, error) { ...@@ -68,11 +65,11 @@ func (p *CtrlRedisMsg) GetClientConnSerTraceid(k string) (string, error) {
return redisctrl.HGet(p.redisClient, kname, k) return redisctrl.HGet(p.redisClient, kname, k)
} }
func (p *CtrlRedisMsg) getMsg(k, field string) Msg { func (p *CtrlRedisMsg) getMsg(k, field string) *Msg {
jsonstr, _ := redisctrl.HGet(p.redisClient, k, field) jsonstr, _ := redisctrl.HGet(p.redisClient, k, field)
var msg Msg var msg Msg
json.Unmarshal([]byte(jsonstr), &msg) json.Unmarshal([]byte(jsonstr), &msg)
return msg return &msg
} }
func (p *CtrlRedisMsg) GetAllMsg(k string) map[string]string { func (p *CtrlRedisMsg) GetAllMsg(k string) map[string]string {
...@@ -81,7 +78,7 @@ func (p *CtrlRedisMsg) GetAllMsg(k string) map[string]string { ...@@ -81,7 +78,7 @@ func (p *CtrlRedisMsg) GetAllMsg(k string) map[string]string {
} }
func (p *CtrlRedisMsg) GetMsg(k string) Msg { func (p *CtrlRedisMsg) GetMsg(k string) *Msg {
kname := config.OctopusRedisConfig.MsgStatusName kname := config.OctopusRedisConfig.MsgStatusName
log.WithFields(log.Fields{"func": "GetMsg"}).Info(kname) log.WithFields(log.Fields{"func": "GetMsg"}).Info(kname)
msg := p.getMsg(kname, k) msg := p.getMsg(kname, k)
......
...@@ -7,6 +7,8 @@ import ( ...@@ -7,6 +7,8 @@ import (
"ficus_router/mqcontrol" "ficus_router/mqcontrol"
"ficus_router/timer" "ficus_router/timer"
"time" "time"
log "github.com/sirupsen/logrus"
) )
type MsgControl struct { type MsgControl struct {
...@@ -23,7 +25,7 @@ func NewMsgControl(msgMap *MsgMap, ctrl_SendMq *mqcontrol.Producer, retryChan ch ...@@ -23,7 +25,7 @@ func NewMsgControl(msgMap *MsgMap, ctrl_SendMq *mqcontrol.Producer, retryChan ch
p.ctrl_SendMq = ctrl_SendMq p.ctrl_SendMq = ctrl_SendMq
p.ctrlRedisClient = NewCtrlRedisMsg() p.ctrlRedisClient = NewCtrlRedisMsg()
p.msgMap = msgMap p.msgMap = msgMap
//启动超时检测 p.Init()
return p return p
} }
...@@ -32,11 +34,12 @@ func (p *MsgControl) Init() { ...@@ -32,11 +34,12 @@ func (p *MsgControl) Init() {
} }
func (p *MsgControl) LoadRedisMsg() { func (p *MsgControl) LoadRedisMsg() {
kname := config.OctopusRedisConfig.MsgName kname := config.OctopusRedisConfig.MsgStatusName
tempMap := p.ctrlRedisClient.GetAllMsg(kname) tempMap := p.ctrlRedisClient.GetAllMsg(kname)
for _, v := range tempMap { for _, v := range tempMap {
m := &Msg{} m := &Msg{}
json.Unmarshal([]byte(v), m) json.Unmarshal([]byte(v), m)
log.Println("load msg ", m.MsgId)
p.msgMap.addMsg(m) p.msgMap.addMsg(m)
p.retryChan <- m p.retryChan <- m
} }
...@@ -77,7 +80,7 @@ func (p *MsgControl) SetMsg(msg *Msg) bool { ...@@ -77,7 +80,7 @@ func (p *MsgControl) SetMsg(msg *Msg) bool {
return true return true
} }
func (p *MsgControl) GetMsg(k string) Msg { func (p *MsgControl) GetMsg(k string) *Msg {
return p.ctrlRedisClient.GetMsg(k) return p.ctrlRedisClient.GetMsg(k)
} }
......
...@@ -14,7 +14,7 @@ import ( ...@@ -14,7 +14,7 @@ import (
type ( type (
Msg struct { Msg struct {
lock *sync.RWMutex lock sync.RWMutex
SerName string // 消息对应server SerName string // 消息对应server
MsgId string // 消息 id MsgId string // 消息 id
Agent string // client id Agent string // client id
...@@ -62,15 +62,15 @@ func (p *MsgMap) getMsg(k string) *Msg { ...@@ -62,15 +62,15 @@ func (p *MsgMap) getMsg(k string) *Msg {
} }
func (p *MsgMap) SetMsg(msg *Msg) bool { func (p *MsgMap) SetMsg(msg *Msg) bool {
p.Lock.Lock() p.Lock.Lock()
defer p.Lock.Unlock()
p.msgMap[msg.MsgId] = msg p.msgMap[msg.MsgId] = msg
p.Lock.Unlock()
return true return true
} }
func (p *MsgMap) addMsg(msg *Msg) { func (p *MsgMap) addMsg(msg *Msg) {
p.Lock.Lock() p.Lock.Lock()
defer p.Lock.Unlock()
p.msgMap[msg.MsgId] = msg p.msgMap[msg.MsgId] = msg
p.Lock.Unlock()
} }
func (p *MsgMap) AddTryCount(k string) { func (p *MsgMap) AddTryCount(k string) {
......
...@@ -5,6 +5,8 @@ import ( ...@@ -5,6 +5,8 @@ import (
"ficus_router/config" "ficus_router/config"
"time" "time"
log "github.com/sirupsen/logrus"
) )
/* /*
...@@ -32,6 +34,7 @@ func NewCtrlRetry(msgControl *MsgControl, retryChan chan *Msg, sendChan chan *Ms ...@@ -32,6 +34,7 @@ func NewCtrlRetry(msgControl *MsgControl, retryChan chan *Msg, sendChan chan *Ms
return p return p
} }
func (p *CtrlRetry) StartRetryWork() { func (p *CtrlRetry) StartRetryWork() {
log.Println("StartRetryWork")
p.ReadMsgRetryChan() p.ReadMsgRetryChan()
p.HandMsgList() p.HandMsgList()
} }
...@@ -85,10 +88,9 @@ func (p *CtrlRetry) ReadMsgRetryChan() { ...@@ -85,10 +88,9 @@ func (p *CtrlRetry) ReadMsgRetryChan() {
p.msgControl.DeleteMsg(rm.MsgId) p.msgControl.DeleteMsg(rm.MsgId)
continue continue
} else { } else {
p.msgControl.SetMsg(&msg) p.msgControl.SetMsg(msg)
} }
SerName, _ := p.msgControl.GetClientConnSerTraceid(msg.Agent) SerName, _ := p.msgControl.GetClientConnSerTraceid(msg.Agent)
*rm = msg
routerName, _ := p.msgControl.GetRouterTraceid(SerName) routerName, _ := p.msgControl.GetRouterTraceid(SerName)
//判断路由是否切换 //判断路由是否切换
if routerName == config.GetRouterName() { if routerName == config.GetRouterName() {
......
...@@ -100,6 +100,7 @@ func (p *Ctrl_Srvermap) WriteMsgToServer(msg *Msg) { ...@@ -100,6 +100,7 @@ func (p *Ctrl_Srvermap) WriteMsgToServer(msg *Msg) {
} }
} else { } else {
//发送到重试管道 //发送到重试管道
log.Println(" send msg to retry", msg.MsgId)
p.retryChan <- msg p.retryChan <- msg
} }
}() }()
......
...@@ -18,7 +18,7 @@ func StartClientServer(ctrser *msgcontrol.Ctrl_Srvermap, retryWork *msgcontrol.C ...@@ -18,7 +18,7 @@ func StartClientServer(ctrser *msgcontrol.Ctrl_Srvermap, retryWork *msgcontrol.C
log.Println(err, "tcp") log.Println(err, "tcp")
//defer延迟关闭改资源,以免引起内存泄漏 //defer延迟关闭改资源,以免引起内存泄漏
defer netListen.Close() defer netListen.Close()
retryWork.StartRetryWork()
log.Println("Waiting for ServiceClients") log.Println("Waiting for ServiceClients")
for { for {
conn, err := netListen.Accept() //第二步:获取连接 conn, err := netListen.Accept() //第二步:获取连接
...@@ -28,7 +28,7 @@ func StartClientServer(ctrser *msgcontrol.Ctrl_Srvermap, retryWork *msgcontrol.C ...@@ -28,7 +28,7 @@ func StartClientServer(ctrser *msgcontrol.Ctrl_Srvermap, retryWork *msgcontrol.C
} }
go RecvClient(conn, ctrser) go RecvClient(conn, ctrser)
} }
retryWork.StartRetryWork()
} }
func byteString(p []byte) string { func byteString(p []byte) string {
...@@ -65,7 +65,6 @@ func RecvClient(conn net.Conn, ctrser *msgcontrol.Ctrl_Srvermap) { ...@@ -65,7 +65,6 @@ func RecvClient(conn net.Conn, ctrser *msgcontrol.Ctrl_Srvermap) {
} else if pServer.SerType == config.SERTYPE_FEEDBACK { } else if pServer.SerType == config.SERTYPE_FEEDBACK {
//注册的反馈服务,则添加入反馈者列表 //注册的反馈服务,则添加入反馈者列表
} }
log.Println("------")
//返回连接的路由消息 //返回连接的路由消息
var SendSend config.SerCommand var SendSend config.SerCommand
SendSend.Command = config.COMMAND_LOGIN SendSend.Command = config.COMMAND_LOGIN
......
...@@ -39,7 +39,9 @@ func callback(d mqcontrol.MSG, msgControl *msgcontrol.MsgControl, serverMap *msg ...@@ -39,7 +39,9 @@ func callback(d mqcontrol.MSG, msgControl *msgcontrol.MsgControl, serverMap *msg
msgControl.AddMsgAndMessage(&msg, &mis) msgControl.AddMsgAndMessage(&msg, &mis)
//2.直接写入 执行管道 //2.直接写入 执行管道
//写入待发送到ser管道 //写入待发送到ser管道
log.Println("-3-")
serverMap.WriteMsgToServer(&msg) serverMap.WriteMsgToServer(&msg)
log.Println("-4-")
} }
}() }()
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment