Commit 18dc1e37 by yunpeng.song

redis 数据结构改为hash

parent 945f256c
......@@ -32,11 +32,11 @@ func initOhterCfg() {
SerConfig = Ctrl_Config_.ServerCfg
OctopusRedisConfig.MsgName_prefix = SerConfig.Project + ".Msg"
OctopusRedisConfig.ClientName_prefix = SerConfig.Project + ".Client"
OctopusRedisConfig.ClientServer_prefix = SerConfig.Project + "ClientServer"
OctopusRedisConfig.MsgStatusListName = SerConfig.Project + SerConfig.Traceid + "MsgStatsus"
OctopusRedisConfig.RotuteStatuslistName = SerConfig.Project + "RouteStatus"
OctopusRedisConfig.MsgName_prefix = "Msg"
OctopusRedisConfig.ClientName_prefix = "Client"
OctopusRedisConfig.ClientServer_prefix = "ClientServer"
OctopusRedisConfig.MsgStatusListName = SerConfig.Traceid + ".MsgStatsus"
OctopusRedisConfig.RotuteStatuslistName = "RouteStatus"
log.WithFields(log.Fields{
"Traceid": Ctrl_Config_.ServerCfg.Traceid,
"Project": Ctrl_Config_.ServerCfg.Project,
......
......@@ -2,7 +2,6 @@ package model
import (
"ficus/native"
"fmt"
"sync"
"time"
......@@ -50,7 +49,7 @@ func (p *CtrlmapClient_) Get(k string) (T_FmClient, error) {
if _, ok := p.mapClient_[k]; ok {
return p.mapClient_[k], nil
}
err := NewMyErr(1, fmt.Sprint("map values is nil ,key:%v", k))
err := NewMyErr(1, "map values is nil ,key:"+k)
return p.mapClient_[k], err
}
func (p *CtrlmapClient_) Set(k string, v T_FmClient) {
......
......@@ -28,16 +28,6 @@ func (p *MyPushMsg) PushDispatch(client T_FmClient, mis mission.Message, Ctrl_ma
}
func (p *MyPushMsg) PushClientDispatch(trans thrift.TTransport, mis mission.Message) bool {
callbackClient := service.NewDispatchServiceClientFactory(thrift.NewTFramedTransport(trans), thrift.NewTCompactProtocolFactory())
// Type_DAEMON Type = 4587776
// Type_FRAMEWORK Type = 4588032
// Type_SYSTEMINFO Type = 4588288
// Type_LOGGER Type = 4588544
// Type_SYNCUTILITY Type = 4589056
// switch mis.Proto {
// case Type_SYNCUTILITY:
// }
buff := thrift.NewTMemoryBuffer()
proto := thrift.NewTBinaryProtocol(buff, true, true)
mis.Write(proto)
......
......@@ -21,14 +21,14 @@ type CtrlRedisClient_ struct {
func NewCtrl_RedisClient_() *CtrlRedisClient_ {
p := &CtrlRedisClient_{}
p.chan_RedisClient = make(chan model.T_FmClient, 100000)
p.chan_RedisClient = make(chan model.T_FmClient, 1000)
p.ctrlRedis = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "", // no password set
DB: 0, // use default DB
})
go p.Read_Chan_Client()
// go p.Read_Chan_Client()
return p
}
func (p *CtrlRedisClient_) UpdateMsgStatus(mis mission.Message) bool {
......@@ -38,22 +38,19 @@ func (p *CtrlRedisClient_) UpdateMsgStatus(mis mission.Message) bool {
p.setMsg(msg)
return true
}
func (p *CtrlRedisClient_) set(c model.T_FmClient) bool {
func (p *CtrlRedisClient_) setc(c model.T_FmClient) bool {
jsonStr, _ := json.Marshal(c)
//_, err := p.ctrlRedis.Do("SET", c.Agent.ID, string(jsonStr))
err := p.ctrlRedis.Set(c.Agent.ID, string(jsonStr), 0).Err()
//p.ctrlRedis.Expire(c.Agent.ID,30)
//_,err=p.ctrlRedis.Do("EXPlRE", c.Agent,30)
if err != nil {
fmt.Println("redis set failed:", err)
return false
}
return true
}
func (p *CtrlRedisClient_) setClient(c model.T_FmClient) bool {
jsonStr, _ := json.Marshal(c)
kname := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.ClientName_prefix, c.Agent.ID)
fmt.Println("clientkey", kname)
err := p.ctrlRedis.Set(kname, string(jsonStr), 0).Err()
if err != nil {
fmt.Println("redis set failed:", err)
......@@ -61,22 +58,7 @@ func (p *CtrlRedisClient_) setClient(c model.T_FmClient) bool {
}
return true
}
func (p *CtrlRedisClient_) setKV(k string, v string) bool {
//OVER_ClIENT_TIME
/*
_, err := p.ctrlRedis.Do("SET", k, v)
//p.ctrlRedis.Receive(k, 10)
if err != nil {
fmt.Println("redis set failed:", err)
return false;
}
*/
err := p.ctrlRedis.Set(k, v, 30*time.Second).Err()
if err != nil {
panic(err)
}
return true
}
func (p *CtrlRedisClient_) setMsg(msg model.T_CMsg) bool {
kname := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.MsgName_prefix, msg.Mis.ID)
jsonstr, err := json.Marshal(msg)
......@@ -93,9 +75,9 @@ func (p *CtrlRedisClient_) setMsg(msg model.T_CMsg) bool {
return true
}
func (p *CtrlRedisClient_) GetMsg(k string) model.T_CMsg {
kname := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.MsgName_prefix, k)
kname := config.OctopusRedisConfig.MsgName_prefix
//fmt.Println(kname,len(kname));
result, err := p.ctrlRedis.Get(kname).Result()
result, err := p.hget(kname, k)
//fmt.Println(result)
if err != nil {
fmt.Println("redis get:", k, " failed err:", err)
......@@ -107,7 +89,7 @@ func (p *CtrlRedisClient_) GetMsg(k string) model.T_CMsg {
}
return msg
}
func (p *CtrlRedisClient_) get(k string) model.T_FmClient {
func (p *CtrlRedisClient_) getc(k string) model.T_FmClient {
var client model.T_FmClient
//result, err := redis.String(p.ctrlRedis.Do("GET", k))
result, err := p.ctrlRedis.Get(k).Result()
......@@ -126,9 +108,9 @@ func (p *CtrlRedisClient_) get(k string) model.T_FmClient {
//写路由到 redis
func (p *CtrlRedisClient_) Write_RouteRedis(key string) {
go func() {
k := fmt.Sprintf("%s.%v", config.OctopusRedisConfig.ClientServer_prefix, key)
k := config.OctopusRedisConfig.ClientServer_prefix
v := config.SerConfig.Traceid
p.setKV(k, v)
p.hset(k, key, v)
}()
}
......@@ -140,14 +122,3 @@ func (p *CtrlRedisClient_) Write_ClientRedis(c model.T_FmClient) {
p.setClient(c)
}()
}
func (p *CtrlRedisClient_) Write_Chan_Client(c model.T_FmClient) {
go func() {
p.chan_RedisClient <- c
}()
}
func (p *CtrlRedisClient_) Read_Chan_Client() {
for {
cc := <-p.chan_RedisClient
p.set(cc)
}
}
package redis
import (
"time"
)
func (p *CtrlRedisClient_) hset(key, field, v string) error {
return p.ctrlRedis.HSet(key, field, v).Err()
}
func (p *CtrlRedisClient_) hget(key, field string) (string, error) {
return p.ctrlRedis.HGet(key, field).Result()
}
func (p *CtrlRedisClient_) hgetAll(key string) (map[string]string, error) {
return p.ctrlRedis.HGetAll(key).Result()
}
func (p *CtrlRedisClient_) set(k, v string, t int) error {
return p.ctrlRedis.Set(k, v, time.Duration(t)*time.Second).Err()
}
func (p *CtrlRedisClient_) get(k string) (string, error) {
return p.ctrlRedis.Get(k).Result()
}
......@@ -56,11 +56,6 @@ func getPkg(c *gin.Context) {
}
func getPkgs(c *gin.Context) {
// p := &pkg.Package{}
// if err := c.ShouldBindJSON(&p); err != nil {
// WrongRequest(c, err)
// return
// }
if p, err := model.DefaultPkgManger.GetPkgs(); err != nil {
WrongRequest(c, err)
} else {
......
package route
import (
"bytes"
"ficus_clientserver/config"
"ficus_clientserver/model"
"ficus_clientserver/redis"
......@@ -93,7 +94,10 @@ func (p *Ctrl_RotuteRecv) ConnRotuteAndRead() {
log.Println("Recv Command:", pServer.Command)
if pServer.Command == config.COMMAND_LOGIN {
// 登录成功,添加路由到Map
log.Println("登录路由成功,路由名称", string(pServer.Data[:pServer.Length]), "角色:消费者")
index := bytes.IndexByte(pServer.Data[0:1024], 0)
rbyf_pn := pServer.Data[0:index]
log.Println("登录路由成功,路由名称", string(rbyf_pn), "角色:反馈者")
//rs.Name=string(pServer.Data[:]);
//rs.Conn = conn;
p.Rotute.Conn = conn
......@@ -103,20 +107,6 @@ func (p *Ctrl_RotuteRecv) ConnRotuteAndRead() {
} else if pServer.Command == config.COMMAND_TASK {
//推送来的任务,写入队列
if len(p.Rotute.Name) > 0 {
/*
var msg Model.T_CMsg
err:=json.Unmarshal(pServer.Data[:pServer.Length],&msg)
msg.RouteName=p.Rotute.Name
if err == nil {
log.Println("Recv Msg Id:",msg.Mis.ID)
//转成相关的数据格式
//Ctrl_mapCMsg.Set(msg.Mis.ID,msg)
p.ctrl_mapClientMsg.Write_Chan_ClientMsg(msg)
}else{
log.Println("json.Unmarshal:",err)
}
*/
id := pServer.Data[:pServer.Length]
//id
log.Println("recv one msg-------------id:", string(id), "---------------------------------------")
......
package route
import (
json2 "encoding/json"
"ficus/mission"
"ficus_clientserver/config"
"net"
"sync"
"time"
"unsafe"
log "github.com/sirupsen/logrus"
)
//接受推送服务
type RouteSend_ struct {
Name string
Conn net.Conn
}
type Ctrl_RouteSend struct {
Lock sync.RWMutex
Rotute RouteSend_
chanSendTMsg chan config.SerCommand
//ctrl_mapClientMsg *Model.CtrlmapClientMsg_
}
func NewCtrl_RotuteSend() *Ctrl_RouteSend {
p := &Ctrl_RouteSend{}
p.chanSendTMsg = make(chan config.SerCommand, 10000)
//p.RotuteMap=make(map[string]RotuteRecv_)
//p.ctrl_mapClientMsg=Ctrl_mapClientMsg
go p.StartCommRetute()
go p.Read_chanSendTMsg()
return p
}
func (p *Ctrl_RouteSend) send(comm config.SerCommand) {
Len := unsafe.Sizeof(comm)
testBytes := &SliceMock{
addr: uintptr(unsafe.Pointer(&comm)),
cap: int(Len),
len: int(Len),
}
var data = *(*[]byte)(unsafe.Pointer(testBytes))
p.Rotute.Conn.Write(data)
}
func (p *Ctrl_RouteSend) Write_chanSendTMsg(mis mission.Message) {
var comm config.SerCommand
comm.Command = config.COMMAND_TASK_BACK //任务消息的反馈
comm.PushStatus = config.MSG_SUCCESSED //消息成功
comm.SerType = config.SERTYPE_FEEDBACK // 反馈者
json, _ := json2.Marshal(mis)
copy(comm.Data[:len(json)], json)
comm.Length = uint32(len(json))
p.chanSendTMsg <- comm
}
func (p *Ctrl_RouteSend) Read_chanSendTMsg() {
for {
if p.Rotute.Name != "" {
break
}
}
for {
msg := <-p.chanSendTMsg
go func() {
p.send(msg)
}()
}
}
func (p *Ctrl_RouteSend) StartCommRetute() {
//连接 ==》 路由服务
for {
p.ConnRotuteAndRead()
time.Sleep(2 * time.Second)
}
}
func (p *Ctrl_RouteSend) ConnRotuteAndRead() {
conn, err := net.Dial("tcp", config.ROUTE_HOST_IP_PORT)
if err != nil {
log.Println("dial failed:", err)
log.Println("wait reconnection …… …… ……")
//os.Exit(1)
return
}
//defer conn.Close()
//发送一个登录消息
//sbuffer:=make([]byte, 1024)
{
sname := config.GetSrverId()
var login config.SerCommand
login.Command = config.COMMAND_LOGIN
login.SerType = config.SERTYPE_FEEDBACK // 反馈者
//sbuffer=[]byte()
//copy(,sname)
//login.Data=[]byte(sname)
copy(login.Data[:len(sname)], sname)
login.Length = uint32(len(sname))
login.PushStatus = 0
Len := unsafe.Sizeof(login)
testBytes := &SliceMock{
addr: uintptr(unsafe.Pointer(&login)),
cap: int(Len),
len: int(Len),
}
var data = *(*[]byte)(unsafe.Pointer(testBytes))
conn.Write(data)
}
//var rs RotuteRecv_
//rs.Name=""
for {
buffer := make([]byte, config.ROUTE_MAX_RECV_LEN)
_, err2 := conn.Read(buffer)
if err2 != nil {
log.Println("conn error return")
return
}
//接受数据成功
pServer := *(**config.SerCommand)(unsafe.Pointer(&buffer))
log.Println("Recv Command:", pServer.Command)
if pServer.Command == config.COMMAND_LOGIN {
// 登录成功,添加路由到Map
log.Println("登录路由成功,路由名称", string(pServer.Data[:pServer.Length]), "角色:反馈者")
p.Rotute.Name = string(pServer.Data[:])
p.Rotute.Conn = conn
//p.Rotute=rs
} else if pServer.Command == config.COMMAND_HEARTBEAT {
// 收到这个为服务端主动心跳,则不响应
} else if pServer.Command == config.COMMAND_TASK {
//这里为 发送反馈 不接受 返回的 COMMAND_TASK 任务
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment