wordpress 图片站,合肥最新消息今天,辽宁app开发公司,dedecms制作网站地图kafka netty介绍 在上一个博客中 #xff0c;我介绍了Netty用作Web服务器。 该示例运行良好……只要需要广播服务器即可。 大多数情况下不是很有用。 更有可能的是#xff0c;每个客户端仅接收针对其的数据#xff0c;并保留了特殊情况下的广播#xff0c;例如“服务器在1… kafka netty 介绍 在上一个博客中 我介绍了Netty用作Web服务器。 该示例运行良好……只要需要广播服务器即可。 大多数情况下不是很有用。 更有可能的是每个客户端仅接收针对其的数据并保留了特殊情况下的广播例如“服务器在15分钟内停机” 关于该特定服务器示例的另一件事是一切都是独立的。 例如单片应用程序很好但是在当今环境中分布式微服务要好得多。 可伸缩性和可靠性至关重要。 Netty和Kafka在一起很棒。 Netty擅长处理大量客户Kafka擅长使大量服务协同工作。 结合起来它们是开发中的最佳选择。 但是有些“陷阱”可能会使其变得繁琐。 该博客以及示例微服务/ Netty体系结构和功能全面的代码将有望帮助减轻烦恼并实现甜味。 第一要务 示例代码位于此处 。 有详细的自述文件描述了设置环境所需的内容。 我试图将需求降到最低仅Java 8和Maven 。 SLF4J和Logback用于记录日志。 我为Mac OSX和Ubuntu设置了脚本我在Parallels容器中运行的14.04版本是我测试过的脚本因此如果您在Windows上进行开发则表示歉意。 该代码全是Java并且我在Windows上看到过Kafka教程因此所有内容都应在此处运行。 Maven构建也应该产生可以启动的目标因此在安装Zookeeper / Kafka的时候加了一点肘油您可以按照脚本查看需要的设置手动运行它并不重要。视窗。 注如README.md中所述该脚本将删除任何现有的Zookeeper / Kafka安装和数据。 如果您已有设置请不要使用脚本 安装和配置必备mvn package如果不使用脚本请运行mvn package如果是则运行maclocal_run.sh 或linuxlocal_run.sh 。 该脚本将下载Zk / Kafka如果尚未下载进行安装配置启动它们运行mvn package 启动服务并最终启动服务器。 一旦启动就抵制离开外壳的冲动因为它会自动为架构的每个部分弹出新的选项卡。 启动Whirlpool服务器之后就可以开始了。 我强烈建议创建一个脚本以在本地安装配置构建和启动微服务环境。 创建每个单独的服务是一个很大的痛苦。 必要时也可以使用Docker但我发现只需本地运行所有内容下载所需的内容就少得多。 作为一个预告片这里是UI您也可以从GitHub上的README.md看到它。 要添加股票代码请输入它即“ GOOG”然后单击“股票”下的A按钮。 要删除它请单击X。 要添加一个网站来测试它是打开还是关闭请键入完全限定的URL即http://facebook.com 然后单击“ UpDown”下的A按钮。 要删除它请单击X。 要添加天气检查请在中键入城市州即“芝加哥il”然后单击“城市州”下的A按钮。 要删除它请单击X。 由于订阅与每个服务一起存储在内存中因此订阅在页面刷新甚至登录/注销具有相同的用户ID后都不会丢失。 当然“真实”系统将使用数据库。 订阅每10秒钟更新一次因此我不会压倒Yahoo API因此添加数据后请耐心等待。 建筑 在此示例中我试图考虑可能有用的良好通用服务。 我最终选择了股票报价服务“此网站是否正常运转”服务以及气象服务。 这些中的每一个都独立于各自具有Kafka主题的其他主题运行。 我选择配置Kafka的方式是每个服务使用一个命令主题每个服务使用一个数据主题。 一切都可以只使用一个全局主题读者可以决定要处理的内容但将其分离出来可以使其更加清晰和整洁。 这是数据如何通过Kafka流动的示意图。 它是通过一个免费的基于Keyhole的基于Web的实用程序Mockola完成的 。 请注意服务器知道所有主题但是服务仅知道它们自己的主题。 cmd主题用于将命令发送到服务而数据主题在其上没有-cmd主题用于从服务发送数据。 同样所有这些都可以在一个bus主题上进行处理但是通过将它们分离出来可以更轻松地了解发生了什么。 服务 现在让我们谈谈服务。 这三者非常相似因此有一项基本服务可以完成大部分工作。 每个服务都有三个线程由Java ExecutorService处理。 关于Executor服务的一件好事是如果出现问题它将自动重新启动线程。 这有助于弹性。 每个服务通过告诉基类使用什么主题和命令主题来启动自己。 然后基类启动三个线程一个用于从cmd主题读取命令一个用于定期为客户端收集数据一个用于在数据主题上发送数据。 这些线程使用非阻塞Java并发类ConcurrentLinkedQueue和ConcurrentHashMap 。 哈希映射存储每个用户的订阅集队列存储准备发送给数据主题的响应。 每个服务的流程是同时工作的三个线程。 阅读器使用Kafka使用者从其命令主题读取命令。 根据命令添加或删除订阅。 该线程非常笨拙因为它不要求服务对请求进行任何验证而只是盲目地将发送给订阅的内容添加进去。 生产代码显然会添加一个调用要求服务在允许成功订阅之前验证命令。 创建一个响应以放置主题然后等待下一个命令。 注意 关于数据的一些话题。 我使用JSON作为传输格式但是XML或您想要的其他任何内容也可以使用。 重要的是每个人都同意数据格式并坚持使用。 通用模块具有POJO类这些类定义了数据将遵循的协定。 通常对所有消息有用的是时间戳消息类型和客户端的ID。 另一个有用的东西是到期时间戳。 这些示例消息永远存在。 Message类仅查看Message的类型和ID。 服务器使用它来确定需要处理哪种类型的消息以及谁对该消息感兴趣。 没有这些就很难甚至不可能处理数据。 现在消息格式可以涉及很多其中一些格式使用标题和部分来描述复杂的数据。 本示例尝试使所有内容尽可能简单。 净值服务器 让我们一次上一堂课。 NettyHttpFileHandler 与以前的博客相比该类几乎没有变化。 可重用的片段已移至WebSocketHelper类。 该文件的主要用途是提供浏览器要求的文件。 WebSocket助手 可能令人困惑的第一项是类变量clientAttr 。 在Netty Channel中存储数据要求将其附加到AttributeKey 。 这类似于Java并发类中的Atomic实例-它提供了数据容器。 我们将存储客户端ID在本例中为用户名但也可以很容易地作为会话ID以便我们确定哪个Channel需要接收消息。 realWriteAndFlush()方法设置适当的标题内容长度和cookie。 然后它写入并刷新HTTP响应。 线 channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); 告诉Netty这是需要写入客户端的数据的结尾因此Netty会将其发送出去。 特别说明 关于cookie的创建请确保未设置HTTP Only标志。 如果是则JavaScript无法看到Cookie也不会与WebSocket升级请求一起发送。 这样一来您就必须创建自己的页面刷新管理和会话管理方法。 关于cookie的另一件事是使用Netty cookie编码器的STRICT版本因此它将不允许多个具有相同名称的cookie。 我不确定何时允许这种情况发生。 WebSocketMessageHandler 这个类只是定义了一个接口WhirlpoolServerHandler使用交谈的WhirlpoolMessageHandler 。 WhirlpoolMessageHandler 这是Netty和Kafka之间存在连接的地方。 两个执行器处理一个读取器线程和一个写入器线程。 编写器线程在请求队列中查找消息有关这些消息在一分钟内来自何处的更多信息并将消息放置在适当的Kafka命令主题上。 阅读器线程在Kafka数据主题上查找传入消息为每个主题查找正确的Channel然后将消息写入这些主题。 当客户端通过WebSockets发送消息时 WhirlpoolServerHandler将确保已handleMessage()完整的消息然后调用handleMessage() 。 该方法确定是否为有效消息然后将请求添加到请求队列中以便读取器线程可以将其提取并提供给Kafka。 WhirlpoolServerHandler 这堂课有几件有趣的事。 首先它可以区分HTTPREST和WebSocket消息之间的区别。 执行此操作的Netty重写方法是channelRead0 。 这是Netty用来告诉我们消息何时到达以及消息是哪种类型的方法。 对于HTTP和REST调用 handleHttpRequest调用handleHttpRequest 对于handleWebSocketFrame将调用handleWebSocketFrame 。 如果存在cookie方法则handleHttpRequest方法handleHttpRequest读取该cookie。 在POST上它会查找登录和注销信息。 对于登录它将找出用户名/密码创建cookie并防止多次使用相同的名称登录。 所有这些代码将在应用程序的生产版本中添加额外的安全性进行拆分。 要注销它会查找Channel清理关闭它并使cookie过期。 对于WebSocketUpgrade 它要求Netty处理启动websocket所需的复杂握手。 完成此操作后会将用户添加到握手期间创建的Channel。 这是用户连接到Channel的地方如果cookie没有在请求中出现那将不是一件容易的事。 在此唯一需要注意的另一件事是此类设置为处理为SPA单页应用程序编码的客户端因为它将将所有无法识别的调用重定向到index.html 。 该类中的其他方法更多地是为了提供信息将在高级情况下使用。 漩涡服务器 此类启动Netty服务器并创建通道管道。 这是Netty的一个标准类紧随Netty示例。 最后的想法 显然此代码中还有更多内容。 每个服务和服务器的多个实例可以同时运行并且Zk / Kafka可以集群以帮助提高弹性。 一个测试微服务应用程序弹性的强大实用程序是另一个名为TroubleMaker的免费开源Keyhole实用程序。 我还没有机会测试这个例子但是我很期待这个机会。 我们没有涉及安全性尽管我以前希望展示Netty与Shiro的集成但这是一个非常复杂的话题。 我只能说这是有可能的但是我还没有将所有内容都包裹在脑海中以至于无法形成一个连贯的博客。 希望您喜欢该博客并找到有用的代码。 通过博客或Twitter与我联系 johnwboardman 在这里我总是很欣赏新的关注。 翻译自: https://www.javacodegeeks.com/2016/05/whirlpool-microservices-using-netty-kafka.htmlkafka netty