kafka相关操作

调试工具操作

1、启动
nohup ./kafka-server-start.sh ../config/server.properties &

2、列出topic
./kafka-topics.sh  --zookeeper 9.43.186.132:2181,9.43.186.152:2181,9.43.186.176:2181 --list 

3、创建topic
./kafka-topics.sh  --zookeeper 9.43.186.132:2181,9.43.186.152:2181,9.43.186.176:2181 --topic jouislu_test_topic --replication-factor 1 --partitions 1 --create

4、消费者 读数据
./kafka-console-consumer.sh --zookeeper  9.43.186.132:2181,9.43.186.152:2181,9.43.186.176:2181   --topic PedesInvadeJobResult --from-beginning
./kafka-console-consumer.sh --zookeeper  10.0.8.23:2181 --topic jouislu_test_topic --from-beginning
./kafka-console-consumer.sh --zookeeper  10.0.8.8:2181 --topic jouislu_test_topic --from-beginning
5、生产者
./kafka-console-producer.sh --broker-list 10.0.8.8:9092 --topic jouislu_test_topic 

6、删除topic
./kafka-topics.sh --delete --zookeeper 10.0.8.23:2181 --topic PedesJobResult

go客户端

/*
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
*/
// Create a cancellable context; cancelling it makes the consume loop below exit.
ctx, cancel = context.WithCancel(context.Background())

// Consumer-group configuration: surface errors and rebalance notifications
// on their own channels so they can be logged.
config := cluster.NewConfig()
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
config.Version = sarama.V2_0_0_0

// This option is misleading: it only takes effect the first time a consumer is
// created for the group. Once a partition already has a stored offset, it is ignored.
// To skip the messages produced in between on every restart, the group ID must be changed.
config.Consumer.Offsets.Initial = sarama.OffsetNewest

var topicArr = []string{topic}
// KafkaAddresses=["kafka.service.consul:9092"]
consumer, err := cluster.NewConsumer(kafkaAddress,
    kafkaGroupID, topicArr, config)

if err != nil {
    logging.Errorf("cluster.NewConsumer  err:%s", err)
    return nil
}

// Drain and log consumer errors in the background.
go func() {
    for err := range consumer.Errors() {
        logging.Errorf("consumer.Error: groupId:%s:Error: %s\n", kafkaGroupID, err.Error())
    }
}()
// Drain and log rebalance notifications in the background.
go func() {
    for ntf := range consumer.Notifications() {
        logging.Infof("consumer.Notification: groupId:%s Rebalanced: %+v \n", kafkaGroupID, ntf)
    }
}()

logging.Infof("NewKafka loop before")
Loop:
    for {
        select {
        case msg, ok := <-consumer.Messages():
            if !ok {
                // The messages channel has been closed. A receive on a closed
                // channel returns immediately, so staying in the loop would spin
                // and flood the log; leave the loop instead.
                logging.Errorf("read msg not ok %v", topic)
                break Loop
            }
            //logging.Debugf("read msg %v", msg)
            // TODO: process msg here.
            // With sarama.OffsetNewest, committing offsets is of little use.
            // consumer.MarkOffset(msg, "")
            _ = msg // placeholder so the snippet compiles until real handling is added
        case <-ctx.Done():
            logging.Infof("kafka done %v", topic)
            break Loop
        case <-time.After(time.Second * 3):
            // Idle wake-up every 3 seconds; nothing to do.
            //logging.Debugf("NewKafka %v timeout", topic)
        }
    }
    logging.Infof("NewKafka kafka exit %v", topic)
    // Close the consumer and report a failure instead of silently dropping it.
    if err := consumer.Close(); err != nil {
        logging.Errorf("consumer.Close err:%s", err)
    }

来自 大脸猫 写于 2019-04-13 10:49 -- 更新于2019-04-13 13:34 -- 0 条评论

0条评论

评论: