这是用户在 2024-3-22 9:40 为 https://projectreactor.io/docs/core/release/reference/ 保存的双语快照页面,由 沉浸式翻译 提供双语支持。了解如何保存?


1. 关于文档


本节提供 Reactor 参考文档的简要概述。您不需要以线性方式阅读本指南。每件作品都是独立的,尽管它们经常引用其他作品。


Reactor 参考指南以 HTML 文档形式提供。最新副本可在 https://projectreactor.io/docs/core/release/reference/index.html 获取


您可以制作本文档的副本供您自己使用或分发给其他人,前提是您不收取此类副本的任何费用,并且每份副本都包含本版权声明,无论是以印刷版还是电子版形式分发。


1.2.为文档做出贡献


该参考指南是用 Asciidoc 编写的,您可以在 https://github.com/reactor/reactor-core/tree/main/docs/asciidoc 找到其来源。


如果您有改进或建议,我们将很高兴收到您的拉取请求!


我们建议您查看存储库的本地副本,以便您可以通过运行 asciidoctor gradle 任务并检查渲染来生成文档。有些部分依赖于包含的文件,因此 GitHub 渲染并不总是完整的。


为了方便文档编辑,大多数部分末尾都有一个链接,可直接在 GitHub 上打开该部分主要源文件的编辑 UI。这些链接仅出现在本参考指南的 HTML5 版本中。它们如下所示:建议编辑关于文档。

 1.3.寻求帮助


您可以通过多种方式使用 Reactor 寻求帮助:


  • 与 Gitter 社区联系。


  • 在 stackoverflow.com 上提问,网址为 project-reactor


  • 在 Github 问题中报告错误。我们密切监视以下存储库:reactor-core(涵盖基本功能)和reactor-addons(涵盖reactor-test和适配器问题)。


Reactor 的所有内容都是开源的,包括本文档。如果您发现文档存在问题或者想要改进它们,请参与其中。


1.4.从这往哪儿走


  • 如果您想直接跳入代码,请前往入门。


  • 不过,如果您是响应式编程的新手,您可能应该从响应式编程简介开始。


  • 如果您熟悉 Reactor 概念并且只是在寻找适合该工作的工具,但无法想到相关的操作员,请尝试我需要哪个操作员?附录。


  • 为了更深入地了解 Reactor 的核心功能,请前往 Reactor 核心功能来了解:


    • 有关 Reactor 反应类型的更多信息,请参阅 Flux (0-N 项的异步序列)和 Mono (异步 0-1 结果)部分。


    • 如何使用调度程序切换线程上下文。


    • 如何处理错误请参见处理错误部分。


  • 单元测试?是的, reactor-test 项目是可能的!请参阅测试。


  • 以编程方式创建序列提供了一种更高级的创建反应源的方法。


  • 其他高级主题包含在高级功能和概念中。


建议编辑“关于文档”

 2. 入门


本节包含可帮助您开始使用 Reactor 的信息。它包括以下部分:


2.1.介绍反应堆


Reactor 是 JVM 的完全非阻塞反应式编程基础,具有高效的需求管理(以管理“背压”的形式)。它直接与 Java 8 功能 API 集成,特别是 CompletableFutureStreamDuration 。它提供可组合的异步序列 API — Flux (用于 [N] 个元素)和 Mono (用于 [0|1] 个元素) — 并广泛实现了 Reactive Streams 规范。


Reactor 还支持与 reactor-netty 项目的非阻塞进程间通信。 Reactor Netty 适合微服务架构,为 HTTP(包括 Websockets)、TCP 和 UDP 提供背压就绪的网络引擎。完全支持反应式编码和解码。

 2.2.先决条件


Reactor Core 在 Java 8 及更高版本上运行。


它对 org.reactivestreams:reactive-streams:1.0.3 具有传递依赖性。

 安卓支持

  • Reactor 3 并未正式支持或针对 Android(如果强烈需要此类支持,请考虑使用 RxJava 2)。


  • 但是,它应该可以在 Android SDK 26 (Android O) 及更高版本上正常工作。


  • 当启用脱糖功能时,它可能会在 Android SDK 21 (Android 5.0) 及更高版本上正常工作。请参阅 https://developer.android.com/studio/write/java8-support#library-desugaring


  • 我们愿意尽最大努力评估有利于 Android 支持的更改。但是,我们无法做出保证。每个决定都必须根据具体情况做出。


2.3.了解 BOM 和版本控制方案


Reactor 3 使用 BOM(物料清单)模型(从 reactor-core 3.0.4 开始,以及 Aluminium 发布系列)。该精选列表对旨在协同工作的工件进行了分组,尽管这些工件中可能存在不同的版本控制方案,但仍提供相关版本。


请注意,版本控制方案在 3.3.x 和 3.4.x(镝和铕)之间发生了变化。


工件遵循 MAJOR.MINOR.PATCH-QUALIFIER 版本控制方案,而 BOM 使用受 CalVer 启发的 YYYY.MINOR.PATCH-QUALIFIER 方案进行版本控制,其中:


  • MAJOR 是当前一代的 Reactor,每一代都可以给项目结构带来根本性的变化(这可能意味着更重要的迁移工作)


  • YYYY 是给定发布周期中第一个 GA 版本的年份(例如 3.4.x 的 3.4.0)


  • .MINOR 是一个从 0 开始的数字,随着每个新的发布周期递增


    • 就项目而言,它通常反映了更广泛的变化,并且可以表明适度的迁移努力


    • 就 BOM 而言,它允许区分发布周期,以防两个发布周期在同一年首次发布


  • .PATCH 是一个从 0 开始的数字,随着每个服务版本的增加而递增


  • -QUALIFIER 是一个文本限定符,在 GA 版本中被省略(见下文)


因此,遵循该约定的第一个发布周期是 2020.0.x ,代号 Europium 。该方案按顺序使用以下限定符(注意使用破折号分隔符):


  • -M1 .. -M9 :里程碑(我们预计每个服务版本不会超过 9 个)


  • -RC1 .. -RC9 :候选版本(我们预计每个服务版本不会超过 9 个)

  •   -SNAPSHOT :快照


  • GA 版本没有限定符


快照在上面的顺序中显示得较高,因为从概念上讲,它们始终是任何给定补丁的“最新预发行版”。尽管 PATCH 周期的第一个部署的工件始终是 -SNAPSHOT,但在例如之后也会发布一个类似名称但更新的快照。里程碑或候选版本之间。


每个发布周期也会被赋予一个代号,与之前基于代号的方案保持一致,可以用来更非正式地引用它(例如在讨论、博客文章等中……​)。代号代表传统上的 MAJOR.MINOR 数字。它们(大部分)来自元素周期表,按字母顺序递增。


直到 Dysprosium 为止,BOM 都是使用发布序列方案进行版本控制,其中代号后跟限定符,并且限定符略有不同。例如:Aluminium-RELEASE(第一个 GA 版本,现在类似于 YYYY.0.0)、Bismuth-M1、Californium-SR1(服务版本现在类似于 YYYY.0.1)、Dysprosium-RC1、Dysprosium-BUILD-SNAPSHOT (每个补丁之后,我们都会返回到相同的快照版本。现在会类似于 YYYY.0.X-SNAPSHOT,因此我们每个补丁都会获得 1 个快照)

 2.4.获取反应堆


如前所述,在核心中使用 Reactor 最简单的方法是使用 BOM 并将相关依赖项添加到项目中。请注意,添加此类依赖项时,必须省略版本,以便从 BOM 中获取该版本。


但是,如果您想强制使用特定工件的版本,您可以像平常一样在添加依赖项时指定它。您还可以完全放弃 BOM,并通过其工件版本指定依赖项。


从此版本(reactor-core 3.6.4)开始,相关发布系列中最新的稳定 BOM 是 2023.0.4 ,这就是下面的代码片段中使用的内容。此后可能会有更新的版本(包括快照、里程碑和新的发布系列),请参阅 https://projectreactor.io/docs 了解最新的工件和 BOM。


2.4.1. Maven安装


Maven 本身支持 BOM 概念。首先,您需要通过将以下代码段添加到 pom.xml 来导入 BOM:

<dependencyManagement> (1)
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2023.0.4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
1
请注意 dependencyManagement 标记。这是对常规 dependencies 部分的补充。


如果顶部部分 ( dependencyManagement ) 已存在于您的 pom 中,则仅添加内容。


接下来,像往常一样将您的依赖项添加到相关的反应器项目中,但没有 <version> ,如下所示:

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId> (1)
        (2)
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId> (3)
        <scope>test</scope>
    </dependency>
</dependencies>
1
对核心库的依赖。
2
这里没有版本标签。
3
reactor-test 提供对反应流进行单元测试的工具。


2.4.2.摇篮安装


在 5.0 版本之前,Gradle 没有对 Maven BOM 的核心支持,但您可以使用 Spring 的 gradle-dependency-management 插件。


首先,从 Gradle 插件门户应用插件,如下所示:

plugins {
    id "io.spring.dependency-management" version "1.0.7.RELEASE" (1)
}
1
截至撰写本文时,1.0.7.RELEASE 是该插件的最新版本。检查更新。


然后用它导入BOM,如下:

dependencyManagement {
     imports {
          mavenBom "io.projectreactor:reactor-bom:2023.0.4"
     }
}


最后向您的项目添加依赖项,不带版本号,如下所示:

dependencies {
     implementation 'io.projectreactor:reactor-core' (1)
}
1
该版本没有第三个 : 分隔部分。它取自 BOM。


从 Gradle 5.0 开始,您可以使用对 BOM 的本机 Gradle 支持:

dependencies {
     implementation platform('io.projectreactor:reactor-bom:2023.0.4')
     implementation 'io.projectreactor:reactor-core' (1)
}
1
该版本没有第三个 : 分隔部分。它取自 BOM。


2.4.3.里程碑和快照


里程碑和开发人员预览是通过 Spring Milestones 存储库而不是 Maven Central 分发的。要将其添加到构建配置文件中,请使用以下代码片段:


示例 1. Maven 中的里程碑
<repositories>
	<repository>
		<id>spring-milestones</id>
		<name>Spring Milestones Repository</name>
		<url>https://repo.spring.io/milestone</url>
	</repository>
</repositories>


对于 Gradle,请使用以下代码片段:


示例 2. Gradle 中的里程碑
repositories {
  maven { url 'https://repo.spring.io/milestone' }
  mavenCentral()
}


同样,快照也可在单独的专用存储库中使用,如下例所示:


示例 3.-Maven 中的快照
<repositories>
	<repository>
		<id>spring-snapshots</id>
		<name>Spring Snapshot Repository</name>
		<url>https://repo.spring.io/snapshot</url>
	</repository>
</repositories>

示例 4.-Gradle 中的快照
repositories {
  maven { url 'https://repo.spring.io/snapshot' }
  mavenCentral()
}


2.5.支持与政策


以下条目镜像 https://github.com/reactor/.github/blob/main/SUPPORT.adoc


2.5.1.你有问题吗?


首先搜索 Stack Overflow;如有需要可讨论


如果您不确定为什么某些功能不起作用或想知道是否有更好的方法,请首先查看 Stack Overflow,如有必要,请开始讨论。使用我们为此目的监控的标签中的相关标签:


  • reactor-netty 针对特定的reactor-netty问题


  • project-reactor 用于通用反应器问题


如果您喜欢实时讨论,我们还有一些 Gitter 频道:


  • reactor 是历史上最活跃的一个,大多数社区都可以提供帮助


  • reactor-core 用于围绕库的内部运作进行更高级的精确讨论


  • reactor-netty 用于解决特定于 netty 的问题


请参阅每个项目的自述文件,了解潜在的其他信息来源。


我们通常不鼓励打开 GitHub 问题来提问,而倾向于使用上述两个渠道。


2.5.2.我们的弃用政策


在处理弃用时,给定版本 A.B.C ,我们将确保:


  • 版本 A 中引入的弃用。 B0 将在版本 A 之前被删除。 B+10


  • 版本 A 中引入的弃用。 B1+ 将在版本 A 之前被删除。 B+20


  • 我们将努力在弃用 javadoc 中提及以下内容:


    • 要删除的目标最低版本


    • 指向已弃用方法的替代方法的指针


    • 不推荐使用该方法的版本


此政策自 2021 年 1 月起正式生效,适用于 2020.0 BOM 和较新版本系列中的所有模块,以及 Dysprosium-SR15 之后的 Dysprosium 版本。

弃用删除目标并不是硬性承诺,并且弃用的方法可以比这些最低目标 GA 版本继续存在(即,只有问题最严重的弃用方法才会被积极删除)。

也就是说,已超过其最低删除目标版本的已弃用代码可能会在任何后续版本(包括补丁版本,又称服务版本)中删除,恕不另行通知。所以用户还是应该努力尽早更新自己的代码。


2.5.3.积极发展


下表总结了各种 Reactor 版本系列的开发状态:

 版本  支持的


2022.0.x(核心 3.5.x,netty 1.1.x)


2020.0.x(代号Europium)(核心3.4.x,netty 1.0.x)


Dysprosium Train(核心 3.3.x,netty 0.9.x)


锎及以下(核心 < 3.3,网状 < 0.9)


Reactor 1.x 和 2.x 代


建议编辑“入门”


3. 响应式编程简介


Reactor是响应式编程范式的一种实现,可以概括如下:


反应式编程是一种与数据流和变化传播有关的异步编程范例。这意味着可以通过所使用的编程语言轻松表达静态(例如数组)或动态(例如事件发射器)数据流。
— https://en.wikipedia.org/wiki/Reactive_programming


作为响应式编程方向的第一步,Microsoft 在 .NET 生态系统中创建了响应式扩展 (Rx) 库。然后RxJava在JVM上实现了响应式编程。随着时间的推移,通过 Reactive Streams 的努力出现了 Java 标准化,该规范为 JVM 上的反应式库定义了一组接口和交互规则。它的接口已集成到 Java 9 中的 Flow 类下。


反应式编程范式通常以面向对象语言的形式呈现,作为观察者设计模式的扩展。您还可以将主要反应流模式与熟悉的迭代器设计模式进行比较,因为所有这些库中的 Iterable - Iterator 对都具有对偶性。一个主要区别是,迭代器是基于拉式的,而反应流是基于推式的。


使用迭代器是一种命令式编程模式,即使访问值的方法完全由 Iterable 负责。事实上,由开发人员选择何时访问序列中的 next() 项。在反应式流中,上述对的等效项是 Publisher-Subscriber 。但正是 Publisher 通知订阅者新的可用值,而这个推送方面是反应的关键。此外,应用于推送值的操作是声明式表达的,而不是命令式的:程序员表达计算的逻辑,而不是描述其精确的控制流。


除了推送值之外,还以明确定义的方式涵盖了错误处理和完成方面。 Publisher 可以将新值推送到其 Subscriber (通过调用 onNext ),但也可以发出错误信号(通过调用 onError )或完成(通过调用 onComplete )。错误和完成都会终止序列。这可以总结如下:

onNext x 0..N [onError | onComplete]


这种方法非常灵活。该模式支持没有值、一个值或 n 个值(包括无限序列的值,例如时钟的连续滴答声)的用例。


但为什么我们首先需要这样一个异步反应式库呢?


3.1.阻塞可能会造成浪费


现代应用程序可以覆盖大量并发用户,尽管现代硬件的功能不断提高,但现代软件的性能仍然是一个关键问题。


一般来说,有两种方法可以提高程序的性能:


  • 并行化以使用更多线程和更多硬件资源。


  • 寻求提高现有资源使用效率。


通常,Java 开发人员使用阻塞代码来编写程序。这种做法很好,直到出现性能瓶颈。然后是时候引入额外的线程,运行类似的阻塞代码。但资源利用率的这种扩展会很快引入争用和并发问题。


更糟糕的是,阻塞会浪费资源。如果仔细观察,一旦程序涉及一些延迟(特别是 I/O,例如数据库请求或网络调用),资源就会被浪费,因为线程(可能是许多线程)现在处于空闲状态,等待数据。


因此并行化方法并不是灵丹妙药。有必要充分利用硬件的能力,但推理起来也很复杂,而且容易造成资源浪费。


3.2.异步来拯救?


前面提到的第二种方法,寻求更高的效率,可以解决资源浪费问题。通过编写异步、非阻塞代码,您可以让执行切换到另一个使用相同底层资源的活动任务,并在异步处理完成后返回到当前进程。


但是如何在 JVM 上生成异步代码呢? Java 提供了两种异步编程模型:


  • 回调:异步方法没有返回值,但采用额外的 callback 参数(lambda 或匿名类),当结果可用时调用该参数。一个众所周知的例子是 Swing 的 EventListener 层次结构。


  • Futures:异步方法立即返回 Future<T> 。异步进程计算 T 值,但 Future 对象包装对其的访问。该值不会立即可用,可以轮询该对象,直到该值可用。例如,运行 Callable<T> 任务的 ExecutorService 使用 Future 对象。


这些技术足够好吗?并非适用于所有用例,并且这两种方法都有局限性。


回调很难组合在一起,很快就会导致代码难以阅读和维护(称为“回调地狱”)。


考虑一个示例:在用户界面上显示用户最喜欢的五个,或者在她没有最喜欢的情况下显示建议。这通过三个服务(一个提供最喜欢的 ID,第二个获取最喜欢的详细信息,第三个提供包含详细信息的建议),如下所示:


示例 5. 回调地狱示例
userService.getFavorites(userId, new Callback<List<String>>() { (1)
  public void onSuccess(List<String> list) { (2)
    if (list.isEmpty()) { (3)
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { (4)
          UiUtils.submitOnUiThread(() -> { (5)
            list.stream()
                .limit(5)
                .forEach(uiList::show); (6)
            });
        }

        public void onError(Throwable error) { (7)
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() (8)
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, (9)
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});
1
我们有基于回调的服务:一个 Callback 接口,其中包含异步过程成功时调用的方法和发生错误时调用的方法。
2
第一个服务使用最喜欢的 ID 列表调用其回调。
3
如果列表为空,我们必须转到 suggestionService
4
suggestionService 向第二个回调提供 List<Favorite>
5
由于我们处理 UI,因此需要确保我们的消费代码在 UI 线程中运行。
6
我们使用 Java 8 Stream 将处理的建议数量限制为 5,并在 UI 中以图形列表形式显示它们。
7
在每个级别,我们都以相同的方式处理错误:我们在弹出窗口中显示它们。
8
返回最喜欢的 ID 级别。如果服务返回完整列表,我们需要转到 favoriteService 来获取详细的 Favorite 对象。由于我们只需要五个,因此我们首先流式传输 ID 列表以将其限制为五个。
9
再次回调。这次我们得到了一个成熟的 Favorite 对象,我们将其推送到 UI 线程内的 UI。


这是很多代码,有点难以理解并且有重复的部分。考虑它在 Reactor 中的等价物:


示例 6. 与回调代码等效的 Reactor 代码示例
userService.getFavorites(userId) (1)
           .flatMap(favoriteService::getDetails) (2)
           .switchIfEmpty(suggestionService.getSuggestions()) (3)
           .take(5) (4)
           .publishOn(UiUtils.uiThreadScheduler()) (5)
           .subscribe(uiList::show, UiUtils::errorPopup); (6)
1
我们从最喜欢的 ID 流开始。
2
我们将它们异步转换为详细的 Favorite 对象 ( flatMap )。我们现在的流程是 Favorite
3
如果 Favorite 的流程为空,我们会通过 suggestionService 切换到回退。
4
我们最多只对结果流中的五个元素感兴趣。
5
最后,我们要在UI线程中处理每一条数据。
6
我们通过描述如何处理数据的最终形式(在 UI 列表中显示它)以及发生错误时如何执行(显示弹出窗口)来触发流程。


如果您想确保在 800 毫秒内检索到最喜爱的 ID,或者如果需要更长的时间,则从缓存中获取它们,该怎么办?在基于回调的代码中,这是一项复杂的任务。在 Reactor 中,它变得就像在链中添加 timeout 运算符一样简单,如下所示:


示例 7. 具有超时和回退功能的 Reactor 代码示例
userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) (1)
           .onErrorResume(cacheService.cachedFavoritesFor(userId)) (2)
           .flatMap(favoriteService::getDetails) (3)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
1
如果上面的部分超过 800 毫秒没有发出任何信号,则传播错误。
2
如果出现错误,请返回到 cacheService
3
链的其余部分与前面的示例类似。


Future 对象比回调好一点,但尽管 CompletableFuture 在 Java 8 中带来了改进,但它们在组合方面仍然表现不佳。将多个 Future 对象编排在一起是可行的,但并不容易。此外, Future 还有其他问题:


  • 通过调用 get() 方法,很容易导致 Future 对象出现另一种阻塞情况。


  • 它们不支持惰性计算。


  • 它们缺乏对多个值和高级错误处理的支持。


考虑另一个例子:我们得到一个 ID 列表,我们想要从中获取名称和统计信息,并将它们成对地组合起来,所有这些都是异步的。以下示例使用 CompletableFuture 类型的列表执行此操作:


示例 8. CompletableFuture 组合示例
CompletableFuture<List<String>> ids = ifhIds(); (1)

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { (2)
	Stream<CompletableFuture<String>> zip =
			l.stream().map(i -> { (3)
				CompletableFuture<String> nameTask = ifhName(i); (4)
				CompletableFuture<Integer> statTask = ifhStat(i); (5)

				return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); (6)
			});
	List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); (7)
	CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

	CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); (8)
	return allDone.thenApply(v -> combinationList.stream()
			.map(CompletableFuture::join) (9)
			.collect(Collectors.toList()));
});

List<String> results = result.join(); (10)
assertThat(results).contains(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
1
我们从一个 future 开始,它为我们提供了要处理的 id 值列表。
2
获得列表后,我们希望开始一些更深入的异步处理。
3
对于列表中的每个元素:
4
异步获取关联名称。
5
异步获取相关统计信息。
6  合并两个结果。
7
我们现在有一个代表所有组合任务的 future 列表。为了执行这些任务,我们需要将列表转换为数组。
8
将数组传递给 CompletableFuture.allOf ,它会输出一个 Future ,该 Future 在所有任务完成后完成。
9
棘手的一点是 allOf 返回 CompletableFuture<Void> ,因此我们在 future 列表上重申,通过使用 join() 收集它们的结果(这里,这不会阻止,因为 allOf 确保期货全部完成)。
10
一旦整个异步管道被触发,我们就会等待它被处理并返回我们可以断言的结果列表。


由于 Reactor 有更多开箱即用的组合运算符,因此可以简化此过程,如下所示:


示例 9. 与未来代码等效的 Reactor 代码示例
Flux<String> ids = ifhrIds(); (1)

Flux<String> combinations =
		ids.flatMap(id -> { (2)
			Mono<String> nameTask = ifhrName(id); (3)
			Mono<Integer> statTask = ifhrStat(id); (4)

			return nameTask.zipWith(statTask, (5)
					(name, stat) -> "Name " + name + " has stats " + stat);
		});

Mono<List<String>> result = combinations.collectList(); (6)

List<String> results = result.block(); (7)
assertThat(results).containsExactly( (8)
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
1
这次,我们从异步提供的 ids 序列( Flux<String> )开始。
2
对于序列中的每个元素,我们异步处理它两次(在主体 flatMap 调用的函数内)。
3
获取关联名称。
4
获取相关统计数据。
5
异步组合两个值。
6
当这些值可用时,将它们聚合到 List 中。
7
在生产中,我们将通过进一步组合或订阅它来继续异步使用 Flux 。最有可能的是,我们会返回 result Mono 。由于我们正在进行测试,因此我们会阻塞,等待处理完成,然后直接返回聚合的值列表。
8  断言结果。


使用回调和 Future 对象的危险是相似的,这正是反应式编程通过 Publisher-Subscriber 对解决的问题。


3.3.从命令式编程到反应式编程


反应式库(例如 Reactor)旨在解决 JVM 上“经典”异步方法的这些缺点,同时还关注一些其他方面:


  • 可组合性和可读性


  • 数据作为一个流,通过丰富的运算符词汇进行操作


  • 在您订阅之前什么都不会发生


  • 背压或消费者向生产者发出排放率过高信号的能力


  • 与并发无关的高水平但高价值的抽象


3.3.1.可组合性和可读性


“可组合性”是指编排多个异步任务的能力,其中我们使用先前任务的结果将输入提供给后续任务。或者,我们可以以分叉连接方式运行多个任务。此外,我们可以将异步任务作为更高级别系统中的离散组件进行重用。


编排任务的能力与代码的可读性和可维护性紧密耦合。随着异步进程层的数量和复杂性的增加,编写和读取代码变得越来越困难。正如我们所看到的,回调模型很简单,但其主要缺点之一是,对于复杂的流程,您需要从回调中执行回调,而回调本身又嵌套在另一个回调中,等等。这种混乱被称为“回调地狱”。正如您可以猜测的那样(或从经验中知道),这样的代码很难回溯和推理。


Reactor 提供了丰富的组合选项,其中代码反映了抽象过程的组织,并且所有内容通常都保持在同一级别(嵌套被最小化)。


3.3.2.装配线类比


您可以将反应式应用程序处理的数据视为在装配线上移动。 Reactor既是传送带,又是工作站。原材料从源头(原始的 Publisher )流出,最终成为成品,准备推向消费者(或 Subscriber )。


原材料可以经历各种转变和其他中间步骤,或者成为将中间件聚合在一起的更大装配线的一部分。如果某一点出现故障或堵塞(也许装箱产品需要花费不成比例的长时间),受影响的工作站可以向上游发出信号,以限制原材料的流动。

 3.3.3.运营商


在 Reactor 中,操作员是我们装配类比中的工作站。每个运算符都会向 Publisher 添加行为,并将上一步的 Publisher 包装到一个新实例中。整个链因此被链接起来,使得数据源自第一个 Publisher 并沿着链向下移动,并由每个链接进行转换。最终, Subscriber 完成了该过程。请记住,在 Subscriber 订阅 Publisher 之前不会发生任何事情,我们很快就会看到。


了解运算符创建新实例可以帮助您避免常见错误,这种错误会让您相信您在链中使用的运算符没有被应用。请参阅常见问题解答中的此项。


虽然反应式流规范根本没有指定运算符,但反应式库(例如 Reactor)的最佳附加值之一是它们提供的丰富的运算符词汇表。这些涵盖了很多领域,从简单的转换和过滤到复杂的编排和错误处理。


3.3.4.在你 subscribe() 之前什么都不会发生


在 Reactor 中,当您编写 Publisher 链时,默认情况下数据不会开始注入其中。相反,您创建异步流程的抽象描述(这有助于提高可重用性和组合性)。


通过订阅行为,您将 Publisher 绑定到 Subscriber ,这会触发整个链中的数据流。这是通过来自 Subscriber 的单个 request 信号在内部实现的,该信号向上游传播,一直返回到源 Publisher

 3.3.5.背压


向上游传播信号也用于实现反压,我们在装配线类比中将其描述为当工作站处理速度比上游工作站慢时沿生产线发送的反馈信号。


Reactive Streams 规范定义的真实机制非常接近这个类比:订阅者可以在无界模式下工作,让源以最快的可实现速率推送所有数据,或者可以使用 request 机制来向源发出信号,表明它已准备好处理最多 n 个元素。


中间操作员还可以更改传输中的请求。想象一个 buffer 运算符将元素按十个为一组进行分组。如果订阅者请求一个缓冲区,则源产生十个元素是可以接受的。一些运算符还实现预取策略,这可以避免 request(1) 往返,并且如果在请求之前生成元素的成本不是太高,那么这是有益的。


这将推模型转换为推拉混合模型,其中下游可以从上游拉取 n 个元素(如果这些元素随时可用)。但如果元素还没有准备好,它们在生产时就会被上游推送。


3.3.6.热与冷


Rx 反应库家族区分了两大类反应序列:热反应序列和冷反应序列。这种区别主要与反应流如何对订阅者做出反应有关:


  • 对于每个 Subscriber ,冷序列都会重新开始,包括在数据源处。例如,如果源包装 HTTP 调用,则会为每个订阅发出新的 HTTP 请求。


  • 对于每个 Subscriber ,热序列不会从头开始。相反,迟到的订阅者会收到订阅后发出的信号。但请注意,某些热反应流可以全部或部分缓存或重放排放历史记录。从一般角度来看,当没有订阅者正在侦听时,热序列甚至可以发出(“订阅之前不会发生任何事情”规则的例外)。


有关 Reactor 上下文中的热与冷的更多信息,请参阅此特定于 Reactor 的部分。


建议编辑“反应式编程简介”


4. Reactor核心特点


Reactor 项目的主要工件是 reactor-core ,这是一个专注于 Reactive Streams 规范并针对 Java 8 的反应式库。


Reactor 引入了可组合的反应类型,它实现了 Publisher ,但也提供了丰富的运算符词汇: FluxMonoFlux 对象表示 0..N 个项目的反应序列,而 Mono 对象表示单值或空 (0..1) 结果。


这种区别将一些语义信息带入类型中,指示异步处理的粗略基数。例如,HTTP 请求仅产生一个响应,因此执行 count 操作没有多大意义。因此,将此类 HTTP 调用的结果表示为 Mono<HttpResponse> 比将其表示为 Flux<HttpResponse> 更有意义,因为它仅提供与零个或一个项目的上下文相关的运算符物品。


更改处理的最大基数的运算符也会切换到相关类型。例如, count 运算符存在于 Flux 中,但它返回 Mono<Long>


4.1. Flux ,0-N 项的异步序列


下图显示了 Flux 如何转换项目:

Flux


Flux<T> 是标准 Publisher<T> ,表示 0 到 N 个发出项目的异步序列,可以选择由完成信号或错误终止。与反应流规范中一样,这三种类型的信号转换为对下游订阅者的 onNextonCompleteonError 方法的调用。


由于可能的信号范围很大, Flux 是通用的反应类型。请注意,所有事件,甚至终止事件,都是可选的:没有 onNext 事件,但 onComplete 事件表示空的有限序列,但删除 onComplete 并且您有一个无限的空序列(不是特别有用,除了围绕取消的测试)。同样,无限序列不一定是空的。例如, Flux.interval(Duration) 生成一个无限的 Flux<Long> 并从时钟发出规则的滴答声。


4.2. Mono ,异步 0-1 结果


下图显示了 Mono 如何转换项目:

Mono


Mono<T> 是一种专门的 Publisher<T> ,它通过 onNext 信号最多发出一项,然后以 onComplete 信号终止(成功的 Mono ,有或没有值),或仅发出单个 onError 信号(失败 Mono )。


大多数 Mono 实现在调用 onNext 后应立即在其 Subscriber 上调用 onCompleteMono.never() 是一个异常值:它不发出任何信号,这在技术上并没有被禁止,尽管在测试之外并不是很有用。另一方面,明确禁止 onNextonError 的组合。


Mono 仅提供可用于 Flux 的运算符子集,以及一些运算符(特别是那些将 Mono 与另一个 Publisher 。例如, Mono#concatWith(Publisher) 返回 FluxMono#then(Mono) 返回另一个 Mono


请注意,您可以使用 Mono 来表示仅具有完成概念的无值异步流程(类似于 Runnable )。要创建一个,您可以使用空的 Mono<Void>


4.3.创建 Flux 或 Mono 并订阅它的简单方法


开始使用 FluxMono 的最简单方法是使用在各自类中找到的众多工厂方法之一。


例如,要创建 String 序列,您可以枚举它们或将它们放入集合中并从中创建 Flux,如下所示:

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);


工厂方法的其他示例包括:

Mono<String> noData = Mono.empty(); (1)

Mono<String> data = Mono.just("foo");

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3); (2)
1
请注意,工厂方法遵循泛型类型,即使它没有值。
2
第一个参数是范围的开始,第二个参数是要生产的项目数。


在订阅方面, FluxMono 使用 Java 8 lambda。您可以选择多种 .subscribe() 变体,这些变体采用 lambda 来实现不同的回调组合,如以下方法签名所示:


示例 10. Flux 基于 Lambda 的订阅变体
subscribe(); (1)

subscribe(Consumer<? super T> consumer); (2)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); (3)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); (4)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); (5)
1
订阅并触发序列。
2
对每个产生的价值做一些事情。
3
处理价值观,但也要对错误做出反应。
4
处理值和错误,但在序列成功完成时运行一些代码。
5
处理值和错误并成功完成,但也对此 subscribe 调用生成的 Subscription 执行某些操作。

这些变体返回对订阅的引用,您可以在不需要更多数据时使用该引用取消订阅。取消后,源应停止生成值并清理其创建的所有资源。这种取消和清理行为在 Reactor 中由通用 Disposable 接口表示。


4.3.1. subscribe 方法示例


本节包含 subscribe 方法的五个签名中每个签名的最小示例。以下代码显示了不带参数的基本方法的示例:

Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(); (2)
1
设置一个 Flux ,在订阅者附加时生成三个值。
2
以最简单的方式订阅。


前面的代码不会产生可见的输出,但它确实有效。 Flux 产生三个值。如果我们提供 lambda,我们就可以使值可见。 subscribe 方法的下一个示例显示了显示值的一种方法:

Flux<Integer> ints = Flux.range(1, 3); (1)
ints.subscribe(i -> System.out.println(i)); (2)
1
设置一个 Flux ,在订阅者附加时生成三个值。
2
与将打印值的订阅者一起订阅。


前面的代码产生以下输出:

1
2
3


为了演示下一个签名,我们故意引入一个错误,如下例所示:

Flux<Integer> ints = Flux.range(1, 4) (1)
      .map(i -> { (2)
        if (i <= 3) return i; (3)
        throw new RuntimeException("Got to 4"); (4)
      });
ints.subscribe(i -> System.out.println(i), (5)
      error -> System.err.println("Error: " + error));
1
设置一个 Flux,在订阅者连接时生成四个值。
2
我们需要一个映射,以便我们可以以不同的方式处理某些值。
3
对于大多数值,返回该值。
4
对于一个值,强制产生一个错误。
5
使用包含错误处理程序的订阅者进行订阅。


现在我们有两个 lambda 表达式:一个用于我们期望的内容,另一个用于错误。前面的代码产生以下输出:

1
2
3
Error: java.lang.RuntimeException: Got to 4


subscribe 方法的下一个签名包括错误处理程序和完成事件处理程序,如以下示例所示:

Flux<Integer> ints = Flux.range(1, 4); (1)
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> System.out.println("Done")); (2)
1
设置一个 Flux,在订阅者连接时生成四个值。
2
使用包含完成事件处理程序的订阅者进行订阅。


错误信号和完成信号都是终止事件并且是互斥的(你永远不会同时得到两者)。为了使完成消费者工作,我们必须注意不要触发错误。


完成回调没有输入,由一对空括号表示:它与 Runnable 接口中的 run 方法匹配。前面的代码产生以下输出:

1
2
3
4
Done


4.3.2.使用 Disposable 取消 subscribe()


所有这些基于 lambda 的 subscribe() 变体都有一个 Disposable 返回类型。在本例中, Disposable 接口表示可以通过调用其 dispose() 方法来取消订阅。


对于 FluxMono ,取消是源应停止生成元素的信号。但是,不能保证立即执行:某些源可能会非常快地生成元素,甚至在收到取消指令之前它们就可以完成。


Disposable 周围的一些实用程序可在 Disposables 类中使用。其中, Disposables.swap() 创建一个 Disposable 包装器,让您可以原子地取消和替换具体的 Disposable 。例如,在 UI 场景中,每当用户单击按钮时,您想要取消请求并用新请求替换它,这可能很有用。处理包装纸本身即可将其关闭。这样做会处理当前的具体值和所有未来尝试的替换。


另一个有趣的实用程序是 Disposables.composite(…​) 。此组合允许您收集多个 Disposable — 例如,与服务调用关联的多个正在进行的请求 — 并稍后立即处理所有这些请求。一旦组合的 dispose() 方法被调用,任何添加另一个 Disposable 的尝试都会立即释放它。


4.3.3. Lambda 的替代方案: BaseSubscriber


还有一种更通用的 subscribe 方法,它采用成熟的 Subscriber ,而不是从 lambda 中组合一个。为了帮助编写这样的 Subscriber ,我们提供了一个名为 BaseSubscriber 的可扩展类。


BaseSubscriber (或其子类)的实例是一次性的,这意味着如果 BaseSubscriber 订阅了第二个 Publisher ,则它会取消对第一个 Publisher 的订阅 Publisher 。这是因为使用一个实例两次会违反反应流规则,即 SubscriberonNext 方法不得并行调用。因此,只有直接在 Publisher#subscribe(Subscriber) 调用中声明匿名实现才可以。


现在我们可以实现其中之一。我们称之为 SampleSubscriber 。以下示例显示了如何将其附加到 Flux

SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(ss);


以下示例显示了 SampleSubscriber 作为 BaseSubscriber 的简约实现的样子:

package io.projectreactor.samples;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;

public class SampleSubscriber<T> extends BaseSubscriber<T> {

	@Override
	public void hookOnSubscribe(Subscription subscription) {
		System.out.println("Subscribed");
		request(1);
	}

	@Override
	public void hookOnNext(T value) {
		System.out.println(value);
		request(1);
	}
}


SampleSubscriber 类扩展了 BaseSubscriber ,这是 Reactor 中用户定义的 Subscribers 推荐的抽象类。该类提供了可以重写的挂钩来调整订阅者的行为。默认情况下,它会触发无界请求,其行为与 subscribe() 完全相同。但是,当您需要自定义请求量时,扩展 BaseSubscriber 会更有用。


对于自定义请求量,最低限度是实现 hookOnSubscribe(Subscription subscription)hookOnNext(T value) ,就像我们所做的那样。在我们的例子中, hookOnSubscribe 方法将一条语句打印到标准输出并发出第一个请求。然后 hookOnNext 方法打印一条语句并执行其他请求,一次一个请求。


SampleSubscriber 类产生以下输出:

Subscribed
1
2
3
4


BaseSubscriber 还提供了 requestUnbounded() 方法来切换到无界模式(相当于 request(Long.MAX_VALUE) ),以及 cancel() 方法。


它还具有额外的钩子: hookOnCompletehookOnErrorhookOnCancelhookFinally (当序列终止时总是调用它,带有作为 SignalType 参数传入的终止类型)


您几乎肯定想要实现 hookOnErrorhookOnCancelhookOnComplete 方法。您可能还想实现 hookFinally 方法。 SampleSubscriber 是执行有界请求的 Subscriber 的绝对最小实现。


4.3.4.关于背压和重塑请求的方法


在 Reactor 中实现背压时,消费者压力传播回源的方式是向上游算子发送 request 。当前请求的总和有时被称为当前“需求”或“待处理请求”。需求上限为 Long.MAX_VALUE ,代表无限制的请求(意味着“尽可能快地生产”——基本上禁用背压)。


第一个请求来自订阅时的最终订阅者,但最直接的订阅方式都会立即触发 Long.MAX_VALUE 的无限请求:


  • subscribe() 及其大多数基于 lambda 的变体(具有 Consumer<Subscription> 的变体除外)


  • block()blockFirst()blockLast()


  • 迭代 toIterable()toStream()


自定义原始请求的最简单方法是将 subscribeBaseSubscriber 结合使用,并覆盖 hookOnSubscribe 方法,如以下示例所示:

Flux.range(1, 10)
    .doOnRequest(r -> System.out.println("request of " + r))
    .subscribe(new BaseSubscriber<Integer>() {

      @Override
      public void hookOnSubscribe(Subscription subscription) {
        request(1);
      }

      @Override
      public void hookOnNext(Integer integer) {
        System.out.println("Cancelling after having received " + integer);
        cancel();
      }
    });


前面的代码片段打印出以下内容:

request of 1
Cancelling after having received 1

在操作请求时,您必须小心地产生足够的需求以使序列前进,否则您的 Flux 可能会“卡住”。这就是为什么 BaseSubscriber 默认为 hookOnSubscribe 中的无界请求。当覆盖此钩子时,您通常应该至少调用一次 request

改变下游需求的运营商


需要记住的一件事是,上游链中的每个运营商都可以重塑订阅级别表达的需求。一个教科书案例是 buffer(N) 运算符:如果它接收到 request(2) ,则它被解释为对两个完整缓冲区的需求。因此,由于缓冲区需要 N 元素才能被视为已满,因此 buffer 运算符将请求重塑为 2 x N


您可能还注意到,某些运算符具有采用名为 prefetchint 输入参数的变体。这是修改下游请求的另一类运算符。这些通常是处理内部序列的运算符,从每个传入元素派生出 Publisher (如 flatMap )。


预取是一种调整对这些内部序列发出的初始请求的方法。如果未指定,大多数这些运算符都以 32 需求开始。


这些操作符通常还会实现补充优化:一旦操作符看到 75% 的预取请求已得到满足,它就会从上游重新请求 75%。这是一种启发式优化,以便这些操作员能够主动预测即将到来的请求。


最后,几个运算符可让您直接调整请求: limitRatelimitRequest


limitRate(N) 拆分下游请求,以便它们以较小的批次向上游传播。例如,向 limitRate(10) 发出的 100 请求最多会导致 1010 个请求传播到上游。请注意,在这种形式中, limitRate 实际上实现了前面讨论的补充优化。


操作员有一个变体,还可以让您调整补充量(在变体中称为 lowTide ): limitRate(highTide, lowTide) 。选择 0 中的 lowTide 会导致严格的 highTide 请求批次,而不是通过补货策略进一步返工的批次。


另一方面, limitRequest(N) 将下游请求限制为最大总需求。它将请求添加到 N 。如果单个 request 没有使总需求溢出 N ,则该特定请求将完全向上游传播。在源发出该数量后, limitRequest 认为序列已完成,向下游发送 onComplete 信号,并取消源。


4.4.以编程方式创建序列


在本节中,我们将介绍通过以编程方式定义其关联事件( onNextonErroronComplete )。所有这些方法都有一个共同的事实:它们公开一个 API 来触发我们称为接收器的事件。实际上有一些水槽变体,我们很快就会介绍。


4.4.1.同步 generate


以编程方式创建 Flux 的最简单形式是通过 generate 方法,该方法采用生成器函数。


这适用于同步和一对一的发射,这意味着接收器是一个 SynchronousSink 并且它的 next() 方法每次回调调用最多只能调用一次。然后,您可以另外调用 error(Throwable)complete() ,但这是可选的。


最有用的变体可能还可以让您保留一种状态,您可以在接收器使用中参考该状态来决定下一步要发出什么。然后生成器函数变成 BiFunction<S, SynchronousSink<T>, S> ,其中 <S> 是状态对象的类型。您必须为初始状态提供 Supplier<S> ,并且您的生成器函数现在在每一轮返回一个新状态。


例如,您可以使用 int 作为状态:


示例 11. 基于状态的 generate 示例
Flux<String> flux = Flux.generate(
    () -> 0, (1)
    (state, sink) -> {
      sink.next("3 x " + state + " = " + 3*state); (2)
      if (state == 10) sink.complete(); (3)
      return state + 1; (4)
    });
1
我们提供初始状态值 0。
2
我们使用状态来选择要发出的内容(3 乘法表中的一行)。
3
我们还用它来选择何时停止。
4
我们返回一个在下一次调用中使用的新状态(除非序列在本次调用中终止)。


上述代码生成 3 的表,顺序如下:

3 x 0 = 0
3 x 1 = 3
3 x 2 = 6
3 x 3 = 9
3 x 4 = 12
3 x 5 = 15
3 x 6 = 18
3 x 7 = 21
3 x 8 = 24
3 x 9 = 27
3 x 10 = 30


您还可以使用可变的 <S> 。例如,可以使用单个 AtomicLong 作为状态来重写上面的示例,并在每一轮中对其进行变异:


示例 12. 可变状态变体
Flux<String> flux = Flux.generate(
    AtomicLong::new, (1)
    (state, sink) -> {
      long i = state.getAndIncrement(); (2)
      sink.next("3 x " + i + " = " + 3*i);
      if (i == 10) sink.complete();
      return state; (3)
    });
1
这次,我们生成一个可变对象作为状态。
2
我们在这里改变状态。
3
我们返回与新状态相同的实例。

如果您的状态对象需要清理一些资源,请使用 generate(Supplier<S>, BiFunction, Consumer<S>) 变体来清理最后一个状态实例。


以下示例使用包含 Consumergenerate 方法:

Flux<String> flux = Flux.generate(
    AtomicLong::new,
      (state, sink) -> { (1)
      long i = state.getAndIncrement(); (2)
      sink.next("3 x " + i + " = " + 3*i);
      if (i == 10) sink.complete();
      return state; (3)
    }, (state) -> System.out.println("state: " + state)); (4)
1
同样,我们生成一个可变对象作为状态。
2
我们在这里改变状态。
3
我们返回与新状态相同的实例。
4
我们将最后一个状态值 (11) 视为此 Consumer lambda 的输出。


如果状态包含数据库连接或需要在流程结束时处理的其他资源, Consumer lambda 可以关闭连接或以其他方式处理应在结束时完成的任何任务的过程。


4.4.2.异步和多线程: create


createFlux 的编程创建的更高级形式,适用于每轮多次发射,甚至来自多个线程。


它公开了 FluxSink 及其 nexterrorcomplete 方法。与 generate 相反,它没有基于状态的变体。另一方面,它可以在回调中触发多线程事件。


create 对于桥接现有 API 与反应式世界非常有用 - 例如基于侦听器的异步 API。

create 不会并行化您的代码,也不会使其异步,即使它可以与异步 API 一起使用。如果您在 create lambda 中进行阻塞,则会面临死锁和类似副作用的风险。即使使用 subscribeOn ,也需要注意的是,长阻塞 create lambda(例如调用 sink.next(t) 的无限循环)可能会锁定管道:请求永远不会被执行,因为循环使它们应该运行的同一线程处于饥饿状态。使用 subscribeOn(Scheduler, false) 变体: requestOnSeparateThread = false 将为 create 使用 Scheduler 线程,并且仍然通过执行 request


假设您使用基于侦听器的 API。它按块处理数据并有两个事件:(1) 数据块已准备好,(2) 处理完成(终止事件),如 MyEventListener 界面所示:

interface MyEventListener<T> {
    void onDataChunk(List<T> chunk);
    void processComplete();
}


您可以使用 create 将其桥接到 Flux<T>

Flux<String> bridge = Flux.create(sink -> {
    myEventProcessor.register( (4)
      new MyEventListener<String>() { (1)

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); (2)
          }
        }

        public void processComplete() {
            sink.complete(); (3)
        }
    });
});
1
桥接到 MyEventListener API
2
块中的每个元素都成为 Flux 中的元素。
3
processComplete 事件被转换为 onComplete
4
每当 myEventProcessor 执行时,所有这些都是异步完成的。


此外,由于 create 可以桥接异步 API 并管理反压,因此您可以通过指示 OverflowStrategy 来细化反压行为方式:


  • IGNORE 完全忽略下游背压请求。当下游队列已满时,这可能会产生 IllegalStateException


  • ERROR 当下游无法跟上时发出 IllegalStateException 信号。


  • DROP 如果下游未准备好接收传入信号,则丢弃该信号。


  • LATEST 让下游只获取来自上游的最新信号。


  • BUFFER (默认)在下游无法跟上时缓冲所有信号。 (这会进行无限制的缓冲,并可能导致 OutOfMemoryError )。


Mono 还有一个 create 生成器。 Mono 的 MonoSink 不允许多次发射。它将丢弃第一个信号之后的所有信号。


4.4.3.异步但单线程: push


pushgeneratecreate 之间的中间立场,适合处理来自单个生产者的事件。它与 create 类似,因为它也可以是异步的,并且可以使用 create 支持的任何溢出策略来管理背压。然而,一次只有一个生产线程可以调用 nextcompleteerror

Flux<String> bridge = Flux.push(sink -> {
    myEventProcessor.register(
      new SingleThreadEventListener<String>() { (1)

        public void onDataChunk(List<String> chunk) {
          for(String s : chunk) {
            sink.next(s); (2)
          }
        }

        public void processComplete() {
            sink.complete(); (3)
        }

        public void processError(Throwable e) {
            sink.error(e); (4)
        }
    });
});
1
桥接到 SingleThreadEventListener API。
2
使用 next 从单个侦听器线程将事件推送到接收器。
3
从同一侦听器线程生成的 complete 事件。
4
error 事件也从同一侦听器线程生成。

混合推/拉模型


大多数 Reactor 运算符(例如 create )遵循混合推/拉模型。我们的意思是,尽管大部分处理是异步的(建议采用推送方法),但其中有一个小的拉取组件:请求。


消费者从源中提取数据,因为在第一次请求之前它不会发出任何数据。只要数据可用,源就会将数据推送给消费者,但要在其请求的数量范围内。


请注意, push()create() 都允许设置 onRequest 消费者,以便管理请求量并确保仅在以下情况下才通过接收器推送数据:有待处理的请求。

Flux<String> bridge = Flux.create(sink -> {
    myMessageProcessor.register(
      new MyMessageListener<String>() {

        public void onMessage(List<String> messages) {
          for(String s : messages) {
            sink.next(s); (3)
          }
        }
    });
    sink.onRequest(n -> {
        List<String> messages = myMessageProcessor.getHistory(n); (1)
        for(String s : messages) {
           sink.next(s); (2)
        }
    });
});
1
发出请求时轮询消息。
2
如果消息立即可用,则将它们推送到接收器。
3
稍后异步到达的剩余消息也会被传递。

push()create() 之后清理


两个回调 onDisposeonCancel 在取消或终止时执行任何清理。 onDispose 可用于在 Flux 完成、出错或取消时执行清理。 onCancel 可用于在使用 onDispose 进行清理之前执行任何特定于取消的操作。

Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))
        .onCancel(() -> channel.cancel()) (1)
        .onDispose(() -> channel.close())  (2)
    });
1
onCancel 首先被调用,仅用于取消信号。
2
onDispose 被调用用于完成、错误或取消信号。

 4.4.4.处理


handle 方法有点不同:它是一个实例方法,这意味着它链接在现有源上(与常见运算符一样)。它出现在 MonoFlux 中。


它接近于 generate ,因为它使用 SynchronousSink 并且只允许一对一的发射。但是, handle 可用于从每个源元素生成任意值,可能会跳过某些元素。这样,它就可以作为 mapfilter 的组合。句柄签名如下:

Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);


让我们考虑一个例子。反应式流规范不允许序列中存在 null 值。如果您想要执行 map 但想要使用预先存在的方法作为映射函数,并且该方法有时返回 null,该怎么办?


例如,以下方法可以安全地应用于整数源:

public String alphabet(int letterNumber) {
	if (letterNumber < 1 || letterNumber > 26) {
		return null;
	}
	int letterIndexAscii = 'A' + letterNumber - 1;
	return "" + (char) letterIndexAscii;
}


然后我们可以使用 handle 删除任何空值:


示例 13. 将 handle 用于“映射并消除空值”场景
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
    .handle((i, sink) -> {
        String letter = alphabet(i); (1)
        if (letter != null) (2)
            sink.next(letter); (3)
    });

alphabet.subscribe(System.out::println);
1  映射到字母。
2
如果“地图函数”返回 null…​。
3
通过不调用 sink.next 将其过滤掉。


将打印出:

M
I
T


4.5.线程和调度程序


Reactor 与 RxJava 一样,可以被认为是与并发无关的。也就是说,它不强制执行并发模型。相反,它让你(开发人员)来指挥。但是,这并不妨碍该库帮助您处理并发性。


获取 FluxMono 并不一定意味着它在专用的 Thread 中运行。相反,大多数运算符继续在前一个运算符执行的 Thread 中工作。除非指定,否则最顶层的运算符(源)本身在进行 subscribe() 调用的 Thread 上运行。以下示例在新线程中运行 Mono

public static void main(String[] args) throws InterruptedException {
  final Mono<String> mono = Mono.just("hello "); (1)

  Thread t = new Thread(() -> mono
      .map(msg -> msg + "thread ")
      .subscribe(v -> (2)
          System.out.println(v + Thread.currentThread().getName()) (3)
      )
  )
  t.start();
  t.join();

}
1
Mono<String> 在线程 main 中组装。
2
但是,它是在线程 Thread-0 中订阅的。
3
因此, maponNext 回调实际上都在 Thread-0 中运行


前面的代码产生以下输出:

hello thread Thread-0


在 Reactor 中,执行模型和执行发生的位置由所使用的 Scheduler 决定。 Scheduler 具有与 ExecutorService 类似的调度职责,但是拥有专用的抽象可以让它做更多的事情,特别是充当时钟并支持更广泛的实现(测试的虚拟时间,蹦床或立即调度等)。


Schedulers 类具有静态方法,可以访问以下执行上下文:


  • 无执行上下文( Schedulers.immediate() ):在处理时,提交的 Runnable 将被直接执行,有效地将它们运行在当前的 Thread 上(可以看作是“空对象”或无操作 Scheduler )。


  • 单个可重用线程 ( Schedulers.single() )。请注意,此方法为所有调用者重用同一线程,直到调度程序被释放。如果您想要每个调用专用线程,请为每个调用使用 Schedulers.newSingle()


  • 无界弹性线程池 ( Schedulers.elastic() )。随着 Schedulers.boundedElastic() 的引入,这个不再是首选,因为它有隐藏背压问题并导致太多线程的趋势(见下文)。


  • 有界弹性线程池 ( Schedulers.boundedElastic() )。这是为阻塞进程提供自己的线程的便捷方法,这样它就不会占用其他资源。这是 I/O 阻塞工作的更好选择。请参阅如何包装同步、阻塞调用?,但新线程不会给系统带来太大压力。从 3.6.0 开始,根据设置可以提供两种不同的实现:


    • 基于 ExecutorService ,在任务之间重用平台线程。此实现与其前身 elastic() 一样,根据需要创建新的工作池并重用空闲的工作池。闲置时间过长(默认为 60 秒)的工作池也会被丢弃。与它的前身 elastic() 不同,它对可以创建的支持线程数量有上限(默认为 CPU 核心数量 x 10)。达到上限后提交的最多 100 000 个任务将被排队,并在线程可用时重新调度(当延迟调度时,延迟在线程可用时开始)。


    • 基于每个任务线程,设计为在 VirtualThread 实例上运行。要使用该功能,应用程序应在 Java 21+ 环境中运行并将 reactor.schedulers.defaultBoundedElasticOnVirtualThreads 系统属性设置为 true 。设置上述内容后,共享的 Schedulers.boundedElastic() 将返回 BoundedElasticScheduler 的特定实现,该实现是为在 VirtualThread 类的新实例上运行每个任务而定制的。此实现在行为方面与基于 ExecutorService 的实现类似,但没有空闲池并为每个任务创建一个新的 VirtualThread


  • 为并行工作而调整的固定工作人员池 ( Schedulers.parallel() )。它会创建与 CPU 核心数量一样多的工作线程。


此外,您可以使用 Schedulers.fromExecutorService(ExecutorService) 从任何预先存在的 ExecutorService 中创建 Scheduler 。 (您也可以从 Executor 创建一个,但不鼓励这样做。)


您还可以使用 newXXX 方法创建各种调度程序类型的新实例。例如, Schedulers.newParallel(yourScheduleName) 创建一个名为 yourScheduleName 的新并行调度程序。


虽然 boundedElastic 是为了帮助处理无法避免的遗留阻塞代码,但 singleparallel 则不然。因此,使用 Reactor 阻塞 API( block()blockFirst()blockLast() (以及迭代 toIterable()toStream() )在默认的单一和并行调度程序中)会导致抛出 IllegalStateException


自定义 Schedulers 还可以通过创建实现 NonBlocking 标记接口的 Thread 实例来标记为“仅非阻塞”。


某些运算符默认使用 Schedulers 中的特定调度程序(并且通常允许您选择提供不同的调度程序)。例如,调用 Flux.interval(Duration.ofMillis(300)) 工厂方法会生成每 300 毫秒计时一次的 Flux<Long> 。默认情况下,这是由 Schedulers.parallel() 启用的。以下行将 Scheduler 更改为类似于 Schedulers.single() 的新实例:

Flux.interval(Duration.ofMillis(300), Schedulers.newSingle("test"))


Reactor 提供了两种在反应链中切换执行上下文(或 Scheduler )的方法: publishOnsubscribeOn 。两者都采用 Scheduler 并让您将执行上下文切换到该调度程序。但是 publishOn 在链中的位置很重要,而 subscribeOn 的位置则无关紧要。要理解这种差异,您首先必须记住,在您订阅之前什么都不会发生。


在 Reactor 中,当您链接运算符时,您可以根据需要将任意多个 FluxMono 实现相互包装起来。一旦您订阅,就会创建一个 Subscriber 对象链,向后(沿着链向上)到第一个发布者。这对您来说实际上是隐藏的。您所看到的只是 Flux (或 Mono )和 Subscription 的外层,但这些中间特定于运营商的订阅者才是真正工作发生的地方。


