gRPC与Protocol Buffers
约 1444 字大约 5 分钟
grpcprotobuf
2025-06-02
概述
gRPC是Google开源的高性能远程过程调用(RPC)框架,基于HTTP/2协议和Protocol Buffers(Protobuf)序列化格式。它提供了跨语言的服务定义和代码生成能力,支持四种通信模式(Unary、Server Streaming、Client Streaming、Bidirectional Streaming),广泛应用于微服务间通信、移动端与后端通信等场景。
Protocol Buffers编码
基本编码原理
Protobuf使用二进制编码,核心是Varint(可变长整数)编码和Wire Type(线缆类型)。
Varint编码规则:
- 每个字节的最高位(MSB)标识是否还有后续字节
- 0: 这是最后一个字节
- 1: 后面还有字节
- 使用小端序
示例:编码整数 300
300 的二进制: 1 00101100
Varint编码:
低7位: 0101100 → 加MSB=1 → 10101100 (0xAC)
高位: 0000010 → 加MSB=0 → 00000010 (0x02)
结果: [0xAC, 0x02] (2字节)
对比: 固定int32编码需要4字节 [0x2C, 0x01, 0x00, 0x00]ZigZag编码
用于高效编码负数。Varint对负数效率很低(-1的补码表示需要10字节),ZigZag将有符号整数映射为无符号整数:
ZigZag编码:
0 → 0
-1 → 1
1 → 2
-2 → 3
2 → 4
公式: (n << 1) ^ (n >> 31) // 32位
(n << 1) ^ (n >> 63) // 64位Wire Types
Wire Type | 含义 | 用于的Protobuf类型
0 | Varint | int32, int64, uint32, uint64, sint32, sint64, bool, enum
1 | 64-bit | fixed64, sfixed64, double
2 | Length-delimited| string, bytes, embedded messages, repeated fields
5 | 32-bit | fixed32, sfixed32, float每个字段编码为: (field_number << 3) | wire_type
示例:message Person { string name = 1; int32 age = 2; }
Person { name: "abc", age: 25 }
编码结果:
0A 03 61 62 63 | 10 19
│ │ └──────┘ │ └── 25的Varint
│ │ "abc" └── field=2, wire_type=0
│ └── 长度=3
└── field=1, wire_type=2Proto3语法
syntax = "proto3";
package order.v1;
option go_package = "github.com/example/order/v1;orderv1";
option java_package = "com.example.order.v1";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
// 枚举
enum OrderStatus {
ORDER_STATUS_UNSPECIFIED = 0; // proto3要求第一个值为0
ORDER_STATUS_CREATED = 1;
ORDER_STATUS_PAID = 2;
ORDER_STATUS_SHIPPED = 3;
ORDER_STATUS_COMPLETED = 4;
}
// 消息定义
message Order {
string id = 1;
string user_id = 2;
repeated OrderItem items = 3; // 重复字段(列表)
OrderStatus status = 4;
map<string, string> metadata = 5; // Map类型
google.protobuf.Timestamp created_at = 6;
// oneof: 同一时间只能设置其中一个字段
oneof payment {
CreditCardPayment credit_card = 7;
WalletPayment wallet = 8;
}
}
message OrderItem {
string product_id = 1;
int32 quantity = 2;
double price = 3; // 使用Decimal类型更精确
}
message CreditCardPayment {
string card_number = 1;
string expiry = 2;
}
message WalletPayment {
string wallet_id = 1;
}gRPC服务定义
// 四种RPC模式
service OrderService {
// 1. Unary RPC(一元调用)
rpc CreateOrder(CreateOrderRequest) returns (CreateOrderResponse);
// 2. Server Streaming RPC(服务端流)
rpc WatchOrderStatus(WatchOrderRequest) returns (stream OrderStatusUpdate);
// 3. Client Streaming RPC(客户端流)
rpc BatchCreateOrders(stream CreateOrderRequest) returns (BatchCreateResponse);
// 4. Bidirectional Streaming RPC(双向流)
rpc OrderChat(stream OrderMessage) returns (stream OrderMessage);
}
message CreateOrderRequest {
string user_id = 1;
repeated OrderItem items = 2;
}
message CreateOrderResponse {
string order_id = 1;
OrderStatus status = 2;
}
message WatchOrderRequest {
string order_id = 1;
}
message OrderStatusUpdate {
string order_id = 1;
OrderStatus status = 2;
google.protobuf.Timestamp updated_at = 3;
}服务端实现
Go实现
package main
import (
"context"
"log"
"net"
"time"
pb "github.com/example/order/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
type orderServer struct {
pb.UnimplementedOrderServiceServer
}
// Unary RPC
func (s *orderServer) CreateOrder(ctx context.Context,
req *pb.CreateOrderRequest) (*pb.CreateOrderResponse, error) {
// 从metadata中获取认证信息
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "no metadata")
}
token := md.Get("authorization")
if len(token) == 0 {
return nil, status.Error(codes.Unauthenticated, "no token")
}
// 检查deadline
if deadline, ok := ctx.Deadline(); ok {
if time.Until(deadline) < 0 {
return nil, status.Error(codes.DeadlineExceeded, "deadline exceeded")
}
}
order := createOrderInDB(req)
return &pb.CreateOrderResponse{
OrderId: order.ID,
Status: pb.OrderStatus_ORDER_STATUS_CREATED,
}, nil
}
// Server Streaming RPC
func (s *orderServer) WatchOrderStatus(req *pb.WatchOrderRequest,
stream pb.OrderService_WatchOrderStatusServer) error {
for {
update := getOrderUpdate(req.OrderId)
if err := stream.Send(update); err != nil {
return err
}
time.Sleep(time.Second)
}
}
func main() {
lis, _ := net.Listen("tcp", ":50051")
grpcServer := grpc.NewServer(
grpc.UnaryInterceptor(loggingInterceptor),
)
pb.RegisterOrderServiceServer(grpcServer, &orderServer{})
log.Fatal(grpcServer.Serve(lis))
}Java实现
public class OrderServiceImpl extends OrderServiceGrpc.OrderServiceImplBase {
@Override
public void createOrder(CreateOrderRequest request,
StreamObserver<CreateOrderResponse> responseObserver) {
// 业务逻辑
String orderId = orderRepository.create(request);
CreateOrderResponse response = CreateOrderResponse.newBuilder()
.setOrderId(orderId)
.setStatus(OrderStatus.ORDER_STATUS_CREATED)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@Override
public void watchOrderStatus(WatchOrderRequest request,
StreamObserver<OrderStatusUpdate> responseObserver) {
// Server Streaming
String orderId = request.getOrderId();
orderEventBus.subscribe(orderId, update -> {
responseObserver.onNext(update);
});
}
@Override
public StreamObserver<CreateOrderRequest> batchCreateOrders(
StreamObserver<BatchCreateResponse> responseObserver) {
// Client Streaming
return new StreamObserver<>() {
List<String> orderIds = new ArrayList<>();
@Override
public void onNext(CreateOrderRequest request) {
String id = orderRepository.create(request);
orderIds.add(id);
}
@Override
public void onCompleted() {
responseObserver.onNext(BatchCreateResponse.newBuilder()
.addAllOrderIds(orderIds).build());
responseObserver.onCompleted();
}
@Override
public void onError(Throwable t) {
log.error("Batch create error", t);
}
};
}
}拦截器(Interceptor)
// Go Unary拦截器示例
func loggingInterceptor(ctx context.Context, req interface{},
info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
start := time.Now()
// 前置处理
log.Printf("gRPC call: %s", info.FullMethod)
// 调用实际的handler
resp, err := handler(ctx, req)
// 后置处理
duration := time.Since(start)
if err != nil {
log.Printf("gRPC error: %s, duration: %v, err: %v",
info.FullMethod, duration, err)
} else {
log.Printf("gRPC success: %s, duration: %v",
info.FullMethod, duration)
}
return resp, err
}
// 链式拦截器
grpcServer := grpc.NewServer(
grpc.ChainUnaryInterceptor(
loggingInterceptor,
authInterceptor,
rateLimitInterceptor,
),
)Metadata与Deadline
// 客户端设置metadata和deadline
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
md := metadata.New(map[string]string{
"authorization": "Bearer " + token,
"x-request-id": uuid.New().String(),
})
ctx = metadata.NewOutgoingContext(ctx, md)
resp, err := client.CreateOrder(ctx, req)
if err != nil {
st, _ := status.FromError(err)
if st.Code() == codes.DeadlineExceeded {
log.Println("Request timed out")
}
}gRPC vs REST对比
| 特性 | gRPC | REST (JSON) |
|---|---|---|
| 协议 | HTTP/2 | HTTP/1.1 或 HTTP/2 |
| 序列化 | Protobuf(二进制) | JSON(文本) |
| 性能 | 高(~10x faster) | 中 |
| 代码生成 | 强类型,多语言 | 需额外工具(OpenAPI) |
| 流式传输 | 原生支持 | 需WebSocket/SSE |
| 浏览器支持 | 需grpc-web代理 | 原生支持 |
| 可读性 | 二进制,不可直接阅读 | JSON,人类可读 |
总结
gRPC结合Protocol Buffers提供了高效、类型安全的跨语言RPC通信方案。Protobuf的Varint和ZigZag编码实现了紧凑的二进制格式,HTTP/2的多路复用和头部压缩进一步提升了网络效率。四种通信模式覆盖了从简单请求响应到复杂双向流的各种场景。拦截器机制则提供了灵活的中间件能力。在微服务内部通信、移动端到后端通信等场景中,gRPC是REST的优秀替代方案。
贡献者
更新日志
2026/3/14 13:09
查看所有更新日志
9f6c2-feat: organize wiki content and refresh site setup于