在 Java 中使用 gRPC 进行流式(stream)传输

在 gRPC 中使用流, 流是一项强大的功能,允许客户端和服务器通过单个连接发送多个消息。 此外,消息的接收顺序与发送顺序相同,但任一方都可以以任何顺序读取或写入消息。

用 Apifox,节省研发团队的每一分钟

在 Java 中使用 gRPC 进行流式(stream)传输

免费使用 Apifox

相关推荐

最新文章

API

一体化协作平台

API 设计

API 文档

API 调试

自动化测试

API Mock

API Hub

立即体验 Apifox
目录
原文链接:https://www.baeldung.com/java-grpc-streaming


gRPC 是一个进行进程间远程过程调用(RPC)的平台。它遵循客户端-服务器模型,性能高效,并支持最重要的计算机语言。在本教程中,我们将重点关注 gRPC 流,流(stream)允许服务器和客户端之间多路复用消息,创建非常高效和灵活的进程间通信

在 Java 中使用 gRPC 进行流式(stream)传输

gRPC 流基础


gRPC 使用 HTTP/2 网络协议进行服务间通信。 HTTP/2 的一个关键优势是它支持流, 每个流都可以在单个连接上复用多个双向消息。


在 gRPC 中,我们可以具有三种功能调用类型的流:

  1. 服务器流 RPC:客户端向服务器发送单个请求,并获取回几条它顺序读取的消息。
  2. 客户端流 RPC:客户端向服务器发送一系列消息。客户端等待服务器处理消息并读取返回的响应。
  3. 双向流 RPC:客户端和服务器可以双向发送多条消息。消息的接收顺序与发送顺序相同。但是,服务器或客户端可以选择回复接收到的消息的顺序。
    为了演示如何使用这些过程调用,我们将编写一个简单的客户端-服务器应用程序示例,用于交换有关证券的信息。

服务定义

我们使用 stock_quote.proto 来定义服务接口和有效负载消息的结构:

service StockQuoteProvider {
  rpc serverSideStreamingGetListStockQuotes(Stock) returns (stream StockQuote) {}
  rpc clientSideStreamingGetStatisticsOfStocks(stream Stock) returns (StockQuote) {}
  rpc bidirectionalStreamingGetListsStockQuotes(stream Stock) returns (stream StockQuote) {}
}

message Stock {
  string ticker_symbol = 1;
  string company_name = 2;
  string description = 3;  
}

message StockQuote {
  double price = 1;
  int32 offer_number = 2;
  string description = 3;
}


StockQuoteProvider 服务有三种方法类型,支持消息流。 在下一节中,我们将介绍它们的实现。


我们从服务的方法签名中看到,客户端通过发送 Stock 消息向服务器查询。服务器使用 StockQuote 消息将响应发送回来。


我们使用在 pom.xml 文件中定义的 protobuf-maven-pluginstock-quote.proto IDL 文件生成 Java 代码。


该插件会在 target/generated-sources/protobuf/java/grpc-java 目录中为客户端存根和服务器端代码生成代码。

服务器实现

StockServer 构造函数使用 gRPC Server 来监听和分派传入的请求:

public class StockServer {

  private int port;  
  private io.grpc.Server server;

  public StockServer(int port) throws IOException {
    this.port = port;
    server = ServerBuilder.forPort(port) 
      .addService(new StockService())
      .build(); 
  }

  //...
}

我们将 StockService 添加到 io.grpc.Server。StockService 扩展了从我们的 proto 文件生成的 StockQuoteProviderImplBase。因此,StockQuoteProviderImplBase 具有三个流服务方法的存根


StockService 需要重写这些存根方法来实现我们服务的实际实现


接下来,我们将看到这三种流案例是如何实现的。

服务器端流

客户端发送一个报价请求,并获得回多个响应,每个响应都有该商品的不同价格:

@Override
public void serverSideStreamingGetListStockQuotes(Stock request, 
  StreamObserver<StockQuote> responseObserver) {

    for (int i = 1; i <= 5; i++) {
        StockQuote stockQuote = StockQuote.newBuilder()
            .setPrice(fetchStockPriceBid(request)) 
            .setOfferNumber(i)
            .setDescription("Price for stock:" + request.getTickerSymbol())
            .build();
        responseObserver.onNext(stockQuote);
    }
    responseObserver.onCompleted();
}

该方法创建一个 StockQuote,获取价格,并标记报价编号。对于每个报价,它通过调用 responseObserver::onNext 向客户端发送一条消息。它使用 reponseObserver::onCompleted 表示 RPC 已完成。

客户端流

客户端发送多个股票,服务器返回一个 StockQuote 与一些统计数据:

@Override
public StreamObserver<Stock> clientSideStreamingGetStatisticsOfStocks(
  StreamObserver<StockQuote> responseObserver) {

  return new StreamObserver<Stock>() {
    int count;
    double price = 0.0;
    StringBuffer sb = new StringBuffer();

    @Override
    public void onNext(Stock stock) {
      count++;
      price = +fetchStockPriceBid(stock);  
      sb.append(":").append(stock.getTickerSymbol());
    }

    @Override 
    public void onCompleted() {
      responseObserver.onNext(StockQuote.newBuilder()
          .setPrice(price / count)
          .setDescription("Statistics-" + sb.toString())
          .build());
      responseObserver.onCompleted(); 
    }

    // handle onError() ...
  };
}


该方法获取一个 StreamObserver<StockQuote> 作为参数来响应客户端。它返回一个 StreamObserver<Stock>,其中它处理客户端请求消息。


返回的 StreamObserver<Stock> 重写 onNext() 来在客户端发送请求时得到通知。
当客户端完成发送所有消息时,会调用 StreamObserver<Stock>.onCompleted() 方法。

根据我们收到的所有 Stock 消息,我们找出获取股票价格的平均值,创建一个 StockQuote,并调用 responseObserver::onNext 将结果交付给客户端。


最后,我们重写 StreamObserver<Stock>.onError() 来处理异常终止。

双向流客户端

发送多个股票,服务器为每个请求返回一组价格:

@Override
public StreamObserver<Stock> bidirectionalStreamingGetListsStockQuotes(
  StreamObserver<StockQuote> responseObserver) {

  return new StreamObserver<Stock>() {

    @Override
    public void onNext(Stock request) {
      for (int i = 1; i <= 5; i++) {
        StockQuote stockQuote = StockQuote.newBuilder()
            .setPrice(fetchStockPriceBid(request))
            .setOfferNumber(i) 
            .setDescription("Price for stock:" + request.getTickerSymbol())
            .build();
        responseObserver.onNext(stockQuote); 
      }
    }

    @Override
    public void onCompleted() {
      responseObserver.onCompleted();
    }

    //handle OnError() ...
  }; 
}


我们与前一个示例具有相同的方法签名。改变的是实现:我们不等待客户端发送所有消息才响应。


在这种情况下,我们在收到每个传入消息后立即调用 responseObserver::onNext,并且与收到的顺序相同。  


重要的是要注意,我们可以轻松更改响应的顺序(如果需要的话)。

客户端实现

StockClient 的构造函数获取一个 gRPC 通道并实例化由 gRPC Maven 插件生成的存根类:

public class StockClient {

  private StockQuoteProviderBlockingStub blockingStub;
  private StockQuoteProviderStub nonBlockingStub;

  public StockClient(Channel channel) {
    blockingStub = StockQuoteProviderGrpc.newBlockingStub(channel);
    nonBlockingStub = StockQuoteProviderGrpc.newStub(channel);
  }

  // ...
}


StockQuoteProviderBlockingStub 和 StockQuoteProviderStub 支持发出同步和异步客户端方法请求


接下来,我们将看到这三个流 RPC 的客户端实现。

具有服务器端流的客户端 RPC

客户端发出一个请求股票价格的调用到服务器,并获得一系列报价回来:

public void serverSideStreamingListOfStockPrices() {

  Stock request = Stock.newBuilder()
      .setTickerSymbol("AU")
      .setCompanyName("Austich")
      .setDescription("server streaming example")  
      .build();

  Iterator<StockQuote> stockQuotes;
  try {
    logInfo("REQUEST - ticker symbol {0}", request.getTickerSymbol());

    stockQuotes = blockingStub.serverSideStreamingGetListStockQuotes(request);
    
    for (int i = 1; stockQuotes.hasNext(); i++) {
      StockQuote stockQuote = stockQuotes.next();
      logInfo("RESPONSE - Price #" + i + ": {0}", stockQuote.getPrice());
    }
  } catch (StatusRuntimeException e) {
    logInfo("RPC failed: {0}", e.getStatus());
  }
}

我们使用 blockingStub::serverSideStreamingGetListStockQuotes 发出同步请求。我们使用 Iterator 获取 StockQuotes 的列表。

具有客户端流的客户端 RPC

客户端向服务器发送一组 Stock,并获得一个带有一些统计数据的 StockQuote 回来:

