Go微服务实战|第5章:gRPC Bidirectional Streaming RPC
Synopsis: gRPC Bidirectional Streaming RPC 是指客户端和服务端都可以同时发送或接收 Streaming 数据流,而且 Requests 和 Responses 的数量不要求一致,即它们是异步的,也许客户端发送了 3 个请求 messages,而服务端响应了 5 个响应 messages。它非常适合当客户端和服务端需要异步发送很多数据时,或者聊天场景,或者客户端和服务端的长时间连接情形等
代码已上传到 https://github.com/wangy8961/grpc-go-tutorial/tree/v0.5 ,欢迎 star
1. Protocol Buffers 定义
要定义 gRPC Bidirectional Streaming RPC,只需要在 Protocol buffers 中为 RPC 方法的请求和响应前都增加 stream 关键字即可

我们将为 Math 服务添加一个新的 RPC 调用,动态 查找客户端发送的一系列数值中的最大值
修改 grpc-go-tutorial/math/mathpb/math.proto 文件:
syntax = "proto3";
option go_package="mathpb";
package math;
// The math service definition.
service Math {
...
// Maximum is bi-directional streaming RPC
rpc Maximum(stream MaximumRequest) returns (stream MaximumResponse) {};
}
...
// The request message for Maximum.
message MaximumRequest {
int32 num = 1;
}
// The response message for Maximum.
message MaximumResponse {
int32 result = 1;
}
然后切换到 .proto 文件所在的目录:
[root@CentOS ~]# cd grpc-go-tutorial/math/mathpb/ [root@CentOS mathpb]# protoc --go_out=plugins=grpc:. *.proto
将会更新 math.pb.go 文件
2. 实现 gRPC 服务端
服务端需要实现 math.pb.go 中的 server API:
// MathServer is the server API for Math service. type MathServer interface { ... // Maximum is bi-directional streaming RPC Maximum(Math_MaximumServer) error }
所以,修改 grpc-go-tutorial/math/math_server/main.go 文件:
// Package main implements a server for Math service. package main import ( "context" "flag" "fmt" "io" "log" "net" pb "github.com/wangy8961/grpc-go-tutorial/math/mathpb" "google.golang.org/grpc" ) // server is used to implement mathpb.MathServer. type server struct{} ... // Maximum implements mathpb.MathServer func (s *server) Maximum(stream pb.Math_MaximumServer) error { fmt.Printf("--- gRPC Bidirectional Streaming RPC ---\n") // Read requests and send responses maximum := int32(0) for { in, err := stream.Recv() if err == io.EOF { fmt.Printf("Receiving client streaming data completed\n") return nil } if err != nil { log.Fatalf("Error while receiving client streaming data: %v", err) } num := in.Num fmt.Printf("request received: %v\n", in) if num > maximum { maximum = num if err := stream.Send(&pb.MaximumResponse{Result: maximum}); err != nil { log.Fatalf("Error while sending streaming data to client: %v", err) } } } } func main() { port := flag.Int("port", 50051, "the port to serve on") flag.Parse() lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) // Specify the port we want to use to listen for client requests if err != nil { log
0 条评论
评论者的用户名
评论时间暂时还没有评论.