国内用什么做网站,门户wordpress主题下载,南宁伯才网络公司做网站好吗,wordpress会员无法注册作者 | 李志信 导读#xff1a;有了上一篇文章《Dubbo-go 源码笔记#xff08;一#xff09;Server 端开启服务过程》的铺垫#xff0c;可以类比客户端启动于服务端的启动过程。其中最大的区别是服务端通过 zk 注册服务#xff0c;发布自己的ivkURL并订阅事件开启监听有了上一篇文章《Dubbo-go 源码笔记一Server 端开启服务过程》的铺垫可以类比客户端启动于服务端的启动过程。其中最大的区别是服务端通过 zk 注册服务发布自己的ivkURL并订阅事件开启监听而客户应该是通过zk注册组件拿到需要调用的serviceURL更新invoker并重写用户的RPCService从而实现对远程过程调用细节的封装。 配置文件和客户端源代码
1. client 配置文件
helloworld 提供的 demoprofiles/client.yaml。
registries :demoZk:protocol: zookeepertimeout : 3saddress: 127.0.0.1:2181username: password:
references:UserProvider:# 可以指定多个registry使用逗号隔开;不指定默认向所有注册中心注册registry: demoZkprotocol : dubbointerface : com.ikurento.user.UserProvidercluster: failovermethods :- name: GetUserretries: 3
可看到配置文件与之前讨论过的 Server 端非常类似其 refrences 部分字段就是对当前服务要主调的服务的配置其中详细说明了调用协议、注册协议、接口 id、调用方法、集群策略等这些配置都会在之后与注册组件交互、重写 ivk、调用的过程中使用到。
2. 客户端使用框架源码
user.go
func init() {config.SetConsumerService(userProvider)hessian.RegisterPOJO(User{})
}
main.go
func main() {hessian.RegisterPOJO(User{})config.Load()time.Sleep(3e9)println(\n\n\nstart to test dubbo)user : User{}err : userProvider.GetUser(context.TODO(), []interface{}{A001}, user)if err ! nil {panic(err)}println(response result: %v\n, user)initSignal()
}
在官网提供的 helloworld demo 的源码中可看到与服务端类似在 user.go 内注册了 rpc-service以及需要 rpc 传输的结构体 user。
在 main 函数中同样调用了 config.Load() 函数之后就可以通过实现好的 rpc-serviceuserProvider 直接调用对应的功能函数即可实现 rpc 调用。
可以猜到从 hessian 注册结构、SetConsumerService到调用函数 .GetUser() 期间用户定义的 rpc-service 也就是 userProvider 对应的函数被重写重写后的 GetUser 函数已经包含实现了远程调用逻辑的 invoker。
接下来就要通过阅读源码看看 dubbo-go 是如何做到的。
实现远程过程调用
1. 加载配置文件
// file: config/config_loader.go :Load()// Load Dubbo Init
func Load() {// init routerinitRouter()// init the global event dispatcherextension.SetAndInitGlobalDispatcher(GetBaseConfig().EventDispatcherType)// start the metadata report if config setif err : startMetadataReport(GetApplicationConfig().MetadataType, GetBaseConfig().MetadataReportConfig); err ! nil {logger.Errorf(Provider starts metadata report error, and the error is {%#v}, err)return}// reference configloadConsumerConfig()
在 main 函数中调用了 config.Load() 函数进而调用了 loadConsumerConfig类似于之前讲到的 server 端配置读入函数。
在 loadConsumerConfig 函数中进行了三步操作
// config/config_loader.go
func loadConsumerConfig() {// 1 init other consumer configconConfigType : consumerConfig.ConfigTypefor key, value : range extension.GetDefaultConfigReader() {}checkApplicationName(consumerConfig.ApplicationConfig)configCenterRefreshConsumer()checkRegistries(consumerConfig.Registries, consumerConfig.Registry)// 2 refer-implement-referencefor key, ref : range consumerConfig.References {if ref.Generic {genericService : NewGenericService(key)SetConsumerService(genericService)}rpcService : GetConsumerService(key)ref.id keyref.Refer(rpcService)ref.Implement(rpcService)}// 3 wait for invoker is available, if wait over default 3s, then panicfor {}
}
检查配置文件并将配置写入内存在 for 循环内部依次引用refer并且实例化implement每个被调 reference等待三秒钟所有 invoker 就绪
其中重要的就是 for 循环里面的引用和实例化两步操作会在接下来展开讨论。
至此配置已经被写入了框架。
2. 获取远程 Service URL实现可供调用的 invoker
上述的 ref.Refer 完成的就是这部分的操作。 图一
1构造注册 url
和 server 端类似存在注册 url 和服务 urldubbo 习惯将服务 url 作为注册 url 的 sub。
// file: config/reference_config.go: Refer()
func (c *ReferenceConfig) Refer(_ interface{}) {//一配置url参数(serviceUrl)将会作为subcfgURL : common.NewURLWithOptions(common.WithPath(c.id),common.WithProtocol(c.Protocol),common.WithParams(c.getUrlMap()),common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),)...// 二注册地址可以通过url格式给定也可以通过配置格式给定// 这一步的意义就是配置-提取信息生成URLif c.Url ! {// 用户给定url信息可以是点对点的地址也可以是注册中心的地址// 1. user specified URL, could be peer-to-peer address, or register centers address.urlStrings : gxstrings.RegSplit(c.Url, \\s*[;]\\s*)for _, urlStr : range urlStrings {serviceUrl, err : common.NewURL(urlStr)...}} else {// 配置读入注册中心的信息// assemble SubURL from register centers configuration mode// 这是注册urlprotocol registry,包含了zk的用户名、密码、ip等等c.urls loadRegistries(c.Registry, consumerConfig.Registries, common.CONSUMER)...// set url to regUrlsfor _, regUrl : range c.urls {regUrl.SubURL cfgURL// regUrl的subURl存当前配置url}}//至此无论通过什么形式已经拿到了全部的regURL// 三获取registryProtocol实例调用其Refer方法传入新构建好的regURLif len(c.urls) 1 {// 这一步访问到registry/protocol/protocol.go registryProtocol.Refer// 这里是registryc.invoker extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])} else {// 如果有多个注册中心即有多个invoker,则采取集群策略invokers : make([]protocol.Invoker, 0, len(c.urls))...}
这个函数中已经处理完从 Register 配置到 RegisterURL 的转换,即图一中部分 接下来已经拿到的 url 将被传递给 RegistryProtocol进一步 refer。
2registryProtocol 获取到 zkRegistry 实例进一步 Refer
// file: registry/protocol/protocol.go: Refer// Refer provider service from registry center
// 拿到的是配置文件registries的url他能够生成一个invoker 指向目的addr以供客户端直接调用。
func (proto *registryProtocol) Refer(url common.URL) protocol.Invoker {var registryUrl url// 这里拿到的是referenceConfigserviceUrl里面包含了Reference的所有信息包含interfaceName、method等等var serviceUrl registryUrl.SubURLif registryUrl.Protocol constant.REGISTRY_PROTOCOL {// registryUrl.Proto registryprotocol : registryUrl.GetParam(constant.REGISTRY_KEY, )registryUrl.Protocol protocol//替换成了具体的值比如zookeeper}// 接口对象var reg registry.Registry// 一实例化接口对象缓存策略if regI, loaded : proto.registries.Load(registryUrl.Key()); !loaded {// 缓存中不存在当前registry新建一个regreg getRegistry(registryUrl)// 缓存起来proto.registries.Store(registryUrl.Key(), reg)} else {reg regI.(registry.Registry)}// 到这里获取到了reg实例 zookeeper的registry//二根据Register的实例zkRegistry和传入的regURL新建一个directory// 这一步存在复杂的异步逻辑从注册中心拿到了目的service的真实addr获取了invoker并放入directory// 这一步将在下面详细给出步骤// new registry directory for store service url from registrydirectory, err : extension.GetDefaultRegistryDirectory(registryUrl, reg)if err ! nil {logger.Errorf(consumer service %v create registry directory error, error message is %s, and will return nil invoker!,serviceUrl.String(), err.Error())return nil}// 三DoRegister 在zk上注册当前client serviceerr reg.Register(*serviceUrl)if err ! nil {logger.Errorf(consumer service %v register registry %v error, error message is %s,serviceUrl.String(), registryUrl.String(), err.Error())}// 四new cluster invoker将directory写入集群获得具有集群策略的invokercluster : extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))invoker : cluster.Join(directory)// invoker保存proto.invokers append(proto.invokers, invoker)return invoker
}
可详细阅读上述注释这个函数完成了从 url 到 invoker 的全部过程
一首先获得 Registry 对象默认是之前实例化的 zkRegistry和之前 server 获取 Registry 的处理很类似。
二通过构造一个新的 directory异步拿到之前在 zk 上注册的 server 端信息生成 invoker。
三在 zk 上注册当前 service。
四集群策略获得最终 invoker。
这一步完成了图一中所有余下的绝大多数操作接下来就需要详细地查看 directory 的构造过程。
3构造 directory包含较复杂的异步操作 图二
上述的 extension.GetDefaultRegistryDirectory(registryUrl, reg) 函数本质上调用了已经注册好的 NewRegistryDirectory 函数:
// file: registry/directory/directory.go: NewRegistryDirectory()// NewRegistryDirectory will create a new RegistryDirectory
// 这个函数作为default注册在extension上面
// url为注册urlreg为zookeeper registry
func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {if url.SubURL nil {return nil, perrors.Errorf(url is invalid, suburl can not be nil)}dir : RegistryDirectory{BaseDirectory: directory.NewBaseDirectory(url),cacheInvokers: []protocol.Invoker{},cacheInvokersMap: sync.Map{},serviceType: url.SubURL.Service(),registry: registry,}dir.consumerConfigurationListener newConsumerConfigurationListener(dir)go dir.subscribe(url.SubURL)return dir, nil
}
首先构造了一个注册 directory开启协程调用其 subscribe 函数传入 serviceURL。
这个 directory 目前包含了对应的 zkRegistry以及传入的 URL它的 cacheInvokers 部分是空的。
进入 dir.subscribe(url.SubURL) 这个异步函数
/ file: registry/directory/directory.go: subscribe()// subscribe from registry
func (dir *RegistryDirectory) subscribe(url *common.URL) {// 增加两个监听dir.consumerConfigurationListener.addNotifyListener(dir)dir.referenceConfigurationListener newReferenceConfigurationListener(dir, url)// subscribe调用dir.registry.Subscribe(url, dir)
}
重点来了它调用了 zkRegistry 的 Subscribe 方法,与此同时将自己作为 ConfigListener 传入。 我认为这种传入 listener 的设计模式非常值得学习而且很有 java 的味道。 针对等待 zk 返回订阅信息这样的异步操作需要传入一个 Listener这个 Listener 需要实现 Notify 方法进而在作为参数传入内部之后可以被异步地调用 Notify将内部触发的异步事件“传递出来”再进一步处理加工。 层层的 Listener 事件链能将传入的原始 serviceURL 通过 zkConn 发送给 zk 服务获取到服务端注册好的 url 对应的二进制信息。 而 Notify 回调链则将这串 byte[] 一步一步解析、加工以事件的形式向外传递最终落到 directory 上的时候已经是成型的 newInvokers 了。 具体细节不再以源码形式展示可参照上图查阅源码。 至此已经拿到了 server 端注册好的真实 invoker。
完成了图一中的部分 4构造带有集群策略的 clusterinvoker
经过上述操作已经拿到了 server 端 Invokers放入了 directory 的 cacheinvokers 数组里面缓存。
后续的操作对应本文从 url 到 invoker 的过程的最后一步由 directory 生成带有特性集群策略的 invoker。
// 四new cluster invoker将directory写入集群获得具有集群策略的invokercluster : extension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))invoker : cluster.Join(directory)
123
Join 函数的实现就是如下函数
// file: cluster/cluster_impl/failover_cluster_invokers.go: newFailoverClusterInvoker()func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {return failoverClusterInvoker{baseClusterInvoker: newBaseClusterInvoker(directory),}
}
12345
dubbo-go 框架默认选择 failover 策略既然返回了一个 invoker我们查看一下 failoverClusterInvoker 的 Invoker 方法看它是如何将集群策略封装到 Invoker 函数内部的
// file: cluster/cluster_impl/failover_cluster_invokers.go: Invoker()// Invoker 函数
func (invoker *failoverClusterInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {...//调用List方法拿到directory缓存的所有invokersinvokers : invoker.directory.List(invocation)if err : invoker.checkInvokers(invokers, invocation); err ! nil {// 检查是否可以实现调用return protocol.RPCResult{Err: err}}// 获取来自用户方向传入的methodName : invocation.MethodName()retries : getRetries(invokers, methodName)loadBalance : getLoadBalance(invokers[0], invocation)for i : 0; i retries; i {// 重要这里是集群策略的体现失败后重试//Reselect before retry to avoid a change of candidate invokers.//NOTE: if invokers changed, then invoked also lose accuracy.if i 0 {if err : invoker.checkWhetherDestroyed(); err ! nil {return protocol.RPCResult{Err: err}}invokers invoker.directory.List(invocation)if err : invoker.checkInvokers(invokers, invocation); err ! nil {return protocol.RPCResult{Err: err}}}// 这里是负载均衡策略的体现选择特定ivk进行调用。ivk : invoker.doSelect(loadBalance, invocation, invokers, invoked)if ivk nil {continue}invoked append(invoked, ivk)//DO INVOKEresult ivk.Invoke(ctx, invocation)if result.Error() ! nil {providers append(providers, ivk.GetUrl().Key())continue}return result}...
} 看了很多 Invoke 函数的实现所有类似的 Invoker 函数都包含两个方向一个是用户方向的 invcation一个是函数方向的底层 invokers。 而集群策略的 invoke 函数本身作为接线员把 invocation 一步步解析根据调用需求和集群策略选择特定的 invoker 来执行。 proxy 函数也是这样一个是用户方向的 ins[] reflect.Type, 一个是函数方向的 invoker。 proxy 函数负责将 ins 转换为 invocation调用对应 invoker 的 invoker 函数实现连通。 而出于这样的设计可以在一步步 Invoker 封装的过程中每个 Invoker 只关心自己负责操作的部分从而使整个调用栈解耦。 妙啊 至此我们理解了 failoverClusterInvoker 的 Invoke 函数实现也正是和这个集群策略 Invoker 被返回接受来自上方的调用。
已完成图一中的 5在 zookeeper 上注册当前 client
拿到 invokers 后可以回到这个函数了 // file: config/refrence_config.go: Refer()if len(c.urls) 1 {// 这一步访问到registry/protocol/protocol.go registryProtocol.Referc.invoker extension.GetProtocol(c.urls[0].Protocol).Refer(*c.urls[0])// 一拿到了真实的invokers} else {// 如果有多个注册中心即有多个invoker,则采取集群策略invokers : make([]protocol.Invoker, 0, len(c.urls))...cluster : extension.GetCluster(hitClu)// If zone-aware policy select, the invoker wrap sequence would be:// ZoneAwareClusterInvoker(StaticDirectory) -// FailoverClusterInvoker(RegistryDirectory, routing happens here) - Invokerc.invoker cluster.Join(directory.NewStaticDirectory(invokers))}// 二create proxy为函数配置代理if c.Async {callback : GetCallback(c.id)c.pxy extension.GetProxyFactory(consumerConfig.ProxyFactory).GetAsyncProxy(c.invoker, callback, cfgURL)} else {// 这里c.invoker已经是目的addr了c.pxy extension.GetProxyFactory(consumerConfig.ProxyFactory).GetProxy(c.invoker, cfgURL)}
我们有了可以打通的 invokers但还不能直接调用因为 invoker 的入参是 invocation而调用函数使用的是具体的参数列表需要通过一层 proxy 来规范入参和出参。
接下来新建一个默认 proxy放置在 c.proxy 内以供后续使用。
至此完成了图一中最后的操作 3. 将调用逻辑以代理函数的形式写入 rpc-service
上面完成了 config.Refer 操作回到 config/config_loader.go: loadConsumerConfig() 下一个重要的函数是 Implement它的操作较为简单旨在使用上面生成的 c.proxy 代理链接用户自己定义的 rpcService 到 clusterInvoker 的信息传输。
函数较长只选取了重要的部分:
// file: common/proxy/proxy.go: Implement()// Implement
// proxy implement
// In consumer, RPCService like:
// type XxxProvider struct {
// Yyy func(ctx context.Context, args []interface{}, rsp *Zzz) error
// }
// Implement 实现的过程就是proxy根据函数名和返回值通过调用invoker 构造出拥有远程调用逻辑的代理函数
// 将当前rpc所有可供调用的函数注册到proxy.rpc内
func (p *Proxy) Implement(v common.RPCService) {// makeDubboCallProxy 这是一个构造代理函数这个函数的返回值是func(in []reflect.Value) []reflect.Value 这样一个函数// 这个被返回的函数是请求实现的载体由他来发起调用获取结果makeDubboCallProxy : func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Value {return func(in []reflect.Value) []reflect.Value {// 根据methodName和outs的类型构造这样一个函数这个函数能将in 输入的value转换为输出的value// 这个函数具体的实现如下...// 目前拿到了 methodName、所有入参的interface和value出参数reply// 一根据这些生成一个 rpcinvocationinv invocation_impl.NewRPCInvocationWithOptions(invocation_impl.WithMethodName(methodName),invocation_impl.WithArguments(inIArr),invocation_impl.WithReply(reply.Interface()),invocation_impl.WithCallBack(p.callBack),invocation_impl.WithParameterValues(inVArr))for k, value : range p.attachments {inv.SetAttachments(k, value)}// add user setAttachmentatm : invCtx.Value(constant.AttachmentKey) // 如果传入的ctx里面有attachment也要写入invif m, ok : atm.(map[string]string); ok {for k, value : range m {inv.SetAttachments(k, value)}}// 至此构造inv完毕// (二触发Invoker 之前已经将cluster_invoker放入proxy使用Invoke方法通过getty远程过程调用result : p.invoke.Invoke(invCtx, inv)// 如果有attachment则加入if len(result.Attachments()) 0 {invCtx context.WithValue(invCtx, constant.AttachmentKey, result.Attachments())}...}}numField : valueOfElem.NumField()for i : 0; i numField; i {t : typeOf.Field(i)methodName : t.Tag.Get(dubbo)if methodName {methodName t.Name}f : valueOfElem.Field(i)if f.Kind() reflect.Func f.IsValid() f.CanSet() { // 针对于每个函数outNum : t.Type.NumOut()// 规定函数输出只能有1/2个if outNum ! 1 outNum ! 2 {logger.Warnf(method %s of mtype %v has wrong number of in out parameters %d; needs exactly 1/2,t.Name, t.Type.String(), outNum)continue}// The latest return type of the method must be error.// 规定最后一个返回值一定是errorif returnType : t.Type.Out(outNum - 1); returnType ! typError {logger.Warnf(the latest return type %s of method %q is not error, returnType, t.Name)continue}// 获取到所有的出参类型放到数组里var funcOuts make([]reflect.Type, outNum)for i : 0; i outNum; i {funcOuts[i] t.Type.Out(i)}// do method proxy here:// 三调用make函数传入函数名和返回值获得能调用远程的proxy将这个proxy替换掉原来的函数位置f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts)))logger.Debugf(set method [%s], methodName)}}...
}
正如之前所说proxy 的作用是将用户定义的函数参数列表转化为抽象的 invocation 传入 Invoker进行调用。
其中已标明有三处较为重要的地方
在代理函数中实现由参数列表生成 Invocation 的逻辑在代理函数实现调用 Invoker 的逻辑将代理函数替换为原始 rpc-service 对应函数
至此也就解决了一开始的问题 // file: client.go: main()config.Load()user : User{}err : userProvider.GetUser(context.TODO(), []interface{}{A001}, user)
这里直接调用用户定义的 rpcService 的函数 GetUser此处实际调用的是经过重写入的函数代理所以就能实现远程调用了。
从 client 到 server 的 invoker 嵌套链- 小结
在阅读 dubbo-go 源码的过程中我们能够发现一条清晰的 invoker-proxy 嵌套链希望能够通过图的形式来展现 如果你有任何疑问欢迎钉钉扫码加入钉钉交流群钉钉群号 23331795。
作者简介
李志信 (GitHubID LaurenceLiZhixin)中山大学软件工程专业在校学生擅长使用 Java/Go 语言专注于云原生和微服务等技术方向。 原文链接 本文为阿里云原创内容未经允许不得转载。