博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python连接kafka生产者,消费者脚本
阅读量:5766 次
发布时间:2019-06-18

本文共 3118 字,大约阅读时间需要 10 分钟。

# -*- coding: utf-8 -*-'''''    使用kafka-Python 1.3.3模块    # pip install kafka==1.3.5    # pip install kafka-python==1.3.5'''import sysimport timeimport jsonfrom kafka import KafkaProducerfrom kafka import KafkaConsumerfrom kafka.errors import KafkaErrorKAFAKA_HOST = "101.236.51.235"KAFAKA_PORT = 9092KAFAKA_TOPIC = "test"class Kafka_producer():    '''''    生产模块:根据不同的key,区分消息    '''    def __init__(self, kafkahost,kafkaport, kafkatopic, key):        self.kafkaHost = kafkahost        self.kafkaPort = kafkaport        self.kafkatopic = kafkatopic        self.key = key        print("producer:h,p,t,k",kafkahost,kafkaport,kafkatopic,key)        bootstrap_servers = '{kafka_host}:{kafka_port}'.format(                kafka_host=self.kafkaHost,                kafka_port=self.kafkaPort                )        print("boot svr:",bootstrap_servers)        self.producer = KafkaProducer(bootstrap_servers = bootstrap_servers                )    def sendjsondata(self, params):        try:            parmas_message = json.dumps(params,ensure_ascii=False)            producer = self.producer            print(parmas_message)            v = parmas_message.encode('utf-8')            k = key.encode('utf-8')            print("send msg:(k,v)",k,v)            producer.send(self.kafkatopic, key=k, value= v)            producer.flush()        except KafkaError as e:            print (e)class Kafka_consumer():    '''''    消费模块: 通过不同groupid消费topic里面的消息    '''    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):        self.kafkaHost = kafkahost        self.kafkaPort = kafkaport        self.kafkatopic = kafkatopic        self.groupid = groupid        self.key = key        self.consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid,                bootstrap_servers = '{kafka_host}:{kafka_port}'.format(                    kafka_host=self.kafkaHost,                    kafka_port=self.kafkaPort )                )    def consume_data(self):        try:            for message in self.consumer:                yield message        except KeyboardInterrupt as e:            print (e)def main(xtype, group, key):    '''''    测试consumer和producer    '''    if xtype == "p":        # 生产模块        producer = Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key)        print ("===========> producer:", producer)        for _id in range(100):           params = '{"msg" : "%s"}' % str(_id)           params=[{
"msg0" :_id},{
"msg1" :_id}] producer.sendjsondata(params) time.sleep(1) if xtype == 'c': # 消费模块 consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group) print ("===========> consumer:", consumer) message = consumer.consume_data() for msg in message: print ('msg---------------->k,v', msg.key,msg.value) print ('offset---------------->', msg.offset)if __name__ == '__main__': xtype = sys.argv[1] group = sys.argv[2] key = sys.argv[3] main(xtype, group, key)

使用方式

生产消息

python testkafka.py p g k

消费消息

python testkafka.py c g k

转载于:https://www.cnblogs.com/reblue520/p/8270412.html

你可能感兴趣的文章
深入理解脚本化CSS系列第四篇——脚本化样式表
查看>>
分镜头脚本
查看>>
ASP.NET中的cookie编程技术
查看>>
链表基本操作的实现(转)
查看>>
邮件发送1
查看>>
[转] libcurl异步方式使用总结(附流程图)
查看>>
编译安装LNMP
查看>>
[转]基于display:table的CSS布局
查看>>
企业级 SpringBoot 教程 (二)Spring Boot配置文件详解
查看>>
crm 02--->讲师页面及逻辑
查看>>
Docker: 如何修改 Docker 的镜像存储位置
查看>>
AS3.0 Bitmap类实现图片3D旋转效果
查看>>
Eigen ,MKL和 matlab 矩阵乘法速度比较
查看>>
对象继承其他对象的方法和属性
查看>>
带三角的面包屑导航栏(新增递增数字)
查看>>
分享一段微信摇一摇代码,有兴趣的可以试一试
查看>>
Web应用程序安全与风险
查看>>
codeforces 984 A. Game
查看>>
CSS居中
查看>>
One Person Game(概率+数学)
查看>>