Commit 7989aacd by yunpeng.song

初始化

parent 41cdce5e
package main
import (
"os"
"time"
rotatelogs "github.com/lestrrat-go/file-rotatelogs"
"github.com/rifflock/lfshook"
log "github.com/sirupsen/logrus"
)
func InitLog() {
logName := string("ficus")
file, err := os.OpenFile(os.DevNull, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 666)
if err != nil {
log.Fatalln("fail to create ficus.log file!")
}
// 设置日志格式为json格式
log.SetFormatter(&log.JSONFormatter{})
// 设置将日志输出到标准输出(默认的输出为stderr,标准错误)
// 日志消息输出可以是任意的io.writer类型
log.SetOutput(file)
// 设置日志级别为warn以上
log.SetLevel(log.InfoLevel)
logWriter, err := rotatelogs.New(
logName+".%Y-%m-%d-%H-%M.log",
rotatelogs.WithLinkName(logName), // 生成软链,指向最新日志文件
rotatelogs.WithMaxAge(30*24*time.Hour), // 文件最大保存时间
rotatelogs.WithRotationTime(24*time.Hour), // 日志切割时间间隔
)
writeMap := lfshook.WriterMap{
log.InfoLevel: logWriter,
log.ErrorLevel: logWriter,
log.FatalLevel: logWriter,
}
lfHook := lfshook.NewHook(writeMap, &log.JSONFormatter{})
log.AddHook(lfHook)
}
package mqcontrol
import (
"ficus_clientmanager/config"
"fmt"
"github.com/streadway/amqp"
)
// 定义RabbitMQ对象
type RabbitMQ struct {
connection *amqp.Connection // 连接
channel *amqp.Channel // 管道
exchange *config.QueueExchangeCfg // 交换机对象
cfg *config.MQCfg // 交换机连接配置
}
var (
conn *amqp.Connection = nil // 连接
channel *amqp.Channel = nil // 管道
)
// 定义队列交换机对象
// 链接rabbitMQ
func (r *RabbitMQ) MqConnect() (err error) {
cfg := r.cfg
RabbitUrl := fmt.Sprintf("amqp://%s:%s@%s:%s/", cfg.Username, cfg.Userpwd, cfg.Hostip, cfg.Hostport)
fmt.Printf("MQ链接 %s \r\n", RabbitUrl)
conn, err = amqp.Dial(RabbitUrl)
if err != nil {
fmt.Printf("MQ打开链接失败:%s \r\n", err)
return
}
r.connection = conn
fmt.Printf("MQ打开链接成功\r\n")
channel, err = r.connection.Channel()
if err != nil {
fmt.Printf("MQ打开管道失败:%s \r\n", err)
return
}
r.channel = channel
fmt.Printf("MQ打开管道成功\r\n")
return nil
}
// 关闭RabbitMQ连接
func (r *RabbitMQ) MqClose() error {
// 先关闭管道,再关闭链接
err := r.channel.Close()
if err != nil {
fmt.Printf("MQ管道关闭失败:%s \r\n", err)
return err
}
err = r.connection.Close()
if err != nil {
fmt.Printf("MQ链接关闭失败:%s \r\n", err)
return err
}
return err
}
// 创建一个新的操作对象
func New(q *config.QueueExchangeCfg, c *config.MQCfg) *RabbitMQ {
return &RabbitMQ{
exchange: q,
cfg: c,
}
}
package mqcontrol
import "fmt"
// 监听接收者接收任务
func (r *RabbitMQ) Recv() {
// 验证链接是否正常,否则重新链接
if r.channel == nil {
r.MqConnect()
}
exchange := r.exchange
// 用于检查队列是否存在,已经存在不需要重复声明
_, err := r.channel.QueueDeclarePassive(exchange.QuName, exchange.Durable, false, false, true, nil)
if err != nil {
// 队列不存在,声明队列
// name:队列名称;durable:是否持久化,队列存盘,true服务重启后信息不会丢失,影响性能;autoDelete:是否自动删除;noWait:是否非阻塞,
// true为是,不等待RMQ返回信息;args:参数,传nil即可;exclusive:是否设置排他
_, err = r.channel.QueueDeclare(exchange.QuName, exchange.Durable, false, false, true, nil)
if err != nil {
fmt.Printf("MQ注册队列失败:%s \n", err)
return
}
}
// 队列绑定
err = r.channel.QueueBind(exchange.QuName, exchange.RtKey, exchange.ExName, true, nil)
if err != nil {
fmt.Printf("MQ绑定队列失败:%s \n", err)
return
}
// 获取消费通道,确保rabbitMQ一个一个发送消息
err = r.channel.Qos(1, 0, true)
msgList, err := r.channel.Consume(exchange.QuName, "", false, false, false, false, nil)
if err != nil {
fmt.Printf("获取消费通道异常:%s \n", err)
return
}
for msg := range msgList {
// 处理数据
//Todo yunpeng.song@freemud 2019.0724
err := func() error {
fmt.Println(string(msg.Body))
return err
}()
if err != nil {
err = msg.Ack(true)
if err != nil {
fmt.Printf("确认消息未完成异常:%s \n", err)
continue
}
} else {
// 确认消息,必须为false
err = msg.Ack(false)
if err != nil {
fmt.Printf("确认消息完成异常:%s \n", err)
continue
}
continue
}
}
}
......@@ -4,7 +4,6 @@ import (
"bytes"
"encoding/gob"
"errors"
"fmt"
)
var _RPCMAP map[string](map[string]func([]byte)) = make(map[string](map[string]func([]byte)))
......@@ -63,7 +62,6 @@ func SendRpc(pusherName string, key string, funcName string, params interface{})
Params: params,
}
if err = enc.Encode(&data); err != nil {
fmt.Println("asdfasd")
return err
}
......
package mqcontrol
import (
"fmt"
"github.com/streadway/amqp"
)
//发送任务
func (r *RabbitMQ) SendMsg(msg []byte) error {
// 验证链接是否正常,否则重新链接
if r.connection.IsClosed() || r.channel == nil {
fmt.Printf("连接断开,重新连接")
r.MqConnect()
}
exchange := r.exchange
// 用于检查队列是否存在,已经存在不需要重复声明
_, err := r.channel.QueueDeclarePassive(exchange.QuName, exchange.Durable, false, false, true, nil)
if err != nil {
fmt.Printf("MQ队列%s不存在 \r\n", exchange.QuName)
// 队列不存在,声明队列
// name:队列名称;durable:是否持久化,队列存盘,true服务重启后信息不会丢失,影响性能;autoDelete:是否自动删除;noWait:是否非阻塞,
// true为是,不等待RMQ返回信息;args:参数,传nil即可;exclusive:是否设置排他
_, err := r.channel.QueueDeclare(exchange.QuName, exchange.Durable, false, false, true, nil)
if err != nil {
fmt.Printf("MQ注册队列失败:%s \r\n", err)
return err
}
}
fmt.Printf("MQ注册队列%s成功 \r\n", exchange.QuName)
// 队列绑定
err = r.channel.QueueBind(exchange.QuName, exchange.RtKey, exchange.ExName, true, nil)
if err != nil {
fmt.Printf("MQ绑定队列失败:%s \r\n", err)
return err
}
fmt.Printf("MQ绑定队列%s成功,RtKey:%s,ExName:%s\r\n", exchange.QuName, exchange.RtKey, exchange.ExName)
// 用于检查交换机是否存在,已经存在不需要重复声明
err = r.channel.ExchangeDeclarePassive(exchange.ExName, exchange.ExType, exchange.Durable, false, false, true, nil)
if err != nil {
fmt.Printf("MQ交换机%s不存在 \r\n", exchange.ExName)
// 注册交换机
// name:交换机名称,kind:交换机类型,durable:是否持久化,队列存盘,true服务重启后信息不会丢失,影响性能;autoDelete:是否自动删除;
// noWait:是否非阻塞, true为是,不等待RMQ返回信息;args:参数,传nil即可; internal:是否为内部
err = r.channel.ExchangeDeclare(exchange.ExName, exchange.ExType, exchange.Durable, false, false, true, nil)
if err != nil {
fmt.Printf("MQ注册交换机失败:%s \r\n", err)
return err
}
}
fmt.Printf("MQ注册交换机%s成功\r\n", exchange.ExName)
fmt.Printf("MQ打开管道成功 %t \r\n", r.connection.IsClosed())
// 发送任务消息
err = r.channel.Publish(exchange.ExName, exchange.RtKey, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: msg,
})
if err != nil {
fmt.Printf("MQ任务发送失败:%s \r\n", err)
return err
}
fmt.Printf("MQ任务发送成功\r\n")
return nil
}
package router
import (
"encoding/base64"
"encoding/json"
"ficus/mission"
"ficus_clientmanager/model"
"ficus_clientmanager/mqcontrol"
"fmt"
"github.com/gin-gonic/gin"
)
// type Messages struct {
// Msgs []mission.Message `json:"messages"`
// }
type Messages struct {
Msgs []mission.Message `json:"messages"`
}
func UpdateTest(c *gin.Context) {
......@@ -23,15 +21,15 @@ func UpdateTest(c *gin.Context) {
// WrongRequest(c, err)
// }
// c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(body))
agents := model.MsgDatas{}
agents := Messages{}
if err := c.ShouldBindJSON(&agents); err != nil {
WrongRequest(c, err)
return
}
for _, v := range agents.Msgs {
encodeString := base64.StdEncoding.EncodeToString([]byte(v.Data))
v.Data = encodeString
// encodeString := base64.StdEncoding.EncodeToString([]byte(v.Data))
// v.Data = encodeString
if data, err := json.Marshal(v); err != nil {
WrongRequest(c, err)
return
......
package router
import (
"fmt"
"io"
"bytes"
"io/ioutil"
"net/http"
"os"
"time"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
)
func WrongRequest(c *gin.Context, err error) {
fmt.Fprintln(gin.DefaultWriter, "错误", err)
log.Println("错误", err)
c.JSON(http.StatusOK, gin.H{
"statusCOde": "101",
"msg": err.Error(),
......@@ -18,7 +19,7 @@ func WrongRequest(c *gin.Context, err error) {
}
func NormalRequest(c *gin.Context) {
fmt.Fprintln(gin.DefaultWriter, "消息投递成功")
log.Println("消息投递成功")
c.JSON(http.StatusOK, gin.H{
"statusCOde": "100",
"msg": "消息投递成功",
......@@ -33,30 +34,46 @@ func NoResponse(c *gin.Context) {
})
}
func InitGin() {
gin.SetMode(gin.TestMode)
gin.SetMode(gin.ReleaseMode)
}
func LogMiddle(c *gin.Context) {
// 开始时间
start := time.Now()
body, _ := ioutil.ReadAll(c.Request.Body)
log.Info(string(body))
c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(body)) // 关键点
// 处理请求
c.Next()
// 结束时间
end := time.Now()
//执行时间
latency := end.Sub(start)
path := c.Request.URL.Path
clientIP := c.ClientIP()
method := c.Request.Method
statusCode := c.Writer.Status()
log.Infof("| %3d | %13v | %15s | %s %s |",
statusCode,
latency,
clientIP,
method, path,
)
}
func InitLog() {
gin.DisableConsoleColor()
f, _ := os.Create("gin.log")
gin.DefaultWriter = io.MultiWriter(f, os.Stdout)
// gin.DisableConsoleColor()
// f, _ := os.Create("gin.log")
// gin.DefaultWriter = io.MultiWriter(f, os.Stdout)
}
func InitRouter() *gin.Engine {
r := gin.Default()
r := gin.New()
r.Use(gin.Recovery())
//r := gin.Default()
r.Use(LogMiddle)
r.NoRoute(NoResponse)
// currentPath, err := tool.GetCurrentDirectory()
// if err != nil {
// fmt.Println(err)
// }
// imgFilePath := fmt.Sprintf("%sautohtml\\*", currentPath)
//r.StaticFS("/", http.Dir("autohtml"))
//r.LoadHTMLGlob("autohtml/**/**/**/*")
// r.LoadHTMLFiles("autohtml/index.html")
// r.GET("/index", func(c *gin.Context) {
// c.HTML(http.StatusOK, "index.html", nil)
// })
r.POST("/updatetest", UpdateTest)
openMq()
//go MQRecvier()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment