原文链接:https://www.baeldung.com/java-grpc-streaming
gRPC 是一个进行进程间远程过程调用(RPC)的平台。它遵循客户端-服务器模型,性能高效,并支持最重要的计算机语言。在本教程中,我们将重点关注 gRPC 流,流(stream)允许服务器和客户端之间多路复用消息,创建非常高效和灵活的进程间通信。
gRPC 流基础
gRPC 使用 HTTP/2 网络协议进行服务间通信。 HTTP/2 的一个关键优势是它支持流, 每个流都可以在单个连接上复用多个双向消息。
在 gRPC 中,我们可以具有三种功能调用类型的流:
- 服务器流 RPC:客户端向服务器发送单个请求,并获取回几条它顺序读取的消息。
- 客户端流 RPC:客户端向服务器发送一系列消息。客户端等待服务器处理消息并读取返回的响应。
- 双向流 RPC:客户端和服务器可以双向发送多条消息。消息的接收顺序与发送顺序相同。但是,服务器或客户端可以选择回复接收到的消息的顺序。
为了演示如何使用这些过程调用,我们将编写一个简单的客户端-服务器应用程序示例,用于交换有关证券的信息。
服务定义
我们使用 stock_quote.proto
来定义服务接口和有效负载消息的结构:
service StockQuoteProvider {
rpc serverSideStreamingGetListStockQuotes(Stock) returns (stream StockQuote) {}
rpc clientSideStreamingGetStatisticsOfStocks(stream Stock) returns (StockQuote) {}
rpc bidirectionalStreamingGetListsStockQuotes(stream Stock) returns (stream StockQuote) {}
}
message Stock {
string ticker_symbol = 1;
string company_name = 2;
string description = 3;
}
message StockQuote {
double price = 1;
int32 offer_number = 2;
string description = 3;
}
StockQuoteProvider 服务有三种方法类型,支持消息流。 在下一节中,我们将介绍它们的实现。
我们从服务的方法签名中看到,客户端通过发送 Stock 消息向服务器查询。服务器使用 StockQuote 消息将响应发送回来。
我们使用在 pom.xml 文件中定义的 protobuf-maven-plugin
从 stock-quote.proto IDL
文件生成 Java 代码。
该插件会在 target/generated-sources/protobuf/java
和 /grpc-java
目录中为客户端存根和服务器端代码生成代码。
服务器实现
StockServer 构造函数使用 gRPC Server 来监听和分派传入的请求:
public class StockServer {
private int port;
private io.grpc.Server server;
public StockServer(int port) throws IOException {
this.port = port;
server = ServerBuilder.forPort(port)
.addService(new StockService())
.build();
}
//...
}
我们将 StockService 添加到 io.grpc.Server
。StockService 扩展了从我们的 proto 文件生成的 StockQuoteProviderImplBase。因此,StockQuoteProviderImplBase 具有三个流服务方法的存根。
StockService 需要重写这些存根方法来实现我们服务的实际实现。
接下来,我们将看到这三种流案例是如何实现的。
服务器端流
客户端发送一个报价请求,并获得回多个响应,每个响应都有该商品的不同价格:
@Override
public void serverSideStreamingGetListStockQuotes(Stock request,
StreamObserver<StockQuote> responseObserver) {
for (int i = 1; i <= 5; i++) {
StockQuote stockQuote = StockQuote.newBuilder()
.setPrice(fetchStockPriceBid(request))
.setOfferNumber(i)
.setDescription("Price for stock:" + request.getTickerSymbol())
.build();
responseObserver.onNext(stockQuote);
}
responseObserver.onCompleted();
}
该方法创建一个 StockQuote,获取价格,并标记报价编号。对于每个报价,它通过调用 responseObserver::onNext
向客户端发送一条消息。它使用 reponseObserver::onCompleted
表示 RPC 已完成。
客户端流
客户端发送多个股票,服务器返回一个 StockQuote 与一些统计数据:
@Override
public StreamObserver<Stock> clientSideStreamingGetStatisticsOfStocks(
StreamObserver<StockQuote> responseObserver) {
return new StreamObserver<Stock>() {
int count;
double price = 0.0;
StringBuffer sb = new StringBuffer();
@Override
public void onNext(Stock stock) {
count++;
price = +fetchStockPriceBid(stock);
sb.append(":").append(stock.getTickerSymbol());
}
@Override
public void onCompleted() {
responseObserver.onNext(StockQuote.newBuilder()
.setPrice(price / count)
.setDescription("Statistics-" + sb.toString())
.build());
responseObserver.onCompleted();
}
// handle onError() ...
};
}
该方法获取一个 StreamObserver<StockQuote>
作为参数来响应客户端。它返回一个 StreamObserver<Stock>
,其中它处理客户端请求消息。
返回的 StreamObserver<Stock>
重写 onNext()
来在客户端发送请求时得到通知。
当客户端完成发送所有消息时,会调用 StreamObserver<Stock>.onCompleted()
方法。
根据我们收到的所有 Stock 消息,我们找出获取股票价格的平均值,创建一个 StockQuote,并调用 responseObserver::onNext
将结果交付给客户端。
最后,我们重写 StreamObserver<Stock>.onError()
来处理异常终止。
双向流客户端
发送多个股票,服务器为每个请求返回一组价格:
@Override
public StreamObserver<Stock> bidirectionalStreamingGetListsStockQuotes(
StreamObserver<StockQuote> responseObserver) {
return new StreamObserver<Stock>() {
@Override
public void onNext(Stock request) {
for (int i = 1; i <= 5; i++) {
StockQuote stockQuote = StockQuote.newBuilder()
.setPrice(fetchStockPriceBid(request))
.setOfferNumber(i)
.setDescription("Price for stock:" + request.getTickerSymbol())
.build();
responseObserver.onNext(stockQuote);
}
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
//handle OnError() ...
};
}
我们与前一个示例具有相同的方法签名。改变的是实现:我们不等待客户端发送所有消息才响应。
在这种情况下,我们在收到每个传入消息后立即调用 responseObserver::onNext
,并且与收到的顺序相同。
重要的是要注意,我们可以轻松更改响应的顺序(如果需要的话)。
客户端实现
StockClient 的构造函数获取一个 gRPC 通道并实例化由 gRPC Maven 插件生成的存根类:
public class StockClient {
private StockQuoteProviderBlockingStub blockingStub;
private StockQuoteProviderStub nonBlockingStub;
public StockClient(Channel channel) {
blockingStub = StockQuoteProviderGrpc.newBlockingStub(channel);
nonBlockingStub = StockQuoteProviderGrpc.newStub(channel);
}
// ...
}
StockQuoteProviderBlockingStub 和 StockQuoteProviderStub 支持发出同步和异步客户端方法请求。
接下来,我们将看到这三个流 RPC 的客户端实现。
具有服务器端流的客户端 RPC
客户端发出一个请求股票价格的调用到服务器,并获得一系列报价回来:
public void serverSideStreamingListOfStockPrices() {
Stock request = Stock.newBuilder()
.setTickerSymbol("AU")
.setCompanyName("Austich")
.setDescription("server streaming example")
.build();
Iterator<StockQuote> stockQuotes;
try {
logInfo("REQUEST - ticker symbol {0}", request.getTickerSymbol());
stockQuotes = blockingStub.serverSideStreamingGetListStockQuotes(request);
for (int i = 1; stockQuotes.hasNext(); i++) {
StockQuote stockQuote = stockQuotes.next();
logInfo("RESPONSE - Price #" + i + ": {0}", stockQuote.getPrice());
}
} catch (StatusRuntimeException e) {
logInfo("RPC failed: {0}", e.getStatus());
}
}
我们使用 blockingStub::serverSideStreamingGetListStockQuotes
发出同步请求。我们使用 Iterator 获取 StockQuotes 的列表。
具有客户端流的客户端 RPC
客户端向服务器发送一组 Stock,并获得一个带有一些统计数据的 StockQuote 回来:
public void clientSideStreamingGetStatisticsOfStocks()
throws InterruptedException {
StreamObserver<StockQuote> responseObserver =
new StreamObserver<StockQuote>() {
@Override
public void onNext(StockQuote summary) {
logInfo("RESPONSE, got stock statistics - Average Price: {0}, description: {1}",
summary.getPrice(), summary.getDescription());
}
@Override
public void onCompleted() {
logInfo("Finished clientSideStreamingGetStatisticsOfStocks");
}
// Override OnError ...
};
StreamObserver<Stock> requestObserver =
nonBlockingStub.clientSideStreamingGetStatisticsOfStocks(responseObserver);
try {
for (Stock stock : stocks) {
logInfo("REQUEST: {0}, {1}",
stock.getTickerSymbol(), stock.getCompanyName());
requestObserver.onNext(stock);
}
} catch (RuntimeException e) {
requestObserver.onError(e);
throw e;
}
requestObserver.onCompleted();
}
与服务器示例一样,我们使用 StreamObservers 来发送和接收消息。
requestObserver 使用非阻塞存根向服务器发送 Stock 列表。
使用 responseObserver,我们获得带有一些统计信息的 StockQuote。
具有双向流的客户端 RPC
客户端发送一组 Stock,并为每个 Stock 获取回一组价格:
public void bidirectionalStreamingGetListsStockQuotes()
throws InterruptedException{
StreamObserver<StockQuote> responseObserver =
new StreamObserver<StockQuote>() {
@Override
public void onNext(StockQuote stockQuote) {
logInfo("RESPONSE price#{0} : {1}, description:{2}",
stockQuote.getOfferNumber(),
stockQuote.getPrice(),
stockQuote.getDescription());
}
@Override
public void onCompleted() {
logInfo("Finished bidirectionalStreamingGetListsStockQuotes");
}
//Override onError() ...
};
StreamObserver<Stock> requestObserver =
nonBlockingStub.bidirectionalStreamingGetListsStockQuotes(responseObserver);
try {
for (Stock stock : stocks) {
logInfo("REQUEST: {0}, {1}",
stock.getTickerSymbol(), stock.getCompanyName());
requestObserver.onNext(stock);
Thread.sleep(200);
}
} catch (RuntimeException e) {
requestObserver.onError(e);
throw e;
}
requestObserver.onCompleted();
}
实现与客户端流情况非常相似。 我们使用 requestObserver 发送 Stock — 唯一的区别是现在我们使用 responseObserver 获取多个响应。 响应与请求解耦 —— 它们可以以任何顺序到达。
运行服务器和客户端
在使用 Maven 编译代码后,我们只需要打开两个命令窗口。
要运行服务器:
mvn exec:java -Dexec.mainClass=com.baeldung.grpc.streaming.StockServer
要运行客户端:
mvn exec:java -Dexec.mainClass=com.baeldung.grpc.streaming.StockClient
虽然我们用 Java 搭建了一个简单的 gRPC 服务,并创建了相应的客户端,但在实际项目中,很多开发者可能会面临需要在不同语言、不同团队之间调试 gRPC 接口的情况。在这方面,我们可以借助接口调试与管理工具来简化这一流程。
使用 Apifox 调试 gRPC
目前,市面上能够兼容 gRPC 接口的调试与管理工具相对有限,但值得注意的是,Apifox 作为业界领先的接口管理工具,已经支持 gRPC 接口的调试和管理功能,这一功能的推出使得在微服务架构中广泛应用的 gRPC 变得更加便捷。Apifox 提供全面的兼容性,涵盖 gRPC 的四种调用类型:
- Unary:一元调用
- Server Streaming:服务端流
- Client Streming:客户端流
- Bidirectional Streaming:双向流
下文将通过一个示例场景简要演示如何在 Apifox 中新建 gRPC 项目并针对接口发起调试。
步骤1:新建 gRPC 项目
在 Apifox 中登录并新建一个 gRPC 项目,点击“新建项目”按钮,选择 gRPC 类型,填写项目名称后轻点“新建”按钮。
步骤2:导入.proto
文件
导入定义 gRPC 接口所使用的服务、方法和消息的 .proto
文件。你可以将文件拖拽至其中或使用文件在线 URL 完成导入。
步骤3:调试 gRPC
文件导入后,Apifox 将基于 .proto
文件内容生成对应的接口信息,然后就可以进行调试。
通过这些简单的步骤,你可以在 Apifox 中方便地管理和调试你的 gRPC 项目。这个功能非常强大,更具体的你可以访问 Apifox 的 gRPC 帮助文档,赶快去试试吧!
总结
在本文中,我们已经看到了如何在 gRPC 中使用流。 流是一项强大的功能,允许客户端和服务器通过单个连接发送多个消息。 此外,消息的接收顺序与发送顺序相同,但任一方都可以以任何顺序读取或写入消息。
示例的源代码可以在 GitHub 上找到。
知识拓展: