Commit bb74bd03 by yunpeng.song

Merge branch 'release/v1.0.0.2'

parents 2aee66c9 90c8ecf0
...@@ -5,8 +5,10 @@ ...@@ -5,8 +5,10 @@
"privateIp": "172.16.1.133", "privateIp": "172.16.1.133",
"pulicIp": "172.16.1.133", "pulicIp": "172.16.1.133",
"logpath":"f:\\log\\ficusSchedule", "logpath":"f:\\log\\ficusSchedule",
"deviceurl":"172.16.1.288", "deviceurl":"172.16.1.228",
"deviceport":"9099" "deviceport":"9099",
"retrytime":1000,
"retrytimes":3
}, },
"prefix": { "prefix": {
"msgname": "msg", "msgname": "msg",
......
...@@ -15,6 +15,8 @@ type ( ...@@ -15,6 +15,8 @@ type (
Logpath string `json:"logpath"` // 日志地址 Logpath string `json:"logpath"` // 日志地址
Deviceurl string `json:"deviceurl"` Deviceurl string `json:"deviceurl"`
Deviceport string `json:"deviceport"` Deviceport string `json:"deviceport"`
RetryTime int `json:"retrytime"` //请求重试间隔时间
RetryTimes int `json:"retrytimes"` //请求重试次数
} }
RedisKey struct { RedisKey struct {
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"os" "os"
"os/exec" "os/exec"
"strings" "strings"
"time"
) )
const CONFIG_CONFIGNAME = "cfg.json" const CONFIG_CONFIGNAME = "cfg.json"
...@@ -173,3 +174,20 @@ func GetCallBackServerAddress(server string) (string, error) { ...@@ -173,3 +174,20 @@ func GetCallBackServerAddress(server string) (string, error) {
} }
return address.(string), nil return address.(string), nil
} }
func GetRetryTime() time.Duration {
t := AppCfg.RetryTime
if t == 0 {
t = 1000
}
rt := time.Duration(int64(t))
return rt
}
func GetRetryTimes() int {
times := AppCfg.RetryTimes
if times == 0 {
times = 3
}
return times
}
...@@ -2,8 +2,11 @@ package control ...@@ -2,8 +2,11 @@ package control
import ( import (
"ficus/mission" "ficus/mission"
"ficus/proto"
"ficus_clientserver/config"
"ficus_clientserver/model" "ficus_clientserver/model"
"ficus_clientserver/nethandle/thriftservice" "ficus_clientserver/nethandle/thriftservice"
"ficus_clientserver/tool"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
...@@ -87,12 +90,27 @@ func (p *MsgSender) WriteMsgToSendChan(msg *model.Msg) { ...@@ -87,12 +90,27 @@ func (p *MsgSender) WriteMsgToSendChan(msg *model.Msg) {
func (p *MsgSender) AddMsgTries(msg *model.Msg) bool { func (p *MsgSender) AddMsgTries(msg *model.Msg) bool {
if msg.TriesAddOne() { if msg.TriesAddOne() {
log.Info("已经达到最大尝试次数", msg.MsgId) log.Info("已经达到最大尝试次数", msg.MsgId)
p.msgControl.DeleteMsg(msg.MsgId) go p.MsgFeedBack(msg)
return true return true
} }
return false return false
} }
func (p *MsgSender) MsgFeedBack(msg *model.Msg) {
serverName := msg.Message.Proto
if serverName != proto.Type_HEARTBEAT {
d := thriftservice.NewDispatchMsgTrans(nil)
tool.Retry(config.GetRetryTimes(), config.GetRetryTime(), func() func() error {
temp := msg
return func() error {
_, err := d.Feedback(temp.Message)
return err
}
}())
p.msgControl.DeleteMsg(msg.MsgId)
}
}
// IsMatch 消息是否符合入机条件 // IsMatch 消息是否符合入机条件
func (p *MsgSender) IsMatch(msg *model.Msg) (r bool) { func (p *MsgSender) IsMatch(msg *model.Msg) (r bool) {
code, err := p.msgControl.GetMsgStatusCode(msg.MsgId) code, err := p.msgControl.GetMsgStatusCode(msg.MsgId)
......
...@@ -14,6 +14,7 @@ import ( ...@@ -14,6 +14,7 @@ import (
"fmt" "fmt"
"os" "os"
"os/exec" "os/exec"
"runtime/debug"
"strings" "strings"
"time" "time"
...@@ -60,8 +61,8 @@ var ( ...@@ -60,8 +61,8 @@ var (
) )
func main() { func main() {
//InitService() InitService()
Work() //Work()
} }
func InitService() { func InitService() {
...@@ -114,19 +115,47 @@ func InitTimeTick() { ...@@ -114,19 +115,47 @@ func InitTimeTick() {
} }
func Work() { func Work() {
CatchDump()
//读取配置文件 //读取配置文件
InitCfg() InitCfg()
InitLog() InitLog()
InitGlobleVar() InitGlobleVar()
InitTimeTick() InitTimeTick()
StartThrift() StartThrift()
go ConsoleIn() //go ConsoleIn()
http.ListenAndServe(":8080", nil) http.ListenAndServe(":8080", nil)
//阻塞主协程 //阻塞主协程
<-QuitFlag <-QuitFlag
log.Println("退出") log.Println("退出")
} }
func CatchDump() {
errs := recover()
if errs == nil {
return
}
exeName := os.Args[0] //获取程序名称
now := time.Now() //获取当前时间
pid := os.Getpid() //获取进程ID
time_str := now.Format("20060102150405") //设定时间格式
fname := fmt.Sprintf("%s-%d-%s-dump.log", exeName, pid, time_str) //保存错误信息文件名:程序名-进程ID-当前时间(年月日时分秒)
fmt.Println("dump to file ", fname)
f, err := os.Create(fname)
if err != nil {
return
}
defer f.Close()
f.WriteString(fmt.Sprintf("%v\r\n", errs)) //输出panic信息
f.WriteString("========\r\n")
f.WriteString(string(debug.Stack())) //输出堆栈信息
os.Exit(0)
}
// ConsoleIn 控制台输入 // ConsoleIn 控制台输入
func ConsoleIn() { func ConsoleIn() {
inputReader := bufio.NewReader(os.Stdin) //创建一个读取器,并将其与标准输入绑定。 inputReader := bufio.NewReader(os.Stdin) //创建一个读取器,并将其与标准输入绑定。
......
...@@ -90,8 +90,7 @@ func (p *ClientManager) AddTimes(k string) bool { ...@@ -90,8 +90,7 @@ func (p *ClientManager) AddTimes(k string) bool {
return false return false
} }
if v.LostTimes == v.MaxLostTimes { if v.LostTimes == v.MaxLostTimes {
v.Trans.Close() p.DeleteClient(k)
delete(p.mapClient, k)
return true return true
} }
return false return false
...@@ -112,7 +111,12 @@ func (p *ClientManager) ClearTimes(k string) { ...@@ -112,7 +111,12 @@ func (p *ClientManager) ClearTimes(k string) {
// DeleteClient 删除客户端 // DeleteClient 删除客户端
func (p *ClientManager) DeleteClient(k string) { func (p *ClientManager) DeleteClient(k string) {
delete(p.mapClient, k) v, ok := p.mapClient[k]
if ok {
v.Trans.Close()
delete(p.mapClient, k)
}
} }
// SetTime 设置上次心跳时间 // SetTime 设置上次心跳时间
......
...@@ -3,9 +3,9 @@ package nethandle ...@@ -3,9 +3,9 @@ package nethandle
import ( import (
"ficus/device" "ficus/device"
"ficus/mission" "ficus/mission"
"ficus_clientserver/config"
"ficus_clientserver/model" "ficus_clientserver/model"
"ficus_clientserver/tool" "ficus_clientserver/tool"
"time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
...@@ -53,7 +53,7 @@ func (d *DispatchHandle) Heartbeat(whom *device.Device) (r bool, err error) { ...@@ -53,7 +53,7 @@ func (d *DispatchHandle) Heartbeat(whom *device.Device) (r bool, err error) {
func (d *DispatchHandle) Feedback(response *mission.Message) (r bool, err error) { func (d *DispatchHandle) Feedback(response *mission.Message) (r bool, err error) {
log.WithFields(log.Fields{"data": string(response.Data)}).Info(response) log.WithFields(log.Fields{"data": string(response.Data)}).Info(response)
// TODO 更新消息状态 // TODO 更新消息状态
tool.Retry(3, time.Second*2, func() error { tool.Retry(config.GetRetryTimes(), config.GetRetryTime(), func() error {
// TODO 回调消息 // TODO 回调消息
d.dispatchService.Feedback(response) d.dispatchService.Feedback(response)
d.msgControl.DeleteMsg(response.ID) d.msgControl.DeleteMsg(response.ID)
......
...@@ -76,30 +76,37 @@ func (d *DeviceDispatch) Keep(device *device.Device, ip string) (r bool, err err ...@@ -76,30 +76,37 @@ func (d *DeviceDispatch) Keep(device *device.Device, ip string) (r bool, err err
} }
func (d *DeviceDispatch) GetPeer(uuid string) (r string, err error) { func (d *DeviceDispatch) GetPeer(uuid string) (r string, err error) {
defer d.tran.Close()
return "", nil return "", nil
} }
func (d *DeviceDispatch) GetDeviceEx(uuid string) (r *device.DeviceEx, err error) { func (d *DeviceDispatch) GetDeviceEx(uuid string) (r *device.DeviceEx, err error) {
defer d.tran.Close()
return return
} }
func (d *DeviceDispatch) ListByStatusEx(status int32, page int32, pageSize int32) (r *service.DevicesExPage, err error) { func (d *DeviceDispatch) ListByStatusEx(status int32, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
defer d.tran.Close()
return nil, nil return nil, nil
} }
func (d *DeviceDispatch) ListByOrganizationEx(org *identity.Organization, page int32, pageSize int32) (r *service.DevicesExPage, err error) { func (d *DeviceDispatch) ListByOrganizationEx(org *identity.Organization, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
defer d.tran.Close()
return nil, nil return nil, nil
} }
func (d *DeviceDispatch) ListByGroupEx(group string) (r *service.DevicesExPage, err error) { func (d *DeviceDispatch) ListByGroupEx(group string) (r *service.DevicesExPage, err error) {
defer d.tran.Close()
return nil, nil return nil, nil
} }
func (d *DeviceDispatch) ListByRegionEx(rgn string) (r *service.DevicesExPage, err error) { func (d *DeviceDispatch) ListByRegionEx(rgn string) (r *service.DevicesExPage, err error) {
defer d.tran.Close()
return nil, nil return nil, nil
} }
func (d *DeviceDispatch) ListByVersionEx(ver string, page int32, pageSize int32) (r *service.DevicesExPage, err error) { func (d *DeviceDispatch) ListByVersionEx(ver string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
defer d.tran.Close()
return return
} }
...@@ -128,10 +135,10 @@ func (d *DeviceDispatch) SetDispatch() error { ...@@ -128,10 +135,10 @@ func (d *DeviceDispatch) SetDispatch() error {
protocolFactory := thrift.NewTCompactProtocolFactory() protocolFactory := thrift.NewTCompactProtocolFactory()
// 打开Transport,与服务器进行连接 // 打开Transport,与服务器进行连接
if err := transport.Open(); err != nil { if err := transport.Open(); err != nil {
log.Error("Error opening socket to "+"localhost"+":"+"9999", err) log.Error("Error opening socket to "+"localhost"+":"+"9099", err)
return err return err
} }
client := service.NewDeviceServiceClientFactory(socket, protocolFactory) client := service.NewDeviceServiceClientFactory(transport, protocolFactory)
d.tran = transport d.tran = transport
d.client = client d.client = client
return nil return nil
......
...@@ -90,7 +90,6 @@ func (d *DispatchMsg) Feedback(response *mission.Message) (r bool, err error) { ...@@ -90,7 +90,6 @@ func (d *DispatchMsg) Feedback(response *mission.Message) (r bool, err error) {
// client登录接口 // client登录接口
func (d *DispatchMsg) Login(device *device.Device) (r bool, err error) { func (d *DispatchMsg) Login(device *device.Device) (r bool, err error) {
log.WithFields(log.Fields{"func": "Login", "登录id": device.ID}).Info(device)
//TODO 写路由信息到设备服务 ,待接入 eureka获取ip 和端口 //TODO 写路由信息到设备服务 ,待接入 eureka获取ip 和端口
go func() { go func() {
//判断是否存在 //判断是否存在
...@@ -106,10 +105,11 @@ func (d *DispatchMsg) Login(device *device.Device) (r bool, err error) { ...@@ -106,10 +105,11 @@ func (d *DispatchMsg) Login(device *device.Device) (r bool, err error) {
err := d.deviceDispatch.SetDispatch() err := d.deviceDispatch.SetDispatch()
if err != nil { if err != nil {
d.clientManager.DeleteClient(key)
return return
} }
// 请求失败重试机制 // 请求失败重试机制
err = tool.Retry(5, time.Second*2, func() func() error { err = tool.Retry(config.GetRetryTimes(), config.GetRetryTime(), func() func() error {
temp := device temp := device
ip := config.GetAppIp() ip := config.GetAppIp()
return func() error { return func() error {
...@@ -118,6 +118,7 @@ func (d *DispatchMsg) Login(device *device.Device) (r bool, err error) { ...@@ -118,6 +118,7 @@ func (d *DispatchMsg) Login(device *device.Device) (r bool, err error) {
} }
}()) }())
if err != nil { if err != nil {
d.clientManager.DeleteClient(key)
log.Errorln("Activate retry ", err) log.Errorln("Activate retry ", err)
} }
}() }()
......
...@@ -3,14 +3,9 @@ package thriftservice ...@@ -3,14 +3,9 @@ package thriftservice
import ( import (
"context" "context"
"ficus/pkg" "ficus/pkg"
"ficus/service"
"ficus_clientserver/FileController" "ficus_clientserver/FileController"
"ficus_clientserver/config"
"ficus_clientserver/nethandle" "ficus_clientserver/nethandle"
"ficus_clientserver/tool"
"net"
"github.com/apache/thrift/lib/go/thrift"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
...@@ -20,8 +15,6 @@ type ( ...@@ -20,8 +15,6 @@ type (
} }
PackageDispatch struct { PackageDispatch struct {
client *service.PackageServiceClient
tran *thrift.TFramedTransport
} }
) )
...@@ -55,26 +48,5 @@ func (s *PackageDispatch) GetSpec(pkgName string, ver string) (r *pkg.PackageSpe ...@@ -55,26 +48,5 @@ func (s *PackageDispatch) GetSpec(pkgName string, ver string) (r *pkg.PackageSpe
} }
func (d *PackageDispatch) SetDispatch() error { func (d *PackageDispatch) SetDispatch() error {
ip, port, err := tool.GetApp(config.GetDeviceServerName())
if err != nil {
return err
}
//TODO 心跳
socket, err := thrift.NewTSocket(net.JoinHostPort(ip, port))
if err != nil {
log.Error("Error opening socket:", err)
return err
}
transport := thrift.NewTFramedTransport(socket)
// 创建二进制协议
protocolFactory := thrift.NewTCompactProtocolFactory()
// 打开Transport,与服务器进行连接
if err := transport.Open(); err != nil {
log.Error("Error opening socket to "+"localhost"+":"+"9999", err)
return err
}
client := service.NewPackageServiceClientFactory(socket, protocolFactory)
d.tran = transport
d.client = client
return nil return nil
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment