深圳外贸网站建设企业,广西建设厅培训中心官网,精品手机网站案例,me微擎怎么做网站为啥有这个技术#xff1f;#xff1f;#xff1f;
1. 这个stream是操作消息队列的#xff0c;简化#xff0c;学习消息队列的成本降低。
2. 可操作rabbitMQ兔子message queue#xff0c;kafaka#xff0c;可理解为jdbc可操作oracle, mysql..
3. spring家的技术学就完了…为啥有这个技术
1. 这个stream是操作消息队列的简化学习消息队列的成本降低。
2. 可操作rabbitMQ兔子message queuekafaka可理解为jdbc可操作oracle, mysql..
3. spring家的技术学就完了。。stream对消息驱动需要了解的概念消息驱动生产者消费者再建一个服务消息消费者问题对消息驱动需要了解的概念
(1) 网站 文档 https://spring.io/projects/spring-cloud-stream#overview
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/ (2) 介绍
什么是SpringCloudStream
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。
应用程序通过inputs或者outputs与Spring Cloud Stream中binder对象交互。
通过我们配置来binding(绑定)
而Spring Cloud Stream的binder对象负责与消息中间件交互。
所以,我们只需要搞清楚如何与Spring Cloud Stream
交互就可以访便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现
引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持RabbitMQ、Kafka.
(3) 标准mq Message 生产者/消费者之间靠消息媒介传递信息内容 消息通道MessageChannel 消息必须走特定的通道 消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器订阅 消息通道里的消息如何被消费呢谁负责收发处理 (4) 为什么用Cloud Stream
1 绑定stream凭什么可以统一底层差异
在没有绑定器这个概念的情况下我们的SpringBoot应用要直接与消息中间件进行信息交互的时候, 于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性 通过定义绑定器作为中间层完美地实现了应用程序与消息中间件细节之间的隔离。 通过向应用程序暴露统- -的Channel通道 使得应用程序不需要再考虑各种不同的消息中间件实现。
2 binder架构 INPUT对应于消费者
OUTPUT对应于生产者5 Stream中的消息通信方式遵循了发布-订阅模式
Topic主题进行广播
在RabbitMQ就是Exchange 交换机
在kafka中就是Topic
6 常用api注解 消息驱动生产者消费者
生产者 pom dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-rabbit/artifactId/dependencyymal
server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息defaultRabbit: # 表示定义的名称用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型本次为json文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔默认是30秒lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔默认是90秒instance-id: receive-8802.com # 在信息列表时显示主机名称prefer-ip-address: true # 访问的路径变为IP地址
发送消息接口实现
public interface IMessageProvider
{public String send();
}/// 实现
import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.integration.support.MessageBuilder;
import javax.annotation.Resource;
import org.springframework.cloud.stream.messaging.Source;import javax.annotation.Resource;
import java.util.UUID;EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider
{Resourceprivate MessageChannel output; // 消息发送管道Overridepublic String send(){String serial UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());System.out.println(*****serial: serial);return null;}
}
消费者 yaml, pom同时yaml要改端口。 定义controller接收消息
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;Component
EnableBinding(Sink.class)
public class ReceiveMessageListenerController {Value(${server.port})private String serverPort;StreamListener(Sink.INPUT)public void input(MessageString message) {System.out.println(消费者1号接受message.getPayload()\t port:serverPort);}}再建一个服务消息消费者问题
1. 消息重复。发送者发送消息两个消费者都会接收到消息如果是支付多个模块
收到一条消息多个模块会收到坏账需要分组只对一个支付模块发消息.
2. 消息持久化。当关掉消费者消息丢失。a. 新增group配置自定义group
group: damnb. 持久化服务挂了保证消息不丢失
页数配置分组解决