Class WriteFiles<UserT,DestinationT,OutputT>

java.lang.Object
  org.apache.beam.sdk.transforms.PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
    org.apache.beam.sdk.io.WriteFiles<UserT,DestinationT,OutputT>

All Implemented Interfaces:
    java.io.Serializable, HasDisplayData

@Experimental(SOURCE_SINK)
public abstract class WriteFiles<UserT,DestinationT,OutputT>
extends PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
A PTransform that writes to a FileBasedSink. A write begins with a sequential global initialization of a sink, followed by a parallel write, and ends with a sequential finalization of the write. The output of a write is a WriteFilesResult<DestinationT>.

By default, every bundle in the input PCollection will be processed by a FileBasedSink.WriteOperation, so the number of output files will vary based on runner behavior, though at least one output will always be produced. The exact parallelism of the write stage can be controlled using withNumShards(int), typically used to control how many files are produced or to globally limit the number of workers connecting to an external service. However, this option can often hurt performance: it adds an additional GroupByKey to the pipeline.

Example usage with runner-determined sharding:

    p.apply(WriteFiles.to(new MySink(...)));

Example usage with a fixed number of shards:

    p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));
- See Also:
- Serialized Form
Field Summary

Fields
- static java.lang.Class<? extends WriteFiles> CONCRETE_CLASS
  For internal use by runners.

Fields inherited from class org.apache.beam.sdk.transforms.PTransform
    name, resourceHints
Constructor Summary

Constructors
- WriteFiles()
-
Method Summary

All Methods · Static Methods · Instance Methods · Abstract Methods · Concrete Methods

- WriteFilesResult<DestinationT> expand(PCollection<UserT> input)
  Override this method to specify how this PTransform should be expanded on the given InputT.
- java.util.Map<TupleTag<?>,PValue> getAdditionalInputs()
  Returns all PValues that are consumed as inputs to this PTransform that are independent of the expansion of the PTransform within PTransform.expand(PInput).
- abstract @Nullable PTransform<PCollection<UserT>,PCollectionView<java.lang.Integer>> getComputeNumShards()
- abstract @Nullable ValueProvider<java.lang.Integer> getNumShardsProvider()
- abstract @Nullable ShardingFunction<UserT,DestinationT> getShardingFunction()
- abstract FileBasedSink<UserT,DestinationT,OutputT> getSink()
- abstract boolean getWindowedWrites()
- void populateDisplayData(DisplayData.Builder builder)
  Register display data for the given transform or component.
- static <UserT,DestinationT,OutputT> WriteFiles<UserT,DestinationT,OutputT> to(FileBasedSink<UserT,DestinationT,OutputT> sink)
  Creates a WriteFiles transform that writes to the given FileBasedSink, letting the runner control how many different shards are produced.
- void validate(PipelineOptions options)
  Called before running the Pipeline to verify this transform is fully and correctly specified.
- WriteFiles<UserT,DestinationT,OutputT> withMaxNumWritersPerBundle(int maxNumWritersPerBundle)
  Set the maximum number of writers created in a bundle before spilling to shuffle.
- WriteFiles<UserT,DestinationT,OutputT> withNoSpilling()
  Returns a new WriteFiles that writes all data without spilling, simplifying the pipeline.
- WriteFiles<UserT,DestinationT,OutputT> withNumShards(int numShards)
  Returns a new WriteFiles that will write to the current FileBasedSink using the specified number of shards.
- WriteFiles<UserT,DestinationT,OutputT> withNumShards(ValueProvider<java.lang.Integer> numShardsProvider)
  Returns a new WriteFiles that will write to the current FileBasedSink using the ValueProvider-specified number of shards.
- WriteFiles<UserT,DestinationT,OutputT> withRunnerDeterminedSharding()
  Returns a new WriteFiles that will write to the current FileBasedSink with runner-determined sharding.
- WriteFiles<UserT,DestinationT,OutputT> withSharding(PTransform<PCollection<UserT>,PCollectionView<java.lang.Integer>> sharding)
  Returns a new WriteFiles that will write to the current FileBasedSink using the specified PTransform to compute the number of shards.
- WriteFiles<UserT,DestinationT,OutputT> withShardingFunction(ShardingFunction<UserT,DestinationT> shardingFunction)
  Returns a new WriteFiles that will write to the current FileBasedSink using the specified sharding function to assign shards to inputs.
- WriteFiles<UserT,DestinationT,OutputT> withSideInputs(java.util.List<PCollectionView<?>> sideInputs)
- WriteFiles<UserT,DestinationT,OutputT> withSkipIfEmpty()
- WriteFiles<UserT,DestinationT,OutputT> withSkipIfEmpty(boolean skipIfEmpty)
  Set this sink to skip writing any files if the PCollection is empty.
- WriteFiles<UserT,DestinationT,OutputT> withWindowedWrites()
  Returns a new WriteFiles that preserves windowing on its input.
Methods inherited from class org.apache.beam.sdk.transforms.PTransform
compose, compose, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, setResourceHints, toString, validate
Field Detail
-
CONCRETE_CLASS
@Internal public static final java.lang.Class<? extends WriteFiles> CONCRETE_CLASS
For internal use by runners.
Method Detail
-
to
public static <UserT,DestinationT,OutputT> WriteFiles<UserT,DestinationT,OutputT> to(FileBasedSink<UserT,DestinationT,OutputT> sink)
Creates a WriteFiles transform that writes to the given FileBasedSink, letting the runner control how many different shards are produced.
-
getSink
public abstract FileBasedSink<UserT,DestinationT,OutputT> getSink()
-
getComputeNumShards
public abstract @Nullable PTransform<PCollection<UserT>,PCollectionView<java.lang.Integer>> getComputeNumShards()
-
getNumShardsProvider
public abstract @Nullable ValueProvider<java.lang.Integer> getNumShardsProvider()
-
getWindowedWrites
public abstract boolean getWindowedWrites()
-
getShardingFunction
public abstract @Nullable ShardingFunction<UserT,DestinationT> getShardingFunction()
-
getAdditionalInputs
public java.util.Map<TupleTag<?>,PValue> getAdditionalInputs()
Description copied from class: PTransform
Returns all PValues that are consumed as inputs to this PTransform that are independent of the expansion of the PTransform within PTransform.expand(PInput).

For example, this can contain any side input consumed by this PTransform.

Overrides:
    getAdditionalInputs in class PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
-
withNumShards
public WriteFiles<UserT,DestinationT,OutputT> withNumShards(int numShards)
Returns a new WriteFiles that will write to the current FileBasedSink using the specified number of shards.

This option should be used sparingly as it can hurt performance. See WriteFiles for more information.

A value less than or equal to 0 will be equivalent to the default behavior of runner-determined sharding.
-
withNumShards
public WriteFiles<UserT,DestinationT,OutputT> withNumShards(ValueProvider<java.lang.Integer> numShardsProvider)
Returns a new WriteFiles that will write to the current FileBasedSink using the ValueProvider-specified number of shards.

This option should be used sparingly as it can hurt performance. See WriteFiles for more information.
-
withMaxNumWritersPerBundle
public WriteFiles<UserT,DestinationT,OutputT> withMaxNumWritersPerBundle(int maxNumWritersPerBundle)
Set the maximum number of writers created in a bundle before spilling to shuffle.
-
withSkipIfEmpty
public WriteFiles<UserT,DestinationT,OutputT> withSkipIfEmpty(boolean skipIfEmpty)
Set this sink to skip writing any files if the PCollection is empty.
-
withSideInputs
public WriteFiles<UserT,DestinationT,OutputT> withSideInputs(java.util.List<PCollectionView<?>> sideInputs)
-
withSharding
public WriteFiles<UserT,DestinationT,OutputT> withSharding(PTransform<PCollection<UserT>,PCollectionView<java.lang.Integer>> sharding)
Returns a new WriteFiles that will write to the current FileBasedSink using the specified PTransform to compute the number of shards.

This option should be used sparingly as it can hurt performance. See WriteFiles for more information.
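The transform passed to withSharding must consume the input PCollection and produce a singleton PCollectionView<Integer>. The shard-count heuristic at its core is plain arithmetic; a minimal sketch (the one-million-elements-per-shard target is an illustrative assumption, not a Beam default):

```java
// Heuristic for a size-based sharding transform: aim for roughly
// `targetPerShard` elements per output file, but never fewer than one shard.
static int shardsFor(long elementCount, long targetPerShard) {
    return (int) Math.max(1, elementCount / targetPerShard);
}
```

Inside the actual sharding PTransform, a helper like this would typically be applied to the result of Count.globally() and then materialized with View.asSingleton().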
-
withRunnerDeterminedSharding
public WriteFiles<UserT,DestinationT,OutputT> withRunnerDeterminedSharding()
Returns a new WriteFiles that will write to the current FileBasedSink with runner-determined sharding.
-
withShardingFunction
public WriteFiles<UserT,DestinationT,OutputT> withShardingFunction(ShardingFunction<UserT,DestinationT> shardingFunction)
Returns a new WriteFiles that will write to the current FileBasedSink using the specified sharding function to assign shards to inputs.
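A sharding function deterministically maps each element (and its destination) to a shard index in [0, shardCount). The assignment logic can be sketched in plain Java; wrapping the index into Beam's ShardedKey is omitted here, since that part of the interface is not shown in this summary:

```java
// Deterministic hash-based shard assignment: equal elements always land in
// the same shard, and the index stays non-negative even for negative hashCodes.
static int assignShard(Object element, int shardCount) {
    return Math.floorMod(element.hashCode(), shardCount);
}
```

Compared with random per-element assignment, a deterministic function like this gives stable element placement, at the cost of potentially uneven shard sizes.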
-
withWindowedWrites
public WriteFiles<UserT,DestinationT,OutputT> withWindowedWrites()
Returns a new WriteFiles that preserves windowing on its input.

If this option is not specified, windowing and triggering are replaced by GlobalWindows and DefaultTrigger.

If there is no data for a window, no output shards will be generated for that window. If a window triggers multiple times, more than one output shard might be generated for it; it is up to the sink implementation to keep these output shards unique.

This option can only be used if withNumShards(int) is also set to a positive value.
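Continuing the class-level MySink example, a windowed write over a streaming input might look like the following sketch (the five-minute window size and the shard count are illustrative, not requirements):

```java
p.apply(/* unbounded source */)
 .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))))
 .apply(WriteFiles.to(new MySink(...))
     .withWindowedWrites()
     .withNumShards(10)); // required: a positive, fixed shard count
```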
-
withNoSpilling
public WriteFiles<UserT,DestinationT,OutputT> withNoSpilling()
Returns a new WriteFiles that writes all data without spilling, simplifying the pipeline. This option should not be used with withMaxNumWritersPerBundle(int): it removes that limit and may cause many writers to be opened. Use with caution.

This option only applies to writes that use withRunnerDeterminedSharding().
-
withSkipIfEmpty
public WriteFiles<UserT,DestinationT,OutputT> withSkipIfEmpty()
-
validate
public void validate(PipelineOptions options)
Description copied from class: PTransform
Called before running the Pipeline to verify this transform is fully and correctly specified.

By default, does nothing.

Overrides:
    validate in class PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
-
expand
public WriteFilesResult<DestinationT> expand(PCollection<UserT> input)
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.

NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.

Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).

Specified by:
    expand in class PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
-
populateDisplayData
public void populateDisplayData(DisplayData.Builder builder)
Description copied from class: PTransform
Register display data for the given transform or component.

populateDisplayData(DisplayData.Builder) is invoked by Pipeline runners to collect display data via DisplayData.from(HasDisplayData). Implementations may call super.populateDisplayData(builder) in order to register display data in the current namespace, but should otherwise use subcomponent.populateDisplayData(builder) to use the namespace of the subcomponent.

By default, does not register any display data. Implementors may override this method to provide their own display data.

Specified by:
    populateDisplayData in interface HasDisplayData
Overrides:
    populateDisplayData in class PTransform<PCollection<UserT>,WriteFilesResult<DestinationT>>
Parameters:
    builder - The builder to populate with display data.
See Also:
    HasDisplayData