Commit 1ae78eed by yunpeng.song

优化

parent 18dc1e37
...@@ -70,11 +70,12 @@ type SERVER_CONFIG struct { ...@@ -70,11 +70,12 @@ type SERVER_CONFIG struct {
//redis key name //redis key name
type REDISNAME_CONFIG struct { type REDISNAME_CONFIG struct {
MsgName_prefix string `json:"msgname"` //"消息的前缀" MsgName string `json:"msg"` // 消息的前缀
ClientName_prefix string `json:"clientname"` //"客户端的前缀" ClientName string `json:"clientname"` // 客户端的前缀
ClientServer_prefix string `json:"clientserver"` //"" ClientServer string `json:"clientserver"` // 客户端连接的server
MsgStatusListName string `json:"msgstatusname"` //"未执行消息的list,只带消息id的" MsgStatusName string `json:"msgstatus"` // 未执行消息的list,只带消息id的
RotuteStatuslistName string `json:"routestatusname"` //"在线路由的列表" RotuteStatusName string `json:"routestatusname"` // 在线路由的列表
RouterName string `json:"routername"` // 路由映射列表
} }
type CFG struct { type CFG struct {
......
...@@ -13,7 +13,7 @@ import ( ...@@ -13,7 +13,7 @@ import (
const CONFIG_CONFIGNAME = "cfg.json" const CONFIG_CONFIGNAME = "cfg.json"
var SerConfig SERVER_CONFIG var SerConfig SERVER_CONFIG
var OctopusRedisConfig REDISNAME_CONFIG //各种数据在redis 中的名称 var ServerRedisConfig REDISNAME_CONFIG //各种数据在redis 中的名称
var ConfigFilePath string var ConfigFilePath string
func newCfg() *CFG { func newCfg() *CFG {
...@@ -25,26 +25,28 @@ func GetSrverId() string { ...@@ -25,26 +25,28 @@ func GetSrverId() string {
return SerConfig.Traceid return SerConfig.Traceid
} }
func GetSrverName() string { func GetSrverName() string {
return SerConfig.Project return SerConfig.Project + SerConfig.Traceid
} }
func initOhterCfg() { func initOhterCfg() {
SerConfig = Ctrl_Config_.ServerCfg SerConfig = Ctrl_Config_.ServerCfg
OctopusRedisConfig.MsgName_prefix = "Msg" ServerRedisConfig.MsgName = "Msg"
OctopusRedisConfig.ClientName_prefix = "Client" ServerRedisConfig.ClientName = "Client"
OctopusRedisConfig.ClientServer_prefix = "ClientServer" ServerRedisConfig.ClientServer = "ClientServer"
OctopusRedisConfig.MsgStatusListName = SerConfig.Traceid + ".MsgStatsus" ServerRedisConfig.MsgStatusName = "MsgStatus"
OctopusRedisConfig.RotuteStatuslistName = "RouteStatus" ServerRedisConfig.RotuteStatusName = "RouteStatus"
ServerRedisConfig.RouterName = "Router"
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"Traceid": Ctrl_Config_.ServerCfg.Traceid, "Traceid": Ctrl_Config_.ServerCfg.Traceid,
"Project": Ctrl_Config_.ServerCfg.Project, "Project": Ctrl_Config_.ServerCfg.Project,
"MsgName_prefix": OctopusRedisConfig.MsgName_prefix, "MsgName": ServerRedisConfig.MsgName,
"ClientName_prefix": OctopusRedisConfig.ClientName_prefix, "ClientName": ServerRedisConfig.ClientName,
"ClientServer_prefix": OctopusRedisConfig.ClientServer_prefix, "ClientServer": ServerRedisConfig.ClientServer,
"MsgStatusListName": OctopusRedisConfig.MsgStatusListName, "MsgStatusName": ServerRedisConfig.MsgStatusName,
"RotuteStatuslistName": OctopusRedisConfig.RotuteStatuslistName, "RotuteName": ServerRedisConfig.RouterName,
"RotuteStatusName": ServerRedisConfig.RotuteStatusName,
}).Info() }).Info()
} }
......
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"ficus/service" "ficus/service"
"ficus_clientserver/config" "ficus_clientserver/config"
"ficus_clientserver/model" "ficus_clientserver/model"
"ficus_clientserver/redis"
"ficus_clientserver/route" "ficus_clientserver/route"
//"ficus_clientserver/route" //"ficus_clientserver/route"
...@@ -52,7 +51,7 @@ func Startwork0() { ...@@ -52,7 +51,7 @@ func Startwork0() {
log.Fatalln("Error!", err) log.Fatalln("Error!", err)
} }
//redis 控制 //redis 控制
processorFactory := thrifthandler.NewMyTProcessorFactory(model.GetDefaultClientMap(), redis.NewCtrl_RedisClient_()) processorFactory := thrifthandler.NewMyTProcessorFactory(model.GetDefaultClientMap(), model.NewCtrl_RedisClient_())
protocolFactory := thrift.NewTCompactProtocolFactory() protocolFactory := thrift.NewTCompactProtocolFactory()
transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory()) transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
server := thrift.NewTSimpleServerFactory4(processorFactory, serverTransport, transportFactory, protocolFactory) server := thrift.NewTSimpleServerFactory4(processorFactory, serverTransport, transportFactory, protocolFactory)
...@@ -74,7 +73,7 @@ func Startwork1() { ...@@ -74,7 +73,7 @@ func Startwork1() {
pckprocessor := service.NewPackageServiceProcessor(pkgHandle) pckprocessor := service.NewPackageServiceProcessor(pkgHandle)
TMultiplexedProcessor := thrift.NewTMultiplexedProcessor() TMultiplexedProcessor := thrift.NewTMultiplexedProcessor()
dispatchHandle := thrifthandler.NewMyDispatchService(nil, model.GetDefaultClientMap(), redis.NewCtrl_RedisClient_()) dispatchHandle := thrifthandler.NewMyDispatchService(nil, model.GetDefaultClientMap(), model.NewCtrl_RedisClient_())
dispatchprocessor := service.NewDispatchServiceProcessor(dispatchHandle) dispatchprocessor := service.NewDispatchServiceProcessor(dispatchHandle)
TMultiplexedProcessor.RegisterProcessor("Package", pckprocessor) TMultiplexedProcessor.RegisterProcessor("Package", pckprocessor)
TMultiplexedProcessor.RegisterProcessor("Mission", dispatchprocessor) TMultiplexedProcessor.RegisterProcessor("Mission", dispatchprocessor)
...@@ -99,5 +98,4 @@ func InitCfg() { ...@@ -99,5 +98,4 @@ func InitCfg() {
if err := config.LoadCfg(); err != nil { if err := config.LoadCfg(); err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
} }
package model
import (
"encoding/json"
"ficus/mission"
"ficus_clientserver/config"
"ficus_router/redisctrl"
"fmt"
"github.com/go-redis/redis"
log "github.com/sirupsen/logrus"
"sync"
)
type CtrlRedisClient_ struct {
Lock sync.RWMutex
redisClient *redis.Client
clientChan chan T_FmClient
}
func NewCtrl_RedisClient_() *CtrlRedisClient_ {
p := &CtrlRedisClient_{}
p.clientChan = make(chan T_FmClient, 1000)
p.redisClient = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "", // no password set
DB: 0, // use default DB
})
// go p.Read_Chan_Client()
return p
}
func (p *CtrlRedisClient_) UpdateMsgStatus(mis mission.Message) bool {
p.setMessage(mis)
return true
}
func (p *CtrlRedisClient_) setMessage(msg mission.Message) bool {
jsonstr, _ := json.Marshal(msg)
kname := config.ServerRedisConfig.MsgName
log.WithFields(log.Fields{"func": "setMessage", "kname": kname, "msgId": msg.ID}).Info(string(jsonstr))
redisctrl.HSet(p.redisClient, kname, msg.ID, jsonstr)
return true
}
func (p *CtrlRedisClient_) GetMessage(k string) (mis mission.Message) {
kname := config.ServerRedisConfig.MsgName
result, err := redisctrl.HGet(p.redisClient, kname, k)
if err != nil {
fmt.Println("redis get:", k, " failed err:", err)
}
err2 := json.Unmarshal([]byte(result), &mis)
if err2 != nil {
log.Println("json.Unmarshal error!", err2)
}
return
}
func (p *CtrlRedisClient_) setMsg(msg Msg) bool {
kname := config.ServerRedisConfig.MsgStatusName
jsonstr, err := json.Marshal(msg)
if err != nil {
fmt.Println("Update Redis Msg Status err:", err)
return false
}
log.WithFields(log.Fields{"func": "setMsg", "kname": kname, "msgId": msg.MsgId}).Info(string(jsonstr))
redisctrl.HSet(p.redisClient, kname, msg.MsgId, jsonstr)
return true
}
func (p *CtrlRedisClient_) SetRouter(routerName, serverName string) bool {
kname := config.ServerRedisConfig.RouterName
redisctrl.HSet(p.redisClient, kname, serverName, routerName)
return true
}
func (p *CtrlRedisClient_) GetMsg(k string) *Msg {
kname := config.ServerRedisConfig.MsgStatusName
jsonstr, _ := redisctrl.HGet(p.redisClient, kname, k)
log.Println(jsonstr)
var msg Msg
json.Unmarshal([]byte(jsonstr), &msg)
return &msg
}
//写路由到 redis
func (p *CtrlRedisClient_) Write_RouteRedis(key string) {
go func() {
k := config.ServerRedisConfig.ClientServer
v := config.SerConfig.Traceid
//p.hset(k, key, v)
redisctrl.HSet(p.redisClient, k, key, v)
}()
}
package model
import "fmt"
type FmError struct {
itype int
errMsg string
}
func NewMyErr(itype int, errMsg string) *FmError {
return &FmError{itype, errMsg}
}
//实现Error接口
func (e *FmError) Error() string {
return fmt.Sprintf("%d - %s", e.itype, e.errMsg)
}
package model package model
import ( import (
"errors"
"ficus/native" "ficus/native"
"sync" "sync"
"time" "time"
"github.com/apache/thrift/lib/go/thrift" "github.com/apache/thrift/lib/go/thrift"
log "github.com/sirupsen/logrus"
) )
//客户端的操作函数 //客户端的操作函数
...@@ -49,12 +51,13 @@ func (p *CtrlmapClient_) Get(k string) (T_FmClient, error) { ...@@ -49,12 +51,13 @@ func (p *CtrlmapClient_) Get(k string) (T_FmClient, error) {
if _, ok := p.mapClient_[k]; ok { if _, ok := p.mapClient_[k]; ok {
return p.mapClient_[k], nil return p.mapClient_[k], nil
} }
err := NewMyErr(1, "map values is nil ,key:"+k) err := errors.New("map values is nil ,key:" + k)
return p.mapClient_[k], err return p.mapClient_[k], err
} }
func (p *CtrlmapClient_) Set(k string, v T_FmClient) { func (p *CtrlmapClient_) Set(k string, v T_FmClient) {
p.Lock.Lock() p.Lock.Lock()
defer p.Lock.Unlock() defer p.Lock.Unlock()
log.Println("add client", k)
p.mapClient_[k] = v p.mapClient_[k] = v
} }
func (p *CtrlmapClient_) SetTime(k string, itime int64) { func (p *CtrlmapClient_) SetTime(k string, itime int64) {
......
package model package model
import ( import (
"ficus/mission"
"ficus_clientserver/config"
"sync" "sync"
"time" "time"
...@@ -11,18 +9,19 @@ import ( ...@@ -11,18 +9,19 @@ import (
//维护一个任务列表的状态 //维护一个任务列表的状态
type ( type (
T_CMsg struct { Msg struct {
RouteName string //"路由主机" lock *sync.RWMutex
Mis mission.Message SerName string // 消息对应server
PushCount uint32 //"push count" MsgId string // 消息 id
PushStatus config.MSG_STATUS //"发送状态" Agent string // client id
Lastupdatetime int64 //laset dispatch time Tries int32 // 当前重试次数
//Rpcid string TriesMax int32 // 最大重试次数
TimeSecond int64 // 消息到达路由时间
Lastupdatetime int64 // 上次更新时间
} }
clientMsg struct { clientMsg struct {
chan_CMsg chan T_CMsg chan_CMsg chan *Msg
//mapClentMsg map[string] T_CMsg "消息不保存了"
} }
T_CountFmClientMsg struct { T_CountFmClientMsg struct {
...@@ -38,6 +37,7 @@ type ( ...@@ -38,6 +37,7 @@ type (
mapLock *sync.RWMutex mapLock *sync.RWMutex
mapClientMsg_ map[string]clientMsg mapClientMsg_ map[string]clientMsg
Ctrl_MapClient *CtrlmapClient_ Ctrl_MapClient *CtrlmapClient_
ctrl_redisMsg *CtrlRedisClient_
} }
) )
...@@ -47,29 +47,29 @@ func NewCtrlmapClientMsg_(Ctrl_MapClient *CtrlmapClient_) *CtrlmapClientMsg_ { ...@@ -47,29 +47,29 @@ func NewCtrlmapClientMsg_(Ctrl_MapClient *CtrlmapClient_) *CtrlmapClientMsg_ {
p.mapLock = new(sync.RWMutex) p.mapLock = new(sync.RWMutex)
p.mapClientMsg_ = make(map[string]clientMsg) p.mapClientMsg_ = make(map[string]clientMsg)
p.Ctrl_MapClient = Ctrl_MapClient p.Ctrl_MapClient = Ctrl_MapClient
p.ctrl_redisMsg = NewCtrl_RedisClient_()
return p return p
} }
func (p *CtrlmapClientMsg_) CteatClientMsgBykey(k string) { func (p *CtrlmapClientMsg_) CreateClientMsgBykey(k string) {
p.Lock.Lock() p.Lock.Lock()
defer p.Lock.Unlock() defer p.Lock.Unlock()
if _, ok := p.mapClientMsg_[k]; ok == false { if _, ok := p.mapClientMsg_[k]; ok == false {
cc := clientMsg{} cc := clientMsg{}
cc.chan_CMsg = make(chan T_CMsg, 1000) cc.chan_CMsg = make(chan *Msg, 100)
//cc.mapClentMsg = make(map[string]T_CMsg);
p.mapClientMsg_[k] = cc p.mapClientMsg_[k] = cc
go p.ReadClientMsg_Work(k) go p.ReadClientMsg_Work(k)
} }
} }
func (p *CtrlmapClientMsg_) Write_Chan_ClientMsg(msg T_CMsg) { func (p *CtrlmapClientMsg_) Write_Chan_ClientMsg(msg *Msg) {
//判断有没有这个列表,还有则写没有则不谢 //判断有没有这个列表,还有则写没有则不谢
p.CteatClientMsgBykey(msg.Mis.Agent) p.CreateClientMsgBykey(msg.Agent)
p.mapClientMsg_[msg.Mis.Agent].chan_CMsg <- msg p.mapClientMsg_[msg.Agent].chan_CMsg <- msg
} }
func (p *CtrlmapClientMsg_) Read_Chan_ClietnMsg(k string) T_CMsg { func (p *CtrlmapClientMsg_) Read_Chan_ClietnMsg(k string) *Msg {
//p.Lock.RLock() //p.Lock.RLock()
//defer p.Lock.RUnlock() //defer p.Lock.RUnlock()
msg := <-p.mapClientMsg_[k].chan_CMsg msg := <-p.mapClientMsg_[k].chan_CMsg
...@@ -84,19 +84,18 @@ func (p *CtrlmapClientMsg_) ReadClientMsg_Work(k string) { ...@@ -84,19 +84,18 @@ func (p *CtrlmapClientMsg_) ReadClientMsg_Work(k string) {
p.ClientMsg_Work(msg) p.ClientMsg_Work(msg)
} }
} }
func (p *CtrlmapClientMsg_) ClientMsg_Work(t T_CMsg) { func (p *CtrlmapClientMsg_) ClientMsg_Work(m *Msg) {
//查询 redis 是否成功,成功则直接丢弃 //查询 redis 是否成功,成功则直接丢弃
cc, err := p.Ctrl_MapClient.Get(t.Mis.Agent) cc, err := p.Ctrl_MapClient.Get(m.Agent)
if err == nil { if err == nil {
//查询是否在线 //查询是否在线
if cc.Trans.IsOpen() { if cc.Trans.IsOpen() {
//发送消息 //发送消息
log.Println("准备发送消息!") log.Println("准备发送消息!")
dis := &MyPushMsg{} dis := &MyPushMsg{}
dis.PushDispatch(cc, t.Mis, p.Ctrl_MapClient) mis := p.ctrl_redisMsg.GetMessage(m.MsgId)
dis.PushDispatch(cc, mis, p.Ctrl_MapClient)
t.PushCount = 1 m.Lastupdatetime = time.Now().Unix()
t.Lastupdatetime = time.Now().Unix()
} else { } else {
//设置map列表不在线 //设置map列表不在线
p.Ctrl_MapClient.SetLine(cc.Agent.ID, false) p.Ctrl_MapClient.SetLine(cc.Agent.ID, false)
...@@ -105,7 +104,7 @@ func (p *CtrlmapClientMsg_) ClientMsg_Work(t T_CMsg) { ...@@ -105,7 +104,7 @@ func (p *CtrlmapClientMsg_) ClientMsg_Work(t T_CMsg) {
} }
} else { } else {
//不在线 //不在线
log.Println("不在线") log.Println("不在线", err)
} }
return return
......
package redis
import (
"encoding/json"
"ficus/mission"
"ficus_clientserver/config"
"ficus_clientserver/model"
"fmt"
"time"
"sync"
"github.com/go-redis/redis"
)
type CtrlRedisClient_ struct {
Lock sync.RWMutex
ctrlRedis *redis.Client
chan_RedisClient chan model.T_FmClient
}
func NewCtrl_RedisClient_() *CtrlRedisClient_ {
p := &CtrlRedisClient_{}
p.chan_RedisClient = make(chan model.T_FmClient, 1000)
p.ctrlRedis = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "", // no password set
DB: 0, // use default DB
})
// go p.Read_Chan_Client()
return p
}
func (p *CtrlRedisClient_) UpdateMsgStatus(mis mission.Message) bool {
msg := p.GetMsg(mis.ID)
msg.Mis = mis
msg.PushStatus = config.MSG_SUCCESSED
p.setMsg(msg)
return true
}
func (p *CtrlRedisClient_) setc(c model.T_FmClient) bool {
jsonStr, _ := json.Marshal(c)
err := p.ctrlRedis.Set(c.Agent.ID, string(jsonStr), 0).Err()
if err != nil {
fmt.Println("redis set failed:", err)
return false
}
return true
}
func (p *CtrlRedisClient_) setClient(c model.T_FmClient) bool {
jsonStr, _ := json.Marshal(c)
kname := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.ClientName_prefix, c.Agent.ID)
err := p.ctrlRedis.Set(kname, string(jsonStr), 0).Err()
if err != nil {
fmt.Println("redis set failed:", err)
return false
}
return true
}
func (p *CtrlRedisClient_) setMsg(msg model.T_CMsg) bool {
kname := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.MsgName_prefix, msg.Mis.ID)
jsonstr, err := json.Marshal(msg)
if err != nil {
fmt.Println("Update Redis Msg Status err:", err)
return false
}
//fmt.Println("set redis id:",kname,"rpcid:",msg.Rpcid)
//fmt.Println(string(jsonstr))
err2 := p.ctrlRedis.Set(kname, string(jsonstr), 24*time.Hour).Err()
if err2 != nil {
fmt.Println("trlRedis.Set Err", err)
}
return true
}
func (p *CtrlRedisClient_) GetMsg(k string) model.T_CMsg {
kname := config.OctopusRedisConfig.MsgName_prefix
//fmt.Println(kname,len(kname));
result, err := p.hget(kname, k)
//fmt.Println(result)
if err != nil {
fmt.Println("redis get:", k, " failed err:", err)
}
var msg model.T_CMsg
err2 := json.Unmarshal([]byte(result), &msg)
if err2 != nil {
fmt.Println("json.Unmarshal error!", err2)
}
return msg
}
func (p *CtrlRedisClient_) getc(k string) model.T_FmClient {
var client model.T_FmClient
//result, err := redis.String(p.ctrlRedis.Do("GET", k))
result, err := p.ctrlRedis.Get(k).Result()
if err != nil {
fmt.Println("redis get:", k, " failed err:", err)
} else {
fmt.Println("redis get: ", k, ": values:", result)
err := json.Unmarshal([]byte(result), &client)
if err != nil {
fmt.Println("error:", err)
}
}
return client
}
//写路由到 redis
func (p *CtrlRedisClient_) Write_RouteRedis(key string) {
go func() {
k := config.OctopusRedisConfig.ClientServer_prefix
v := config.SerConfig.Traceid
p.hset(k, key, v)
}()
}
//写客户端到 redis
func (p *CtrlRedisClient_) Write_ClientRedis(c model.T_FmClient) {
go func() {
c.Trans = nil
//信息结构
p.setClient(c)
}()
}
...@@ -2,24 +2,51 @@ package redis ...@@ -2,24 +2,51 @@ package redis
import ( import (
"time" "time"
"github.com/go-redis/redis"
) )
func (p *CtrlRedisClient_) hset(key, field, v string) error { // func (p *CtrlRedisClient_) hset(key, field, v string) error {
return p.ctrlRedis.HSet(key, field, v).Err() // return p.redisClient.HSet(key, field, v).Err()
// }
// func (p *CtrlRedisClient_) hget(key, field string) (string, error) {
// return p.redisClient.HGet(key, field).Result()
// }
// func (p *CtrlRedisClient_) hgetAll(key string) (map[string]string, error) {
// return p.redisClient.HGetAll(key).Result()
// }
// func (p *CtrlRedisClient_) set(k, v string, t int) error {
// return p.redisClient.Set(k, v, time.Duration(t)*time.Second).Err()
// }
// func (p *CtrlRedisClient_) get(k string) (string, error) {
// return p.redisClient.Get(k).Result()
// }
func HSet(r *redis.Client, key, field string, v interface{}) error {
return r.HSet(key, field, v).Err()
}
func HGet(r *redis.Client, key, field string) (string, error) {
v, err := r.HGet(key, field).Result()
return v, err
} }
func (p *CtrlRedisClient_) hget(key, field string) (string, error) { func HGetAll(r *redis.Client, key string) (map[string]string, error) {
return p.ctrlRedis.HGet(key, field).Result() return r.HGetAll(key).Result()
} }
func (p *CtrlRedisClient_) hgetAll(key string) (map[string]string, error) { func HDel(r *redis.Client, k, field string) error {
return p.ctrlRedis.HGetAll(key).Result() return r.HDel(k, field).Err()
} }
func (p *CtrlRedisClient_) set(k, v string, t int) error { func Set(r *redis.Client, k, v string, t int) error {
return p.ctrlRedis.Set(k, v, time.Duration(t)*time.Second).Err() return r.Set(k, v, time.Duration(t)*time.Second).Err()
} }
func (p *CtrlRedisClient_) get(k string) (string, error) { func Get(r *redis.Client, k string) (string, error) {
return p.ctrlRedis.Get(k).Result() return r.Get(k).Result()
} }
...@@ -4,7 +4,6 @@ import ( ...@@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"ficus_clientserver/config" "ficus_clientserver/config"
"ficus_clientserver/model" "ficus_clientserver/model"
"ficus_clientserver/redis"
"net" "net"
"sync" "sync"
"time" "time"
...@@ -29,14 +28,13 @@ type Ctrl_RotuteRecv struct { ...@@ -29,14 +28,13 @@ type Ctrl_RotuteRecv struct {
Lock sync.RWMutex Lock sync.RWMutex
Rotute RotuteRecv_ Rotute RotuteRecv_
ctrl_mapClientMsg *model.CtrlmapClientMsg_ ctrl_mapClientMsg *model.CtrlmapClientMsg_
ctrl_redisMsg *redis.CtrlRedisClient_ ctrl_redisMsg *model.CtrlRedisClient_
} }
func NewCtrl_RotuteRecv(Ctrl_mapClientMsg *model.CtrlmapClientMsg_) *Ctrl_RotuteRecv { func NewCtrl_RotuteRecv(Ctrl_mapClientMsg *model.CtrlmapClientMsg_) *Ctrl_RotuteRecv {
p := &Ctrl_RotuteRecv{} p := &Ctrl_RotuteRecv{}
//p.RotuteMap=make(map[string]RotuteRecv_)
p.ctrl_mapClientMsg = Ctrl_mapClientMsg p.ctrl_mapClientMsg = Ctrl_mapClientMsg
p.ctrl_redisMsg = redis.NewCtrl_RedisClient_() p.ctrl_redisMsg = model.NewCtrl_RedisClient_()
go p.StartCommRetute() go p.StartCommRetute()
return p return p
} }
...@@ -59,13 +57,10 @@ func (p *Ctrl_RotuteRecv) ConnRotuteAndRead() { ...@@ -59,13 +57,10 @@ func (p *Ctrl_RotuteRecv) ConnRotuteAndRead() {
//发送一个登录消息 //发送一个登录消息
{ {
sname := config.GetSrverId() sname := config.GetSrverName()
var login config.SerCommand var login config.SerCommand
login.Command = config.COMMAND_LOGIN login.Command = config.COMMAND_LOGIN
login.SerType = config.SERTYPE_CONSUME login.SerType = config.SERTYPE_CONSUME
//sbuffer=[]byte()
//copy(,sname)
//login.Data=[]byte(sname)
copy(login.Data[:len(sname)], sname) copy(login.Data[:len(sname)], sname)
login.Length = uint32(len(sname)) login.Length = uint32(len(sname))
login.PushStatus = 0 login.PushStatus = 0
...@@ -79,8 +74,6 @@ func (p *Ctrl_RotuteRecv) ConnRotuteAndRead() { ...@@ -79,8 +74,6 @@ func (p *Ctrl_RotuteRecv) ConnRotuteAndRead() {
var data = *(*[]byte)(unsafe.Pointer(testBytes)) var data = *(*[]byte)(unsafe.Pointer(testBytes))
conn.Write(data) conn.Write(data)
} }
//var rs RotuteRecv_
//rs.Name=""
buffer := make([]byte, config.ROUTE_MAX_RECV_LEN) buffer := make([]byte, config.ROUTE_MAX_RECV_LEN)
for { for {
_, err2 := conn.Read(buffer) _, err2 := conn.Read(buffer)
...@@ -98,6 +91,7 @@ func (p *Ctrl_RotuteRecv) ConnRotuteAndRead() { ...@@ -98,6 +91,7 @@ func (p *Ctrl_RotuteRecv) ConnRotuteAndRead() {
index := bytes.IndexByte(pServer.Data[0:1024], 0) index := bytes.IndexByte(pServer.Data[0:1024], 0)
rbyf_pn := pServer.Data[0:index] rbyf_pn := pServer.Data[0:index]
log.Println("登录路由成功,路由名称", string(rbyf_pn), "角色:反馈者") log.Println("登录路由成功,路由名称", string(rbyf_pn), "角色:反馈者")
p.ctrl_redisMsg.SetRouter(config.GetSrverName(), string(rbyf_pn))
//rs.Name=string(pServer.Data[:]); //rs.Name=string(pServer.Data[:]);
//rs.Conn = conn; //rs.Conn = conn;
p.Rotute.Conn = conn p.Rotute.Conn = conn
...@@ -111,8 +105,7 @@ func (p *Ctrl_RotuteRecv) ConnRotuteAndRead() { ...@@ -111,8 +105,7 @@ func (p *Ctrl_RotuteRecv) ConnRotuteAndRead() {
//id //id
log.Println("recv one msg-------------id:", string(id), "---------------------------------------") log.Println("recv one msg-------------id:", string(id), "---------------------------------------")
msg := p.ctrl_redisMsg.GetMsg(string(id)) msg := p.ctrl_redisMsg.GetMsg(string(id))
if msg.MsgId == string(id) {
if msg.Mis.ID == string(id) {
p.ctrl_mapClientMsg.Write_Chan_ClientMsg(msg) p.ctrl_mapClientMsg.Write_Chan_ClientMsg(msg)
} else { } else {
log.Println("存在这个消息吗") log.Println("存在这个消息吗")
......
package serrpcid
import (
"ficus_clientserver/config"
"fmt"
"log"
)
/*******************************
1. 一个非侵占方式的 调用链监控
********************************/
type CALLRPC struct {
Project string // 项目的唯一标示
Traceid string // 模块的唯一标示
Log string // 输入的日志
}
type CallChain struct {
//用于存储
hashname string
//1个待执行的感到
chanCallLog chan CALLRPC
}
func NewCtrlCallChain() *CallChain {
p := &CallChain{}
p.chanCallLog = make(chan CALLRPC, 10000)
go p.Work()
return p
}
func (p *CallChain) Work() {
for {
mycall := <-p.chanCallLog
log.Println(mycall.Project)
}
}
func AppendRpcid(rpcid string) string {
var tempstr string
if len(rpcid) == 0 {
tempstr = config.SerConfig.Traceid
} else {
tempstr = fmt.Sprintf("%s.%s", rpcid, config.SerConfig.Traceid)
}
rpcid = tempstr
return rpcid
}
package sysstate
import (
"ficus/mission"
"ficus_clientserver/config"
"ficus_clientserver/model"
"fmt"
"html/template"
"net/http"
"time"
log "github.com/sirupsen/logrus"
)
//运行状态接口
func StartRunningSysState(host string) {
log.Println("start http:", host)
http.HandleFunc("/GetClientCount", GetClientCount)
http.HandleFunc("/DispatchClient", DispatchClient)
http.HandleFunc("/MsgCount", GetClientMsgCount)
err := http.ListenAndServe(host, nil)
if err != nil {
log.Fatal("Err: Listen: ", host, "err:", err)
}
}
//map 统计
func GetClientCount(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
//count :=Model.Ctrl_mapClient.MapCount()
//result:=fmt.Sprintf("map总大小:%d 在线:%d",count.AllCount,count.OnLineCount)
//fmt.Fprintf(w,result)
}
func GetClientMsgCount(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
//count :=Model.Ctrl_mapClientMsg.MapCount("ficus.1")
//count.Clientid="ficus.1"
//result:=fmt.Sprintf("机器id :%s 总消息数量:%v 成功:%v 失败 %v",count.Clientid,count.AllCount,count.Successmsg,count.FailedMsg)
//fmt.Fprintf(w,result)
}
//调度client
func DispatchClient(w http.ResponseWriter, r *http.Request) {
//initTemplate("template/test.html")
//p := Person{Name:"safly!!!!",age:"30"}
//myTemplate.Execute(w,p)
r.ParseForm()
var msg model.T_CMsg
var mis mission.Message
//mis.ID=fmt.Sprintf("%x",time.Now().Unix())
mis.Priority = 1
mis.Agent = "ficus.1"
//msg.ClientId="ficus.1";
msg.Mis = mis
msg.PushCount = 0
msg.PushStatus = config.MSG_WAITSUCCESS //等待执行
msg.Lastupdatetime = time.Now().Unix()
msg.Mis.ID = fmt.Sprintf("%v", time.Now().UnixNano())
//for i:=1; i<=2;i++ {
go forfor(0, msg)
//}
}
func forfor(k int, msg model.T_CMsg) {
for i := 0; i < 100; i++ {
msg.Mis.ID = fmt.Sprintf("%d%v", k, i)
//Model.Ctrl_mapClientMsg.Write_Chan_ClientMsg(msg)
}
}
var myTemplate *template.Template
type Person struct {
Name string
age string
}
func initTemplate(fileName string) (err error) {
myTemplate, err = template.ParseFiles(fileName)
if err != nil {
log.Println("parse file err:", err)
return
}
return
}
...@@ -6,7 +6,6 @@ import ( ...@@ -6,7 +6,6 @@ import (
"ficus/native" "ficus/native"
"ficus/service" "ficus/service"
"ficus_clientserver/model" "ficus_clientserver/model"
"ficus_clientserver/redis"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
...@@ -17,10 +16,10 @@ import ( ...@@ -17,10 +16,10 @@ import (
type MyDispatchService struct { type MyDispatchService struct {
trans thrift.TTransport trans thrift.TTransport
ctrl_mapClient *model.CtrlmapClient_ ctrl_mapClient *model.CtrlmapClient_
ctrl_RedisClient *redis.CtrlRedisClient_ ctrl_RedisClient *model.CtrlRedisClient_
} }
func NewMyDispatchService(trans thrift.TTransport, ctrl_mapClient *model.CtrlmapClient_, ctrl_RedisClient *redis.CtrlRedisClient_) *MyDispatchService { func NewMyDispatchService(trans thrift.TTransport, ctrl_mapClient *model.CtrlmapClient_, ctrl_RedisClient *model.CtrlRedisClient_) *MyDispatchService {
p := &MyDispatchService{trans: trans, ctrl_mapClient: ctrl_mapClient, ctrl_RedisClient: ctrl_RedisClient} p := &MyDispatchService{trans: trans, ctrl_mapClient: ctrl_mapClient, ctrl_RedisClient: ctrl_RedisClient}
return p return p
} }
...@@ -58,7 +57,7 @@ func (p *MyDispatchService) Login(ctx context.Context, sself *native.Agent) (r b ...@@ -58,7 +57,7 @@ func (p *MyDispatchService) Login(ctx context.Context, sself *native.Agent) (r b
//写入客户端Map - 或者更新map 的 //写入客户端Map - 或者更新map 的
p.ctrl_mapClient.Set(key, gfclient) p.ctrl_mapClient.Set(key, gfclient)
//写路由到 redis - 客户端消息 //写路由到 redis - 客户端消息
p.ctrl_RedisClient.Write_ClientRedis(gfclient) //p.ctrl_RedisClient.Write_ClientRedis(gfclient)
} }
//写路由到redis //写路由到redis
p.ctrl_RedisClient.Write_RouteRedis(key) p.ctrl_RedisClient.Write_RouteRedis(key)
...@@ -72,10 +71,10 @@ func (p *MyDispatchService) Dispatch(ctx context.Context, request *mission.Messa ...@@ -72,10 +71,10 @@ func (p *MyDispatchService) Dispatch(ctx context.Context, request *mission.Messa
type myProcessorFactory struct { type myProcessorFactory struct {
ctrl_mapClient *model.CtrlmapClient_ ctrl_mapClient *model.CtrlmapClient_
ctrl_RedisClient *redis.CtrlRedisClient_ ctrl_RedisClient *model.CtrlRedisClient_
} }
func NewMyTProcessorFactory(ctrl_mapClient *model.CtrlmapClient_, ctrl_RedisClient *redis.CtrlRedisClient_) thrift.TProcessorFactory { func NewMyTProcessorFactory(ctrl_mapClient *model.CtrlmapClient_, ctrl_RedisClient *model.CtrlRedisClient_) thrift.TProcessorFactory {
return &myProcessorFactory{ctrl_mapClient, ctrl_RedisClient} return &myProcessorFactory{ctrl_mapClient, ctrl_RedisClient}
} }
func (p *myProcessorFactory) GetProcessor(trans thrift.TTransport) thrift.TProcessor { func (p *myProcessorFactory) GetProcessor(trans thrift.TTransport) thrift.TProcessor {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment