Commit 41cdce5e by yunpeng.song

初始化

parent cf2bde48
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/Unknwon/goconfig" "github.com/Unknwon/goconfig"
"github.com/gin-gonic/gin"
) )
var ( var (
...@@ -19,13 +20,14 @@ func LoadConfig() { ...@@ -19,13 +20,14 @@ func LoadConfig() {
if err != nil { if err != nil {
fmt.Printf("错误getCurrentDirectory") fmt.Printf("错误getCurrentDirectory")
} }
fmt.Printf(path) fmt.Fprintln(gin.DefaultWriter, path)
if err = loaddefalutCfgFile(path + "config.ini"); err != nil { if err = loaddefalutCfgFile(path + "config.ini"); err != nil {
fmt.Fprintln(gin.DefaultWriter, "loaddefalutCfgFile failed")
panic("loaddefalutCfgFile failed") panic("loaddefalutCfgFile failed")
} }
mqCfgPath, err := defalutCfgFile.GetValue("mq", "path") mqCfgPath, err := defalutCfgFile.GetValue("mq", "path")
if err != nil { if err != nil {
fmt.Printf("错误init") fmt.Fprintln(gin.DefaultWriter, "错误init")
} }
loadMqCfgFile(path + mqCfgPath) loadMqCfgFile(path + mqCfgPath)
} }
......
package main package main
import ( import (
"ficus_clientmanager/config"
"ficus_clientmanager/router" "ficus_clientmanager/router"
"fmt" "fmt"
...@@ -9,10 +8,10 @@ import ( ...@@ -9,10 +8,10 @@ import (
) )
func main() { func main() {
config.LoadConfig()
router.InitGin() router.InitGin()
router.InitLog() router.InitLog()
defer router.CloseRouter()
fmt.Fprintln(gin.DefaultWriter, "start") fmt.Fprintln(gin.DefaultWriter, "start")
r := router.InitRouter() r := router.InitRouter()
r.Run(":20991") r.Run(":36789")
} }
package model
import (
"ficus/mission"
"ficus/proto"
)
type (
MsgData struct {
ID string `thrift:"id,1" db:"id" json:"id"`
State *mission.MissionStatus `thrift:"state,2" db:"state" json:"state"`
Priority int32 `thrift:"priority,3" db:"priority" json:"priority"`
Tries int32 `thrift:"tries,4" db:"tries" json:"tries"`
TriesMax int32 `thrift:"triesMax,5" db:"triesMax" json:"triesMax"`
Agent string `thrift:"agent,6" db:"agent" json:"agent"`
Proto proto.Type `thrift:"proto,7" db:"proto" json:"proto"`
Data string `thrift:"data,8" db:"data" json:"data"`
}
MsgDatas struct {
Msgs []MsgData `json:"messages"`
}
)
{
"注释-Connects":[
{"Name":"连接名称",
"Addr":"连接地址:amqp://username:password@IP:Port/vhost"}
],
"注释-Channels":[
{"Name":"信道名称",
"Connect":"信道所属连接"}
],
"注释-Exchanges":[
{"Name":"交换机名称",
"Channel":"交换机所属信道",
"Type":"交换机类型['direct':'路由','topic':'通配符','fanout':'订阅','headers ':'头部']",
"Durable":"是否持久化(bool)",
"AutoDeleted":"是否自动删除(bool)",
"Internal":"只允许MQ内部使用(bool)",
"NoWait":"非阻塞模式(bool)",
"EBind":[
{"Destination":"目标交换机",
"Key":"路由键",
"NoWait":"非阻塞模式(bool)"}
]}
],
"注释-Queue":[
{"Name":"队列名称",
"Durable":"是否持久化(bool)",
"AutoDelete":"是否自动删除(bool)",
"Exclusive":"排他性,只允许创建队列的用户访问(bool)",
"NoWait":"非阻塞模式(bool)",
"Bind":[
{"Key":"路由键", "ExchangeName":"交换机", "NoWait":"非阻塞模式(bool)"}
]}
],
"注释-Pusher-发送者":[
{"Name":"名称",
"Channel":"所属信道",
"Exchange":"交换机",
"Key":"路邮键",
"Mandtory":"(bool)",
"Immediate":"(bool)",
"ContentType":"消息类型",
"DeliveryMode":"是否持久化,2表示持久化,0表示非持久化(uint8)"}
],
"注释-Poper--接收者":[
{"Name":"名称",
"Channel":"所属信道",
"QName":"队列名称",
"Consumer":"消费者",
"AutoACK":"自动确认(bool)",
"Exclusive":"排他性(bool)",
"NoLocal":"非本地(bool)",
"NoWait":"非阻塞模式(bool)"
}
],
"Connects":[
{"Name":"myConnect",
"Addr":"amqp://guest:guest@127.0.0.1:5672/"}
],
"Channels":[
{"Name":"myChannel",
"Connect":"myConnect"}
],
"Exchanges":[
{"Name":"myExchange",
"Channel":"myChannel",
"Type":"direct",
"Durable":false,
"AutoDeleted":false,
"Internal":false,
"NoWait":false,
"Args":{
"alternate-exchange":"errExchange"
}
}
],
"Queue":[
{"Name":"myQueue",
"Channel":"myChannel",
"Durable":false,
"AutoDelete":false,
"Exclusive":false,
"NoWait":false,
"Bind":[
{"Key":"myQueue", "ExchangeName":"myExchange", "NoWait":false}
],
"Args":{
"":""
}
}
],
"Pusher":[
{"Name":"myPusher",
"Channel":"myChannel",
"Exchange":"myExchange",
"Key":"myQueue",
"Mandtory":false,
"Immediate":false,
"ContentType":"text/plain",
"DeliveryMode":0}
],
"Poper":[
{"Name":"myPoper",
"Channel":"myChannel",
"QName":"myQueue",
"Consumer":"",
"AutoACK":true,
"Exclusive":false,
"NoLocal":false,
"NoWait":false
}
]
}
...@@ -6,8 +6,8 @@ userpwd=guest ...@@ -6,8 +6,8 @@ userpwd=guest
[exchange] [exchange]
durable = true durable = false
quName=test4 quName=test1
rtKey=ficus.key rtKey=test.key1
exName=amq.direct exName=amq.direct
exType=direct exType=direct
\ No newline at end of file
{
"注释-Connects":[
{"Name":"连接名称",
"Addr":"连接地址:amqp://username:password@IP:Port/vhost"}
],
"注释-Channels":[
{"Name":"信道名称",
"Connect":"信道所属连接"}
],
"注释-Exchanges":[
{"Name":"交换机名称",
"Channel":"交换机所属信道",
"Type":"交换机类型['direct':'路由','topic':'通配符','fanout':'订阅','headers ':'头部']",
"Durable":"是否持久化(bool)",
"AutoDeleted":"是否自动删除(bool)",
"Internal":"只允许MQ内部使用(bool)",
"NoWait":"非阻塞模式(bool)",
"EBind":[
{"Destination":"目标交换机",
"Key":"路由键",
"NoWait":"非阻塞模式(bool)"}
]}
],
"注释-Queue":[
{"Name":"队列名称",
"Durable":"是否持久化(bool)",
"AutoDelete":"是否自动删除(bool)",
"Exclusive":"排他性,只允许创建队列的用户访问(bool)",
"NoWait":"非阻塞模式(bool)",
"Bind":[
{"Key":"路由键", "ExchangeName":"交换机", "NoWait":"非阻塞模式(bool)"}
]}
],
"注释-Pusher-发送者":[
{"Name":"名称",
"Channel":"所属信道",
"Exchange":"交换机",
"Key":"路邮键",
"Mandtory":"(bool)",
"Immediate":"(bool)",
"ContentType":"消息类型",
"DeliveryMode":"是否持久化,2表示持久化,0表示非持久化(uint8)"}
],
"注释-Poper--接收者":[
{"Name":"名称",
"Channel":"所属信道",
"QName":"队列名称",
"Consumer":"消费者",
"AutoACK":"自动确认(bool)",
"Exclusive":"排他性(bool)",
"NoLocal":"非本地(bool)",
"NoWait":"非阻塞模式(bool)"
}
],
"Connects":[
{"Name":"myConnect",
"Addr":"amqp://username:password@IP:Port/"}
],
"Channels":[
{"Name":"myChannel",
"Connect":"myConnect"}
],
"Exchanges":[
{"Name":"myExchange",
"Channel":"myChannel",
"Type":"direct",
"Durable":false,
"AutoDeleted":false,
"Internal":false,
"NoWait":false,
"Args":{
"alternate-exchange":"errExchange"
}
}
],
"Queue":[
{"Name":"myQueue",
"Channel":"myChannel",
"Durable":false,
"AutoDelete":false,
"Exclusive":false,
"NoWait":false,
"Bind":[
{"Key":"myQueue", "ExchangeName":"myExchange", "NoWait":false}
],
"Args":{
"":""
}
}
],
"Pusher":[
{"Name":"myPusher",
"Channel":"myChannel",
"Exchange":"myExchange",
"Key":"myQueue",
"Mandtory":false,
"Immediate":false,
"ContentType":"text/plain",
"DeliveryMode":0}
],
"Poper":[
{"Name":"myPoper",
"Channel":"myChannel",
"QName":"myQueue",
"Consumer":"",
"AutoACK":true,
"Exclusive":false,
"NoLocal":false,
"NoWait":false
}
]
}
...@@ -15,23 +15,31 @@ type RabbitMQ struct { ...@@ -15,23 +15,31 @@ type RabbitMQ struct {
cfg *config.MQCfg // 交换机连接配置 cfg *config.MQCfg // 交换机连接配置
} }
var (
conn *amqp.Connection = nil // 连接
channel *amqp.Channel = nil // 管道
)
// 定义队列交换机对象 // 定义队列交换机对象
// 链接rabbitMQ // 链接rabbitMQ
func (r *RabbitMQ) MqConnect() (err error) { func (r *RabbitMQ) MqConnect() (err error) {
cfg := r.cfg cfg := r.cfg
RabbitUrl := fmt.Sprintf("amqp://%s:%s@%s:%s/", cfg.Username, cfg.Userpwd, cfg.Hostip, cfg.Hostport) RabbitUrl := fmt.Sprintf("amqp://%s:%s@%s:%s/", cfg.Username, cfg.Userpwd, cfg.Hostip, cfg.Hostport)
r.connection, err = amqp.Dial(RabbitUrl) fmt.Printf("MQ链接 %s \r\n", RabbitUrl)
conn, err = amqp.Dial(RabbitUrl)
if err != nil { if err != nil {
fmt.Printf("MQ打开链接失败:%s \r\n", err) fmt.Printf("MQ打开链接失败:%s \r\n", err)
return return
} }
r.connection = conn
fmt.Printf("MQ打开链接成功\r\n") fmt.Printf("MQ打开链接成功\r\n")
r.channel, err = r.connection.Channel() channel, err = r.connection.Channel()
if err != nil { if err != nil {
fmt.Printf("MQ打开管道失败:%s \r\n", err) fmt.Printf("MQ打开管道失败:%s \r\n", err)
return return
} }
r.channel = channel
fmt.Printf("MQ打开管道成功\r\n") fmt.Printf("MQ打开管道成功\r\n")
return nil return nil
} }
......
package mqcontrol
import (
"bytes"
"encoding/gob"
"errors"
"fmt"
)
var _RPCMAP map[string](map[string]func([]byte)) = make(map[string](map[string]func([]byte)))
//事件结构
type Event struct {
FuncName string
Params interface{}
}
//添加回调函数
func AddRpc(poper string, name string, callback func([]byte)) (err error) {
if _, ok := _RPCMAP[poper]; !ok {
_RPCMAP[poper] = make(map[string]func([]byte))
}
if _, ok := _RPCMAP[poper][name]; ok {
return errors.New("回调函数已存在")
} else {
_RPCMAP[poper][name] = callback
}
return nil
}
//解码并回调函数
func callbackRpc(d MSG) {
var buf bytes.Buffer
buf.Write(d.Body)
dec := gob.NewDecoder(&buf)
var data Event
dec.Decode(&data)
if _, ok := _RPCMAP[d.Poper]; ok {
if _, ok := _RPCMAP[d.Poper][data.FuncName]; ok {
_RPCMAP[d.Poper][data.FuncName](data.Params.([]byte))
}
}
//注,else情况还没处理
d.Ack(false)
}
//收到Rpc调用
func RecvRpc(poperName string) (err error) {
if err = Pop(poperName, callbackRpc); err != nil {
return err
}
return nil
}
//发送RPC调用
func SendRpc(pusherName string, key string, funcName string, params interface{}) (err error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
var data = Event{
FuncName: funcName,
Params: params,
}
if err = enc.Encode(&data); err != nil {
fmt.Println("asdfasd")
return err
}
if err := Push(pusherName, key, buf.Bytes()); err != nil {
return err
}
return nil
}
...@@ -50,6 +50,7 @@ func (r *RabbitMQ) SendMsg(msg []byte) error { ...@@ -50,6 +50,7 @@ func (r *RabbitMQ) SendMsg(msg []byte) error {
} }
} }
fmt.Printf("MQ注册交换机%s成功\r\n", exchange.ExName) fmt.Printf("MQ注册交换机%s成功\r\n", exchange.ExName)
fmt.Printf("MQ打开管道成功 %t \r\n", r.connection.IsClosed())
// 发送任务消息 // 发送任务消息
err = r.channel.Publish(exchange.ExName, exchange.RtKey, false, false, amqp.Publishing{ err = r.channel.Publish(exchange.ExName, exchange.RtKey, false, false, amqp.Publishing{
ContentType: "text/plain", ContentType: "text/plain",
......
package router package router
import ( import (
"bytes" "encoding/base64"
ficus "ficus/proto" "encoding/json"
"ficus_clientmanager/config" "ficus/mission"
"ficus_clientmanager/model"
"ficus_clientmanager/mqcontrol" "ficus_clientmanager/mqcontrol"
"fmt" "fmt"
"io/ioutil"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
type Agents struct { // type Messages struct {
Agents []ficus.Agent `json:"agents"` // Msgs []mission.Message `json:"messages"`
} // }
func UpdateTest(c *gin.Context) { func UpdateTest(c *gin.Context) {
body, err := ioutil.ReadAll(c.Request.Body) // body, err := ioutil.ReadAll(c.Request.Body)
fmt.Println(string(body)) // fmt.Println(string(body))
if err != nil { // if err != nil {
WrongRequest(c, err) // WrongRequest(c, err)
} // }
c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(body)) // c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(body))
agents := Agents{} agents := model.MsgDatas{}
if err := c.ShouldBindJSON(&agents); err != nil { if err := c.ShouldBindJSON(&agents); err != nil {
WrongRequest(c, err) WrongRequest(c, err)
return return
} }
mqCfg := config.GetMqConfig() for _, v := range agents.Msgs {
exchange := config.GetQueueExchangeCfg() encodeString := base64.StdEncoding.EncodeToString([]byte(v.Data))
rabbitMQ := mqcontrol.New(exchange, mqCfg) v.Data = encodeString
if err := rabbitMQ.MqConnect(); err != nil { if data, err := json.Marshal(v); err != nil {
WrongRequest(c, err) WrongRequest(c, err)
return return
} else {
fmt.Println(string(data))
if err := mqcontrol.Push("myPusher", "myQueue", data); err != nil {
WrongRequest(c, err)
return
}
}
} }
if err := rabbitMQ.SendMsg(body); err != nil {
WrongRequest(c, err) NormalRequest(c)
return }
func callback(d mqcontrol.MSG) {
fmt.Fprintln(gin.DefaultWriter, string(d.Body))
msg2 := mission.Message{}
if err := json.Unmarshal(d.Body, &msg2); err != nil {
fmt.Println("JSON ERR:", err)
} else {
fmt.Println(string(msg2.Data))
fmt.Fprintln(gin.DefaultWriter, msg2.Agent)
} }
if err := rabbitMQ.MqClose(); err != nil {
WrongRequest(c, err) }
return
func openMq() (err error) {
//初始化mq
fmt.Fprintln(gin.DefaultWriter, "初始化mq")
if err = mqcontrol.Init("./mq.json"); err != nil {
fmt.Println(err)
} }
return
}
NormalRequest(c) func closeMq() (err error) {
if err = mqcontrol.Fini(); err != nil {
fmt.Println(err)
}
return
} }
func MQRecvier() { func mQRecvier() {
mqCfg := config.GetMqConfig() if err := mqcontrol.Pop("myPoper", callback); err != nil {
exchange := config.GetQueueExchangeCfg() fmt.Println(err)
rabbitMQ := mqcontrol.New(exchange, mqCfg)
if err := rabbitMQ.MqConnect(); err != nil {
panic("cuowu")
} }
defer rabbitMQ.MqClose()
rabbitMQ.Recv()
} }
...@@ -44,6 +44,7 @@ func InitLog() { ...@@ -44,6 +44,7 @@ func InitLog() {
func InitRouter() *gin.Engine { func InitRouter() *gin.Engine {
r := gin.Default() r := gin.Default()
r.NoRoute(NoResponse) r.NoRoute(NoResponse)
// currentPath, err := tool.GetCurrentDirectory() // currentPath, err := tool.GetCurrentDirectory()
// if err != nil { // if err != nil {
...@@ -52,14 +53,16 @@ func InitRouter() *gin.Engine { ...@@ -52,14 +53,16 @@ func InitRouter() *gin.Engine {
// imgFilePath := fmt.Sprintf("%sautohtml\\*", currentPath) // imgFilePath := fmt.Sprintf("%sautohtml\\*", currentPath)
//r.StaticFS("/", http.Dir("autohtml")) //r.StaticFS("/", http.Dir("autohtml"))
//r.LoadHTMLGlob("autohtml/**/**/**/*") //r.LoadHTMLGlob("autohtml/**/**/**/*")
r.LoadHTMLFiles("autohtml/index.html") // r.LoadHTMLFiles("autohtml/index.html")
r.GET("/index", func(c *gin.Context) { // r.GET("/index", func(c *gin.Context) {
c.HTML(http.StatusOK, "index.html", nil) // c.HTML(http.StatusOK, "index.html", nil)
}) // })
r.POST("/updatetest", UpdateTest) r.POST("/updatetest", UpdateTest)
openMq()
//mq 消费者 //go MQRecvier()
// go MQRecvier()
return r return r
} }
func CloseRouter() {
closeMq()
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment