Class GroupIntoBatches<K,InputT>
- java.lang.Object
  - org.apache.beam.sdk.transforms.PTransform<PCollection<KV<K,InputT>>,PCollection<KV<K,java.lang.Iterable<InputT>>>>
    - org.apache.beam.sdk.transforms.GroupIntoBatches<K,InputT>
- All Implemented Interfaces: java.io.Serializable, HasDisplayData
public class GroupIntoBatches<K,InputT> extends PTransform<PCollection<KV<K,InputT>>,PCollection<KV<K,java.lang.Iterable<InputT>>>>
A PTransform that batches inputs to a desired batch size. Batches will contain only elements of a single key.

Elements are buffered until there are enough elements for a batch, at which point they are emitted to the output PCollection. A maxBufferingDuration can be set to emit output early and avoid waiting for a full batch forever.

Batches can be triggered either based on element count or byte size. ofSize(long) is used to specify a maximum element count while ofByteSize(long) is used to specify a maximum byte size. The single-argument ofByteSize(long) uses the input coder to determine the encoded byte size of each element. However, this may not always be what is desired. A user may want to control batching based on a different byte size (e.g. the memory usage of the decoded Java object), or the input coder may not be able to efficiently determine the elements' byte size. For these cases, we also provide the two-argument ofByteSize(long, SerializableFunction), allowing the user to pass in a function to be used to determine the byte size of an element.

Windows are preserved (batches contain elements from the same window). Batches may contain elements from more than one bundle.
Example 1 (batch call a webservice and get return codes):

    PCollection<KV<String, String>> input = ...;
    long batchSize = 100L;
    PCollection<KV<String, Iterable<String>>> batched = input
        .apply(GroupIntoBatches.<String, String>ofSize(batchSize))
        .setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of())));
    // The ParDo emits KV<String, String>, so its result is held in a separate variable.
    PCollection<KV<String, String>> results = batched
        .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, KV<String, String>>() {
          @ProcessElement
          public void processElement(
              @Element KV<String, Iterable<String>> element,
              OutputReceiver<KV<String, String>> r) {
            r.output(KV.of(element.getKey(), callWebService(element.getValue())));
          }
        }));

Example 2 (batch unbounded input in a global window):

    PCollection<KV<String, String>> unboundedInput = ...;
    long batchSize = 100L;
    Duration maxBufferingDuration = Duration.standardSeconds(10);
    PCollection<KV<String, Iterable<String>>> batched = unboundedInput
        .apply(Window.<KV<String, String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes())
        .apply(GroupIntoBatches.<String, String>ofSize(batchSize)
            .withMaxBufferingDuration(maxBufferingDuration));
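To make the count-based behaviour described above concrete, the following is a minimal in-memory sketch in plain Java. It is not Beam code and not how the transform is implemented: windows, bundles, state and timers are not modelled. The class name CountBatchingSketch and its helper methods are hypothetical. It also assumes that incomplete batches are flushed at the end of input, as a stand-in for whatever closes the window in a real pipeline.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of per-key, count-based batching.
// NOT Beam code: no windows, bundles, state, or timers are modelled.
class CountBatchingSketch {

  /** Small helper to build an immutable key/value pair. */
  static <K, V> Map.Entry<K, V> kv(K key, V value) {
    return new AbstractMap.SimpleImmutableEntry<>(key, value);
  }

  /**
   * Buffers values per key and emits a batch as soon as a key's buffer
   * holds batchSize values. Leftover values are emitted at the end of
   * input (an assumption of this sketch).
   */
  static <K, V> List<Map.Entry<K, List<V>>> batchByCount(
      List<Map.Entry<K, V>> input, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    Map<K, List<V>> buffers = new LinkedHashMap<>();
    List<Map.Entry<K, List<V>>> output = new ArrayList<>();
    for (Map.Entry<K, V> element : input) {
      List<V> buffer = buffers.computeIfAbsent(element.getKey(), k -> new ArrayList<>());
      buffer.add(element.getValue());
      if (buffer.size() >= batchSize) {
        // A batch only ever contains values that share one key.
        List<V> batch = new ArrayList<>(buffer);
        output.add(kv(element.getKey(), batch));
        buffer.clear();
      }
    }
    // End of input: flush whatever is still buffered, key by key.
    for (Map.Entry<K, List<V>> entry : buffers.entrySet()) {
      if (!entry.getValue().isEmpty()) {
        List<V> batch = new ArrayList<>(entry.getValue());
        output.add(kv(entry.getKey(), batch));
      }
    }
    return output;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> input = new ArrayList<>();
    input.add(kv("a", "1"));
    input.add(kv("b", "2"));
    input.add(kv("a", "3"));
    input.add(kv("a", "4"));
    System.out.println(batchByCount(input, 2));
  }
}
```

In this model, key "a" yields one full batch and one incomplete batch, while key "b" only ever yields an incomplete one. That incomplete-batch case is what maxBufferingDuration addresses for unbounded input.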
See Also: Serialized Form
Nested Class Summary

| Modifier and Type | Class | Description |
|---|---|---|
| static class | GroupIntoBatches.BatchingParams<InputT> | Wrapper class for batching parameters supplied by users. |
| class | GroupIntoBatches.WithShardedKey | |
Field Summary
Fields inherited from class org.apache.beam.sdk.transforms.PTransform
name, resourceHints
Method Summary

| Modifier and Type | Method | Description |
|---|---|---|
| PCollection<KV<K,java.lang.Iterable<InputT>>> | expand(PCollection<KV<K,InputT>> input) | Override this method to specify how this PTransform should be expanded on the given InputT. |
| GroupIntoBatches.BatchingParams<InputT> | getBatchingParams() | Returns user supplied parameters for batching. |
| static <K,InputT> GroupIntoBatches<K,InputT> | ofByteSize(long batchSizeBytes) | Aim to create batches each with the specified byte size. |
| static <K,InputT> GroupIntoBatches<K,InputT> | ofByteSize(long batchSizeBytes, SerializableFunction<InputT,java.lang.Long> getElementByteSize) | Aim to create batches each with the specified byte size. |
| static <K,InputT> GroupIntoBatches<K,InputT> | ofSize(long batchSize) | Aim to create batches each with the specified element count. |
| GroupIntoBatches<K,InputT> | withMaxBufferingDuration(org.joda.time.Duration duration) | Sets a time limit (in processing time) on how long an incomplete batch of elements is allowed to be buffered. |
| GroupIntoBatches.WithShardedKey | withShardedKey() | Outputs batched elements associated with sharded input keys. |

Methods inherited from class org.apache.beam.sdk.transforms.PTransform
compose, compose, getAdditionalInputs, getDefaultOutputCoder, getDefaultOutputCoder, getDefaultOutputCoder, getKindString, getName, getResourceHints, populateDisplayData, setResourceHints, toString, validate, validate
Method Detail
-
ofSize
public static <K,InputT> GroupIntoBatches<K,InputT> ofSize(long batchSize)
Aim to create batches each with the specified element count.
-
ofByteSize
public static <K,InputT> GroupIntoBatches<K,InputT> ofByteSize(long batchSizeBytes)
Aim to create batches each with the specified byte size.

This option uses the PCollection's coder to determine the byte size of each element. This may not always be what is desired (e.g. the encoded size is not the same as the memory usage of the Java object). This is also only recommended if the coder returns true for isRegisterByteSizeObserverCheap; otherwise the transform will perform a possibly expensive encoding of each element in order to measure its byte size. An alternate approach is to use ofByteSize(long, SerializableFunction) to specify code to calculate the byte size.
-
ofByteSize
public static <K,InputT> GroupIntoBatches<K,InputT> ofByteSize(long batchSizeBytes, SerializableFunction<InputT,java.lang.Long> getElementByteSize)
Aim to create batches each with the specified byte size. The provided function is used to determine the byte size of each element.
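As an illustration of what such a size function might compute, here is a small plain-Java sketch that measures a String element by its UTF-8 encoded length. The class name ElementSizeSketch and the constant UTF8_BYTES are hypothetical. It uses java.util.function.Function so that it runs without Beam on the classpath. In a pipeline, the same lambda body would presumably be supplied as a SerializableFunction<String, Long> for the getElementByteSize argument.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical sketch of a per-element byte-size function for String values.
// It uses java.util.function.Function rather than Beam's SerializableFunction
// so that it runs without Beam on the classpath.
class ElementSizeSketch {

  /** Size of an element measured as the length of its UTF-8 encoding. */
  static final Function<String, Long> UTF8_BYTES =
      s -> (long) s.getBytes(StandardCharsets.UTF_8).length;

  public static void main(String[] args) {
    // A non-ASCII character occupies more than one byte in UTF-8,
    // so byte size and character count can differ.
    String element = "h\u00e9llo";
    System.out.println(element.length() + " chars, "
        + UTF8_BYTES.apply(element) + " bytes");
  }
}
```

A different measure (for example, an estimate of in-memory footprint) could be substituted here. The batch size passed alongside it would then be interpreted in that same unit.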
-
getBatchingParams
public GroupIntoBatches.BatchingParams<InputT> getBatchingParams()
Returns user supplied parameters for batching.
-
withMaxBufferingDuration
public GroupIntoBatches<K,InputT> withMaxBufferingDuration(org.joda.time.Duration duration)
Sets a time limit (in processing time) on how long an incomplete batch of elements is allowed to be buffered. Once a batch is flushed to output, the timer is reset. The provided limit must be a positive duration or zero; a zero buffering duration effectively means no limit.
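The interaction between the batch size and the buffering limit can be sketched as a single predicate. This is a simplified model in plain Java, not the transform's implementation: the class name FlushConditionSketch and the method shouldFlush are hypothetical, java.time.Duration stands in for org.joda.time.Duration, and using an inclusive comparison for the time limit is an assumption.

```java
import java.time.Duration;

// Hypothetical model of when a buffered batch would be emitted.
// NOT Beam code; java.time.Duration stands in for org.joda.time.Duration.
class FlushConditionSketch {

  /**
   * Returns true if a batch with bufferedCount elements, buffered for
   * bufferedFor, should be emitted. A zero maxBufferingDuration is treated
   * as "no time limit", matching the documented meaning of zero.
   */
  static boolean shouldFlush(
      long bufferedCount,
      long batchSize,
      Duration bufferedFor,
      Duration maxBufferingDuration) {
    if (maxBufferingDuration.isNegative()) {
      throw new IllegalArgumentException("maxBufferingDuration must be zero or positive");
    }
    // A full batch is always emitted, regardless of the time limit.
    if (bufferedCount >= batchSize) {
      return true;
    }
    // An incomplete, non-empty batch is emitted once the limit is reached
    // (inclusive comparison is an assumption of this sketch).
    boolean hasTimeLimit = !maxBufferingDuration.isZero();
    return bufferedCount > 0
        && hasTimeLimit
        && bufferedFor.compareTo(maxBufferingDuration) >= 0;
  }

  public static void main(String[] args) {
    Duration limit = Duration.ofSeconds(10);
    System.out.println(shouldFlush(3, 100, Duration.ofSeconds(11), limit));
    System.out.println(shouldFlush(3, 100, Duration.ofHours(1), Duration.ZERO));
  }
}
```

With a zero limit, the incomplete batch in the second call stays buffered however long it has waited. That is the "waiting for a full batch forever" case that a positive limit avoids.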
-
withShardedKey
@Experimental public GroupIntoBatches.WithShardedKey withShardedKey()
Outputs batched elements associated with sharded input keys. By default, keys are sharded such that input elements with the same key are spread across all available threads executing the transform. Runners may override the default sharding to achieve better load balancing at execution time.
-
expand
public PCollection<KV<K,java.lang.Iterable<InputT>>> expand(PCollection<KV<K,InputT>> input)
Description copied from class: PTransform
Override this method to specify how this PTransform should be expanded on the given InputT.

NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.

Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).
- Specified by: expand in class PTransform<PCollection<KV<K,InputT>>,PCollection<KV<K,java.lang.Iterable<InputT>>>>