这是用户在 2024-3-22 9:40 为 https://projectreactor.io/docs/core/release/reference/ 保存的双语快照页面,由 沉浸式翻译 提供双语支持。了解如何保存?


1. 关于文档


本节提供 Reactor 参考文档的简要概述。您不需要以线性方式阅读本指南。每件作品都是独立的,尽管它们经常引用其他作品。


Reactor 参考指南以 HTML 文档形式提供。最新副本可在 https://projectreactor.io/docs/core/release/reference/index.html 获取


您可以制作本文档的副本供您自己使用或分发给其他人,前提是您不收取此类副本的任何费用,并且每份副本都包含本版权声明,无论是以印刷版还是电子版形式分发。


1.2.为文档做出贡献


该参考指南是用 Asciidoc 编写的,您可以在 https://github.com/reactor/reactor-core/tree/main/docs/asciidoc 找到其来源。


如果您有改进或建议,我们将很高兴收到您的拉取请求!


我们建议您查看存储库的本地副本,以便您可以通过运行 asciidoctor gradle 任务并检查渲染来生成文档。有些部分依赖于包含的文件,因此 GitHub 渲染并不总是完整的。


为了方便文档编辑,大多数部分末尾都有一个链接,可直接在 GitHub 上打开该部分主要源文件的编辑 UI。这些链接仅出现在本参考指南的 HTML5 版本中。它们如下所示:建议编辑关于文档。

 1.3.寻求帮助


您可以通过多种方式使用 Reactor 寻求帮助:


  • 与 Gitter 社区联系。


  • 在 stackoverflow.com 上提问,网址为 project-reactor


  • 在 Github 问题中报告错误。我们密切监视以下存储库:reactor-core(涵盖基本功能)和reactor-addons(涵盖reactor-test和适配器问题)。


Reactor 的所有内容都是开源的,包括本文档。如果您发现文档存在问题或者想要改进它们,请参与其中。


1.4.从这往哪儿走


  • 如果您想直接跳入代码,请前往入门。


  • 不过,如果您是响应式编程的新手,您可能应该从响应式编程简介开始。


  • 如果您熟悉 Reactor 概念并且只是在寻找适合该工作的工具,但无法想到相关的操作员,请尝试我需要哪个操作员?附录。


  • 为了更深入地了解 Reactor 的核心功能,请前往 Reactor 核心功能来了解:


    • 有关 Reactor 反应类型的更多信息,请参阅 Flux (0-N 项的异步序列)和 Mono (异步 0-1 结果)部分。


    • 如何使用调度程序切换线程上下文。


    • 如何处理错误请参见处理错误部分。


  • 单元测试?是的, reactor-test 项目是可能的!请参阅测试。


  • 以编程方式创建序列提供了一种更高级的创建反应源的方法。


  • 其他高级主题包含在高级功能和概念中。


建议编辑“关于文档”

 2. 入门


本节包含可帮助您开始使用 Reactor 的信息。它包括以下部分:


2.1.介绍反应堆


Reactor 是 JVM 的完全非阻塞反应式编程基础,具有高效的需求管理(以管理“背压”的形式)。它直接与 Java 8 功能 API 集成,特别是 CompletableFutureStreamDuration 。它提供可组合的异步序列 API — Flux (用于 [N] 个元素)和 Mono (用于 [0|1] 个元素) — 并广泛实现了 Reactive Streams 规范。


Reactor 还支持与 reactor-netty 项目的非阻塞进程间通信。 Reactor Netty 适合微服务架构,为 HTTP(包括 Websockets)、TCP 和 UDP 提供背压就绪的网络引擎。完全支持反应式编码和解码。

 2.2.先决条件


Reactor Core 在 Java 8 及更高版本上运行。


它对 org.reactivestreams:reactive-streams:1.0.3 具有传递依赖性。

 安卓支持

  • Reactor 3 并未正式支持或针对 Android(如果强烈需要此类支持,请考虑使用 RxJava 2)。


  • 但是,它应该可以在 Android SDK 26 (Android O) 及更高版本上正常工作。


  • 当启用脱糖功能时,它可能会在 Android SDK 21 (Android 5.0) 及更高版本上正常工作。请参阅 https://developer.android.com/studio/write/java8-support#library-desugaring


  • 我们愿意尽最大努力评估有利于 Android 支持的更改。但是,我们无法做出保证。每个决定都必须根据具体情况做出。


2.3.了解 BOM 和版本控制方案


Reactor 3 使用 BOM(物料清单)模型(从 reactor-core 3.0.4 开始,以及 Aluminium 发布系列)。该精选列表对旨在协同工作的工件进行了分组,尽管这些工件中可能存在不同的版本控制方案,但仍提供相关版本。


请注意,版本控制方案在 3.3.x 和 3.4.x(镝和铕)之间发生了变化。


工件遵循 MAJOR.MINOR.PATCH-QUALIFIER 版本控制方案,而 BOM 使用受 CalVer 启发的 YYYY.MINOR.PATCH-QUALIFIER 方案进行版本控制,其中:


  • MAJOR 是当前一代的 Reactor,每一代都可以给项目结构带来根本性的变化(这可能意味着更重要的迁移工作)


  • YYYY 是给定发布周期中第一个 GA 版本的年份(例如 3.4.x 的 3.4.0)


  • .MINOR 是一个从 0 开始的数字,随着每个新的发布周期递增


    • 就项目而言,它通常反映了更广泛的变化,并且可以表明适度的迁移努力


    • 就 BOM 而言,它允许区分发布周期,以防两个发布周期在同一年首次发布


  • .PATCH 是一个从 0 开始的数字,随着每个服务版本的增加而递增


  • -QUALIFIER 是一个文本限定符,在 GA 版本中被省略(见下文)


因此,遵循该约定的第一个发布周期是 2020.0.x ,代号 Europium 。该方案按顺序使用以下限定符(注意使用破折号分隔符):


  • -M1 .. -M9 :里程碑(我们预计每个服务版本不会超过 9 个)


  • -RC1 .. -RC9 :候选版本(我们预计每个服务版本不会超过 9 个)

  •   -SNAPSHOT :快照


  • GA 版本没有限定符


快照在上面的顺序中显示得较高,因为从概念上讲,它们始终是任何给定补丁的“最新预发行版”。尽管 PATCH 周期的第一个部署的工件始终是 -SNAPSHOT,但在例如之后也会发布一个类似名称但更新的快照。里程碑或候选版本之间。


每个发布周期也会被赋予一个代号,与之前基于代号的方案保持一致,可以用来更非正式地引用它(例如在讨论、博客文章等中……​)。代号代表传统上的 MAJOR.MINOR 数字。它们(大部分)来自元素周期表,按字母顺序递增。


直到 Dysprosium 为止,BOM 都是使用发布序列方案进行版本控制,其中代号后跟限定符,并且限定符略有不同。例如:Aluminium-RELEASE(第一个 GA 版本,现在类似于 YYYY.0.0)、Bismuth-M1、Californium-SR1(服务版本现在类似于 YYYY.0.1)、Dysprosium-RC1、Dysprosium-BUILD-SNAPSHOT (每个补丁之后,我们都会返回到相同的快照版本。现在会类似于 YYYY.0.X-SNAPSHOT,因此我们每个补丁都会获得 1 个快照)

 2.4.获取反应堆


如前所述,在核心中使用 Reactor 最简单的方法是使用 BOM 并将相关依赖项添加到项目中。请注意,添加此类依赖项时,必须省略版本,以便从 BOM 中获取该版本。


但是,如果您想强制使用特定工件的版本,您可以像平常一样在添加依赖项时指定它。您还可以完全放弃 BOM,并通过其工件版本指定依赖项。


从此版本(reactor-core 3.6.4)开始,相关发布系列中最新的稳定 BOM 是 2023.0.4 ,这就是下面的代码片段中使用的内容。此后可能会有更新的版本(包括快照、里程碑和新的发布系列),请参阅 https://projectreactor.io/docs 了解最新的工件和 BOM。


2.4.1. Maven安装


Maven 本身支持 BOM 概念。首先,您需要通过将以下代码段添加到 pom.xml 来导入 BOM:

<dependencyManagement> (1)
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2023.0.4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
1
请注意 dependencyManagement 标记。这是对常规 dependencies 部分的补充。


如果顶部部分 ( dependencyManagement ) 已存在于您的 pom 中,则仅添加内容。


接下来,像往常一样将您的依赖项添加到相关的反应器项目中,但没有 <version> ,如下所示:

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId> (1)
        (2)
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId> (3)
        <scope>test</scope>
    </dependency>
</dependencies>
1
对核心库的依赖。
2
这里没有版本标签。
3
reactor-test 提供对反应流进行单元测试的工具。


2.4.2.摇篮安装


在 5.0 版本之前,Gradle 没有对 Maven BOM 的核心支持,但您可以使用 Spring 的 gradle-dependency-management 插件。


首先,从 Gradle 插件门户应用插件,如下所示:

plugins {
    id "io.spring.dependency-management" version "1.0.7.RELEASE" (1)
}
1
截至撰写本文时,1.0.7.RELEASE 是该插件的最新版本。检查更新。


然后用它导入BOM,如下:

dependencyManagement {
     imports {
          mavenBom "io.projectreactor:reactor-bom:2023.0.4"
     }
}


最后向您的项目添加依赖项,不带版本号,如下所示:

dependencies {
     implementation 'io.projectreactor:reactor-core' (1)
}
1
该版本没有第三个 : 分隔部分。它取自 BOM。


从 Gradle 5.0 开始,您可以使用对 BOM 的本机 Gradle 支持:

dependencies {
     implementation platform('io.projectreactor:reactor-bom:2023.0.4')
     implementation 'io.projectreactor:reactor-core' (1)
}
1
该版本没有第三个 : 分隔部分。它取自 BOM。


2.4.3.里程碑和快照


里程碑和开发人员预览是通过 Spring Milestones 存储库而不是 Maven Central 分发的。要将其添加到构建配置文件中,请使用以下代码片段:


示例 1. Maven 中的里程碑
<repositories>
	<repository>
		<id>spring-milestones</id>
		<name>Spring Milestones Repository</name>
		<url>https://repo.spring.io/milestone</url>
	</repository>
</repositories>


对于 Gradle,请使用以下代码片段:


示例 2. Gradle 中的里程碑
repositories {
  maven { url 'https://repo.spring.io/milestone' }
  mavenCentral()
}


同样,快照也可在单独的专用存储库中使用,如下例所示:


示例 3.-Maven 中的快照
<repositories>
	<repository>
		<id>spring-snapshots</id>
		<name>Spring Snapshot Repository</name>
		<url>https://repo.spring.io/snapshot</url>
	</repository>
</repositories>

示例 4.-Gradle 中的快照
repositories {
  maven { url 'https://repo.spring.io/snapshot' }
  mavenCentral()
}


2.5.支持与政策


以下条目镜像 https://github.com/reactor/.github/blob/main/SUPPORT.adoc


2.5.1.你有问题吗?


首先搜索 Stack Overflow;如有需要可讨论


如果您不确定为什么某些功能不起作用或想知道是否有更好的方法,请首先查看 Stack Overflow,如有必要,请开始讨论。使用我们为此目的监控的标签中的相关标签:


  • reactor-netty 针对特定的reactor-netty问题


  • project-reactor 用于通用反应器问题


如果您喜欢实时讨论,我们还有一些 Gitter 频道:


  • reactor 是历史上最活跃的一个,大多数社区都可以提供帮助


  • reactor-core 用于围绕库的内部运作进行更高级的精确讨论


  • reactor-netty 用于解决特定于 netty 的问题


请参阅每个项目的自述文件,了解潜在的其他信息来源。


我们通常不鼓励打开 GitHub 问题来提问,而倾向于使用上述两个渠道。


2.5.2.我们的弃用政策


在处理弃用时,给定版本 A.B.C ,我们将确保:


  • 版本 A 中引入的弃用。 B0 将在版本 A 之前被删除。 B+10


  • 版本 A 中引入的弃用。 B1+ 将在版本 A 之前被删除。 B+20


  • 我们将努力在弃用 javadoc 中提及以下内容:


    • 要删除的目标最低版本


    • 指向已弃用方法的替代方法的指针


    • 不推荐使用该方法的版本


此政策自 2021 年 1 月起正式生效,适用于 2020.0 BOM 和较新版本系列中的所有模块,以及 Dysprosium-SR15 之后的 Dysprosium 版本。

弃用删除目标并不是硬性承诺,并且弃用的方法可以比这些最低目标 GA 版本继续存在(即,只有问题最严重的弃用方法才会被积极删除)。

也就是说,已超过其最低删除目标版本的已弃用代码可能会在任何后续版本(包括补丁版本,又称服务版本)中删除,恕不另行通知。所以用户还是应该努力尽早更新自己的代码。


2.5.3.积极发展


下表总结了各种 Reactor 版本系列的开发状态:

 版本  支持的


2022.0.x(核心 3.5.x,netty 1.1.x)


2020.0.x(代号Europium)(核心3.4.x,netty 1.0.x)


Dysprosium Train(核心 3.3.x,netty 0.9.x)


锎及以下(核心 < 3.3,网状 < 0.9)


Reactor 1.x 和 2.x 代


建议编辑“入门”


3. 响应式编程简介


Reactor是响应式编程范式的一种实现,可以概括如下:


反应式编程是一种与数据流和变化传播有关的异步编程范例。这意味着可以通过所使用的编程语言轻松表达静态(例如数组)或动态(例如事件发射器)数据流。
— https://en.wikipedia.org/wiki/Reactive_programming


作为响应式编程方向的第一步,Microsoft 在 .NET 生态系统中创建了响应式扩展 (Rx) 库。然后RxJava在JVM上实现了响应式编程。随着时间的推移,通过 Reactive Streams 的努力出现了 Java 标准化,该规范为 JVM 上的反应式库定义了一组接口和交互规则。它的接口已集成到 Java 9 中的 Flow 类下。


反应式编程范式通常以面向对象语言的形式呈现,作为观察者设计模式的扩展。您还可以将主要反应流模式与熟悉的迭代器设计模式进行比较,因为所有这些库中的 Iterable - Iterator 对都具有对偶性。一个主要区别是,迭代器是基于拉式的,而反应流是基于推式的。


使用迭代器是一种命令式编程模式,即使访问值的方法完全由 Iterable 负责。事实上,由开发人员选择何时访问序列中的 next() 项。在反应式流中,上述对的等效项是 Publisher-Subscriber 。但正是 Publisher 通知订阅者新的可用值,而这个推送方面是反应的关键。此外,应用于推送值的操作是声明式表达的,而不是命令式的:程序员表达计算的逻辑,而不是描述其精确的控制流。


除了推送值之外,还以明确定义的方式涵盖了错误处理和完成方面。 Publisher 可以将新值推送到其 Subscriber (通过调用 onNext ),但也可以发出错误信号(通过调用 onError )或完成(通过调用 onComplete )。错误和完成都会终止序列。这可以总结如下:

onNext x 0..N [onError | onComplete]


这种方法非常灵活。该模式支持没有值、一个值或 n 个值(包括无限序列的值,例如时钟的连续滴答声)的用例。


但为什么我们首先需要这样一个异步反应式库呢?


3.1.阻塞可能会造成浪费


现代应用程序可以覆盖大量并发用户,尽管现代硬件的功能不断提高,但现代软件的性能仍然是一个关键问题。


一般来说,有两种方法可以提高程序的性能:


  • 并行化以使用更多线程和更多硬件资源。


  • 寻求提高现有资源使用效率。


通常,Java 开发人员使用阻塞代码来编写程序。这种做法很好,直到出现性能瓶颈。然后是时候引入额外的线程,运行类似的阻塞代码。但资源利用率的这种扩展会很快引入争用和并发问题。


更糟糕的是,阻塞会浪费资源。如果仔细观察,一旦程序涉及一些延迟(特别是 I/O,例如数据库请求或网络调用),资源就会被浪费,因为线程(可能是许多线程)现在处于空闲状态,等待数据。


因此并行化方法并不是灵丹妙药。有必要充分利用硬件的能力,但推理起来也很复杂,而且容易造成资源浪费。


3.2.异步来拯救?


前面提到的第二种方法,寻求更高的效率,可以解决资源浪费问题。通过编写异步、非阻塞代码,您可以让执行切换到另一个使用相同底层资源的活动任务,并在异步处理完成后返回到当前进程。


但是如何在 JVM 上生成异步代码呢? Java 提供了两种异步编程模型:


  • 回调:异步方法没有返回值,但采用额外的 callback 参数(lambda 或匿名类),当结果可用时调用该参数。一个众所周知的例子是 Swing 的 EventListener 层次结构。


  • Futures:异步方法立即返回 Future<T> 。异步进程计算 T 值,但 Future 对象包装对其的访问。该值不会立即可用,可以轮询该对象,直到该值可用。例如,运行 Callable<T> 任务的 ExecutorService 使用 Future 对象。


这些技术足够好吗?并非适用于所有用例,并且这两种方法都有局限性。


回调很难组合在一起,很快就会导致代码难以阅读和维护(称为“回调地狱”)。


考虑一个示例:在用户界面上显示用户最喜欢的五个,或者在她没有最喜欢的情况下显示建议。这通过三个服务(一个提供最喜欢的 ID,第二个获取最喜欢的详细信息,第三个提供包含详细信息的建议),如下所示:


