go: nsq启动与部署 ## 单实例 ### 启动lookup ``` docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd ``` ### 启动服务 ``` docker run --name nsqd -p 4150:4150 -p 4151:4151 \ nsqio/nsq /nsqd \ --broadcast-address=0.0.0.0 \ --lookupd-tcp-address=0.0.0.0:4160 \ --mem-queue-size=5000 \ --max-body-size=20971520 \ --max-msg-size=4194304 ``` ### 启动admin ``` docker run --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=0.0.0.0:4161 ``` ### docker compose ``` version: '3' services: nsqlookupd: image: nsqio/nsq command: /nsqlookupd ports: - "4160:4160" - "4161" nsqd: image: nsqio/nsq command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 --mem-queue-size=5000 \ --max-body-size=20971520 \ --max-msg-size=4194304 depends_on: - nsqlookupd ports: - "4150:4150" - "4151" nsqadmin: image: nsqio/nsq command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 depends_on: - nsqlookupd ports: - "8080:4171" networks: default: driver: bridge ipam: config: - subnet: 172.16.58.0/24 ``` ### producer ```go package main import ( "fmt" "github.com/nsqio/go-nsq" "github.com/sirupsen/logrus" ) func main() { url := "127.0.0.1:4150" cfg := nsq.NewConfig() cfg.MaxInFlight = 200 cfg.DialTimeout = time.Second * 10 cfg.WriteTimeout = time.Second * 10 producer, err := nsq.NewProducer(url, cfg) if err != nil { logrus.Fatal(err) } i := 0 for { i++ msg := []byte(fmt.Sprintf("hello:%v", i)) err := producer.Publish("test", msg) if err != nil { logrus.Errorf("publish error:%v", err) } logrus.Infof("write:%s", msg) // time.Sleep(time.Millisecond * 1) if i == 11000 { producer.Stop() break } } } ``` 异步生产消息 ``` var GlobalNsqCh = make(chan *nsq.ProducerTransaction, 10) go func() { for { trans, ok := <-GlobalNsqCh if !ok { return } if trans.Error != nil { logrus.Errorf("publishAsync error:%v, msg will be lost", err) } } }() err = producer.PublishAsync(topic, zipBytes, GlobalNsqCh, args1,args2) if err != nil { logrus.Errorf("publish error:%v", err) return err } ``` ### consumer ```go package main import ( "log" "time" "github.com/nsqio/go-nsq" ) func main() { cfg := nsq.NewConfig() cfg.LookupdPollInterval = time.Second //设置重连时间 c, err := nsq.NewConsumer("test", "test-channel", cfg) // 新建一个消费者 if err != nil { log.Fatal(err) } c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { log.Printf("consume:%s %s", message.ID, message.Body) return nil })) if err := c.ConnectToNSQDs([]string{"127.0.0.1:4160"}); err != nil { log.Printf("ConnectToNSQDs error:%v", err) } <-c.StopChan } ``` ## api相关操作 ### 查看某个topic的相关数据 ``` nsqadmin http://127.0.0.1:4171/api/topics/test ``` 回包 ```json { "node": "*", "hostname": "", "topic_name": "test", "depth": 0, "memory_depth": 0, "backend_depth": 0, "message_count": 0, "nodes": [ { "node": "honoryin-LC3:4151", "hostname": "honoryin-LC3", "topic_name": "test", "depth": 0, "memory_depth": 0, "backend_depth": 0, "message_count": 0, "nodes": null, "channels": [ { "node": "honoryin-LC3:4151", "hostname": "honoryin-LC3", "topic_name": "test", "channel_name": "test-channel", "depth": 0, "memory_depth": 0, "backend_depth": 0, "in_flight_count": 0, "deferred_count": 0, "requeue_count": 0, "timeout_count": 0, "message_count": 0, "client_count": 0, "nodes": null, "clients": null, "paused": false, "e2e_processing_latency": { "count": 0, "percentiles": null, "topic": "", "channel": "", "host": "" } } ], "paused": false, "e2e_processing_latency": { "count": 0, "percentiles": null, "topic": "", "channel": "", "host": "" } } ], "channels": [ { "node": "honoryin-LC3:4151", "hostname": "honoryin-LC3", "topic_name": "test", "channel_name": "test-channel", "depth": 0, "memory_depth": 0, "backend_depth": 0, "in_flight_count": 0, "deferred_count": 0, "requeue_count": 0, "timeout_count": 0, "message_count": 0, "client_count": 0, "nodes": null, "clients": null, "paused": false, "e2e_processing_latency": { "count": 0, "percentiles": null, "topic": "", "channel": "", "host": "" } } ], "paused": false, "e2e_processing_latency": { "count": 0, "percentiles": null, "topic": "test", "channel": "", "host": "*" }, "message": "" } ``` 来自 大脸猪 写于 2021-06-11 14:40 -- 更新于2022-01-23 00:26 -- 0 条评论