当前位置: 首页 > news >正文

网站建设经营服务合同百度新闻官网

网站建设经营服务合同,百度新闻官网,建设网站个人简介范文,教育培训机构有哪些kafka的基本了解 kafka也是 目前常用的消息中间件,支持同步与异步通信,和rabbitmq一样,工作模式大概相同,并且被spingboot整合的后的都是 中间件Template的实列化客户端类 ,消费者监听注解为KafkaListener,和RabbitListener和很相似,这些消息中间件使用过后,发现大致都是相同的…kafka的基本了解 kafka也是 目前常用的消息中间件,支持同步与异步通信,和rabbitmq一样,工作模式大概相同,并且被spingboot整合的后的都是 中间件Template的实列化客户端类 ,消费者监听注解为KafkaListener,和RabbitListener和很相似,这些消息中间件使用过后,发现大致都是相同的. rabbitmq快速入门学习 对比 一般选择rabbitmq是完全足够的 环境安装 docker拉取镜像 kafka对zookeeper强依赖,毕竟能装载的数据量有这么大 docker pull zookeeper:3.4.14 #启动 这里使用的是host模式,一般说是需要统一docker网络 docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14 #拉取 kafka镜像主义版本依赖问题 docker pull wurstmeister/kafka:2.12-2.3.1 #启动 docker run -d --name kafka \ --env KAFKA_ADVERTISED_HOST_NAME192.168.249.132 \ --env KAFKA_ZOOKEEPER_CONNECT192.168.249.132:2181 \ --env KAFKA_ADVERTISED_LISTENERSPLAINTEXT://192.168.249.132:9092 \ --env KAFKA_LISTENERSPLAINTEXT://0.0.0.0:9092 \ --env KAFKA_HEAP_OPTS-Xmx256M -Xms256M \ --nethost wurstmeister/kafka:2.12-2.3.1和消息队列一样,角色分别是生成者和消费者,生产者发送消息交给kafka服务,其中kafka一般是集群方式,单点为broker kafka没有queue队列一说是将消息存放在topic主题中和然后分发给消费者,这里的消费者是以组为单位,一组里面可以有多个消费者,但是只有一台机子消费者可以消费消息而rabbitmq可以是以队列为单位消费,通过交换机进行分发,如果队列绑定多个消费者,那么可以自己选择轮询或者其他消费机制 在 Kafka 和 RabbitMQ 中消费者组或集群的行为略有不同 Kafka: Kafka Consumer Group: 在 Kafka 中一个消费者组可以有多个消费者。每个分区内的消息只能由消费者组内的一个消费者消费但不同分区的消息可以被不同的消费者处理。这种方式确保了分布式消费者组的横向扩展每个消费者只负责处理特定分区的消息。 RabbitMQ: RabbitMQ Consumer Group: RabbitMQ 中没有严格的消费者组的概念而是通过队列的方式来进行消息的订阅。多个消费者可以订阅同一个队列每个消息只能被其中一个消费者消费。RabbitMQ 不同于 Kafka 的是消息不是在队列之间划分而是通过交换机将消息路由到一个或多个队列。 总结: 在 Kafka 中分区是横向划分消息的单元每个分区只能由一个消费者处理但不同分区可以并行处理。 在 RabbitMQ 中队列是消息的接收单元每个消息只能被一个消费者接收但不同队列的消息是相互独立的。 这里不做过多讨论 基本使用 1.引入依赖 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId /dependency2.消息发送 /*** 生产者*/ public class ProducerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties properties new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.200.130:9092);//发送失败失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);//消息value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);//2.生产者对象KafkaProducerString,String producer new KafkaProducerString, String(properties);//封装发送的消息 需要指定topic 消息是存放在topic中的ProducerRecordString,String record new ProducerRecordString, String(topic,100001,hello kafka);//3.发送消息producer.send(record);//4.关闭消息通道必须关闭否则消息发送不成功producer.close();}}3.消息消费 其中消息的序列化器和生产者一样,其次消费者单位是组,如果一个消费者组内只有一个可以接收信息,而有多个消费者监听该topic组,那么消息会发给每一个组,默认就是fanout扇出交换机 /*** 消费者*/ public class ConsumerQuickStart {public static void main(String[] args) {//1.添加kafka的配置信息Properties properties new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.200.130:9092);//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, group2);//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer);//2.消费者对象KafkaConsumerString, String consumer new KafkaConsumerString, String(properties);//3.订阅主题consumer.subscribe(Collections.singletonList(topic));//当前线程一直处于监听状态while (true) {//4.获取消息ConsumerRecordsString, String consumerRecords consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}集群达到高可用 和rqabbit以及其他消息中间件一样为了数据的安全性和可用性,一般都是以集群方式存在 还有集群的数据备份 数据同步和故障转移 ISRin-sync replica需要同步复制保存的follower 如果leader失效后需要选出新的leader选举的原则如下 第一选举时优先从ISR中选定因为这个列表中follower的数据是与leader同步的 第二如果ISR列表中的follower都不行了就只能从其他follower中选取 极端情况就是所有副本都失效了这时有两种方案 第一等待ISR中的一个活过来选为Leader数据可靠但活过来的时间不确定 第二选择第一个活过来的Replication不一定是ISR中的选为leader以最快速度恢复可用性但数据不一定完整 生产者性质 kafka支持同步发送消息和异步 同步发送 使用send()方法发送它会返回一个Future对象调用get()方法进行等待就可以知道消息是否发送成功 RecordMetadata recordMetadata producer.send(kvProducerRecord).get(); System.out.println(recordMetadata.offset());异步发送 调用send()方法并指定一个回调函数服务器在返回响应时调用函数 //异步消息发送 producer.send(kvProducerRecord, new Callback() {Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e ! null){System.out.println(记录异常信息到日志表中);}System.out.println(recordMetadata.offset());} });如果没有得到kafka服务端的ack确认则会触发回调,可以在生产者配置文件中进行配置 //ack配置 消息确认机制 prop.put(ProducerConfig.ACKS_CONFIG,all);以及失败重试机制,配置后只有重试耗尽才会抛出失败 代码配置代码的配置方式 //ack配置 消息确认机制 prop.put(ProducerConfig.ACKS_CONFIG,all);参数的选择说明 生产者从服务器收到的错误有可能是临时性错误在这种情况下retries参数的值决定了生产者可以重发消息的次数如果达到这个次数生产者会放弃重试返回错误默认情况下生产者会在每次重试之间等待100ms 代码中配置方式 //重试次数 prop.put(ProducerConfig.RETRIES_CONFIG,10);消息压缩 默认情况下 消息发送时不会被压缩。 代码中配置方式 //数据压缩 prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,lz4);压缩算法说明snappy占用较少的 CPU 却能提供较好的性能和相当可观的压缩比 如果看重性能和网络带宽建议采用lz4占用较少的 CPU 压缩和解压缩速度较快压缩比也很客观gzip占用较多的 CPU但会提供更高的压缩比网络带宽有限可以使用这种算法 使用压缩可以降低网络传输开销和存储开销而这往往是向 Kafka 发送消息的瓶颈所在。 消费者 接下来探讨消费者, 消费者- 消费者组Consumer Group 指的就是由一个或多个消费者组成的群体 一个发布在Topic上消息被分发给此消费者组中的一个消费者 所有的消费者都在一个组中那么这就变成了queue模型 所有的消费者都在不同的组中那么就完全变成了发布-订阅模型 (rabitmq的基本工作模式) 7.2)消息有序性 应用场景 即时消息中的单对单聊天和群聊保证发送方消息发送顺序与接收方的顺序一致 充值转账两个渠道在同一个时间进行余额变更短信通知必须要有顺序 topic分区中消息只能由消费者组中的唯一一个消费者处理所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理不能保证跨分区的消息先后处理顺序。 所以如果你想要顺序的处理Topic的所有消息那就只提供一个分区。 7.3)提交和偏移量 kafka不会像其他JMS队列那样需要得到消费者的确认消费者可以使用kafka来追踪消息在分区的位置偏移量 消费者会往一个叫做_consumer_offset的特殊主题发送消息消息里包含了每个分区的偏移量。如果消费者发生崩溃或有新的消费者加入群组就会触发再均衡,没有消费者的确认就只能通过偏移量了 正常的情况 异常 如果消费者2挂掉以后会发生再均衡消费者2负责的分区会被其他消费者进行消费 再均衡后不可避免会出现一些问题 问题一 如果提交偏移量小于客户端处理的最后一个消息的偏移量那么处于两个偏移量之间的消息就会被重复处理。 问题二 如果提交的偏移量大于客户端的最后一个消息的偏移量那么处于两个偏移量之间的消息将会丢失。 如果想要解决这些问题还要知道目前kafka提交偏移量的方式 提交偏移量的方式有两种分别是自动提交偏移量和手动提交 自动提交偏移量 当enable.auto.commit被设置为true提交方式就是让消费者自动提交偏移量每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去 手动提交 当enable.auto.commit被设置为false可以有以下三种提交方式 提交当前偏移量同步提交 异步提交 同步和异步组合提交 1.提交当前偏移量同步提交 把enable.auto.commit设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量commitSync()将会提交poll返回的最新的偏移量所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。 只要没有发生不可恢复的错误commitSync()方法会一直尝试直至提交成功如果提交失败也可以记录到错误日志里。 while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());try {consumer.commitSync();//同步提交当前最新的偏移量}catch (CommitFailedException e){System.out.println(记录提交失败的异常e);}} }2.异步提交 手动提交有一个缺点那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率但这个会增加消息重复的概率和自动提交一样。另外一个解决办法是使用异步提交的API。 while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync(new OffsetCommitCallback() {Overridepublic void onComplete(MapTopicPartition, OffsetAndMetadata map, Exception e) {if(e!null){System.out.println(记录错误的提交偏移量 map,异常信息e);}}}); }3.同步和异步组合提交 异步提交也有个缺点那就是如果服务器返回提交失败异步提交不会进行重试。相比较起来同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为如果同时存在多个异步提交进行重试可能会导致位移覆盖。 举个例子假如我们发起了一个异步提交commitA此时的提交位移为2000随后又发起了一个异步提交commitB且位移为3000commitA提交失败但commitB提交成功此时commitA进行重试并成功的话会将实际上将已经提交的位移从3000回滚到2000导致消息重复消费。 try {while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));for (ConsumerRecordString, String record : records) {System.out.println(record.value());System.out.println(record.key());}consumer.commitAsync();} }catch (Exception e){e.printStackTrace();System.out.println(记录错误信息e); }finally {try {consumer.commitSync();}finally {consumer.close();} }springboot集成kafka 1.先写依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- kafkfa --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdexclusionsexclusiongroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/exclusion/exclusions/dependencydependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactId/dependency /dependencies 生产者端配置文件 server:port: 9991 spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer消费者端 server:port: 9991 spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092consumer: #这里消费者组就是服务名group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer生产者 RestController public class HelloController {Autowiredprivate KafkaTemplateString,String kafkaTemplate;GetMapping(/hello)public String hello(){kafkaTemplate.send(topic,程序员); //如果是对象kafkaTemplate.send(topic, User user new User();user.setUsername(xiaowang);user.setAge(18);kafkaTemplate.send(user-topic, JSON.toJSONString(user)); );return ok;} }消费者 package com.heima.kafka.listener;import com.alibaba.fastjson.JSON; import com.heima.kafka.pojo.User; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils;Component public class HelloListener {KafkaListener(topics user-topic)public void onMessage(String message){if(!StringUtils.isEmpty(message)){User user JSON.parseObject(message, User.class);System.out.println(user);}} }springboot整合后和rabbitmq的使用方法差不多,大致内容和功能都可以实现,延迟队列和匹配routkey
http://www.huolong8.cn/news/208478/

相关文章:

  • 网站建设小程序开发报价湘潭网站建设 w磐石网络
  • php开发一个企业网站价格wordpress 破解商场主题
  • 做外掛网站空间网站建设培训班上的讲话
  • 做网站要钱吗网站做外链多少钱
  • 遂溪 网站建设监理网站
  • wordpress响应式网站模板下载广西城乡建设部网站
  • 网站设计 中国风温州设计集团网站建设
  • 百度指数 网站公司网站留言板
  • 自建网站如何备案广告设计
  • 奇趣网做网站wordpress短代码转php
  • 建设工程网站教程如何为网站开发app
  • 网站界面设计实训报告网站优化需要什么软件
  • 福建省网站备案用户注销(删除)备案申请表如何申请百度定位地址
  • 教育培训类网站建设与维护温州网站升级
  • 小网站关键词搜什么网络优化基础知识
  • 学校建设网站费用申请在哪个网站可以学做甜点
  • 开发网站多少钱一个月销售型网站如何做推广
  • 建网站的英文大型网页游戏开发
  • iis怎么使用来建设一个网站上海网站搜索排名优化哪家好
  • 潍坊响应式网站建设要多久潜江网站搭建
  • 网站建设需求报告小程序制作视频教程
  • 华汇建设集团有限公司网站企业网站模板优化
  • 网站的建设时间表成都网页设计培训学校排名
  • 设计建设网站哪家好网站做程序员
  • 北京丰台网站建设网站建设说明书模板
  • 怎么自己免费创建网站汕头市企业网站建设教程
  • 网站开发技术案例个人网站设计与实现源码
  • 国泰君安官方网站建设集团平台公司是什么意思
  • 网站开发工具与环境删除windows wordpress
  • 手机网站开发教程pdf梓潼网站建设