Commit af5cc75d by yunpeng.song

更新package 和device service thrift 协议

parent ac9d8fab
...@@ -8,15 +8,16 @@ const ( ...@@ -8,15 +8,16 @@ const (
type ( type (
AppConfig struct { AppConfig struct {
Uuid string `json:"uuid"` //"路由的唯一,追踪的标示" Uuid string `json:"uuid"` //"路由的唯一,追踪的标示"
Name string `json:"name"` //"项目的唯一标示" Name string `json:"name"` //"项目的唯一标示"
PrivateIp string `json:"privateIp"` //机器内网ip PrivateIp string `json:"privateIp"` //机器内网ip
PublicIp string `json:"pulicIp"` //机器公网ip PublicIp string `json:"pulicIp"` //机器公网ip
Logpath string `json:"logpath"` // 日志地址 Logpath string `json:"logpath"` // 日志地址
Deviceurl string `json:"deviceurl"` Deviceurl string `json:"deviceurl"`
Deviceport string `json:"deviceport"` Deviceport string `json:"deviceport"`
RetryTime int `json:"retrytime"` //请求重试间隔时间 RetryTime int `json:"retrytime"` //请求重试间隔时间
RetryTimes int `json:"retrytimes"` //请求重试次数 RetryTimes int `json:"retrytimes"` //请求重试次数
MsgChanLength int `josn:"msgchanlength"` //消息管道长度
} }
RedisKey struct { RedisKey struct {
......
...@@ -105,6 +105,14 @@ func GetLogPath() string { ...@@ -105,6 +105,14 @@ func GetLogPath() string {
return AppCfg.Logpath return AppCfg.Logpath
} }
func GetMsgChanLength() int {
length := AppCfg.MsgChanLength
if length == 0 {
return 10000
}
return AppCfg.MsgChanLength
}
// GetDeviceServerName 返回 DeviceServer // GetDeviceServerName 返回 DeviceServer
func GetDeviceServerName() string { func GetDeviceServerName() string {
return EurekaKey.DeviceServer return EurekaKey.DeviceServer
......
...@@ -131,7 +131,10 @@ func (p *MsgSender) DispatchMsg(msg *model.Msg) (r bool, err error) { ...@@ -131,7 +131,10 @@ func (p *MsgSender) DispatchMsg(msg *model.Msg) (r bool, err error) {
r = true r = true
} }
} }
log.Debug("消息推送 result ", r, err) if err != nil {
log.Error("消息推送 result ", msg.MsgId, err)
}
//log.Debug("消息推送 result ", msg.MsgId, r, err)
return return
} }
......
...@@ -112,3 +112,18 @@ func (d *DeviceHandle) ListByRegionEx(ctx context.Context, rgn string, page int3 ...@@ -112,3 +112,18 @@ func (d *DeviceHandle) ListByRegionEx(ctx context.Context, rgn string, page int3
func (d *DeviceHandle) ListByVersionEx(ctx context.Context, ver string, page int32, pageSize int32) (r *service.DevicesExPage, err error) { func (d *DeviceHandle) ListByVersionEx(ctx context.Context, ver string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return return
} }
// Parameters:
// - Filter
// - Page
// - PageSize
func (d *DeviceHandle) ListDevicesEx(ctx context.Context, filter *service.DeviceFilter, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return
}
// Parameters:
// - UUID
// - UpdStatus
func (d *DeviceHandle) FeedbackUpgrade(ctx context.Context, uuid string, updStatus int32) (r bool, err error) {
return
}
...@@ -31,6 +31,38 @@ func (p *PackageHandle) GetSpecByPartner(ctx context.Context, pkgName string, ve ...@@ -31,6 +31,38 @@ func (p *PackageHandle) GetSpecByPartner(ctx context.Context, pkgName string, ve
return p.PackageHandler.GetSpecByPartner(ctx, pkgName, ver, partnerId) return p.PackageHandler.GetSpecByPartner(ctx, pkgName, ver, partnerId)
} }
// Parameters:
// - SessionId
// - PartnerId
// - Timeout
// - UploadSpec
// - Content
func (p *PackageHandle) Upload(ctx context.Context, sessionId string, partnerId string, timeout int32, uploadSpec *pkg.UploadSpec, content []byte) (r *pkg.UploadSpec, err error) {
return
}
// Parameters:
// - SessionId
func (p *PackageHandle) Pack(ctx context.Context, sessionId string) (r *pkg.PackageSpec, err error) {
return
}
// Parameters:
// - PartnerId
// - Page
// - PageSize
func (p *PackageHandle) ListPackageEx(ctx context.Context, partnerId string, page int32, pageSize int32) (r *service.PackagesPage, err error) {
return
}
// Parameters:
// - Name
// - Ver
// - Enabled
func (p *PackageHandle) EnablePackage(ctx context.Context, name string, ver string, enabled bool) (r bool, err error) {
return
}
func (p *PackageHandle) SetDispatch() (err error) { func (p *PackageHandle) SetDispatch() (err error) {
return p.PackageHandler.SetDispatch() return p.PackageHandler.SetDispatch()
} }
...@@ -32,7 +32,7 @@ func NewScheduleHandle(m *model.MsgControl, c *model.ClientManager, s ScheduleHa ...@@ -32,7 +32,7 @@ func NewScheduleHandle(m *model.MsgControl, c *model.ClientManager, s ScheduleHa
} }
func (s *ScheduleHandle) Schedule(ctx context.Context, request *mission.Message) (r bool, err error) { func (s *ScheduleHandle) Schedule(ctx context.Context, request *mission.Message) (r bool, err error) {
log.Info("Schedule", request) //log.Info("Schedule", request)
if !s.clientManager.IsKey(request.Device) { if !s.clientManager.IsKey(request.Device) {
return false, errors.New("机器不在线") return false, errors.New("机器不在线")
} }
......
...@@ -117,6 +117,21 @@ func (d *DeviceDispatch) ListExpired(ctx context.Context, seconds int32, ip stri ...@@ -117,6 +117,21 @@ func (d *DeviceDispatch) ListExpired(ctx context.Context, seconds int32, ip stri
return return
} }
// Parameters:
// - Filter
// - Page
// - PageSize
func (d *DeviceDispatch) ListDevicesEx(ctx context.Context, filter *service.DeviceFilter, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return
}
// Parameters:
// - UUID
// - UpdStatus
func (d *DeviceDispatch) FeedbackUpgrade(ctx context.Context, uuid string, updStatus int32) (r bool, err error) {
return
}
// 创建client // 创建client
func (d *DeviceDispatch) SetDispatch() error { func (d *DeviceDispatch) SetDispatch() error {
ip, port, err := tool.GetApp(config.GetDeviceServerName()) ip, port, err := tool.GetApp(config.GetDeviceServerName())
......
...@@ -90,3 +90,18 @@ func (d *DeviceHandle) ListByGroupEx(ctx context.Context, group string, page int ...@@ -90,3 +90,18 @@ func (d *DeviceHandle) ListByGroupEx(ctx context.Context, group string, page int
func (d *DeviceHandle) ListByRegionEx(ctx context.Context, rgn string, page int32, pageSize int32) (r *service.DevicesExPage, err error) { func (d *DeviceHandle) ListByRegionEx(ctx context.Context, rgn string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return nil, nil return nil, nil
} }
// Parameters:
// - Filter
// - Page
// - PageSize
func (d *DeviceHandle) ListDevicesEx(ctx context.Context, filter *service.DeviceFilter, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return
}
// Parameters:
// - UUID
// - UpdStatus
func (d *DeviceHandle) FeedbackUpgrade(ctx context.Context, uuid string, updStatus int32) (r bool, err error) {
return
}
...@@ -129,6 +129,8 @@ func (p *DispatchMsg) Dispatch(ctx context.Context, request *mission.Message) (e ...@@ -129,6 +129,8 @@ func (p *DispatchMsg) Dispatch(ctx context.Context, request *mission.Message) (e
proto := thrift.NewTBinaryProtocol(buff, true, true) proto := thrift.NewTBinaryProtocol(buff, true, true)
request.Write(proto) request.Write(proto)
err = p.callbackClient.Dispatch(ctx, request) err = p.callbackClient.Dispatch(ctx, request)
log.Info("PushClientDispatch end ", err, "request context", request) if err != nil {
log.Error("PushClientDispatch end ", err, "request context", request)
}
return err return err
} }
...@@ -3,6 +3,7 @@ package thriftservice ...@@ -3,6 +3,7 @@ package thriftservice
import ( import (
"context" "context"
"ficus/pkg" "ficus/pkg"
"ficus/service"
"ficus_clientserver/FileController" "ficus_clientserver/FileController"
"ficus_clientserver/nethandle" "ficus_clientserver/nethandle"
...@@ -39,6 +40,22 @@ func (s *PackageHandle) GetSpecByPartner(ctx context.Context, pkgName string, ve ...@@ -39,6 +40,22 @@ func (s *PackageHandle) GetSpecByPartner(ctx context.Context, pkgName string, ve
return PackageHandler.GetSpecByPartner(ctx, pkgName, ver, partnerId) return PackageHandler.GetSpecByPartner(ctx, pkgName, ver, partnerId)
} }
func (s *PackageHandle) Upload(ctx context.Context, sessionId string, partnerId string, timeout int32, uploadSpec *pkg.UploadSpec, content []byte) (r *pkg.UploadSpec, err error) {
return
}
func (s *PackageHandle) Pack(ctx context.Context, sessionId string) (r *pkg.PackageSpec, err error) {
return
}
func (s *PackageHandle) ListPackageEx(ctx context.Context, partnerId string, page int32, pageSize int32) (r *service.PackagesPage, err error) {
return
}
func (s *PackageHandle) EnablePackage(ctx context.Context, name string, ver string, enabled bool) (r bool, err error) {
return
}
func (s *PackageDispatch) GetSpec(ctx context.Context, pkgName string, ver string) (r *pkg.PackageSpec, err error) { func (s *PackageDispatch) GetSpec(ctx context.Context, pkgName string, ver string) (r *pkg.PackageSpec, err error) {
log.Info("GetSpec", pkgName, ver) log.Info("GetSpec", pkgName, ver)
fcos := FileController.NewOss() fcos := FileController.NewOss()
...@@ -67,6 +84,22 @@ func (s *PackageDispatch) GetSpecByPartner(ctx context.Context, pkgName string, ...@@ -67,6 +84,22 @@ func (s *PackageDispatch) GetSpecByPartner(ctx context.Context, pkgName string,
return return
} }
func (s *PackageDispatch) Upload(ctx context.Context, sessionId string, partnerId string, timeout int32, uploadSpec *pkg.UploadSpec, content []byte) (r *pkg.UploadSpec, err error) {
return
}
func (s *PackageDispatch) Pack(ctx context.Context, sessionId string) (r *pkg.PackageSpec, err error) {
return
}
func (s *PackageDispatch) ListPackageEx(ctx context.Context, partnerId string, page int32, pageSize int32) (r *service.PackagesPage, err error) {
return
}
func (s *PackageDispatch) EnablePackage(ctx context.Context, name string, ver string, enabled bool) (r bool, err error) {
return
}
func (d *PackageDispatch) SetDispatch() error { func (d *PackageDispatch) SetDispatch() error {
return nil return nil
} }
...@@ -89,7 +89,7 @@ func ConsoleIn() { ...@@ -89,7 +89,7 @@ func ConsoleIn() {
func InitGlobleVar() { func InitGlobleVar() {
QuitFlag = make(chan int) QuitFlag = make(chan int)
psend := mqcontrol.NewProducer("myPusher", "myQueue") psend := mqcontrol.NewProducer("myPusher", "myQueue")
sendChan := make(chan string, 50000) sendChan := make(chan string, config.GetMsgChanLength())
msgMap := model.NewMsgmap() msgMap := model.NewMsgmap()
ClientManager = model.GetDefaultClientMap() ClientManager = model.GetDefaultClientMap()
RedisClient = model.NewRedisClient(config.GetRedisAddress(), config.GetRedisPWD()) RedisClient = model.NewRedisClient(config.GetRedisAddress(), config.GetRedisPWD())
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment