Stream caching 流缓存
While stream types (like StreamSource
, InputStream
and Reader
) are commonly used in messaging for performance reasons, they also have an important drawback: they can only be read once. In order to be able to work with message content multiple times, the stream needs to be cached.
流类型(如 StreamSource
、 InputStream
和 Reader
)通常在消息传递中用于提高性能,但它们也有一个重要的缺点:它们只能被读取一次。为了能够多次处理消息内容,需要将流缓存起来。
Streams are cached in memory. However, for large stream messages, you can set spoolEnabled=true
and then large message (over 128 KB) will be cached in a temporary file instead. Camel itself will handle deleting the temporary file once the cached stream is no longer necessary.
流在内存中被缓存。然而,对于大型流消息,您可以设置 spoolEnabled=true
,然后大于 128 KB 的大消息将被缓存在临时文件中。一旦不再需要缓存的流,Camel 本身将处理删除临时文件。
StreamCache - Affecting the message payload The |
In order to determine if a message payload requires caching, then Camel uses the Type Converter functionality, to determine if the message payload type can be converted into an org.apache.camel.StreamCache
instance.
为了确定消息有效载荷是否需要缓存,Camel 使用类型转换功能来确定消息有效载荷类型是否可以转换为一个实例。
All the classes from the Camel release that implements |
Configuring Stream Caching
配置流缓存
Stream caching is configured using org.apache.camel.spi.StreamCachingStrategy
.
流缓存是使用 org.apache.camel.spi.StreamCachingStrategy
进行配置的。
The strategy has the following options:
该策略有以下选项:
Option 选项 | Default 默认 | Description 描述 |
---|---|---|
enabled 启用 | true 真实的 | Whether stream caching is enabled |
allowClasses 允许班级 | To filter stream caching of a given set of allowed/denied classes. By default, all classes that are | |
denyClasses 拒绝课程 | To filter stream caching of a given set of allowed/denied classes. By default, all classes that are | |
spoolEnabled 启用卷轴 | false 错误的 | Whether spool to disk is enabled |
spoolDirectory 卷轴目录 | ${java.io.tmpdir}/camel/camel-tmp-#uuid# | Base directory where temporary files for spooled streams should be stored. This option supports naming patterns as documented below. |
spoolCipher 线圈密码 | null 空 | If set, the temporary files are encrypted using the specified cipher transformation (i.e., a valid stream or 8-bit cipher name such as "RC4", "AES/CTR/NoPadding". An empty name "" is treated as null). |
spoolThreshold 线程阈值 | 128 KB 128 千字节 | Size in bytes when the stream should be spooled to disk instead of keeping in memory. Use a value of 0 or negative to disable it all together so streams is always kept in memory regardless of their size. |
spoolUsedHeapMemoryThreshold | 0 | A percentage (1 to 99) of current used heap memory to use as threshold for spooling streams to disk. The upper bounds is based on heap committed (guaranteed memory the JVM can claim). This can be used to spool to disk when running low on memory. |
spoolUsedHeapMemoryLimit 线程池使用的堆内存限制 | Max | If |
anySpoolRules 任何线轴规则 | false 错误的 | Whether any or all |
bufferSize 缓冲区大小 | 4096 | Sets the buffer size to use when allocating in-memory buffers used for in-memory stream caches. |
removeSpoolDirectoryWhenStopping | true 真实的 | Whether to remove the spool directory when stopping CamelContext. |
statisticsEnabled 统计已启用 | false 错误的 | Whether utilization statistics is enabled. By enabling this you can see these statics for example with JMX. |
SpoolDirectory naming pattern
SPOOLDIRECTORY 命名模式
The following patterns is supported:
支持以下模式:
-
#uuid#
= a random UUID
#uuid#
= 一个随机的 UUID -
#camelId#
= the CamelContext id (e.g. the name)
#camelId#
= CamelContext 的 id(例如名称) -
#name#
= same as#camelId#
#name#
= 和#camelId#
相同 -
#counter#
= an incrementing counter
#counter#
= 递增计数器 -
#bundleId#
= the OSGi bundle id (only for OSGi environments)
#bundleId#
= OSGi 捆绑包 ID(仅适用于 OSGi 环境) -
#symbolicName#
= the OSGi symbolic name (only for OSGi environments)
#symbolicName#
= OSGi 符号名称(仅适用于 OSGi 环境) -
#version#
= the OSGi bundle version (only for OSGi environments)
#version#
= OSGi 捆绑包版本(仅适用于 OSGi 环境) -
${env:key}
= the environment variable with the key
${env:key}
= 带有键的环境变量 -
${key}
= the JVM system property with the key
${key}
= 使用键的 JVM 系统属性
A couple of examples:
一些例子:
To store in the java temp directory with a sub directory using the CamelContext
name:
在 Java 临时目录中使用子目录存储,使用 CamelContext
名称:
context.getStreamCachingStrategy().setSpoolDirectory"${java.io.tmpdir}#name#/");
To store in KARAF_HOME/tmp/bundleId
directory:
存储在 KARAF_HOME/tmp/bundleId
目录中:
context.getStreamCachingStrategy().setSpoolDirectory"${env:KARAF_HOME}/tmp/bundle#bundleId#");
Configuring StreamCachingStrategy in Java
在 Java 中配置 StreamCachingStrategy
You can configure the StreamCachingStrategy
in Java as shown below:
您可以按照下面的示例在 Java 中配置 StreamCachingStrategy
context.getStreamCachingStrategy().setSpoolEnabled(true);
context.getStreamCachingStrategy().setSpoolDirectory("/tmp/cachedir");
context.getStreamCachingStrategy().setSpoolThreshold(64 * 1024);
context.getStreamCachingStrategy().setBufferSize(16 * 1024);
// to enable encryption using RC4
// context.getStreamCachingStrategy().setSpoolCipher("RC4");
And remember to enable Stream caching on the CamelContext
:
请在 CamelContext
上启用流缓存
context.setStreamCaching(true);
or on routes: 或者在路线上:
from("file:inbox")
.streamCaching()
.to("bean:foo");
Configuring StreamCachingStrategy in XML
在 XML 中配置 StreamCachingStrategy
In XML you can enable stream caching on the <camelContext>
and then do the configuration in the streamCaching
element:
在 XML 中,您可以在 <camelContext>
上启用流缓存,然后在 streamCaching
元素中进行配置
<camelContext streamCache="true">
<streamCaching id="myCacheConfig" bufferSize="16384" spoolEnabled="true" spoolDirectory="/tmp/cachedir" spoolThreshold="65536"/>
<route>
<from uri="direct:c"/>
<to uri="mock:c"/>
</route>
</camelContext>
Using spoolUsedHeapMemoryThreshold
使用 SPOOLUSEDHEAPMEMORYTHRESHOLD
By default, stream caching will spool only big payloads (128 KB or bigger) to disk. However you can also set the spoolUsedHeapMemoryThreshold
option which is a percentage of used heap memory. This can be used to also spool to disk when running low on memory.
默认情况下,流缓存只会将大型有效负载(128 KB 或更大)暂存到磁盘中。但是,您还可以设置 spoolUsedHeapMemoryThreshold
选项,该选项是已使用堆内存的百分比。当内存不足时,可以使用此选项将数据暂存到磁盘中。
For example with: 例如:
<streamCaching id="myCacheConfig" spoolEnabled="true" spoolDirectory="/tmp/cachedir" spoolUsedHeapMemoryThreshold="70"/>
Then notice that as spoolThreshold
is default enabled with 128 KB, then we have both thresholds in use (spoolThreshold
and spoolUsedHeapMemoryThreshold
). And in this example then we only spool to disk if payload is > 128 KB and that used heap memory is > 70%. The reason is that we have the option anySpoolRules
as default false
. That means both rules must be true
(e.g. AND).
然后注意,由于 spoolThreshold
默认启用了 128 KB,因此我们同时使用了两个阈值( spoolThreshold
和 spoolUsedHeapMemoryThreshold
)。在这个例子中,只有当有效负载> 128 KB 并且使用的堆内存> 70%时,我们才会将数据暂存到磁盘上。原因是我们将选项 anySpoolRules
默认为 false
。这意味着两个规则都必须 true
(例如 AND)。
If we want to spool to disk if either of the rules (e.g. OR), then we can do:
如果我们想要将数据缓存到磁盘,只要满足其中一个规则(例如,或者),我们可以这样做:
<streamCaching id="myCacheConfig" spoolEnabled="true" spoolDirectory="/tmp/cachedir" spoolUsedHeapMemoryThreshold="70" anySpoolRules="true"/>
If we only want to spool to disk if we run low on memory then we can set:
如果我们只想在内存不足时将数据缓存到磁盘,那么我们可以设置:
<streamCaching id="myCacheConfig" spoolEnabled="true" spoolDirectory="/tmp/cachedir" spoolThreshold="-1" spoolUsedHeapMemoryThreshold="70"/>
then we do not use the spoolThreshold
rule, and only the heap memory based is in use.
然后我们不使用 spoolThreshold
规则,只使用基于堆内存的规则。
By default, the upper limit of the used heap memory is based on the maximum heap size. Though you can also configure to use the committed heap size as the upper limit, this is done using the spoolUsedHeapMemoryLimit
option as shown below:
默认情况下,已使用堆内存的上限基于最大堆大小。尽管您也可以配置使用已提交的堆大小作为上限,但是可以使用下面显示的 spoolUsedHeapMemoryLimit
选项来完成:
<streamCaching id="myCacheConfig" spoolEnabled="true" spoolDirectory="/tmp/cachedir" spoolUsedHeapMemoryThreshold="70" spoolUsedHeapMemoryLimit="Committed"/>
Using custom SpoolRule implementations
使用自定义的 SpoolRule 实现
You can implement your custom rules to determine if the stream should be spooled to disk. This can be done by implementing the interface org.apache.camel.spi.StreamCachingStrategy.SpoolRule
which has a single method:
您可以实现自定义规则来确定是否将流写入磁盘。这可以通过实现接口 org.apache.camel.spi.StreamCachingStrategy.SpoolRule
来实现,该接口有一个方法:
boolean shouldSpoolCache(long length);
The length
is the length of the stream. To use the rule then add it to the StreamCachingStrategy
as shown below:
length
是流的长度。要使用该规则,请将其添加到 StreamCachingStrategy
中,如下所示:
SpoolRule mySpoolRule = ...
context.getStreamCachingStrategy().addSpoolRule(mySpoolRule);
And from XML you need to define a <bean>
with your custom rule:
从 XML 中,您需要使用自定义规则定义一个 <bean>
<bean id="mySpoolRule" class="com.foo.MySpoolRule"/>
<streamCaching id="myCacheConfig" spoolEnabled="true" spoolDirectory="/tmp/cachedir" spoolRules="mySpoolRule"/>
Using the spoolRules attribute on <streamCaching>
. if you have more rules, then separate them by comma.
使用 <streamCaching>
上的 spoolRules 属性。如果有更多的规则,则用逗号分隔。
<streamCaching id="myCacheConfig" spoolEnabled="true" spoolDirectory="/tmp/cachedir" spoolRules="mySpoolRule,myOtherSpoolRule"/>