Class GroupIntoBatches<K,​InputT>

  • All Implemented Interfaces:
    java.io.Serializable, HasDisplayData

    public class GroupIntoBatches<K,​InputT>
    extends PTransform<PCollection<KV<K,​InputT>>,​PCollection<KV<K,​java.lang.Iterable<InputT>>>>
    A PTransform that batches inputs to a desired batch size. Batches will contain only elements of a single key.

    Elements are buffered until there are enough elements for a batch, at which point they are emitted to the output PCollection. A maxBufferingDuration can be set to emit output early and avoid waiting for a full batch forever.

    Batches can be triggered either based on element count or byte size. ofSize(long) is used to specify a maximum element count while ofByteSize(long) is used to specify a maximum byte size. The single-argument ofByteSize(long) uses the input coder to determine the encoded byte size of each element. However, this may not always be what is desired. A user may want to control batching based on a different byte size (e.g. the memory usage of the decoded Java object) or the input coder may not be able to efficiently determine the elements' byte size. For these cases, we also provide the two-argument ofByteSize(long) allowing the user to pass in a function to be used to determine the byte size of an element.

    Windows are preserved (batches contain elements from the same window). Batches may contain elements from more than one bundle.

    Example 1 (batch call a webservice and get return codes):

    
     PCollection<KV<String, String>> input = ...;
     long batchSize = 100L;
     PCollection<KV<String, Iterable<String>>> batched = input
         .apply(GroupIntoBatches.<String, String>ofSize(batchSize))
         .setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of())))
         .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, KV<String, String>>() {
            @ProcessElement
             public void processElement(@Element KV<String, Iterable<String>> element,
                 OutputReceiver<KV<String, String>> r) {
                 r.output(KV.of(element.getKey(), callWebService(element.getValue())));
             }
         }));
     

    Example 2 (batch unbounded input in a global window):

    
     PCollection<KV<String, String>> unboundedInput = ...;
     long batchSize = 100L;
     Duration maxBufferingDuration = Duration.standardSeconds(10);
     PCollection<KV<String, Iterable<String>>> batched = unboundedInput
         .apply(Window.<KV<String, String>>into(new GlobalWindows())
             .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
             .discardingFiredPanes())
         .apply(GroupIntoBatches.<String, String>ofSize(batchSize)
             .withMaxBufferingDuration(maxBufferingDuration));
     
    See Also:
    Serialized Form
    • Method Detail

      • ofSize

        public static <K,​InputT> GroupIntoBatches<K,​InputT> ofSize​(long batchSize)
        Aim to create batches each with the specified element count.
      • ofByteSize

        public static <K,​InputT> GroupIntoBatches<K,​InputT> ofByteSize​(long batchSizeBytes)
        Aim to create batches each with the specified byte size.

        This option uses the PCollection's coder to determine the byte size of each element. This may not always be what is desired (e.g. the encoded size is not the same as the memory usage of the Java object). This is also only recommended if the coder returns true for isRegisterByteSizeObserverCheap, otherwise the transform will perform a possibly-expensive encoding of each element in order to measure its byte size. An alternate approach is to use ofByteSize(long, SerializableFunction) to specify code to calculate the byte size.

      • ofByteSize

        public static <K,​InputT> GroupIntoBatches<K,​InputT> ofByteSize​(long batchSizeBytes,
                                                                                   SerializableFunction<InputT,​java.lang.Long> getElementByteSize)
        Aim to create batches each with the specified byte size. The provided function is used to determine the byte size of each element.
      • withMaxBufferingDuration

        public GroupIntoBatches<K,​InputT> withMaxBufferingDuration​(org.joda.time.Duration duration)
        Sets a time limit (in processing time) on how long an incomplete batch of elements is allowed to be buffered. Once a batch is flushed to output, the timer is reset. The provided limit must be a positive duration or zero; a zero buffering duration effectively means no limit.
      • withShardedKey

        @Experimental
        public GroupIntoBatches.WithShardedKey withShardedKey()
        Outputs batched elements associated with sharded input keys. By default, keys are sharded to such that the input elements with the same key are spread to all available threads executing the transform. Runners may override the default sharding to do a better load balancing during the execution time.
      • expand

        public PCollection<KV<K,​java.lang.Iterable<InputT>>> expand​(PCollection<KV<K,​InputT>> input)
        Description copied from class: PTransform
        Override this method to specify how this PTransform should be expanded on the given InputT.

        NOTE: This method should not be called directly. Instead apply the PTransform should be applied to the InputT using the apply method.

        Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).

        Specified by:
        expand in class PTransform<PCollection<KV<K,​InputT>>,​PCollection<KV<K,​java.lang.Iterable<InputT>>>>