示例 5. 回调地狱示例
userService.getFavorites(userId, new Callback<List<String>>() { (1)
  public void onSuccess(List<String> list) { (2)
    if (list.isEmpty()) { (3)
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { (4)
          UiUtils.submitOnUiThread(() -> { (5)
            list.stream()
                .limit(5)
                .forEach(uiList::show); (6)
            });
        }

        public void onError(Throwable error) { (7)
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() (8)
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, (9)
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});
1
我们有基于回调的服务:一个 Callback 接口,其中包含异步过程成功时调用的方法和发生错误时调用的方法。
2
第一个服务使用最喜欢的 ID 列表调用其回调。
3
如果列表为空,我们必须转到 suggestionService
4
suggestionService 向第二个回调提供 List<Favorite>
5
由于我们处理 UI,因此需要确保我们的消费代码在 UI 线程中运行。
6
我们使用 Java 8 Stream 将处理的建议数量限制为 5,并在 UI 中以图形列表形式显示它们。
7
在每个级别,我们都以相同的方式处理错误:我们在弹出窗口中显示它们。
8
返回最喜欢的 ID 级别。如果服务返回完整列表,我们需要转到 favoriteService 来获取详细的 Favorite 对象。由于我们只需要五个,因此我们首先流式传输 ID 列表以将其限制为五个。
9
再次回调。这次我们得到了一个成熟的 Favorite 对象,我们将其推送到 UI 线程内的 UI。


这是很多代码,有点难以理解并且有重复的部分。考虑它在 Reactor 中的等价物:


示例 6. 与回调代码等效的 Reactor 代码示例
userService.getFavorites(userId) (1)
           .flatMap(favoriteService::getDetails) (2)
           .switchIfEmpty(suggestionService.getSuggestions()) (3)
           .take(5) (4)
           .publishOn(UiUtils.uiThreadScheduler()) (5)
           .subscribe(uiList::show, UiUtils::errorPopup); (6)
1
我们从最喜欢的 ID 流开始。
2
我们将它们异步转换为详细的 Favorite 对象 ( flatMap )。我们现在的流程是 Favorite
3
如果 Favorite 的流程为空,我们会通过 suggestionService 切换到回退。
4
我们最多只对结果流中的五个元素感兴趣。
5
最后,我们要在UI线程中处理每一条数据。
6
我们通过描述如何处理数据的最终形式(在 UI 列表中显示它)以及发生错误时如何执行(显示弹出窗口)来触发流程。


如果您想确保在 800 毫秒内检索到最喜爱的 ID,或者如果需要更长的时间,则从缓存中获取它们,该怎么办?在基于回调的代码中,这是一项复杂的任务。在 Reactor 中,它变得就像在链中添加 timeout 运算符一样简单,如下所示:


示例 7. 具有超时和回退功能的 Reactor 代码示例
userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) (1)
           .onErrorResume(cacheService.cachedFavoritesFor(userId)) (2)
           .flatMap(favoriteService::getDetails) (3)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
1
如果上面的部分超过 800 毫秒没有发出任何信号,则传播错误。
2
如果出现错误,请返回到 cacheService
3
链的其余部分与前面的示例类似。


Future 对象比回调好一点,但尽管 CompletableFuture 在 Java 8 中带来了改进,但它们在组合方面仍然表现不佳。将多个 Future 对象编排在一起是可行的,但并不容易。此外, Future 还有其他问题:


  • 通过调用 get() 方法,很容易导致 Future 对象出现另一种阻塞情况。


  • 它们不支持惰性计算。


  • 它们缺乏对多个值和高级错误处理的支持。


考虑另一个例子:我们得到一个 ID 列表,我们想要从中获取名称和统计信息,并将它们成对地组合起来,所有这些都是异步的。以下示例使用 CompletableFuture 类型的列表执行此操作:


示例 8. CompletableFuture 组合示例
CompletableFuture<List<String>> ids = ifhIds(); (1)

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { (2)
	Stream<CompletableFuture<String>> zip =
			l.stream().map(i -> { (3)
				CompletableFuture<String> nameTask = ifhName(i); (4)
				CompletableFuture<Integer> statTask = ifhStat(i); (5)

				return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); (6)
			});
	List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); (7)
	CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

	CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); (8)
	return allDone.thenApply(v -> combinationList.stream()
			.map(CompletableFuture::join) (9)
			.collect(Collectors.toList()));
});

List<String> results = result.join(); (10)
assertThat(results).contains(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
1
我们从一个 future 开始,它为我们提供了要处理的 id 值列表。
2
获得列表后,我们希望开始一些更深入的异步处理。
3
对于列表中的每个元素:
4
异步获取关联名称。
5
异步获取相关统计信息。
6  合并两个结果。
7
我们现在有一个代表所有组合任务的 future 列表。为了执行这些任务,我们需要将列表转换为数组。
8
将数组传递给 CompletableFuture.allOf ,它会输出一个 Future ,该 Future 在所有任务完成后完成。
9
棘手的一点是 allOf 返回 CompletableFuture<Void> ,因此我们在 future 列表上重申,通过使用 join() 收集它们的结果(这里,这不会阻止,因为 allOf 确保期货全部完成)。
10
一旦整个异步管道被触发,我们就会等待它被处理并返回我们可以断言的结果列表。


由于 Reactor 有更多开箱即用的组合运算符,因此可以简化此过程,如下所示:


示例 9. 与未来代码等效的 Reactor 代码示例
Flux<String> ids = ifhrIds(); (1)

Flux<String> combinations =
		ids.flatMap(id -> { (2)
			Mono<String> nameTask = ifhrName(id); (3)
			Mono<Integer> statTask = ifhrStat(id); (4)

			return nameTask.zipWith(statTask, (5)
					(name, stat) -> "Name " + name + " has stats " + stat);
		});

Mono<List<String>> result = combinations.collectList(); (6)

List<String> results = result.block(); (7)
assertThat(results).containsExactly( (8)
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
1
这次,我们从异步提供的 ids 序列( Flux<String> )开始。
2
对于序列中的每个元素,我们异步处理它两次(在主体 flatMap 调用的函数内)。
3
获取关联名称。
4
获取相关统计数据。
5
异步组合两个值。
6
当这些值可用时,将它们聚合到 List 中。
7
在生产中,我们将通过进一步组合或订阅它来继续异步使用 Flux 。最有可能的是,我们会返回 result Mono 。由于我们正在进行测试,因此我们会阻塞,等待处理完成,然后直接返回聚合的值列表。
8  断言结果。


使用回调和 Future 对象的危险是相似的,这正是反应式编程通过 Publisher-Subscriber 对解决的问题。


3.3.从命令式编程到反应式编程


反应式库(例如 Reactor)旨在解决 JVM 上“经典”异步方法的这些缺点,同时还关注一些其他方面:


  • 可组合性和可读性


  • 数据作为一个流,通过丰富的运算符词汇进行操作


  • 在您订阅之前什么都不会发生


  • 背压或消费者向生产者发出排放率过高信号的能力


  • 与并发无关的高水平但高价值的抽象


3.3.1.可组合性和可读性


“可组合性”是指编排多个异步任务的能力,其中我们使用先前任务的结果将输入提供给后续任务。或者,我们可以以分叉连接方式运行多个任务。此外,我们可以将异步任务作为更高级别系统中的离散组件进行重用。


编排任务的能力与代码的可读性和可维护性紧密耦合。随着异步进程层的数量和复杂性的增加,编写和读取代码变得越来越困难。正如我们所看到的,回调模型很简单,但其主要缺点之一是,对于复杂的流程,您需要从回调中执行回调,而回调本身又嵌套在另一个回调中,等等。这种混乱被称为“回调地狱”。正如您可以猜测的那样(或从经验中知道),这样的代码很难回溯和推理。


Reactor 提供了丰富的组合选项,其中代码反映了抽象过程的组织,并且所有内容通常都保持在同一级别(嵌套被最小化)。


3.3.2.装配线类比


您可以将反应式应用程序处理的数据视为在装配线上移动。 Reactor既是传送带,又是工作站。原材料从源头(原始的 Publisher )流出,最终成为成品,准备推向消费者(或 Subscriber )。


原材料可以经历各种转变和其他中间步骤,或者成为将中间件聚合在一起的更大装配线的一部分。如果某一点出现故障或堵塞(也许装箱产品需要花费不成比例的长时间),受影响的工作站可以向上游发出信号,以限制原材料的流动。

 3.3.3.运营商


在 Reactor 中,操作员是我们装配类比中的工作站。每个运算符都会向 Publisher 添加行为,并将上一步的 Publisher 包装到一个新实例中。整个链因此被链接起来,使得数据源自第一个 Publisher 并沿着链向下移动,并由每个链接进行转换。最终, Subscriber 完成了该过程。请记住,在 Subscriber 订阅 Publisher 之前不会发生任何事情,我们很快就会看到。


了解运算符创建新实例可以帮助您避免常见错误,这种错误会让您相信您在链中使用的运算符没有被应用。请参阅常见问题解答中的此项。


虽然反应式流规范根本没有指定运算符,但反应式库(例如 Reactor)的最佳附加值之一是它们提供的丰富的运算符词汇表。这些涵盖了很多领域,从简单的转换和过滤到复杂的编排和错误处理。


3.3.4.在你 subscribe() 之前什么都不会发生


在 Reactor 中,当您编写 Publisher 链时,默认情况下数据不会开始注入其中。相反,您创建异步流程的抽象描述(这有助于提高可重用性和组合性)。


通过订阅行为,您将 Publisher 绑定到 Subscriber ,这会触发整个链中的数据流。这是通过来自 Subscriber 的单个 request 信号在内部实现的,该信号向上游传播,一直返回到源 Publisher

 3.3.5.背压


向上游传播信号也用于实现反压,我们在装配线类比中将其描述为当工作站处理速度比上游工作站慢时沿生产线发送的反馈信号。


Reactive Streams 规范定义的真实机制非常接近这个类比:订阅者可以在无界模式下工作,让源以最快的可实现速率推送所有数据,或者可以使用 request 机制来向源发出信号,表明它已准备好处理最多 n 个元素。


中间操作员还可以更改传输中的请求。想象一个 buffer 运算符将元素按十个为一组进行分组。如果订阅者请求一个缓冲区,则源产生十个元素是可以接受的。一些运算符还实现预取策略,这可以避免 request(1) 往返,并且如果在请求之前生成元素的成本不是太高,那么这是有益的。


这将推模型转换为推拉混合模型,其中下游可以从上游拉取 n 个元素(如果这些元素随时可用)。但如果元素还没有准备好,它们在生产时就会被上游推送。


3.3.6.热与冷


Rx 反应库家族区分了两大类反应序列:热反应序列和冷反应序列。这种区别主要与反应流如何对订阅者做出反应有关:


  • 对于每个 Subscriber ,冷序列都会重新开始,包括在数据源处。例如,如果源包装 HTTP 调用,则会为每个订阅发出新的 HTTP 请求。


  • 对于每个 Subscriber ,热序列不会从头开始。相反,迟到的订阅者会收到订阅后发出的信号。但请注意,某些热反应流可以全部或部分缓存或重放排放历史记录。从一般角度来看,当没有订阅者正在侦听时,热序列甚至可以发出(“订阅之前不会发生任何事情”规则的例外)。


有关 Reactor 上下文中的热与冷的更多信息,请参阅此特定于 Reactor 的部分。


建议编辑“反应式编程简介”


4. Reactor核心特点


Reactor 项目的主要工件是 reactor-core ,这是一个专注于 Reactive Streams 规范并针对 Java 8 的反应式库。


Reactor 引入了可组合的反应类型,它实现了 Publisher ,但也提供了丰富的运算符词汇: FluxMonoFlux 对象表示 0..N 个项目的反应序列,而 Mono 对象表示单值或空 (0..1) 结果。


这种区别将一些语义信息带入类型中,指示异步处理的粗略基数。例如,HTTP 请求仅产生一个响应,因此执行 count 操作没有多大意义。因此,将此类 HTTP 调用的结果表示为 Mono<HttpResponse> 比将其表示为 Flux<HttpResponse> 更有意义,因为它仅提供与零个或一个项目的上下文相关的运算符物品。


更改处理的最大基数的运算符也会切换到相关类型。例如, count 运算符存在于 Flux 中,但它返回 Mono<Long>


4.1. Flux ,0-N 项的异步序列


下图显示了 Flux 如何转换项目:

Flux


Flux<T> 是标准 Publisher<T> ,表示 0 到 N 个发出项目的异步序列,可以选择由完成信号或错误终止。与反应流规范中一样,这三种类型的信号转换为对下游订阅者的 onNextonCompleteonError 方法的调用。


由于可能的信号范围很大, Flux 是通用的反应类型。请注意,所有事件,甚至终止事件,都是可选的:没有 onNext 事件,但 onComplete 事件表示空的有限序列,但删除 onComplete 并且您有一个无限的空序列(不是特别有用,除了围绕取消的测试)。同样,无限序列不一定是空的。例如, Flux.interval(Duration) 生成一个无限的 Flux<Long> 并从时钟发出规则的滴答声。


4.2. Mono ,异步 0-1 结果


下图显示了 Mono 如何转换项目:

Mono


Mono<T> 是一种专门的 Publisher<T> ,它通过 onNext 信号最多发出一项,然后以 onComplete 信号终止(成功的 Mono ,有或没有值),或仅发出单个 onError 信号(失败 Mono )。


大多数 Mono 实现在调用 onNext 后应立即在其 Subscriber 上调用 onCompleteMono.never() 是一个异常值:它不发出任何信号,这在技术上并没有被禁止,尽管在测试之外并不是很有用。另一方面,明确禁止 onNextonError 的组合。


Mono 仅提供可用于 Flux 的运算符子集,以及一些运算符(特别是那些将 Mono 与另一个 Publisher 。例如, Mono#concatWith(Publisher) 返回 FluxMono#then(Mono) 返回另一个 Mono


请注意,您可以使用 Mono 来表示仅具有完成概念的无值异步流程(类似于 Runnable )。要创建一个,您可以使用空的 Mono<Void>


4.3.创建 Flux 或 Mono 并订阅它的简单方法


开始使用 FluxMono 的最简单方法是使用在各自类中找到的众多工厂方法之一。


例如,要创建 String 序列,您可以枚举它们或将它们放入集合中并从中创建 Flux,如下所示:

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);


工厂方法的其他示例包括:

Mono<String> noData = Mono.empty(); (1)

Mono<String> data = Mono.just("foo");

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); (2)
1
请注意,工厂方法遵循泛型类型,即使它没有值。
2
第一个参数是范围的开始,第二个参数是要生产的项目数。


在订阅方面, FluxMono 使用 Java 8 lambda。您可以选择多种 .subscribe() 变体,这些变体采用 lambda 来实现不同的回调组合,如以下方法签名所示:


示例 10. Flux 基于 Lambda 的订阅变体
subscribe(); (1)

subscribe(Consumer<? super T> consumer); (2)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); (3)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); (4)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); (5)
1
订阅并触发序列。
2
对每个产生的价值做一些事情。
3
处理价值观,但也要对错误做出反应。
4
处理值和错误,但在序列成功完成时运行一些代码。
5
处理值和错误并成功完成,但也对此 subscribe 调用生成的 Subscription 执行某些操作。

这些变体返回对订阅的引用,您可以在不需要更多数据时使用该引用取消订阅。取消后,源应停止生成值并清理其创建的所有资源。这种取消和清理行为在 Reactor 中由通用 Disposable 接口表示。


4.3.1. subscribe 方法示例


本节包含 subscribe 方法的五个签名中每个签名的最小示例。以下代码显示了不带参数的基本方法的示例:

Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(); (2)
1
设置一个 Flux ,在订阅者附加时生成三个值。
2
以最简单的方式订阅。


前面的代码不会产生可见的输出,但它确实有效。 Flux 产生三个值。如果我们提供 lambda,我们就可以使值可见。 subscribe 方法的下一个示例显示了显示值的一种方法:

Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(i -> System.out.println(i)); (2)
1
设置一个 Flux ,在订阅者附加时生成三个值。
2
与将打印值的订阅者一起订阅。


前面的代码产生以下输出:

1
2
3


为了演示下一个签名,我们故意引入一个错误,如下例所示:

Flux<Integer> ints = Flux.range(1, 4) (1)
      .map(i -> { (2)
        if (i <= 3) return i; (3)
        throw new RuntimeException("Got to 4"); (4)
      });
ints.subscribe(i -> System.out.println(i), (5)
      error -> System.err.println("Error: " + error));
1
设置一个 Flux,在订阅者连接时生成四个值。
2
我们需要一个映射,以便我们可以以不同的方式处理某些值。
3
对于大多数值,返回该值。
4
对于一个值,强制产生一个错误。
5
使用包含错误处理程序的订阅者进行订阅。


现在我们有两个 lambda 表达式:一个用于我们期望的内容,另一个用于错误。前面的代码产生以下输出:

1
2
3
Error: java.lang.RuntimeException: Got to 4


subscribe 方法的下一个签名包括错误处理程序和完成事件处理程序,如以下示例所示:

Flux<Integer> ints = Flux.range(1, 4); (1)
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> System.out.println("Done")); (2)
1
设置一个 Flux,在订阅者连接时生成四个值。
2
使用包含完成事件处理程序的订阅者进行订阅。


错误信号和完成信号都是终止事件并且是互斥的(你永远不会同时得到两者)。为了使完成消费者工作,我们必须注意不要触发错误。


完成回调没有输入,由一对空括号表示:它与 Runnable 接口中的 run 方法匹配。前面的代码产生以下输出:

1
2
3
4
Done


4.3.2.使用 Disposable 取消 subscribe()


所有这些基于 lambda 的 subscribe() 变体都有一个 Disposable 返回类型。在本例中, Disposable 接口表示可以通过调用其 dispose() 方法来取消订阅。


对于 FluxMono ,取消是源应停止生成元素的信号。但是,不能保证立即执行:某些源可能会非常快地生成元素,甚至在收到取消指令之前它们就可以完成。


Disposable 周围的一些实用程序可在 Disposables 类中使用。其中, Disposables.swap() 创建一个 Disposable 包装器,让您可以原子地取消和替换具体的 Disposable 。例如,在 UI 场景中,每当用户单击按钮时,您想要取消请求并用新请求替换它,这可能很有用。处理包装纸本身即可将其关闭。这样做会处理当前的具体值和所有未来尝试的替换。


另一个有趣的实用程序是 Disposables.composite(…​) 。此组合允许您收集多个 Disposable — 例如,与服务调用关联的多个正在进行的请求 — 并稍后立即处理所有这些请求。一旦组合的 dispose() 方法被调用,任何添加另一个 Disposable 的尝试都会立即释放它。


4.3.3. Lambda 的替代方案: BaseSubscriber


还有一种更通用的 subscribe 方法,它采用成熟的 Subscriber ,而不是从 lambda 中组合一个。为了帮助编写这样的 Subscriber ,我们提供了一个名为 BaseSubscriber 的可扩展类。


BaseSubscriber (或其子类)的实例是一次性的,这意味着如果 BaseSubscriber 订阅了第二个 Publisher ,则它会取消对第一个 Publisher 的订阅 Publisher 。这是因为使用一个实例两次会违反反应流规则,即 SubscriberonNext 方法不得并行调用。因此,只有直接在 Publisher#subscribe(Subscriber) 调用中声明匿名实现才可以。


现在我们可以实现其中之一。我们称之为 SampleSubscriber 。以下示例显示了如何将其附加到 Flux

SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(ss);


以下示例显示了 SampleSubscriber 作为 BaseSubscriber 的简约实现的样子:

package io.projectreactor.samples;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;

public class SampleSubscriber<T> extends BaseSubscriber<T> {

	@Override
	public void hookOnSubscribe(Subscription subscription) {
		System.out.println("Subscribed");
		request(1);
	}

	@Override
	public void hookOnNext(T value) {
		System.out.println(value);
		request(1);
	}
}


SampleSubscriber 类扩展了 BaseSubscriber ,这是 Reactor 中用户定义的 Subscribers 推荐的抽象类。该类提供了可以重写的挂钩来调整订阅者的行为。默认情况下,它会触发无界请求,其行为与 subscribe() 完全相同。但是,当您需要自定义请求量时,扩展 BaseSubscriber 会更有用。


对于自定义请求量,最低限度是实现 hookOnSubscribe(Subscription subscription)hookOnNext(T value) ,就像我们所做的那样。在我们的例子中, hookOnSubscribe 方法将一条语句打印到标准输出并发出第一个请求。然后 hookOnNext 方法打印一条语句并执行其他请求,一次一个请求。


SampleSubscriber 类产生以下输出:

Subscribed
1
2
3
4


BaseSubscriber 还提供了 requestUnbounded() 方法来切换到无界模式(相当于 request(Long.MAX_VALUE) ),以及 cancel() 方法。


它还具有额外的钩子: hookOnCompletehookOnErrorhookOnCancelhookFinally (当序列终止时总是调用它,带有作为 SignalType 参数传入的终止类型)


您几乎肯定想要实现 hookOnErrorhookOnCancelhookOnComplete 方法。您可能还想实现 hookFinally 方法。 SampleSubscriber 是执行有界请求的 Subscriber 的绝对最小实现。


4.3.4.关于背压和重塑请求的方法


在 Reactor 中实现背压时,消费者压力传播回源的方式是向上游算子发送 request 。当前请求的总和有时被称为当前“需求”或“待处理请求”。需求上限为 Long.MAX_VALUE ,代表无限制的请求(意味着“尽可能快地生产”——基本上禁用背压)。


第一个请求来自订阅时的最终订阅者,但最直接的订阅方式都会立即触发 Long.MAX_VALUE 的无限请求:


  • subscribe() 及其大多数基于 lambda 的变体(具有 Consumer<Subscription> 的变体除外)


  • block()blockFirst()blockLast()


  • 迭代 toIterable()toStream()


自定义原始请求的最简单方法是将 subscribeBaseSubscriber 结合使用,并覆盖 hookOnSubscribe 方法,如以下示例所示:

Flux.range(1, 10)
    .doOnRequest(r -> System.out.println("request of " + r))
    .subscribe(new BaseSubscriber<Integer>() {

      @Override
      public void hookOnSubscribe(Subscription subscription) {
        request(1);
      }

      @Override
      public void hookOnNext(Integer integer) {
        System.out.println("Cancelling after having received " + integer);
        cancel();
      }
    });


前面的代码片段打印出以下内容:

request of 1
Cancelling after having received 1

在操作请求时,您必须小心地产生足够的需求以使序列前进,否则您的 Flux 可能会“卡住”。这就是为什么 BaseSubscriber 默认为 hookOnSubscribe 中的无界请求。当覆盖此钩子时,您通常应该至少调用一次 request

改变下游需求的运营商


需要记住的一件事是,上游链中的每个运营商都可以重塑订阅级别表达的需求。一个教科书案例是 buffer(N) 运算符:如果它接收到 request(2) ,则它被解释为对两个完整缓冲区的需求。因此,由于缓冲区需要 N 元素才能被视为已满,因此 buffer 运算符将请求重塑为 2 x N


您可能还注意到,某些运算符具有采用名为 prefetchint 输入参数的变体。这是修改下游请求的另一类运算符。这些通常是处理内部序列的运算符,从每个传入元素派生出 Publisher (如 flatMap )。


预取是一种调整对这些内部序列发出的初始请求的方法。如果未指定,大多数这些运算符都以 32 需求开始。


这些操作符通常还会实现补充优化:一旦操作符看到 75% 的预取请求已得到满足,它就会从上游重新请求 75%。这是一种启发式优化,以便这些操作员能够主动预测即将到来的请求。


最后,几个运算符可让您直接调整请求: limitRatelimitRequest


limitRate(N) 拆分下游请求,以便它们以较小的批次向上游传播。例如,向 limitRate(10) 发出的 100 请求最多会导致 1010 个请求传播到上游。请注意,在这种形式中, limitRate 实际上实现了前面讨论的补充优化。


操作员有一个变体,还可以让您调整补充量(在变体中称为 lowTide ): limitRate(highTide, lowTide) 。选择 0 中的 lowTide 会导致严格的 highTide 请求批次,而不是通过补货策略进一步返工的批次。


另一方面, limitRequest(N) 将下游请求限制为最大总需求。它将请求添加到 N 。如果单个 request 没有使总需求溢出 N ,则该特定请求将完全向上游传播。在源发出该数量后, limitRequest 认为序列已完成,向下游发送 onComplete 信号,并取消源。


4.4.以编程方式创建序列


在本节中,我们将介绍通过以编程方式定义其关联事件( onNextonErroronComplete )。所有这些方法都有一个共同的事实:它们公开一个 API 来触发我们称为接收器的事件。实际上有一些水槽变体,我们很快就会介绍。


4.4.1.同步 generate


以编程方式创建 Flux 的最简单形式是通过 generate 方法,该方法采用生成器函数。


这适用于同步和一对一的发射,这意味着接收器是一个 SynchronousSink 并且它的 next() 方法每次回调调用最多只能调用一次。然后,您可以另外调用 error(Throwable)complete() ,但这是可选的。


最有用的变体可能还可以让您保留一种状态,您可以在接收器使用中参考该状态来决定下一步要发出什么。然后生成器函数变成 BiFunction<S, SynchronousSink<T>, S> ,其中 <S> 是状态对象的类型。您必须为初始状态提供 Supplier<S> ,并且您的生成器函数现在在每一轮返回一个新状态。


例如,您可以使用 int 作为状态:


示例 11. 基于状态的 generate 示例
Flux<String> flux = Flux.generate(
    () -> 0, (1)
    (state, sink) -> {
      sink.next("3 x " + state + " = " + 3*state); (2)
      if (state == 10) sink.complete(); (3)
      return state + 1; (4)
    });
1
我们提供初始状态值 0。
2
我们使用状态来选择要发出的内容(3 乘法表中的一行)。
3
我们还用它来选择何时停止。
4
我们返回一个在下一次调用中使用的新状态(除非序列在本次调用中终止)。


上述代码生成 3 的表,顺序如下:

3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30


您还可以使用可变的 <S> 。例如,可以使用单个 AtomicLong 作为状态来重写上面的示例,并在每一轮中对其进行变异:


示例 12. 可变状态变体
Flux<String> flux = Flux.generate(
    AtomicLong::new, (1)
    (state, sink) -> {
      long i = state.getAndIncrement(); (2)
      sink.next("3 x " + i + " = " + 3*i);
      if (i == 10) sink.complete();
      return state; (3)
    });
1
这次,我们生成一个可变对象作为状态。
2
我们在这里改变状态。
3
我们返回与新状态相同的实例。

如果您的状态对象需要清理一些资源,请使用 generate(Supplier<S>, BiFunction, Consumer<S>) 变体来清理最后一个状态实例。


以下示例使用包含 Consumergenerate 方法:

Flux<String> flux = Flux.generate(
    AtomicLong::new,
      (state, sink) -> { (1)
      long i = state.getAndIncrement(); (2)
      sink.next("3 x " + i + " = " + 3*i);
      if (i == 10) sink.complete();
      return state; (3)
    }, (state) -> System.out.println("state: " + state)); (4)
1
同样,我们生成一个可变对象作为状态。
2
我们在这里改变状态。
3
我们返回与新状态相同的实例。
4
我们将最后一个状态值 (11) 视为此 Consumer lambda 的输出。


如果状态包含数据库连接或需要在流程结束时处理的其他资源, Consumer lambda 可以关闭连接或以其他方式处理应在结束时完成的任何任务的过程。


4.4.2.异步和多线程: create


createFlux 的编程创建的更高级形式,适用于每轮多次发射,甚至来自多个线程。


它公开了 FluxSink 及其 nexterrorcomplete 方法。与 generate 相反,它没有基于状态的变体。另一方面,它可以在回调中触发多线程事件。


create 对于桥接现有 API 与反应式世界非常有用 - 例如基于侦听器的异步 API。

create 不会并行化您的代码,也不会使其异步,即使它可以与异步 API 一起使用。如果您在 create lambda 中进行阻塞,则会面临死锁和类似副作用的风险。即使使用 subscribeOn ,也需要注意的是,长阻塞 create lambda(例如调用 sink.next(t) 的无限循环)可能会锁定管道:请求永远不会被执行,因为循环使它们应该运行的同一线程处于饥饿状态。使用 subscribeOn(Scheduler, false) 变体: requestOnSeparateThread = false 将为 create 使用 Scheduler 线程,并且仍然通过执行 request


假设您使用基于侦听器的 API。它按块处理数据并有两个事件:(1) 数据块已准备好,(2) 处理完成(终止事件),如 MyEventListener 界面所示:

interface MyEventListener<T> {
    void onDataChunk(List<T> chunk);
    void processComplete();
}


您可以使用 create 将其桥接到 Flux<T>

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register( (4)
      new MyEventListener<String>() { (1)

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); (2)
          }
        }

        public void processComplete() {
            sink.complete(); (3)
        }
    });
});
1
桥接到 MyEventListener API
2
块中的每个元素都成为 Flux 中的元素。
3
processComplete 事件被转换为 onComplete
4
每当 myEventProcessor 执行时,所有这些都是异步完成的。


此外,由于 create 可以桥接异步 API 并管理反压,因此您可以通过指示 OverflowStrategy 来细化反压行为方式:


  • IGNORE 完全忽略下游背压请求。当下游队列已满时,这可能会产生 IllegalStateException


  • ERROR 当下游无法跟上时发出 IllegalStateException 信号。


  • DROP 如果下游未准备好接收传入信号,则丢弃该信号。


  • LATEST 让下游只获取来自上游的最新信号。


  • BUFFER (默认)在下游无法跟上时缓冲所有信号。 (这会进行无限制的缓冲,并可能导致 OutOfMemoryError )。


Mono 还有一个 create 生成器。 Mono 的 MonoSink 不允许多次发射。它将丢弃第一个信号之后的所有信号。


4.4.3.异步但单线程: push


pushgeneratecreate 之间的中间立场,适合处理来自单个生产者的事件。它与 create 类似,因为它也可以是异步的,并且可以使用 create 支持的任何溢出策略来管理背压。然而,一次只有一个生产线程可以调用 nextcompleteerror

Flux<String> bridge = Flux.push(sink -> {
    myEventProcessor.register(
      new SingleThreadEventListener<String>() { (1)

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); (2)
          }
        }

        public void processComplete() {
            sink.complete(); (3)
        }

        public void processError(Throwable e) {
            sink.error(e); (4)
        }
    });
});
1
桥接到 SingleThreadEventListener API。
2
使用 next 从单个侦听器线程将事件推送到接收器。
3
从同一侦听器线程生成的 complete 事件。
4
error 事件也从同一侦听器线程生成。

混合推/拉模型


大多数 Reactor 运算符(例如 create )遵循混合推/拉模型。我们的意思是,尽管大部分处理是异步的(建议采用推送方法),但其中有一个小的拉取组件:请求。


消费者从源中提取数据,因为在第一次请求之前它不会发出任何数据。只要数据可用,源就会将数据推送给消费者,但要在其请求的数量范围内。


请注意, push()create() 都允许设置 onRequest 消费者,以便管理请求量并确保仅在以下情况下才通过接收器推送数据:有待处理的请求。

Flux<String> bridge = Flux.create(sink -> {
    myMessageProcessor.register(
      new MyMessageListener<String>() {

        public void onMessage(List<String> messages) {
          for(String s : messages) {
            sink.next(s); (3)
          }
        }
    });
    sink.onRequest(n -> {
        List<String> messages = myMessageProcessor.getHistory(n); (1)
        for(String s : messages) {
           sink.next(s); (2)
        }
    });
});
1
发出请求时轮询消息。
2
如果消息立即可用,则将它们推送到接收器。
3
稍后异步到达的剩余消息也会被传递。

push()create() 之后清理


两个回调 onDisposeonCancel 在取消或终止时执行任何清理。 onDispose 可用于在 Flux 完成、出错或取消时执行清理。 onCancel 可用于在使用 onDispose 进行清理之前执行任何特定于取消的操作。

Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel()) (1)
        .onDispose(() -> channel.close())  (2)
    });
1
onCancel 首先被调用,仅用于取消信号。
2
onDispose 被调用用于完成、错误或取消信号。

 4.4.4.处理


handle 方法有点不同:它是一个实例方法,这意味着它链接在现有源上(与常见运算符一样)。它出现在 MonoFlux 中。


它接近于 generate ,因为它使用 SynchronousSink 并且只允许一对一的发射。但是, handle 可用于从每个源元素生成任意值,可能会跳过某些元素。这样,它就可以作为 mapfilter 的组合。句柄签名如下:

Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);


让我们考虑一个例子。反应式流规范不允许序列中存在 null 值。如果您想要执行 map 但想要使用预先存在的方法作为映射函数,并且该方法有时返回 null,该怎么办?


例如,以下方法可以安全地应用于整数源:

public String alphabet(int letterNumber) {
	if (letterNumber < 1 || letterNumber > 26) {
		return null;
	}
	int letterIndexAscii = 'A' + letterNumber - 1;
	return "" + (char) letterIndexAscii;
}


然后我们可以使用 handle 删除任何空值:


示例 13. 将 handle 用于“映射并消除空值”场景
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
    .handle((i, sink) -> {
        String letter = alphabet(i); (1)
        if (letter != null) (2)
            sink.next(letter); (3)
    });

alphabet.subscribe(System.out::println);
1  映射到字母。
2
如果“地图函数”返回 null…​。
3
通过不调用 sink.next 将其过滤掉。


将打印出:

M
I
T


4.5.线程和调度程序


Reactor 与 RxJava 一样,可以被认为是与并发无关的。也就是说,它不强制执行并发模型。相反,它让你(开发人员)来指挥。但是,这并不妨碍该库帮助您处理并发性。


获取 FluxMono 并不一定意味着它在专用的 Thread 中运行。相反,大多数运算符继续在前一个运算符执行的 Thread 中工作。除非指定,否则最顶层的运算符(源)本身在进行 subscribe() 调用的 Thread 上运行。以下示例在新线程中运行 Mono

public static void main(String[] args) throws InterruptedException {
  final Mono<String> mono = Mono.just("hello "); (1)

  Thread t = new Thread(() -> mono
      .map(msg -> msg + "thread ")
      .subscribe(v -> (2)
          System.out.println(v + Thread.currentThread().getName()) (3)
      )
  )
  t.start();
  t.join();

}
1
Mono<String> 在线程 main 中组装。
2
但是,它是在线程 Thread-0 中订阅的。
3
因此, maponNext 回调实际上都在 Thread-0 中运行


前面的代码产生以下输出:

hello thread Thread-0


在 Reactor 中,执行模型和执行发生的位置由所使用的 Scheduler 决定。 Scheduler 具有与 ExecutorService 类似的调度职责,但是拥有专用的抽象可以让它做更多的事情,特别是充当时钟并支持更广泛的实现(测试的虚拟时间,蹦床或立即调度等)。


Schedulers 类具有静态方法,可以访问以下执行上下文:


  • 无执行上下文( Schedulers.immediate() ):在处理时,提交的 Runnable 将被直接执行,有效地将它们运行在当前的 Thread 上(可以看作是“空对象”或无操作 Scheduler )。


  • 单个可重用线程 ( Schedulers.single() )。请注意,此方法为所有调用者重用同一线程,直到调度程序被释放。如果您想要每个调用专用线程,请为每个调用使用 Schedulers.newSingle()


  • 无界弹性线程池 ( Schedulers.elastic() )。随着 Schedulers.boundedElastic() 的引入,这个不再是首选,因为它有隐藏背压问题并导致太多线程的趋势(见下文)。


  • 有界弹性线程池 ( Schedulers.boundedElastic() )。这是为阻塞进程提供自己的线程的便捷方法,这样它就不会占用其他资源。这是 I/O 阻塞工作的更好选择。请参阅如何包装同步、阻塞调用?,但新线程不会给系统带来太大压力。从 3.6.0 开始,根据设置可以提供两种不同的实现:


    • 基于 ExecutorService ,在任务之间重用平台线程。此实现与其前身 elastic() 一样,根据需要创建新的工作池并重用空闲的工作池。闲置时间过长(默认为 60 秒)的工作池也会被丢弃。与它的前身 elastic() 不同,它对可以创建的支持线程数量有上限(默认为 CPU 核心数量 x 10)。达到上限后提交的最多 100 000 个任务将被排队,并在线程可用时重新调度(当延迟调度时,延迟在线程可用时开始)。


    • 基于每个任务线程,设计为在 VirtualThread 实例上运行。要使用该功能,应用程序应在 Java 21+ 环境中运行并将 reactor.schedulers.defaultBoundedElasticOnVirtualThreads 系统属性设置为 true 。设置上述内容后,共享的 Schedulers.boundedElastic() 将返回 BoundedElasticScheduler 的特定实现,该实现是为在 VirtualThread 类的新实例上运行每个任务而定制的。此实现在行为方面与基于 ExecutorService 的实现类似,但没有空闲池并为每个任务创建一个新的 VirtualThread


  • 为并行工作而调整的固定工作人员池 ( Schedulers.parallel() )。它会创建与 CPU 核心数量一样多的工作线程。


此外,您可以使用 Schedulers.fromExecutorService(ExecutorService) 从任何预先存在的 ExecutorService 中创建 Scheduler 。 (您也可以从 Executor 创建一个,但不鼓励这样做。)


您还可以使用 newXXX 方法创建各种调度程序类型的新实例。例如, Schedulers.newParallel(yourScheduleName) 创建一个名为 yourScheduleName 的新并行调度程序。


虽然 boundedElastic 是为了帮助处理无法避免的遗留阻塞代码,但 singleparallel 则不然。因此,使用 Reactor 阻塞 API( block()blockFirst()blockLast() (以及迭代 toIterable()toStream() )在默认的单一和并行调度程序中)会导致抛出 IllegalStateException


自定义 Schedulers 还可以通过创建实现 NonBlocking 标记接口的 Thread 实例来标记为“仅非阻塞”。


某些运算符默认使用 Schedulers 中的特定调度程序(并且通常允许您选择提供不同的调度程序)。例如,调用 Flux.interval(Duration.ofMillis(300)) 工厂方法会生成每 300 毫秒计时一次的 Flux<Long> 。默认情况下,这是由 Schedulers.parallel() 启用的。以下行将 Scheduler 更改为类似于 Schedulers.single() 的新实例:

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))


Reactor 提供了两种在反应链中切换执行上下文(或 Scheduler )的方法: publishOnsubscribeOn 。两者都采用 Scheduler 并让您将执行上下文切换到该调度程序。但是 publishOn 在链中的位置很重要,而 subscribeOn 的位置则无关紧要。要理解这种差异,您首先必须记住,在您订阅之前什么都不会发生。


在 Reactor 中,当您链接运算符时,您可以根据需要将任意多个 FluxMono 实现相互包装起来。一旦您订阅,就会创建一个 Subscriber 对象链,向后(沿着链向上)到第一个发布者。这对您来说实际上是隐藏的。您所看到的只是 Flux (或 Mono )和 Subscription 的外层,但这些中间特定于运营商的订阅者才是真正工作发生的地方。


有了这些知识,我们就可以仔细研究 publishOnsubscribeOn 运算符:


4.5.1. publishOn 方法


publishOn 的应用方式与任何其他运营商相同,位于订户链的中间。它从上游获取信号并在下游重播它们,同时从关联的 Scheduler 对工作线程执行回调。因此,它会影响后续运算符的执行位置(直到链接另一个 publishOn 为止),如下所示:


  • 将执行上下文更改为 Scheduler 选择的 Thread


  • 根据规范, onNext 调用按顺序发生,因此这会占用单个线程


  • 除非它们在特定的 Scheduler 上工作,否则 publishOn 之后的运算符将继续在同一线程上执行


以下示例使用 publishOn 方法:

Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); (1)

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)  (2)
    .publishOn(s)  (3)
    .map(i -> "value " + i);  (4)

new Thread(() -> flux.subscribe(System.out::println));  (5)
1
创建一个由四个 Thread 实例支持的新 Scheduler
2
第一个 map 在 <5> 中的匿名线程上运行。
3
publishOn 将整个序列切换到从 <1> 中选取的 Thread 上。
4
第二个 map 在 <1> 的 Thread 上运行。
5
这个匿名 Thread 是订阅发生的地方。打印发生在最新的执行上下文上,即来自 publishOn 的执行上下文。


4.5.2. subscribeOn 方法


subscribeOn 适用于订阅过程,构建反向链时。通常建议将其放置在数据源之后,因为中间运算符可能会影响执行的上下文。


但是,这不会影响对 publishOn 的后续调用的行为 - 它们仍然会切换其后的链部分的执行上下文。


  • 更改整个运营商链订阅的 Thread


  • Scheduler 中选取一个线程


只有下游链中最近的 subscribeOn 调用才能有效地将订阅和请求信号调度到可以拦截它们的源或操作员( doFirstdoOnRequest )。使用多个 subscribeOn 调用会引入不必要的没有价值的线程切换。


以下示例使用 subscribeOn 方法:

Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); (1)

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)  (2)
    .subscribeOn(s)  (3)
    .map(i -> "value " + i);  (4)

new Thread(() -> flux.subscribe(System.out::println));  (5)
1
创建一个由四个 Thread 支持的新 Scheduler
2
第一个 map 在这四个线程之一上运行...​
3
…​因为 subscribeOn 从订阅时间 (<5>) 开始切换整个序列。
4
第二个 map 也在同一线程上运行。
5
这个匿名 Thread 是订阅最初发生的地方,但 subscribeOn 立即将其转移到四个调度程序线程之一。

 4.6.处理错误


要快速查看可用于错误处理的运算符,请参阅相关的运算符决策树。


在响应式流中,错误是终端事件。一旦发生错误,它就会停止序列并沿着运算符链传播到最后一步,即您定义的 Subscriber 及其 onError 方法。


此类错误仍应在应用程序级别处理。例如,您可以在 UI 中显示错误通知或在 REST 端点中发送有意义的错误负载。因此,订阅者的 onError 方法应该始终被定义。


如果未定义, onError 会抛出 UnsupportedOperationException 。您可以使用 Exceptions.isErrorCallbackNotImplemented 方法进一步检测和分类它。


Reactor 还提供了处理链中间错误的替代方法,如错误处理运算符。以下示例展示了如何执行此操作:

Flux.just(1, 2, 0)
    .map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0
    .onErrorReturn("Divided by zero :("); // error handling example

在了解错误处理运算符之前,您必须记住,反应序列中的任何错误都是终止事件。即使使用错误处理运算符,它也不会让原始序列继续。相反,它将 onError 信号转换为新序列(后备序列)的开始。换句话说,它替换了其上游终止的序列。


现在我们可以一一考虑每种错误处理方法。当相关时,我们与命令式编程的 try 模式进行比较。


4.6.1.错误处理运算符


您可能熟悉在 try-catch 块中处理异常的几种方法。最值得注意的是,这些包括以下内容:


  • 捕获并返回静态默认值。


  • 使用后备方法捕获并执行替代路径。


  • 捕获并动态计算回退值。


  • 捕获,包装到 BusinessException ,然后重新抛出。


  • 捕获、记录特定于错误的消息,然后重新抛出。


  • 使用 finally 块来清理资源或 Java 7“try-with-resource”构造。


所有这些在 Reactor 中都有等价物,以错误处理运算符的形式。在研究这些运算符之前,我们首先要在反应链和 try-catch 块之间建立并行。


订阅时,链末尾的 onError 回调类似于 catch 块。在那里,如果抛出 Exception ,执行会跳到 catch,如以下示例所示:

Flux<String> s = Flux.range(1, 10)
    .map(v -> doSomethingDangerous(v)) (1)
    .map(v -> doSecondTransform(v)); (2)
s.subscribe(value -> System.out.println("RECEIVED " + value), (3)
            error -> System.err.println("CAUGHT " + error) (4)
);
1
执行可能引发异常的转换。
2
如果一切顺利,就会进行第二次转变。
3
每个成功转换的值都会被打印出来。
4
如果出现错误,序列将终止并显示错误消息。


前面的示例在概念上类似于以下 try-catch 块:

try {
    for (int i = 1; i < 11; i++) {
        String v1 = doSomethingDangerous(i); (1)
        String v2 = doSecondTransform(v1); (2)
        System.out.println("RECEIVED " + v2);
    }
} catch (Throwable t) {
    System.err.println("CAUGHT " + t); (3)
}
1
如果这里抛出异常...​
2
...​循环的其余部分被跳过...​
3
...​并且执行直接到这里。


现在我们已经建立了并行,我们可以看看不同的错误处理情况及其等效的运算符。

 静态回退值


相当于“捕获并返回静态默认值”的是 onErrorReturn 。以下示例展示了如何使用它:

try {
  return doSomethingDangerous(10);
}
catch (Throwable error) {
  return "RECOVERED";
}


以下示例显示了 Reactor 等效项:

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn("RECOVERED");


您还可以选择对异常应用 Predicate 来决定是否恢复,如以下示例所示:

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10"); (1)
1
仅当异常消息为 "boom10" 时才恢复

捕获并吞下错误


如果您甚至不想用后备值替换异常,而是忽略它并仅传播到目前为止已生成的元素,那么您想要的本质上是将 onError 信号替换为 < b1> 信号。这可以通过 onErrorComplete 运算符来完成:

Flux.just(10,20,30)
    .map(this::doSomethingDangerousOn30)
    .onErrorComplete(); (1)
1
通过将 onError 转换为 onComplete 来恢复


onErrorReturn 一样, onErrorComplete 具有变体,可让您根据异常的类或 Predicate 过滤要依赖的异常。

 回退方法


如果您想要多个默认值并且您有另一种(更安全)的数据处理方式,则可以使用 onErrorResume 。这相当于“使用后备方法捕获并执行替代路径”。


例如,如果您的名义进程正在从外部且不可靠的服务获取数据,但您还保留了相同数据的本地缓存,该数据可能有点过时但更可靠,您可以执行以下操作:

String v1;
try {
  v1 = callExternalService("key1");
}
catch (Throwable error) {
  v1 = getFromCache("key1");
}

String v2;
try {
  v2 = callExternalService("key2");
}
catch (Throwable error) {
  v2 = getFromCache("key2");
}


以下示例显示了 Reactor 等效项:

Flux.just("key1", "key2")
    .flatMap(k -> callExternalService(k) (1)
        .onErrorResume(e -> getFromCache(k)) (2)
    );
1
对于每个键,异步调用外部服务。
2
如果外部服务调用失败,则回退到该密钥的缓存。请注意,无论源错误 e 是什么,我们始终应用相同的后备。


onErrorReturn 一样, onErrorResume 具有变体,可让您根据异常的类或 Predicate 过滤要依赖的异常。事实上,它需要 Function ,您还可以根据遇到的错误选择不同的后备序列进行切换。以下示例展示了如何执行此操作:

Flux.just("timeout1", "unknown", "key2")
    .flatMap(k -> callExternalService(k)
        .onErrorResume(error -> { (1)
            if (error instanceof TimeoutException) (2)
                return getFromCache(k);
            else if (error instanceof UnknownKeyException)  (3)
                return registerNewEntry(k, "DEFAULT");
            else
                return Flux.error(error); (4)
        })
    );
1
该功能允许动态选择如何继续。
2
如果源超时,则命中本地缓存。
3
如果消息来源说密钥未知,请创建一个新条目。
4
在所有其他情况下,“重新抛出”。
 动态回退值


即使您没有替代(更安全)的数据处理方式,您也可能希望从收到的异常中计算回退值。这相当于“捕获并动态计算回退值”。


例如,如果您的返回类型( MyWrapper )有一个专门用于保存异常的变体(例如 Future.complete(T success)Future.completeExceptionally(Throwable error) ),您可以实例化错误保存变体并传递异常。


一个命令式的例子如下所示:

try {
  Value v = erroringMethod();
  return MyWrapper.fromValue(v);
}
catch (Throwable error) {
  return MyWrapper.fromError(error);
}


您可以通过使用 onErrorResume 以及一点样板文件,以与后备方法解决方案相同的方式被动地执行此操作,如下所示:

erroringFlux.onErrorResume(error -> Mono.just( (1)
        MyWrapper.fromError(error) (2)
));
1
由于您期望错误的 MyWrapper 表示,因此您需要获取 onErrorResumeMono<MyWrapper> 。我们为此使用 Mono.just()
2
我们需要计算异常的值。在这里,我们通过使用相关的 MyWrapper 工厂方法包装异常来实现这一点。
 捕获并重新抛出


在命令式世界中,“捕获、包装到 BusinessException 并重新抛出”如下所示:

try {
  return callExternalService(k);
}
catch (Throwable error) {
  throw new BusinessException("oops, SLA exceeded", error);
}


在“后备方法”示例中, flatMap 内的最后一行提示我们以反应方式实现相同的目标,如下所示:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorResume(original -> Flux.error(
            new BusinessException("oops, SLA exceeded", original))
    );


但是,有一种更直接的方法可以使用 onErrorMap 实现相同的效果:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));

侧面记录或反应


如果您希望错误继续传播,但仍希望在不修改序列的情况下对其做出反应(例如记录它),则可以使用 doOnError 运算符。这相当于“捕获、记录特定于错误的消息,然后重新抛出”模式,如以下示例所示:

try {
  return callExternalService(k);
}
catch (RuntimeException error) {
  //make a record of the error
  log("uh oh, falling back, service failed for key " + k);
  throw error;
}


doOnError 运算符以及所有以 doOn 为前缀的运算符有时被称为具有“副作用”。它们让您可以查看序列事件的内部情况,而无需修改它们。


与前面显示的命令式示例类似,以下示例仍然传播错误,但确保我们至少记录外部服务发生故障:

LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
    .flatMap(k -> callExternalService(k) (1)
        .doOnError(e -> {
            failureStat.increment();
            log("uh oh, falling back, service failed for key " + k); (2)
        })
        (3)
    );
1
可能失败的外部服务调用...​
2
...​带有日志记录和统计副作用......​
3
…​之后,它仍然会因错误而终止,除非我们在这里使用错误恢复运算符。


我们还可以想象我们有统计计数器来增加第二个错误副作用。


使用资源和 Final 块


使用命令式编程绘制的最后一个相似之处是清理,可以通过使用“使用 finally 块来清理资源”或使用“Java 7 try-with-resource 构造”来完成”,均如下所示:


示例 14.finally 的命令式使用
Stats stats = new Stats();
stats.startTimer();
try {
  doSomethingDangerous();
}
finally {
  stats.stopTimerAndRecordTiming();
}

示例 15. try-with-resource 的命令式使用
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
  return disposableInstance.toString();
}


两者都有其 Reactor 等效项: doFinallyusing


doFinally 是关于您希望在序列终止(使用 onCompleteonError )或取消时执行的副作用。它会提示您哪种终止会触发副作用。以下示例展示了如何使用 doFinally


最终反应: doFinally()
Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();

Flux<String> flux =
Flux.just("foo", "bar")
    .doOnSubscribe(s -> stats.startTimer())
    .doFinally(type -> { (1)
        stats.stopTimerAndRecordTiming();(2)
        if (type == SignalType.CANCEL) (3)
          statsCancel.increment();
    })
    .take(1); (4)
1
doFinally 使用 SignalType 作为终止类型。
2
finally 块类似,我们总是记录时间。
3
在这里,我们还仅在取消的情况下增加统计数据。
4
take(1) 从上游请求正好 1,并在发出一项后取消。


另一方面, using 处理 Flux 从资源派生的情况,并且每当处理完成时都必须对该资源进行操作。在下面的示例中,我们将“try-with-resource”的 AutoCloseable 接口替换为 Disposable


示例 16. 一次性资源
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
    @Override
    public void dispose() {
        isDisposed.set(true); (4)
    }

    @Override
    public String toString() {
        return "DISPOSABLE";
    }
};


现在我们可以对其进行相当于“try-with-resource”的响应式操作,如下所示:


示例 17. 反应式 try-with-resource: using()
Flux<String> flux =
Flux.using(
        () -> disposableInstance, (1)
        disposable -> Flux.just(disposable.toString()), (2)
        Disposable::dispose (3)
);
1
第一个 lambda 生成资源。在这里,我们返回模拟 Disposable
2
第二个 lambda 处理资源,返回 Flux<T>
3
当 <2> 中的 Flux 终止或取消时,将调用第三个 lambda,以清理资源。
4
订阅并执行序列后, isDisposed 原子布尔值变为 true

演示 onError 的终端方面


为了证明所有这些运算符在发生错误时都会导致上游原始序列终止,我们可以使用一个更直观的示例 Flux.intervalinterval 运算符每 x 个时间单位标记一次, Long 值逐渐增加。以下示例使用 interval 运算符:

Flux<String> flux =
Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .onErrorReturn("Uh oh");

flux.subscribe(System.out::println);
Thread.sleep(2100); (1)
1
请注意,默认情况下 interval 在计时器 Scheduler 上执行。如果我们想在主类中运行该示例,则需要在此处添加 sleep 调用,以便应用程序不会在没有产生任何值的情况下立即退出。


上例每250ms打印一行,如下:

tick 0
tick 1
tick 2
Uh oh


即使多运行一秒, interval 也不会再有任何蜱虫进来。该序列确实因错误而终止。

 重试


关于错误处理还有另一个令人感兴趣的运算符,您可能会想在上一节中描述的情况下使用它。 retry ,正如其名称所示,允许您重试产生错误的序列。


需要记住的是,它是通过重新订阅上游 Flux 来工作的。这确实是一个不同的序列,并且原始序列仍然终止。为了验证这一点,我们可以重新使用前面的示例并附加 retry(1) 来重试一次,而不是使用 onErrorReturn 。以下示例展示了如何执行此操作:

Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .retry(1)
    .elapsed() (1)
    .subscribe(System.out::println, System.err::println); (2)

Thread.sleep(2100); (3)
1
elapsed 将每个值与自发出前一个值以来的持续时间相关联。
2
我们还想看看何时存在 onError
3
确保我们有足够的时间进行 4x2 动作。


前面的示例产生以下输出:

259,tick 0
249,tick 1
251,tick 2
506,tick 0 (1)
248,tick 1
253,tick 2
java.lang.RuntimeException: boom
1
新的 interval 从刻度 0 开始。额外的 250 毫秒持续时间来自第四个刻度,即导致异常和后续重试的刻度。


从上面的例子可以看出, retry(1) 只是重新订阅了原来的 interval 一次,从0重新开始tick。第二次,由于仍然出现异常,所以放弃并向下游传播错误。


有一个更高级的 retry 版本(称为 retryWhen ),它使用“同伴” Flux 来判断特定故障是否应该重试。这个同伴 Flux 由操作员创建,但由用户修饰,以便自定义重试条件。


伴随 Flux 是一个 Flux<RetrySignal> ,它被传递给 Retry 策略/函数,作为 retryWhen 的唯一参数提供。作为用户,您定义该函数并使其返回新的 Publisher<?>Retry 类是一个抽象类,但如果您想使用简单的 lambda ( Retry.from(Function) ) 转换伴生类,它提供了一个工厂方法。


重试周期如下:


  1. 每次发生错误(可能会重试)时,都会将 RetrySignal 发送到已由您的函数修饰的同伴 Flux 中。这里有 Flux 可以让您鸟瞰迄今为止的所有尝试。 RetrySignal 允许访问错误及其周围的元数据。


  2. 如果同伴 Flux 发出一个值,则会发生重试。


  3. 如果伴随 Flux 完成,则错误被吞掉,重试周期停止,并且生成的序列也完成。


  4. 如果伴随 Flux 产生错误 ( e ),则重试周期将停止,并且生成的序列错误 e


前两种情况的区别很重要。简单地完成同伴就可以有效地消除错误。考虑使用 retryWhen 模拟 retry(3) 的以下方法:

Flux<String> flux = Flux
    .<String>error(new IllegalArgumentException()) (1)
    .doOnError(System.out::println) (2)
    .retryWhen(Retry.from(companion -> (3)
        companion.take(3))); (4)
1
这会不断产生错误,需要重试。
2
doOnError 在重试之前,我们可以记录并查看所有失败。
3
Retry 改编自一个非常简单的 Function lambda
4
在这里,我们认为前三个错误是可以重试的 ( take(3) ),然后放弃。


实际上,前面的示例会生成空的 Flux ,但它成功完成。由于同一个 Flux 上的 retry(3) 会因最新错误而终止,因此这个 retryWhen 示例与 retry(3) 并不完全相同。


达到相同的行为还需要一些额外的技巧:

AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
		Flux.<String>error(new IllegalArgumentException())
				.doOnError(e -> errorCount.incrementAndGet())
				.retryWhen(Retry.from(companion -> (1)
						companion.map(rs -> { (2)
							if (rs.totalRetries() < 3) return rs.totalRetries(); (3)
							else throw Exceptions.propagate(rs.failure()); (4)
						})
				));
1
我们通过改编 Function lambda 来自定义 Retry ,而不是提供具体的类
2
同伴发出 RetrySignal 对象,该对象包含迄今为止的重试次数和最后一次失败的次数
3
为了允许三次重试,我们考虑索引 < 3 并返回一个要发出的值(这里我们只是返回索引)。
4
为了终止错误的序列,我们在这三次重试之后抛出原始异常。

人们可以使用 Retry 中公开的构建器以更流畅的方式以及更精细调整的重试策略来实现相同的目的。例如: errorFlux.retryWhen(Retry.max(3));

您可以使用类似的代码来实现“指数退避和重试”模式,如常见问题解答中所示。


核心提供的 Retry 帮助器 RetrySpecRetryBackoffSpec 都允许高级自定义,例如:


  • 设置 filter(Predicate) 为可以触发重试的异常


  • 通过 modifyErrorFilter(Function) 修改先前设置的过滤器


  • 触发副作用,例如围绕重试触发器进行记录(即延迟之前和之后的退避),前提是重试已验证( doBeforeRetry()doAfterRetry() 是累加的)


  • 围绕重试触发器触发异步 Mono<Void> ,这允许在基本延迟之上添加异步行为,从而进一步延迟触发器( doBeforeRetryAsyncdoAfterRetryAsync 是添加剂)


  • 通过 onRetryExhaustedThrow(BiFunction) 自定义达到最大尝试次数时的异常。默认使用 Exceptions.retryExhausted(…​) ,可以与 Exceptions.isRetryExhausted(Throwable) 区分


  • 激活瞬态错误的处理(见下文)


重试并出现暂时性错误


一些长期存在的源可能会出现零星的错误爆发,然后是一段较长的时间,在此期间一切都运行顺利。本文档将这种错误模式称为瞬态错误。


在这种情况下,最好单独处理每个突发,以便下一个突发不会继承前一个突发的重试状态。例如,使用指数退避策略,每个后续突发都应该从最小退避 Duration 开始延迟重试尝试,而不是不断增长的退避。


