是想建个网站 用本地做服务器,松岗做网站费用,免费自助建站系统平台 贴吧,非凡门户wordpress企业主题本文是我们名为“ Spring Integration for EAI ”的学院课程的一部分。 在本课程中#xff0c;向您介绍了企业应用程序集成模式以及Spring Integration如何解决它们。 接下来#xff0c;您将深入研究Spring Integration的基础知识#xff0c;例如通道#xff0c;转换器和适… 本文是我们名为“ Spring Integration for EAI ”的学院课程的一部分。 在本课程中向您介绍了企业应用程序集成模式以及Spring Integration如何解决它们。 接下来您将深入研究Spring Integration的基础知识例如通道转换器和适配器。 在这里查看 目录 1.简介 2.解释Web服务通道适配器 2.1。 其他属性 2.2。 入站Web服务网关 3.创建一个Spring Web Services项目 3.1。 票务服务界面 3.2。 票证端点 3.3。 服务配置 4.实施Spring集成流程 5.添加客户端超时 6.使用拦截器 7. Web服务重试操作 7.1。 定义重试建议 7.2。 将建议添加到网关 7.3。 修改Web服务端点 7.4。 重试建议政策 7.5。 使用轮询器重试操作 8.下载源代码 1.简介 在本教程中您将看到使用Spring Integration增强的应用程序的第一个示例。 为了实现这一目标本示例将重点介绍与外部Web服务的集成。 首先我将解释什么是必需的适配器它将使我们能够从Spring Integration调用Web服务。 接下来我们将对Spring Web Services项目进行简要说明该项目将是将从我们的应用程序中调用的外部Web服务。 在完成本教程的主要部分之后我们将实现一个将调用Web服务的应用程序。 结束本教程我们将使用Spring Integration提供的一些功能来完成我们的应用程序例如添加超时使用拦截器以及学习如何重试失败的调用。 本教程由以下部分组成 介绍 解释Web服务通道适配器 创建一个Spring Web Services项目 实施Spring Integration流程 添加客户端超时 使用拦截器 Web服务重试操作 2.解释Web服务通道适配器 与外部Web服务的通信是通过带有网关的Spring Integration完成的。 如上一教程中所述您可以找到两种类型的网关入站和出站。 在本教程中我们将使用一种特殊的网关出站Web服务网关 。 在本节中我们将重点介绍这种类型。 为了使用Web服务网关您将需要指定一个新的名称空间 beans xmlnshttp://www.springframework.org/schema/beansxmlns:xsihttp://www.w3.org/2001/XMLSchema-instance xmlns:inthttp://www.springframework.org/schema/integrationxmlns:int-wshttp://www.springframework.org/schema/integration/wsxsi:schemaLocationhttp://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsdhttp://www.springframework.org/schema/integration/ws http://www.springframework.org/schema/integration/ws/spring-integration-ws.xsd 设置了新的名称空间后我们现在可以使用Web服务网关 int-ws:outbound-gateway idaGatewayrequest-channelrequestChannel reply-channelresponseChannelurihttp://localhost:8080/spring-ws-tickets/tickets marshallermarshallerunmarshallermarshaller/ 那么此网关的行为是什么 该流程的执行如下 消息发送到通道requestChannel 。 然后将此消息发送到Web服务网关该Web服务网关通过设置其request-channel属性来订阅该通道。 网关将请求发送到外部Web服务这将在下一部分中进行说明。 uri属性指定目的地。 网关等待外部Web服务直到它返回响应。 指定的编组器将返回响应并将其编组。 响应被包装到消息中并发送到由responseChannel reply-channel属性指定的channel responseChannel 。 如您所见您只需要定义流请求和答复通道以及调用位置。 发送消息所需的基础结构详细信息由Spring Integration处理。 其他属性 还有其他一些属性可用于定制网关的调用。 以下是主要属性的简要说明 Destination provider:可以使用它代替提供“ uri”属性。 这样您可以实现自己的类该类将动态解析调用了哪个端点。 您应该为bean提供以下接口 public class MyDestinationProvider implements DestinationProvider { Overridepublic URI getDestination() {//resolve destination}
} 在网关定义中我们可以使用此提供程序来代替直接提供URI int-ws:outbound-gateway idaGatewayrequest-channelrequestChannel reply-channelresponseChannel destination-providermyDestinationProvidermarshallermarshaller unmarshallermarshaller/ Message sender:允许我们定义一个WebServiceMessageSender 。 在本教程的后面我们将使用它来定义客户端超时。 Interceptor/Interceptors:您可以定义客户端拦截器。 这也将在本教程的后面部分中进行说明。 入站Web服务网关 本部分只是为了快速了解入站服务网关以了解其一般工作原理因为在本教程中我们将不使用它。 该网关将接收来自外部服务的请求将该请求包装为消息然后将其发送到我们的消息传递系统中。 处理完请求后将向网关发送一条消息以传递Web服务正在等待的响应。 语法类似于出站Web服务网关 int-ws:inbound-gateway idanotherGateway request-channelrequestChannel marshallermarshaller unmarshallermarshaller/ 您可能还记得以前的教程响应将通过临时消息通道到达网关。 如果没有必要请不要显式定义通道。 3.创建一个Spring Web Services项目 本节说明将公开我们的应用程序将使用的Web服务的项目。 它包含一个使用Spring Web Services项目实现的Web应用程序。 该应用程序非常简单。 它包含一个服务界面该界面允许用户从电影院服务订购门票。 请求订单时服务将对其进行处理并返回TicketConfirmation 。 下图显示了其结构 图1 我们将从下至上进行解释。 票务服务界面 这是服务接口和实现 public interface TicketService {public TicketConfirmation order(String filmId, Date sessionDate, int quantity);
} 该实现根据提供的数据构建TicketConfirmation实例。 Service
public class TicketServiceimpl implements TicketService {Overridepublic TicketConfirmation order(String filmId, Date sessionDate, int quantity) {float amount 5.95f * quantity;TicketConfirmation confirmation new TicketConfirmation(filmId, sessionDate, quantity, amount);return confirmation;}
} TicketConfirmation对象是一个不变的类将用于读取确认数据 public final class TicketConfirmation {private String confirmationId;private String filmId;private int quantity;private Date sessionDate;private float amount;public TicketConfirmation(String filmId, Date sessionDate, int quantity, float amount) {this.confirmationId UUID.randomUUID().toString();this.filmId filmId;this.sessionDate new Date(sessionDate.getTime());this.quantity quantity;this.amount amount;}public String getConfirmationId() {return confirmationId;}public String getFilmId() {return filmId;}public int getQuantity() {return quantity;}public Date getSessionDate() {return new Date(sessionDate.getTime());}public float getAmount() {return amount;}
}票证端点 端点负责接收请求并将订单处理委托给Ticket服务 Endpoint
public class TicketEndpoint {Autowiredprivate TicketService ticketService;PayloadRoot(localPartticketRequest, namespacehttp://www.xpadro.spring.samples.com/tickets)public ResponsePayload TicketResponse order(RequestPayload TicketRequest ticketRequest) throws InterruptedException {Calendar sessionDate Calendar.getInstance();sessionDate.set(2013, 9, 26);TicketConfirmation confirmation ticketService.order(ticketRequest.getFilmId(), DateUtils.toDate(ticketRequest.getSessionDate()), ticketRequest.getQuantity().intValue());return buildResponse(confirmation);}private TicketResponse buildResponse(TicketConfirmation confirmation) {TicketResponse response new TicketResponse();response.setConfirmationId(confirmation.getConfirmationId());response.setFilmId(confirmation.getFilmId());response.setSessionDate(DateUtils.convertDate(confirmation.getSessionDate()));BigInteger quantity new BigInteger(Integer.toString(confirmation.getQuantity()));response.setQuantity(quantity);BigDecimal amount new BigDecimal(Float.toString(confirmation.getAmount()));response.setAmount(amount);return response;}
} 该服务将接收使用命名空间http://www.xpadro.spring.samples.com/tickets和ticketRequest请求元素ticketRequest请求。 服务配置 在Spring配置中我们定义了Web服务组件 !-- Detects Endpoint since it is a specialization of Component --
context:component-scan base-packagexpadro.spring.ws/!-- detects PayloadRoot --
ws:annotation-driven/ws:dynamic-wsdl idticketDefinition portTypeNameTickets locationUrihttp://localhost:8080/spring-ws-ticketsws:xsd location/WEB-INF/schemas/xsd/ticket-service.xsd/
/ws:dynamic-wsdl web.xml文件公开了MessageDispatcherServlet: context-paramparam-namecontextConfigLocation/param-nameparam-valueclasspath:xpadro/spring/ws/config/root-config.xml/param-value
/context-paramlistenerlistener-classorg.springframework.web.context.ContextLoaderListener/listener-class
/listenerservletservlet-nameTicket Servlet/servlet-nameservlet-classorg.springframework.ws.transport.http.MessageDispatcherServlet/servlet-classinit-paramparam-namecontextConfigLocation/param-nameparam-valueclasspath:xpadro/spring/ws/config/servlet-config.xml/param-value/init-paramload-on-startup1/load-on-startup
/servletservlet-mappingservlet-nameTicket Servlet/servlet-nameurl-pattern/tickets/*/url-pattern
/servlet-mapping 现在我们只需要将其部署到服务器中即可开始处理票单请求。 4.实施Spring集成流程 我们的Spring Integration应用程序从一个简单的流程开始。 图2 该请求消息将通过系统入口网关。 然后该消息将传递到Web服务出站网关该网关将其发送到端点并等待响应。 收到响应后它将通过响应通道发送响应然后返回系统入口网关然后系统入口网关会将其传递给客户端。 客户端应用程序将TicketRequest发送到TicketService接口。 该接口被网关拦截。 通过这种方式 TicketRequest对象被包装到Spring Integration消息中并发送到消息传递系统。 public interface TicketService {/*** Entry to the messaging system. * All invocations to this method will be* intercepted and sent to the SI system entry gateway* * param request*/Gatewaypublic TicketResponse invoke(TicketRequest request);
} 查看网关配置我们可以看到已将其链接到TicketService接口 int:gateway idsystemEntry default-request-channelrequestChannel default-reply-channelresponseChannelservice-interfacexpadro.spring.integration.ws.gateway.TicketService / 我们还定义了请求和回复渠道。 该请求消息将被发送到requestChannel通道在该通道上订阅了Web服务出站网关 int-ws:outbound-gateway idmarshallingGatewayrequest-channelrequestChannel reply-channelresponseChannelurihttp://localhost:8080/spring-ws-tickets/tickets marshallermarshallerunmarshallermarshaller/ responseChannel被配置为其回复通道系统进入网关已在该通道中订阅。 这样客户端将收到响应。 使用直接通道配置完整流。 这意味着流程是同步的 客户端将阻止等待Web服务响应 context:component-scan base-packagexpadro.spring.integration /!-- Entry to the messaging system --
int:gateway idsystemEntry default-request-channelrequestChannel default-reply-channelresponseChannelservice-interfacexpadro.spring.integration.ws.gateway.TicketService /int:channel idrequestChannel/int-ws:outbound-gateway idmarshallingGatewayrequest-channelrequestChannel reply-channelresponseChannelurihttp://localhost:8080/spring-ws-tickets/tickets marshallermarshallerunmarshallermarshaller/oxm:jaxb2-marshaller idmarshaller contextPathxpadro.spring.integration.ws.types /int:channel idresponseChannel / 系统已设置 我们不必实现任何Java类。 全部通过配置进行配置。 在完成示例之后让我们看一下执行此流程的测试 ContextConfiguration({classpath:xpadro/spring/integration/ws/test/config/int-ws-config.xml})
RunWith(SpringJUnit4ClassRunner.class)
public class TestInvocation {Autowiredprivate TicketService service;Testpublic void testInvocation() throws InterruptedException, ExecutionException {TicketRequest request new TicketRequest();request.setFilmId(aFilm);request.setQuantity(new BigInteger(3));request.setSessionDate(DateUtils.convertDate(new Date()));TicketResponse response service.invoke(request);assertNotNull(response);assertEquals(aFilm, response.getFilmId());assertEquals(new BigInteger(3), response.getQuantity());}
} 在下一节中我们将向此示例应用程序添加一些功能。 5.添加客户端超时 检查网关的名称空间我们可以看到没有用于设置调用超时的配置。 无论如何我们都可以使用消息发送者。 消息发送者是WebServiceMessageSender的实现。 Spring Web Services项目提供的一种有趣的实现是HttpComponentsMessageSender类。 此类将允许我们通过内部使用Apache HttpClient将身份验证或连接池添加到调用中。 而且我们还将能够定义读取和连接超时。 在此示例之后我们将其添加为超时。 首先我们需要使用上述类定义一个bean。 这将是我们的消息发件人 bean idmessageSender classorg.springframework.ws.transport.http.HttpComponentsMessageSenderproperty nameconnectionTimeout value5000/property namereadTimeout value10000/
/bean 接下来我们将在我们的Web服务网关中配置消息发送者 int-ws:outbound-gateway idmarshallingGatewayrequest-channelrequestChannel reply-channelresponseChannelurihttp://localhost:8080/spring-ws-tickets/tickets marshallermarshallerunmarshallermarshaller message-sendermessageSender/ 而已。 现在如果达到超时将抛出WebServiceIOException 。 6.使用拦截器 Web服务网关的名称空间中包含的另一个功能是可以配置客户端拦截器。 这些客户端拦截器是Spring Web Services项目的功能并且引用客户端上的端点拦截器。 ClientInterceptor实现具有以下方法 public interface ClientInterceptor {boolean handleRequest(MessageContext messageContext) throws WebServiceClientException;boolean handleResponse(MessageContext messageContext) throws WebServiceClientException;boolean handleFault(MessageContext messageContext) throws WebServiceClientException;
} handleRequest 在调用端点之前调用此方法。 handleResponse 在端点成功返回之后将调用此方法。 handleFault 如果端点抛出错误则调用此方法。 注意这些方法可以操纵MessageContext 该MessageContext包含请求和响应。 让我们看一个例子。 我们将实现自定义客户端拦截器以在调用端点之前拦截该调用并且将更改请求值。 拦截器实现ClientInterceptor接口 public class MyInterceptor implements ClientInterceptor {private Logger logger LoggerFactory.getLogger(this.getClass());Overridepublic boolean handleRequest(MessageContext messageContext) throws WebServiceClientException {WebServiceMessage message messageContext.getRequest();DOMSource source (DOMSource) message.getPayloadSource();Node quantityNode source.getNode().getAttributes().getNamedItem(quantity);String oldValue quantityNode.getNodeValue();quantityNode.setNodeValue(5);logger.info(Before endpoint invocation. Changed quantity old value {} for {}, oldValue, 5);return true;}Overridepublic boolean handleResponse(MessageContext messageContext) throws WebServiceClientException {logger.info(endpoint invocation succeeds);return true;}Overridepublic boolean handleFault(MessageContext messageContext) throws WebServiceClientException {logger.info(endpoint returned a fault);return true;}
} 现在我们需要将拦截器添加到网关配置中 int-ws:outbound-gateway idmarshallingGatewayrequest-channelrequestChannel reply-channelresponseChannelurihttp://localhost:8080/spring-ws-tickets/tickets marshallermarshallerunmarshallermarshaller message-sendermessageSender interceptormyInterceptor /bean idmyInterceptor classxpadro.spring.integration.ws.interceptor.MyInterceptor / Web服务网关名称空间还允许我们定义interceptors属性。 这使我们可以配置客户端拦截器列表。 该测试将验证请求值已被修改 ContextConfiguration({classpath:xpadro/spring/integration/ws/test/config/int-ws-config.xml})
RunWith(SpringJUnit4ClassRunner.class)
public class TestInvocation {Autowiredprivate TicketService service;Testpublic void testInvocation() throws InterruptedException, ExecutionException {TicketRequest request new TicketRequest();request.setFilmId(aFilm);request.setQuantity(new BigInteger(3));request.setSessionDate(DateUtils.convertDate(new Date()));TicketResponse response service.invoke(request);assertNotNull(response);assertEquals(aFilm, response.getFilmId());assertEquals(new BigInteger(5), response.getQuantity());}
} 在实现自定义拦截器之前请考虑到Spring Web Services项目提供了几种实现 PayloadValidatingInterceptor 使用模式验证Web服务消息的有效负载。 如果未通过验证则处理将被取消。 Wss4jSecurityInterceptor 基于Apache的WSS4J的Web服务安全端点拦截器。 XwsSecurityInterceptor 基于Sun的XML和Web服务安全包的Web服务安全端点拦截器。 7. Web服务重试操作 有时我们可能想调用一项服务但该服务暂时关闭或者该服务仅在某些日子才在线。 如果发生这种情况我们可能要稍后重试调用。 Spring Integration提供了开始重试服务调用直到满足条件的可能性。 这种情况可能是服务最终响应或者我们达到了最大尝试次数。 对于此功能Spring Integration提供了重试建议。 该建议由Spring Retry项目提供支持。 重试建议包含在Web服务出站网关中。 这样网关将Web服务调用委托给重试建议。 如果服务调用失败则建议将根据其配置继续尝试重试该操作。 定义重试建议 我们必须使用RequestHandlerRetryAdvice类定义一个新bean bean idretryAdvice classorg.springframework.integration.handler.advice.RequestHandlerRetryAdvice property nameretryTemplatebean classorg.springframework.retry.support.RetryTemplateproperty namebackOffPolicybean classorg.springframework.retry.backoff.FixedBackOffPolicyproperty namebackOffPeriod value5000 //bean/propertyproperty nameretryPolicybean classorg.springframework.retry.policy.SimpleRetryPolicyproperty namemaxAttempts value5 //bean/property/bean/property
/bean 我们定义了一个建议如果调用失败它将每5秒重新尝试一次直到服务响应或尝试5次为止。 稍后我们将查看建议中定义的这些策略。 将建议添加到网关 定义建议后我们需要将其包含在网关中。 Spring Integration Web Services名称空间已经为此提供了一个元素 int-ws:outbound-gateway idmarshallingGatewayrequest-channelrequestChannel reply-channelresponseChannelurihttp://localhost:8080/spring-ws-tickets/tickets marshallermarshallerunmarshallermarshaller message-sendermessageSender interceptormyInterceptor int-ws:request-handler-advice-chainref beanretryAdvice //int-ws:request-handler-advice-chain
/int-ws:outbound-gateway 我们已将建议集成到网关中。 现在让我们修改示例以查看其工作原理。 修改Web服务端点 我们将修改端点以便在尝试指定次数的重试之前一直失败。 在这种情况下需要两次直到返回响应。 PayloadRoot(localPartticketRequest, namespacehttp://www.xpadro.spring.samples.com/tickets)
public ResponsePayload TicketResponse order(RequestPayload TicketRequest ticketRequest) throws InterruptedException {Calendar sessionDate Calendar.getInstance();sessionDate.set(2013, 9, 26);TicketConfirmation confirmation ticketService.order(ticketRequest.getFilmId(), DateUtils.toDate(ticketRequest.getSessionDate()), ticketRequest.getQuantity().intValue());TicketResponse response buildResponse(confirmation);retries;if (retries 3) {throw new RuntimeException(not enough retries);}else {retries 0;}return response;
} 现在我们将启动测试并使用我们先前定义的拦截器来查看它如何记录尝试 2014-03-26 08:24:50,535|AbstractEndpoint|started org.springframework.integration.endpoint.EventDrivenConsumer392044a1
2014-03-26 08:24:50,626|MyInterceptor|Before endpoint invocation. Changed quantity old value 3 for 5
2014-03-26 08:24:51,224|MyInterceptor|endpoint returned a fault
2014-03-26 08:24:56,236|MyInterceptor|Before endpoint invocation. Changed quantity old value 3 for 5
2014-03-26 08:24:56,282|MyInterceptor|endpoint returned a fault
2014-03-26 08:25:01,285|MyInterceptor|Before endpoint invocation. Changed quantity old value 3 for 5
2014-03-26 08:25:01,377|MyInterceptor|endpoint invocation succeeds 网关一直尝试调用直到服务响应为止因为重试建议具有较高的重试次数五个。 重试建议政策 Spring Integration重试建议是在Spring Retry项目策略上备份的。 这些政策说明如下 退避政策 它确定了重试之间或初次重试之前的时间。 BackOffPolicy接口定义了两种方法 BackOffContext start(RetryContext context); void backOffBackOffContext backOffContext抛出BackOffInterruptedException; start方法允许定义初始行为。 例如初始时间延迟。 backoff方法允许定义重试之间的暂停。 Spring Retry项目提供了回退策略的几种实现 无状态退避策略调用之间不保持任何状态。 FixedBackOffPolicy:重试之间暂停指定的时间。 没有设置初始延迟。 NoBackOffPolicy:重试在它们之间完全没有暂停地执行。 有状态的后退策略在调用之间保持状态。 ExponentialBackOffPolicy:从指定的时间量开始它将在每次调用时相乘。 默认情况下它使时间加倍。 您可以更改乘数。 ExponentialRandomBackOffPolicy:扩展ExponentialBackOffPolicy 。 乘数以随机方式设置。 重试政策 它允许定义重试建议在放弃之前将执行Web服务调用多少次。 RetryPolicy接口定义了几种方法 boolean canRetry(RetryContext context); RetryContext openRetryContext parent; void closeRetryContext context; void registerThrowableRetryContext contextThrowable throwable; 如果可以重试该操作则canRetry方法将返回。 例如如果我们尚未达到最大重试次数则可能发生这种情况。 open方法用于获取所有必要的资源跟踪尝试次数或在上一次重试期间是否引发异常。 每次失败的调用后都会调用registerThrowable方法。 Spring Retry项目提供了重试策略的几种实现 SimpleRetryPolicy:重试调用直到达到最大重试次数。 TimeoutRetryPolicy:它将一直重试直到达到超时为止。 在打开方法期间开始超时。 NeverRetryPolicy:它只会尝试一次调用。 AlwaysRetryPolicy: canRetry方法始终返回true。 它将继续重试直到服务响应为止。 ExceptionClassifierRetryPolicy:根据抛出的ExceptionClassifierRetryPolicy:它定义了不同的最大尝试次数。 CompositeRetryPolicy:它包含将按顺序执行的重试策略的列表。 使用轮询器重试操作 可用的重试策略是使用时间延迟实现的这在大多数情况下都很好但是在本节中我们将实现一个自定义解决方案该解决方案将使我们能够使用通过Cron Expression配置的轮询器。 由于调用可能会失败因此网关不会返回结果。 我们将使流程异步以便允许客户端发送服务请求并继续。 这样流将继续从另一个线程重试直到服务激活程序处理结果或达到重试限制为止。 网关如下 public interface AsyncTicketService {Gatewaypublic void invoke(TicketRequest request);
} 网关未定义回复通道因为不会发送响应。 由于这是一个异步请求因此请求通道包含一个队列。 这将允许其使用者主动轮询来自另一个线程的消息 int:gateway idsystemEntry default-request-channelrequestChannelservice-interfacexpadro.spring.integration.ws.gateway.AsyncTicketService /int:channel idrequestChannelint:queue /
/int:channel 我们已经在Web服务网关中包含了一个轮询器因为现在它将轮询消息 int-ws:outbound-gateway idmarshallingGatewayrequest-channelrequestChannel reply-channelresponseChannelurihttp://localhost:8080/spring-ws-tickets/tickets marshallermarshallerunmarshallermarshaller interceptormyInterceptor int:poller fixed-rate500 /
/int-ws:outbound-gateway 先前的调用可能导致三种不同的结果正确的调用需要重试的失败调用以及需要记录的最终失败的调用。 服务调用已正确调用 我们有一个服务激活器订阅了响应通道。 这是一个简单的示例因此只记录结果 !-- Service is running - Response received --
int:channel idresponseChannel /
int:service-activator refclientServiceActivator methodhandleServiceResult input-channelresponseChannel / 服务调用失败。 重试该操作 如果出现问题并且由于它是异步请求则该异常将被包装到MessageHandlingException 并发送到错误通道该通道由Spring Integration默认配置。 至此我们已经有一个路由器订阅了错误通道。 该路由器处理尝试的重试次数并基于此尝试将失败的消息重定向到适当的通道。 如果要重试该操作它将把消息发送到重试通道 Component(serviceRouter)
public class ServiceRouter {private Logger logger LoggerFactory.getLogger(this.getClass());private int maxRetries 3;private int currentRetries;public String handleServiceError(Message? msg) {logger.info(Handling service failure);if (maxRetries 0) {currentRetries;if (currentRetries maxRetries) {logger.info(Max retries [{}] reached, maxRetries);return failedChannel; }}logger.info(Retry number {} of {}, currentRetries, maxRetries);return retryChannel;}
} 路由器的配置如下所示 !-- Service invocation failed --
int:router refserviceRouter methodhandleServiceError input-channelerrorChannel/
int:channel idretryChannel /
int:channel idfailedChannel / 接下来我们有以下这些端点 !-- Retry --
int:service-activator refclientServiceActivator methodretryFailedInvocation input-channelretryChannel /int:inbound-channel-adapter idretryAdapter channelrequestChannel refclientServiceActivator methodretryInvocation auto-startupfalseint:poller cron0/5 * * * * */
/int:inbound-channel-adapter!-- Log failed invocation --
int:service-activator refclientServiceActivator methodhandleFailedInvocation input-channelfailedChannel / retryAdapter入站通道适配器将继续轮询请求通道但是请注意属性auto-startup设置为false。 这意味着该适配器将被禁用直到有人激活它为止。 我们需要这样做否则它将从头开始轮询并且我们只想在发生失败的调用时将其激活。 服务激活器将根据服务调用的结果启动或停止适配器。 失败时它将启动适配器以开始重试。 如果达到最大重试次数则路由器会将消息重定向到失败的通道在该通道中服务激活器将禁用适配器以阻止其轮询。 如果调用最终成功它将记录消息并停止适配器。 Component(clientServiceActivator)
public class ClientServiceActivator {private Logger logger LoggerFactory.getLogger(this.getClass());AutowiredQualifier(retryAdapter)private AbstractEndpoint retryAdapter;private Message? message;public void handleServiceResult(Message? msg) {logger.info(service successfully invoked. Finishing flow);retryAdapter.stop();}public void retryFailedInvocation(Message? msg) {logger.info(Service invocation failed. Activating retry trigger...);MessageHandlingException exc (MessageHandlingException) msg.getPayload();this.message exc.getFailedMessage();retryAdapter.start();}public Message? retryInvocation() {logger.info(Retrying service invocation...);return message;}public void handleFailedInvocation(MessageHandlingException exception) {logger.info(Maximum number of retries reached. Finishing flow.);retryAdapter.stop();}
} 测试类已被修改以免产生以下结果 ContextConfiguration({classpath:xpadro/spring/integration/ws/test/config/int-ws-async-config.xml})
RunWith(SpringJUnit4ClassRunner.class)
public class TestAsyncInvocation {Autowiredprivate AsyncTicketService service;Testpublic void testInvocation() throws InterruptedException, ExecutionException {TicketRequest request new TicketRequest();request.setFilmId(aFilm);request.setQuantity(new BigInteger(3));request.setSessionDate(DateUtils.convertDate(new Date()));service.invoke(request);Thread.sleep(80000);}
} 而已。 显然无需知道我们可以使用Spring Retry项目的重试建议就可以实现所有这些流程但是此示例的目的是获得更多有关如何使用适配器的激活和停用来构建更复杂的流程的知识路由器重定向和其他功能可以满足您的需求。 8.下载源代码 您可以从此处下载有关spring集成和Web服务的源代码 Spring_Integration_Sample.zip和Spring_WS_Sample.zip 翻译自: https://www.javacodegeeks.com/2015/09/spring-integration-and-web-services.html