#### gRPC 的流式,分为三种类型:
- Server-side streaming RPC:服务器端流式 RPC
- Client-side streaming RPC:客户端流式 RPC
- Bidirectional streaming RPC:双向流式 RPC
![image.png](https://blog.zs-fighting.cn/upload/2022/02/image-32fbedc8bda6428691a0de9cc25169df.png)
>比如一个订单导出的接口有20万条记录,如果使用 simple RPC 来实现,那么我们需要一次性接收到全部20万条记录才能进行下一步的操作。但是如果我们使用 streaming RPC,那么我们就可以接收一条记录处理一条记录,直到所有的数据传输完毕。这样可以减少服务器的瞬时压力,也更具及时性。
```protobuf
// employee.proto
syntax = "proto3";
option go_package = "pb";
// Employee is the record carried inside EmployeeRequest and EmployeeResponse.
message Employee {
int32 id = 1;
int32 no = 2; // the value GetByNoRequest.no is matched against — presumably an employee number; confirm
string firstName = 3;
string lastName = 4;
float salary = 5;
}
// GetByNoRequest is the input of the GetByNo RPC.
message GetByNoRequest {
int32 no = 1;
}
// EmployeeResponse wraps a single Employee; it is returned by GetByNo and Save
// and is the streamed element of GetAll and SaveAll.
message EmployeeResponse {
Employee employee = 1;
}
// GetAllRequest is the (empty) input of the GetAll RPC.
message GetAllRequest {}
// AddPhotoRequest is one element of the client stream sent to AddPhoto.
message AddPhotoRequest {
bytes data = 1; // presumably one chunk of the photo file; confirm against the client
}
// AddPhotoResponse is the single reply of the AddPhoto RPC.
message AddPhotoResponse {
bool isSuccess = 1;
}
// EmployeeRequest wraps a single Employee; it is the input of Save and the
// streamed element sent to SaveAll.
message EmployeeRequest {
Employee employee = 1;
}
// EmployeeService declares two unary RPCs (GetByNo, Save) and one RPC for each
// of the three streaming kinds.
service EmployeeService {
rpc GetByNo(GetByNoRequest) returns (EmployeeResponse);
rpc GetAll(GetAllRequest) returns (stream EmployeeResponse); // server-side streaming
rpc AddPhoto(stream AddPhotoRequest) returns (AddPhotoResponse); // client-side streaming
rpc Save(EmployeeRequest) returns (EmployeeResponse);
rpc SaveAll(stream EmployeeRequest) returns (stream EmployeeResponse); // bidirectional streaming
}
```
```golang
// server.go
// port is the TCP address the gRPC server listens on (passed to net.Listen in main).
const port = ":5001"
// employees is the in-memory list that GetByNo and GetAll read and that
// SaveAll appends to.
// NOTE(review): elements are stored by value, so ranging over or appending to
// this slice copies protobuf messages; a []*pb.Employee would avoid that, but
// the handlers that use this slice would have to change with it — confirm.
var employees = []pb.Employee{
{
Id: 1,
No: 1995,
FirstName: "zhang",
LastName: "shun",
Salary: 88.88,
},
{
Id: 2,
No: 1999,
FirstName: "li",
LastName: "si",
Salary: 66.66,
},
}
// main listens on the TCP address in port, registers employeeService as the
// EmployeeService implementation and serves gRPC requests until Serve returns.
func main() {
	listen, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalln(err.Error())
	}

	server := grpc.NewServer()
	pb.RegisterEmployeeServiceServer(server, new(employeeService))
	log.Println("gRPC Server started ... " + port)

	// Serve blocks until the server stops; report its error instead of
	// exiting silently with status 0.
	if err := server.Serve(listen); err != nil {
		log.Fatalln(err.Error())
	}
}
// employeeService is the EmployeeService implementation that main registers
// with pb.RegisterEmployeeServiceServer. It has no fields; its methods work on
// the package-level employees slice.
type employeeService struct{}
// GetByNo returns the first entry of employees whose No equals the requested
// number. It returns an error when no entry matches.
func (s *employeeService) GetByNo(ctx context.Context, request *pb.GetByNoRequest) (*pb.EmployeeResponse, error) {
	// The generated getter tolerates a nil request.
	no := request.GetNo()

	// Iterate by index so the protobuf message is not copied into a loop
	// variable; the response points at the stored element itself.
	for i := range employees {
		if employees[i].No == no {
			return &pb.EmployeeResponse{Employee: &employees[i]}, nil
		}
	}
	return nil, errors.New("Not found ")
}
// GetAll streams every entry of employees to the client, one EmployeeResponse
// per entry, pausing two seconds after each message. It stops early and
// returns the error when a send fails or the stream's context is done.
func (s *employeeService) GetAll(request *pb.GetAllRequest, stream pb.EmployeeService_GetAllServer) error {
	// Work on the slice header captured at call time, as a range loop would.
	snapshot := employees
	for i := range snapshot {
		// Index instead of ranging by value so each message sent is a distinct,
		// uncopied protobuf message.
		if err := stream.Send(&pb.EmployeeResponse{Employee: &snapshot[i]}); err != nil {
			return err
		}

		// Keep the demo delay, but do not keep sleeping for a client that has
		// already cancelled or disconnected.
		select {
		case <-stream.Context().Done():
			return stream.Context().Err()
		case <-time.After(2 * time.Second):
		}
	}
	return nil
}
// AddPhoto receives a client stream of AddPhotoRequest chunks, concatenates
// their Data in memory and, once the client closes the stream, logs the total
// size and replies with IsSuccess set to true.
//
// If the incoming metadata carries a "no" key, its first value is printed.
// A receive error is returned to the caller (ending only this RPC).
func (s *employeeService) AddPhoto(stream pb.EmployeeService_AddPhotoServer) error {
	if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
		// Look the key up directly and guard against it being absent; indexing
		// md["no"][0] panics when the client sent no such header.
		if nos := md.Get("no"); len(nos) > 0 {
			fmt.Printf("Employee: %s\n", nos[0])
		}
	}

	var image []byte
	for {
		data, err := stream.Recv()
		if err == io.EOF {
			// Client finished sending.
			log.Printf("FileSize is %d\n", len(image))
			return stream.SendAndClose(&pb.AddPhotoResponse{IsSuccess: true})
		}
		if err != nil {
			// Fail this RPC only; exiting the process here would take down
			// the server for every other client.
			return err
		}
		log.Printf("Recv size is %d\n", len(data.Data))
		image = append(image, data.Data...)
	}
}
// Save appends one line — the current time followed by the request's string
// form — to ./saveEmployee.txt (created if missing) and echoes the request's
// Employee back in the response.
//
// Any failure to open or write the file is returned as the RPC error.
func (s *employeeService) Save(ctx context.Context, request *pb.EmployeeRequest) (*pb.EmployeeResponse, error) {
	saveFile, err := os.OpenFile("./saveEmployee.txt", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
	if err != nil {
		// Return the error instead of exiting the whole server process.
		return nil, fmt.Errorf("opening save file: %w", err)
	}
	// Deferred only after the open succeeded, so the handle is always valid.
	defer saveFile.Close()

	// Named now (not time) so the time package is not shadowed.
	now := time.Now().String()
	writer := bufio.NewWriter(saveFile)
	if _, err := writer.WriteString(now + request.String() + "\n"); err != nil {
		return nil, fmt.Errorf("writing employee record: %w", err)
	}
	if err := writer.Flush(); err != nil {
		return nil, fmt.Errorf("flushing employee record: %w", err)
	}
	return &pb.EmployeeResponse{Employee: request.Employee}, nil
}
// SaveAll handles the bidirectional stream: for every EmployeeRequest received
// it sends the same Employee back in an EmployeeResponse and appends it to
// employees. When the client closes its side, the whole employees slice is
// logged and the RPC ends successfully.
//
// It returns the error of a failed receive or send, and rejects a request that
// carries no Employee.
func (s *employeeService) SaveAll(stream pb.EmployeeService_SaveAllServer) error {
	for {
		e, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		if e.Employee == nil {
			// Dereferencing a missing Employee below would panic.
			return errors.New("employee is required")
		}
		if err := stream.Send(&pb.EmployeeResponse{Employee: e.Employee}); err != nil {
			return err
		}
		// employees holds values, so the message is copied into the slice here.
		employees = append(employees, *e.Employee)
	}

	// Log by pointer so the stored messages are not copied again.
	for i := range employees {
		log.Println(&employees[i])
	}
	return nil
}
```
```golang
// client.go
package main
// port is the server address handed to grpc.Dial in main.
const port = ":5001"
// main opens an insecure gRPC connection to port, builds an EmployeeService
// client on it and runs the saveAll demo. The other demo calls are kept
// commented out so they can be switched on one at a time.
func main() {
	conn, dialErr := grpc.Dial(port, grpc.WithInsecure())
	if dialErr != nil {
		log.Fatalln(dialErr.Error())
	}
	defer conn.Close()

	client := pb.NewEmployeeServiceClient(conn)

	//getEmployeeByNo(client)
	//getAllEmployee(client)
	//addPhoto(client)
	//save(client)
	saveAll(client)
}
// saveAll exercises the bidirectional SaveAll RPC: it sends two employees on
// the stream while a goroutine logs every response, then closes the send side
// and waits until the server has closed the stream as well.
//
// Any stream error terminates the program via log.Fatalln.
func saveAll(client pb.EmployeeServiceClient) {
	// Pointers, so each request carries its own message: a message must not be
	// modified after it was passed to Send, and protobuf messages should not be
	// copied by value.
	employeesList := []*pb.Employee{
		{
			Id:        3,
			No:        1980,
			FirstName: "wu",
			LastName:  "yanzu",
			Salary:    100.00,
		},
		{
			Id:        4,
			No:        1991,
			FirstName: "peng",
			LastName:  "yuyan",
			Salary:    101.00,
		},
	}

	stream, err := client.SaveAll(context.Background())
	if err != nil {
		log.Fatalln(err.Error())
	}

	// recvDone is closed once the receiving goroutine has seen io.EOF.
	recvDone := make(chan struct{})
	go func() {
		defer close(recvDone)
		for {
			res, err := stream.Recv()
			if err == io.EOF {
				return
			}
			if err != nil {
				log.Fatalln(err.Error())
			}
			log.Println(res.Employee)
		}
	}()

	for _, employee := range employeesList {
		if err := stream.Send(&pb.EmployeeRequest{Employee: employee}); err != nil {
			log.Fatalln(err.Error())
		}
	}
	if err := stream.CloseSend(); err != nil {
		log.Fatalln(err.Error())
	}
	<-recvDone
}
// save calls the unary Save RPC with one hard-coded employee and logs the
// response; an RPC error terminates the program.
func save(client pb.EmployeeServiceClient) {
	req := &pb.EmployeeRequest{
		Employee: &pb.Employee{
			Id:        3,
			No:        2022,
			FirstName: "jin",
			LastName:  "sanpang",
			Salary:    44.44,
		},
	}

	res, err := client.Save(context.Background(), req)
	if err != nil {
		log.Fatalln(err.Error())
	}
	log.Println(res)
}
// addPhoto exercises the client-streaming AddPhoto RPC: it reads
// /Users/zhangshun/employee.png in chunks of up to 10 KiB, sends each chunk on
// the stream with a 500 ms pause in between, then closes the stream and prints
// the IsSuccess flag of the reply. The metadata key "no" is set to "1995" on
// the outgoing context.
//
// Any file or stream error terminates the program via log.Fatalln.
func addPhoto(client pb.EmployeeServiceClient) {
	imageFile, err := os.Open("/Users/zhangshun/employee.png")
	if err != nil {
		log.Fatalln(err.Error())
	}
	defer imageFile.Close()

	md := metadata.New(map[string]string{"no": "1995"})
	// Named ctx so the context package is not shadowed.
	ctx := metadata.NewOutgoingContext(context.Background(), md)

	stream, err := client.AddPhoto(ctx)
	if err != nil {
		log.Fatalln(err.Error())
	}

	for {
		// A fresh buffer per chunk: a message must not be modified after it
		// has been handed to Send.
		chunk := make([]byte, 10*1024)
		chunkSize, readErr := imageFile.Read(chunk)

		// Handle the bytes read before looking at the error, as the io.Reader
		// contract allows data and io.EOF to arrive together.
		if chunkSize > 0 {
			if err := stream.Send(&pb.AddPhotoRequest{Data: chunk[:chunkSize]}); err != nil {
				log.Fatalln(err.Error())
			}
			time.Sleep(time.Millisecond * 500)
		}
		if readErr == io.EOF {
			break
		}
		if readErr != nil {
			log.Fatalln(readErr.Error())
		}
	}

	res, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalln(err.Error())
	}
	fmt.Println(res.IsSuccess)
}
// getAllEmployee calls the server-streaming GetAll RPC and prints the Employee
// of every response until the server ends the stream; any other error
// terminates the program.
func getAllEmployee(client pb.EmployeeServiceClient) {
	stream, err := client.GetAll(context.Background(), &pb.GetAllRequest{})
	if err != nil {
		log.Fatalln(err.Error())
	}

	for {
		res, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			// Server closed the stream: nothing more to read.
			return
		case recvErr != nil:
			log.Fatalln(recvErr.Error())
		}
		fmt.Println(res.Employee)
	}
}
// getEmployeeByNo calls the unary GetByNo RPC for number 1999 and logs the
// response; an RPC error terminates the program.
func getEmployeeByNo(client pb.EmployeeServiceClient) {
	req := &pb.GetByNoRequest{No: 1999}

	res, err := client.GetByNo(context.Background(), req)
	if err != nil {
		log.Fatalln(err.Error())
	}
	log.Println(res)
}
```
gRPC流式传输