表示 retryWhen 状态的 RetrySignal 接口有一个可用于此目的的 totalRetriesInARow() 值。与通常单调递增的 totalRetries() 索引不同,每次重试恢复错误时(即,当重试尝试导致传入 onNext 再次代替 onError )。


RetrySpecRetryBackoffSpec 中的 transientErrors(boolean) 配置参数设置为 true 时,生成的策略将使用该 totalRetriesInARow()

AtomicInteger errorCount = new AtomicInteger(); (1)
Flux<Integer> transientFlux = httpRequest.get() (2)
        .doOnError(e -> errorCount.incrementAndGet());

transientFlux.retryWhen(Retry.max(2).transientErrors(true))  (3)
             .blockLast();
assertThat(errorCount).hasValue(6); (4)
1
为了便于说明,我们将计算重试序列中的错误数量。
2
我们假设一个http请求源,例如。流端点有时会连续失败两次,然后恢复。
3
我们在该源上使用 retryWhen ,配置为最多 2 次重试尝试,但处于 transientErrors 模式。
4
最后,在 6 尝试在 errorCount 中注册后,获得有效响应,并且 transientFlux 成功完成。


如果没有 transientErrors(true) ,第二个突发将超过 2 配置的最大尝试,整个序列最终将失败。


如果您想在没有实际 http 远程端点的情况下在本地尝试此操作,可以将伪 httpRequest 方法实现为 Supplier ,如下所示:

final AtomicInteger transientHelper = new AtomicInteger();
Supplier<Flux<Integer>> httpRequest = () ->
    Flux.generate(sink -> { (1)
        int i = transientHelper.getAndIncrement();
        if (i == 10) { (2)
            sink.next(i);
            sink.complete();
        }
        else if (i % 3 == 0) { (3)
            sink.next(i);
        }
        else {
            sink.error(new IllegalStateException("Transient error at " + i)); (4)
        }
    });
1
我们 generate 一个有大量错误的来源。
2
当计数器达到 10 时,它将成功完成。
3
如果 transientHelper 原子是 3 的倍数,我们会发出 onNext ,从而结束当前的突发。
4
在其他情况下,我们会发出 onError 。这是 3 次中的 2 次,因此 2 个 onError 的突发被 1 个 onNext 中断。


4.6.2.处理运算符或函数中的异常


一般来说,所有运算符本身都可以包含可能触发异常的代码或调用可能同样失败的用户定义回调,因此它们都包含某种形式的错误处理。


根据经验,未经检查的异常始终通过 onError 传播。例如,在 map 函数内抛出 RuntimeException 会转换为 onError 事件,如以下代码所示:

Flux.just("foo")
    .map(s -> { throw new IllegalArgumentException(s); })
    .subscribe(v -> System.out.println("GOT VALUE"),
               e -> System.out.println("ERROR: " + e));


前面的代码打印出以下内容:

ERROR: java.lang.IllegalArgumentException: foo

您可以在将 Exception 传递到 onError 之前通过使用挂钩对其进行调整。


然而,Reactor 定义了一组始终被视为致命的异常(例如 OutOfMemoryError )。请参阅 Exceptions.throwIfFatal 方法。这些错误意味着 Reactor 无法继续运行,并且会被抛出而不是传播。


在内部,还存在未经检查的异常仍然无法传播的情况(最明显的是在订阅和请求阶段),因为并发竞争可能导致双重 onErroronComplete 条件。当这些竞争发生时,无法传播的错误将被“丢弃”。这些情况仍然可以通过使用可定制的钩子在一定程度上进行管理。请参阅“脱钩”。


您可能会问:“检查异常怎么样?”


例如,如果您需要调用一些将其声明为 throws 异常的方法,那么您仍然必须在 try-catch 块中处理这些异常。不过,您有多种选择:


  1. 捕获异常并从中恢复。该序列正常继续。


  2. 捕获异常,将其包装成未经检查的异常,然后抛出它(中断序列)。 Exceptions 实用程序类可以帮助您完成此任务(我们接下来会介绍)。


  3. 如果您需要返回 Flux (例如,您位于 flatMap 中),请将异常包装在产生错误的 Flux 中,如下所示: < b3>。 (该序列也终止。)


Reactor 有一个 Exceptions 实用程序类,您可以使用它来确保仅当异常是已检查异常时才包装异常:


  • 如有必要,请使用 Exceptions.propagate 方法来包装异常。它还首先调用 throwIfFatal 并且不包装 RuntimeException


  • 使用 Exceptions.unwrap 方法获取原始的未包装异常(回到特定于反应器的异常层次结构的根本原因)。


考虑以下 map 示例,该示例使用可以抛出 IOException 的转换方法:

public String convert(int i) throws IOException {
    if (i > 3) {
        throw new IOException("boom " + i);
    }
    return "OK " + i;
}


现在假设您想在 map 中使用该方法。您现在必须显式捕获异常,并且您的映射函数无法重新抛出该异常。因此,您可以将其作为 RuntimeException 传播到地图的 onError 方法,如下所示:

Flux<String> converted = Flux
    .range(1, 10)
    .map(i -> {
        try { return convert(i); }
        catch (IOException e) { throw Exceptions.propagate(e); }
    });


稍后,当订阅前面的 Flux 并对错误做出反应(例如在 UI 中)时,如果您想对 IOException 执行一些特殊操作,则可以恢复到原始异常。以下示例展示了如何执行此操作:

converted.subscribe(
    v -> System.out.println("RECEIVED: " + v),
    e -> {
        if (Exceptions.unwrap(e) instanceof IOException) {
            System.out.println("Something bad happened with I/O");
        } else {
            System.out.println("Something bad happened");
        }
    }
);

 4.7.水槽


在 Reactor 中,接收器是一个类,它允许以独立的方式安全地手动触发信号,创建一个类似 Publisher 的结构,能够处理多个 Subscriberunicast() 口味)。


3.5.0 之前,还有一组 Processor 实现已被逐步淘汰。


4.7.1.使用 Sinks.OneSinks.Many 从多个线程安全地生成


reactor-core 公开的默认风格 Sinks 确保检测到多线程使用,并且从下游订阅者的角度来看,不会导致规范违规或未定义的行为。使用 tryEmit* API 时,并行调用很快就会失败。使用 emit* API 时,提供的 EmissionFailureHandler 可能允许重试争用(例如,忙循环),否则接收器将因错误而终止。


这是对 Processor.onNext 的改进,后者必须在外部同步,否则从下游订阅者的角度来看会导致未定义的行为。


处理器是一种特殊的 Publisher ,也是 Subscriber 。它们最初的目的是作为中间步骤的可能表示,然后可以在反应流实现之间共享。然而,在 Reactor 中,此类步骤由 Publisher 运算符表示。


第一次遇到 Processor 时的一个常见错误是直接调用公开的 onNextonCompleteonError 方法从 Subscriber 界面。


此类手动调用应谨慎进行,特别是关于响应式流规范的调用外部同步。处理器实际上可能有点用,除非遇到基于反应流的 API,它需要传递 Subscriber ,而不是公开 Publisher


水槽通常是更好的选择。


Sinks 构建器为主要支持的生产者类型提供指导 API。您将识别 Flux 中发现的一些行为,例如 onBackpressureBuffer

Sinks.Many<Integer> replaySink = Sinks.many().replay().all();


多个生产者线程可以通过执行以下操作同时在接收器上生成数据:

//thread1
replaySink.emitNext(1, EmitFailureHandler.FAIL_FAST);

//thread2, later
replaySink.emitNext(2, EmitFailureHandler.FAIL_FAST);

//thread3, concurrently with thread 2
//would retry emitting for 2 seconds and fail with EmissionException if unsuccessful
replaySink.emitNext(3, EmitFailureHandler.busyLooping(Duration.ofSeconds(2)));

//thread3, concurrently with thread 2
//would return FAIL_NON_SERIALIZED
EmitResult result = replaySink.tryEmitNext(4);


使用 busyLooping 时,请注意 EmitFailureHandler 返回的实例不能重复使用,例如,每个 emitNext


Sinks.Many 可以作为 Flux 呈现给下游消费者,如下例所示:

Flux<Integer> fluxView = replaySink.asFlux();
fluxView
	.takeWhile(i -> i < 10)
	.log()
	.blockLast();


类似地, Sinks.EmptySinks.One 风格可以被视为具有 asMono() 方法的 Mono


Sinks 类别是:


  1. many().multicast() :一个接收器,仅将新推送的数据传输给其订阅者,尊重他们的背压(新推送的数据如“订阅者订阅后”)。


  2. many().unicast() :与上面相同,但在第一个订阅者寄存器之前推送的数据被缓冲。


  3. many().replay() :一个接收器,它将向新订阅者重播指定历史大小的推送数据,然后继续实时推送新数据。


  4. one() :将向其订阅者播放单个元素的接收器


  5. empty() :一个接收器,仅向其订阅者播放终端信号(错误或完成),但仍然可以被视为 Mono<T> (注意通用类型 <T>


4.7.2.可用接收器概述

Sinks.many().unicast().onBackpressureBuffer(args?)


单播 Sinks.Many 可以通过使用内部缓冲区来处理背压。权衡是它最多可以有一个 Subscriber


基本单播接收器是通过 Sinks.many().unicast().onBackpressureBuffer() 创建的。但是 Sinks.many().unicast() 中还有一些额外的 unicast 静态工厂方法,可以进行更精细的调整。


例如,默认情况下,它是无界的:如果您通过它推送任意数量的数据,而它的 Subscriber 尚未请求数据,它会缓冲所有数据。您可以通过在 Sinks.many().unicast().onBackpressureBuffer(Queue) 工厂方法中为内部缓冲提供自定义 Queue 实现来更改此设置。如果该队列是有界的,则当缓冲区已满并且没有收到来自下游的足够请求时,接收器可能会拒绝推送值。

Sinks.many().multicast().onBackpressureBuffer(args?)


多播 Sinks.Many 可以向多个订阅者发送消息,同时为其每个订阅者提供背压。订阅者在订阅后仅接收通过接收器推送的信号。


基本多播接收器是通过 Sinks.many().multicast().onBackpressureBuffer() 创建的。


默认情况下,如果所有订阅者都被取消(这基本上意味着它们都已取消订阅),它将清除其内部缓冲区并停止接受新订阅者。您可以通过使用 Sinks.many().multicast() 下的 multicast 静态工厂方法中的 autoCancel 参数来调整它。

Sinks.many().multicast().directAllOrNothing()


具有简单化背压处理的多播 Sinks.Many :如果任何订阅者太慢(需求为零),则所有订阅者的 onNext 都会被丢弃。


然而,慢速订阅者不会被终止,一旦慢速订阅者再次开始请求,所有订阅者都将恢复接收从那里推送的元素。


一旦 Sinks.Many 终止(通常通过调用其 emitError(Throwable)emitComplete() 方法),它会让更多订阅者订阅,但立即向他们重播终止信号。

Sinks.many().multicast().directBestEffort()


尽力处理背压的多播 Sinks.Many :如果订阅者太慢(需求为零),则仅针对该慢速订阅者丢弃 onNext


然而,慢速订阅者不会被终止,一旦他们再次开始请求,他们将恢复接收新推送的元素。


一旦 Sinks.Many 终止(通常通过调用其 emitError(Throwable)emitComplete() 方法),它会让更多订阅者订阅,但立即向他们重播终止信号。

Sinks.many().replay()


重播 Sinks.Many 缓存发出的元素并将其重播给迟到的订阅者。


它可以以多种配置创建:


  • 缓存有限历史记录 ( Sinks.many().replay().limit(int) ) 或无限历史记录 ( Sinks.many().replay().all() )。


  • 缓存基于时间的重播窗口 ( Sinks.many().replay().limit(Duration) )。


  • 缓存历史记录大小和时间窗口的组合 ( Sinks.many().replay().limit(int, Duration) )。


用于微调上述内容的其他重载也可以在 Sinks.many().replay() 下找到,以及允许缓存单个元素的变体( latest()latestOrDefault(T) ) 。

Sinks.unsafe().many()


高级用户和操作员构建者可能会考虑使用 Sinks.unsafe().many() ,它将提供相同的 Sinks.Many 工厂,而无需额外的生产者线程安全性。因此,每个接收器的开销会更少,因为线程安全接收器必须检测多线程访问。


库开发人员不应公开不安全的接收器,但可以在受控的调用​​环境中内部使用它们,在该环境中他们可以确保导致 onNextonCompleteonError

 汇.one()


该方法直接构造一个简单的 Sinks.One<T> 实例。 Sinks 的这种风格可以作为 Mono 查看(通过其 asMono() 视图方法),并且具有稍微不同的 emit 方法来更好地传达这种类似 Mono 的语义:


  • emitValue(T value) 生成 onNext(value) 信号,并且在大多数实现中 - 还将触发隐式 onComplete()


  • emitEmpty() 生成一个隔离的 onComplete() 信号,旨在生成空 Mono 的等效信号


  • emitError(Throwable t) 生成 onError(t) 信号


Sinks.one() 接受对这些方法中任一方法的一次调用,有效地生成一个 Mono ,该 Mono 要么以一个值完成,要么以空完成或失败。

 水槽.empty()


该方法直接构造一个简单的 Sinks.Empty<T> 实例。 Sinks 的这种风格类似于 Sinks.One<T> ,只是它不提供 emitValue 方法。


结果,它只能生成一个完成为空或失败的 Mono


尽管无法触发 onNext ,接收器仍然使用通用 <T> 进行类型化,因为它允许轻松组合和包含在需要特定类型的运算符链中。


建议编辑“Reactor核心特性”

 5.Kotlin支持


Kotlin 是一种针对 JVM(和其他平台)的静态类型语言,它允许编写简洁而优雅的代码,同时提供与 Java 编写的现有库非常好的互操作性。


本节介绍 Reactor 对 Kotlin 的支持。

 5.1.要求


Reactor 支持 Kotlin 1.1+ 并需要 kotlin-stdlib (或其 kotlin-stdlib-jdk7kotlin-stdlib-jdk8 变体之一)。

 5.2.扩展


Dysprosium-M1 (即 reactor-core 3.3.0.M1 )开始,Kotlin 扩展已移至专用的 reactor-kotlin-extensions 模块,其新包名称以 reactor.kotlin 开头而不是简单的 reactor


因此, reactor-core 模块中的 Kotlin 扩展已被弃用。新依赖项的groupId和artifactId是:

io.projectreactor.kotlin:reactor-kotlin-extensions


得益于其出色的 Java 互操作性和 Kotlin 扩展,Reactor Kotlin API 利用常规 Java API,并通过 Reactor 工件中开箱即用的一些特定于 Kotlin 的 API 进行了增强。


请记住,需要导入 Kotlin 扩展才能使用。例如,这意味着仅当导入 import reactor.kotlin.core.publisher.toFluxThrowable.toFlux Kotlin 扩展才可用。也就是说,与静态导入类似,IDE 在大多数情况下应该自动建议导入。


例如,Kotlin 具体化类型参数为 JVM 泛型类型擦除提供了一种解决方法,而 Reactor 提供了一些扩展来利用此功能。


下表将使用 Java 的 Reactor 与使用 Kotlin 和扩展的 Reactor 进行了比较:

 爪哇

 带扩展的 Kotlin

Mono.just("foo")

"foo".toMono()

Flux.fromIterable(list)

list.toFlux()

Mono.error(new RuntimeException())

RuntimeException().toMono()

Flux.error(new RuntimeException())

RuntimeException().toFlux()

flux.ofType(Foo.class)

  flux.ofType<Foo>()flux.ofType(Foo::class)

StepVerifier.create(flux).verifyComplete()

flux.test().verifyComplete()


Reactor KDoc API 列出并记录了所有可用的 Kotlin 扩展。

 5.3.空安全


Kotlin 的关键功能之一是空安全,它在编译时干净地处理 null 值,而不是在运行时遇到著名的 NullPointerException 。这通过可空性声明和表达“值或无值”语义使应用程序更安全,而无需支付 Optional 等包装器的成本。 (Kotlin 允许使用具有可为 null 值的函数构造。请参阅此 Kotlin null 安全性综合指南。)


尽管 Java 不允许在其类型系统中表达 null 安全性,但 Reactor 现在通过 reactor.util.annotation 包中声明的工具友好注释提供整个 Reactor API 的 null 安全性。默认情况下,Kotlin 中使用的 Java API 中的类型被识别为放宽空检查的平台类型。 Kotlin 对 JSR 305 注释和 Reactor 可空性注释的支持为 Kotlin 开发人员提供了整个 Reactor API 的空安全性,并具有在编译时处理 null 相关问题的优点。


您可以通过添加带有以下选项的 -Xjsr305 编译器标志来配置 JSR 305 检查: -Xjsr305={strict|warn|ignore}


对于 kotlin 版本 1.1.50+,默认行为与 -Xjsr305=warn 相同。 strict 值需要考虑 Reactor API 完全空安全性,但应被视为实验性的,因为 Reactor API 空性声明甚至可能在次要版本之间演变,因为可能会在未来)。


尚不支持泛型类型参数、变量参数和数组元素的可为空性,但应该会在即将发布的版本中提供支持。请参阅此讨论以获取最新信息。


建议编辑“Kotlin 支持”

 6. 测试


无论您编写了简单的 Reactor 操作符链还是您自己的操作符,自动化测试始终是一个好主意。


Reactor 附带了一些专用于测试的元素,这些元素聚集到它们自己的工件中: reactor-test 。您可以在 Github 上的 reactor-core 存储库内找到该项目。


要在测试中使用它,您必须将其添加为测试依赖项。以下示例显示如何在 Maven 中添加 reactor-test 作为依赖项:


示例 18.Maven 中的 Reactor-test,位于 <dependencies>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
    (1)
</dependency>
1
如果您使用 BOM,则无需指定 <version>


以下示例展示了如何在 Gradle 中添加 reactor-test 作为依赖项:


示例 19. Gradle 中的 Reactor-test,修改 dependencies
dependencies {
   testCompile 'io.projectreactor:reactor-test'
}