有了这些知识,我们就可以仔细研究 publishOnsubscribeOn 运算符:


4.5.1. publishOn 方法


publishOn 的应用方式与任何其他运营商相同,位于订户链的中间。它从上游获取信号并在下游重播它们,同时从关联的 Scheduler 对工作线程执行回调。因此,它会影响后续运算符的执行位置(直到链接另一个 publishOn 为止),如下所示:


  • 将执行上下文更改为 Scheduler 选择的 Thread


  • 根据规范, onNext 调用按顺序发生,因此这会占用单个线程


  • 除非它们在特定的 Scheduler 上工作,否则 publishOn 之后的运算符将继续在同一线程上执行


以下示例使用 publishOn 方法:

Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); (1)

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)  (2)
    .publishOn(s)  (3)
    .map(i -> "value " + i);  (4)

new Thread(() -> flux.subscribe(System.out::println));  (5)
1
创建一个由四个 Thread 实例支持的新 Scheduler
2
第一个 map 在 <5> 中的匿名线程上运行。
3
publishOn 将整个序列切换到从 <1> 中选取的 Thread 上。
4
第二个 map 在 <1> 的 Thread 上运行。
5
这个匿名 Thread 是订阅发生的地方。打印发生在最新的执行上下文上,即来自 publishOn 的执行上下文。


4.5.2. subscribeOn 方法


subscribeOn 适用于订阅过程,构建反向链时。通常建议将其放置在数据源之后,因为中间运算符可能会影响执行的上下文。


但是,这不会影响对 publishOn 的后续调用的行为 - 它们仍然会切换其后的链部分的执行上下文。


  • 更改整个运营商链订阅的 Thread


  • Scheduler 中选取一个线程


只有下游链中最近的 subscribeOn 调用才能有效地将订阅和请求信号调度到可以拦截它们的源或操作员( doFirstdoOnRequest )。使用多个 subscribeOn 调用会引入不必要的没有价值的线程切换。


以下示例使用 subscribeOn 方法:

Scheduler s = Schedulers.newParallel("parallel-scheduler", 4); (1)

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)  (2)
    .subscribeOn(s)  (3)
    .map(i -> "value " + i);  (4)

new Thread(() -> flux.subscribe(System.out::println));  (5)
1
创建一个由四个 Thread 支持的新 Scheduler
2
第一个 map 在这四个线程之一上运行...​
3
…​因为 subscribeOn 从订阅时间 (<5>) 开始切换整个序列。
4
第二个 map 也在同一线程上运行。
5
这个匿名 Thread 是订阅最初发生的地方,但 subscribeOn 立即将其转移到四个调度程序线程之一。

 4.6.处理错误


要快速查看可用于错误处理的运算符,请参阅相关的运算符决策树。


在响应式流中,错误是终端事件。一旦发生错误,它就会停止序列并沿着运算符链传播到最后一步,即您定义的 Subscriber 及其 onError 方法。


此类错误仍应在应用程序级别处理。例如,您可以在 UI 中显示错误通知或在 REST 端点中发送有意义的错误负载。因此,订阅者的 onError 方法应该始终被定义。


如果未定义, onError 会抛出 UnsupportedOperationException 。您可以使用 Exceptions.isErrorCallbackNotImplemented 方法进一步检测和分类它。


Reactor 还提供了处理链中间错误的替代方法,如错误处理运算符。以下示例展示了如何执行此操作:

Flux.just(1, 2, 0)
    .map(i -> "100 / " + i + " = " + (100 / i)) //this triggers an error with 0
    .onErrorReturn("Divided by zero :("); // error handling example

在了解错误处理运算符之前,您必须记住,反应序列中的任何错误都是终止事件。即使使用错误处理运算符,它也不会让原始序列继续。相反,它将 onError 信号转换为新序列(后备序列)的开始。换句话说,它替换了其上游终止的序列。


现在我们可以一一考虑每种错误处理方法。当相关时,我们与命令式编程的 try 模式进行比较。


4.6.1.错误处理运算符


您可能熟悉在 try-catch 块中处理异常的几种方法。最值得注意的是,这些包括以下内容:


  • 捕获并返回静态默认值。


  • 使用后备方法捕获并执行替代路径。


  • 捕获并动态计算回退值。


  • 捕获,包装到 BusinessException ,然后重新抛出。


  • 捕获、记录特定于错误的消息,然后重新抛出。


  • 使用 finally 块来清理资源或 Java 7“try-with-resource”构造。


所有这些在 Reactor 中都有等价物,以错误处理运算符的形式。在研究这些运算符之前,我们首先要在反应链和 try-catch 块之间建立并行。


订阅时,链末尾的 onError 回调类似于 catch 块。在那里,如果抛出 Exception ,执行会跳到 catch,如以下示例所示:

Flux<String> s = Flux.range(1, 10)
    .map(v -> doSomethingDangerous(v)) (1)
    .map(v -> doSecondTransform(v)); (2)
s.subscribe(value -> System.out.println("RECEIVED " + value), (3)
            error -> System.err.println("CAUGHT " + error) (4)
);
1
执行可能引发异常的转换。
2
如果一切顺利,就会进行第二次转变。
3
每个成功转换的值都会被打印出来。
4
如果出现错误,序列将终止并显示错误消息。


前面的示例在概念上类似于以下 try-catch 块:

try {
    for (int i = 1; i < 11; i++) {
        String v1 = doSomethingDangerous(i); (1)
        String v2 = doSecondTransform(v1); (2)
        System.out.println("RECEIVED " + v2);
    }
} catch (Throwable t) {
    System.err.println("CAUGHT " + t); (3)
}
1
如果这里抛出异常...​
2
...​循环的其余部分被跳过...​
3
...​并且执行直接到这里。


现在我们已经建立了并行,我们可以看看不同的错误处理情况及其等效的运算符。

 静态回退值


相当于“捕获并返回静态默认值”的是 onErrorReturn 。以下示例展示了如何使用它:

try {
  return doSomethingDangerous(10);
}
catch (Throwable error) {
  return "RECOVERED";
}


以下示例显示了 Reactor 等效项:

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn("RECOVERED");


您还可以选择对异常应用 Predicate 来决定是否恢复,如以下示例所示:

Flux.just(10)
    .map(this::doSomethingDangerous)
    .onErrorReturn(e -> e.getMessage().equals("boom10"), "recovered10"); (1)
1
仅当异常消息为 "boom10" 时才恢复

捕获并吞下错误


如果您甚至不想用后备值替换异常,而是忽略它并仅传播到目前为止已生成的元素,那么您想要的本质上是将 onError 信号替换为 < b1> 信号。这可以通过 onErrorComplete 运算符来完成:

Flux.just(10,20,30)
    .map(this::doSomethingDangerousOn30)
    .onErrorComplete(); (1)
1
通过将 onError 转换为 onComplete 来恢复


onErrorReturn 一样, onErrorComplete 具有变体,可让您根据异常的类或 Predicate 过滤要依赖的异常。

 回退方法


如果您想要多个默认值并且您有另一种(更安全)的数据处理方式,则可以使用 onErrorResume 。这相当于“使用后备方法捕获并执行替代路径”。


例如,如果您的名义进程正在从外部且不可靠的服务获取数据,但您还保留了相同数据的本地缓存,该数据可能有点过时但更可靠,您可以执行以下操作:

String v1;
try {
  v1 = callExternalService("key1");
}
catch (Throwable error) {
  v1 = getFromCache("key1");
}

String v2;
try {
  v2 = callExternalService("key2");
}
catch (Throwable error) {
  v2 = getFromCache("key2");
}


以下示例显示了 Reactor 等效项:

Flux.just("key1", "key2")
    .flatMap(k -> callExternalService(k) (1)
        .onErrorResume(e -> getFromCache(k)) (2)
    );
1
对于每个键,异步调用外部服务。
2
如果外部服务调用失败,则回退到该密钥的缓存。请注意,无论源错误 e 是什么,我们始终应用相同的后备。


onErrorReturn 一样, onErrorResume 具有变体,可让您根据异常的类或 Predicate 过滤要依赖的异常。事实上,它需要 Function ,您还可以根据遇到的错误选择不同的后备序列进行切换。以下示例展示了如何执行此操作:

Flux.just("timeout1", "unknown", "key2")
    .flatMap(k -> callExternalService(k)
        .onErrorResume(error -> { (1)
            if (error instanceof TimeoutException) (2)
                return getFromCache(k);
            else if (error instanceof UnknownKeyException)  (3)
                return registerNewEntry(k, "DEFAULT");
            else
                return Flux.error(error); (4)
        })
    );
1
该功能允许动态选择如何继续。
2
如果源超时,则命中本地缓存。
3
如果消息来源说密钥未知,请创建一个新条目。
4
在所有其他情况下,“重新抛出”。
 动态回退值


即使您没有替代(更安全)的数据处理方式,您也可能希望从收到的异常中计算回退值。这相当于“捕获并动态计算回退值”。


例如,如果您的返回类型( MyWrapper )有一个专门用于保存异常的变体(例如 Future.complete(T success)Future.completeExceptionally(Throwable error) ),您可以实例化错误保存变体并传递异常。


一个命令式的例子如下所示:

try {
  Value v = erroringMethod();
  return MyWrapper.fromValue(v);
}
catch (Throwable error) {
  return MyWrapper.fromError(error);
}


您可以通过使用 onErrorResume 以及一点样板文件,以与后备方法解决方案相同的方式被动地执行此操作,如下所示:

erroringFlux.onErrorResume(error -> Mono.just( (1)
        MyWrapper.fromError(error) (2)
));
1
由于您期望错误的 MyWrapper 表示,因此您需要获取 onErrorResumeMono<MyWrapper> 。我们为此使用 Mono.just()
2
我们需要计算异常的值。在这里,我们通过使用相关的 MyWrapper 工厂方法包装异常来实现这一点。
 捕获并重新抛出


在命令式世界中,“捕获、包装到 BusinessException 并重新抛出”如下所示:

try {
  return callExternalService(k);
}
catch (Throwable error) {
  throw new BusinessException("oops, SLA exceeded", error);
}


在“后备方法”示例中, flatMap 内的最后一行提示我们以反应方式实现相同的目标,如下所示:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorResume(original -> Flux.error(
            new BusinessException("oops, SLA exceeded", original))
    );


但是,有一种更直接的方法可以使用 onErrorMap 实现相同的效果:

Flux.just("timeout1")
    .flatMap(k -> callExternalService(k))
    .onErrorMap(original -> new BusinessException("oops, SLA exceeded", original));

侧面记录或反应


如果您希望错误继续传播,但仍希望在不修改序列的情况下对其做出反应(例如记录它),则可以使用 doOnError 运算符。这相当于“捕获、记录特定于错误的消息,然后重新抛出”模式,如以下示例所示:

try {
  return callExternalService(k);
}
catch (RuntimeException error) {
  //make a record of the error
  log("uh oh, falling back, service failed for key " + k);
  throw error;
}


doOnError 运算符以及所有以 doOn 为前缀的运算符有时被称为具有“副作用”。它们让您可以查看序列事件的内部情况,而无需修改它们。


与前面显示的命令式示例类似,以下示例仍然传播错误,但确保我们至少记录外部服务发生故障:

LongAdder failureStat = new LongAdder();
Flux<String> flux =
Flux.just("unknown")
    .flatMap(k -> callExternalService(k) (1)
        .doOnError(e -> {
            failureStat.increment();
            log("uh oh, falling back, service failed for key " + k); (2)
        })
        (3)
    );
1
可能失败的外部服务调用...​
2
...​带有日志记录和统计副作用......​
3
…​之后,它仍然会因错误而终止,除非我们在这里使用错误恢复运算符。


我们还可以想象我们有统计计数器来增加第二个错误副作用。


使用资源和 Final 块


使用命令式编程绘制的最后一个相似之处是清理,可以通过使用“使用 finally 块来清理资源”或使用“Java 7 try-with-resource 构造”来完成”,均如下所示:


示例 14.finally 的命令式使用
Stats stats = new Stats();
stats.startTimer();
try {
  doSomethingDangerous();
}
finally {
  stats.stopTimerAndRecordTiming();
}

示例 15. try-with-resource 的命令式使用
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
  return disposableInstance.toString();
}


两者都有其 Reactor 等效项: doFinallyusing


doFinally 是关于您希望在序列终止(使用 onCompleteonError )或取消时执行的副作用。它会提示您哪种终止会触发副作用。以下示例展示了如何使用 doFinally


最终反应: doFinally()
Stats stats = new Stats();
LongAdder statsCancel = new LongAdder();

Flux<String> flux =
Flux.just("foo", "bar")
    .doOnSubscribe(s -> stats.startTimer())
    .doFinally(type -> { (1)
        stats.stopTimerAndRecordTiming();(2)
        if (type == SignalType.CANCEL) (3)
          statsCancel.increment();
    })
    .take(1); (4)
1
doFinally 使用 SignalType 作为终止类型。
2
finally 块类似,我们总是记录时间。
3
在这里,我们还仅在取消的情况下增加统计数据。
4
take(1) 从上游请求正好 1,并在发出一项后取消。


另一方面, using 处理 Flux 从资源派生的情况,并且每当处理完成时都必须对该资源进行操作。在下面的示例中,我们将“try-with-resource”的 AutoCloseable 接口替换为 Disposable


示例 16. 一次性资源
AtomicBoolean isDisposed = new AtomicBoolean();
Disposable disposableInstance = new Disposable() {
    @Override
    public void dispose() {
        isDisposed.set(true); (4)
    }

    @Override
    public String toString() {
        return "DISPOSABLE";
    }
};


现在我们可以对其进行相当于“try-with-resource”的响应式操作,如下所示:


示例 17. 反应式 try-with-resource: using()
Flux<String> flux =
Flux.using(
        () -> disposableInstance, (1)
        disposable -> Flux.just(disposable.toString()), (2)
        Disposable::dispose (3)
);
1
第一个 lambda 生成资源。在这里,我们返回模拟 Disposable
2
第二个 lambda 处理资源,返回 Flux<T>
3
当 <2> 中的 Flux 终止或取消时,将调用第三个 lambda,以清理资源。
4
订阅并执行序列后, isDisposed 原子布尔值变为 true

演示 onError 的终端方面


为了证明所有这些运算符在发生错误时都会导致上游原始序列终止,我们可以使用一个更直观的示例 Flux.intervalinterval 运算符每 x 个时间单位标记一次, Long 值逐渐增加。以下示例使用 interval 运算符:

Flux<String> flux =
Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .onErrorReturn("Uh oh");

flux.subscribe(System.out::println);
Thread.sleep(2100); (1)
1
请注意,默认情况下 interval 在计时器 Scheduler 上执行。如果我们想在主类中运行该示例,则需要在此处添加 sleep 调用,以便应用程序不会在没有产生任何值的情况下立即退出。


上例每250ms打印一行,如下:

tick 0
tick 1
tick 2
Uh oh


即使多运行一秒, interval 也不会再有任何蜱虫进来。该序列确实因错误而终止。

 重试


关于错误处理还有另一个令人感兴趣的运算符,您可能会想在上一节中描述的情况下使用它。 retry ,正如其名称所示,允许您重试产生错误的序列。


需要记住的是,它是通过重新订阅上游 Flux 来工作的。这确实是一个不同的序列,并且原始序列仍然终止。为了验证这一点,我们可以重新使用前面的示例并附加 retry(1) 来重试一次,而不是使用 onErrorReturn 。以下示例展示了如何执行此操作:

Flux.interval(Duration.ofMillis(250))
    .map(input -> {
        if (input < 3) return "tick " + input;
        throw new RuntimeException("boom");
    })
    .retry(1)
    .elapsed() (1)
    .subscribe(System.out::println, System.err::println); (2)

Thread.sleep(2100); (3)
1
elapsed 将每个值与自发出前一个值以来的持续时间相关联。
2
我们还想看看何时存在 onError
3
确保我们有足够的时间进行 4x2 动作。


前面的示例产生以下输出:

259,tick 0
249,tick 1
251,tick 2
506,tick 0 (1)
248,tick 1
253,tick 2
java.lang.RuntimeException: boom
1
新的 interval 从刻度 0 开始。额外的 250 毫秒持续时间来自第四个刻度,即导致异常和后续重试的刻度。


从上面的例子可以看出, retry(1) 只是重新订阅了原来的 interval 一次,从0重新开始tick。第二次,由于仍然出现异常,所以放弃并向下游传播错误。


有一个更高级的 retry 版本(称为 retryWhen ),它使用“同伴” Flux 来判断特定故障是否应该重试。这个同伴 Flux 由操作员创建,但由用户修饰,以便自定义重试条件。


伴随 Flux 是一个 Flux<RetrySignal> ,它被传递给 Retry 策略/函数,作为 retryWhen 的唯一参数提供。作为用户,您定义该函数并使其返回新的 Publisher<?>Retry 类是一个抽象类,但如果您想使用简单的 lambda ( Retry.from(Function) ) 转换伴生类,它提供了一个工厂方法。


重试周期如下:


  1. 每次发生错误(可能会重试)时,都会将 RetrySignal 发送到已由您的函数修饰的同伴 Flux 中。这里有 Flux 可以让您鸟瞰迄今为止的所有尝试。 RetrySignal 允许访问错误及其周围的元数据。


  2. 如果同伴 Flux 发出一个值,则会发生重试。


  3. 如果伴随 Flux 完成,则错误被吞掉,重试周期停止,并且生成的序列也完成。


  4. 如果伴随 Flux 产生错误 ( e ),则重试周期将停止,并且生成的序列错误 e


前两种情况的区别很重要。简单地完成同伴就可以有效地消除错误。考虑使用 retryWhen 模拟 retry(3) 的以下方法:

Flux<String> flux = Flux
    .<String>error(new IllegalArgumentException()) (1)
    .doOnError(System.out::println) (2)
    .retryWhen(Retry.from(companion -> (3)
        companion.take(3))); (4)
1
这会不断产生错误,需要重试。
2
doOnError 在重试之前,我们可以记录并查看所有失败。
3
Retry 改编自一个非常简单的 Function lambda
4
在这里,我们认为前三个错误是可以重试的 ( take(3) ),然后放弃。


实际上,前面的示例会生成空的 Flux ,但它成功完成。由于同一个 Flux 上的 retry(3) 会因最新错误而终止,因此这个 retryWhen 示例与 retry(3) 并不完全相同。


达到相同的行为还需要一些额外的技巧:

AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
		Flux.<String>error(new IllegalArgumentException())
				.doOnError(e -> errorCount.incrementAndGet())
				.retryWhen(Retry.from(companion -> (1)
						companion.map(rs -> { (2)
							if (rs.totalRetries() < 3) return rs.totalRetries(); (3)
							else throw Exceptions.propagate(rs.failure()); (4)
						})
				));
1
我们通过改编 Function lambda 来自定义 Retry ,而不是提供具体的类
2
同伴发出 RetrySignal 对象,该对象包含迄今为止的重试次数和最后一次失败的次数
3
为了允许三次重试,我们考虑索引 < 3 并返回一个要发出的值(这里我们只是返回索引)。
4
为了终止错误的序列,我们在这三次重试之后抛出原始异常。

人们可以使用 Retry 中公开的构建器以更流畅的方式以及更精细调整的重试策略来实现相同的目的。例如: errorFlux.retryWhen(Retry.max(3));

您可以使用类似的代码来实现“指数退避和重试”模式,如常见问题解答中所示。


核心提供的 Retry 帮助器 RetrySpecRetryBackoffSpec 都允许高级自定义,例如:


  • 设置 filter(Predicate) 为可以触发重试的异常


  • 通过 modifyErrorFilter(Function) 修改先前设置的过滤器


  • 触发副作用,例如围绕重试触发器进行记录(即延迟之前和之后的退避),前提是重试已验证( doBeforeRetry()doAfterRetry() 是累加的)


  • 围绕重试触发器触发异步 Mono<Void> ,这允许在基本延迟之上添加异步行为,从而进一步延迟触发器( doBeforeRetryAsyncdoAfterRetryAsync 是添加剂)


  • 通过 onRetryExhaustedThrow(BiFunction) 自定义达到最大尝试次数时的异常。默认使用 Exceptions.retryExhausted(…​) ,可以与 Exceptions.isRetryExhausted(Throwable) 区分


  • 激活瞬态错误的处理(见下文)


重试并出现暂时性错误


一些长期存在的源可能会出现零星的错误爆发,然后是一段较长的时间,在此期间一切都运行顺利。本文档将这种错误模式称为瞬态错误。


在这种情况下,最好单独处理每个突发,以便下一个突发不会继承前一个突发的重试状态。例如,使用指数退避策略,每个后续突发都应该从最小退避 Duration 开始延迟重试尝试,而不是不断增长的退避。


表示 retryWhen 状态的 RetrySignal 接口有一个可用于此目的的 totalRetriesInARow() 值。与通常单调递增的 totalRetries() 索引不同,每次重试恢复错误时(即,当重试尝试导致传入 onNext 再次代替 onError )。


RetrySpecRetryBackoffSpec 中的 transientErrors(boolean) 配置参数设置为 true 时,生成的策略将使用该 totalRetriesInARow()

AtomicInteger errorCount = new AtomicInteger(); (1)
Flux<Integer> transientFlux = httpRequest.get() (2)
        .doOnError(e -> errorCount.incrementAndGet());

transientFlux.retryWhen(Retry.max(2).transientErrors(true))  (3)
             .blockLast();
assertThat(errorCount).hasValue(6); (4)
1
为了便于说明,我们将计算重试序列中的错误数量。
2
我们假设一个http请求源,例如。流端点有时会连续失败两次,然后恢复。
3
我们在该源上使用 retryWhen ,配置为最多 2 次重试尝试,但处于 transientErrors 模式。
4
最后,在 6 尝试在 errorCount 中注册后,获得有效响应,并且 transientFlux 成功完成。


如果没有 transientErrors(true) ,第二个突发将超过 2 配置的最大尝试,整个序列最终将失败。


如果您想在没有实际 http 远程端点的情况下在本地尝试此操作,可以将伪 httpRequest 方法实现为 Supplier ,如下所示:

final AtomicInteger transientHelper = new AtomicInteger();
Supplier<Flux<Integer>> httpRequest = () ->
    Flux.generate(sink -> { (1)
        int i = transientHelper.getAndIncrement();
        if (i == 10) { (2)
            sink.next(i);
            sink.complete();
        }
        else if (i % 3 == 0) { (3)
            sink.next(i);
        }
        else {
            sink.error(new IllegalStateException("Transient error at " + i)); (4)
        }
    });
1
我们 generate 一个有大量错误的来源。
2
当计数器达到 10 时,它将成功完成。
3
如果 transientHelper 原子是 3 的倍数,我们会发出 onNext ,从而结束当前的突发。
4
在其他情况下,我们会发出 onError 。这是 3 次中的 2 次,因此 2 个 onError 的突发被 1 个 onNext 中断。


4.6.2.处理运算符或函数中的异常


一般来说,所有运算符本身都可以包含可能触发异常的代码或调用可能同样失败的用户定义回调,因此它们都包含某种形式的错误处理。


根据经验,未经检查的异常始终通过 onError 传播。例如,在 map 函数内抛出 RuntimeException 会转换为 onError 事件,如以下代码所示:

Flux.just("foo")
    .map(s -> { throw new IllegalArgumentException(s); })
    .subscribe(v -> System.out.println("GOT VALUE"),
               e -> System.out.println("ERROR: " + e));


前面的代码打印出以下内容:

ERROR: java.lang.IllegalArgumentException: foo

您可以在将 Exception 传递到 onError 之前通过使用挂钩对其进行调整。


然而,Reactor 定义了一组始终被视为致命的异常(例如 OutOfMemoryError )。请参阅 Exceptions.throwIfFatal 方法。这些错误意味着 Reactor 无法继续运行,并且会被抛出而不是传播。


在内部,还存在未经检查的异常仍然无法传播的情况(最明显的是在订阅和请求阶段),因为并发竞争可能导致双重 onErroronComplete 条件。当这些竞争发生时,无法传播的错误将被“丢弃”。这些情况仍然可以通过使用可定制的钩子在一定程度上进行管理。请参阅“脱钩”。


您可能会问:“检查异常怎么样?”


例如,如果您需要调用一些将其声明为 throws 异常的方法,那么您仍然必须在 try-catch 块中处理这些异常。不过,您有多种选择:


  1. 捕获异常并从中恢复。该序列正常继续。


  2. 捕获异常,将其包装成未经检查的异常,然后抛出它(中断序列)。 Exceptions 实用程序类可以帮助您完成此任务(我们接下来会介绍)。


  3. 如果您需要返回 Flux (例如,您位于 flatMap 中),请将异常包装在产生错误的 Flux 中,如下所示: < b3>。 (该序列也终止。)


Reactor 有一个 Exceptions 实用程序类,您可以使用它来确保仅当异常是已检查异常时才包装异常:


  • 如有必要,请使用 Exceptions.propagate 方法来包装异常。它还首先调用 throwIfFatal 并且不包装 RuntimeException


  • 使用 Exceptions.unwrap 方法获取原始的未包装异常(回到特定于反应器的异常层次结构的根本原因)。


考虑以下 map 示例,该示例使用可以抛出 IOException 的转换方法:

public String convert(int i) throws IOException {
    if (i > 3) {
        throw new IOException("boom " + i);
    }
    return "OK " + i;
}


现在假设您想在 map 中使用该方法。您现在必须显式捕获异常,并且您的映射函数无法重新抛出该异常。因此,您可以将其作为 RuntimeException 传播到地图的 onError 方法,如下所示:

Flux<String> converted = Flux
    .range(1, 10)
    .map(i -> {
        try { return convert(i); }
        catch (IOException e) { throw Exceptions.propagate(e); }
    });


稍后,当订阅前面的 Flux 并对错误做出反应(例如在 UI 中)时,如果您想对 IOException 执行一些特殊操作,则可以恢复到原始异常。以下示例展示了如何执行此操作:

converted.subscribe(
    v -> System.out.println("RECEIVED: " + v),
    e -> {
        if (Exceptions.unwrap(e) instanceof IOException) {
            System.out.println("Something bad happened with I/O");
        } else {
            System.out.println("Something bad happened");
        }
    }
);

 4.7.水槽


在 Reactor 中,接收器是一个类,它允许以独立的方式安全地手动触发信号,创建一个类似 Publisher 的结构,能够处理多个 Subscriberunicast() 口味)。


3.5.0 之前,还有一组 Processor 实现已被逐步淘汰。


4.7.1.使用 Sinks.OneSinks.Many 从多个线程安全地生成


reactor-core 公开的默认风格 Sinks 确保检测到多线程使用,并且从下游订阅者的角度来看,不会导致规范违规或未定义的行为。使用 tryEmit* API 时,并行调用很快就会失败。使用 emit* API 时,提供的 EmissionFailureHandler 可能允许重试争用(例如,忙循环),否则接收器将因错误而终止。


这是对 Processor.onNext 的改进,后者必须在外部同步,否则从下游订阅者的角度来看会导致未定义的行为。


处理器是一种特殊的 Publisher ,也是 Subscriber 。它们最初的目的是作为中间步骤的可能表示,然后可以在反应流实现之间共享。然而,在 Reactor 中,此类步骤由 Publisher 运算符表示。


第一次遇到 Processor 时的一个常见错误是直接调用公开的 onNextonCompleteonError 方法从 Subscriber 界面。


此类手动调用应谨慎进行,特别是关于响应式流规范的调用外部同步。处理器实际上可能有点用,除非遇到基于反应流的 API,它需要传递 Subscriber ,而不是公开 Publisher


水槽通常是更好的选择。


Sinks 构建器为主要支持的生产者类型提供指导 API。您将识别 Flux 中发现的一些行为,例如 onBackpressureBuffer

Sinks.Many<Integer> replaySink = Sinks.many().replay().all();


多个生产者线程可以通过执行以下操作同时在接收器上生成数据:

//thread1
replaySink.emitNext(1, EmitFailureHandler.FAIL_FAST);

//thread2, later
replaySink.emitNext(2, EmitFailureHandler.FAIL_FAST);

//thread3, concurrently with thread 2
//would retry emitting for 2 seconds and fail with EmissionException if unsuccessful
replaySink.emitNext(3, EmitFailureHandler.busyLooping(Duration.ofSeconds(2)));

//thread3, concurrently with thread 2
//would return FAIL_NON_SERIALIZED
EmitResult result = replaySink.tryEmitNext(4);


使用 busyLooping 时,请注意 EmitFailureHandler 返回的实例不能重复使用,例如,每个 emitNext


