kafka相关操作 ## 调试工具操作 ```console - 启动 bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties - 列出topicc ./kafka-topics.sh --zookeeper 9.43.186.132:2181,9.43.186.152:2181,9.43.186.176:2181 --list - 创建topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1 - 查看topic的状态 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test - 消费者 读数据 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group superpig - 生产者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test - 删除topic ./kafka-topics --delete --zookeeper 10.0.8.23:2181 --topic PedesJobResult - 查看kafka group bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --group lx_test --describe ``` ## go客户端,读消息,仅读取 ```go // "github.com/segmentio/kafka-go" r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"127.0.0.1:9092"}, Topic: "talos-attr-data", MinBytes: 10e3, // 10KB MaxBytes: 10e6, // 10MB }) r.SetOffset(kafka.LastOffset) i := 0 for { m, err := r.ReadMessage(context.Background()) if err != nil { break } log.Infof("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) } r.Close() ``` ## go客户端,按group消费 ```go ctx, cancel = context.WithCancel(ctx) defaultConfig := kafka.ReaderConfig{ Brokers: strings.Split(addr, ","), GroupID: groupID, Topic: topic, MinBytes: 10e3, // 10KB MaxBytes: 20e6, // 20MB CommitInterval: time.Second * 2, QueueCapacity: 50, Logger: kafka.LoggerFunc(logrus.Debugf), ErrorLogger: kafka.LoggerFunc(logrus.Errorf), WatchPartitionChanges: true, } reader = kafka.NewReader(defaultConfig) go func() { select { case <-ctx.Done(): err := k.Close() if err != nil { logrus.Errorf("close reader error:%v", err) } return } }() // 获取消息 m, err := reader.FetchMessage(ctx) if err != nil { } // 操作m的数据 err := reader.CommitMessages(ctx, m) if err != nil { } ``` ## go客户端,写消息 ```go w := &kafka.Writer{ Addr: kafkago.TCP(host), RequiredAcks: kafkago.RequireOne, Async: true, // make the writer asynchronous Compression: kafkago.Snappy, Completion: func(messages []kafkago.Message, err error) { switch e := err.(type) { case nil: for _, msg := range messages { // 发送成功 } case kafkago.WriteErrors: for i := range messages { msg := messages[i] if e[i] != nil { // 处理错误 } } default: // handle other errors logrus.Errorf("kafka completion error:%v", err) for _, msg := range messages { // 处理其它消息 } } }, } msg := kafka.Message{ Topic: topic, Value: val, Time: now, Headers: headers, } w.WriteMessages(context.Background(), msg) ``` ## 搭建docker简单的测试环境 参考:https://juejin.im/entry/6844903829624848398 ```bash docker pull wurstmeister/zookeeper docker pull wurstmeister/kafka docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=190.134.127.193:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://190.134.127.193:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka # KAFKA_ZOOKEEPER_CONNECT KAFKA_ADVERTISED_LISTENERS 可以设置真实IP,让其它机器访问 ``` 来自 大脸猫 写于 2019-04-13 10:49 -- 更新于2021-05-21 16:54 -- 0 条评论