网站正在建设中中文,免费下载素材网址,请人做阿里巴巴网站需要注意,做视频直播的网站目录 1.服务器用docker安装kafka
2.springboot集成kafka实现生产者和消费者 1.服务器用docker安装kafka ①、安装docker#xff08;docker类似于linux的软件商店#xff0c;下载所有应用都能从docker去下载#xff09; a、自动安装 curl -fsSL https://get.docker.com | b…目录 1.服务器用docker安装kafka
2.springboot集成kafka实现生产者和消费者 1.服务器用docker安装kafka ①、安装dockerdocker类似于linux的软件商店下载所有应用都能从docker去下载 a、自动安装 curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun b、启动docker sudo systemctl start docker c、 通过运行hello-world镜像来验证是否正确安装了Docker Engine-Community。 // 拉取镜像 sudo docker pull hello-world // 执行 hello-world sudo docker run hello-world d、安装成功 ②、zookeeper a、docker search zookeeper b、docker pull zookeeper ③、安装kafka a、docker search kafka b、docker pull wurstmeister/kafka ④、运行zookeeper a、docker run -d --restartalways --log-driver json-file --log-opt max-size100m --log-opt max-file2 --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime zookeeper ⑤、运行kafka a、 docker run -d --restartalways --log-driver json-file --log-opt max-size100m --log-opt max-file2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID0 -e KAFKA_ZOOKEEPER_CONNECT42.194.238.131:2181/kafka -e KAFKA_ADVERTISED_LISTENERSPLAINTEXT://42.194.238.131:9092 -e KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka b、参数说明 参数说明 -e KAFKA_BROKER_ID0 在kafka集群中每个kafka都有一个BROKER_ID来区分自己 -e KAFKA_ZOOKEEPER_CONNECT172.21.10.10:2181/kafka 配置zookeeper管理kafka的路径172.21.10.10:2181/kafka -e KAFKA_ADVERTISED_LISTENERSPLAINTEXT://172.21.10.10:9092 把kafka的地址端口注册给zookeeper如果是远程访问要改成外网IP,类如Java程序访问出现无法连接。 -e KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092 配置kafka的监听端口 -v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间 ⑥、检验kafka是否可以使用
docker exec -it kafka bash
cd /opt/kafka_2.13-2.8.1/
cd bin a、运行kafka生产者并发送消息 ./kafka-console-producer.sh --broker-list localhost:9092 --topic test b、在开一个页面运行kafka消费者发送消息 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning ⑦、结果是这个样子的 ⑧、每条消息都有一个主题消费者指定监听哪个主题的消息如果进来消息队列的是我们指定监听的主题就消费否则不消费topic这里指定的生产和消费的主题 ⑨、消费者宕掉了生产者接着发消息不会丢消费者重启之后会重新接收到宕机之后发的所有消息
2.springboot集成kafka实现生产者和消费者 ①、在pom中创建依赖 dependency groupIdorg.springframework.kafka/groupId artifactIdspring-kafka/artifactId version2.7.8/version /dependency ②、配置kafka a、在 application.yml 文件中添加以下配置注yml中两个相同名字的会报错,比如两个spring spring: kafka: #自己的kafka所在的ip地址和端口号 bootstrap-servers: localhost:9092 consumer: #一个group-id代表一个消费组一个消息可以被几个消费组消费 group-id: my-group auto-offset-reset: earliest producer: #序列化 value-serializer: org.apache.kafka.common.serialization.StringSerializer key-serializer: org.apache.kafka.common.serialization.StringSerializer b、创建一个生产者 Configuration
public class KafkaProducerConfig {Value(${spring.kafka.bootstrap-servers})private String bootstrapServers;Beanpublic MapString, Object producerConfigs() {MapString, Object props new HashMap();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}Beanpublic ProducerFactoryString, String producerFactory() {return new DefaultKafkaProducerFactory(producerConfigs());}Beanpublic KafkaTemplateString, String kafkaTemplate() {return new KafkaTemplate(producerFactory());}}sendMessage 方法用于发送消息到 Kafka。 RestController
public class KafkaController {Autowiredprivate KafkaTemplateString, String kafkaTemplate;PostMapping(/send)public void sendMessage(RequestBody String message) {kafkaTemplate.send(my-topic, message);}} c、 创建一个消费者 Configuration
EnableKafka
public class KafkaConsumerConfig {Value(${spring.kafka.bootstrap-servers})private String bootstrapServers;Value(${spring.kafka.consumer.group-id})private String groupId;Beanpublic MapString, Object consumerConfigs() {MapString, Object props new HashMap();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return props;}Beanpublic ConsumerFactoryString, String consumerFactory() {return new DefaultKafkaConsumerFactory(consumerConfigs());}Beanpublic ConcurrentKafkaListenerContainerFactoryString, String kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactoryString, String factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory());return factory;}} KafkaListener 注解声明了一个消费者方法用于接收从
my-topic 主题中读取的消息 Service
public class KafkaConsumer {KafkaListener(topics my-topic, groupId my-group-id)public void consume(String message) {System.out.println(Received message: message);}}