reactor-test 的三个主要用途如下:


  • 使用 StepVerifier 逐步测试序列是否遵循给定场景。


  • 使用 TestPublisher 生成数据以测试下游运算符(包括您自己的运算符)的行为。


  • 在可以经历多个替代 Publisher 的序列中(例如,使用 switchIfEmpty 的链,探测这样的 Publisher 以确保它被使用(即,已订阅)。


6.1.使用 StepVerifier 测试场景


测试 Reactor 序列的最常见情况是在代码中定义 FluxMono (例如,它可能由方法返回)并想要测试订阅时它的行为方式。


这种情况很好地转化为定义“测试场景”,您可以在其中根据事件逐步定义您的期望。您可以提出并回答以下问题:


  • 下一个预期事件是什么?


  • 您希望 Flux 发出特定值吗?


  • 或者在接下来的 300 毫秒内什么都不做?


您可以通过 StepVerifier API 表达所有这些。


例如,您可以在代码库中使用以下实用方法来装饰 Flux

public <T> Flux<T> appendBoomError(Flux<T> source) {
  return source.concatWith(Mono.error(new IllegalArgumentException("boom")));
}


为了测试它,您需要验证以下场景:


我希望这个 Flux 首先发出 thing1 ,然后发出 thing2 ,然后产生一条错误消息 boom 。订阅并验证这些期望。


StepVerifier API 中,这转化为以下测试:

@Test
public void testAppendBoomError() {
  Flux<String> source = Flux.just("thing1", "thing2"); (1)

  StepVerifier.create( (2)
    appendBoomError(source)) (3)
    .expectNext("thing1") (4)
    .expectNext("thing2")
    .expectErrorMessage("boom") (5)
    .verify(); (6)
}
1
由于我们的方法需要一个源 Flux ,因此定义一个简单的源用于测试目的。
2
创建一个包装并验证 FluxStepVerifier 构建器。
3
传递要测试的 Flux (调用我们的实用程序方法的结果)。
4
我们期望在订阅时发生的第一个信号是 onNext ,其值为 thing1
5
我们期望发生的最后一个信号是用 onError 终止序列。异常应该有 boom 作为消息。
6
通过调用 verify() 触发测试非常重要。


API 是一个构建器。首先创建 StepVerifier 并传递要测试的序列。这提供了多种方法供您选择:


  • 表达对下一个出现信号的期望。如果收到任何其他信号(或者信号的内容与预期不符),则整个测试将失败并返回有意义的 AssertionError 。例如,您可以使用 expectNext(T…​)expectNextCount(long)


  • 消耗下一个信号。当您想要跳过部分序列或想要对信号内容应用自定义 assertion 时(例如,检查是否存在 onNext 事件并断言发出的项目是大小为 5 的列表)。例如,您可以使用 consumeNextWith(Consumer<T>)


  • 采取各种操作,例如暂停或运行任意代码。例如,如果您想要操作特定于测试的状态或上下文。为此,您可以使用 thenAwait(Duration)then(Runnable)


对于终端事件,相应的期望方法( expectComplete()expectError() 及其所有变体)切换到您无法再表达期望的API。在最后一步中,您所能做的就是对 StepVerifier 执行一些附加配置,然后通常使用 verify() 或其变体之一触发验证。


此时发生的情况是 StepVerifier 订阅测试的 FluxMono 并播放序列,将每个新信号与场景中的下一步进行比较。只要这些匹配,就认为测试成功。一旦出现差异,就会抛出 AssertionError


请记住 verify() 步骤,它会触发验证。为了提供帮助,该 API 包含一些快捷方法,将终端期望与对 verify() 的调用结合起来: verifyComplete()verifyError()verifyErrorMessage(String) , 和别的。


请注意,如果基于 lambda 的期望之一抛出 AssertionError ,则按原样报告,导致测试失败。这对于自定义断言很有用。


默认情况下, verify() 方法和派生快捷方法( verifyThenAssertThatverifyComplete() 等)没有超时。他们可以无限期地阻止。您可以使用 StepVerifier.setDefaultTimeout(Duration) 为这些方法全局设置超时,或使用 verify(Duration) 在每次调用的基础上指定一个超时。


6.1.1.更好地识别测试失败


StepVerifier 提供了两个选项来更好地识别哪个期望步骤导致测试失败:


  • as(String) :在大多数 expect* 方法之后使用,对前面的期望进行描述。如果期望失败,其错误消息包含描述。终端期望和 verify 不能这样描述。


  • StepVerifierOptions.create().scenarioName(String) :通过使用 StepVerifierOptions 创建 StepVerifier ,您可以使用 scenarioName 方法为整个场景命名,即也用于断言错误消息。


请注意,在这两种情况下,仅保证在生成自己的 AssertionErrorStepVerifier 方法中使用消息中的描述或名称(例如,手动或通过 assertNext 中的断言库不会将描述或名称添加到错误消息中)。

 6.2.操纵时间


您可以将 StepVerifier 与基于时间的运算符一起使用,以避免相应测试的长时间运行。您可以通过 StepVerifier.withVirtualTime 构建器来执行此操作。


它看起来像下面的例子:

StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
//... continue expectations here


此虚拟时间功能​​插入 Reactor 的 Schedulers 工厂中的自定义 Scheduler 。由于这些定时运算符通常使用默认的 Schedulers.parallel() 调度程序,因此将其替换为 VirtualTimeScheduler 就可以了。然而,一个重要的先决条件是在虚拟时间调度器被激活之后算子被实例化。


为了增加正确发生这种情况的机会, StepVerifier 不采用简单的 Flux 作为输入。 withVirtualTime 采用 Supplier ,它会指导您在完成调度程序设置后延迟创建测试通量的实例。


请格外小心,确保 Supplier<Publisher<T>> 可以以惰性方式使用。否则,无法保证虚拟时间。特别是避免在测试代码中较早实例化 Flux 并让 Supplier 返回该变量。相反,始终在 lambda 内实例化 Flux


有两种处理时间的期望方法,无论有没有虚拟时间,它们都有效:


  • thenAwait(Duration) :暂停步骤的评估(允许出现一些信号或延迟结束)。


  • expectNoEvent(Duration) :还让序列在给定的持续时间内播放,但如果在此期间出现任何信号,则测试失败。


这两种方法都在经典模式下将线程暂停给定的持续时间,并在虚拟模式下提前虚拟时钟。


expectNoEvent 还将 subscription 视为事件。如果您将其用作第一步,通常会失败,因为检测到订阅信号。请改用 expectSubscription().expectNoEvent(duration)


为了快速评估上面 Mono.delay 的行为,我们可以按如下方式完成代码编写:

StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
    .expectSubscription() (1)
    .expectNoEvent(Duration.ofDays(1)) (2)
    .expectNext(0L) (3)
    .verifyComplete(); (4)
1  请参阅前面的提示。
2
预计一整天不会发生任何事情。
3
然后期望发出 0 的延迟。
4
然后期待完成(并触发验证)。


我们可以在上面使用 thenAwait(Duration.ofDays(1)) ,但是 expectNoEvent 的好处是可以保证没有任何事情发生得比它应该发生的更早。


请注意, verify() 返回一个 Duration 值。这是整个测试的实时持续时间。


虚拟时间并不是灵丹妙药。所有 Schedulers 都替换为相同的 VirtualTimeScheduler 。在某些情况下,您可以锁定验证过程,因为在表达期望之前虚拟时钟尚未向前移动,导致期望等待只能通过提前时间产生的数据。在大多数情况下,您需要提前虚拟时钟才能发出序列。对于无限序列,虚拟时间也变得非常有限,这可能会占用运行序列及其验证的线程。


6.3.使用 StepVerifier 执行执行后断言


在描述了场景的最终期望之后,您可以切换到补充断言 API,而不是触发 verify() 。为此,请改用 verifyThenAssertThat()


verifyThenAssertThat() 返回一个 StepVerifier.Assertions 对象,一旦整个场景成功执行,您可以使用它来断言一些状态元素(因为它也调用 verify() ) 。典型的(尽管是高级的)用法是捕获由某些操作符删除的元素并断言它们(请参阅有关 Hooks 的部分)。


6.4.测试 Context


有关 Context 的更多信息,请参阅将上下文添加到响应序列。


StepVerifierContext 的传播有一些期望:


  • expectAccessibleContext :返回一个 ContextExpectations 对象,您可以使用该对象设置对传播的 Context 的期望。请务必调用 then() 以返回序列期望集。


  • expectNoAccessibleContext :设置一个期望,即没有 Context 可以在被测试的运算符链上传播。当被测 Publisher 不是 Reactor 或没有任何可以传播 Context 的运算符(例如,生成器源)时,最有可能发生这种情况。


此外,您可以使用 StepVerifierOptions 创建验证程序,将特定于测试的初始 Context 关联到 StepVerifier


以下代码片段演示了这些功能:

StepVerifier.create(Mono.just(1).map(i -> i + 10),
				StepVerifierOptions.create().withInitialContext(Context.of("thing1", "thing2"))) (1)
		            .expectAccessibleContext() (2)
		            .contains("thing1", "thing2") (3)
		            .then() (4)
		            .expectNext(11)
		            .verifyComplete(); (5)
1
使用 StepVerifierOptions 创建 StepVerifier 并传入初始 Context
2
开始设置关于 Context 传播的期望。仅此一点就可以确保 Context 被传播。
3
Context 特定期望的示例。它必须包含键“thing1”的值“thing2”。
4
我们 then() 切换回对数据设置正常期望。
5
让我们不要忘记 verify() 整套期望。


6.5.使用 TestPublisher 手动发射


对于更高级的测试用例,完全掌握数据源可能会很有用,以触发与您想要测试的特定情况紧密匹配的精心选择的信号。


另一种情况是,当您实现了自己的运算符,并且想要验证它在响应式流规范方面的行为方式时,尤其是在其源代码表现不佳的情况下。


对于这两种情况, reactor-test 提供 TestPublisher 类。这是一个 Publisher<T> ,可让您以编程方式触发各种信号:


  • next(T)next(T, T…​) 触发 1-n 个 onNext 信号。


  • emit(T…​) 触发 1-n 个 onNext 信号并执行 complete()


  • complete()onComplete 信号终止。


  • error(Throwable)onError 信号终止。


您可以通过 create 工厂方法获得行为良好的 TestPublisher 。此外,您还可以使用 createNonCompliant 工厂方法创建行为不当的 TestPublisher 。后者从 TestPublisher.Violation 枚举中获取一个或多个值。这些值定义了发布者可以忽略规范的哪些部分。这些枚举值包括:


  • REQUEST_OVERFLOW :尽管请求不足,仍允许进行 next 调用,而不触发 IllegalStateException


  • ALLOW_NULL :允许使用 null 值进行 next 调用,而不触发 NullPointerException


  • CLEANUP_ON_TERMINATE :允许连续发送多次终止信号。这包括 complete()error()emit()


  • DEFER_CANCELLATION :允许 TestPublisher 忽略取消信号并继续发出信号,就好像取消在与所述信号的竞争中失败了一样。


最后, TestPublisher 跟踪订阅后的内部状态,这可以通过其各种 assert* 方法进行断言。


您可以通过使用转换方法 flux()mono() 将其用作 FluxMono


6.6.使用 PublisherProbe 检查执行路径


在构建复杂的运算符链时,您可能会遇到存在多个可能的执行路径的情况,这些执行路径由不同的子序列具体化。


大多数时候,这些子序列会产生一个足够具体的 onNext 信号,您可以通过查看最终结果来断言它已被执行。


例如,考虑以下方法,该方法从源构建运算符链,并在源为空时使用 switchIfEmpty 回退到特定替代方案:

public Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
    return source
            .flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))
            .switchIfEmpty(fallback);
}


您可以测试使用了 switchIfEmpty 的哪个逻辑分支,如下所示:

@Test
public void testSplitPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.just("just a  phrase with    tabs!"),
            Mono.just("EMPTY_PHRASE")))
                .expectNext("just", "a", "phrase", "with", "tabs!")
                .verifyComplete();
}

@Test
public void testEmptyPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.empty(), Mono.just("EMPTY_PHRASE")))
                .expectNext("EMPTY_PHRASE")
                .verifyComplete();
}


但是,请考虑一个示例,其中该方法会生成 Mono<Void> 。它等待源完成,执行附加任务,然后完成。如果源为空,则必须执行类似于后备 Runnable 的任务。下面的例子展示了这种情况:

private Mono<String> executeCommand(String command) {
    return Mono.just(command + " DONE");
}

public Mono<Void> processOrFallback(Mono<String> commandSource, Mono<Void> doWhenEmpty) {
    return commandSource
            .flatMap(command -> executeCommand(command).then()) (1)
            .switchIfEmpty(doWhenEmpty); (2)
}
1
then() 忘记了命令结果。它只关心它是否已完成。
2
如何区分两种情况都是空序列?


要验证您的 processOrFallback 方法确实经过 doWhenEmpty 路径,您需要编写一些样板文件。也就是说,您需要一个 Mono<Void> 来:


  • 捕获它已被订阅的事实。


  • 让您在整个过程终止后断言该事实。


在版本 3.1 之前,您需要为要断言的每个状态手动维护一个 AtomicBoolean ,并将相应的 doOn* 回调附加到您要评估的发布者。当必须定期应用此模式时,这可能是很多样板文件。幸运的是,3.1.0 引入了 PublisherProbe 的替代方案。以下示例展示了如何使用它:

@Test
public void testCommandEmptyPathIsUsed() {
    PublisherProbe<Void> probe = PublisherProbe.empty(); (1)

    StepVerifier.create(processOrFallback(Mono.empty(), probe.mono())) (2)
                .verifyComplete();

    probe.assertWasSubscribed(); (3)
    probe.assertWasRequested(); (4)
    probe.assertWasNotCancelled(); (5)
}
1
创建一个转换为空序列的探针。
2
通过调用 probe.mono() 使用探针代替 Mono<Void>
3
完成序列后,探针可以让您断言它已被使用。您可以检查是否已订阅...​
4
...以及实际请求的数据...​
5
……以及是否被取消。


您还可以通过调用 .flux() 而不是 .mono() 来使用探针代替 Flux<T> 。对于需要探测执行路径但还需要探测器发出数据的情况,可以使用 PublisherProbe.of(Publisher) 包装任何 Publisher<T>


建议编辑“测试”

 7. 调试Reactor


从命令式和同步编程范式切换到反应式和异步编程范式有时可能会令人望而生畏。学习曲线中最陡峭的步骤之一是如何在出现问题时进行分析和调试。


在命令式世界中,调试通常非常简单。您可以阅读堆栈跟踪并查看问题的根源。这完全是你的代码的失败吗?故障是否发生在某些库代码中?如果是这样,您的代码的哪一部分调用了库,可能传递了最终导致失败的不正确参数?


7.1.典型的 Reactor 堆栈跟踪


当您转向异步代码时,事情会变得更加复杂。


考虑以下堆栈跟踪:


示例 20. 典型的 Reactor 堆栈跟踪
java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:445)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:379)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
    at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
    at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:332)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
    at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
    at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
    at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
    at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3096)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:3204)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3090)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3057)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3029)
    at reactor.guide.GuideTests.debuggingCommonStacktrace(GuideTests.java:995)


那里发生了很多事情。我们得到一个 IndexOutOfBoundsException ,它告诉我们一个 source emitted more than one item


我们可能很快就会假设这个源是 Flux 或 Mono,正如下一行提到的 MonoSingle 所证实的那样。因此,这似乎是来自 single 运营商的某种投诉。


参考 Mono#single 运算符的 Javadoc,我们看到 single 有一个约定:源必须恰好发出一个元素。看来我们有一个源发出了多个信号,因此违反了该合同。


我们可以更深入地挖掘并确定其来源吗?下面的行不是很有帮助。它们通过多次调用 subscriberequest 带领我们了解似乎是反应链的内部结构。


通过浏览这些行,我们至少可以开始形成出错的链类型的图片:它似乎涉及 MonoSingleFluxFlatMapFluxRange (每个类在跟踪中都有几行,但总体而言涉及这三个类)。那么可能是 range().flatMap().single() 链?


但是如果我们在应用程序中大量使用该模式怎么办?这仍然没有告诉我们太多信息,仅仅搜索 single 并不能找到问题。最后一行引用了我们的一些代码。终于,我们越来越接近了。


不过,坚持住。当我们转到源文件时,我们看到的是订阅了一个预先存在的 Flux ,如下所示:

toDebug
    .subscribeOn(Schedulers.immediate())
    .subscribe(System.out::println, Throwable::printStackTrace);


所有这些都发生在订阅时,但 Flux 本身并未在那里声明。更糟糕的是,当我们转到声明变量的位置时,我们会看到以下内容:

public Mono<String> toDebug; //please overlook the public class attribute


该变量未在声明的地方实例化。我们必须假设最坏的情况,即我们发现应用程序中可能有几个不同的代码路径来设置它。我们仍然不确定是哪一个导致了这个问题。


这在 Reactor 中相当于运行时错误,而不是编译错误。


我们想要更容易地找到的是运算符被添加到链中的位置 - 即 Flux 被声明的位置。我们通常将其称为 Flux 的“组件”。


7.2.激活调试模式 - 又名回溯


本节介绍启用调试功能的最简单但也是最慢的方法,因为它捕获每个运算符的堆栈跟踪。请参阅 checkpoint() 替代方案以获取更细粒度的调试方式,并参阅生产就绪的全局调试以获取更高级和高性能的全局选项。


尽管堆栈跟踪仍然能够为有一定经验的人传达一些信息,但我们可以看到,在更高级的情况下,它本身并不理想。


幸运的是,Reactor 附带了专为调试而设计的汇编时工具。


这是通过在应用程序启动时(或至少在实例化有问题的 FluxMono 之前)通过 Hooks.onOperatorDebug() 方法激活全局调试模式来完成的,如下所示如下:

Hooks.onOperatorDebug();


这开始通过包装操作符的构造并捕获堆栈跟踪来检测对 Reactor 操作符方法的调用(它们被组装到链中)。由于这是在声明操作符链时完成的,因此应该在此之前激活挂钩,因此最安全的方法是在应用程序启动时激活它。


稍后,如果发生异常,失败的操作员可以引用该捕获并重新处理堆栈跟踪,附加附加信息。


我们将这种捕获的程序集信息(以及通常由 Reactor 添加到异常中的附加信息)称为回溯。


在下一节中,我们将了解堆栈跟踪有何不同以及如何解释新信息。


7.3.在调试模式下读取堆栈跟踪


当我们重用最初的示例但激活 operatorStacktrace 调试功能时,会发生以下情况:


  1. 堆栈跟踪指向订阅站点,因此不太有趣,在第一帧之后被剪切并放在一边。


  2. 一个特殊的抑制异常被添加到原始异常中(或者如果已经存在则进行修改)。


  3. 为该特殊异常构造一条消息,其中包含多个部分。


  4. 第一部分将追溯到发生故障的操作员的组装地点。


  5. 第二部分将尝试显示从该运算符构建的链,并且已经看到错误传播


  6. 最后一部分是原始堆栈跟踪


打印后的完整堆栈跟踪如下:

java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127) (1)
    Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: (2)
Assembly trace from producer [reactor.core.publisher.MonoSingle] : (3)
    reactor.core.publisher.Flux.single(Flux.java:7915)
    reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1017)
Error has been observed at the following site(s): (4)
    *_______Flux.single ⇢ at reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1017) (5)
    |_ Mono.subscribeOn ⇢ at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1071) (6)
Original Stack Trace: (7)
        at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127)
...
(8)
...
        at reactor.core.publisher.Mono.subscribeWith(Mono.java:4363)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4223)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4159)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4131)
        at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1067)
1
原始堆栈跟踪被截断为单个帧。
2
这是新的:我们看到捕获堆栈的包装操作符。这是回溯开始出现的地方。
3
首先,我们了解有关操作员组装地点的一些详细信息。
4
其次,我们得到了操作符链的概念,错误通过该操作符链从第一个传播到最后一个(错误站点到订阅站点)。
5
每个看到错误的操作员都会被提及,以及使用它的用户类和行。这里我们有一个“根”。
6
这里我们有链的一个简单部分。
7
堆栈跟踪的其余部分被移到最后......​
8
…​显示了操作员的一些内部结构(因此我们在这里删除了一些片段)。


