Class OffsetBasedSource.OffsetBasedReader<T>

    • Method Detail

      • isDone

        public final boolean isDone()
        Returns true if the last call to start() or advance() returned false.
      • isStarted

        public final boolean isStarted()
        Returns true if there has been a call to start().
      • getCurrentOffset

        protected abstract long getCurrentOffset()
                                          throws java.util.NoSuchElementException
        Returns the starting offset of the current record, which has been read by the last successful Source.Reader.start() or Source.Reader.advance() call.

        If no such call has been made yet, the return value is unspecified.

        See RangeTracker for description of offset semantics.

        Throws:
        java.util.NoSuchElementException
      • isAtSplitPoint

        protected boolean isAtSplitPoint()
                                  throws java.util.NoSuchElementException
        Returns whether the current record is at a split point (i.e., whether the current record would be the first record to be read by a source with a specified start offset of getCurrentOffset()).

        See detailed documentation about split points in RangeTracker.

        Throws:
        java.util.NoSuchElementException
      • start

        public final boolean start()
                            throws java.io.IOException
        Description copied from class: Source.Reader
        Initializes the reader and advances the reader to the first record.

        This method should be called exactly once. The invocation should occur prior to calling Source.Reader.advance() or Source.Reader.getCurrent(). This method may perform expensive operations that are needed to initialize the reader.

        Specified by:
        start in class Source.Reader<T>
        Returns:
        true if a record was read, false if there is no more input available.
        Throws:
        java.io.IOException
      • advance

        public final boolean advance()
                              throws java.io.IOException
        Description copied from class: Source.Reader
        Advances the reader to the next valid record.

        It is an error to call this without having called Source.Reader.start() first.

        Specified by:
        advance in class Source.Reader<T>
        Returns:
        true if a record was read, false if there is no more input available.
        Throws:
        java.io.IOException
      • startImpl

        protected abstract boolean startImpl()
                                      throws java.io.IOException
        Initializes the OffsetBasedSource.OffsetBasedReader and advances to the first record, returning true if there is a record available to be read. This method will be invoked exactly once and may perform expensive setup operations that are needed to initialize the reader.

        This function is the OffsetBasedReader implementation of Source.Reader.start(). The key difference is that the implementor can ignore the possibility that it should no longer produce the first record, either because it has exceeded the original endOffset assigned to the reader, or because a concurrent call to splitAtFraction(double) has changed the source to shrink the offset range being read.

        Throws:
        java.io.IOException
        See Also:
        Source.Reader.start()
      • advanceImpl

        protected abstract boolean advanceImpl()
                                        throws java.io.IOException
        Advances to the next record and returns true, or returns false if there is no next record.

        This function is the OffsetBasedReader implementation of Source.Reader.advance(). The key difference is that the implementor can ignore the possibility that it should no longer produce the next record, either because it has exceeded the original endOffset assigned to the reader, or because a concurrent call to splitAtFraction(double) has changed the source to shrink the offset range being read.

        Throws:
        java.io.IOException
        See Also:
        Source.Reader.advance()
      • getCurrentSource

        public OffsetBasedSource<T> getCurrentSource()
        Description copied from class: BoundedSource.BoundedReader
        Returns a Source describing the same input that this Reader currently reads (including items already read).

        Usage

        Reader subclasses can use this method for convenience to access unchanging properties of the source being read. Alternatively, they can cache these properties in the constructor.

        The framework will call this method in the course of dynamic work rebalancing, e.g. after a successful BoundedSource.BoundedReader.splitAtFraction(double) call.

        Mutability and thread safety

        Remember that Source objects must always be immutable. However, the return value of this function may be affected by dynamic work rebalancing, happening asynchronously via BoundedSource.BoundedReader.splitAtFraction(double), meaning it can return a different Source object. However, the returned object itself will still itself be immutable. Callers must take care not to rely on properties of the returned source that may be asynchronously changed as a result of this process (e.g. do not cache an end offset when reading a file).

        Implementation

        For convenience, subclasses should usually return the most concrete subclass of Source possible. In practice, the implementation of this method should nearly always be one of the following:

        • Source that inherits from a base class that already implements BoundedSource.BoundedReader.getCurrentSource(): delegate to base class. In this case, it is almost always an error for the subclass to maintain its own copy of the source.
          
           public FooReader(FooSource<T> source) {
             super(source);
           }
          
           public FooSource<T> getCurrentSource() {
             return (FooSource<T>)super.getCurrentSource();
           }
           
        • Source that does not support dynamic work rebalancing: return a private final variable.
          
           private final FooSource<T> source;
          
           public FooReader(FooSource<T> source) {
             this.source = source;
           }
          
           public FooSource<T> getCurrentSource() {
             return source;
           }
           
        • BoundedSource.BoundedReader that explicitly supports dynamic work rebalancing: maintain a variable pointing to an immutable source object, and protect it with synchronization.
          
           private FooSource<T> source;
          
           public FooReader(FooSource<T> source) {
             this.source = source;
           }
          
           public synchronized FooSource<T> getCurrentSource() {
             return source;
           }
          
           public synchronized FooSource<T> splitAtFraction(double fraction) {
             ...
             FooSource<T> primary = ...;
             FooSource<T> residual = ...;
             this.source = primary;
             return residual;
           }
           
        Specified by:
        getCurrentSource in class BoundedSource.BoundedReader<T>
      • getSplitPointsConsumed

        public long getSplitPointsConsumed()
        Description copied from class: BoundedSource.BoundedReader
        Returns the total amount of parallelism in the consumed (returned and processed) range of this reader's current BoundedSource (as would be returned by BoundedSource.BoundedReader.getCurrentSource()). This corresponds to all split point records (see RangeTracker) returned by this reader, excluding the last split point returned if the reader is not finished.

        Consider the following examples: (1) An input that can be read in parallel down to the individual records, such as CountingSource.upTo(long), is called "perfectly splittable". (2) a "block-compressed" file format such as AvroIO, in which a block of records has to be read as a whole, but different blocks can be read in parallel. (3) An "unsplittable" input such as a cursor in a database.

        • Any reader that is unstarted (aka, has never had a call to Source.Reader.start()) has a consumed parallelism of 0. This condition holds independent of whether the input is splittable.
        • Any reader that has only returned its first element (aka, has never had a call to Source.Reader.advance()) has a consumed parallelism of 0: the first element is the current element and is still being processed. This condition holds independent of whether the input is splittable.
        • For an empty reader (in which the call to Source.Reader.start() returned false), the consumed parallelism is 0. This condition holds independent of whether the input is splittable.
        • For a non-empty, finished reader (in which the call to Source.Reader.start() returned true and a call to Source.Reader.advance() has returned false), the value returned must be at least 1 and should equal the total parallelism in the source.
        • For example (1): After returning record #30 (starting at 1) out of 50 in a perfectly splittable 50-record input, this value should be 29. When finished, the consumed parallelism should be 50.
        • For example (2): In a block-compressed value consisting of 5 blocks, the value should stay at 0 until the first record of the second block is returned; stay at 1 until the first record of the third block is returned, etc. Only once the end-of-file is reached then the fifth block has been consumed and the value should stay at 5.
        • For example (3): For any non-empty unsplittable input, the consumed parallelism is 0 until the reader is finished (because the last call to Source.Reader.advance() returned false, at which point it becomes 1.

        A reader that is implemented using a RangeTracker is encouraged to use the range tracker's ability to count split points to implement this method. See OffsetBasedSource.OffsetBasedReader and OffsetRangeTracker for an example.

        Defaults to BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN. Any value less than 0 will be interpreted as unknown.

        Thread safety

        See the javadoc on BoundedSource.BoundedReader for information about thread safety.
        Overrides:
        getSplitPointsConsumed in class BoundedSource.BoundedReader<T>
        See Also:
        BoundedSource.BoundedReader.getSplitPointsRemaining()
      • getSplitPointsRemaining

        public long getSplitPointsRemaining()
        Description copied from class: BoundedSource.BoundedReader
        Returns the total amount of parallelism in the unprocessed part of this reader's current BoundedSource (as would be returned by BoundedSource.BoundedReader.getCurrentSource()). This corresponds to all unprocessed split point records (see RangeTracker), including the last split point returned, in the remainder part of the source.

        This function should be implemented only in addition to BoundedSource.BoundedReader.getSplitPointsConsumed() and only if an exact value can be returned.

        Consider the following examples: (1) An input that can be read in parallel down to the individual records, such as CountingSource.upTo(long), is called "perfectly splittable". (2) a "block-compressed" file format such as AvroIO, in which a block of records has to be read as a whole, but different blocks can be read in parallel. (3) An "unsplittable" input such as a cursor in a database.

        Assume for examples (1) and (2) that the number of records or blocks remaining is known:

        • Any reader for which the last call to Source.Reader.start() or Source.Reader.advance() has returned true should should not return 0, because this reader itself represents parallelism at least 1. This condition holds independent of whether the input is splittable.
        • A finished reader (for which Source.Reader.start() or Source.Reader.advance()) has returned false should return a value of 0. This condition holds independent of whether the input is splittable.
        • For example 1: After returning record #30 (starting at 1) out of 50 in a perfectly splittable 50-record input, this value should be 21 (20 remaining + 1 current) if the total number of records is known.
        • For example 2: After returning a record in block 3 in a block-compressed file consisting of 5 blocks, this value should be 3 (since blocks 4 and 5 can be processed in parallel by new readers produced via dynamic work rebalancing, while the current reader continues processing block 3) if the total number of blocks is known.
        • For example (3): a reader for any non-empty unsplittable input, should return 1 until it is finished, at which point it should return 0.
        • For any reader: After returning the last split point in a file (e.g., the last record in example (1), the first record in the last block for example (2), or the first record in the file for example (3), this value should be 1: apart from the current task, no additional remainder can be split off.

        Defaults to BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN. Any value less than 0 will be interpreted as unknown.

        Thread safety

        See the javadoc on BoundedSource.BoundedReader for information about thread safety.
        Overrides:
        getSplitPointsRemaining in class BoundedSource.BoundedReader<T>
        See Also:
        BoundedSource.BoundedReader.getSplitPointsConsumed()
      • allowsDynamicSplitting

        public boolean allowsDynamicSplitting()
        Whether this reader should allow dynamic splitting of the offset ranges.

        True by default. Override this to return false if the reader cannot support dynamic splitting correctly. If this returns false, splitAtFraction(double) will refuse all split requests.

      • splitAtFraction

        public final OffsetBasedSource<T> splitAtFraction​(double fraction)
        Description copied from class: BoundedSource.BoundedReader
        Tells the reader to narrow the range of the input it's going to read and give up the remainder, so that the new range would contain approximately the given fraction of the amount of data in the current range.

        Returns a BoundedSource representing the remainder.

        Detailed description

        Assuming the following sequence of calls:
        
         BoundedSource<T> initial = reader.getCurrentSource();
         BoundedSource<T> residual = reader.splitAtFraction(fraction);
         BoundedSource<T> primary = reader.getCurrentSource();
         
        • The "primary" and "residual" sources, when read, should together cover the same set of records as "initial".
        • The current reader should continue to be in a valid state, and continuing to read from it should, together with the records it already read, yield the same records as would have been read by "primary".
        • The amount of data read by "primary" should ideally represent approximately the given fraction of the amount of data read by "initial".
        For example, a reader that reads a range of offsets [A, B) in a file might implement this method by truncating the current range to [A, A + fraction*(B-A)) and returning a Source representing the range [A + fraction*(B-A), B).

        This method should return null if the split cannot be performed for this fraction while satisfying the semantics above. E.g., a reader that reads a range of offsets in a file should return null if it is already past the position in its range corresponding to the given fraction. In this case, the method MUST have no effect (the reader must behave as if the method hadn't been called at all).

        Statefulness

        Since this method (if successful) affects the reader's source, in subsequent invocations "fraction" should be interpreted relative to the new current source.

        Thread safety and blocking

        This method will be called concurrently to other methods (however there will not be multiple concurrent invocations of this method itself), and it is critical for it to be implemented in a thread-safe way (otherwise data loss is possible).

        It is also very important that this method always completes quickly. In particular, it should not perform or wait on any blocking operations such as I/O, RPCs etc. Violating this requirement may stall completion of the work item or even cause it to fail.

        It is incorrect to make both this method and Source.Reader.start()/Source.Reader.advance() synchronized, because those methods can perform blocking operations, and then this method would have to wait for those calls to complete.

        RangeTracker makes it easy to implement this method safely and correctly.

        By default, returns null to indicate that splitting is not possible.

        Overrides:
        splitAtFraction in class BoundedSource.BoundedReader<T>