kafka-python的API简单介绍

2021/4/23 12:25:26

本文主要是介绍kafka-python的API简单介绍,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

在上一篇文章中说明了kafka-python的API使用的理论概念,这篇文章来说明API的实际使用。

在官方文档详细列出了kafka-python的API接口https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

对于生成者我们着重于介绍一个send方法,其余的方法提到的时候会说明,在官方文档中有许多可配置参数可以查看,也可以查看上一篇博文中的参数。

#send方法的详细说明,send用于向主题发送信息
send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None)

 – topic where the message will be published,指定向哪个主题发送消息。
 – message value. Must be type bytes, or be serializable to bytes via configured value_serializer. If value is None, key is required and message acts as a ‘delete’. 
               #value为要发送的消息值,,如果这个值为空,则必须有对应的key值,并且空值被标记为删除。可以通过配置
 – a key to associate with the message. Can be used to determine which partition to send the message to. If partition is None (and producer’s partitioner config is left as default), 
               then messages with the same key will be delivered to the same partition (but if key is None, partition is chosen randomly). Must be 
                type bytes, or be serializable to bytes via configured key_serializer.
         #key与value对应的键值,。kafka根据key值确定消息发往哪个分区(如果分区被指定则发往指定的分区),具有相同key的消息被发往同一个分区,如果key
               #为NONE则随机选择分区,可以使用参数序列化为字节类型。
 – a list of header key value pairs. List items are tuples of str key and bytes value.
               #键值对的列表头部,列表项是str(key)和bytes(value)。
 – epoch milliseconds (from Jan 1 1970 UTC) to use as the message timestamp. Defaults to current time.
               #时间戳

消息发送成功,返回的是RecordMetadata的对象;否则的话引发KafkaTimeoutError异常

在进行实际测试前,先创建一个topics,这里我们利用控制台创建:

[root@test3 bin]# ./kafka-topics. --zookeeper=.:,.: --create --topic kafkatest --replication-factor  --partitions /kafka-topics. --zookeeper=.:,.: --list --/kafka-topics. --zookeeper=.:,.: --describe --    ReplicationFactor:    Leader:     Replicas: ,,    Isr: ,,    Leader:     Replicas: ,,    Isr: ,,    Leader:     Replicas: ,,    Isr: ,,

 

