Commit 91aa9a4e by yunpeng.song

初始化

parent f9d3c79b
package main
import (
"os"
"time"
rotatelogs "github.com/lestrrat-go/file-rotatelogs"
"github.com/rifflock/lfshook"
log "github.com/sirupsen/logrus"
)
func InitLog() {
logName := string("ficus")
file, err := os.OpenFile(os.DevNull, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 666)
if err != nil {
log.Fatalln("fail to create ficus.log file!")
}
// 设置日志格式为json格式
log.SetFormatter(&log.JSONFormatter{})
// 设置将日志输出到标准输出(默认的输出为stderr,标准错误)
// 日志消息输出可以是任意的io.writer类型
log.SetOutput(file)
// 设置日志级别为warn以上
log.SetLevel(log.InfoLevel)
logWriter, err := rotatelogs.New(
logName+".%Y-%m-%d-%H-%M.log",
rotatelogs.WithLinkName(logName), // 生成软链,指向最新日志文件
rotatelogs.WithMaxAge(30*24*time.Hour), // 文件最大保存时间
//rotatelogs.WithRotationTime(24*time.Hour), // 日志切割时间间隔
rotatelogs.WithRotationTime(1*time.Minute),
)
writeMap := lfshook.WriterMap{
log.InfoLevel: logWriter,
log.ErrorLevel: logWriter,
log.FatalLevel: logWriter,
}
lfHook := lfshook.NewHook(writeMap, &log.JSONFormatter{})
log.AddHook(lfHook)
}
package main
import (
"ficus/service"
"ficus_clientserver/config"
"ficus_clientserver/dispatch"
"ficus_clientserver/model"
"ficus_clientserver/redis"
"ficus_clientserver/route"
//"ficus_clientserver/route"
"ficus_clientserver/thrifthandler"
"log"
"os"
"github.com/apache/thrift/lib/go/thrift"
log "github.com/sirupsen/logrus"
)
const (
......@@ -22,25 +25,6 @@ const (
var quitFlag chan int
func InitLog() {
file, err := os.OpenFile("test.log", os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 666)
if err != nil {
log.Fatalln("fail to create test.log file!")
}
// 设置日志格式为json格式
log.SetFormatter(&log.JSONFormatter{})
// 设置将日志输出到标准输出(默认的输出为stderr,标准错误)
// 日志消息输出可以是任意的io.writer类型
log.SetOutput(file)
// 设置日志级别为warn以上
log.SetLevel(log.InfoLevel)
log.Info("start")
}
func main() {
//读取配置文件
InitLog()
......@@ -50,6 +34,8 @@ func main() {
quitFlag = make(chan int)
go StartWork()
go Startwork0()
go Startwork1()
go StartPackServer()
<-quitFlag
log.Println("退出")
}
......@@ -59,10 +45,10 @@ func StartWork() {
//客户端消息
Ctrl_mapClientMsg := model.NewCtrlmapClientMsg_(model.GetDefaultClientMap())
//连接路由,并读取数据
Route.NewCtrl_RotuteRecv(Ctrl_mapClientMsg)
route.NewCtrl_RotuteRecv(Ctrl_mapClientMsg)
}
//开启client 的thrift 服务
//开启client 的thrift 服务 复用socket
func Startwork0() {
serverTransport, err := thrift.NewTServerSocket(NetworkAddr0)
if err != nil {
......@@ -70,7 +56,7 @@ func Startwork0() {
os.Exit(1)
}
//redis 控制
processorFactory := dispatch.NewMyTProcessorFactory(model.GetDefaultClientMap(), redis.NewCtrl_RedisClient_())
processorFactory := thrifthandler.NewMyTProcessorFactory(model.GetDefaultClientMap(), redis.NewCtrl_RedisClient_())
protocolFactory := thrift.NewTCompactProtocolFactory()
transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
server := thrift.NewTSimpleServerFactory4(processorFactory, serverTransport, transportFactory, protocolFactory)
......@@ -80,3 +66,36 @@ func Startwork0() {
panic(err)
}
}
//开启client 的thrift 多服务 ,不复用socket
func Startwork1() {
serverTransport, err := thrift.NewTServerSocket(NetworkAddr1)
if err != nil {
log.Println("Error!", err)
os.Exit(1)
}
pkgHandle := &thrifthandler.PkgHandle{}
pckprocessor := service.NewPackageServiceProcessor(pkgHandle)
TMultiplexedProcessor := thrift.NewTMultiplexedProcessor()
dispatchHandle := thrifthandler.NewMyDispatchService(nil, model.GetDefaultClientMap(), redis.NewCtrl_RedisClient_())
dispatchprocessor := service.NewDispatchServiceProcessor(dispatchHandle)
TMultiplexedProcessor.RegisterProcessor("Package", pckprocessor)
TMultiplexedProcessor.RegisterProcessor("Mission", dispatchprocessor)
//redis 控制
protocolFactory := thrift.NewTCompactProtocolFactory()
transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
server := thrift.NewTSimpleServer4(TMultiplexedProcessor, serverTransport, transportFactory, protocolFactory)
log.Println("thrift server in", NetworkAddr1)
if err := server.Serve(); err != nil {
panic(err)
}
}
func StartPackServer() {
route.InitGin()
r := route.InitRouter()
r.Run(":36790")
}
......@@ -16,7 +16,7 @@ type (
PushCount uint32 //"push count"
PushStatus config.MSG_STATUS //"发送状态"
Lastupdatetime int64 //laset dispatch time
Rpcid string
//Rpcid string
}
clientMsg struct {
......
package model
import (
"errors"
"ficus/pkg"
"sync"
)
type (
PkgManager struct {
mutex *sync.RWMutex
Pkgs map[string]*pkg.PackageSpec
}
)
var (
DefaultPkgManger *PkgManager = &PkgManager{mutex: new(sync.RWMutex), Pkgs: make(map[string]*pkg.PackageSpec)}
)
func (p *PkgManager) GetPkg(pack *pkg.Package) (pkgspec *pkg.PackageSpec, err error) {
pkgspec, err = p.GetCurrentPkg(pack.Name, pack.Version)
return
}
func (p *PkgManager) GetPkgs() (Pkgs map[string]*pkg.PackageSpec, err error) {
return p.Pkgs, nil
}
func (p *PkgManager) AddPkg(pkgspec *pkg.PackageSpec) error {
name := pkgspec.Pkg.Name
ver := pkgspec.Pkg.Version
if name == "" {
return errors.New("name字段不能为空")
}
if pkgspec.Md5 == "" {
return errors.New("MD5不能为空")
}
if pkgspec.URL == "" {
return errors.New("URL不能为空")
}
p.mutex.Lock()
p.Pkgs[name+":"+ver] = pkgspec
p.mutex.Unlock()
return nil
}
func (p *PkgManager) DeletePkg(pkgName string, ver string) error {
if pkgName == "" {
return errors.New("name字段不能为空")
}
p.mutex.Lock()
delete(p.Pkgs, pkgName+":"+ver)
p.mutex.Unlock()
return nil
}
func (p *PkgManager) GetCurrentPkg(pkgName string, ver string) (pkgspec *pkg.PackageSpec, err error) {
//pkgspec = pkg.NewPackageSpec()
p.mutex.RLock()
if v, ok := p.Pkgs[pkgName+":"+ver]; ok {
pkgspec = v
err = nil
} else {
pkgspec = nil
err = errors.New("未找到对应的包")
}
p.mutex.RUnlock()
return
}
......@@ -5,11 +5,9 @@ import (
"ficus/mission"
"ficus_clientserver/config"
"ficus_clientserver/model"
"ficus_clientserver/serrpcid"
"fmt"
"time"
//"github.com/garyburd/redigo/redis"
"sync"
"github.com/go-redis/redis"
......@@ -38,9 +36,6 @@ func (p *CtrlRedisClient_) UpdateMsgStatus(mis mission.Message) bool {
msg := p.GetMsg(mis.ID)
msg.Mis = mis
msg.PushStatus = config.MSG_SUCCESSED
msg.Rpcid = serrpcid.AppendRpcid(msg.Rpcid)
//kname:=fmt.Sprintf("Msg.%s",mis.ID)
p.setMsg(msg)
return true
}
......
package route
import (
"ficus/pkg"
"ficus_clientserver/model"
"net/http"
"github.com/gin-gonic/gin"
)
func addPkg(c *gin.Context) {
p := &pkg.PackageSpec{}
if err := c.ShouldBindJSON(&p); err != nil {
WrongRequest(c, err)
}
if err := model.DefaultPkgManger.AddPkg(p); err != nil {
WrongRequest(c, err)
} else {
c.JSON(http.StatusOK, gin.H{
"statusCode": 100,
"msg": "",
})
}
}
func deletePkg(c *gin.Context) {
p := &pkg.Package{}
if err := c.ShouldBindJSON(&p); err != nil {
WrongRequest(c, err)
}
if err := model.DefaultPkgManger.DeletePkg(p.Name, p.Version); err != nil {
WrongRequest(c, err)
} else {
c.JSON(http.StatusOK, gin.H{
"statusCode": 100,
"msg": "",
})
}
}
func getPkg(c *gin.Context) {
p := &pkg.Package{}
if err := c.ShouldBindJSON(&p); err != nil {
WrongRequest(c, err)
return
}
if v, err := model.DefaultPkgManger.GetPkg(p); err != nil {
WrongRequest(c, err)
} else {
c.JSON(http.StatusOK, gin.H{
"statusCode": 100,
"package": v,
})
}
}
func getPkgs(c *gin.Context) {
// p := &pkg.Package{}
// if err := c.ShouldBindJSON(&p); err != nil {
// WrongRequest(c, err)
// return
// }
if p, err := model.DefaultPkgManger.GetPkgs(); err != nil {
WrongRequest(c, err)
} else {
list := make([]*pkg.PackageSpec, 0, 0)
for _, v := range p {
list = append(list, v)
}
c.JSON(http.StatusOK, gin.H{
"statusCode": 100,
"packages": list,
})
}
}
package route
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"time"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
)
func WrongRequest(c *gin.Context, err error) {
fmt.Fprintln(gin.DefaultWriter, "错误", err)
c.JSON(http.StatusOK, gin.H{
"statusCOde": "101",
"msg": err.Error(),
})
}
func NoResponse(c *gin.Context) {
//返回404状态码
c.JSON(http.StatusNotFound, gin.H{
"status": 404,
"error": "404, page not exists!",
})
}
func InitGin() {
gin.SetMode(gin.ReleaseMode)
}
func InitLog() {
gin.DisableConsoleColor()
f, _ := os.Create("gin.log")
gin.DefaultWriter = io.MultiWriter(f)
//gin.DefaultWriter = io.MultiWriter(f, os.Stdout)
}
func LogMiddle(c *gin.Context) {
// 开始时间
start := time.Now()
body, _ := ioutil.ReadAll(c.Request.Body)
log.Info(string(body))
c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(body)) // 关键点
// 处理请求
c.Next()
// 结束时间
end := time.Now()
//执行时间
latency := end.Sub(start)
path := c.Request.URL.Path
clientIP := c.ClientIP()
method := c.Request.Method
statusCode := c.Writer.Status()
log.Infof("| %3d | %13v | %15s | %s %s |",
statusCode,
latency,
clientIP,
method, path,
)
}
func InitRouter() *gin.Engine {
r := gin.New()
r.Use(gin.Recovery())
//r := gin.Default()
r.Use(LogMiddle)
r.NoRoute(NoResponse)
r.POST("/addpkg", addPkg)
r.POST("/deletepkg", deletePkg)
r.POST("/getpkg", getPkg)
r.GET("/getpkgs", getPkgs)
return r
}
package Route
package route
import (
"ficus_clientserver/config"
......@@ -9,6 +9,8 @@ import (
"sync"
"time"
"unsafe"
log "github.com/sirupsen/logrus"
)
type SliceMock struct {
......@@ -50,8 +52,8 @@ func (p *Ctrl_RotuteRecv) StartCommRetute() {
func (p *Ctrl_RotuteRecv) ConnRotuteAndRead() {
conn, err := net.Dial("tcp", config.ROUTE_HOST_IP_PORT)
if err != nil {
fmt.Println("dial failed:", err)
fmt.Println("wait reconnection …… …… ……")
log.Error("dial failed:", err)
log.Info("wait reconnection …… …… ……")
return
}
......
package Route
package route
import (
json2 "encoding/json"
......
package schedule
import (
"context"
"ficus/mission"
"ficus_clientserver/config"
"ficus_clientserver/redis"
"fmt"
)
type MySchedule struct {
Ctrl_RedisMsg *redis.CtrlRedisClient_
}
func (p *MySchedule) Schedule(ctx context.Context, request *mission.Message) (r bool, err error) {
return true, nil
}
// Parameters:
// - Response
func (p *MySchedule) Feedback(ctx context.Context, response *mission.Message) (r bool, err error) {
//response.ID
//Model.Ctrl_mapClientMsg.Feedback(response.Agent,response.ID,true);
fmt.Println("recv feedback……………id:", response.Agent, response.ID)
//直接更新一个数据到 redis
{
config.Eyelog.WriteCallAndLog(string(response.ID), response.Agent, "客户端发送消息 ... - 消息ID:", response.ID, "客户端id:", response.Agent)
}
{
config.Eyelog.WriteCallAndLog(string(response.ID), "", "收到客户端消息 ... - 消息ID:", response.ID, "客户端id:", response.Agent)
}
p.Ctrl_RedisMsg.UpdateMsgStatus(*response)
return true, nil
}
package thrifthandler
package dispatch
package thrifthandler
import (
"context"
......@@ -36,8 +36,8 @@ func (p *MyDispatchService) Heartbeat(ctx context.Context, whom *native.Agent) (
return true, nil
}
//TODO yunpeng.song@freemud.cn 20190729
func (p *MyDispatchService) Feedback(ctx context.Context, response *mission.Message) (r bool, err error) {
log.WithFields(log.Fields{"data": string(response.Data)}).Info(response)
p.ctrl_RedisClient.UpdateMsgStatus(*response)
return true, nil
}
......
package thrifthandler
package thrifthandler
import (
"context"
"ficus/pkg"
"ficus/service"
"net"
"os"
"ficus_clientserver/model"
log "github.com/sirupsen/logrus"
"github.com/apache/thrift/lib/go/thrift"
)
type (
PkgHandle struct {
}
)
// func (p *PkgHandle) GetSpec(ctx context.Context, pkgName string, ver string) (r *pkg.PackageSpec, err error) {
// log.WithFields(log.Fields{"pkgName": pkgName, "ver": ver}).Info("GetSpec")
// if strings.Compare(pkgName, "FRAMEWORK") == 0 {
// r = pkg.NewPackageSpec()
// r.Pkg = &pkg.Package{Name: pkgName, Version: ver}
// r.URL = string("https://pos-1251026273.cos.ap-beijing.myqcloud.com/framework.zip")
// r.Md5 = "709d3c914f1a9a27805a5ecfcafb7aae"
// } else {
// r = pkg.NewPackageSpec()
// r.Pkg = &pkg.Package{Name: pkgName, Version: ver}
// r.URL = string("https://pos-1251026273.cos.ap-beijing.myqcloud.com/config.zip")
// r.Md5 = "3e240a04e52c518418ef6ab21879470f"
// }
// return r, nil
// }
func PkgServer() {
serverTransport, err := thrift.NewTServerSocket(net.JoinHostPort("127.0.0.1", "9091"))
if err != nil {
log.Println("Error!", err)
os.Exit(1)
}
//redis 控制
//NewTSimpleServer2(processor TProcessor, serverTransport TServerTransport) *TSimpleServer
//processorFactory := dispatch.NewMyTProcessorFactory(model.GetDefaultClientMap(), redis.NewCtrl_RedisClient_())
handle := &PkgHandle{}
processor := service.NewPackageServiceProcessor(handle)
protocolFactory := thrift.NewTCompactProtocolFactory()
transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory())
server := thrift.NewTSimpleServer4(processor, serverTransport, transportFactory, protocolFactory)
log.Println("thrift server in", "9091")
if err := server.Serve(); err != nil {
panic(err)
}
}
func (p *PkgHandle) GetSpec(ctx context.Context, pkgName string, ver string) (r *pkg.PackageSpec, err error) {
log.WithFields(log.Fields{"pkgName": pkgName, "ver": ver}).Info("GetSpec")
return model.DefaultPkgManger.GetCurrentPkg(pkgName,ver)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment