网站建设做什么费用,营销比较好的知名公司有哪些,jsp网站搭建,动画制作公司排名1. Kafka 消费者的逻辑
配置消费者客户端参数。创建相应的消费者实例。订阅主题。拉取消息并消费#xff1b;提交消息位移#xff1b;关闭消费者实例#xff1b;
2 Kafka 的C API
2.1 RdKafka::Conf
见生成者实现文章。
2.2 RdKafka::Event
见生成者实现文章。
2.3 R…1. Kafka 消费者的逻辑
配置消费者客户端参数。创建相应的消费者实例。订阅主题。拉取消息并消费提交消息位移关闭消费者实例
2 Kafka 的C API
2.1 RdKafka::Conf
见生成者实现文章。
2.2 RdKafka::Event
见生成者实现文章。
2.3 RdKafka::EventCb
见生成者实现文章。
2.4 RdKafka::TopicPartition
static TopicPartition * create(const std::string topic, int partition);
//创建一个TopicPartition对象。static TopicPartition *create (const std::string topic, int partition,int64_t offset);
//创建TopicPartition对象。static void destroy (std::vectorTopicPartition* partitions);
//销毁所有TopicPartition对象。const std::string topic () const;
//返回Topic名称。int partition ();
//返回分区号。int64_t offset();
//返回位移。void set_offset(int64_t offset);
//设置位移。ErrorCode err();
//返回错误码。2.5 RdKafka::RebalanceCb
virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector TopicPartition * partitions)0;用于RdKafka::KafkaConsunmer的组再平衡回调函数注册rebalance_cb回调函数会关闭rdkafka的自动分区赋值和再分配并替换应用程序的rebalance_cb回调函数 再平衡回调函数负责对基于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件更新rdkafka的分区分配也能处理任意前两者错误除外其它再平衡失败错误。对于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件之外的其它再平衡失败错误必须调用unassign()同步状态。 没有再平衡回调函数rdkafka也能自动完成再平衡过程但注册一个再平衡回调函数可以使应用程序在执行其它操作时拥有更大的灵活性例如从指定位置获取位移或手动提交位移。 C封装API
class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:static void printTopicPartition (const std::vectorRdKafka::TopicPartition*partitions) // 打印当前获取的分区{for (unsigned int i 0 ; i partitions.size() ; i)std::cerr partitions[i]-topic() [ partitions[i]-partition() ], ;std::cerr \n;}public:void rebalance_cb (RdKafka::KafkaConsumer *consumer,RdKafka::ErrorCode err,std::vectorRdKafka::TopicPartition* partitions){std::cerr RebalanceCb: RdKafka::err2str(err) : ;printTopicPartition(partitions);if (err RdKafka::ERR__ASSIGN_PARTITIONS){consumer-assign(partitions);partition_count (int)partitions.size();}else{consumer-unassign();partition_count 0;}}
private:int partition_count;
};2.6 RdKafka::Message
见生成者实现文章。
2.7 RdKafka::KafkaConsumer核心
KafkaConsumer是高级API要求Kafka 0.9.0以上版本当前支持range和roundrobin分区分配策略。
static KafkaConsumer * create(Conf *conf, std::string errstr);
创建KafkaConsumer对象conf对象必须配置Consumer要加入的消费者组。使用KafkaConsumer::close()进行关闭。ErrorCode assignment(std::vector RdKafka::TopicPartition * partitions);
返回由RdKafka::KafkaConsumer::assign() 设置的当前分区。ErrorCode subscription(std::vector std::string topics);
返回由RdKafka::KafkaConsumer::subscribe() 设置的当前订阅Topic。ErrorCode subscribe(const std::vector std::string topics);
更新订阅Topic分区。ErrorCode unsubscribe();
将当前订阅Topic取消订阅分区。ErrorCode assign(const std::vector TopicPartition * partitions);
将分配分区更新为partitions。ErrorCode unassign();
停止消费并删除当前分配的分区。Message * consume(int timeout_ms);
消费消息或获取错误事件触发回调函数会自动调用注册的回调函数包括RebalanceCb、EventCb、OffsetCommitCb等。需要使用delete释放消息。应用程序必须确保consume在指定时间间隔内调用为了执行等待调用的回调函数即使没有消息。当RebalanceCb被注册时在需要调用和适当处理内部Consumer同步状态时确保consume在指定时间间隔内调用极为重要。应用程序必须禁止对KafkaConsumer对象调用poll函数。
如果RdKafka::Message::err()是ERR_NO_ERROR则返回正常的消息如果RdKafka::Message::err()是ERR_NO_ERRO返回错误事件如果RdKafka::Message::err()是ERR_TIMED_OUT则超时。ErrorCode commitSync();
提交当前分配分区的位移同步操作会阻塞直到位移被提交或提交失败。如果注册了RdKafka::OffsetCommitCb回调函数其会在KafkaConsumer::consume()函数内调用并提交位移。ErrorCode commitAsync();
异步提交位移。ErrorCode commitSync(Message *message);
基于消息对单个topicpartition对象同步提交位移。virtual ErrorCode commitSync (std::vectorTopicPartition* offsets) 0;
对指定多个TopicPartition同步提交位移。ErrorCode commitAsync(Message *message);
基于消息对单个TopicPartition异步提交位移。virtual ErrorCode commitAsync (const std::vectorTopicPartition* offsets) 0;
对多个TopicPartition异步提交位移。ErrorCode close();
正常关闭会阻塞直到四个操作完成触发避免当前分区分配的局部再平衡停止当前赋值消费提交位移离开分组virtual ConsumerGroupMetadata *groupMetadata () 0;
返回本Consumer实例的Consumer Group的元数据。ErrorCode position (std::vectorTopicPartition* partitions)
获取TopicPartition对象中当前位移会别填充TopicPartition对象的offset字段。ErrorCode seek (const TopicPartition partition, int timeout_ms)
定位TopicPartition的Consumer到位移。timeout_ms为0会开始Seek并立即返回timeout_ms非0Seek会等待timeout_ms时间。ErrorCode offsets_store (std::vectorTopicPartition* offsets)
为TopicPartition存储位移位移会在auto.commit.interval.ms时提交或是被手动提交。enable.auto.offset.store属性必须设置为fasle。3 Kafka 消费者客户端开发 3.1 必要的参数配置bootstrap.servers 在创建消费者的时候以下以下三个选项是必选的
bootstrap.servers指定 broker (kafka服务器)的地址清单清单里不需要包含所有的 brokerkafka 地址生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错。group.idconsumer group 是 kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组那么组内必然可以有多个消费者或消费者实例(consumer instance)它们共享一个公共的 ID即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。auto.offset.reset这个参数是针对新的 groupid 中的消费者而言的当有新 groupid 的消费者来