网站的静态页面用什么做,营销型网站建设服务商,elementor wordpress,wordpress5.0.2运行慢1、Kafka使用背景在我们大量使用分布式数据库、分布式计算集群的时候#xff0c;是否会遇到这样的一些问题#xff1a;我们想分析下用户行为#xff08;pageviews#xff09;#xff0c;以便我们设计出更好的广告位我想对用户的搜索关键词进行统计#xff0c;分析出当前的… 1、Kafka使用背景在我们大量使用分布式数据库、分布式计算集群的时候是否会遇到这样的一些问题我们想分析下用户行为pageviews以便我们设计出更好的广告位我想对用户的搜索关键词进行统计分析出当前的流行趋势有些数据存储数据库浪费直接存储硬盘效率又低 这些场景都有一个共同点数据是由上游模块产生上游模块使用上游模块的数据计算、统计、分析这个时候就可以使用消息系统尤其是分布式消息系统2、Kafka的定义What is Kafka它是一个分布式消息系统由linkedin使用scala编写用作LinkedIn的活动流Activity Stream和运营数据处理管道Pipeline的基础。具有高水平扩展和高吞吐量。3、Kafka和其他主流分布式消息系统的对比 定义解释1、Java 和 scala都是运行在JVM上的语言。2、erlang和最近比较火的和go语言一样是从代码级别就支持高并发的一种语言所以RabbitMQ天生就有很高的并发性能但是 有RabbitMQ严格按照AMQP进行实现受到了很多限制。kafka的设计目标是高吞吐量所以kafka自己设计了一套高性能但是不通用的协议他也是仿照AMQP Advanced Message Queuing Protocol 高级消息队列协议设计的。 3、事务的概念在数据库中多个操作一起提交要么操作全部成功要么全部失败。举个例子 在转账的时候付款和收款就是一个事物的例子你给一个人转账你转成功并且对方正常行收到款项后这个操作才算成功有一方失败那么这个操作就是失败的。 对应消在息队列中就是多条消息一起发送要么全部成功要么全部失败。3个中只有ActiveMQ支持这个是因为RabbitMQ和Kafka为了更高的性能而放弃了对事务的支持 。4、集群多台服务器组成的整体叫做集群这个整体对生产者和消费者来说是透明的。其实对消费系统组成的集群添加一台服务器减少一台服务器对生产者和消费者都是无感之的。5、负载均衡对消息系统来说负载均衡是大量的生产者和消费者向消息系统发出请求消息系统必须均衡这些请求使得每一台服务器的请求达到平衡而不是大量的请求落到某一台或几台使得这几台服务器高负荷或超负荷工作严重情况下会停止服务或宕机。6、动态扩容是很多公司要求的技术之一不支持动态扩容就意味着停止服务这对很多公司来说是不可以接受的。 注阿里巴巴的Metal,RocketMQ都有Kafka的影子他们要么改造了Kafka或者借鉴了Kafka最后Kafka的动态扩容是通过Zookeeper来实现的。 Zookeeper是一种在分布式系统中被广泛用来作为分布式状态管理、分布式协调管理、分布式配置管理、和分布式锁服务的集群。kafka增加和减少服务器都会在Zookeeper节点上触发相应的事件kafka系统会捕获这些事件进行新一轮的负载均衡客户端也会捕获这些事件来进行新一轮的处理。二、Kafka相关概念1、 AMQP协议Advanced Message Queuing Protocol 高级消息队列协议The Advanced Message Queuing Protocol (AMQP)是一个标准开放的应用层的消息中间件Message Oriented Middleware协议。AMQP定义了通过网络发送的字节流的数据格式。因此兼容性非常好任何实现AMQP协议的程序都可以和与AMQP协议兼容的其他程序交互可以很容易做到跨语言跨平台。上面说的3种比较流行的消息队列协议要么支持AMQP协议要么借鉴了AMQP协议的思想进行了开发、实现、设计。2、 一些基本的概念1、消费者Consumer从消息队列中请求消息的客户端应用程序2、生产者Producer 向broker发布消息的应用程序3、AMQP服务端broker用来接收生产者发送的消息并将这些消息路由给服务器中的队列便于fafka将生产者发送的消息动态的添加到磁盘并给每一条消息一个偏移量所以对于kafka一个broker就是一个应用程序的实例kafka支持的客户端语言Kafka客户端支持当前大部分主流语言包括C、C、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript可以使用以上任何一种语言和kafka服务器进行通信即辨析自己的consumer从kafka集群订阅消息也可以自己写producer程序 3、Kafka架构生产者生产消息、kafka集群、消费者获取消息这样一种架构如下图kafka集群中的消息是通过Topic主题来进行组织的如下图一些基本的概念1、主题Topic一个主题类似新闻中的体育、娱乐、教育等分类概念在实际工程中通常一个业务一个主题。2、分区Partition一个Topic中的消息数据按照多个分区组织分区是kafka消息队列组织的最小单位一个分区可以看作是一个FIFO First Input First Output的缩写先入先出队列的队列。kafka分区是提高kafka性能的关键所在当你发现你的集群性能不高时常用手段就是增加Topic的分区分区里面的消息是按照从新到老的顺序进行组织消费者从队列头订阅消息生产者从队列尾添加消息。工作图备份Replication为了保证分布式可靠性kafka0.8开始对每个分区的数据进行备份不同的Broker上防止其中一个Broker宕机造成分区上的数据不可用。kafka0.7是一个很大的改变1、增加了备份2、增加了控制借点概念增加了集群领导者选举 。三、Zookeeper(动物园)集群搭建kafka集群是把状态保存在Zookeeper中的首先要搭建Zookeeper集群。1、软件环境(3台服务器-测试,一般是奇数台服务器)vim /etc/hosts(3台服务器都要写)192.168.11.128 server1192.168.11.129 server2192.168.11.130 server31、Linux服务器一台、三台、五台、2*n1Zookeeper集群的工作是超过半数才能对外提供服务3台中超过两台超过半数允许1台挂掉 是否可以用偶数其实没必要。如果有四台那么挂掉一台还剩下三台服务器如果在挂掉一个就不行了这里记住是超过半数。2、Java jdk1.8 zookeeper是用java写的所以他的需要JAVA环境java是运行在java虚拟机上的3、Zookeeper的稳定版本Zookeeper 3.4.14版本-------------------------------------------------------------------------------------------------------------------------------------------------2、配置安装zookeeper下面的操作是: 3台服务器统一操作1、安装java我这里采用jdk安装先准备jdk的包解压带/usr/local下创建软链接写java环境变量vim /etc/profile.d/aa.sh加载环境变量source /etc/profile(加载全部环境变量) 或者 source /etc/profile.d/aa.sh(加载这一个环境变量)查看java有没有安装成功如上图所示表示java环境已经部署成功------------------------------------------------------------------------------------------------------------------------用yum安装javayum list java* yum -y install java------------------------------------------------------------------------------------------------------------------------2、下载Zookeeper3台服务器统一操作首先要注意在生产环境中目录结构要定义好防止在项目过多的时候找不到所需的项目#目录统一放在/opt下面#首先创建Zookeeper项目目录cd /optmkdir zookeeper //项目目录mkdir zookeeper/zkdata //存放快照日志mkdir zookeeper/zkdatalog //存放事物日志下载Zookeeper#下载软件cd /opt/zookeeper/wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz解压软件tar xf zookeeper-3.4.14.tar.gz3、修改配置文件进入到解压好的目录里面的conf目录中查看#zoo_sample.cfg 这个文件是官方给我们的zookeeper的样板文件给他复制一份命名为zoo.cfgzoo.cfg是官方指定的文件命名规则。3台服务器的配置文件vim zoo.cfg上面定义了dataDir和clientPort这2行就要注释否则后面起集群会报错。#server.1 这个1是服务器的标识也可以是其他的数字 表示这个是第几号服务器用来标识服务器这个标识要写到快照目录下面myid文件里#192.168.11.139为集群里的IP地址第一个端口是master和slave之间的通信端口默认是2888第二个端口是leader选举的端口集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888配置文件解释#tickTime这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔也就是每个 tickTime 时间就会发送一个心跳。#initLimit这个配置项是用来配置 Zookeeper 接受客户端这里所说的客户端不是用户连接 Zookeeper 服务器的客户端而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间也就是 tickTime长度后 Zookeeper 服务器还没有收到客户端的返回信息那么表明这个客户端连接失败。总的时间长度就是 5*200010 秒#syncLimit这个配置项标识 Leader 与Follower 之间发送消息请求和应答时间长度最长不能超过多少个 tickTime 的时间长度总的时间长度就是5*200010秒#dataDir快照日志的存储路径#dataLogDir事物日志的存储路径如果不配置这个那么事物日志会默认存储到dataDir制定的目录这样会严重影响zk的性能当zk吞吐量较大的时候产生的事物日志、快照日志太多#clientPort这个端口就是客户端连接 Zookeeper 服务器的端口Zookeeper 会监听这个端口接受客户端的访问请求。修改他的端口改大点创建myid文件每一台的都不一样#server1echo 1 /opt/zookeeper/zkdata/myid#server2echo 2 /opt/zookeeper/zkdata/myid#server3echo 3 /opt/zookeeper/zkdata/myid4、重要配置说明1、myid文件和server.myid 在快照目录下存放的标识本台服务器的文件他是整个zk集群用来发现彼此的一个重要标识。2、zoo.cfg 文件是zookeeper配置文件 在conf目录里。3、log4j.properties文件是zk的日志输出文件 在conf目录里用java写的程序基本上有个共同点日志都用log4j来进行管理。4、zkEnv.sh和zkServer.sh文件zkServer.sh 主的管理程序文件zkEnv.sh 是主要配置zookeeper集群启动时配置环境变量的文件5、还有一个需要注意ZooKeeper server will not remove old snapshots and log files when using the default configuration (see autopurge below), this is the responsibility of the operatorzookeeper不会主动的清除旧的快照和日志文件这个是操作者的责任。但是可以通过命令去定期的清理#以上这个脚本定义了删除对应两个目录中的文件保留最新的66个文件可以将他写到crontab中设置为每天凌晨2点执行一次就可以了。5、启动服务并查看1、启动服务#进去到Zookeeper的bin目录下cd /opt/zookeeper/zookeeper-3.4.14/bin#启动服务(3台都需要操作)2、检查服务状态 #检查服务器状态会有一个leader和2个followerzk集群一般只有一个leader多个follower主一般是相应客户端的读写请求而从主同步数据当主挂掉之后就会从follower里投票选举一个leader出来。可以用jps查看zk的进程这是整个工程的main#执行命令jps四kafka集群搭建1软件环境1、linux一台或多台大于等于22、已经搭建好的zookeeper集群3、软件版本kafka_2.11-0.9.0.1.tgz2、创建目录并下载安装软件3台服务器一起操作#创建目录cd /optmkdir kafka #创建项目目录cd kafkamkdir kafkalogs #创建kafka消息目录主要存放kafka消息#下载软件wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz#解压软件tar -zxvf kafka_2.11-2.2.0.tgz3修改配置文件进入到config目录cd /opt/kafka/kafka_2.11-2.2.0/config/主要关注server.properties 这个文件即可我们可以发现在目录下有很多文件这里可以发现有Zookeeper文件我们可以根据Kafka内带的zk集群来启动但是建议使用独立的zk集群修改配置文件broker.id0 #当前机器在集群中的唯一标识和zookeeper的myid性质一样port19092 #当前kafka对外提供服务的端口默认是9092host.name192.168.7.100 #这个参数默认是关闭的在0.8.1有个bugDNS解析问题失败率的问题。num.network.threads3 #这个是borker进行网络处理的线程数num.io.threads8 #这个是borker进行I/O处理的线程数log.dirs/opt/kafka/kafkalogs/ #消息存放的目录这个目录可以配置为“”逗号分割的表达式上面的num.io.threads要大于这个目录的个数这个目录如果配置多个目录新创建的topic他把消息持久化的地方是当前以逗号分割的目录中那个分区数最少就放那一个socket.send.buffer.bytes102400 #发送缓冲区buffer大小数据不是一下子就发送的先回存储到缓冲区了到达一定的大小后在发送能提高性能socket.receive.buffer.bytes102400 #kafka接收缓冲区大小当数据到达一定大小后在序列化到磁盘socket.request.max.bytes104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数这个值不能超过java的堆栈大小num.partitions1 #默认的分区数一个topic默认1个分区数log.retention.hours168 #默认消息的最大持久化时间168小时7天message.max.byte5242880 #消息保存的最大值5Mdefault.replication.factor2 #kafka保存消息的副本数如果一个副本失效了另一个还可以继续提供服务replica.fetch.max.bytes5242880 #取消息的最大直接数log.segment.bytes1073741824 #这个参数是因为kafka的消息是以追加的形式落地到文件当超过这个值的时候kafka会新起一个文件log.retention.check.interval.ms300000 #每隔300000毫秒去检查上面配置的log失效时间log.retention.hours168 到目录查看是否有过期的消息如果有删除log.cleaner.enablefalse #是否启用log压缩一般不用启用启用的话可以提高性能zookeeper.connect192.168.11.139:12181,192.168.11.140:12181,192.168.11.141:1218 #设置zookeeper的连接端口上面是参数的解释实际的修改项为配置文件修改结束4,启动kafka集群并测试启动服务2.检查服务是否启动#执行命令jps3. 创建Topic验证是否创建成功更多请看官方文档http://kafka.apache.org/documentation.html#创建Topic话题#解释--replication-factor 2 #复制两份--partitions 1 #创建1个分区--topic #主题为meinv在一台服务器上创建一个发布者#创建一个broker发布者在一台服务器上创建一个订阅者到此服务搭建结束5、其他说明标注5.1、日志说明默认kafka的日志是保存在/opt/kafka/kafka_2.11-2.2.0/logs目录下的这里说几个需要注意的日志server.log #kafka的运行日志state-change.log #kafka他是用zookeeper来保存状态所以他可能会进行切换切换的日志就保存在这里controller.log #kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了活着的节点中的一个会备切换为新的controller.5.2、上面的大家你完成之后可以登录zk来查看zk的目录情况#使用客户端进入zkcd/opt/zookeeper/zookeeper-3.4.14/bin./zkCli.sh -server 192.168.11.139:12181 #默认是不用加’-server‘参数的因为我们修改了他的端口#查看目录情况 执行“ls /”[zk: 127.0.0.1:12181(CONNECTED) 0] ls /#显示结果[consumers, config, controller, isr_change_notification, admin, brokers, zookeeper, controller_epoch]上面的显示结果中只有zookeeper是zookeeper原生的其他都是Kafka创建的#标注一个重要的[zk: 127.0.0.1:12181(CONNECTED) 1] get /brokers/ids/1{jmx_port:-1,timestamp:1456125963355,endpoints:[PLAINTEXT://192.168.7.100:19092],host:192.168.7.100,version:2,port:19092}cZxid 0x1000001c1ctime Mon Feb 22 15:26:03 CST 2016mZxid 0x1000001c1mtime Mon Feb 22 15:26:03 CST 2016pZxid 0x1000001c1cversion 0dataVersion 0aclVersion 0ephemeralOwner 0x152e40aead20016dataLength 139numChildren 0[zk: 127.0.0.1:12181(CONNECTED) 2] #还有一个是查看partion[zk: 127.0.0.1:12181(CONNECTED) 7] get /brokers/topics/shuaige/partitions/0nullcZxid 0x100000029ctime Mon Feb 22 10:05:11 CST 2016mZxid 0x100000029mtime Mon Feb 22 10:05:11 CST 2016pZxid 0x10000002acversion 1dataVersion 0aclVersion 0ephemeralOwner 0x0dataLength 0numChildren 1[zk: 127.0.0.1:12181(CONNECTED) 8] 转载于:https://blog.51cto.com/14181888/2394008