public void clientSideStreamingGetStatisticsOfStocks() 
  throws InterruptedException {

  StreamObserver<StockQuote> responseObserver = 
    new StreamObserver<StockQuote>() {

    @Override
    public void onNext(StockQuote summary) {
      logInfo("RESPONSE, got stock statistics - Average Price: {0}, description: {1}",
        summary.getPrice(), summary.getDescription());
    }

    @Override
    public void onCompleted() {
      logInfo("Finished clientSideStreamingGetStatisticsOfStocks"); 
    }

    // Override OnError ...
  };

  StreamObserver<Stock> requestObserver = 
    nonBlockingStub.clientSideStreamingGetStatisticsOfStocks(responseObserver);

  try {
    for (Stock stock : stocks) {
      logInfo("REQUEST: {0}, {1}", 
        stock.getTickerSymbol(), stock.getCompanyName());
      requestObserver.onNext(stock);
    }
  } catch (RuntimeException e) {
    requestObserver.onError(e);
    throw e;
  }

  requestObserver.onCompleted();  
}


与服务器示例一样,我们使用 StreamObservers 来发送和接收消息。  


requestObserver 使用非阻塞存根向服务器发送 Stock 列表。


使用 responseObserver,我们获得带有一些统计信息的 StockQuote。

具有双向流的客户端 RPC

客户端发送一组 Stock,并为每个 Stock 获取回一组价格:

public void bidirectionalStreamingGetListsStockQuotes() 
  throws InterruptedException{

  StreamObserver<StockQuote> responseObserver = 
    new StreamObserver<StockQuote>() {

    @Override
    public void onNext(StockQuote stockQuote) {
      logInfo("RESPONSE price#{0} : {1}, description:{2}",
        stockQuote.getOfferNumber(), 
        stockQuote.getPrice(), 
        stockQuote.getDescription());
    }

    @Override
    public void onCompleted() {
      logInfo("Finished bidirectionalStreamingGetListsStockQuotes");
    }

    //Override onError() ...
  };

  StreamObserver<Stock> requestObserver = 
    nonBlockingStub.bidirectionalStreamingGetListsStockQuotes(responseObserver);

  try {
    for (Stock stock : stocks) {
      logInfo("REQUEST: {0}, {1}", 
        stock.getTickerSymbol(), stock.getCompanyName());
      requestObserver.onNext(stock);
      Thread.sleep(200); 
    }
  } catch (RuntimeException e) {
    requestObserver.onError(e);
    throw e;
  }  

  requestObserver.onCompleted();
} 


实现与客户端流情况非常相似。 我们使用 requestObserver 发送 Stock — 唯一的区别是现在我们使用 responseObserver 获取多个响应。 响应与请求解耦 —— 它们可以以任何顺序到达。

运行服务器和客户端

在使用 Maven 编译代码后,我们只需要打开两个命令窗口。
要运行服务器:

mvn exec:java -Dexec.mainClass=com.baeldung.grpc.streaming.StockServer


要运行客户端:

mvn exec:java -Dexec.mainClass=com.baeldung.grpc.streaming.StockClient  


虽然我们用 Java 搭建了一个简单的 gRPC 服务,并创建了相应的客户端,但在实际项目中,很多开发者可能会面临需要在不同语言、不同团队之间调试 gRPC 接口的情况。在这方面,我们可以借助接口调试与管理工具来简化这一流程。

使用 Apifox 调试 gRPC

目前,市面上能够兼容 gRPC 接口的调试与管理工具相对有限,但值得注意的是,Apifox 作为业界领先的接口管理工具,已经支持 gRPC 接口的调试和管理功能,这一功能的推出使得在微服务架构中广泛应用的 gRPC 变得更加便捷。Apifox 提供全面的兼容性,涵盖 gRPC 的四种调用类型:

  • Unary:一元调用
  • Server Streaming:服务端流
  • Client Streming:客户端流
  • Bidirectional Streaming:双向流
    下文将通过一个示例场景简要演示如何在 Apifox 中新建 gRPC 项目并针对接口发起调试。

步骤1:新建 gRPC 项目

Apifox 中登录并新建一个 gRPC 项目,点击“新建项目”按钮,选择 gRPC 类型,填写项目名称后轻点“新建”按钮。

Apifox 新建 gRPC 项目

步骤2:导入.proto文件

导入定义 gRPC 接口所使用的服务、方法和消息的 .proto 文件。你可以将文件拖拽至其中或使用文件在线 URL 完成导入。

Apifox 导入.proto文件

步骤3:调试 gRPC

文件导入后,Apifox 将基于 .proto 文件内容生成对应的接口信息,然后就可以进行调试。

Apifox 调试 gRPC

通过这些简单的步骤,你可以在 Apifox 中方便地管理和调试你的 gRPC 项目。这个功能非常强大,更具体的你可以访问 Apifox 的 gRPC 帮助文档,赶快去试试吧!

Apifox 接口调试工具

总结

在本文中,我们已经看到了如何在 gRPC 中使用流。 流是一项强大的功能,允许客户端和服务器通过单个连接发送多个消息。 此外,消息的接收顺序与发送顺序相同,但任一方都可以以任何顺序读取或写入消息


示例的源代码可以在 GitHub 上找到

知识拓展: