Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
F
ficus_router
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
yunpeng.song
ficus_router
Commits
62cac69d
Commit
62cac69d
authored
Aug 20, 2019
by
yunpeng.song
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
日志模块从ini改成json
parent
80e38f4d
Hide whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
156 additions
and
143 deletions
+156
-143
config/config.go
+14
-7
config/loadoctopusconfig.go
+50
-62
log.go
+40
-0
main.go
+11
-27
model/msgmap.go
+8
-9
model/redisroutestatus.go
+2
-1
model/server.go
+10
-22
mqcontrol/rpc.go
+0
-2
octopus.json
+14
-0
work/Server.go
+7
-7
work/work.go
+0
-6
No files found.
config/config.go
View file @
62cac69d
...
...
@@ -58,17 +58,17 @@ type T_CMsg struct {
}
type
OCTOPUS_CONFIG
struct
{
Traceid
string
//"路由的唯一,追踪的标示"
Project
string
//"项目的唯一标示"
Traceid
string
`json:"traceid"`
//"路由的唯一,追踪的标示"
Project
string
`json:"project"`
//"项目的唯一标示"
}
//redis key name
type
REDISNAME_CONFIG
struct
{
MsgName_prefix
string
//"消息的前缀"
ClientName_prefix
string
//"客户端的前缀"
ClientServer_prefix
string
//""
MsgStatusListName
string
//"未执行消息的list,只带消息id的"
RotuteStatuslistName
string
//"在线路由的列表"
MsgName_prefix
string
`json:"msgname"`
//"消息的前缀"
ClientName_prefix
string
`json:"clientname"`
//"客户端的前缀"
ClientServer_prefix
string
`json:"clientserver"`
//""
MsgStatusListName
string
`json:"msgstatusname"`
//"未执行消息的list,只带消息id的"
RotuteStatuslistName
string
`json:"routestatusname"`
//"在线路由的列表"
}
//路由状态表
...
...
@@ -91,3 +91,10 @@ type SER_STATUS struct {
Memory
int
//"内存使用量,单位为G"
GoCount
int
//"协成数"
}
var
Ctrl_Config_
=
newCfg
()
type
CFG
struct
{
Prefix
REDISNAME_CONFIG
`json:"prefix"`
OctopusCfg
OCTOPUS_CONFIG
`json:"servercfg"`
}
config/loadoctopusconfig.go
View file @
62cac69d
package
config
import
(
"
fmp_kit_eyeatom/eyeatom
"
"
fmt
"
"
encoding/json
"
"
io/ioutil
"
"os"
"os/exec"
"strings"
"github.com/Unknwon/goconfig
"
log
"github.com/sirupsen/logrus
"
)
//加载配置文件
const
CONFIG_CONFIGNAME_OCTOPUS
=
"
Octopus.ini
"
const
CONFIG_CONFIGNAME_OCTOPUS
=
"
octopus.json
"
var
OctopusConfig
OCTOPUS_CONFIG
var
OctopusRedisConfig
REDISNAME_CONFIG
//各种数据在redis 中的名称
var
OctopusConfigFilePath
string
var
Eyelog
*
eyeatom
.
EyeAtom
type
CtrlOctopusConfig
struct
{
}
func
NewCtrlOctopusConfig
()
*
CtrlOctopusConfig
{
p
:=
&
CtrlOctopusConfig
{}
OctopusConfigFilePath
=
fmt
.
Sprintf
(
"%s%s"
,
p
.
getCurrentPath
(),
CONFIG_CONFIGNAME_OCTOPUS
)
fmt
.
Println
(
OctopusConfigFilePath
)
p
.
creteConfigFile
()
//初始 CONFIG 化结构体
{
OctopusConfig
.
Traceid
=
""
OctopusConfig
.
Project
=
""
}
p
.
LoadLocalOctopusConfig
()
OctopusRedisConfig
.
MsgName_prefix
=
fmt
.
Sprintf
(
"%s.Msg"
,
OctopusConfig
.
Project
)
OctopusRedisConfig
.
ClientName_prefix
=
fmt
.
Sprintf
(
"%s.Client"
,
OctopusConfig
.
Project
)
OctopusRedisConfig
.
ClientServer_prefix
=
fmt
.
Sprintf
(
"%s.ClientServer"
,
OctopusConfig
.
Project
)
OctopusRedisConfig
.
MsgStatusListName
=
fmt
.
Sprintf
(
"%s.%s.%s"
,
OctopusConfig
.
Project
,
OctopusConfig
.
Traceid
,
"MsgStatsus"
)
OctopusRedisConfig
.
RotuteStatuslistName
=
fmt
.
Sprintf
(
"%s.%s"
,
OctopusConfig
.
Project
,
"RouteStatus"
)
fmt
.
Println
(
OctopusRedisConfig
.
MsgName_prefix
)
fmt
.
Println
(
OctopusRedisConfig
.
ClientName_prefix
)
fmt
.
Println
(
OctopusRedisConfig
.
ClientServer_prefix
)
fmt
.
Println
(
OctopusRedisConfig
.
MsgStatusListName
)
fmt
.
Println
(
OctopusRedisConfig
.
RotuteStatuslistName
)
//初始化 eyelog
{
var
callrpc
=
eyeatom
.
CALLRPC
{
OctopusConfig
.
Project
,
OctopusConfig
.
Traceid
,
"serlog"
}
var
redisconfig
=
eyeatom
.
RedisConfig
{
"127.0.0.1:6379"
,
""
,
0
}
Eyelog
=
eyeatom
.
NewCtrlCallChain
(
callrpc
,
redisconfig
)
}
func
newCfg
()
*
CFG
{
cfg
:=
&
CFG
{}
return
cfg
}
return
p
func
GetSrverId
()
string
{
return
OctopusConfig
.
Traceid
}
func
GetSrverName
()
string
{
return
OctopusConfig
.
Project
}
func
(
p
*
CtrlOctopusConfig
)
LoadLocalOctopusConfig
()
OCTOPUS_CONFIG
{
func
initOhterCfg
()
{
OctopusConfig
=
Ctrl_Config_
.
OctopusCfg
cfg
,
err
:=
goconfig
.
LoadConfigFile
(
OctopusConfigFilePath
)
if
err
!=
nil
{
panic
(
"错误"
)
}
OctopusConfig
.
Traceid
,
err
=
cfg
.
GetValue
(
"Octopus"
,
"traceid"
)
OctopusConfig
.
Project
,
err
=
cfg
.
GetValue
(
"Octopus"
,
"project"
)
if
err
!=
nil
{
OctopusConfig
.
Traceid
=
""
}
cfg
.
Reload
()
return
OctopusConfig
OctopusRedisConfig
.
MsgName_prefix
=
OctopusConfig
.
Project
+
".Msg"
OctopusRedisConfig
.
ClientName_prefix
=
OctopusConfig
.
Project
+
".Client"
OctopusRedisConfig
.
ClientServer_prefix
=
OctopusConfig
.
Project
+
"ClientServer"
OctopusRedisConfig
.
MsgStatusListName
=
OctopusConfig
.
Project
+
OctopusConfig
.
Traceid
+
"MsgStatsus"
OctopusRedisConfig
.
RotuteStatuslistName
=
OctopusConfig
.
Project
+
"RouteStatus"
log
.
WithFields
(
log
.
Fields
{
"Traceid"
:
Ctrl_Config_
.
OctopusCfg
.
Traceid
,
"Project"
:
Ctrl_Config_
.
OctopusCfg
.
Project
,
"MsgName_prefix"
:
OctopusRedisConfig
.
MsgName_prefix
,
"ClientName_prefix"
:
OctopusRedisConfig
.
ClientName_prefix
,
"ClientServer_prefix"
:
OctopusRedisConfig
.
ClientServer_prefix
,
"MsgStatusListName"
:
OctopusRedisConfig
.
MsgStatusListName
,
"RotuteStatuslistName"
:
OctopusRedisConfig
.
RotuteStatuslistName
,
})
.
Info
()
}
func
(
p
*
CtrlOctopusConfig
)
getCurrentPath
()
string
{
func
getCurrentPath
()
string
{
s
,
err
:=
exec
.
LookPath
(
os
.
Args
[
0
])
if
err
!=
nil
{
...
...
@@ -81,18 +61,26 @@ func (p *CtrlOctopusConfig) getCurrentPath() string {
path
:=
string
(
s
[
0
:
i
+
1
])
return
path
}
func
(
p
*
CtrlOctopusConfig
)
creteConfigFile
()
{
if
p
.
isFileExist
(
OctopusConfigFilePath
)
==
false
{
file2
,
error
:=
os
.
OpenFile
(
OctopusConfigFilePath
,
os
.
O_RDWR
|
os
.
O_CREATE
,
0766
)
if
error
!=
nil
{
fmt
.
Println
(
error
)
}
file2
.
Close
()
func
LoadCfg
()
(
err
error
)
{
OctopusConfigFilePath
=
getCurrentPath
()
+
CONFIG_CONFIGNAME_OCTOPUS
log
.
WithFields
(
log
.
Fields
{
"cfgpath"
:
OctopusConfigFilePath
,
})
.
Info
()
var
fp
*
os
.
File
if
fp
,
err
=
os
.
Open
(
OctopusConfigFilePath
);
err
!=
nil
{
return
err
}
}
func
(
p
*
CtrlOctopusConfig
)
isFileExist
(
filename
string
)
bool
{
if
_
,
err
:=
os
.
Stat
(
filename
);
os
.
IsNotExist
(
err
)
{
return
false
var
data
[]
byte
if
data
,
err
=
ioutil
.
ReadAll
(
fp
);
err
!=
nil
{
return
err
}
if
err
=
fp
.
Close
();
err
!=
nil
{
return
err
}
if
err
=
json
.
Unmarshal
(
data
,
Ctrl_Config_
);
err
!=
nil
{
return
err
}
return
true
initOhterCfg
()
return
nil
}
log.go
0 → 100644
View file @
62cac69d
package
main
import
(
"os"
"time"
rotatelogs
"github.com/lestrrat-go/file-rotatelogs"
"github.com/rifflock/lfshook"
log
"github.com/sirupsen/logrus"
)
func
InitLog
()
{
logName
:=
string
(
"ficus"
)
file
,
err
:=
os
.
OpenFile
(
os
.
DevNull
,
os
.
O_WRONLY
|
os
.
O_TRUNC
|
os
.
O_CREATE
,
666
)
if
err
!=
nil
{
log
.
Fatalln
(
"fail to create ficus.log file!"
)
}
// 设置日志格式为json格式
log
.
SetFormatter
(
&
log
.
JSONFormatter
{})
// 设置将日志输出到标准输出(默认的输出为stderr,标准错误)
// 日志消息输出可以是任意的io.writer类型
log
.
SetOutput
(
file
)
// 设置日志级别为warn以上
log
.
SetLevel
(
log
.
InfoLevel
)
logWriter
,
err
:=
rotatelogs
.
New
(
logName
+
".%Y-%m-%d-%H-%M.log"
,
rotatelogs
.
WithLinkName
(
logName
),
// 生成软链,指向最新日志文件
rotatelogs
.
WithMaxAge
(
30
*
24
*
time
.
Hour
),
// 文件最大保存时间
rotatelogs
.
WithRotationTime
(
24
*
time
.
Hour
),
// 日志切割时间间隔
)
writeMap
:=
lfshook
.
WriterMap
{
log
.
InfoLevel
:
logWriter
,
log
.
ErrorLevel
:
logWriter
,
log
.
FatalLevel
:
logWriter
,
}
lfHook
:=
lfshook
.
NewHook
(
writeMap
,
&
log
.
JSONFormatter
{})
log
.
AddHook
(
lfHook
)
}
main.go
View file @
62cac69d
...
...
@@ -4,42 +4,18 @@ import (
"ficus_router/config"
"ficus_router/model"
"ficus_router/work"
"
fmt
"
"
log
"
_
"net/http/pprof"
"os"
"time"
log
"github.com/sirupsen/logrus"
)
const
HttpState1
=
":38888"
var
quitFlag
chan
int
func
InitLog
()
{
file
,
err
:=
os
.
OpenFile
(
"test.log"
,
os
.
O_WRONLY
|
os
.
O_TRUNC
|
os
.
O_CREATE
,
666
)
if
err
!=
nil
{
log
.
Fatalln
(
"fail to create test.log file!"
)
}
// 设置日志格式为json格式
log
.
SetFormatter
(
&
log
.
JSONFormatter
{})
// 设置将日志输出到标准输出(默认的输出为stderr,标准错误)
// 日志消息输出可以是任意的io.writer类型
log
.
SetOutput
(
file
)
// 设置日志级别为warn以上
log
.
SetLevel
(
log
.
InfoLevel
)
log
.
Info
(
"start"
)
}
func
main
()
{
InitLog
()
cfg
:=
config
.
NewCtrlOctopusConfig
()
octCfg
:=
cfg
.
LoadLocalOctopusConfig
()
fmt
.
Println
(
"Octopus:Traceid"
,
octCfg
.
Traceid
)
InitCfg
()
//管道阻塞主协程,防止主协程退出
quitFlag
=
make
(
chan
int
)
//服务程序 上线Map
...
...
@@ -48,6 +24,14 @@ func main() {
go
work
.
StartClientServer
(
ctr_mapSer
)
time
.
Sleep
(
5
*
time
.
Second
)
go
work
.
Start
(
ctr_mapSer
)
<-
quitFlag
fmt
.
Println
(
"退出"
)
log
.
Println
(
"退出"
)
}
func
InitCfg
()
{
if
err
:=
config
.
LoadCfg
();
err
!=
nil
{
log
.
Fatalln
(
err
)
}
}
model/msgmap.go
View file @
62cac69d
...
...
@@ -6,7 +6,6 @@ import (
"ficus_router/config"
"ficus_router/mqcontrol"
"ficus_router/redisctrl"
"fmt"
"sync"
"time"
...
...
@@ -50,9 +49,6 @@ func NewMsgmap(ctrl_SendMq *mqcontrol.Producer, ctrl_SrverMap *Ctrl_Srvermap, Ct
}
func
(
p
*
CtrlMsgMap
)
Write_chan_msgMap
(
mis
mission
.
Message
)
{
go
func
()
{
{
//日志
config
.
Eyelog
.
WriteIdAndLog
(
mis
.
ID
,
""
,
"消息写入路由的消息map中 ID:"
,
mis
.
ID
)
}
p
.
chan_msgMap
<-
mis
}()
}
...
...
@@ -70,7 +66,7 @@ func (p *CtrlMsgMap) work_Read_chan_msgMap() {
//超时检测协成
func
(
p
*
CtrlMsgMap
)
TimeOutTimerMsg
()
{
for
{
fmt
.
Println
(
"------------执行一次超时检测--------------"
)
log
.
Println
(
"------------执行一次超时检测--------------"
)
p
.
Lock
.
Lock
()
for
k
,
v
:=
range
p
.
msgMap
{
log
.
WithFields
(
log
.
Fields
{
"func"
:
"msgkey"
})
.
Info
(
k
)
...
...
@@ -78,10 +74,13 @@ func (p *CtrlMsgMap) TimeOutTimerMsg() {
if
msg
.
PushStatus
==
config
.
MSG_WAITSUCCESS
{
//获取 客户端 连接到的 服务 Traceid
serName
:=
p
.
ctrlRedisClient
.
GetClientConnSerTraceid
(
msg
.
Mis
.
Agent
)
fmt
.
Println
(
serName
)
serName
,
err
:=
p
.
ctrlRedisClient
.
GetClientConnSerTraceid
(
msg
.
Mis
.
Agent
)
if
err
!=
nil
{
log
.
Println
(
"未找到对应server"
,
err
)
continue
}
if
serName
==
""
{
fmt
.
Println
(
msg
.
Mis
.
ID
,
"----"
,
msg
.
Mis
.
Agent
,
"-- 客户端未上线"
)
log
.
Println
(
msg
.
Mis
.
ID
,
"----"
,
msg
.
Mis
.
Agent
,
"-- 客户端未上线"
)
//未获取到客户端上线标识,证明 此机器并未上线,则忽略此消息
continue
}
...
...
@@ -176,7 +175,7 @@ func (p *CtrlMsgMap) UpdateStatus(k string, status config.MSG_STATUS) error {
v
.
PushStatus
=
status
p
.
msgMap
[
k
]
=
v
}
else
{
return
NewMyErr
(
1
,
fmt
.
Sprint
(
"map values is nil ,key:%v"
,
k
)
)
return
NewMyErr
(
1
,
"map values is nil ,key:"
+
k
)
}
return
nil
}
model/redisroutestatus.go
View file @
62cac69d
...
...
@@ -4,6 +4,7 @@ import (
"encoding/json"
"ficus_router/config"
"fmt"
"log"
"strconv"
"time"
...
...
@@ -34,7 +35,7 @@ func NewRRouteStatus(ctrl_msgMap *CtrlMsgMap, timeout int64) *RRouteStatus {
p
.
timeout
=
timeout
//p.hashname=fmt.Sprint("%s.%s.%s",Config.OctopusConfig.Project,Config.OctopusConfig.Traceid,"RouteStatus")
p
.
hashname
=
config
.
OctopusRedisConfig
.
RotuteStatuslistName
fmt
.
Println
(
"路由上线 Redis Hash 名称:"
,
p
.
hashname
)
log
.
Println
(
"路由上线 Redis Hash 名称:"
,
p
.
hashname
)
go
p
.
routeWorkStatus
()
return
p
}
...
...
model/server.go
View file @
62cac69d
...
...
@@ -3,7 +3,6 @@ package model
import
(
"ficus_router/config"
"ficus_router/redisctrl"
"fmt"
"log"
"net"
"sync"
...
...
@@ -54,7 +53,7 @@ func (p *Ctrl_Srvermap) AddClient(s Server) {
p
.
Lock
.
Lock
()
defer
p
.
Lock
.
Unlock
()
fmt
.
Println
(
"Add map SerName"
,
s
.
SerName
)
log
.
Println
(
"Add map SerName"
,
s
.
SerName
)
s
.
Time
=
time
.
Now
()
.
Unix
()
p
.
SrverMap
[
s
.
SerName
]
=
s
}
...
...
@@ -80,9 +79,6 @@ func (p *Ctrl_Srvermap) IsServer(k string) bool {
}
func
(
p
*
Ctrl_Srvermap
)
Write_Chan_Server
(
rm
SerMsg
)
{
go
func
()
{
{
//日志
config
.
Eyelog
.
WriteIdAndLog
(
rm
.
MsgId
,
""
,
"写入待发送到ser管道 ID:"
,
rm
.
MsgId
)
}
p
.
chan_SerMsg
<-
rm
}()
}
...
...
@@ -90,13 +86,14 @@ func (p *Ctrl_Srvermap) read_Chan_Server() {
for
{
rm
:=
<-
p
.
chan_SerMsg
//获取上线路由
SerName
:=
p
.
ctrl_Redismsg
.
GetClientConnSerTraceid
(
rm
.
Agent
)
SerName
,
err
:=
p
.
ctrl_Redismsg
.
GetClientConnSerTraceid
(
rm
.
Agent
)
if
err
!=
nil
{
log
.
Println
(
"未找到对应server"
,
err
)
continue
}
rm
.
Agent
=
SerName
tempSer
:=
p
.
Get
(
SerName
)
if
SerName
!=
""
&&
tempSer
.
SerName
!=
""
{
{
config
.
Eyelog
.
WriteCallAndLog
(
rm
.
MsgId
,
""
,
"发送到服务程序 ... - 消息ID:"
,
rm
.
MsgId
)
}
var
comm
config
.
SerCommand
comm
.
Command
=
config
.
COMMAND_TASK
copy
(
comm
.
Data
[
:
len
(
rm
.
MsgId
)],
rm
.
MsgId
)
...
...
@@ -110,24 +107,15 @@ func (p *Ctrl_Srvermap) read_Chan_Server() {
}
var
data
=
*
(
*
[]
byte
)(
unsafe
.
Pointer
(
testBytes
))
fmt
.
Println
(
"start send msg ………………"
)
_
,
err
:=
tempSer
.
Conn
.
Write
(
data
)
len
,
err
:=
tempSer
.
Conn
.
Write
(
data
)
fmt
.
Println
(
"start send msg ………………Len:"
,
len
)
if
err
==
nil
{
//发送成功
fmt
.
Println
(
"start send msg success………………"
)
{
//日志
config
.
Eyelog
.
WriteIdAndLog
(
rm
.
MsgId
,
"发送到服务程序成功 - 消息ID"
,
rm
.
MsgId
)
}
log
.
Println
(
rm
.
MsgId
+
"start send msg success………………"
)
}
else
{
//发送失败
//fmt.Println("Command:",comm.Command)
p
.
failOnError
(
err
,
"Send SerMsg Error"
)
fmt
.
Println
(
"start send msg fail"
)
{
//日志
config
.
Eyelog
.
WriteIdAndLog
(
rm
.
MsgId
,
"发送到服务程序失败 - 消息ID"
,
rm
.
MsgId
)
}
log
.
Println
(
rm
.
MsgId
+
"start send msg fail"
)
}
}
else
{
//直接丢弃
...
...
@@ -140,6 +128,6 @@ func (p *Ctrl_Srvermap) read_Chan_Server() {
func
(
p
*
Ctrl_Srvermap
)
failOnError
(
err
error
,
msg
string
)
{
if
err
!=
nil
{
log
.
Fatalf
(
"%s:%s"
,
msg
,
err
)
panic
(
fmt
.
Sprintf
(
"%s:%s"
,
msg
,
err
))
panic
(
msg
+
err
.
Error
(
))
}
}
mqcontrol/rpc.go
View file @
62cac69d
...
...
@@ -4,7 +4,6 @@ import (
"bytes"
"encoding/gob"
"errors"
"fmt"
)
var
_RPCMAP
map
[
string
](
map
[
string
]
func
([]
byte
))
=
make
(
map
[
string
](
map
[
string
]
func
([]
byte
)))
...
...
@@ -63,7 +62,6 @@ func SendRpc(pusherName string, key string, funcName string, params interface{})
Params
:
params
,
}
if
err
=
enc
.
Encode
(
&
data
);
err
!=
nil
{
fmt
.
Println
(
"asdfasd"
)
return
err
}
...
...
octopus.json
0 → 100644
View file @
62cac69d
{
"servercfg"
:{
"traceid"
:
"ser1"
,
"project"
:
"clientserver"
},
"prefix"
:{
"msgname"
:
""
,
"clientname"
:
""
,
"clientserver"
:
""
,
"msgstatusname"
:
""
,
"routestatusname"
:
""
}
}
\ No newline at end of file
work/Server.go
View file @
62cac69d
...
...
@@ -5,10 +5,11 @@ import (
"ficus/mission"
"ficus_router/config"
"ficus_router/model"
"fmt"
"net"
"time"
"unsafe"
log
"github.com/sirupsen/logrus"
)
func
StartClientServer
(
ctrser
*
model
.
Ctrl_Srvermap
)
{
...
...
@@ -19,7 +20,7 @@ func StartClientServer(ctrser *model.Ctrl_Srvermap) {
//defer延迟关闭改资源,以免引起内存泄漏
defer
netListen
.
Close
()
fmt
.
Println
(
"Waiting for ServiceClients"
)
log
.
Println
(
"Waiting for ServiceClients"
)
for
{
conn
,
err
:=
netListen
.
Accept
()
//第二步:获取连接
if
err
!=
nil
{
...
...
@@ -33,7 +34,6 @@ func StartClientServer(ctrser *model.Ctrl_Srvermap) {
}
func
byteString
(
p
[]
byte
)
string
{
fmt
.
Println
(
p
)
s
:=
p
[
:
]
return
string
(
s
)
}
...
...
@@ -43,7 +43,7 @@ func RecvClient(conn net.Conn, ctrser *model.Ctrl_Srvermap) {
for
{
//无限循环
_
,
err
:=
conn
.
Read
(
buffer
)
//第三步:读取从该端口传来的内容
if
err
!=
nil
{
fmt
.
Println
(
"error:"
,
err
)
log
.
Println
(
"error:"
,
err
)
return
}
//var pServer Config.SerCommand
...
...
@@ -54,8 +54,8 @@ func RecvClient(conn net.Conn, ctrser *model.Ctrl_Srvermap) {
//添加 登录过来的 服务
//sername := *(*string)(unsafe.Pointer(&pServer.Data))
Sername
:=
string
(
pServer
.
Data
[
:
pServer
.
Length
])
fmt
.
Println
(
pServer
.
Command
)
fmt
.
Println
(
"-----recv_ser:"
,
Sername
,
"len:"
,
len
(
Sername
))
log
.
Println
(
pServer
.
Command
)
log
.
Println
(
"-----recv_ser:"
,
Sername
,
"len:"
,
len
(
Sername
))
if
pServer
.
SerType
==
config
.
SERTYPE_CONSUME
{
//注册的消费者,添加到消费服务列表
...
...
@@ -88,7 +88,7 @@ func RecvClient(conn net.Conn, ctrser *model.Ctrl_Srvermap) {
var
mis
mission
.
Message
err
:=
json
.
Unmarshal
(
pServer
.
Data
[
:
pServer
.
Length
],
&
mis
)
if
err
!=
nil
{
fmt
.
Println
(
"COMMAND_TASK_BACK json.Unmarshal error"
)
log
.
Println
(
"COMMAND_TASK_BACK json.Unmarshal error"
)
continue
}
if
pServer
.
PushStatus
==
config
.
MSG_SUCCESSED
{
...
...
work/work.go
View file @
62cac69d
...
...
@@ -8,7 +8,6 @@ import (
"ficus_router/mqcontrol"
"ficus_router/redisctrl"
"ficus_router/rpcid"
"fmt"
"time"
log
"github.com/sirupsen/logrus"
...
...
@@ -29,15 +28,11 @@ func callback(d mqcontrol.MSG, ctrser *model.Ctrl_Srvermap, redisctrl *redisctrl
log
.
Info
(
string
(
body
))
failOnError
(
err
,
"recv:json.Unmarshal - ficus.Mission"
)
if
err
==
nil
{
// { //日志
// config.Eyelog.WriteCallAndLog(mis.ID, "", "从 rabbitmq 中 读取到一条消息 ID:", mis.ID)
// }
//获取 客户端 连接到的 服务 Traceid
//SerName := Ctrl_Redismsg.GetClientConnSerTraceid(mis.Agent)
{
//1.写入 redis -> 接受到的所有消息都需要写入 redis 缓存消息
var
msg
config
.
T_CMsg
msg
.
RouteName
=
""
fmt
.
Println
(
"-1"
,
mis
.
ID
)
msg
.
Mis
=
mis
msg
.
RouteName
=
config
.
OctopusConfig
.
Traceid
//经过的路由器
msg
.
PushStatus
=
config
.
MSG_WAITSUCCESS
...
...
@@ -45,7 +40,6 @@ func callback(d mqcontrol.MSG, ctrser *model.Ctrl_Srvermap, redisctrl *redisctrl
msg
.
Rpcid
=
rpcid
.
AppendRpcid
(
""
)
redisctrl
.
Write_Chan_RedisMsg
(
msg
)
}
log
.
Info
(
"步骤2"
)
{
//2.直接写入 执行管道
//fmt.Println("SerName:",SerName);
var
rm
model
.
SerMsg
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment