官方网站下载拼多多app,网络营销推广方法和工具,网站优化简历模板,wordpress广告联盟插件在《用最少的代码模拟gRPC四种消息交换模式》中#xff0c;我使用很简单的代码模拟了gRPC四种消息交换模式#xff08;Unary、Client Streaming、Server Streaming和Duplex Streaming#xff09;#xff0c;现在我们更近一步#xff0c;试着使用极简的方式打造一个gRPC框架…在《用最少的代码模拟gRPC四种消息交换模式》中我使用很简单的代码模拟了gRPC四种消息交换模式Unary、Client Streaming、Server Streaming和Duplex Streaming现在我们更近一步试着使用极简的方式打造一个gRPC框架https://github.com/jiangjinnan/grpc-mini。这个gRPC是对ASP.NET Core gRPC实现原理的模拟并不是想重新造一个轮子。一、“标准”的gRPC定义、承载和调用二、将gRPC方法抽象成委托三、将委托转换成RequestDelegate UnaryCallHandler ClientStreamingCallHandler ServerStreamingCallHandler DuplexStreamingCallHandler四、路由注册五、为gRPC服务定义一个接口六、重新定义和承载服务一、“标准”的gRPC定义、承载和调用可能有些读者朋友们对ASP.NET Core gRPC还不是太熟悉所以我们先来演示一下如何在一个ASP.NET Core应用中如何定义和承载一个简单的gRPC服务并使用自动生成的客户端代码进行调用。我们新建一个空的解决方案并在其中添加如下所示的三个项目。我们在类库项目Proto中定义了如下所示Greeter服务并利用其中定义的四个操作分别模拟四种消息交换模式。HelloRequest 和HelloReply 是它们涉及的两个ProtoBuf消息。syntax proto3;
import google/protobuf/empty.proto;service Greeter {rpc SayHelloUnary (HelloRequest) returns ( HelloReply);rpc SayHelloServerStreaming (google.protobuf.Empty) returns (stream HelloReply);rpc SayHelloClientStreaming (stream HelloRequest) returns (HelloReply);rpc SayHelloDuplexStreaming (stream HelloRequest) returns (stream HelloReply);
}message HelloRequest {string name 1;
}message HelloReply {string message 1;
}ASP.NET Core项目中定义了如下的GreeterServce服务实现了定义的四个操作基类GreeterBase是针对上面这个.proto文件生成的类型。public class GreeterService: GreeterBase
{public override TaskHelloReply SayHelloUnary(HelloRequest request, ServerCallContext context) Task.FromResult(new HelloReply { Message $Hello, {request.Name} });public override async TaskHelloReply SayHelloClientStreaming(IAsyncStreamReaderHelloRequest reader, ServerCallContext context){var list new Liststring();while (await reader.MoveNext(CancellationToken.None)){list.Add(reader.Current.Name);}return new HelloReply { Message $Hello, {string.Join(,, list)} };}public override async Task SayHelloServerStreaming(Empty request, IServerStreamWriterHelloReply responseStream, ServerCallContext context){await responseStream.WriteAsync(new HelloReply { Message Hello, Foo! });await Task.Delay(1000);await responseStream.WriteAsync(new HelloReply { Message Hello, Bar! });await Task.Delay(1000);await responseStream.WriteAsync(new HelloReply { Message Hello, Baz! });}public override async Task SayHelloDuplexStreaming(IAsyncStreamReaderHelloRequest reader, IServerStreamWriterHelloReply writer, ServerCallContext context){while (await reader.MoveNext()){await writer.WriteAsync(new HelloReply { Message $Hello {reader.Current.Name} });}}
}具体的服务承载代码如下。我们采用Minimal API的形式通过调用IServiceCollection接口的AddGrpc扩展方法注册相关服务并调用MapGrpcServiceTService将定义在GreeterServce中的四个方法映射我对应的路由终结点。var builder WebApplication.CreateBuilder(args);
builder.Services.AddGrpc();
builder.WebHost.ConfigureKestrel(kestrel kestrel.ConfigureEndpointDefaults(options options.Protocols HttpProtocols.Http2));
var app builder.Build();
app.MapGrpcServiceGreeterService();
app.Run();在控制台项目Client中我们利用生成出来的客户端类型GreeterClient分别一对应的服务交换模式调用了四个gRPC方法。var channel GrpcChannel.ForAddress(http://localhost:5000);
var client new GreeterClient(channel);Console.WriteLine(Unary);
await UnaryCallAsync();Console.WriteLine(\nServer Streaming);
await ServerStreamingCallAsync();Console.WriteLine(\nClient Streaming);
await ClientStreamingCallAsync();Console.WriteLine(\nDuplex Streaming);
await DuplexStreamingCallAsync();
Console.ReadLine();async Task UnaryCallAsync()
{var request new HelloRequest { Name foobar };var reply await client.SayHelloUnaryAsync(request);Console.WriteLine(reply.Message);
}
async Task ServerStreamingCallAsync()
{var streamingCall client.SayHelloServerStreaming(new Empty());var reader streamingCall.ResponseStream;while (await reader.MoveNext(CancellationToken.None)){Console.WriteLine(reader.Current.Message);}
}
async Task ClientStreamingCallAsync()
{var streamingCall client.SayHelloClientStreaming();var writer streamingCall.RequestStream;await writer.WriteAsync(new HelloRequest { Name Foo });await Task.Delay(1000);await writer.WriteAsync(new HelloRequest { Name Bar });await Task.Delay(1000);await writer.WriteAsync(new HelloRequest { Name Baz });await writer.CompleteAsync();var reply await streamingCall.ResponseAsync;Console.WriteLine(reply.Message);
}
async Task DuplexStreamingCallAsync()
{var streamingCall client.SayHelloDuplexStreaming();var writer streamingCall.RequestStream;var reader streamingCall.ResponseStream;_ Task.Run(async () {await writer.WriteAsync(new HelloRequest { Name Foo });await Task.Delay(1000);await writer.WriteAsync(new HelloRequest { Name Bar });await Task.Delay(1000);await writer.WriteAsync(new HelloRequest { Name Baz });await writer.CompleteAsync();});await foreach (var reply in reader.ReadAllAsync()){Console.WriteLine(reply.Message);}
}如下所示的是客户端控制台上的输出结果。二、将gRPC方法抽象成委托通过上面的演示我们也知道承载的gRPC类型最终会将其实现的方法注册成路由终结点这一点其实和MVC是一样的。但是gRPC的方法和定义在Controller类型中的Action方法不同之处在于前者的签名其实是固定的。如果我们将请求和响应消息类型使用Request和Reply来表示四种消息交换模式的方法签名就可以写成如下的形式。TaskReply Unary(Request request, ServerCallContext context);
TaskReply ClientStreaming(IAsyncStreamReaderRequest reader, ServerCallContext context);
Task ServerStreaming(Empty request, IServerStreamWriterReply responseStream, ServerCallContext context);
Task DuplexStreaming(IAsyncStreamReaderRequest reader, IServerStreamWriterReply writer, ServerCallContext context);“流式”方法中用来读取请求和写入响应的IAsyncStreamReaderT和IServerStreamWriterT定义如下。public interface IAsyncStreamReaderout T
{T Current { get; }Taskbool MoveNext(CancellationToken cancellationToken default);
}
public interface IAsyncStreamWriterin T
{Task WriteAsync(T message, CancellationToken cancellationToken default);
}
public interface IServerStreamWriterin T : IAsyncStreamWriterT
{
}
public interface IClientStreamWriterin T : IAsyncStreamWriterT
{Task CompleteAsync();
}表示服务端调用上下文的ServerCallContext 类型具有丰富的成员但是它的本质就是对HttpContext上下文的封装所以我们对它进行了简化。如下面的代码片段所示我们给予这个上下文类型两个属性成员一个是表示请求上下文的HttpContext另一个则是用来设置响应状态StatusCode后者对应的枚举定义了完整的gRPC状态码。public class ServerCallContext
{public StatusCode StatusCode { get; set; } StatusCode.OK;public HttpContext HttpContext { get; }public ServerCallContext(HttpContext httpContext) HttpContext httpContext;
}public enum StatusCode
{OK 0,Cancelled 1,Unknown 2,InvalidArgument 3,DeadlineExceeded 4,NotFound 5,AlreadyExists 6,PermissionDenied 7,Unauthenticated 0x10,ResourceExhausted 8,FailedPrecondition 9,Aborted 10,OutOfRange 11,Unimplemented 12,Internal 13,Unavailable 14,DataLoss 0xF
}既然方法签名固定意味着我们可以将四种gRPC方法定义成如下四个对应的委托泛型参数TService、TRequest和TResponse分别表示服务、请求和响应类型。public delegate TaskTResponse UnaryMethodTService, TRequest, TResponse(TService service, TRequest request, ServerCallContext context)where TService : classwhere TRequest : IMessageTRequestwhere TResponse : IMessageTResponse;public delegate TaskTResponse ClientStreamingMethodTService, TRequest, TResponse(TService service, IAsyncStreamReaderTRequest reader, ServerCallContext context)where TService : classwhere TRequest : IMessageTRequestwhere TResponse : IMessageTResponse;public delegate Task ServerStreamingMethodTService, TRequest, TResponse(TService service, TRequest request, IServerStreamWriterTResponse writer, ServerCallContext context)where TService : classwhere TRequest : IMessageTRequestwhere TResponse : IMessageTResponse;public delegate Task DuplexStreamingMethodTService, TRequest, TResponse(TService service, IAsyncStreamReaderTRequest reader, IServerStreamWriterTResponse writer, ServerCallContext context)where TService : classwhere TRequest : IMessageTRequestwhere TResponse : IMessageTResponse;我们知道路由的本质就是创建一组路由模式Pattern和对应处理器之间的映射关系。路由模式很简单对应的路由模板为“{ServiceName}/{MethodName}”并且采用Post请求方法。对应的处理器最终体现为一个RequestDelegate。那么只要我们能够将上述四种委托类型都转换成RequestDelegate委托一切都迎刃而解了。三、将委托转换成RequestDelegate为了将四种委托类型转化成RequestDelegate我们将后者实现为一个ServiceCallHandler类型并为其定义了如下两个基类。ServerCallHandlerBase的HandleCallAsync方法正好与RequestDelegate委托的签名一致所以这个方法最终会用来处理gRPC请求。不同的消息交换模式采用不同的请求处理方式只需实现抽象方法HandleCallAsyncCore就可以了。HandleCallAsync方法在调用此抽象方法之前将响应的ContentType设置成gRPC标准的响应类型“application/grpc”。在此之后将状态码设置为“grpc-status”首部它将在HTTP2的DATA帧发送完毕后以HEADERS帧发送到客户端。这两项操作都是gRPC协议的一部分。public abstract class ServerCallHandlerBase
{public async Task HandleCallAsync(HttpContext httpContext){try{var serverCallContext new ServerCallContext(httpContext);var response httpContext.Response;response.ContentType application/grpc;await HandleCallAsyncCore(serverCallContext);SetStatus(serverCallContext.StatusCode);}catch{SetStatus(StatusCode.Unknown);}void SetStatus(StatusCode statusCode){httpContext.Response.AppendTrailer(grpc-status, ((int)statusCode).ToString());}}protected abstract Task HandleCallAsyncCore(ServerCallContext serverCallContext);
}public abstract class ServerCallHandlerTService, TRequest, TResponse : ServerCallHandlerBasewhere TService : classwhere TRequest : IMessageTRequestwhere TResponse : IMessageTResponse
{protected ServerCallHandler(MessageParserTRequest requestParser) RequestParser requestParser;public MessageParserTRequest RequestParser { get; }
}ServerCallHandlerTService, TRequest, TResponse派生自ServerCallHandlerBase并利用三个泛型参数TService、TRequest、TResponse来表示服务、请求和响应类型RequestParser用来提供发序列化请求消息的MessageParserTRequest对象。针对四种消息交换模式的ServiceCallHandler类型均继承这个泛型基类。UnaryCallHandler基于Unary消息交换模式的ServerCallHandler的具体类型为UnaryCallHandlerTService, TRequest, TResponse它由上述的UnaryMethodTService, TRequest, TResponse委托构建而成。在重写的HandleCallAsyncCore方法中我们利用HttpContext提供的IServiceProvider对象将服务实例创建出来后从请求主体中将请求消息读取出来然后交给指定的委托对象进行处理并得到响应消息该响应消息最终用来对当前请求予以回复。internal class UnaryCallHandlerTService, TRequest, TResponse : ServerCallHandlerTService, TRequest, TResponsewhere TService : classwhere TRequest : IMessageTRequestwhere TResponse : IMessageTResponse
{private readonly UnaryMethodTService, TRequest, TResponse _handler;public UnaryCallHandler(UnaryMethodTService, TRequest, TResponse handler, MessageParserTRequest requestParser):base(requestParser) _handler handler;protected override async Task HandleCallAsyncCore(ServerCallContext serverCallContext){using var scope serverCallContext.HttpContext.RequestServices.CreateScope();var service ActivatorUtilities.CreateInstanceTService(scope.ServiceProvider);var httpContext serverCallContext.HttpContext;var request await httpContext.Request.BodyReader.ReadSingleMessageAsyncTRequest(RequestParser);var reply await _handler(service, request!, serverCallContext);await httpContext.Response.BodyWriter.WriteMessageAsync(reply);}
}请求消息是通过如下这个ReadSingleMessageAsyncTMessage方法读取出来的。按照gRPC协议通过网络传输的请求和响应消息都会在前面追加5个字节第一个字节表示消息是否经过加密后面四个字节是一个以大端序表示的整数表示消息的长度。对于其他消息交换模式也是调用Buffers的TryReadMessageTRequest方法从缓冲区中读取请求消息。public static async TaskTMessage ReadSingleMessageAsyncTMessage(this PipeReader reader, MessageParserTMessage parser) where TMessage:IMessageTMessage
{while (true){var result await reader.ReadAsync();var buffer result.Buffer;if (Buffers.TryReadMessage(parser, ref buffer, out var message)){return message!;}reader.AdvanceTo(buffer.Start, buffer.End);if (result.IsCompleted){break;}}throw new IOException(Fails to read message.);
}internal static class Buffers
{public static readonly int HeaderLength 5;public static bool TryReadMessageTRequest(MessageParserTRequest parser, ref ReadOnlySequencebyte buffer, out TRequest? message) where TRequest: IMessageTRequest{if (buffer.Length HeaderLength){message default;return false;}Spanbyte lengthBytes stackalloc byte[4];buffer.Slice(1, 4).CopyTo(lengthBytes);var length BinaryPrimitives.ReadInt32BigEndian(lengthBytes);if (buffer.Length length HeaderLength){message default;return false;}message parser.ParseFrom(buffer.Slice(HeaderLength, length));buffer buffer.Slice(length HeaderLength);return true;}
}如下这个WriteMessageAsync扩展方法负责输出响应消息。public static ValueTaskFlushResult WriteMessageAsync(this PipeWriter writer, IMessage message)
{var length message.CalculateSize();var span writer.GetSpan(5 length);span[0] 0;BinaryPrimitives.WriteInt32BigEndian(span.Slice(1, 4), length);message.WriteTo(span.Slice(5, length));writer.Advance(5 length);return writer.FlushAsync();
}ClientStreamingCallHandlerClientStreamingCallHandlerTService, TRequest, TResponse代表Client Streaming模式下的ServerCallHandler它由对应的ClientStreamingMethodTService, TRequest, TResponse委托创建而成。在重写的HandleCallAsyncCore方法中除了服务实例它还需要一个用来以“流”的方式读取请求的IAsyncStreamReaderTRequest对象它们都将作为参数传递给指定的委托后者执行后会返回最终的响应消息。此消息同样通过上面这个WriteMessageAsync扩展方法予以回复。internal class ClientStreamingCallHandlerTService, TRequest, TResponse : ServerCallHandlerTService, TRequest, TResponsewhere TService : classwhere TRequest : IMessageTRequestwhere TResponse : IMessageTResponse
{private readonly ClientStreamingMethodTService, TRequest, TResponse _handler;public ClientStreamingCallHandler(ClientStreamingMethodTService, TRequest, TResponse handler, MessageParserTRequest requestParser):base(requestParser){_handler handler;}protected override async Task HandleCallAsyncCore(ServerCallContext serverCallContext){using var scope serverCallContext.HttpContext.RequestServices.CreateScope();var service ActivatorUtilities.CreateInstanceTService(scope.ServiceProvider);var reader serverCallContext.HttpContext.Request.BodyReader;var writer serverCallContext.HttpContext.Response.BodyWriter;var streamReader new HttpContextStreamReaderTRequest(serverCallContext.HttpContext, RequestParser);var response await _handler(service, streamReader, serverCallContext);await writer.WriteMessageAsync(response);}
}IAsyncStreamReaderT接口的实现类型为如下这个HttpContextStreamReaderT。在了解了请求消息在网络中的结构之后对于实现在该类型中针对请求的读取操作应该不难理解。public class HttpContextStreamReaderT : IAsyncStreamReaderT where T : IMessageT
{private readonly PipeReader _reader;private readonly MessageParserT _parser;private ReadOnlySequencebyte _buffer;public HttpContextStreamReader(HttpContext httpContext, MessageParserT parser){_reader httpContext.Request.BodyReader;_parser parser;}public T Current { get; private set; } default!;public async Taskbool MoveNext(CancellationToken cancellationToken){var completed false;if (_buffer.IsEmpty){var result await _reader.ReadAsync(cancellationToken);_buffer result.Buffer;completed result.IsCompleted;}if (Buffers.TryReadMessage(_parser, ref _buffer, out var mssage)){Current mssage!;_reader.AdvanceTo(_buffer.Start, _buffer.End);return true;}_reader.AdvanceTo(_buffer.Start, _buffer.End);_buffer default;return !completed await MoveNext(cancellationToken);}
}ServerStreamingCallHandlerServerStreamingCallHandlerTService, TRequest, TResponse代表Server Streaming模式下的ServerCallHandler它由对应的ServerStreamingMethodTService, TRequest, TResponse委托创建而成。在重写的HandleCallAsyncCore方法中除了服务实例它还需要一个用来以“流”的方式写入响应的IAsyncStreamWriterTResponse对象它们都将作为参数传递给指定的委托。internal class ServerStreamingCallHandlerTService, TRequest, TResponse : ServerCallHandlerTService, TRequest, TResponsewhere TService : classwhere TRequest : IMessageTRequestwhere TResponse : IMessageTResponse
{private readonly ServerStreamingMethodTService, TRequest, TResponse _handler;public ServerStreamingCallHandler(ServerStreamingMethodTService, TRequest, TResponse handler, MessageParserTRequest requestParser):base(requestParser) _handler handler;protected override async Task HandleCallAsyncCore(ServerCallContext serverCallContext){using var scope serverCallContext.HttpContext.RequestServices.CreateScope();var service ActivatorUtilities.CreateInstanceTService(scope.ServiceProvider);var httpContext serverCallContext.HttpContext;var streamWriter new HttpContextStreamWriterTResponse(httpContext);var request await httpContext.Request.BodyReader.ReadSingleMessageAsync(RequestParser);await _handler(service, request, streamWriter, serverCallContext);}
}IAsyncStreamWriterT接口的实现类型为如下这个HttpContextStreamWriterT它直接调用上面定义的WriteMessageAsync扩展方法将指定的消息写入响应主体的输出流。public class HttpContextStreamWriterT : IServerStreamWriterT where T : IMessageT
{private readonly PipeWriter _writer;public HttpContextStreamWriter(HttpContext httpContext) _writer httpContext.Response.BodyWriter;public Task WriteAsync(T message, CancellationToken cancellationToken default){cancellationToken.ThrowIfCancellationRequested();return _writer.WriteMessageAsync(message).AsTask();}
}DuplexStreamingCallHandlerDuplexStreamingCallHandlerTService, TRequest, TResponse代表Duplex Streaming模式下的ServerCallHandler它由对应的DuplexStreamingMethodTService, TRequest, TResponse委托创建而成。在重写的HandleCallAsyncCore方法中除了服务实例它还需要分别创建以“流”的方式读/写请求/响应的IAsyncStreamReaderTRequest和IAsyncStreamWriterTResponse对象对应的类型分别为上面定义的HttpContextStreamReaderTRequest和HttpContextStreamWriterTResponse。internal class DuplexStreamingCallHandlerTService, TRequest, TResponse : ServerCallHandlerTService, TRequest, TResponsewhere TService : classwhere TRequest : IMessageTRequestwhere TResponse : IMessageTResponse
{private readonly DuplexStreamingMethodTService, TRequest, TResponse _handler;public DuplexStreamingCallHandler(DuplexStreamingMethodTService, TRequest, TResponse handler, MessageParserTRequest requestParser) :base(requestParser) _handler handler;protected override async Task HandleCallAsyncCore(ServerCallContext serverCallContext){using var scope serverCallContext.HttpContext.RequestServices.CreateScope();var service ActivatorUtilities.CreateInstanceTService(scope.ServiceProvider); var streamReader new HttpContextStreamReaderTRequest(serverCallContext.HttpContext, RequestParser);var streamWriter new HttpContextStreamWriterTResponse(serverCallContext.HttpContext);await _handler(service, streamReader, streamWriter, serverCallContext);}
}四、路由注册目前我们将针对四种消息交换模式的gRPC方法抽象成对应的泛型委托并且可以利用它们创建ServerCallHandler后者可以提供作为路由终结点处理器的RequestDelegate委托。枚举和对应ServerCallHandler之间的映射关系如下所示UnaryMethodTService, TRequest, TResponseUnaryCallHandlerTService, TRequest, TResponseClientStreamingMethodTService, TRequest, TResponseClientStreamingCallHandlerTService, TRequest, TResponseServerStreamingMethodTService, TRequest, TResponseServerStreamingCallHandlerTService, TRequest, TResponseDuplexStreamingMethodTService, TRequest, TResponseDuplexStreamingCallHandlerTService, TRequest, TResponse现在我们将整个路由注册的流程串起来为此我们定义了如下这个IServiceBinderTService接口它提供了两种方式将定义在服务类型TService中的gRPC方法注册成对应的路由终结点。public interface IServiceBinderTService where TService : class
{IServiceBinderTService AddUnaryMethodTRequest, TResponse(string methodName, FuncTService, FuncTRequest, ServerCallContext, TaskTResponse methodAccessor, MessageParserTRequest parser)where TRequest : IMessageTRequestwhere TResponse : IMessageTResponse;IServiceBinderTService AddClientStreamingMethodTRequest, TResponse(string methodName, FuncTService, FuncIAsyncStreamReaderTRequest, ServerCallContext, TaskTResponse methodAccessor, MessageParserTRequest parser)where TRequest : IMessageTRequestwhere TResponse : IMessageTResponse;IServiceBinderTService AddServerStreamingMethodTRequest, TResponse(string methodName, FuncTService, FuncTRequest, IServerStreamWriterTResponse, ServerCallContext, Task methodAccessor, MessageParserTRequest parser)where TRequest : IMessageTRequestwhere TResponse : IMessageTResponse;IServiceBinderTService AddDuplexStreamingMethodTRequest, TResponse(string methodName, FuncTService, FuncIAsyncStreamReaderTRequest, IServerStreamWriterTResponse, ServerCallContext, Task methodAccessor, MessageParserTRequest parser)where TRequest : IMessageTRequestwhere TResponse : IMessageTResponse;IServiceBinderTService AddUnaryMethodTRequest, TResponse(ExpressionFuncTService, TaskTResponse methodAccessor, MessageParserTRequest parser)where TRequest : IMessageTRequestwhere TResponse : IMessageTResponse;IServiceBinderTService AddClientStreamingMethodTRequest, TResponse( ExpressionFuncTService, TaskTResponse methodAccessor, MessageParserTRequest parser)where TRequest : IMessageTRequestwhere TResponse : IMessageTResponse;IServiceBinderTService AddServerStreamingMethodTRequest, TResponse( ExpressionFuncTService, Task methodAccessor, MessageParserTRequest parser)where TRequest : IMessageTRequestwhere TResponse : IMessageTResponse;IServiceBinderTService AddDuplexStreamingMethodTRequest, TResponse( ExpressionFuncTService, Task methodAccessor, MessageParserTRequest parser)where TRequest : IMessageTRequestwhere TResponse : IMessageTResponse;
}路由终结点由路由模式和处理器两个元素组成路由模式主要体现在由gRPC服务和操作名称组成的路由模板我们默认使用服务类型的名称和方法名称提出Async后缀。为了能够对这两个名称进行定制我们定义了如下两个特性GrpcServiceAttribute和GrpcMethodAttribute它们可以分别标注在服务类型和操作方法上来指定一个任意的名称。[AttributeUsage(AttributeTargets.Class)]
public class GrpcServiceAttribute: Attribute
{public string? ServiceName { get; set; }
}[AttributeUsage(AttributeTargets.Method)]
public class GrpcMethodAttribute : Attribute
{public string? MethodName { get; set; }
}如下所示的ServiceBinderTService 是对IServiceBinderTService 接口的实现它是对一个IEndpointRouteBuilder 对象的封装。对于实现的第一组方法我们利用提供的方法名称与解析TService类型得到的服务名称合并进而得到路由终结点的URL模板。这些方法还提供了一个针对gRPC方法签名的FuncTService,Func…委托我们利用它来将提供用于构建对应ServiceCallHandler的委托。我们最终利用IEndpointRouteBuilder 对象完成针对路由终结点的注册。public class ServiceBinderTService : IServiceBinderTService where TService : class
{private readonly IEndpointRouteBuilder _routeBuilder;public ServiceBinder(IEndpointRouteBuilder routeBuilder) _routeBuilder routeBuilder;public IServiceBinderTService AddUnaryMethodTRequest, TResponse(string methodName, FuncTService, FuncTRequest, ServerCallContext, TaskTResponse methodAccessor, MessageParserTRequest parser)where TRequest : IMessageTRequestwhere TResponse : IMessageTResponse{TaskTResponse GetMethod(TService service, TRequest request, ServerCallContext context) methodAccessor(service)(request, context);var callHandler new UnaryCallHandlerTService, TRequest, TResponse(GetMethod, parser);_routeBuilder.MapPost(ServiceBinderTService.GetPath(methodName), callHandler.HandleCallAsync);return this;}public IServiceBinderTService AddClientStreamingMethodTRequest, TResponse(string methodName, FuncTService, FuncIAsyncStreamReaderTRequest, ServerCallContext, TaskTResponse methodAccessor, MessageParserTRequest parser)where TRequest : IMessageTRequestwhere TResponse : IMessageTResponse{TaskTResponse GetMethod(TService service, IAsyncStreamReaderTRequest reader, ServerCallContext context) methodAccessor(service)(reader, context);var callHandler new ClientStreamingCallHandlerTService, TRequest, TResponse(GetMethod, parser);_routeBuilder.MapPost(ServiceBinderTService.GetPath(methodName), callHandler.HandleCallAsync);return this;}public IServiceBinderTService AddServerStreamingMethodTRequest, TResponse(string methodName, FuncTService, FuncTRequest, IServerStreamWriterTResponse, ServerCallContext, Task methodAccessor, MessageParserTRequest parser)where TRequest : IMessageTRequestwhere TResponse : IMessageTResponse{ServerStreamingMethodTService, TRequest, TResponse handler (service, request, writer, context) methodAccessor(service)(request, writer, context);var callHandler new ServerStreamingCallHandlerTService, TRequest, TResponse(handler, parser);_routeBuilder.MapPost(ServiceBinderTService.GetPath(methodName), callHandler.HandleCallAsync);return this;}public IServiceBinderTService AddDuplexStreamingMethodTRequest, TResponse(string methodName, FuncTService, FuncIAsyncStreamReaderTRequest, IServerStreamWriterTResponse, ServerCallContext, Task methodAccessor, MessageParserTRequest parser)where TRequest : IMessageTRequestwhere TResponse : IMessageTResponse{DuplexStreamingMethodTService, TRequest, TResponse handler (service, reader, writer, context) methodAccessor(service)(reader, writer, context);var callHandler new DuplexStreamingCallHandlerTService, TRequest, TResponse(handler, parser);_routeBuilder.MapPost(ServiceBinderTService.GetPath(methodName), callHandler.HandleCallAsync);return this;}private static string GetPath(string methodName){var serviceName typeof(TService).GetCustomAttributeGrpcServiceAttribute()?.ServiceName ?? typeof(TService).Name;if (methodName.EndsWith(Async)){methodName methodName.Substring(0, methodName.Length - 5);}return ${serviceName}/{methodName};}public IServiceBinderTService AddUnaryMethodTRequest, TResponse(ExpressionFuncTService, TaskTResponse methodAccessor, MessageParserTRequest parser)where TRequest : IMessageTRequestwhere TResponse : IMessageTResponse{var method CreateDelegateUnaryMethodTService, TRequest,TResponse(methodAccessor, out var methodName);var serviceName typeof(TService).GetCustomAttributeGrpcServiceAttribute()?.ServiceName ?? typeof(TService).Name;var callHandler new UnaryCallHandlerTService, TRequest, TResponse(method, parser);_routeBuilder.MapPost(ServiceBinderTService.GetPath(methodName), callHandler.HandleCallAsync);return this;}public IServiceBinderTService AddClientStreamingMethodTRequest, TResponse( ExpressionFuncTService, TaskTResponse methodAccessor, MessageParserTRequest parser)where TRequest : IMessageTRequestwhere TResponse : IMessageTResponse{var method CreateDelegateClientStreamingMethodTService, TRequest, TResponse(methodAccessor, out var methodName);var serviceName typeof(TService).GetCustomAttributeGrpcServiceAttribute()?.ServiceName ?? typeof(TService).Name;var callHandler new ClientStreamingCallHandlerTService, TRequest, TResponse(method, parser);_routeBuilder.MapPost(ServiceBinderTService.GetPath(methodName), callHandler.HandleCallAsync);return this;}public IServiceBinderTService AddServerStreamingMethodTRequest, TResponse(ExpressionFuncTService, Task methodAccessor, MessageParserTRequest parser)where TRequest : IMessageTRequestwhere TResponse : IMessageTResponse{var method CreateDelegateServerStreamingMethodTService, TRequest, TResponse(methodAccessor, out var methodName);var serviceName typeof(TService).GetCustomAttributeGrpcServiceAttribute()?.ServiceName ?? typeof(TService).Name;var callHandler new ServerStreamingCallHandlerTService, TRequest, TResponse(method, parser);_routeBuilder.MapPost(ServiceBinderTService.GetPath(methodName), callHandler.HandleCallAsync);return this;}public IServiceBinderTService AddDuplexStreamingMethodTRequest, TResponse(ExpressionFuncTService, Task methodAccessor, MessageParserTRequest parser)where TRequest : IMessageTRequestwhere TResponse : IMessageTResponse{var method CreateDelegateDuplexStreamingMethodTService, TRequest, TResponse(methodAccessor, out var methodName);var serviceName typeof(TService).GetCustomAttributeGrpcServiceAttribute()?.ServiceName ?? typeof(TService).Name;var callHandler new DuplexStreamingCallHandlerTService, TRequest, TResponse(method, parser);_routeBuilder.MapPost(ServiceBinderTService.GetPath(methodName), callHandler.HandleCallAsync);return this;}private TDelegate CreateDelegateTDelegate(LambdaExpression expression, out string methodName) where TDelegate : Delegate{var method ((MethodCallExpression)expression.Body).Method;methodName method.GetCustomAttributeGrpcMethodAttribute()?.MethodName ?? method.Name;return (TDelegate)Delegate.CreateDelegate(typeof(TDelegate), method);}
}由于第二组方法提供的针对gRPC方法调用的表达式所以我们可以得到描述方法的MethodInfo对象该对象不但解决了委托对象的创建问题还可以提供方法的名称所以这组方法无需提供gRPC方法的名称。但是提供的表达式并不能严格匹配方法的签名所以无法提供编译时的错误检验所以各有优缺点。五、为gRPC服务定义一个接口由于路由终结点的注册是针对服务类型进行的所以我们决定让服务类型自身来完成所有的路由注册工作。在这里我们使用C# 11中一个叫做“静态接口方法”的特性为服务类型定义如下这个IGrpcServiceTService接口服务类型TService定义的所有gRPC方法的路由注册全部在静态方法Bind中完成该方法将上述的IServiceBinderTService作为参数。public interface IGrpcServiceTService where TService:class
{static abstract void Bind(IServiceBinderTService binder);
}我们定义了如下这个针对IEndpointRouteBuilder 接口的扩展方法完成针对指定服务类型的路由注册。为了与现有的方法区别开来我特意将其命名为MapGrpcService2。该方法根据指定的IEndpointRouteBuilder 对象将ServiceBinderTService对象创建出来并作为参数调用服务类型的静态Bind方法。到此为止整个Mini版的gRPC服务端框架就构建完成了接下来我们看看它能否工作。public static class EndpointRouteBuilderExtensions
{public static IEndpointRouteBuilder MapGrpcService2TService(this IEndpointRouteBuilder routeBuilder) where TService : class, IGrpcServiceTService{var binder new ServiceBinderTService(routeBuilder);TService.Bind(binder);return routeBuilder;}
}六、重新定义和承载服务我们开篇演示了ASP.NET Core gRPC的服务定义、承载和调用。如果我们上面构建的Mini版gRPC框架能够正常工作意味着客户端代码可以保持不变我们现在就来试试看。我们在Server项目中将GreeterService服务类型改成如下的形式它不再继承任何基类只实现IGrpcServiceGreeterService接口。针对四种消息交换模式的四个方法的实现方法保持不变在实现的静态Bind方法中我们采用两种形式完成了针对这四个方法的路由注册。[GrpcService(ServiceName Greeter)]
public class GreeterService: IGrpcServiceGreeterService
{public TaskHelloReply SayHelloUnaryAsync(HelloRequest request, ServerCallContext context) Task.FromResult(new HelloReply { Message $Hello, {request.Name} });public async TaskHelloReply SayHelloClientStreamingAsync(IAsyncStreamReaderHelloRequest reader, ServerCallContext context){var list new Liststring();while (await reader.MoveNext(CancellationToken.None)){list.Add(reader.Current.Name);}return new HelloReply { Message $Hello, {string.Join(,, list)} };}public async Task SayHelloServerStreamingAsync(Empty request, IServerStreamWriterHelloReply responseStream, ServerCallContext context){await responseStream.WriteAsync(new HelloReply { Message Hello, Foo! });await Task.Delay(1000);await responseStream.WriteAsync(new HelloReply { Message Hello, Bar! });await Task.Delay(1000);await responseStream.WriteAsync(new HelloReply { Message Hello, Baz! });}public async Task SayHelloDuplexStreamingAsync(IAsyncStreamReaderHelloRequest reader, IServerStreamWriterHelloReply writer, ServerCallContext context){while (await reader.MoveNext()){await writer.WriteAsync(new HelloReply { Message $Hello {reader.Current.Name} });}}public static void Bind(IServiceBinderGreeterService binder){binder.AddUnaryMethodHelloRequest, HelloReply(it it.SayHelloUnaryAsync(default!,default!), HelloRequest.Parser).AddClientStreamingMethodHelloRequest, HelloReply(it it.SayHelloClientStreamingAsync(default!, default!), HelloRequest.Parser).AddServerStreamingMethodEmpty, HelloReply(nameof(SayHelloServerStreamingAsync), it it.SayHelloServerStreamingAsync, Empty.Parser).AddDuplexStreamingMethodHelloRequest, HelloReply(nameof(SayHelloDuplexStreamingAsync), it it.SayHelloDuplexStreamingAsync, HelloRequest.Parser);}}
}服务承载程序直接将针对MapGrpcServiceGreeterService方法的调用换成MapGrpcService2GreeterService。由于整个框架根本不需要预先注册任何的服务所以针对AddGrpc扩展方法的调用也可以删除。using GrpcMini;
using Microsoft.AspNetCore.Server.Kestrel.Core;var builder WebApplication.CreateBuilder(args);
builder.WebHost.ConfigureKestrel(kestrel kestrel.ConfigureEndpointDefaults(options options.Protocols HttpProtocols.Http2));
var app builder.Build();
app.MapGrpcService2Server.Greeter();
app.Run();再次运行我们的程序客户端依然可以得到相同的输出。