From e8dce976c91951e7ce6249be9f8dbf600708d4ae Mon Sep 17 00:00:00 2001 From: szw <1639914395@qq.com> Date: Wed, 19 Jun 2024 18:05:46 +0800 Subject: [PATCH 1/3] =?UTF-8?q?feat:=E5=A2=9E=E5=8A=A0MongoDB=E5=92=8Crabb?= =?UTF-8?q?itMq=E9=93=BE=E6=8E=A5=E5=9C=B0=E5=9D=80=E7=9A=84=E7=94=A8?= =?UTF-8?q?=E6=88=B7=E5=90=8D=E4=B8=8E=E5=AF=86=E7=A0=81=E7=9A=84url?= =?UTF-8?q?=E7=BC=96=E7=A0=81,=E8=A7=A3=E5=86=B3=E7=B1=BB=E4=BC=BC?= =?UTF-8?q?=E7=94=A8=E6=88=B7=E5=90=8D=E6=88=96=E8=80=85=E5=AF=86=E7=A0=81?= =?UTF-8?q?=E5=AD=98=E5=9C=A8@=E8=BF=99=E7=A7=8D=E7=89=B9=E6=AE=8A?= =?UTF-8?q?=E5=AD=97=E7=AC=A6=E5=BD=B1=E5=93=8D=E9=93=BE=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go-iot-mq/main.go | 3 ++- go-iot/rabbit_mq.go | 3 ++- iot-go-project/initialize/init.go | 5 +++-- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/go-iot-mq/main.go b/go-iot-mq/main.go index 6a043a0..6199335 100644 --- a/go-iot-mq/main.go +++ b/go-iot-mq/main.go @@ -15,6 +15,7 @@ import ( "log" "net/http" _ "net/http/pprof" + "net/url" "os" "strconv" ) @@ -221,7 +222,7 @@ var GMongoClient *mongo.Client // initMongo 函数用于初始化 MongoDB 连接 func initMongo() { - connStr := fmt.Sprintf("mongodb://%s:%s@%s:%d", globalConfig.MongoConfig.Username, globalConfig.MongoConfig.Password, globalConfig.MongoConfig.Host, globalConfig.MongoConfig.Port) + connStr := fmt.Sprintf("mongodb://%s:%s@%s:%d", url.QueryEscape(globalConfig.MongoConfig.Username), url.QueryEscape(globalConfig.MongoConfig.Password), globalConfig.MongoConfig.Host, globalConfig.MongoConfig.Port) client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(connStr)) if err != nil { log.Fatal(err) diff --git a/go-iot/rabbit_mq.go b/go-iot/rabbit_mq.go index 2e4b553..b2756ad 100644 --- a/go-iot/rabbit_mq.go +++ b/go-iot/rabbit_mq.go @@ -5,6 +5,7 @@ import ( "fmt" amqp "github.com/rabbitmq/amqp091-go" "go.uber.org/zap" + "net/url" ) var GRabbitMq *amqp.Connection @@ -46,7 +47,7 @@ func InitRabbitCon() { } func genUrl() string { - connStr := fmt.Sprintf("amqp://%s:%s@%s:%d/", globalConfig.MQConfig.Username, globalConfig.MQConfig.Password, globalConfig.MQConfig.Host, globalConfig.MQConfig.Port) + connStr := fmt.Sprintf("amqp://%s:%s@%s:%d/", url.QueryEscape(globalConfig.MQConfig.Username), url.QueryEscape(globalConfig.MQConfig.Password), globalConfig.MQConfig.Host, globalConfig.MQConfig.Port) return connStr } diff --git a/iot-go-project/initialize/init.go b/iot-go-project/initialize/init.go index bedae9c..44bff51 100644 --- a/iot-go-project/initialize/init.go +++ b/iot-go-project/initialize/init.go @@ -22,6 +22,7 @@ import ( "igp/models" "igp/router" "log" + "net/url" "os" "syscall" "time" @@ -128,7 +129,7 @@ func initDb() { } func initMongo() { - connStr := fmt.Sprintf("mongodb://%s:%s@%s:%d", glob.GConfig.MongoConfig.Username, glob.GConfig.MongoConfig.Password, glob.GConfig.MongoConfig.Host, glob.GConfig.MongoConfig.Port) + connStr := fmt.Sprintf("mongodb://%s:%s@%s:%d", url.QueryEscape(glob.GConfig.MongoConfig.Username), url.QueryEscape(glob.GConfig.MongoConfig.Password), glob.GConfig.MongoConfig.Host, glob.GConfig.MongoConfig.Port) client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(connStr)) if err != nil { log.Fatal(err) @@ -315,7 +316,7 @@ func InitRabbitCon() { } func genUrl() string { - connStr := fmt.Sprintf("amqp://%s:%s@%s:%d/", glob.GConfig.MQConfig.Username, glob.GConfig.MQConfig.Password, glob.GConfig.MQConfig.Host, glob.GConfig.MQConfig.Port) + connStr := fmt.Sprintf("amqp://%s:%s@%s:%d/", url.QueryEscape(glob.GConfig.MQConfig.Username), url.QueryEscape(glob.GConfig.MQConfig.Password), glob.GConfig.MQConfig.Host, glob.GConfig.MQConfig.Port) return connStr } -- Gitee From ede5cf6c6bc2069da0ffa02ab3d67ce93f0477b5 Mon Sep 17 00:00:00 2001 From: szw <1639914395@qq.com> Date: Wed, 19 Jun 2024 19:28:49 +0800 Subject: [PATCH 2/3] =?UTF-8?q?feat:=E4=BF=AE=E6=94=B9gim=E9=93=BE?= =?UTF-8?q?=E6=8E=A5mq=E9=85=8D=E7=BD=AE=E4=B8=BA=E8=AF=BB=E5=8F=96?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go-iot-mq/main.go | 15 ++++++--------- go-iot-mq/mq.go | 29 +++++++++++++---------------- 2 files changed, 19 insertions(+), 25 deletions(-) diff --git a/go-iot-mq/main.go b/go-iot-mq/main.go index 6199335..0bd7f97 100644 --- a/go-iot-mq/main.go +++ b/go-iot-mq/main.go @@ -44,18 +44,15 @@ func main() { InitInfluxDbClient(globalConfig.InfluxConfig) writeAPI = GlobalInfluxDbClient.WriteAPI(globalConfig.InfluxConfig.Org, globalConfig.InfluxConfig.Bucket) //InitRabbitCon(globalConfig.MQConfig) - err = ConnectToRMQ() - if err != nil { - log.Fatalf("Failed to connect to RabbitMQ: %s", err) - } + err = ConnectToRMQ(globalConfig.MQConfig) + failOnError(err, "Failed to open a channel") zap.S().Infof("消息队列类型 %s", globalConfig.NodeInfo.Type) - CreateRabbitQueue("waring_handler") CreateRabbitQueue("waring_delay_handler") + initMongo() - failOnError(err, "Failed to open a channel") go startHttp() - cus := NewConsumer("", "amqp://guest:guest@localhost:5672", "", "", "") + cus := NewConsumer("", genUrl(globalConfig.MQConfig), "", "", "") err = cus.Connect() if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %s", err) @@ -193,7 +190,7 @@ func InitRabbitCon(config MQConfig) { // // 返回RabbitMQ的连接字符串 func genUrl(config MQConfig) string { - connStr := fmt.Sprintf("amqp://%s:%s@%s:%d/", config.Username, config.Password, config.Host, config.Port) + connStr := fmt.Sprintf("amqp://%s:%s@%s:%d/", url.QueryEscape(config.Username), url.QueryEscape(config.Password), config.Host, config.Port) return connStr } @@ -231,7 +228,7 @@ func initMongo() { // 检查连接 err = client.Ping(context.TODO(), nil) if err != nil { - log.Fatal(err) + zap.S().Fatalf("Failed to connect to MongoDB: %+v", err) } GMongoClient = client diff --git a/go-iot-mq/mq.go b/go-iot-mq/mq.go index ff263fd..c7d42b8 100644 --- a/go-iot-mq/mq.go +++ b/go-iot-mq/mq.go @@ -2,7 +2,6 @@ package main // RMQ PACKAGE - "rmq" import ( - "errors" "go.uber.org/zap" "time" @@ -17,28 +16,26 @@ const ( var conn *amqp.Connection var chann *amqp.Channel -func ConnectToRMQ() (err error) { - conn, err = amqp.Dial(rmqCredentials) +func ConnectToRMQ(config MQConfig) (err error) { + zap.S().Infof("开始处理rabbitmq") + conn, err = amqp.Dial(genUrl(config)) if err != nil { - return errors.New("Error de conexion: " + err.Error()) + zap.S().Fatalf("Failed to connect to RabbitMQ %+v", err) } - chann, err = conn.Channel() if err != nil { - return errors.New("create channel error " + err.Error()) + zap.S().Fatalf("create channel error %+v", err) } err = chann.Qos(1, 0, false) if err != nil { - return errors.New("qos setting error " + err.Error()) + zap.S().Fatalf("qos setting error %+v", err) } - - observeConnection() - - return nil + observeConnection(config) + return err } -func observeConnection() { +func observeConnection(config MQConfig) { go func() { e := <-conn.NotifyClose(make(chan *amqp.Error)) if e != nil { @@ -47,9 +44,9 @@ func observeConnection() { } zap.S().Errorf("Intentando reconectar con RMQ\n") - closeActiveConnections() + closeActiveConnections(config) - for err := ConnectToRMQ(); err != nil; err = ConnectToRMQ() { + for err := ConnectToRMQ(config); err != nil; err = ConnectToRMQ(config) { zap.S().Error(err) time.Sleep(5 * time.Second) } @@ -57,7 +54,7 @@ func observeConnection() { } // Can be also implemented in graceful shutdowns -func closeActiveConnections() { +func closeActiveConnections(config MQConfig) { if chann.IsClosed() { channel, _ := conn.Channel() @@ -71,7 +68,7 @@ func closeActiveConnections() { } if conn.IsClosed() { - conn, _ = amqp.Dial(rmqCredentials) + conn, _ = amqp.Dial(genUrl(config)) chann, _ = conn.Channel() err := chann.Qos(1, 0, false) if err != nil { -- Gitee From 119d5b81ae0ee1948d7dd1d563845dce4b28aef6 Mon Sep 17 00:00:00 2001 From: szw <1639914395@qq.com> Date: Wed, 19 Jun 2024 20:50:56 +0800 Subject: [PATCH 3/3] =?UTF-8?q?fix:=E4=BF=AE=E6=94=B9gim=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go-iot-mq/mq.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/go-iot-mq/mq.go b/go-iot-mq/mq.go index c7d42b8..737db00 100644 --- a/go-iot-mq/mq.go +++ b/go-iot-mq/mq.go @@ -2,6 +2,7 @@ package main // RMQ PACKAGE - "rmq" import ( + "errors" "go.uber.org/zap" "time" @@ -20,16 +21,16 @@ func ConnectToRMQ(config MQConfig) (err error) { zap.S().Infof("开始处理rabbitmq") conn, err = amqp.Dial(genUrl(config)) if err != nil { - zap.S().Fatalf("Failed to connect to RabbitMQ %+v", err) + return errors.New("Error de conexion: " + err.Error()) } chann, err = conn.Channel() if err != nil { - zap.S().Fatalf("create channel error %+v", err) + return errors.New("create channel error " + err.Error()) } err = chann.Qos(1, 0, false) if err != nil { - zap.S().Fatalf("qos setting error %+v", err) + return errors.New("Error al abrir canal: " + err.Error()) } observeConnection(config) return err -- Gitee