Snapshot of https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html#overview saved on 2024-5-13 24:01, with translation support provided by Immersive Translate.

Structured Streaming Programming Guide

Overview

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.

Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees. However, since Spark 2.3, we have introduced a new low-latency processing mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. Without changing the Dataset/DataFrame operations in your queries, you will be able to choose the mode based on your application requirements.

In this guide, we are going to walk you through the programming model and the APIs. We are going to explain the concepts mostly using the default micro-batch processing model, and then later discuss Continuous Processing model. First, let’s start with a simple example of a Structured Streaming query - a streaming word count.

Quick Example

Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in Scala/Java/Python/R. And if you download Spark, you can directly run the example. In any case, let’s walk through the example step-by-step and understand how it works. First, we have to import the necessary classes and create a local SparkSession, the starting point of all functionalities related to Spark.

Scala:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

import spark.implicits._

Java:
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.Iterator;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();

Python:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

R:
sparkR.session(appName = "StructuredNetworkWordCount")

Next, let’s create a streaming DataFrame that represents text data received from a server listening on localhost:9999, and transform the DataFrame to calculate word counts.

Scala:
// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

The lines DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named “value”, and each line in the streaming text data becomes a row in the table. Note that this DataFrame is not yet receiving any data, because we are only setting up the transformation and have not started it. Next, we have converted the DataFrame to a Dataset of String using .as[String], so that we can apply the flatMap operation to split each line into multiple words. The resulting words Dataset contains all the words. Finally, we have defined the wordCounts DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.

Java:
// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Split the lines into words
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();

The lines DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named “value”, and each line in the streaming text data becomes a row in the table. Note that this DataFrame is not yet receiving any data, because we are only setting up the transformation and have not started it. Next, we have converted the DataFrame to a Dataset of String using .as(Encoders.STRING()), so that we can apply the flatMap operation to split each line into multiple words. The resulting words Dataset contains all the words. Finally, we have defined the wordCounts DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.

Python:
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

The lines DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named “value”, and each line in the streaming text data becomes a row in the table. Note that this DataFrame is not yet receiving any data, because we are only setting up the transformation and have not started it. Next, we have used two built-in SQL functions, split and explode, to split each line into multiple rows with one word each. In addition, we use the function alias to name the new column “word”. Finally, we have defined the wordCounts DataFrame by grouping by the unique values in the DataFrame and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
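The plain-Python sketch below is a rough illustration of what split, explode and the grouped count compute. It applies the same steps to a static list of lines. It does not use Spark. The helper names `explode_split` and `group_by_count` are hypothetical. The two input lines are the ones typed into Netcat later in this example.

```python
from collections import Counter
from itertools import chain


def explode_split(lines, sep=" "):
    """Mimic explode(split(value, sep)): one output row per word of each input line."""
    return list(chain.from_iterable(line.split(sep) for line in lines))


def group_by_count(words):
    """Mimic groupBy("word").count() on a static collection of rows."""
    return dict(Counter(words))


lines = ["apache spark", "apache hadoop"]
words = explode_split(lines)
print(words)                  # ['apache', 'spark', 'apache', 'hadoop']
print(group_by_count(words))  # {'apache': 2, 'spark': 1, 'hadoop': 1}
```

On a streaming DataFrame, Spark evaluates the same logical steps incrementally instead of over a fixed list.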

R:
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines <- read.stream("socket", host = "localhost", port = 9999)

# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")

# Generate running word count
wordCounts <- count(group_by(words, "word"))

The lines SparkDataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named “value”, and each line in the streaming text data becomes a row in the table. Note that this SparkDataFrame is not yet receiving any data, because we are only setting up the transformation and have not started it. Next, we have a SQL expression with two SQL functions, split and explode, to split each line into multiple rows with one word each. In addition, we name the new column “word”. Finally, we have defined the wordCounts SparkDataFrame by grouping by the unique values in the SparkDataFrame and counting them. Note that this is a streaming SparkDataFrame which represents the running word counts of the stream.

We have now set up the query on the streaming data. All that is left is to actually start receiving data and computing the counts. To do this, we set it up to print the complete set of counts (specified by outputMode("complete")) to the console every time they are updated. Then we start the streaming computation using start().

Scala:
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()

Java:
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();

Python:
# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

R:
# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")

awaitTermination(query)

After this code is executed, the streaming computation will have started in the background. The query object is a handle to that active streaming query, and we have decided to wait for the termination of the query using awaitTermination() to prevent the process from exiting while the query is active.

To actually execute this example code, you can either compile the code in your own Spark application, or simply run the example once you have downloaded Spark. We are showing the latter. You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using:

$ nc -lk 9999

Then, in a different terminal, you can start the example by using:

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999

Then, any lines typed in the terminal running the Netcat server will be counted, and the counts printed on screen every second. The output will look something like the following.

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999
apache spark
apache hadoop
...
# TERMINAL 2: RUNNING StructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING JavaStructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING structured_network_wordcount.py

$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING structured_network_wordcount.R

$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...

Programming Model

The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended. This leads to a new stream processing model that is very similar to a batch processing model. You will express your streaming computation as a standard batch-like query, as if on a static table, and Spark runs it as an incremental query on the unbounded input table. Let’s understand this model in more detail.

Basic Concepts

Consider the input data stream as the “Input Table”. Every data item that is arriving on the stream is like a new row being appended to the Input Table.

[Figure: Stream as a Table]

A query on the input will generate the “Result Table”. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.
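The plain-Python sketch below makes the trigger-interval idea concrete. It assumes a fixed processing-time trigger and assigns each row to the trigger whose interval contains the row's arrival time. This is a simplification for illustration, and the function name `assign_to_triggers` is hypothetical. Spark's micro-batch engine does not bucket rows by exact arrival time. Instead, it picks up whatever data the source has made available when a trigger fires.

```python
import math
from collections import defaultdict


def assign_to_triggers(arrivals, interval_s=1.0):
    """Group (arrival_time_s, row) pairs by fixed trigger interval.

    Assumption of this sketch: rows arriving in [k*interval, (k+1)*interval)
    are appended to the Input Table at trigger k.
    """
    batches = defaultdict(list)
    for arrival_time, row in arrivals:
        batches[math.floor(arrival_time / interval_s)].append(row)
    return dict(sorted(batches.items()))


arrivals = [(0.2, "apache spark"), (1.4, "apache hadoop"), (1.9, "spark")]
print(assign_to_triggers(arrivals))
# {0: ['apache spark'], 1: ['apache hadoop', 'spark']}
```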

[Figure: Programming model]

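The “Output” is defined as what gets written out to the external storage. The output can be defined in one of the following modes:

Complete Mode: The entire updated Result Table is written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.

Append Mode: Only the new rows appended to the Result Table since the last trigger are written to the external storage. This is applicable only to queries where existing rows in the Result Table are not expected to change.

Update Mode: Only the rows that were updated in the Result Table since the last trigger are written to the external storage (available since Spark 2.1.1). Unlike Complete Mode, this mode outputs only the rows that have changed since the last trigger. If the query doesn't contain aggregations, it is equivalent to Append Mode.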

Note that each mode is applicable on certain types of queries. This is discussed in detail later.
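The three output modes (Complete, Append, Update) differ in which Result Table rows reach the sink at each trigger. The plain-Python sketch below models that difference for two snapshots of a word-count Result Table. `emitted_rows` is a hypothetical helper, not a Spark API. It rejects Append mode when an existing row changes, because that mode only applies to queries whose existing rows never change.

```python
def emitted_rows(previous, current, mode):
    """Rows a sink would receive at one trigger, given Result Table snapshots.

    previous/current are {key: value} dicts taken after the previous trigger
    and after the current trigger.
    """
    if mode == "complete":
        # The entire updated Result Table is written.
        return dict(current)
    if mode == "update":
        # Only rows that are new or whose value changed since the last trigger.
        return {k: v for k, v in current.items() if previous.get(k) != v}
    if mode == "append":
        # Only newly appended rows; existing rows are not allowed to change.
        changed = [k for k in previous if current.get(k) != previous[k]]
        if changed:
            raise ValueError(f"append mode cannot express changed rows: {changed}")
        return {k: v for k, v in current.items() if k not in previous}
    raise ValueError(f"unknown mode: {mode}")


batch0 = {"apache": 1, "spark": 1}
batch1 = {"apache": 2, "spark": 1, "hadoop": 1}
print(emitted_rows(batch0, batch1, "complete"))  # {'apache': 2, 'spark': 1, 'hadoop': 1}
print(emitted_rows(batch0, batch1, "update"))    # {'apache': 2, 'hadoop': 1}
```

In Spark itself the mode is chosen on the writer, for example with outputMode("complete") as in the Quick Example.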

To illustrate the use of this model, let’s understand the model in the context of the Quick Example above. The first lines DataFrame is the input table, and the final wordCounts DataFrame is the result table. Note that the query on the streaming lines DataFrame to generate wordCounts is exactly the same as it would be on a static DataFrame. However, when this query is started, Spark will continuously check for new data from the socket connection. If there is new data, Spark will run an “incremental” query that combines the previous running counts with the new data to compute updated counts, as shown below.

[Figure: Programming model for the Quick Example]

Note that Structured Streaming does not materialize the entire table. It reads the latest available data from the streaming data source, processes it incrementally to update the result, and then discards the source data. It only keeps around the minimal intermediate state data as required to update the result (e.g. intermediate counts in the earlier example).
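The sketch below models this behavior for the Quick Example, in plain Python rather than Spark. Each micro-batch of lines is read once, folded into a running `Counter`, and then discarded. The only thing kept between triggers is the intermediate count state. The class name is hypothetical. Feeding in the two lines from the Netcat session reproduces the Batch: 0 and Batch: 1 console output shown earlier.

```python
from collections import Counter


class IncrementalWordCount:
    """Running word count whose only state between triggers is a Counter."""

    def __init__(self):
        self.state = Counter()  # intermediate state: word -> running count

    def process_batch(self, new_lines):
        """Fold one micro-batch of lines into the state and return the Result Table."""
        for line in new_lines:
            self.state.update(line.split(" "))
        # new_lines is never stored, so the source data is discarded after this call.
        return dict(self.state)


wc = IncrementalWordCount()
print(wc.process_batch(["apache spark"]))   # {'apache': 1, 'spark': 1}
print(wc.process_batch(["apache hadoop"]))  # {'apache': 2, 'spark': 1, 'hadoop': 1}
```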

This model is significantly different from many other stream processing engines. Many streaming systems require the user to maintain running aggregations themselves, thus having to reason about fault-tolerance, and data consistency (at-least-once, or at-most-once, or exactly-once). In this model, Spark is responsible for updating the Result Table when there is new data, thus relieving the users from reasoning about it. As an example, let’s see how this model handles event-time based processing and late arriving data.

Handling Event-time and Late Data

Event-time is the time embedded in the data itself. For many applications, you may want to operate on this event-time. For example, if you want to get the number of events generated by IoT devices every minute, then you probably want to use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them. This event-time is very naturally expressed in this model – each event from the devices is a row in the table, and event-time is a column value in the row. This allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column – each time window is a group and each row can belong to multiple windows/groups. Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device events logs) as well as on a data stream, making the life of the user much easier.
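The plain-Python sketch below models the window assignment described above. It assumes 10-minute windows that slide every 5 minutes, aligned to the Unix epoch. Under these assumptions, an event at 12:07 falls into both the 12:00-12:10 and the 12:05-12:15 windows, so it is counted in two groups. In Spark the same grouping is written with the `window` function on the event-time column. The helpers below are hypothetical and only model the semantics.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # assumption: windows are aligned to the Unix epoch


def windows_for(event_time, length, slide):
    """Return every [start, end) window of size `length`, sliding by `slide`, containing event_time."""
    start = event_time - (event_time - EPOCH) % slide  # latest window start <= event_time
    windows = []
    while start + length > event_time:  # this window still covers the event
        windows.append((start, start + length))
        start -= slide
    return sorted(windows)


def count_per_window(event_times, length, slide):
    """Count events per window; a single event may be counted in several windows."""
    counts = Counter()
    for t in event_times:
        counts.update(windows_for(t, length, slide))
    return counts


length, slide = timedelta(minutes=10), timedelta(minutes=5)
events = [datetime(2024, 1, 1, 12, 7), datetime(2024, 1, 1, 12, 11)]
for (start, end), n in sorted(count_per_window(events, length, slide).items()):
    print(start.time(), end.time(), n)
# 12:00:00 12:10:00 1
# 12:05:00 12:15:00 2
# 12:10:00 12:20:00 1
```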

Furthermore, this model naturally handles data that has arrived later than expected based on its event-time. Since Spark is updating the Result Table, it has full control over updating old aggregates when there is late data, as well as cleaning up old aggregates to limit the size of intermediate state data. Since Spark 2.1, we have support for watermarking which allows the user to specify the threshold of late data, and allows the engine to accordingly clean up old state. These are explained later in more detail in the Window Operations section.
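The sketch below models the watermarking idea in plain Python under simplifying assumptions:

- Event times are integer minutes.
- Windows are 10-minute tumbling windows.
- The watermark is the maximum event time seen so far minus a 10-minute delay threshold.
- The watermark is advanced after every event. Spark advances it between micro-batches instead.

Once the watermark passes a window's end, the sketch finalizes that window's state, drops it, and ignores any later events for it. Spark itself only guarantees that data within the threshold is aggregated. Data that arrives later may or may not be dropped, whereas the sketch always drops it. In the DataFrame API the threshold is declared with `withWatermark`. The class here is hypothetical.

```python
class WatermarkedWindowCount:
    """Tumbling-window event counts with a simplified event-time watermark."""

    def __init__(self, window_size, delay):
        self.window_size = window_size  # window length, in minutes
        self.delay = delay              # how late data may be, in minutes
        self.max_event_time = None
        self.state = {}                 # open windows: start -> count
        self.finalized = {}             # closed windows: start -> final count
        self.dropped = []               # events that arrived too late

    @property
    def watermark(self):
        """Max event time seen so far minus the delay threshold."""
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def add(self, event_time):
        start = event_time - event_time % self.window_size
        wm = self.watermark
        if wm is not None and start + self.window_size <= wm:
            self.dropped.append(event_time)  # its window was already closed
            return
        self.state[start] = self.state.get(start, 0) + 1
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        self._evict()

    def _evict(self):
        """Finalize and drop state for windows ending at or before the watermark."""
        wm = self.watermark
        for start in sorted(self.state):
            if start + self.window_size <= wm:
                self.finalized[start] = self.state.pop(start)


w = WatermarkedWindowCount(window_size=10, delay=10)
for t in [7, 12, 4, 25, 3]:  # 4 is late but within the threshold; 3 is too late
    w.add(t)
print(w.watermark, w.finalized, w.state, w.dropped)  # 15 {0: 2} {10: 1, 20: 1} [3]
```

The state for window 0 is cleaned up as soon as the watermark reaches 15. This is how a watermark bounds the size of the intermediate state.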

Fault Tolerance Semantics

Delivering end-to-end exactly-once semantics was one of the key goals behind the design of Structured Streaming. To achieve that, we have designed the Structured Streaming sources, the sinks and the execution engine to reliably track the exact progress of the processing so that it can handle any kind of failure by restarting and/or reprocessing. Every streaming source is assumed to have offsets (similar to Kafka offsets, or Kinesis sequence numbers) to track the read position in the stream. The engine uses checkpointing and write-ahead logs to record the offset range of the data being processed in each trigger. The streaming sinks are designed to be idempotent for handling reprocessing. Together, using replayable sources and idempotent sinks, Structured Streaming can ensure end-to-end exactly-once semantics under any failure.
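The toy plain-Python model below shows how replayable offsets, a write-ahead offset log and an idempotent sink combine. All class names are hypothetical, and this is not Spark's implementation. The engine records each batch's offset range before processing the batch and marks the batch committed afterwards. After a simulated failure between the sink write and the commit, the same batch is replayed with the same offsets, and the sink ignores the duplicate batch id. The logs live in memory here, standing in for the checkpoint directory.

```python
class ReplayableSource:
    """Append-only record log addressed by offset, like a Kafka partition."""

    def __init__(self, records):
        self.records = list(records)

    def latest_offset(self):
        return len(self.records)

    def get_batch(self, start, end):
        return self.records[start:end]  # the same offsets always return the same data


class IdempotentSink:
    """Stores at most one write per batch id, so a replayed batch is a no-op."""

    def __init__(self):
        self.batches = {}

    def add_batch(self, batch_id, rows):
        if batch_id not in self.batches:
            self.batches[batch_id] = list(rows)

    def all_rows(self):
        return [row for bid in sorted(self.batches) for row in self.batches[bid]]


class MicroBatchEngine:
    def __init__(self, source, sink):
        self.source, self.sink = source, sink
        self.offset_log = {}     # write-ahead log: batch_id -> (start, end)
        self.commit_log = set()  # batch ids whose output is known to be written

    def run_once(self, fail_before_commit=False):
        pending = [b for b in sorted(self.offset_log) if b not in self.commit_log]
        if pending:
            # Recovery: replay the logged but uncommitted batch with identical offsets.
            batch_id = pending[0]
            start, end = self.offset_log[batch_id]
        else:
            batch_id = len(self.offset_log)
            start = self.offset_log[batch_id - 1][1] if batch_id else 0
            end = self.source.latest_offset()
            if end == start:
                return  # no new data, so no batch
            self.offset_log[batch_id] = (start, end)  # logged before processing
        rows = [r.upper() for r in self.source.get_batch(start, end)]  # the "query"
        self.sink.add_batch(batch_id, rows)
        if fail_before_commit:
            raise RuntimeError("simulated failure after the sink write")
        self.commit_log.add(batch_id)


source = ReplayableSource(["a", "b"])
sink = IdempotentSink()
engine = MicroBatchEngine(source, sink)
try:
    engine.run_once(fail_before_commit=True)  # batch 0 reaches the sink, the commit is lost
except RuntimeError:
    pass
engine.run_once()          # batch 0 is replayed; the sink ignores the duplicate
source.records.append("c")
engine.run_once()          # batch 1 covers offsets [2, 3)
print(sink.all_rows())     # ['A', 'B', 'C']
```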

API using Datasets and DataFrames

Since Spark 2.0, DataFrames and Datasets can represent static, bounded data, as well as streaming, unbounded data. Similar to static Datasets/DataFrames, you can use the common entry point SparkSession (Scala/Java/Python/R docs) to create streaming DataFrames/Datasets from streaming sources, and apply the same operations on them as static DataFrames/Datasets. If you are not familiar with Datasets/DataFrames, you are strongly advised to familiarize yourself with them using the DataFrame/Dataset Programming Guide.

Creating streaming DataFrames and streaming Datasets

Streaming DataFrames can be created through the DataStreamReader interface (Scala/Java/Python docs) returned by SparkSession.readStream(). In R, use the read.stream() method. Similar to the read interface for creating static DataFrames, you can specify the details of the source: data format, schema, options, etc.

Input Sources

There are a few built-in sources: the file, socket, rate, and Kafka sources.

Some sources are not fault-tolerant because they do not guarantee that data can be replayed using checkpointed offsets after a failure. See the earlier section on fault-tolerance semantics. Here are the details of all the sources in Spark.

Source: File source
Options:
  path: path to the input directory, and common to all file formats.
  maxFilesPerTrigger: maximum number of new files to be considered in every trigger (default: no max).
  latestFirst: whether to process the latest new files first, useful when there is a large backlog of files (default: false).
  fileNameOnly: whether to check new files based only on the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered the same file, because their filenames, "dataset.txt", are the same:
    "file:///dataset.txt"
    "s3://a/dataset.txt"
    "s3n://a/b/dataset.txt"
    "s3a://a/b/c/dataset.txt"
  maxFileAge: maximum age of a file that can be found in this directory before it is ignored. For the first batch, all files will be considered valid. If latestFirst is set to `true` and maxFilesPerTrigger is set, this parameter will be ignored, because otherwise old files that are valid and should be processed might be skipped. The max age is specified with respect to the timestamp of the latest file, not the timestamp of the current system (default: 1 week).
  cleanSource: option to clean up completed files after processing.
    Available options are "archive", "delete", and "off". If the option is not provided, the default value is "off".
    When "archive" is provided, the additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must not match the source pattern in depth (the number of directories from the root directory), where the depth is the minimum of the depths of the two paths. This ensures archived files are never included as new source files.
    For example, suppose you provide '/hello?/spark/*' as the source pattern. '/hello1/spark/archive/dir' cannot be used as the value of "sourceArchiveDir", as '/hello?/spark/*' and '/hello1/spark/archive' will be matched. '/hello1/spark' cannot be used either, as '/hello?/spark' and '/hello1/spark' will be matched. '/archived/here' would be OK, as it doesn't match.
    Spark will move source files while preserving their own paths. For example, if the path of the source file is /a/b/dataset.txt and the path of the archive directory is /archived/here, the file will be moved to /archived/here/a/b/dataset.txt.
    NOTE: Both archiving (via moving) and deleting completed files introduce overhead in each micro-batch (a slowdown, even though the work happens in a separate thread), so you need to understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option reduces the cost of listing source files, which can be an expensive operation.
    The number of threads used by the completed-file cleaner can be configured with spark.sql.streaming.fileSource.cleaner.numThreads (default: 1).
    NOTE 2: The source path should not be used by multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match any files in the output directory of a file stream sink.
    NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query. Spark may not clean up some source files in some circumstances, e.g. if the application doesn't shut down gracefully or too many files are queued for cleanup.
  For file-format-specific options, see the related methods in DataStreamReader (Scala/Java/Python/R). E.g. for "parquet" format options, see DataStreamReader.parquet().
  In addition, there are session configurations that affect certain file formats. See the SQL Programming Guide for more details. E.g., for "parquet", see the Parquet configuration section.
Fault-tolerant: Yes
Notes: Supports glob paths, but does not support multiple comma-separated paths/globs.

Source: Socket source
Options:
  host: host to connect to, must be specified.
  port: port to connect to, must be specified.
Fault-tolerant: No

Source: Rate source
Options:
  rowsPerSecond (e.g. 100, default: 1): how many rows should be generated per second.
  rampUpTime (e.g. 5s, default: 0s): how long to ramp up before the generating speed becomes rowsPerSecond. Granularities finer than seconds are truncated to integer seconds.
  numPartitions (e.g. 10, default: Spark's default parallelism): the number of partitions for the generated rows.
  The source will try its best to reach rowsPerSecond, but the query may be resource constrained, and numPartitions can be tweaked to help reach the desired speed.
Fault-tolerant: Yes

Source: Kafka source
Options: See the Kafka Integration Guide.
Fault-tolerant: Yes
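The file-source path rules for cleanSource and fileNameOnly can be checked with a small plain-Python model, using the examples given for those options. The helpers are hypothetical and only encode the documented behavior. Spark's actual check may differ in details. For example, Python's `fnmatch` does not support Hadoop-style `{a,b}` alternation, and this sketch ignores how paths are qualified with a file system scheme.

```python
import fnmatch
import posixpath


def archive_destination(source_file, archive_dir):
    """cleanSource=archive keeps the file's own path under the archive directory."""
    return posixpath.join(archive_dir, source_file.lstrip("/"))


def archive_dir_conflicts(source_pattern, archive_dir):
    """True if every path component matches the glob up to the smaller depth."""
    pattern_parts = source_pattern.strip("/").split("/")
    archive_parts = archive_dir.strip("/").split("/")
    # zip() stops at the shorter list, i.e. at the minimum depth of the two paths.
    return all(fnmatch.fnmatchcase(a, p) for p, a in zip(pattern_parts, archive_parts))


def same_file_by_name_only(*paths):
    """With fileNameOnly=true, files are compared by their last path component only."""
    return len({posixpath.basename(p) for p in paths}) == 1


print(archive_destination("/a/b/dataset.txt", "/archived/here"))  # /archived/here/a/b/dataset.txt
for candidate in ["/hello1/spark/archive/dir", "/hello1/spark", "/archived/here"]:
    print(candidate, archive_dir_conflicts("/hello?/spark/*", candidate))
# /hello1/spark/archive/dir True
# /hello1/spark True
# /archived/here False
print(same_file_by_name_only("file:///dataset.txt", "s3://a/dataset.txt",
                             "s3n://a/b/dataset.txt", "s3a://a/b/c/dataset.txt"))  # True
```

A result of True from `archive_dir_conflicts` means the directory could not be used as sourceArchiveDir.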

Here are some examples.

Scala:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

val spark: SparkSession = ...

// Read text from socket
val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

socketDF.isStreaming    // Returns true for DataFrames that have streaming sources

socketDF.printSchema

// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")

Java:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

SparkSession spark = ...

// Read text from socket
Dataset<Row> socketDF = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

socketDF.isStreaming();    // Returns true for DataFrames that have streaming sources

socketDF.printSchema();

// Read all the csv files written atomically in a directory
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
Dataset<Row> csvDF = spark
  .readStream()
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory");    // Equivalent to format("csv").load("/path/to/directory")

Python:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession. ...

# Read text from socket
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

socketDF.isStreaming    # True for DataFrames that have streaming sources (a property in PySpark, not a method)

socketDF.printSchema()

# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")

R:
sparkR.session(...)

# Read text from socket
socketDF <- read.stream("socket", host = "localhost", port = 9999)

isStreaming(socketDF)    # Returns TRUE for SparkDataFrames that have streaming sources

printSchema(socketDF)

# Read all the csv files written atomically in a directory
schema <- structType(structField("name", "string"),
                     structField("age", "integer"))
csvDF <- read.stream("csv", path = "/path/to/directory", schema = schema, sep = ";")

These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like map, flatMap, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as for static DataFrames. See the SQL Programming Guide for more details. Additionally, more details on the supported streaming sources are discussed later in the document.

Since Spark 3.1, you can also create streaming DataFrames from tables with DataStreamReader.table(). See Streaming Table APIs for more details.

Schema inference and partition of streaming DataFrames/Datasets

By default, Structured Streaming from file-based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can re-enable schema inference by setting spark.sql.streaming.schemaInference to true.

Partition discovery does occur when subdirectories that are named /key=value/ are present, and listing will automatically recurse into these directories. If these columns appear in the user-provided schema, they will be filled in by Spark based on the path of the file being read. The directories that make up the partitioning scheme must be present when the query starts and must remain static. For example, it is okay to add /data/year=2016/ when /data/year=2015/ was present, but it is invalid to change the partitioning column (e.g. by creating the directory /data/date=2016-04-17/).
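The partition-discovery rules above can be modeled in a few lines of plain Python. This is a simplified sketch, not Spark's implementation: `parse_partition_values` and `partition_columns` are hypothetical helpers, values are kept as strings (Spark also infers partition column types), and the check only captures the rule that every file must use the same partition columns.

```python
from typing import Dict, List


def parse_partition_values(path: str) -> Dict[str, str]:
    """Return {column: value} for every /key=value/ directory in a file path."""
    values = {}
    # Only directory components count; the last component is the file name.
    for part in path.strip("/").split("/")[:-1]:
        if "=" in part:
            key, _, value = part.partition("=")
            values[key] = value
    return values


def partition_columns(paths: List[str]) -> List[str]:
    """Return the partition columns shared by all paths, or raise if they differ."""
    column_sets = {tuple(parse_partition_values(p).keys()) for p in paths}
    if len(column_sets) != 1:
        raise ValueError(f"inconsistent partition columns: {sorted(column_sets)}")
    return list(column_sets.pop())


print(parse_partition_values("/data/year=2016/part-0001.csv"))
```

Adding /data/year=2016/ next to /data/year=2015/ keeps the column set at `["year"]`, while a /data/date=2016-04-17/ directory makes the sets disagree, which this sketch reports as an error.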

Operations on streaming DataFrames/Datasets

You can apply all kinds of operations on streaming DataFrames/Datasets – ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to typed RDD-like operations (e.g. map, filter, flatMap). See the SQL programming guide for more details. Let’s take a look at a few example operations that you can use.

Basic Operations - Selection, Projection, Aggregation

Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are discussed later in this section.

import spark.implicits._  // provides the encoder needed by df.as[DeviceData]

case class DeviceData(device: String, deviceType: String, signal: Double, time: String)

val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs   
ds.filter(_.signal > 10).map(_.device)         // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                          // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.javalang.typed;

public class DeviceData {
  private String device;
  private String deviceType;
  private Double signal;
  private java.sql.Date time;
  ...
  // Getter and setter methods for each field
}

Dataset<Row> df = ...;    // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }
Dataset<DeviceData> ds = df.as(Encoders.bean(DeviceData.class)); // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10"); // using untyped APIs
ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
  .map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING()); // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count(); // using untyped API

// Running average signal for each device type
ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
  .agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));
df = ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
df.select("device").where("signal > 10")

# Running count of the number of updates for each device type
df.groupBy("deviceType").count()
df <- ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
select(where(df, "signal > 10"), "device")

# Running count of the number of updates for each device type
count(groupBy(df, "deviceType"))
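All of the `count()` examples above produce a running count: each trigger processes only newly arrived rows, yet the result covers every row seen so far. The plain-Python model below illustrates that idea under simplifying assumptions; `run_micro_batches` is a hypothetical name, and a `Counter` stands in for the state Spark keeps between micro-batches.

```python
from collections import Counter
from typing import Dict, Iterable, List


def run_micro_batches(batches: Iterable[List[Dict[str, object]]]) -> List[Dict[str, int]]:
    """Return the full result table (deviceType -> count) after each micro-batch."""
    state = Counter()  # running counts, kept across micro-batches
    results = []
    for batch in batches:
        # Only the new rows are processed; earlier rows live on in `state`.
        state.update(row["deviceType"] for row in batch)
        results.append(dict(state))
    return results


batches = [
    [{"device": "d1", "deviceType": "sensor"}, {"device": "d2", "deviceType": "camera"}],
    [{"device": "d3", "deviceType": "sensor"}],
]
for table in run_micro_batches(batches):
    print(table)
```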

You can also register a streaming DataFrame/Dataset as a temporary view and then apply SQL commands on it.

df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  // returns another streaming DF
df.createOrReplaceTempView("updates");
spark.sql("select count(*) from updates");  // returns another streaming DF
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  # returns another streaming DF
createOrReplaceTempView(df, "updates")
sql("select count(*) from updates")

Note, you can identify whether a DataFrame/Dataset has streaming data or not by using df.isStreaming.

df.isStreaming
df.isStreaming()
df.isStreaming
isStreaming(df)

You may want to check the query plan of the query, as Spark could inject stateful operations while interpreting a SQL statement against a streaming Dataset. Once stateful operations are injected into the query plan, you may need to review your query with the considerations for stateful operations in mind (e.g. output mode, watermark, state store size maintenance, etc.).

Window Operations on Event Time

Aggregations over a sliding event-time window are straightforward with Structured Streaming and are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let’s understand this with an illustration.

Imagine our quick example is modified and the stream now contains lines along with the time when the line was generated. Instead of running word counts, we want to count words within 10-minute windows, updating every 5 minutes. That is, we count the words received within the 10-minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that arrived after 12:00 but before 12:10. Now, consider a word that was received at 12:07. This word should increment the counts corresponding to two windows, 12:00 - 12:10 and 12:05 - 12:15. So the counts will be indexed by both the grouping key (i.e. the word) and the window (which can be calculated from the event time).
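The window assignment described above can be sketched in plain Python. This is a hypothetical helper, not Spark's `window()` function: it assumes naive timestamps, treats each window as half-open [start, end), and aligns window starts to multiples of the slide duration counted from 1970-01-01.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

EPOCH = datetime(1970, 1, 1)  # window starts are multiples of the slide since this instant


def sliding_windows(event_time: datetime, size: timedelta,
                    slide: timedelta) -> List[Tuple[datetime, datetime]]:
    """Return every half-open window [start, start + size) that contains event_time."""
    # Latest window start at or before the event.
    start = EPOCH + ((event_time - EPOCH) // slide) * slide
    windows = []
    # Walk backwards one slide at a time while the window still covers the event.
    while start + size > event_time:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


# A word received at 12:07 with 10-minute windows sliding every 5 minutes.
for start, end in sliding_windows(datetime(2024, 1, 1, 12, 7),
                                  timedelta(minutes=10), timedelta(minutes=5)):
    print(start.strftime("%H:%M"), "-", end.strftime("%H:%M"))
# prints:
# 12:00 - 12:10
# 12:05 - 12:15
```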

The result tables would look something like the following.

Figure: Window Operations

Since this windowing is similar to grouping, in code, you can use groupBy() and window() operations to express windowed aggregations. You can see the full code for the below examples in Scala/Java/Python.

import spark.implicits._
import org.apache.spark.sql.functions.window

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
import org.apache.spark.sql.functions;

Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();
from pyspark.sql.functions import window

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()
words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts <- count(
                    groupBy(
                      words,
                      window(words$timestamp, "10 minutes", "5 minutes"),
                      words$word))

Handling Late Data and Watermarking

Now consider what happens if one of the events arrives late to the application. For example, say, a word generated at 12:04 (i.e. event time) could be received by the application at 12:11. The application should use the time 12:04 instead of 12:11 to update the older counts for the window 12:00 - 12:10. This occurs naturally in our window-based grouping – Structured Streaming can maintain the intermediate state for partial aggregates for a long period of time such that late data can update aggregates of old windows correctly, as illustrated below.

Figure: Handling Late Data

However, to run this query for days, it’s necessary for the system to bound the amount of intermediate in-memory state it accumulates. This means the system needs to know when an old aggregate can be dropped from the in-memory state because the application is not going to receive late data for that aggregate any more. To enable this, Spark 2.1 introduced watermarking, which lets the engine automatically track the current event time in the data and attempt to clean up old state accordingly. You can define the watermark of a query by specifying the event time column and the threshold on how late the data is expected to be in terms of event time. For a specific window ending at time T, the engine will maintain state and allow late data to update the state until (max event time seen by the engine - late threshold > T). In other words, late data within the threshold will be aggregated, but data later than the threshold will start getting dropped (see later in the section for the exact guarantees). Let’s understand this with an example. We can define watermarking on the previous example using withWatermark() as shown below.
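The state-retention rule for a window ending at time T can be written down directly. The sketch below is a hypothetical model of the condition stated in the text; the function names are illustrative only and are not Spark APIs.

```python
from datetime import datetime, timedelta


def watermark(max_event_time: datetime, late_threshold: timedelta) -> datetime:
    """Watermark = max event time seen by the engine - late threshold."""
    return max_event_time - late_threshold


def window_state_droppable(window_end: datetime, max_event_time: datetime,
                           late_threshold: timedelta) -> bool:
    """True once (max event time - late threshold > T) for a window ending at T."""
    return watermark(max_event_time, late_threshold) > window_end


ten_min = timedelta(minutes=10)
# After seeing 12:14 the watermark is 12:04, so window 12:00 - 12:10 is still kept.
print(window_state_droppable(datetime(2024, 1, 1, 12, 10), datetime(2024, 1, 1, 12, 14), ten_min))
```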

import spark.implicits._
import org.apache.spark.sql.functions.window

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()
import static org.apache.spark.sql.functions.*;

Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),
        col("word"))
    .count();
from pyspark.sql.functions import window

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()
words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group

words <- withWatermark(words, "timestamp", "10 minutes")
windowedCounts <- count(
                    groupBy(
                      words,
                      window(words$timestamp, "10 minutes", "5 minutes"),
                      words$word))

In this example, we are defining the watermark of the query on the value of the column “timestamp”, and also defining “10 minutes” as the threshold of how late the data is allowed to be. If this query is run in Update output mode (discussed later in the Output Modes section), the engine will keep updating counts of a window in the Result Table until the window is older than the watermark, which lags behind the current event time in column “timestamp” by 10 minutes. Here is an illustration.

Figure: Watermarking in Update Mode

As shown in the illustration, the maximum event time tracked by the engine is the blue dashed line, and the watermark set as (max event time - '10 mins') at the beginning of every trigger is the red line. For example, when the engine observes the data (12:14, dog), it sets the watermark for the next trigger as 12:04. This watermark lets the engine maintain intermediate state for an additional 10 minutes to allow late data to be counted. For example, the data (12:09, cat) is out of order and late, and it falls in windows 12:00 - 12:10 and 12:05 - 12:15. Since it is still ahead of the watermark 12:04 in the trigger, the engine still maintains the intermediate counts as state and correctly updates the counts of the related windows. However, when the watermark is updated to 12:11, the intermediate state for window (12:00 - 12:10) is cleared, and all subsequent data (e.g. (12:04, donkey)) is considered “too late” and therefore ignored. Note that after every trigger, the updated counts (i.e. purple rows) are written to the sink as the trigger output, as dictated by the Update mode.
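The trigger-by-trigger behavior in Update mode can be simulated in plain Python. This is a simplified model under stated assumptions, not Spark's engine: `run_update_mode`, `windows_for` and `at` are hypothetical, the watermark computed at the end of one trigger is applied to the next one, and a row is skipped for any window whose end is already behind the watermark. The sample data reuses the (12:14, dog), (12:09, cat) and (12:04, donkey) events from the illustration.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)
SIZE = timedelta(minutes=10)    # window length
SLIDE = timedelta(minutes=5)    # window slide
DELAY = timedelta(minutes=10)   # late threshold given to withWatermark


def windows_for(t):
    """Yield every [start, start + SIZE) window, aligned to SLIDE, that contains t."""
    start = EPOCH + ((t - EPOCH) // SLIDE) * SLIDE
    while start + SIZE > t:
        yield (start, start + SIZE)
        start -= SLIDE


def run_update_mode(triggers):
    """Return, per trigger, the {(window, word): count} rows Update mode would emit."""
    counts = defaultdict(int)   # intermediate state: (window, word) -> running count
    wm = None                   # no watermark before any data has been seen
    max_event_time = None
    outputs = []
    for rows in triggers:
        updated = {}
        for event_time, word in rows:
            for window in windows_for(event_time):
                if wm is not None and wm > window[1]:
                    continue    # this window's state is gone: the row is too late for it
                counts[(window, word)] += 1
                updated[(window, word)] = counts[(window, word)]
            if max_event_time is None or event_time > max_event_time:
                max_event_time = event_time
        outputs.append(updated)  # only rows changed in this trigger reach the sink
        if max_event_time is not None:
            wm = max_event_time - DELAY  # watermark used by the next trigger
            for key in [k for k in counts if wm > k[0][1]]:
                del counts[key]          # evict windows that can no longer change
    return outputs


def at(hour, minute):
    return datetime(2024, 1, 1, hour, minute)


triggers = [
    [(at(12, 14), "dog")],                      # watermark becomes 12:04
    [(at(12, 9), "cat"), (at(12, 21), "owl")],  # cat still counted; watermark becomes 12:11
    [(at(12, 4), "donkey")],                    # both of donkey's windows end before 12:11
]
outputs = run_update_mode(triggers)
```

With this data the third trigger emits nothing, which mirrors the description above: once the watermark reaches 12:11, (12:04, donkey) only falls into windows whose state has already been cleared.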

Some sinks (e.g. files) may not support the fine-grained updates that Update Mode requires. To work with them, we also support Append Mode, where only the final counts are written to the sink. This is illustrated below.

Note that using withWatermark on a non-streaming Dataset is a no-op. Since the watermark should not affect any batch query in any way, it is simply ignored.

Figure: Watermarking in Append Mode

Similar to the Update Mode earlier, the engine maintains intermediate counts for each window. However, the partial counts are not updated to the Result Table and not written to the sink. The engine waits for “10 mins” for late data to be counted, then drops the intermediate state of a window that is older than the watermark (window < watermark), and appends the final counts to the Result Table/sink. For example, the final counts of window 12:00 - 12:10 are appended to the Result Table only after the watermark is updated to 12:11.
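In Append mode, the same intermediate counts are kept, but a row is written only once its window can no longer change. A minimal sketch of that split, using a hypothetical `append_mode_step` helper and the `watermark > window end` condition from the text:

```python
from datetime import datetime
from typing import Dict, Tuple

Window = Tuple[datetime, datetime]
State = Dict[Tuple[Window, str], int]


def append_mode_step(state: State, watermark: datetime) -> Tuple[State, State]:
    """Split intermediate counts into (final rows to append, state still kept).

    A window is final once the watermark has passed its end, so later data
    can no longer change its count."""
    final = {key: n for key, n in state.items() if watermark > key[0][1]}
    kept = {key: n for key, n in state.items() if key not in final}
    return final, kept


w1 = (datetime(2024, 1, 1, 12, 0), datetime(2024, 1, 1, 12, 10))
w2 = (datetime(2024, 1, 1, 12, 5), datetime(2024, 1, 1, 12, 15))
state = {(w1, "cat"): 1, (w2, "cat"): 1}
print(append_mode_step(state, datetime(2024, 1, 1, 12, 11))[0])
```

Calling it with a watermark of 12:04 emits nothing, while a watermark of 12:11 emits the final count for window 12:00 - 12:10, matching the example above.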

Conditions for watermarking to clean aggregation state

It is important to note that the following conditions must be satisfied for the watermarking to clean the state in aggregation queries (as of Spark 2.1.1, subject to change in the future).

Semantic Guarantees of Aggregation with Watermarking

Join Operations

Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame as well as another streaming Dataset/DataFrame. The result of the streaming join is generated incrementally, similar to the results of streaming aggregations in the previous section. In this section we will explore what types of joins (i.e. inner, outer, semi, etc.) are supported in the above cases. Note that in all the supported join types, the result of the join with a streaming Dataset/DataFrame will be exactly the same as if it were with a static Dataset/DataFrame containing the same data in the stream.

Stream-static Joins

Since the introduction in Spark 2.0, Structured Streaming has supported joins (inner join and some type of outer joins) between a streaming and a static DataFrame/Dataset. Here is a simple example.

val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(staticDf, "type")          // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  // left outer join with a static DF
Dataset<Row> staticDf = spark.read(). ...;
Dataset<Row> streamingDf = spark.readStream(). ...;
streamingDf.join(staticDf, "type");         // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer");  // left outer join with a static DF
staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type")  # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  # left outer join with a static DF
staticDf <- read.df(...)
streamingDf <- read.stream(...)
joined <- merge(streamingDf, staticDf, sort = FALSE)  # inner equi-join with a static DF
joined <- join(
            streamingDf,
            staticDf,
            streamingDf$value == staticDf$value,
            "left_outer")  # left outer join with a static DF

Note that stream-static joins are not stateful, so no state management is necessary. However, a few types of stream-static outer joins are not yet supported. These are listed at the end of this Join section.

Stream-stream Joins

In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming Datasets/DataFrames. The challenge of generating join results between two data streams is that, at any point of time, the view of the dataset is incomplete for both sides of the join making it much harder to find matches between inputs. Any row received from one input stream can match with any future, yet-to-be-received row from the other input stream. Hence, for both the input streams, we buffer past input as streaming state, so that we can match every future input with past input and accordingly generate joined results. Furthermore, similar to streaming aggregations, we automatically handle late, out-of-order data and can limit the state using watermarks. Let’s discuss the different types of supported stream-stream joins and how to use them.

Inner Joins with optional Watermarking

Inner joins on any kind of columns along with any kind of join conditions are supported. However, as the stream runs, the size of streaming state will keep growing indefinitely as all past input must be saved as any new input can match with any input from the past. To avoid unbounded state, you have to define additional join conditions such that indefinitely old inputs cannot match with future inputs and therefore can be cleared from the state. In other words, you will have to do the following additional steps in the join.

  1. Define watermark delays on both inputs such that the engine knows how delayed the input can be (similar to streaming aggregations)

  2. Define a constraint on event-time across the two inputs such that the engine can figure out when old rows of one input are not going to be required (i.e. will not satisfy the time constraint) for matches with the other input. This constraint can be defined in one of two ways.

    1. Time range join conditions (e.g. ...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR),

    2. Join on event-time windows (e.g. ...JOIN ON leftTimeWindow = rightTimeWindow).

Let’s understand this with an example.

Let’s say we want to join a stream of advertisement impressions (when an ad was shown) with another stream of user clicks on advertisements to correlate when impressions led to monetizable clicks. To allow the state cleanup in this stream-stream join, you will have to specify the watermarking delays and the time constraints as follows.

  1. Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order in event-time by at most 2 and 3 hours, respectively.

  2. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour after the corresponding impression.

The code would look like this.

import org.apache.spark.sql.functions.expr

val impressions = spark.readStream. ...
val clicks = spark.readStream. ...

// Apply watermarks on event-time columns
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
import static org.apache.spark.sql.functions.expr;

Dataset<Row> impressions = spark.readStream(). ...;
Dataset<Row> clicks = spark.readStream(). ...;

// Apply watermarks on event-time columns
Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour ")
);
from pyspark.sql.functions import expr

impressions = spark.readStream. ...
clicks = spark.readStream. ...

# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

# Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
impressions <- read.stream(...)
clicks <- read.stream(...)

# Apply watermarks on event-time columns
impressionsWithWatermark <- withWatermark(impressions, "impressionTime", "2 hours")
clicksWithWatermark <- withWatermark(clicks, "clickTime", "3 hours")

# Join with event-time constraints
joined <- join(
  impressionsWithWatermark,
  clicksWithWatermark,
  expr(
    paste(
      "clickAdId = impressionAdId AND",
      "clickTime >= impressionTime AND",
      "clickTime <= impressionTime + interval 1 hour"
)))
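Why the watermarks and the time-range condition bound the join state can be shown with a little arithmetic. The sketch below is a hypothetical model, not Spark's join implementation: it assumes that every future click has clickTime at or after the clicks watermark (in practice Spark may still process some later data, as the guarantees below explain), and the helper names are illustrative.

```python
from datetime import datetime, timedelta

MAX_GAP = timedelta(hours=1)  # from: clickTime <= impressionTime + interval 1 hour


def matches(impression_time: datetime, click_time: datetime) -> bool:
    """Event-time part of the example join condition."""
    return impression_time <= click_time <= impression_time + MAX_GAP


def impression_evictable(impression_time: datetime, clicks_watermark: datetime) -> bool:
    # Assumes future clicks have clickTime >= clicks_watermark. The latest click an
    # impression can match is impressionTime + 1 hour; once that is below the
    # watermark, no future click can match it.
    return impression_time + MAX_GAP < clicks_watermark


def click_evictable(click_time: datetime, impressions_watermark: datetime) -> bool:
    # A click only matches impressions with impressionTime <= clickTime, and
    # future impressions are assumed to be at or after impressions_watermark.
    return click_time < impressions_watermark


clicks_wm = datetime(2024, 1, 1, 15, 0) - timedelta(hours=3)  # latest click 15:00, 3-hour delay
print(impression_evictable(datetime(2024, 1, 1, 10, 30), clicks_wm))
```

For example, with a latest click at 15:00 and a 3-hour delay, the clicks watermark is 12:00; an impression from 10:30 (matching range ends 11:30) can be evicted, while one from 11:30 (range ends 12:30) must stay buffered.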
Semantic Guarantees of Stream-stream Inner Joins with Watermarking

This is similar to the guarantees provided by watermarking on aggregations. A watermark delay of “2 hours” guarantees that the engine will never drop any data that is less than 2 hours delayed. But data delayed by more than 2 hours may or may not get processed.

Outer Joins with Watermarking

While the watermark + event-time constraints are optional for inner joins, they must be specified for outer joins. This is because, to generate the NULL results in an outer join, the engine must know when an input row is not going to match anything in the future. Hence, the watermark + event-time constraints must be specified to generate correct results. Therefore, a query with an outer join will look quite like the ad-monetization example earlier, except that there will be an additional parameter specifying it to be an outer join.

impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
 )
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour "),
  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined <- join(
  impressionsWithWatermark,
  clicksWithWatermark,
  expr(
    paste(
      "clickAdId = impressionAdId AND",
      "clickTime >= impressionTime AND",
      "clickTime <= impressionTime + interval 1 hour")),
  "left_outer"                 # can be "inner", "left_outer", "right_outer", "full_outer", "left_semi"
)
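For a left outer join, the eviction point is also when the NULL-padded result can finally be produced. The following plain-Python sketch makes that explicit under the same assumption as a simple inner-join model (future clicks are at or after the clicks watermark); `evict_left_outer` and the impression ids are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

MAX_GAP = timedelta(hours=1)  # clickTime <= impressionTime + interval 1 hour


def evict_left_outer(buffered: Dict[str, Tuple[datetime, bool]],
                     clicks_watermark: datetime):
    """buffered maps a hypothetical impression id to (impressionTime, matched_yet).

    Returns (NULL-padded rows to emit, impressions that must stay buffered)."""
    null_rows: List[Tuple[str, Optional[str]]] = []
    still_buffered = {}
    for impression_id, (impression_time, matched) in buffered.items():
        if impression_time + MAX_GAP < clicks_watermark:
            # No future click can match this impression any more.
            if not matched:
                null_rows.append((impression_id, None))  # right-side columns are NULL
            # Matched impressions were already emitted as joined rows; just drop them.
        else:
            still_buffered[impression_id] = (impression_time, matched)
    return null_rows, still_buffered


buffered = {
    "a": (datetime(2024, 1, 1, 10, 0), False),
    "b": (datetime(2024, 1, 1, 10, 0), True),
    "c": (datetime(2024, 1, 1, 11, 30), False),
}
print(evict_left_outer(buffered, datetime(2024, 1, 1, 12, 0)))
```

This is why the NULL rows of an outer join can only appear after the watermark has advanced far enough: before that point, the engine cannot rule out a matching click.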
Semantic Guarantees of Stream-stream Outer Joins with Watermarking

Outer joins have the same guarantees as inner joins regarding watermark delays and whether data will be dropped or not.

Caveats

There are a few important characteristics to note regarding how the outer results are generated.

Semi Joins with Watermarking

A semi join returns values from the left side of the relation that have a match with the right. It is also referred to as a left semi join. Similar to outer joins, watermark + event-time constraints must be specified for semi joins. To evict unmatched input rows on the left side, the engine must know when an input row on the left side is not going to match anything on the right side in the future.
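The defining property of a semi join, that each left row is returned at most once and without any right-side columns, can be illustrated with a short model. This sketch ignores watermarks and state eviction entirely; `left_semi` and the `adId` field are hypothetical.

```python
from typing import Callable, Dict, Iterable, List, Set


def left_semi(left_rows: Dict[str, dict], right_batches: Iterable[List[dict]],
              matches: Callable[[dict, dict], bool]) -> List[str]:
    """Return ids of left rows in the order a semi join would emit them.

    Each left row is emitted once, on its first match, and never again,
    however many right rows match it later."""
    emitted: List[str] = []
    already: Set[str] = set()
    for batch in right_batches:
        for right in batch:
            for left_id, left in left_rows.items():
                if left_id not in already and matches(left, right):
                    already.add(left_id)
                    emitted.append(left_id)
    return emitted


left = {"imp1": {"adId": 1}, "imp2": {"adId": 2}}
right = [[{"adId": 1}], [{"adId": 1}, {"adId": 3}]]
print(left_semi(left, right, lambda l, r: l["adId"] == r["adId"]))
```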

Semantic Guarantees of Stream-stream Semi Joins with Watermarking

Semi joins have the same guarantees as inner joins regarding watermark delays and whether data will be dropped or not.

Support matrix for joins in streaming queries

| Left Input | Right Input | Join Type | Support |
|------------|-------------|-----------|---------|
| Static | Static | All types | Supported, since it's not on streaming data even though it can be present in a streaming query |
| Stream | Static | Inner | Supported, not stateful |
| Stream | Static | Left Outer | Supported, not stateful |
| Stream | Static | Right Outer | Not supported |
| Stream | Static | Full Outer | Not supported |
| Stream | Static | Left Semi | Supported, not stateful |
| Static | Stream | Inner | Supported, not stateful |
| Static | Stream | Left Outer | Not supported |
| Static | Stream | Right Outer | Supported, not stateful |
| Static | Stream | Full Outer | Not supported |
| Static | Stream | Left Semi | Not supported |
| Stream | Stream | Inner | Supported, optionally specify watermark on both sides + time constraints for state cleanup |
| Stream | Stream | Left Outer | Conditionally supported, must specify watermark on right + time constraints for correct results, optionally specify watermark on left for all state cleanup |
| Stream | Stream | Right Outer | Conditionally supported, must specify watermark on left + time constraints for correct results, optionally specify watermark on right for all state cleanup |
| Stream | Stream | Full Outer | Conditionally supported, must specify watermark on one side + time constraints for correct results, optionally specify watermark on the other side for all state cleanup |
| Stream | Stream | Left Semi | Conditionally supported, must specify watermark on right + time constraints for correct results, optionally specify watermark on left for all state cleanup |

Additional details on supported joins:

Streaming Deduplication

You can deduplicate records in data streams using a unique identifier in the events. This is exactly the same as deduplication on static data using a unique identifier column. The query will store the necessary amount of data from previous records such that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking.

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid")

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime")
Dataset<Row> streamingDf = spark.readStream(). ...;  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid");

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime");
streamingDf = spark.readStream. ...

# Without watermark using guid column
streamingDf.dropDuplicates("guid")

# With watermark using guid and eventTime columns
streamingDf \
  .withWatermark("eventTime", "10 seconds") \
  .dropDuplicates("guid", "eventTime")
streamingDf <- read.stream(...)

# Without watermark using guid column
streamingDf <- dropDuplicates(streamingDf, "guid")

# With watermark using guid and eventTime columns
streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")
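To make the watermark's role concrete, here is a minimal pure-Python model of what deduplication with a watermark does across micro-batches. It is a conceptual sketch, not Spark's implementation: the class name `WatermarkedDeduplicator`, the integer event times, advancing the watermark only at the end of each batch, and dropping rows at or below the watermark are all assumptions made for illustration. State is keyed by (guid, eventTime), mirroring `dropDuplicates("guid", "eventTime")`, and entries the watermark has passed are evicted so the state stays bounded.

```python
class WatermarkedDeduplicator:
    """Toy model of dropDuplicates("guid", "eventTime") after withWatermark.

    Event times are plain integers (for example, seconds) to keep it simple.
    """

    def __init__(self, delay):
        self.delay = delay
        self.max_event_time = None
        self.watermark = None  # no watermark until the first batch completes
        self.seen = set()      # state: (guid, event_time) keys already emitted

    def process_batch(self, rows):
        """Return the (guid, event_time) rows emitted for one micro-batch."""
        emitted = []
        for guid, event_time in rows:
            # The watermark tracks the maximum event time over all input rows.
            if self.max_event_time is None or event_time > self.max_event_time:
                self.max_event_time = event_time
            # Rows at or below the current watermark are dropped as too late.
            if self.watermark is not None and event_time <= self.watermark:
                continue
            key = (guid, event_time)
            if key in self.seen:
                continue  # duplicate of a record emitted earlier
            self.seen.add(key)
            emitted.append(key)
        # Advance the watermark after the batch, then evict state it has passed.
        if self.max_event_time is not None:
            self.watermark = self.max_event_time - self.delay
            self.seen = {key for key in self.seen if key[1] > self.watermark}
        return emitted


dedup = WatermarkedDeduplicator(delay=10)
print(dedup.process_batch([("a", 100), ("a", 100), ("b", 105)]))  # [('a', 100), ('b', 105)]
print(dedup.process_batch([("a", 100), ("c", 90), ("d", 120)]))   # [('d', 120)]
```

Without a watermark there is no bound on when a duplicate may arrive, so the `seen` set in a model like this would only ever grow; the watermark is what lets old keys be forgotten.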

Policy for handling multiple watermarks

A streaming query can have multiple input streams that are unioned or joined together. Each of the input streams can have a different threshold of late data that needs to be tolerated for stateful operations. You specify these thresholds using withWatermark("eventTime", delay) on each of the input streams. For example, consider a query with stream-stream joins between inputStream1 and inputStream2.

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

While executing the query, Structured Streaming individually tracks the maximum event time seen in each input stream, calculates watermarks based on the corresponding delay, and chooses a single global watermark from them to be used for stateful operations. By default, the minimum is chosen as the global watermark because it ensures that no data is accidentally dropped as too late if one of the streams falls behind the others (for example, one of the streams stops receiving data due to upstream failures). In other words, the global watermark will safely move at the pace of the slowest stream and the query output will be delayed accordingly.

However, in some cases, you may want to get faster results even if it means dropping data from the slowest stream. Since Spark 2.4, you can set the multiple watermark policy to choose the maximum value as the global watermark by setting the SQL configuration spark.sql.streaming.multipleWatermarkPolicy to max (default is min). This lets the global watermark move at the pace of the fastest stream. However, as a side effect, data from the slower streams will be aggressively dropped. Hence, use this configuration judiciously.
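The min and max policies can be compared with a small pure-Python calculation. This is a sketch of the rule described above (per-stream watermark = maximum event time seen minus that stream's delay, then min or max across streams), not Spark code; the function name `global_watermark` and the timestamps are hypothetical. In a real session the policy itself is chosen with `spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")`.

```python
from datetime import datetime, timedelta


def global_watermark(streams, policy="min"):
    """Combine per-stream watermarks into one global watermark.

    streams: list of (max_event_time_seen, delay) pairs, one per input stream.
    policy: "min" (Spark's default) or "max", mirroring the two values of
    spark.sql.streaming.multipleWatermarkPolicy.
    """
    per_stream = [max_event_time - delay for max_event_time, delay in streams]
    if policy == "min":
        return min(per_stream)  # moves at the pace of the slowest stream
    if policy == "max":
        return max(per_stream)  # moves at the pace of the fastest stream
    raise ValueError(f"unknown policy: {policy!r}")


# Like the join above: inputStream1 tolerates 1 hour, inputStream2 tolerates 2 hours.
streams = [
    (datetime(2024, 1, 1, 10, 0), timedelta(hours=1)),   # watermark 09:00
    (datetime(2024, 1, 1, 10, 30), timedelta(hours=2)),  # watermark 08:30
]
print(global_watermark(streams, "min"))  # 2024-01-01 08:30:00
print(global_watermark(streams, "max"))  # 2024-01-01 09:00:00
```

With "max", any row from inputStream2 with an event time between 08:30 and 09:00 would already count as too late, which is the aggressive dropping the paragraph above warns about.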

Arbitrary Stateful Operations

Many use cases require more advanced stateful operations than aggregations. For example, in many use cases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation mapGroupsWithState and the more powerful operation flatMapGroupsWithState. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation (Scala/Java) and the examples (Scala/Java).

Though Spark cannot check and enforce it, the state function should be implemented with respect to the semantics of the output mode. For example, in Update mode Spark doesn’t expect that the state function will emit rows which are older than the current watermark plus the allowed late record delay, whereas in Append mode the state function can emit these rows.
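The per-trigger contract of mapGroupsWithState can be modeled in plain Python: in each trigger, the user function is called once per key that has new data, receives that key's new values together with its previous state, and returns an updated state plus exactly one output row. The sketch below counts events per user as a stand-in for sessionization. `run_trigger` and `count_events` are hypothetical names, timeouts (which the real operators also support) are left out, and the actual API in Spark 3.1 is the Scala/Java one referenced above.

```python
from collections import defaultdict


def run_trigger(events, state_store, update_fn):
    """Apply update_fn once per key present in this trigger's events.

    events: list of (key, value) pairs that arrived in this micro-batch.
    state_store: dict mapping key -> state, kept across triggers.
    update_fn(key, values, old_state) -> (new_state, output_row).
    """
    grouped = defaultdict(list)
    for key, value in events:
        grouped[key].append(value)

    outputs = []
    for key, values in grouped.items():
        new_state, output = update_fn(key, values, state_store.get(key))
        state_store[key] = new_state  # persisted for the next trigger
        outputs.append(output)        # exactly one output row per group
    return outputs


def count_events(user, new_events, old_count):
    """State is a running event count per user (None before the first event)."""
    count = (old_count or 0) + len(new_events)
    return count, (user, count)


state = {}
print(run_trigger([("u1", "click"), ("u2", "view"), ("u1", "view")], state, count_events))
print(run_trigger([("u1", "click")], state, count_events))
```

flatMapGroupsWithState relaxes the "exactly one output row" rule: its function may return any number of rows per group, which is why it is described as the more powerful of the two.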

Unsupported Operations

There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. Some of them are as follows.

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).

If you try any of these operations, you will see an AnalysisException like “operation XYZ is not supported with streaming DataFrames/Datasets”. While some of them may be supported in future releases of Spark, there are others which are fundamentally hard to implement on streaming data efficiently. For example, sorting on the input stream is not supported, as it requires keeping track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently.

Limitation of global watermark

In Append mode, if a stateful operation emits rows older than the current watermark plus the allowed late record delay, they will be “late rows” in downstream stateful operations (as Spark uses a global watermark). Note that these rows may be discarded. This is a limitation of a global watermark, and it could potentially cause a correctness issue.

Spark will check the logical plan of the query and log a warning when it detects such a pattern.

Any stateful operation that comes after one of the stateful operations below can have this issue:

As Spark cannot check the state function of mapGroupsWithState/flatMapGroupsWithState, Spark assumes that the state function emits late rows if the operator uses Append mode.

Spark provides two ways to check the number of late rows on stateful operators which would help you identify the issue:

  1. On Spark UI: check the metrics in stateful operator nodes in the query execution details page in the SQL tab.
  2. On Streaming Query Listener: check “numRowsDroppedByWatermark” in “stateOperators” in QueryProgressEvent.

Please note that “numRowsDroppedByWatermark” represents the number of rows “dropped” by the watermark, which is not always the same as the count of “late input rows” for the operator. It depends on the implementation of the operator: e.g. streaming aggregation pre-aggregates input rows and checks the late inputs against the pre-aggregated inputs, hence the number is not the same as the number of original input rows. In practice, only check whether the value is zero or non-zero.
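In PySpark, `query.lastProgress` returns the latest progress report as a Python dictionary (or `None` before the first report), so the zero/non-zero check can be scripted. The helper below is a hypothetical sketch; the sample dictionary only imitates the `stateOperators` part of a progress report, and its numbers are made up for illustration.

```python
def operators_dropping_rows(progress):
    """Return indices of state operators whose numRowsDroppedByWatermark is non-zero.

    progress: a dict shaped like StreamingQuery.lastProgress in PySpark, or None.
    """
    if not progress:
        return []
    return [
        index
        for index, operator in enumerate(progress.get("stateOperators", []))
        if operator.get("numRowsDroppedByWatermark", 0) > 0
    ]


# Illustrative input only: two chained stateful operators, the second drops rows.
sample_progress = {
    "stateOperators": [
        {"numRowsTotal": 10, "numRowsDroppedByWatermark": 0},
        {"numRowsTotal": 4, "numRowsDroppedByWatermark": 3},
    ]
}
print(operators_dropping_rows(sample_progress))  # [1]
```

Against a running query this would be called as `operators_dropping_rows(query.lastProgress)`; a non-empty result for an operator that follows another stateful operator is the signal to look more closely.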

There’s a known workaround: split your streaming query into multiple queries, one per stateful operator, and ensure end-to-end exactly-once for each query. Ensuring end-to-end exactly-once for the last query is optional.

Starting Streaming Queries

Once you have defined the final result DataFrame/Dataset, all that is left is for you to start the streaming computation. To do that, you have to use the DataStreamWriter (Scala/Java/Python docs) returned through Dataset.writeStream(). You will have to specify one or more of the following in this interface.

Output Modes

There are a few types of output modes.

Different types of streaming queries support different output modes. Here is the compatibility matrix.

Queries with aggregation: aggregation on event-time with watermark
  Supported output modes: Append, Update, Complete
  Notes: Append mode uses the watermark to drop old aggregation state. But the output of a windowed aggregation is delayed by the late threshold specified in withWatermark(), because by the mode's semantics, rows can be added to the Result Table only once after they are finalized (i.e. after the watermark is crossed). See the Late Data section for more details.
  Update mode uses the watermark to drop old aggregation state.
  Complete mode does not drop old aggregation state since by definition this mode preserves all data in the Result Table.

Queries with aggregation: other aggregations
  Supported output modes: Complete, Update
  Notes: Since no watermark is defined (it is only defined in the other category), old aggregation state is not dropped.
  Append mode is not supported as aggregates can update, thus violating the semantics of this mode.

Queries with mapGroupsWithState
  Supported output modes: Update
  Notes: Aggregations are not allowed in a query with mapGroupsWithState.

Queries with flatMapGroupsWithState: Append operation mode
  Supported output modes: Append
  Notes: Aggregations are allowed after flatMapGroupsWithState.

Queries with flatMapGroupsWithState: Update operation mode
  Supported output modes: Update
  Notes: Aggregations are not allowed in a query with flatMapGroupsWithState.

Queries with joins
  Supported output modes: Append
  Notes: Update and Complete modes are not supported yet. See the support matrix in the Join Operations section for more details on what types of joins are supported.

Other queries
  Supported output modes: Append, Update
  Notes: Complete mode is not supported as it is infeasible to keep all unaggregated data in the Result Table.
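For an aggregation without a watermark (the "other aggregations" case above), the difference between Complete and Update mode can be seen in a small pure-Python model of a running count. `emit` is a hypothetical helper, not a Spark API; Append is deliberately rejected because, as the matrix states, such aggregates can still change after they have been emitted.

```python
from collections import Counter


def emit(mode, previous, current):
    """Rows a sink receives after one trigger of a running count.

    previous/current: dicts of key -> count before and after the trigger.
    """
    if mode == "complete":
        return dict(current)  # the whole Result Table, every trigger
    if mode == "update":
        # Only the rows whose value changed in this trigger.
        return {k: v for k, v in current.items() if previous.get(k) != v}
    raise ValueError(f"{mode!r} is not supported for aggregations without a watermark")


counts = Counter()
for batch in (["a", "b"], ["a"]):
    before = dict(counts)
    counts.update(batch)  # new input rows for this trigger
    print("update:", emit("update", before, counts), "complete:", emit("complete", before, counts))
```

In the second trigger only the count for "a" changes, so Update mode sends a single row while Complete mode resends the entire table.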

Output Sinks

There are a few types of built-in output sinks.

File sink:
writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()

Kafka sink:
writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .start()

Foreach sink:
writeStream
    .foreach(...)
    .start()

Console sink (for debugging):
writeStream
    .format("console")
    .start()

Memory sink (for debugging):
writeStream
    .format("memory")
    .queryName("tableName")
    .start()

Some sinks are not fault-tolerant because they do not guarantee persistence of the output and are meant for debugging purposes only. See the earlier section on fault-tolerance semantics. Here are the details of all the sinks in Spark.

File Sink
  Supported output modes: Append
  Options:
    path: path to the output directory, must be specified.
    retention: time to live (TTL) for output files. Output files whose batches were committed longer ago than the TTL will eventually be excluded from the metadata log. This means reader queries which read the sink's output directory may not process them. You can provide the value as a time string (like "12h", "7d", etc.). By default it is disabled.
    For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R). E.g. for "parquet" format options, see DataFrameWriter.parquet().
  Fault-tolerant: Yes (exactly-once)
  Notes: Supports writes to partitioned tables. Partitioning by time may be useful.

Kafka Sink
  Supported output modes: Append, Update, Complete
  Options: See the Kafka Integration Guide
  Fault-tolerant: Yes (at-least-once)
  Notes: More details in the Kafka Integration Guide

Foreach Sink
  Supported output modes: Append, Update, Complete
  Options: None
  Fault-tolerant: Yes (at-least-once)
  Notes: More details in the next section

ForeachBatch Sink
  Supported output modes: Append, Update, Complete
  Options: None
  Fault-tolerant: Depends on the implementation
  Notes: More details in the next section

Console Sink
  Supported output modes: Append, Update, Complete
  Options:
    numRows: Number of rows to print every trigger (default: 20)
    truncate: Whether to truncate the output if too long (default: true)
  Fault-tolerant: No

Memory Sink
  Supported output modes: Append, Complete
  Options: None
  Fault-tolerant: No. But in Complete Mode, a restarted query will recreate the full table.
  Notes: Table name is the query name.

Note that you have to call start() to actually start the execution of the query. This returns a StreamingQuery object which is a handle to the continuously running execution. You can use this object to manage the query, which we will discuss in the next subsection. For now, let’s understand all this with a few examples.

// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")   

// Print new data to console
noAggDF
  .writeStream
  .format("console")
  .start()

// Write new data to Parquet files
noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()

// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()

// Print updated aggregations to console
aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

// Have all the aggregates in an in-memory table
aggDF
  .writeStream
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start()

spark.sql("select * from aggregates").show()   // interactively query in-memory table
// ========== DF with no aggregations ==========
Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");

// Print new data to console
noAggDF
  .writeStream()
  .format("console")
  .start();

// Write new data to Parquet files
noAggDF
  .writeStream()
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start();

// ========== DF with aggregation ==========
Dataset<Row> aggDF = df.groupBy("device").count();

// Print updated aggregations to console
aggDF
  .writeStream()
  .outputMode("complete")
  .format("console")
  .start();

// Have all the aggregates in an in-memory table
aggDF
  .writeStream()
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start();

spark.sql("select * from aggregates").show();   // interactively query in-memory table
# ========== DF with no aggregations ==========
noAggDF = deviceDataDf.select("device").where("signal > 10")   

# Print new data to console
noAggDF \
    .writeStream \
    .format("console") \
    .start()

# Write new data to Parquet files
noAggDF \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .option("path", "path/to/destination/dir") \
    .start()

# ========== DF with aggregation ==========
aggDF = df.groupBy("device").count()

# Print updated aggregations to console
aggDF \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Have all the aggregates in an in-memory table. The query name will be the table name
aggDF \
    .writeStream \
    .queryName("aggregates") \
    .outputMode("complete") \
    .format("memory") \
    .start()

spark.sql("select * from aggregates").show()   # interactively query in-memory table
# ========== DF with no aggregations ==========
noAggDF <- select(where(deviceDataDf, "signal > 10"), "device")

# Print new data to console
write.stream(noAggDF, "console")

# Write new data to Parquet files
write.stream(noAggDF,
             "parquet",
             path = "path/to/destination/dir",
             checkpointLocation = "path/to/checkpoint/dir")

# ========== DF with aggregation ==========
aggDF <- count(groupBy(df, "device"))

# Print updated aggregations to console
write.stream(aggDF, "console", outputMode = "complete")

# Have all the aggregates in an in memory table. The query name will be the table name
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")

# Interactively query in-memory table
head(sql("select * from aggregates"))
Using Foreach and ForeachBatch

The foreach and foreachBatch operations allow you to apply arbitrary operations and custom write logic on the output of a streaming query. They have slightly different use cases: while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch. Let’s understand their usages in more detail.

ForeachBatch

foreachBatch(...) allows you to specify a function that is executed on the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // Transform and write batchDF 
}.start()
streamingDatasetOfString.writeStream().foreachBatch(
  new VoidFunction2<Dataset<String>, Long>() {
    public void call(Dataset<String> dataset, Long batchId) {
      // Transform and write batchDF
    }    
  }
).start();
def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    pass
  
streamingDF.writeStream.foreachBatch(foreach_batch_function).start()   

R is not yet supported.

With foreachBatch, you can do the following.

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}
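Because foreachBatch hands your function the micro-batch's unique ID, one way to keep a replayed batch from being written twice is to remember which batch IDs have already been committed. The sketch below models that idea in plain Python; `IdempotentBatchSink` is hypothetical, and in a real deployment the committed IDs would have to live in the target system and be updated atomically with the data, not in driver memory.

```python
class IdempotentBatchSink:
    """Toy sink that ignores micro-batches it has already committed."""

    def __init__(self):
        self.committed_batch_ids = set()
        self.rows = []

    def write_batch(self, rows, batch_id):
        """Write one micro-batch; return False if batch_id was already committed."""
        if batch_id in self.committed_batch_ids:
            return False  # replay after a failure: skip to avoid duplicate output
        self.rows.extend(rows)
        self.committed_batch_ids.add(batch_id)
        return True


sink = IdempotentBatchSink()
sink.write_batch(["r1", "r2"], 0)
sink.write_batch(["r3"], 1)
sink.write_batch(["r3"], 1)  # batch 1 delivered again, e.g. after a restart
print(sink.rows)  # ['r1', 'r2', 'r3']
```

Wired into a query, the function passed to foreachBatch would receive `(batch_df, batch_id)` and call something like `sink.write_batch(batch_df.collect(), batch_id)`; `collect()` is only reasonable for small test data.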

Note:

Foreach

If foreachBatch is not an option (for example, a corresponding batch data writer does not exist, or the query runs in continuous processing mode), then you can express your custom writer logic using foreach. Specifically, you can express the data writing logic by dividing it into three methods: open, process, and close. Since Spark 2.4, foreach is available in Scala, Java and Python.

In Scala, you have to extend the class ForeachWriter (docs).

streamingDatasetOfString.writeStream.foreach(
  new ForeachWriter[String] {

    def open(partitionId: Long, version: Long): Boolean = {
      // Open connection; return true so that process() is called for this partition
      true
    }

    def process(record: String): Unit = {
      // Write string to connection
    }

    def close(errorOrNull: Throwable): Unit = {
      // Close the connection
    }
  }
).start()

In Java, you have to extend the class ForeachWriter (docs).

streamingDatasetOfString.writeStream().foreach(
  new ForeachWriter<String>() {

    @Override public boolean open(long partitionId, long version) {
      // Open connection; return true so that process() is called for this partition
      return true;
    }

    @Override public void process(String record) {
      // Write string to connection
    }

    @Override public void close(Throwable errorOrNull) {
      // Close the connection
    }
  }
).start();

In Python, you can invoke foreach in two ways: in a function or in an object. The function offers a simple way to express your processing logic but does not allow you to deduplicate generated data when failures cause reprocessing of some input data. For that situation you must specify the processing logic in an object.

  • First, the function takes a row as input.
def process_row(row):
    # Write row to storage
    pass

query = streamingDF.writeStream.foreach(process_row).start()  
  • Second, the object has a process method and optional open and close methods:
class ForeachWriter:
    def open(self, partition_id, epoch_id):
        # Open connection. This method is optional in Python.
        pass

    def process(self, row):
        # Write row to connection. This method is NOT optional in Python.
        pass

    def close(self, error):
        # Close the connection. This method is optional in Python.
        pass
      
query = streamingDF.writeStream.foreach(ForeachWriter()).start()

R is not yet supported.
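The object form's `open(partition_id, epoch_id)` hook is what makes the deduplication mentioned above possible: the pair identifies one partition's output for one epoch, so a writer can skip work it has already committed. Below is a pure-Python sketch under stated assumptions: `DedupingWriter`, the dict standing in for an external transactional store, and `run_partition` (which imitates the open, process, close call order for a single partition, with `close` still invoked when `open` returns False) are hypothetical and not Spark code.

```python
class DedupingWriter:
    """Foreach-style writer that skips (partition_id, epoch_id) pairs already stored.

    `store` stands in for an external transactional store shared by all tasks.
    """

    def __init__(self, store):
        self.store = store
        self.key = None
        self.buffer = []

    def open(self, partition_id, epoch_id):
        self.key = (partition_id, epoch_id)
        self.buffer = []
        return self.key not in self.store  # False means: skip process() entirely

    def process(self, row):
        self.buffer.append(row)

    def close(self, error):
        # Commit only when rows were processed without error and not yet stored.
        if error is None and self.key not in self.store:
            self.store[self.key] = list(self.buffer)


def run_partition(writer, partition_id, epoch_id, rows):
    """Imitate the call order for one partition: open, process (if allowed), close."""
    error = None
    if writer.open(partition_id, epoch_id):
        try:
            for row in rows:
                writer.process(row)
        except Exception as exc:
            error = exc  # handed to close(), then re-raised to fail the task
    writer.close(error)
    if error is not None:
        raise error


store = {}
run_partition(DedupingWriter(store), 0, 7, ["a", "b"])
run_partition(DedupingWriter(store), 0, 7, ["a", "b"])  # same epoch re-run after a failure
print(store)  # {(0, 7): ['a', 'b']}
```

A plain function passed to foreach never sees `partition_id` or `epoch_id`, which is why the text above says only the object form can deduplicate reprocessed data.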

Execution semantics: When the streaming query is started, Spark calls the function or the object’s methods in the following way:

Streaming Table APIs 流表API

Since Spark 3.1, you can also use DataStreamReader.table() to read tables as streaming DataFrames and use DataStreamWriter.toTable() to write streaming DataFrames as tables:

val spark: SparkSession = ...

// Create a streaming DataFrame
val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

// Write the streaming DataFrame to a table
df.writeStream
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .toTable("myTable")

// Check the table result
spark.read.table("myTable").show()

// Transform the source dataset and write to a new table
spark.readStream
  .table("myTable")
  .select("value")
  .writeStream
  .option("checkpointLocation", "path/to/another/checkpoint/dir")  // each query needs its own checkpoint
  .format("parquet")
  .toTable("newTable")

// Check the new table result
spark.read.table("newTable").show()
SparkSession spark = ...

// Create a streaming DataFrame
Dataset<Row> df = spark.readStream()
  .format("rate")
  .option("rowsPerSecond", 10)
  .load();

// Write the streaming DataFrame to a table
df.writeStream()
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .toTable("myTable");

// Check the table result
spark.read().table("myTable").show();

// Transform the source dataset and write to a new table
spark.readStream()
  .table("myTable")
  .select("value")
  .writeStream()
  .option("checkpointLocation", "path/to/another/checkpoint/dir")  // each query needs its own checkpoint
  .format("parquet")
  .toTable("newTable");

// Check the new table result
spark.read().table("newTable").show();
spark = ...  # spark session

# Create a streaming DataFrame
df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# Write the streaming DataFrame to a table
df.writeStream \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .toTable("myTable")

# Check the table result
spark.read.table("myTable").show()

# Transform the source dataset and write to a new table
spark.readStream \
    .table("myTable") \
    .select("value") \
    .writeStream \
    .option("checkpointLocation", "path/to/another/checkpoint/dir") \
    .format("parquet") \
    .toTable("newTable")

# Check the new table result
spark.read.table("newTable").show()

Not available in R.

For more details, please check the docs for DataStreamReader (Scala/Java/Python docs) and DataStreamWriter (Scala/Java/Python docs).

Triggers

The trigger settings of a streaming query define the timing of streaming data processing: whether the query will be executed as a micro-batch query with a fixed batch interval or as a continuous processing query. Here are the different kinds of triggers that are supported.

Unspecified (default)
  If no trigger setting is explicitly specified, then by default, the query will be executed in micro-batch mode, where micro-batches will be generated as soon as the previous micro-batch has completed processing.

Fixed interval micro-batches
  The query will be executed in micro-batch mode, where micro-batches will be kicked off at the user-specified intervals.
  • If the previous micro-batch completes within the interval, then the engine will wait until the interval is over before kicking off the next micro-batch.
  • If the previous micro-batch takes longer than the interval to complete (i.e. if an interval boundary is missed), then the next micro-batch will start as soon as the previous one completes (i.e., it will not wait for the next interval boundary).
  • If no new data is available, then no micro-batch will be kicked off.

One-time micro-batch
  The query will execute only one micro-batch to process all the available data and then stop on its own. This is useful in scenarios where you want to periodically spin up a cluster, process everything that has become available since the last period, and then shut down the cluster. In some cases, this may lead to significant cost savings.

Continuous with fixed checkpoint interval (experimental)
  The query will be executed in the new low-latency, continuous processing mode. Read more about this in the Continuous Processing section below.
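The three rules for fixed-interval micro-batches can be checked with a small scheduling model. This is a simplified sketch, not Spark's scheduler: the function name `fixed_interval_starts`, abstract integer time units, and interval boundaries falling on multiples of the interval are assumptions of the model.

```python
def fixed_interval_starts(interval, triggers):
    """Start times of micro-batches under a fixed-interval trigger.

    interval: trigger interval in abstract time units (must be > 0).
    triggers: one entry per trigger attempt: a processing duration, or None
    when no new data is available at that point.
    Boundaries are assumed to fall on multiples of the interval.
    """
    now = 0
    starts = []
    for duration in triggers:
        next_boundary = (now // interval + 1) * interval
        if duration is None:
            now = next_boundary  # no new data: no micro-batch, wait for next boundary
            continue
        starts.append(now)
        finished = now + duration
        # Finished early: wait for the boundary. Missed it: start right away.
        now = max(finished, next_boundary)
    return starts


print(fixed_interval_starts(10, [3, 15, 4]))    # [0, 10, 25]
print(fixed_interval_starts(10, [3, None, 4]))  # [0, 20]
```

In the first run, the 15-unit batch started at 10 misses the boundary at 20, so the next batch starts at 25 as soon as it finishes; in the second run the empty trigger at 10 produces no micro-batch.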

Here are a few code examples.

import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  .start()

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

// One-time trigger
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()

// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
import org.apache.spark.sql.streaming.Trigger;

// Default trigger (runs micro-batch as soon as it can)
df.writeStream()
  .format("console")
  .start();

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream()
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start();

// One-time trigger
df.writeStream()
  .format("console")
  .trigger(Trigger.Once())
  .start();

// Continuous trigger with one-second checkpointing interval
df.writeStream()
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start();
# Default trigger (runs micro-batch as soon as it can)
df.writeStream \
  .format("console") \
  .start()

# ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream \
  .format("console") \
  .trigger(processingTime='2 seconds') \
  .start()

# One-time trigger
df.writeStream \
  .format("console") \
  .trigger(once=True) \
  .start()

# Continuous trigger with one-second checkpointing interval
df.writeStream \
  .format("console") \
  .trigger(continuous='1 second') \
  .start()
# Default trigger (runs micro-batch as soon as it can)
write.stream(df, "console")

# ProcessingTime trigger with two-seconds micro-batch interval
write.stream(df, "console", trigger.processingTime = "2 seconds")

# One-time trigger
write.stream(df, "console", trigger.once = TRUE)

# Continuous trigger is not yet supported

Managing Streaming Queries

The StreamingQuery object created when a query is started can be used to monitor and manage the query.

val query = df.writeStream.format("console").start()   // get the query object

query.id          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId       // get the unique id of this run of the query, which will be generated at every start/restart

query.name        // get the auto-generated or user-specified name of the query

query.explain()   // print detailed explanations of the query

query.stop()      // stop the query

query.awaitTermination()   // block until query is terminated, with stop() or with error

query.exception       // the exception if the query has been terminated with error

query.recentProgress  // an array of the most recent progress updates for this query

query.lastProgress    // the most recent progress update of this streaming query
StreamingQuery query = df.writeStream().format("console").start();   // get the query object

query.id();          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId();       // get the unique id of this run of the query, which will be generated at every start/restart

query.name();        // get the auto-generated or user-specified name of the query

query.explain();   // print detailed explanations of the query

query.stop();      // stop the query

query.awaitTermination();   // block until query is terminated, with stop() or with error

query.exception();       // the exception if the query has been terminated with error

query.recentProgress();  // an array of the most recent progress updates for this query

query.lastProgress();    // the most recent progress update of this streaming query
query = df.writeStream.format("console").start()   # get the query object

query.id          # get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId       # get the unique id of this run of the query, which will be generated at every start/restart

query.name        # get the auto-generated or user-specified name of the query

query.explain()   # print detailed explanations of the query

query.stop()      # stop the query

query.awaitTermination()   # block until query is terminated, with stop() or with error

query.exception()       # the exception if the query has been terminated with error

query.recentProgress  # a list of the most recent progress updates for this query

query.lastProgress    # the most recent progress update of this streaming query
query <- write.stream(df, "console")  # get the query object

queryName(query)          # get the auto-generated or user-specified name of the query

explain(query)            # print detailed explanations of the query

stopQuery(query)          # stop the query

awaitTermination(query)   # block until query is terminated, with stop() or with error

lastProgress(query)       # the most recent progress update of this streaming query

You can start any number of queries in a single SparkSession. They all run concurrently and share the cluster resources. You can use sparkSession.streams() to get the StreamingQueryManager (Scala/Java/Python docs), which can be used to manage the currently active queries.

val spark: SparkSession = ...

spark.streams.active    // get the list of currently active streaming queries

spark.streams.get(id)   // get a query object by its unique id

spark.streams.awaitAnyTermination()   // block until any one of them terminates
SparkSession spark = ...

spark.streams().active();    // get the list of currently active streaming queries

spark.streams().get(id);   // get a query object by its unique id

spark.streams().awaitAnyTermination();   // block until any one of them terminates
spark = ...  # spark session

spark.streams.active  # get the list of currently active streaming queries

spark.streams.get(id)  # get a query object by its unique id

spark.streams.awaitAnyTermination()  # block until any one of them terminates
Not available in R.
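The manager is also useful for housekeeping, for example finding a query by the name it was given or stopping every running query before shutdown. Below is a minimal Python sketch. It assumes `streams` is `spark.streams`, whose `active` property lists the active `StreamingQuery` objects. The helper names `find_active_query` and `stop_all` are hypothetical and not part of PySpark.

```python
def find_active_query(streams, name):
    """Hypothetical helper: return the active query called `name`, or None.

    `streams` is assumed to be spark.streams (a StreamingQueryManager).
    """
    for query in streams.active:
        if query.name == name:  # `name` is a property in PySpark, not a method
            return query
    return None


def stop_all(streams):
    """Hypothetical helper: stop every active query and return how many were stopped."""
    queries = list(streams.active)  # take a snapshot before stopping anything
    for query in queries:
        query.stop()
    return len(queries)
```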

Monitoring Streaming Queries

There are multiple ways to monitor active streaming queries. You can either push metrics to external systems using Spark’s Dropwizard Metrics support, or access them programmatically.

Reading Metrics Interactively

You can directly get the current status and metrics of an active query using streamingQuery.lastProgress() and streamingQuery.status(). lastProgress() returns a StreamingQueryProgress object in Scala and Java and a dictionary with the same fields in Python. It has all the information about the progress made in the last trigger of the stream: what data was processed, what the processing rates and latencies were, etc. There is also streamingQuery.recentProgress, which returns an array of the last few progress updates. In Python, lastProgress, recentProgress and status are properties, so they are accessed without parentheses.

In addition, streamingQuery.status() returns a StreamingQueryStatus object in Scala and Java and a dictionary with the same fields in Python. It describes what the query is doing right now: whether a trigger is active, whether data is being processed, etc.

Here are a few examples.

val query: StreamingQuery = ...

println(query.lastProgress)

/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/


println(query.status)

/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
StreamingQuery query = ...

System.out.println(query.lastProgress());
/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/


System.out.println(query.status());
/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
query = ...  # a StreamingQuery
print(query.lastProgress)

'''
Will print something like the following.

{'stateOperators': [], 'eventTime': {'watermark': '2016-12-14T18:45:24.873Z'}, 'name': 'MyQuery', 'timestamp': '2016-12-14T18:45:24.873Z', 'processedRowsPerSecond': 200.0, 'inputRowsPerSecond': 120.0, 'numInputRows': 10, 'sources': [{'description': 'KafkaSource[Subscribe[topic-0]]', 'endOffset': {'topic-0': {'1': 134, '0': 534, '3': 21, '2': 0, '4': 115}}, 'processedRowsPerSecond': 200.0, 'inputRowsPerSecond': 120.0, 'numInputRows': 10, 'startOffset': {'topic-0': {'1': 1, '0': 1, '3': 1, '2': 0, '4': 1}}}], 'durationMs': {'getOffset': 2, 'triggerExecution': 3}, 'runId': '88e2ff94-ede0-45a8-b687-6316fbef529a', 'id': 'ce011fdc-8762-4dcb-84eb-a77333e28109', 'sink': {'description': 'MemorySink'}}
'''

print(query.status)
'''
Will print something like the following.

{'message': 'Waiting for data to arrive', 'isTriggerActive': False, 'isDataAvailable': False}
'''
query <- ...  # a StreamingQuery
lastProgress(query)

# Will print something like the following.
#
# {
#   "id" : "8c57e1ec-94b5-4c99-b100-f694162df0b9",
#   "runId" : "ae505c5a-a64e-4896-8c28-c7cbaf926f16",
#   "name" : null,
#   "timestamp" : "2017-04-26T08:27:28.835Z",
#   "numInputRows" : 0,
#   "inputRowsPerSecond" : 0.0,
#   "processedRowsPerSecond" : 0.0,
#   "durationMs" : {
#     "getOffset" : 0,
#     "triggerExecution" : 1
#   },
#   "stateOperators" : [ {
#     "numRowsTotal" : 4,
#     "numRowsUpdated" : 0
#   } ],
#   "sources" : [ {
#     "description" : "TextSocketSource[host: localhost, port: 9999]",
#     "startOffset" : 1,
#     "endOffset" : 1,
#     "numInputRows" : 0,
#     "inputRowsPerSecond" : 0.0,
#     "processedRowsPerSecond" : 0.0
#   } ],
#   "sink" : {
#     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@76b37531"
#   }
# }

status(query)
# Will print something like the following.
#
# {
#   "message" : "Waiting for data to arrive",
#   "isDataAvailable" : false,
#   "isTriggerActive" : false
# }
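Python returns progress updates as plain dictionaries, so they are easy to post-process. The sketch below shows two hypothetical helpers:

- `is_keeping_up` is a rough check of whether the last trigger processed rows at least as fast as they arrived. This is a heuristic assumed here, not a Spark rule.
- `offset_advance` computes the per-partition offset advance (`endOffset` minus `startOffset`) for a source with Kafka-style offsets of the form `{topic: {partition: offset}}`.

Both helpers return None when the fields they need are missing or have a different shape. For example, the socket source above reports plain numbers as offsets.

```python
def is_keeping_up(progress):
    """Rough heuristic (an assumption, not a Spark rule): did the last trigger
    process rows at least as fast as they arrived? Returns None if either
    rate is missing from the progress dictionary."""
    arrived = progress.get("inputRowsPerSecond")
    processed = progress.get("processedRowsPerSecond")
    if arrived is None or processed is None:
        return None
    return processed >= arrived


def offset_advance(source):
    """Per-partition offset advance (endOffset - startOffset) for one entry of
    progress["sources"], assuming Kafka-style offsets {topic: {partition: offset}}.
    Returns None when the offsets do not have that shape."""
    start, end = source.get("startOffset"), source.get("endOffset")
    if not isinstance(start, dict) or not isinstance(end, dict):
        return None  # e.g. the socket source reports plain numbers
    advance = {}
    for topic, partitions in end.items():
        start_parts = start.get(topic, {})
        # Only partitions present in both offsets can be compared.
        advance[topic] = {p: off - start_parts[p]
                          for p, off in partitions.items() if p in start_parts}
    return advance


# Trimmed copy of the Kafka example output shown earlier in this section.
example = {
    "inputRowsPerSecond": 120.0,
    "processedRowsPerSecond": 200.0,
    "sources": [{
        "description": "KafkaSource[Subscribe[topic-0]]",
        "startOffset": {"topic-0": {"0": 1, "1": 1, "2": 0, "3": 1, "4": 1}},
        "endOffset": {"topic-0": {"0": 534, "1": 134, "2": 0, "3": 21, "4": 115}},
    }],
}
print(is_keeping_up(example))  # True
print(offset_advance(example["sources"][0]))
# {'topic-0': {'0': 533, '1': 133, '2': 0, '3': 20, '4': 114}}
```

In a real application, `example` would be replaced by `query.lastProgress`.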

Reporting Metrics programmatically using Asynchronous APIs

You can also asynchronously monitor all queries associated with a SparkSession by attaching a StreamingQueryListener (Scala/Java docs). Once you attach your custom StreamingQueryListener object with sparkSession.streams.addListener(), you will get callbacks when a query is started and stopped and when an active query makes progress. Here is an example:

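import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

val spark: SparkSession = ...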

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})
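import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent;
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryStartedEvent;
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryTerminatedEvent;

SparkSession spark = ...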

spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        System.out.println("Query started: " + queryStarted.id());
    }
    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        System.out.println("Query terminated: " + queryTerminated.id());
    }
    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        System.out.println("Query made progress: " + queryProgress.progress());
    }
});
Not available in Python.
Not available in R.
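In this version, StreamingQueryListener is not available in Python. A simple substitute is to poll `query.lastProgress` on a timer and react to each new update. The sketch below does this under a few stated assumptions:

- The function name `poll_progress` is hypothetical.
- An update is treated as new when its `(runId, timestamp)` pair changes. This is a choice made for this sketch.
- `get_progress` is expected to return `query.lastProgress`, which is None before the first trigger.

Unlike a listener, a poller can miss updates that happen between polls. `query.recentProgress` can help fill those gaps.

```python
import time


def poll_progress(get_progress, on_progress, polls, interval_s=10.0, sleep=time.sleep):
    """Hypothetical polling monitor. Calls get_progress() `polls` times,
    `interval_s` seconds apart, and passes each new progress dictionary to
    on_progress(). Returns how many new updates were delivered."""
    last_key = None
    delivered = 0
    for i in range(polls):
        progress = get_progress()  # e.g. query.lastProgress; None before the first trigger
        if progress is not None:
            # Treat an update as new when its (runId, timestamp) pair changes.
            key = (progress.get("runId"), progress.get("timestamp"))
            if key != last_key:
                on_progress(progress)
                last_key = key
                delivered += 1
        if i < polls - 1:
            sleep(interval_s)  # injectable so that tests need not actually wait
    return delivered
```

A typical call is `poll_progress(lambda: query.lastProgress, lambda p: print(p["numInputRows"]), polls=6)`. This checks the query once every ten seconds for about a minute.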

Reporting Metrics using Dropwizard

Spark supports reporting metrics using the Dropwizard Library. To enable metrics of Structured Streaming queries to be reported as well, you have to explicitly enable the configuration spark.sql.streaming.metricsEnabled in the SparkSession.

spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
spark.conf().set("spark.sql.streaming.metricsEnabled", "true");
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true");
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
# or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
sql("SET spark.sql.streaming.metricsEnabled=true")

All queries started in the SparkSession after this configuration has been enabled will report metrics through Dropwizard to whatever sinks have been configured (e.g. Ganglia, Graphite, JMX, etc.).

Recovering from Failures with Checkpointing

In case of a failure or intentional shutdown, you can recover the progress and state of a previous query and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. the range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the quick example) to the checkpoint location. This checkpoint location has to be a path in an HDFS-compatible file system, and can be set as an option in the DataStreamWriter when starting a query.

aggDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()
aggDF
  .writeStream()
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start();
aggDF \
    .writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("memory") \
    .start()
write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "path/to/HDFS/dir")
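To make the role of the write-ahead log concrete, here is a toy model in plain Python of the idea described above. It works in three steps:

1. Before processing a trigger, it records the planned offset range.
2. After the output is written, it records a commit.
3. On restart, it re-runs any planned range that was never committed. Otherwise it resumes right after the last planned range.

This is an illustration only. The class name `ToyCheckpoint`, the file names and the JSON-lines layout are assumptions made for this sketch and do not reflect Spark's actual checkpoint directory format.

```python
import json
import os


class ToyCheckpoint:
    """Toy write-ahead log of offset ranges. NOT Spark's on-disk format."""

    def __init__(self, directory):
        self.offsets_path = os.path.join(directory, "offsets.jsonl")
        self.commits_path = os.path.join(directory, "commits.jsonl")

    def _read(self, path):
        if not os.path.exists(path):
            return []
        with open(path) as f:
            return [json.loads(line) for line in f if line.strip()]

    def _append(self, path, record):
        with open(path, "a") as f:
            f.write(json.dumps(record) + "\n")
            f.flush()
            os.fsync(f.fileno())  # make the record durable before continuing

    def next_range(self, available_until):
        """Return (batch_id, start, end) for the next batch; `end` is exclusive.

        An uncommitted batch (a crash happened after planning it) is replayed as-is.
        """
        planned = self._read(self.offsets_path)
        committed = {r["batch"] for r in self._read(self.commits_path)}
        if planned and planned[-1]["batch"] not in committed:
            last = planned[-1]
            return last["batch"], last["start"], last["end"]
        batch = planned[-1]["batch"] + 1 if planned else 0
        start = planned[-1]["end"] if planned else 0
        # Write-ahead: log the planned range before any output is produced.
        self._append(self.offsets_path, {"batch": batch, "start": start, "end": available_until})
        return batch, start, available_until

    def commit(self, batch):
        """Record that the output of `batch` has been fully written."""
        self._append(self.commits_path, {"batch": batch})
```

Replaying the uncommitted range after a crash is what lets a restarted query pick up where it left off. Whether replayed output ends up duplicated then depends on the sink.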

Recovery Semantics after Changes in a Streaming Query

There are limitations on what changes in a streaming query are allowed between restarts from the same checkpoint location. Here are a few kinds of changes that are either not allowed or whose effect is not well-defined. For all of them:

Types of changes

Continuous Processing

[Experimental]

Continuous processing is a new, experimental streaming execution mode introduced in Spark 2.3 that enables low (~1 ms) end-to-end latency with at-least-once fault-tolerance guarantees. By comparison, the default micro-batch processing engine can achieve exactly-once guarantees but achieves latencies of ~100 ms at best. For some types of queries (discussed below), you can choose which mode to execute them in without modifying the application logic (i.e. without changing the DataFrame/Dataset operations).

To run a supported query in continuous processing mode, all you need to do is specify a continuous trigger with the desired checkpoint interval as a parameter. For example,

import org.apache.spark.sql.streaming.Trigger

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start()
import org.apache.spark.sql.streaming.Trigger;

spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start();
# the .trigger(continuous=...) call is the only change in the query
spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load() \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .trigger(continuous="1 second") \
  .start()

A checkpoint interval of 1 second means that the continuous processing engine will record the progress of the query every second. The resulting checkpoints are in a format compatible with the micro-batch engine, hence any query can be restarted with any trigger. For example, a supported query started with the micro-batch mode can be restarted in continuous mode, and vice versa. Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.

Supported Queries

As of Spark 2.4, only the following types of queries are supported in the continuous processing mode.

See the Input Sources and Output Sinks sections for more details on them. While the console sink is good for testing, end-to-end low-latency processing is best observed with Kafka as both the source and the sink. This lets the engine process the data and make the results available in the output topic within milliseconds of the input data becoming available in the input topic.

Caveats

Additional Information

Notes

Further Reading

Talks