Commit cf2bde48 by yunpeng.song

初始化

parents
[mq]
path=mqcfg.ini
package config
//rabbit
type MQCfg struct {
Hostip string // 主机ip
Hostport string // 主机端口
Username string // 用户名
Userpwd string // 密码
}
type QueueExchangeCfg struct {
Durable bool // true,持久化
QuName string // 队列名称
RtKey string // key值
ExName string // 交换机名称
ExType string // 交换机类型
}
func NewMQCfg(hostip, hostport, userName, userPwd string) *MQCfg {
return &MQCfg{hostip, hostport, userName, userPwd}
}
func NewExchangeCfg(durable bool, quName, rtKey, exName, exType string) *QueueExchangeCfg {
return &QueueExchangeCfg{durable, quName, rtKey, exName, exType}
}
package config
import (
"ficus_clientmanager/tool"
"fmt"
"github.com/Unknwon/goconfig"
)
var (
mqCfgFile *goconfig.ConfigFile // mq的配置文件
defalutCfgFile *goconfig.ConfigFile // 应用程序默认配置文件
path string // 应用程序当前路径
mqCfgPath string // mq 配置文件路径
)
func LoadConfig() {
path, err := tool.GetCurrentDirectory()
if err != nil {
fmt.Printf("错误getCurrentDirectory")
}
fmt.Printf(path)
if err = loaddefalutCfgFile(path + "config.ini"); err != nil {
panic("loaddefalutCfgFile failed")
}
mqCfgPath, err := defalutCfgFile.GetValue("mq", "path")
if err != nil {
fmt.Printf("错误init")
}
loadMqCfgFile(path + mqCfgPath)
}
func loaddefalutCfgFile(path string) error {
var err error = nil
defalutCfgFile, err = goconfig.LoadConfigFile(path)
return err
}
func loadMqCfgFile(path string) error {
var err error = nil
mqCfgFile, err = goconfig.LoadConfigFile(path)
return err
}
func GetMqConfig() *MQCfg {
hostip, _ := mqCfgFile.GetValue("mq", "hostip")
hostport, _ := mqCfgFile.GetValue("mq", "hostport")
username, _ := mqCfgFile.GetValue("mq", "username")
userpwd, _ := mqCfgFile.GetValue("mq", "userpwd")
mqCfg := &MQCfg{
Hostip: hostip,
Hostport: hostport,
Username: username,
Userpwd: userpwd,
}
return mqCfg
}
func GetQueueExchangeCfg() *QueueExchangeCfg {
durable, _ := mqCfgFile.Bool("exchange", "durable")
quName, _ := mqCfgFile.GetValue("exchange", "quName")
rtKey, _ := mqCfgFile.GetValue("exchange", "rtKey")
exName, _ := mqCfgFile.GetValue("exchange", "exName")
exType, _ := mqCfgFile.GetValue("exchange", "exType")
exchangeCfg := &QueueExchangeCfg{
Durable: durable,
QuName: quName,
RtKey: rtKey,
ExName: exName,
ExType: exType,
}
return exchangeCfg
}
package main
import (
"ficus_clientmanager/config"
"ficus_clientmanager/router"
"fmt"
"github.com/gin-gonic/gin"
)
func main() {
config.LoadConfig()
router.InitGin()
router.InitLog()
fmt.Fprintln(gin.DefaultWriter, "start")
r := router.InitRouter()
r.Run(":20991")
}
[mq]
hostip=127.0.0.1
hostport=5672
username=guest
userpwd=guest
[exchange]
durable = true
quName=test4
rtKey=ficus.key
exName=amq.direct
exType=direct
\ No newline at end of file
package mqcontrol
import (
"ficus_clientmanager/config"
"fmt"
"github.com/streadway/amqp"
)
// 定义RabbitMQ对象
type RabbitMQ struct {
connection *amqp.Connection // 连接
channel *amqp.Channel // 管道
exchange *config.QueueExchangeCfg // 交换机对象
cfg *config.MQCfg // 交换机连接配置
}
// 定义队列交换机对象
// 链接rabbitMQ
func (r *RabbitMQ) MqConnect() (err error) {
cfg := r.cfg
RabbitUrl := fmt.Sprintf("amqp://%s:%s@%s:%s/", cfg.Username, cfg.Userpwd, cfg.Hostip, cfg.Hostport)
r.connection, err = amqp.Dial(RabbitUrl)
if err != nil {
fmt.Printf("MQ打开链接失败:%s \r\n", err)
return
}
fmt.Printf("MQ打开链接成功\r\n")
r.channel, err = r.connection.Channel()
if err != nil {
fmt.Printf("MQ打开管道失败:%s \r\n", err)
return
}
fmt.Printf("MQ打开管道成功\r\n")
return nil
}
// 关闭RabbitMQ连接
func (r *RabbitMQ) MqClose() error {
// 先关闭管道,再关闭链接
err := r.channel.Close()
if err != nil {
fmt.Printf("MQ管道关闭失败:%s \r\n", err)
return err
}
err = r.connection.Close()
if err != nil {
fmt.Printf("MQ链接关闭失败:%s \r\n", err)
return err
}
return err
}
// 创建一个新的操作对象
func New(q *config.QueueExchangeCfg, c *config.MQCfg) *RabbitMQ {
return &RabbitMQ{
exchange: q,
cfg: c,
}
}
package mqcontrol
import "fmt"
// 监听接收者接收任务
func (r *RabbitMQ) Recv() {
// 验证链接是否正常,否则重新链接
if r.channel == nil {
r.MqConnect()
}
exchange := r.exchange
// 用于检查队列是否存在,已经存在不需要重复声明
_, err := r.channel.QueueDeclarePassive(exchange.QuName, exchange.Durable, false, false, true, nil)
if err != nil {
// 队列不存在,声明队列
// name:队列名称;durable:是否持久化,队列存盘,true服务重启后信息不会丢失,影响性能;autoDelete:是否自动删除;noWait:是否非阻塞,
// true为是,不等待RMQ返回信息;args:参数,传nil即可;exclusive:是否设置排他
_, err = r.channel.QueueDeclare(exchange.QuName, exchange.Durable, false, false, true, nil)
if err != nil {
fmt.Printf("MQ注册队列失败:%s \n", err)
return
}
}
// 队列绑定
err = r.channel.QueueBind(exchange.QuName, exchange.RtKey, exchange.ExName, true, nil)
if err != nil {
fmt.Printf("MQ绑定队列失败:%s \n", err)
return
}
// 获取消费通道,确保rabbitMQ一个一个发送消息
err = r.channel.Qos(1, 0, true)
msgList, err := r.channel.Consume(exchange.QuName, "", false, false, false, false, nil)
if err != nil {
fmt.Printf("获取消费通道异常:%s \n", err)
return
}
for msg := range msgList {
// 处理数据
//Todo yunpeng.song@freemud 2019.0724
err := func() error {
fmt.Println(string(msg.Body))
return err
}()
if err != nil {
err = msg.Ack(true)
if err != nil {
fmt.Printf("确认消息未完成异常:%s \n", err)
continue
}
} else {
// 确认消息,必须为false
err = msg.Ack(false)
if err != nil {
fmt.Printf("确认消息完成异常:%s \n", err)
continue
}
continue
}
}
}
package mqcontrol
import (
"fmt"
"github.com/streadway/amqp"
)
//发送任务
func (r *RabbitMQ) SendMsg(msg []byte) error {
// 验证链接是否正常,否则重新链接
if r.connection.IsClosed() || r.channel == nil {
fmt.Printf("连接断开,重新连接")
r.MqConnect()
}
exchange := r.exchange
// 用于检查队列是否存在,已经存在不需要重复声明
_, err := r.channel.QueueDeclarePassive(exchange.QuName, exchange.Durable, false, false, true, nil)
if err != nil {
fmt.Printf("MQ队列%s不存在 \r\n", exchange.QuName)
// 队列不存在,声明队列
// name:队列名称;durable:是否持久化,队列存盘,true服务重启后信息不会丢失,影响性能;autoDelete:是否自动删除;noWait:是否非阻塞,
// true为是,不等待RMQ返回信息;args:参数,传nil即可;exclusive:是否设置排他
_, err := r.channel.QueueDeclare(exchange.QuName, exchange.Durable, false, false, true, nil)
if err != nil {
fmt.Printf("MQ注册队列失败:%s \r\n", err)
return err
}
}
fmt.Printf("MQ注册队列%s成功 \r\n", exchange.QuName)
// 队列绑定
err = r.channel.QueueBind(exchange.QuName, exchange.RtKey, exchange.ExName, true, nil)
if err != nil {
fmt.Printf("MQ绑定队列失败:%s \r\n", err)
return err
}
fmt.Printf("MQ绑定队列%s成功,RtKey:%s,ExName:%s\r\n", exchange.QuName, exchange.RtKey, exchange.ExName)
// 用于检查交换机是否存在,已经存在不需要重复声明
err = r.channel.ExchangeDeclarePassive(exchange.ExName, exchange.ExType, exchange.Durable, false, false, true, nil)
if err != nil {
fmt.Printf("MQ交换机%s不存在 \r\n", exchange.ExName)
// 注册交换机
// name:交换机名称,kind:交换机类型,durable:是否持久化,队列存盘,true服务重启后信息不会丢失,影响性能;autoDelete:是否自动删除;
// noWait:是否非阻塞, true为是,不等待RMQ返回信息;args:参数,传nil即可; internal:是否为内部
err = r.channel.ExchangeDeclare(exchange.ExName, exchange.ExType, exchange.Durable, false, false, true, nil)
if err != nil {
fmt.Printf("MQ注册交换机失败:%s \r\n", err)
return err
}
}
fmt.Printf("MQ注册交换机%s成功\r\n", exchange.ExName)
// 发送任务消息
err = r.channel.Publish(exchange.ExName, exchange.RtKey, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: msg,
})
if err != nil {
fmt.Printf("MQ任务发送失败:%s \r\n", err)
return err
}
fmt.Printf("MQ任务发送成功\r\n")
return nil
}
package router
import (
"bytes"
ficus "ficus/proto"
"ficus_clientmanager/config"
"ficus_clientmanager/mqcontrol"
"fmt"
"io/ioutil"
"github.com/gin-gonic/gin"
)
type Agents struct {
Agents []ficus.Agent `json:"agents"`
}
func UpdateTest(c *gin.Context) {
body, err := ioutil.ReadAll(c.Request.Body)
fmt.Println(string(body))
if err != nil {
WrongRequest(c, err)
}
c.Request.Body = ioutil.NopCloser(bytes.NewBuffer(body))
agents := Agents{}
if err := c.ShouldBindJSON(&agents); err != nil {
WrongRequest(c, err)
return
}
mqCfg := config.GetMqConfig()
exchange := config.GetQueueExchangeCfg()
rabbitMQ := mqcontrol.New(exchange, mqCfg)
if err := rabbitMQ.MqConnect(); err != nil {
WrongRequest(c, err)
return
}
if err := rabbitMQ.SendMsg(body); err != nil {
WrongRequest(c, err)
return
}
if err := rabbitMQ.MqClose(); err != nil {
WrongRequest(c, err)
return
}
NormalRequest(c)
}
func MQRecvier() {
mqCfg := config.GetMqConfig()
exchange := config.GetQueueExchangeCfg()
rabbitMQ := mqcontrol.New(exchange, mqCfg)
if err := rabbitMQ.MqConnect(); err != nil {
panic("cuowu")
}
defer rabbitMQ.MqClose()
rabbitMQ.Recv()
}
package router
import (
"fmt"
"io"
"net/http"
"os"
"github.com/gin-gonic/gin"
)
func WrongRequest(c *gin.Context, err error) {
fmt.Fprintln(gin.DefaultWriter, "错误", err)
c.JSON(http.StatusOK, gin.H{
"statusCOde": "101",
"msg": err.Error(),
})
}
func NormalRequest(c *gin.Context) {
fmt.Fprintln(gin.DefaultWriter, "消息投递成功")
c.JSON(http.StatusOK, gin.H{
"statusCOde": "100",
"msg": "消息投递成功",
})
}
func NoResponse(c *gin.Context) {
//返回404状态码
c.JSON(http.StatusNotFound, gin.H{
"status": 404,
"error": "404, server not exists!",
})
}
func InitGin() {
gin.SetMode(gin.TestMode)
}
func InitLog() {
gin.DisableConsoleColor()
f, _ := os.Create("gin.log")
gin.DefaultWriter = io.MultiWriter(f, os.Stdout)
}
func InitRouter() *gin.Engine {
r := gin.Default()
r.NoRoute(NoResponse)
// currentPath, err := tool.GetCurrentDirectory()
// if err != nil {
// fmt.Println(err)
// }
// imgFilePath := fmt.Sprintf("%sautohtml\\*", currentPath)
//r.StaticFS("/", http.Dir("autohtml"))
//r.LoadHTMLGlob("autohtml/**/**/**/*")
r.LoadHTMLFiles("autohtml/index.html")
r.GET("/index", func(c *gin.Context) {
c.HTML(http.StatusOK, "index.html", nil)
})
r.POST("/updatetest", UpdateTest)
//mq 消费者
// go MQRecvier()
return r
}
package tool
import (
"errors"
"os"
"os/exec"
"path/filepath"
"strings"
)
func GetCurrentDirectory() (string, error) {
file, err := exec.LookPath(os.Args[0])
if err != nil {
return "", err
}
path, err := filepath.Abs(file)
if err != nil {
return "", err
}
i := strings.LastIndex(path, "/")
if i < 0 {
i = strings.LastIndex(path, "\\")
}
if i < 0 {
return "", errors.New(`error: Can't find "/" or "\".`)
}
return string(path[0 : i+1]), nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment