https://blog.csdn.net/see_you_see_me/article/details/78468421
Install pip and kafka-python, then try to create a producer:

```
yum -y install python-pip
pip install kafka-python
```

```
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:9092')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 362, in __init__
    **self.config)
  File "/usr/lib/python2.7/site-packages/kafka/client_async.py", line 219, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/lib/python2.7/site-packages/kafka/client_async.py", line 819, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
```

The error comes from the automatic broker-version probe (check_version). Two changes address it. First, pass api_version explicitly, which skips that probe:

```
>>> producer = KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0, 10))
```

Second, make sure the broker advertises an address the client can actually reach. This is set in the broker's server.properties:

```
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://kafka.hostname:9092
```
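NoBrokersAvailable means the client could not get a usable answer from any bootstrap server. Before changing api_version or server.properties, it can help to confirm plain TCP reachability first. Below is a minimal stdlib sketch. broker_reachable and parse_bootstrap are hypothetical helpers, not part of kafka-python. A successful TCP connect only shows that something is listening; it does not prove the Kafka handshake will succeed.

```python
import socket


def parse_bootstrap(servers):
    """Turn 'host:port' strings (default port 9092) into (host, port) tuples.

    Accepts a single string or a list, like bootstrap_servers does.
    IPv6 literals are not handled in this sketch.
    """
    if isinstance(servers, str):
        servers = [servers]
    result = []
    for entry in servers:
        host, _, port = entry.rpartition(':')
        if not host:  # no colon: only a host name was given
            host, port = port, '9092'
        result.append((host, int(port)))
    return result


def broker_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        conn = socket.create_connection((host, port), timeout=timeout)
    except (socket.error, OSError):
        return False
    conn.close()
    return True
```

For example, `[broker_reachable(h, p) for h, p in parse_bootstrap('localhost:9092')]` checks every configured server. The check might return True while KafkaProducer still raises. In that case, the version probe or an advertised.listeners host name that the client cannot resolve is the more likely cause.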
3. Write test code
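Whatever language you use to work with Kafka, everything revolves around two roles: the Producer and the Consumer.
A Topic named world, with 5 partitions, has already been created on the Kafka broker.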
1. Create a Producer
1) Interactive shell: plain sends
```
[root@node1 python_app]# python
Python 2.6.6 (r266:84292, Nov 22 2013, 12:16:22)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:9092')
>>> for _ in range(100):
...     producer.send('world', b'some_message_bytes')
...
```
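The lines above do the following:
Import KafkaProducer.
Create a Producer connected to the broker at localhost:9092.
Send 100 messages to the world Topic in a loop, each with the content some_message_bytes. No partition (and no key) is specified, so the messages are spread across the topic's 5 partitions. kafka-python's default partitioner picks a random available partition for each keyless message, so the split is roughly even rather than exact.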
For more details, see https://kafka-python.readthedocs.io/en/master/index.html
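producer.send() is asynchronous. It returns a future immediately, and the record is delivered in the background. The sketch below assumes a KafkaProducer built as above (send_and_count is a hypothetical helper name). It sends the same 100 messages, then reads each future's RecordMetadata to count how many records landed in each partition.

```python
from collections import Counter


def send_and_count(producer, topic, payload, count=100, timeout=10):
    """Send `payload` `count` times and return {partition: number_of_records}.

    `producer` is assumed to be a kafka.KafkaProducer (or anything with the
    same send() -> future API). future.get() blocks until the broker has
    acknowledged the record and returns its RecordMetadata.
    """
    futures = [producer.send(topic, payload) for _ in range(count)]
    producer.flush()  # wait for all buffered records to be sent
    per_partition = Counter()
    for future in futures:
        metadata = future.get(timeout=timeout)  # raises if delivery failed
        per_partition[metadata.partition] += 1
    return dict(per_partition)
```

With a broker on localhost:9092, try `send_and_count(KafkaProducer(bootstrap_servers='localhost:9092'), 'world', b'some_message_bytes')`. Because keyless records are placed randomly, the per-partition counts will vary from run to run.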
2) Interactive shell: sending JSON
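JSON is a very widely used text format, and kafka-python can send JSON messages too.
However, copying the JSON example from the KafkaProducer section of https://kafka-python.readthedocs.io/en/master/index.html as-is produces the error below.
This is probably not a bug in the docs themselves. The example omits bootstrap_servers and relies on the client's default broker address, which did not reach a broker in this environment: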
```
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.6/site-packages/kafka_python-1.3.4-py2.6.egg/kafka/producer/kafka.py", line 347, in __init__
    **self.config)
  File "/usr/lib/python2.6/site-packages/kafka_python-1.3.4-py2.6.egg/kafka/client_async.py", line 220, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/lib/python2.6/site-packages/kafka_python-1.3.4-py2.6.egg/kafka/client_async.py", line 841, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
```
In testing, passing bootstrap_servers explicitly works:
```
>>> import json
>>> producer = KafkaProducer(bootstrap_servers='localhost:9092',
...                          value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('world', {'key1': 'value1'})
```
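Named serializer functions, unlike inline lambdas, can be reused on both sides and tested without a broker. The sketch below assumes UTF-8 JSON payloads. json_serializer and json_deserializer are illustrative names; value_serializer and value_deserializer are the real KafkaProducer/KafkaConsumer parameters they plug into.

```python
import json


def json_serializer(value):
    """value_serializer for KafkaProducer: Python object -> UTF-8 JSON bytes."""
    return json.dumps(value).encode('utf-8')


def json_deserializer(raw):
    """value_deserializer for KafkaConsumer: UTF-8 JSON bytes -> Python object."""
    return json.loads(raw.decode('utf-8'))
```

On the producer side, use `KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=json_serializer)`. On the consumer side, use `KafkaConsumer('world', bootstrap_servers='localhost:9092', value_deserializer=json_deserializer)`, so that `msg.value` arrives as a dict again.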
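3) Interactive shell: sending plain strings with a key

Use a producer without the JSON value_serializer here; otherwise the value would be passed through json.dumps first.

```
>>> producer = KafkaProducer(bootstrap_servers='localhost:9092')
>>> producer.send('world', key=b'foo', value=b'bar')
```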
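Supplying a key also affects placement. With kafka-python's default partitioner, records that share a key are hashed to the same partition, so they generally keep their relative order. Keyless records, as in 1) above, go to a randomly chosen available partition. If you would rather pass str keys and values than bytes, a small serializer helps. utf8 is an illustrative name; key_serializer and value_serializer are real KafkaProducer parameters.

```python
def utf8(value):
    """Serializer for KafkaProducer keys/values: text -> UTF-8 bytes.

    Bytes pass through unchanged. None is returned as-is, because the
    serializer may be called for a record that has no key.
    """
    if value is None or isinstance(value, bytes):
        return value
    return value.encode('utf-8')
```

For example, after `producer = KafkaProducer(bootstrap_servers='localhost:9092', key_serializer=utf8, value_serializer=utf8)` you can call `producer.send('world', key='foo', value='bar')`.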
4) Interactive shell: sending compressed messages
```
>>> producer = KafkaProducer(bootstrap_servers='192.168.120.11:9092', compression_type='gzip')
>>> producer.send('world', b'msg 1')
```
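In testing, the consumer still received these messages as plain strings. This is expected and does not mean python-lz4 is missing. Compression is applied to message batches in transit and on disk, and the consumer decompresses them transparently, so it always sees the original bytes. gzip needs no extra package; the documentation says: "kafka-python supports gzip compression/decompression natively. To produce or consume lz4 compressed messages, you should install python-lz4 (pip install lz4). To enable snappy, install python-snappy (also requires snappy library). See Installation for more information."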
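Since lz4 and snappy depend on optional packages, a script may want to fall back to gzip when they are absent. The sketch below assumes lz4 support needs the lz4.frame module and snappy needs the snappy module from python-snappy. pick_compression and OPTIONAL_CODECS are hypothetical names, and the preference order is an arbitrary choice.

```python
import importlib

# (compression_type, module assumed to be required for it); gzip uses the stdlib
OPTIONAL_CODECS = (('lz4', 'lz4.frame'), ('snappy', 'snappy'))


def pick_compression(codecs=OPTIONAL_CODECS):
    """Return the first compression_type whose module imports, else 'gzip'."""
    for name, module in codecs:
        try:
            importlib.import_module(module)
        except ImportError:
            continue  # library not installed, try the next codec
        return name
    return 'gzip'  # always available in kafka-python
```

Usage: `KafkaProducer(bootstrap_servers='192.168.120.11:9092', compression_type=pick_compression())`.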
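The steps above exercised individual commands. Next, we write a complete script that sends the names of the files in a given directory to the world topic.
The file_monitor.py script: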
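```python
# -*- coding: utf-8 -*-
import os
import sys
import time

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='192.168.120.11:9092')


def log(msg):
    # Print msg prefixed with a timestamp such as [2017-11-07_17-41-04]
    t = time.strftime(r"%Y-%m-%d_%H-%M-%S", time.localtime())
    print("[%s]%s" % (t, msg))


def to_bytes(s):
    # Kafka values must be bytes; on Python 2, str already is bytes
    return s if isinstance(s, bytes) else s.encode('utf-8')


def list_file(path):
    # Send the name of every entry in path to the world topic
    for f in os.listdir(path):
        producer.send('world', to_bytes(f))
        producer.flush()  # block until this message has been sent
        log('send: %s' % f)


if __name__ == '__main__':
    if len(sys.argv) != 2:
        sys.exit('usage: python file_monitor.py <directory>')
    list_file(sys.argv[1])
    producer.close()
    log('done')
```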
To monitor the files under /opt/jdk1.8.0_91/lib/missioncontrol/features, for example, run:

```
python file_monitor.py /opt/jdk1.8.0_91/lib/missioncontrol/features
```
The output is:
```
[root@node2 python_app]# python file_monitor.py /opt/jdk1.8.0_91/lib/missioncontrol/features
[2017-11-07_17-41-04]send: org.eclipse.ecf.filetransfer.ssl.feature_1.0.0.v20140827-1444
[2017-11-07_17-41-04]send: org.eclipse.emf.common_2.10.1.v20140901-1043
[2017-11-07_17-41-04]send: com.jrockit.mc.feature.rcp.ja_5.5.0.165303
[2017-11-07_17-41-04]send: com.jrockit.mc.feature.console_5.5.0.165303
[2017-11-07_17-41-04]send: org.eclipse.ecf.core.feature_1.1.0.v20140827-1444
[2017-11-07_17-41-04]send: org.eclipse.equinox.p2.core.feature_1.3.0.v20140523-0116
[2017-11-07_17-41-04]send: org.eclipse.ecf.filetransfer.httpclient4.ssl.feature_1.0.0.v20140827-1444
[2017-11-07_17-41-04]send: com.jrockit.mc.feature.rcp_5.5.0.165303
[2017-11-07_17-41-04]send: org.eclipse.babel.nls_eclipse_zh_4.4.0.v20140623020002
[2017-11-07_17-41-04]send: com.jrockit.mc.rcp.product_5.5.0.165303
[2017-11-07_17-41-04]send: org.eclipse.help_2.0.102.v20141007-2301
[2017-11-07_17-41-04]send: org.eclipse.ecf.core.ssl.feature_1.0.0.v20140827-1444
[2017-11-07_17-41-04]send: org.eclipse.ecf.filetransfer.httpclient4.feature_3.9.1.v20140827-1444
[2017-11-07_17-41-04]send: org.eclipse.e4.rcp_1.3.100.v20141007-2033
[2017-11-07_17-41-04]send: org.eclipse.babel.nls_eclipse_ja_4.4.0.v20140623020002
[2017-11-07_17-41-04]send: com.jrockit.mc.feature.flightrecorder_5.5.0.165303
[2017-11-07_17-41-04]send: org.eclipse.emf.ecore_2.10.1.v20140901-1043
[2017-11-07_17-41-04]send: org.eclipse.equinox.p2.rcp.feature_1.2.0.v20140523-0116
[2017-11-07_17-41-04]send: org.eclipse.ecf.filetransfer.feature_3.9.0.v20140827-1444
[2017-11-07_17-41-04]send: com.jrockit.mc.feature.core_5.5.0.165303
[2017-11-07_17-41-04]send: org.eclipse.rcp_4.4.0.v20141007-2301
[2017-11-07_17-41-04]send: com.jrockit.mc.feature.rcp.zh_CN_5.5.0.165303
[2017-11-07_17-41-04]done
```
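Despite its name, file_monitor.py sends the current directory listing once and exits. To keep watching the directory, one simple option is polling: list it every few seconds and send only names not seen before. The sketch below works under that assumption. new_entries and watch are hypothetical helpers, and polling will not report deletions or renames.

```python
import os
import time


def new_entries(path, seen):
    """Return the sorted names in `path` that are not in `seen`, and add them to `seen`."""
    current = set(os.listdir(path))
    fresh = sorted(current - seen)
    seen.update(fresh)
    return fresh


def watch(path, producer, topic='world', interval=5.0, rounds=None):
    """Poll `path` and send each newly appeared file name to `topic`.

    `producer` is assumed to be a kafka.KafkaProducer; rounds=None polls forever.
    """
    seen = set()
    done = 0
    while rounds is None or done < rounds:
        for name in new_entries(path, seen):
            value = name if isinstance(name, bytes) else name.encode('utf-8')
            producer.send(topic, value)
        producer.flush()
        done += 1
        if rounds is None or done < rounds:
            time.sleep(interval)
```

For example, `watch('/opt/jdk1.8.0_91/lib/missioncontrol/features', KafkaProducer(bootstrap_servers='192.168.120.11:9092'))` sends the existing names on the first pass and afterwards only names that appear later.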
2. Create a Consumer
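When using Kafka you usually create several Topics, each split into multiple Partitions. A Consumer therefore typically connects to a given broker and consumes messages from a specific Topic.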
The complete Python script, consumer.py:
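```python
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer

# Join consumer group consumer-20171017 and read from the world topic
consumer = KafkaConsumer('world',
                         group_id='consumer-20171017',
                         bootstrap_servers=['localhost:9092'])
for msg in consumer:
    # Each msg is a ConsumerRecord (topic, partition, offset, key, value, ...)
    print(msg)
```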
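The consumer output shown below has the form [timestamp]topic:partition:offset: key=... value=.... A bare print of msg would not produce this, because it prints the whole ConsumerRecord. The sketch below reproduces that format and assumes the same log helper as file_monitor.py. format_record and consume are hypothetical names.

```python
import time


def log(msg):
    """Timestamped print, same format as in file_monitor.py."""
    t = time.strftime(r"%Y-%m-%d_%H-%M-%S", time.localtime())
    print("[%s]%s" % (t, msg))


def format_record(record):
    """Render a ConsumerRecord as 'topic:partition:offset: key=... value=...'."""
    value = record.value
    if isinstance(value, bytes):
        value = value.decode('utf-8', 'replace')
    return '%s:%d:%d: key=%s value=%s' % (
        record.topic, record.partition, record.offset, record.key, value)


def consume(topic='world', group_id='consumer-20171017',
            servers='localhost:9092'):
    """Consume `topic` forever, logging each record (needs a running broker)."""
    # Imported here so the helpers above can be used without kafka-python
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(topic, group_id=group_id,
                             bootstrap_servers=servers)
    log('start consumer')
    for record in consumer:
        log(format_record(record))
```

Calling `consume()` blocks and logs each incoming record. Keeping format_record separate makes it easy to check the formatting without a broker.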
Run file_monitor.py again:
```
[root@node2 python_app]# python file_monitor.py /opt/jdk1.8.0_91/lib/missioncontrol/features
[2017-11-07_18-00-31]send: org.eclipse.ecf.filetransfer.ssl.feature_1.0.0.v20140827-1444
[2017-11-07_18-00-31]send: org.eclipse.emf.common_2.10.1.v20140901-1043
[2017-11-07_18-00-31]send: com.jrockit.mc.feature.rcp.ja_5.5.0.165303
[2017-11-07_18-00-31]send: com.jrockit.mc.feature.console_5.5.0.165303
[2017-11-07_18-00-31]send: org.eclipse.ecf.core.feature_1.1.0.v20140827-1444
[2017-11-07_18-00-31]send: org.eclipse.equinox.p2.core.feature_1.3.0.v20140523-0116
[2017-11-07_18-00-31]send: org.eclipse.ecf.filetransfer.httpclient4.ssl.feature_1.0.0.v20140827-1444
[2017-11-07_18-00-31]send: com.jrockit.mc.feature.rcp_5.5.0.165303
[2017-11-07_18-00-31]send: org.eclipse.babel.nls_eclipse_zh_4.4.0.v20140623020002
[2017-11-07_18-00-31]send: com.jrockit.mc.rcp.product_5.5.0.165303
[2017-11-07_18-00-31]send: org.eclipse.help_2.0.102.v20141007-2301
[2017-11-07_18-00-31]send: org.eclipse.ecf.core.ssl.feature_1.0.0.v20140827-1444
[2017-11-07_18-00-31]send: org.eclipse.ecf.filetransfer.httpclient4.feature_3.9.1.v20140827-1444
[2017-11-07_18-00-31]send: org.eclipse.e4.rcp_1.3.100.v20141007-2033
[2017-11-07_18-00-31]send: org.eclipse.babel.nls_eclipse_ja_4.4.0.v20140623020002
[2017-11-07_18-00-31]send: com.jrockit.mc.feature.flightrecorder_5.5.0.165303
[2017-11-07_18-00-31]send: org.eclipse.emf.ecore_2.10.1.v20140901-1043
[2017-11-07_18-00-31]send: org.eclipse.equinox.p2.rcp.feature_1.2.0.v20140523-0116
[2017-11-07_18-00-31]send: org.eclipse.ecf.filetransfer.feature_3.9.0.v20140827-1444
[2017-11-07_18-00-31]send: com.jrockit.mc.feature.core_5.5.0.165303
[2017-11-07_18-00-31]send: org.eclipse.rcp_4.4.0.v20141007-2301
[2017-11-07_18-00-31]send: com.jrockit.mc.feature.rcp.zh_CN_5.5.0.165303
[2017-11-07_18-00-31]done
```
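Then start consumer.py. kafka-python's default is auto_offset_reset='latest', so a consumer group with no committed offsets only sees messages produced after it joins. Either start consumer.py before the producer, or rely on offsets the group committed in an earlier run: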
```
[root@node1 python_app]# python consumer.py
[2017-09-23_11-34-00]start consumer
[2017-09-23_11-34-10]world:3:121: key=None value=org.eclipse.ecf.filetransfer.ssl.feature_1.0.0.v20140827-1444
[2017-09-23_11-34-10]world:2:70: key=None value=org.eclipse.emf.common_2.10.1.v20140901-1043
[2017-09-23_11-34-10]world:3:122: key=None value=com.jrockit.mc.feature.rcp.ja_5.5.0.165303
[2017-09-23_11-34-10]world:2:71: key=None value=com.jrockit.mc.feature.console_5.5.0.165303
[2017-09-23_11-34-10]world:0:89: key=None value=org.eclipse.ecf.core.feature_1.1.0.v20140827-1444
[2017-09-23_11-34-10]world:4:101: key=None value=org.eclipse.equinox.p2.core.feature_1.3.0.v20140523-0116
[2017-09-23_11-34-10]world:1:117: key=None value=org.eclipse.ecf.filetransfer.httpclient4.ssl.feature_1.0.0.v20140827-1444
[2017-09-23_11-34-10]world:2:72: key=None value=com.jrockit.mc.feature.rcp_5.5.0.165303
[2017-09-23_11-34-10]world:4:102: key=None value=org.eclipse.babel.nls_eclipse_zh_4.4.0.v20140623020002
[2017-09-23_11-34-10]world:2:73: key=None value=com.jrockit.mc.rcp.product_5.5.0.165303
[2017-09-23_11-34-10]world:3:123: key=None value=org.eclipse.help_2.0.102.v20141007-2301
[2017-09-23_11-34-10]world:3:124: key=None value=org.eclipse.ecf.core.ssl.feature_1.0.0.v20140827-1444
[2017-09-23_11-34-10]world:0:90: key=None value=com.jrockit.mc.feature.flightrecorder_5.5.0.165303
[2017-09-23_11-34-10]world:3:125: key=None value=org.eclipse.ecf.filetransfer.httpclient4.feature_3.9.1.v20140827-1444
[2017-09-23_11-34-10]world:3:126: key=None value=org.eclipse.e4.rcp_1.3.100.v20141007-2033
[2017-09-23_11-34-10]world:3:127: key=None value=org.eclipse.babel.nls_eclipse_ja_4.4.0.v20140623020002
[2017-09-23_11-34-10]world:2:74: key=None value=org.eclipse.emf.ecore_2.10.1.v20140901-1043
[2017-09-23_11-34-10]world:3:128: key=None value=org.eclipse.equinox.p2.rcp.feature_1.2.0.v20140523-0116
[2017-09-23_11-34-10]world:0:91: key=None value=com.jrockit.mc.feature.core_5.5.0.165303
[2017-09-23_11-34-10]world:3:129: key=None value=org.eclipse.ecf.filetransfer.feature_3.9.0.v20140827-1444
[2017-09-23_11-34-11]world:3:130: key=None value=org.eclipse.rcp_4.4.0.v20141007-2301
[2017-09-23_11-34-11]world:3:131: key=None value=com.jrockit.mc.feature.rcp.zh_CN_5.5.0.165303
```
As you can see, file_monitor.py sent a batch of file names to the world topic, and consumer.py received them.