最新公告
  • 欢迎您光临网站无忧模板网,本站秉承服务宗旨 履行“站长”责任,销售只是起点 服务永无止境!立即加入钻石VIP
  • python如何连接kafka?

    正文概述    2020-04-17   440

    python如何连接kafka?

    1、kafka-python安装:

    # PyPI安装
    pip install kafka-python
     
    # conda安装
    conda install -c conda-forge kafka-python
     
    # anaconda自带pip安装
    /root/anaconda3/bin/pip install kafka-python

    2、kafka-python生产者

    producer.py

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import datetime
    import json
    import time
    import uuid
    from kafka import KafkaProducer
    from kafka.errors import KafkaError
    producer = KafkaProducer(bootstrap_servers='100.69.222.221:9092,100.69.222.222:9092,100.69.222.223:9092')
    topic = 'test_20181105'
    def test():
        print('begin')
        try:
            n = 0
            while True:
                dic = {}
                dic['id'] = n
                n = n + 1
                dic['myuuid'] = str(uuid.uuid4().hex)
                dic['time'] = datetime.datetime.now().strftime("%Y%m%d %H:%M:%S")
                producer.send(topic, json.dumps(dic).encode())
                print("send:" + json.dumps(dic))
                time.sleep(0.5)
        except KafkaError as e:
            print(e)
        finally:
            producer.close()
            print('done')
    if __name__ == '__main__':
        test()

    服务器集群中配置好Kafka, 修改上面程序中的ip地址和端口号, 执行python脚本就可以成功将消息发送到 topic: test_20181105

    send:{"id": 1411, "myuuid": "a25a3d0361f94d3b8fffd5967ab5df01", "time": "20181105 16:11:14"}
    send:{"id": 1412, "myuuid": "784efd5389564194941240dca66233b6", "time": "20181105 16:11:14"}
    send:{"id": 1413, "myuuid": "6a211195319e447aa559614662f70590", "time": "20181105 16:11:15"}
    send:{"id": 1414, "myuuid": "2cc45bd82baf4a1cb41ea4786e50a0df", "time": "20181105 16:11:15"}
    send:{"id": 1415, "myuuid": "b7dfed4919c74164b83cf3ec28e257b6", "time": "20181105 16:11:16"}
    send:{"id": 1416, "myuuid": "9218eceb17834c228f5ab01ca7595272", "time": "20181105 16:11:16"}
    send:{"id": 1417, "myuuid": "c2751c54c390453f9eedd417fb1e5a31", "time": "20181105 16:11:17"}
    send:{"id": 1418, "myuuid": "9bbc4ef2cfbb42148332eb979b1142cb", "time": "20181105 16:11:17"}
    send:{"id": 1419, "myuuid": "f4998a862494445c976137793b55ed73", "time": "20181105 16:11:18"}

    3、kafka-python消费者

    consumer.py

    #!/bin/env python
    from kafka import KafkaConsumer
    # connect to Kafka server and pass the topic we want to consume
    consumer = KafkaConsumer('test_20181105',group_id = 'test_group2', bootstrap_servers='100.69.222.221:9092,100.69.222.222:9092,100.69.222.223:9092')
    try:
        for msg in consumer:
            print(msg)
            # print("%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value))
    except KeyboardInterrupt as e:
        print(e)

    同样修改上面的Ip地址和端口号,就可以接收 topic: test_20181105上的消息:

    ConsumerRecord(topic='test_20181105', partition=1, offset=951, timestamp=1541405600340, timestamp_type=0, key=None, value=b'{"id": 1663, "myuuid": "0f744021b2d9468886908ee6685a0fdb", "time": "20181105 16:13:20"}', checksum=1357895145, serialized_key_size=-1, serialized_value_size=87)
    ConsumerRecord(topic='test_20181105', partition=0, offset=935, timestamp=1541405600841, timestamp_type=0, key=None, value=b'{"id": 1664, "myuuid": "9379f68f656644bdb2d30911f06240e4", "time": "20181105 16:13:20"}', checksum=-715594646, serialized_key_size=-1, serialized_value_size=87)
    ConsumerRecord(topic='test_20181105', partition=1, offset=952, timestamp=1541405601341, timestamp_type=0, key=None, value=b'{"id": 1665, "myuuid": "f4a5fa5b32cd4b7991612b626bea4b0e", "time": "20181105 16:13:21"}', checksum=-2068072013, serialized_key_size=-1, serialized_value_size=87)

    可以通过设置不同的group_id 来实现消息队列或消息订阅:

    如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.

    如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者.


    下载网 » python如何连接kafka?

    常见问题FAQ

    免费下载或者VIP会员专享资源能否直接商用?
    本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。更多说明请参考 VIP介绍。
    提示下载完但解压或打开不了?
    最常见的情况是下载不完整: 可对比下载完压缩包的与网盘上的容量,若小于网盘提示的容量则是这个原因。这是浏览器下载的bug,建议用百度网盘软件或迅雷下载。若排除这种情况,可在对应资源底部留言,或 联络我们.。
    找不到素材资源介绍文章里的示例图片?
    对于PPT,KEY,Mockups,APP,网页模版等类型的素材,文章内用于介绍的图片通常并不包含在对应可供下载素材包内。这些相关商业图片需另外购买,且本站不负责(也没有办法)找到出处。 同样地一些字体文件也是这种情况,但部分素材会在素材包内有一份字体下载链接清单。
    模板不会安装或需要功能定制以及二次开发?
    请QQ联系我们

    发表评论

    还没有评论,快来抢沙发吧!

    如需帝国cms功能定制以及二次开发请联系我们

    联系作者

    请选择支付方式

    ×
    迅虎支付宝
    迅虎微信
    支付宝当面付
    余额支付
    ×
    微信扫码支付 0 元