捕获的堆栈跟踪作为抑制的 OnAssemblyException 附加到原始错误。它分为三个部分,但第一部分是最有趣的。它显示了触发异常的操作符的构建路径。在这里,它表明导致我们问题的 single 实际上是在 scatterAndGather 方法中创建的。


现在我们已经掌握了足够的信息来找到罪魁祸首,我们可以对 scatterAndGather 方法进行有意义的研究:

private Mono<String> scatterAndGather(Flux<String> urls) {
    return urls.flatMap(url -> doRequest(url))
           .single(); (1)
}
1
果然,这是我们的 single


现在我们可以看到错误的根本原因是 flatMap 对几个 URL 执行多次 HTTP 调用,但与 single 链接,这限制性太强。经过简短的 git blame 并与该行的作者进行快速讨论后,我们发现他打算使用限制较少的 take(1) 来代替。


我们已经解决了我们的问题。


现在考虑堆栈跟踪中的以下部分:

Error has been observed at the following site(s):


在此特定示例中,回溯的第二部分不一定有趣,因为错误实际上发生在链中的最后一个运算符(最接近 subscribe 的运算符)中。考虑另一个例子可能会更清楚:

FakeRepository.findAllUserByName(Flux.just("pedro", "simon", "stephane"))
              .transform(FakeUtils1.applyFilters)
              .transform(FakeUtils2.enrichUser)
              .blockLast();


现在想象一下,在 findAllUserByName 内部,有一个失败的 map 。在这里,我们将在回溯的第二部分看到以下内容:

Error has been observed at the following site(s):
    *________Flux.map ⇢ at reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:27)
    |_       Flux.map ⇢ at reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:28)
    |_    Flux.filter ⇢ at reactor.guide.FakeUtils1.lambda$static$1(FakeUtils1.java:29)
    |_ Flux.transform ⇢ at reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:39)
    |_   Flux.elapsed ⇢ at reactor.guide.FakeUtils2.lambda$static$0(FakeUtils2.java:30)
    |_ Flux.transform ⇢ at reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:40)


这对应于收到错误通知的操作符链的部分:


  1. 异常源自第一个 map 。这个被 * 连接器识别为根,并且 _ 用于缩进。


  2. 第二个 map 会发现异常(两者实际上都对应于 findAllUserByName 方法)。


  3. 然后可以看到 filtertransform ,这表明链的一部分是由可重用的转换函数构建的(这里是 applyFilters 实用方法)。


  4. 最后,它由 elapsedtransform 看到。再次, elapsed 由第二个变换的变换函数应用。


在某些情况下,相同的异常通过多个链传播,“根”标记 *_ 允许我们更好地分离这些链。如果某个站点被多次访问,则调用站点信息后会有一个 (observed x times)


例如,让我们考虑以下代码片段:

public class MyClass {
    public void myMethod() {
        Flux<String> source = Flux.error(sharedError);
        Flux<String> chain1 = source.map(String::toLowerCase).filter(s -> s.length() < 4);
        Flux<String> chain2 = source.filter(s -> s.length() > 5).distinct();

        Mono<Void> when = Mono.when(chain1, chain2);
    }
}


在上面的代码中,错误通过两个单独的链 chain1chain2 传播到 when 。它将导致包含以下内容的回溯:

