Have you ever wondered while developing a REST API that the server could get the capability to stream responses using the same TCP connection? Or, reversely, the REST client could have the ability to stream the requests to the server? This could have saved the cost of bringing up another service (like WebSocket) just to fulfill such a requirement.
For such cases, REST isn’t the only API architecture available. People can now bank on the gRPC model as it has begun to play a crucial role. gRPC’s unidirectional-streaming RPC feature could be the perfect choice to meet those requirements.
Objective
In this blog, you’ll get to know what client streaming & server streaming uni-directional RPCs are. I will also discuss how to implement, test, and run them using a live, fully functional example.
Previously, in Part-1 of this blog series, we’ve learned the basics of gRPC, how to implement a Simple/Unary gRPC, how to write unit tests, how to launch the server & client. Part-1 is a step-by-step guide to implement a Stack Machine server & client leveraging Simple/Unary RPC.
If you’ve missed that, it is highly recommended to go through it to get familiar with the basics of the gRPC framework.
Introduction
Let’s understand how Client streaming & Server streaming RPCs work at a very high level.
Client streaming RPCs where:
-
- the client writes a sequence of messages and sends them to the server using a provided stream
- once the client has finished writing the messages, it waits for the server to read them and return its response
Server streaming RPCs where:
-
- the client sends a request to the server and gets a stream to read a sequence of messages back
- the client reads from the returned stream until there are no more messages
The best thing is gRPC guarantees message ordering within an individual RPC call.
Now let’s improve the “Stack Machine” server & client codes to support unidirectional streaming.
Implementing Server Streaming RPC
We’ll see an example of Server Streaming first by implementing the FIB operation.
Where the FIB RPC will:
-
- perform a Fibonacci operation
- accept an integer input i.e. generate first N numbers of the Fibonacci series
- will respond with a stream of integers i.e. first N numbers of the Fibonacci series
And later we’ll see how Client Streaming can be implemented so that a client can input a stream of instructions to the Stack Machine in real-time rather than sending a single request consisting of a set of instructions.
Update Protobuf
We already have defined the gRPC service Machine and a Simple (Unary) RPC method Execute inside our service definition in part-1 of the blog series. Now, let’s update the service definition to add one server streaming RPC called ServerStreamingExecute.
-
- A server streaming RPC where the client sends a request to the server using the stub and waits for a response to come back as a stream of result
- To specify a server-side streaming method, need to place the stream keyword before the response type
// ServerStreamingExecute accepts a set of Instructions from client and returns a stream of Result.
rpc ServerStreamingExecute(InstructionSet) returns (stream Result) {}
source: machine/machine.proto
Generating pdated Client & Server Interface Go Code
We need to generate the gRPC client and server interfaces from our machine/machine.proto service definition.
~/disk/E/workspace/grpc-eg-go $ SRC_DIR=./ $ DST_DIR=$SRC_DIR $ protoc \ -I=$SRC_DIR \ --go_out=plugins=grpc:$DST_DIR \ $SRC_DIR/machine/machine.proto
You can observe that the declaration of ServerStreamingExecute() in the MachineClient and MachineServer interface has been auto-generated:
... type MachineClient interface { Execute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (*Result, error) + ServerStreamingExecute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (Machine_ServerStreamingExecuteClient, error) } ... type MachineServer interface { Execute(context.Context, *InstructionSet) (*Result, error) + ServerStreamingExecute(*InstructionSet, Machine_ServerStreamingExecuteServer) error }
source: machine/machine.pb.go
Update the Server
Just in case if you’re wondering, What if my service doesn’t implement some of the RPCs declared in the machine.pb.go file, then you’ll encounter the following error while launching your gRPC server.
~/disk/E/workspace/grpc-eg-go $ go run cmd/run_machine_server.go # command-line-arguments cmd/run_machine_server.go:32:44: cannot use &server.MachineServer literal (type *server.MachineServer) as type machine.MachineServer in argument to machine.RegisterMachineServer: *server.MachineServer does not implement machine.MachineServer (missing ServerStreamingExecute method)
So, it’s always the best practice to keep your service in sync with the service definition i.e. machine/machine.proto & machine/machine.pb.go. If you do not want to support a particular RPC, or its implementation is not yet ready, just respond with Unimplemented error status. Example:
// ServerStreamingExecute runs the set of instructions given and streams a sequence of Results.
func (s *MachineServer) ServerStreamingExecute(instructions *machine.InstructionSet, stream machine.Machine_ServerStreamingExecuteServer) error { return status.Error(codes.Unimplemented, "ServerStreamingExecute() not implemented yet") }
source: server/machine.go
Before we implement the ServerStreamingExecute() RPC, let’s write a Fibonacci series generator called FibonacciRange().
package utils func FibonacciRange(n int) <-chan int { ch := make(chan int) fn := make([]int, n+1, n+2) fn[0] = 0 fn[1] = 1 go func() { defer close(ch) for i := 0; i <= n; i++ { var f int if i < 2 { f = fn[i] } else { f = fn[i-1] + fn[i-2] } fn[i] = f ch <- f } }() return ch }
source: utils/fibonacci.go
The blog series assumes that you’re familiar with Golang basics & its concurrency paradigms & concepts like Channels. You can read more about the Channels from the official document.
This function yields the numbers of Fibonacci series till the Nth position.
Let’s also add a small unit test to validate the FibonacciRange() generator.
package utils import ( "testing" ) func TestFibonacciRange(t *testing.T) { fibOf5 := []int{0, 1, 1, 2, 3, 5} i := 0 for f := range FibonacciRange(5) { if f != fibOf5[i] { t.Errorf("got %d, want %d", f, fibOf5[i]) } i++ } }
source: utils/fibonacci_test.go
Let’s implement ServerStreamingExecute() to handle the basic instructions PUSH/POP, and FIB with proper error handling. On completion of the execution of instructions set, it should POP the result from the Stack and should respond with a Result object to the client.
func (s *MachineServer) ServerStreamingExecute(instructions *machine.InstructionSet, stream machine.Machine_ServerStreamingExecuteServer) error { if len(instructions.GetInstructions()) == 0 { return status.Error(codes.InvalidArgument, "No valid instructions received") } var stack stack.Stack for _, instruction := range instructions.GetInstructions() { operand := instruction.GetOperand() operator := instruction.GetOperator() op_type := OperatorType(operator) log.Printf("Operand: %v, Operator: %v\n", operand, operator) switch op_type { case PUSH: stack.Push(float32(operand)) case POP: stack.Pop() case FIB: n, popped := stack.Pop() if !popped { return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted") } if op_type == FIB { for f := range utils.FibonacciRange(int(n)) { log.Println(float32(f)) stream.Send(&machine.Result{Output: float32(f)}) } } default: return status.Errorf(codes.Unimplemented, "Operation '%s' not implemented yet", operator) } } return nil }
source: server/machine.go
Update the Client
Now, update the client code to call ServerStreamingExecute() where the client will receivenumbers of the Fibonacci series through the stream and print the same.
func runServerStreamingExecute(client machine.MachineClient, instructions *machine.InstructionSet) { log.Printf("Executing %v", instructions) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() stream, err := client.ServerStreamingExecute(ctx, instructions) if err != nil { log.Fatalf("%v.Execute(_) = _, %v: ", client, err) } for { result, err := stream.Recv() if err == io.EOF { log.Println("EOF") break } if err != nil { log.Printf("Err: %v", err) break } log.Printf("output: %v", result.GetOutput()) } log.Println("DONE!") }
source: client/machine.go
Test
To write the unit test we’ll have to generate the mock of multiple interfaces as required.
mockgen is the ready-to-go framework for mocking in Golang, so we’ll be leveraging it in our unit tests.
Server
As we’ve updated our interface i.e. machine/machine.pb.go, let’s update the mock for MachineClient interface. And as we’ve introduced a new RPC ServerStreamingExecute(), let’s generate the mock for ServerStream interface Machine_ServerStreamingExecuteServer as well.
~/disk/E/workspace/grpc-eg-go $ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteServer > mock_machine/machine_mock.go
The updated mock_machine/machine_mock.go should look like this.
Now, we’re good to write unit test for server-side streaming RPC ServerStreamingExecute():
func TestServerStreamingExecute(t *testing.T) { s := MachineServer{} // set up test table tests := []struct { instructions []*machine.Instruction want []float32 }{ { instructions: []*machine.Instruction{ {Operand: 5, Operator: "PUSH"}, {Operator: "FIB"}, }, want: []float32{0, 1, 1, 2, 3, 5}, }, { instructions: []*machine.Instruction{ {Operand: 6, Operator: "PUSH"}, {Operator: "FIB"}, }, want: []float32{0, 1, 1, 2, 3, 5, 8}, }, } ctrl := gomock.NewController(t) defer ctrl.Finish() mockServerStream := mock_machine.NewMockMachine_ServerStreamingExecuteServer(ctrl) for _, tt := range tests { mockResults := []*machine.Result{} mockServerStream.EXPECT().Send(gomock.Any()).DoAndReturn( func(result *machine.Result) error { mockResults = append(mockResults, result) return nil }).AnyTimes() req := &machine.InstructionSet{Instructions: tt.instructions} err := s.ServerStreamingExecute(req, mockServerStream) if err != nil { t.Errorf("ServerStreamingExecute(%v) got unexpected error: %v", req, err) } for i, result := range mockResults { got := result.GetOutput() want := tt.want[i] if got != want { t.Errorf("got %v, want %v", got, want) } } } }
Please refer to the server/machine_test.go for detailed content.
Let’s run the unit test:
~/disk/E/workspace/grpc-eg-go $ go test server/machine.go server/machine_test.go ok command-line-arguments 0.003s
Client
For our new RPC ServerStreamingExecute(), let’s add the mock for ClientStream interface Machine_ServerStreamingExecuteClient as well.
~/disk/E/workspace/grpc-eg-go $ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteServer,Machine_ServerStreamingExecuteClient > mock_machine/machine_mock.go
source: mock_machine/machine_mock.go
Let’s add unit test to test client-side logic for server-side streaming RPC ServerStreamingExecute() using mock MockMachine_ServerStreamingExecuteClient :
func TestServerStreamingExecute(t *testing.T) { instructions := []*machine.Instruction{} instructions = append(instructions, &machine.Instruction{Operand: 1, Operator: "PUSH"}) instructions = append(instructions, &machine.Instruction{Operator: "FIB"}) instructionSet := &machine.InstructionSet{Instructions: instructions} ctrl := gomock.NewController(t) defer ctrl.Finish() mockMachineClient := mock_machine.NewMockMachineClient(ctrl) clientStream := mock_machine.NewMockMachine_ServerStreamingExecuteClient(ctrl) clientStream.EXPECT().Recv().Return(&machine.Result{Output: 0}, nil) mockMachineClient.EXPECT().ServerStreamingExecute( gomock.Any(), // context instructionSet, // rpc uniary message ).Return(clientStream, nil) if err := testServerStreamingExecute(t, mockMachineClient, instructionSet); err != nil { t.Fatalf("Test failed: %v", err) } }
Please refer to the mock_machine/machine_mock_test.go for detailed content.
Let’s run the unit test:
~/disk/E/workspace/grpc-eg-go $ go test mock_machine/machine_mock.go mock_machine/machine_mock_test.go ok command-line-arguments 0.003s
Run
nit tests assure us that the business logic of the server & client codes is working as expected, let’s try running the server and communicating to it via our client code.
Server
To start the server we need to run the previously created cmd/run_machine_server.go file.
~/disk/E/workspace/grpc-eg-go $ go run cmd/run_machine_server.go
Client
Now, let’s run the client code client/machine.go.
~/disk/E/workspace/grpc-eg-go $ go run client/machine.go Executing instructions:<operator:"PUSH" operand:5 > instructions:<operator:"PUSH" operand:6 > instructions:<operator:"MUL" > output:30 Executing instructions:<operator:"PUSH" operand:6 > instructions:<operator:"FIB" > output: 0 output: 1 output: 1 output: 2 output: 3 output: 5 output: 8 EOF DONE!
Awesome! A Server Streaming RPC has been successfully implemented.
Implementing Client Streaming RPC
We have learned how to implement a Server Streaming RPC, now it’s time to explore the Client Streaming RPC.
To do so, we’ll not introduce another RPC, rather we’ll update the existing Execute() RPC to accept a stream of Instructions from the client in real-time rather than sending a single request comprisesa set of Instructions.
Update the protobuf
So, let’s update the interface:
service Machine { - rpc Execute(InstructionSet) returns (Result) {} + rpc Execute(stream Instruction) returns (Result) {} rpc ServerStreamingExecute(InstructionSet) returns (stream Result) {} }
source: machine/machine.proto
Generating the updated client and server interface Go code
Now let’s generate an updated golang code from the machine/machine.proto by running:
~/disk/E/workspace/grpc-eg-go $ SRC_DIR=./ $ DST_DIR=$SRC_DIR $ protoc \ -I=$SRC_DIR \ --go_out=plugins=grpc:$DST_DIR \ $SRC_DIR/machine/machine.proto
You’ll notice that the declaration of Execute() has been updated from MachineServer & MachineClient interfaces.
type MachineServer interface { - Execute(context.Context, *InstructionSet) (*Result, error) + Execute(Machine_ExecuteServer) error ServerStreamingExecute(*InstructionSet, Machine_ServerStreamingExecuteServer) error } type MachineClient interface { - Execute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (*Result, error) + Execute(ctx context.Context, opts ...grpc.CallOption) (Machine_ExecuteClient, error) ServerStreamingExecute(ctx context.Context, in *InstructionSet, opts ...grpc.CallOption) (Machine_ServerStreamingExecuteClient, error) }
source: machine/machine.pb.go
Update the Server
Let’s update the server code to make Execute() a client streaming uni-directional RPC so that it should be able to accept stream the instructions from the client and respond with a Result struct.
func (s *MachineServer) Execute(stream machine.Machine_ExecuteServer) error { var stack stack.Stack for { instruction, err := stream.Recv() if err == io.EOF { log.Println("EOF") output, popped := stack.Pop() if !popped { return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted") } if err := stream.SendAndClose(&machine.Result{ Output: output, }); err != nil { return err } return nil } if err != nil { return err } operand := instruction.GetOperand() operator := instruction.GetOperator() op_type := OperatorType(operator) fmt.Printf("Operand: %v, Operator: %v\n", operand, operator) switch op_type { case PUSH: stack.Push(float32(operand)) case POP: stack.Pop() case ADD, SUB, MUL, DIV: item2, popped := stack.Pop() item1, popped := stack.Pop() if !popped { return status.Error(codes.Aborted, "Invalid sets of instructions. Execution aborted") } if op_type == ADD { stack.Push(item1 + item2) } else if op_type == SUB { stack.Push(item1 - item2) } else if op_type == MUL { stack.Push(item1 * item2) } else if op_type == DIV { stack.Push(item1 / item2) } default: return status.Errorf(codes.Unimplemented, "Operation '%s' not implemented yet", operator) } } }
source: server/machine.go
Update the Client
Now update the client code to make client.Execute() a uni-directional streaming RPC, so that the client can stream the instructions to the server and can receive a Result struct once the streaming completes.
func runExecute(client machine.MachineClient, instructions *machine.InstructionSet) { log.Printf("Streaming %v", instructions) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() stream, err := client.Execute(ctx) if err != nil { log.Fatalf("%v.Execute(ctx) = %v, %v: ", client, stream, err) } for _, instruction := range instructions.GetInstructions() { if err := stream.Send(instruction); err != nil { log.Fatalf("%v.Send(%v) = %v: ", stream, instruction, err) } } result, err := stream.CloseAndRecv() if err != nil { log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil) } log.Println(result) }
source: client/machine.go
Test
Generate mock for Machine_ExecuteClient and Machine_ExecuteServer interface to test client-streaming RPC Execute():
~/disk/E/workspace/grpc-eg-go $ mockgen github.com/toransahu/grpc-eg-go/machine MachineClient,Machine_ServerStreamingExecuteClient,Machine_ServerStreamingExecuteServer,Machine_ExecuteServer,Machine_ExecuteClient > mock_machine/machine_mock.go
The updated mock_machine/machine_mock.go should look like this.
Server
Let’s update the unit test to test the server-side logic of client streaming Execute() RPC using mock:
func TestExecute(t *testing.T) { s := MachineServer{} ctrl := gomock.NewController(t) defer ctrl.Finish() mockServerStream := mock_machine.NewMockMachine_ExecuteServer(ctrl) mockResult := &machine.Result{} callRecv1 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 5, Operator: "PUSH"}, nil) callRecv2 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operand: 6, Operator: "PUSH"}, nil).After(callRecv1) callRecv3 := mockServerStream.EXPECT().Recv().Return(&machine.Instruction{Operator: "MUL"}, nil).After(callRecv2) mockServerStream.EXPECT().Recv().Return(nil, io.EOF).After(callRecv3) mockServerStream.EXPECT().SendAndClose(gomock.Any()).DoAndReturn( func(result *machine.Result) error { mockResult = result return nil }) err := s.Execute(mockServerStream) if err != nil { t.Errorf("Execute(%v) got unexpected error: %v", mockServerStream, err) } got := mockResult.GetOutput() want := float32(30) if got != want { t.Errorf("got %v, wanted %v", got, want) } }
Please refer to the server/machine_test.go for detailed content.
Let’s run the unit test:
~/disk/E/workspace/grpc-eg-go $ go test server/machine.go server/machine_test.go ok command-line-arguments 0.003s
Client
Now, add unit test to test client-side logic of client streaming Execute() RPC using mock:
func TestExecute(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockMachineClient := mock_machine.NewMockMachineClient(ctrl) mockClientStream := mock_machine.NewMockMachine_ExecuteClient(ctrl) mockClientStream.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes() mockClientStream.EXPECT().CloseAndRecv().Return(&machine.Result{Output: 30}, nil) mockMachineClient.EXPECT().Execute( gomock.Any(), // context ).Return(mockClientStream, nil) testExecute(t, mockMachineClient) }
Please refer to the mock_machine/machine_mock_test.go for detailed content.
Let’s run the unit test:
~/disk/E/workspace/grpc-eg-go $ go test mock_machine/machine_mock.go mock_machine/machine_mock_test.go ok command-line-arguments 0.003s
Run all the unit tests at once:
~/disk/E/workspace/grpc-eg-go $ go test ./... ? github.com/toransahu/grpc-eg-go/client [no test files] ? github.com/toransahu/grpc-eg-go/cmd [no test files] ? github.com/toransahu/grpc-eg-go/machine [no test files] ok github.com/toransahu/grpc-eg-go/mock_machine (cached) ok github.com/toransahu/grpc-eg-go/server (cached) ok github.com/toransahu/grpc-eg-go/utils (cached) ? github.com/toransahu/grpc-eg-go/utils/stack [no test files]
Run
Now we are assured through unit tests that the business logic of the server & client codes is working as expected. Let’s try running the server and communicating with it via our client code.
Server
To launch the server we need to run the previously created cmd/run_machine_server.go file.
~/disk/E/workspace/grpc-eg-go $ go run cmd/run_machine_server.go
Client
Now, let’s run the client code client/machine.go.
~/disk/E/workspace/grpc-eg-go $ go run client/machine.go Streaming instructions:<operator:"PUSH" operand:5 > instructions:<operator:"PUSH" operand:6 > instructions:<operator:"MUL" > output:30 Executing instructions:<operator:"PUSH" operand:6 > instructions:<operator:"FIB" > output: 0 output: 1 output: 1 output: 2 output: 3 output: 5 output: 8 EOF DONE!
Awesome!! We have successfully transformed a Unary RPC into Server Streaming RPC.
At the end of this blog, we’ve learned:
-
-
- How to define an interface for uni-directional streaming RPCs using protobuf?
- How to write gRPC server & client logic for uni-directional streaming RPCs?
- How to write and run the unit test for server-streaming & client-streaming RPCs?
- How to run the gRPC server and a client can communicate to it?
-
The source code of this example is available at toransahu/grpc-eg-go.
You can also git checkout to this commit SHA for Part-2(a) and to this commit SHA for Part-2(b).
See you in the next part of this blog series.