Commit 2519e2dc by yunpeng.song

对map 增加线程安全锁

parent 90c8ecf0
......@@ -107,8 +107,8 @@ func (p *MsgSender) MsgFeedBack(msg *model.Msg) {
return err
}
}())
p.msgControl.DeleteMsg(msg.MsgId)
}
p.msgControl.DeleteMsg(msg.MsgId)
}
// IsMatch 消息是否符合入机条件
......
package main
import (
"bufio"
native "ficus/native/service"
"ficus/service"
"ficus_clientserver/config"
"ficus_clientserver/control"
"ficus_clientserver/model"
"ficus_clientserver/mqcontrol"
"ficus_clientserver/nethandle"
"ficus_clientserver/nethandle/thriftservice"
"ficus_clientserver/tool"
"fmt"
"os"
"os/exec"
"runtime/debug"
"strings"
"time"
server "github.com/kardianos/service"
log "github.com/sirupsen/logrus"
"net/http"
_ "net/http/pprof"
"github.com/apache/thrift/lib/go/thrift"
)
type program struct{}
......@@ -61,10 +45,24 @@ var (
)
func main() {
InitService()
//Work()
//InitService()
Work()
}
func Work() {
CatchDump()
//读取配置文件
InitCfg()
InitLog()
InitGlobleVar()
InitTimeTick()
StartThrift()
//go ConsoleIn()
http.ListenAndServe(":38084", nil)
//阻塞主协程
<-QuitFlag
log.Println("退出")
}
func InitService() {
svcConfig := &server.Config{
Name: "FicusSchedule",
......@@ -101,180 +99,3 @@ func InitService() {
log.Println(err)
}
}
// InitCfg 初始化配置文件
func InitCfg() {
if err := config.LoadCfg(); err != nil {
log.Fatalln("InitCfg", err)
}
}
// InitTimeTick 初始化定时任务管理器
func InitTimeTick() {
tool.StartTicks(time.Second)
}
func Work() {
CatchDump()
//读取配置文件
InitCfg()
InitLog()
InitGlobleVar()
InitTimeTick()
StartThrift()
//go ConsoleIn()
http.ListenAndServe(":8080", nil)
//阻塞主协程
<-QuitFlag
log.Println("退出")
}
func CatchDump() {
errs := recover()
if errs == nil {
return
}
exeName := os.Args[0] //获取程序名称
now := time.Now() //获取当前时间
pid := os.Getpid() //获取进程ID
time_str := now.Format("20060102150405") //设定时间格式
fname := fmt.Sprintf("%s-%d-%s-dump.log", exeName, pid, time_str) //保存错误信息文件名:程序名-进程ID-当前时间(年月日时分秒)
fmt.Println("dump to file ", fname)
f, err := os.Create(fname)
if err != nil {
return
}
defer f.Close()
f.WriteString(fmt.Sprintf("%v\r\n", errs)) //输出panic信息
f.WriteString("========\r\n")
f.WriteString(string(debug.Stack())) //输出堆栈信息
os.Exit(0)
}
// ConsoleIn 控制台输入
func ConsoleIn() {
inputReader := bufio.NewReader(os.Stdin) //创建一个读取器,并将其与标准输入绑定。
fmt.Printf("Please enter some input: ")
for {
input, err := inputReader.ReadString('\n') //读取器对象提供一个方法 ReadString(delim byte) ,该方法从输入中读取内容,直到碰到 delim 指定的字符,然后将读取到的内容连同 delim 字符一起放到缓冲区。
if err == nil {
fmt.Printf("The input was: %s ,err:%s", input, err)
}
s := strings.Split(input, "\r\n")
if s[0] == "reload cfg" {
// 重载 配置文件
if err := config.LoadCfg(); err != nil {
fmt.Printf("reload failed %s", err)
}
continue
}
}
}
// InitGlobleVar 初始化全局变量
func InitGlobleVar() {
QuitFlag = make(chan int)
psend := mqcontrol.NewProducer("myPusher", "myQueue")
sendChan := make(chan string, 50000)
msgMap := model.NewMsgmap()
ClientManager = model.GetDefaultClientMap()
RedisClient = model.NewRedisClient(config.GetRedisAddress(), config.GetRedisPWD())
MsgControl = model.NewMsgControl(msgMap, psend, RedisClient, sendChan, 10)
MsgSender = control.NewMsgSender(MsgControl, sendChan, ClientManager, 10)
MsgSender.StartSendWork()
}
// InitEureka 初始化eureka 服务
func InitEureka() {
err := tool.ConnectEureka(getCurrentPath() + "local.gcfg")
if err != nil {
log.Panicln("ConnectEureka", err)
}
err = tool.LoadEurekaInstance(getCurrentPath() + "i.json")
if err != nil {
log.Panicln("LoadEurekaInstance", err)
}
tool.HeartLoop()
}
func getCurrentPath() string {
s, err := exec.LookPath(os.Args[0])
if err != nil {
panic(err)
}
i := strings.LastIndex(s, "\\")
path := string(s[0 : i+1])
return path
}
// StartThrift 开启thrift 服务
func StartThrift() {
go StartThriftSever1()
go StartThriftSever2()
go StartClientHeartWork()
}
// StartClientHeartWork 开始循环心跳工作
func StartClientHeartWork() {
time.Sleep(time.Second * time.Duration(config.GetHeartFirstTime()))
nethandle.HandExpiredDevice(MsgControl, ClientManager, thriftservice.NewDeviceDispatch())
}
// StartThriftSever1 开启client 的thrift 服务 复用socket实现双向通信
func StartThriftSever1() {
serverTransport, err := thrift.NewTServerSocket(config.GetLoginServerPort())
if err != nil {
log.Fatalln("Error!", err)
}
//redis 控制
processorFactory := thriftservice.NewMyTProcessorFactory(ClientManager, MsgControl)
protocolFactory := thrift.NewTCompactProtocolFactory()
transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
server := thrift.NewTSimpleServerFactory4(processorFactory, serverTransport, transportFactory, protocolFactory)
log.Println("thrift server in", config.GetLoginServerPort())
if err := server.Serve(); err != nil {
panic(err)
}
}
// StartThriftSever2 开启client 的thrift 多服务 ,不复用socket
func StartThriftSever2() {
serverTransport, err := thrift.NewTServerSocket(config.GetMultServerPort())
if err != nil {
log.Fatalln(err)
}
TMultiplexedProcessor := thrift.NewTMultiplexedProcessor()
deviceHandle := thriftservice.NewDeviceHandle()
deviceProcessor := service.NewDeviceServiceProcessor(deviceHandle)
dispatchHandle := thriftservice.NewDispatchService(nil, ClientManager, MsgControl)
dispatchProcessor := service.NewDispatchServiceProcessor(dispatchHandle)
scheduleHandle := thriftservice.NewScheduleHandle(MsgControl, ClientManager)
scheduleProcessor := native.NewScheduleServiceProcessor(scheduleHandle)
packageHandle := thriftservice.NewPackageHandle()
packageProcessor := service.NewPackageServiceProcessor(packageHandle)
TMultiplexedProcessor.RegisterProcessor("Identity", deviceProcessor)
TMultiplexedProcessor.RegisterProcessor("Mission", dispatchProcessor)
TMultiplexedProcessor.RegisterProcessor("Schedule", scheduleProcessor)
TMultiplexedProcessor.RegisterProcessor("Package", packageProcessor)
protocolFactory := thrift.NewTCompactProtocolFactory()
transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
server := thrift.NewTSimpleServer4(TMultiplexedProcessor, serverTransport, transportFactory, protocolFactory)
log.Println("thrift server in", config.GetMultServerPort())
if err := server.Serve(); err != nil {
log.Panicln(err)
}
}
......@@ -90,7 +90,9 @@ func (p *ClientManager) AddTimes(k string) bool {
return false
}
if v.LostTimes == v.MaxLostTimes {
p.DeleteClient(k)
//不调用 DeleteClient ,防止死锁
v.Trans.Close()
delete(p.mapClient, k)
return true
}
return false
......@@ -111,6 +113,8 @@ func (p *ClientManager) ClearTimes(k string) {
// DeleteClient 删除客户端
func (p *ClientManager) DeleteClient(k string) {
p.Lock.Lock()
defer p.Lock.Unlock()
v, ok := p.mapClient[k]
if ok {
v.Trans.Close()
......
......@@ -3,9 +3,11 @@ package model
import (
"encoding/json"
"ficus/mission"
"ficus/proto"
"ficus_clientserver/config"
"ficus_clientserver/mqcontrol"
"ficus_clientserver/tool"
"sync"
"time"
log "github.com/sirupsen/logrus"
......@@ -13,13 +15,13 @@ import (
// MsgControl 消息管理器
type MsgControl struct {
msgMap *MsgMap // 所有到达路由的消息
redisClient *RedisClient // redis控制器
mqProducer *mqcontrol.Producer // mq控制器
timerMap map[string]*tool.Timer // 消息缓存定时删除器
sendChan chan string // 发送管道
msgList *MsgList // 消息队列
timeout int64 // 消息队列处理时间间隔
msgMap *MsgMap // 所有到达路由的消息
redisClient *RedisClient // redis控制器
mqProducer *mqcontrol.Producer // mq控制器
timerMap sync.Map // 消息缓存定时删除器
sendChan chan string // 发送管道
msgList *MsgList // 消息队列
timeout int64 // 消息队列处理时间间隔
}
// NewMsgControl 创建新的消息管理器
......@@ -31,7 +33,6 @@ func NewMsgControl(msgMap *MsgMap, mqProducer *mqcontrol.Producer, redisClient *
p.sendChan = sendChan
p.timeout = timeout
p.msgList = NewMsgList()
p.timerMap = make(map[string]*tool.Timer)
p.Init()
return p
}
......@@ -55,7 +56,12 @@ func (p *MsgControl) LoadRedisMsg() {
}
log.Println("load msg ", m.MsgId)
// 增加消息
p.AddMsg(m)
serverName := m.Message.Proto
if serverName != proto.Type_HEARTBEAT {
p.AddMsg(m)
} else {
p.DeleteMsg(m.MsgId)
}
}
}
......@@ -83,7 +89,7 @@ func (p *MsgControl) setTimerCall(msg *Msg) {
log.Info("删除消息", msg.MsgId)
go p.DeleteMsg(msg.MsgId)
})
p.timerMap[msg.MsgId] = tr
p.timerMap.Store(msg.MsgId, tr)
}
}
......@@ -178,5 +184,5 @@ func (p *MsgControl) PushMsgToMq(msgId string) {
func (p *MsgControl) DeleteMsg(msgId string) {
p.msgMap.DeleteMsg(msgId)
p.redisClient.DeleteMsgFromRedis(msgId)
delete(p.timerMap, msgId)
p.timerMap.Delete(msgId)
}
......@@ -3,15 +3,7 @@ package nethandle
import (
"ficus/device"
"ficus/identity"
"ficus/mission"
"ficus/proto"
"ficus/service"
"ficus_clientserver/config"
"ficus_clientserver/model"
"time"
uuid "github.com/satori/go.uuid"
log "github.com/sirupsen/logrus"
)
type (
......@@ -27,8 +19,8 @@ type (
ListByStatusEx(status int32, page int32, pageSize int32) (r *service.DevicesExPage, err error)
ListByOrganizationEx(org *identity.Organization, page int32, pageSize int32) (r *service.DevicesExPage, err error)
ListExpired(seconds int32, ip string) (r *service.DevicesPage, err error)
ListByGroupEx(group string) (r *service.DevicesExPage, err error)
ListByRegionEx(rgn string) (r *service.DevicesExPage, err error)
ListByGroupEx(group string, page int32, pageSize int32) (r *service.DevicesExPage, err error)
ListByRegionEx(rgn string, page int32, pageSize int32) (r *service.DevicesExPage, err error)
ListByVersionEx(ver string, page int32, pageSize int32) (r *service.DevicesExPage, err error)
SetDispatch() error
}
......@@ -123,76 +115,14 @@ func (d *DeviceHandle) ListExpired(seconds int32, ip string) (r *service.Devices
return nil, nil
}
func (d *DeviceHandle) ListByGroupEx(group string) (r *service.DevicesExPage, err error) {
func (d *DeviceHandle) ListByGroupEx(group string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return nil, nil
}
func (d *DeviceHandle) ListByRegionEx(rgn string) (r *service.DevicesExPage, err error) {
func (d *DeviceHandle) ListByRegionEx(rgn string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return nil, nil
}
func (d *DeviceHandle) ListByVersionEx(ver string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return
}
// 处理空闲设备,实现由上到下的心跳机制
func HandExpiredDevice(msgControl *model.MsgControl, clientManager *model.ClientManager, deviceDispatch DeviceHandler) {
// TODO 获取服务ip
log.Info("HandExpiredDevice start")
for {
err := deviceDispatch.SetDispatch()
if err != nil {
log.Info(config.GetHeartLoopTime(), err)
time.Sleep(time.Duration(config.GetHeartLoopTime()) * time.Second)
continue
}
ip := config.GetAppIp()
res, err := deviceDispatch.ListExpired(int32(config.GetHeartTimeout()), ip)
if err != nil {
log.Error("HandExpiredDevice", err)
} else {
log.Info("遍历设备")
for _, v := range res.Devices {
go func(dd *device.Device) {
if !clientManager.IsKey(dd.ID) {
if err = deviceDispatch.SetDispatch(); err != nil {
log.Info("连接创建失败 ", dd.ID, err)
return
}
if OK, err := deviceDispatch.Deactivate(dd.ID, ip); !OK {
log.Info("终端没有上线 id ", dd.ID, err)
}
return
}
if clientManager.AddTimes(dd.ID) {
if err = deviceDispatch.SetDispatch(); err != nil {
log.Info("连接创建失败 ", dd.ID, err)
return
}
if OK, err := deviceDispatch.Deactivate(dd.ID, ip); !OK {
log.Info("终端已经掉线 ", dd.ID, err)
}
return
}
log.Info("空闲设备 id ", dd.ID)
m := mission.Message{}
id, _ := uuid.NewV4()
m.ID = id.String()
m.State = &mission.MissionStatus{
Code: mission.MissionStatusCode_DISPATCHED,
Msg: "",
}
m.Priority = 5
m.Tries = 0
m.TriesMax = 1
m.Device = dd.ID
m.Proto = proto.Type_HEARTBEAT
msgControl.AddMessage(&m)
}(v)
}
}
log.Info(config.GetHeartTimeout())
time.Sleep(time.Duration(config.GetHeartLoopTime()) * time.Second)
}
}
......@@ -4,7 +4,7 @@ import "ficus/pkg"
type (
PackageHandler interface {
GetSpec(pkgName string, ver string) (r *pkg.PackageSpec, err error)
GetSpec(pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error)
SetDispatch() error
}
PackageHandle struct {
......@@ -19,8 +19,8 @@ func NewPackageHandle(p PackageHandler) PackageHandler {
return h
}
func (p *PackageHandle) GetSpec(pkgName string, ver string) (r *pkg.PackageSpec, err error) {
return p.PackageHandler.GetSpec(pkgName, ver)
func (p *PackageHandle) GetSpec(pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) {
return p.PackageHandler.GetSpec(pkgName, ver, partnerId)
}
func (p *PackageHandle) SetDispatch() (err error) {
......
......@@ -95,12 +95,12 @@ func (d *DeviceDispatch) ListByOrganizationEx(org *identity.Organization, page i
return nil, nil
}
func (d *DeviceDispatch) ListByGroupEx(group string) (r *service.DevicesExPage, err error) {
func (d *DeviceDispatch) ListByGroupEx(group string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
defer d.tran.Close()
return nil, nil
}
func (d *DeviceDispatch) ListByRegionEx(rgn string) (r *service.DevicesExPage, err error) {
func (d *DeviceDispatch) ListByRegionEx(rgn string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
defer d.tran.Close()
return nil, nil
}
......
......@@ -78,9 +78,9 @@ func (d *DeviceHandle) ListByOrganizationEx(ctx context.Context, org *identity.O
func (d *DeviceHandle) ListExpired(ctx context.Context, seconds int32, ip string) (r *service.DevicesPage, err error) {
return nil, nil
}
func (d *DeviceHandle) ListByGroupEx(ctx context.Context, group string) (r *service.DevicesExPage, err error) {
func (d *DeviceHandle) ListByGroupEx(ctx context.Context, group string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return nil, nil
}
func (d *DeviceHandle) ListByRegionEx(ctx context.Context, rgn string) (r *service.DevicesExPage, err error) {
func (d *DeviceHandle) ListByRegionEx(ctx context.Context, rgn string, page int32, pageSize int32) (r *service.DevicesExPage, err error) {
return nil, nil
}
......@@ -46,7 +46,6 @@ func (p *DispatchMsg) SetDispatch() error {
// client心跳接口
func (d *DispatchMsg) Heartbeat(whom *device.Device) (r bool, err error) {
log.WithFields(log.Fields{"func": "Heartbeat", "心跳id": whom.ID}).Info(whom)
key := whom.ID
if d.clientManager.IsKey(key) {
d.clientManager.UpdateHartTime(key)
......
......@@ -30,11 +30,11 @@ func NewPackageDispatch() *PackageDispatch {
}
// 接受消息
func (s *PackageHandle) GetSpec(ctx context.Context, pkgName string, ver string) (r *pkg.PackageSpec, err error) {
return s.PackageHandler.GetSpec(pkgName, ver)
func (s *PackageHandle) GetSpec(ctx context.Context, pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) {
return s.PackageHandler.GetSpec(pkgName, ver, partnerId)
}
func (s *PackageDispatch) GetSpec(pkgName string, ver string) (r *pkg.PackageSpec, err error) {
func (s *PackageDispatch) GetSpec(pkgName string, ver string, partnerId string) (r *pkg.PackageSpec, err error) {
log.Info("GetSpec", pkgName, ver)
fcos := FileController.NewOss()
md5, url := fcos.GetFileInfo(pkgName)
......@@ -42,6 +42,7 @@ func (s *PackageDispatch) GetSpec(pkgName string, ver string) (r *pkg.PackageSpe
r.Pkg = pkg.NewPackage()
r.Pkg.Name = pkgName
r.Pkg.Version = ver
r.Pkg.PartnerId = partnerId
r.Md5 = md5
r.URL = url
return
......
package main
import (
"bufio"
"ficus/device"
"ficus/mission"
native "ficus/native/service"
"ficus/proto"
"ficus/service"
"ficus_clientserver/config"
"ficus_clientserver/control"
"ficus_clientserver/model"
"ficus_clientserver/mqcontrol"
"ficus_clientserver/nethandle/thriftservice"
"ficus_clientserver/tool"
"fmt"
"os"
"os/exec"
"runtime/debug"
"strings"
"time"
"github.com/apache/thrift/lib/go/thrift"
uuid "github.com/satori/go.uuid"
log "github.com/sirupsen/logrus"
)
// InitCfg 初始化配置文件
func InitCfg() {
if err := config.LoadCfg(); err != nil {
log.Fatalln("InitCfg", err)
}
}
// InitTimeTick 初始化定时任务管理器
func InitTimeTick() {
tool.StartTicks(time.Second)
}
func CatchDump() {
errs := recover()
if errs == nil {
return
}
exeName := os.Args[0] //获取程序名称
now := time.Now() //获取当前时间
pid := os.Getpid() //获取进程ID
time_str := now.Format("20060102150405") //设定时间格式
fname := fmt.Sprintf("%s-%d-%s-dump.log", exeName, pid, time_str) //保存错误信息文件名:程序名-进程ID-当前时间(年月日时分秒)
fmt.Println("dump to file ", fname)
f, err := os.Create(fname)
if err != nil {
return
}
defer f.Close()
f.WriteString(fmt.Sprintf("%v\r\n", errs)) //输出panic信息
f.WriteString("========\r\n")
f.WriteString(string(debug.Stack())) //输出堆栈信息
os.Exit(0)
}
// ConsoleIn 控制台输入
func ConsoleIn() {
inputReader := bufio.NewReader(os.Stdin) //创建一个读取器,并将其与标准输入绑定。
fmt.Printf("Please enter some input: ")
for {
input, err := inputReader.ReadString('\n') //读取器对象提供一个方法 ReadString(delim byte) ,该方法从输入中读取内容,直到碰到 delim 指定的字符,然后将读取到的内容连同 delim 字符一起放到缓冲区。
if err == nil {
fmt.Printf("The input was: %s ,err:%s", input, err)
}
s := strings.Split(input, "\r\n")
if s[0] == "reload cfg" {
// 重载 配置文件
if err := config.LoadCfg(); err != nil {
fmt.Printf("reload failed %s", err)
}
continue
}
}
}
// InitGlobleVar 初始化全局变量
func InitGlobleVar() {
QuitFlag = make(chan int)
psend := mqcontrol.NewProducer("myPusher", "myQueue")
sendChan := make(chan string, 50000)
msgMap := model.NewMsgmap()
ClientManager = model.GetDefaultClientMap()
RedisClient = model.NewRedisClient(config.GetRedisAddress(), config.GetRedisPWD())
MsgControl = model.NewMsgControl(msgMap, psend, RedisClient, sendChan, 10)
MsgSender = control.NewMsgSender(MsgControl, sendChan, ClientManager, 10)
MsgSender.StartSendWork()
}
// InitEureka 初始化eureka 服务
func InitEureka() {
err := tool.ConnectEureka(getCurrentPath() + "local.gcfg")
if err != nil {
log.Panicln("ConnectEureka", err)
}
err = tool.LoadEurekaInstance(getCurrentPath() + "i.json")
if err != nil {
log.Panicln("LoadEurekaInstance", err)
}
tool.HeartLoop()
}
func getCurrentPath() string {
s, err := exec.LookPath(os.Args[0])
if err != nil {
panic(err)
}
i := strings.LastIndex(s, "\\")
path := string(s[0 : i+1])
return path
}
// StartThrift 开启thrift 服务
func StartThrift() {
go StartThriftSever1()
go StartThriftSever2()
go StartClientHeartWork()
}
// StartClientHeartWork 开始循环心跳工作
func StartClientHeartWork() {
// 处理空闲设备,实现由上到下的心跳机制
time.Sleep(time.Second * time.Duration(config.GetHeartFirstTime()))
log.Info("HandExpiredDevice start")
deviceDispatch := thriftservice.NewDeviceDispatch()
for {
err := deviceDispatch.SetDispatch()
if err != nil {
log.Info(config.GetHeartLoopTime(), err)
time.Sleep(time.Duration(config.GetHeartLoopTime()) * time.Second)
continue
}
ip := config.GetAppIp()
res, err := deviceDispatch.ListExpired(int32(config.GetHeartTimeout()), ip)
if err != nil || res == nil || res.Devices == nil {
log.Error("HandExpiredDevice", err)
} else {
log.Info("遍历设备")
for _, v := range res.Devices {
go func(dd *device.Device) {
devicedispatch := thriftservice.NewDeviceDispatch()
if !ClientManager.IsKey(dd.ID) {
if err = devicedispatch.SetDispatch(); err != nil {
log.Info("连接创建失败 ", dd.ID, err)
return
}
if OK, err := devicedispatch.Deactivate(dd.ID, ip); !OK {
log.Info("终端没有上线 id ", dd.ID, err)
}
return
}
if ClientManager.AddTimes(dd.ID) {
if err = devicedispatch.SetDispatch(); err != nil {
log.Info("连接创建失败 ", dd.ID, err)
return
}
if OK, err := devicedispatch.Deactivate(dd.ID, ip); !OK {
log.Info("终端已经掉线 ", dd.ID, err)
}
return
}
log.Info("空闲设备 id ", dd.ID)
m := mission.Message{}
id, _ := uuid.NewV4()
m.ID = id.String()
m.State = &mission.MissionStatus{
Code: mission.MissionStatusCode_DISPATCHED,
Msg: "",
}
m.Priority = 5
m.Tries = 0
m.TriesMax = 1
m.Device = dd.ID
m.Proto = proto.Type_HEARTBEAT
MsgControl.AddMessage(&m)
}(v)
}
}
log.Info(config.GetHeartTimeout())
time.Sleep(time.Duration(config.GetHeartLoopTime()) * time.Second)
}
}
// StartThriftSever1 开启client 的thrift 服务 复用socket实现双向通信
func StartThriftSever1() {
serverTransport, err := thrift.NewTServerSocket(config.GetLoginServerPort())
if err != nil {
log.Fatalln("Error!", err)
}
//redis 控制
processorFactory := thriftservice.NewMyTProcessorFactory(ClientManager, MsgControl)
protocolFactory := thrift.NewTCompactProtocolFactory()
transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
server := thrift.NewTSimpleServerFactory4(processorFactory, serverTransport, transportFactory, protocolFactory)
log.Println("thrift server in", config.GetLoginServerPort())
if err := server.Serve(); err != nil {
panic(err)
}
}
// StartThriftSever2 开启client 的thrift 多服务 ,不复用socket
func StartThriftSever2() {
serverTransport, err := thrift.NewTServerSocket(config.GetMultServerPort())
if err != nil {
log.Fatalln(err)
}
TMultiplexedProcessor := thrift.NewTMultiplexedProcessor()
deviceHandle := thriftservice.NewDeviceHandle()
deviceProcessor := service.NewDeviceServiceProcessor(deviceHandle)
dispatchHandle := thriftservice.NewDispatchService(nil, ClientManager, MsgControl)
dispatchProcessor := service.NewDispatchServiceProcessor(dispatchHandle)
scheduleHandle := thriftservice.NewScheduleHandle(MsgControl, ClientManager)
scheduleProcessor := native.NewScheduleServiceProcessor(scheduleHandle)
packageHandle := thriftservice.NewPackageHandle()
packageProcessor := service.NewPackageServiceProcessor(packageHandle)
TMultiplexedProcessor.RegisterProcessor("Identity", deviceProcessor)
TMultiplexedProcessor.RegisterProcessor("Mission", dispatchProcessor)
TMultiplexedProcessor.RegisterProcessor("Schedule", scheduleProcessor)
TMultiplexedProcessor.RegisterProcessor("Package", packageProcessor)
protocolFactory := thrift.NewTCompactProtocolFactory()
transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
server := thrift.NewTSimpleServer4(TMultiplexedProcessor, serverTransport, transportFactory, protocolFactory)
log.Println("thrift server in", config.GetMultServerPort())
if err := server.Serve(); err != nil {
log.Panicln(err)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment