Go微服务实战|第11章:gRPC Interceptors

  • 原创
  • Madman
  • /
  • /
  • 0
  • 1515 次阅读

Go 微服务实战.png

Synopsis: gRPC 提供了相关 API 让客户端或服务端实现拦截器,它可以拦截每个 RPC 调用的执行,从而可以使用拦截器来执行身份验证/授权、日志记录、监控指标收集等功能

代码已上传到 https://github.com/wangy8961/grpc-go-tutorial/tree/v0.11 ,欢迎 star

拦截器(Interceptor) 类似于 HTTP 应用中的 中间件(Middleware),能够让你在真正执行每个 RPC 方法前,进行身份认证、日志记录、缓存等通用操作,如果你熟悉 Python 编程的话,这跟 装饰器(Decorator) 的作用基本一样

gRPC 中使用 UnaryInterceptor 来实现 Unary RPC(第 2 篇) 的拦截器,使用 StreamInterceptor 来实现 Stream RPC(第 3 - 5 篇)的拦截器

1. 客户端拦截器

1.1 UnaryClientInterceptor

客户端如果想在执行每个 Unary RPC 前进行拦截的话,需要调用 grpc.WithUnaryInterceptor 并返回包含客户端一元拦截器的 grpc.DialOption 参数,然后再传入 grpc.Dial() 函数:

// WithUnaryInterceptor returns a DialOption that specifies the interceptor for
// unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
    ...
}

那么 UnaryClientInterceptor 类型的参数又是什么呢?

// UnaryInvoker is called by UnaryClientInterceptor to complete RPCs.
type UnaryInvoker func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error

// UnaryClientInterceptor intercepts the execution of a unary RPC on the client. invoker is the handler to complete the RPC
// and it is the responsibility of the interceptor to call it.
// This is an EXPERIMENTAL API.
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

它就是客户端一元拦截器,此函数实现了拦截器的具体逻辑,它的主要参数:

🍭1. ctx context.Context: 请求上下文

💖2. method string: 要调用的 Unary RPC 方法名

🏆3. reqreply interface{}: RPC 的请求参数和响应

💎4. cc *ClientConn: 客户端连接

🚀5. invoker UnaryInvoker: 是一个函数,用来真正调用客户端 Unary RPC

👑6. opts ...CallOption: 0 个或多个调用时的选项,比如是否自动附带 access token 到请求头部等

下面我们就来实现一个非常简单的拦截器,记录调用 RPC 的执行耗时:

// unaryInterceptor is an example for client unary interceptor.
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    // Logic before invoking the invoker
    start := time.Now()
    // Calls the invoker to execute RPC
    err := invoker(ctx, method, req, reply, cc, opts...)
    // Logic after invoking the invoker
    end := time.Now()
    logger("Invoked RPC method: %s, Duration time: %s, Error: %v", method, time.Since(start), err)
    return err
}

代码实现:

创建 grpc-go-tutorial/features/interceptor/client/main.go 文件:

// Package main implements a client for Echo service.
package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "time"

    "golang.org/x/oauth2"
    "google.golang.org/grpc/credentials/oauth"

    "google.golang.org/grpc/credentials"

    pb "github.com/wangy8961/grpc-go-tutorial/features/echopb"
    "google.golang.org/grpc"
)

func unaryCall(client pb.EchoClient) {
    fmt.Printf("--- gRPC Unary RPC Call ---\n")

    // 设置 10 秒超时时长,可参考 https://madmalls.com/blog/post/grpc-deadline/#21-contextwithtimeout
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    // 调用 Unary RPC
    req := &pb.EchoRequest{Message: "madmalls.com"}
    resp, err := client.UnaryEcho(ctx, req)
    if err != nil {
        log.Fatalf("failed to call UnaryEcho: %v", err)
    }

    fmt.Printf("response:\n")
    fmt.Printf(" - %q\n", resp.GetMessage())
}

// client-side unary interceptor
func unaryAuthInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{
        AccessToken: "some-oauth2-secret-token",
    })))
    err := invoker(ctx, method, req, reply, cc, opts...)
    return err
}

func main() {
    addr := flag.String("addr", "localhost:50051", "the address to connect to")
    certFile := flag.String("cacert", "cacert.pem", "CA root certificate")
    flag.Parse()

    creds, err := credentials.NewClientTLSFromFile(*certFile, "")
    if err != nil {
        log.Fatalf("failed to load CA root certificate: %v", err)
    }

    opts := []grpc.DialOption{
        // 1. TLS Credential
        grpc.WithTransportCredentials(creds),
        // 2. Client Unary Interceptor
        grpc.WithUnaryInterceptor(unaryAuthInterceptor),
    }

    // Set up a connection to the server.
    conn, err := grpc.Dial(*addr, opts...) // To call service methods, we first need to create a gRPC channel to communicate with the server. We create this by passing the server address and port number to grpc.Dial()
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()

    c := pb.NewEchoClient(conn) // Once the gRPC channel is setup, we need a client stub to perform RPCs. We get this using the NewEchoClient method provided in the pb package we generated from our .proto.

    // Contact the server and print out its response.
    // 1. Unary RPC Call
    unaryCall(c)
}

然后用它来调用我们 上一篇 实现的 OAuth2 Token 认证的 服务端

