建设 市民中心网站,微网站建设代运营,怎么用网站视频做自媒体,网址提交1、概述
在Spark中很多地方都涉及网络通信#xff0c;比如Spark各个组件间的消息互通、用户文件与Jar包的上传、节点间的Shuffle过程、Block数据的复制与备份等。Spark 2.0 之后#xff0c;master 和worker 之间完全不使用akka 通信#xff0c;改用netty实现。因为使用Akka…1、概述
在Spark中很多地方都涉及网络通信比如Spark各个组件间的消息互通、用户文件与Jar包的上传、节点间的Shuffle过程、Block数据的复制与备份等。Spark 2.0 之后master 和worker 之间完全不使用akka 通信改用netty实现。因为使用Akka要求message发送端和接收端有相同的版本为了避免Akka造成的版本问题给用户的应用更大灵活性决定使用更通用的RPC实现。
spark 基于netty新的rpc框架借鉴了Akka的中的设计它是基于Actor模型如下图所示 Spark通讯框架中各个组件Client/Master/Worker可以认为是一个个独立的实体各个实体之间通过消息来进行通信。具体各个组件之间的关系图如下 EndpointClient/Master/Worker有1个InBox和N个OutBoxN1N取决于当前Endpoint与多少其他的Endpoint进行通信一个与其通讯的其他Endpoint对应一个OutBoxEndpoint接收到的消息被写入InBox发送出去的消息写入OutBox并被发送到其他Endpoint的InBox中。
2、Spark通信架构 1RpcEndpointRPC端点Spark针对每个节点Client/Master/Worker都称之为一个Rpc端点且都实现RpcEndpoint接口内部根据不同端点的需求设计不同的消息和不同的业务处理如果需要发送询问则调用Dispatcher
2RpcEnvRPC上下文环境每个RPC端点运行时依赖的上下文环境称为RpcEnv
3Dispatcher消息分发器针对于RPC端点需要发送消息或者从远程RPC接收到的消息分发至对应的指令收件箱/发件箱。如果指令接收方是自己则存入收件箱如果指令接收方不是自己则放入发件箱
4Inbox指令消息收件箱一个本地RpcEndpoint对应一个收件箱Dispatcher在每次向Inbox存入消息时都将对应EndpointData加入内部ReceiverQueue中另外Dispatcher创建时会启动一个单独线程进行轮询ReceiverQueue进行收件箱消息消费
5RpcEndpointRefRpcEndpointRef是对远程RpcEndpoint的一个引用。当我们需要向一个具体的RpcEndpoint发送消息时一般我们需要获取到该RpcEndpoint的引用然后通过该应用发送消息。
6OutBox指令消息发件箱对于当前RpcEndpoint来说一个目标RpcEndpoint对应一个发件箱如果向多个目标RpcEndpoint发送信息则有多个OutBox。当消息放入Outbox后紧接着通过TransportClient将消息发送出去。消息放入发件箱以及发送过程是在同一个线程中进行
7RpcAddress表示远程的RpcEndpointRef的地址Host Port。
8TransportClientNetty通信客户端一个OutBox对应一个TransportClientTransportClient不断轮询OutBox根据OutBox消息的receiver信息请求对应的远程TransportServer
9TransportServerNetty通信服务端一个RpcEndpoint对应一个TransportServer接受远程消息后调用Dispatcher分发消息至对应收发件箱
3、相关源码阅读
3.1、RpcEnv
RpcEnv是Rpc的环境相当于Actor中的ActorSystem所有的RPCEndPoint都需要注册给RpcEnv实例对象注册的时候会指定注册的名称这样客户端就可以通过名称查询到RPCEndPoint的RPCEndPointRef引用进而进行通信客户端通过操作RPCEndPointRef要给RpcEndPoint发信息怎么发要RpcEnv去管理RpcEnv在具体的实例看见发的信息因为有Ref肯定有路由就路由到远程的具体的RpcEndPoint实体内部的receive方法中如果不注册的话收不到消息。所有的RpcEndPoint其实都是属于RpcEnv的只有属于他客户端发消息的时候才能把信息路由给RpcEndPoint。
也就是RpcEnv 是一个RPC 环境。 RpcEndpoint需要使用RpcEnv的名称来注册自己以接收消息。RpcEnv将处理从RpcEndpointRef或远程节点发送的消息并将它们传递到相应的RpcEndpoint。
RpcEnv是个抽象类作为Rpc通信肯定要传入SparkConf因为是分布式的在spark2.x版本中使用的具体的实现类是NettyRpcEnv。RpcEnv的结构如下 在RpcEnv的伴生对象中重要的功能是创建一个RpcEnv的实例这个实例就是用于管理endpoint。
在 RpcEnv中有一个重要且常用的方法setupEndpoint方法。该方法就是利用上面创建的RpcEnv实例来初始化注册一个Endpoint并返回该endpoint的RpcEndpointRef(代理对象)。当我们调用RpcEnv中的setupEndpoint来注册一个endpoint到rpcEnv的时候在NettyRpcEnv内部会将该endpoint的名称与其本省的映射关系rpcEndpoint与rpcEndpointRef之间映射关系保存在dispatcher对应的成员变量中。我们拿到Endpoint的代理对象后就能向该endpoint发送消息 还有一个setupEndpointRef 方法来获取到指定endpoint的引用对象 下面看看master端的代码master在启动的时候在其伴生对象中会有一个rpcEnv RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)方法创建一个RpcEnv的实例这个实例就是用于管理endpoint。然后masterEndpoint rpcEnv.setupEndpoint(ENDPOINT_NAME, new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))获得一个具体一个具体的endpoint的实例 3.2、RpcEndpointRef
RpcEndpointRef是RpcEndpoint的一个引用简称代理。RpcEndpointRef是线程安全的。
要想向一个RpcEndpoint发送消息必须先持有其Ref代理,通过该代理才能发送消息。RpcEnv结构如下 常用的发送消息的方法是sendaskaskWithRetry他们之间的区别send发送的消息没有返回值用receive接收即可ask和askWithRetry发送的消息有返回值用receiveAndReply来接收。
下面看看worker端的代码worker获取到master的ref引用对象然后发送注册消息 3.3、在standalone模式中worker向master注册案例
1在worker启动的时候有onStart方法这里面调用了registerWithMaster这里面用了tryRegisterAllMaster方法在具体注册的时候向所有的master提交是用线程池的中一个线程来提交。然后就获得了masterEndpoint。获得了masterEndpoint之后将其作为参数传入registerWithMaster方法。然后通过ask发送消息。 2当调用ask将消息发送出去。其实是调用NettyRpcEndpointRef中ask在方法中当前发送地址nettyEnv.address,目标的master地址this和发送的消息message被封装成了RequestMessage消息。 3在NettyRpcEnv.ask中如果是远程rpc调用的话最终ask将调用postToOutbox函数并且此时消息会被序列化成Byte流。实现如下 4在postToOutbox函数中消息将经过OutboxMessage中的sendWith方法client.sendRpc(content)最终通过TransportClient的sendRpc方法client.sendRpc(content)而在TransportClient中将消息进一步封装然后发送给master。 5在master端TransportRequestHandler的handle方法中由于信息在worker端被分装成了RpcRequest所以在该handle方法中将调用processRpcRequest进行处理。 6processRpcRequest函数将调用rpcHandler的实现类NettyRpcHandler中的receive方法。在该方法中首先通过internalRecieve将消息解包成RequestMessage。然后该消息通过dispatcher的分发给对应的endpoint 7在Dispatcher的postMessage方法中可以看到首先根据对应的endpoint的EndpointData信息主要是该endpoint及其应用以及其信箱inbox然后将消息塞到给endpoint此例中的master的信箱中最后将消息塞到recievers的阻塞队列中。 8在Dispatcher中有一个线程池threadpool在MessageLoop类的run方法中将receivers中的对象取出来交由信箱的process方法去处理。如果没有收到任何消息将会阻塞在take处 9在inbox的proces方法中首先取出消息然后根据消息的类型最终将调用endpoint的receiver方法进行处理也就是master中的receive方法。至此整个一次rpc调用的流程结束。 总结①当调用ask将消息发送出去。其实是调用NettyRpcEndpointRef中的ask等方法并将消息封装②NettyRpcEndpointRef中的ask方法调用了NettyRpcEnv.ask如果是远程rpc调用的话最终ask将调用postToOutbox函数并且此时消息会被序列化成Byte流。③在postToOutbox函数中调用OutboxMessage中的sendWith方法中调用TransportClient的sendRpc方法在TransportClient中将消息进一步封装然后发送给master④在master端TransportRequestHandler的handle方法中进行消息类型判断调用processRpcRequest函数⑤processRpcRequest函数将调用rpcHandler的实现类NettyRpcHandler中的receive方法然后该消息通过dispatcher的分发给对应的endpoint⑥在Dispatcher的postMessage方法中可以看到首先根据对应的endpoint的EndpointData信息放到inbox信箱中最后将消息塞到recievers的阻塞队列中⑦在Dispatcher中有一个线程池threadpool在MessageLoop类的run方法中将receivers中的对象取出来交由信箱的process方法去处理。如果没有收到任何消息将会阻塞在take处⑧在inbox的proces方法中首先取出消息然后根据消息的类型最终将调用endpoint的receiver或receiveAndReply方法进行处理也就是master中的receive方法。