Commit 4ecb19da by yunpeng.song

redis数据结构改为hash

parent 62cac69d
......@@ -35,11 +35,11 @@ func GetSrverName() string {
func initOhterCfg() {
OctopusConfig = Ctrl_Config_.OctopusCfg
OctopusRedisConfig.MsgName_prefix = OctopusConfig.Project + ".Msg"
OctopusRedisConfig.ClientName_prefix = OctopusConfig.Project + ".Client"
OctopusRedisConfig.ClientServer_prefix = OctopusConfig.Project + "ClientServer"
OctopusRedisConfig.MsgStatusListName = OctopusConfig.Project + OctopusConfig.Traceid + "MsgStatsus"
OctopusRedisConfig.RotuteStatuslistName = OctopusConfig.Project + "RouteStatus"
OctopusRedisConfig.MsgName_prefix = "Msg"
OctopusRedisConfig.ClientName_prefix = "Client"
OctopusRedisConfig.ClientServer_prefix = "ClientServer"
OctopusRedisConfig.MsgStatusListName = OctopusConfig.Traceid + ".MsgStatsus"
OctopusRedisConfig.RotuteStatuslistName = "RouteStatus"
log.WithFields(log.Fields{
"Traceid": Ctrl_Config_.OctopusCfg.Traceid,
"Project": Ctrl_Config_.OctopusCfg.Project,
......
......@@ -30,8 +30,8 @@ func main() {
}
func InitCfg() {
//加载失败直接退出
if err := config.LoadCfg(); err != nil {
log.Fatalln(err)
}
}
......@@ -6,6 +6,7 @@ import (
"ficus_router/config"
"ficus_router/mqcontrol"
"ficus_router/redisctrl"
"fmt"
"sync"
"time"
......@@ -71,7 +72,7 @@ func (p *CtrlMsgMap) TimeOutTimerMsg() {
for k, v := range p.msgMap {
log.WithFields(log.Fields{"func": "msgkey"}).Info(k)
msg := p.ctrlRedisClient.GetMsg(k)
fmt.Println("1111111111111111111")
if msg.PushStatus == config.MSG_WAITSUCCESS {
//获取 客户端 连接到的 服务 Traceid
serName, err := p.ctrlRedisClient.GetClientConnSerTraceid(msg.Mis.Agent)
......@@ -84,6 +85,7 @@ func (p *CtrlMsgMap) TimeOutTimerMsg() {
//未获取到客户端上线标识,证明 此机器并未上线,则忽略此消息
continue
}
fmt.Println("222222222222")
if p.ctrl_SrverMap.IsServer(serName) {
//超时写入重试管道
ic := time.Now().Unix() - v.Lastupdatetime
......@@ -94,12 +96,14 @@ func (p *CtrlMsgMap) TimeOutTimerMsg() {
rm.MsgId = v.Mis.ID
rm.TimeSecond = v.Lastupdatetime
p.Ctrl_Retry.Write_chan_retryChan(rm)
fmt.Println("33333333333333")
}
} else {
//客户端上线,但是不在连接的列表中,则丢给MQ , 并且删除redis的缓存
jsonstr, _ := json.Marshal(msg.Mis)
//发送给mq
p.ctrl_SendMq.Push(jsonstr)
fmt.Println("444444444444")
//从 map 中删除 这个key
delete(p.msgMap, k)
//从redis中删除这个key
......@@ -107,6 +111,7 @@ func (p *CtrlMsgMap) TimeOutTimerMsg() {
}
} else if msg.PushStatus == config.MSG_SUCCESSED {
//从 map 中删除 这个key
fmt.Println("55555555555555555555555")
delete(p.msgMap, k)
p.ctrl_RMsgStatus.StatusWork(msg.Mis.ID, config.MSG_SUCCESSED)
}
......
......@@ -5,6 +5,7 @@ import (
"ficus_router/config"
"ficus_router/mqcontrol"
"ficus_router/redisctrl"
"fmt"
"time"
)
......@@ -43,6 +44,7 @@ func (p *CtrlRetry) Write_chan_retryChan(rm SerMsg) {
p.retryChan <- rm
}
func (p *CtrlRetry) Read_chan_retryChan() {
fmt.Println("Read_chan_retryChan")
for {
rm := <-p.retryChan
ic := time.Now().Unix() - rm.TimeSecond
......@@ -65,6 +67,7 @@ func (p *CtrlRetry) Read_chan_retryChan() {
// 返回 rabbitmq
json, _ := json.Marshal(msg.Mis)
p.psend.Push(json)
fmt.Println("retry push")
}
} else {
//小于超时时间,则不检测是否成功! 直接丢入自身的 检测重试 管道
......
......@@ -3,8 +3,6 @@ package redisctrl
import (
"encoding/json"
"ficus_router/config"
"fmt"
"time"
log "github.com/sirupsen/logrus"
......@@ -33,52 +31,37 @@ func NewCtrlRedisMsg() *CtrlRedisMsg {
func (p *CtrlRedisMsg) setResisMsg(msg config.T_CMsg) {
jsonstr, _ := json.Marshal(msg)
//订单保存 1 天
kname := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.MsgName_prefix, msg.Mis.ID)
kname := config.OctopusRedisConfig.MsgName_prefix
log.WithFields(log.Fields{"func": "setResisMsg", "kname": kname}).Info(string(jsonstr))
p.ctrlRedisMsg.Set(kname, jsonstr, 60*24*60*time.Second)
p.hset(kname, msg.Mis.ID, jsonstr)
}
func (p *CtrlRedisMsg) get(k string) config.T_CMsg {
jsonstr := p.ctrlRedisMsg.Get(k)
func (p *CtrlRedisMsg) getMsg(k, field string) config.T_CMsg {
jsonstr, _ := p.hget(k, field)
var msg config.T_CMsg
json.Unmarshal([]byte(jsonstr.Val()), msg)
json.Unmarshal([]byte(jsonstr), &msg)
return msg
}
func (p *CtrlRedisMsg) GetClientTraceid(k string) string {
kname := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.ClientName_prefix, k)
json := p.ctrlRedisMsg.Get(kname)
return json.Val()
kname := config.OctopusRedisConfig.ClientName_prefix
json, _ := p.hget(kname, k)
return json
}
//客户端连接到的server
func (p *CtrlRedisMsg) GetClientConnSerTraceid(k string) string {
func (p *CtrlRedisMsg) GetClientConnSerTraceid(k string) (string, error) {
//ClientServer
kname := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.ClientServer_prefix, k)
json := p.ctrlRedisMsg.Get(kname)
return json.Val()
}
func (p *CtrlRedisMsg) test(k string) {
//p.ctrlRedisMsg.ZAdd("ssss","1ss")
kname := config.OctopusRedisConfig.ClientServer_prefix
return p.hget(kname, k)
}
func (p *CtrlRedisMsg) GetMsg(k string) config.T_CMsg {
kname := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.MsgName_prefix, k)
kname := config.OctopusRedisConfig.MsgName_prefix
log.WithFields(log.Fields{"func": "GetMsg"}).Info(kname)
jsonstr := p.ctrlRedisMsg.Get(kname)
log.WithFields(log.Fields{"func": "jsonstr"}).Info(jsonstr.Val())
var msg config.T_CMsg
err := json.Unmarshal([]byte(jsonstr.Val()), &msg)
if err != nil {
fmt.Println("GetMsgTraceid json.Unmarshal!", err)
}
msg := p.getMsg(kname, k)
return msg
}
func (p *CtrlRedisMsg) Write_Chan_RedisMsg(msg config.T_CMsg) {
fmt.Println("0", msg.Mis.ID)
go func() {
fmt.Println("1", msg.Mis.ID)
p.chanRedisMsg <- msg
}()
}
......@@ -86,9 +69,7 @@ func (p *CtrlRedisMsg) Read_Chan_RedisMsg() {
for {
var msg config.T_CMsg
msg = <-p.chanRedisMsg
fmt.Println("2", msg.Mis.ID)
go func() {
fmt.Println("3", msg.Mis.ID)
p.setResisMsg(msg)
}()
}
......
......@@ -2,7 +2,7 @@ package redisctrl
import (
"ficus_router/config"
"fmt"
"log"
"github.com/go-redis/redis"
)
......@@ -13,7 +13,7 @@ import (
type RMsgStatus struct {
ctrlRedisMsgStatus *redis.Client
hashname string "hash name"
hashname string // "hash name"
}
func NewRMsgStatus() *RMsgStatus {
......@@ -27,13 +27,13 @@ func NewRMsgStatus() *RMsgStatus {
//p.hashname=fmt.Sprint("%s.%s.%s",Config.OctopusConfig.Project,Config.OctopusConfig.Traceid,"MsgStatsu")
p.hashname = config.OctopusRedisConfig.MsgStatusListName
fmt.Println("MsgStatus Redis Hash 上线 名称:", p.hashname)
log.Println("MsgStatus Redis Hash 上线 名称:", p.hashname)
return p
}
func (p *RMsgStatus) StatusWork(msgid string, status config.MSG_STATUS) bool {
if status == config.MSG_WAITSUCCESS {
//写入 -- redis
p.set(msgid)
p.write(msgid)
} else if status == config.MSG_SUCCESSED {
//删除 - redis
p.del(msgid)
......@@ -43,10 +43,10 @@ func (p *RMsgStatus) StatusWork(msgid string, status config.MSG_STATUS) bool {
return true
}
func (p *RMsgStatus) Del(msgid string) bool {
p.del(msgid)
p.ctrlRedisMsgStatus.HDel(p.hashname, msgid)
return true
}
func (p *RMsgStatus) set(msgid string) bool {
func (p *RMsgStatus) write(msgid string) bool {
if p.ctrlRedisMsgStatus.HExists(p.hashname, msgid).Val() == false {
p.ctrlRedisMsgStatus.HSet(p.hashname, msgid, config.MSG_WAITSUCCESS)
} else {
......
package redisctrl
import (
"time"
)
func (p *CtrlRedisMsg) hset(key, field string, v interface{}) error {
return p.ctrlRedisMsg.HSet(key, field, v).Err()
}
func (p *CtrlRedisMsg) hget(key, field string) (string, error) {
v, err := p.ctrlRedisMsg.HGet(key, field).Result()
return v, err
}
func (p *CtrlRedisMsg) hgetAll(key string) (map[string]string, error) {
return p.ctrlRedisMsg.HGetAll(key).Result()
}
func (p *CtrlRedisMsg) set(k, v string, t int) error {
return p.ctrlRedisMsg.Set(k, v, time.Duration(t)*time.Second).Err()
}
func (p *CtrlRedisMsg) get(k string) (string, error) {
return p.ctrlRedisMsg.Get(k).Result()
}
......@@ -16,7 +16,7 @@ func StartClientServer(ctrser *model.Ctrl_Srvermap) {
//建立socket,监听端口 第一步:绑定端口
//netListen, err := net.Listen("tcp", "localhost:1024")
netListen, err := net.Listen("tcp", ":38000")
failOnError(err, "tcp")
log.Println(err, "tcp")
//defer延迟关闭改资源,以免引起内存泄漏
defer netListen.Close()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment