当前位置: 首页 > news >正文

模版网站建设企业旅游高端网站建设

模版网站建设企业,旅游高端网站建设,wordpress 侧边栏轮播,旅游网站哪个做的好整合kafka多数据源 项目背景依赖配置生产者消费者消息体 项目背景 在很多与第三方公司对接的时候#xff0c;或者处在不同的网络环境下#xff0c;比如在互联网和政务外网的分布部署服务的时候#xff0c;我们需要对接多台kafka来达到我们的业务需求#xff0c;那么当kafk… 整合kafka多数据源 项目背景依赖配置生产者消费者消息体 项目背景 在很多与第三方公司对接的时候或者处在不同的网络环境下比如在互联网和政务外网的分布部署服务的时候我们需要对接多台kafka来达到我们的业务需求那么当kafka存在多数据源的情况就与单机的情况有所不同。 依赖 implementation org.springframework.kafka:spring-kafka:2.8.2配置 单机的情况 如果是单机的kafka我们直接通过springboot自动配置的就可以使用例如在yml里面直接引用 spring:kafka:producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerbootstrap-servers: server001.bbd:9092在使用的时候直接注入,然后就可以使用里面的方法了 Resourceprivate KafkaTemplateString, String kafkaTemplate;多数据源情况下 本篇文章主要讲的是在多数据源下的使用和单机的有所不同我也看了网上的一些博客但是当我去按照网上的配置的时候总是会报错 kafakTemplate这个bean找不到,所以没办法只有按照springboot自动配置里面的来改 package com.ddb.zggz.config;import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer; import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.transaction.KafkaTransactionManager;import java.io.IOException;Configuration(proxyBeanMethods false) ConditionalOnClass(KafkaTemplate.class) EnableConfigurationProperties(KafkaProperties.class) public class KafkaConfiguration {private final KafkaProperties properties;private final KafkaSecondProperties kafkaSecondProperties;public KafkaConfiguration(KafkaProperties properties, KafkaSecondProperties kafkaSecondProperties) {this.properties properties;this.kafkaSecondProperties kafkaSecondProperties;}Bean(kafkaTemplate)Primarypublic KafkaTemplate?, ? kafkaTemplate(ProducerFactoryObject, Object kafkaProducerFactory,ProducerListenerObject, Object kafkaProducerListener,ObjectProviderRecordMessageConverter messageConverter) {KafkaTemplateObject, Object kafkaTemplate new KafkaTemplate(kafkaProducerFactory);messageConverter.ifUnique(kafkaTemplate::setMessageConverter);kafkaTemplate.setProducerListener(kafkaProducerListener);kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return kafkaTemplate;}Bean(kafkaSecondTemplate)public KafkaTemplate?, ? kafkaSecondTemplate(Qualifier(kafkaSecondProducerFactory) ProducerFactoryObject, Object kafkaProducerFactory,Qualifier(kafkaSecondProducerListener) ProducerListenerObject, Object kafkaProducerListener,ObjectProviderRecordMessageConverter messageConverter) {KafkaTemplateObject, Object kafkaTemplate new KafkaTemplate(kafkaProducerFactory);messageConverter.ifUnique(kafkaTemplate::setMessageConverter);kafkaTemplate.setProducerListener(kafkaProducerListener);kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return kafkaTemplate;}Bean(kafkaProducerListener)Primarypublic ProducerListenerObject, Object kafkaProducerListener() {return new LoggingProducerListener();}Bean(kafkaSecondProducerListener)public ProducerListenerObject, Object kafkaSecondProducerListener() {return new LoggingProducerListener();}Bean(kafkaConsumerFactory)Primarypublic ConsumerFactoryObject, Object kafkaConsumerFactory(ObjectProviderDefaultKafkaConsumerFactoryCustomizer customizers) {DefaultKafkaConsumerFactoryObject, Object factory new DefaultKafkaConsumerFactory(this.properties.buildConsumerProperties());customizers.orderedStream().forEach((customizer) - customizer.customize(factory));return factory;}Bean(kafkaSecondConsumerFactory)public ConsumerFactoryObject, Object kafkaSecondConsumerFactory(ObjectProviderDefaultKafkaConsumerFactoryCustomizer customizers) {DefaultKafkaConsumerFactoryObject, Object factory new DefaultKafkaConsumerFactory(this.kafkaSecondProperties.buildConsumerProperties());customizers.orderedStream().forEach((customizer) - customizer.customize(factory));return factory;}Bean(zwKafkaContainerFactory)KafkaListenerContainerFactoryConcurrentMessageListenerContainerInteger, String zwKafkaContainerFactory(Qualifier(value kafkaSecondConsumerFactory) ConsumerFactoryObject, Object kafkaSecondConsumerFactory) {ConcurrentKafkaListenerContainerFactoryInteger, String factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(kafkaSecondConsumerFactory);factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}Bean(kafkaProducerFactory)Primarypublic ProducerFactoryObject, Object kafkaProducerFactory(ObjectProviderDefaultKafkaProducerFactoryCustomizer customizers) {DefaultKafkaProducerFactoryObject, Object factory new DefaultKafkaProducerFactory(this.properties.buildProducerProperties());String transactionIdPrefix this.properties.getProducer().getTransactionIdPrefix();if (transactionIdPrefix ! null) {factory.setTransactionIdPrefix(transactionIdPrefix);}customizers.orderedStream().forEach((customizer) - customizer.customize(factory));return factory;}Bean(kafkaSecondProducerFactory)public ProducerFactoryObject, Object kafkaSecondProducerFactory(ObjectProviderDefaultKafkaProducerFactoryCustomizer customizers) {DefaultKafkaProducerFactoryObject, Object factory new DefaultKafkaProducerFactory(this.kafkaSecondProperties.buildProducerProperties());String transactionIdPrefix this.kafkaSecondProperties.getProducer().getTransactionIdPrefix();if (transactionIdPrefix ! null) {factory.setTransactionIdPrefix(transactionIdPrefix);}customizers.orderedStream().forEach((customizer) - customizer.customize(factory));return factory;}BeanConditionalOnProperty(name spring.kafka.producer.transaction-id-prefix)public KafkaTransactionManager?, ? kafkaTransactionManager(ProducerFactory?, ? producerFactory) {return new KafkaTransactionManager(producerFactory);}BeanConditionalOnProperty(name spring.kafka.jaas.enabled)public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {KafkaJaasLoginModuleInitializer jaas new KafkaJaasLoginModuleInitializer();KafkaProperties.Jaas jaasProperties this.properties.getJaas();if (jaasProperties.getControlFlag() ! null) {jaas.setControlFlag(jaasProperties.getControlFlag());}if (jaasProperties.getLoginModule() ! null) {jaas.setLoginModule(jaasProperties.getLoginModule());}jaas.setOptions(jaasProperties.getOptions());return jaas;}Bean(kafkaAdmin)Primarypublic KafkaAdmin kafkaAdmin() {KafkaAdmin kafkaAdmin new KafkaAdmin(this.properties.buildAdminProperties());kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());return kafkaAdmin;}} 生产者 package com.ddb.zggz.event;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject;import com.ddb.zggz.config.ApplicationConfiguration; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback;import javax.annotation.Resource;Component Slf4j public class KafkaPushEvent {Resourceprivate KafkaTemplateString, String kafkaSecondTemplate;Resourceprivate KafkaTemplateString, String kafkaTemplate;Autowiredprivate ApplicationConfiguration configuration;public void pushEvent(PushParam param) {ListenableFutureSendResultString, String sendResultListenableFuture null;if (zw.equals(configuration.getEnvironment())){sendResultListenableFuture kafkaSecondTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));}if (net.equals(configuration.getEnvironment())){sendResultListenableFuture kafkaTemplate.send(configuration.getPushTopic(), JSON.toJSONString(param));}if (sendResultListenableFuture null){throw new IllegalArgumentException(kakfa发送消息失败);}sendResultListenableFuture.addCallback(new ListenableFutureCallbackSendResultString, String() {Overridepublic void onFailure(Throwable ex) {log.error(kafka发送的message报错发送数据{}, param);}Overridepublic void onSuccess(SendResultString, String result) {log.info(kafka发送的message成功发送数据{}, param);}});}}消费者 package com.ddb.zggz.event;import com.alibaba.fastjson.JSONObject;import com.ddb.zggz.config.ApplicationConfiguration; import com.ddb.zggz.model.dto.ApprovalDTO; import com.ddb.zggz.param.OffShelfParam; import com.ddb.zggz.service.GzApprovalService; import com.ddb.zggz.service.GzServiceService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.DltHandler; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.RetryableTopic; import org.springframework.retry.annotation.Backoff; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils;import java.util.ArrayList; import java.util.List; import java.util.Objects;Component Slf4j public class SendMessageListener {Autowiredprivate GzApprovalService gzApprovalService;Autowiredprivate GzServiceService gzServiceService;KafkaListener(topics ${application.config.push-topic}, groupId zggz,containerFactory zwKafkaContainerFactory)RetryableTopic(include {Exception.class},backoff Backoff(delay 3000, multiplier 1.5, maxDelay 15000))public void listen(ConsumerRecord?, ? consumerRecord) {String value (String) consumerRecord.value();PushParam pushParam JSONObject.parseObject(value, PushParam.class);//版本提审if (version-approval.equals(pushParam.getEvent())) {ApprovalDTO approvalDTO JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), ApprovalDTO.class);gzApprovalService.approval(approvalDTO);}//服务下架if (pushParam.getEvent().equals(server-OffShelf-gzt)) {OffShelfParam offShelfParam JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()), OffShelfParam.class);gzServiceService.offShelfV1(offShelfParam.getReason(), null, offShelfParam.getUserName(), ZGGZ, offShelfParam.getH5Id(), offShelfParam.getAppId(), offShelfParam.getVersion());}}DltHandlerpublic void processMessage(String message) {} } 消息体 package com.ddb.zggz.event;import com.alibaba.fastjson.annotation.JSONField; import com.ddb.zggz.model.GzH5VersionManage; import com.ddb.zggz.model.GzService; import com.ddb.zggz.model.dto.ApprovalDTO; import com.ddb.zggz.param.OffShelfParam; import com.ddb.zggz.param.PublishParam; import com.ddb.zggz.param.ReviewAndRollback; import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; import lombok.Data;import java.io.Serializable; import java.time.LocalDateTime;/*** author bbd*/ Data public class PushParam implements Serializable {/*** 发送的消息数据*/private Object data;JsonFormat(pattern yyyy-MM-dd HH:mm:ss)JsonSerialize(using LocalDateTimeSerializer.class)JsonDeserialize(using LocalDateTimeDeserializer.class)JSONField(format yyyy-MM-dd HH:mm:ss)private LocalDateTime createTime LocalDateTime.now();/*** 事件名称用于消费者处理相关业务*/private String event;/*** 保存版本参数*/public static PushParam toKafkaVersion(GzH5VersionManage gzH5VersionManage) {PushParam pushParam new PushParam();pushParam.setData(gzH5VersionManage);pushParam.setEvent(save-version);return pushParam;}/*** 保存服务参数*/public static PushParam toKafkaServer(GzService gzService) {PushParam pushParam new PushParam();pushParam.setData(gzService);pushParam.setEvent(save-server);return pushParam;}
http://www.huolong8.cn/news/136011/

相关文章:

  • 免费造网站wordpress for sae 4.4
  • 天津行业建站wordpress更换域名后网站打不开
  • wordpress 电影网站asp网站建设 文献
  • 传世网站建设购物网站开发设计思路
  • 万网放网站网站开发背景设置
  • html中文网站模板下载济宁网站建设多少钱
  • 网站建设关键词排名佛山营销网站建设费用
  • 专门做预言的网站2010年青海省建设厅网站
  • mvc做网站wordpress4.7中文主题
  • 移动网站怎么登录你对网站第一印象
  • 可信网站认证是否必须做合肥网站制作费用
  • 黑龙江网站设计中国建筑招投标平台
  • 建个大型网站需企业邮箱注册免费申请
  • 巴彦淖尔市网站制作聚名网认证
  • 毕业设计旅游网站开发10.制作一个网站一般先要明确( )
  • 网站制作是不是要先用ps做百度网盘资源搜索
  • 昆明网站建设推广优化网站设计主要包含3个方面
  • 建设网站制作汉狮团队中山软件开发项目管理
  • 提升网站建设品质信息南头专业的网站建设公司
  • 企业网站托管外包平台富阳建立网站的
  • 微网站运营私人承包小工程哪里有
  • wordpress如何启用全站sslwordpress新建404页面
  • 网站运营阶段怎么做自己淘宝优惠券网站
  • 网站手机app开发东乡族网站建设
  • 郑州快速网站优化公司哪家好开发菏泽网站建设
  • 如何选择低价网站建设win7配置不能运行wordpress
  • 邹平网站定制网站的构建一般要多久
  • 域名注册网站建设初级网站开发的自我推荐
  • 现在网站建设用什么语言安徽水利建设市场信用信息平台网站
  • 马蜂窝网站怎么做wordpress 微信 支付宝