Error has been observed at the following site(s):
    *_____Flux.error ⇢ at myClass.myMethod(MyClass.java:3) (observed 2 times)
    |_      Flux.map ⇢ at myClass.myMethod(MyClass.java:4)
    |_   Flux.filter ⇢ at myClass.myMethod(MyClass.java:4)
    *_____Flux.error ⇢ at myClass.myMethod(MyClass.java:3) (observed 2 times)
    |_   Flux.filter ⇢ at myClass.myMethod(MyClass.java:5)
    |_ Flux.distinct ⇢ at myClass.myMethod(MyClass.java:5)
    *______Mono.when ⇢ at myClass.myMethod(MyClass.java:7)

 我们看到:


  1. 有 3 个“根”元素( when 是真正的根)。


  2. Flux.error 开始的两条链是可见的。


  3. 两个链似乎都基于相同的 Flux.error 源( observed 2 times )。

  4.  第一条链是 Flux.error().map().filter


  5. 第二条链是 `Flux.error().filter().distinct()


关于回溯和抑制异常的注释:由于回溯作为抑制异常附加到原始错误,这可能会在一定程度上干扰使用此机制的另一种类型的异常:复合异常。此类异常可以直接通过 Exceptions.multiple(Throwable…​) 创建,也可以通过某些可能连接多个错误源的运算符(如 Flux#flatMapDelayError )创建。它们可以通过 Exceptions.unwrapMultiple(Throwable) 展开为 List ,在这种情况下,回溯将被视为组合的组件,并且是返回的 List 的一部分。如果这在某种程度上是不可取的,则可以通过 Exceptions.isTraceback(Throwable) 检查来识别回溯,并通过使用 Exceptions.unwrapMultipleExcludingTracebacks(Throwable) 将其从此类展开中排除。


我们在这里处理一种形式的检测,创建堆栈跟踪的成本很高。这就是为什么此调试功能只能以受控方式激活,作为最后的手段。


7.3.1. checkpoint() 替代方案


调试模式是全局的,会影响应用程序内组装到 FluxMono 中的每个运算符。这样做的好处是允许事后调试:无论错误是什么,我们都可以获得附加信息来调试它。


正如我们之前所看到的,这种全局知识是以影响性能为代价的(由于填充的堆栈跟踪的数量)。如果我们知道可能有问题的操作员,那么成本就可以降低。然而,我们通常不知道哪些运算符可能有问题,除非我们在野外观察到错误,发现我们缺少程序集信息,然后修改代码以激活程序集跟踪,希望再次观察到相同的错误。


在这种情况下,我们必须切换到调试模式并做好准备,以便更好地观察错误的第二次发生,这一次捕获所有附加信息。


如果您可以识别在应用程序中组装的可维护性至关重要的反应链,则可以使用 checkpoint() 运算符实现这两种技术的混合。


您可以将此运算符链接到方法链中。 checkpoint 运算符的工作方式类似于钩子版本,但仅适用于该特定链的链接。


还有一个 checkpoint(String) 变体,可让您向程序集回溯添加唯一的 String 标识符。这样,堆栈跟踪就被省略,您可以依靠描述来识别组装站点。 checkpoint(String) 比常规 checkpoint 的处理成本更低。


最后但并非最不重要的一点是,如果您想向检查点添加更通用的描述,但仍依赖堆栈跟踪机制来识别程序集站点,则可以使用 checkpoint("description", true) 版本强制执行该行为。我们现在回到回溯的初始消息,并用 description 进行了扩充,如以下示例所示:

Assembly trace from producer [reactor.core.publisher.ParallelSource], described as [descriptionCorrelation1234] : (1)
	reactor.core.publisher.ParallelFlux.checkpoint(ParallelFlux.java:215)
	reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
Error has been observed at the following site(s):
	|_	ParallelFlux.checkpoint ⇢ reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
1
descriptionCorrelation1234checkpoint 中提供的描述。


该描述可以是静态标识符或用户可读的描述或更广泛的相关 ID(例如,来自 HTTP 请求的标头)。


当全局调试与检查点一起启用时,将应用全局调试回溯样式,并且检查点仅反映在“已观察到错误...”部分。因此,在这种情况下,重检查点的名称不可见。


7.4.生产就绪的全局调试


Project Reactor 附带一个单独的 Java 代理,可以检测您的代码并添加调试信息,而无需支付在每个操作符调用上捕获堆栈跟踪的成本。该行为与激活调试模式(又称回溯)非常相似,但没有运行时性能开销。


要在您的应用程序中使用它,您必须将其添加为依赖项。


以下示例显示如何在 Maven 中添加 reactor-tools 作为依赖项:


示例 21.Maven 中的reactor-tools,位于 <dependencies>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-tools</artifactId>
    (1)
</dependency>
1
如果您使用 BOM,则无需指定 <version>


以下示例展示了如何在 Gradle 中添加 reactor-tools 作为依赖项:


例22. Gradle中的reactor-tools,修改 dependencies
dependencies {
   compile 'io.projectreactor:reactor-tools'
}


它还需要显式初始化:

ReactorDebugAgent.init();

由于实现会在加载类时对其进行检测,因此最好将其放置在 main(String[]) 方法中的其他所有内容之前:
public static void main(String[] args) {
    ReactorDebugAgent.init();
    SpringApplication.run(Application.class, args);
}


如果您无法立即运行 init,您也可以使用 processExistingClasses() 重新处理现有的类。例如,在 JUnit5 中,从 TestExecutionListener 甚至类 static 初始化块中进行测试:

ReactorDebugAgent.init();
ReactorDebugAgent.processExistingClasses();

请注意,由于需要迭代所有加载的类并应用转换,重新处理需要几秒钟的时间。仅当您发现某些调用站点未检测时才使用它。

7.4.1. Limitations


ReactorDebugAgent 作为 Java 代理实现,并使用 ByteBuddy 执行自附加。自附加可能无法在某些 JVM 上工作,请参阅 ByteBuddy 的文档了解更多详细信息。


7.4.2.将 ReactorDebugAgent 作为 Java 代理运行


如果您的环境不支持 ByteBuddy 的自附加,您可以将 reactor-tools 作为 Java 代理运行:

java -javaagent reactor-tools.jar -jar app.jar


7.4.3.在构建时运行 ReactorDebugAgent


也可以在构建时运行 reactor-tools 。为此,您需要将其应用为 ByteBuddy 构建工具的插件。


该转换将仅应用于您项目的类。类路径库不会被检测。

示例 23.带有 ByteBuddy Maven 插件的reactor-tools
<dependencies>
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-tools</artifactId>
		(1)
		<classifier>original</classifier> (2)
		<scope>runtime</scope>
	</dependency>
</dependencies>

<build>
	<plugins>
		<plugin>
			<groupId>net.bytebuddy</groupId>
			<artifactId>byte-buddy-maven-plugin</artifactId>
			<configuration>
				<transformations>
					<transformation>
						<plugin>reactor.tools.agent.ReactorDebugByteBuddyPlugin</plugin>
					</transformation>
				</transformations>
			</configuration>
		</plugin>
	</plugins>
</build>
1
如果您使用 BOM,则无需指定 <version>
2
classifier 这里很重要。

示例 24. 带有 ByteBuddy 的 Gradle 插件的reactor-tools
plugins {
	id 'net.bytebuddy.byte-buddy-gradle-plugin' version '1.10.9'
}

configurations {
	byteBuddyPlugin
}

dependencies {
	byteBuddyPlugin(
			group: 'io.projectreactor',
			name: 'reactor-tools',
			(1)
			classifier: 'original', (2)
	)
}

byteBuddy {
	transformation {
		plugin = "reactor.tools.agent.ReactorDebugByteBuddyPlugin"
		classPath = configurations.byteBuddyPlugin
	}
}
1
如果您使用 BOM,则无需指定 version
2
classifier 这里很重要。


7.5。记录序列


除了堆栈跟踪调试和分析之外,工具包中另一个强大的工具是能够以异步顺序跟踪和记录事件。


log() 运算符就可以做到这一点。它链接在一个序列内,查看其上游 FluxMono 的每个事件(包括 onNextonError 和 < b5> 以及订阅、取消和请求)。


关于日志记录实现的说明


log 运算符使用 Loggers 实用程序类,该实用程序类通过 SLF4J 获取常见的日志记录框架,例如 Log4J 和 Logback,并且如果 SLF4J 是,则默认记录到控制台不可用。


控制台回退使用 System.err 表示 WARNERROR 日志级别,使用 System.out 表示其他所有日志级别。


如果您更喜欢 JDK java.util.logging 回退(如 3.0.x 中那样),则可以通过将 reactor.logging.fallback 系统属性设置为 JDK 来获取它。


在所有情况下,在生产中登录时,您应该注意配置底层日志框架以使用其最异步和非阻塞的方法 - 例如,Logback 中的 AsyncAppenderAsyncLogger 在 Log4j 2 中。


例如,假设我们激活并配置了 Logback 以及类似 range(1,10).take(3) 的链。通过在 take 之前放置 log() ,我们可以深入了解它的工作原理以及它向上游传播到范围的事件类型,如以下示例所示:

Flux<Integer> flux = Flux.range(1, 10)
                         .log()
                         .take(3);
flux.subscribe();


这会打印出以下内容(通过记录器的控制台附加程序):

10:45:20.200 [main] INFO  reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) (1)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | request(3) (2)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(1) (3)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(2)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(3)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | cancel() (4)


在这里,除了记录器自己的格式化程序(时间、线程、级别、消息)之外, log() 运算符还以其自己的格式输出一些内容:

1
reactor.Flux.Range.1 是日志的自动类别,以防您在链中多次使用该运算符。它可以让您区分记录了哪个操作员的事件(在本例中为 range )。您可以使用 log(String) 方法签名用您自己的自定义类别覆盖标识符。在几个分隔字符之后,将打印实际事件。在这里,我们得到一个 onSubscribe 调用、一个 request 调用、三个 onNext 调用和一个 cancel 调用。对于第一行 onSubscribe ,我们得到 Subscriber 的实现,它通常对应于特定于运算符的实现。在方括号之间,我们得到了额外的信息,包括算子是否可以通过同步或异步融合自动优化。
2
在第二行,我们可以看到 take 将向上游的请求限制为 3 个。
3
然后范围连续发送三个值。
4
在最后一行,我们看到 cancel()


第二行 (2) 和最后一行 (4) 最有趣。我们可以看到 take 在那里起作用。它利用背压来向源请求准确的预期元素数量。收到足够的元素后,它通过调用 cancel() 告诉源不再需要任何元素。请注意,如果下游本身使用了背压,例如。通过仅请求 1 个元素, take 运算符将遵守这一点(当将请求从下游传播到上游时,它会限制请求)。


建议编辑“调试反应器”


8. 公开 Reactor 指标


Project Reactor 是一个旨在提高性能和更好地利用资源的库。但要真正了解系统的性能,最好能够监控其各个组件。


这就是 Reactor 通过 reactor-core-micrometer 模块提供与 Micrometer 内置集成的原因。该模块在 2022.0 BOM 版本中引入,提供了对 Micrometer 的显式依赖,这使得它能够为指标和观察提供微调的 API。


到 Reactor-Core 3.5.0 为止,指标被实现为运算符,如果 Micrometer 不在类路径上,则这些运算符将是无操作的。


reactor-core-micrometer API 要求用户显式提供某种形式的注册表,而不是依赖于硬编码的全局注册表。当将检测应用于具有原生命名或标签概念的类时,这些 API 将尝试发现反应链中的此类元素。否则,API 将期望与注册表一起提供命名计量表的前缀。

 8.1.调度程序指标


Reactor 中的每个异步操作都是通过线程和调度程序中描述的调度程序抽象来完成的。这就是为什么监视调度程序、留意开始看起来可疑的关键指标并做出相应反应非常重要。


reactor-core-micrometer 模块提供了一个“定时” Scheduler 包装器,用于围绕通过它提交的任务执行测量,可以按如下方式使用:

Scheduler originalScheduler = Schedulers.newParallel("test", 4);

Scheduler schedulerWithMetrics = Micrometer.timedScheduler(
	originalScheduler, (1)
	applicationDefinedMeterRegistry, (2)
	"testingMetrics", (3)
	Tags.of(Tag.of("additionalTag", "yes")) (4)
);
1  要换行的 Scheduler
2
用于发布指标的 MeterRegistry
3
命名仪表时使用的前缀。例如,这将导致创建一个 testingMetrics.scheduler.tasks.completed 仪表。
4
添加到为该包装创建的所有计量表的可选标签 Scheduler

当包装常见的 Scheduler (例如 Schedulers.single() )或在多个地方使用的 Scheduler 时,只有 Runnable 任务通过 Micrometer#timedScheduler 返回的包装器实例提交的数据将被检测。


有关生成的仪表和关联的默认标签,请参阅 Micrometer.timedScheduler()

 8.2.发布商指标


有时,能够在反应式管道中的某个阶段记录指标很有用。


一种方法是从提供给 tap 运算符的自定义 SignalListener 手动将值推送到您选择的指标后端。


开箱即用的实现实际上是由 reactor-core-micrometer 模块通过 Micrometer#metrics API 提供的。考虑以下管道:

listenToEvents()
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();


要启用此源 Flux (从 listenToEvents() 返回)的指标,我们需要打开指标集合:

listenToEvents()
    .name("events") (1)
    .tap(Micrometer.metrics( (2)
        applicationDefinedMeterRegistry (3)
    ))
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1
反应式管道此阶段的每个指标都将使用“事件”作为命名前缀(可选,默认为 reactor 前缀)。
2
我们使用 tap 运算符与 reactor-core-micrometer 中提供的 SignalListener 实现相结合来收集指标。
3
与该模块中的其他 API 一样,需要显式提供要发布指标的 MeterRegistry


Micrometer.metrics() 中提供了公开指标的详细信息。

 8.2.1.标签


除了 Micrometer.metrics() 中描述的常见标签之外,用户还可以通过 tag 运算符将自定义标签添加到其反应链中:

listenToEvents()
    .name("events") (1)
    .tag("source", "kafka") (2)
    .tap(Micrometer.metrics(applicationDefinedRegistry)) (3)
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1
此阶段的每个指标都将使用“事件”前缀进行标识。
2
将自定义标签“source”设置为值“kafka”。
3
除了常见标签之外,所有报告的指标还将分配有 source=kafka 标签。


请注意,根据您使用的监控系统,在使用标签时使用名称可能被视为强制,因为否则会导致两个默认命名序列之间出现一组不同的标签。某些系统(例如 Prometheus)可能还要求具有相同名称的每个指标具有完全相同的标签集。

 8.2.2.观察


除了完整指标之外, reactor-core-micrometer 模块还提供基于 Micrometer 的 Observation 的替代方案。根据配置和运行时类路径, Observation 可以转换为计时器、跨度、日志语句或任意组合。


可以通过 tap 运算符和 Micrometer.observation 实用程序观察反应链,如下所示:

listenToEvents()
    .name("events") (1)
    .tap(Micrometer.observation( (2)
		applicationDefinedRegistry)) (3)
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1
此管道的 Observation 将使用“events”前缀进行标识。
2
我们将 tap 运算符与 observation 实用程序一起使用。
3
必须提供一个注册表来发布观察结果。请注意,这是一个 ObservationRegistry


Micrometer.observation() 中提供了观察及其标签的详细信息。


您还可以通过 Micrometer.observation(ObservationRegistry registry, Function<ObservationRegistry, Observation> observationSupplier) 与您自己的观察供应商完全定制千分尺的观察,如下所示:

listenToEvents()
    .name("events") (1)
    .tap(Micrometer.observation( (2)
    	applicationDefinedRegistry, (3)
    	registry -> Observation.createNotStarted( (4)
    		myConvention, (5)
            myContextSupplier, (6)
            registry)))
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1
此管道的 Observation 将使用“events”前缀进行标识。
2
我们将 tap 运算符与 observation 实用程序一起使用。
3
必须提供一个注册表来发布观察结果。请注意,这是一个 ObservationRegistry
4
我们提供自己的函数来创建观察
5  使用自定义 ObservationConvention
6
和自定义 Supplier<Context>


建议编辑“暴露反应器指标”


8.3. Reactor-Core-Micrometer 模块的仪表和标签

8.3.1. Micrometer.metrics()


下面是指标点击侦听器功能使用的计量列表,通过 Micrometer.metrics(MeterRegistry meterRegistry) 公开。


请注意,下面的指标使用动态 %s 前缀。当应用于使用 name(String n) 运算符的 FluxMono 时,它将替换为 n 。否则,它将替换为默认值 "reactor"
 流动持续时间


订阅与序列终止或取消之间经过的持续时间的时间。添加 TerminationTags#STATUS 标记以指定导致计时器结束的事件( "completed""completedEmpty""error""cancelled" )。


指标名称 %s.flow.duration - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。输入 distribution summary


*.active 指标中可能会缺少启动观察后添加的键值。

表 1. 低基数键

 姓名

 描述

  exception (必填)


当 STATUS 为 "error" 时 FLOW_DURATION 使用的标记,用于存储发生的异常。

  status (必填)

 终止状态:


  • "completed" 用于以 onComplete 终止的序列,并带有 onNext(s)


  • "completedEmpty" 对于在 onComplete 之前没有任何 onNext 的情况下终止的序列


  • "error" 用于以 onError 终止的序列


  • "cancelled" 对于已取消订阅的序列

  type (必填)


序列的类型( "Flux""Mono" )。

 格式错误的源事件


计算从格式错误的源接收到的事件数(即 onComplete 之后的 onNext)。


指标名称 %s.malformed.source - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。输入 counter


*.active 指标中可能会缺少启动观察后添加的键值。

表 2. 低基数键

 姓名

 描述

  type (必填)


序列的类型( "Flux""Mono" )。

 下次延迟时


测量每个 onNext 之间(或第一个 onNext 和 onSubscribe 事件之间)的延迟。


指标名称 %s.onNext.delay - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。键入 timer 和基本单位 nanoseconds


*.active 指标中可能会缺少启动观察后添加的键值。

Micrometer 内部使用 nanoseconds 作为基本单位。然而,每个后端决定实际的基本单元。 (即普罗米修斯使用秒)

表 3. 低基数键

 姓名

 描述

  type (必填)


序列的类型( "Flux""Mono" )。

 要求金额


计算所有订阅者对命名序列(例如 Flux.name(String) )请求的数量,直到至少一个订阅者请求无限数量。


指标名称 %s.requested - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。输入 distribution summary


*.active 指标中可能会缺少启动观察后添加的键值。

表 4. 低基数键

 姓名

 描述

  type (必填)


序列的类型( "Flux""Mono" )。

 已订阅


计算序列的订阅数量。


指标名称 %s.subscribed - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。输入 counter


*.active 指标中可能会缺少启动观察后添加的键值。

表 5. 低基数键

 姓名

 描述

  type (必填)


序列的类型( "Flux""Mono" )。

8.3.2. Micrometer.timedScheduler()


下面是 TimedScheduler 功能使用的计量列表,通过 Micrometer.timedScheduler(Scheduler original, MeterRegistry meterRegistry, String metricsPrefix) 公开。


请注意,下面的指标使用动态 %s 前缀。在实践中,这被替换为提供的 metricsPrefix
 活动任务


LongTaskTimer 反映当前正在运行的任务。请注意,这反映了所有类型的活动任务,包括延迟或定期安排的任务(每次迭代都被视为活动任务)。


指标名称 %s.scheduler.tasks.active - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。输入 long task timer


*.active 指标中可能会缺少启动观察后添加的键值。

Micrometer 内部使用 nanoseconds 作为基本单位。然而,每个后端决定实际的基本单元。 (即普罗米修斯使用秒)
 任务完成


反映已完成执行的任务的计时器。请注意,这反映了所有类型的活动任务,包括延迟或定期的任务(每次迭代都被视为单独的已完成任务)。


指标名称 %s.scheduler.tasks.completed - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。输入 timer


*.active 指标中可能会缺少启动观察后添加的键值。

Micrometer 内部使用 nanoseconds 作为基本单位。然而,每个后端决定实际的基本单元。 (即普罗米修斯使用秒)
 待处理任务


LongTaskTimer 反映已提交立即执行但由于调度程序已达到最大容量而无法立即启动的任务。请注意,仅考虑通过 Scheduler#schedule(Runnable) 和 Scheduler.Worker#schedule(Runnable) 立即提交。


指标名称 %s.scheduler.tasks.pending - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。输入 long task timer


*.active 指标中可能会缺少启动观察后添加的键值。

Micrometer 内部使用 nanoseconds 作为基本单位。然而,每个后端决定实际的基本单元。 (即普罗米修斯使用秒)
 已提交的任务


每次提交任务时计数器都会加一(通过 Scheduler 和 Scheduler.Worker 上的任何调度方法)。


请注意,实际上有 4 个计数器,可以通过 SubmittedTags#SUBMISSION 标签来区分。因此,所有这些的总和可以与 TASKS_COMPLETED 计数器进行比较。


指标名称 %s.scheduler.tasks.submitted - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。输入 counter


*.active 指标中可能会缺少启动观察后添加的键值。

表 6. 低基数键

 姓名

 描述

  submission.type (必填)


提交类型:

  •   "direct"Scheduler#schedule(Runnable)

  •   "delayed"Scheduler#schedule(Runnable,long,TimeUnit)


  • 初始延迟后 "periodic_initial" 用于 Scheduler#schedulePeriodically(Runnable,long,long,TimeUnit)


  • "periodic_iteration" 用于 Scheduler#schedulePeriodically(Runnable,long,long,TimeUnit) 进一步的周期性迭代

8.3.3. Micrometer.observation()


下面是观察点击侦听器功能使用的仪表列表,通过 Micrometer.observation(ObservationRegistry registry) 公开。


这是匿名观察,但您可以使用 name(String) 运算符创建具有自定义名称的类似观察。


您还可以通过 Micrometer.observation(ObservationRegistry registry, Function<ObservationRegistry, Observation> observationSupplier) 与您自己的观察供应商完全自定义 Micrometer 的观察,从而允许配置其属性(名称、上下文名称、低基数键和高基数键,...​)。
 匿名的


Micrometer.observation() 的匿名版本,当序列尚未通过例如明确命名时Flux#name(String) 运算符。


指标名称 reactor.observation 。输入 timer


指标名称 reactor.observation.active 。输入 long task timer


*.active 指标中可能会缺少启动观察后添加的键值。

Micrometer 内部使用 nanoseconds 作为基本单位。然而,每个后端决定实际的基本单元。 (即普罗米修斯使用秒)

表 7. 低基数键

 姓名

 描述

  reactor.status (必填)


序列的状态,指示它如何终止( "completed""completedEmpty""error""cancelled" )。

  reactor.type (必填)


序列的类型,即 "Flux""Mono"


9. 高级功能和概念


本章介绍 Reactor 的高级功能和概念,包括以下内容:


9.1.互用运算符的使用


从干净代码的角度来看,代码重用通常是一件好事。 Reactor 提供了一些模式,可以帮助您重用和交互代码,特别是对于您可能希望在代码库中定期应用的运算符或运算符组合。如果您将操作符链视为菜谱,则可以创建操作符菜谱的“食谱”。


9.1.1.使用 transform 运算符


transform 运算符允许您将运算符链的一部分封装到函数中。该函数在组装时应用于原始运算符链,以使用封装的运算符对其进行扩充。这样做对序列的所有订阅者应用相同的操作,基本上相当于直接链接操作符。以下代码显示了一个示例:

Function<Flux<String>, Flux<String>> filterAndMap =
f -> f.filter(color -> !color.equals("orange"))
      .map(String::toUpperCase);

Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
	.doOnNext(System.out::println)
	.transform(filterAndMap)
	.subscribe(d -> System.out.println("Subscriber to Transformed MapAndFilter: "+d));


下图显示了 transform 运算符如何封装流:

Transform Operator : encapsulate flows


前面的示例产生以下输出:

blue
Subscriber to Transformed MapAndFilter: BLUE
green
Subscriber to Transformed MapAndFilter: GREEN
orange
purple
Subscriber to Transformed MapAndFilter: PURPLE


9.1.2.使用 transformDeferred 运算符


transformDeferred 运算符与 transform 类似,也允许您将运算符封装在函数中。主要区别在于,此函数基于每个订阅者应用于原始序列。这意味着该函数实际上可以为每个订阅生成不同的操作符链(通过维护某种状态)。以下代码显示了一个示例:

AtomicInteger ai = new AtomicInteger();
Function<Flux<String>, Flux<String>> filterAndMap = f -> {
	if (ai.incrementAndGet() == 1) {
return f.filter(color -> !color.equals("orange"))
        .map(String::toUpperCase);
	}
	return f.filter(color -> !color.equals("purple"))
	        .map(String::toUpperCase);
};

Flux<String> composedFlux =
Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
    .doOnNext(System.out::println)
    .transformDeferred(filterAndMap);

composedFlux.subscribe(d -> System.out.println("Subscriber 1 to Composed MapAndFilter :"+d));
composedFlux.subscribe(d -> System.out.println("Subscriber 2 to Composed MapAndFilter: "+d));


下图显示了 transformDeferred 运算符如何处理每个订阅者的转换:

Compose Operator : Per Subscriber transformation


前面的示例产生以下输出:

blue
Subscriber 1 to Composed MapAndFilter :BLUE
green
Subscriber 1 to Composed MapAndFilter :GREEN
orange
purple
Subscriber 1 to Composed MapAndFilter :PURPLE
blue
Subscriber 2 to Composed MapAndFilter: BLUE
green
Subscriber 2 to Composed MapAndFilter: GREEN
orange
Subscriber 2 to Composed MapAndFilter: ORANGE
purple


9.2.热与冷


到目前为止,我们认为所有 Flux (和 Mono )都是相同的:它们都表示异步数据序列,并且在订阅之前不会发生任何事情。


但实际上,出版商有两大类:热的和冷的。


前面的描述适用于冷漠的出版商家族。他们为每个订阅重新生成数据。如果没有创建订阅,则永远不会生成数据。


想象一下 HTTP 请求:每个新订阅者都会触发一个 HTTP 调用,但如果没有人对结果感兴趣,则不会进行任何调用。


另一方面,热门发布商不依赖于任何数量的订阅者。他们可能会立即开始发布数据,并且每当有新的 Subscriber 到来时就会继续这样做(在这种情况下,订阅者只会看到订阅后发出的新元素)。对于热门发布商来说,在您订阅之前确实会发生一些事情。


Reactor 中的几个热门运算符的一个例子是 just :它在汇编时直接捕获值,然后将其重播给任何订阅它的人。再次使用 HTTP 调用类比,如果捕获的数据是 HTTP 调用的结果,则在实例化 just 时仅进行一次网络调用。


要将 just 转换为冷发布者,您可以使用 defer 。它将我们示例中的 HTTP 请求推迟到订阅时间(并且会导致每个新订阅都有单独的网络调用)。


相反, share()replay(…​) 可用于将冷发布者变成热门发布者(至少在第一次订阅发生后)。这两个在 Sinks 类中也有 Sinks.Many 等价物,允许以编程方式提供序列。


考虑两个示例,一个演示冷 Flux,另一个使用 Sinks 模拟热 Flux。以下代码显示了第一个示例:

Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
                          .map(String::toUpperCase);

source.subscribe(d -> System.out.println("Subscriber 1: "+d));
source.subscribe(d -> System.out.println("Subscriber 2: "+d));


第一个示例产生以下输出:

Subscriber 1: BLUE
Subscriber 1: GREEN
Subscriber 1: ORANGE
Subscriber 1: PURPLE
Subscriber 2: BLUE
Subscriber 2: GREEN
Subscriber 2: ORANGE
Subscriber 2: PURPLE


下图显示了重播行为:

Replaying behavior


两个订阅者都会捕获所有四种颜色,因为每个订阅者都会导致 Flux 上的运算符定义的进程运行。


将第一个示例与第二个示例进行比较,如以下代码所示:

Sinks.Many<String> hotSource = Sinks.unsafe().many().multicast().directBestEffort();

Flux<String> hotFlux = hotSource.asFlux().map(String::toUpperCase);

hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

hotSource.emitNext("blue", FAIL_FAST); (1)
hotSource.tryEmitNext("green").orThrow(); (2)

hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));

hotSource.emitNext("orange", FAIL_FAST);
hotSource.emitNext("purple", FAIL_FAST);
hotSource.emitComplete(FAIL_FAST);
1
有关接收器的更多详细信息,请参阅接收器
2
旁注: orThrow() 这里是 emitNext + Sinks.EmitFailureHandler.FAIL_FAST 的替代方案,适合测试,因为在那里抛出是可以接受的(比反应式应用程序更是如此)。


第二个示例产生以下输出:

Subscriber 1 to Hot Source: BLUE
Subscriber 1 to Hot Source: GREEN
Subscriber 1 to Hot Source: ORANGE
Subscriber 2 to Hot Source: ORANGE
Subscriber 1 to Hot Source: PURPLE
Subscriber 2 to Hot Source: PURPLE


下图显示了订阅的广播方式:

Broadcasting a subscription


订阅者 1 捕获所有四种颜色。订阅者 2 在生成前两种颜色后创建,仅捕获最后两种颜色。此差异导致输出中 ORANGEPURPLE 增加一倍。无论何时附加订阅,操作员在此 Flux 上描述的过程都会运行。


9.3.使用 ConnectableFlux 向多个订阅者广播


有时,您可能不希望仅将某些处理推迟到一个订阅者的订阅时间,但您实际上可能希望其中几个订阅者会合,然后触发订阅和数据生成。


这就是 ConnectableFlux 的用途。 Flux API 中涵盖了返回 ConnectableFlux 的两个主要模式: publishreplay


  • publish 通过将这些请求转发到源,动态地尝试尊重来自不同订阅者的需求(在背压方面)。最值得注意的是,如果任何订阅者有 0 的待处理需求,则发布将暂停其对源的请求。


  • replay 缓冲通过第一个订阅看到的数据,最多可达可配置的限制(时间和缓冲区大小)。它将数据重播给后续订阅者。


ConnectableFlux 提供了额外的方法来管理下游订阅与原始源的订阅。这些附加方法包括以下内容:


  • 一旦您达到 Flux 的足够订阅量,就可以手动调用 connect() 。这会触发对上游源的订阅。


  • 一旦进行了 n 订阅, autoConnect(n) 就可以自动执行相同的工作。


  • refCount(n) 不仅自动跟踪传入订阅,还检测这些订阅何时被取消。如果没有跟踪到足够的订阅者,则源将“断开连接”,如果出现其他订阅者,则会导致稍后对源进行新的订阅。


  • refCount(int, Duration) 添加“宽限期”。一旦跟踪的订阅者数量变得太少,它就会在断开源连接之前等待 Duration ,这可能允许足够的新订阅者进入并再次跨越连接阈值。


考虑以下示例:

Flux<Integer> source = Flux.range(1, 3)
                           .doOnSubscribe(s -> System.out.println("subscribed to source"));

ConnectableFlux<Integer> co = source.publish();

co.subscribe(System.out::println, e -> {}, () -> {});
co.subscribe(System.out::println, e -> {}, () -> {});

System.out.println("done subscribing");
Thread.sleep(500);
System.out.println("will now connect");

co.connect();


前面的代码产生以下输出:

done subscribing
will now connect
subscribed to source
1
1
2
2
3
3


以下代码使用 autoConnect

Flux<Integer> source = Flux.range(1, 3)
                           .doOnSubscribe(s -> System.out.println("subscribed to source"));

Flux<Integer> autoCo = source.publish().autoConnect(2);

autoCo.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("subscribed first");
Thread.sleep(500);
System.out.println("subscribing second");
autoCo.subscribe(System.out::println, e -> {}, () -> {});


前面的代码产生以下输出:

subscribed first
subscribing second
subscribed to source
1
1
2
2
3
3


9.4.三种批处理


当您有很多元素并且想要将它们分成批次时,Reactor 中有三种广泛的解决方案:分组、窗口和缓冲。这三个在概念上很接近,因为它们将 Flux<T> 重新分配到聚合中。分组和窗口化创建一个 Flux<Flux<T>> ,而缓冲聚合到一个 Collection<T>


9.4.1.使用 Flux<GroupedFlux<T>> 分组


分组是将源 Flux<T> 分成多个批次的行为,每个批次都匹配一个键。


关联的运算符是 groupBy


每个组都表示为 GroupedFlux<T> ,它允许您通过调用其 key() 方法来检索密钥。


各组的内容没有必要的连续性。一旦源元素产生新的密钥,该密钥的组就会被打开,并且与该密钥匹配的元素最终会出现在该组中(可以同时打开多个组)。


这意味着团体:


  1. 始终不相交(源元素属于一个且仅一个组)。


  2. 可以包含原始序列中不同位置的元素。

  3.  永远不会空。


以下示例根据值是偶数还是奇数对值进行分组:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.groupBy(i -> i % 2 == 0 ? "even" : "odd")
		.concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them
				.map(String::valueOf) //map to string
				.startWith(g.key())) //start with the group's key
	)
	.expectNext("odd", "1", "3", "5", "11", "13")
	.expectNext("even", "2", "4", "6", "12")
	.verifyComplete();

分组最适合组数中到少量的情况。这些组还必须强制被消耗(例如由 flatMap ),以便 groupBy 继续从上游获取数据并提供更多组。有时,这两个约束会相乘并导致挂起,例如当基数较高且使用组的 flatMap 并发性太低时。


9.4.2.使用 Flux<Flux<T>> 进行窗口化


窗口化是按照大小、时间、边界定义谓词或边界定义 Publisher 等标准将源 Flux<T> 分割为窗口的行为。


关联的运算符为 windowwindowTimeoutwindowUntilwindowWhilewindowWhen


与根据传入键随机重叠的 groupBy 相反,窗口(大多数时候)是按顺序打开的。


不过,某些变体仍然可以重叠。例如,在 window(int maxSize, int skip) 中, maxSize 参数是窗口关闭之前的元素数量, skip 参数是之后源中的元素数量将打开一个新窗口。因此,如果 maxSize > skip ,则在前一个窗口关闭之前打开一个新窗口,并且两个窗口重叠。


以下示例显示重叠窗口:

StepVerifier.create(
	Flux.range(1, 10)
		.window(5, 3) //overlapping windows
		.concatMap(g -> g.defaultIfEmpty(-1)) //show empty windows as -1
	)
		.expectNext(1, 2, 3, 4, 5)
		.expectNext(4, 5, 6, 7, 8)
		.expectNext(7, 8, 9, 10)
		.expectNext(10)
		.verifyComplete();

使用反向配置 ( maxSize < skip ),源中的一些元素将被删除,并且不属于任何窗口。


在通过 windowUntilwindowWhile 进行基于谓词的窗口的情况下,后续源元素与谓词不匹配也可能导致空窗口,如以下示例所示:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.windowWhile(i -> i % 2 == 0)
		.concatMap(g -> g.defaultIfEmpty(-1))
	)
		.expectNext(-1, -1, -1) //re