diff --git a/go-iot-mq/main.go b/go-iot-mq/main.go index 112b41cbf0b1eef5bf837dbb4bf378693826e072..07b1781b36ddbb649d17181b3d9004382a812a44 100644 --- a/go-iot-mq/main.go +++ b/go-iot-mq/main.go @@ -44,18 +44,15 @@ func main() { InitInfluxDbClient(globalConfig.InfluxConfig) writeAPI = GlobalInfluxDbClient.WriteAPI(globalConfig.InfluxConfig.Org, globalConfig.InfluxConfig.Bucket) //InitRabbitCon(globalConfig.MQConfig) - err = ConnectToRMQ() - if err != nil { - log.Fatalf("Failed to connect to RabbitMQ: %s", err) - } + err = ConnectToRMQ(globalConfig.MQConfig) + failOnError(err, "Failed to open a channel") zap.S().Infof("消息队列类型 %s", globalConfig.NodeInfo.Type) - CreateRabbitQueue("waring_handler") CreateRabbitQueue("waring_delay_handler") + initMongo() - failOnError(err, "Failed to open a channel") go startHttp() - cus := NewConsumer("", "amqp://guest:guest@localhost:5672", "", "", "") + cus := NewConsumer("", genUrl(globalConfig.MQConfig), "", "", "") err = cus.Connect() if err != nil { log.Fatalf("Failed to connect to RabbitMQ: %s", err) @@ -193,7 +190,7 @@ func InitRabbitCon(config MQConfig) { // // 返回RabbitMQ的连接字符串 func genUrl(config MQConfig) string { - connStr := fmt.Sprintf("amqp://%s:%s@%s:%d/", config.Username, config.Password, config.Host, config.Port) + connStr := fmt.Sprintf("amqp://%s:%s@%s:%d/", url.QueryEscape(config.Username), url.QueryEscape(config.Password), config.Host, config.Port) return connStr } @@ -234,7 +231,7 @@ func initMongo() { // 检查连接 err = client.Ping(context.TODO(), nil) if err != nil { - log.Fatal(err) + zap.S().Fatalf("Failed to connect to MongoDB: %+v", err) } GMongoClient = client diff --git a/go-iot-mq/mq.go b/go-iot-mq/mq.go index ff263fd08ded147f6975d26298fabdffde25b34a..737db00a43e61688f198767bb44c88ec7e6f2898 100644 --- a/go-iot-mq/mq.go +++ b/go-iot-mq/mq.go @@ -17,12 +17,12 @@ const ( var conn *amqp.Connection var chann *amqp.Channel -func ConnectToRMQ() (err error) { - conn, err = amqp.Dial(rmqCredentials) +func ConnectToRMQ(config MQConfig) (err error) { + zap.S().Infof("开始处理rabbitmq") + conn, err = amqp.Dial(genUrl(config)) if err != nil { return errors.New("Error de conexion: " + err.Error()) } - chann, err = conn.Channel() if err != nil { return errors.New("create channel error " + err.Error()) @@ -30,15 +30,13 @@ func ConnectToRMQ() (err error) { err = chann.Qos(1, 0, false) if err != nil { - return errors.New("qos setting error " + err.Error()) + return errors.New("Error al abrir canal: " + err.Error()) } - - observeConnection() - - return nil + observeConnection(config) + return err } -func observeConnection() { +func observeConnection(config MQConfig) { go func() { e := <-conn.NotifyClose(make(chan *amqp.Error)) if e != nil { @@ -47,9 +45,9 @@ func observeConnection() { } zap.S().Errorf("Intentando reconectar con RMQ\n") - closeActiveConnections() + closeActiveConnections(config) - for err := ConnectToRMQ(); err != nil; err = ConnectToRMQ() { + for err := ConnectToRMQ(config); err != nil; err = ConnectToRMQ(config) { zap.S().Error(err) time.Sleep(5 * time.Second) } @@ -57,7 +55,7 @@ func observeConnection() { } // Can be also implemented in graceful shutdowns -func closeActiveConnections() { +func closeActiveConnections(config MQConfig) { if chann.IsClosed() { channel, _ := conn.Channel() @@ -71,7 +69,7 @@ func closeActiveConnections() { } if conn.IsClosed() { - conn, _ = amqp.Dial(rmqCredentials) + conn, _ = amqp.Dial(genUrl(config)) chann, _ = conn.Channel() err := chann.Qos(1, 0, false) if err != nil { diff --git a/go-iot/rabbit_mq.go b/go-iot/rabbit_mq.go index 2e4b5535eb3144a4f847def3aa9fa45b62abbd95..b2756ad2454d3d7c7f398d33950f479abb08949f 100644 --- a/go-iot/rabbit_mq.go +++ b/go-iot/rabbit_mq.go @@ -5,6 +5,7 @@ import ( "fmt" amqp "github.com/rabbitmq/amqp091-go" "go.uber.org/zap" + "net/url" ) var GRabbitMq *amqp.Connection @@ -46,7 +47,7 @@ func InitRabbitCon() { } func genUrl() string { - connStr := fmt.Sprintf("amqp://%s:%s@%s:%d/", globalConfig.MQConfig.Username, globalConfig.MQConfig.Password, globalConfig.MQConfig.Host, globalConfig.MQConfig.Port) + connStr := fmt.Sprintf("amqp://%s:%s@%s:%d/", url.QueryEscape(globalConfig.MQConfig.Username), url.QueryEscape(globalConfig.MQConfig.Password), globalConfig.MQConfig.Host, globalConfig.MQConfig.Port) return connStr } diff --git a/iot-go-project/initialize/init.go b/iot-go-project/initialize/init.go index 3f72b00414711ed6fcc16ef8729e1fb0d4f8420c..2c971253b1aad1897bff31b8d6bc4edcc1057b8d 100644 --- a/iot-go-project/initialize/init.go +++ b/iot-go-project/initialize/init.go @@ -884,7 +884,7 @@ func InitRabbitCon() { } func genUrl() string { - connStr := fmt.Sprintf("amqp://%s:%s@%s:%d/", glob.GConfig.MQConfig.Username, glob.GConfig.MQConfig.Password, glob.GConfig.MQConfig.Host, glob.GConfig.MQConfig.Port) + connStr := fmt.Sprintf("amqp://%s:%s@%s:%d/", url.QueryEscape(glob.GConfig.MQConfig.Username), url.QueryEscape(glob.GConfig.MQConfig.Password), glob.GConfig.MQConfig.Host, glob.GConfig.MQConfig.Port) return connStr }