Class BoundedSource.BoundedReader<T>

    • Constructor Detail

      • BoundedReader

        public BoundedReader()
    • Method Detail

      • getFractionConsumed

        public @Nullable java.lang.Double getFractionConsumed()
        Returns a value in [0, 1] representing approximately what fraction of the current source this reader has read so far, or null if such an estimate is not available.

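        It is recommended that this method satisfy the following properties:

        • Should return 0 before the Source.Reader.start() call.
        • Should return 1 after a Source.Reader.start() or Source.Reader.advance() call that returns false.
        • The returned values should be non-decreasing (though they don't have to be unique).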

        By default, returns null to indicate that this cannot be estimated.

        Thread safety

        If splitAtFraction(double) is implemented, this method can be called concurrently with other methods (including itself), so it is critical that it be implemented in a thread-safe way.
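
        As a rough illustration of a thread-safe estimate, the sketch below tracks progress through an offset range. The class ProgressEstimator and all of its methods are hypothetical names invented for this example and are not part of the Beam API. It also assumes that a reader reports 0 before it has started and 1 once it has finished, and that an end offset of Long.MAX_VALUE means the range is unbounded and no estimate is available.

```java
// Hypothetical helper (not part of the Beam API): estimates progress
// through an offset range [startOffset, endOffset).
final class ProgressEstimator {
  private final long startOffset;
  private long endOffset;       // may shrink after a successful split
  private long currentOffset;
  private boolean started = false;
  private boolean done = false;

  ProgressEstimator(long startOffset, long endOffset) {
    this.startOffset = startOffset;
    this.endOffset = endOffset;
    this.currentOffset = startOffset;
  }

  // Called by the reading thread after each record is returned.
  synchronized void recordPosition(long offset) {
    started = true;
    currentOffset = offset;
  }

  // Called by the reading thread once start() or advance() has returned false.
  synchronized void markDone() {
    done = true;
  }

  // Called by the splitting thread when the range is narrowed.
  synchronized void shrinkEnd(long newEndOffset) {
    endOffset = newEndOffset;
  }

  // Safe to call from any thread; only touches in-memory state.
  synchronized Double fractionConsumed() {
    if (done) {
      return 1.0;
    }
    if (!started) {
      return 0.0;
    }
    if (endOffset == Long.MAX_VALUE || endOffset <= startOffset) {
      return null; // assumption: unbounded or empty range, no estimate
    }
    double fraction = (double) (currentOffset - startOffset) / (endOffset - startOffset);
    return Math.max(0.0, Math.min(1.0, fraction));
  }
}
```

        Every accessor takes the same monitor and holds it only for a few arithmetic operations, so a concurrent caller never observes a half-updated range and never waits on I/O.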
      • getSplitPointsConsumed

        public long getSplitPointsConsumed()
        Returns the total amount of parallelism in the consumed (returned and processed) range of this reader's current BoundedSource (as would be returned by getCurrentSource()). This corresponds to all split point records (see RangeTracker) returned by this reader, excluding the last split point returned if the reader is not finished.

        Consider the following examples: (1) An input that can be read in parallel down to the individual records, such as CountingSource.upTo(long), is called "perfectly splittable". (2) A "block-compressed" file format such as AvroIO, in which a block of records has to be read as a whole, but different blocks can be read in parallel. (3) An "unsplittable" input such as a cursor in a database.

        • Any reader that is unstarted (i.e., has never had a call to Source.Reader.start()) has a consumed parallelism of 0. This condition holds independent of whether the input is splittable.
        • Any reader that has only returned its first element (i.e., has never had a call to Source.Reader.advance()) has a consumed parallelism of 0: the first element is the current element and is still being processed. This condition holds independent of whether the input is splittable.
        • For an empty reader (in which the call to Source.Reader.start() returned false), the consumed parallelism is 0. This condition holds independent of whether the input is splittable.
        • For a non-empty, finished reader (in which the call to Source.Reader.start() returned true and a call to Source.Reader.advance() has returned false), the value returned must be at least 1 and should equal the total parallelism in the source.
        • For example (1): After returning record #30 (starting at 1) out of 50 in a perfectly splittable 50-record input, this value should be 29. When finished, the consumed parallelism should be 50.
        • For example (2): In a block-compressed file consisting of 5 blocks, the value should stay at 0 until the first record of the second block is returned, stay at 1 until the first record of the third block is returned, and so on. Only once the end of the file is reached has the fifth block been consumed, at which point the value should become 5.
        • For example (3): For any non-empty unsplittable input, the consumed parallelism is 0 until the reader is finished (because the last call to Source.Reader.advance() returned false), at which point it becomes 1.

        A reader that is implemented using a RangeTracker is encouraged to use the range tracker's ability to count split points to implement this method. See OffsetBasedSource.OffsetBasedReader and OffsetRangeTracker for an example.

        Defaults to SPLIT_POINTS_UNKNOWN. Any value less than 0 will be interpreted as unknown.
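
        The counting rule in the examples above can be sketched as follows. SplitPointCounter is a hypothetical class written for this illustration, not the RangeTracker API; it assumes the reader can tell, for each record it returns, whether that record is a split point.

```java
// Hypothetical helper (not part of the Beam API): counts consumed split points.
final class SplitPointCounter {
  private long splitPointsReturned = 0L;
  private boolean finished = false;

  // Called after each record is returned by start() or advance().
  synchronized void recordReturned(boolean isSplitPoint) {
    if (isSplitPoint) {
      splitPointsReturned++;
    }
  }

  // Called once start() or advance() has returned false.
  synchronized void markFinished() {
    finished = true;
  }

  synchronized long splitPointsConsumed() {
    if (finished) {
      // Everything returned has been processed.
      return splitPointsReturned;
    }
    // The last split point returned is still being processed, so exclude it.
    return Math.max(0L, splitPointsReturned - 1);
  }
}
```

        With this rule an unstarted or empty reader reports 0, a perfectly splittable reader that has returned 30 records reports 29, and an unsplittable reader (whose only split point is its first record) reports 0 until it finishes and 1 afterwards.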

        Thread safety

        See the javadoc on BoundedSource.BoundedReader for information about thread safety.
        See Also:
        getSplitPointsRemaining()
      • getSplitPointsRemaining

        public long getSplitPointsRemaining()
        Returns the total amount of parallelism in the unprocessed part of this reader's current BoundedSource (as would be returned by getCurrentSource()). This corresponds to all unprocessed split point records (see RangeTracker), including the last split point returned, in the remainder part of the source.

        This function should be implemented only in addition to getSplitPointsConsumed() and only if an exact value can be returned.

        Consider the following examples: (1) An input that can be read in parallel down to the individual records, such as CountingSource.upTo(long), is called "perfectly splittable". (2) A "block-compressed" file format such as AvroIO, in which a block of records has to be read as a whole, but different blocks can be read in parallel. (3) An "unsplittable" input such as a cursor in a database.

        Assume for examples (1) and (2) that the number of records or blocks remaining is known:

        • Any reader for which the last call to Source.Reader.start() or Source.Reader.advance() has returned true should not return 0, because this reader itself represents parallelism of at least 1. This condition holds independent of whether the input is splittable.
        • A finished reader (for which Source.Reader.start() or Source.Reader.advance() has returned false) should return a value of 0. This condition holds independent of whether the input is splittable.
        • For example (1): After returning record #30 (starting at 1) out of 50 in a perfectly splittable 50-record input, this value should be 21 (20 remaining + 1 current) if the total number of records is known.
        • For example (2): After returning a record in block 3 in a block-compressed file consisting of 5 blocks, this value should be 3 (since blocks 4 and 5 can be processed in parallel by new readers produced via dynamic work rebalancing, while the current reader continues processing block 3) if the total number of blocks is known.
        • For example (3): A reader for any non-empty unsplittable input should return 1 until it is finished, at which point it should return 0.
        • For any reader: After returning the last split point in a file (e.g., the last record in example (1), the first record in the last block for example (2), or the first record in the file for example (3)), this value should be 1: apart from the current task, no additional remainder can be split off.

        Defaults to SPLIT_POINTS_UNKNOWN. Any value less than 0 will be interpreted as unknown.
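
        The arithmetic behind the examples above can be written out as a small function. SplitPointMath and its UNKNOWN constant are hypothetical names for this sketch; UNKNOWN merely stands in for a negative "unknown" value. The sketch also assumes that an unstarted reader with a known total reports that total, which the text above does not spell out.

```java
// Hypothetical helper (not part of the Beam API).
final class SplitPointMath {
  // Stand-in for the "unknown" sentinel; any negative value means unknown.
  static final long UNKNOWN = -1L;

  static long splitPointsRemaining(
      long totalSplitPoints, long splitPointsReturned, boolean finished) {
    if (finished) {
      return 0L; // a finished reader has nothing left to process
    }
    if (totalSplitPoints < 0) {
      return UNKNOWN; // only report a value when it is exact
    }
    if (splitPointsReturned == 0) {
      return totalSplitPoints; // assumption: unstarted reader, nothing consumed yet
    }
    // Unreturned split points plus the one currently being processed.
    return totalSplitPoints - splitPointsReturned + 1;
  }
}
```

        For example (1), splitPointsRemaining(50, 30, false) gives 50 - 30 + 1 = 21; for example (2), splitPointsRemaining(5, 3, false) gives 3; and once the last split point has been returned the result is 1 until the reader finishes.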

        Thread safety

        See the javadoc on BoundedSource.BoundedReader for information about thread safety.
        See Also:
        getSplitPointsConsumed()
      • getCurrentSource

        public abstract BoundedSource<T> getCurrentSource()
        Returns a Source describing the same input that this Reader currently reads (including items already read).

        Usage

        Reader subclasses can use this method for convenience to access unchanging properties of the source being read. Alternatively, they can cache these properties in the constructor.

        The framework will call this method in the course of dynamic work rebalancing, e.g. after a successful splitAtFraction(double) call.

        Mutability and thread safety

        Remember that Source objects must always be immutable. However, the return value of this function may be affected by dynamic work rebalancing, which happens asynchronously via splitAtFraction(double), so successive calls can return different Source objects. Each returned object is itself still immutable. Callers must take care not to rely on properties of the returned source that may be asynchronously changed as a result of this process (e.g. do not cache an end offset when reading a file).

        Implementation

        For convenience, subclasses should usually return the most concrete subclass of Source possible. In practice, the implementation of this method should nearly always be one of the following:

        • Source that inherits from a base class that already implements getCurrentSource(): delegate to base class. In this case, it is almost always an error for the subclass to maintain its own copy of the source.
          
           public FooReader(FooSource<T> source) {
             super(source);
           }
          
           public FooSource<T> getCurrentSource() {
             return (FooSource<T>)super.getCurrentSource();
           }
           
        • Source that does not support dynamic work rebalancing: return a private final variable.
          
           private final FooSource<T> source;
          
           public FooReader(FooSource<T> source) {
             this.source = source;
           }
          
           public FooSource<T> getCurrentSource() {
             return source;
           }
           
        • BoundedSource.BoundedReader that explicitly supports dynamic work rebalancing: maintain a variable pointing to an immutable source object, and protect it with synchronization.
          
           private FooSource<T> source;
          
           public FooReader(FooSource<T> source) {
             this.source = source;
           }
          
           public synchronized FooSource<T> getCurrentSource() {
             return source;
           }
          
           public synchronized FooSource<T> splitAtFraction(double fraction) {
             ...
             FooSource<T> primary = ...;
             FooSource<T> residual = ...;
             this.source = primary;
             return residual;
           }
           
        Specified by:
        getCurrentSource in class Source.Reader<T>
      • splitAtFraction

        public @Nullable BoundedSource<T> splitAtFraction(double fraction)
        Tells the reader to narrow the range of the input it's going to read and give up the remainder, so that the new range would contain approximately the given fraction of the amount of data in the current range.

        Returns a BoundedSource representing the remainder.

        Detailed description

        Assuming the following sequence of calls:
        
         BoundedSource<T> initial = reader.getCurrentSource();
         BoundedSource<T> residual = reader.splitAtFraction(fraction);
         BoundedSource<T> primary = reader.getCurrentSource();
         
        • The "primary" and "residual" sources, when read, should together cover the same set of records as "initial".
        • The current reader should continue to be in a valid state, and continuing to read from it should, together with the records it already read, yield the same records as would have been read by "primary".
        • The amount of data read by "primary" should ideally represent approximately the given fraction of the amount of data read by "initial".
        For example, a reader that reads a range of offsets [A, B) in a file might implement this method by truncating the current range to [A, A + fraction*(B-A)) and returning a Source representing the range [A + fraction*(B-A), B).

        This method should return null if the split cannot be performed for this fraction while satisfying the semantics above. E.g., a reader that reads a range of offsets in a file should return null if it is already past the position in its range corresponding to the given fraction. In this case, the method MUST have no effect (the reader must behave as if the method hadn't been called at all).
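
        The offset-range example and the null case above might look roughly like the following. OffsetRange and SplittableRangeState are hypothetical classes for this sketch and do not model BoundedSource itself. Rounding the split offset down and refusing splits that would leave an empty primary or residual range are choices made here for simplicity, not requirements stated above.

```java
// Hypothetical immutable range [start, end); stands in for a Source.
final class OffsetRange {
  final long start;
  final long end;

  OffsetRange(long start, long end) {
    this.start = start;
    this.end = end;
  }
}

// Hypothetical reader-side state (not part of the Beam API).
final class SplittableRangeState {
  private OffsetRange range;       // current (primary) range
  private long lastReturned = -1L; // offset of the last record returned, -1 if none

  SplittableRangeState(OffsetRange initial) {
    this.range = initial;
  }

  // Called by the reading thread before returning the record at this offset.
  synchronized boolean tryReturn(long offset) {
    if (offset >= range.end) {
      return false; // outside the (possibly narrowed) range
    }
    lastReturned = offset;
    return true;
  }

  synchronized OffsetRange getCurrentRange() {
    return range;
  }

  // Returns the residual range, or null (leaving all state untouched) if the split is impossible.
  synchronized OffsetRange splitAtFraction(double fraction) {
    long splitOffset = range.start + (long) Math.floor(fraction * (range.end - range.start));
    if (splitOffset <= lastReturned || splitOffset <= range.start || splitOffset >= range.end) {
      return null;
    }
    OffsetRange residual = new OffsetRange(splitOffset, range.end);
    range = new OffsetRange(range.start, splitOffset);
    return residual;
  }
}
```

        Starting from [0, 100) with offset 10 already returned, splitAtFraction(0.5) narrows the reader to [0, 50) and returns [50, 100). A later splitAtFraction(0.1) targets offset 5, which has already been passed, so it returns null and the range stays [0, 50).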

        Statefulness

        Since this method (if successful) affects the reader's source, in subsequent invocations "fraction" should be interpreted relative to the new current source.

        Thread safety and blocking

        This method will be called concurrently with other methods (although there will never be multiple concurrent invocations of this method itself), so it is critical that it be implemented in a thread-safe way; otherwise data loss is possible.

        It is also very important that this method always completes quickly. In particular, it should not perform or wait on any blocking operations such as I/O, RPCs etc. Violating this requirement may stall completion of the work item or even cause it to fail.

        It is incorrect to make both this method and Source.Reader.start()/Source.Reader.advance() synchronized, because those methods can perform blocking operations, and then this method would have to wait for those calls to complete.
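
        One way to respect this constraint is to perform the blocking work outside any lock and hold a lock only while updating the range bookkeeping. The sketch below illustrates the idea with a hypothetical NonBlockingSplitReader; the CountDownLatch merely simulates slow I/O, and splitAt takes an absolute offset rather than a fraction to keep the example short.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical reader (not part of the Beam API) that keeps I/O outside the lock.
final class NonBlockingSplitReader {
  private final Object rangeLock = new Object();
  private final CountDownLatch ioGate; // stands in for a slow read
  private long endOffset;              // guarded by rangeLock
  private long lastClaimed;            // guarded by rangeLock
  private long nextOffset;             // guarded by rangeLock

  NonBlockingSplitReader(long start, long end, CountDownLatch ioGate) {
    this.endOffset = end;
    this.lastClaimed = start - 1;
    this.nextOffset = start;
    this.ioGate = ioGate;
  }

  // Deliberately NOT synchronized: the blocking part runs with no lock held.
  boolean advance() throws InterruptedException {
    ioGate.await(); // simulated blocking I/O
    synchronized (rangeLock) {
      if (nextOffset >= endOffset) {
        return false; // the range may have been narrowed while we were reading
      }
      lastClaimed = nextOffset++;
      return true;
    }
  }

  // Returns the old end offset on success, or null if the split is impossible.
  Long splitAt(long splitOffset) {
    synchronized (rangeLock) { // short critical section, no I/O inside
      if (splitOffset <= lastClaimed || splitOffset >= endOffset) {
        return null;
      }
      long oldEnd = endOffset;
      endOffset = splitOffset;
      return oldEnd;
    }
  }
}
```

        Because advance() acquires rangeLock only after its simulated read completes, a call to splitAt can finish while advance() is still waiting on I/O.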

        RangeTracker makes it easy to implement this method safely and correctly.

        By default, returns null to indicate that splitting is not possible.