D:\golang-code\grpc-go-tutorial\features\interceptor\client>go run main.go -addr 192.168.40.123:50051
--- gRPC Unary RPC Call ---
response:
 - "madmalls.com"

服务端输出如下(说明客户端通过 拦截器 自动附带了 token 到请求头中):

[root@CentOS server]# go run main.go 
server listening at [::]:50051
--- gRPC Unary RPC ---
request received: message:"madmalls.com" 
Type of 'metadata.MD' is metadata.MD, and its value is map[:authority:[192.168.40.123:50051] authorization:[Bearer some-oauth2-secret-token] content-type:[application/grpc] user-agent:[grpc-g
o/1.21.1]] Type of 'authorization' is []string, and its value is [Bearer some-oauth2-secret-token]

1.2 StreamClientInterceptor

客户端如果想在执行每个 Streaming RPC 前进行拦截的话,需要调用 grpc.WithStreamInterceptor 并返回包含客户端流拦截器的 grpc.DialOption 参数,然后再传入 grpc.Dial() 函数:

// WithStreamInterceptor returns a DialOption that specifies the interceptor for
// streaming RPCs.
func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
    ...
}

同样地,StreamClientInterceptor 实现了客户端流拦截器的具体逻辑,它的参数 streamer 是真正调用客户端 Streaming RPC 的地方:

// Streamer is called by StreamClientInterceptor to create a ClientStream.
type Streamer func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error)

// StreamClientInterceptor intercepts the creation of ClientStream. It may return a custom ClientStream to intercept all I/O
// operations. streamer is the handler to create a ClientStream and it is the responsibility of the interceptor to call it.
// This is an EXPERIMENTAL API.
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

代码实现:

修改 grpc-go-tutorial/features/interceptor/client/main.go 文件:

// Package main implements a client for Echo service.
package main

import (
    "context"
    "flag"
    "fmt"
    "io"
    "log"
    "time"

    "golang.org/x/oauth2"
    "google.golang.org/grpc/credentials/oauth"

    "google.golang.org/grpc/credentials"

    pb "github.com/wangy8961/grpc-go-tutorial/features/echopb"
    "google.golang.org/grpc"
)

func unaryCall(client pb.EchoClient) {
    fmt.Printf("--- gRPC Unary RPC Call ---\n")

    // 设置 10 秒超时时长,可参考 https://madmalls.com/blog/post/grpc-deadline/#21-contextwithtimeout
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // 调用 Unary RPC
    req := &pb.EchoRequest{Message: "madmalls.com"}
    resp, err := client.UnaryEcho(ctx, req)
    if err != nil {
        log.Fatalf("failed to call UnaryEcho: %v", err)
    }

    fmt.Printf("response:\n")
    fmt.Printf(" - %q\n", resp.GetMessage())
}

func bidirectionalStreamingCall(c pb.EchoClient) {
    fmt.Printf("--- gRPC Bidirectional Streaming RPC Call ---\n")

    // 设置 10 秒超时时长,可参考 https://madmalls.com/blog/post/grpc-deadline/#21-contextwithtimeout
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // Make bidirectional streaming RPC
    stream, err := c.BidirectionalStreamingEcho(ctx)
    if err != nil {
        log.Fatalf("failed to call BidirectionalStreamingEcho: %v", err)
    }

    // Send all requests to the server
    for i := 0; i < 5; i++ {
        if err := stream.Send(&pb.EchoRequest{Message: fmt.Sprintf("Request %d", i+1)}); err != nil {
            log.Fatalf("failed to send request due to error: %v", err)
        }
    }

    // closes the send direction of the stream
    stream.CloseSend()

    // Read all the responses
    var rpcStatus error
    fmt.Printf("response:\n")
    for {
        resp, err := stream.Recv()
        if err != nil {
            rpcStatus = err
            break
        }
        fmt.Printf(" - %q\n", resp.Message)
    }
    if rpcStatus != io.EOF {
        log.Fatalf("failed to finish server streaming: %v", rpcStatus)
    }
}

// client-side unary interceptor (For Authentication)
func unaryAuthInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{
        AccessToken: "some-oauth2-secret-token",
    })))
    err := invoker(ctx, method, req, reply, cc, opts...)
    return err
}

// client-side streaming interceptor (For Authentication)
func streamAuthInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
    opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{
        AccessToken: "some-oauth2-secret-token",
    })))
    s, err := streamer(ctx, desc, cc, method, opts...)
    if err != nil {
        return nil, err
    }
    return s, nil
}

func main() {
    addr := flag.String("addr", "localhost:50051", "the address to connect to")
    certFile := flag.String("cacert", "cacert.pem", "CA root certificate")
    flag.Parse()

    creds, err := credentials.NewClientTLSFromFile(*certFile, "")
    if err != nil {
        log.Fatalf("failed to load CA root certificate: %v", err)
    }

    opts := []grpc.DialOption{
        // 1. TLS Credential
        grpc.WithTransportCredentials(creds),
        // 2. Client Unary Interceptor
        grpc.WithUnaryInterceptor(unaryAuthInterceptor),
        // 3. Client Streaming Interceptor
        grpc
                                
                            
未经允许不得转载: LIFE & SHARE - 王颜公子 » Go微服务实战|第11章:gRPC Interceptors

分享

作者

作者头像

Madman

如需 Linux / Python 相关问题付费解答,请按如下方式联系我

0 条评论

暂时还没有评论.

专题系列