Commit f9d3c79b by yunpeng.song

初始化

parents
[server]
traceid=01
project=clientserver
\ No newline at end of file
package config
import (
"net"
)
type SCOMMAND uint32
const (
COMMAND_LOGIN SCOMMAND = 5001
COMMAND_HEARTBEAT SCOMMAND = 5002
COMMAND_TASK SCOMMAND = 5003
COMMAND_TASK_BACK SCOMMAND = 5004
)
type MSG_STATUS uint32
const (
//MSG_WAITEWORK MSG_STATUS = 0 //等待执行
MSG_WAITSUCCESS MSG_STATUS = 0 //已经下发等待成功
MSG_SUCCESSED MSG_STATUS = 1 //已经成功
MSG_FAILED MSG_STATUS = 3 //已经失敗
)
const OVER_ClIENT_TIME = 30
const ROUTE_MAX_RECV_LEN = 10240
const ROUTE_HOST_IP_PORT = "127.0.0.1:38000"
//const MyNmae = "ser1"
//注册服务类型
type SSERTYPE uint32
const (
SERTYPE_CONSUME SSERTYPE = 1000 //消费者
SERTYPE_FEEDBACK SSERTYPE = 1001 //反馈者
)
//服务 《====》 路由命令
type SerCommand struct {
Command SCOMMAND
SerType SSERTYPE
PushStatus MSG_STATUS
Length uint32
Data [1024]byte
}
var Ctrl_Config_ = NewCtrlConfig_
type Config_ struct {
Mq map[string]string `json:"mq list"`
}
func NewCtrlConfig_() *Config_ {
return &Config_{}
}
//服务 登录到 路由登录结构
type Server struct {
SerName string
Conn net.Conn
Online bool
Time int64
}
type SERVER_CONFIG struct {
Traceid string //"路由的唯一,追踪的标示"
Project string //"项目的唯一标示"
}
//redis key name
type REDISNAME_CONFIG struct {
MsgName_prefix string //"消息的前缀"
ClientName_prefix string //"客户端的前缀"
ClientServer_prefix string //""
MsgStatusListName string //"未执行消息的list,只带消息id的"
RotuteStatuslistName string //"在线路由的列表"
}
package config
import (
"fmp_kit_eyeatom/eyeatom"
"fmt"
"os"
"os/exec"
"strings"
"github.com/Unknwon/goconfig"
)
const CONFIG_CONFIGNAME = "Server.ini"
var SerConfig SERVER_CONFIG
var OctopusRedisConfig REDISNAME_CONFIG //各种数据在redis 中的名称
var ConfigFilePath string
//日志库,redis 配置
var Eyelog *eyeatom.EyeAtom
type CtrlConfig struct {
}
func NewCtrlConfig() *CtrlConfig {
p := &CtrlConfig{}
ConfigFilePath = fmt.Sprintf("%s%s", p.getCurrentPath(), CONFIG_CONFIGNAME)
p.creteConfigFile()
//初始 CONFIG 化结构体
{
SerConfig.Traceid = ""
SerConfig.Project = ""
}
p.LoadLocalOctopusConfig()
OctopusRedisConfig.MsgName_prefix = fmt.Sprintf("%s.Msg", SerConfig.Project)
OctopusRedisConfig.ClientName_prefix = fmt.Sprintf("%s.Client", SerConfig.Project)
OctopusRedisConfig.ClientServer_prefix = fmt.Sprintf("%s.ClientServer", SerConfig.Project)
OctopusRedisConfig.MsgStatusListName = fmt.Sprintf("%s.%s.%s", SerConfig.Project, SerConfig.Traceid, "MsgStatsus")
OctopusRedisConfig.RotuteStatuslistName = fmt.Sprintf("%s.%s", SerConfig.Project, "RouteStatus")
fmt.Println(OctopusRedisConfig.MsgName_prefix)
fmt.Println(OctopusRedisConfig.ClientName_prefix)
fmt.Println(OctopusRedisConfig.ClientServer_prefix)
fmt.Println(OctopusRedisConfig.MsgStatusListName)
fmt.Println(OctopusRedisConfig.RotuteStatuslistName)
//初始化 eyelog
{
var callrpc = eyeatom.CALLRPC{Project: SerConfig.Project, Traceid: SerConfig.Traceid, LogName: "serlog"}
var redisconfig = eyeatom.RedisConfig{Addr: "127.0.0.1:6379", Password: "", Db: 0}
Eyelog = eyeatom.NewCtrlCallChain(callrpc, redisconfig)
}
return p
}
func (p *CtrlConfig) LoadLocalOctopusConfig() SERVER_CONFIG {
cfg, err := goconfig.LoadConfigFile(ConfigFilePath)
if err != nil {
panic("错误")
}
SerConfig.Traceid, err = cfg.GetValue("Server", "traceid")
SerConfig.Project, err = cfg.GetValue("Server", "project")
if err != nil {
SerConfig.Traceid = ""
}
cfg.Reload()
return SerConfig
}
func (p *CtrlConfig) GetSerConfig() SERVER_CONFIG {
return SerConfig
}
//serConfig
func (p *CtrlConfig) GetLocalSerConfig() SERVER_CONFIG {
cfg, err := goconfig.LoadConfigFile(ConfigFilePath)
if err != nil {
panic("错误")
}
SerConfig.Traceid, err = cfg.GetValue("Server", "traceid")
SerConfig.Project, err = cfg.GetValue("Server", "project")
if err != nil {
SerConfig.Traceid = "ser1"
}
cfg.Reload()
return SerConfig
}
func (p *CtrlConfig) getCurrentPath() string {
s, err := exec.LookPath(os.Args[0])
if err != nil {
panic(err)
}
i := strings.LastIndex(s, "\\")
path := string(s[0 : i+1])
return path
}
func (p *CtrlConfig) creteConfigFile() {
if p.isFileExist(ConfigFilePath) == false {
file2, error := os.OpenFile(ConfigFilePath, os.O_RDWR|os.O_CREATE, 0766)
if error != nil {
fmt.Println(error)
}
file2.Close()
}
}
func (p *CtrlConfig) isFileExist(filename string) bool {
if _, err := os.Stat(filename); os.IsNotExist(err) {
return false
}
return true
}
package dispatch
import (
"context"
"ficus/mission"
"ficus/native"
"ficus/service"
"ficus_clientserver/model"
"ficus_clientserver/redis"
"fmt"
"time"
log "github.com/sirupsen/logrus"
"github.com/apache/thrift/lib/go/thrift"
)
type MyDispatchService struct {
trans thrift.TTransport
ctrl_mapClient *model.CtrlmapClient_
ctrl_RedisClient *redis.CtrlRedisClient_
}
func NewMyDispatchService(trans thrift.TTransport, ctrl_mapClient *model.CtrlmapClient_, ctrl_RedisClient *redis.CtrlRedisClient_) *MyDispatchService {
p := &MyDispatchService{trans: trans, ctrl_mapClient: ctrl_mapClient, ctrl_RedisClient: ctrl_RedisClient}
return p
}
func (p *MyDispatchService) Heartbeat(ctx context.Context, whom *native.Agent) (r bool, err error) {
log.Println("心跳id:", whom.ID)
//心跳上来,更新在线时间,同时检测一下 客户端的在线状态
p.ctrl_mapClient.UpdateOnlyHartTime(whom.ID)
//RouteRedis 写入客户端路由标示 --- 防止过期
p.ctrl_RedisClient.Write_RouteRedis(whom.ID)
return true, nil
}
//TODO yunpeng.song@freemud.cn 20190729
func (p *MyDispatchService) Feedback(ctx context.Context, response *mission.Message) (r bool, err error) {
p.ctrl_RedisClient.UpdateMsgStatus(*response)
return true, nil
}
func (p *MyDispatchService) Login(ctx context.Context, sself *native.Agent) (r bool, err error) {
log.Println("login :", sself.ID)
go func() {
//判断是否存在
key := sself.ID
if p.ctrl_mapClient.IsKey(key) {
p.ctrl_mapClient.UpdateHartTime(key, p.trans)
} else {
fmt.Println("上线---client:", key)
var gfclient model.T_FmClient
gfclient.ClientUuid = sself.ID
gfclient.Agent = *sself
gfclient.Trans = p.trans
gfclient.OnLine = true
gfclient.LastHeartBeatTime = time.Now().Unix()
//写入客户端Map - 或者更新map 的
p.ctrl_mapClient.Set(key, gfclient)
//写路由到 redis - 客户端消息
p.ctrl_RedisClient.Write_ClientRedis(gfclient)
}
//写路由到redis
p.ctrl_RedisClient.Write_RouteRedis(key)
}()
return true, nil
}
func (p *MyDispatchService) Dispatch(ctx context.Context, request *mission.Message) (err error) {
log.Println("Dispatch:")
return nil
}
type myProcessorFactory struct {
ctrl_mapClient *model.CtrlmapClient_
ctrl_RedisClient *redis.CtrlRedisClient_
}
func NewMyTProcessorFactory(ctrl_mapClient *model.CtrlmapClient_, ctrl_RedisClient *redis.CtrlRedisClient_) thrift.TProcessorFactory {
return &myProcessorFactory{ctrl_mapClient, ctrl_RedisClient}
}
func (p *myProcessorFactory) GetProcessor(trans thrift.TTransport) thrift.TProcessor {
// 在这里创建 handler 主要是为了保存 ttransport,以便后续通信
handler := NewMyDispatchService(trans, p.ctrl_mapClient, p.ctrl_RedisClient)
processor := service.NewDispatchServiceProcessor(handler)
return processor
}
package main
import (
"ficus_clientserver/config"
"ficus_clientserver/dispatch"
"ficus_clientserver/model"
"ficus_clientserver/redis"
"ficus_clientserver/route"
"os"
"github.com/apache/thrift/lib/go/thrift"
log "github.com/sirupsen/logrus"
)
const (
NetworkAddr0 = ":9090" //推送
NetworkAddr1 = ":9091" //反馈的
NetworkAddr2 = ":9092"
HttpState0 = ":9000"
)
var quitFlag chan int
func InitLog() {
file, err := os.OpenFile("test.log", os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 666)
if err != nil {
log.Fatalln("fail to create test.log file!")
}
// 设置日志格式为json格式
log.SetFormatter(&log.JSONFormatter{})
// 设置将日志输出到标准输出(默认的输出为stderr,标准错误)
// 日志消息输出可以是任意的io.writer类型
log.SetOutput(file)
// 设置日志级别为warn以上
log.SetLevel(log.InfoLevel)
log.Info("start")
}
func main() {
//读取配置文件
InitLog()
ctrlConfig := config.NewCtrlConfig()
localConfig := ctrlConfig.GetLocalSerConfig()
log.Println("Config Traceid:", localConfig.Traceid, " Project", localConfig.Project)
quitFlag = make(chan int)
go StartWork()
go Startwork0()
<-quitFlag
log.Println("退出")
}
// 连接路由
func StartWork() {
//客户端消息
Ctrl_mapClientMsg := model.NewCtrlmapClientMsg_(model.GetDefaultClientMap())
//连接路由,并读取数据
Route.NewCtrl_RotuteRecv(Ctrl_mapClientMsg)
}
//开启client 的thrift 服务
func Startwork0() {
serverTransport, err := thrift.NewTServerSocket(NetworkAddr0)
if err != nil {
log.Println("Error!", err)
os.Exit(1)
}
//redis 控制
processorFactory := dispatch.NewMyTProcessorFactory(model.GetDefaultClientMap(), redis.NewCtrl_RedisClient_())
protocolFactory := thrift.NewTCompactProtocolFactory()
transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
server := thrift.NewTSimpleServerFactory4(processorFactory, serverTransport, transportFactory, protocolFactory)
log.Println("thrift server in", NetworkAddr0)
if err := server.Serve(); err != nil {
panic(err)
}
}
package model
import "fmt"
type FmError struct {
itype int
errMsg string
}
func NewMyErr(itype int, errMsg string) *FmError {
return &FmError{itype, errMsg}
}
//实现Error接口
func (e *FmError) Error() string {
return fmt.Sprintf("%d - %s", e.itype, e.errMsg)
}
package model
import (
"ficus/native"
"fmt"
"sync"
"time"
"github.com/apache/thrift/lib/go/thrift"
)
//客户端的操作函数
type T_FmClient struct {
Trans thrift.TTransport // Transport
ClientUuid string // client uuid
Agent native.Agent // agent
OnLine bool // whether on line
LastHeartBeatTime int64 // Last heartbeat time
}
type T_CountFmClient struct {
AllCount int // map all count
OnLineCount int // on line count
Time int64 // count time
}
type CtrlmapClient_ struct {
Lock *sync.RWMutex
//全局 => 所有的客户端
mapClient_ map[string]T_FmClient
}
var (
//客户端控制map
ClintMap = &CtrlmapClient_{Lock: new(sync.RWMutex), mapClient_: make(map[string]T_FmClient)}
)
func NewCtrl_MapClient_() *CtrlmapClient_ {
p := &CtrlmapClient_{Lock: new(sync.RWMutex), mapClient_: make(map[string]T_FmClient)}
return p
}
func GetDefaultClientMap() *CtrlmapClient_ {
return ClintMap
}
func (p *CtrlmapClient_) Get(k string) (T_FmClient, error) {
p.Lock.RLock()
defer p.Lock.RUnlock()
if _, ok := p.mapClient_[k]; ok {
return p.mapClient_[k], nil
}
err := NewMyErr(1, fmt.Sprint("map values is nil ,key:%v", k))
return p.mapClient_[k], err
}
func (p *CtrlmapClient_) Set(k string, v T_FmClient) {
p.Lock.Lock()
defer p.Lock.Unlock()
p.mapClient_[k] = v
}
func (p *CtrlmapClient_) SetTime(k string, itime int64) {
p.Lock.Lock()
defer p.Lock.Unlock()
v := p.mapClient_[k]
v.LastHeartBeatTime = itime
p.mapClient_[k] = v
}
func (p *CtrlmapClient_) IsKey(key string) bool {
p.Lock.RLock()
defer p.Lock.RUnlock()
if _, ok := p.mapClient_[key]; ok {
//存在
return true
}
return false
}
//更新心跳和socket
func (p *CtrlmapClient_) UpdateHartTime(key string, Trans thrift.TTransport) {
if p.IsKey(key) {
//更新时间
ff, _ := p.Get(key)
if ff.Trans.IsOpen() {
ff.OnLine = true
ff.LastHeartBeatTime = time.Now().Unix()
p.Set(key, ff)
} else {
if Trans.IsOpen() {
ff.OnLine = true
ff.Trans = Trans
ff.LastHeartBeatTime = time.Now().Unix()
p.Set(key, ff)
} else {
ff.OnLine = false
ff.LastHeartBeatTime = time.Now().Unix()
p.Set(key, ff)
}
}
}
}
//更新心跳和socket
func (p *CtrlmapClient_) UpdateOnlyHartTime(key string) {
if p.IsKey(key) {
//更新时间
ff, _ := p.Get(key)
if ff.Trans.IsOpen() {
ff.OnLine = true
ff.LastHeartBeatTime = time.Now().Unix()
p.Set(key, ff)
} else {
ff.OnLine = false
ff.LastHeartBeatTime = time.Now().Unix()
p.Set(key, ff)
}
}
}
func (p *CtrlmapClient_) MapCount() T_CountFmClient {
p.Lock.RLock()
defer p.Lock.RUnlock()
count := T_CountFmClient{0, 0, 0}
for _, v := range p.mapClient_ {
count.AllCount++
if v.OnLine {
count.OnLineCount++
}
}
count.Time = time.Now().Unix()
return count
}
func (p *CtrlmapClient_) SetLine(key string, b bool) {
c, err := p.Get(key)
if err != nil {
c.OnLine = b
p.Set(key, c)
}
}
package model
import (
"ficus/mission"
"ficus_clientserver/config"
"fmt"
"sync"
"time"
)
//维护一个任务列表的状态
type (
T_CMsg struct {
RouteName string //"路由主机"
Mis mission.Message
PushCount uint32 //"push count"
PushStatus config.MSG_STATUS //"发送状态"
Lastupdatetime int64 //laset dispatch time
Rpcid string
}
clientMsg struct {
chan_CMsg chan T_CMsg
//mapClentMsg map[string] T_CMsg "消息不保存了"
}
T_CountFmClientMsg struct {
Clientid string
AllCount int
Successmsg int
FailedMsg int
Time int64
}
CtrlmapClientMsg_ struct {
Lock *sync.RWMutex
mapLock *sync.RWMutex
mapClientMsg_ map[string]clientMsg
Ctrl_MapClient *CtrlmapClient_
}
)
func NewCtrlmapClientMsg_(Ctrl_MapClient *CtrlmapClient_) *CtrlmapClientMsg_ {
p := &CtrlmapClientMsg_{}
p.Lock = new(sync.RWMutex)
p.mapLock = new(sync.RWMutex)
p.mapClientMsg_ = make(map[string]clientMsg)
p.Ctrl_MapClient = Ctrl_MapClient
return p
}
func (p *CtrlmapClientMsg_) CteatClientMsgBykey(k string) {
p.Lock.Lock()
defer p.Lock.Unlock()
if _, ok := p.mapClientMsg_[k]; ok == false {
cc := clientMsg{}
cc.chan_CMsg = make(chan T_CMsg, 1000)
//cc.mapClentMsg = make(map[string]T_CMsg);
p.mapClientMsg_[k] = cc
go p.ReadClientMsg_Work(k)
}
}
func (p *CtrlmapClientMsg_) Write_Chan_ClientMsg(msg T_CMsg) {
//判断有没有这个列表,还有则写没有则不谢
p.CteatClientMsgBykey(msg.Mis.Agent)
p.mapClientMsg_[msg.Mis.Agent].chan_CMsg <- msg
}
func (p *CtrlmapClientMsg_) Read_Chan_ClietnMsg(k string) T_CMsg {
//p.Lock.RLock()
//defer p.Lock.RUnlock()
msg := <-p.mapClientMsg_[k].chan_CMsg
return msg
}
func (p *CtrlmapClientMsg_) ReadClientMsg_Work(k string) {
for {
msg := p.Read_Chan_ClietnMsg(k)
//添加一个rpcid
//发送消息
p.ClientMsg_Work(msg)
}
}
func (p *CtrlmapClientMsg_) ClientMsg_Work(t T_CMsg) {
//查询 redis 是否成功,成功则直接丢弃
cc, err := p.Ctrl_MapClient.Get(t.Mis.Agent)
if err == nil {
//查询是否在线
if cc.Trans.IsOpen() {
//发送消息
fmt.Println("准备发送消息!")
dis := &MyPushMsg{}
dis.PushDispatch(cc, t.Mis, p.Ctrl_MapClient)
t.PushCount = 1
t.Lastupdatetime = time.Now().Unix()
} else {
//设置map列表不在线
p.Ctrl_MapClient.SetLine(cc.Agent.ID, false)
//设置redis列表显示不在线
fmt.Println("设置map列表不在线!")
}
} else {
//不在线
fmt.Println("不在线")
}
return
}
package model
import (
"context"
"ficus/mission"
"ficus/service"
"ficus_clientserver/config"
"fmt"
"github.com/apache/thrift/lib/go/thrift"
)
//interface
type PushMsg interface {
PushDispatch(client T_FmClient, mis mission.Message) bool
PushClientDispatch(trans thrift.TTransport, mis mission.Message)
}
type MyPushMsg struct{}
func (p *MyPushMsg) PushDispatch(client T_FmClient, mis mission.Message, Ctrl_mapClient *CtrlmapClient_) bool {
if !client.Trans.IsOpen() {
Ctrl_mapClient.SetLine(client.Agent.ID, false)
//反馈发送失败
return false
}
return p.PushClientDispatch(client.Trans, mis)
}
func (p *MyPushMsg) PushClientDispatch(trans thrift.TTransport, mis mission.Message) bool {
callbackClient := service.NewDispatchServiceClientFactory(thrift.NewTFramedTransport(trans), thrift.NewTCompactProtocolFactory())
{
config.Eyelog.WriteCallAndLog(string(mis.ID), "", "发送消息到客户端 ... - 消息ID:", string(mis.ID), "客户端id:", mis.Agent)
}
// Type_DAEMON Type = 4587776
// Type_FRAMEWORK Type = 4588032
// Type_SYSTEMINFO Type = 4588288
// Type_LOGGER Type = 4588544
// Type_SYNCUTILITY Type = 4589056
// switch mis.Proto {
// case Type_SYNCUTILITY:
// }
buff := thrift.NewTMemoryBuffer()
proto := thrift.NewTBinaryProtocol(buff, true, true)
mis.Write(proto)
//mis.Data = buff.Bytes()
err := callbackClient.Dispatch(context.Background(), &mis)
if err != nil {
fmt.Println("err:", err)
}
return true
}
package redis
import (
"encoding/json"
"ficus/mission"
"ficus_clientserver/config"
"ficus_clientserver/model"
"ficus_clientserver/serrpcid"
"fmt"
"time"
//"github.com/garyburd/redigo/redis"
"sync"
"github.com/go-redis/redis"
)
type CtrlRedisClient_ struct {
Lock sync.RWMutex
ctrlRedis *redis.Client
chan_RedisClient chan model.T_FmClient
}
func NewCtrl_RedisClient_() *CtrlRedisClient_ {
p := &CtrlRedisClient_{}
p.chan_RedisClient = make(chan model.T_FmClient, 100000)
p.ctrlRedis = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "", // no password set
DB: 0, // use default DB
})
go p.Read_Chan_Client()
//defer c.Close()
return p
}
func (p *CtrlRedisClient_) UpdateMsgStatus(mis mission.Message) bool {
msg := p.GetMsg(mis.ID)
msg.Mis = mis
msg.PushStatus = config.MSG_SUCCESSED
msg.Rpcid = serrpcid.AppendRpcid(msg.Rpcid)
//kname:=fmt.Sprintf("Msg.%s",mis.ID)
p.setMsg(msg)
return true
}
func (p *CtrlRedisClient_) set(c model.T_FmClient) bool {
jsonStr, _ := json.Marshal(c)
//_, err := p.ctrlRedis.Do("SET", c.Agent.ID, string(jsonStr))
err := p.ctrlRedis.Set(c.Agent.ID, string(jsonStr), 0).Err()
//p.ctrlRedis.Expire(c.Agent.ID,30)
//_,err=p.ctrlRedis.Do("EXPlRE", c.Agent,30)
if err != nil {
fmt.Println("redis set failed:", err)
return false
}
return true
}
func (p *CtrlRedisClient_) setClient(c model.T_FmClient) bool {
jsonStr, _ := json.Marshal(c)
kname := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.ClientName_prefix, c.Agent.ID)
fmt.Println(kname)
err := p.ctrlRedis.Set(kname, string(jsonStr), 0).Err()
if err != nil {
fmt.Println("redis set failed:", err)
return false
}
return true
}
func (p *CtrlRedisClient_) setKV(k string, v string) bool {
//OVER_ClIENT_TIME
/*
_, err := p.ctrlRedis.Do("SET", k, v)
//p.ctrlRedis.Receive(k, 10)
if err != nil {
fmt.Println("redis set failed:", err)
return false;
}
*/
err := p.ctrlRedis.Set(k, v, 30*time.Second).Err()
if err != nil {
panic(err)
}
return true
}
func (p *CtrlRedisClient_) setMsg(msg model.T_CMsg) bool {
kname := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.MsgName_prefix, msg.Mis.ID)
jsonstr, err := json.Marshal(msg)
if err != nil {
fmt.Println("Update Redis Msg Status err:", err)
return false
}
//fmt.Println("set redis id:",kname,"rpcid:",msg.Rpcid)
//fmt.Println(string(jsonstr))
err2 := p.ctrlRedis.Set(kname, string(jsonstr), 24*time.Hour).Err()
if err2 != nil {
fmt.Println("trlRedis.Set Err", err)
}
return true
}
func (p *CtrlRedisClient_) GetMsg(k string) model.T_CMsg {
kname := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.MsgName_prefix, k)
//fmt.Println(kname,len(kname));
result, err := p.ctrlRedis.Get(kname).Result()
//fmt.Println(result)
if err != nil {
fmt.Println("redis get:", k, " failed err:", err)
}
var msg model.T_CMsg
err2 := json.Unmarshal([]byte(result), &msg)
if err2 != nil {
fmt.Println("json.Unmarshal error!", err2)
}
return msg
}
func (p *CtrlRedisClient_) get(k string) model.T_FmClient {
var client model.T_FmClient
//result, err := redis.String(p.ctrlRedis.Do("GET", k))
result, err := p.ctrlRedis.Get(k).Result()
if err != nil {
fmt.Println("redis get:", k, " failed err:", err)
} else {
fmt.Println("redis get: ", k, ": values:", result)
err := json.Unmarshal([]byte(result), &client)
if err != nil {
fmt.Println("error:", err)
}
}
return client
}
//写路由到 redis
func (p *CtrlRedisClient_) Write_RouteRedis(key string) {
go func() {
k := fmt.Sprintf("%s.%v", config.OctopusRedisConfig.ClientServer_prefix, key)
fmt.Println("ClientServer :", k)
v := config.SerConfig.Traceid
fmt.Println("ClientServer value:", v)
p.setKV(k, v)
}()
}
//写客户端到 redis
func (p *CtrlRedisClient_) Write_ClientRedis(c model.T_FmClient) {
go func() {
c.Trans = nil
//信息结构
p.setClient(c)
}()
}
func (p *CtrlRedisClient_) Write_Chan_Client(c model.T_FmClient) {
go func() {
p.chan_RedisClient <- c
}()
}
func (p *CtrlRedisClient_) Read_Chan_Client() {
for {
cc := <-p.chan_RedisClient
p.set(cc)
}
}
package Route
import (
"ficus_clientserver/config"
"ficus_clientserver/model"
"ficus_clientserver/redis"
"fmt"
"net"
"sync"
"time"
"unsafe"
)
type SliceMock struct {
addr uintptr
len int
cap int
}
//接受推送服务
type RotuteRecv_ struct {
Name string
Conn net.Conn
}
type Ctrl_RotuteRecv struct {
Lock sync.RWMutex
Rotute RotuteRecv_
ctrl_mapClientMsg *model.CtrlmapClientMsg_
ctrl_redisMsg *redis.CtrlRedisClient_
}
func NewCtrl_RotuteRecv(Ctrl_mapClientMsg *model.CtrlmapClientMsg_) *Ctrl_RotuteRecv {
p := &Ctrl_RotuteRecv{}
//p.RotuteMap=make(map[string]RotuteRecv_)
p.ctrl_mapClientMsg = Ctrl_mapClientMsg
p.ctrl_redisMsg = redis.NewCtrl_RedisClient_()
go p.StartCommRetute()
return p
}
func (p *Ctrl_RotuteRecv) StartCommRetute() {
//连接 ==》 路由服务
for {
p.ConnRotuteAndRead()
time.Sleep(2 * time.Second)
}
}
func (p *Ctrl_RotuteRecv) ConnRotuteAndRead() {
conn, err := net.Dial("tcp", config.ROUTE_HOST_IP_PORT)
if err != nil {
fmt.Println("dial failed:", err)
fmt.Println("wait reconnection …… …… ……")
return
}
//发送一个登录消息
{
sname := "ser1"
var login config.SerCommand
login.Command = config.COMMAND_LOGIN
login.SerType = config.SERTYPE_CONSUME
//sbuffer=[]byte()
//copy(,sname)
//login.Data=[]byte(sname)
copy(login.Data[:len(sname)], sname)
login.Length = uint32(len(sname))
login.PushStatus = 0
Len := unsafe.Sizeof(login)
testBytes := &SliceMock{
addr: uintptr(unsafe.Pointer(&login)),
cap: int(Len),
len: int(Len),
}
var data = *(*[]byte)(unsafe.Pointer(testBytes))
conn.Write(data)
}
//var rs RotuteRecv_
//rs.Name=""
buffer := make([]byte, config.ROUTE_MAX_RECV_LEN)
for {
_, err2 := conn.Read(buffer)
if err2 != nil {
fmt.Println("conn error return")
return
}
//接受数据成功
pServer := *(**config.SerCommand)(unsafe.Pointer(&buffer))
fmt.Println("Recv Command:", pServer.Command)
if pServer.Command == config.COMMAND_LOGIN {
// 登录成功,添加路由到Map
fmt.Println("登录路由成功,路由名称", string(pServer.Data[:pServer.Length]), "角色:消费者")
//rs.Name=string(pServer.Data[:]);
//rs.Conn = conn;
p.Rotute.Conn = conn
p.Rotute.Name = string(pServer.Data[:])
} else if pServer.Command == config.COMMAND_HEARTBEAT {
// 收到这个为路由主动检测心跳,则不响应
} else if pServer.Command == config.COMMAND_TASK {
//推送来的任务,写入队列
if len(p.Rotute.Name) > 0 {
/*
var msg Model.T_CMsg
err:=json.Unmarshal(pServer.Data[:pServer.Length],&msg)
msg.RouteName=p.Rotute.Name
if err == nil {
fmt.Println("Recv Msg Id:",msg.Mis.ID)
//转成相关的数据格式
//Ctrl_mapCMsg.Set(msg.Mis.ID,msg)
p.ctrl_mapClientMsg.Write_Chan_ClientMsg(msg)
}else{
fmt.Println("json.Unmarshal:",err)
}
*/
id := pServer.Data[:pServer.Length]
//id
fmt.Println("recv one msg-------------id:", string(id), "---------------------------------------")
{
config.Eyelog.WriteCallAndLog(string(id), "", "接收到路由消息 ... - 消息ID:", string(id))
}
msg := p.ctrl_redisMsg.GetMsg(string(id))
if msg.Mis.ID == string(id) {
p.ctrl_mapClientMsg.Write_Chan_ClientMsg(msg)
} else {
fmt.Println("存在这个消息吗")
}
} else {
fmt.Println("未登录成功,不接受消息推送 …… ")
}
}
}
}
package Route
import (
json2 "encoding/json"
"ficus/mission"
"ficus_clientserver/config"
"fmt"
"net"
"sync"
"time"
"unsafe"
)
//接受推送服务
type RouteSend_ struct {
Name string
Conn net.Conn
}
type Ctrl_RouteSend struct {
Lock sync.RWMutex
Rotute RouteSend_
chanSendTMsg chan config.SerCommand
//ctrl_mapClientMsg *Model.CtrlmapClientMsg_
}
func NewCtrl_RotuteSend() *Ctrl_RouteSend {
p := &Ctrl_RouteSend{}
p.chanSendTMsg = make(chan config.SerCommand, 10000)
//p.RotuteMap=make(map[string]RotuteRecv_)
//p.ctrl_mapClientMsg=Ctrl_mapClientMsg
go p.StartCommRetute()
go p.Read_chanSendTMsg()
return p
}
func (p *Ctrl_RouteSend) send(comm config.SerCommand) {
Len := unsafe.Sizeof(comm)
testBytes := &SliceMock{
addr: uintptr(unsafe.Pointer(&comm)),
cap: int(Len),
len: int(Len),
}
var data = *(*[]byte)(unsafe.Pointer(testBytes))
p.Rotute.Conn.Write(data)
}
func (p *Ctrl_RouteSend) Write_chanSendTMsg(mis mission.Message) {
var comm config.SerCommand
comm.Command = config.COMMAND_TASK_BACK //任务消息的反馈
comm.PushStatus = config.MSG_SUCCESSED //消息成功
comm.SerType = config.SERTYPE_FEEDBACK // 反馈者
json, _ := json2.Marshal(mis)
copy(comm.Data[:len(json)], json)
comm.Length = uint32(len(json))
p.chanSendTMsg <- comm
}
func (p *Ctrl_RouteSend) Read_chanSendTMsg() {
for {
if p.Rotute.Name != "" {
break
}
}
for {
msg := <-p.chanSendTMsg
go func() {
p.send(msg)
}()
}
}
func (p *Ctrl_RouteSend) StartCommRetute() {
//连接 ==》 路由服务
for {
p.ConnRotuteAndRead()
time.Sleep(2 * time.Second)
}
}
func (p *Ctrl_RouteSend) ConnRotuteAndRead() {
conn, err := net.Dial("tcp", config.ROUTE_HOST_IP_PORT)
if err != nil {
fmt.Println("dial failed:", err)
fmt.Println("wait reconnection …… …… ……")
//os.Exit(1)
return
}
//defer conn.Close()
//发送一个登录消息
//sbuffer:=make([]byte, 1024)
{
sname := "ser1"
var login config.SerCommand
login.Command = config.COMMAND_LOGIN
login.SerType = config.SERTYPE_FEEDBACK // 反馈者
//sbuffer=[]byte()
//copy(,sname)
//login.Data=[]byte(sname)
copy(login.Data[:len(sname)], sname)
login.Length = uint32(len(sname))
login.PushStatus = 0
Len := unsafe.Sizeof(login)
testBytes := &SliceMock{
addr: uintptr(unsafe.Pointer(&login)),
cap: int(Len),
len: int(Len),
}
var data = *(*[]byte)(unsafe.Pointer(testBytes))
conn.Write(data)
}
//var rs RotuteRecv_
//rs.Name=""
for {
buffer := make([]byte, config.ROUTE_MAX_RECV_LEN)
_, err2 := conn.Read(buffer)
if err2 != nil {
fmt.Println("conn error return")
return
}
//接受数据成功
pServer := *(**config.SerCommand)(unsafe.Pointer(&buffer))
fmt.Println("Recv Command:", pServer.Command)
if pServer.Command == config.COMMAND_LOGIN {
// 登录成功,添加路由到Map
fmt.Println("登录路由成功,路由名称", string(pServer.Data[:pServer.Length]), "角色:反馈者")
p.Rotute.Name = string(pServer.Data[:])
p.Rotute.Conn = conn
//p.Rotute=rs
} else if pServer.Command == config.COMMAND_HEARTBEAT {
// 收到这个为服务端主动心跳,则不响应
} else if pServer.Command == config.COMMAND_TASK {
//这里为 发送反馈 不接受 返回的 COMMAND_TASK 任务
}
}
}
package schedule
import (
"context"
"ficus/mission"
"ficus_clientserver/config"
"ficus_clientserver/redis"
"fmt"
)
type MySchedule struct {
Ctrl_RedisMsg *redis.CtrlRedisClient_
}
func (p *MySchedule) Schedule(ctx context.Context, request *mission.Message) (r bool, err error) {
return true, nil
}
// Parameters:
// - Response
func (p *MySchedule) Feedback(ctx context.Context, response *mission.Message) (r bool, err error) {
//response.ID
//Model.Ctrl_mapClientMsg.Feedback(response.Agent,response.ID,true);
fmt.Println("recv feedback……………id:", response.Agent, response.ID)
//直接更新一个数据到 redis
{
config.Eyelog.WriteCallAndLog(string(response.ID), response.Agent, "客户端发送消息 ... - 消息ID:", response.ID, "客户端id:", response.Agent)
}
{
config.Eyelog.WriteCallAndLog(string(response.ID), "", "收到客户端消息 ... - 消息ID:", response.ID, "客户端id:", response.Agent)
}
p.Ctrl_RedisMsg.UpdateMsgStatus(*response)
return true, nil
}
package serrpcid
import (
"ficus_clientserver/config"
"fmt"
)
/*******************************
1. 一个非侵占方式的 调用链监控
********************************/
type CALLRPC struct {
Project string // 项目的唯一标示
Traceid string // 模块的唯一标示
Log string // 输入的日志
}
type CallChain struct {
//用于存储
hashname string
//1个待执行的感到
chanCallLog chan CALLRPC
}
func NewCtrlCallChain() *CallChain {
p := &CallChain{}
p.chanCallLog = make(chan CALLRPC, 10000)
go p.Work()
return p
}
func (p *CallChain) Work() {
for {
mycall := <-p.chanCallLog
fmt.Println(mycall.Project)
}
}
func AppendRpcid(rpcid string) string {
var tempstr string
if len(rpcid) == 0 {
tempstr = config.SerConfig.Traceid
} else {
tempstr = fmt.Sprintf("%s.%s", rpcid, config.SerConfig.Traceid)
}
rpcid = tempstr
return rpcid
}
package sysstate
import (
"ficus/mission"
"ficus_clientserver/config"
"ficus_clientserver/model"
"fmt"
"html/template"
"log"
"net/http"
"time"
)
//运行状态接口
func StartRunningSysState(host string) {
fmt.Println("start http:", host)
http.HandleFunc("/GetClientCount", GetClientCount)
http.HandleFunc("/DispatchClient", DispatchClient)
http.HandleFunc("/MsgCount", GetClientMsgCount)
err := http.ListenAndServe(host, nil)
if err != nil {
log.Fatal("Err: Listen: ", host, "err:", err)
}
}
//map 统计
func GetClientCount(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
//count :=Model.Ctrl_mapClient.MapCount()
//result:=fmt.Sprintf("map总大小:%d 在线:%d",count.AllCount,count.OnLineCount)
//fmt.Fprintf(w,result)
}
func GetClientMsgCount(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
//count :=Model.Ctrl_mapClientMsg.MapCount("ficus.1")
//count.Clientid="ficus.1"
//result:=fmt.Sprintf("机器id :%s 总消息数量:%v 成功:%v 失败 %v",count.Clientid,count.AllCount,count.Successmsg,count.FailedMsg)
//fmt.Fprintf(w,result)
}
//调度client
func DispatchClient(w http.ResponseWriter, r *http.Request) {
//initTemplate("template/test.html")
//p := Person{Name:"safly!!!!",age:"30"}
//myTemplate.Execute(w,p)
r.ParseForm()
var msg model.T_CMsg
var mis mission.Message
//mis.ID=fmt.Sprintf("%x",time.Now().Unix())
mis.Priority = 1
mis.Agent = "ficus.1"
//msg.ClientId="ficus.1";
msg.Mis = mis
msg.PushCount = 0
msg.PushStatus = config.MSG_WAITSUCCESS //等待执行
msg.Lastupdatetime = time.Now().Unix()
msg.Mis.ID = fmt.Sprintf("%v", time.Now().UnixNano())
//for i:=1; i<=2;i++ {
go forfor(0, msg)
//}
}
func forfor(k int, msg model.T_CMsg) {
for i := 0; i < 100; i++ {
msg.Mis.ID = fmt.Sprintf("%d%v", k, i)
//Model.Ctrl_mapClientMsg.Write_Chan_ClientMsg(msg)
}
}
var myTemplate *template.Template
type Person struct {
Name string
age string
}
func initTemplate(fileName string) (err error) {
myTemplate, err = template.ParseFiles(fileName)
if err != nil {
fmt.Println("parse file err:", err)
return
}
return
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment