Structured Streaming Programming Guide
- Overview
- Quick Example
- Programming Model
- API using Datasets and DataFrames
  - Creating streaming DataFrames and streaming Datasets
  - Operations on streaming DataFrames/Datasets
    - Basic Operations - Selection, Projection, Aggregation
    - Window Operations on Event Time
    - Join Operations
    - Streaming Deduplication
    - Policy for handling multiple watermarks
    - Arbitrary Stateful Operations
    - Unsupported Operations
    - Limitation of global watermark
  - Starting Streaming Queries
  - Managing Streaming Queries
  - Monitoring Streaming Queries
  - Recovering from Failures with Checkpointing
  - Recovery Semantics after Changes in a Streaming Query
- Continuous Processing
- Additional Information
Overview
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees. However, since Spark 2.3, we have introduced a new low-latency processing mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. Without changing the Dataset/DataFrame operations in your queries, you will be able to choose the mode based on your application requirements.
In this guide, we are going to walk you through the programming model and the APIs. We are going to explain the concepts mostly using the default micro-batch processing model, and then later discuss the Continuous Processing model. First, let’s start with a simple example of a Structured Streaming query - a streaming word count.
Quick Example
Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in
Scala/Java/Python/R.
And if you download Spark, you can directly run the example. In any case, let’s walk through the example step-by-step and understand how it works. First, we have to import the necessary classes and create a local SparkSession, the starting point of all functionalities related to Spark.
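A minimal Scala sketch of this first step might look like the following (the application name is illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

// Needed for implicit conversions such as .as[String] used below
import spark.implicits._
```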
Next, let’s create a streaming DataFrame that represents text data received from a server listening on localhost:9999, and transform the DataFrame to calculate word counts.
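Continuing the sketch above, the socket source and the word-count transformation described next could be expressed as:

```scala
// Create a DataFrame representing the stream of input lines
// from the connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate the running word count
val wordCounts = words.groupBy("value").count()
```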
This lines DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named “value”, and each line in the streaming text data becomes a row in the table. Note that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have converted the DataFrame to a Dataset of String using .as[String], so that we can apply the flatMap operation to split each line into multiple words. The resultant words Dataset contains all the words. Finally, we have defined the wordCounts DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
The same transformation looks slightly different in the other language APIs. In Java, the DataFrame is converted to a Dataset of String using .as(Encoders.STRING()) before the flatMap operation splits each line into words. In Python, the built-in SQL functions split and explode are used instead to split each line into multiple rows with a word each, and the alias function names the new column as “word”. In R, a SQL expression with the same split and explode functions is applied to the SparkDataFrame, naming the new column as “word”. In every case, wordCounts is defined by grouping by the unique values and counting them, and is a streaming DataFrame/SparkDataFrame which represents the running word counts of the stream.
We have now set up the query on the streaming data. All that is left is to actually start receiving data and computing the counts. To do this, we set it up to print the complete set of counts (specified by outputMode("complete")
) to the console every time they are updated. And then start the streaming computation using start()
.
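A sketch of this final step, matching the description above:

```scala
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
```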
After this code is executed, the streaming computation will have started in the background. The query
object is a handle to that active streaming query, and we have decided to wait for the termination of the query using awaitTermination()
to prevent the process from exiting while the query is active.
To actually execute this example code, you can either compile the code in your own Spark application, or simply run the example once you have downloaded Spark. We are showing the latter. You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using
$ nc -lk 9999
Then, in a different terminal, you can start the example by using
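For example, with the Spark distribution you can launch the bundled Scala example roughly as follows (the example class name may vary across Spark versions):

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999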
Then, any lines typed in the terminal running the netcat server will be counted and printed on screen every second. It will look something like the following.
Programming Model
The key idea in Structured Streaming is to treat a live data stream as a
table that is being continuously appended. This leads to a new stream
processing model that is very similar to a batch processing model. You will
express your streaming computation as a standard batch-like query, as if it were on a static
table, and Spark runs it as an incremental query on the unbounded input
table. Let’s understand this model in more detail.
Basic Concepts
Consider the input data stream as the “Input Table”. Every data item that is
arriving on the stream is like a new row being appended to the Input Table.
A query on the input will generate the “Result Table”. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.
The “Output” is defined as what gets written out to the external storage. The output can be defined in one of the following modes:
- Complete Mode - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.
- Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
- Update Mode - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.
Note that each mode is applicable on certain types of queries. This is discussed in detail later.
To illustrate the use of this model, let’s understand the model in context of
the Quick Example above. The first lines
DataFrame is the input table, and
the final wordCounts
DataFrame is the result table. Note that the query on
streaming lines
DataFrame to generate wordCounts
is exactly the same as
it would be with a static DataFrame. However, when this query is started, Spark
will continuously check for new data from the socket connection. If there is
new data, Spark will run an “incremental” query that combines the previous
running counts with the new data to compute updated counts, as shown below.
Note that Structured Streaming does not materialize the entire table. It reads the latest
available data from the streaming data source, processes it incrementally to update the result,
and then discards the source data. It only keeps around the minimal intermediate state data as
required to update the result (e.g. intermediate counts in the earlier example).
This model is significantly different from many other stream processing
engines. Many streaming systems require the user to maintain running
aggregations themselves, thus having to reason about fault-tolerance, and
data consistency (at-least-once, or at-most-once, or exactly-once). In this
model, Spark is responsible for updating the Result Table when there is new
data, thus relieving the users from reasoning about it. As an example, let’s
see how this model handles event-time based processing and late arriving data.
Handling Event-time and Late Data
Event-time is the time embedded in the data itself. For many applications, you may want to operate on this event-time. For example, if you want to get the number of events generated by IoT devices every minute, then you probably want to use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them. This event-time is very naturally expressed in this model – each event from the devices is a row in the table, and event-time is a column value in the row. This allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column – each time window is a group and each row can belong to multiple windows/groups. Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device events logs) as well as on a data stream, making the life of the user much easier.
Furthermore, this model naturally handles data that has arrived later than
expected based on its event-time. Since Spark is updating the Result Table,
it has full control over updating old aggregates when there is late data,
as well as cleaning up old aggregates to limit the size of intermediate
state data. Since Spark 2.1, we have support for watermarking which
allows the user to specify the threshold of late data, and allows the engine
to accordingly clean up old state. These are explained later in more
detail in the Window Operations section.
Fault Tolerance Semantics
Delivering end-to-end exactly-once semantics was one of key goals behind the design of Structured Streaming. To achieve that, we have designed the Structured Streaming sources, the sinks and the execution engine to reliably track the exact progress of the processing so that it can handle any kind of failure by restarting and/or reprocessing. Every streaming source is assumed to have offsets (similar to Kafka offsets, or Kinesis sequence numbers)
to track the read position in the stream. The engine uses checkpointing and write-ahead logs to record the offset range of the data being processed in each trigger. The streaming sinks are designed to be idempotent for handling reprocessing. Together, using replayable sources and idempotent sinks, Structured Streaming can ensure end-to-end exactly-once semantics under any failure.
API using Datasets and DataFrames
Since Spark 2.0, DataFrames and Datasets can represent static, bounded data, as well as streaming, unbounded data. Similar to static Datasets/DataFrames, you can use the common entry point SparkSession
(Scala/Java/Python/R docs)
to create streaming DataFrames/Datasets from streaming sources, and apply the same operations on them as static DataFrames/Datasets. If you are not familiar with Datasets/DataFrames, you are strongly advised to familiarize yourself with them using the
DataFrame/Dataset Programming Guide.
Creating streaming DataFrames and streaming Datasets
Streaming DataFrames can be created through the DataStreamReader
interface
(Scala/Java/Python docs)
returned by SparkSession.readStream()
. In R, with the read.stream()
method. Similar to the read interface for creating static DataFrame, you can specify the details of the source – data format, schema, options, etc.
Input Sources
There are a few built-in sources.
- File source - Reads files written in a directory as a stream of data. Files will be processed in the order of file modification time. If latestFirst is set, order will be reversed. Supported file formats are text, CSV, JSON, ORC, Parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
- Kafka source - Reads data from Kafka. It’s compatible with Kafka broker versions 0.10.0 or higher. See the Kafka Integration Guide for more details.
- Socket source (for testing) - Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees.
- Rate source (for testing) - Generates data at the specified number of rows per second, each output row contains a timestamp and value. Where timestamp is a Timestamp type containing the time of message dispatch, and value is of Long type containing the message count, starting from 0 as the first row. This source is intended for testing and benchmarking.
Some sources are not fault-tolerant because they do not guarantee that data can be replayed using
checkpointed offsets after a failure. See the earlier section on
fault-tolerance semantics.
Here are the details of all the sources in Spark.
- File source
  - Options:
    - `path`: path to the input directory, and common to all file formats.
    - `maxFilesPerTrigger`: maximum number of new files to be considered in every trigger (default: no max).
    - `latestFirst`: whether to process the latest new files first, useful when there is a large backlog of files (default: false).
    - `fileNameOnly`: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same: "file:///dataset.txt", "s3://a/dataset.txt", "s3n://a/b/dataset.txt", "s3a://a/b/c/dataset.txt".
    - `maxFileAge`: maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If `latestFirst` is set to `true` and `maxFilesPerTrigger` is set, then this parameter will be ignored, because old files that are valid, and should be processed, may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the timestamp of the current system (default: 1 week).
    - `cleanSource`: option to clean up completed files after processing. Available options are "archive", "delete", "off". If the option is not provided, the default value is "off". When "archive" is provided, the additional option `sourceArchiveDir` must be provided as well. The value of "sourceArchiveDir" must not match the source pattern in depth (the number of directories from the root directory), where the depth is the minimum of the depth on both paths. This ensures archived files are never included as new source files. For example, suppose you provide '/hello?/spark/*' as the source pattern: '/hello1/spark/archive/dir' cannot be used as the value of "sourceArchiveDir", as '/hello?/spark/*' and '/hello1/spark/archive' will be matched. '/hello1/spark' cannot be used either, as '/hello?/spark' and '/hello1/spark' will be matched. '/archived/here' would be OK as it doesn't match. Spark will move source files respecting their own path. For example, if the path of the source file is `/a/b/dataset.txt` and the path of the archive directory is `/archived/here`, the file will be moved to `/archived/here/a/b/dataset.txt`. NOTE: Both archiving (via moving) and deleting completed files introduce overhead (slow down, even if it's happening in a separate thread) in each micro-batch, so you need to understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option reduces the cost of listing source files, which can be an expensive operation. The number of threads used in the completed-file cleaner can be configured with `spark.sql.streaming.fileSource.cleaner.numThreads` (default: 1). NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match any files in the output directory of a file stream sink. NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query. Spark may not clean up some source files in some circumstances - e.g. the application doesn't shut down gracefully, or too many files are queued to clean up.
    - For file-format-specific options, see the related methods in `DataStreamReader` (Scala/Java/Python/R). E.g. for "parquet" format options see `DataStreamReader.parquet()`. In addition, there are session configurations that affect certain file formats. See the SQL Programming Guide for more details. E.g., for "parquet", see the Parquet configuration section.
  - Fault-tolerant: Yes
  - Notes: Supports glob paths, but does not support multiple comma-separated paths/globs.
- Socket source
  - Options:
    - `host`: host to connect to, must be specified.
    - `port`: port to connect to, must be specified.
  - Fault-tolerant: No
- Rate source
  - Options:
    - `rowsPerSecond` (e.g. 100, default: 1): how many rows should be generated per second.
    - `rampUpTime` (e.g. 5s, default: 0s): how long to ramp up before the generating speed becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer seconds.
    - `numPartitions` (e.g. 10, default: Spark's default parallelism): the partition number for the generated rows.
    - The source will try its best to reach `rowsPerSecond`, but the query may be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed.
  - Fault-tolerant: Yes
- Kafka source
  - Options: see the Kafka Integration Guide.
  - Fault-tolerant: Yes
Here are some examples.
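For instance, a socket source and a schema-qualified CSV file source could be created roughly as follows in Scala (the directory path and schema are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder.appName("StreamingSources").getOrCreate()

// Read text from a socket (for testing)
val socketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

socketDF.isStreaming    // returns true for DataFrames that have streaming sources
socketDF.printSchema()

// Read all the CSV files written atomically into a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark.readStream
  .option("sep", ";")
  .schema(userSchema)            // specify the schema of the CSV files
  .csv("/path/to/directory")     // equivalent to format("csv").load("/path/to/directory")
```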
These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like map
, flatMap
, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame. See the SQL Programming Guide for more details. Additionally, more details on the supported streaming sources are discussed later in the document.
Since Spark 3.1, you can also create streaming DataFrames from tables with DataStreamReader.table()
. See Streaming Table APIs for more details.
Schema inference and partition of streaming DataFrames/Datasets
By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting spark.sql.streaming.schemaInference
to true
.
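For example, assuming an active SparkSession named spark, this could be done with:

```scala
// Re-enable schema inference for file-based streaming sources (ad-hoc use only)
spark.conf.set("spark.sql.streaming.schemaInference", "true")
```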
Partition discovery does occur when subdirectories that are named /key=value/
are present and listing will automatically recurse into these directories. If these columns appear in the user-provided schema, they will be filled in by Spark based on the path of the file being read. The directories that make up the partitioning scheme must be present when the query starts and must remain static. For example, it is okay to add /data/year=2016/
when /data/year=2015/
was present, but it is invalid to change the partitioning column (i.e. by creating the directory /data/date=2016-04-17/
).
Operations on streaming DataFrames/Datasets
You can apply all kinds of operations on streaming DataFrames/Datasets – ranging from untyped, SQL-like operations (e.g. select
, where
, groupBy
), to typed RDD-like operations (e.g. map
, filter
, flatMap
). See the SQL programming guide for more details. Let’s take a look at a few example operations that you can use.
Basic Operations - Selection, Projection, Aggregation
Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are discussed later in this section.
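As a rough sketch, assume a hypothetical stream of IoT device events with columns device, deviceType, signal and time, read from JSON files (the path and schema are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder.appName("BasicStreamingOps").getOrCreate()

// Hypothetical schema for IoT device events
val deviceSchema = new StructType()
  .add("device", "string")
  .add("deviceType", "string")
  .add("signal", "double")
  .add("time", "timestamp")

val df = spark.readStream.schema(deviceSchema).json("/path/to/device-events")

// Selection and projection: devices with a strong signal
val strongSignals = df.where("signal > 10").select("device")

// Running aggregation: average signal per device type
val avgSignalPerType = df.groupBy("deviceType").avg("signal")
```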
You can also register a streaming DataFrame/Dataset as a temporary view and then apply SQL commands on it.
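For example, assuming df is the streaming DataFrame from the sketch above:

```scala
df.createOrReplaceTempView("updates")
val counts = spark.sql("select count(*) from updates")   // returns another streaming DataFrame
```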
Note that you can identify whether a DataFrame/Dataset has streaming data or not by using df.isStreaming.
You may want to check the query plan of the query, as Spark could inject stateful operations during the interpretation of a SQL statement against a streaming dataset. Once stateful operations are injected in the query plan, you may need to check your query with the considerations of stateful operations in mind (e.g. output mode, watermark, state store size maintenance, etc.).
Window Operations on Event Time
Aggregations over a sliding event-time window are straightforward with Structured Streaming and are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let’s understand this with an illustration.
Imagine our quick example is modified and the stream now contains lines along with the time when the line was generated. Instead of running word counts, we want to count words within 10 minute windows, updating every 5 minutes. That is, word counts in words received between 10 minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that arrived after 12:00 but before 12:10. Now, consider a word that was received at 12:07. This word should increment the counts corresponding to two windows 12:00 - 12:10 and 12:05 - 12:15. So the counts will be indexed by both, the grouping key (i.e. the word) and the window (can be calculated from the event-time).
The result tables would look something like the following.
Since this windowing is similar to grouping, in code, you can use groupBy()
and window()
operations to express windowed aggregations. You can see the full code for the below examples in
Scala/Java/Python.
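A sketch of the windowed count in Scala, assuming a words streaming DataFrame with columns word and timestamp (here derived from the socket source's includeTimestamp option; how you obtain the event-time column will depend on your source):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder.appName("WindowedWordCount").getOrCreate()
import spark.implicits._

// Lines with an event-time column; includeTimestamp adds a "timestamp" column
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .option("includeTimestamp", true)
  .load()

// Split each line into (word, timestamp) pairs
val words = lines.as[(String, Timestamp)]
  .flatMap { case (line, ts) => line.split(" ").map(word => (word, ts)) }
  .toDF("word", "timestamp")

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
```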
Handling Late Data and Watermarking
Now consider what happens if one of the events arrives late to the application.
For example, say, a word generated at 12:04 (i.e. event time) could be received by
the application at 12:11. The application should use the time 12:04 instead of 12:11
to update the older counts for the window 12:00 - 12:10
. This occurs
naturally in our window-based grouping – Structured Streaming can maintain the intermediate state
for partial aggregates for a long period of time such that late data can update aggregates of
old windows correctly, as illustrated below.
However, to run this query for days, it’s necessary for the system to bound the amount of
intermediate in-memory state it accumulates. This means the system needs to know when an old
aggregate can be dropped from the in-memory state because the application is not going to receive
late data for that aggregate any more. To enable this, in Spark 2.1, we have introduced
watermarking, which lets the engine automatically track the current event time in the data
and attempt to clean up old state accordingly. You can define the watermark of a query by
specifying the event time column and the threshold on how late the data is expected to be in terms of
event time. For a specific window ending at time T
, the engine will maintain state and allow late
data to update the state until (max event time seen by the engine - late threshold > T)
.
In other words, late data within the threshold will be aggregated,
but data later than the threshold will start getting dropped
(see later
in the section for the exact guarantees). Let’s understand this with an example. We can
easily define watermarking on the previous example using withWatermark()
as shown below.
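Continuing the windowed word-count sketch above, the watermark could be added like this:

```scala
import org.apache.spark.sql.functions.window

// Declare "timestamp" as the event-time column with a lateness threshold of 10 minutes
val windowedCounts = words
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "10 minutes", "5 minutes"),
    $"word")
  .count()
```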
In this example, we are defining the watermark of the query on the value of the column “timestamp”,
and also defining “10 minutes” as the threshold of how late is the data allowed to be. If this query
is run in Update output mode (discussed later in Output Modes section),
the engine will keep updating counts of a window in the Result Table until the window is older
than the watermark, which lags behind the current event time in column “timestamp” by 10 minutes.
Here is an illustration.
As shown in the illustration, the maximum event time tracked by the engine is the
blue dashed line, and the watermark set as (max event time - '10 mins')
at the beginning of every trigger is the red line. For example, when the engine observes the data
(12:14, dog)
, it sets the watermark for the next trigger as 12:04
.
This watermark lets the engine maintain intermediate state for an additional 10 minutes to allow late
data to be counted. For example, the data (12:09, cat)
is out of order and late, and it falls in
windows 12:00 - 12:10
and 12:05 - 12:15
. Since, it is still ahead of the watermark 12:04
in
the trigger, the engine still maintains the intermediate counts as state and correctly updates the
counts of the related windows. However, when the watermark is updated to 12:11
, the intermediate
state for window (12:00 - 12:10)
is cleared, and all subsequent data (e.g. (12:04, donkey)
)
is considered “too late” and therefore ignored. Note that after every trigger,
the updated counts (i.e. purple rows) are written to sink as the trigger output, as dictated by
the Update mode.
Some sinks (e.g. files) may not support the fine-grained updates that Update Mode requires. To work
with them, we also support Append Mode, where only the final counts are written to sink.
This is illustrated below.
Note that using withWatermark on a non-streaming Dataset is a no-op. As the watermark should not affect
any batch query in any way, we will ignore it directly.
Similar to the Update Mode earlier, the engine maintains intermediate counts for each window.
However, the partial counts are not updated to the Result Table and not written to sink. The engine
waits for “10 mins” for late data to be counted,
then drops the intermediate state of a window < watermark, and appends the final
counts to the Result Table/sink. For example, the final counts of window 12:00 - 12:10 are
appended to the Result Table only after the watermark is updated to 12:11.
Conditions for watermarking to clean aggregation state
It is important to note that the following conditions must be satisfied for the watermarking to
clean the state in aggregation queries (as of Spark 2.1.1, subject to change in the future).
- Output mode must be Append or Update. Complete mode requires all aggregate data to be preserved, and hence cannot use watermarking to drop intermediate state. See the Output Modes section for a detailed explanation of the semantics of each output mode.
- The aggregation must have either the event-time column, or a window on the event-time column.
- withWatermark must be called on the same column as the timestamp column used in the aggregate. For example, `df.withWatermark("time", "1 min").groupBy("time2").count()` is invalid in Append output mode, as the watermark is defined on a different column from the aggregation column.
- withWatermark must be called before the aggregation for the watermark details to be used. For example, `df.groupBy("time").count().withWatermark("time", "1 min")` is invalid in Append output mode.
Semantic Guarantees of Aggregation with Watermarking
- A watermark delay (set with withWatermark) of “2 hours” guarantees that the engine will never drop any data that is less than 2 hours delayed. In other words, any data less than 2 hours behind (in terms of event-time) the latest data processed till then is guaranteed to be aggregated.
- However, the guarantee is strict only in one direction. Data delayed by more than 2 hours is not guaranteed to be dropped; it may or may not get aggregated. The more delayed the data is, the less likely the engine is to process it.
Join Operations
Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame
as well as another streaming Dataset/DataFrame. The result of the streaming join is generated
incrementally, similar to the results of streaming aggregations in the previous section. In this
section we will explore what types of joins (i.e. inner, outer, semi, etc.) are supported in the above
cases. Note that in all the supported join types, the result of the join with a streaming
Dataset/DataFrame will be exactly the same as if it was with a static Dataset/DataFrame
containing the same data in the stream.
Stream-static Joins
Since the introduction in Spark 2.0, Structured Streaming has supported joins (inner join and some
type of outer joins) between a streaming and a static DataFrame/Dataset. Here is a simple example.
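A minimal sketch, assuming a static Parquet lookup table and a streaming Parquet directory that both contain a "type" column (paths are illustrative, and an active SparkSession named spark is assumed):

```scala
// Static DataFrame read once at the start of the query
val staticDf = spark.read.parquet("/path/to/static-lookup")

// Streaming DataFrame with the same schema, read as new files arrive
val streamingDf = spark.readStream
  .schema(staticDf.schema)
  .parquet("/path/to/streaming-events")

// Inner equi-join and left outer join of the stream against the static DataFrame
val inner = streamingDf.join(staticDf, "type")
val leftOuter = streamingDf.join(staticDf, "type", "left_outer")
```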
Note that stream-static joins are not stateful, so no state management is necessary.
However, a few types of stream-static outer joins are not yet supported.
These are listed at the end of this Join section.
Stream-stream Joins
In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
Datasets/DataFrames. The challenge of generating join results between two data streams is that,
at any point of time, the view of the dataset is incomplete for both sides of the join making
it much harder to find matches between inputs. Any row received from one input stream can match
with any future, yet-to-be-received row from the other input stream. Hence, for both the input
streams, we buffer past input as streaming state, so that we can match every future input with
past input and accordingly generate joined results. Furthermore, similar to streaming aggregations,
we automatically handle late, out-of-order data and can limit the state using watermarks.
Let’s discuss the different types of supported stream-stream joins and how to use them.
Inner Joins with optional Watermarking
Inner joins on any kind of columns along with any kind of join conditions are supported.
However, as the stream runs, the size of streaming state will keep growing indefinitely as
all past input must be saved as any new input can match with any input from the past.
To avoid unbounded state, you have to define additional join conditions such that indefinitely
old inputs cannot match with future inputs and therefore can be cleared from the state.
In other words, you will have to do the following additional steps in the join.
- Define watermark delays on both inputs such that the engine knows how delayed the input can be (similar to streaming aggregations).
- Define a constraint on event-time across the two inputs such that the engine can figure out when old rows of one input are not going to be required (i.e. will not satisfy the time constraint) for matches with the other input. This constraint can be defined in one of two ways.
  - Time range join conditions (e.g. `...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR`),
  - Join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`).
Let’s understand this with an example.
Let’s say we want to join a stream of advertisement impressions (when an ad was shown) with
another stream of user clicks on advertisements to correlate when impressions led to
monetizable clicks. To allow the state cleanup in this stream-stream join, you will have to
specify the watermarking delays and the time constraints as follows.
- Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order in event-time by at most 2 and 3 hours, respectively.
- Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour after the corresponding impression.
The code would look like this.
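A Scala sketch under the assumptions above (the sources, paths and schemas are hypothetical; only the column names match the example):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder.appName("StreamStreamJoin").getOrCreate()

// Hypothetical sources and schemas for ad impressions and clicks
val impressionSchema = new StructType()
  .add("impressionAdId", "string").add("impressionTime", "timestamp")
val clickSchema = new StructType()
  .add("clickAdId", "string").add("clickTime", "timestamp")

val impressions = spark.readStream.schema(impressionSchema).json("/path/to/impressions")
val clicks = spark.readStream.schema(clickSchema).json("/path/to/clicks")

// Apply watermarks on the event-time columns
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

// Inner join with an event-time range condition, so old state can be cleaned up
val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """))
```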
Semantic Guarantees of Stream-stream Inner Joins with Watermarking
This is similar to the guarantees provided by watermarking on aggregations.
A watermark delay of “2 hours” guarantees that the engine will never drop any data that is less than
2 hours delayed. But data delayed by more than 2 hours may or may not get processed.
Outer Joins with Watermarking
While the watermark + event-time constraints is optional for inner joins, for outer joins
they must be specified. This is because for generating the NULL results in outer join, the
engine must know when an input row is not going to match with anything in future. Hence, the
watermark + event-time constraints must be specified for generating correct results. Therefore,
a query with outer-join will look quite like the ad-monetization example earlier, except that
there will be an additional parameter specifying it to be an outer-join.
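Reusing the impressions/clicks sketch above, the outer join only differs in the join type argument:

```scala
// Left outer join with the same watermarks and time-range condition
val leftOuterJoined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  "leftOuter")   // could also be "rightOuter", "fullOuter" or "leftSemi"
```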
Semantic Guarantees of Stream-stream Outer Joins with Watermarking
Outer joins have the same guarantees as inner joins
regarding watermark delays and whether data will be dropped or not.
Caveats
There are a few important characteristics to note regarding how the outer results are generated.
- The outer NULL results will be generated with a delay that depends on the specified watermark delay and the time range condition. This is because the engine has to wait for that long to ensure there were no matches and there will be no more matches in future.
- In the current implementation in the micro-batch engine, watermarks are advanced at the end of a micro-batch, and the next micro-batch uses the updated watermark to clean up state and output outer results. Since we trigger a micro-batch only when there is new data to be processed, the generation of the outer result may get delayed if there is no new data being received in the stream. In short, if either of the two input streams being joined does not receive data for a while, the outer (both cases, left or right) output may get delayed.
Semi Joins with Watermarking
A semi join returns values from the left side of the relation that has a match with the right.
It is also referred to as a left semi join. Similar to outer joins, watermark + event-time
constraints must be specified for a semi join. To evict unmatched input rows on the left side,
the engine must know when an input row on the left side is not going to match with anything on the right
side in future.
Semantic Guarantees of Stream-stream Semi Joins with Watermarking
Semi joins have the same guarantees as inner joins
regarding watermark delays and whether data will be dropped or not.
Support matrix for joins in streaming queries
Left Input | Right Input | Join Type | Support |
---|---|---|---|
Static | Static | All types | Supported, since it is not on streaming data even though it can be present in a streaming query |
Stream | Static | Inner | Supported, not stateful |
Stream | Static | Left Outer | Supported, not stateful |
Stream | Static | Right Outer | Not supported |
Stream | Static | Full Outer | Not supported |
Stream | Static | Left Semi | Supported, not stateful |
Static | Stream | Inner | Supported, not stateful |
Static | Stream | Left Outer | Not supported |
Static | Stream | Right Outer | Supported, not stateful |
Static | Stream | Full Outer | Not supported |
Static | Stream | Left Semi | Not supported |
Stream | Stream | Inner | Supported, optionally specify watermark on both sides + time constraints for state cleanup |
Stream | Stream | Left Outer | Conditionally supported, must specify watermark on right + time constraints for correct results, optionally specify watermark on left for all state cleanup |
Stream | Stream | Right Outer | Conditionally supported, must specify watermark on left + time constraints for correct results, optionally specify watermark on right for all state cleanup |
Stream | Stream | Full Outer | Conditionally supported, must specify watermark on one side + time constraints for correct results, optionally specify watermark on the other side for all state cleanup |
Stream | Stream | Left Semi | Conditionally supported, must specify watermark on right + time constraints for correct results, optionally specify watermark on left for all state cleanup |
Additional details on supported joins:
- Joins can be cascaded, that is, you can do `df1.join(df2, ...).join(df3, ...).join(df4, ....)`.
- As of Spark 2.4, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.
- As of Spark 2.4, you cannot use other non-map-like operations before joins. Here are a few examples of what cannot be used.
  - Cannot use streaming aggregations before joins.
  - Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.
Streaming Deduplication
You can deduplicate records in data streams using a unique identifier in the events. This is exactly the same as deduplication on static data using a unique identifier column. The query will store the necessary amount of data from previous records such that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking, as sketched after the list below.
- With watermark - If there is an upper bound on how late a duplicate record may arrive, then you can define a watermark on an event time column and deduplicate using both the guid and the event time columns. The query will use the watermark to remove old state data from past records that are not expected to get any duplicates any more. This bounds the amount of state the query has to maintain.
- Without watermark - Since there are no bounds on when a duplicate record may arrive, the query stores the data from all the past records as state.
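A sketch of both variants in Scala, assuming a streaming DataFrame with guid and eventTime columns (the source, path and schema are illustrative, and an active SparkSession named spark is assumed):

```scala
import org.apache.spark.sql.types.StructType

val eventSchema = new StructType()
  .add("guid", "string")
  .add("eventTime", "timestamp")

val streamingDf = spark.readStream.schema(eventSchema).json("/path/to/events")

// Without watermark, using only the guid column
val deduped = streamingDf.dropDuplicates("guid")

// With watermark, using both the guid and the event-time columns
val dedupedWithWatermark = streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime")
```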
Policy for handling multiple watermarks
A streaming query can have multiple input streams that are unioned or joined together.
Each of the input streams can have a different threshold of late data that needs to
be tolerated for stateful operations. You specify these thresholds using
withWatermarks("eventTime", delay)
on each of the input streams. For example, consider
a query with stream-stream joins between inputStream1
and inputStream2
.
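A sketch of such a query in Scala, with made-up schemas, paths and join keys; each input carries its own watermark (an active SparkSession named spark is assumed):

```scala
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.types.StructType

val schema1 = new StructType().add("id1", "string").add("eventTime1", "timestamp")
val schema2 = new StructType().add("id2", "string").add("eventTime2", "timestamp")

val inputStream1 = spark.readStream.schema(schema1).json("/path/to/stream1")
  .withWatermark("eventTime1", "1 hour")
val inputStream2 = spark.readStream.schema(schema2).json("/path/to/stream2")
  .withWatermark("eventTime2", "2 hours")

// Stream-stream join; the engine computes a watermark per input and then
// chooses a single global watermark for the stateful join
val joined = inputStream1.join(
  inputStream2,
  expr("id1 = id2 AND eventTime2 BETWEEN eventTime1 AND eventTime1 + interval 1 hour"))
```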
While executing the query, Structured Streaming individually tracks the maximum
event time seen in each input stream, calculates watermarks based on the corresponding delay,
and chooses a single global watermark with them to be used for stateful operations. By default,
the minimum is chosen as the global watermark because it ensures that no data is
accidentally dropped as too late if one of the streams falls behind the others
(for example, one of the streams stops receiving data due to upstream failures). In other words,
the global watermark will safely move at the pace of the slowest stream and the query output will
be delayed accordingly.
However, in some cases, you may want to get faster results even if it means dropping data from the
slowest stream. Since Spark 2.4, you can set the multiple watermark policy to choose
the maximum value as the global watermark by setting the SQL configuration
spark.sql.streaming.multipleWatermarkPolicy
to max
(default is min
).
This lets the global watermark move at the pace of the fastest stream.
However, as a side effect, data from the slower streams will be aggressively dropped. Hence, use
this configuration judiciously.
Arbitrary Stateful Operations
Many usecases require more advanced stateful operations than aggregations. For example, in many usecases, you have to track sessions from data streams of events. For doing such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation mapGroupsWithState
and the more powerful operation flatMapGroupsWithState
. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation (Scala/Java) and the examples (Scala/Java).
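A small Scala sketch of mapGroupsWithState for a made-up per-user running event count (the rate source, case classes and update logic are all illustrative, not a prescribed pattern):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class UserEvent(user: String, timestamp: Timestamp)
case class UserState(numEvents: Long)
case class UserUpdate(user: String, numEvents: Long)

val spark = SparkSession.builder.appName("StatefulOps").getOrCreate()
import spark.implicits._

// Derive a toy stream of events from the rate source
val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  .selectExpr("concat('user', cast(value % 3 as string)) as user", "timestamp")
  .as[UserEvent]

// Update the per-user state with the events seen in this trigger
def updateState(
    user: String,
    newEvents: Iterator[UserEvent],
    state: GroupState[UserState]): UserUpdate = {
  val previous = state.getOption.getOrElse(UserState(0L))
  val updated = UserState(previous.numEvents + newEvents.size)
  state.update(updated)
  UserUpdate(user, updated.numEvents)
}

val updates = events
  .groupByKey(_.user)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateState)
```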
Though Spark cannot check and force it, the state function should be implemented with respect to the semantics of the output mode. For example, in Update mode Spark doesn’t expect that the state function will emit rows which are older than current watermark plus allowed late record delay, whereas in Append mode the state function can emit these rows.
Unsupported Operations
There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets.
Some of them are as follows.
- Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.
- Limit and take the first N rows are not supported on streaming Datasets.
- Distinct operations on streaming Datasets are not supported.
- Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.
- Few types of outer joins on streaming Datasets are not supported. See the support matrix in the Join Operations section for more details.
In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).
- count() - Cannot return a single count from a streaming Dataset. Instead, use ds.groupBy().count() which returns a streaming Dataset containing a running count.
- foreach() - Instead use ds.writeStream.foreach(...) (see next section).
- show() - Instead use the console sink (see next section).
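For instance, a running count can be maintained with a streaming aggregation instead of count(); ds below is an assumed streaming Dataset:

```scala
// Instead of ds.count(), which is unsupported on a streaming Dataset, maintain a running count
// with a streaming aggregation (`ds` is an assumed streaming Dataset/DataFrame).
val runningCount = ds.groupBy().count()

runningCount.writeStream
  .outputMode("complete")
  .format("console")
  .start()
```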
If you try any of these operations, you will see an AnalysisException
like “operation XYZ is not supported with streaming DataFrames/Datasets”.
While some of them may be supported in future releases of Spark,
there are others which are fundamentally hard to implement on streaming data efficiently.
For example, sorting on the input stream is not supported, as it requires keeping
track of all the data received in the stream. This is therefore fundamentally hard to execute
efficiently.
Limitation of global watermark
In Append mode, if a stateful operation emits rows older than current watermark plus allowed late record delay,
they will be “late rows” in downstream stateful operations (as Spark uses global watermark). Note that these rows may be discarded.
This is a limitation of a global watermark, and it could potentially cause a correctness issue.
Spark will check the logical plan of the query and log a warning when it detects such a pattern.
Any stateful operation that comes after any of the below stateful operations can have this issue:
- streaming aggregation in Append mode
- stream-stream outer join
- mapGroupsWithState and flatMapGroupsWithState in Append mode (depending on the implementation of the state function)
As Spark cannot check the state function of mapGroupsWithState/flatMapGroupsWithState, Spark assumes that the state function emits late rows if the operator uses Append mode.
Spark provides two ways to check the number of late rows on stateful operators which would help you identify the issue:
- On the Spark UI: check the metrics in the stateful operator nodes on the query execution details page in the SQL tab.
- On the StreamingQueryListener: check "numRowsDroppedByWatermark" in "stateOperators" in QueryProgressEvent.
Please note that "numRowsDroppedByWatermark" represents the number of rows "dropped" by the watermark, which is not always the same as the count of "late input rows" for the operator. It depends on the implementation of the operator - e.g. streaming aggregation pre-aggregates input rows and checks the late inputs against the pre-aggregated inputs, hence the number is not the same as the number of original input rows. In practice, it is enough to check whether the value is zero or non-zero.
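A rough sketch of checking this programmatically, assuming query is an active StreamingQuery handle:

```scala
// Inspect the per-operator dropped-row counts from the most recent progress report.
Option(query.lastProgress).foreach { progress =>
  progress.stateOperators.foreach { op =>
    println(s"numRowsDroppedByWatermark = ${op.numRowsDroppedByWatermark}")
  }
}
```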
There’s a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.
Starting Streaming Queries
Once you have defined the final result DataFrame/Dataset, all that is left is for you to start the streaming computation. To do that, you have to use the DataStreamWriter (Scala/Java/Python docs) returned through Dataset.writeStream(). You will have to specify one or more of the following in this interface.
- Details of the output sink: Data format, location, etc.
- Output mode: Specify what gets written to the output sink.
- Query name: Optionally, specify a unique name of the query for identification.
- Trigger interval: Optionally, specify the trigger interval. If it is not specified, the system will check for availability of new data as soon as the previous processing has been completed. If a trigger time is missed because the previous processing has not been completed, then the system will trigger processing immediately.
- Checkpoint location: For some output sinks where the end-to-end fault-tolerance can be guaranteed, specify the location where the system will write all the checkpoint information. This should be a directory in an HDFS-compatible fault-tolerant file system. The semantics of checkpointing is discussed in more detail in the next section.
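Putting these together, a query start might look like the following sketch (the sink format, paths and query name are illustrative, and aggDF is an assumed streaming DataFrame produced earlier):

```scala
import org.apache.spark.sql.streaming.Trigger

val query = aggDF.writeStream
  .format("parquet")                                     // output sink details
  .option("path", "/path/to/output")                     // hypothetical output directory
  .option("checkpointLocation", "/path/to/checkpoint")   // hypothetical checkpoint directory
  .outputMode("append")                                  // output mode
  .queryName("aggregates")                               // query name
  .trigger(Trigger.ProcessingTime("10 seconds"))         // trigger interval
  .start()
```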
Output Modes
There are a few types of output modes.
- Append mode (default) - This is the default mode, where only the new rows added to the Result Table since the last trigger will be outputted to the sink. This is supported only for those queries where the rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once (assuming a fault-tolerant sink). For example, queries with only select, where, map, flatMap, filter, join, etc. will support Append mode.
- Complete mode - The whole Result Table will be outputted to the sink after every trigger. This is supported for aggregation queries.
- Update mode - (Available since Spark 2.1.1) Only the rows in the Result Table that were updated since the last trigger will be outputted to the sink. More information to be added in future releases.
Different types of streaming queries support different output modes.
Here is the compatibility matrix.
| Query Type | Supported Output Modes | Notes |
|---|---|---|
| Queries with aggregation: aggregation on event-time with watermark | Append, Update, Complete | Append mode uses the watermark to drop old aggregation state. But the output of a windowed aggregation is delayed until the late threshold specified in withWatermark(), since by the mode's semantics rows can be added to the Result Table only once after they are finalized (i.e. after the watermark is crossed). See the Late Data section for more details. Update mode uses the watermark to drop old aggregation state. Complete mode does not drop old aggregation state since by definition this mode preserves all data in the Result Table. |
| Queries with aggregation: other aggregations | Complete, Update | Since no watermark is defined (it is only defined in the other category), old aggregation state is not dropped. Append mode is not supported as aggregates can update, thus violating the semantics of this mode. |
| Queries with mapGroupsWithState | Update | Aggregations are not allowed in a query with mapGroupsWithState. |
| Queries with flatMapGroupsWithState in Append operation mode | Append | Aggregations are allowed after flatMapGroupsWithState. |
| Queries with flatMapGroupsWithState in Update operation mode | Update | Aggregations are not allowed in a query with flatMapGroupsWithState. |
| Queries with joins | Append | Update and Complete modes are not supported yet. See the support matrix in the Join Operations section for more details on what types of joins are supported. |
| Other queries | Append, Update | Complete mode is not supported as it is infeasible to keep all unaggregated data in the Result Table. |
Output Sinks
There are a few types of built-in output sinks.
- File sink - Stores the output to a directory.
- Kafka sink - Stores the output to one or more topics in Kafka.
- Foreach sink - Runs arbitrary computation on the records in the output. See later in the section for more details.
- Console sink (for debugging) - Prints the output to the console/stdout every time there is a trigger. Both Append and Complete output modes are supported. This should be used for debugging purposes on low data volumes, as the entire output is collected and stored in the driver's memory after every trigger.
- Memory sink (for debugging) - The output is stored in memory as an in-memory table. Both Append and Complete output modes are supported. This should be used for debugging purposes on low data volumes, as the entire output is collected and stored in the driver's memory. Hence, use it with caution.
Some sinks are not fault-tolerant because they do not guarantee persistence of the output and are
meant for debugging purposes only. See the earlier section on
fault-tolerance semantics.
Here are the details of all the sinks in Spark.
| Sink | Supported Output Modes | Options | Fault-tolerant | Notes |
|---|---|---|---|---|
| File Sink | Append | path: path to the output directory, must be specified. retention: time to live (TTL) for output files. Output files whose batches were committed longer ago than the TTL will eventually be excluded from the metadata log, which means reader queries reading the sink's output directory may not process them. You can provide the value as a string time format (e.g. "12h", "7d"). It is disabled by default. For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R), e.g. for the "parquet" format options see DataFrameWriter.parquet(). | Yes (exactly-once) | Supports writes to partitioned tables. Partitioning by time may be useful. |
| Kafka Sink | Append, Update, Complete | See the Kafka Integration Guide | Yes (at-least-once) | More details in the Kafka Integration Guide |
| Foreach Sink | Append, Update, Complete | None | Yes (at-least-once) | More details in the next section |
| ForeachBatch Sink | Append, Update, Complete | None | Depends on the implementation | More details in the next section |
| Console Sink | Append, Update, Complete | numRows: number of rows to print every trigger (default: 20). truncate: whether to truncate the output if too long (default: true). | No | |
| Memory Sink | Append, Complete | None | No. But in Complete mode, a restarted query will recreate the full table. | Table name is the query name. |
Note that you have to call start() to actually start the execution of the query. This returns a StreamingQuery object which is a handle to the continuously running execution. You can use this object to manage the query, which we will discuss in the next subsection. For now, let's understand all this with a few examples.
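For instance, a few sink configurations might look like the following sketch, where noAggDF and aggDF are assumed streaming DataFrames and the paths are illustrative:

```scala
// Print new rows to the console (debugging)
noAggDF.writeStream
  .format("console")
  .start()

// Write new rows to Parquet files in a directory (exactly-once file sink)
noAggDF.writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()

// Keep the complete aggregate result in an in-memory table named "aggregates"
aggDF.writeStream
  .queryName("aggregates")
  .outputMode("complete")
  .format("memory")
  .start()

spark.sql("select * from aggregates").show()   // interactively query the in-memory table
```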
Using Foreach and ForeachBatch
The foreach and foreachBatch operations allow you to apply arbitrary operations and writing logic on the output of a streaming query. They have slightly different use cases - while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch. Let's understand their usages in more detail.
ForeachBatch
foreachBatch(...) allows you to specify a function that is executed on the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch, and the unique ID of the micro-batch.
R is not yet supported.
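A minimal sketch of the Scala API (streamingDF is an assumed streaming DataFrame and the output path is illustrative):

```scala
import org.apache.spark.sql.DataFrame

// Run arbitrary batch logic on every micro-batch; batchId uniquely identifies the micro-batch.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format("parquet").mode("append").save("/data/output")  // hypothetical path
}.start()
```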
With foreachBatch, you can do the following.
- Reuse existing batch data sources - For many storage systems, there may not be a streaming sink available yet, but there may already exist a data writer for batch queries. Using foreachBatch, you can use the batch data writers on the output of each micro-batch.
- Write to multiple locations - If you want to write the output of a streaming query to multiple locations, then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline (see the sketch after this list).
- Apply additional DataFrame operations - Many DataFrame and Dataset operations are not supported in streaming DataFrames because Spark does not support generating incremental plans in those cases. Using foreachBatch, you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
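Here is a rough outline of the write-to-multiple-locations pattern mentioned above; the formats and paths are illustrative:

```scala
import org.apache.spark.sql.DataFrame

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()                                      // cache once to avoid recomputation
  batchDF.write.format("parquet").save("/data/sink-a")   // location 1 (hypothetical path)
  batchDF.write.format("json").save("/data/sink-b")      // location 2 (hypothetical path)
  batchDF.unpersist()
}.start()
```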
Note:
- By default, foreachBatch provides only at-least-once write guarantees. However, you can use the batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee.
- foreachBatch does not work with the continuous processing mode as it fundamentally relies on the micro-batch execution of a streaming query. If you write data in the continuous mode, use foreach instead.
Foreach
If foreachBatch is not an option (for example, a corresponding batch data writer does not exist, or you are using continuous processing mode), then you can express your custom writer logic using foreach. Specifically, you can express the data writing logic by dividing it into three methods: open, process, and close. Since Spark 2.4, foreach is available in Scala, Java and Python.
In Scala, you have to extend the class ForeachWriter (docs). In Java, you have to extend the class ForeachWriter (docs).
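A minimal Scala sketch of such a writer (streamingDF is an assumed streaming DataFrame; the connection handling is only indicated by comments):

```scala
import org.apache.spark.sql.{ForeachWriter, Row}

val query = streamingDF.writeStream.foreach(new ForeachWriter[Row] {
  override def open(partitionId: Long, epochId: Long): Boolean = {
    // open a connection for this partition/epoch; return false to skip this partition
    true
  }
  override def process(record: Row): Unit = {
    // write a single row using the opened connection
    println(record)
  }
  override def close(errorOrNull: Throwable): Unit = {
    // close the connection; errorOrNull is non-null if processing failed
  }
}).start()
```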
In Python, you can invoke foreach in two ways: in a function or in an object. The function offers a simple way to express your processing logic but does not allow you to deduplicate generated data when failures cause reprocessing of some input data. For that situation you must specify the processing logic in an object.
- First, the function takes a row as input.
- Second, the object has a process method and optional open and close methods.
R is not yet supported.
Execution semantics
When the streaming query is started, Spark calls the function or the object’s methods in the following way:
- A single copy of this object is responsible for all the data generated by a single task in a query. In other words, one instance is responsible for processing one partition of the data generated in a distributed manner.
- This object must be serializable, because each task will get a fresh serialized-deserialized copy of the provided object. Hence, it is strongly recommended that any initialization for writing data (for example, opening a connection or starting a transaction) is done after the open() method has been called, which signifies that the task is ready to generate data.
- The lifecycle of the methods is as follows:
  - For each partition with partition_id:
    - For each batch/epoch of streaming data with epoch_id:
      - Method open(partitionId, epochId) is called.
      - If open(...) returns true, for each row in the partition and batch/epoch, method process(row) is called.
      - Method close(error) is called with the error (if any) seen while processing rows.
- The close() method (if it exists) is called if an open() method exists and returns successfully (irrespective of the return value), except if the JVM or Python process crashes in the middle.
- Note: Spark does not guarantee the same output for (partitionId, epochId), so deduplication cannot be achieved with (partitionId, epochId). E.g. the source may provide a different number of partitions for some reason, Spark optimizations may change the number of partitions, etc. See SPARK-28650 for more details. If you need deduplication on output, try out foreachBatch instead.
Streaming Table APIs
Since Spark 3.1, you can also use DataStreamReader.table() to read tables as streaming DataFrames and use DataStreamWriter.toTable() to write streaming DataFrames as tables:
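A minimal sketch, with illustrative table names and checkpoint path:

```scala
// Read a table as a streaming DataFrame, then write the stream back out as another table.
val df = spark.readStream.table("input_table")

df.writeStream
  .option("checkpointLocation", "/path/to/checkpoint")  // hypothetical checkpoint directory
  .toTable("output_table")
```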
Not available in R.
For more details, please check the docs for DataStreamReader (Scala/Java/Python docs) and DataStreamWriter (Scala/Java/Python docs).
Triggers
The trigger settings of a streaming query define the timing of streaming data processing, whether
the query is going to be executed as micro-batch query with a fixed batch interval or as a continuous processing query.
Here are the different kinds of triggers that are supported.
| Trigger Type | Description |
|---|---|
| unspecified (default) | If no trigger setting is explicitly specified, then by default the query will be executed in micro-batch mode, where micro-batches will be generated as soon as the previous micro-batch has completed processing. |
| Fixed interval micro-batches | The query will be executed in micro-batch mode, where micro-batches will be kicked off at the user-specified intervals. |
| One-time micro-batch | The query will execute only one micro-batch to process all the available data and then stop on its own. This is useful in scenarios where you want to periodically spin up a cluster, process everything that is available since the last period, and then shut down the cluster. In some cases, this may lead to significant cost savings. |
| Continuous with fixed checkpoint interval (experimental) | The query will be executed in the new low-latency, continuous processing mode. Read more about this in the Continuous Processing section below. |
Here are a few code examples.
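As a sketch (df is an assumed streaming DataFrame):

```scala
import org.apache.spark.sql.streaming.Trigger

// Default trigger: run a micro-batch as soon as the previous one finishes
df.writeStream.format("console").start()

// Fixed-interval micro-batches every 2 seconds
df.writeStream.format("console").trigger(Trigger.ProcessingTime("2 seconds")).start()

// One-time micro-batch
df.writeStream.format("console").trigger(Trigger.Once()).start()

// Continuous processing with a 1-second checkpoint interval
df.writeStream.format("console").trigger(Trigger.Continuous("1 second")).start()
```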
Managing Streaming Queries
The StreamingQuery object created when a query is started can be used to monitor and manage the query.
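For instance, a query handle returned by start() can be inspected and controlled like this sketch (query is assumed to be an active StreamingQuery):

```scala
println(query.id)      // unique id, persisted across restarts from checkpoint data
println(query.runId)   // unique id of this run of the query
println(query.name)    // name of the query, if one was specified
query.explain()        // print a detailed explanation of the query plan
query.stop()           // stop the query
```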
You can start any number of queries in a single SparkSession. They will all be running concurrently, sharing the cluster resources. You can use sparkSession.streams() to get the StreamingQueryManager (Scala/Java/Python docs) that can be used to manage the currently active queries.
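A small sketch of the manager API (assuming at least one active query):

```scala
spark.streams.active.foreach(q => println(q.name))  // enumerate currently active queries
spark.streams.awaitAnyTermination()                 // block until any one of them terminates
```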
Monitoring Streaming Queries
There are multiple ways to monitor active streaming queries. You can either push metrics to external systems using Spark’s Dropwizard Metrics support, or access them programmatically.
Reading Metrics Interactively
You can directly get the current status and metrics of an active query using streamingQuery.lastProgress() and streamingQuery.status(). lastProgress() returns a StreamingQueryProgress object in Scala and Java and a dictionary with the same fields in Python. It has all the information about the progress made in the last trigger of the stream - what data was processed, what were the processing rates, latencies, etc. There is also streamingQuery.recentProgress which returns an array of the last few progresses.
In addition, streamingQuery.status() returns a StreamingQueryStatus object in Scala and Java and a dictionary with the same fields in Python. It gives information about what the query is immediately doing - whether a trigger is active, whether data is being processed, etc.
Here are a few examples.
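For instance, assuming query is an active StreamingQuery:

```scala
println(query.lastProgress)   // detailed report of the last completed trigger
println(query.status)         // what the query is doing right now
```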
Reporting Metrics programmatically using Asynchronous APIs
You can also asynchronously monitor all queries associated with a SparkSession by attaching a StreamingQueryListener (Scala/Java docs). Once you attach your custom StreamingQueryListener object with sparkSession.streams.addListener(), you will get callbacks when a query is started and stopped and when there is progress made in an active query. Here is an example,
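A sketch of such a listener, closely following the shape of the Scala API:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    println("Query started: " + event.id)
  }
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    println("Query made progress: " + event.progress)
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    println("Query terminated: " + event.id)
  }
})
```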
Reporting Metrics using Dropwizard
Spark supports reporting metrics using the Dropwizard Library. To enable metrics of Structured Streaming queries to be reported as well, you have to explicitly enable the configuration spark.sql.streaming.metricsEnabled in the SparkSession.
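For example, on an existing SparkSession named spark:

```scala
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
```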
All queries started in the SparkSession after this configuration has been enabled will report metrics through Dropwizard to whatever sinks have been configured (e.g. Ganglia, Graphite, JMX, etc.).
Recovering from Failures with Checkpointing
In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the quick example) to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when starting a query.
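A minimal sketch (aggDF is an assumed streaming DataFrame and the path is illustrative):

```scala
// The checkpoint location must be a path in an HDFS-compatible file system.
aggDF.writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()
```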
Recovery Semantics after Changes in a Streaming Query
There are limitations on what changes in a streaming query are allowed between restarts from the
same checkpoint location. Here are a few kinds of changes that are either not allowed, or
the effect of the change is not well-defined. For all of them:
- The term allowed means you can do the specified change, but whether the semantics of its effect is well-defined depends on the query and the change.
- The term not allowed means you should not do the specified change as the restarted query is likely to fail with unpredictable errors.
- sdf represents a streaming DataFrame/Dataset generated with sparkSession.readStream.
Types of changes
- Changes in the number or type (i.e. different source) of input sources: This is not allowed.
- Changes in the parameters of input sources: Whether this is allowed and whether the semantics of the change are well-defined depends on the source and the query. Here are a few examples.
  - Addition/deletion/modification of rate limits is allowed: spark.readStream.format("kafka").option("subscribe", "topic") to spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)
  - Changes to subscribed topics/files are generally not allowed as the results are unpredictable: spark.readStream.format("kafka").option("subscribe", "topic") to spark.readStream.format("kafka").option("subscribe", "newTopic")
- Changes in the type of output sink: Changes between a few specific combinations of sinks are allowed. This needs to be verified on a case-by-case basis. Here are a few examples.
  - File sink to Kafka sink is allowed. Kafka will see only the new data.
  - Kafka sink to file sink is not allowed.
  - Kafka sink changed to foreach, or vice versa, is allowed.
- Changes in the parameters of the output sink: Whether this is allowed and whether the semantics of the change are well-defined depends on the sink and the query. Here are a few examples.
  - Changes to the output directory of a file sink are not allowed: sdf.writeStream.format("parquet").option("path", "/somePath") to sdf.writeStream.format("parquet").option("path", "/anotherPath")
  - Changes to the output topic are allowed: sdf.writeStream.format("kafka").option("topic", "someTopic") to sdf.writeStream.format("kafka").option("topic", "anotherTopic")
  - Changes to the user-defined foreach sink (that is, the ForeachWriter code) are allowed, but the semantics of the change depends on the code.
- Changes in projection / filter / map-like operations: Some cases are allowed. For example:
  - Addition / deletion of filters is allowed: sdf.selectExpr("a") to sdf.where(...).selectExpr("a").filter(...).
  - Changes in projections with the same output schema are allowed: sdf.selectExpr("stringColumn AS json").writeStream to sdf.selectExpr("anotherStringColumn AS json").writeStream
  - Changes in projections with a different output schema are conditionally allowed: sdf.selectExpr("a").writeStream to sdf.selectExpr("b").writeStream is allowed only if the output sink allows the schema change from "a" to "b".
- Changes in stateful operations: Some operations in streaming queries need to maintain state data in order to continuously update the result. Structured Streaming automatically checkpoints the state data to fault-tolerant storage (for example, HDFS, AWS S3, Azure Blob storage) and restores it after restart. However, this assumes that the schema of the state data remains the same across restarts. This means that any changes (that is, additions, deletions, or schema modifications) to the stateful operations of a streaming query are not allowed between restarts. Here is the list of stateful operations whose schema should not be changed between restarts in order to ensure state recovery:
  - Streaming aggregation: For example, sdf.groupBy("a").agg(...). Any change in the number or type of grouping keys or aggregates is not allowed.
  - Streaming deduplication: For example, sdf.dropDuplicates("a"). Any change in the number or type of grouping keys or aggregates is not allowed.
  - Stream-stream join: For example, sdf1.join(sdf2, ...) (i.e. both inputs are generated with sparkSession.readStream). Changes in the schema or equi-joining columns are not allowed. Changes in the join type (outer or inner) are not allowed. Other changes in the join condition are ill-defined.
  - Arbitrary stateful operation: For example, sdf.groupByKey(...).mapGroupsWithState(...) or sdf.groupByKey(...).flatMapGroupsWithState(...). Any change to the schema of the user-defined state or the type of timeout is not allowed. Any change within the user-defined state-mapping function is allowed, but the semantic effect of the change depends on the user-defined logic. If you really want to support state schema changes, then you can explicitly encode/decode your complex state data structures into bytes using an encoding/decoding scheme that supports schema migration. For example, if you save your state as Avro-encoded bytes, then you are free to change the Avro state schema between query restarts as the binary state will always be restored successfully.
Continuous Processing
[Experimental]
Continuous processing is a new, experimental streaming execution mode introduced in Spark 2.3 that enables low (~1 ms) end-to-end latency with at-least-once fault-tolerance guarantees. Compare this with the default micro-batch processing engine which can achieve exactly-once guarantees but achieve latencies of ~100ms at best. For some types of queries (discussed below), you can choose which mode to execute them in without modifying the application logic (i.e. without changing the DataFrame/Dataset operations).
To run a supported query in continuous processing mode, all you need to do is specify a continuous trigger with the desired checkpoint interval as a parameter. For example,
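A sketch of such a query, reading from and writing to Kafka with a continuous trigger (the bootstrap servers, topics and checkpoint path are illustrative):

```scala
import org.apache.spark.sql.streaming.Trigger

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .option("checkpointLocation", "/path/to/checkpoint")
  .trigger(Trigger.Continuous("1 second"))  // the only change needed: a continuous trigger
  .start()
```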
A checkpoint interval of 1 second means that the continuous processing engine will record the progress of the query every second. The resulting checkpoints are in a format compatible with the micro-batch engine, hence any query can be restarted with any trigger. For example, a supported query started with the micro-batch mode can be restarted in continuous mode, and vice versa. Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.
Supported Queries
As of Spark 2.4, only the following types of queries are supported in the continuous processing mode.
- Operations: Only map-like Dataset/DataFrame operations are supported in continuous mode, that is, only projections (select, map, flatMap, mapPartitions, etc.) and selections (where, filter, etc.).
  - All SQL functions are supported except aggregation functions (since aggregations are not yet supported), current_timestamp() and current_date() (deterministic computations using time are challenging).
- Sources:
  - Kafka source: All options are supported.
  - Rate source: Good for testing. The only options that are supported in the continuous mode are numPartitions and rowsPerSecond.
- Sinks:
  - Kafka sink: All options are supported.
  - Memory sink: Good for debugging.
  - Console sink: Good for debugging. All options are supported. Note that the console will print at every checkpoint interval that you have specified in the continuous trigger.
See the Input Sources and Output Sinks sections for more details on them. While the console sink is good for testing, the end-to-end low-latency processing can be best observed with Kafka as the source and sink, as this allows the engine to process the data and make the results available in the output topic within milliseconds of the input data being available in the input topic.
Caveats
- The continuous processing engine launches multiple long-running tasks that continuously read data from sources, process it and continuously write to sinks. The number of tasks required by the query depends on how many partitions the query can read from the sources in parallel. Therefore, before starting a continuous processing query, you must ensure there are enough cores in the cluster to run all the tasks in parallel. For example, if you are reading from a Kafka topic that has 10 partitions, then the cluster must have at least 10 cores for the query to make progress.
- Stopping a continuous processing stream may produce spurious task termination warnings. These can be safely ignored.
- There are currently no automatic retries of failed tasks. Any failure will lead to the query being stopped, and it needs to be manually restarted from the checkpoint.
Additional Information
Notes
- Several configurations are not modifiable after the query has run. To change them, discard the checkpoint and start a new query. These configurations include:
  - spark.sql.shuffle.partitions
    - This is due to the physical partitioning of state: state is partitioned by applying a hash function to the key, hence the number of partitions for state should be unchanged.
    - If you want to run fewer tasks for stateful operations, coalesce would help with avoiding unnecessary repartitioning.
      - After coalesce, the number of (reduced) tasks will be kept unless another shuffle happens.
  - spark.sql.streaming.stateStore.providerClass: To read the previous state of the query properly, the class of the state store provider should be unchanged.
  - spark.sql.streaming.multipleWatermarkPolicy: Modifying this would lead to an inconsistent watermark value when the query contains multiple watermarks, hence the policy should be unchanged.
Further Reading
- See and run the Scala/Java/Python/R examples.
  - Instructions on how to run Spark examples
- Read about integrating with Kafka in the Structured Streaming Kafka Integration Guide
- Read more details about using DataFrames/Datasets in the Spark SQL Programming Guide
- Third-party Blog Posts
  - Real-time Streaming ETL with Structured Streaming in Apache Spark 2.1 (Databricks Blog)
  - Real-Time End-to-End Integration with Apache Kafka in Apache Spark's Structured Streaming (Databricks Blog)
  - Event-time Aggregation and Watermarking in Apache Spark's Structured Streaming (Databricks Blog)
Talks
- Spark Summit Europe 2017
  - Easy, Scalable, Fault-tolerant Stream Processing with Structured Streaming in Apache Spark - Part 1 slides/video, Part 2 slides/video
  - Deep Dive into Stateful Stream Processing in Structured Streaming - slides/video
- Spark Summit 2016
  - A Deep Dive into Structured Streaming - slides/video