阿里巴巴的关联网站,上海企业seo,网页设计作业代做,企业文化宣传策划方案作者#xff1a;freewind 比原项目仓库#xff1a; Github地址#xff1a;https://github.com/Bytom/bytom Gitee地址#xff1a;https://gitee.com/BytomBlockchain/bytom 在上一篇#xff0c;我们知道了比原是如何把“请求区块数据”的信息BlockRequestMessage发送给pee…作者freewind 比原项目仓库 Github地址https://github.com/Bytom/bytom Gitee地址https://gitee.com/BytomBlockchain/bytom 在上一篇我们知道了比原是如何把“请求区块数据”的信息BlockRequestMessage发送给peer节点的那么本文研究的重点就是当peer节点收到了这个信息它将如何应答 那么这个问题如果细分的话也可以分为三个小问题 比原节点是如何收到对方发过来的信息的收到BlockRequestMessage后将会给对方发送什么样的信息这个信息是如何发送出去的我们先从第一个小问题开始。 比原节点是如何接收对方发过来的信息的 如果我们在代码中搜索BlockRequestMessage会发现只有在ProtocolReactor.Receive方法中针对该信息进行了应答。那么问题的关键就是比原是如何接收对方发过来的信息并且把它转交给ProtocolReactor.Receive的。 如果我们对前一篇《比原是如何把请求区块数据的信息发出去的》有印象的话会记得比原在发送信息时最后会把信息写入到MConnection.bufWriter中与之相应的MConnection还有一个bufReader用于读取数据它也是与net.Conn绑定在一起的 p2p/connection.go#L114-L118 func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection {mconn : MConnection{conn: conn,bufReader: bufio.NewReaderSize(conn, minReadBufferSize),bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),其中minReadBufferSize的值为常量1024 所以要读取对方发来的信息一定会读取bufReader。经过简单的搜索我们发现它也是在MConnection.Start中启动的 p2p/connection.go#L152-L159 func (c *MConnection) OnStart() error {// ...go c.sendRoutine()go c.recvRoutine()// ...
}其中的c.recvRoutine()就是我们本次所关注的。它上面的c.sendRoutine是用来发送的是前一篇文章中我们关注的重点。 继续c.recvRoutine() p2p/connection.go#L403-L502 func (c *MConnection) recvRoutine() {// ...for {c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(c.config.RecvRate), true)// ...pktType : wire.ReadByte(c.bufReader, n, err)c.recvMonitor.Update(int(n))// ...switch pktType {// ...case packetTypeMsg:pkt, n, err : msgPacket{}, int(0), error(nil)wire.ReadBinaryPtr(pkt, c.bufReader, maxMsgPacketTotalSize, n, err)c.recvMonitor.Update(int(n))// ...channel, ok : c.channelsIdx[pkt.ChannelID]// ...msgBytes, err : channel.recvMsgPacket(pkt)// ...if msgBytes ! nil {// ...c.onReceive(pkt.ChannelID, msgBytes)}// ...}}// ...
}经过简化以后这个方法分成了三块内容 第一块就限制接收速率以防止恶意结点突然发送大量数据把节点撑死。跟发送一样它的限制是500K/s第二块是从c.bufReader中读取出下一个数据包的类型。它的值目前有三个两个跟心跳有关packetTypePing和packetTypePong另一个表示是正常的信息数据类型packetTypeMsg也是我们需要关注的第三块就是继续从c.bufReader中读取出完整的数据包然后根据它的ChannelID找到相应的channel去处理它。ChannelID有两个值分别是BlockchainChannel和PexChannel我们目前只需要关注前者即可它对应的reactor是ProtocolReactor。当最后调用c.onReceive(pkt.ChannelID, msgBytes)时读取的二进制数据msgBytes就会被ProtocolReactor.Receive处理我们的重点是看第三块内容。首先是channel.recvMsgPacket(pkt)即通道是怎么从packet包里读取到相应的二进制数据的呢 p2p/connection.go#L667-L682 func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {// ...ch.recving append(ch.recving, packet.Bytes...)if packet.EOF byte(0x01) {msgBytes : ch.recving// ...ch.recving ch.recving[:0]return msgBytes, nil}return nil, nil
}这个方法我去掉了一些错误检查和关于性能方面的注释有兴趣的同学可以点接上方的源代码查看这里就忽略了。 这段代码主要是利用了一个叫recving的通道把packet中持有的字节数组加到它后面然后再判断该packet是否代表整个信息结束了如果是的话则把ch.recving的内容完整返回供调用者处理否则的话返回一个nil表示还没拿完暂时处理不了。在前一篇文章中关于发送数据的地方可以与这里对应只不过发送方要麻烦的多需要三个通道sendQueue、sending和send才能实现这边接收方就简单了。 然后回到前面的方法MConnection.recvRoutine我们继续看最后的c.onReceive调用。这个onReceive实际上是一个由别人赋值给该channel的一个函数它位于MConnection创建的地方 p2p/peer.go#L292-L310 func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {onReceive : func(chID byte, msgBytes []byte) {reactor : reactorsByCh[chID]if reactor nil {if chID PexChannel {return} else {cmn.PanicSanity(cmn.Fmt(Unknown channel %X, chID))}}reactor.Receive(chID, p, msgBytes)}onError : func(r interface{}) {onPeerError(p, r)}return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
}逻辑也比较简单就是当前面的c.onReceive(pkt.ChannelID, msgBytes)调用时它会根据传入的chID找到相应的Reactor然后执行其Receive方法。对于本文来说就会进入到ProtocolReactor.Receive。 那我们继续看ProtocolReactor.Receive: netsync/protocol_reactor.go#L179-L247 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {_, msg, err : DecodeMessage(msgBytes)// ...switch msg : msg.(type) {case *BlockRequestMessage:// ...
}其中的DecodeMessage(...)就是把传入的二进制数据反序列化成一个BlockchainMessage对象该对象是一个没有任何内容的interface它有多种实现类型。我们在后面继续对该对象进行判断如果它是BlockRequestMessage类型的信息我们就会继续做相应的处理。处理的代码我在这里暂时省略了因为它是属于下一个小问题的我们先不考虑。 好像不知不觉我们就把第一个小问题的后半部分差不多搞清楚了。那么前半部分是什么我们在前面说读取bufReader的代码的起点是在MConnection.Start中那么前半部分就是比原从启动开始中是在什么情况下怎样一步步走到MConnection.Start的呢 好在前半部分的问题我们在前一篇文章《比原是如何把请求区块数据的信息发出去的》中进行了专门的讨论这里就不讲了有需要的话可以再过去看一下可以先看最后“总结”那一小节。 下面我们进入第二个小问题 收到BlockRequestMessage后将会给对方发送什么样的信息 这里就是接着前面的ProtocolReactor.Receive继续向下讲了。首先我们再贴一下它的较完整的代码 netsync/protocol_reactor.go#L179-L247 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {_, msg, err : DecodeMessage(msgBytes)// ...switch msg : msg.(type) {case *BlockRequestMessage:var block *types.Blockvar err errorif msg.Height ! 0 {block, err pr.chain.GetBlockByHeight(msg.Height)} else {block, err pr.chain.GetBlockByHash(msg.GetHash())}// ...response, err : NewBlockResponseMessage(block)// ...src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})// ...
}可以看到逻辑还是比较简单的即根据对方发过来的BlockRequestMessage中指定的height或者hash信息在本地的区块链数据中找到相应的block组成BlockResponseMessage发过去就行了。 其中chain.GetBlockByHeight(...)和chain.GetBlockByHash(...)如果详细说明的话需要深刻理解区块链数据在比原节点中是如何保存的我们在本文先不讲等到后面专门研究。 在这里我觉得我们只需要知道我们会查询区块数据并且构造出一个BlockResponseMessage再通过BlockchainChannel这个通道发送出去就可以了。 最后一句代码中调用了src.TrySend方法它是把信息向对方peer发送过去。其中的src就是指的对方peer 那么它到底是怎么发送出去的呢下面我们进入最后一个小问题 这个BlockResponseMessage信息是如何发送出去的 我们先看看peer.TrySend代码 p2p/peer.go#L242-L247 func (p *Peer) TrySend(chID byte, msg interface{}) bool {if !p.IsRunning() {return false}return p.mconn.TrySend(chID, msg)
}它在内部将会调用MConnection.TrySend方法其中chID是BlockchainChannel也就是它对应的Reactor是ProtocolReactor。 再接着就是我们熟悉的MConnection.TrySend由于它在前一篇文章中进行了全面的讲解在本文就不提了如果需要可以过去翻看一下。 那么今天的问题就算是解决啦。 到这里我们总算能够完整的理解清楚当我们向一个比原节点请求“区块数据”我们这边需要怎么做对方节点又需要怎么做了。 转载于:https://www.cnblogs.com/bytom/p/9355998.html