
Structured Streaming Programming Guide

Overview

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.

Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees. However, since Spark 2.3, we have introduced a new low-latency processing mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. Without changing the Dataset/DataFrame operations in your queries, you will be able to choose the mode based on your application requirements.

In this guide, we are going to walk you through the programming model and the APIs. We are going to explain the concepts mostly using the default micro-batch processing model, and then later discuss the Continuous Processing model. First, let’s start with a simple example of a Structured Streaming query: a streaming word count.

Quick Example

Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in Scala/Java/Python/R. And if you download Spark, you can directly run the example. In any case, let’s walk through the example step-by-step and understand how it works. First, we have to import the necessary classes and create a local SparkSession, the starting point of all functionalities related to Spark.

Scala:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

import spark.implicits._

Java:
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.Iterator;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();

Python:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

R:
sparkR.session(appName = "StructuredNetworkWordCount")

Next, let’s create a streaming DataFrame that represents text data received from a server listening on localhost:9999, and transform the DataFrame to calculate word counts.

Scala:
// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

The lines DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named “value”, and each line in the streaming text data becomes a row in the table. Note that it is not receiving any data yet, because we are only setting up the transformation and have not started it. Next, we convert the DataFrame to a Dataset of String using .as[String], so that we can apply the flatMap operation to split each line into multiple words. The resulting words Dataset contains all the words. Finally, we define the wordCounts DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.

Java:
// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Split the lines into words
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();

The lines DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named “value”, and each line in the streaming text data becomes a row in the table. Note that it is not receiving any data yet, because we are only setting up the transformation and have not started it. Next, we convert the DataFrame to a Dataset of String using .as(Encoders.STRING()), so that we can apply the flatMap operation to split each line into multiple words. The resulting words Dataset contains all the words. Finally, we define the wordCounts DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.

Python:
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

The lines DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named “value”, and each line in the streaming text data becomes a row in the table. Note that it is not receiving any data yet, because we are only setting up the transformation and have not started it. Next, we use two built-in SQL functions, split and explode, to split each line into multiple rows with a word each. In addition, we use the function alias to name the new column “word”. Finally, we define the wordCounts DataFrame by grouping by the unique values in the words DataFrame and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.

R:
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines <- read.stream("socket", host = "localhost", port = 9999)

# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")

# Generate running word count
wordCounts <- count(group_by(words, "word"))

The lines SparkDataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named “value”, and each line in the streaming text data becomes a row in the table. Note that it is not receiving any data yet, because we are only setting up the transformation and have not started it. Next, we use a SQL expression with two SQL functions, split and explode, to split each line into multiple rows with a word each. In addition, we name the new column “word”. Finally, we define the wordCounts SparkDataFrame by grouping by the unique values in the SparkDataFrame and counting them. Note that this is a streaming SparkDataFrame which represents the running word counts of the stream.

We have now set up the query on the streaming data. All that is left is to actually start receiving data and computing the counts. To do this, we set it up to print the complete set of counts (specified by outputMode("complete")) to the console every time they are updated, and then start the streaming computation using start().

Scala:
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()

Java:
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();

Python:
# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

R:
# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")

awaitTermination(query)

After this code is executed, the streaming computation will have started in the background. The query object is a handle to that active streaming query, and we have decided to wait for the termination of the query using awaitTermination() to prevent the process from exiting while the query is active.

To actually execute this example code, you can either compile the code in your own Spark application, or simply run the example once you have downloaded Spark. We are showing the latter. You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using

$ nc -lk 9999

Then, in a different terminal, you can start the example by using

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999

Then, any lines typed in the terminal running the netcat server will be counted and printed on screen every second. It will look something like the following.

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999
apache spark
apache hadoop
...
# TERMINAL 2: RUNNING StructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING JavaStructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING structured_network_wordcount.py

$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING structured_network_wordcount.R

$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...

Programming Model

The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended. This leads to a new stream processing model that is very similar to a batch processing model. You express your streaming computation as a standard batch-like query, as you would on a static table, and Spark runs it as an incremental query on the unbounded input table. Let’s understand this model in more detail.

Basic Concepts

Consider the input data stream as the “Input Table”. Every data item that is arriving on the stream is like a new row being appended to the Input Table.

[Figure: Stream as a Table]

A query on the input will generate the “Result Table”. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.

[Figure: Model]

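The “Output” is defined as what gets written out to the external storage. The output can be defined in one of the following modes:

Complete Mode - The entire updated Result Table is written to the external storage.
Append Mode - Only the new rows appended to the Result Table since the last trigger are written to the external storage. This applies only to queries where existing rows in the Result Table are not expected to change.
Update Mode - Only the rows that were updated in the Result Table since the last trigger are written to the external storage.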

Note that each mode is applicable on certain types of queries. This is discussed in detail later.
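To make the difference between output modes concrete, here is a minimal plain-Python sketch (not Spark code) of which Result Table rows a sink might receive after one trigger. It assumes the Result Table is a dictionary of word counts and models only two modes: "complete", where the whole table is written, and "update", where only rows that changed since the last trigger are written. The helper name rows_to_write is hypothetical.

```python
def rows_to_write(previous, current, mode):
    """Return the Result Table rows a sink would receive after one trigger.

    previous, current: dicts mapping word -> count before and after the trigger.
    mode: "complete" writes the whole table; "update" writes only changed rows.
    """
    if mode == "complete":
        return dict(current)
    if mode == "update":
        # A row is "changed" if it is new or its count differs from before.
        return {k: v for k, v in current.items() if previous.get(k) != v}
    raise ValueError(f"unsupported mode: {mode}")


# Result Table before and after a trigger that processed the line "apache hadoop".
before = {"apache": 1, "spark": 1}
after = {"apache": 2, "spark": 1, "hadoop": 1}

complete_rows = rows_to_write(before, after, "complete")
update_rows = rows_to_write(before, after, "update")
print(complete_rows)
print(update_rows)
```

In this sketch the unchanged "spark" row is written again in complete mode but skipped in update mode.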

To illustrate the use of this model, let’s look at it in the context of the Quick Example above. The first lines DataFrame is the input table, and the final wordCounts DataFrame is the result table. Note that the query on the streaming lines DataFrame to generate wordCounts is exactly the same as it would be on a static DataFrame. However, when this query is started, Spark will continuously check for new data from the socket connection. If there is new data, Spark will run an “incremental” query that combines the previous running counts with the new data to compute updated counts, as shown below.

[Figure: Model]

Note that Structured Streaming does not materialize the entire table. It reads the latest available data from the streaming data source, processes it incrementally to update the result, and then discards the source data. It only keeps around the minimal intermediate state data as required to update the result (e.g. intermediate counts in the earlier example).
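The idea of an incremental query that keeps only minimal state can be sketched in plain Python (this is an illustration, not how Spark is implemented). The class name RunningWordCount is hypothetical. Each call to process_batch stands in for one trigger: it folds the newly arrived lines into the running counts and keeps nothing else.

```python
from collections import Counter


class RunningWordCount:
    """Keeps only the running counts (the intermediate state), not the input."""

    def __init__(self):
        self.counts = Counter()

    def process_batch(self, new_lines):
        # Combine the previous counts with counts from the newly arrived lines only.
        for line in new_lines:
            self.counts.update(line.split(" "))
        # new_lines is not stored anywhere: once processed, the source data is gone.
        return dict(self.counts)


query = RunningWordCount()
batch0 = query.process_batch(["apache spark"])   # first trigger
batch1 = query.process_batch(["apache hadoop"])  # second trigger
print(batch0)
print(batch1)
```

The two results correspond to Batch 0 and Batch 1 in the console output of the Quick Example.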

This model is significantly different from many other stream processing engines. Many streaming systems require the user to maintain running aggregations themselves, thus having to reason about fault-tolerance, and data consistency (at-least-once, or at-most-once, or exactly-once). In this model, Spark is responsible for updating the Result Table when there is new data, thus relieving the users from reasoning about it. As an example, let’s see how this model handles event-time based processing and late arriving data.

Handling Event-time and Late Data

Event-time is the time embedded in the data itself. For many applications, you may want to operate on this event-time. For example, if you want to get the number of events generated by IoT devices every minute, then you probably want to use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them. This event-time is very naturally expressed in this model – each event from the devices is a row in the table, and event-time is a column value in the row. This allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column – each time window is a group and each row can belong to multiple windows/groups. Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device events logs) as well as on a data stream, making the life of the user much easier.
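The statement that each row can belong to multiple windows is easiest to see with overlapping (sliding) windows. The following plain-Python sketch is an illustration only, with hypothetical helper names; it assumes event times are integer seconds, windows are aligned to time 0, and uses 10-minute windows that slide every 5 minutes as example values.

```python
from collections import Counter


def windows_for(event_time, window_length, slide):
    """Return every [start, end) window, in seconds, that contains event_time."""
    # Start of the latest window that begins at or before the event.
    start = (event_time // slide) * slide
    windows = []
    while start > event_time - window_length:
        windows.append((start, start + window_length))
        start -= slide
    return sorted(windows)


def count_by_window(event_times, window_length, slide):
    """Group events by window and count them: a grouping on the event-time column."""
    counts = Counter()
    for t in event_times:
        for window in windows_for(t, window_length, slide):
            counts[window] += 1
    return dict(counts)


# Two events, at 0:07 and 0:11, with 10-minute windows sliding every 5 minutes.
event_windows = windows_for(420, window_length=600, slide=300)
window_counts = count_by_window([420, 660], window_length=600, slide=300)
print(event_windows)
print(window_counts)
```

The same function works whether the event times come from a static log or arrive batch by batch, which is the consistency the paragraph above describes.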

Furthermore, this model naturally handles data that has arrived later than expected based on its event-time. Since Spark is updating the Result Table, it has full control over updating old aggregates when there is late data, as well as cleaning up old aggregates to limit the size of intermediate state data. Since Spark 2.1, we have support for watermarking which allows the user to specify the threshold of late data, and allows the engine to accordingly clean up old state. These are explained later in more detail in the Window Operations section.
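A rough way to picture watermarking is the plain-Python sketch below. It is a simplified model under stated assumptions, not Spark's implementation: times are integer seconds, windows are tumbling, the watermark is taken to be the largest event time seen so far minus the lateness threshold, and it advances only at the end of a batch. The class and attribute names are hypothetical; the exact rules Spark applies are covered in the Window Operations section.

```python
class WatermarkedWindowCounts:
    """Tumbling-window event counts with a lateness threshold (times in seconds)."""

    def __init__(self, window_length, late_threshold):
        self.window_length = window_length
        self.late_threshold = late_threshold
        self.max_event_time = 0
        self.watermark = 0
        self.state = {}      # window start -> count, still open for updates
        self.finalized = {}  # window start -> count, evicted from state

    def process_batch(self, event_times):
        for t in event_times:
            start = (t // self.window_length) * self.window_length
            if start + self.window_length <= self.watermark:
                continue  # too late: the state for this window was already cleaned up
            self.state[start] = self.state.get(start, 0) + 1
            self.max_event_time = max(self.max_event_time, t)
        # Advance the watermark after the batch, then evict windows that ended before it.
        self.watermark = max(self.watermark, self.max_event_time - self.late_threshold)
        for start in [s for s in self.state if s + self.window_length <= self.watermark]:
            self.finalized[start] = self.state.pop(start)


counts = WatermarkedWindowCounts(window_length=60, late_threshold=120)
counts.process_batch([10, 70, 200])  # watermark becomes 200 - 120 = 80
counts.process_batch([100, 30])      # 100 is late but accepted; 30 is too late
print(counts.state, counts.finalized, counts.watermark)
```

Here the late event at time 100 still updates its window, while the event at time 30 is dropped because its window ended before the watermark and its state has been evicted.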

Fault Tolerance Semantics

Delivering end-to-end exactly-once semantics was one of the key goals behind the design of Structured Streaming. To achieve that, we have designed the Structured Streaming sources, the sinks and the execution engine to reliably track the exact progress of the processing so that it can handle any kind of failure by restarting and/or reprocessing. Every streaming source is assumed to have offsets (similar to Kafka offsets, or Kinesis sequence numbers) to track the read position in the stream. The engine uses checkpointing and write-ahead logs to record the offset range of the data being processed in each trigger. The streaming sinks are designed to be idempotent for handling reprocessing. Together, using replayable sources and idempotent sinks, Structured Streaming can ensure end-to-end exactly-once semantics under any failure.
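How a replayable source, a write-ahead log of offset ranges, and an idempotent sink combine can be sketched in plain Python. This is a toy model under stated assumptions, not Spark's checkpoint format: the source is a list indexed by offset, the log is a dictionary from batch id to offset range, and the names IdempotentSink and run_batch are hypothetical.

```python
class IdempotentSink:
    """Stores output keyed by batch id, so rewriting a batch has no extra effect."""

    def __init__(self):
        self.batches = {}

    def write(self, batch_id, rows):
        self.batches[batch_id] = list(rows)

    def all_rows(self):
        return [row for batch_id in sorted(self.batches) for row in self.batches[batch_id]]


def run_batch(source, offset_log, commits, sink, batch_size, crash=False):
    """Run the next micro-batch; after a crash the same offset range is replayed."""
    batch_id = len(commits)  # first batch that has not been committed yet
    if batch_id not in offset_log:
        # Write-ahead: record the offset range before any data is processed.
        start = offset_log[batch_id - 1][1] if batch_id > 0 else 0
        offset_log[batch_id] = (start, min(start + batch_size, len(source)))
    start, end = offset_log[batch_id]
    sink.write(batch_id, source[start:end])  # the source is replayable by offset
    if crash:
        raise RuntimeError("simulated failure before the batch was committed")
    commits.append(batch_id)


source = ["a", "b", "c", "d", "e"]
offset_log, commits, sink = {}, [], IdempotentSink()

run_batch(source, offset_log, commits, sink, batch_size=2)  # batch 0
try:
    run_batch(source, offset_log, commits, sink, batch_size=2, crash=True)  # batch 1 fails
except RuntimeError:
    pass
run_batch(source, offset_log, commits, sink, batch_size=2)  # restart replays batch 1
run_batch(source, offset_log, commits, sink, batch_size=2)  # batch 2
result = sink.all_rows()
print(result)
```

Although batch 1 reaches the sink twice, the logged offset range makes the replay identical and the sink overwrites by batch id, so each record appears in the output exactly once.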

API using Datasets and DataFrames

Since Spark 2.0, DataFrames and Datasets can represent static, bounded data, as well as streaming, unbounded data. Similar to static Datasets/DataFrames, you can use the common entry point SparkSession (Scala/Java/Python/R docs) to create streaming DataFrames/Datasets from streaming sources, and apply the same operations on them as static DataFrames/Datasets. If you are not familiar with Datasets/DataFrames, you are strongly advised to familiarize yourself with them using the DataFrame/Dataset Programming Guide.

Creating streaming DataFrames and streaming Datasets

Streaming DataFrames can be created through the DataStreamReader interface (Scala/Java/Python docs) returned by SparkSession.readStream(). In R, use the read.stream() method. Similar to the read interface for creating a static DataFrame, you can specify the details of the source: data format, schema, options, etc.

Input Sources

There are a few built-in sources.

Some sources are not fault-tolerant because they do not guarantee that data can be replayed using checkpointed offsets after a failure. See the earlier section on fault-tolerance semantics. Here are the details of all the sources in Spark.

Source: File source
Options:
  path: path to the input directory, and common to all file formats.
  maxFilesPerTrigger: maximum number of new files to be considered in every trigger (default: no max)
  latestFirst: whether to process the latest new files first, useful when there is a large backlog of files (default: false)
  fileNameOnly: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same:
    "file:///dataset.txt"
    "s3://a/dataset.txt"
    "s3n://a/b/dataset.txt"
    "s3a://a/b/c/dataset.txt"
  maxFileAge: maximum age of a file that can be found in this directory before it is ignored (default: 1 week). For the first batch, all files are considered valid. If latestFirst is set to `true` and maxFilesPerTrigger is set, this parameter is ignored, because otherwise old files that are valid and should be processed might be ignored. The max age is specified with respect to the timestamp of the latest file, not the timestamp of the current system.
  cleanSource: option to clean up completed files after processing. Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
    When "archive" is provided, the additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must not match the source pattern in depth (the number of directories from the root directory), where the depth is the minimum of the depths of both paths. This ensures archived files are never included as new source files.
    For example, suppose you provide '/hello?/spark/*' as the source pattern. '/hello1/spark/archive/dir' cannot be used as the value of "sourceArchiveDir", as '/hello?/spark/*' and '/hello1/spark/archive' will be matched. '/hello1/spark' also cannot be used as the value of "sourceArchiveDir", as '/hello?/spark' and '/hello1/spark' will be matched. '/archived/here' would be OK as it doesn't match.
    Spark will move source files respecting their own path. For example, if the path of the source file is /a/b/dataset.txt and the path of the archive directory is /archived/here, the file will be moved to /archived/here/a/b/dataset.txt.
    NOTE: Both archiving (via moving) and deleting completed files introduce overhead (a slowdown, even if it happens in a separate thread) in each micro-batch, so you need to understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option reduces the cost of listing source files, which can be an expensive operation.
    The number of threads used in the completed file cleaner can be configured with spark.sql.streaming.fileSource.cleaner.numThreads (default: 1).
    NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match any files in the output directory of the file stream sink.
    NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query. Spark may not clean up some source files in some circumstances, e.g. when the application doesn't shut down gracefully or too many files are queued for cleanup.
  For file-format-specific options, see the related methods in DataStreamReader (Scala/Java/Python/R). E.g. for "parquet" format options see DataStreamReader.parquet().
  In addition, there are session configurations that affect certain file formats. See the SQL Programming Guide for more details. E.g., for "parquet", see the Parquet configuration section.
Fault-tolerant: Yes
Notes: Supports glob paths, but does not support multiple comma-separated paths/globs.

Source: Socket Source
Options:
  host: host to connect to, must be specified
  port: port to connect to, must be specified
Fault-tolerant: No

Source: Rate Source
Options:
  rowsPerSecond (e.g. 100, default: 1): How many rows should be generated per second.
  rampUpTime (e.g. 5s, default: 0s): How long to ramp up before the generating speed becomes rowsPerSecond. Granularities finer than seconds are truncated to integer seconds.
  numPartitions (e.g. 10, default: Spark's default parallelism): The number of partitions for the generated rows.
  The source will try its best to reach rowsPerSecond, but the query may be resource constrained, and numPartitions can be tweaked to help reach the desired speed.
Fault-tolerant: Yes

Source: Kafka Source
Options: See the Kafka Integration Guide.
Fault-tolerant: Yes
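Two of the file source behaviours described above, the fileNameOnly comparison and the destination path used when cleanSource is "archive", can be illustrated with a small plain-Python sketch. It only mirrors the examples given in the option descriptions and is not Spark's implementation; the helper names seen_key and archive_destination are hypothetical.

```python
import posixpath


def seen_key(path, file_name_only=False):
    """Key used to decide whether a file counts as already seen."""
    # With file_name_only, only the last path segment is compared.
    return path.rsplit("/", 1)[-1] if file_name_only else path


def archive_destination(source_path, archive_dir):
    """Destination of an archived file: its full source path under archive_dir."""
    return posixpath.join(archive_dir, source_path.lstrip("/"))


paths = [
    "file:///dataset.txt",
    "s3://a/dataset.txt",
    "s3n://a/b/dataset.txt",
    "s3a://a/b/c/dataset.txt",
]
keys_by_name = {seen_key(p, file_name_only=True) for p in paths}
keys_by_path = {seen_key(p) for p in paths}
destination = archive_destination("/a/b/dataset.txt", "/archived/here")
print(keys_by_name, len(keys_by_path), destination)
```

With file_name_only enabled the four paths collapse to a single key, so only one of them would be treated as a new file.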

Here are some examples.

Scala:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

val spark: SparkSession = ...

// Read text from socket
val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

socketDF.isStreaming    // Returns true for DataFrames that have streaming sources

socketDF.printSchema

// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")

Java:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

SparkSession spark = ...

// Read text from socket
Dataset<Row> socketDF = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

socketDF.isStreaming();    // Returns true for DataFrames that have streaming sources

socketDF.printSchema();

// Read all the csv files written atomically in a directory
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
Dataset<Row> csvDF = spark
  .readStream()
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory");    // Equivalent to format("csv").load("/path/to/directory")

Python:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession. ...

# Read text from socket
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

socketDF.isStreaming    # Property (not a method); True for DataFrames that have streaming sources

socketDF.printSchema()

# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")

R:
sparkR.session(...)

# Read text from socket
socketDF <- read.stream("socket", host = hostname, port = port)

isStreaming(socketDF)    # Returns TRUE for SparkDataFrames that have streaming sources

printSchema(socketDF)

# Read all the csv files written atomically in a directory
schema <- structType(structField("name", "string"),
                     structField("age", "integer"))
csvDF <- read.stream("csv", path = "/path/to/directory", schema = schema, sep = ";")

These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only at runtime when the query is submitted. Some operations, like map and flatMap, need the type to be known at compile time. To use those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as for a static DataFrame. See the SQL Programming Guide for more details. Additionally, more details on the supported streaming sources are discussed later in the document.

Since Spark 3.1, you can also create streaming DataFrames from tables with DataStreamReader.table(). See Streaming Table APIs for more details.

Schema inference and partition of streaming DataFrames/Datasets

By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting spark.sql.streaming.schemaInference to true.

Partition discovery does occur when subdirectories that are named /key=value/ are present and listing will automatically recurse into these directories. If these columns appear in the user-provided schema, they will be filled in by Spark based on the path of the file being read. The directories that make up the partitioning scheme must be present when the query starts and must remain static. For example, it is okay to add /data/year=2016/ when /data/year=2015/ was present, but it is invalid to change the partitioning column (i.e. by creating the directory /data/date=2016-04-17/).
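As a rough illustration of the /key=value/ convention described above, the following sketch extracts partition columns from a file path and checks whether two paths share the same partitioning columns. It is plain Python rather than Spark code, and the helper names partition_values and same_partitioning are hypothetical; Spark's actual partition discovery is more involved.

```python
from pathlib import PurePosixPath


def partition_values(file_path):
    """Collect key=value directory segments from a file path (hypothetical helper)."""
    values = {}
    # Only the directories are inspected; the file name itself is skipped.
    for segment in PurePosixPath(file_path).parent.parts:
        key, sep, value = segment.partition("=")
        if sep and key and value:
            values[key] = value
    return values


def same_partitioning(path_a, path_b):
    """True when both paths use the same partition columns in the same order."""
    return list(partition_values(path_a)) == list(partition_values(path_b))


# Adding a new value for an existing partition column keeps the scheme intact.
print(partition_values("/data/year=2016/part-0000.csv"))
print(same_partitioning("/data/year=2015/a.csv", "/data/year=2016/b.csv"))
# Switching to a different partition column changes the scheme.
print(same_partitioning("/data/year=2015/a.csv", "/data/date=2016-04-17/b.csv"))
```

The last call corresponds to the invalid case in the text, where the partitioning column changes from year to date after the query has started.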

Operations on streaming DataFrames/Datasets

You can apply all kinds of operations on streaming DataFrames/Datasets – ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to typed RDD-like operations (e.g. map, filter, flatMap). See the SQL programming guide for more details. Let’s take a look at a few example operations that you can use.

Basic Operations - Selection, Projection, Aggregation

Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are discussed later in this section.

case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs   
ds.filter(_.signal > 10).map(_.device)         // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                          // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.javalang.typed;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;

public class DeviceData {
  private String device;
  private String deviceType;
  private Double signal;
  private java.sql.Date time;
  ...
  // Getter and setter methods for each field
}

Dataset<Row> df = ...;    // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }
Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10"); // using untyped APIs
ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
  .map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());

// Running count of the number of updates for each device type
df.groupBy("deviceType").count(); // using untyped API

// Running average signal for each device type
ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
  .agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));
df = ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
df.select("device").where("signal > 10")

# Running count of the number of updates for each device type
df.groupBy("deviceType").count()
df <- ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
select(where(df, "signal > 10"), "device")

# Running count of the number of updates for each device type
count(groupBy(df, "deviceType"))

You can also register a streaming DataFrame/Dataset as a temporary view and then apply SQL commands on it.

df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  // returns another streaming DF
df.createOrReplaceTempView("updates");
spark.sql("select count(*) from updates");  // returns another streaming DF
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  # returns another streaming DF
createOrReplaceTempView(df, "updates")
sql("select count(*) from updates")

Note, you can identify whether a DataFrame/Dataset has streaming data or not by using df.isStreaming.

df.isStreaming      // Scala
df.isStreaming()    // Java
df.isStreaming      # Python (a property, not a method)
isStreaming(df)     # R

You may want to check the query plan of the query, as Spark could inject stateful operations while interpreting a SQL statement against a streaming Dataset. Once stateful operations are injected into the query plan, you may need to review your query with the considerations for stateful operations in mind (e.g. output mode, watermark, state store size maintenance).

Window Operations on Event Time

Aggregations over a sliding event-time window are straightforward with Structured Streaming and are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let’s understand this with an illustration.

Imagine our quick example is modified and the stream now contains lines along with the time when each line was generated. Instead of running word counts, we want to count words within 10 minute windows, updating every 5 minutes. That is, we want word counts for words received within the 10 minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that arrived after 12:00 but before 12:10. Now, consider a word that was received at 12:07. This word should increment the counts corresponding to two windows, 12:00 - 12:10 and 12:05 - 12:15. So the counts will be indexed by both the grouping key (i.e. the word) and the window (which can be calculated from the event-time).
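The window assignment described above can be worked through in a few lines. This is a minimal plain-Python sketch, not Spark's implementation; the function name sliding_windows is hypothetical, and it assumes windows start on multiples of the slide interval and include their start but not their end.

```python
from datetime import datetime, timedelta


def sliding_windows(event_time, window, slide):
    """Return every [start, end) window of length `window`, starting on a
    multiple of `slide`, that contains `event_time`."""
    epoch = datetime(1970, 1, 1)
    # Latest window start that is <= event_time.
    start = event_time - ((event_time - epoch) % slide)
    windows = []
    # Walk backwards while the window still covers the event time.
    while start + window > event_time:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


received = datetime(2024, 1, 1, 12, 7)  # the 12:07 word; the date is arbitrary
for start, end in sliding_windows(received, timedelta(minutes=10), timedelta(minutes=5)):
    print(start.strftime("%H:%M"), "-", end.strftime("%H:%M"))
```

With a 10 minute window and a 5 minute slide, every event time falls into exactly two windows, which is why each word increments two counts.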

The result tables would look something like the following.

[Figure: Window Operations]

Since this windowing is similar to grouping, in code, you can use groupBy() and window() operations to express windowed aggregations. You can see the full code for the below examples in Scala/Java/Python.

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();
words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()
words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts <- count(
                    groupBy(
                      words,
                      window(words$timestamp, "10 minutes", "5 minutes"),
                      words$word))

Handling Late Data and Watermarking

Now consider what happens if one of the events arrives late to the application. For example, say, a word generated at 12:04 (i.e. event time) could be received by the application at 12:11. The application should use the time 12:04 instead of 12:11 to update the older counts for the window 12:00 - 12:10. This occurs naturally in our window-based grouping – Structured Streaming can maintain the intermediate state for partial aggregates for a long period of time such that late data can update aggregates of old windows correctly, as illustrated below.

[Figure: Handling Late Data]

However, to run this query for days, it’s necessary for the system to bound the amount of intermediate in-memory state it accumulates. This means the system needs to know when an old aggregate can be dropped from the in-memory state because the application is not going to receive late data for that aggregate any more. To enable this, in Spark 2.1, we have introduced watermarking, which lets the engine automatically track the current event time in the data and attempt to clean up old state accordingly. You can define the watermark of a query by specifying the event time column and the threshold on how late the data is expected to be in terms of event time. For a specific window ending at time T, the engine will maintain state and allow late data to update the state until (max event time seen by the engine - late threshold > T). In other words, late data within the threshold will be aggregated, but data later than the threshold will start getting dropped (see later in the section for the exact guarantees). Let’s understand this with an example. We can easily define watermarking on the previous example using withWatermark() as shown below.

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),
        col("word"))
    .count();
words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()
words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group

words <- withWatermark(words, "timestamp", "10 minutes")
windowedCounts <- count(
                    groupBy(
                      words,
                      window(words$timestamp, "10 minutes", "5 minutes"),
                      words$word))

In this example, we are defining the watermark of the query on the value of the column “timestamp”, and also defining “10 minutes” as the threshold of how late the data is allowed to be. If this query is run in Update output mode (discussed later in the Output Modes section), the engine will keep updating the counts of a window in the Result Table until the window is older than the watermark, which lags behind the current event time in column “timestamp” by 10 minutes. Here is an illustration.

[Figure: Watermarking in Update Mode]

As shown in the illustration, the maximum event time tracked by the engine is the blue dashed line, and the watermark set as (max event time - '10 mins') at the beginning of every trigger is the red line. For example, when the engine observes the data (12:14, dog), it sets the watermark for the next trigger as 12:04. This watermark lets the engine maintain intermediate state for an additional 10 minutes to allow late data to be counted. For example, the data (12:09, cat) is out of order and late, and it falls in windows 12:00 - 12:10 and 12:05 - 12:15. Since it is still ahead of the watermark 12:04 in the trigger, the engine still maintains the intermediate counts as state and correctly updates the counts of the related windows. However, when the watermark is updated to 12:11, the intermediate state for window (12:00 - 12:10) is cleared, and all subsequent data (e.g. (12:04, donkey)) is considered “too late” and therefore ignored. Note that after every trigger, the updated counts (i.e. purple rows) are written to the sink as the trigger output, as dictated by the Update mode.
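The walkthrough above can be replayed with a small simulation. This is a simplified plain-Python model, not the engine's actual bookkeeping: the names run_update_mode, windows_for and t are hypothetical, the input is only loosely modeled on the illustration, the watermark is recomputed at the end of each trigger as (max event time - 10 minutes), and a window is treated as expired once the watermark is greater than its end time.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=10)
SLIDE = timedelta(minutes=5)
DELAY = timedelta(minutes=10)  # the watermark threshold from the example


def t(hhmm):
    """Build a timestamp on an arbitrary fixed day from 'HH:MM'."""
    return datetime.strptime("2024-01-01 " + hhmm, "%Y-%m-%d %H:%M")


def windows_for(event_time):
    """All [start, end) sliding windows that contain event_time."""
    start = event_time - ((event_time - datetime(1970, 1, 1)) % SLIDE)
    result = []
    while start + WINDOW > event_time:
        result.append((start, start + WINDOW))
        start -= SLIDE
    return result


def run_update_mode(triggers):
    state = Counter()   # live intermediate counts keyed by (window, word)
    result = {}         # Result Table as an Update-mode sink would see it
    ignored = []        # rows that arrived after all their windows expired
    watermark = None
    max_event_time = None
    for batch in triggers:
        for event_time, word in batch:
            if max_event_time is None or event_time > max_event_time:
                max_event_time = event_time
            live = [w for w in windows_for(event_time)
                    if watermark is None or w[1] >= watermark]
            if not live:
                ignored.append((event_time, word))
                continue
            for window in live:
                state[(window, word)] += 1
                result[(window, word)] = state[(window, word)]
        if max_event_time is not None:
            # Watermark for the next trigger; it never moves backwards.
            candidate = max_event_time - DELAY
            watermark = candidate if watermark is None else max(watermark, candidate)
            # Clear state for windows that the watermark has passed.
            for key in [k for k in state if k[0][1] < watermark]:
                del state[key]
    return result, ignored, watermark


triggers = [
    [(t("12:14"), "dog")],                       # watermark becomes 12:04
    [(t("12:09"), "cat"), (t("12:21"), "owl")],  # late cat still counted; watermark becomes 12:11
    [(t("12:04"), "donkey")],                    # too late, ignored
]
result, ignored, watermark = run_update_mode(triggers)
print("watermark:", watermark.strftime("%H:%M"))
print("ignored:", [(e.strftime("%H:%M"), w) for e, w in ignored])
```

In this model the late (12:09, cat) row updates both of its windows because the watermark is still 12:04 when it arrives, while (12:04, donkey) arrives after the watermark has moved to 12:11 and is dropped.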

Some sinks (e.g. files) may not support the fine-grained updates that Update Mode requires. To work with them, we also support Append Mode, where only the final counts are written to the sink. This is illustrated below.

Note that using withWatermark on a non-streaming Dataset is a no-op. As the watermark should not affect any batch query in any way, we will ignore it directly.

[Figure: Watermarking in Append Mode]

Similar to the Update Mode earlier, the engine maintains intermediate counts for each window. However, the partial counts are not updated to the Result Table and not written to the sink. The engine waits for “10 mins” for late data to be counted, then drops the intermediate state of a window < watermark, and appends the final counts to the Result Table/sink. For example, the final counts of window 12:00 - 12:10 are appended to the Result Table only after the watermark is updated to 12:11.
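The Append Mode behavior can be sketched in the same spirit. This is a simplified plain-Python model under stated assumptions, not the engine's implementation: the names run_append_mode, windows_for and t are hypothetical, and finalized windows are emitted at the end of the trigger that advances the watermark past them, whereas the real engine's emission timing may differ.

```python
from collections import Counter
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=10)
SLIDE = timedelta(minutes=5)
DELAY = timedelta(minutes=10)


def t(hhmm):
    """Build a timestamp on an arbitrary fixed day from 'HH:MM'."""
    return datetime.strptime("2024-01-01 " + hhmm, "%Y-%m-%d %H:%M")


def windows_for(event_time):
    """All [start, end) sliding windows that contain event_time."""
    start = event_time - ((event_time - datetime(1970, 1, 1)) % SLIDE)
    result = []
    while start + WINDOW > event_time:
        result.append((start, start + WINDOW))
        start -= SLIDE
    return result


def run_append_mode(triggers):
    """Return, per trigger, the finalized (window, word, count) rows appended to the sink."""
    state = Counter()
    watermark = None
    max_event_time = None
    appended_per_trigger = []
    for batch in triggers:
        for event_time, word in batch:
            if max_event_time is None or event_time > max_event_time:
                max_event_time = event_time
            for window in windows_for(event_time):
                if watermark is None or window[1] >= watermark:
                    state[(window, word)] += 1   # partial count, not yet written
        appended = []
        if max_event_time is not None:
            candidate = max_event_time - DELAY
            watermark = candidate if watermark is None else max(watermark, candidate)
            # Windows with end < watermark are final: emit once, then drop the state.
            for key in sorted(k for k in state if k[0][1] < watermark):
                appended.append((key[0], key[1], state.pop(key)))
        appended_per_trigger.append(appended)
    return appended_per_trigger


output = run_append_mode([
    [(t("12:07"), "dog"), (t("12:08"), "owl")],
    [(t("12:14"), "dog"), (t("12:09"), "cat")],
    [(t("12:21"), "owl")],   # watermark moves to 12:11, finalizing 12:00 - 12:10
])
for number, rows in enumerate(output, start=1):
    print(number, [(w[0].strftime("%H:%M"), word, count) for w, word, count in rows])
```

Nothing is appended for the first two triggers; the counts for window 12:00 - 12:10 only appear once the watermark has reached 12:11.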

Conditions for watermarking to clean aggregation state

It is important to note that the following conditions must be satisfied for the watermarking to clean the state in aggregation queries (as of Spark 2.1.1, subject to change in the future).

Semantic Guarantees of Aggregation with Watermarking

Join Operations

Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame as well as another streaming Dataset/DataFrame. The result of the streaming join is generated incrementally, similar to the results of streaming aggregations in the previous section. In this section we will explore what types of joins (i.e. inner, outer, semi, etc.) are supported in the above cases. Note that in all the supported join types, the result of the join with a streaming Dataset/DataFrame will be exactly the same as if it was with a static Dataset/DataFrame containing the same data in the stream.

Stream-static Joins

Since its introduction in Spark 2.0, Structured Streaming has supported joins (inner joins and some types of outer joins) between a streaming and a static DataFrame/Dataset. Here is a simple example.

val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(staticDf, "type")          // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  // left outer join with a static DF
Dataset<Row> staticDf = spark.read(). ...;
Dataset<Row> streamingDf = spark.readStream(). ...;
streamingDf.join(staticDf, "type");         // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer");  // left outer join with a static DF
staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type")  # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  # left outer join with a static DF
staticDf <- read.df(...)
streamingDf <- read.stream(...)
joined <- merge(streamingDf, staticDf, sort = FALSE)  # inner equi-join with a static DF
joined <- join(
            streamingDf,
            staticDf,
            streamingDf$value == staticDf$value,
            "left_outer")  # left outer join with a static DF

Note that stream-static joins are not stateful, so no state management is necessary. However, a few types of stream-static outer joins are not yet supported. These are listed at the end of this Join section.
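To see why no state is needed, consider this plain-Python sketch in which each micro-batch is joined against the static table independently. It is only a conceptual model: join_batch is a hypothetical helper, rows are plain dicts, and the column names are invented for the example.

```python
def join_batch(batch, static_rows, key, how="inner"):
    """Join one micro-batch of dict rows against a static table on `key`."""
    lookup = {}
    for row in static_rows:
        lookup.setdefault(row[key], []).append(row)
    static_columns = {column for row in static_rows for column in row}
    joined = []
    for row in batch:
        matches = lookup.get(row[key], [])
        for match in matches:
            joined.append({**match, **row})
        if not matches and how == "left_outer":
            # Unmatched streaming row: static columns are filled with None (NULL).
            joined.append({**{column: None for column in static_columns}, **row})
    return joined


static_rows = [{"type": "sensor", "owner": "ops"}]
first_batch = [{"device": "a", "type": "sensor"}]
second_batch = [{"device": "b", "type": "camera"}]

# Nothing from the first batch has to be remembered to process the second one.
print(join_batch(first_batch, static_rows, "type"))
print(join_batch(second_batch, static_rows, "type"))
print(join_batch(second_batch, static_rows, "type", how="left_outer"))
```

A left outer join works here because every streaming row can be resolved immediately against the complete static side. The reverse direction would need to know about streaming rows that have not arrived yet.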

Stream-stream Joins

In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming Datasets/DataFrames. The challenge of generating join results between two data streams is that, at any point of time, the view of the dataset is incomplete for both sides of the join making it much harder to find matches between inputs. Any row received from one input stream can match with any future, yet-to-be-received row from the other input stream. Hence, for both the input streams, we buffer past input as streaming state, so that we can match every future input with past input and accordingly generate joined results. Furthermore, similar to streaming aggregations, we automatically handle late, out-of-order data and can limit the state using watermarks. Let’s discuss the different types of supported stream-stream joins and how to use them.

Inner Joins with optional Watermarking

Inner joins on any kind of columns along with any kind of join conditions are supported. However, as the stream runs, the size of streaming state will keep growing indefinitely as all past input must be saved as any new input can match with any input from the past. To avoid unbounded state, you have to define additional join conditions such that indefinitely old inputs cannot match with future inputs and therefore can be cleared from the state. In other words, you will have to do the following additional steps in the join.

  1. Define watermark delays on both inputs such that the engine knows how delayed the input can be (similar to streaming aggregations).

  2. Define a constraint on event-time across the two inputs such that the engine can figure out when old rows of one input are no longer going to be required (i.e. will not satisfy the time constraint) for matches with the other input. This constraint can be defined in one of two ways.

    1. Time range join conditions (e.g. ...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR),

    2. Join on event-time windows (e.g. ...JOIN ON leftTimeWindow = rightTimeWindow).

Let’s understand this with an example.

Let’s say we want to join a stream of advertisement impressions (when an ad was shown) with another stream of user clicks on advertisements to correlate when impressions led to monetizable clicks. To allow the state cleanup in this stream-stream join, you will have to specify the watermarking delays and the time constraints as follows.

  1. Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order in event-time by at most 2 and 3 hours, respectively.

  2. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour after the corresponding impression.

The code would look like this.

import org.apache.spark.sql.functions.expr

val impressions = spark.readStream. ...
val clicks = spark.readStream. ...

// Apply watermarks on event-time columns
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
import static org.apache.spark.sql.functions.expr;

Dataset<Row> impressions = spark.readStream(). ...
Dataset<Row> clicks = spark.readStream(). ...

// Apply watermarks on event-time columns
Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour ")
);
from pyspark.sql.functions import expr

impressions = spark.readStream. ...
clicks = spark.readStream. ...

# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

# Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
impressions <- read.stream(...)
clicks <- read.stream(...)

# Apply watermarks on event-time columns
impressionsWithWatermark <- withWatermark(impressions, "impressionTime", "2 hours")
clicksWithWatermark <- withWatermark(clicks, "clickTime", "3 hours")

# Join with event-time constraints
joined <- join(
  impressionsWithWatermark,
  clicksWithWatermark,
  expr(
    paste(
      "clickAdId = impressionAdId AND",
      "clickTime >= impressionTime AND",
      "clickTime <= impressionTime + interval 1 hour"
)))
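
The join condition used in the examples above can be read as an ordinary predicate over one impression row and one click row. The sketch below is plain Python, not Spark code: click_matches restates the condition, and impression_no_longer_needed is a hypothetical helper that illustrates the reasoning behind state cleanup under the assumption that clicks older than the click watermark are no longer accepted. The engine's real bookkeeping may differ.

```python
from datetime import datetime, timedelta

MAX_GAP = timedelta(hours=1)  # from "clickTime <= impressionTime + interval 1 hour"


def click_matches(impression, click):
    """Evaluate the join condition from the example for one pair of rows."""
    return (
        click["clickAdId"] == impression["impressionAdId"]
        and click["clickTime"] >= impression["impressionTime"]
        and click["clickTime"] <= impression["impressionTime"] + MAX_GAP
    )


def impression_no_longer_needed(impression, click_watermark):
    """Once the click watermark has passed impressionTime + 1 hour, any click
    that is still accepted is too recent to satisfy the time constraint."""
    return click_watermark > impression["impressionTime"] + MAX_GAP


impression = {"impressionAdId": 7, "impressionTime": datetime(2024, 1, 1, 12, 0)}
on_time = {"clickAdId": 7, "clickTime": datetime(2024, 1, 1, 12, 30)}
too_late = {"clickAdId": 7, "clickTime": datetime(2024, 1, 1, 13, 30)}

print(click_matches(impression, on_time))
print(click_matches(impression, too_late))
print(impression_no_longer_needed(impression, datetime(2024, 1, 1, 13, 1)))
```

Without the upper bound on clickTime, no watermark value would ever make an impression safe to discard, which is why the time constraint is needed for state cleanup.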

Semantic Guarantees of Stream-stream Inner Joins with Watermarking

This is similar to the guarantees provided by watermarking on aggregations. A watermark delay of “2 hours” guarantees that the engine will never drop any data that is less than 2 hours delayed. But data delayed by more than 2 hours may or may not get processed.

Outer Joins with Watermarking

While the watermark + event-time constraints are optional for inner joins, for outer joins they must be specified. This is because, to generate the NULL results of an outer join, the engine must know when an input row is not going to match with anything in the future. Hence, the watermark + event-time constraints must be specified to generate correct results. Therefore, a query with an outer join will look much like the ad-monetization example earlier, except that there will be an additional parameter specifying it to be an outer join.

impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
 )
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour "),
  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined <- join(
  impressionsWithWatermark,
  clicksWithWatermark,
  expr(
    paste(
      "clickAdId = impressionAdId AND",
      "clickTime >= impressionTime AND",
      "clickTime <= impressionTime + interval 1 hour")),
  "left_outer"                 # can be "inner", "left_outer", "right_outer", "full_outer", "left_semi"
)

Semantic Guarantees of Stream-stream Outer Joins with Watermarking

Outer joins have the same guarantees as inner joins regarding watermark delays and whether data will be dropped or not.

Caveats

There are a few important characteristics to note regarding how the outer results are generated.

Semi Joins with Watermarking

A semi join returns values from the left side of the relation that have a match on the right. It is also referred to as a left semi join. Similar to outer joins, watermark + event-time constraints must be specified for semi joins. This is because, to evict unmatched input rows on the left side, the engine must know when an input row on the left side is not going to match with anything on the right side in the future.
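For reference, the result a left semi join produces on static data can be sketched in a few lines of plain Python. The helper left_semi_join and the sample rows are hypothetical; the sketch only shows the shape of the result (left-side columns only, each left row at most once), not how the streaming engine buffers and evicts rows.

```python
def left_semi_join(left_rows, right_rows, key):
    """Keep each left row that has at least one match on the right."""
    right_keys = {row[key] for row in right_rows}
    # Only left-side rows are returned, and a row appears once no matter
    # how many right-side rows share its key.
    return [row for row in left_rows if row[key] in right_keys]


impressions = [{"adId": 1, "page": "home"}, {"adId": 2, "page": "news"}]
clicks = [{"adId": 1, "user": "u1"}, {"adId": 1, "user": "u2"}]

print(left_semi_join(impressions, clicks, "adId"))
```

Because an unmatched left row might still find a match later in a stream, it has to stay buffered until the watermark and time constraint rule that out.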

Semantic Guarantees of Stream-stream Semi Joins with Watermarking

Semi joins have the same guarantees as inner joins regarding watermark delays and whether data will be dropped or not.

Support matrix for joins in streaming queries
Left Input | Right Input | Join Type   | Support
Static     | Static      | All types   | Supported, since it is not on streaming data even though it can be present in a streaming query
Stream     | Static      | Inner       | Supported, not stateful
Stream     | Static      | Left Outer  | Supported, not stateful
Stream     | Static      | Right Outer | Not supported
Stream     | Static      | Full Outer  | Not supported
Stream     | Static      | Left Semi   | Supported, not stateful
Static     | Stream      | Inner       | Supported, not stateful
Static     | Stream      | Left Outer  | Not supported
Static     | Stream      | Right Outer | Supported, not stateful
Static     | Stream      | Full Outer  | Not supported
Static     | Stream      | Left Semi   | Not supported
Stream     | Stream      | Inner       | Supported, optionally specify watermark on both sides + time constraints for state cleanup
Stream     | Stream      | Left Outer  | Conditionally supported, must specify watermark on right + time constraints for correct results, optionally specify watermark on left for all state cleanup
Stream     | Stream      | Right Outer | Conditionally supported, must specify watermark on left + time constraints for correct results, optionally specify watermark on right for all state cleanup
Stream     | Stream      | Full Outer  | Conditionally supported, must specify watermark on one side + time constraints for correct results, optionally specify watermark on the other side for all state cleanup
Stream     | Stream      | Left Semi   | Conditionally supported, must specify watermark on right + time constraints for correct results, optionally specify watermark on left for all state cleanup

Additional details on supported joins:

Streaming Deduplication

You can deduplicate records in data streams using a unique identifier in the events. This is exactly the same as deduplication on static data using a unique identifier column. The query will store the necessary amount of data from previous records so that it can filter out duplicate records. Similar to aggregations, you can use deduplication with or without watermarking.
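Conceptually, the data stored from previous records is the set of identifiers already seen. The following plain-Python sketch models deduplication without a watermark; deduplicate is a hypothetical generator, not Spark's implementation, and the guid column name is taken from the examples in this section.

```python
def deduplicate(batches, key):
    """Yield each micro-batch with rows whose key was already seen removed."""
    seen = set()   # state carried across batches; grows with every new key
    for batch in batches:
        fresh = []
        for row in batch:
            if row[key] not in seen:
                seen.add(row[key])
                fresh.append(row)
        yield fresh


batches = [
    [{"guid": 1}, {"guid": 2}],
    [{"guid": 2}, {"guid": 3}],   # guid 2 repeats across batches
]
for output in deduplicate(batches, "guid"):
    print(output)
```

In this model the set of seen keys is never trimmed, so state grows for as long as the stream runs. Bounding that state is the role the watermark plays, as with aggregations.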

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid")

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime")
Dataset<Row> streamingDf = spark.readStream(). ...;  //