gRPC 基础
1. 什么是 RPC
1.1 RPC 的基本概念
RPC 的全称是 Remote Procedure Call,中文意思是远程过程调用。
远程过程调用可以拆分为:“远程”与“过程调用”。
-
过程调用
“过程”你可以理解为函数、方法。比如在你的代码里,你写了一个函数:
int result = add(1, 2);。这就是一个典型的本地过程调用。它发生在同一个程序、同一个内存空间里,调用方(你的代码)和被调用方(add函数)是“在一起”的。 -
远程
“远程”意味着调用方和被调用方不在一起。它们可能位于:同一台机器上的两个不同进程、同一个局域网内的两台不同机器、甚至跨越了互联网,位于地球两端的两个数据中心。
所以RPC 的目标就是:让调用一个远程的服务(函数、方法),就像调用本地函数一样简单、自然。
换句话说,开发者不需要显式地处理网络通信的细节(比如用什么协议、如何序列化数据、如何连接网络等)。你只需要像调用本地函数一样,传入参数,然后获取返回值,底层的复杂操作由 RPC 框架帮你完成。
换做更为通俗易懂的话来说就是,比如你现在饿了,想吃饭。

如果是在 20 年前,你只能自己吭哧吭哧跑到店里去买。

但现在有了手机、网络和外卖平台,你只需要在家动动手指点个外卖,骑手就能直接把饭配送到家。你不需要关注网络是怎么传输的、平台是怎么操作的、骑手是怎么配送的,只负责享受美味就行了。
这个过程其实就是 RPC 的核心思想。
1.2 为什么需要 RPC
随着软件架构从单体应用向微服务、分布式系统演进,一个大型应用会被拆分成多个独立的、功能单一的服务。这些服务需要频繁地进行通信和协作。
如果没有 RPC,我们将面临如下许多问题:
- 开发效率低:每个服务间的通信都需要开发者手动写网络代码,处理套接字、数据包、序列化等,重复劳动且容易出错。
- 代码耦合高:网络通信逻辑和业务逻辑混杂在一起,代码难以维护。
- 技术栈不统一:不同服务可能使用不同的通信协议和数据格式,造成集成困难。
而随着 RPC 的出现,正是为了将网络通信的复杂性封装起来,让开发者可以更专注于业务逻辑本身,从而实现高效的分布式系统开发。
1.3 RPC 的核心工作原理与流程
-
客户端调用
客户端代码像调用本地方法一样,调用一个接口方法:
result = clientStub.getUser(123);。这里的clientStub是 RPC 框架在客户端生成的代理对象,它并不是真正的服务实现,而是一个“伪装者”。 -
序列化
客户端存根(Client Stub)将调用的方法名(如
getUser)、参数(如123)以及其他信息(如请求ID)序列化成一种能够通过网络传输的格式(字节流)。常见的序列化协议有:JSON、XML、Protobuf、Thrift、Avro 等。Protobuf 因其高效和跨语言特性而被广泛使用。[!tip]
这里的Protobuf全称为Protocol Buffers,是由Google开发的。
-
网络传输
客户端存根通过网络模块,将序列化后的字节流发送到服务端。这需要知道服务端的地址(IP和端口)。这里涉及到网络通信模型(如 BIO, NIO, Netty)、传输协议(最常用的是 TCP,也可以是 HTTP/1.1, HTTP/2 或 UDP)等。
[!tip]
不过gRPC将网络服务都给我们封装好了,到时候直接使用就可以了。
-
服务端处理
服务端收到字节流后,服务端存根(Server Stub)会将其反序列化,还原出原始的方法名和参数。服务端存根根据方法名找到对应的本地服务实现,并传入参数进行调用:
realService.getUser(123)。 -
返回响应
服务端存根将执行结果(或异常)序列化。通过网络将序列化后的数据返回给客户端。
-
客户端处理响应
客户端存根收到响应后,反序列化出结果。最后,这个结果被返回给最初发起调用的客户端代码。至此,对于客户端开发者来说,一次普通的“本地调用”结束。
1.4 RPC 框架核心组件
一个成熟的 RPC 框架通常包含以下组件:
- 客户端存根 (Client Stub): 代理对象,负责序列化、网络发送、接收响应、反序列化。
- 服务端存根 (Server Stub): 负责反序列化请求、调用实际服务、序列化响应。
- 序列化/反序列化模块: 核心的编码解码器。
- 网络传输模块: 负责底层通信,通常基于高性能网络库(如 Netty)。
- 服务注册与发现 (Service Registry & Discovery): 在微服务中,服务实例是动态变化的。服务提供者向注册中心注册自己的地址,服务消费者从注册中心拉取地址列表。常见的注册中心有 Nacos, Consul, Zookeeper, Etcd。
- 负载均衡 (Load Balance): 当同一个服务有多个实例时,客户端需要决定调用哪一个。策略包括轮询、随机、加权、最少连接等。
- 容错机制: 如超时控制、重试、熔断器、降级等,保证系统的可靠性。
- 监控与治理: 跟踪调用链路、统计性能指标,便于运维和问题排查。
1.5 常见的 RPC 框架
[!important]
RPC既是一种规范和概念,又是一种设计思想和工作模式。并不是一种具体的实现,真正实现了RPC的是如下这些框架。
| 框架 | 主要特性 | 序列化方式 | 通信协议 | 多语言支持 | 服务治理 | 典型应用场景 |
|---|---|---|---|---|---|---|
| gRPC | 高性能、流式传输、丰富的生态工具链 | Protocol Buffers (Protobuf) | HTTP/2 | 支持主流语言 (C++, Go, Java, Python等) | 基础能力,或依赖第三方组件 | 云原生与微服务、服务网格Sidecar、实时流式通信、多数据中心通信 |
| Apache Thrift | 跨语言兼容性、轻量级部署、灵活的序列化协议 | Binary, Compact, JSON等多种协议 | 自定义二进制协议 (可基于TCP或HTTP) | 支持广泛,包括Java, C++, Python, PHP, Go等 | 无,或依赖第三方组件 | 内网高性能通信、多语言异构系统、资源受限的边缘设备或IoT |
| Apache Dubbo | 服务治理能力强,集成度高 | Hessian2等 | 自定义Dubbo协议 (基于TCP) | 主要面向Java生态 | 完善的服务治理能力 | Java生态的电商、支付等复杂业务系统 |
2. 什么是 gRPC
参考文档:什么是 gRPC。
2.1 gRPC 基本概念
gRPC 是 Google 开发的一个高性能、开源、通用的 RPC 框架。它基于 HTTP/2 协议标准设计,默认使用 Protocol Buffers 作为接口定义语言(IDL)和序列化工具。
gRPC 不像传统的 REST API 那样使用 HTTP/1.1,而是建立在 HTTP/2 协议之上。这带来了许多先天优势:
- 二进制协议:HTTP/2 使用二进制帧进行通信,比 HTTP/1.1 的文本格式更高效、解析更快、占用带宽更少。
- 多路复用:在单个 TCP 连接上,可以同时发送多个请求和响应,避免了 HTTP/1.1 的“队头阻塞”问题,极大地提高了连接利用率。
- 头部压缩:使用 HPACK 算法压缩请求和响应的头部信息,减少了开销。
- 服务器推送:对于传统的 HTTP/1.x 来说,服务器可以主动向客户端推送数据,这对于某些流式场景非常有用。
[!tip]
- HTTP/1.0 协议,是请求响应模式,只能够客户端主动向服务器发起请求,但是服务器不能够像客户端主动响应数据,也就是我们说的单工。同时,它还是是一个短连接的协议,虽然底层的 TCP 是支持长连接的协议,但 HTTP/1.0 默认每次请求都重新建立 TCP 连接,所以是短连接。对于传输的数据,传输的是文本结构的数据(HTTP 的报文头是文本格式的,但传输的实体数据(Body)可以是任意格式(包括二进制))。
- HTTP/1.1 协议,它也是是请求响应模式,也是单工通信。但是在HTTP/1.0的协议的基础上,改为了有效的长连接。这里的长连接是在原来的短连接的基础上升级为长连接的,但是也只是延迟了一段时间,当有一段时间没通信之后,也会被关闭的。在这期间,我们可以通过升级的方式,升级为WebSocket协议,WebSocket协议是双工的,它能够实现服务器像客户端主动推送数据。HTTP/1.1 引入了管线化 (Pipelining),允许在同一连接上并行发送多个请求,但响应仍需按顺序返回,且浏览器普遍禁用此特性。
- HTTP/2.0协议,该协议是一个二进制协议,效率高于HTTP/1.x,但是由于传输数据的时候是二进制数据传输,所以他的可读性差。它可以用来实现双工通信。同时,还具有多路复用(一个请求,一个连接,可以实现请求多个数据)的特点。
2.2 gRPC 的核心特性
2.2.1 gRPC 的四种通信模式
[!tip]
这里的
.proto文件示例不用太关注语法,后续会在Protocol Buffers小节中详细讲解。
2.2.2.1 一元 RPC
这是最简单、最常见的模式,类似于普通的函数调用。
- 定义:客户端向服务器发送一个请求,服务器返回一个响应。
- 通信流: 请求 -> 处理 -> 响应。
- 类比:浏览网页。你在浏览器输入一个 URL(请求),服务器返回一个 HTML 页面(响应)。
详细工作流程:
- 客户端:调用生成的存根(Stub)方法,并发送一个请求消息。
- gRPC 客户端库:将消息序列化,通过 HTTP/2 发送一个请求帧(HEADERS 帧 + DATA 帧)。
- 服务端:收到请求后,执行你定义的业务逻辑方法。
- 服务端:方法执行完毕,返回一个响应消息。
- gRPC 服务端库:将响应消息序列化,通过 HTTP/2 发送一个响应帧(HEADERS 帧 + DATA 帧,通常还包含表示结束的 HEADERS 帧)。
- 客户端:收到响应,反序列化消息,存根方法返回结果。
.proto文件示例:
service Greeter {
// 定义一个一元 RPC 方法
rpc SayHello (HelloRequest) returns (HelloReply);
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
适用场景:
- 简单的查询、创建、更新、删除操作。
- 任何不需要流式传输的请求-响应场景。
- 例如:用户登录、获取商品信息、下单支付。
2.2.2.2 服务端流式 RPC
客户端发送一个请求,服务端返回一个消息流。客户端从流中读取一系列消息,直到没有更多消息为止。
- 定义:客户端发送一个请求,服务器返回一个消息流(多个响应)。
- 通信流: 请求 -> 处理 -> 响应流。
- 类比:订阅新闻简报。你进行一次订阅(请求),之后会定期收到多封新闻邮件(响应流)。
详细工作流程:
- 客户端:调用存根方法,发送单个请求。此时会返回一个流对象(如
StreamObserver或AsyncStreamObserver)。 - 服务端:收到请求后,可以多次调用
onNext(response)方法,向流中写入多个响应消息。 - 服务端在发送完所有消息后,会调用
onCompleted()来关闭流。 - 客户端:在循环中不断从流对象读取消息,直到遇到流结束(EOF)或错误。
.proto文件示例:
service StockService {
// 定义一个服务端流式 RPC 方法
rpc GetStockPrices (StockRequest) returns (stream StockPrice);
}
message StockRequest {
string symbol = 1;
}
message StockPrice {
double price = 1;
string timestamp = 2;
}
适用场景:
- 服务器需要推送大量数据,而这些数据在请求时无法立即全部准备好。
- 实时通知或数据订阅。
- 例如:实时股票报价推送、服务器端日志文件下载、新闻 feed 流。
2.2.2.3 客户端流式 RPC
客户端发送一个消息流给服务器,服务器在收到所有消息后返回一个响应。
- 定义:客户端发送一个消息流(多个请求),服务器返回一个响应。
- 通信流: 请求流 -> 处理 -> 响应
- 类比:上传一个大文件。你不断地上传文件数据块(请求流),当服务器接收完所有数据块后,返回一个“上传成功”的确认(响应)。
详细工作流程:
- 客户端:调用存根方法,获取一个流对象。
- 客户端:多次调用
onNext(request)方法,向流中写入多个请求消息。 - 客户端发送完所有消息后,调用
onCompleted()通知服务器。 - 服务端:在方法中,通过一个请求流对象读取客户端发送的所有消息。当读到 EOF 时,执行业务逻辑,然后返回一个响应。
.proto文件示例:
service UploadService {
// 定义一个客户端流式 RPC 方法
rpc UploadFile (stream FileChunk) returns (UploadStatus);
}
message FileChunk {
bytes content = 1;
}
message UploadStatus {
string message = 1;
bool success = 2;
}
适用场景:
- 客户端需要上传大量数据。
- 需要聚合客户端数据后再处理的场景。
- 例如:传感器数据采集(多个传感器持续上报数据,服务器聚合后进行分析)、大文件上传、批量数据导入。
2.2.2.4 双向流式 RPC
客户端和服务端都使用一个读写流来发送一系列消息。这两个流是独立的,因此客户端和服务端可以以任意顺序读写。
- 定义:客户端和服务端都发送一个消息流。两个流相互独立,可以同时进行读写。
- 通信流: 请求流 <-> 响应流。
- 类比:实时在线聊天。你和朋友可以随时发送消息(请求流),也可以随时收到对方的消息(响应流),发送和接收是同时进行的。
详细工作流程:
- 客户端:调用存根方法,获取一个流对象用于读写。
- 客户端和服务端:可以几乎同时开始发送消息流。
- 客户端可以随时调用
onNext(request)发送请求。 - 服务端可以随时调用
onNext(response)发送响应。 - 任何一方都可以随时关闭自己的流(调用
onCompleted())。通常,当一方关闭写流后,它会继续等待读取另一方的流,直到对方也关闭。
.proto文件示例:
service ChatService {
// 定义一个双向流式 RPC 方法
rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}
message ChatMessage {
string user = 1;
string text = 2;
}
适用场景:
- 需要长时间连接的实时通信。
- 双方都需要主动推送消息的场景。
- 例如:在线聊天室、游戏、实时指令控制系统(如无人机控制)、复杂的多步协商过程。
2.2.2 超时
gRPC 默认的请求的超时时间是很长的,当你没有设置请求超时时间时,所有在运行的请求都占用大量资源且可能运行很长的时间,导致服务资源损耗过高,使得后来的请求响应过慢,甚至会引起整个进程崩溃。为了避免这种情况,我们的服务应该设置超时时间。
与传统的相对超时不同,gRPC 使用绝对截止时间的概念。
核心特点:
- 绝对时间:Deadline是一个具体的时间点(如"2024-01-01 10:30:00"),而不是相对时长(如"5秒后"),但是API为了方便,通常允许你设置"持续时间",例如:
withDeadlineAfter(5, SECONDS)。gRPC 会在内部将这个相对时常转为绝对时间点。 - 自动传播:在服务链调用中,Deadline会自动从客户端传播到所有下游服务。
- 上下文感知:服务端可以通过检查Context来感知是否已超时。
为什么使用Deadline而不是Timeout?
- 更准确地反映分布式系统中的时间约束。
- 避免在服务链中累计超时时间计算错误。
- 所有服务共享同一个时间目标。
Deadline的状态:
- 未超时:正常处理请求。
- 已超时:Context会被取消,服务应停止不必要的处理。
[!caution]
- 客户端应设置合理的 deadline,避免无限等待(默认无 deadline 会导致潜在挂起)。
- 服务端应读取 deadline 并尽早检查
Context是否已取消,以便及时中断耗时计算或外部调用。- Deadline 会以 protobuf 元数据形式传递(gRPC 内部实现),服务端可通过
Context.current().getDeadline()读取。
超时处理代码示例:
// 客户端 - 设置Deadline
public class DeadlineClient {
public void callWithDeadline() {
// 创建Channel和Stub
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
.usePlaintext()
.build();
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
try {
// 设置2秒的Deadline
HelloRequest request = HelloRequest.newBuilder().setName("World").build();
HelloReply response = stub
.withDeadline(Deadline.after(2, TimeUnit.SECONDS))
.sayHello(request);
System.out.println("成功收到响应: " + response.getMessage());
} catch (StatusRuntimeException e) {
// 处理超时异常
if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) {
System.out.println("请求超时!服务端处理时间过长");
} else {
System.out.println("RPC调用失败: " + e.getStatus());
}
} finally {
channel.shutdown();
}
}
}
// 服务端 - 检查Deadline
public class DeadlineAwareService extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
// 重要:在长时间操作中定期检查Deadline
for (int i = 0; i < 10; i++) {
// 检查当前Context是否已被取消(超时)
if (Context.current().isCancelled()) {
System.out.println("检测到客户端已取消请求,停止处理");
responseObserver.onError(Status.CANCELLED.asRuntimeException());
return;
}
// 模拟工作
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
responseObserver.onError(Status.INTERNAL.asRuntimeException());
return;
}
}
// 返回响应
HelloReply reply = HelloReply.newBuilder()
.setMessage("Hello " + request.getName())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
2.2.3 元数据
元数据是 gRPC 中用于在客户端和服务端之间传递额外信息的机制,类似于 HTTP 头部。
核心特点:
- 键值对结构:由键(Key)和值(Value)组成。
- 类型安全:键需要预定义,值有特定的序列化方式。
- 双向传递:可以发送请求元数据和接收响应元数据。
元数据的类型:
- 请求头(Headers):在 RPC 开始时发送。
- 尾随数据(Trailers):在 RPC 结束时发送。
常见用途:
- 认证信息:Bearer tokens、API keys
- 跟踪信息:Trace ID、Span ID
- 路由信息:用户ID、租户信息
- 控制信息:缓存指令、特性开关
元数据键的类型:
- ASCII字符串:
Metadata.Key.of("key", Metadata.ASCII_STRING_MARSHALLER) - 二进制数据:
Metadata.Key.of("key-bin", Metadata.BINARY_BYTE_MARSHALLER)
[!caution]
- 不要把大体积二进制数据放到 Metadata(headers 有大小限制,且在某些传输层会有限制)。
- 认证 token 常放在 header(例如
authorization)。- 读取 header 在服务器 interceptor 中最方便(
interceptCall(..., Metadata headers, ...))。
元数据代码示例:
// 定义通用的元数据键
public class MetadataKeys {
public static final Metadata.Key<String> AUTH_TOKEN =
Metadata.Key.of("auth-token", Metadata.ASCII_STRING_MARSHALLER);
public static final Metadata.Key<String> CLIENT_VERSION =
Metadata.Key.of("client-version", Metadata.ASCII_STRING_MARSHALLER);
public static final Metadata.Key<String> TRACE_ID =
Metadata.Key.of("trace-id", Metadata.ASCII_STRING_MARSHALLER);
public static final Metadata.Key<String> USER_ID =
Metadata.Key.of("user-id", Metadata.ASCII_STRING_MARSHALLER);
}
// 客户端 - 发送元数据
public class MetadataClient {
public void callWithMetadata() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
.usePlaintext()
.build();
// 创建包含元数据的Stub
Metadata headers = new Metadata();
headers.put(MetadataKeys.AUTH_TOKEN, "bearer abc123");
headers.put(MetadataKeys.CLIENT_VERSION, "1.0.0");
headers.put(MetadataKeys.TRACE_ID, "trace-" + System.currentTimeMillis());
headers.put(MetadataKeys.USER_ID, "user123");
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
stub = MetadataUtils.attachHeaders(stub, headers);
try {
HelloRequest request = HelloRequest.newBuilder().setName("World").build();
HelloReply response = stub.sayHello(request);
System.out.println("响应: " + response.getMessage());
} finally {
channel.shutdown();
}
}
}
// 服务端 - 处理元数据
public class MetadataAwareService extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
// 在实际应用中,需要通过ServerInterceptor获取Metadata
// 这里演示业务逻辑
// 认证逻辑(简化)
String authToken = "从Metadata中获取"; // 实际从拦截器获取
if (!isValidToken(authToken)) {
responseObserver.onError(Status.UNAUTHENTICATED
.withDescription("无效的认证令牌")
.asRuntimeException());
return;
}
// 处理请求
HelloReply reply = HelloReply.newBuilder()
.setMessage("Hello " + request.getName())
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
private boolean isValidToken(String token) {
return token != null && token.startsWith("bearer ");
}
}
2.2.4 拦截器
拦截器是 gRPC 中的 AOP(面向切面编程)机制,允许在 RPC 调用的不同阶段插入自定义逻辑。
核心概念:
- 横切关注点:将日志、认证、监控等通用逻辑从业务代码中分离。
- 调用链:多个拦截器按顺序形成处理链。
- 透明性:业务代码无需知道拦截器的存在。
拦截器类型:
- 客户端拦截器
- 在请求发送前执行。
- 可以修改请求参数、添加头部信息。
- 处理响应和异常。
- 服务端拦截器
- 在请求处理前执行。
- 可以验证认证、记录日志。
- 处理响应发送。
拦截器的执行阶段:
客户端拦截器: 创建调用 → 发送头部 → 发送消息 → 接收头部 → 接收消息 → 完成
服务端拦截器: 接收调用 → 接收头部 → 接收消息 → 发送头部 → 发送消息 → 完成
拦截器代码示例:
// 日志拦截器
public class LoggingInterceptor implements ServerInterceptor {
private static final Logger logger = Logger.getLogger(LoggingInterceptor.class.getName());
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
long startTime = System.currentTimeMillis();
String methodName = call.getMethodDescriptor().getFullName();
logger.info("开始处理RPC调用: " + methodName);
logger.info("请求头: " + headers);
// 包装ServerCall以记录响应信息
ServerCall<ReqT, RespT> loggingCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
logger.info("发送响应头 for " + methodName);
super.sendHeaders(responseHeaders);
}
@Override
public void sendMessage(RespT message) {
logger.info("发送响应消息 for " + methodName);
super.sendMessage(message);
}
@Override
public void close(Status status, Metadata trailers) {
long duration = System.currentTimeMillis() - startTime;
if (status.isOk()) {
logger.info("RPC调用成功: " + methodName + ", 耗时: " + duration + "ms");
} else {
logger.warning("RPC调用失败: " + methodName + ", 状态: " + status + ", 耗时: " + duration + "ms");
}
super.close(status, trailers);
}
};
return next.startCall(loggingCall, headers);
}
}
// 认证拦截器
public class AuthenticationInterceptor implements ServerInterceptor {
private static final Metadata.Key<String> AUTH_HEADER =
Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
// 获取认证头
String authHeader = headers.get(AUTH_HEADER);
// 验证认证
if (!isAuthenticated(authHeader)) {
// 认证失败,立即返回错误
call.close(Status.UNAUTHENTICATED
.withDescription("身份验证失败")
.withCause(new RuntimeException("Invalid credentials")),
new Metadata());
return new ServerCall.Listener<ReqT>() {}; // 返回空的监听器
}
// 认证成功,继续处理
String userId = extractUserId(authHeader);
// 将用户信息放入Context,供后续服务使用
Context context = Context.current().withValue(
ContextKeys.USER_ID, userId);
return Contexts.interceptCall(context, call, headers, next);
}
private boolean isAuthenticated(String authHeader) {
// 实现实际的认证逻辑
return authHeader != null && authHeader.startsWith("Bearer valid-");
}
private String extractUserId(String authHeader) {
// 从token中提取用户ID
return authHeader.replace("Bearer valid-", "");
}
}
// 上下文键定义
public class ContextKeys {
public static final Context.Key<String> USER_ID =
Context.key("user-id");
public static final Context.Key<String> CLIENT_INFO =
Context.key("client-info");
}
// 使用拦截器的服务端
public class ServerWithInterceptors {
public static void main(String[] args) throws IOException, InterruptedException {
Server server = ServerBuilder.forPort(8080)
.addService(new GreeterServiceImpl())
// 注意:拦截器的顺序很重要!
.intercept(new LoggingInterceptor()) // 最先执行,最后结束
.intercept(new AuthenticationInterceptor()) // 第二个执行
.intercept(new MetricsInterceptor()) // 最后执行,最先结束
.build();
server.start();
server.awaitTermination();
}
}
3. Protocol Buffers
参考文档:Protocol Buffers 中文文档。
Protocol Buffers 是一种语言无关、平台无关、可扩展的用于序列化结构化数据的机制。
它就像 JSON,但更小更快,并且能生成原生语言绑定。你只需定义一次你想要的数据结构,然后就可以使用特殊生成的源代码,轻松地使用多种语言从各种数据流中读写你的结构化数据。
Protocol Buffers 是定义语言(在 .proto 文件中创建)、proto 编译器生成的用于与数据交互的代码、特定语言的运行时库、写入文件(或通过网络连接发送)的数据的序列化格式以及序列化数据的组合。
3.1 为什么使用 Protocol Buffers
在数据序列化领域,我们已经有了像 JSON 和 XML 这样广为人知和使用的格式。那么,为什么 Google 要发明 Protocol Buffers,并且为什么它在微服务、gRPC 和数据存储等领域变得如此流行?主要有以下几个关键原因:
-
卓越的性能(更小、更快)
这是 Protobuf 最显著的优势。与 JSON 和 XML 这样的文本格式不同,Protobuf 将数据序列化为二进制格式。它不使用字段名(如
"username","user_id"),而是使用紧凑的字段编号(如1,2)来标识字段。同时,它采用了各种编码技术(如 Varints 用于小整数)来进一步减小体积。由于数据体积更小,在网络传输时速度自然更快。同时,二进制格式的解析速度远快于 JSON/XML 的文本解析。Protobuf 的编解码器是高度优化的,反序列化过程几乎可以直接映射到内存中的数据结构,而不需要复杂的词法分析。
-
强类型约束和清晰的接口定义(.proto 文件)
Protobuf 要求你必须先在一个
.proto文件中严格定义数据的结构(即message)。这个文件就是你的数据契约。例如:// 定义阶段:明确数据结构 message User { int32 id = 1; string email = 2; repeated string phones = 3; // repeated 表示数组/列表 }这种方式带来了巨大的好处:
- 代码自文档化:
.proto文件清晰地展示了数据结构,任何开发者都能一目了然。 - 编译时类型检查:当你使用
protoc编译器生成代码后,生成的类(如 Java 的User类)是强类型的。如果你试图将一个字符串赋值给id字段,在编译时就会报错,而不是在运行时才发现类型不匹配的异常。 - 减少 Bug:避免了在运行时因字段名拼写错误或类型错误而导致的隐蔽问题。
- 代码自文档化:
-
跨语言和跨平台
这是 Protobuf 的另一个核心设计目标。你用一种语言(与语言无关的
.proto语法)定义数据结构,然后 Proto 编译器 (protoc) 可以为你选择的任何支持的语言生成源代码。支持语言:包括但不限于 C++, C#, Dart, Go, Java, Kotlin, Python, Ruby, Objective-C, PHP, Swift。
一致性:无论你的团队使用 Python 开发后端服务,使用 Go 开发中间件,使用 Java 开发 Android 应用,还是使用 Swift 开发 iOS 应用,它们都可以使用由同一个 .proto 文件生成的代码来互相通信。这确保了数据模型在所有端的一致性。
-
完美的向前和向后兼容性
在软件开发和系统演进中,API 和数据结构的变更是家常便饭。Protobuf 被设计为可以优雅地处理这种变化。
核心规则:永远不要修改字段的编号。
假设 v1 版本的
User消息是这样的:message User { int32 id = 1; string name = 2; }在你想在 v2 版本中增加一个
email字段,并弃用name字段:message User { int32 id = 1; string name = 2 [deprecated=true]; // 标记为弃用,但字段编号保留 string email = 3; // 新的字段使用新的编号 }这种机制保证了完美的兼容性:
- 向后兼容:使用 v1 老代码的程序可以解析由 v2 新代码序列化的数据。它会忽略掉它不认识的字段(编号为 3 的
email字段),正常读取id和name。 - 向前兼容:使用 v2 新代码的程序可以解析由 v1 老代码序列化的数据。对于缺失的
email字段,它会简单地赋予其默认值(如空字符串)。
这种设计使得系统各个部分的独立升级和部署变得非常容易。

- 向后兼容:使用 v1 老代码的程序可以解析由 v2 新代码序列化的数据。它会忽略掉它不认识的字段(编号为 3 的
-
自动代码生成和开发效率
一旦你定义了
.proto文件,protoc编译器就会为你生成大量的样板代码。这些生成的代码包含了:- 数据的构建器/设置器(在不同语言中模式不同,如 Builder 模式或简单的 setter)。
- 完整的序列化/反序列化方法(如
SerializeToString(),ParseFromString())。 - 验证和访问方法。
这意味着开发者无需手动编写繁琐的数据解析、验证和序列化代码,可以更专注于业务逻辑,从而显著提高开发效率。
3.2 Protocol Buffer 编译器安装
[!tip]
安装Protocol Buffer 编译器的目的是可以把protobuf的接口定义语言(IDL),转换为具体的某一种开发语言,如Java。
官方安装介绍:Protocol Buffer 编译器安装
GitHub下载安装:Protocol Buffer 编译器安装GitHub地址。
我的当前电脑是Mac,我既可以使用包管理器Homebrew安装,也可以使用二进制文件安装,这里我介绍使用二进制文件安装。


之后下载完毕之后,在目录中解压,,之后配置环境变量:
export PROTOCOL_BUFFERS_HOME="/xxxx/protoc-33.0-osx-aarch_64/bin"
export PATH=$PATH:$PROTOCOL_BUFFERS_HOME
保存之后退出,重载环境变量:
source ~/.bash_profile
最后使用命令查看安装情况:
protoc --version
如果有输出版本号,那么就算安装成功了。
3.3 Protocol Buffers语法详解
参考文章:
在正式开始使用Protocol Buffers之前,你要编写一个.proto文件。所有的protobuf相关的内容,都是写在这个文件中的。
-
语法版本(syntax)
对于
.proto文件的第一行,一般是指定protobuf版本,对于protobuf来讲,有proto 2、proto 3、editions三个版本。目前主流的是使用proto 3。所以在文件的第一行,使用关键字
syntax来指定版本:syntax = "proto3"; -
注释
同样,在protobuf中也是支持注释的,使用
//表示单行注释,使用/* */表示多行注释 -
与Java语言相关的语法
该部分语法只适用于Java语言,其他语言可能不适用。
// 表示后续protobuf生成的java代码,是一个源文件还是多个源文件(xxx.java) option java_multiple_files = false; // 指定protobuf生成的类将放置到哪个包中 option java_package = "cn.yunrain"; // 指定protobuf生成的外部类的名字,改外部类用于管理内部类(内部类才是真正开发使用的) option java_outer_classname = "UserService"; -
逻辑包(packages)
你可以向
.proto文件添加一个可选的package说明符,以防止协议消息类型之间的名称冲突。有点像Java中的包。例如package foo.bar; message Open { ... }然后你可以在定义你的消息类型的字段时使用包说明符:
message Foo { ... foo.bar.Open open = 1; ... }包说明符影响生成代码的方式取决于你选择的语言,对于Java和Kotlin来说,除非你在你的
.proto文件中明确提供一个option java_package,否则该包将用作 Java 包。[!caution]
即使
package指令不直接影响生成的代码(例如在 Python 中),仍然强烈建议为.proto文件指定包,否则可能导致描述符中的命名冲突,并使 proto 对其他语言不可移植。 -
导入(import)
在实际开发过程中,肯定是有多个
.proto文件的,不可能把所有的protobuf内容都写到一个.proto文件。比如我这里有两个.proto文件:UserService.proto OrderService.proto其中,如果我们
OrderService.proto文件想要使用UserService.proto文件中的定义的类型,那么此时我们就在OrderService.proto文件中导入(import)UserService.proto文件,这样就可以不用重复开发了。例如:
import "文件位置/UserService.proto" -
基本类型

对于上图中的这些基本类型,后续都在message中使用。
-
枚举(enum)
当您定义消息类型时,您可能希望它的某个字段只能是预定义列表中的值之一。例如,假设您想为每个
SearchRequest添加一个corpus字段,其中 corpus 可以是UNIVERSAL、WEB、IMAGES、LOCAL、NEWS、PRODUCTS或VIDEO。您可以通过在您的消息定义中添加一个enum并为每个可能的值定义一个常量来非常简单地实现这一点。在下面的例子中,我们添加了一个名为
Corpus的enum,包含了所有可能的值,以及一个类型为Corpus的字段:enum Corpus { CORPUS_UNSPECIFIED = 0; // 枚举值的编号必须从0开始 CORPUS_UNIVERSAL = 1; CORPUS_WEB = 2; CORPUS_IMAGES = 3; CORPUS_LOCAL = 4; CORPUS_NEWS = 5; CORPUS_PRODUCTS = 6; CORPUS_VIDEO = 7; } message SearchRequest { string query = 1; int32 page_number = 2; int32 results_per_page = 3; Corpus corpus = 4; }SearchRequest.corpus字段的默认值是CORPUS_UNSPECIFIED,因为这是枚举中定义的第一个值。同时,枚举值的编号必须从0开始。 -
消息(Message)
消息(Message)是protobuf中的重中之重,关键字就是
message,你完全可以把它看中Java中的一个封装类。例如如下消息:message SearchRequest { string query = 1; // string 代表数据类型;1代表字段编号 int32 page_number = 2; int32 results_per_page = 3; }对于
SearchRequest消息来说,string query = 1;;这段代码就是由三部分字段类型、字段编号、字段基数组成。字段类型:就是string,也就上述提到的基本类型。
字段编号:等号后面的值就是字段编号,你必须为消息定义中的每个字段赋予一个介于
1和536,870,911之间的编号,并遵守以下限制:- 给定的编号必须在该消息的所有字段中是唯一的。
- 字段编号
19,000到19,999为 Protocol Buffers 实现保留。如果你在消息中使用这些保留的字段编号,protocol buffer 编译器会报错。 - 您不能使用任何保留的字段编号或任何已分配给扩展的字段编号。
一旦您的消息类型投入使用,这个编号就不能更改,因为它在消息的有线格式中标识该字段。“更改”字段编号等同于删除该字段并创建一个具有相同类型但编号不同的新字段。同时字段编号永远不应被重用。您应该为最常设置的字段使用 1 到 15 的字段编号。较小的字段编号值在有线格式中占用更少的空间。
字段基数:字段基数用于修饰message中的字段。
-
Singular:表示单一,也是默认的关键字,表示这个字段的值只能是0个或者1个。上述的
string query = 1;中,string 前面其实就是Singular,只不过被省略了,因为他是默认值。 -
repeated:表示这个字段的返回值可以是多个,等价于Java中的List。
-
map:这是一个键值对字段类型,Map 字段只是一种特殊类型的 repeated 字段的简写。例如:
message Test6 { map<string, int32> g = 7; }这实际上等同于:
message Test6 { message g_Entry { string key = 1; int32 value = 2; } repeated g_Entry g = 7; }
[!tip]
在protobuf中,消息是可以定义多个的,和Java中的类一样,你具体要定义多少个消息,取决于你自己的业务。
-
消息嵌套
你可以在其他消息类型内部定义和使用消息类型,如下例所示——这里的
Result消息是在SearchResponse消息内部定义的。message SearchResponse { message Result { string url = 1; string title = 2; repeated string snippets = 3; } repeated Result results = 1; }你可以随心所欲地嵌套消息。在下面的例子中,请注意两个名为
Inner的嵌套类型是完全独立的,因为它们是在不同的消息中定义的:message Outer { // Level 0 message MiddleAA { // Level 1 message Inner { // Level 2 int64 ival = 1; bool booly = 2; } } message MiddleBB { // Level 1 message Inner { // Level 2 int32 ival = 1; bool booly = 2; } } } -
optional关键字
在 proto2 中,字段规则是必须显式声明的:
required:字段必须提供值,否则消息被视为"未初始化”。optional:字段可选,可以不提供值。repeated:字段可以重复多次。
但是在Proto3 中,则简化了字段规则:
- 移除了
required(因为它容易导致兼容性问题)。 - 所有字段默认都是可选的(类似 proto2 的
optional)。 - 移除了显式的
optional关键字(在早期版本中)。 - 引入了字段 Presence 的概念。
在 Proto3 的早期版本中,所有字段都是"可选的",但你无法区分:
- 字段被显式设置为默认值(如
0、""、false)。 - 字段根本没有被设置。
// Proto3 早期版本 syntax = "proto3"; message User { string name = 1; // 总是可选的,无法检测是否设置 int32 age = 2; // 如果 age=0,不知道是显式设置还是未设置 string email = 3; }问题:在某些业务场景中,我们需要区分"年龄为 0"和"年龄未设置",这在早期 proto3 中是无法实现的。
从 Protocol Buffers 3.12 版本开始,
optional关键字重新被引入到 proto3 中,但它的含义发生了变化:它现在用于启用字段 Presence 检测。// Proto3.12+ 使用 optional 启用 Presence 检测 syntax = "proto3"; message User { string name = 1; // 普通字段 optional int32 age = 2; // 启用 Presence 检测 optional string email = 3; // 启用 Presence 检测 }当字段标记为
optional时,生成的代码会提供方法来检测该字段是否被显式设置:User user = User.newBuilder() .setName("Alice") .build(); System.out.println(user.getAge()); // 输出: 0 System.out.println(user.hasAge()); // 输出: false user = user.toBuilder() .setAge(0) .build(); System.out.println(user.getAge()); // 输出: 0 System.out.println(user.hasAge()); // 输出: true -
oneof关键字
oneof 用于处理这样一种场景:一个消息(Message)中可能有多个字段,但这些字段在同一时间只会有一个被设置值。您可以使用 oneof 功能来强制执行此行为并节省内存。
oneof 字段类似于 optional 字段,不同之处在于 oneof 中的所有字段共享内存,并且在同一时间最多只能设置一个字段。设置 oneof 的任何成员都会自动清除所有其他成员。您可以使用特殊的
case()或WhichOneof()方法(取决于您选择的语言)来检查 oneof 中设置了哪个值(如果有)。[!caution]
- 设置了多个值,则由 protobuf 中的顺序决定的最后一个设置的值将覆盖所有先前的值。
- oneof 字段的字段编号必须在封闭消息中是唯一的。
- 你可以使用oneof添加任何类型的字段,但是
map与repeated字段除外。如果需要向 oneof 添加重复字段,可以使用包含该重复字段的消息。
假设你正在设计一个
ResponseMessage,它可能返回一个成功结果,也可能返回一个错误。在没有oneof时,你可能会这样设计:message ResponseMessage { int32 success_code = 1; string data = 2; int32 error_code = 3; string error_message = 4; }这种设计存在一个问题:一个响应不可能同时既是成功的又是失败的,但我们的结构体却允许
success_code、data、error_code和error_message同时存在值。这会导致逻辑上的歧义和额外的验证工作。使用
oneof可以完美地解决这个问题:message ResponseMessage { // oneof 块开始 oneof result { Success success = 1; Error error = 2; } // oneof 块结束 // 可以定义其他不属于 oneof 的普通字段 string request_id = 3; // 定义嵌套的 Success 和 Error 消息 message Success { int32 code = 1; string data = 2; } message Error { int32 code = 1; string message = 2; } } -
Any关键字
Any类型允许你在消息中嵌入任意类型的消息,而无需在.proto定义中直接引入它们。这类似于编程语言中的Object类型或“类型擦除”容器。当你的系统需要处理来自不同模块或服务的、在编译时未知的任意类型数据时,
Any非常有用。常见场景包括:- 通用事件总线:一个事件需要携带不同结构的负载(Payload)。
- RPC 通用返回值:一个 RPC 方法可能返回多种不同类型的消息。
- 插件系统:主程序需要处理插件定义的未知消息类型。
要使用
Any,你需要先导入它的定义:// 在 .proto 文件顶部导入 import "google/protobuf/any.proto"; message Event { string event_type = 1; google.protobuf.Any payload = 2; // 可以容纳任何消息 }Any本质上是一个容器,它包含两个部分:- 序列化的消息字节:你放入的任何消息都会被序列化成字节存储在这里。
- 类型URL:一个唯一标识符,用于说明这些字节代表的是什么类型的消息。格式通常是
type.googleapis.com/+ 消息的完整名称。例如,如果你把一个Person消息打包进Any,它的类型URL可能看起来像:type.googleapis.com/tutorial.Person。
-
服务(service)
在 Protocol Buffers 中,service 用于定义一组相关的远程过程调用(RPC)方法。它类似于编程中的接口(Interface)或抽象类,只定义方法的签名,而不包含具体实现。具体的实现由服务端实现。
service ServiceName { rpc MethodName(RequestMessage) returns (ResponseMessage) {} }
4. gRPC Demo 示例
项目结构:
我们先来创建一个父Maven项目:





4.1 gRPC-API 模块
在API模块,一般是定义message与service,同时,一般将message与service放到proto文件夹下,改文件夹与../src/main/java平级。

现在我编写Hello.proto文件如下:
// 指定使用的Protocol Buffers语法版本,目前主流使用proto3
syntax = "proto3";
// Java相关的配置选项
option java_multiple_files = false; // 将所有生成的类放在一个Java文件中
option java_package = "cn.yunrain"; // 指定生成的Java类的包名
option java_outer_classname = "HelloProto"; // 指定外层包装类的类名
// 定义请求消息体
message HelloOneRequest{
string name = 1;
}
// 定义响应消息体
message HelloOneResponse{
string result = 1;
}
// 定义gRPC服务接口
service HelloOneService{
// 定义一个RPC方法:接收HelloOneRequest参数,返回HelloOneResponse
rpc helloOne(HelloOneRequest) returns (HelloOneResponse);
}
现在protobuf文件定义好了,我们可以使用protobuf编译器,来让他转为对应的Java代码。你可以使用protoc命令,但是这样太繁琐了,我们直接让他集成到Maven中,使用Maven插件来完成。
插件地址:grpc-java。
在开始之前,你得引入对应的依赖:
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.76.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.76.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.76.0</version>
</dependency>
<!-- 除了grpc-java的依赖,你还得引入protobuf-java的依赖,否则会报错 -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>4.31.1</version>
</dependency>

之后引入对应的插件:
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.25.8:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.76.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
现在我将这些依赖和Maven都加入到API模块,对应插件的含义:

现在我们先执行compile命令,看看怎么个事!



接下来我们执行插件的compile-custom命令在看看:



从生成的Grpc服务类可知,里面有很多内部类,大体可以分为以下几个部分:
-
serviceName+Impl+Base:这是一个抽象类,在实际开发服务端端(server)时候,往往需要继承这个类,然后重写里面的方法。
-
XxxStub:凡是以Stub结尾的类,对应的都是客户端的代理对象。它们的区别其实就是网络通信的方式不同。
类名 类型 作用描述 使用场景 XxxStub类 异步非阻塞Stub 客户端异步调用,使用StreamObserver XxxBlockingStub类 同步阻塞Stub 客户端同步调用,线程阻塞等待响应 XxxFutureStub类 可同步异步Stub 客户端调用返回ListenableFuture,只能应用于一元 RPC XxxBlockingV2Stub类 阻塞V2版本Stub 增强的同步Stub,可能支持更多特性 -
XxxSupplier:凡是以Supplier结尾的类,其实都是描述符提供者。
类名 类型 作用描述 XxxBaseDescriptorSupplier类 提供基础服务描述符 XxxFileDescriptorSupplier类 提供proto文件描述符 XxxMethodDescriptorSupplier类 提供方法级别描述符
好了,现在api模块整理完毕之后,接下来看看server模块。
4.2 gRPC-Server 模块
[!tip]
该模块需要引入之前的API模块。
在server模块中,一般你需要实现业务接口,添加具体的功能,一般还是(Mybatis+SQL),这一步我就省略了,之后你就可以创建服务端了。
但是注意,你的业务代码一定要继承XxxImplBase抽象类:
package cn.yunrain.service;
import cn.yunrain.HelloOneServiceGrpc;
import cn.yunrain.HelloProto;
import io.grpc.stub.StreamObserver;
public class HelloServiceImpl extends HelloOneServiceGrpc.HelloOneServiceImplBase {
/**
* 重写 gRPC 框架生成的 helloOne 方法。
* 该方法由客户端请求时被自动调用。
*
* @param request 客户端请求对象,包含客户端传递的参数(此处是 name)
* @param responseObserver 用于向客户端发送响应的流式观察者
*/
@Override
public void helloOne(HelloProto.HelloOneRequest request, StreamObserver<HelloProto.HelloOneResponse> responseObserver) {
// 1. 接受客户端(client)请求参数
String name = request.getName();
// 2. 执行业务 service+dao 调用对应的业务功能,
// 此处仅简单打印请求内容,用于测试或调试。
System.out.println("name:" + name);
// 3. 构建HelloOneResponse响应对象
HelloProto.HelloOneResponse response = HelloProto.HelloOneResponse.newBuilder()
.setResult("Hello " + name)
.build();
// 4. 通过 responseObserver 向客户端发送响应消息
responseObserver.onNext(response);
// 通知客户端:服务端已完成本次响应(流式调用的结束信号)
responseObserver.onCompleted();
}
}
这里大家有可能有疑问, 为啥我的proto文件中定义的service服务接口中helloOne的返回值是HelloOneResponse,为啥这里的却是void呢?
这是因为 gRPC 采用异步回调模型,虽然proto定义中返回值是HelloOneResponse,但实际响应是通过StreamObserver参数的onNext()方法回调返回的,所以Java方法返回void。
上诉业务代码写完了,接下来我们来写服务端:
package cn.yunrain;
import cn.yunrain.service.HelloServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
public class App {
public static void main(String[] args) throws IOException, InterruptedException {
// 创建gRPC服务器实例,指定监听端口9000
Server server = ServerBuilder
.forPort(9000) // 设置服务端的端口
.addService(new HelloServiceImpl()) // 发布服务实现类,将自定义的业务服务注册到gRPC服务器
.build();
// 启动gRPC服务器,开始监听客户端连接请求
server.start();
// 阻塞主线程,保持服务器运行直到主动关闭或发生异常
server.awaitTermination();
}
}
4.3 gRPC-Client 模块
[!tip]
该模块需要引入之前的API模块。
在这个模块中,主要是客户端通过代理对象实现远端功能的调用。
package cn.yunrain;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
/**
* Hello world!
*
*/
public class App {
public static void main( String[] args ) {
// 创建通信的信道
ManagedChannel channel = ManagedChannelBuilder
.forAddress("localhost", 9000) // 这里的地址需要和服务端的地址保持一致
.usePlaintext()
.build();
try {
// 获取代理对象XxxStub
HelloOneServiceGrpc.HelloOneServiceBlockingStub helloOneService = HelloOneServiceGrpc.newBlockingStub(channel);
// 准备请求参数
HelloProto.HelloOneRequest oneRequest = HelloProto.HelloOneRequest.newBuilder()
.setName("nxz")
.build();
// 执行远程方法,RPC调用
HelloProto.HelloOneResponse oneResponse = helloOneService.helloOne(oneRequest);
System.out.println("result = " + oneResponse.getResult());
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
channel.shutdown();
}
}
}
之后,先启动服务端,再启动客户端,你就能够发现RPC调用是否正常了。
4.4 gRPC 四种通信模式示例
[!tip]
该小节是以上述4.1~4.3小节所建立的模块为基础的。
4.4.1 一元 RPC
上述模块给的例子其实就是一元 RPC,这里不再赘述。
4.4.2 服务端流式 RPC
客户端发送一个请求,服务端返回一个消息流。客户端从流中读取一系列消息,直到没有更多消息为止。
语法:
service 服务名称 {
// 定义一个服务端流式 RPC 方法
rpc 具体gRPC方法 (请求消息) returns (stream关键字 响应消息);
}
首先在 API 模块中定义 gRPC 服务,如图:

针对 API 模块,重新执行 Maven 插件使其生成对应的最新的代码:

之后编写服务端:
package cn.yunrain.service;
import cn.yunrain.HelloOneServiceGrpc;
import cn.yunrain.HelloProto;
import io.grpc.stub.StreamObserver;
public class HelloServiceImpl extends HelloOneServiceGrpc.HelloOneServiceImplBase {
// 之前的代码...
@Override
public void helloServerStream(HelloProto.HelloOneRequest request, StreamObserver<HelloProto.HelloOneResponse> responseObserver){
// 针对服务端流式 RPC
String name = request.getName();
System.out.println("name:" + name);
for (int i = 0; i < 10; i++) {
HelloProto.HelloOneResponse streamResponse = HelloProto.HelloOneResponse.newBuilder()
.setResult("Hello current server response index : " + i)
.build();
// 通过 responseObserver 向客户端发送响应消息
responseObserver.onNext(streamResponse);
// 每次休眠500ms 模拟流式响应
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 通知客户端:服务端已完成本次响应(流式调用的结束信号)
responseObserver.onCompleted();
}
}
从服务端流式 RPC 服务端的代码中,能够看出,服务端在持续(for循环)的发送响应(responseObserver.onNext()),当所有的响应都发送完毕之后,才调用的 responseObserver.onCompleted()。
客户端代码:
package cn.yunrain;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class GrpcClientTwo {
public static void main(String[] args) throws InterruptedException {
// 创建通信信道,使用明文传输(不加密,适用于开发环境)
ManagedChannel channel = ManagedChannelBuilder
.forAddress("localhost", 9000)
.usePlaintext() // 在生产环境中应使用TLS加密
.build();
// 获取异步客户端代理(Stub)
// 注意,这里这里使用newStub而不是newBlockingStub,因为要处理服务端 流式 响应
// 同步阻塞式(BlockingStub)的代理 会等待所有响应完成,及调用了onCompleted()方法的时候,客户端才能够获取到值,不符合我们想要的流式效果
HelloOneServiceGrpc.HelloOneServiceStub helloOneService = HelloOneServiceGrpc.newStub(channel);
// 构建请求消息
HelloProto.HelloOneRequest request = HelloProto.HelloOneRequest.newBuilder()
.setName("server stream nxz")
.build();
try {
// 调用服务端流式RPC方法
// 第一个参数:请求对象
// 第二个参数:响应观察者,用于处理服务端的流式响应
helloOneService.helloServerStream(request, new StreamObserver<HelloProto.HelloOneResponse>() {
@Override
public void onNext(HelloProto.HelloOneResponse helloOneResponse) {
// 服务端每次发送一个流式响应项(onNext)时触发
System.out.println("当前时间:" + new Date() + ";收到服务端流式响应:" + helloOneResponse.getResult());
}
@Override
public void onError(Throwable throwable) {
// 当RPC调用过程中发生错误时触发
System.err.println("当前时间:" + new Date() + ";RPC调用发生错误: " + throwable.getMessage());
throwable.printStackTrace();
}
@Override
public void onCompleted() {
// 服务端完成所有流式响应后触发(调用onCompleted)
System.out.println("当前时间:" + new Date() + ";服务端流式数据已经全部响应完毕");
}
});
// 由于是异步调用,需要保持主线程运行以接收流式响应,在实际应用中,可能需要更复杂的线程管理机制
channel.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
channel.shutdownNow();
}
}
}
从服务端流式 RPC 客户端的代码中,能够看出,客户端只发送了一个请求,但是客户端的响应却是一个流式观察者,也就是上述代码中的new StreamObserver<HelloProto.HelloOneResponse>()匿名内部类的实现。在这个实现中,客户端没收到一次服务端的响应,对应的onNext方法就会被调用一次,当服务端响应完毕,调用了responseObserver.onCompleted()的时候,客户端对应的onCompleted方法也会被调用,同时,客户端要一直运行这,防止服务端响应还没结束客户端就停止了。
同样的,先启动服务端,再启动客户端:


4.4.3 客户端流式 RPC
客户端发送一个消息流给服务器,服务器在收到所有消息后返回一个响应。
语法:
service 服务名称 {
// 定义一个客户端流式 RPC 方法
rpc 具体gRPC方法 (stream关键字 请求消息) returns (响应消息);
}
首先在 API 模块中定义 gRPC 服务,如图:

针对 API 模块,重新执行 Maven 插件使其生成对应的最新的代码:

之后编写服务端:
package cn.yunrain.service;
import cn.yunrain.HelloOneServiceGrpc;
import cn.yunrain.HelloProto;
import io.grpc.stub.StreamObserver;
public class HelloServiceImpl extends HelloOneServiceGrpc.HelloOneServiceImplBase {
// ...其他代码不变
/**
* 客户端流式 RPC 服务端实现
* 该方法处理客户端流式调用场景:客户端发送一系列消息,服务端在客户端完成发送后返回单个响应
*
* @param responseObserver 服务端用于向客户端发送响应的观察者对象
* (注意:在客户端流式RPC中,服务端只发送一个响应)
* @return 返回一个StreamObserver,用于接收和处理客户端发送的流式请求消息
*/
@Override
public StreamObserver<HelloProto.HelloOneRequest> helloClientStream(StreamObserver<HelloProto.HelloOneResponse> responseObserver) {
return new StreamObserver<HelloProto.HelloOneRequest>() {
/**
* 当客户端发送一条流式请求消息时触发
* 在客户端流式RPC中,此方法可能被多次调用(每次客户端发送一个消息时)
*
* @param helloOneRequest 客户端发送的单个请求消息
*/
@Override
public void onNext(HelloProto.HelloOneRequest helloOneRequest) {
System.out.println("接收到client发送的一条消息: " + helloOneRequest.getName());
}
/**
* 当RPC调用过程中发生错误时触发
* 可能是网络问题、序列化错误或其他异常情况
*
* @param throwable 包含错误信息的异常对象
*/
@Override
public void onError(Throwable throwable) {
System.err.println("客户端流式RPC发生错误: " + throwable.getMessage());
}
/**
* 当客户端完成所有流式消息发送时触发
* 这是服务端发送响应的时机点
*/
@Override
public void onCompleted() {
System.out.println("client消息发送完毕,服务端开始响应");
// 构建最终响应消息
// 在实际应用中,这里通常会基于客户端发送的所有消息来构建响应
HelloProto.HelloOneResponse response = HelloProto.HelloOneResponse.newBuilder()
.setResult("Hello client stream response")
.build();
// 通过 responseObserver 向客户端发送单个响应消息
// 注意:在客户端流式RPC中,服务端只发送一个响应
responseObserver.onNext(response);
// 通知客户端:服务端已完成响应处理,RPC调用正式结束
// 这是客户端知道可以结束等待的信号
responseObserver.onCompleted();
}
};
}
}
从客户端流式 RPC 的服务端代码可以看出,从方法签名上,返回值已经不再是void了,而是StreamObserver<请求消息>一个流式观察者请求对象,这是因为服务端要持续监控客户端输入过来的消息,也就是请求流,所以这里的反复返回值为StreamObserver<HelloProto.HelloOneRequest>,同时这里也是一个匿名内部类的实现,因为是客户端流式,所以当客户端调用了onCompleted方法的时候,服务端的onCompleted方法才会被调用。
[!tip]
想必你也注意到了服务端的你们内部类的实现中也有onNext方法了吧,它表示服务端可以持续的响应数据,有点双向流式 RPC 的感觉了。
客户端代码:
package cn.yunrain;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
/**
* 客户端流式RPC调用示例
* <p>
* 演示客户端如何发送流式请求到服务端,并在客户端发送完成后接收服务端的单个响应
*/
public class GrpcClientThree {
public static void main(String[] args) {
// 创建与 gRPC 服务器的通信通道
// usePlaintext() 表示使用明文通信(不加密),仅适用于开发和测试环境
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9000)
.usePlaintext() // 生产环境应使用useTransportSecurity()启用TLS
.build();
try {
// 对于客户端流式 RPC,必须使用异步存根(Stub)
// 因为客户端需要持续发送多个请求,然后等待服务端的单个响应
HelloOneServiceGrpc.HelloOneServiceStub helloOneService = HelloOneServiceGrpc.newStub(channel);
// 创建响应观察者,用于处理服务端返回的响应
// 在客户端流式RPC中,服务端只会在客户端完成所有请求发送后返回一个响应
StreamObserver<HelloProto.HelloOneResponse> responseObserver =
new StreamObserver<HelloProto.HelloOneResponse>() {
/**
* 当服务端发送响应时触发
* 在客户端流式RPC中,此方法只会被调用一次(服务端发送单个响应)
*
* @param helloOneResponse 服务端返回的响应消息
*/
@Override
public void onNext(HelloProto.HelloOneResponse helloOneResponse) {
System.out.println("收到服务端响应: " + helloOneResponse.getResult());
// 此处可以处理服务端的响应数据
}
/**
* 当RPC调用过程中发生错误时触发
*
* @param throwable 包含错误信息的异常对象
*/
@Override
public void onError(Throwable throwable) {
System.err.println("RPC调用发生错误: " + throwable.getMessage());
// 实际项目中应添加错误处理逻辑,如重试机制、错误上报等
}
/**
* 当服务端完成响应发送时触发
* 表示整个RPC调用已正常结束
*/
@Override
public void onCompleted() {
System.out.println("收到服务端的全部响应消息,客户端流式 RPC 调用结束");
// 可以在此处进行资源清理或其他后续处理
}
};
// 调用客户端流式RPC方法,获取请求观察者用于发送流式请求
// 注意:此时RPC调用已经开始,但请求尚未发送
StreamObserver<HelloProto.HelloOneRequest> requestStreamObserver =
helloOneService.helloClientStream(responseObserver);
// 模拟发送多个流式请求消息
for (int i = 0; i < 10; i++) {
// 构建单个请求消息
HelloProto.HelloOneRequest request = HelloProto.HelloOneRequest.newBuilder()
.setName("客户端流式 RPC 请求消息 index : " + i)
.build();
// 通过请求观察者发送请求消息
// 在客户端流式RPC中,可以多次调用onNext发送多个请求
requestStreamObserver.onNext(request);
System.out.println("已发送第 " + (i + 1) + " 条请求消息");
// 每次休眠500ms,模拟真实的流式数据发送场景
// 在实际应用中,这可能是从文件、数据库或其他数据源读取数据的间隔
try {
Thread.sleep(500);
} catch (InterruptedException e) {
System.err.println("请求发送过程被中断");
throw new RuntimeException(e);
}
}
// 重要:告诉服务端客户端的所有请求消息已发送完毕
// 这是触发服务端发送响应的关键信号
// 如果没有调用onCompleted(),服务端会一直等待更多请求
requestStreamObserver.onCompleted();
System.out.println("所有请求消息发送完毕,等待服务端响应...");
// 客户端流式数据发送完毕后,等待服务端处理完成并返回响应
// 由于是异步调用,需要保持主线程运行以接收响应
// awaitTermination会阻塞当前线程,直到通道终止或超时
boolean terminated = channel.awaitTermination(30, TimeUnit.SECONDS);
if (!terminated) {
System.out.println("等待超时,服务端未在指定时间内完成响应");
}
} catch (InterruptedException e) {
System.err.println("客户端等待过程中被中断");
throw new RuntimeException(e);
} finally {
// 确保资源被正确释放,关闭通信通道
// shutdownNow()会立即终止通道,中断所有正在进行的调用
System.out.println("关闭gRPC通信通道");
channel.shutdownNow();
}
}
}
从客户端流式 RPC 的客户端代码可以看出,客户端需要不断的发送请求数据,同时,还得监测服务端的响应。


4.4.4 双向流式 RPC
客户端和服务端都使用一个读写流来发送一系列消息。这两个流是独立的,因此客户端和服务端可以以任意顺序读写。
语法:
service 服务名称 {
// 定义一个双向流式 RPC 方法
rpc 具体gRPC方法 (stream关键字 请求消息) returns (stream关键字 响应消息);
}
首先在 API 模块中定义 gRPC 服务,如图:

针对 API 模块,重新执行 Maven 插件使其生成对应的最新的代码:

之后编写服务端:
package cn.yunrain.service;
import cn.yunrain.HelloOneServiceGrpc;
import cn.yunrain.HelloProto;
import io.grpc.stub.StreamObserver;
public class HelloServiceImpl extends HelloOneServiceGrpc.HelloOneServiceImplBase {
// 其他代码不变...
/**
* 双向流式 RPC 服务端实现
* 该方法处理双向流式调用场景:客户端和服务端都可以独立地发送一系列消息
* 两个数据流相互独立,可以实现真正的全双工通信
*
* @param responseObserver 服务端用于向客户端发送流式响应的观察者对象
* 在双向流式中,服务端可以在任何时候发送响应消息
* @return 返回一个StreamObserver,用于接收和处理客户端发送的流式请求消息
*/
@Override
public StreamObserver<HelloProto.HelloOneRequest> helloBiStream(StreamObserver<HelloProto.HelloOneResponse> responseObserver) {
return new StreamObserver<HelloProto.HelloOneRequest>() {
/**
* 当客户端发送一条流式请求消息时触发
* 在双向流式RPC中,此方法可能被多次调用,每次都可以立即响应
* 注意:双向流式的特点是可以对每个请求立即响应,而不需要等待所有请求完成
*
* @param helloOneRequest 客户端发送的单个请求消息
*/
@Override
public void onNext(HelloProto.HelloOneRequest helloOneRequest) {
System.out.println("接收到client发送的一条消息: " + helloOneRequest.getName());
// 立即构建并发送响应,实现请求-响应的实时交互
// 在双向流式中,服务端可以在收到每个请求后立即响应
// 也可以选择累积多个请求后再批量响应,具体取决于业务需求
HelloProto.HelloOneResponse response = HelloProto.HelloOneResponse.newBuilder()
.setResult("Hello client stream response " + helloOneRequest.getName())
.build();
// 通过响应观察者实时发送响应消息
// 在双向流式中,服务端可以多次调用onNext发送多个响应
responseObserver.onNext(response);
}
/**
* 当RPC调用过程中发生错误时触发
* 可能是网络问题、序列化错误或其他异常情况
*
* @param throwable 包含错误信息的异常对象
*/
@Override
public void onError(Throwable throwable) {
System.err.println("双向流式RPC发生错误: " + throwable.getMessage());
}
/**
* 当客户端完成所有流式消息发送时触发
* 表示客户端已经发送完所有请求,但服务端可能还在处理或发送响应
* 注意:在双向流式中,客户端调用onCompleted()只表示请求流结束,
* 服务端响应流可以继续发送数据,直到服务端调用responseObserver.onCompleted()
*/
@Override
public void onCompleted() {
System.out.println("client消息发送完毕,服务端开始响应");
// 构建最终响应消息
// 在双向流式中,除了实时响应外,还可以在客户端请求完成后发送一个总结性响应
HelloProto.HelloOneResponse response = HelloProto.HelloOneResponse.newBuilder()
.setResult("Hello client stream response")
.build();
// 发送最终响应
responseObserver.onNext(response);
// 重要:通知客户端服务端的响应流已结束
// 这是双向流式RPC正式结束的信号
responseObserver.onCompleted();
}
};
}
}
客户端代码:
package cn.yunrain;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;
/**
* 双向流式RPC客户端示例
*
* 演示客户端如何与服务端建立双向流式通信:
* - 客户端可以持续发送多个请求
* - 服务端可以实时响应每个请求
* - 两个数据流完全独立,实现全双工通信
*/
public class GrpcClientFour {
public static void main(String[] args) {
// 创建与gRPC服务器的通信通道
// usePlaintext()表示使用明文传输,仅适用于开发测试环境
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 9000)
.usePlaintext() // 生产环境应使用TLS加密
.build();
try {
// 创建异步存根,双向流式RPC必须使用异步方式
HelloOneServiceGrpc.HelloOneServiceStub helloOneService =
HelloOneServiceGrpc.newStub(channel);
// 创建响应观察者,用于实时处理服务端返回的流式响应
StreamObserver<HelloProto.HelloOneResponse> responseObserver =
new StreamObserver<HelloProto.HelloOneResponse>() {
/**
* 当服务端发送流式响应时触发
* 在双向流式RPC中,此方法可能被多次调用,服务端可以实时响应每个请求
*
* @param helloOneResponse 服务端返回的单个响应消息
*/
@Override
public void onNext(HelloProto.HelloOneResponse helloOneResponse) {
System.out.println("实时收到服务端响应: " + helloOneResponse.getResult());
// 此处可以实时处理服务端的响应数据
// 在双向流式中,响应可能是对特定请求的回复,也可能是服务端主动推送的消息
}
/**
* 当RPC调用过程中发生错误时触发
*
* @param throwable 包含错误信息的异常对象
*/
@Override
public void onError(Throwable throwable) {
System.err.println("双向流式RPC发生错误: " + throwable.getMessage());
// 实际项目中应添加错误处理逻辑,如重试、告警等
}
/**
* 当服务端完成所有响应发送时触发
* 表示服务端的响应流已结束,整个RPC调用正式完成
*/
@Override
public void onCompleted() {
System.out.println("服务端已完成所有响应发送,双向流式RPC调用结束");
// 可以在此处进行资源清理、统计汇总等后续处理
}
};
// 调用双向流式RPC方法,获取请求观察者用于发送流式请求
// 注意:此时双向通信通道已建立,客户端和服务端都可以开始发送消息
StreamObserver<HelloProto.HelloOneRequest> requestStreamObserver =
helloOneService.helloBiStream(responseObserver);
// 模拟发送多个流式请求消息
// 在双向流式中,客户端可以在任何时候发送请求,服务端也可以实时响应
for (int i = 0; i < 10; i++) {
// 构建单个请求消息
HelloProto.HelloOneRequest request = HelloProto.HelloOneRequest.newBuilder()
.setName("Client message " + i)
.build();
// 发送请求消息
// 在双向流式中,客户端发送请求后可以立即收到服务端的响应
requestStreamObserver.onNext(request);
System.out.println("已发送第 " + (i + 1) + " 条请求消息");
// 休眠500ms,模拟真实的流式数据发送间隔
// 在实际应用中,这可能是用户输入、传感器数据读取等场景
try {
Thread.sleep(500);
} catch (InterruptedException e) {
System.err.println("请求发送过程被中断");
throw new RuntimeException(e);
}
}
// 重要:告诉服务端客户端的所有请求消息已发送完毕
// 注意:这并不代表RPC调用结束,只是客户端的请求流结束了
// 服务端可能还在处理并发送响应,直到服务端调用responseObserver.onCompleted()
requestStreamObserver.onCompleted();
System.out.println("所有请求消息发送完毕,等待服务端完成响应流...");
// 等待服务端完成响应流的发送
// 由于是双向流式,服务端可能在客户端请求结束后继续发送响应
boolean terminated = channel.awaitTermination(30, TimeUnit.SECONDS);
if (!terminated) {
System.out.println("等待超时,服务端未在指定时间内完成响应流");
} else {
System.out.println("双向流式RPC调用正常结束");
}
} catch (InterruptedException e) {
System.err.println("客户端等待过程中被中断");
throw new RuntimeException(e);
} finally {
// 确保资源被正确释放,优雅关闭通信通道
// shutdown()会等待现有调用完成,比shutdownNow()更温和
channel.shutdown();
}
}
}


Comments