pykafa

import json

from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError


class Kafka_consumer(object):
    """Consumer wrapper built on the kafka-python ``KafkaConsumer``.

    The constructor subscribes to a single topic as a member of a consumer
    group; :meth:`consume_data` then yields the records one by one.
    """

    def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
        """Create the underlying ``KafkaConsumer``.

        :param kafkahost: broker host; joined with ``kafkaport`` as
            ``host:port`` to form ``bootstrap_servers``.
        :param kafkaport: broker port.
        :param kafkatopic: topic passed to ``KafkaConsumer``.
        :param groupid: value passed as ``group_id``.
        """
        self.kafkaHost = kafkahost
        self.kafkaPort = kafkaport
        self.kafkatopic = kafkatopic
        self.groupid = groupid

        bootstrap = '{kafka_host}:{kafka_port}'.format(
            kafka_host=self.kafkaHost,
            kafka_port=self.kafkaPort
        )
        # Auto-commit is disabled and this class never commits offsets
        # itself, so a caller that wants progress persisted must commit
        # through ``self.consumer``.
        self.consumer = KafkaConsumer(
            self.kafkatopic,
            group_id=self.groupid,
            bootstrap_servers=bootstrap,
            auto_offset_reset='earliest',
            enable_auto_commit=False
        )

    def consume_data(self):
        """Yield every record obtained by iterating ``self.consumer``.

        A ``KeyboardInterrupt`` raised while iterating is caught and
        printed, which ends the generator instead of propagating.

        :return: generator of the objects produced by ``self.consumer``.
        """
        try:
            for message in self.consumer:
                # The payload is in message.value; the original author left
                # a hint that it can be decoded with json.loads(message.value).
                yield message
        except KeyboardInterrupt as e:
            # "except X as e" and print(e) are valid on Python 2.6+ and 3.
            print(e)


def main():
    '''
    Test the consumer and producer.
    :return:
    '''
    ## Test the consumer module
    # The consumer module returns records in this format:
    # ConsumerRecord(
    #     topic=u'ranktest',
    #     partition=0,
    #     offset=202,
    #     timestamp=None,
    #     timestamp_type=None,
    #     key=None,
    #     value='"{abetst}:{null}---0"',
    #     checksum=-1868164195,
    #     serialized_key_size=-1,
    #     serialized_value_size=21)
    #
    # NOTE(review): the source text is cut off here ("[Read More]"). The next
    # statement began with:  consumer = Kafka_consumer('127.
    # Restore the rest of main() from the full original before running it.