一个简易的生产者demo如下:(摘自:https://blog.csdn.net/luanpeng825485697/article/details/81036028)

= KafkaProducer(bootstrap_servers=[= += =  %kafkatest, key=bytes(str(i), value=msg.encode(.(

一个消费者的demo接收上面生产者发送的数据。

= KafkaConsumer(, bootstrap_servers=[], auto_offset_reset= msg = msg.key.decode(encoding== msg.value.decode(encoding= % (msg.topic, msg.partition, msg.offset, key, value))

#这是一个阻塞的过程,当生产者有消息传来的时候,就会读取消息,若是没有消息就会阻塞等待
#参数表示重置偏移量,有两个取值,latest表示读取消息队列中最新的消息,另一个取值earliest表示读取最早的消息。

执行上面的两个demo,得到的结果如下:

消费者群组

在上一篇博文中,说明了消费者群组与消费者的概念,这里我们来定义一个消费者群组。

一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。每个消费者接收主题一部分分区的消息

创建一个消费者群组如下:

= KafkaConsumer(, group_id=, bootstrap_servers=[], auto_offset_reset= msg = msg.key.decode(encoding== msg.value.decode(encoding= % (msg.topic, msg.partition, msg.offset, key, value))

消费者群组中的消费者总是消费订阅主题的部分数据。

在pycharm中把上面的代码复制一份,这样在一个test1群组中就有了两个消费者,同时执行。

分析: kafkatest主题有3个分区,3个分区会被分配给test1群组中的两个消费者,在上面一篇博文中提到,默认的分配策略时range。也就是说一个消费者可能由2个分区,另一个消费者只有一个分区;执行结果如下:

下面会通过实例来说明几个消费者的方法的使用

 kafka-python的API官方文档介绍的很清楚,可以查看:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

>>>>>> consumer = KafkaConsumer(, group_id=, bootstrap_servers=[>>>, , >>> consumer.partitions_for_topic(, , >>>}

>>> consumer.    

TypeError: partition must be a TopicPartition namedtuple

#需要注意的是position方法需要传入的是一个kafka-python自带的一种数据结构TopicPartition,这种数据结构的定义如下,在使用的时候需要导入
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])

.position(TopicPartition(topic='kafkatest', partition=1))
17580

下面说明poll()方法的用法:

方法:  batches by topic-partition. On each poll, consumer will try to use the  consumed offset as the starting offset and fetch sequentially. The  consumed offset can be manually set through seek() or automatically set as the  committed offset - – Milliseconds spent waiting  poll  data is not available  the buffer. If , returns immediately with any 
                records that are available currently  the buffer,  returns empty. Must not be negative. Default:  – The maximum number of records returned  a single call to poll(). Default: Inherit value from max_poll_records. 
                            默认从max_poll_records继承值。
= KafkaConsumer(, bootstrap_servers=[= consumer.poll(timeout_ms=.()


#执行结果如下,返回的是一个字典,consumerRecord对象包含着消息的一些元数据信息
{TopicPartition(topic='kafkatest', partition=2): [ConsumerRecord(topic='kafkatest', partition=2, offset=21929, timestamp=1545978879892, timestamp_type=0, key=b'138', value=b'producer1+138', checksum=-660348132, serialized_key_size=3, serialized_value_size=13)]}
{TopicPartition(topic='kafkatest', partition=0): [ConsumerRecord(topic='kafkatest', partition=0, offset=22064, timestamp=1545978882893, timestamp_type=0, key=b'141', value=b'producer1+141', checksum=-1803506349, serialized_key_size=3, serialized_value_size=13)], TopicPartition(topic='kafkatest', partition=2): [ConsumerRecord(topic='kafkatest', partition=2, offset=21930, timestamp=1545978880892, timestamp_type=0, key=b'139', value=b'producer1+139', checksum=-1863433503, serialized_key_size=3, serialized_value_size=13), ConsumerRecord(topic='kafkatest', partition=2, offset=21931, timestamp=1545978881893, timestamp_type=0, key=b'140', value=b'producer1+140', checksum=-280146643, serialized_key_size=3, serialized_value_size=13)]}
{TopicPartition(topic='kafkatest', partition=2): [ConsumerRecord(topic='kafkatest', partition=2, offset=21932, timestamp=1545978884894, timestamp_type=0, key=b'143', value=b'producer1+143', checksum=1459018748, serialized_key_size=3, serialized_value_size=13)]}
{TopicPartition(topic='kafkatest', partition=1): [ConsumerRecord(topic='kafkatest', partition=1, offset=22046, timestamp=1545978883894, timestamp_type=0, key=b'142', value=b'producer1+142', checksum=-2023137030, serialized_key_size=3, serialized_value_size=13)], TopicPartition(topic='kafkatest', partition=0): [ConsumerRecord(topic='kafkatest', partition=0, offset=22065, timestamp=1545978885894, timestamp_type=0, key=b'144', value=b'producer1+144', checksum=1999922748, serialized_key_size=3, serialized_value_size=13)]}

seek()方法的用法:

 the same partition  this API is arbitrarily used

与seek相关的还有两个方法:

seek_to_beginning(**

subscribe()方法,给当前消费者订阅主题。

Subscribe to a list of topics, or a topic regex pattern.
#订阅一个主体列表,或者主题的正则表达式
Partitions will be dynamically assigned via a group coordinator. Topic subscriptions are not incremental: this list will replace the current assignment (if there is one).
#分区将会通过分区协调器自动分配。主题订阅不是增量的,这个列表将会替换已经存在的主题。

This method is incompatible with assign().    
#这个方法与assign()方法是不兼容的。    

#说明一下listener参数:监听回调,该回调将在每次重新平衡操作之前和之后调用。
作为组管理的一部分,消费者将跟踪属于特定组的使用者列表,并在以下事件之一触发时触发重新平衡操作:

  任何订阅主题的分区数都会发生变化   主题已创建或删除   消费者组织的现有成员死亡   将新成员添加到使用者组 触发任何这些事件时,将首先调用提供的侦听器以指示已撤消使用者的分配,然后在收到新分配时再次调用。请注意,此侦听器将立即覆盖先前对subscribe的调用中设置的任何侦听器。

但是,可以保证通过此接口撤消/分配的分区来自此呼叫中订阅的主题。

>>> consumer.subscription()                       #当前消费者订阅的主题
{'lianxi'}>>> consumer.subscribe(("kafkatest","lianxi"))    #订阅主题,会覆盖之前的主题>>> consumer.subscription()                       #可以看到已经覆盖
{'lianxi', 'kafkatest'}

unsubscribe() :取消订阅所有主题并清除所有已分配的分区。

assign(partitions):

Manually assign a list of TopicPartitions to this consumer.
#手动将TopicPartitions指定给此消费者。
#这个函数和subscribe函数不能同时使用
>>> consumer.assign(TopicPartition("kafkatest",1))

assignment():

Get the TopicPartitions currently assigned to this consumer.
如果分区是使用assign()直接分配的,那么这将只返回先前分配的相同分区。如果使用subscribe()订阅了主题,那么这将给出当前分配给使用者的主题分区集(如果分配尚未发生,
或者分区正在重新分配的过程中,则可能是None)

beginning_offsets(partitions)

Get the first offset  the partition does not exist.  #这个方法可能会阻塞,如果给定的分区没有出现。

partitions参数仍然是TopicPartition类型。
>>> consumer.beginning_offsets(TopicPartition("kafkatest",1))
#这个方法在kafka-python-1.3.1中没有

close(autocommit=True)

Close the consumer, waiting indefinitely ) – If auto-commit is configured  this consumer, this optional flag causes the consumer to attempt to commit any 
      pending consumed offsets prior to close. Default: True
      #如果为此使用者配置了自动提交,则此可选标志会导致使用者在关闭之前尝试提交任何待处理的消耗偏移量。默认值:True

commit(offsets=None)

Commit offsets to kafka, blocking +

commit_async(offsets=None, callback=None)

Commit offsets to kafka asynchronously, optionally firing callback.
#异步提交,可选择的触发回调,其余的和上面的commit一样。

committed(partition)

Get the  committed offset  the consumer

pase, pased和resume

pase:暂停当前正在进行的请求。需要使用resume恢复
pased:获取使用pase暂停时的分区信息
resume: 从pase状态恢复。
除了pased之外,其余两个方法的参数均为TopicPartation类型

kafka-python除了有消费者和生成者之外,还有一个客户端,下面我们来说明客户端API。

客户端API

客户端API的官方文档为: https://kafka-python.readthedocs.io/en/master/apidoc/KafkaClient.html

简单说明怎么使用客户端API创建主题。

>>>>>> kc = KafkaClient(bootstrap_servers=>>>: , : , : , : , : , : None, : None, : [(, , )], : , : , : , : None, : True, : None, : None, : None, : None, : None, : (, ), : , :<class >, : None, : , : None, : None, : None}
#这些参数的具体意思可以查看上面的官方文档。

>>> kc.add_topic("clent-1")           #添加主题

kafka-python还提供了其余两个API,broker连接API和集群连接API

 



这篇关于kafka-python的API简单介绍的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程