Sinks.Many 可以作为 Flux 呈现给下游消费者,如下例所示:

Flux<Integer> fluxView = replaySink.asFlux();
fluxView
	.takeWhile(i -> i < 10)
	.log()
	.blockLast();


类似地, Sinks.EmptySinks.One 风格可以被视为具有 asMono() 方法的 Mono


Sinks 类别是:


  1. many().multicast() :一个接收器,仅将新推送的数据传输给其订阅者,尊重他们的背压(新推送的数据如“订阅者订阅后”)。


  2. many().unicast() :与上面相同,但在第一个订阅者寄存器之前推送的数据被缓冲。


  3. many().replay() :一个接收器,它将向新订阅者重播指定历史大小的推送数据,然后继续实时推送新数据。


  4. one() :将向其订阅者播放单个元素的接收器


  5. empty() :一个接收器,仅向其订阅者播放终端信号(错误或完成),但仍然可以被视为 Mono<T> (注意通用类型 <T>


4.7.2.可用接收器概述

Sinks.many().unicast().onBackpressureBuffer(args?)


单播 Sinks.Many 可以通过使用内部缓冲区来处理背压。权衡是它最多可以有一个 Subscriber


基本单播接收器是通过 Sinks.many().unicast().onBackpressureBuffer() 创建的。但是 Sinks.many().unicast() 中还有一些额外的 unicast 静态工厂方法,可以进行更精细的调整。


例如,默认情况下,它是无界的:如果您通过它推送任意数量的数据,而它的 Subscriber 尚未请求数据,它会缓冲所有数据。您可以通过在 Sinks.many().unicast().onBackpressureBuffer(Queue) 工厂方法中为内部缓冲提供自定义 Queue 实现来更改此设置。如果该队列是有界的,则当缓冲区已满并且没有收到来自下游的足够请求时,接收器可能会拒绝推送值。

Sinks.many().multicast().onBackpressureBuffer(args?)


多播 Sinks.Many 可以向多个订阅者发送消息,同时为其每个订阅者提供背压。订阅者在订阅后仅接收通过接收器推送的信号。


基本多播接收器是通过 Sinks.many().multicast().onBackpressureBuffer() 创建的。


默认情况下,如果所有订阅者都被取消(这基本上意味着它们都已取消订阅),它将清除其内部缓冲区并停止接受新订阅者。您可以通过使用 Sinks.many().multicast() 下的 multicast 静态工厂方法中的 autoCancel 参数来调整它。

Sinks.many().multicast().directAllOrNothing()


具有简单化背压处理的多播 Sinks.Many :如果任何订阅者太慢(需求为零),则所有订阅者的 onNext 都会被丢弃。


然而,慢速订阅者不会被终止,一旦慢速订阅者再次开始请求,所有订阅者都将恢复接收从那里推送的元素。


一旦 Sinks.Many 终止(通常通过调用其 emitError(Throwable)emitComplete() 方法),它会让更多订阅者订阅,但立即向他们重播终止信号。

Sinks.many().multicast().directBestEffort()


尽力处理背压的多播 Sinks.Many :如果订阅者太慢(需求为零),则仅针对该慢速订阅者丢弃 onNext


然而,慢速订阅者不会被终止,一旦他们再次开始请求,他们将恢复接收新推送的元素。


一旦 Sinks.Many 终止(通常通过调用其 emitError(Throwable)emitComplete() 方法),它会让更多订阅者订阅,但立即向他们重播终止信号。

Sinks.many().replay()


重播 Sinks.Many 缓存发出的元素并将其重播给迟到的订阅者。


它可以以多种配置创建:


  • 缓存有限历史记录 ( Sinks.many().replay().limit(int) ) 或无限历史记录 ( Sinks.many().replay().all() )。


  • 缓存基于时间的重播窗口 ( Sinks.many().replay().limit(Duration) )。


  • 缓存历史记录大小和时间窗口的组合 ( Sinks.many().replay().limit(int, Duration) )。


用于微调上述内容的其他重载也可以在 Sinks.many().replay() 下找到,以及允许缓存单个元素的变体( latest()latestOrDefault(T) ) 。

Sinks.unsafe().many()


高级用户和操作员构建者可能会考虑使用 Sinks.unsafe().many() ,它将提供相同的 Sinks.Many 工厂,而无需额外的生产者线程安全性。因此,每个接收器的开销会更少,因为线程安全接收器必须检测多线程访问。


库开发人员不应公开不安全的接收器,但可以在受控的调用​​环境中内部使用它们,在该环境中他们可以确保导致 onNextonCompleteonError

 汇.one()


该方法直接构造一个简单的 Sinks.One<T> 实例。 Sinks 的这种风格可以作为 Mono 查看(通过其 asMono() 视图方法),并且具有稍微不同的 emit 方法来更好地传达这种类似 Mono 的语义:


  • emitValue(T value) 生成 onNext(value) 信号,并且在大多数实现中 - 还将触发隐式 onComplete()


  • emitEmpty() 生成一个隔离的 onComplete() 信号,旨在生成空 Mono 的等效信号


  • emitError(Throwable t) 生成 onError(t) 信号


Sinks.one() 接受对这些方法中任一方法的一次调用,有效地生成一个 Mono ,该 Mono 要么以一个值完成,要么以空完成或失败。

 水槽.empty()


该方法直接构造一个简单的 Sinks.Empty<T> 实例。 Sinks 的这种风格类似于 Sinks.One<T> ,只是它不提供 emitValue 方法。


结果,它只能生成一个完成为空或失败的 Mono


尽管无法触发 onNext ,接收器仍然使用通用 <T> 进行类型化,因为它允许轻松组合和包含在需要特定类型的运算符链中。


建议编辑“Reactor核心特性”

 5.Kotlin支持


Kotlin 是一种针对 JVM(和其他平台)的静态类型语言,它允许编写简洁而优雅的代码,同时提供与 Java 编写的现有库非常好的互操作性。


本节介绍 Reactor 对 Kotlin 的支持。

 5.1.要求


Reactor 支持 Kotlin 1.1+ 并需要 kotlin-stdlib (或其 kotlin-stdlib-jdk7kotlin-stdlib-jdk8 变体之一)。

 5.2.扩展


Dysprosium-M1 (即 reactor-core 3.3.0.M1 )开始,Kotlin 扩展已移至专用的 reactor-kotlin-extensions 模块,其新包名称以 reactor.kotlin 开头而不是简单的 reactor


因此, reactor-core 模块中的 Kotlin 扩展已被弃用。新依赖项的groupId和artifactId是:

io.projectreactor.kotlin:reactor-kotlin-extensions


得益于其出色的 Java 互操作性和 Kotlin 扩展,Reactor Kotlin API 利用常规 Java API,并通过 Reactor 工件中开箱即用的一些特定于 Kotlin 的 API 进行了增强。


请记住,需要导入 Kotlin 扩展才能使用。例如,这意味着仅当导入 import reactor.kotlin.core.publisher.toFluxThrowable.toFlux Kotlin 扩展才可用。也就是说,与静态导入类似,IDE 在大多数情况下应该自动建议导入。


例如,Kotlin 具体化类型参数为 JVM 泛型类型擦除提供了一种解决方法,而 Reactor 提供了一些扩展来利用此功能。


下表将使用 Java 的 Reactor 与使用 Kotlin 和扩展的 Reactor 进行了比较:

 爪哇

 带扩展的 Kotlin

Mono.just("foo")

"foo".toMono()

Flux.fromIterable(list)

list.toFlux()

Mono.error(new RuntimeException())

RuntimeException().toMono()

Flux.error(new RuntimeException())

RuntimeException().toFlux()

flux.ofType(Foo.class)

  flux.ofType<Foo>()flux.ofType(Foo::class)

StepVerifier.create(flux).verifyComplete()

flux.test().verifyComplete()


Reactor KDoc API 列出并记录了所有可用的 Kotlin 扩展。

 5.3.空安全


Kotlin 的关键功能之一是空安全,它在编译时干净地处理 null 值,而不是在运行时遇到著名的 NullPointerException 。这通过可空性声明和表达“值或无值”语义使应用程序更安全,而无需支付 Optional 等包装器的成本。 (Kotlin 允许使用具有可为 null 值的函数构造。请参阅此 Kotlin null 安全性综合指南。)


尽管 Java 不允许在其类型系统中表达 null 安全性,但 Reactor 现在通过 reactor.util.annotation 包中声明的工具友好注释提供整个 Reactor API 的 null 安全性。默认情况下,Kotlin 中使用的 Java API 中的类型被识别为放宽空检查的平台类型。 Kotlin 对 JSR 305 注释和 Reactor 可空性注释的支持为 Kotlin 开发人员提供了整个 Reactor API 的空安全性,并具有在编译时处理 null 相关问题的优点。


您可以通过添加带有以下选项的 -Xjsr305 编译器标志来配置 JSR 305 检查: -Xjsr305={strict|warn|ignore}


对于 kotlin 版本 1.1.50+,默认行为与 -Xjsr305=warn 相同。 strict 值需要考虑 Reactor API 完全空安全性,但应被视为实验性的,因为 Reactor API 空性声明甚至可能在次要版本之间演变,因为可能会在未来)。


尚不支持泛型类型参数、变量参数和数组元素的可为空性,但应该会在即将发布的版本中提供支持。请参阅此讨论以获取最新信息。


建议编辑“Kotlin 支持”

 6. 测试


无论您编写了简单的 Reactor 操作符链还是您自己的操作符,自动化测试始终是一个好主意。


Reactor 附带了一些专用于测试的元素,这些元素聚集到它们自己的工件中: reactor-test 。您可以在 Github 上的 reactor-core 存储库内找到该项目。


要在测试中使用它,您必须将其添加为测试依赖项。以下示例显示如何在 Maven 中添加 reactor-test 作为依赖项:


示例 18.Maven 中的 Reactor-test,位于 <dependencies>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
    (1)
</dependency>
1
如果您使用 BOM,则无需指定 <version>


以下示例展示了如何在 Gradle 中添加 reactor-test 作为依赖项:


示例 19. Gradle 中的 Reactor-test,修改 dependencies
dependencies {
   testCompile 'io.projectreactor:reactor-test'
}


reactor-test 的三个主要用途如下:


  • 使用 StepVerifier 逐步测试序列是否遵循给定场景。


  • 使用 TestPublisher 生成数据以测试下游运算符(包括您自己的运算符)的行为。


  • 在可以经历多个替代 Publisher 的序列中(例如,使用 switchIfEmpty 的链,探测这样的 Publisher 以确保它被使用(即,已订阅)。


6.1.使用 StepVerifier 测试场景


测试 Reactor 序列的最常见情况是在代码中定义 FluxMono (例如,它可能由方法返回)并想要测试订阅时它的行为方式。


这种情况很好地转化为定义“测试场景”,您可以在其中根据事件逐步定义您的期望。您可以提出并回答以下问题:


  • 下一个预期事件是什么?


  • 您希望 Flux 发出特定值吗?


  • 或者在接下来的 300 毫秒内什么都不做?


您可以通过 StepVerifier API 表达所有这些。


例如,您可以在代码库中使用以下实用方法来装饰 Flux

public <T> Flux<T> appendBoomError(Flux<T> source) {
  return source.concatWith(Mono.error(new IllegalArgumentException("boom")));
}


为了测试它,您需要验证以下场景:


我希望这个 Flux 首先发出 thing1 ,然后发出 thing2 ,然后产生一条错误消息 boom 。订阅并验证这些期望。


StepVerifier API 中,这转化为以下测试:

@Test
public void testAppendBoomError() {
  Flux<String> source = Flux.just("thing1", "thing2"); (1)

  StepVerifier.create( (2)
    appendBoomError(source)) (3)
    .expectNext("thing1") (4)
    .expectNext("thing2")
    .expectErrorMessage("boom") (5)
    .verify(); (6)
}
1
由于我们的方法需要一个源 Flux ,因此定义一个简单的源用于测试目的。
2
创建一个包装并验证 FluxStepVerifier 构建器。
3
传递要测试的 Flux (调用我们的实用程序方法的结果)。
4
我们期望在订阅时发生的第一个信号是 onNext ,其值为 thing1
5
我们期望发生的最后一个信号是用 onError 终止序列。异常应该有 boom 作为消息。
6
通过调用 verify() 触发测试非常重要。


API 是一个构建器。首先创建 StepVerifier 并传递要测试的序列。这提供了多种方法供您选择:


  • 表达对下一个出现信号的期望。如果收到任何其他信号(或者信号的内容与预期不符),则整个测试将失败并返回有意义的 AssertionError 。例如,您可以使用 expectNext(T…​)expectNextCount(long)


  • 消耗下一个信号。当您想要跳过部分序列或想要对信号内容应用自定义 assertion 时(例如,检查是否存在 onNext 事件并断言发出的项目是大小为 5 的列表)。例如,您可以使用 consumeNextWith(Consumer<T>)


  • 采取各种操作,例如暂停或运行任意代码。例如,如果您想要操作特定于测试的状态或上下文。为此,您可以使用 thenAwait(Duration)then(Runnable)


对于终端事件,相应的期望方法( expectComplete()expectError() 及其所有变体)切换到您无法再表达期望的API。在最后一步中,您所能做的就是对 StepVerifier 执行一些附加配置,然后通常使用 verify() 或其变体之一触发验证。


此时发生的情况是 StepVerifier 订阅测试的 FluxMono 并播放序列,将每个新信号与场景中的下一步进行比较。只要这些匹配,就认为测试成功。一旦出现差异,就会抛出 AssertionError


请记住 verify() 步骤,它会触发验证。为了提供帮助,该 API 包含一些快捷方法,将终端期望与对 verify() 的调用结合起来: verifyComplete()verifyError()verifyErrorMessage(String) , 和别的。


请注意,如果基于 lambda 的期望之一抛出 AssertionError ,则按原样报告,导致测试失败。这对于自定义断言很有用。


默认情况下, verify() 方法和派生快捷方法( verifyThenAssertThatverifyComplete() 等)没有超时。他们可以无限期地阻止。您可以使用 StepVerifier.setDefaultTimeout(Duration) 为这些方法全局设置超时,或使用 verify(Duration) 在每次调用的基础上指定一个超时。


6.1.1.更好地识别测试失败


StepVerifier 提供了两个选项来更好地识别哪个期望步骤导致测试失败:


  • as(String) :在大多数 expect* 方法之后使用,对前面的期望进行描述。如果期望失败,其错误消息包含描述。终端期望和 verify 不能这样描述。


  • StepVerifierOptions.create().scenarioName(String) :通过使用 StepVerifierOptions 创建 StepVerifier ,您可以使用 scenarioName 方法为整个场景命名,即也用于断言错误消息。


请注意,在这两种情况下,仅保证在生成自己的 AssertionErrorStepVerifier 方法中使用消息中的描述或名称(例如,手动或通过 assertNext 中的断言库不会将描述或名称添加到错误消息中)。

 6.2.操纵时间


您可以将 StepVerifier 与基于时间的运算符一起使用,以避免相应测试的长时间运行。您可以通过 StepVerifier.withVirtualTime 构建器来执行此操作。


它看起来像下面的例子:

StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
//... continue expectations here


此虚拟时间功能​​插入 Reactor 的 Schedulers 工厂中的自定义 Scheduler 。由于这些定时运算符通常使用默认的 Schedulers.parallel() 调度程序,因此将其替换为 VirtualTimeScheduler 就可以了。然而,一个重要的先决条件是在虚拟时间调度器被激活之后算子被实例化。


为了增加正确发生这种情况的机会, StepVerifier 不采用简单的 Flux 作为输入。 withVirtualTime 采用 Supplier ,它会指导您在完成调度程序设置后延迟创建测试通量的实例。


请格外小心,确保 Supplier<Publisher<T>> 可以以惰性方式使用。否则,无法保证虚拟时间。特别是避免在测试代码中较早实例化 Flux 并让 Supplier 返回该变量。相反,始终在 lambda 内实例化 Flux


有两种处理时间的期望方法,无论有没有虚拟时间,它们都有效:


  • thenAwait(Duration) :暂停步骤的评估(允许出现一些信号或延迟结束)。


  • expectNoEvent(Duration) :还让序列在给定的持续时间内播放,但如果在此期间出现任何信号,则测试失败。


这两种方法都在经典模式下将线程暂停给定的持续时间,并在虚拟模式下提前虚拟时钟。


expectNoEvent 还将 subscription 视为事件。如果您将其用作第一步,通常会失败,因为检测到订阅信号。请改用 expectSubscription().expectNoEvent(duration)


为了快速评估上面 Mono.delay 的行为,我们可以按如下方式完成代码编写:

StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
    .expectSubscription() (1)
    .expectNoEvent(Duration.ofDays(1)) (2)
    .expectNext(0L) (3)
    .verifyComplete(); (4)
1  请参阅前面的提示。
2
预计一整天不会发生任何事情。
3
然后期望发出 0 的延迟。
4
然后期待完成(并触发验证)。


我们可以在上面使用 thenAwait(Duration.ofDays(1)) ,但是 expectNoEvent 的好处是可以保证没有任何事情发生得比它应该发生的更早。


请注意, verify() 返回一个 Duration 值。这是整个测试的实时持续时间。


虚拟时间并不是灵丹妙药。所有 Schedulers 都替换为相同的 VirtualTimeScheduler 。在某些情况下,您可以锁定验证过程,因为在表达期望之前虚拟时钟尚未向前移动,导致期望等待只能通过提前时间产生的数据。在大多数情况下,您需要提前虚拟时钟才能发出序列。对于无限序列,虚拟时间也变得非常有限,这可能会占用运行序列及其验证的线程。


6.3.使用 StepVerifier 执行执行后断言


在描述了场景的最终期望之后,您可以切换到补充断言 API,而不是触发 verify() 。为此,请改用 verifyThenAssertThat()


verifyThenAssertThat() 返回一个 StepVerifier.Assertions 对象,一旦整个场景成功执行,您可以使用它来断言一些状态元素(因为它也调用 verify() ) 。典型的(尽管是高级的)用法是捕获由某些操作符删除的元素并断言它们(请参阅有关 Hooks 的部分)。


6.4.测试 Context


有关 Context 的更多信息,请参阅将上下文添加到响应序列。


StepVerifierContext 的传播有一些期望:


  • expectAccessibleContext :返回一个 ContextExpectations 对象,您可以使用该对象设置对传播的 Context 的期望。请务必调用 then() 以返回序列期望集。


  • expectNoAccessibleContext :设置一个期望,即没有 Context 可以在被测试的运算符链上传播。当被测 Publisher 不是 Reactor 或没有任何可以传播 Context 的运算符(例如,生成器源)时,最有可能发生这种情况。


此外,您可以使用 StepVerifierOptions 创建验证程序,将特定于测试的初始 Context 关联到 StepVerifier


以下代码片段演示了这些功能:

StepVerifier.create(Mono.just(1).map(i -> i + 10),
				StepVerifierOptions.create().withInitialContext(Context.of("thing1", "thing2"))) (1)
		            .expectAccessibleContext() (2)
		            .contains("thing1", "thing2") (3)
		            .then() (4)
		            .expectNext(11)
		            .verifyComplete(); (5)
1
使用 StepVerifierOptions 创建 StepVerifier 并传入初始 Context
2
开始设置关于 Context 传播的期望。仅此一点就可以确保 Context 被传播。
3
Context 特定期望的示例。它必须包含键“thing1”的值“thing2”。
4
我们 then() 切换回对数据设置正常期望。
5
让我们不要忘记 verify() 整套期望。


6.5.使用 TestPublisher 手动发射


对于更高级的测试用例,完全掌握数据源可能会很有用,以触发与您想要测试的特定情况紧密匹配的精心选择的信号。


另一种情况是,当您实现了自己的运算符,并且想要验证它在响应式流规范方面的行为方式时,尤其是在其源代码表现不佳的情况下。


对于这两种情况, reactor-test 提供 TestPublisher 类。这是一个 Publisher<T> ,可让您以编程方式触发各种信号:


  • next(T)next(T, T…​) 触发 1-n 个 onNext 信号。


  • emit(T…​) 触发 1-n 个 onNext 信号并执行 complete()


  • complete()onComplete 信号终止。


  • error(Throwable)onError 信号终止。


您可以通过 create 工厂方法获得行为良好的 TestPublisher 。此外,您还可以使用 createNonCompliant 工厂方法创建行为不当的 TestPublisher 。后者从 TestPublisher.Violation 枚举中获取一个或多个值。这些值定义了发布者可以忽略规范的哪些部分。这些枚举值包括:


  • REQUEST_OVERFLOW :尽管请求不足,仍允许进行 next 调用,而不触发 IllegalStateException


  • ALLOW_NULL :允许使用 null 值进行 next 调用,而不触发 NullPointerException


  • CLEANUP_ON_TERMINATE :允许连续发送多次终止信号。这包括 complete()error()emit()


  • DEFER_CANCELLATION :允许 TestPublisher 忽略取消信号并继续发出信号,就好像取消在与所述信号的竞争中失败了一样。


最后, TestPublisher 跟踪订阅后的内部状态,这可以通过其各种 assert* 方法进行断言。


您可以通过使用转换方法 flux()mono() 将其用作 FluxMono


6.6.使用 PublisherProbe 检查执行路径


在构建复杂的运算符链时,您可能会遇到存在多个可能的执行路径的情况,这些执行路径由不同的子序列具体化。


大多数时候,这些子序列会产生一个足够具体的 onNext 信号,您可以通过查看最终结果来断言它已被执行。


例如,考虑以下方法,该方法从源构建运算符链,并在源为空时使用 switchIfEmpty 回退到特定替代方案:

public Flux<String> processOrFallback(Mono<String> source, Publisher<String> fallback) {
    return source
            .flatMapMany(phrase -> Flux.fromArray(phrase.split("\\s+")))
            .switchIfEmpty(fallback);
}


您可以测试使用了 switchIfEmpty 的哪个逻辑分支,如下所示:

@Test
public void testSplitPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.just("just a  phrase with    tabs!"),
            Mono.just("EMPTY_PHRASE")))
                .expectNext("just", "a", "phrase", "with", "tabs!")
                .verifyComplete();
}

@Test
public void testEmptyPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.empty(), Mono.just("EMPTY_PHRASE")))
                .expectNext("EMPTY_PHRASE")
                .verifyComplete();
}


但是,请考虑一个示例,其中该方法会生成 Mono<Void> 。它等待源完成,执行附加任务,然后完成。如果源为空,则必须执行类似于后备 Runnable 的任务。下面的例子展示了这种情况:

private Mono<String> executeCommand(String command) {
    return Mono.just(command + " DONE");
}

public Mono<Void> processOrFallback(Mono<String> commandSource, Mono<Void> doWhenEmpty) {
    return commandSource
            .flatMap(command -> executeCommand(command).then()) (1)
            .switchIfEmpty(doWhenEmpty); (2)
}
1
then() 忘记了命令结果。它只关心它是否已完成。
2
如何区分两种情况都是空序列?


要验证您的 processOrFallback 方法确实经过 doWhenEmpty 路径,您需要编写一些样板文件。也就是说,您需要一个 Mono<Void> 来:


  • 捕获它已被订阅的事实。


  • 让您在整个过程终止后断言该事实。


在版本 3.1 之前,您需要为要断言的每个状态手动维护一个 AtomicBoolean ,并将相应的 doOn* 回调附加到您要评估的发布者。当必须定期应用此模式时,这可能是很多样板文件。幸运的是,3.1.0 引入了 PublisherProbe 的替代方案。以下示例展示了如何使用它:

@Test
public void testCommandEmptyPathIsUsed() {
    PublisherProbe<Void> probe = PublisherProbe.empty(); (1)

    StepVerifier.create(processOrFallback(Mono.empty(), probe.mono())) (2)
                .verifyComplete();

    probe.assertWasSubscribed(); (3)
    probe.assertWasRequested(); (4)
    probe.assertWasNotCancelled(); (5)
}
1
创建一个转换为空序列的探针。
2
通过调用 probe.mono() 使用探针代替 Mono<Void>
3
完成序列后,探针可以让您断言它已被使用。您可以检查是否已订阅...​
4
...以及实际请求的数据...​
5
……以及是否被取消。


您还可以通过调用 .flux() 而不是 .mono() 来使用探针代替 Flux<T> 。对于需要探测执行路径但还需要探测器发出数据的情况,可以使用 PublisherProbe.of(Publisher) 包装任何 Publisher<T>


建议编辑“测试”

 7. 调试Reactor


从命令式和同步编程范式切换到反应式和异步编程范式有时可能会令人望而生畏。学习曲线中最陡峭的步骤之一是如何在出现问题时进行分析和调试。


在命令式世界中,调试通常非常简单。您可以阅读堆栈跟踪并查看问题的根源。这完全是你的代码的失败吗?故障是否发生在某些库代码中?如果是这样,您的代码的哪一部分调用了库,可能传递了最终导致失败的不正确参数?


7.1.典型的 Reactor 堆栈跟踪


当您转向异步代码时,事情会变得更加复杂。


考虑以下堆栈跟踪:


示例 20. 典型的 Reactor 堆栈跟踪
java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:445)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:379)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
    at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
    at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:332)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
    at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
    at reactor.core.publisher.FluxMapFuseable.subscribe(FluxMapFuseable.java:63)
    at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
    at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3096)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:3204)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3090)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3057)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3029)
    at reactor.guide.GuideTests.debuggingCommonStacktrace(GuideTests.java:995)


那里发生了很多事情。我们得到一个 IndexOutOfBoundsException ,它告诉我们一个 source emitted more than one item


我们可能很快就会假设这个源是 Flux 或 Mono,正如下一行提到的 MonoSingle 所证实的那样。因此,这似乎是来自 single 运营商的某种投诉。


参考 Mono#single 运算符的 Javadoc,我们看到 single 有一个约定:源必须恰好发出一个元素。看来我们有一个源发出了多个信号,因此违反了该合同。


我们可以更深入地挖掘并确定其来源吗?下面的行不是很有帮助。它们通过多次调用 subscriberequest 带领我们了解似乎是反应链的内部结构。


通过浏览这些行,我们至少可以开始形成出错的链类型的图片:它似乎涉及 MonoSingleFluxFlatMapFluxRange (每个类在跟踪中都有几行,但总体而言涉及这三个类)。那么可能是 range().flatMap().single() 链?


但是如果我们在应用程序中大量使用该模式怎么办?这仍然没有告诉我们太多信息,仅仅搜索 single 并不能找到问题。最后一行引用了我们的一些代码。终于,我们越来越接近了。


不过,坚持住。当我们转到源文件时,我们看到的是订阅了一个预先存在的 Flux ,如下所示:

toDebug
    .subscribeOn(Schedulers.immediate())
    .subscribe(System.out::println, Throwable::printStackTrace);


所有这些都发生在订阅时,但 Flux 本身并未在那里声明。更糟糕的是,当我们转到声明变量的位置时,我们会看到以下内容:

public Mono<String> toDebug; //please overlook the public class attribute


该变量未在声明的地方实例化。我们必须假设最坏的情况,即我们发现应用程序中可能有几个不同的代码路径来设置它。我们仍然不确定是哪一个导致了这个问题。


这在 Reactor 中相当于运行时错误,而不是编译错误。


我们想要更容易地找到的是运算符被添加到链中的位置 - 即 Flux 被声明的位置。我们通常将其称为 Flux 的“组件”。


7.2.激活调试模式 - 又名回溯


本节介绍启用调试功能的最简单但也是最慢的方法,因为它捕获每个运算符的堆栈跟踪。请参阅 checkpoint() 替代方案以获取更细粒度的调试方式,并参阅生产就绪的全局调试以获取更高级和高性能的全局选项。


尽管堆栈跟踪仍然能够为有一定经验的人传达一些信息,但我们可以看到,在更高级的情况下,它本身并不理想。


幸运的是,Reactor 附带了专为调试而设计的汇编时工具。


这是通过在应用程序启动时(或至少在实例化有问题的 FluxMono 之前)通过 Hooks.onOperatorDebug() 方法激活全局调试模式来完成的,如下所示如下:

Hooks.onOperatorDebug();


这开始通过包装操作符的构造并捕获堆栈跟踪来检测对 Reactor 操作符方法的调用(它们被组装到链中)。由于这是在声明操作符链时完成的,因此应该在此之前激活挂钩,因此最安全的方法是在应用程序启动时激活它。


稍后,如果发生异常,失败的操作员可以引用该捕获并重新处理堆栈跟踪,附加附加信息。


我们将这种捕获的程序集信息(以及通常由 Reactor 添加到异常中的附加信息)称为回溯。


在下一节中,我们将了解堆栈跟踪有何不同以及如何解释新信息。


7.3.在调试模式下读取堆栈跟踪


当我们重用最初的示例但激活 operatorStacktrace 调试功能时,会发生以下情况:


  1. 堆栈跟踪指向订阅站点,因此不太有趣,在第一帧之后被剪切并放在一边。


  2. 一个特殊的抑制异常被添加到原始异常中(或者如果已经存在则进行修改)。


  3. 为该特殊异常构造一条消息,其中包含多个部分。


  4. 第一部分将追溯到发生故障的操作员的组装地点。


  5. 第二部分将尝试显示从该运算符构建的链,并且已经看到错误传播


  6. 最后一部分是原始堆栈跟踪


打印后的完整堆栈跟踪如下:

java.lang.IndexOutOfBoundsException: Source emitted more than one item
    at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127) (1)
    Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: (2)
Assembly trace from producer [reactor.core.publisher.MonoSingle] : (3)
    reactor.core.publisher.Flux.single(Flux.java:7915)
    reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1017)
Error has been observed at the following site(s): (4)
    *_______Flux.single ⇢ at reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1017) (5)
    |_ Mono.subscribeOn ⇢ at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1071) (6)
Original Stack Trace: (7)
        at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127)
...
(8)
...
        at reactor.core.publisher.Mono.subscribeWith(Mono.java:4363)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4223)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4159)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4131)
        at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1067)
1
原始堆栈跟踪被截断为单个帧。
2
这是新的:我们看到捕获堆栈的包装操作符。这是回溯开始出现的地方。
3
首先,我们了解有关操作员组装地点的一些详细信息。
4
其次,我们得到了操作符链的概念,错误通过该操作符链从第一个传播到最后一个(错误站点到订阅站点)。
5
每个看到错误的操作员都会被提及,以及使用它的用户类和行。这里我们有一个“根”。
6
这里我们有链的一个简单部分。
7
堆栈跟踪的其余部分被移到最后......​
8
…​显示了操作员的一些内部结构(因此我们在这里删除了一些片段)。


捕获的堆栈跟踪作为抑制的 OnAssemblyException 附加到原始错误。它分为三个部分,但第一部分是最有趣的。它显示了触发异常的操作符的构建路径。在这里,它表明导致我们问题的 single 实际上是在 scatterAndGather 方法中创建的。


现在我们已经掌握了足够的信息来找到罪魁祸首,我们可以对 scatterAndGather 方法进行有意义的研究:

private Mono<String> scatterAndGather(Flux<String> urls) {
    return urls.flatMap(url -> doRequest(url))
           .single(); (1)
}
1
果然,这是我们的 single


现在我们可以看到错误的根本原因是 flatMap 对几个 URL 执行多次 HTTP 调用,但与 single 链接,这限制性太强。经过简短的 git blame 并与该行的作者进行快速讨论后,我们发现他打算使用限制较少的 take(1) 来代替。


我们已经解决了我们的问题。


现在考虑堆栈跟踪中的以下部分:

Error has been observed at the following site(s):


在此特定示例中,回溯的第二部分不一定有趣,因为错误实际上发生在链中的最后一个运算符(最接近 subscribe 的运算符)中。考虑另一个例子可能会更清楚:

FakeRepository.findAllUserByName(Flux.just("pedro", "simon", "stephane"))
              .transform(FakeUtils1.applyFilters)
              .transform(FakeUtils2.enrichUser)
              .blockLast();


现在想象一下,在 findAllUserByName 内部,有一个失败的 map 。在这里,我们将在回溯的第二部分看到以下内容:

Error has been observed at the following site(s):
    *________Flux.map ⇢ at reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:27)
    |_       Flux.map ⇢ at reactor.guide.FakeRepository.findAllUserByName(FakeRepository.java:28)
    |_    Flux.filter ⇢ at reactor.guide.FakeUtils1.lambda$static$1(FakeUtils1.java:29)
    |_ Flux.transform ⇢ at reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:39)
    |_   Flux.elapsed ⇢ at reactor.guide.FakeUtils2.lambda$static$0(FakeUtils2.java:30)
    |_ Flux.transform ⇢ at reactor.guide.GuideDebuggingExtraTests.debuggingActivatedWithDeepTraceback(GuideDebuggingExtraTests.java:40)


这对应于收到错误通知的操作符链的部分:


  1. 异常源自第一个 map 。这个被 * 连接器识别为根,并且 _ 用于缩进。


  2. 第二个 map 会发现异常(两者实际上都对应于 findAllUserByName 方法)。


  3. 然后可以看到 filtertransform ,这表明链的一部分是由可重用的转换函数构建的(这里是 applyFilters 实用方法)。


  4. 最后,它由 elapsedtransform 看到。再次, elapsed 由第二个变换的变换函数应用。


在某些情况下,相同的异常通过多个链传播,“根”标记 *_ 允许我们更好地分离这些链。如果某个站点被多次访问,则调用站点信息后会有一个 (observed x times)


例如,让我们考虑以下代码片段:

public class MyClass {
    public void myMethod() {
        Flux<String> source = Flux.error(sharedError);
        Flux<String> chain1 = source.map(String::toLowerCase).filter(s -> s.length() < 4);
        Flux<String> chain2 = source.filter(s -> s.length() > 5).distinct();

        Mono<Void> when = Mono.when(chain1, chain2);
    }
}


在上面的代码中,错误通过两个单独的链 chain1chain2 传播到 when 。它将导致包含以下内容的回溯:

Error has been observed at the following site(s):
    *_____Flux.error ⇢ at myClass.myMethod(MyClass.java:3) (observed 2 times)
    |_      Flux.map ⇢ at myClass.myMethod(MyClass.java:4)
    |_   Flux.filter ⇢ at myClass.myMethod(MyClass.java:4)
    *_____Flux.error ⇢ at myClass.myMethod(MyClass.java:3) (observed 2 times)
    |_   Flux.filter ⇢ at myClass.myMethod(MyClass.java:5)
    |_ Flux.distinct ⇢ at myClass.myMethod(MyClass.java:5)
    *______Mono.when ⇢ at myClass.myMethod(MyClass.java:7)

 我们看到:


  1. 有 3 个“根”元素( when 是真正的根)。


  2. Flux.error 开始的两条链是可见的。


  3. 两个链似乎都基于相同的 Flux.error 源( observed 2 times )。

  4.  第一条链是 Flux.error().map().filter


  5. 第二条链是 `Flux.error().filter().distinct()


关于回溯和抑制异常的注释:由于回溯作为抑制异常附加到原始错误,这可能会在一定程度上干扰使用此机制的另一种类型的异常:复合异常。此类异常可以直接通过 Exceptions.multiple(Throwable…​) 创建,也可以通过某些可能连接多个错误源的运算符(如 Flux#flatMapDelayError )创建。它们可以通过 Exceptions.unwrapMultiple(Throwable) 展开为 List ,在这种情况下,回溯将被视为组合的组件,并且是返回的 List 的一部分。如果这在某种程度上是不可取的,则可以通过 Exceptions.isTraceback(Throwable) 检查来识别回溯,并通过使用 Exceptions.unwrapMultipleExcludingTracebacks(Throwable) 将其从此类展开中排除。


我们在这里处理一种形式的检测,创建堆栈跟踪的成本很高。这就是为什么此调试功能只能以受控方式激活,作为最后的手段。


7.3.1. checkpoint() 替代方案


调试模式是全局的,会影响应用程序内组装到 FluxMono 中的每个运算符。这样做的好处是允许事后调试:无论错误是什么,我们都可以获得附加信息来调试它。


正如我们之前所看到的,这种全局知识是以影响性能为代价的(由于填充的堆栈跟踪的数量)。如果我们知道可能有问题的操作员,那么成本就可以降低。然而,我们通常不知道哪些运算符可能有问题,除非我们在野外观察到错误,发现我们缺少程序集信息,然后修改代码以激活程序集跟踪,希望再次观察到相同的错误。


在这种情况下,我们必须切换到调试模式并做好准备,以便更好地观察错误的第二次发生,这一次捕获所有附加信息。


如果您可以识别在应用程序中组装的可维护性至关重要的反应链,则可以使用 checkpoint() 运算符实现这两种技术的混合。


您可以将此运算符链接到方法链中。 checkpoint 运算符的工作方式类似于钩子版本,但仅适用于该特定链的链接。


还有一个 checkpoint(String) 变体,可让您向程序集回溯添加唯一的 String 标识符。这样,堆栈跟踪就被省略,您可以依靠描述来识别组装站点。 checkpoint(String) 比常规 checkpoint 的处理成本更低。


最后但并非最不重要的一点是,如果您想向检查点添加更通用的描述,但仍依赖堆栈跟踪机制来识别程序集站点,则可以使用 checkpoint("description", true) 版本强制执行该行为。我们现在回到回溯的初始消息,并用 description 进行了扩充,如以下示例所示:

Assembly trace from producer [reactor.core.publisher.ParallelSource], described as [descriptionCorrelation1234] : (1)
	reactor.core.publisher.ParallelFlux.checkpoint(ParallelFlux.java:215)
	reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
Error has been observed at the following site(s):
	|_	ParallelFlux.checkpoint ⇢ reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
1
descriptionCorrelation1234checkpoint 中提供的描述。


该描述可以是静态标识符或用户可读的描述或更广泛的相关 ID(例如,来自 HTTP 请求的标头)。


当全局调试与检查点一起启用时,将应用全局调试回溯样式,并且检查点仅反映在“已观察到错误...”部分。因此,在这种情况下,重检查点的名称不可见。


7.4.生产就绪的全局调试


Project Reactor 附带一个单独的 Java 代理,可以检测您的代码并添加调试信息,而无需支付在每个操作符调用上捕获堆栈跟踪的成本。该行为与激活调试模式(又称回溯)非常相似,但没有运行时性能开销。


要在您的应用程序中使用它,您必须将其添加为依赖项。


以下示例显示如何在 Maven 中添加 reactor-tools 作为依赖项:


示例 21.Maven 中的reactor-tools,位于 <dependencies>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-tools</artifactId>
    (1)
</dependency>
1
如果您使用 BOM,则无需指定 <version>


以下示例展示了如何在 Gradle 中添加 reactor-tools 作为依赖项:


例22. Gradle中的reactor-tools,修改 dependencies
dependencies {
   compile 'io.projectreactor:reactor-tools'
}


它还需要显式初始化:

ReactorDebugAgent.init();

由于实现会在加载类时对其进行检测,因此最好将其放置在 main(String[]) 方法中的其他所有内容之前:
public static void main(String[] args) {
    ReactorDebugAgent.init();
    SpringApplication.run(Application.class, args);
}


如果您无法立即运行 init,您也可以使用 processExistingClasses() 重新处理现有的类。例如,在 JUnit5 中,从 TestExecutionListener 甚至类 static 初始化块中进行测试:

ReactorDebugAgent.init();
ReactorDebugAgent.processExistingClasses();

请注意,由于需要迭代所有加载的类并应用转换,重新处理需要几秒钟的时间。仅当您发现某些调用站点未检测时才使用它。

7.4.1. Limitations


ReactorDebugAgent 作为 Java 代理实现,并使用 ByteBuddy 执行自附加。自附加可能无法在某些 JVM 上工作,请参阅 ByteBuddy 的文档了解更多详细信息。


7.4.2.将 ReactorDebugAgent 作为 Java 代理运行


如果您的环境不支持 ByteBuddy 的自附加,您可以将 reactor-tools 作为 Java 代理运行:

java -javaagent reactor-tools.jar -jar app.jar


7.4.3.在构建时运行 ReactorDebugAgent


也可以在构建时运行 reactor-tools 。为此,您需要将其应用为 ByteBuddy 构建工具的插件。


该转换将仅应用于您项目的类。类路径库不会被检测。

示例 23.带有 ByteBuddy Maven 插件的reactor-tools
<dependencies>
	<dependency>
		<groupId>io.projectreactor</groupId>
		<artifactId>reactor-tools</artifactId>
		(1)
		<classifier>original</classifier> (2)
		<scope>runtime</scope>
	</dependency>
</dependencies>

<build>
	<plugins>
		<plugin>
			<groupId>net.bytebuddy</groupId>
			<artifactId>byte-buddy-maven-plugin</artifactId>
			<configuration>
				<transformations>
					<transformation>
						<plugin>reactor.tools.agent.ReactorDebugByteBuddyPlugin</plugin>
					</transformation>
				</transformations>
			</configuration>
		</plugin>
	</plugins>
</build>
1
如果您使用 BOM,则无需指定 <version>
2
classifier 这里很重要。

示例 24. 带有 ByteBuddy 的 Gradle 插件的reactor-tools
plugins {
	id 'net.bytebuddy.byte-buddy-gradle-plugin' version '1.10.9'
}

configurations {
	byteBuddyPlugin
}

dependencies {
	byteBuddyPlugin(
			group: 'io.projectreactor',
			name: 'reactor-tools',
			(1)
			classifier: 'original', (2)
	)
}

byteBuddy {
	transformation {
		plugin = "reactor.tools.agent.ReactorDebugByteBuddyPlugin"
		classPath = configurations.byteBuddyPlugin
	}
}
1
如果您使用 BOM,则无需指定 version
2
classifier 这里很重要。


7.5。记录序列


除了堆栈跟踪调试和分析之外,工具包中另一个强大的工具是能够以异步顺序跟踪和记录事件。


log() 运算符就可以做到这一点。它链接在一个序列内,查看其上游 FluxMono 的每个事件(包括 onNextonError 和 < b5> 以及订阅、取消和请求)。


关于日志记录实现的说明


log 运算符使用 Loggers 实用程序类,该实用程序类通过 SLF4J 获取常见的日志记录框架,例如 Log4J 和 Logback,并且如果 SLF4J 是,则默认记录到控制台不可用。


控制台回退使用 System.err 表示 WARNERROR 日志级别,使用 System.out 表示其他所有日志级别。


如果您更喜欢 JDK java.util.logging 回退(如 3.0.x 中那样),则可以通过将 reactor.logging.fallback 系统属性设置为 JDK 来获取它。


在所有情况下,在生产中登录时,您应该注意配置底层日志框架以使用其最异步和非阻塞的方法 - 例如,Logback 中的 AsyncAppenderAsyncLogger 在 Log4j 2 中。


例如,假设我们激活并配置了 Logback 以及类似 range(1,10).take(3) 的链。通过在 take 之前放置 log() ,我们可以深入了解它的工作原理以及它向上游传播到范围的事件类型,如以下示例所示:

Flux<Integer> flux = Flux.range(1, 10)
                         .log()
                         .take(3);
flux.subscribe();


这会打印出以下内容(通过记录器的控制台附加程序):

10:45:20.200 [main] INFO  reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) (1)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | request(3) (2)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(1) (3)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(2)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | onNext(3)
10:45:20.205 [main] INFO  reactor.Flux.Range.1 - | cancel() (4)


在这里,除了记录器自己的格式化程序(时间、线程、级别、消息)之外, log() 运算符还以其自己的格式输出一些内容:

1
reactor.Flux.Range.1 是日志的自动类别,以防您在链中多次使用该运算符。它可以让您区分记录了哪个操作员的事件(在本例中为 range )。您可以使用 log(String) 方法签名用您自己的自定义类别覆盖标识符。在几个分隔字符之后,将打印实际事件。在这里,我们得到一个 onSubscribe 调用、一个 request 调用、三个 onNext 调用和一个 cancel 调用。对于第一行 onSubscribe ,我们得到 Subscriber 的实现,它通常对应于特定于运算符的实现。在方括号之间,我们得到了额外的信息,包括算子是否可以通过同步或异步融合自动优化。
2
在第二行,我们可以看到 take 将向上游的请求限制为 3 个。
3
然后范围连续发送三个值。
4
在最后一行,我们看到 cancel()


第二行 (2) 和最后一行 (4) 最有趣。我们可以看到 take 在那里起作用。它利用背压来向源请求准确的预期元素数量。收到足够的元素后,它通过调用 cancel() 告诉源不再需要任何元素。请注意,如果下游本身使用了背压,例如。通过仅请求 1 个元素, take 运算符将遵守这一点(当将请求从下游传播到上游时,它会限制请求)。


建议编辑“调试反应器”


8. 公开 Reactor 指标


Project Reactor 是一个旨在提高性能和更好地利用资源的库。但要真正了解系统的性能,最好能够监控其各个组件。


这就是 Reactor 通过 reactor-core-micrometer 模块提供与 Micrometer 内置集成的原因。该模块在 2022.0 BOM 版本中引入,提供了对 Micrometer 的显式依赖,这使得它能够为指标和观察提供微调的 API。


到 Reactor-Core 3.5.0 为止,指标被实现为运算符,如果 Micrometer 不在类路径上,则这些运算符将是无操作的。


reactor-core-micrometer API 要求用户显式提供某种形式的注册表,而不是依赖于硬编码的全局注册表。当将检测应用于具有原生命名或标签概念的类时,这些 API 将尝试发现反应链中的此类元素。否则,API 将期望与注册表一起提供命名计量表的前缀。

 8.1.调度程序指标


Reactor 中的每个异步操作都是通过线程和调度程序中描述的调度程序抽象来完成的。这就是为什么监视调度程序、留意开始看起来可疑的关键指标并做出相应反应非常重要。


reactor-core-micrometer 模块提供了一个“定时” Scheduler 包装器,用于围绕通过它提交的任务执行测量,可以按如下方式使用:

Scheduler originalScheduler = Schedulers.newParallel("test", 4);

Scheduler schedulerWithMetrics = Micrometer.timedScheduler(
	originalScheduler, (1)
	applicationDefinedMeterRegistry, (2)
	"testingMetrics", (3)
	Tags.of(Tag.of("additionalTag", "yes")) (4)
);
1  要换行的 Scheduler
2
用于发布指标的 MeterRegistry
3
命名仪表时使用的前缀。例如,这将导致创建一个 testingMetrics.scheduler.tasks.completed 仪表。
4
添加到为该包装创建的所有计量表的可选标签 Scheduler

当包装常见的 Scheduler (例如 Schedulers.single() )或在多个地方使用的 Scheduler 时,只有 Runnable 任务通过 Micrometer#timedScheduler 返回的包装器实例提交的数据将被检测。


有关生成的仪表和关联的默认标签,请参阅 Micrometer.timedScheduler()

 8.2.发布商指标


有时,能够在反应式管道中的某个阶段记录指标很有用。


一种方法是从提供给 tap 运算符的自定义 SignalListener 手动将值推送到您选择的指标后端。


开箱即用的实现实际上是由 reactor-core-micrometer 模块通过 Micrometer#metrics API 提供的。考虑以下管道:

listenToEvents()
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();


要启用此源 Flux (从 listenToEvents() 返回)的指标,我们需要打开指标集合:

listenToEvents()
    .name("events") (1)
    .tap(Micrometer.metrics( (2)
        applicationDefinedMeterRegistry (3)
    ))
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1
反应式管道此阶段的每个指标都将使用“事件”作为命名前缀(可选,默认为 reactor 前缀)。
2
我们使用 tap 运算符与 reactor-core-micrometer 中提供的 SignalListener 实现相结合来收集指标。
3
与该模块中的其他 API 一样,需要显式提供要发布指标的 MeterRegistry


Micrometer.metrics() 中提供了公开指标的详细信息。

 8.2.1.标签


除了 Micrometer.metrics() 中描述的常见标签之外,用户还可以通过 tag 运算符将自定义标签添加到其反应链中:

listenToEvents()
    .name("events") (1)
    .tag("source", "kafka") (2)
    .tap(Micrometer.metrics(applicationDefinedRegistry)) (3)
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1
此阶段的每个指标都将使用“事件”前缀进行标识。
2
将自定义标签“source”设置为值“kafka”。
3
除了常见标签之外,所有报告的指标还将分配有 source=kafka 标签。


请注意,根据您使用的监控系统,在使用标签时使用名称可能被视为强制,因为否则会导致两个默认命名序列之间出现一组不同的标签。某些系统(例如 Prometheus)可能还要求具有相同名称的每个指标具有完全相同的标签集。

 8.2.2.观察


除了完整指标之外, reactor-core-micrometer 模块还提供基于 Micrometer 的 Observation 的替代方案。根据配置和运行时类路径, Observation 可以转换为计时器、跨度、日志语句或任意组合。


可以通过 tap 运算符和 Micrometer.observation 实用程序观察反应链,如下所示:

listenToEvents()
    .name("events") (1)
    .tap(Micrometer.observation( (2)
		applicationDefinedRegistry)) (3)
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1
此管道的 Observation 将使用“events”前缀进行标识。
2
我们将 tap 运算符与 observation 实用程序一起使用。
3
必须提供一个注册表来发布观察结果。请注意,这是一个 ObservationRegistry


Micrometer.observation() 中提供了观察及其标签的详细信息。


您还可以通过 Micrometer.observation(ObservationRegistry registry, Function<ObservationRegistry, Observation> observationSupplier) 与您自己的观察供应商完全定制千分尺的观察,如下所示:

listenToEvents()
    .name("events") (1)
    .tap(Micrometer.observation( (2)
    	applicationDefinedRegistry, (3)
    	registry -> Observation.createNotStarted( (4)
    		myConvention, (5)
            myContextSupplier, (6)
            registry)))
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
1
此管道的 Observation 将使用“events”前缀进行标识。
2
我们将 tap 运算符与 observation 实用程序一起使用。
3
必须提供一个注册表来发布观察结果。请注意,这是一个 ObservationRegistry
4
我们提供自己的函数来创建观察
5  使用自定义 ObservationConvention
6
和自定义 Supplier<Context>


建议编辑“暴露反应器指标”


8.3. Reactor-Core-Micrometer 模块的仪表和标签

8.3.1. Micrometer.metrics()


下面是指标点击侦听器功能使用的计量列表,通过 Micrometer.metrics(MeterRegistry meterRegistry) 公开。


请注意,下面的指标使用动态 %s 前缀。当应用于使用 name(String n) 运算符的 FluxMono 时,它将替换为 n 。否则,它将替换为默认值 "reactor"
 流动持续时间


订阅与序列终止或取消之间经过的持续时间的时间。添加 TerminationTags#STATUS 标记以指定导致计时器结束的事件( "completed""completedEmpty""error""cancelled" )。


指标名称 %s.flow.duration - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。输入 distribution summary


*.active 指标中可能会缺少启动观察后添加的键值。

表 1. 低基数键

 姓名

 描述

  exception (必填)


当 STATUS 为 "error" 时 FLOW_DURATION 使用的标记,用于存储发生的异常。

  status (必填)

 终止状态:


  • "completed" 用于以 onComplete 终止的序列,并带有 onNext(s)


  • "completedEmpty" 对于在 onComplete 之前没有任何 onNext 的情况下终止的序列


  • "error" 用于以 onError 终止的序列


  • "cancelled" 对于已取消订阅的序列

  type (必填)


序列的类型( "Flux""Mono" )。

 格式错误的源事件


计算从格式错误的源接收到的事件数(即 onComplete 之后的 onNext)。


指标名称 %s.malformed.source - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。输入 counter


*.active 指标中可能会缺少启动观察后添加的键值。

表 2. 低基数键

 姓名

 描述

  type (必填)


序列的类型( "Flux""Mono" )。

 下次延迟时


测量每个 onNext 之间(或第一个 onNext 和 onSubscribe 事件之间)的延迟。


指标名称 %s.onNext.delay - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。键入 timer 和基本单位 nanoseconds


*.active 指标中可能会缺少启动观察后添加的键值。

Micrometer 内部使用 nanoseconds 作为基本单位。然而,每个后端决定实际的基本单元。 (即普罗米修斯使用秒)

表 3. 低基数键

 姓名

 描述

  type (必填)


序列的类型( "Flux""Mono" )。

 要求金额


计算所有订阅者对命名序列(例如 Flux.name(String) )请求的数量,直到至少一个订阅者请求无限数量。


指标名称 %s.requested - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。输入 distribution summary


*.active 指标中可能会缺少启动观察后添加的键值。

表 4. 低基数键

 姓名

 描述

  type (必填)


序列的类型( "Flux""Mono" )。

 已订阅


计算序列的订阅数量。


指标名称 %s.subscribed - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。输入 counter


*.active 指标中可能会缺少启动观察后添加的键值。

表 5. 低基数键

 姓名

 描述

  type (必填)


序列的类型( "Flux""Mono" )。

8.3.2. Micrometer.timedScheduler()


下面是 TimedScheduler 功能使用的计量列表,通过 Micrometer.timedScheduler(Scheduler original, MeterRegistry meterRegistry, String metricsPrefix) 公开。


请注意,下面的指标使用动态 %s 前缀。在实践中,这被替换为提供的 metricsPrefix
 活动任务


LongTaskTimer 反映当前正在运行的任务。请注意,这反映了所有类型的活动任务,包括延迟或定期安排的任务(每次迭代都被视为活动任务)。


指标名称 %s.scheduler.tasks.active - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。输入 long task timer


*.active 指标中可能会缺少启动观察后添加的键值。

Micrometer 内部使用 nanoseconds 作为基本单位。然而,每个后端决定实际的基本单元。 (即普罗米修斯使用秒)
 任务完成


反映已完成执行的任务的计时器。请注意,这反映了所有类型的活动任务,包括延迟或定期的任务(每次迭代都被视为单独的已完成任务)。


指标名称 %s.scheduler.tasks.completed - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。输入 timer


*.active 指标中可能会缺少启动观察后添加的键值。

Micrometer 内部使用 nanoseconds 作为基本单位。然而,每个后端决定实际的基本单元。 (即普罗米修斯使用秒)
 待处理任务


LongTaskTimer 反映已提交立即执行但由于调度程序已达到最大容量而无法立即启动的任务。请注意,仅考虑通过 Scheduler#schedule(Runnable) 和 Scheduler.Worker#schedule(Runnable) 立即提交。


指标名称 %s.scheduler.tasks.pending - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。输入 long task timer


*.active 指标中可能会缺少启动观察后添加的键值。

Micrometer 内部使用 nanoseconds 作为基本单位。然而,每个后端决定实际的基本单元。 (即普罗米修斯使用秒)
 已提交的任务


每次提交任务时计数器都会加一(通过 Scheduler 和 Scheduler.Worker 上的任何调度方法)。


请注意,实际上有 4 个计数器,可以通过 SubmittedTags#SUBMISSION 标签来区分。因此,所有这些的总和可以与 TASKS_COMPLETED 计数器进行比较。


指标名称 %s.scheduler.tasks.submitted - 由于它包含 %s ,因此该名称是动态的,将在运行时解析。输入 counter


*.active 指标中可能会缺少启动观察后添加的键值。

表 6. 低基数键

 姓名

 描述

  submission.type (必填)


提交类型:

  •   "direct"Scheduler#schedule(Runnable)

  •   "delayed"Scheduler#schedule(Runnable,long,TimeUnit)


  • 初始延迟后 "periodic_initial" 用于 Scheduler#schedulePeriodically(Runnable,long,long,TimeUnit)


  • "periodic_iteration" 用于 Scheduler#schedulePeriodically(Runnable,long,long,TimeUnit) 进一步的周期性迭代

8.3.3. Micrometer.observation()


下面是观察点击侦听器功能使用的仪表列表,通过 Micrometer.observation(ObservationRegistry registry) 公开。


这是匿名观察,但您可以使用 name(String) 运算符创建具有自定义名称的类似观察。


您还可以通过 Micrometer.observation(ObservationRegistry registry, Function<ObservationRegistry, Observation> observationSupplier) 与您自己的观察供应商完全自定义 Micrometer 的观察,从而允许配置其属性(名称、上下文名称、低基数键和高基数键,...​)。
 匿名的


Micrometer.observation() 的匿名版本,当序列尚未通过例如明确命名时Flux#name(String) 运算符。


指标名称 reactor.observation 。输入 timer


指标名称 reactor.observation.active 。输入 long task timer


*.active 指标中可能会缺少启动观察后添加的键值。

Micrometer 内部使用 nanoseconds 作为基本单位。然而,每个后端决定实际的基本单元。 (即普罗米修斯使用秒)

表 7. 低基数键

 姓名

 描述

  reactor.status (必填)


序列的状态,指示它如何终止( "completed""completedEmpty""error""cancelled" )。

  reactor.type (必填)


序列的类型,即 "Flux""Mono"


9. 高级功能和概念


本章介绍 Reactor 的高级功能和概念,包括以下内容:


9.1.互用运算符的使用


从干净代码的角度来看,代码重用通常是一件好事。 Reactor 提供了一些模式,可以帮助您重用和交互代码,特别是对于您可能希望在代码库中定期应用的运算符或运算符组合。如果您将操作符链视为菜谱,则可以创建操作符菜谱的“食谱”。


9.1.1.使用 transform 运算符


transform 运算符允许您将运算符链的一部分封装到函数中。该函数在组装时应用于原始运算符链,以使用封装的运算符对其进行扩充。这样做对序列的所有订阅者应用相同的操作,基本上相当于直接链接操作符。以下代码显示了一个示例:

Function<Flux<String>, Flux<String>> filterAndMap =
f -> f.filter(color -> !color.equals("orange"))
      .map(String::toUpperCase);

Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
	.doOnNext(System.out::println)
	.transform(filterAndMap)
	.subscribe(d -> System.out.println("Subscriber to Transformed MapAndFilter: "+d));


下图显示了 transform 运算符如何封装流:

Transform Operator : encapsulate flows


前面的示例产生以下输出:

blue
Subscriber to Transformed MapAndFilter: BLUE
green
Subscriber to Transformed MapAndFilter: GREEN
orange
purple
Subscriber to Transformed MapAndFilter: PURPLE


9.1.2.使用 transformDeferred 运算符


transformDeferred 运算符与 transform 类似,也允许您将运算符封装在函数中。主要区别在于,此函数基于每个订阅者应用于原始序列。这意味着该函数实际上可以为每个订阅生成不同的操作符链(通过维护某种状态)。以下代码显示了一个示例:

AtomicInteger ai = new AtomicInteger();
Function<Flux<String>, Flux<String>> filterAndMap = f -> {
	if (ai.incrementAndGet() == 1) {
return f.filter(color -> !color.equals("orange"))
        .map(String::toUpperCase);
	}
	return f.filter(color -> !color.equals("purple"))
	        .map(String::toUpperCase);
};

Flux<String> composedFlux =
Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
    .doOnNext(System.out::println)
    .transformDeferred(filterAndMap);

composedFlux.subscribe(d -> System.out.println("Subscriber 1 to Composed MapAndFilter :"+d));
composedFlux.subscribe(d -> System.out.println("Subscriber 2 to Composed MapAndFilter: "+d));


下图显示了 transformDeferred 运算符如何处理每个订阅者的转换:

Compose Operator : Per Subscriber transformation


前面的示例产生以下输出:

blue
Subscriber 1 to Composed MapAndFilter :BLUE
green
Subscriber 1 to Composed MapAndFilter :GREEN
orange
purple
Subscriber 1 to Composed MapAndFilter :PURPLE
blue
Subscriber 2 to Composed MapAndFilter: BLUE
green
Subscriber 2 to Composed MapAndFilter: GREEN
orange
Subscriber 2 to Composed MapAndFilter: ORANGE
purple


9.2.热与冷


到目前为止,我们认为所有 Flux (和 Mono )都是相同的:它们都表示异步数据序列,并且在订阅之前不会发生任何事情。


但实际上,出版商有两大类:热的和冷的。


前面的描述适用于冷漠的出版商家族。他们为每个订阅重新生成数据。如果没有创建订阅,则永远不会生成数据。


想象一下 HTTP 请求:每个新订阅者都会触发一个 HTTP 调用,但如果没有人对结果感兴趣,则不会进行任何调用。


另一方面,热门发布商不依赖于任何数量的订阅者。他们可能会立即开始发布数据,并且每当有新的 Subscriber 到来时就会继续这样做(在这种情况下,订阅者只会看到订阅后发出的新元素)。对于热门发布商来说,在您订阅之前确实会发生一些事情。


Reactor 中的几个热门运算符的一个例子是 just :它在汇编时直接捕获值,然后将其重播给任何订阅它的人。再次使用 HTTP 调用类比,如果捕获的数据是 HTTP 调用的结果,则在实例化 just 时仅进行一次网络调用。


要将 just 转换为冷发布者,您可以使用 defer 。它将我们示例中的 HTTP 请求推迟到订阅时间(并且会导致每个新订阅都有单独的网络调用)。


相反, share()replay(…​) 可用于将冷发布者变成热门发布者(至少在第一次订阅发生后)。这两个在 Sinks 类中也有 Sinks.Many 等价物,允许以编程方式提供序列。


考虑两个示例,一个演示冷 Flux,另一个使用 Sinks 模拟热 Flux。以下代码显示了第一个示例:

Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
                          .map(String::toUpperCase);

source.subscribe(d -> System.out.println("Subscriber 1: "+d));
source.subscribe(d -> System.out.println("Subscriber 2: "+d));


第一个示例产生以下输出:

Subscriber 1: BLUE
Subscriber 1: GREEN
Subscriber 1: ORANGE
Subscriber 1: PURPLE
Subscriber 2: BLUE
Subscriber 2: GREEN
Subscriber 2: ORANGE
Subscriber 2: PURPLE


下图显示了重播行为:

Replaying behavior


两个订阅者都会捕获所有四种颜色,因为每个订阅者都会导致 Flux 上的运算符定义的进程运行。


将第一个示例与第二个示例进行比较,如以下代码所示:

Sinks.Many<String> hotSource = Sinks.unsafe().many().multicast().directBestEffort();

Flux<String> hotFlux = hotSource.asFlux().map(String::toUpperCase);

hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

hotSource.emitNext("blue", FAIL_FAST); (1)
hotSource.tryEmitNext("green").orThrow(); (2)

hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));

hotSource.emitNext("orange", FAIL_FAST);
hotSource.emitNext("purple", FAIL_FAST);
hotSource.emitComplete(FAIL_FAST);
1
有关接收器的更多详细信息,请参阅接收器
2
旁注: orThrow() 这里是 emitNext + Sinks.EmitFailureHandler.FAIL_FAST 的替代方案,适合测试,因为在那里抛出是可以接受的(比反应式应用程序更是如此)。


第二个示例产生以下输出:

Subscriber 1 to Hot Source: BLUE
Subscriber 1 to Hot Source: GREEN
Subscriber 1 to Hot Source: ORANGE
Subscriber 2 to Hot Source: ORANGE
Subscriber 1 to Hot Source: PURPLE
Subscriber 2 to Hot Source: PURPLE


下图显示了订阅的广播方式:

Broadcasting a subscription


订阅者 1 捕获所有四种颜色。订阅者 2 在生成前两种颜色后创建,仅捕获最后两种颜色。此差异导致输出中 ORANGEPURPLE 增加一倍。无论何时附加订阅,操作员在此 Flux 上描述的过程都会运行。


9.3.使用 ConnectableFlux 向多个订阅者广播


有时,您可能不希望仅将某些处理推迟到一个订阅者的订阅时间,但您实际上可能希望其中几个订阅者会合,然后触发订阅和数据生成。


这就是 ConnectableFlux 的用途。 Flux API 中涵盖了返回 ConnectableFlux 的两个主要模式: publishreplay


  • publish 通过将这些请求转发到源,动态地尝试尊重来自不同订阅者的需求(在背压方面)。最值得注意的是,如果任何订阅者有 0 的待处理需求,则发布将暂停其对源的请求。


  • replay 缓冲通过第一个订阅看到的数据,最多可达可配置的限制(时间和缓冲区大小)。它将数据重播给后续订阅者。


ConnectableFlux 提供了额外的方法来管理下游订阅与原始源的订阅。这些附加方法包括以下内容:


  • 一旦您达到 Flux 的足够订阅量,就可以手动调用 connect() 。这会触发对上游源的订阅。


  • 一旦进行了 n 订阅, autoConnect(n) 就可以自动执行相同的工作。


  • refCount(n) 不仅自动跟踪传入订阅,还检测这些订阅何时被取消。如果没有跟踪到足够的订阅者,则源将“断开连接”,如果出现其他订阅者,则会导致稍后对源进行新的订阅。


  • refCount(int, Duration) 添加“宽限期”。一旦跟踪的订阅者数量变得太少,它就会在断开源连接之前等待 Duration ,这可能允许足够的新订阅者进入并再次跨越连接阈值。


考虑以下示例:

Flux<Integer> source = Flux.range(1, 3)
                           .doOnSubscribe(s -> System.out.println("subscribed to source"));

ConnectableFlux<Integer> co = source.publish();

co.subscribe(System.out::println, e -> {}, () -> {});
co.subscribe(System.out::println, e -> {}, () -> {});

System.out.println("done subscribing");
Thread.sleep(500);
System.out.println("will now connect");

co.connect();


前面的代码产生以下输出:

done subscribing
will now connect
subscribed to source
1
1
2
2
3
3


以下代码使用 autoConnect

Flux<Integer> source = Flux.range(1, 3)
                           .doOnSubscribe(s -> System.out.println("subscribed to source"));

Flux<Integer> autoCo = source.publish().autoConnect(2);

autoCo.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("subscribed first");
Thread.sleep(500);
System.out.println("subscribing second");
autoCo.subscribe(System.out::println, e -> {}, () -> {});


前面的代码产生以下输出:

subscribed first
subscribing second
subscribed to source
1
1
2
2
3
3


9.4.三种批处理


当您有很多元素并且想要将它们分成批次时,Reactor 中有三种广泛的解决方案:分组、窗口和缓冲。这三个在概念上很接近,因为它们将 Flux<T> 重新分配到聚合中。分组和窗口化创建一个 Flux<Flux<T>> ,而缓冲聚合到一个 Collection<T>


9.4.1.使用 Flux<GroupedFlux<T>> 分组


分组是将源 Flux<T> 分成多个批次的行为,每个批次都匹配一个键。


关联的运算符是 groupBy


每个组都表示为 GroupedFlux<T> ,它允许您通过调用其 key() 方法来检索密钥。


各组的内容没有必要的连续性。一旦源元素产生新的密钥,该密钥的组就会被打开,并且与该密钥匹配的元素最终会出现在该组中(可以同时打开多个组)。


这意味着团体:


  1. 始终不相交(源元素属于一个且仅一个组)。


  2. 可以包含原始序列中不同位置的元素。

  3.  永远不会空。


以下示例根据值是偶数还是奇数对值进行分组:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.groupBy(i -> i % 2 == 0 ? "even" : "odd")
		.concatMap(g -> g.defaultIfEmpty(-1) //if empty groups, show them
				.map(String::valueOf) //map to string
				.startWith(g.key())) //start with the group's key
	)
	.expectNext("odd", "1", "3", "5", "11", "13")
	.expectNext("even", "2", "4", "6", "12")
	.verifyComplete();

分组最适合组数中到少量的情况。这些组还必须强制被消耗(例如由 flatMap ),以便 groupBy 继续从上游获取数据并提供更多组。有时,这两个约束会相乘并导致挂起,例如当基数较高且使用组的 flatMap 并发性太低时。


9.4.2.使用 Flux<Flux<T>> 进行窗口化


窗口化是按照大小、时间、边界定义谓词或边界定义 Publisher 等标准将源 Flux<T> 分割为窗口的行为。


关联的运算符为 windowwindowTimeoutwindowUntilwindowWhilewindowWhen


与根据传入键随机重叠的 groupBy 相反,窗口(大多数时候)是按顺序打开的。


不过,某些变体仍然可以重叠。例如,在 window(int maxSize, int skip) 中, maxSize 参数是窗口关闭之前的元素数量, skip 参数是之后源中的元素数量将打开一个新窗口。因此,如果 maxSize > skip ,则在前一个窗口关闭之前打开一个新窗口,并且两个窗口重叠。


以下示例显示重叠窗口:

StepVerifier.create(
	Flux.range(1, 10)
		.window(5, 3) //overlapping windows
		.concatMap(g -> g.defaultIfEmpty(-1)) //show empty windows as -1
	)
		.expectNext(1, 2, 3, 4, 5)
		.expectNext(4, 5, 6, 7, 8)
		.expectNext(7, 8, 9, 10)
		.expectNext(10)
		.verifyComplete();

使用反向配置 ( maxSize < skip ),源中的一些元素将被删除,并且不属于任何窗口。


在通过 windowUntilwindowWhile 进行基于谓词的窗口的情况下,后续源元素与谓词不匹配也可能导致空窗口,如以下示例所示:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.windowWhile(i -> i % 2 == 0)
		.concatMap(g -> g.defaultIfEmpty(-1))
	)
		.expectNext(-1, -1, -1) //respectively triggered by odd 1 3 5
		.expectNext(2, 4, 6) // triggered by 11
		.expectNext(12) // triggered by 13
		// however, no empty completion window is emitted (would contain extra matching elements)
		.verifyComplete();


9.4.3.使用 Flux<List<T>> 进行缓冲


缓冲与窗口类似,但有以下区别:它不是发出窗口(每个窗口都是 Flux<T> ),而是发出缓冲区( Collection<T> - 默认情况下为 List<T> )。


用于缓冲的运算符镜像了用于窗口的运算符: bufferbufferTimeoutbufferUntilbufferWhilebufferWhen


当相应的窗口操作符打开一个窗口时,缓冲操作符创建一个新的集合并开始向其中添加元素。当窗口关闭时,缓冲运算符会发出集合。


缓冲还可能导致源元素丢失或缓冲区重叠,如以下示例所示:

StepVerifier.create(
	Flux.range(1, 10)
		.buffer(5, 3) //overlapping buffers
	)
		.expectNext(Arrays.asList(1, 2, 3, 4, 5))
		.expectNext(Arrays.asList(4, 5, 6, 7, 8))
		.expectNext(Arrays.asList(7, 8, 9, 10))
		.expectNext(Collections.singletonList(10))
		.verifyComplete();


与窗口不同, bufferUntilbufferWhile 不会发出空缓冲区,如以下示例所示:

StepVerifier.create(
	Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
		.bufferWhile(i -> i % 2 == 0)
	)
	.expectNext(Arrays.asList(2, 4, 6)) // triggered by 11
	.expectNext(Collections.singletonList(12)) // triggered by 13
	.verifyComplete();


9.5。使用 ParallelFlux 并行化工作


如今,多核架构已成为一种商品,能够轻松并行工作非常重要。 Reactor 通过提供特殊类型 ParallelFlux 来帮助实现这一点,该类型公开针对并行工作进行优化的运算符。


要获取 ParallelFlux ,您可以在任何 Flux 上使用 parallel() 运算符。就其本身而言,此方法并不并行化工作。相反,它将工作负载划分为“轨道”(默认情况下,轨道数量与 CPU 内核数量相同)。


为了告诉结果 ParallelFlux 在哪里运行每个轨道(并且通过扩展,并行运行轨道),您必须使用 runOn(Scheduler) 。请注意,建议使用专用的 Scheduler 来进行并行工作: Schedulers.parallel()


比较下面两个例子:

Flux.range(1, 10)
    .parallel(2) (1)
    .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));
1
我们强制使用 Rails 数量,而不是依赖 CPU 核心的数量。
Flux.range(1, 10)
    .parallel(2)
    .runOn(Schedulers.parallel())
    .subscribe(i -> System.out.println(Thread.currentThread().getName() + " -> " + i));


第一个示例产生以下输出:

main -> 1
main -> 2
main -> 3
main -> 4
main -> 5
main -> 6
main -> 7
main -> 8
main -> 9
main -> 10


第二个在两个线程上正确并行,如以下输出所示:

parallel-1 -> 1
parallel-2 -> 2
parallel-1 -> 3
parallel-2 -> 4
parallel-1 -> 5
parallel-2 -> 6
parallel-1 -> 7
parallel-1 -> 9
parallel-2 -> 8
parallel-2 -> 10


如果并行处理序列后,您想要恢复到“正常” Flux 并按顺序应用运算符链的其余部分,则可以使用 sequential() ParallelFlux 上的方法。


请注意,如果您使用 Subscriber subscribeParallelFlux ,则 sequential() 会隐式应用,但在使用 < 的基于 lambda 的变体时则不会。 b4> 。


另请注意, subscribe(Subscriber<T>) 合并所有轨道,而 subscribe(Consumer<T>) 运行所有轨道。如果 subscribe() 方法有一个 lambda,则每个 lambda 的执行次数与有多少个 Rails 一样多。


您还可以通过 groups() 方法以 Flux<GroupedFlux<T>> 的形式访问各个轨道或“组”,并通过 composeGroup() 方法对它们应用其他运算符。


9.6。替换默认 Schedulers


正如我们在线程和调度程序部分中所描述的,Reactor Core 附带了多个 Scheduler 实现。虽然您始终可以通过 new* 工厂方法创建新实例,但每个 Scheduler 风格还具有一个可通过直接工厂方法访问的默认单例实例(例如 Schedulers.boundedElastic() )。


这些默认实例是当您未明确指定时需要 Scheduler 才能工作的运算符所使用的实例。例如, Flux#delayElements(Duration) 使用 Schedulers.parallel() 实例。


但是,在某些情况下,您可能需要以横切方式使用其他内容更改这些默认实例,而不必确保您调用的每个运算符都将特定的 Scheduler 作为参数。一个示例是通过包装真实调度程序来测量每个计划任务所花费的时间,以用于检测目的。换句话说,您可能想要更改默认的 Schedulers


可以通过 Schedulers.Factory 类更改默认调度程序。默认情况下, Factory 通过类似命名的方法创建所有标准 Scheduler 。您可以使用自定义实现来覆盖其中的每一个。


此外,工厂还公开了一种额外的自定义方法: decorateExecutorService 。它在创建由 ScheduledExecutorService 支持的每个 Reactor Core Scheduler 期间被调用(甚至是非默认实例,例如通过调用 Schedulers.newParallel() 创建的实例) )。


这使您可以调整要使用的 ScheduledExecutorService :默认显示为 Supplier ,并且根据配置的 Scheduler 类型,您可以选择完全绕过该供应商并返回您自己的实例,或者您可以 get() 默认实例并将其包装。


创建满足您需求的 Factory 后,您必须通过调用 Schedulers.setFactory(Factory) 来安装它。


最后, Schedulers 中有最后一个可定制的钩子: onHandleError 。每当提交给 SchedulerRunnable 任务抛出 Exception 时就会调用此钩子(请注意,如果为 UncaughtExceptionHandler 设置了 Thread 运行任务,处理程序和钩子都被调用)。


9.7.使用全局钩子


Reactor 还有另一类可配置回调,由 Reactor 运算符在各种情况下调用。它们都设置在 Hooks 类中,分为三类:

 9.7.1.落钩


当操作符的源不符合反应流规范时,将调用删除钩子。此类错误超出了正常执行路径(即它们无法通过 onError 传播)。


通常, Publisher 会调用运算符的 onNext ,尽管之前已经调用了 onCompleted 。在这种情况下, onNext 值将被删除。对于无关的 onError 信号也是如此。


相应的钩子 onNextDroppedonErrorDropped 允许您为这些 drop 提供全局 Consumer 。例如,您可以使用它来记录删除并根据需要清理与某个值关联的资源(因为它永远不会进入反应链的其余部分)。


连续两次设置钩子是累加的:您提供的每个消费者都会被调用。可以使用 Hooks.resetOn*Dropped() 方法将挂钩完全重置为其默认值。


9.7.2.内部错误挂钩


当在执行 onNextonError 和 < 期间抛出意外的 Exception 时,运算符会调用一个钩子 onOperatorError 。 b4> 方法。


与之前的类别不同,这仍然在正常的执行路径内。一个典型的例子是 map 运算符和一个抛出 Exception 的映射函数(例如除以零)。此时仍然可以通过 onError 的通常通道,这就是操作员所做的。


首先,它将 Exception 传递到 onOperatorError 。该钩子允许您检查错误(以及相关值,如果相关)并更改 Exception 。当然,你也可以做一些副业,比如记录并返回原始的 Exception


请注意,您可以多次设置 onOperatorError 挂钩。您可以为特定的 BiFunction 提供 String 标识符,并且使用不同键的后续调用会连接所有执行的函数。另一方面,重复使用同一键两次可以替换之前设置的功能。


因此,默认的钩子行为可以完全重置(通过使用 Hooks.resetOnOperatorError() ),也可以仅针对特定的 key 部分重置(通过使用 Hooks.resetOnOperatorError(String) )。

 9.7.3。组装挂钩


这些挂钩与操作员的生命周期息息相关。当组装(即实例化)运算符链时将调用它们。 onEachOperator 允许您通过返回不同的 Publisher 来动态更改链中组装的每个运算符。 onLastOperator 类似,只是它仅在 subscribe 调用之前链中的最后一个运算符上调用。


如果你想用横切的 Subscriber 实现来装饰所有运算符,你可以查看 Operators#lift* 方法来帮助你处理各种类型的 Reactor Publishers 那里( FluxMonoParallelFluxGroupedFluxConnectableFlux ),以及他们的 Fuseable 版本。


onOperatorError 一样,这些钩子是累积的,可以用键来识别。它们也可以部分或全部重置。

 9.7.4.挂钩预设


Hooks 实用程序类提供了两个预设挂钩。这些是默认行为的替代方案,您可以通过调用相应的方法来使用,而不是自己提出挂钩:


  • onNextDroppedFail()onNextDropped 用于抛出 Exceptions.failWithCancel() 异常。现在默认在 DEBUG 级别记录删除的值。要返回到旧的默认抛出行为,请使用 onNextDroppedFail()


  • onOperatorDebug() :此方法激活调试模式。它与 onOperatorError 挂钩相关联,因此调用 resetOnOperatorError() 也会重置它。您可以使用 resetOnOperatorDebug() 独立重置它,因为它在内部使用特定的键。


9.8.将上下文添加到反应序列


从命令式编程角度切换到响应式编程思维方式时遇到的重大技术挑战之一在于如何处理线程。


与您可能习惯的相反,在反应式编程中,您可以使用 Thread 来处理大致同时运行的多个异步序列(实际上,以非阻塞锁步方式)。执行也可以轻松且经常地从一个线程跳转到另一个线程。


对于使用依赖于更“稳定”的线程模型的功能(例如 ThreadLocal )的开发人员来说,这种安排尤其困难。由于它允许您将数据与线程关联起来,因此在反应式上下文中使用它变得很棘手。因此,依赖 ThreadLocal 的库在与 Reactor 一起使用时至少会带来新的挑战。最坏的情况是,他们工作得很差,甚至失败。使用 Logback 的 MDC 来存储和记录相关 ID 就是这种情况的一个典型例子。


ThreadLocal 使用的常见解决方法是通过使用(例如)按顺序移动上下文数据 C 以及业务数据 T Tuple2<T, C> 。这看起来不太好,并且会将正交关注点(上下文数据)泄漏到您的方法和 Flux 签名中。


从版本 3.1.0 开始,Reactor 附带了一项高级功能,该功能在某种程度上与 ThreadLocal 相当,但可以应用于 FluxMono 而不是 Thread 。此功能称为 Context


作为其外观的说明,以下示例同时读取和写入 Context

String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap(s -> Mono.deferContextual(ctx ->
         Mono.just(s + " " + ctx.get(key))))
    .contextWrite(ctx -> ctx.put(key, "World"));

StepVerifier.create(r)
            .expectNext("Hello World")
            .verifyComplete();


在下面的部分中,我们将介绍 Context 以及如何使用它,以便您最终能够理解前面的示例。


这是一项更针对库开发人员的高级功能。它需要对 Subscription 的生命周期有很好的理解,并且适用于负责订阅的库。

 9.8.1. Context API


Context 是一个让人想起 Map 的界面。它存储键值对,并允许您获取通过其键存储的值。它有一个简化版本,仅公开读取方法,即 ContextView 。进一步来说:


  • 键和值的类型都是 Object ,因此 Context (和 ContextView )实例可以包含来自不同库和源的任意数量的高度不同的值。


  • Context 是不可变的。它公开了 putputAll 等写入方法,但它们生成一个新实例。


  • 对于甚至不公开此类写入方法的只读 API,自 3.4.0 起就有 ContextView 超级接口


  • 您可以使用 hasKey(Object key) 检查密钥是否存在。


  • 使用 getOrDefault(Object key, T defaultValue) 检索值(转换为 T ),或者如果 Context 实例没有该键,则回退到默认值。


  • 使用 getOrEmpty(Object key) 获取 Optional<T>Context 实例尝试将存储的值转换为 T )。


  • 使用 put(Object key, Object value) 存储键值对,返回一个新的 Context 实例。您还可以使用 putAll(ContextView) 将两个上下文合并到一个新上下文中。


  • 使用 delete(Object key) 删除与键关联的值,返回新的 Context


创建 Context 时,您可以使用静态 Context.of 方法创建最多具有五个键值对的预值 Context 实例。它们采用 2、4、6、8 或 10 个 Object 实例,每对 Object 实例都是要添加到 Context 的键值对。


或者,您也可以使用 Context.empty() 创建空的 Context


9.8.2.将 Context 绑定到 Flux 并写入


为了使 Context 有用,它必须绑定到特定的序列并且可由链中的每个操作符访问。请注意,该运算符必须是 Reactor 原生运算符,因为 Context 是 Reactor 特有的。


实际上, Context 与链中的每个 Subscriber 相关联。它使用 Subscription 传播机制使其自身可供每个操作员使用,从最终的 subscribe 开始并沿链向上移动。


为了填充只能在订阅时完成的 Context ,您需要使用 contextWrite 运算符。


contextWrite(ContextView) 合并您提供的 ContextView 和来自下游的 Context (请记住, Context 从链的底部传播到顶部)。这是通过调用 putAll 完成的,从而为上游生成一个新的 Context


您还可以使用更高级的 contextWrite(Function<Context, Context>) 。它从下游接收 Context 的副本,让您根据需要添加或删除值,并返回新的 Context 供使用。您甚至可以决定返回一个完全不同的实例,尽管确实不建议这样做(这样做可能会影响依赖于 Context 的第三方库)。


9.8.3。通过 ContextView 读取 Context


填充 Context 后,您可能想在运行时查看它。大多数时候,将信息放入 Context 的责任由最终用户承担,而利用该信息由第三方库承担,因为此类库通常位于客户端代码的上游。


面向读取的运算符允许通过公开其 ContextView 来从运算符链中的 Context 获取数据:


  • 要从类似源的运算符访问上下文,请使用 deferContextual 工厂方法


  • 要从操作符链的中间访问上下文,请使用 transformDeferredContextual(BiFunction)


  • 或者,在处理内部序列(例如 flatMap 内部)时,可以使用 Mono.deferContextual(Mono::just) 来具体化 ContextView 。但通常情况下,您可能希望直接在 defer 的 lambda 中执行有意义的工作,例如。 Mono.deferContextual(ctx → doSomethingAsyncWithContextData(v, ctx.get(key))) 其中 v 是被平面映射的值。


为了从 Context 读取数据而不误导用户认为可以在数据通过管道运行时对其进行写入,上面的运算符仅公开 ContextView 。如果需要使用仍需要 Context 的其余 API 之一,则可以使用 Context.of(contextView) 进行转换。


9.8.4。简单的 Context 示例


本节中的示例旨在更好地理解使用 Context 的一些注意事项。


我们首先更详细地回顾一下介绍中的简单示例,如下例所示:

String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap(s -> Mono.deferContextual(ctx ->
         Mono.just(s + " " + ctx.get(key)))) (2)
    .contextWrite(ctx -> ctx.put(key, "World")); (1)

StepVerifier.create(r)
            .expectNext("Hello World") (3)
            .verifyComplete();
1
运算符链以对 contextWrite(Function) 的调用结束,该调用将 "World" 放入 "message" 键下的 Context 中。
2
我们在源元素上 flatMap ,将 ContextViewMono.deferContextual() 具体化,并直接提取与 "message" 关联的数据并将其与原始元素连接起来单词。
3
生成的 Mono<String> 发出 "Hello World"

上面的编号与实际的线路顺序并没有错误。它代表执行顺序。尽管 contextWrite 是链的最后一部分,但它是第一个执行的部分(由于其订阅时间性质以及订阅信号从下到上流动的事实)。

在运算符链中,写入 Context 的位置和从中读取位置的相对位置很重要。 Context 是不可变的,其内容只能由其上方的运算符看到,如以下示例所示:
String key = "message";
Mono<String> r = Mono.just("Hello")
    .contextWrite(ctx -> ctx.put(key, "World")) (1)
    .flatMap( s -> Mono.deferContextual(ctx ->
        Mono.just(s + " " + ctx.getOrDefault(key, "Stranger")))); (2)

StepVerifier.create(r)
            .expectNext("Hello Stranger") (3)
            .verifyComplete();
1
Context 被写入链中的过高位置。
2
因此,在 flatMap 中,没有与我们的键关联的值。而是使用默认值。
3
由此产生的 Mono<String> 会发出 "Hello Stranger"


同样,在多次尝试将相同键写入 Context 的情况下,写入的相对顺序也很重要。读取 Context 的运算符会看到最接近其下方设置的值,如以下示例所示:

String key = "message";
Mono<String> r = Mono
    .deferContextual(ctx -> Mono.just("Hello " + ctx.get(key)))
    .contextWrite(ctx -> ctx.put(key, "Reactor")) (1)
    .contextWrite(ctx -> ctx.put(key, "World")); (2)

StepVerifier.create(r)
            .expectNext("Hello Reactor") (3)
            .verifyComplete();
1
对键 "message" 的写入尝试。
2
对键 "message" 进行另一次写入尝试。
3
deferContextual 只看到最接近它(及其下方)的值集: "Reactor"


在前面的示例中, Context 在订阅期间填充为 "World" 。然后订阅信号向上游移动并发生另一次写入。这会生成第二个不可变的 Context ,其值为 "Reactor" 。之后,数据开始流动。 deferContextual 看到最接近它的 Context ,这是我们的第二个 Context ,其值为 "Reactor" (作为 "Reactor" 值向用户公开) b8>)。


您可能想知道 Context 是否与数据信号一起传播。如果是这种情况,在这两次写入之间放置另一个 flatMap 将使用顶部 Context 中的值。但事实并非如此,如以下示例所示:

String key = "message";
Mono<String> r = Mono
    .deferContextual(ctx -> Mono.just("Hello " + ctx.get(key))) (3)
    .contextWrite(ctx -> ctx.put(key, "Reactor")) (2)
    .flatMap( s -> Mono.deferContextual(ctx ->
        Mono.just(s + " " + ctx.get(key)))) (4)
    .contextWrite(ctx -> ctx.put(key, "World")); (1)

StepVerifier.create(r)
            .expectNext("Hello Reactor World") (5)
            .verifyComplete();
1
这是第一次写入。
2
这是第二次写入。
3
顶部上下文读取看到第二个写入。
4
flatMap 将初始读取的结果与第一次写入的值连接起来。
5
Mono 发出 "Hello Reactor World"


原因是 ContextSubscriber 关联,并且每个运算符通过从其下游 Subscriber 请求来访问 Context


最后一个有趣的传播情况是 Context 也被写入 flatMap 内部,如下例所示:

String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap( s -> Mono
        .deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))
    )
    .flatMap( s -> Mono
        .deferContextual(ctxView -> Mono.just(s + " " + ctxView.get(key)))
        .contextWrite(ctx -> ctx.put(key, "Reactor")) (1)
    )
    .contextWrite(ctx -> ctx.put(key, "World")); (2)

StepVerifier.create(r)
            .expectNext("Hello World Reactor")
            .verifyComplete();
1
contextWrite 不会影响 flatMap 之外的任何内容。
2
这个 contextWrite 影响主序列的 Context


在前面的示例中,最终发出的值是 "Hello World Reactor" 而不是“Hello Reactor World”,因为写入 "Reactor"contextWrite 是作为内部的一部分执行的。第二个 flatMap 的序列。因此,它不可见,也不通过主序列传播,并且第一个 flatMap 看不到它。传播和不变性隔离了创建中间内部序列(例如 flatMap )的运算符中的 Context

 9.8.5。完整示例


现在我们可以考虑一个更现实的示例,即库从 Context 读取信息:一个反应式 HTTP 客户端,它采用 Mono<String> 作为 PUT


从用户角度来看,其调用方式如下:

doPut("www.example.com", Mono.just("Walter"))


为了传播相关 ID,将按如下方式调用它:

doPut("www.example.com", Mono.just("Walter"))
	.contextWrite(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))


如前面的代码片段所示,用户代码使用 contextWriteHTTP_CORRELATION_ID 键值对填充 Context 。运算符的上游是 HTTP 客户端库返回的 Mono<Tuple2<Integer, String>> (HTTP 响应的简单表示)。因此它有效地将信息从用户代码传递到库代码。


以下示例从库的角度显示了模拟代码,该代码读取上下文并在可以找到相关 ID 时“增强请求”:

static final String HTTP_CORRELATION_ID = "reactive.http.library.correlationId";

Mono<Tuple2<Integer, String>> doPut(String url, Mono<String> data) {
  Mono<Tuple2<String, Optional<Object>>> dataAndContext =
      data.zipWith(Mono.deferContextual(c -> (1)
          Mono.just(c.getOrEmpty(HTTP_CORRELATION_ID))) (2)
      );

  return dataAndContext.<String>handle((dac, sink) -> {
      if (dac.getT2().isPresent()) { (3)
        sink.next("PUT <" + dac.getT1() + "> sent to " + url +
            " with header X-Correlation-ID = " + dac.getT2().get());
      }
      else {
        sink.next("PUT <" + dac.getT1() + "> sent to " + url);
      }
        sink.complete();
      })
      .map(msg -> Tuples.of(200, msg));
}
1
实现 ContextViewMono.deferContextual 以及...​
2
在延迟中,提取相关 ID 键的值,作为 Optional
3
如果密钥存在于上下文中,则使用相关 ID 作为标头。


库代码片段将数据 MonoMono.deferContextual(Mono::just) 压缩。这为库提供了一个 Tuple2<String, ContextView> ,并且该上下文包含来自下游的 HTTP_CORRELATION_ID 条目(因为它位于订阅者的直接路径上)。


然后,库代码使用 map 提取该键的 Optional<String> ,如果该条目存在,它将使用传递的相关 ID 作为 X-Correlation-ID 标头。最后一部分由 handle 模拟。


使用相关 ID 验证库代码的整个测试可以编写如下:

@Test
public void contextForLibraryReactivePut() {
  Mono<String> put = doPut("www.example.com", Mono.just("Walter"))
      .contextWrite(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
      .filter(t -> t.getT1() < 300)
      .map(Tuple2::getT2);

  StepVerifier.create(put)
              .expectNext("PUT <Walter> sent to www.example.com" +
                  " with header X-Correlation-ID = 2-j3r9afaf92j-afkaf")
              .verifyComplete();
}


9.9.上下文传播支持


从 3.5.0 开始,Reactor-Core 嵌入了对 io.micrometer:context-propagation SPI 的支持。该库旨在作为一种在上下文概念的各种实现之间轻松适应的方法,其中 ContextView / Context 是一个示例,并且在 ThreadLocal 之间变量也是如此。


ReactorContextAccessor 允许 Context-Propagation 库理解 Reactor ContextContextView 。它实现 SPI 并通过 java.util.ServiceLoader 加载。除了依赖reactor-core 和 io.micrometer:context-propagation 之外,不需要用户执行任何操作。 ReactorContextAccessor 类是公共的,但通常不应由用户代码访问。


Reactor-Core 支持 io.micrometer:context-propagation 的两种操作模式:


  • 默认(受限)模式,


  • 以及通过 Hooks.enableAutomaticContextPropagation() 启用的自动模式。请注意,此模式仅适用于新订阅,因此建议在应用程序启动时启用此钩子。


它们的主要区别是在将数据写入 Reactor Context 或访问反映当前附加 Subscriber 状态的上下文中讨论的。 > 用于阅读。


9.9.1.写入 Context


根据各个应用程序,您可能必须将已填充的 ThreadLocal 状态存储为 Context 中的条目,或者可能只需要直接填充 Context

  contextWrite 运算符


当订阅时要作为 ThreadLocal 访问的值不存在(或不需要)时,它们可以立即存储在 Context 中:

// assuming TL is known to Context-Propagation as key TLKEY.
static final ThreadLocal<String> TL = new ThreadLocal<>();

// in the main Thread, TL is not set

Mono.deferContextual(ctx ->
  Mono.delay(Duration.ofSeconds(1))
      // we're now in another thread, TL is not explicitly set
      .map(v -> "delayed ctx[" + TLKEY + "]=" + ctx.getOrDefault(TLKEY, "not found") + ", TL=" + TL.get()))
.contextWrite(ctx -> ctx.put(TLKEY, "HELLO"))
.block(); // returns "delayed ctx[TLKEY]=HELLO, TL=null" in default mode
          // returns "delayed ctx[TLKEY]=HELLO, TL=HELLO" in automatic mode
  contextCapture 运算符


当需要在订阅时捕获 ThreadLocal 值并将这些值反映在 Reactor Context 中时,可以使用此运算符,以便上游运算符受益。


与手动 contextWrite 运算符相比, contextCapture 使用 context-propagation API 获取 ContextSnapshot ,然后使用该快照填充 Reactor Context


因此,如果在订阅阶段有任何 ThreadLocal 值,并且已注册 ThreadLocalAccessor ,那么它们的值现在将存储在 Reactor Context 中并且在运行时在上游操作符中可见。

// assuming TL is known to Context-Propagation as key TLKEY.
static final ThreadLocal<String> TL = new ThreadLocal<>();

// in the main Thread, TL is set to "HELLO"
TL.set("HELLO");

Mono.deferContextual(ctx ->
  Mono.delay(Duration.ofSeconds(1))
      // we're now in another thread, TL is not explicitly set
      .map(v -> "delayed ctx[" + TLKEY + "]=" + ctx.getOrDefault(TLKEY, "not found") + ", TL=" + TL.get()))
.contextCapture() // can be skipped in automatic mode when a blocking operator follows
.block(); // returns "delayed ctx[TLKEY]=HELLO, TL=null" in default mode
          // returns "delayed ctx[TLKEY]=HELLO, TL=HELLO" in automatic mode

在自动模式下,阻塞运算符,例如 Flux#blockFirst()Flux#blockLast()Flux#toIterable()Mono#block()Mono#blockOptional() 、以及相关的重载,都透明地执行 contextCapture() ,所以大多数情况下不需要添加它。


9.9.2.访问 ThreadLocal 状态


从 Reactor-Core 3.5.0 开始, ThreadLocal 状态在一组有限的运算符中恢复。我们将此行为称为默认(受限)模式。在 3.5.3 中,添加了一种新模式,即自动模式,它提供对整个反应链中的 ThreadLocal 值的访问。


Reactor-Core 使用存储在 ContextRegistry 中注册的按键匹配的 ContextThreadLocalAccessor 实例中的值执行 ThreadLocal 状态恢复。


快照恢复的默认模式运算符: handletap


在默认模式下,如果上下文传播库, handletapFluxMono 变体的行为都会略有修改在运行时可用。


也就是说,如果它们的下游 ContextView 不为空,它们将假定已发生上下文捕获(手动或通过 contextCapture() 运算符),并将尝试透明地从该快照恢复 ThreadLocals。 ContextView 中缺少的任何 ThreadLocals 键都保持不变。


这些操作员将确保分别围绕用户提供的代码执行恢复:


  • handle 将把 BiConsumer 包装在一个恢复 `ThreadLocal` 的容器中


  • tap 变体会将 SignalListener 包装成对每个方法具有相同类型包装的变体(这包括 addToContext 方法)


目的是让一组简约的操作员透明地执行恢复。因此,我们选择了具有相当通用和广泛应用的算子(一种具有变革能力,一种具有副作用能力)

//assuming TL is known to Context-Propagation.
static final ThreadLocal<String> TL = new ThreadLocal<>();

//in the main thread, TL is set to "HELLO"
TL.set("HELLO");

Mono.delay(Duration.ofSeconds(1))
  //we're now in another thread, TL is not set yet
  .doOnNext(v -> System.out.println(TL.get()))
  //inside the handler however, TL _is_ restored
  .handle((v, sink) -> sink.next("handled delayed TL=" + TL.get()))
  .contextCapture()
  .block(); // prints "null" and returns "handled delayed TL=HELLO"
 自动模式


在自动模式下,所有算子都会跨 Thread 边界恢复 ThreadLocal 状态。相反,在默认模式下,只有选定的操作员才会这样做。


Hooks.enableAutomaticContextPropagation() 可以在应用程序启动时调用以启用自动模式。请注意,此模式仅适用于新订阅,因此建议在应用程序启动时启用此钩子。


这不是一件容易实现的任务,因为反应流规范使反应链 Thread 变得不可知。然而,Reactor-Core 尽力控制 Thread 开关的来源,并基于 Reactor Context 进行快照恢复,将其视为 ThreadLocal 的真实来源b3>状态。


虽然默认模式将 ThreadLocal 状态限制为仅作为所选运算符的参数执行的用户代码,但自动模式允许 ThreadLocal 状态跨越运算符边界。这需要进行适当的清理,以避免将状态泄漏给重用相同 Thread 的不相关代码。这需要将 ThreadLocalAccessor 注册实例的 Context 中不存在的键视为清除相应 ThreadLocal 状态的信号。这对于空 Context 尤为重要,它会清除已注册 ThreadLocalAccessor 实例的所有状态。


9.9.3。我应该选择哪种模式?


默认模式和自动模式都会对性能产生影响。访问 ThreadLocal 变量可能会显着影响反应式管道。如果最高的可扩展性和性能是目标,则可以考虑更详细的日志记录和显式参数传递方法,而不是依赖 ThreadLocal 状态。如果访问可观测性领域中已建立的库(例如 Micrometer 和 SLF4J)(它们使用 ThreadLocal 状态以方便提供有意义的生产级功能)是一种可以理解的妥协,那么模式的选择是另一种妥协制作。根据应用程序的流程和使用的运算符数量,自动模式可能比默认模式更好或更差。可以给出的唯一建议是衡量应用程序的行为方式以及在呈现预期负载时获得的可扩展性和性能特征。


9.10。处理需要清理的对象


在非常特殊的情况下,您的应用程序可能会处理不再使用时需要某种形式的清理的类型。这是一个高级场景——例如,当您有引用计数对象或处理堆外对象时。 Netty 的 ByteBuf 是两者的一个典型例子。


为了确保正确清理此类对象,您需要在 Flux 逐个 Flux 的基础上考虑它,以及在几个全局挂钩中(请参阅使用全局挂钩):


  • doOnDiscard Flux / Mono 运算符

  •   onOperatorError 钩子

  •   onNextDropped 钩子


  • 特定于操作员的处理程序


这是必需的,因为每个钩子都是在考虑特定的清理子集的情况下创建的,并且用户可能希望(例如)除了 onOperatorError 中的清理逻辑之外还实现特定的错误处理逻辑。


请注意,某些运算符不太适合处理需要清理的对象。例如, bufferWhen 可以引入重叠缓冲区,这意味着我们之前使用的丢弃“本地钩子”可能会将第一个缓冲区视为被丢弃,并清除其中位于第二个缓冲区中的元素,其中它仍然有效。


为了清理的目的,所有这些钩子必须是幂等的。在某些情况下,它们可能会多次应用于同一对象。与执行类级 instanceOf 检查的 doOnDiscard 运算符不同,全局挂钩还处理可以是任何 Object 的实例。由用户的实现来区分哪些实例需要清理,哪些不需要清理。


9.10.1. doOnDiscard 运算符或本地挂钩


这个钩子是专门为清理对象而设置的,否则这些对象永远不会暴露给用户代码。它旨在作为正常情况下运行的流的清理钩子(不是推送太多项目的格式错误的源,由 onNextDropped 覆盖)。


它是本地的,因为它是通过运算符激活的,并且仅适用于给定的 FluxMono


明显的案例包括从上游过滤元素的运营商。这些元素永远不会到达下一个运算符(或最终订阅者),但这是正常执行路径的一部分。因此,它们被传递到 doOnDiscard 挂钩。可以使用 doOnDiscard 挂钩的示例如下:


  • filter :与过滤器不匹配的项目被视为“丢弃”。


  • skip :跳过的项目将被丢弃。


  • buffer(maxSize, skip)maxSize < skip :“丢弃缓冲区”——缓冲区之间的项目被丢弃。


doOnDiscard 不仅限于过滤运算符,还可以被出于反压目的而在内部对数据进行排队的运算符使用。更具体地说,大多数时候,这在取消期间很重要。从源中预取数据并随后根据需要将数据传输给订阅者的操作符在取消时可能会有未发送的数据。此类运算符在取消期间使用 doOnDiscard 钩子来清除其内部背压 Queue


doOnDiscard(Class, Consumer) 的每次调用都与其他调用相加,在某种程度上,它仅由其上游的操作员可见和使用。


9.10.2. onOperatorError 钩子


onOperatorError 钩子旨在以横向方式修改错误(类似于 AOP 捕获并重新抛出)。


当处理 onNext 信号期间发生错误时,正在发出的元素将传递给 onOperatorError


如果该类型的元素需要清理,则需要在 onOperatorError 挂钩中实现它,可能位于错误重写代码之上。


9.10.3。 onNextDropped 钩子


对于格式错误的 Publishers ,可能会出现这样的情况:操作符在预期没有元素时收到了元素(通常是在收到 onErroronComplete 信号之后)。在这种情况下,意外元素会被“丢弃”,即传递到 onNextDropped 挂钩。如果您有需要清理的类型,则必须在 onNextDropped 挂钩中检测这些类型并在那里实现清理代码。


9.10.4。特定于操作员的处理程序


一些处理缓冲区或收集值作为其操作一部分的运算符具有特定的处理程序,用于处理收集的数据未向下游传播的情况。如果将此类运算符与需要清理的类型一起使用,则需要在这些处理程序中执行清理。


例如, distinct 有这样一个回调,当运算符终止(或被取消)时会调用该回调,以清除它用来判断元素是否不同的集合。默认情况下,集合是 HashSet ,清理回调是 HashSet::clear 。但是,如果您处理引用计数对象,您可能希望将其更改为更复杂的处理程序,在调用 clear() 之前先 release 集合中的每个元素。

 9.11。空安全


尽管 Java 不允许使用其类型系统表达 null 安全性,但 Reactor 现在提供注释来声明 API 的 null 性,类似于 Spring Framework 5 提供的注释。


Reactor 使用这些注释,但它们也可以在任何基于 Reactor 的 Java 项目中使用来声明空安全 API。方法体内使用的类型的可为空性超出了此功能的范围。


这些注释使用 JSR 305 注释(IntelliJ IDEA 等工具支持的休眠 JSR)进行元注释,以向 Java 开发人员提供与 null 安全相关的有用警告,以避免在运行时出现 NullPointerException 。 JSR 305 元注释允许工具供应商以通用方式提供空安全支持,而无需对 Reactor 注释进行硬编码支持。


对于 Kotlin 1.1.5+,没有必要也不建议在项目类路径中依赖 JSR 305。


Kotlin 也使用它们,Kotlin 本身支持空安全。有关更多详细信息,请参阅此专用部分。


reactor.util.annotation 包中提供了以下注释:


  • @NonNull :表示特定参数、返回值或字段不能为 null 。 ( @NonNullApi 适用的参数和返回值不需要它)。


  • @Nullable :表示参数、返回值或字段可以是 null


  • @NonNullApi :指示非 null 的包级注释是参数和返回值的默认行为。


尚不支持泛型类型参数、变量参数和数组元素的可为空性。请参阅问题 #878 了解最新信息。


建议编辑“高级功能和概念”


附录 A:我需要哪个操作员?


在本节中,如果运算符特定于 Flux 或 Mono,则会相应地添加前缀和链接,如下所示:Flux#fromArray。常见运算符没有前缀,并且提供了两种实现的链接,例如: just (Flux| Mono)。当特定用例由运算符组合覆盖时,它会以方法调用的形式呈现,并带有前导点和括号中的参数,如下所示: .methodCall(parameter)


我想处理:


A.1.创建新序列...​


  • 发出 T ,我已经有了: just ( Flux| Mono)


    • …​来自可选<T>:Mono#justOrEmpty(Optional<T>)


    • ...​来自潜在的 null T:Mono#justOrEmpty(T)


  • 也发出由方法 just ( Flux| Mono) 返回的 T


    • …但懒惰地捕获:使用 Mono#fromSupplier 或将 just ( Flux| Mono) 包裹在 defer ( Flux| Mono) 内


  • 发出几个 T 我可以明确枚举: Flux#just(T…​)

  •  迭代:

    •  数组:Flux#fromArray


    • 集合或可迭代: Flux#fromIterable


    • 整数范围:Flux#range


    • 为每个订阅提供的流: Flux#fromStream(Supplier<Stream>)


  • 从各种单值源发出,例如:


    • 供应商<T>:Mono#fromSupplier


    • 任务:Mono#fromCallable、Mono#fromRunnable


    • CompletableFuture<T>: Mono#fromFuture


  • 完成: empty ( Flux| Mono)


  • 立即出错: error ( Flux| Mono)


    • …​但懒惰地构建 Throwable: error(Supplier<Throwable>) ( Flux| Mono)


  • 从不执行任何操作: never ( Flux| Mono)


  • 这是在订阅时决定的: defer ( Flux| Mono)


  • 这取决于一次性资源: using ( Flux| Mono)


  • 以编程方式生成事件(可以使用状态):


    • 同步且一对一: Flux#generate


    • 异步(也可以是同步),一次可以多次发射: Flux#create (Mono#create 也是如此,没有多次发射方面)


A2。转换现有序列


  • 我想转换现有数据:


    • 在一对一的基础上(例如,字符串的长度): map (Flux | Mono)


      • ...​只需转换它: cast ( Flux| Mono)


      • …​为了具体化每个源值的索引:Flux#index


    • 在一对多的基础上(例如,字符串到其字符): flatMap ( Flux| Mono) + 使用工厂方法


    • 在一对多的基础上,每个源元素和/或状态都有编程行为: handle ( Flux| Mono)


    • 为每个源项目运行异步任务(例如http请求的url): flatMap (Flux | Mono)+异步发布者返回方法


      • ...​忽略一些数据:有条件地在 flatMap lambda 中返回 Mono.empty()


      • ...保留原始序列顺序:Flux#flatMapSequential(这会立即触发异步进程,但会对结果重新排序)


      • …​其中异步任务可以从 Mono 源返回多个值:Mono#flatMapMany


  • 我想将预设元素添加到现有序列中:


    • 在开始处: Flux#startWith(T…​)


    • 最后: Flux#concatWithValues(T…​)


  • 我想聚合一个Flux:(下面假设 Flux# 前缀)


    • 进入列表:collectList、collectSortedList


    • 进入 Map:collectMap、collectMultiMap


    • 放入任意容器中:收集


    • 进入序列的大小:count


    • 通过在每个元素之间应用函数(例如运行总和):reduce


      • ...但发出每个中间值:扫描


    • 从谓词转换为布尔值:


      • 应用于所有值(AND):全部


      • 应用于至少一个值 (OR):任意


      • 测试是否存在任何值:hasElements(hasElement 中有一个 Mono 等效项)


      • 测试特定值是否存在:hasElement(T)


  • 我想合并出版商...​


    • 按顺序: Flux#concat 或 .concatWith(other) ( Flux| Mono)


      • …但延迟任何错误,直到发出剩余的发布者:Flux#concatDelayError


      • …​但热切订阅后续发布者:Flux#mergeSequential


    • 按发射顺序(组合项目出现时发射): Flux#merge / .mergeWith(other) ( Flux| Mono)


      • ...​具有不同类型(转换合并):Flux#zip / Flux#zipWith

    •  通过配对值:


      • 从 2 个 Mono 到 Tuple2:Mono#zipWith


      • 来自 n 个 Mono,当它们全部完成时:Mono#zip


    • 通过协调终止:


      • 从 1 Mono 和任何源到 Mono<Void>: Mono#and


      • 来自 n 个来源,当它们全部完成时:Mono#when


      • 进入任意容器类型:


        • 每次所有各方都发出: Flux#zip (直到最小基数)


        • 每次有新值到达任一侧时: Flux#combineLatest


    • 选择第一个出版商...​


      • 产生一个值 ( onNext ): firstWithValue ( Flux| Mono)


      • 产生任何信号: firstWithSignal ( Flux| Mono)


    • 由源序列中的元素触发:switchMap(每个源元素映射到一个发布者)


    • 由发布者序列中下一个发布者的启动触发:switchOnNext


  • 我想重复现有的序列: repeat ( Flux| Mono)


    • …​但每隔一段时间: Flux.interval(duration).flatMap(tick → myExistingPublisher)


  • 我有一个空序列但是......​


    • 我想要一个值: defaultIfEmpty ( Flux| Mono)


    • 我想要另一个序列: switchIfEmpty ( Flux| Mono)


  • 我有一个序列,但我对值不感兴趣: ignoreElements ( Flux.ignoreElements()| Mono.ignoreElement())


    • ...我希望完成表示为 Mono: then ( Flux| Mono)


    • ...我想等待另一个任务最后完成: thenEmpty ( Flux| Mono)


    • ...我想在最后切换到另一个 Mono:Mono#then(mono)


    • ...我想在最后发出一个值:Mono#thenReturn(T)


    • ...我想在最后切换到 Flux: thenMany (Flux| Mono)


  • 我有一个 Mono,我想推迟完成......​


    • …​直到从该值派生的另一个发布者完成:Mono#delayUntil(Function)


  • 我想将元素递归地扩展为序列图并发出组合......​


    • ...​首先扩展图表宽度: expand(Function) (Flux | Mono)


    • …​首先扩展图深度: expandDeep(Function) ( Flux| Mono)


A.3.窥探序列


  • 在不修改最终序列的情况下,我想:


    • 收到关于以下方面的附加行为(有时称为“副作用”)的通知/执行附加行为:


      • 排放量: doOnNext (通量|单声道)


      • 完成: Flux#doOnComplete、Mono#doOnSuccess (包括结果,如果有的话)


      • 错误终止: doOnError ( Flux| Mono)


      • 取消: doOnCancel (Flux | Mono)


      • 序列的“开始”: doFirst ( Flux| Mono)


        • 这与 Publisher#subscribe(Subscriber) 相关


      • 订阅后: doOnSubscribe (Flux | Mono)


        • Subscription subscribe 之后确认


        • 这与 Subscriber#onSubscribe(Subscription) 相关


      • 请求: doOnRequest (通量|单声道)


      • 完成或错误: doOnTerminate (Flux | Mono)


        • 但在它向下游传播之后: doAfterTerminate ( Flux| Mono)


      • 任何类型的信号,表示为 Signal: doOnEach ( Flux| Mono)


      • 任何终止条件(完成、错误、取消): doFinally ( Flux| Mono)


    • 记录内部发生的情况: log ( Flux| Mono)


  • 我想知道所有事件:


    • 每个都表示为 Signal 对象:


      • 在序列外部的回调中: doOnEach ( Flux| Mono)


      • 而不是原来的 onNext 排放: materialize ( Flux| Mono)


        • ...然后回到 onNexts: dematerialize ( Flux| Mono)


    • 作为日志中的一行: log ( Flux| Mono)


A.4.过滤序列


  • 我想过滤一个序列:


    • 基于任意标准: filter ( Flux| Mono)


      • …​这是异步计算的: filterWhen ( Flux| Mono)


    • 限制发射对象的类型: ofType ( Flux| Mono)


    • 完全忽略这些值: ignoreElements ( Flux.ignoreElements()| Mono.ignoreElement())

    •  通过忽略重复项:


      • 在整个序列(逻辑集)中: Flux#distinct


      • 随后发出的项目之间(重复数据删除): Flux#distinctUntilChanged


  • 我只想保留序列的一个子集:


    • 取 N 个元素:


      • 在序列的开头: Flux#take(long)


        • …​向上游请求无限量:Flux#take(long, false)


        • …​基于持续时间:Flux#take(Duration)


        • ...仅第一个元素,作为 Mono:Flux#next()


      • 在序列末尾:Flux#takeLast


      • 直到满足条件(包括):Flux#takeUntil(基于谓词)、Flux#takeUntilOther(基于同伴发布者)


      • 当满足条件时(不包括):Flux#takeWhile


    • 最多取 1 个元素:


      • 在特定位置:Flux#elementAt

      •  最后:.takeLast(1)


        • ...如果为空则发出错误:Flux#last()


        • …​如果为空则发出默认值:Flux#last(T)

    •  通过跳过元素:


      • 在序列的开头: Flux#skip(long)


        • …​基于持续时间:Flux#skip(Duration)


      • 在序列末尾: Flux#skipLast


      • 直到满足条件(包括):Flux#skipUntil(基于谓词)、Flux#skipUntilOther(基于同伴发布者)


      • 当满足条件时(不包括): Flux#skipWhile

    •  按抽样项目:


      • 按持续时间: Flux#sample(Duration)


        • 但保留采样窗口中的第一个元素而不是最后一个:sampleFirst


      • 通过基于发布者的窗口:Flux#sample(Publisher)


      • 基于发布者“超时”:Flux#sampleTimeout(每个元素都会触发一个发布者,如果该发布者不与下一个发布者重叠,则发出该元素)


  • 我预计最多 1 个元素(如果超过一个则错误)...​


    • 如果序列为空,我想要一个错误: Flux#single()


    • 如果序列为空,我想要一个默认值: Flux#single(T)


    • 我也接受一个空序列:Flux#singleOrEmpty

 A.5.处理错误


  • 我想创建一个错误序列: error ( Flux| Mono)...​


    • ...​替换成功 Flux 的完成: .concat(Flux.error(e))


    • ...​替换成功的 Mono 的发射: .then(Mono.error(e))


    • …​如果 onNexts 之间的时间间隔过长: timeout ( Flux| Mono)


    • …​懒惰地: error(Supplier<Throwable>) (Flux | Mono)


  • 我想要的 try/catch 相当于:


    • 投掷: error ( Flux| Mono)

    •  捕获异常:


      • 并回退到默认值: onErrorReturn ( Flux| Mono)


      • 并吞下错误(即完成): onErrorComplete ( Flux| Mono)


      • 并回退到另一个 Flux 或 Mono: onErrorResume ( Flux| Mono)


      • 包装和重新抛出: .onErrorMap(t → new RuntimeException(t)) ( Flux| Mono)


    • 最后块: doFinally ( Flux| Mono)


    • Java 7 中的使用模式: using ( Flux| Mono) 工厂方法


  • 我想从错误中恢复...​

    •  通过回落:


      • 到一个值: onErrorReturn (Flux | Mono)


      • 完成(“吞掉”错误): onErrorComplete ( Flux| Mono)


      • 发布者或 Mono,根据错误可能不同: Flux#onErrorResume 和 Mono#onErrorResume

    •  通过重试...​


      • ...​使用简单的策略(最大尝试次数): retry() (Flux | Mono), retry(long) (Flux | Mono)


      • ...​由配套控件 Flux 触发: retryWhen ( Flux| Mono)


      • …​使用标准退避策略(带抖动的指数退避): retryWhen(Retry.backoff(…​)) ( Flux| Mono) (另请参阅 Retry 中的其他工厂方法)


  • 我想处理背压“错误”(来自上游的最大请求,并在下游没有产生足够的请求时应用该策略)...​


    • 通过抛出一个特殊的 IllegalStateException:Flux#onBackPressureError


    • 通过删除多余的值:Flux#onBackPressureDrop


      • …​除了最后一个看到的:Flux#onBackPressureLatest


    • 通过缓冲多余的值(有界或无界):Flux#onBackPressureBuffer


      • …​并在有界缓冲区溢出时应用策略:带有 BufferOverflowStrategy 的 Flux#onBackPressureBuffer


A.6.与时间一起工作


  • 我想将排放与测量的时间联系起来......​


    • ...​具有最佳可用精度和所提供数据的多功能性: timed (Flux | Mono)


      • 自上次 onNext 以来的持续时间为 Timed#elapsed()


      • Timed<T>#timestamp() 用于即时表示纪元时间戳(毫秒分辨率)


      • Timed<T>#elapsedSinceSubcription() 表示自订阅以来的持续时间(而不是上次 onNext)


      • 对于经过的持续时间可以有纳秒分辨率


    • ...​作为(遗留)Tuple2<Long, T>...​


      • 自上次 onNext: elapsed ( Flux| Mono)


      • 自创世以来(好吧,计算机时间): timestamp (Flux | Mono)


  • 如果发射之间有太多延迟,我希望我的序列被中断: timeout ( Flux| Mono)


  • 我想从时钟中获取滴答声,定期的时间间隔:Flux#interval


  • 我想在初始延迟后发出一个 0 :static Mono.delay。


  • 我想引入一个延迟:


    • 每个 onNext 信号之间:Mono#delayElement、Flux#delayElements


    • 订阅发生之前: delaySubscription ( Flux| Mono)

 A.7.分割通量


  • 我想通过边界标准将 Flux 拆分为 Flux<Flux<T>>

    •  大小:窗口(int)


      • ...​窗口重叠或下降:window(int, int)

    •  时间窗的个数(Duration)


      • ...​窗口重叠或下降:window(Duration, Duration)


    • 大小或时间(达到计数或超时时窗口关闭):windowTimeout(int, Duration)


    • 基于元素的谓词:windowUntil


      • ......发射触发下一个窗口中边界的元素( cutBefore 变体):.windowUntil(predicate, true)


      • …​在元素匹配谓词时保持窗口打开:windowWhile(不发出不匹配的元素)


    • 由控件 Publisher 中的 onNexts 表示的任意边界驱动:window(Publisher)、windowWhen


  • 我想将边界内的 Flux<T> 和缓冲区元素拆分在一起......​

    •  进入列表:


      • 通过大小边界: buffer(int)


        • ...​具有重叠或删除缓冲区: buffer(int, int)


      • 通过持续时间边界: buffer(Duration)


        • …​具有重叠或丢弃缓冲区: buffer(Duration, Duration)


      • 通过大小或持续时间边界: bufferTimeout(int, Duration)


      • 通过任意标准边界: bufferUntil(Predicate)


        • …​将触发边界的元素放入下一个缓冲区中:.bufferUntil(predicate, true)


        • …​在谓词匹配时进行缓冲并删除触发边界的元素:bufferWhile(Predicate)


      • 由控件 Publisher 中的 onNexts 表示的任意边界驱动: buffer(Publisher), bufferWhen


    • 进入任意“集合”类型 C :使用像 buffer(int,Supplier) 这样的变体


  • 我想拆分 Flux ,以便共享特征的元素最终出现在相同的子通量中: groupBy(Function) 提示:请注意,这会返回一个 Flux<GroupedFlux<K, T>> ,每个内部 GroupedFlux 共享可通过 key() 访问的相同 K 键。


A.8.回到同步世界


注意:如果从标记为“仅非阻塞”的调度程序(默认情况下为parallel() 和single())调用,除Mono#toFuture 之外的所有这些方法都会抛出UnsupportedOperatorException。


  • 我有一个 Flux<T>,我想:


    • 阻塞直到我可以获得第一个元素: Flux#blockFirst


      • …​有超时:Flux#blockFirst(Duration)


    • 阻塞直到我可以获得最后一个元素(如果为空则为 null): Flux#blockLast


      • …​有超时:Flux#blockLast(Duration)


    • 同步切换到 Iterable<T>: Flux#toIterable


    • 同步切换到 Java 8 Stream<T>:Flux#toStream


  • 我有一个 Mono<T>,我想要:


    • 阻塞直到我能得到值:Mono#block


      • …​有超时:Mono#block(Duration)


    • CompletableFuture<T>: Mono#toFuture


A.9.将 Flux 多播到多个订阅者


  • 我想将多个订阅者连接到一个 Flux:


    • 并决定何时使用 connect() 触发源:publish()(返回 ConnectableFlux)


    • 并立即触发源(后期订阅者看到稍后的数据): share() (Flux | Mono)


    • 当有足够的订阅者注册时永久连接源:.publish().autoConnect(n)


    • 当订阅者高于/低于阈值时自动连接和取消源:.publish().refCount(n)


      • …​但在取消之前给新订阅者一个进入的机会:.publish().refCount(n, Duration)


  • 我想缓存来自发布者的数据并将其重播给后来的订阅者:


    • 最多 n 个元素:cache(int)


    • 缓存在持续时间(生存时间)内看到的最新元素: cache(Duration) (Flux | Mono)


      • …​但保留不超过 n 个元素:cache(int, Duration)


    • 但没有立即触发源:Flux#replay(返回 ConnectableFlux)


建议编辑“我需要哪个运营商?”


附录 B:如何阅读弹珠图?


当我们介绍 FluxMono 时,我们展示了一个“大理石图”的示例。这些可以在整个 javadoc 中找到,以便以更直观的方式解释操作符的行为。


在本节中,我们将更深入地研究 Reactor 文档针对这些弹珠图所使用的约定。首先,让我们看看最常见的运算符模式是如何表示的。


一些运算符是实例方法:它们的输出是通过调用源 Flux 实例(如 Flux<T> output = source.fluxOperator() )上的方法生成的:

A common operator


其他运算符是静态方法。他们仍然可以将源作为输入参数,例如 Flux<T> output = Flux.merge(sourceFlux1, sourcePublisher2) 。这些表示如下:

A static operator


请注意,有时我们根据运算符的输入表示多个变体或行为,在这种情况下,有一个运算符“框”,但源变体和输出变体是分开的,如下所示:

An operator with two examples of input


这些是基本情况,但一些运算符显示稍微更高级的模式。


例如, ParallelFlux 创建多个轨,因此它们有多个输出 Flux 。它们依次表示,如下图所示:

A parallel operator


窗口运算符生成 Flux<Flux<T>> :主 Flux 通知每个窗口打开,而内部 Flux 表示窗口内容和终止。窗口表示为主 Flux 的分支,如下图所示:

The output of a windowing operator


有时,操作员将“同伴发布者”作为输入( FluxMono 或任意反应流 Publisher )。此类同伴发布者帮助定制操作员的行为,操作员将使用同伴的一些信号作为其自身内部行为的触发器。它们如下图所示:

An operator with a companion Publisher


现在我们已经了解了最常见的运算符模式,让我们展示 FluxMono 中可能发生的所有不同信号、事件和元素的图形表示:

All types of signals and events


最后,以同样的方式,我们有副作用的图形表示,它与反应流信号一起发生:

Side effects: representation of doOn* handlers
Side effects: in a diagram


建议编辑“如何阅读弹珠图?”


附录 C:常见问题解答、最佳实践和“我如何……​?”


本节涵盖以下内容:


C.1.如何结束同步、阻塞调用?


通常情况下,信息源是同步且阻塞的。要在 Reactor 应用程序中处理此类源,请应用以下模式:

Mono blockingWrapper = Mono.fromCallable(() -> { (1)
    return /* make a remote synchronous call */ (2)
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.boundedElastic()); (3)
1
使用 fromCallable 创建新的 Mono
2
返回异步、阻塞资源。
3
确保每个订阅均由 Schedulers.boundedElastic() 的专门工作人员进行。


您应该使用 Mono ,因为源返回一个值。您应该使用 Schedulers.boundedElastic ,因为它会创建一个专用线程来等待阻塞资源,而不会影响其他非阻塞处理,同时还确保可以创建的线程数量有限制,并且在峰值期间可以排队和推迟的阻塞任务。


请注意, subscribeOn 不订阅 Mono 。它指定当订阅调用发生时使用哪种 Scheduler


另请注意, subscribeOn 运算符应紧跟在源代码之后,任何其他运算符都在 subscribeOn 包装器之后定义。


C.2.我在 Flux 上使用了运算符,但它似乎不适用。是什么赋予了?


确保您 .subscribe() 所使用的变量已受到您认为应该应用于它的运算符的影响。


反应堆操作员是装饰者。它们返回一个不同的实例,该实例包装源序列并添加行为。这就是为什么使用运算符的首选方法是链接调用。


比较以下两个例子:


示例 25. 没有链接(不正确)
Flux<String> flux = Flux.just("something", "chain");
flux.map(secret -> secret.replaceAll(".", "*")); (1)
flux.subscribe(next -> System.out.println("Received: " + next));
1
错误就在这里。结果不附加到 flux 变量。

示例 26. 没有链接(正确)
Flux<String> flux = Flux.just("something", "chain");
flux = flux.map(secret -> secret.replaceAll(".", "*"));
flux.subscribe(next -> System.out.println("Received: " + next));


下面的示例更好(因为它更简单):


示例 27. 使用链接(最佳)
Flux.just("something", "chain")
    .map(secret -> secret.replaceAll(".", "*"))
    .subscribe(next -> System.out.println("Received: " + next));


第一个版本输出以下内容:

Received: something
Received: chain


另外两个版本输出期望值,如下:

Received: *********
Received: *****


C.3.我的 Mono zipWithzipWhen 从未被调用


考虑以下示例:

myMethod.process("a") // this method returns Mono<Void>
        .zipWith(myMethod.process("b"), combinator) //this is never called
        .subscribe();


如果源 MonoemptyMono<Void>Mono<Void> 对于所有意图和目的都是空的),则某些组合永远不会叫。


这是任何转换器的典型情况,例如 zip 静态方法或 zipWith zipWhen 运算符,它们(根据定义)需要每个源中的一个元素来产生他们的输出。


因此,在 zip 源上使用数据抑制运算符是有问题的。数据抑制运算符的示例包括 then()thenEmpty(Publisher<Void>)ignoreElements()ignoreElement() 以及 when(Publisher…​)


类似地,使用 Function<T,?> 来调整其行为的运算符(例如 flatMap )需要至少发出一个元素, Function 才有机会申请。将它们应用于空(或 <Void> )序列永远不会产生元素。


您可以使用 .defaultIfEmpty(T).switchIfEmpty(Publisher<T>)T 的空序列替换为默认值或后备 Publisher<T> (分别),这可以帮助避免其中一些情况。请注意,这不适用于 Flux<Void> / Mono<Void> 源,因为您只能切换到另一个 Publisher<Void> ,它仍然保证为空。以下示例使用 defaultIfEmpty


示例 28. 在 zipWhen 之前使用 defaultIfEmpty
myMethod.emptySequenceForKey("a") // this method returns empty Mono<String>
        .defaultIfEmpty("") // this converts empty sequence to just the empty String
        .zipWhen(aString -> myMethod.process("b")) //this is called with the empty String
        .subscribe();


C.4.将 zip 与空完成的发布者一起使用


当将 zip 运算符与空完成的发布者(即完成但不发出项目的发布者)一起使用时,了解以下行为非常重要。


考虑以下测试用例:

    @Test
    public void testZipEmptyCompletionAllSubscribed() {
        AtomicInteger cnt = new AtomicInteger();
        Mono<Integer> mono1 = Mono.create(sink -> {
            cnt.incrementAndGet();
            sink.success();
        });
        Mono<Integer> mono2 = Mono.create(sink -> {
            cnt.incrementAndGet();
            sink.success();
        });
        Mono<Integer> zippedMono = Mono.zip(mono1, mono2, (v1, v2) -> v1);
        zippedMono.subscribe();
        assertEquals(2, cnt.get());
    }


虽然在这种情况下,生成的 zippedMono 订阅了 mono1mono2 ,但并不保证所有情况下都会出现这种行为。例如,考虑以下测试用例:

    @Test
    public void testZipEmptyCompletionOneSubscribed() {
        AtomicInteger cnt = new AtomicInteger();
        Mono<Integer> mono1 = Mono.create(sink -> {
            cnt.incrementAndGet();
            sink.success();
        });
        Mono<Integer> mono2 = Mono.create(sink -> {
            cnt.incrementAndGet();
            sink.success();
        });
        Mono<Integer> mono3 = Mono.create(sink -> {
            cnt.incrementAndGet();
            sink.success();
        });
        Mono<Integer> zippedMono = Mono.zip(mono1, Mono.zip(mono2, mono3, (v1, v2) -> v1), (v1, v2) -> v1);
        zippedMono.subscribe();
        assertEquals(1, cnt.get());
    }


在这种情况下,在 mono1 空完成时, zippedMono 立即完成并且不会订阅 mono2mono3


因此,在使用 zip 运算符来组合空完成的发布者的情况下,不能保证生成的发布者将订阅所有空完成的发布者。


如果需要保留第二个测试用例中所示的语义并确保订阅所有要压缩的发布者,请考虑使用 singleOptional 运算符,如下面的测试用例所示:

@Test
public void testZipOptionalAllSubscribed() {
	AtomicInteger cnt = new AtomicInteger();
	Mono<Integer> mono1 = Mono.create(sink -> {
		cnt.incrementAndGet();
		sink.success();
	});
	Mono<Integer> mono2 = Mono.create(sink -> {
		cnt.incrementAndGet();
		sink.success();
	});
	Mono<Integer> mono3 = Mono.create(sink -> {
		cnt.incrementAndGet();
		sink.success();
	});
	Mono<Optional<Integer>> zippedMono =
			Mono.zip(
					mono1.singleOptional(),
					Mono.zip(mono2.singleOptional(), mono3.singleOptional(), (v1, v2) -> v1),
					(v1, v2) -> v1);
	zippedMono.subscribe();
	assertEquals(3, cnt.get());
}


C.5。如何使用 retryWhen 来模拟 retry(3)


retryWhen 运算符可能非常复杂。希望以下代码片段可以通过尝试模拟更简单的 retry(3) 来帮助您理解它的工作原理:

AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
		Flux.<String>error(new IllegalArgumentException())
				.doOnError(e -> errorCount.incrementAndGet())
				.retryWhen(Retry.from(companion -> (1)
						companion.map(rs -> { (2)
							if (rs.totalRetries() < 3) return rs.totalRetries(); (3)
							else throw Exceptions.propagate(rs.failure()); (4)
						})
				));
1
我们通过改编 Function lambda 来自定义 Retry ,而不是提供具体的类
2
同伴发出 RetrySignal 对象,该对象包含迄今为止的重试次数和最后一次失败的次数
3
为了允许三次重试,我们考虑索引 < 3 并返回一个要发出的值(这里我们只是返回索引)。
4
为了终止错误的序列,我们在这三次重试之后抛出原始异常。


C.6。如何使用 retryWhen 进行指数退避?


指数退避会产生重试尝试,每次尝试之间的延迟会越来越大,以免源系统过载并面临全面崩溃的风险。理由是,如果源产生错误,它就已经处于不稳定状态,并且不太可能立即从中恢复。因此盲目地立即重试可能会产生另一个错误并增加不稳定性。


3.3.4.RELEASE 开始,Reactor 附带了一个用于此类重试的构建器,与 Flux#retryWhenRetry.backoff 一起使用。


以下示例展示了构建器的简单使用,在重试尝试延迟之前和之后使用钩子记录消息。它延迟重试并增加每次尝试之间的延迟(伪代码:delay = 100ms * 2^attempt_number_starting_at_zero):

AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalStateException("boom"))
		.doOnError(e -> { (1)
			errorCount.incrementAndGet();
			System.out.println(e + " at " + LocalTime.now());
		})
		.retryWhen(Retry
				.backoff(3, Duration.ofMillis(100)).jitter(0d) (2)
				.doAfterRetry(rs -> System.out.println("retried at " + LocalTime.now() + ", attempt " + rs.totalRetries())) (3)
				.onRetryExhaustedThrow((spec, rs) -> rs.failure()) (4)
		);
1
我们将记录源发出错误的时间并对其进行计数。
2
我们配置指数退避重试,最多尝试 3 次且无抖动。
3
我们还记录重试发生的时间以及重试尝试次数(从 0 开始)。
4
默认情况下,将引发 Exceptions.retryExhausted 异常,并以最后一个 failure() 作为原因。在这里,我们将其自定义为直接将原因发出为 onError


订阅后,此操作失败并在打印出以下内容后终止:

java.lang.IllegalStateException: boom at 00:00:00.0
retried at 00:00:00.101, attempt 0 (1)
java.lang.IllegalStateException: boom at 00:00:00.101
retried at 00:00:00.304, attempt 1 (2)
java.lang.IllegalStateException: boom at 00:00:00.304
retried at 00:00:00.702, attempt 2 (3)
java.lang.IllegalStateException: boom at 00:00:00.702
1
大约100ms后第一次重试
2
约200ms后第二次重试
3
约 400ms 后第三次重试


C.7.使用 publishOn() 时如何保证线程亲和性?


如调度程序中所述, publishOn() 可用于切换执行上下文。 publishOn 运算符会影响线程上下文,在该线程上下文中,其下面的链中的其余运算符将在该上下文中运行,直到出现新的 publishOn 为止。所以 publishOn 的位置很重要。


考虑以下示例:

Sinks.Many<Integer> dataSinks = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> source = dataSinks.asFlux();
source.publishOn(scheduler1)
	  .map(i -> transform(i))
	  .publishOn(scheduler2)
	  .doOnNext(i -> processNext(i))
	  .subscribe();


map() 中的 transform 函数在 scheduler1 的工作线程上运行,而 doOnNext() 中的 processNext 方法是在 scheduler2 的工作线程上运行。每个订阅都有自己的工作线程,因此推送到相应订阅者的所有元素都发布在同一个 Thread 上。


您可以使用单线程调度程序来确保链中不同阶段或不同订阅者的线程关联性。


C.8.什么是上下文日志记录的好模式? (MDC)


大多数日志记录框架都允许上下文日志记录,让用户通常通过称为 MDC(“映射诊断上下文”)的 Map 来存储反映在日志记录模式中的变量。这是 Java 中 ThreadLocal 最频繁使用的情况之一,因此该模式假设正在记录的代码与 Thread 建立了一对一的关系。 。


在 Java 8 之前,这可能是一个安全的假设,但随着 Java 语言中函数式编程元素的出现,情况发生了一些变化……​


让我们以一个命令式 API 为例,该 API 使用模板方法模式,然后切换到更实用的风格。在模板方法模式中,继承发挥了作用。现在,在更实用的方法中,传递高阶函数来定义算法的“步骤”。现在,事情变得更加声明性而不是命令性,这使得库可以自由地决定每个步骤应该在哪里运行。例如,知道底层算法的哪些步骤可以并行化,库可以使用 ExecutorService 并行执行某些步骤。


这种函数式 API 的一个具体示例是 Java 8 中引入的 Stream API 及其 parallel() 风格。在并行 Stream 中使用 MDC 进行日志记录并不是免费的午餐:需要确保在每个步骤中捕获并重新应用 MDC。


函数式风格可以实现此类优化,因为每个步骤都是与线程无关且引用透明的,但它可能会打破 MDC 对单个 Thread 的假设。确保所有阶段都可以访问任何类型的上下文信息的最惯用的方法是通过组合链传递该上下文。在 Reactor 的开发过程中,我们遇到了同样的一般问题,我们希望避免这种非常简单和明确的方法。这就是引入 Context 的原因:只要使用 FluxMono 作为返回值,它就会通过执行链传播,通过让阶段(运算符)查看其下游阶段的 Context 。因此,Reactor 没有使用 ThreadLocal ,而是提供了这种类似于地图的对象,该对象绑定到 Subscription 而不是 Thread


既然我们已经确定 MDC“正常工作”并不是在声明式 API 中做出的最佳假设,那么我们如何执行与响应流中的事件相关的上下文日志语句( onNextonErroronComplete )?


当人们想要以直接和明确的方式记录与这些信号相关的信息时,常见问题解答的此条目提供了一种可能的中间解决方案。请务必事先阅读“向反应序列添加上下文”部分,尤其是如何向操作符链的底部进行写入,以便其上方的操作符能够看到它。


要从 Context 向 MDC 获取上下文信息,最简单的方法是使用一点样板代码将日志记录语句包装在 doOnEach 运算符中。该样板文件取决于您选择的日志框架/抽象以及您想要放入 MDC 中的信息,因此它必须位于您的代码库中。


以下是围绕单个 MDC 变量的此类辅助函数的示例,重点是使用 Java 9 增强的 Optional API 记录 onNext 事件:

public static <T> Consumer<Signal<T>> logOnNext(Consumer<T> logStatement) {
	return signal -> {
		if (!signal.isOnNext()) return; (1)
		Optional<String> toPutInMdc = signal.getContextView().getOrEmpty("CONTEXT_KEY"); (2)

		toPutInMdc.ifPresentOrElse(tpim -> {
			try (MDC.MDCCloseable cMdc = MDC.putCloseable("MDC_KEY", tpim)) { (3)
				logStatement.accept(signal.get()); (4)
			}
		},
		() -> logStatement.accept(signal.get())); (5)
	};
}
1
doOnEach 信号包括 onCompleteonError 。在此示例中,我们只对记录 onNext 感兴趣
2
我们将从 Reactor Context 中提取一个有趣的值(请参阅 Context API 部分)
3
在此示例中,我们使用 SLF4J 2 中的 MDCCloseable ,允许 try-with-resource 语法在执行日志语句后自动清理 MDC
4
正确的日志语句由调用者作为 Consumer<T> ( onNext 值的使用者)提供
5
如果 Context 中未设置预期的密钥,我们将使用替代路径,在 MDC 中不放置任何内容


使用此样板代码可确保我们成为 MDC 的好公民:我们在执行日志记录语句之前设置一个密钥,并在执行后立即将其删除。后续日志记录语句不存在污染 MDC 的风险。


当然,这是一个建议。您可能有兴趣从 Context 中提取多个值或记录 onError 的情况。您可能希望为这些情况创建额外的辅助方法,或者设计一个使用额外 lambda 来覆盖更多内容的单一方法。


无论如何,上述辅助方法的用法可能类似于以下反应式 Web 控制器:

@GetMapping("/byPrice")
public Flux<Restaurant> byPrice(@RequestParam Double maxPrice, @RequestHeader(required = false, name = "X-UserId") String userId) {
	String apiId = userId == null ? "" : userId; (1)

	return restaurantService.byPrice(maxPrice))
			   .doOnEach(logOnNext(r -> LOG.debug("found restaurant {} for ${}", (2)
					r.getName(), r.getPricePerPerson())))
			   .contextWrite(Context.of("CONTEXT_KEY", apiId)); (3)
}
1
我们需要从请求头中获取上下文信息并将其放入 Context
2
在这里,我们使用 doOnEach 将辅助方法应用于 Flux 。请记住:操作员会看到在其下方定义的 Context 值。
3
我们使用所选键 CONTEXT_KEY 将标头中的值写入 Context


在此配置中, restaurantService 可以在共享线程上发出其数据,但日志仍将为每个请求引用正确的 X-UserId


为了完整起见,我们还可以看到错误记录助手的样子:

public static Consumer<Signal<?>> logOnError(Consumer<Throwable> errorLogStatement) {
	return signal -> {
		if (!signal.isOnError()) return;
		Optional<String> toPutInMdc = signal.getContextView().getOrEmpty("CONTEXT_KEY");

		toPutInMdc.ifPresentOrElse(tpim -> {
			try (MDC.MDCCloseable cMdc = MDC.putCloseable("MDC_KEY", tpim)) {
				errorLogStatement.accept(signal.getThrowable());
			}
		},
		() -> errorLogStatement.accept(signal.getThrowable()));
	};
}


除了我们检查 Signal 实际上是 onError 以及我们向记录 lambda 语句。


在控制器中应用这个助手与我们之前所做的非常相似:

@GetMapping("/byPrice")
public Flux<Restaurant> byPrice(@RequestParam Double maxPrice, @RequestHeader(required = false, name = "X-UserId") String userId) {
	String apiId = userId == null ? "" : userId;

	return restaurantService.byPrice(maxPrice))
			   .doOnEach(logOnNext(v -> LOG.info("found restaurant {}", v))
			   .doOnEach(logOnError(e -> LOG.error("error when searching restaurants", e)) (1)
			   .contextWrite(Context.of("CONTEXT_KEY", apiId));
}
1
如果 restaurantService 发出错误,它将在此处与 MDC 上下文一起记录


建议编辑“常见问题解答、最佳实践和“我如何......?””


附录 D:Reactor-Extra


reactor-extra 工件包含额外的运算符和实用程序,供具有高级需求的 reactor-core 用户或正在孵化的运算符使用。


由于这是一个单独的工件,因此您需要将其显式添加到您的构建中。以下示例展示了如何在 Gradle 中执行此操作:

dependencies {
     compile 'io.projectreactor:reactor-core'
     compile 'io.projectreactor.addons:reactor-extra' (1)
}
1
除了核心之外,添加反应堆额外的工件。有关为什么使用 BOM 时不需要指定版本的详细信息、Maven 中的用法以及其他详细信息,请参阅获取 Reactor。


D.1. TupleUtils 和函数式接口


reactor.function 包包含补充 Java 8 FunctionPredicateConsumer 接口的功能接口,具有三到八个值。


TupleUtils 提供静态方法,充当这些功能接口的 lambda 与相应 Tuple 上的类似接口之间的桥梁。


这使您可以轻松使用任何 Tuple 的独立部分,如以下示例所示:

.map(tuple -> {
  String firstName = tuple.getT1();
  String lastName = tuple.getT2();
  String address = tuple.getT3();

  return new Customer(firstName, lastName, address);
});


您可以将前面的示例重写如下:

.map(TupleUtils.function(Customer::new)); (1)
1
(因为 Customer 构造函数符合 Function3 功能接口签名)


D.2。带有 MathFlux 的数学运算符


reactor.math 包包含 FluxMathFlux 专用版本,它提供数学运算符,包括 maxminsumIntaverageDouble 等。

 D.3。调度程序


Reactor-extra 附带 ForkJoinPoolScheduler (在 reactor.scheduler.forkjoin 包中):它使用 Java ForkJoinPool 来执行任务。


建议编辑“Reactor-Extra”