Class BoundedSource.BoundedReader<T>
- java.lang.Object
  - org.apache.beam.sdk.io.Source.Reader<T>
    - org.apache.beam.sdk.io.BoundedSource.BoundedReader<T>
- All Implemented Interfaces: java.lang.AutoCloseable
- Direct Known Subclasses: OffsetBasedSource.OffsetBasedReader
- Enclosing class: BoundedSource<T>
@Experimental(SOURCE_SINK) public abstract static class BoundedSource.BoundedReader<T> extends Source.Reader<T>
A Reader that reads a bounded amount of input and supports some additional operations, such as progress estimation and dynamic work rebalancing.
Boundedness
Once Source.Reader.start() or Source.Reader.advance() has returned false, neither will be called again on this object.
Thread safety
All methods will be run from the same thread except splitAtFraction(double), getFractionConsumed(), getCurrentSource(), getSplitPointsConsumed(), and getSplitPointsRemaining(), all of which can be called concurrently from a different thread. There will not be multiple concurrent calls to splitAtFraction(double).
It must be safe to call splitAtFraction(double), getFractionConsumed(), getCurrentSource(), getSplitPointsConsumed(), and getSplitPointsRemaining() concurrently with other methods.
Additionally, a successful splitAtFraction(double) call must, by definition, cause getCurrentSource() to start returning a different value. Callers of getCurrentSource() need to be aware of the possibility that the returned value can change at any time, and must only access the properties of the source returned by getCurrentSource() which do not change between splitAtFraction(double) calls.
Implementing splitAtFraction(double)
In the course of dynamic work rebalancing, the method splitAtFraction(double) may be called concurrently with Source.Reader.advance() or Source.Reader.start(). It is critical that their interaction is implemented in a thread-safe way, otherwise data loss is possible.
Sources which support dynamic work rebalancing should use RangeTracker to manage the (source-specific) range of positions that is being split.
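The interaction between the reading thread and the splitting thread can be illustrated with a minimal sketch. The class SimpleOffsetTracker and its methods are hypothetical and are not Beam's RangeTracker API; the sketch assumes non-negative offsets and a half-open range [start, stop). The idea shown is that both threads go through short synchronized methods that only touch in-memory state, so any blocking I/O in start() or advance() would happen outside the lock.

```java
/** Hypothetical stand-in for a range tracker; this is NOT Beam's RangeTracker API. */
final class SimpleOffsetTracker {
    private final long start;
    private long stop;               // exclusive; shrinks on a successful split
    private long lastReturned = -1;  // -1 means no record has been returned yet

    SimpleOffsetTracker(long start, long stop) {
        this.start = start;
        this.stop = stop;
    }

    /** Called by the reading thread, after any blocking I/O, before returning a record. */
    synchronized boolean tryReturnRecordAt(long offset) {
        if (offset >= stop) {
            return false; // the record now belongs to a residual that was split off
        }
        lastReturned = offset;
        return true;
    }

    /** Called by the splitting thread; has no effect when it returns false. */
    synchronized boolean trySplitAt(long splitOffset) {
        boolean alreadyPassed = splitOffset <= lastReturned;
        boolean outsideRange = splitOffset <= start || splitOffset >= stop;
        if (alreadyPassed || outsideRange) {
            return false;
        }
        stop = splitOffset;
        return true;
    }

    synchronized long getStop() {
        return stop;
    }
}
```

In this sketch a record at or beyond the new stop offset is rejected after a split, which is what prevents the primary and the residual from both reading it.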
-
-
Field Summary
Fields
Modifier and Type | Field | Description
static long | SPLIT_POINTS_UNKNOWN | A constant to use as the return value for getSplitPointsConsumed() or getSplitPointsRemaining() when the exact value is unknown.
-
Constructor Summary
Constructors
Constructor | Description
BoundedReader() |
-
Method Summary
Modifier and Type | Method | Description
abstract BoundedSource<T> | getCurrentSource() | Returns a Source describing the same input that this Reader currently reads (including items already read).
org.joda.time.Instant | getCurrentTimestamp() | By default, returns the minimum possible timestamp.
@Nullable java.lang.Double | getFractionConsumed() | Returns a value in [0, 1] representing approximately what fraction of the current source this reader has read so far, or null if such an estimate is not available.
long | getSplitPointsConsumed() | Returns the total amount of parallelism in the consumed (returned and processed) range of this reader's current BoundedSource (as would be returned by getCurrentSource()).
long | getSplitPointsRemaining() | Returns the total amount of parallelism in the unprocessed part of this reader's current BoundedSource (as would be returned by getCurrentSource()).
@Nullable BoundedSource<T> | splitAtFraction(double fraction) | Tells the reader to narrow the range of the input it's going to read and give up the remainder, so that the new range would contain approximately the given fraction of the amount of data in the current range.
Methods inherited from class org.apache.beam.sdk.io.Source.Reader
advance, close, getCurrent, start
-
-
-
-
Field Detail
-
SPLIT_POINTS_UNKNOWN
public static final long SPLIT_POINTS_UNKNOWN
A constant to use as the return value for getSplitPointsConsumed() or getSplitPointsRemaining() when the exact value is unknown.
See Also: Constant Field Values
-
-
Method Detail
-
getFractionConsumed
public @Nullable java.lang.Double getFractionConsumed()
Returns a value in [0, 1] representing approximately what fraction of the current source this reader has read so far, or null if such an estimate is not available.
It is recommended that this method should satisfy the following properties:
- Should return 0 before the Source.Reader.start() call.
- Should return 1 after a Source.Reader.start() or Source.Reader.advance() call that returns false.
- The returned values should be non-decreasing (though they don't have to be unique).
By default, returns null to indicate that this cannot be estimated.
Thread safety
If splitAtFraction(double) is implemented, this method can be called concurrently to other methods (including itself), and it is therefore critical for it to be implemented in a thread-safe way.
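As a rough illustration of the recommended properties, the following sketch estimates progress over a fixed offset range. The class ProgressSketch and its method names are hypothetical and not part of Beam; it assumes a half-open range [start, end) with end greater than start, and it uses synchronized methods as one simple way to make concurrent calls safe.

```java
/** Hypothetical progress estimator over an offset range; not part of the Beam API. */
final class ProgressSketch {
    private final long start;
    private final long end;          // exclusive, assumed to be greater than start
    private long current;
    private boolean started = false;
    private boolean done = false;

    ProgressSketch(long start, long end) {
        this.start = start;
        this.end = end;
        this.current = start;
    }

    /** Called by the reading thread when a record at the given offset is returned. */
    synchronized void recordReturnedAt(long offset) {
        started = true;
        current = Math.max(current, offset); // keeps the estimate non-decreasing
    }

    /** Called by the reading thread when start() or advance() returns false. */
    synchronized void markDone() {
        done = true;
    }

    /** Safe to call from any thread. */
    synchronized Double getFractionConsumed() {
        if (done) {
            return 1.0;
        }
        if (!started) {
            return 0.0;
        }
        double fraction = (double) (current - start) / (double) (end - start);
        return Math.max(0.0, Math.min(1.0, fraction));
    }
}
```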
-
getSplitPointsConsumed
public long getSplitPointsConsumed()
Returns the total amount of parallelism in the consumed (returned and processed) range of this reader's current BoundedSource (as would be returned by getCurrentSource()). This corresponds to all split point records (see RangeTracker) returned by this reader, excluding the last split point returned if the reader is not finished.
Consider the following examples: (1) An input that can be read in parallel down to the individual records, such as CountingSource.upTo(long), is called "perfectly splittable". (2) A "block-compressed" file format such as AvroIO, in which a block of records has to be read as a whole, but different blocks can be read in parallel. (3) An "unsplittable" input such as a cursor in a database.
- Any reader that is unstarted (i.e., has never had a call to Source.Reader.start()) has a consumed parallelism of 0. This condition holds independent of whether the input is splittable.
- Any reader that has only returned its first element (i.e., has never had a call to Source.Reader.advance()) has a consumed parallelism of 0: the first element is the current element and is still being processed. This condition holds independent of whether the input is splittable.
- For an empty reader (in which the call to Source.Reader.start() returned false), the consumed parallelism is 0. This condition holds independent of whether the input is splittable.
- For a non-empty, finished reader (in which the call to Source.Reader.start() returned true and a call to Source.Reader.advance() has returned false), the value returned must be at least 1 and should equal the total parallelism in the source.
- For example (1): After returning record #30 (starting at 1) out of 50 in a perfectly splittable 50-record input, this value should be 29. When finished, the consumed parallelism should be 50.
- For example (2): In a block-compressed file consisting of 5 blocks, the value should stay at 0 until the first record of the second block is returned; stay at 1 until the first record of the third block is returned, etc. Only once the end-of-file is reached has the fifth block been consumed, and the value should then stay at 5.
- For example (3): For any non-empty unsplittable input, the consumed parallelism is 0 until the reader is finished (because the last call to Source.Reader.advance() returned false), at which point it becomes 1.
A reader that is implemented using a RangeTracker is encouraged to use the range tracker's ability to count split points to implement this method. See OffsetBasedSource.OffsetBasedReader and OffsetRangeTracker for an example.
Defaults to SPLIT_POINTS_UNKNOWN. Any value less than 0 will be interpreted as unknown.
Thread safety
See the javadoc on BoundedSource.BoundedReader for information about thread safety.
See Also: getSplitPointsRemaining()
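The counting rules above can be sketched with a small counter. SplitPointCounter and its methods are hypothetical names used only for illustration; this is not how Beam's range trackers are implemented. The sketch assumes the reader reports each split point record it returns and reports when it is finished.

```java
/** Hypothetical split point counter; illustrates the rules above, not Beam's implementation. */
final class SplitPointCounter {
    private long splitPointsReturned = 0;
    private boolean done = false;

    /** Called by the reading thread each time a split point record is returned. */
    synchronized void onSplitPointReturned() {
        splitPointsReturned++;
    }

    /** Called by the reading thread when start() or advance() returns false. */
    synchronized void onDone() {
        done = true;
    }

    /** Safe to call from any thread. */
    synchronized long getSplitPointsConsumed() {
        if (done) {
            return splitPointsReturned; // everything returned has been processed
        }
        // The most recently returned split point is still being processed.
        return Math.max(0L, splitPointsReturned - 1);
    }
}
```

For a block-compressed input under this sketch, onSplitPointReturned() would be called only for the first record of each block, which gives the behaviour described for example (2).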
-
getSplitPointsRemaining
public long getSplitPointsRemaining()
Returns the total amount of parallelism in the unprocessed part of this reader's current BoundedSource (as would be returned by getCurrentSource()). This corresponds to all unprocessed split point records (see RangeTracker), including the last split point returned, in the remainder part of the source.
This function should be implemented only in addition to getSplitPointsConsumed() and only if an exact value can be returned.
Consider the following examples: (1) An input that can be read in parallel down to the individual records, such as CountingSource.upTo(long), is called "perfectly splittable". (2) A "block-compressed" file format such as AvroIO, in which a block of records has to be read as a whole, but different blocks can be read in parallel. (3) An "unsplittable" input such as a cursor in a database.
Assume for examples (1) and (2) that the number of records or blocks remaining is known:
- Any reader for which the last call to Source.Reader.start() or Source.Reader.advance() has returned true should not return 0, because this reader itself represents parallelism of at least 1. This condition holds independent of whether the input is splittable.
- A finished reader (for which Source.Reader.start() or Source.Reader.advance() has returned false) should return a value of 0. This condition holds independent of whether the input is splittable.
- For example (1): After returning record #30 (starting at 1) out of 50 in a perfectly splittable 50-record input, this value should be 21 (20 remaining + 1 current) if the total number of records is known.
- For example (2): After returning a record in block 3 in a block-compressed file consisting of 5 blocks, this value should be 3 (since blocks 4 and 5 can be processed in parallel by new readers produced via dynamic work rebalancing, while the current reader continues processing block 3) if the total number of blocks is known.
- For example (3): A reader for any non-empty unsplittable input should return 1 until it is finished, at which point it should return 0.
- For any reader: After returning the last split point in a file (e.g., the last record in example (1), the first record in the last block for example (2), or the first record in the file for example (3)), this value should be 1: apart from the current task, no additional remainder can be split off.
Defaults to SPLIT_POINTS_UNKNOWN. Any value less than 0 will be interpreted as unknown.
Thread safety
See the javadoc on BoundedSource.BoundedReader for information about thread safety.
See Also: getSplitPointsConsumed()
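The arithmetic in the examples above can be written as a small helper. RemainingSplitPoints, its UNKNOWN constant, and the parameter names are hypothetical; UNKNOWN is set to -1 here only because any value less than 0 is interpreted as unknown. The sketch assumes the total number of split points in the current source is either known exactly or passed as a negative value.

```java
/** Hypothetical helper illustrating the remaining-parallelism arithmetic; not part of Beam. */
final class RemainingSplitPoints {
    /** Illustrative stand-in: any negative value means "unknown". */
    static final long UNKNOWN = -1;

    /**
     * @param totalSplitPoints total split points in the current source, or negative if unknown
     * @param splitPointsReturned split point records returned so far
     * @param done whether start() or advance() has returned false
     */
    static long remaining(long totalSplitPoints, long splitPointsReturned, boolean done) {
        if (done) {
            return 0; // a finished reader has nothing left to process
        }
        if (totalSplitPoints < 0) {
            return UNKNOWN; // only report a value when it is exact
        }
        // The most recently returned split point is still being processed,
        // so it counts as remaining rather than consumed.
        long consumed = Math.max(0L, splitPointsReturned - 1);
        return totalSplitPoints - consumed;
    }
}
```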
-
getCurrentSource
public abstract BoundedSource<T> getCurrentSource()
Returns a Source describing the same input that this Reader currently reads (including items already read).
Usage
Reader subclasses can use this method for convenience to access unchanging properties of the source being read. Alternatively, they can cache these properties in the constructor.
The framework will call this method in the course of dynamic work rebalancing, e.g. after a successful splitAtFraction(double) call.
Mutability and thread safety
Remember that Source objects must always be immutable. However, the return value of this function may be affected by dynamic work rebalancing, happening asynchronously via splitAtFraction(double), meaning it can return a different Source object. The returned object will itself still be immutable. Callers must take care not to rely on properties of the returned source that may be asynchronously changed as a result of this process (e.g. do not cache an end offset when reading a file).
Implementation
For convenience, subclasses should usually return the most concrete subclass of Source possible. In practice, the implementation of this method should nearly always be one of the following:
- Source that inherits from a base class that already implements getCurrentSource(): delegate to base class. In this case, it is almost always an error for the subclass to maintain its own copy of the source.
    public FooReader(FooSource<T> source) {
      super(source);
    }
    public FooSource<T> getCurrentSource() {
      return (FooSource<T>) super.getCurrentSource();
    }
- Source that does not support dynamic work rebalancing: return a private final variable.
    private final FooSource<T> source;
    public FooReader(FooSource<T> source) {
      this.source = source;
    }
    public FooSource<T> getCurrentSource() {
      return source;
    }
- BoundedSource.BoundedReader that explicitly supports dynamic work rebalancing: maintain a variable pointing to an immutable source object, and protect it with synchronization.
    private FooSource<T> source;
    public FooReader(FooSource<T> source) {
      this.source = source;
    }
    public synchronized FooSource<T> getCurrentSource() {
      return source;
    }
    public synchronized FooSource<T> splitAtFraction(double fraction) {
      ...
      FooSource<T> primary = ...;
      FooSource<T> residual = ...;
      this.source = primary;
      return residual;
    }
Specified by: getCurrentSource in class Source.Reader<T>
-
splitAtFraction
public @Nullable BoundedSource<T> splitAtFraction(double fraction)
Tells the reader to narrow the range of the input it's going to read and give up the remainder, so that the new range would contain approximately the given fraction of the amount of data in the current range.
Returns a BoundedSource representing the remainder.
Detailed description
Assuming the following sequence of calls:
    BoundedSource<T> initial = reader.getCurrentSource();
    BoundedSource<T> residual = reader.splitAtFraction(fraction);
    BoundedSource<T> primary = reader.getCurrentSource();
- The "primary" and "residual" sources, when read, should together cover the same set of records as "initial".
- The current reader should continue to be in a valid state, and continuing to read from it should, together with the records it already read, yield the same records as would have been read by "primary".
- The amount of data read by "primary" should ideally represent approximately the given fraction of the amount of data read by "initial".
This method should return null if the split cannot be performed for this fraction while satisfying the semantics above. E.g., a reader that reads a range of offsets in a file should return null if it is already past the position in its range corresponding to the given fraction. In this case, the method MUST have no effect (the reader must behave as if the method hadn't been called at all).
Statefulness
Since this method (if successful) affects the reader's source, in subsequent invocations "fraction" should be interpreted relative to the new current source.
Thread safety and blocking
This method will be called concurrently to other methods (however there will not be multiple concurrent invocations of this method itself), and it is critical for it to be implemented in a thread-safe way (otherwise data loss is possible).
It is also very important that this method always completes quickly. In particular, it should not perform or wait on any blocking operations such as I/O, RPCs etc. Violating this requirement may stall completion of the work item or even cause it to fail.
It is incorrect to make both this method and Source.Reader.start()/Source.Reader.advance() synchronized, because those methods can perform blocking operations, and then this method would have to wait for those calls to complete.
RangeTracker makes it easy to implement this method safely and correctly.
By default, returns null to indicate that splitting is not possible.
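The primary/residual semantics and the statefulness rule can be illustrated with a pure function over immutable offset ranges. OffsetRange and SplitMath are hypothetical types, not Beam classes, and the sketch leaves out synchronization entirely; it assumes a half-open range [start, end) and that lastReturnedOffset is -1 when nothing has been returned yet.

```java
/** Hypothetical immutable half-open offset range [start, end); not a Beam class. */
final class OffsetRange {
    final long start;
    final long end;

    OffsetRange(long start, long end) {
        this.start = start;
        this.end = end;
    }
}

/** Hypothetical helper showing the split arithmetic only, with no locking. */
final class SplitMath {
    /**
     * Returns {primary, residual}, or null if the split cannot be performed.
     * The fraction is interpreted relative to the range passed in, so after a
     * successful split the caller would pass the new primary next time.
     */
    static OffsetRange[] split(OffsetRange current, long lastReturnedOffset, double fraction) {
        long size = current.end - current.start;
        long splitOffset = current.start + (long) (fraction * size);
        boolean alreadyPassed = splitOffset <= lastReturnedOffset;
        boolean emptyHalf = splitOffset <= current.start || splitOffset >= current.end;
        if (alreadyPassed || emptyHalf) {
            return null; // the caller must behave as if nothing happened
        }
        OffsetRange primary = new OffsetRange(current.start, splitOffset);
        OffsetRange residual = new OffsetRange(splitOffset, current.end);
        return new OffsetRange[] {primary, residual};
    }
}
```

Because primary ends exactly where residual begins, the two ranges together cover the same offsets as the initial range, and a second split with the same fraction applies to the smaller primary.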
-
getCurrentTimestamp
public org.joda.time.Instant getCurrentTimestamp() throws java.util.NoSuchElementException
By default, returns the minimum possible timestamp.
Specified by: getCurrentTimestamp in class Source.Reader<T>
Throws: java.util.NoSuchElementException - if the reader is at the beginning of the input and Source.Reader.start() or Source.Reader.advance() wasn't called, or if the last Source.Reader.start() or Source.Reader.advance() returned false.
-
-