Class GroupByKey<K,V>

  • Type Parameters:
    K - the type of the keys of the input and output PCollections
    V - the type of the values of the input PCollection and the elements of the Iterables in the output PCollection
    All Implemented Interfaces:
    java.io.Serializable, HasDisplayData

    public class GroupByKey<K,V>
    extends PTransform<PCollection<KV<K,V>>,PCollection<KV<K,java.lang.Iterable<V>>>>
    GroupByKey<K, V> takes a PCollection<KV<K, V>>, groups the values by key and window, and returns a PCollection<KV<K, Iterable<V>>> representing a map from each distinct key and window of the input PCollection to an Iterable over all the values associated with that key in the input per window. Absent a repeatedly firing trigger, each key in the output PCollection is unique within each window.

    GroupByKey is analogous to converting a multi-map into a uni-map, and related to GROUP BY in SQL. It corresponds to the "shuffle" step between the Mapper and the Reducer in the MapReduce framework.
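
    The multi-map to uni-map analogy can be sketched in plain Java, without any Beam classes. The class and method names below are hypothetical, and the sketch ignores windows and compares keys with ordinary equals/hashCode, which GroupByKey itself does not do. It is meant only to show the shape of the output.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: an in-memory analogue of grouping values by key. */
public class MultiMapToUniMap {

  /** Turns a list of (key, value) pairs into key -> all values seen for that key. */
  public static <K, V> Map<K, List<V>> groupByKey(List<Map.Entry<K, V>> pairs) {
    // LinkedHashMap keeps keys in first-seen order so the result is easy to read.
    Map<K, List<V>> grouped = new LinkedHashMap<>();
    for (Map.Entry<K, V> pair : pairs) {
      grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
    }
    return grouped;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> pairs =
        List.of(Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3));
    System.out.println(groupByKey(pairs)); // prints {a=[1, 3], b=[2]}
  }
}
```

    Each distinct key appears once in the result, paired with every value that was associated with it in the input.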

    Two keys of type K are compared for equality not by regular Java Object.equals(java.lang.Object), but instead by first encoding each of the keys using the Coder of the keys of the input PCollection, and then comparing the encoded bytes. This admits efficient parallel evaluation. Note that this requires that the Coder of the keys be deterministic (see Coder.verifyDeterministic()). If the key Coder is not deterministic, an exception is thrown at pipeline construction time.
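
    To see why determinism matters when keys are compared by their encoded bytes, consider the following plain-Java sketch. The encode method is a hypothetical stand-in for a key Coder, not Beam's API: it writes the elements of a Set in iteration order, so two sets that are equal under Object.equals can still produce different bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Illustrative only: comparing keys by encoded bytes instead of equals(). */
public class EncodedKeyEquality {

  /** Hypothetical stand-in for a key coder; the output depends on iteration order. */
  public static byte[] encode(Set<String> key) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(key.size());
      for (String element : key) {
        out.writeUTF(element);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Key equality as a byte-based grouping would see it. */
  public static boolean sameEncodedKey(Set<String> a, Set<String> b) {
    return Arrays.equals(encode(a), encode(b));
  }

  public static void main(String[] args) {
    Set<String> ab = new LinkedHashSet<>(List.of("a", "b"));
    Set<String> ba = new LinkedHashSet<>(List.of("b", "a"));
    System.out.println(ab.equals(ba));          // prints true
    System.out.println(sameEncodedKey(ab, ba)); // prints false
  }
}
```

    With an encoding like this one, values for what the program considers the same key could end up in different groups, which is the situation the determinism requirement rules out.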

    By default, the Coder of the keys of the output PCollection is the same as that of the keys of the input, and the Coder of the elements of the Iterable values of the output PCollection is the same as the Coder of the values of the input.

    Example of use:

    
     PCollection<KV<String, Doc>> urlDocPairs = ...;
     PCollection<KV<String, Iterable<Doc>>> urlToDocs =
         urlDocPairs.apply(GroupByKey.<String, Doc>create());
     PCollection<R> results =
         urlToDocs.apply(ParDo.of(new DoFn<KV<String, Iterable<Doc>>, R>() {
           @ProcessElement
           public void processElement(ProcessContext c) {
             String url = c.element().getKey();
             Iterable<Doc> docsWithThatUrl = c.element().getValue();
             // ... process all docs having that url ...
           }}));
     

    GroupByKey is a key primitive in data-parallel processing, since it is the main way to efficiently bring associated data together into one location. It is also a key determiner of the performance of a data-parallel pipeline.

    See CoGroupByKey for a way to group multiple input PCollections by a common key at once.

    See Combine.PerKey for a common pattern of GroupByKey followed by Combine.GroupedValues.

    When grouping, windows that can be merged according to the WindowFn of the input PCollection will be merged together, and a window pane corresponding to the new, merged window will be created. The items in this pane will be emitted when a trigger fires. By default this will be when the input sources estimate there will be no more data for the window. See AfterWatermark for details on the estimation.
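
    As a rough illustration of window merging, the plain-Java sketch below merges overlapping half-open intervals for a single key, in the way a session-style WindowFn might. It is not Beam's implementation: the class name, the long[] {start, end} representation, and the rule that windows which merely touch are not merged are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Illustrative only: merging overlapping windows for one key. */
public class WindowMergeSketch {

  /** Each window is {start, end}, read as the half-open interval [start, end). */
  public static List<long[]> mergeWindows(List<long[]> windows) {
    List<long[]> sorted = new ArrayList<>(windows);
    sorted.sort(Comparator.comparingLong((long[] w) -> w[0]));

    List<long[]> merged = new ArrayList<>();
    for (long[] w : sorted) {
      long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
      if (last != null && w[0] < last[1]) {
        // Overlaps the previous merged window: extend it to cover both.
        last[1] = Math.max(last[1], w[1]);
      } else {
        // Disjoint: start a new merged window (copy so the input is not mutated).
        merged.add(new long[] {w[0], w[1]});
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    List<long[]> merged =
        mergeWindows(List.of(new long[] {0, 10}, new long[] {5, 15}, new long[] {20, 30}));
    for (long[] w : merged) {
      System.out.println(Arrays.toString(w)); // prints [0, 15] then [20, 30]
    }
  }
}
```

    In this picture, the values that arrived in [0, 10) and [5, 15) would end up in a single pane for the merged window [0, 15).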

    The timestamp for each emitted pane is determined by the Window.withTimestampCombiner(TimestampCombiner) windowing operation. The output PCollection will have the same WindowFn as the input.

    If the input PCollection contains late data or the requested TriggerFn can fire before the watermark, then there may be multiple elements output by a GroupByKey that correspond to the same key and window.

    If the WindowFn of the input requires merging, it is not valid to apply another GroupByKey without first applying a new WindowFn or applying Window.remerge().

    See Also:
    Serialized Form
    • Method Detail

      • create

        public static <K,V> GroupByKey<K,V> create()
        Returns a GroupByKey<K, V> PTransform.
        Type Parameters:
        K - the type of the keys of the input and output PCollections
        V - the type of the values of the input PCollection and the elements of the Iterables in the output PCollection
      • fewKeys

        public boolean fewKeys()
        Returns whether this transform groups just a few keys.
      • applicableTo

        public static void applicableTo(PCollection<?> input)
      • expand

        public PCollection<KV<K,java.lang.Iterable<V>>> expand(PCollection<KV<K,V>> input)
        Description copied from class: PTransform
        Override this method to specify how this PTransform should be expanded on the given InputT.

        NOTE: This method should not be called directly. Instead, the PTransform should be applied to the InputT using the apply method.

        Composite transforms, which are defined in terms of other transforms, should return the output of one of the composed transforms. Non-composite transforms, which do not apply any transforms internally, should return a new unbound output and register evaluators (via backend-specific registration methods).

        Specified by:
        expand in class PTransform<PCollection<KV<K,V>>,PCollection<KV<K,java.lang.Iterable<V>>>>
      • getKeyCoder

        public static <K,V> Coder<K> getKeyCoder(Coder<KV<K,V>> inputCoder)
        Returns the Coder of the keys of the input to this transform, which is also used as the Coder of the keys of the output of this transform.
      • getInputValueCoder

        public static <K,V> Coder<V> getInputValueCoder(Coder<KV<K,V>> inputCoder)
        Returns the Coder of the values of the input to this transform.
      • getOutputKvCoder

        public static <K,V> KvCoder<K,java.lang.Iterable<V>> getOutputKvCoder(Coder<KV<K,V>> inputCoder)
        Returns the Coder of the output of this transform.
      • populateDisplayData

        public void populateDisplayData(DisplayData.Builder builder)
        Description copied from class: PTransform
        Register display data for the given transform or component.

        populateDisplayData(DisplayData.Builder) is invoked by Pipeline runners to collect display data via DisplayData.from(HasDisplayData). Implementations may call super.populateDisplayData(builder) in order to register display data in the current namespace, but should otherwise use subcomponent.populateDisplayData(builder) to use the namespace of the subcomponent.

        By default, does not register any display data. Implementors may override this method to provide their own display data.

        Specified by:
        populateDisplayData in interface HasDisplayData
        Overrides:
        populateDisplayData in class PTransform<PCollection<KV<K,V>>,PCollection<KV<K,java.lang.Iterable<V>>>>
        Parameters:
        builder - The builder to populate with display data.
        See Also:
        HasDisplayData