Commit 80e38f4d by yunpeng.song

初始化

parents
package config
import (
"ficus/mission"
)
type SCOMMAND uint32
const (
COMMAND_LOGIN SCOMMAND = 5001
COMMAND_HEARTBEAT SCOMMAND = 5002
COMMAND_TASK SCOMMAND = 5003
COMMAND_TASK_BACK SCOMMAND = 5004
)
type MSG_STATUS uint32
const (
//MSG_WAITEWORK MSG_STATUS = 0 //等待执行
MSG_WAITSUCCESS MSG_STATUS = 0 //等待成功
MSG_SUCCESSED MSG_STATUS = 1 //已经成功
MSG_FAILED MSG_STATUS = 3 //已经失敗
)
//注册服务类型
type SSERTYPE uint32
const (
SERTYPE_CONSUME SSERTYPE = 1000 //消费者
SERTYPE_FEEDBACK SSERTYPE = 1001 //反馈者
)
const MQ_MAX_RECV_LEN = 10240
const OVER_SER_TIME = 30
//服务 《====》 路由命令
type SerCommand struct {
Command SCOMMAND
SerType SSERTYPE
PushStatus MSG_STATUS
Length uint32
Data [1024]byte
}
type Config_Redis struct {
SerName string
Time int64
}
type T_CMsg struct {
RouteName string //"路由主机"
Mis mission.Message
PushCount uint32 //"push count 重试次数,1 为发送一次,依此类推"
PushStatus MSG_STATUS //"发送状态"
//Bfeedback bool "false:客户端未成功接收消息,true 客户端成功接收消息"
Lastupdatetime int64 //"laset dispatch time"
Rpcid string //"追踪使用"
}
type OCTOPUS_CONFIG struct {
Traceid string //"路由的唯一,追踪的标示"
Project string //"项目的唯一标示"
}
//redis key name
type REDISNAME_CONFIG struct {
MsgName_prefix string //"消息的前缀"
ClientName_prefix string //"客户端的前缀"
ClientServer_prefix string //""
MsgStatusListName string //"未执行消息的list,只带消息id的"
RotuteStatuslistName string //"在线路由的列表"
}
//路由状态表
type ROUTE_STATUS struct {
Name string //"路由名称"
Time string //"路由的刷新时间"
QueueNum int //"队列个数"
CPU int //"CPU使用率"
Memory int //"内存使用量,单位为G"
GoCount int //"协成数"
OnlineSer []string //"在线Server的名称"
}
// 服务状态表
type SER_STATUS struct {
Name string //"路由名称"
Time string //"路由的刷新时间"
OnlineClient int //"在线客户端"
CPU int //"CPU使用率"
Memory int //"内存使用量,单位为G"
GoCount int //"协成数"
}
package config
import (
"fmp_kit_eyeatom/eyeatom"
"fmt"
"os"
"os/exec"
"strings"
"github.com/Unknwon/goconfig"
)
//加载配置文件
const CONFIG_CONFIGNAME_OCTOPUS = "Octopus.ini"
var OctopusConfig OCTOPUS_CONFIG
var OctopusRedisConfig REDISNAME_CONFIG //各种数据在redis 中的名称
var OctopusConfigFilePath string
var Eyelog *eyeatom.EyeAtom
type CtrlOctopusConfig struct {
}
func NewCtrlOctopusConfig() *CtrlOctopusConfig {
p := &CtrlOctopusConfig{}
OctopusConfigFilePath = fmt.Sprintf("%s%s", p.getCurrentPath(), CONFIG_CONFIGNAME_OCTOPUS)
fmt.Println(OctopusConfigFilePath)
p.creteConfigFile()
//初始 CONFIG 化结构体
{
OctopusConfig.Traceid = ""
OctopusConfig.Project = ""
}
p.LoadLocalOctopusConfig()
OctopusRedisConfig.MsgName_prefix = fmt.Sprintf("%s.Msg", OctopusConfig.Project)
OctopusRedisConfig.ClientName_prefix = fmt.Sprintf("%s.Client", OctopusConfig.Project)
OctopusRedisConfig.ClientServer_prefix = fmt.Sprintf("%s.ClientServer", OctopusConfig.Project)
OctopusRedisConfig.MsgStatusListName = fmt.Sprintf("%s.%s.%s", OctopusConfig.Project, OctopusConfig.Traceid, "MsgStatsus")
OctopusRedisConfig.RotuteStatuslistName = fmt.Sprintf("%s.%s", OctopusConfig.Project, "RouteStatus")
fmt.Println(OctopusRedisConfig.MsgName_prefix)
fmt.Println(OctopusRedisConfig.ClientName_prefix)
fmt.Println(OctopusRedisConfig.ClientServer_prefix)
fmt.Println(OctopusRedisConfig.MsgStatusListName)
fmt.Println(OctopusRedisConfig.RotuteStatuslistName)
//初始化 eyelog
{
var callrpc = eyeatom.CALLRPC{OctopusConfig.Project, OctopusConfig.Traceid, "serlog"}
var redisconfig = eyeatom.RedisConfig{"127.0.0.1:6379", "", 0}
Eyelog = eyeatom.NewCtrlCallChain(callrpc, redisconfig)
}
return p
}
func (p *CtrlOctopusConfig) LoadLocalOctopusConfig() OCTOPUS_CONFIG {
cfg, err := goconfig.LoadConfigFile(OctopusConfigFilePath)
if err != nil {
panic("错误")
}
OctopusConfig.Traceid, err = cfg.GetValue("Octopus", "traceid")
OctopusConfig.Project, err = cfg.GetValue("Octopus", "project")
if err != nil {
OctopusConfig.Traceid = ""
}
cfg.Reload()
return OctopusConfig
}
func (p *CtrlOctopusConfig) getCurrentPath() string {
s, err := exec.LookPath(os.Args[0])
if err != nil {
panic(err)
}
i := strings.LastIndex(s, "\\")
path := string(s[0 : i+1])
return path
}
func (p *CtrlOctopusConfig) creteConfigFile() {
if p.isFileExist(OctopusConfigFilePath) == false {
file2, error := os.OpenFile(OctopusConfigFilePath, os.O_RDWR|os.O_CREATE, 0766)
if error != nil {
fmt.Println(error)
}
file2.Close()
}
}
func (p *CtrlOctopusConfig) isFileExist(filename string) bool {
if _, err := os.Stat(filename); os.IsNotExist(err) {
return false
}
return true
}
package main
import (
"ficus_router/config"
"ficus_router/model"
"ficus_router/work"
"fmt"
_ "net/http/pprof"
"os"
"time"
log "github.com/sirupsen/logrus"
)
const HttpState1 = ":38888"
var quitFlag chan int
func InitLog() {
file, err := os.OpenFile("test.log", os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 666)
if err != nil {
log.Fatalln("fail to create test.log file!")
}
// 设置日志格式为json格式
log.SetFormatter(&log.JSONFormatter{})
// 设置将日志输出到标准输出(默认的输出为stderr,标准错误)
// 日志消息输出可以是任意的io.writer类型
log.SetOutput(file)
// 设置日志级别为warn以上
log.SetLevel(log.InfoLevel)
log.Info("start")
}
func main() {
InitLog()
cfg := config.NewCtrlOctopusConfig()
octCfg := cfg.LoadLocalOctopusConfig()
fmt.Println("Octopus:Traceid", octCfg.Traceid)
//管道阻塞主协程,防止主协程退出
quitFlag = make(chan int)
//服务程序 上线Map
ctr_mapSer := model.NewServermap()
//Msg map 存储 Msg 状态的map
go work.StartClientServer(ctr_mapSer)
time.Sleep(5 * time.Second)
go work.Start(ctr_mapSer)
<-quitFlag
fmt.Println("退出")
}
package model
import "fmt"
type FmError struct {
itype int
errMsg string
}
func NewMyErr(itype int, errMsg string) *FmError {
return &FmError{itype, errMsg}
}
//实现Error接口
func (e *FmError) Error() string {
return fmt.Sprintf("%d - %s", e.itype, e.errMsg)
}
package model
import (
"encoding/json"
"ficus/mission"
"ficus_router/config"
"ficus_router/mqcontrol"
"ficus_router/redisctrl"
"fmt"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
/*
* 所有的消息集合,这里属于维护集合
*/
type CtrlMsgMap struct {
Timeout int64 // 超时时间
Lock sync.RWMutex
msgMap map[string]config.T_CMsg // 所有到达路由的消息
chan_msgMap chan mission.Message // 写入map的缓冲管道
Ctrl_Retry *CtrlRetry
ctrlRedisClient *redisctrl.CtrlRedisMsg
ctrl_SrverMap *Ctrl_Srvermap
ctrl_SendMq *mqcontrol.Producer
ctrl_RMsgStatus *redisctrl.RMsgStatus
}
func NewMsgmap(ctrl_SendMq *mqcontrol.Producer, ctrl_SrverMap *Ctrl_Srvermap, Ctrl_Retry *CtrlRetry) *CtrlMsgMap {
p := &CtrlMsgMap{}
p.Timeout = 10
p.chan_msgMap = make(chan mission.Message, 10000)
p.Ctrl_Retry = Ctrl_Retry
p.ctrl_SrverMap = ctrl_SrverMap
p.ctrl_SendMq = ctrl_SendMq
p.ctrlRedisClient = redisctrl.NewCtrlRedisMsg()
p.msgMap = make(map[string]config.T_CMsg)
//p.chan_TimeOutMsg=make(chan config.T_CMsg,10000)
p.ctrl_RMsgStatus = redisctrl.NewRMsgStatus()
//启动 缓存map的读取
go p.work_Read_chan_msgMap()
//启动超时检测
go p.TimeOutTimerMsg()
return p
}
func (p *CtrlMsgMap) Write_chan_msgMap(mis mission.Message) {
go func() {
{ //日志
config.Eyelog.WriteIdAndLog(mis.ID, "", "消息写入路由的消息map中 ID:", mis.ID)
}
p.chan_msgMap <- mis
}()
}
func (p *CtrlMsgMap) work_Read_chan_msgMap() {
for {
mis := <-p.chan_msgMap
//写入自身的 - map
p.set(mis)
//同时写入 redis 待执行目录
p.ctrl_RMsgStatus.StatusWork(mis.ID, config.MSG_WAITSUCCESS)
}
}
//超时检测协成
func (p *CtrlMsgMap) TimeOutTimerMsg() {
for {
fmt.Println("------------执行一次超时检测--------------")
p.Lock.Lock()
for k, v := range p.msgMap {
log.WithFields(log.Fields{"func": "msgkey"}).Info(k)
msg := p.ctrlRedisClient.GetMsg(k)
if msg.PushStatus == config.MSG_WAITSUCCESS {
//获取 客户端 连接到的 服务 Traceid
serName := p.ctrlRedisClient.GetClientConnSerTraceid(msg.Mis.Agent)
fmt.Println(serName)
if serName == "" {
fmt.Println(msg.Mis.ID, "----", msg.Mis.Agent, "-- 客户端未上线")
//未获取到客户端上线标识,证明 此机器并未上线,则忽略此消息
continue
}
if p.ctrl_SrverMap.IsServer(serName) {
//超时写入重试管道
ic := time.Now().Unix() - v.Lastupdatetime
if ic > p.Timeout {
//超时,写入重试接口
var rm SerMsg
rm.SerName = v.RouteName
rm.MsgId = v.Mis.ID
rm.TimeSecond = v.Lastupdatetime
p.Ctrl_Retry.Write_chan_retryChan(rm)
}
} else {
//客户端上线,但是不在连接的列表中,则丢给MQ , 并且删除redis的缓存
jsonstr, _ := json.Marshal(msg.Mis)
//发送给mq
p.ctrl_SendMq.Push(jsonstr)
//从 map 中删除 这个key
delete(p.msgMap, k)
//从redis中删除这个key
p.ctrl_RMsgStatus.Del(msg.Mis.ID)
}
} else if msg.PushStatus == config.MSG_SUCCESSED {
//从 map 中删除 这个key
delete(p.msgMap, k)
p.ctrl_RMsgStatus.StatusWork(msg.Mis.ID, config.MSG_SUCCESSED)
}
}
p.Lock.Unlock()
time.Sleep(30 * time.Second)
}
}
func (p *CtrlMsgMap) getMsg(k string) config.T_CMsg {
p.Lock.RLock()
defer p.Lock.RUnlock()
return p.msgMap[k]
}
func (p *CtrlMsgMap) SetMsg(msg config.T_CMsg) bool {
p.Lock.RLock()
defer p.Lock.RUnlock()
p.msgMap[msg.Mis.ID] = msg
return true
}
func (p *CtrlMsgMap) Get(k string) mission.Message {
p.Lock.RLock()
var mis mission.Message
mis.ID = ""
mis.Agent = ""
defer p.Lock.RUnlock()
if _, ok := p.msgMap[k]; ok {
msg := p.msgMap[k]
return msg.Mis
}
return mis
}
func (p *CtrlMsgMap) set(mis mission.Message) {
p.Lock.Lock()
defer p.Lock.Unlock()
if v, ok := p.msgMap[mis.ID]; ok {
//存在,更新消息内容,并且更新消息内容
v.PushStatus = config.MSG_WAITSUCCESS
v.Lastupdatetime = time.Now().Unix()
v.PushCount++
v.Mis = mis
p.msgMap[mis.ID] = v
} else {
var m config.T_CMsg
m.PushCount = 1
m.Mis = mis
m.Lastupdatetime = time.Now().Unix()
m.PushStatus = config.MSG_WAITSUCCESS
p.msgMap[mis.ID] = m
}
}
func (p *CtrlMsgMap) AddCount(k string) {
p.Lock.Lock()
defer p.Lock.Unlock()
if v, ok := p.msgMap[k]; ok {
v.PushCount++
p.msgMap[k] = v
}
}
func (p *CtrlMsgMap) UpdateStatus(k string, status config.MSG_STATUS) error {
p.Lock.Lock()
defer p.Lock.Unlock()
if v, ok := p.msgMap[k]; ok {
//存在,更新一下狀態
v.PushStatus = status
p.msgMap[k] = v
} else {
return NewMyErr(1, fmt.Sprint("map values is nil ,key:%v", k))
}
return nil
}
package model
import (
"encoding/json"
"ficus_router/config"
"fmt"
"strconv"
"time"
"github.com/go-redis/redis"
)
/*
* 路由在线状态
* hash集合的名称为 Project.Traceid.RouteStatus
*
*/
type RRouteStatus struct {
ctrlRedisRouteStatus *redis.Client
hashname string "hash name"
timeout int64
ctrl_msgMap *CtrlMsgMap
}
func NewRRouteStatus(ctrl_msgMap *CtrlMsgMap, timeout int64) *RRouteStatus {
p := &RRouteStatus{}
p.ctrl_msgMap = ctrl_msgMap
p.ctrlRedisRouteStatus = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "", // no password set
DB: 0, // use default DB
})
p.timeout = timeout
//p.hashname=fmt.Sprint("%s.%s.%s",Config.OctopusConfig.Project,Config.OctopusConfig.Traceid,"RouteStatus")
p.hashname = config.OctopusRedisConfig.RotuteStatuslistName
fmt.Println("路由上线 Redis Hash 名称:", p.hashname)
go p.routeWorkStatus()
return p
}
//定时更新,证明自己在线
func (p *RRouteStatus) routeWorkStatus() {
for {
p.set() //每 N 秒 写入一次在线
go p.check()
time.Sleep(time.Duration(p.timeout-1) * time.Second)
}
}
func (p *RRouteStatus) getRedisTime() time.Time {
rtime, err := p.ctrlRedisRouteStatus.Time().Result()
if err == nil {
return rtime
} else {
mytime := time.Now()
return mytime
}
}
func (p *RRouteStatus) check() {
var mymap = make(map[string]string)
mymap, _ = p.ctrlRedisRouteStatus.HGetAll(p.hashname).Result()
ctime := p.getRedisTime().Unix() //time.Now().Unix()
for _, v := range mymap {
//判断是否超时 ,则证明路由下线,则加载这个路由下面的执行列表 , 丢入自己的管道中执行
it, err := strconv.ParseInt(v, 10, 64)
if err == nil {
ic := ctime - it
if ic > (p.timeout * 3) {
//这个客户端下线了,需要加载他所有的 未执行完成的 msg ,添加到自己的列表中
msgkeyMap, _ := p.ctrlRedisRouteStatus.HGetAll(config.OctopusRedisConfig.MsgStatusListName).Result()
for kk, _ := range msgkeyMap {
key := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.MsgName_prefix, kk)
jsonstr, err := p.ctrlRedisRouteStatus.Get(key).Result()
if err == nil {
var msg config.T_CMsg
json.Unmarshal([]byte(jsonstr), &msg)
p.ctrl_msgMap.Write_chan_msgMap(msg.Mis)
}
p.ctrlRedisRouteStatus.HDel(config.OctopusRedisConfig.MsgStatusListName)
}
//直接跳出,完成后则直接跳出,就不继续遍历了,因为获取也需要很多时间
break
}
}
}
}
func (p *RRouteStatus) Loadhistory() {
msgkeyMap, _ := p.ctrlRedisRouteStatus.HGetAll(config.OctopusRedisConfig.MsgStatusListName).Result()
for kk, _ := range msgkeyMap {
key := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.MsgName_prefix, kk)
jsonstr, err := p.ctrlRedisRouteStatus.Get(key).Result()
if err == nil {
var msg config.T_CMsg
json.Unmarshal([]byte(jsonstr), &msg)
p.ctrl_msgMap.Write_chan_msgMap(msg.Mis)
}
p.ctrlRedisRouteStatus.HDel(config.OctopusRedisConfig.MsgStatusListName)
}
}
func (p *RRouteStatus) set() {
rtime, err := p.ctrlRedisRouteStatus.Time().Result()
if err == nil {
ctime := rtime.Unix() //time.Now().Unix()
p.ctrlRedisRouteStatus.HSet(p.hashname, config.OctopusConfig.Traceid, ctime)
}
}
func (p *RRouteStatus) get(id string) int64 {
d, err := p.ctrlRedisRouteStatus.HGet(p.hashname, config.OctopusConfig.Traceid).Int64()
if err == nil {
return d
} else {
d = 0
}
return d
}
package model
import (
"encoding/json"
"ficus_router/config"
"ficus_router/mqcontrol"
"ficus_router/redisctrl"
"time"
)
/*
* 重试
* 消息失败后 丢入重试管道
* 重试次数,最小间隔 10 秒才能丢入推送客户端
* 重试 一定次数认为重试失败
*/
type CtrlRetry struct {
ctrser *Ctrl_Srvermap
ctrlRedisClient *redisctrl.CtrlRedisMsg
psend *mqcontrol.Producer
Timeout int64 // 超时时间
retryChan chan SerMsg // 重试管道
}
func NewCtrlRetry(producer *mqcontrol.Producer, ctrser *Ctrl_Srvermap, Timeout int64) *CtrlRetry {
p := &CtrlRetry{}
p.ctrser = ctrser
p.Timeout = Timeout
p.retryChan = make(chan SerMsg, 10000)
p.ctrlRedisClient = redisctrl.NewCtrlRedisMsg()
p.psend = producer
go p.Read_chan_retryChan()
return p
}
func (p *CtrlRetry) Set(mis SerMsg) {
go func() {
p.Write_chan_retryChan(mis)
}()
}
func (p *CtrlRetry) Write_chan_retryChan(rm SerMsg) {
p.retryChan <- rm
}
func (p *CtrlRetry) Read_chan_retryChan() {
for {
rm := <-p.retryChan
ic := time.Now().Unix() - rm.TimeSecond
if ic > p.Timeout {
//大于超时时间
//1. 获取当前的状态
//2. 未完成,写入管道,完成 => 直接丢弃
msg := p.ctrlRedisClient.GetMsg(rm.MsgId)
//SerName := p.ctrlRedisClient.GetClientConnSerTraceid(msg.Mis.Agent)
//判断SerName 是否连接到本路由,如果未连接
ser := p.ctrser.Get(rm.SerName)
if ser.SerName != "" {
//判断服务是否连接了本路由,并且同时超时
if msg.PushStatus == config.MSG_WAITSUCCESS {
//写入待执行管道
p.ctrser.Write_Chan_Server(rm)
}
} else {
// 服务 没有上线到本路由,则重新丢入MQ
// 返回 rabbitmq
json, _ := json.Marshal(msg.Mis)
p.psend.Push(json)
}
} else {
//小于超时时间,则不检测是否成功! 直接丢入自身的 检测重试 管道
p.Write_chan_retryChan(rm)
}
/*
SerName := p.ctrlRedisClient.GetClientTraceid(mis.Agent)
//客户端标示
if SerName != "" {
var rm SerMsg
rm.SerName=SerName
rm.MsgId=mis.ID
//写入待执行的管道中
if len(p.retryChan) > 5000 {
p.ctrser.Write_Chan_Server(rm)
}else{
time.Sleep(1 * time.Second)
p.ctrser.Write_Chan_Server(rm)
}
}else{
//没有查询到上线的机器信息,
//需要查询再次上线的路由
//name:=fmt.Sprintf("Server.%s",mis.Agent);
SerName := p.ctrlRedisClient.GetClientConnSerTraceid(mis.Agent)
//判断SerName 是否连接到本路由,如果未连接
ser:=p.ctrser.Get(SerName)
if ser.SerName != "" {
//服务 连接到了本路由 则写入 重试管道
//则重新丢入重试管道,等待下次重试
p.Write_chan_retryChan(mis)
}else{
// 服务 没有上线到本路由,则重新丢入MQ
// 返回 rabbitmq
json,_:=json.Marshal(mis)
p.psend.Send(string(json))
}
}
*/
}
}
package model
import (
"ficus_router/config"
"ficus_router/redisctrl"
"fmt"
"log"
"net"
"sync"
"time"
"unsafe"
)
type SliceMock struct {
addr uintptr
len int
cap int
}
//服务 客户端标示
type Server struct {
SerName string
Conn net.Conn
Online bool
Time int64
}
type SerMsg struct {
SerName string
//Mis ficus.Mission
//Msg Config.T_CMsg
MsgId string
Agent string
TimeSecond int64
}
type Ctrl_Srvermap struct {
SrverMap map[string]Server
Lock sync.RWMutex
chan_SerMsg chan SerMsg //"待发送的消息"
ctrl_Redismsg *redisctrl.CtrlRedisMsg
}
func NewServermap() *Ctrl_Srvermap {
p := &Ctrl_Srvermap{}
//p.ctr_msgMap=ctr_msgMap
p.SrverMap = make(map[string]Server)
p.chan_SerMsg = make(chan SerMsg)
p.ctrl_Redismsg = redisctrl.NewCtrlRedisMsg()
go p.read_Chan_Server()
return p
}
func (p *Ctrl_Srvermap) AddClient(s Server) {
p.Lock.Lock()
defer p.Lock.Unlock()
fmt.Println("Add map SerName", s.SerName)
s.Time = time.Now().Unix()
p.SrverMap[s.SerName] = s
}
func (p *Ctrl_Srvermap) Get(k string) Server {
p.Lock.RLock()
defer p.Lock.RUnlock()
rs := p.SrverMap[k]
if _, ok := p.SrverMap[k]; ok {
return rs
}
return rs
}
//判断服务是否存在
func (p *Ctrl_Srvermap) IsServer(k string) bool {
p.Lock.RLock()
defer p.Lock.RUnlock()
if _, ok := p.SrverMap[k]; ok {
return true
}
return false
}
func (p *Ctrl_Srvermap) Write_Chan_Server(rm SerMsg) {
go func() {
{ //日志
config.Eyelog.WriteIdAndLog(rm.MsgId, "", "写入待发送到ser管道 ID:", rm.MsgId)
}
p.chan_SerMsg <- rm
}()
}
func (p *Ctrl_Srvermap) read_Chan_Server() {
for {
rm := <-p.chan_SerMsg
//获取上线路由
SerName := p.ctrl_Redismsg.GetClientConnSerTraceid(rm.Agent)
rm.Agent = SerName
tempSer := p.Get(SerName)
if SerName != "" && tempSer.SerName != "" {
{
config.Eyelog.WriteCallAndLog(rm.MsgId, "", "发送到服务程序 ... - 消息ID:", rm.MsgId)
}
var comm config.SerCommand
comm.Command = config.COMMAND_TASK
copy(comm.Data[:len(rm.MsgId)], rm.MsgId)
comm.Length = uint32(len(rm.MsgId))
Len := unsafe.Sizeof(comm)
testBytes := &SliceMock{
addr: uintptr(unsafe.Pointer(&comm)),
cap: int(Len),
len: int(Len),
}
var data = *(*[]byte)(unsafe.Pointer(testBytes))
fmt.Println("start send msg ………………")
len, err := tempSer.Conn.Write(data)
fmt.Println("start send msg ………………Len:", len)
if err == nil {
//发送成功
fmt.Println("start send msg success………………")
{ //日志
config.Eyelog.WriteIdAndLog(rm.MsgId, "发送到服务程序成功 - 消息ID", rm.MsgId)
}
} else {
//发送失败
//fmt.Println("Command:",comm.Command)
p.failOnError(err, "Send SerMsg Error")
fmt.Println("start send msg fail")
{ //日志
config.Eyelog.WriteIdAndLog(rm.MsgId, "发送到服务程序失败 - 消息ID", rm.MsgId)
}
}
} else {
//直接丢弃
}
}
}
func (p *Ctrl_Srvermap) failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s:%s", msg, err)
panic(fmt.Sprintf("%s:%s", msg, err))
}
}
{
"注释-Connects":[
{"Name":"连接名称",
"Addr":"连接地址:amqp://username:password@IP:Port/vhost"}
],
"注释-Channels":[
{"Name":"信道名称",
"Connect":"信道所属连接"}
],
"注释-Exchanges":[
{"Name":"交换机名称",
"Channel":"交换机所属信道",
"Type":"交换机类型['direct':'路由','topic':'通配符','fanout':'订阅','headers ':'头部']",
"Durable":"是否持久化(bool)",
"AutoDeleted":"是否自动删除(bool)",
"Internal":"只允许MQ内部使用(bool)",
"NoWait":"非阻塞模式(bool)",
"EBind":[
{"Destination":"目标交换机",
"Key":"路由键",
"NoWait":"非阻塞模式(bool)"}
]}
],
"注释-Queue":[
{"Name":"队列名称",
"Durable":"是否持久化(bool)",
"AutoDelete":"是否自动删除(bool)",
"Exclusive":"排他性,只允许创建队列的用户访问(bool)",
"NoWait":"非阻塞模式(bool)",
"Bind":[
{"Key":"路由键", "ExchangeName":"交换机", "NoWait":"非阻塞模式(bool)"}
]}
],
"注释-Pusher-发送者":[
{"Name":"名称",
"Channel":"所属信道",
"Exchange":"交换机",
"Key":"路邮键",
"Mandtory":"(bool)",
"Immediate":"(bool)",
"ContentType":"消息类型",
"DeliveryMode":"是否持久化,2表示持久化,0表示非持久化(uint8)"}
],
"注释-Poper--接收者":[
{"Name":"名称",
"Channel":"所属信道",
"QName":"队列名称",
"Consumer":"消费者",
"AutoACK":"自动确认(bool)",
"Exclusive":"排他性(bool)",
"NoLocal":"非本地(bool)",
"NoWait":"非阻塞模式(bool)"
}
],
"Connects":[
{"Name":"myConnect",
"Addr":"amqp://guest:guest@127.0.0.1:5672/"}
],
"Channels":[
{"Name":"myChannel",
"Connect":"myConnect"}
],
"Exchanges":[
{"Name":"myExchange",
"Channel":"myChannel",
"Type":"direct",
"Durable":false,
"AutoDeleted":false,
"Internal":false,
"NoWait":false,
"Args":{
"alternate-exchange":"errExchange"
}
}
],
"Queue":[
{"Name":"myQueue",
"Channel":"myChannel",
"Durable":false,
"AutoDelete":false,
"Exclusive":false,
"NoWait":false,
"Bind":[
{"Key":"myQueue", "ExchangeName":"myExchange", "NoWait":false}
],
"Args":{
"":""
}
}
],
"Pusher":[
{"Name":"myPusher",
"Channel":"myChannel",
"Exchange":"myExchange",
"Key":"myQueue",
"Mandtory":false,
"Immediate":false,
"ContentType":"text/plain",
"DeliveryMode":0}
],
"Poper":[
{"Name":"myPoper",
"Channel":"myChannel",
"QName":"myQueue",
"Consumer":"",
"AutoACK":true,
"Exclusive":false,
"NoLocal":false,
"NoWait":false
}
]
}
{
"注释-Connects":[
{"Name":"连接名称",
"Addr":"连接地址:amqp://username:password@IP:Port/vhost"}
],
"注释-Channels":[
{"Name":"信道名称",
"Connect":"信道所属连接"}
],
"注释-Exchanges":[
{"Name":"交换机名称",
"Channel":"交换机所属信道",
"Type":"交换机类型['direct':'路由','topic':'通配符','fanout':'订阅','headers ':'头部']",
"Durable":"是否持久化(bool)",
"AutoDeleted":"是否自动删除(bool)",
"Internal":"只允许MQ内部使用(bool)",
"NoWait":"非阻塞模式(bool)",
"EBind":[
{"Destination":"目标交换机",
"Key":"路由键",
"NoWait":"非阻塞模式(bool)"}
]}
],
"注释-Queue":[
{"Name":"队列名称",
"Durable":"是否持久化(bool)",
"AutoDelete":"是否自动删除(bool)",
"Exclusive":"排他性,只允许创建队列的用户访问(bool)",
"NoWait":"非阻塞模式(bool)",
"Bind":[
{"Key":"路由键", "ExchangeName":"交换机", "NoWait":"非阻塞模式(bool)"}
]}
],
"注释-Pusher-发送者":[
{"Name":"名称",
"Channel":"所属信道",
"Exchange":"交换机",
"Key":"路邮键",
"Mandtory":"(bool)",
"Immediate":"(bool)",
"ContentType":"消息类型",
"DeliveryMode":"是否持久化,2表示持久化,0表示非持久化(uint8)"}
],
"注释-Poper--接收者":[
{"Name":"名称",
"Channel":"所属信道",
"QName":"队列名称",
"Consumer":"消费者",
"AutoACK":"自动确认(bool)",
"Exclusive":"排他性(bool)",
"NoLocal":"非本地(bool)",
"NoWait":"非阻塞模式(bool)"
}
],
"Connects":[
{"Name":"myConnect",
"Addr":"amqp://username:password@IP:Port/"}
],
"Channels":[
{"Name":"myChannel",
"Connect":"myConnect"}
],
"Exchanges":[
{"Name":"myExchange",
"Channel":"myChannel",
"Type":"direct",
"Durable":false,
"AutoDeleted":false,
"Internal":false,
"NoWait":false,
"Args":{
"alternate-exchange":"errExchange"
}
}
],
"Queue":[
{"Name":"myQueue",
"Channel":"myChannel",
"Durable":false,
"AutoDelete":false,
"Exclusive":false,
"NoWait":false,
"Bind":[
{"Key":"myQueue", "ExchangeName":"myExchange", "NoWait":false}
],
"Args":{
"":""
}
}
],
"Pusher":[
{"Name":"myPusher",
"Channel":"myChannel",
"Exchange":"myExchange",
"Key":"myQueue",
"Mandtory":false,
"Immediate":false,
"ContentType":"text/plain",
"DeliveryMode":0}
],
"Poper":[
{"Name":"myPoper",
"Channel":"myChannel",
"QName":"myQueue",
"Consumer":"",
"AutoACK":true,
"Exclusive":false,
"NoLocal":false,
"NoWait":false
}
]
}
package mqcontrol
type (
CallBackFunc func(MSG)
Consumer struct {
PoperName string
CallBack CallBackFunc
}
Producer struct {
PusherName string
Key string
}
// MqHandle interface {
// HandleMsg() (err error)
// }
)
func NewConsumer(poperName string, callBack CallBackFunc) *Consumer {
c := &Consumer{PoperName: poperName, CallBack: callBack}
return c
}
func NewProducer(pusherName, key string) *Producer {
p := &Producer{PusherName: pusherName, Key: key}
return p
}
func (c *Consumer) Pop() (err error) {
return Pop(c.PoperName, c.CallBack)
}
func (p *Producer) Push(msg []byte) (err error) {
return Push(p.PusherName, p.Key, msg)
}
package mqcontrol
import (
"bytes"
"encoding/gob"
"errors"
"fmt"
)
var _RPCMAP map[string](map[string]func([]byte)) = make(map[string](map[string]func([]byte)))
//事件结构
type Event struct {
FuncName string
Params interface{}
}
//添加回调函数
func AddRpc(poper string, name string, callback func([]byte)) (err error) {
if _, ok := _RPCMAP[poper]; !ok {
_RPCMAP[poper] = make(map[string]func([]byte))
}
if _, ok := _RPCMAP[poper][name]; ok {
return errors.New("回调函数已存在")
} else {
_RPCMAP[poper][name] = callback
}
return nil
}
//解码并回调函数
func callbackRpc(d MSG) {
var buf bytes.Buffer
buf.Write(d.Body)
dec := gob.NewDecoder(&buf)
var data Event
dec.Decode(&data)
if _, ok := _RPCMAP[d.Poper]; ok {
if _, ok := _RPCMAP[d.Poper][data.FuncName]; ok {
_RPCMAP[d.Poper][data.FuncName](data.Params.([]byte))
}
}
//注,else情况还没处理
d.Ack(false)
}
//收到Rpc调用
func RecvRpc(poperName string) (err error) {
if err = Pop(poperName, callbackRpc); err != nil {
return err
}
return nil
}
//发送RPC调用
func SendRpc(pusherName string, key string, funcName string, params interface{}) (err error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
var data = Event{
FuncName: funcName,
Params: params,
}
if err = enc.Encode(&data); err != nil {
fmt.Println("asdfasd")
return err
}
if err := Push(pusherName, key, buf.Bytes()); err != nil {
return err
}
return nil
}
package redisctrl
import (
"encoding/json"
"ficus_router/config"
"fmt"
"time"
log "github.com/sirupsen/logrus"
"github.com/go-redis/redis"
)
//redis 写入 默认列表
type CtrlRedisMsg struct {
//redis
ctrlRedisMsg *redis.Client
chanRedisMsg chan config.T_CMsg
}
func NewCtrlRedisMsg() *CtrlRedisMsg {
p := &CtrlRedisMsg{}
p.chanRedisMsg = make(chan config.T_CMsg, 10000)
p.ctrlRedisMsg = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "", // no password set
DB: 0, // use default DB
})
go p.Read_Chan_RedisMsg()
return p
}
func (p *CtrlRedisMsg) setResisMsg(msg config.T_CMsg) {
jsonstr, _ := json.Marshal(msg)
//订单保存 1 天
kname := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.MsgName_prefix, msg.Mis.ID)
log.WithFields(log.Fields{"func": "setResisMsg", "kname": kname}).Info(string(jsonstr))
p.ctrlRedisMsg.Set(kname, jsonstr, 60*24*60*time.Second)
}
func (p *CtrlRedisMsg) get(k string) config.T_CMsg {
jsonstr := p.ctrlRedisMsg.Get(k)
var msg config.T_CMsg
json.Unmarshal([]byte(jsonstr.Val()), msg)
return msg
}
func (p *CtrlRedisMsg) GetClientTraceid(k string) string {
kname := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.ClientName_prefix, k)
json := p.ctrlRedisMsg.Get(kname)
return json.Val()
}
//客户端连接到的server
func (p *CtrlRedisMsg) GetClientConnSerTraceid(k string) string {
//ClientServer
kname := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.ClientServer_prefix, k)
json := p.ctrlRedisMsg.Get(kname)
return json.Val()
}
func (p *CtrlRedisMsg) test(k string) {
//p.ctrlRedisMsg.ZAdd("ssss","1ss")
}
func (p *CtrlRedisMsg) GetMsg(k string) config.T_CMsg {
kname := fmt.Sprintf("%s.%s", config.OctopusRedisConfig.MsgName_prefix, k)
log.WithFields(log.Fields{"func": "GetMsg"}).Info(kname)
jsonstr := p.ctrlRedisMsg.Get(kname)
log.WithFields(log.Fields{"func": "jsonstr"}).Info(jsonstr.Val())
var msg config.T_CMsg
err := json.Unmarshal([]byte(jsonstr.Val()), &msg)
if err != nil {
fmt.Println("GetMsgTraceid json.Unmarshal!", err)
}
return msg
}
func (p *CtrlRedisMsg) Write_Chan_RedisMsg(msg config.T_CMsg) {
fmt.Println("0", msg.Mis.ID)
go func() {
fmt.Println("1", msg.Mis.ID)
p.chanRedisMsg <- msg
}()
}
func (p *CtrlRedisMsg) Read_Chan_RedisMsg() {
for {
var msg config.T_CMsg
msg = <-p.chanRedisMsg
fmt.Println("2", msg.Mis.ID)
go func() {
fmt.Println("3", msg.Mis.ID)
p.setResisMsg(msg)
}()
}
}
package redisctrl
import (
"ficus_router/config"
"fmt"
"github.com/go-redis/redis"
)
//redis - hash 集合
//未成功完成的状态列表
//hash集合的名称为 Project.Traceid.MsgStatsu
type RMsgStatus struct {
ctrlRedisMsgStatus *redis.Client
hashname string "hash name"
}
func NewRMsgStatus() *RMsgStatus {
p := &RMsgStatus{}
p.ctrlRedisMsgStatus = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "", // no password set
DB: 0, // use default DB
})
//p.hashname=fmt.Sprint("%s.%s.%s",Config.OctopusConfig.Project,Config.OctopusConfig.Traceid,"MsgStatsu")
p.hashname = config.OctopusRedisConfig.MsgStatusListName
fmt.Println("MsgStatus Redis Hash 上线 名称:", p.hashname)
return p
}
func (p *RMsgStatus) StatusWork(msgid string, status config.MSG_STATUS) bool {
if status == config.MSG_WAITSUCCESS {
//写入 -- redis
p.set(msgid)
} else if status == config.MSG_SUCCESSED {
//删除 - redis
p.del(msgid)
} else if status == config.MSG_FAILED {
//不做任何操作
}
return true
}
func (p *RMsgStatus) Del(msgid string) bool {
p.del(msgid)
return true
}
func (p *RMsgStatus) set(msgid string) bool {
if p.ctrlRedisMsgStatus.HExists(p.hashname, msgid).Val() == false {
p.ctrlRedisMsgStatus.HSet(p.hashname, msgid, config.MSG_WAITSUCCESS)
} else {
//存在直接返回
return true
}
return true
}
func (p *RMsgStatus) del(msgid string) bool {
p.ctrlRedisMsgStatus.HDel(p.hashname, msgid)
return true
}
func (p *RMsgStatus) Get(msgid string) (config.MSG_STATUS, error) {
s, err := p.ctrlRedisMsgStatus.HGet(p.hashname, msgid).Int()
if err == nil {
if s == 0 {
return config.MSG_WAITSUCCESS, nil
} else if s == 1 {
return config.MSG_SUCCESSED, nil
} else if s == 3 {
return config.MSG_FAILED, nil
} else {
return config.MSG_WAITSUCCESS, err
}
}
return config.MSG_WAITSUCCESS, err
}
package rpcid
import (
"ficus_router/config"
"fmt"
)
func AppendRpcid(rpcid string) string {
var tempstr string
if len(rpcid) == 0 {
tempstr = config.OctopusConfig.Traceid
} else {
tempstr = fmt.Sprintf("%s.%s", rpcid, config.OctopusConfig.Traceid)
}
rpcid = tempstr
return rpcid
}
package work
import (
"encoding/json"
"ficus/mission"
"ficus_router/config"
"ficus_router/model"
"fmt"
"net"
"time"
"unsafe"
)
func StartClientServer(ctrser *model.Ctrl_Srvermap) {
//建立socket,监听端口 第一步:绑定端口
//netListen, err := net.Listen("tcp", "localhost:1024")
netListen, err := net.Listen("tcp", ":38000")
failOnError(err, "tcp")
//defer延迟关闭改资源,以免引起内存泄漏
defer netListen.Close()
fmt.Println("Waiting for ServiceClients")
for {
conn, err := netListen.Accept() //第二步:获取连接
if err != nil {
//log.Println("error:",err)
continue //出错进行下一次循环
}
//defer conn.Close()
//协成写入连接
go RecvClient(conn, ctrser)
}
}
func byteString(p []byte) string {
fmt.Println(p)
s := p[:]
return string(s)
}
func RecvClient(conn net.Conn, ctrser *model.Ctrl_Srvermap) {
//等待反馈
buffer := make([]byte, config.MQ_MAX_RECV_LEN)
for { //无限循环
_, err := conn.Read(buffer) //第三步:读取从该端口传来的内容
if err != nil {
fmt.Println("error:", err)
return
}
//var pServer Config.SerCommand
//pServer.Data=make([]byte, 1024)
pServer := *(**config.SerCommand)(unsafe.Pointer(&buffer))
if pServer.Command == config.COMMAND_LOGIN {
//添加 登录过来的 服务
//sername := *(*string)(unsafe.Pointer(&pServer.Data))
Sername := string(pServer.Data[:pServer.Length])
fmt.Println(pServer.Command)
fmt.Println("-----recv_ser:", Sername, "len:", len(Sername))
if pServer.SerType == config.SERTYPE_CONSUME {
//注册的消费者,添加到消费服务列表
var ser model.Server
ser.SerName = Sername
ser.Conn = conn
ser.Online = true
ser.Time = time.Now().Unix()
ctrser.AddClient(ser)
} else if pServer.SerType == config.SERTYPE_FEEDBACK {
//注册的反馈服务,则添加入反馈者列表
}
//返回连接的路由消息
var SendSend config.SerCommand
SendSend.Command = config.COMMAND_LOGIN
temp := "route.1"
copy(SendSend.Data[:], temp)
SendSend.Length = uint32(len(SendSend.Data))
Len := unsafe.Sizeof(SendSend)
testBytes := &SliceMock{
addr: uintptr(unsafe.Pointer(&SendSend)),
cap: int(Len),
len: int(Len),
}
var data = *(*[]byte)(unsafe.Pointer(testBytes))
conn.Write(data)
} else if pServer.Command == config.COMMAND_TASK_BACK {
//执行反馈,修改队列任务状态
var mis mission.Message
err := json.Unmarshal(pServer.Data[:pServer.Length], &mis)
if err != nil {
fmt.Println("COMMAND_TASK_BACK json.Unmarshal error")
continue
}
if pServer.PushStatus == config.MSG_SUCCESSED {
//已经成功
/*
fmt.Println("收到一个反馈成功^^^")
//pServer.Data
Ctrl_MsgMap.UpdateStatus(mis.ID,Config.MSG_SUCCESSED)
*/
} else if pServer.PushStatus == config.MSG_FAILED {
/*
//已经失败, 1.消息重新回到MQ 2.写入重试管道
//更新状态
Ctrl_MsgMap.UpdateStatus(mis.ID,Config.MSG_SUCCESSED)
//添加一个重试次数
Ctrl_MsgMap.AddCount(mis.ID)
//写入重试管道,超时后由重试管道 写入异常类
mis:=Ctrl_MsgMap.Get(mis.ID)
if len(mis.ID) > 0 {
ctrlRetry.Write_chan_retryChan(mis)
}
//重试次数过多,可以考虑写入 redis -- 后续考虑
*/
}
}
}
}
package work
import (
"encoding/json"
"ficus/mission"
"ficus_router/config"
"ficus_router/model"
"ficus_router/mqcontrol"
"ficus_router/redisctrl"
"ficus_router/rpcid"
"fmt"
"time"
log "github.com/sirupsen/logrus"
)
type SliceMock struct {
addr uintptr
len int
cap int
}
func callback(d mqcontrol.MSG, ctrser *model.Ctrl_Srvermap, redisctrl *redisctrl.CtrlRedisMsg, ctrl_mapMsg *model.CtrlMsgMap) {
log.Info("callback")
body := d.Body
var mis mission.Message
err := json.Unmarshal(body, &mis)
log.Info(string(body))
failOnError(err, "recv:json.Unmarshal - ficus.Mission")
if err == nil {
// { //日志
// config.Eyelog.WriteCallAndLog(mis.ID, "", "从 rabbitmq 中 读取到一条消息 ID:", mis.ID)
// }
//获取 客户端 连接到的 服务 Traceid
//SerName := Ctrl_Redismsg.GetClientConnSerTraceid(mis.Agent)
{ //1.写入 redis -> 接受到的所有消息都需要写入 redis 缓存消息
var msg config.T_CMsg
msg.RouteName = ""
fmt.Println("-1", mis.ID)
msg.Mis = mis
msg.RouteName = config.OctopusConfig.Traceid //经过的路由器
msg.PushStatus = config.MSG_WAITSUCCESS
msg.Lastupdatetime = time.Now().Unix()
msg.Rpcid = rpcid.AppendRpcid("")
redisctrl.Write_Chan_RedisMsg(msg)
}
log.Info("步骤2")
{ //2.直接写入 执行管道
//fmt.Println("SerName:",SerName);
var rm model.SerMsg
rm.SerName = ""
rm.MsgId = mis.ID
rm.Agent = mis.Agent
rm.TimeSecond = time.Now().Unix()
//fmt.Println("TimeSecond:",rm.TimeSecond)
//写入待发送到ser管道
ctrser.Write_Chan_Server(rm)
//写消息到map中
ctrl_mapMsg.Write_chan_msgMap(mis)
}
} else {
log.Println(string(body))
}
}
func Start(ctrser *model.Ctrl_Srvermap) {
//redis 客户端
Ctrl_Redismsg := redisctrl.NewCtrlRedisMsg()
//启动读写实例
if err := mqcontrol.Init("./mq.json"); err != nil {
panic("mq 初始化失败" + err.Error())
}
//生产者
psend := mqcontrol.NewProducer("myPusher", "myQueue")
//消息的map表
ctrl_Retry := model.NewCtrlRetry(psend, ctrser, 10)
ctrl_mapMsg := model.NewMsgmap(psend, ctrser, ctrl_Retry)
{ // 一个路由中 可以有多个 任务线程,上线一个路由线程,现在暂且定为 每个 路由 只上线 1个
routeMsg := model.NewRRouteStatus(ctrl_mapMsg, 30)
//重新启动后 --- 把无完成的任务添加在自己的map中
routeMsg.Loadhistory()
}
//消费者
precv := mqcontrol.NewConsumer("myPoper", func(d mqcontrol.MSG) {
callback(d, ctrser, Ctrl_Redismsg, ctrl_mapMsg)
})
if err := precv.Pop(); err != nil {
panic("Pop 监听失败")
}
}
func failOnError(err error, msg string) {
if err != nil {
log.Error("%s:%s", msg, err)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment