Go微服务实战|第5章:gRPC Bidirectional Streaming RPC
Synopsis: gRPC Bidirectional Streaming RPC 是指客户端和服务端都可以同时发送或接收 Streaming 数据流,而且 Requests 和 Responses 的数量不要求一致,即它们是异步的,也许客户端发送了 3 个请求 messages,而服务端响应了 5 个响应 messages。它非常适合当客户端和服务端需要异步发送很多数据时,或者聊天场景,或者客户端和服务端的长时间连接情形等
代码已上传到 https://github.com/wangy8961/grpc-go-tutorial/tree/v0.5 ,欢迎 star
1. Protocol Buffers 定义
要定义 gRPC Bidirectional Streaming RPC
,只需要在 Protocol buffers 中为 RPC 方法的请求和响应前都增加 stream
关键字即可
我们将为 Math
服务添加一个新的 RPC 调用,动态 查找客户端发送的一系列数值中的最大值
修改 grpc-go-tutorial/math/mathpb/math.proto
文件:
syntax = "proto3"; option go_package="mathpb"; package math; // The math service definition. service Math { ... // Maximum is bi-directional streaming RPC rpc Maximum(stream MaximumRequest) returns (stream MaximumResponse) {}; } ... // The request message for Maximum. message MaximumRequest { int32 num = 1; } // The response message for Maximum. message MaximumResponse { int32 result = 1; }
然后切换到 .proto
文件所在的目录:
[root@CentOS ~]# cd grpc-go-tutorial/math/mathpb/ [root@CentOS mathpb]# protoc --go_out=plugins=grpc:. *.proto
将会更新 math.pb.go
文件
2. 实现 gRPC 服务端
服务端需要实现 math.pb.go
中的 server API:
// MathServer is the server API for Math service. type MathServer interface { ... // Maximum is bi-directional streaming RPC Maximum(Math_MaximumServer) error }
所以,修改 grpc-go-tutorial/math/math_server/main.go
文件:
// Package main implements a server for Math service. package main import ( "context" "flag" "fmt" "io" "log" "net" pb "github.com/wangy8961/grpc-go-tutorial/math/mathpb" "google.golang.org/grpc" ) // server is used to implement mathpb.MathServer. type server struct{} ... // Maximum implements mathpb.MathServer func (s *server) Maximum(stream pb.Math_MaximumServer) error { fmt.Printf("--- gRPC Bidirectional Streaming RPC ---\n") // Read requests and send responses maximum := int32(0) for { in, err := stream.Recv() if err == io.EOF { fmt.Printf("Receiving client streaming data completed\n") return nil } if err != nil { log.Fatalf("Error while receiving client streaming data: %v", err) } num := in.Num fmt.Printf("request received: %v\n", in) if num > maximum { maximum = num if err := stream.Send(&pb.MaximumResponse{Result: maximum}); err != nil { log.Fatalf("Error while sending streaming data to client: %v", err) } } } } func main() { port := flag.Int("port", 50051, "the port to serve on") flag.Parse() lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) // Specify the port we want to use to listen for client requests if err != nil { log
0 条评论
评论者的用户名
评论时间暂时还没有评论.