Go微服务实战|第5章:gRPC Bidirectional Streaming RPC

  • 原创
  • Madman
  • /
  • /
  • 0
  • 1700 次阅读

Go 微服务实战.png

Synopsis: gRPC Bidirectional Streaming RPC 是指客户端和服务端都可以同时发送或接收 Streaming 数据流,而且 Requests 和 Responses 的数量不要求一致,即它们是异步的,也许客户端发送了 3 个请求 messages,而服务端响应了 5 个响应 messages。它非常适合当客户端和服务端需要异步发送很多数据时,或者聊天场景,或者客户端和服务端的长时间连接情形等

代码已上传到 https://github.com/wangy8961/grpc-go-tutorial/tree/v0.5 ,欢迎 star

1. Protocol Buffers 定义

要定义 gRPC Bidirectional Streaming RPC,只需要在 Protocol buffers 中为 RPC 方法的请求和响应前都增加 stream 关键字即可

gRPC Bidirectional Streaming RPC

我们将为 Math 服务添加一个新的 RPC 调用,动态 查找客户端发送的一系列数值中的最大值

修改 grpc-go-tutorial/math/mathpb/math.proto 文件:

syntax = "proto3";

option go_package="mathpb";

package math;

// The math service definition.
service Math {
    ...

    // Maximum is bi-directional streaming RPC
    rpc Maximum(stream MaximumRequest) returns (stream MaximumResponse) {};
}

...

// The request message for Maximum.
message MaximumRequest {
    int32 num = 1;
}

// The response message for Maximum.
message MaximumResponse {
    int32 result = 1;
}

然后切换到 .proto 文件所在的目录:

[root@CentOS ~]# cd grpc-go-tutorial/math/mathpb/
[root@CentOS mathpb]# protoc --go_out=plugins=grpc:. *.proto

将会更新 math.pb.go 文件

2. 实现 gRPC 服务端

服务端需要实现 math.pb.go 中的 server API:

// MathServer is the server API for Math service.
type MathServer interface {
    ...

    // Maximum is bi-directional streaming RPC
    Maximum(Math_MaximumServer) error
}

所以,修改 grpc-go-tutorial/math/math_server/main.go 文件:

// Package main implements a server for Math service.
package main

import (
    "context"
    "flag"
    "fmt"
    "io"
    "log"
    "net"

    pb "github.com/wangy8961/grpc-go-tutorial/math/mathpb"
    "google.golang.org/grpc"
)

// server is used to implement mathpb.MathServer.
type server struct{}

...

// Maximum implements mathpb.MathServer
func (s *server) Maximum(stream pb.Math_MaximumServer) error {
    fmt.Printf("--- gRPC Bidirectional Streaming RPC ---\n")

    // Read requests and send responses
    maximum := int32(0)

    for {
        in, err := stream.Recv()
        if err == io.EOF {
            fmt.Printf("Receiving client streaming data completed\n")
            return nil
        }
        if err != nil {
            log.Fatalf("Error while receiving client streaming data: %v", err)
        }

        num := in.Num
        fmt.Printf("request received: %v\n", in)

        if num > maximum {
            maximum = num
            if err := stream.Send(&pb.MaximumResponse{Result: maximum}); err != nil {
                log.Fatalf("Error while sending streaming data to client: %v", err)
            }
        }
    }
}

func main() {
    port := flag.Int("port", 50051, "the port to serve on")
    flag.Parse()

    lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) // Specify the port we want to use to listen for client requests
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    fmt.
                                
                            
未经允许不得转载: LIFE & SHARE - 王颜公子 » Go微服务实战|第5章:gRPC Bidirectional Streaming RPC

分享

作者

作者头像

Madman

如需 Linux / Python 相关问题付费解答,请按如下方式联系我

0 条评论

暂时还没有评论.