帝国网站开发,线上推广员是做什么的,wordpress4.7.4源码,网站导航栏垂直kafka的基础概念 Producer (消息生产者) 向主题发布消息的客户端应用程序称为生产者(Producer)#xff0c;生产者用于持续不断的向某个主题发送消息。 Consumer (消息消费者) 订阅主题消息的客户端程序称为消费者(Consumer)#xff0c;消费者用于处理生产者产生的消息。 Co…kafka的基础概念 Producer (消息生产者) 向主题发布消息的客户端应用程序称为生产者(Producer)生产者用于持续不断的向某个主题发送消息。 Consumer (消息消费者) 订阅主题消息的客户端程序称为消费者(Consumer)消费者用于处理生产者产生的消息。 Consumer Group (消费者组)
每个消费者属于一个特定的消费者群组(可为每个消费者指定group name若不指定group name则属于默认的group)。
每个消费者群组都有一个唯一的GroupId。
Brokers(kafka服务器)
一个独立的 Kafka 服务器就被称为 brokerbroker 接收来自生产者的消息为消息设置偏移量并提交消息到磁盘保存。
一个broker可以容纳多个topic。每个broker都有各自的broker.id。
Topics(主题)
消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。
Partition(分区)
主题(Topic)可以被分为若干个分区(partition)同一个主题中的分区可以不在一个机器上有可能会部署在多个机器上
由此来实现 kafka 的伸缩性单一主题中的分区有序但是无法保证主题中所有的分区有序。
安装kafka创建 topic
Windows安装kafka, 详情见https://blog.csdn.net/sinat_32502451/article/details/133067851
Linux 安装kafka详情见https://blog.csdn.net/sinat_32502451/article/details/133080353
添加依赖包 dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdversion2.1.10.RELEASE/version/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.0.0/version/dependencykafka生产者示例
按以下步骤发送消息
设置 broker服务器的ip和端口生产者初始化发送消息
public class KafkaDemoProducer {public static final String BROKER_LIST localhost:9092;public static final String TOPIC myTopic1;public static void main(String[] args) {//属性配置Properties properties getProperties(BROKER_LIST);//生产者初始化KafkaProducerString, String producer new KafkaProducer(properties);ProducerRecordString, String record new ProducerRecord(TOPIC, hello kafka);//发送消息try {producer.send(record);} catch (Exception e) {System.out.println(send error. e);}producer.close();}private static Properties getProperties(String brokerList) {Properties properties new Properties();properties.put(key.serializer,org.apache.kafka.common.serialization.StringSerializer);properties.put(value.serializer,org.apache.kafka.common.serialization.StringSerializer);properties.put(bootstrap.servers, brokerList);return properties;}}kafka消费者示例
主要按照以下步骤 设置 broker服务器的ip和端口 设置 消费者群组id 初始化消费者 消费者订阅主题 消费者批量拉取消息
public class KafkaDemoConsumer {public static final String BROKER_LIST localhost:9092;public static final String TOPIC myTopic1;public static final String GROUP_ID group.demo;public static void main(String[] args) {consumerRecord();}public static void consumerRecord() {//属性配置Properties properties getProperties(BROKER_LIST, GROUP_ID);//消费者初始化KafkaConsumerString, String consumer new KafkaConsumer(properties);//消息者订阅主题consumer.subscribe(Collections.singletonList(TOPIC));//循环while (true) {//每次拉取 1千条消息ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println( 消费kafka消息 record.value());}}}public static Properties getProperties(String brokerList, String groupId) {Properties properties new Properties();//序列化properties.put(key.deserializer,org.apache.kafka.common.serialization.StringDeserializer);properties.put(value.deserializer,org.apache.kafka.common.serialization.StringDeserializer);//broker服务器的ip和端口多个用逗号隔开properties.put(bootstrap.servers, brokerList);//消费者群组idproperties.put(group.id, groupId);return properties;}}
观察idea 控制台可以看到 成功消费了消息 消费kafka消息hello kafka参考资料
《深入理解kafka 核心设计与实践原理》 kafka的简单理解