湛江网站制作网站,女和女做网站,深圳大型商城网站建设,农夫山泉软文300字zookeeper搭建#xff0c;可以搭建集群#xff0c;也可以单机#xff08;本地学习#xff0c;没必要搭建zookeeper集群#xff0c;单机完全够用了#xff0c;主要学习的是kafka#xff09;
1. 首先官网下载zookeeper#xff1a;Apache ZooKeeper
2. 下载好之后上传到…zookeeper搭建可以搭建集群也可以单机本地学习没必要搭建zookeeper集群单机完全够用了主要学习的是kafka
1. 首先官网下载zookeeperApache ZooKeeper
2. 下载好之后上传到centOS or 其他虚拟机
3. 解压
4. 到zookeeper的config目录下copy 一份zoo_sample.cfg并改名为zoo.cfg
5. 配置环境变量这一步不是必须如果想在任何目录下可以执行zookeeper的命令可以执行此选项如果不执行此选项每次执行zookeeper命令需要到zookeeper的bin目录下去执行 - 进入/etc 目录 cd /etc - 修改 profile 文件 vim profile, 修改后的结果如下 改好后wq! 保存退出这样就可以启动zookeeper了zkServer.sh start
kafka 集群搭建
1. 下载kafkaApache Kafka
2. 上传centOS解压
3. 配置kafka环境变量如上图所示
4. 进入config目录下copy 2份 server.properties重命名为 server.properties1 和 server.properties2这一步的目的是在一台虚拟机上模拟3个kafka只是在配置文件里做区分每个 server.properties 需要配置以下内容分别如下
server.properties: server.properties1: server.properties2: 5. 分别启动这三个配置文件
进入config目录下执行以下命令
kafka-server-start.sh -daemon server.properties
kafka-server-start.sh -daemon server.properties1
kafka-server-start.sh -daemon server.properties2 **** -daemon 是后台运行
不断的刷新 JPS 命令查看kafka启动情况 可以看到3台kafka都已经启动检查kafka在zookeeper里的情况
在任何目录下执行命令进入到zk 客户端
zkCli.sh 再执行以下命令查看zookeeper下所有的文件夹
ls / 再执行以下命令可以查看kafka broker id情况
ls /brokers/ids 可以看到3台kafka都已经启动了
java 代码测试
java代码
package com.tech;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class SimpleDemo {static class MyProducer{// 这里是centOS里的ipstatic final String BOOTSTRAP_SERVERS 192.168.116.128:9092,192.168.116.128:9093,192.168.116.128:9094;public static void main(String[] args) {Properties properties new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);properties.put(ProducerConfig.CLIENT_ID_CONFIG,MY-CLIENT);KafkaProducerString,String producer new KafkaProducer(properties);ProducerRecordString,String record new ProducerRecord(test,hello xma);producer.send(record);}}static class MyConsumer{static final String BOOTSTRAP_SERVERS 192.168.116.128:9092,192.168.116.128:9093,192.168.116.128:9094;public static void main(String[] args) {Properties properties new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG,TEST-GROUP);KafkaConsumerString,String consumer new KafkaConsumerString, String(properties);consumer.subscribe(Collections.singletonList(test));while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(3));for (ConsumerRecordString, String record : records) {System.out.println(record.key():record.value());}}}}
}producer也可以在centOS里启动
kafka-console-producer.sh --broker-list 192.168.116.128:9092,192.168.116.128:9093,192.168.116.128:9094 --topic test
***** 注意如果consumer消费消息的时候出现如下错误需要配置centOS里的your.host.name信息
java.net.UnknownHostException:XXX
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.116.128 your.host.name
配置好之后需重启虚拟机