Go微服务实战|第11章:gRPC Interceptors
Synopsis: gRPC 提供了相关 API 让客户端或服务端实现拦截器,它可以拦截每个 RPC 调用的执行,从而可以使用拦截器来执行身份验证/授权、日志记录、监控指标收集等功能
代码已上传到 https://github.com/wangy8961/grpc-go-tutorial/tree/v0.11 ,欢迎 star
拦截器(Interceptor)
类似于 HTTP 应用中的 中间件(Middleware)
,能够让你在真正执行每个 RPC 方法前,进行身份认证、日志记录、缓存等通用操作,如果你熟悉 Python 编程的话,这跟 装饰器(Decorator)
的作用基本一样
gRPC
中使用 UnaryInterceptor 来实现 Unary RPC(第 2 篇) 的拦截器,使用 StreamInterceptor 来实现 Stream RPC(第 3 - 5 篇)的拦截器
1. 客户端拦截器
1.1 UnaryClientInterceptor
客户端如果想在执行每个 Unary RPC 前进行拦截的话,需要调用 grpc.WithUnaryInterceptor
并返回包含客户端一元拦截器的 grpc.DialOption
参数,然后再传入 grpc.Dial()
函数:
// WithUnaryInterceptor returns a DialOption that specifies the interceptor for // unary RPCs. func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption { ... }
那么 UnaryClientInterceptor
类型的参数又是什么呢?
// UnaryInvoker is called by UnaryClientInterceptor to complete RPCs. type UnaryInvoker func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error // UnaryClientInterceptor intercepts the execution of a unary RPC on the client. invoker is the handler to complete the RPC // and it is the responsibility of the interceptor to call it. // This is an EXPERIMENTAL API. type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
它就是客户端一元拦截器,此函数实现了拦截器的具体逻辑,它的主要参数:
🍭1. ctx context.Context
: 请求上下文
💖2. method string
: 要调用的 Unary RPC 方法名
🏆3. req
和 reply interface{}
: RPC 的请求参数和响应
💎4. cc *ClientConn
: 客户端连接
🚀5. invoker UnaryInvoker
: 是一个函数,用来真正调用客户端 Unary RPC
👑6. opts ...CallOption
: 0 个或多个调用时的选项,比如是否自动附带 access token 到请求头部等
下面我们就来实现一个非常简单的拦截器,记录调用 RPC 的执行耗时:
// unaryInterceptor is an example for client unary interceptor. func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { // Logic before invoking the invoker start := time.Now() // Calls the invoker to execute RPC err := invoker(ctx, method, req, reply, cc, opts...) // Logic after invoking the invoker end := time.Now() logger("Invoked RPC method: %s, Duration time: %s, Error: %v", method, time.Since(start), err) return err }
代码实现:
创建 grpc-go-tutorial/features/interceptor/client/main.go
文件:
// Package main implements a client for Echo service. package main import ( "context" "flag" "fmt" "log" "time" "golang.org/x/oauth2" "google.golang.org/grpc/credentials/oauth" "google.golang.org/grpc/credentials" pb "github.com/wangy8961/grpc-go-tutorial/features/echopb" "google.golang.org/grpc" ) func unaryCall(client pb.EchoClient) { fmt.Printf("--- gRPC Unary RPC Call ---\n") // 设置 10 秒超时时长,可参考 https://madmalls.com/blog/post/grpc-deadline/#21-contextwithtimeout ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // 调用 Unary RPC req := &pb.EchoRequest{Message: "madmalls.com"} resp, err := client.UnaryEcho(ctx, req) if err != nil { log.Fatalf("failed to call UnaryEcho: %v", err) } fmt.Printf("response:\n") fmt.Printf(" - %q\n", resp.GetMessage()) } // client-side unary interceptor func unaryAuthInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{ AccessToken: "some-oauth2-secret-token", }))) err := invoker(ctx, method, req, reply, cc, opts...) return err } func main() { addr := flag.String("addr", "localhost:50051", "the address to connect to") certFile := flag.String("cacert", "cacert.pem", "CA root certificate") flag.Parse() creds, err := credentials.NewClientTLSFromFile(*certFile, "") if err != nil { log.Fatalf("failed to load CA root certificate: %v", err) } opts := []grpc.DialOption{ // 1. TLS Credential grpc.WithTransportCredentials(creds), // 2. Client Unary Interceptor grpc.WithUnaryInterceptor(unaryAuthInterceptor), } // Set up a connection to the server. conn, err := grpc.Dial(*addr, opts...) // To call service methods, we first need to create a gRPC channel to communicate with the server. We create this by passing the server address and port number to grpc.Dial() if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewEchoClient(conn) // Once the gRPC channel is setup, we need a client stub to perform RPCs. We get this using the NewEchoClient method provided in the pb package we generated from our .proto. // Contact the server and print out its response. // 1. Unary RPC Call unaryCall(c) }
然后用它来调用我们 上一篇 实现的 OAuth2 Token
认证的 服务端:
D:\golang-code\grpc-go-tutorial\features\interceptor\client>go run main.go -addr 192.168.40.123:50051 --- gRPC Unary RPC Call --- response: - "madmalls.com"
服务端输出如下(说明客户端通过 拦截器
自动附带了 token 到请求头中):
[root@CentOS server]# go run main.go server listening at [::]:50051 --- gRPC Unary RPC --- request received: message:"madmalls.com" Type of 'metadata.MD' is metadata.MD, and its value is map[:authority:[192.168.40.123:50051] authorization:[Bearer some-oauth2-secret-token] content-type:[application/grpc] user-agent:[grpc-g o/1.21.1]] Type of 'authorization' is []string, and its value is [Bearer some-oauth2-secret-token]
1.2 StreamClientInterceptor
客户端如果想在执行每个 Streaming RPC 前进行拦截的话,需要调用 grpc.WithStreamInterceptor
并返回包含客户端流拦截器的 grpc.DialOption
参数,然后再传入 grpc.Dial()
函数:
// WithStreamInterceptor returns a DialOption that specifies the interceptor for // streaming RPCs. func WithStreamInterceptor(f StreamClientInterceptor) DialOption { ... }
同样地,StreamClientInterceptor
实现了客户端流拦截器的具体逻辑,它的参数 streamer
是真正调用客户端 Streaming RPC 的地方:
// Streamer is called by StreamClientInterceptor to create a ClientStream. type Streamer func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) // StreamClientInterceptor intercepts the creation of ClientStream. It may return a custom ClientStream to intercept all I/O // operations. streamer is the handler to create a ClientStream and it is the responsibility of the interceptor to call it. // This is an EXPERIMENTAL API. type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
代码实现:
修改 grpc-go-tutorial/features/interceptor/client/main.go
文件:
// Package main implements a client for Echo service. package main import ( "context" "flag" "fmt" "io" "log" "time" "golang.org/x/oauth2" "google.golang.org/grpc/credentials/oauth" "google.golang.org/grpc/credentials" pb "github.com/wangy8961/grpc-go-tutorial/features/echopb" "google.golang.org/grpc" ) func unaryCall(client pb.EchoClient) { fmt.Printf("--- gRPC Unary RPC Call ---\n") // 设置 10 秒超时时长,可参考 https://madmalls.com/blog/post/grpc-deadline/#21-contextwithtimeout ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // 调用 Unary RPC req := &pb.EchoRequest{Message: "madmalls.com"} resp, err := client.UnaryEcho(ctx, req) if err != nil { log.Fatalf("failed to call UnaryEcho: %v", err) } fmt.Printf("response:\n") fmt.Printf(" - %q\n", resp.GetMessage()) } func bidirectionalStreamingCall(c pb.EchoClient) { fmt.Printf("--- gRPC Bidirectional Streaming RPC Call ---\n") // 设置 10 秒超时时长,可参考 https://madmalls.com/blog/post/grpc-deadline/#21-contextwithtimeout ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // Make bidirectional streaming RPC stream, err := c.BidirectionalStreamingEcho(ctx) if err != nil { log.Fatalf("failed to call BidirectionalStreamingEcho: %v", err) } // Send all requests to the server for i := 0; i < 5; i++ { if err := stream.Send(&pb.EchoRequest{Message: fmt.Sprintf("Request %d", i+1)}); err != nil { log.Fatalf("failed to send request due to error: %v", err) } } // closes the send direction of the stream stream.CloseSend() // Read all the responses var rpcStatus error fmt.Printf("response:\n") for { resp, err := stream.Recv() if err != nil { rpcStatus = err break } fmt.Printf(" - %q\n", resp.Message) } if rpcStatus != io.EOF { log.Fatalf("failed to finish server streaming: %v", rpcStatus) } } // client-side unary interceptor (For Authentication) func unaryAuthInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{ AccessToken: "some-oauth2-secret-token", }))) err := invoker(ctx, method, req, reply, cc, opts...) return err } // client-side streaming interceptor (For Authentication) func streamAuthInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{ AccessToken: "some-oauth2-secret-token", }))) s, err := streamer(ctx, desc, cc, method, opts...) if err != nil { return nil, err } return s, nil } func main() { addr := flag.String("addr", "localhost:50051", "the address to connect to") certFile := flag.String("cacert", "cacert.pem", "CA root certificate") flag.Parse() creds, err := credentials.NewClientTLSFromFile(*certFile, "") if err != nil { log.Fatalf("failed to load CA root certificate: %v", err) } opts := []grpc.DialOption{ // 1. TLS Credential grpc.WithTransportCredentials(creds), // 2. Client Unary Interceptor grpc.WithUnaryInterceptor(unaryAuthInterceptor),
0 条评论
评论者的用户名
评论时间暂时还没有评论.