Class OffsetBasedSource.OffsetBasedReader<T>
- java.lang.Object
  - org.apache.beam.sdk.io.Source.Reader<T>
    - org.apache.beam.sdk.io.BoundedSource.BoundedReader<T>
      - org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader<T>
- All Implemented Interfaces: java.lang.AutoCloseable
- Direct Known Subclasses: FileBasedSource.FileBasedReader
- Enclosing class: OffsetBasedSource<T>
public abstract static class OffsetBasedSource.OffsetBasedReader<T> extends BoundedSource.BoundedReader<T>
A Source.Reader that implements code common to readers of all OffsetBasedSources.
Subclasses have to implement:
- The methods startImpl() and advanceImpl() for reading the first or subsequent records.
- The methods Source.Reader.getCurrent(), getCurrentOffset(), and optionally isAtSplitPoint() and BoundedSource.BoundedReader.getCurrentTimestamp() to access properties of the last record successfully read by startImpl() or advanceImpl().
-
-
Field Summary
-
Fields inherited from class org.apache.beam.sdk.io.BoundedSource.BoundedReader
SPLIT_POINTS_UNKNOWN
-
-
Constructor Summary
| Constructor | Description |
| --- | --- |
| OffsetBasedReader(OffsetBasedSource<T> source) | |
-
Method Summary
| Modifier and Type | Method | Description |
| --- | --- | --- |
| boolean | advance() | Advances the reader to the next valid record. |
| protected abstract boolean | advanceImpl() | Advances to the next record and returns true, or returns false if there is no next record. |
| boolean | allowsDynamicSplitting() | Whether this reader should allow dynamic splitting of the offset ranges. |
| protected abstract long | getCurrentOffset() | Returns the starting offset of the current record, which has been read by the last successful Source.Reader.start() or Source.Reader.advance() call. |
| OffsetBasedSource<T> | getCurrentSource() | Returns a Source describing the same input that this Reader currently reads (including items already read). |
| java.lang.Double | getFractionConsumed() | Returns a value in [0, 1] representing approximately what fraction of the current source this reader has read so far, or null if such an estimate is not available. |
| long | getSplitPointsConsumed() | Returns the total amount of parallelism in the consumed (returned and processed) range of this reader's current BoundedSource (as would be returned by BoundedSource.BoundedReader.getCurrentSource()). |
| long | getSplitPointsRemaining() | Returns the total amount of parallelism in the unprocessed part of this reader's current BoundedSource (as would be returned by BoundedSource.BoundedReader.getCurrentSource()). |
| protected boolean | isAtSplitPoint() | Returns whether the current record is at a split point (i.e., whether the current record would be the first record to be read by a source with a specified start offset of getCurrentOffset()). |
| boolean | isDone() | |
| boolean | isStarted() | Returns true if there has been a call to start(). |
| OffsetBasedSource<T> | splitAtFraction(double fraction) | Tells the reader to narrow the range of the input it's going to read and give up the remainder, so that the new range would contain approximately the given fraction of the amount of data in the current range. |
| boolean | start() | Initializes the reader and advances the reader to the first record. |
| protected abstract boolean | startImpl() | Initializes the OffsetBasedSource.OffsetBasedReader and advances to the first record, returning true if there is a record available to be read. |
-
Methods inherited from class org.apache.beam.sdk.io.BoundedSource.BoundedReader
getCurrentTimestamp
-
Methods inherited from class org.apache.beam.sdk.io.Source.Reader
close, getCurrent
-
-
-
-
Constructor Detail
-
OffsetBasedReader
public OffsetBasedReader(OffsetBasedSource<T> source)
- Parameters:
source - the OffsetBasedSource to be read by the current reader.
-
-
Method Detail
-
isDone
public final boolean isDone()
-
isStarted
public final boolean isStarted()
Returns true if there has been a call to start().
-
getCurrentOffset
protected abstract long getCurrentOffset() throws java.util.NoSuchElementException
Returns the starting offset of the current record, which has been read by the last successful Source.Reader.start() or Source.Reader.advance() call.
If no such call has been made yet, the return value is unspecified.
See RangeTracker for a description of offset semantics.
- Throws:
java.util.NoSuchElementException
-
isAtSplitPoint
protected boolean isAtSplitPoint() throws java.util.NoSuchElementException
Returns whether the current record is at a split point (i.e., whether the current record would be the first record to be read by a source with a specified start offset of getCurrentOffset()).
See detailed documentation about split points in RangeTracker.
- Throws:
java.util.NoSuchElementException
-
start
public final boolean start() throws java.io.IOException
Description copied from class: Source.Reader
Initializes the reader and advances the reader to the first record.
This method should be called exactly once. The invocation should occur prior to calling Source.Reader.advance() or Source.Reader.getCurrent(). This method may perform expensive operations that are needed to initialize the reader.
- Specified by:
start in class Source.Reader<T>
- Returns:
true if a record was read, false if there is no more input available.
- Throws:
java.io.IOException
-
advance
public final boolean advance() throws java.io.IOException
Description copied from class: Source.Reader
Advances the reader to the next valid record.
It is an error to call this without having called Source.Reader.start() first.
- Specified by:
advance in class Source.Reader<T>
- Returns:
true if a record was read, false if there is no more input available.
- Throws:
java.io.IOException
-
startImpl
protected abstract boolean startImpl() throws java.io.IOException
Initializes the OffsetBasedSource.OffsetBasedReader and advances to the first record, returning true if there is a record available to be read. This method will be invoked exactly once and may perform expensive setup operations that are needed to initialize the reader.
This function is the OffsetBasedReader implementation of Source.Reader.start(). The key difference is that the implementor can ignore the possibility that it should no longer produce the first record, either because it has exceeded the original endOffset assigned to the reader, or because a concurrent call to splitAtFraction(double) has changed the source to shrink the offset range being read.
- Throws:
java.io.IOException
- See Also:
Source.Reader.start()
-
advanceImpl
protected abstract boolean advanceImpl() throws java.io.IOException
Advances to the next record and returns true, or returns false if there is no next record.
This function is the OffsetBasedReader implementation of Source.Reader.advance(). The key difference is that the implementor can ignore the possibility that it should no longer produce the next record, either because it has exceeded the original endOffset assigned to the reader, or because a concurrent call to splitAtFraction(double) has changed the source to shrink the offset range being read.
- Throws:
java.io.IOException
- See Also:
Source.Reader.advance()
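The division of labor between the final start()/advance() methods and the startImpl()/advanceImpl() hooks can be illustrated with a small, dependency-free model. This is only a sketch under stated assumptions, not Beam's implementation: ToyOffsetReader and ListReader are hypothetical names, every record is assumed to be a split point, and concurrent splitting is ignored.

```java
import java.util.List;

/** Hypothetical model of the start()/advance() wrappers; not Beam's code. */
abstract class ToyOffsetReader {
  private final long endOffset; // exclusive end of the range assigned to this reader
  private boolean done = false;

  ToyOffsetReader(long endOffset) {
    this.endOffset = endOffset;
  }

  // Subclass hooks: read a record without worrying about the assigned range.
  protected abstract boolean startImpl();

  protected abstract boolean advanceImpl();

  protected abstract long getCurrentOffset();

  public final boolean start() {
    return checkRange(startImpl());
  }

  public final boolean advance() {
    return !done && checkRange(advanceImpl());
  }

  public final boolean isDone() {
    return done;
  }

  // Range bookkeeping lives here, so subclasses never compare against endOffset.
  private boolean checkRange(boolean hasRecord) {
    if (!hasRecord || getCurrentOffset() >= endOffset) {
      done = true;
      return false;
    }
    return true;
  }
}

/** Toy subclass: each list index is an offset and each record is a split point. */
class ListReader extends ToyOffsetReader {
  private final List<String> records;
  private int index = -1;

  ListReader(List<String> records, long endOffset) {
    super(endOffset);
    this.records = records;
  }

  @Override
  protected boolean startImpl() {
    return advanceImpl();
  }

  @Override
  protected boolean advanceImpl() {
    return ++index < records.size();
  }

  @Override
  protected long getCurrentOffset() {
    return index;
  }

  String getCurrent() {
    return records.get(index);
  }
}
```

In this model, a ListReader over four records with an endOffset of 2 returns the records at offsets 0 and 1 and then reports that it is done, even though ListReader itself never looks at the end of the range.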
-
getCurrentSource
public OffsetBasedSource<T> getCurrentSource()
Description copied from class: BoundedSource.BoundedReader
Returns a Source describing the same input that this Reader currently reads (including items already read).
Usage
Reader subclasses can use this method for convenience to access unchanging properties of the source being read. Alternatively, they can cache these properties in the constructor.
The framework will call this method in the course of dynamic work rebalancing, e.g. after a successful BoundedSource.BoundedReader.splitAtFraction(double) call.
Mutability and thread safety
Remember that Source objects must always be immutable. However, the return value of this function may be affected by dynamic work rebalancing, happening asynchronously via BoundedSource.BoundedReader.splitAtFraction(double), meaning it can return a different Source object. The returned object itself will still be immutable. Callers must take care not to rely on properties of the returned source that may be asynchronously changed as a result of this process (e.g. do not cache an end offset when reading a file).
Implementation
For convenience, subclasses should usually return the most concrete subclass of Source possible. In practice, the implementation of this method should nearly always be one of the following:
- Source that inherits from a base class that already implements BoundedSource.BoundedReader.getCurrentSource(): delegate to base class. In this case, it is almost always an error for the subclass to maintain its own copy of the source.
    public FooReader(FooSource<T> source) { super(source); }
    public FooSource<T> getCurrentSource() { return (FooSource<T>) super.getCurrentSource(); }
- Source that does not support dynamic work rebalancing: return a private final variable.
    private final FooSource<T> source;
    public FooReader(FooSource<T> source) { this.source = source; }
    public FooSource<T> getCurrentSource() { return source; }
- BoundedSource.BoundedReader that explicitly supports dynamic work rebalancing: maintain a variable pointing to an immutable source object, and protect it with synchronization.
    private FooSource<T> source;
    public FooReader(FooSource<T> source) { this.source = source; }
    public synchronized FooSource<T> getCurrentSource() { return source; }
    public synchronized FooSource<T> splitAtFraction(double fraction) {
      ...
      FooSource<T> primary = ...;
      FooSource<T> residual = ...;
      this.source = primary;
      return residual;
    }
- Specified by:
getCurrentSource in class BoundedSource.BoundedReader<T>
-
getFractionConsumed
public java.lang.Double getFractionConsumed()
Description copied from class: BoundedSource.BoundedReader
Returns a value in [0, 1] representing approximately what fraction of the current source this reader has read so far, or null if such an estimate is not available.
It is recommended that this method should satisfy the following properties:
- Should return 0 before the Source.Reader.start() call.
- Should return 1 after a Source.Reader.start() or Source.Reader.advance() call that returns false.
- The returned values should be non-decreasing (though they don't have to be unique).
By default, returns null to indicate that this cannot be estimated.
Thread safety
If BoundedSource.BoundedReader.splitAtFraction(double) is implemented, this method can be called concurrently to other methods (including itself), and it is therefore critical for it to be implemented in a thread-safe way.
- Overrides:
getFractionConsumed in class BoundedSource.BoundedReader<T>
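The recommended properties of getFractionConsumed() can be illustrated with a linear estimate over an offset range. This is a minimal sketch, assuming a half-open range [startOffset, endOffset) and a hypothetical helper class FractionEstimate that is not part of Beam; how the class documented here actually computes its estimate is not shown by this text.

```java
/** Hypothetical helper, not part of Beam: a linear progress estimate over [start, end). */
final class FractionEstimate {
  private FractionEstimate() {}

  /**
   * @param started whether start() has been called
   * @param done whether start() or advance() has returned false
   * @param startOffset inclusive start of the range being read
   * @param endOffset exclusive end of the range being read
   * @param currentOffset starting offset of the current record
   */
  static double fractionConsumed(
      boolean started, boolean done, long startOffset, long endOffset, long currentOffset) {
    if (!started) {
      return 0.0; // property 1: 0 before start()
    }
    if (done || endOffset <= startOffset) {
      return 1.0; // property 2: 1 once the reader is finished (or the range is empty)
    }
    double fraction = (double) (currentOffset - startOffset) / (endOffset - startOffset);
    // Clamp so the result always stays within [0, 1].
    return Math.max(0.0, Math.min(1.0, fraction));
  }
}
```

Because offsets only grow as the reader advances, this estimate is non-decreasing for a fixed range. The function itself is pure; a real reader would still have to read its state in a thread-safe way before calling it.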
-
getSplitPointsConsumed
public long getSplitPointsConsumed()
Description copied from class: BoundedSource.BoundedReader
Returns the total amount of parallelism in the consumed (returned and processed) range of this reader's current BoundedSource (as would be returned by BoundedSource.BoundedReader.getCurrentSource()). This corresponds to all split point records (see RangeTracker) returned by this reader, excluding the last split point returned if the reader is not finished.
Consider the following examples: (1) An input that can be read in parallel down to the individual records, such as CountingSource.upTo(long), is called "perfectly splittable". (2) A "block-compressed" file format such as AvroIO, in which a block of records has to be read as a whole, but different blocks can be read in parallel. (3) An "unsplittable" input such as a cursor in a database.
- Any reader that is unstarted (i.e., has never had a call to Source.Reader.start()) has a consumed parallelism of 0. This condition holds independent of whether the input is splittable.
- Any reader that has only returned its first element (i.e., has never had a call to Source.Reader.advance()) has a consumed parallelism of 0: the first element is the current element and is still being processed. This condition holds independent of whether the input is splittable.
- For an empty reader (in which the call to Source.Reader.start() returned false), the consumed parallelism is 0. This condition holds independent of whether the input is splittable.
- For a non-empty, finished reader (in which the call to Source.Reader.start() returned true and a call to Source.Reader.advance() has returned false), the value returned must be at least 1 and should equal the total parallelism in the source.
- For example (1): After returning record #30 (starting at 1) out of 50 in a perfectly splittable 50-record input, this value should be 29. When finished, the consumed parallelism should be 50.
- For example (2): In a block-compressed file consisting of 5 blocks, the value should stay at 0 until the first record of the second block is returned; stay at 1 until the first record of the third block is returned, etc. Only once the end of the file is reached has the fifth block been consumed, and the value should then stay at 5.
- For example (3): For any non-empty unsplittable input, the consumed parallelism is 0 until the reader is finished (because the last call to Source.Reader.advance() returned false), at which point it becomes 1.
A reader that is implemented using a RangeTracker is encouraged to use the range tracker's ability to count split points to implement this method. See OffsetBasedSource.OffsetBasedReader and OffsetRangeTracker for an example.
Defaults to BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN. Any value less than 0 will be interpreted as unknown.
Thread safety
See the javadoc on BoundedSource.BoundedReader for information about thread safety.
- Overrides:
getSplitPointsConsumed in class BoundedSource.BoundedReader<T>
- See Also:
BoundedSource.BoundedReader.getSplitPointsRemaining()
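The counting rule described above (all split points returned, minus the last one while the reader is still running) can be sketched as a small counter. This is an illustrative model only: SplitPointCounter is a hypothetical class, not Beam's RangeTracker, and it assumes the reader reports each returned record along with whether it is at a split point.

```java
/** Hypothetical counter, not Beam's RangeTracker: tracks split points returned by a reader. */
final class SplitPointCounter {
  private long splitPointsSeen = 0;
  private boolean done = false;

  /** Call for every record returned by start() or advance(). */
  synchronized void recordReturned(boolean isAtSplitPoint) {
    if (isAtSplitPoint) {
      splitPointsSeen++;
    }
  }

  /** Call when start() or advance() returns false. */
  synchronized void markDone() {
    done = true;
  }

  synchronized long getSplitPointsConsumed() {
    if (done) {
      return splitPointsSeen; // finished: every split point has been consumed
    }
    // Still running: the most recent split point is still being processed.
    return Math.max(0, splitPointsSeen - 1);
  }
}
```

Feeding this counter 30 split-point records yields 29, matching example (1). For example (3), only the first record is a split point, so the value stays at 0 until markDone() is called and then becomes 1.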
-
getSplitPointsRemaining
public long getSplitPointsRemaining()
Description copied from class: BoundedSource.BoundedReader
Returns the total amount of parallelism in the unprocessed part of this reader's current BoundedSource (as would be returned by BoundedSource.BoundedReader.getCurrentSource()). This corresponds to all unprocessed split point records (see RangeTracker), including the last split point returned, in the remainder part of the source.
This function should be implemented only in addition to BoundedSource.BoundedReader.getSplitPointsConsumed() and only if an exact value can be returned.
Consider the following examples: (1) An input that can be read in parallel down to the individual records, such as CountingSource.upTo(long), is called "perfectly splittable". (2) A "block-compressed" file format such as AvroIO, in which a block of records has to be read as a whole, but different blocks can be read in parallel. (3) An "unsplittable" input such as a cursor in a database.
Assume for examples (1) and (2) that the number of records or blocks remaining is known:
- Any reader for which the last call to Source.Reader.start() or Source.Reader.advance() has returned true should not return 0, because this reader itself represents parallelism at least 1. This condition holds independent of whether the input is splittable.
- A finished reader (for which Source.Reader.start() or Source.Reader.advance() has returned false) should return a value of 0. This condition holds independent of whether the input is splittable.
- For example (1): After returning record #30 (starting at 1) out of 50 in a perfectly splittable 50-record input, this value should be 21 (20 remaining + 1 current) if the total number of records is known.
- For example (2): After returning a record in block 3 in a block-compressed file consisting of 5 blocks, this value should be 3 (since blocks 4 and 5 can be processed in parallel by new readers produced via dynamic work rebalancing, while the current reader continues processing block 3) if the total number of blocks is known.
- For example (3): A reader for any non-empty unsplittable input should return 1 until it is finished, at which point it should return 0.
- For any reader: After returning the last split point in a file (e.g., the last record in example (1), the first record in the last block for example (2), or the first record in the file for example (3)), this value should be 1: apart from the current task, no additional remainder can be split off.
Defaults to BoundedSource.BoundedReader.SPLIT_POINTS_UNKNOWN. Any value less than 0 will be interpreted as unknown.
Thread safety
See the javadoc on BoundedSource.BoundedReader for information about thread safety.
- Overrides:
getSplitPointsRemaining in class BoundedSource.BoundedReader<T>
- See Also:
BoundedSource.BoundedReader.getSplitPointsConsumed()
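Since the consumed count excludes the current split point and the remaining count includes it, the two add up to the total whenever the total is known. The arithmetic in the examples above can be checked with a short sketch; SplitPointsRemaining and its UNKNOWN constant are hypothetical stand-ins, and the value -1 is an assumption (the text only requires a value less than 0).

```java
/** Hypothetical helper, not part of Beam: remaining parallelism from a known total. */
final class SplitPointsRemaining {
  /** Stand-in for SPLIT_POINTS_UNKNOWN; the text only requires a value less than 0. */
  static final long UNKNOWN = -1;

  private SplitPointsRemaining() {}

  /**
   * @param totalSplitPoints total split points in the current source, or negative if unknown
   * @param splitPointsConsumed value of getSplitPointsConsumed(), or negative if unknown
   */
  static long remaining(long totalSplitPoints, long splitPointsConsumed) {
    if (totalSplitPoints < 0 || splitPointsConsumed < 0) {
      return UNKNOWN; // an exact value cannot be given
    }
    return Math.max(0, totalSplitPoints - splitPointsConsumed);
  }
}
```

For example (1), 50 records with 29 consumed gives 21. For example (2), 5 blocks with 2 consumed gives 3. An unsplittable input has a total of 1, so the result is 1 while reading and 0 once finished.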
-
allowsDynamicSplitting
public boolean allowsDynamicSplitting()
Whether this reader should allow dynamic splitting of the offset ranges.
True by default. Override this to return false if the reader cannot support dynamic splitting correctly. If this returns false, splitAtFraction(double) will refuse all split requests.
-
splitAtFraction
public final OffsetBasedSource<T> splitAtFraction(double fraction)
Description copied from class: BoundedSource.BoundedReader
Tells the reader to narrow the range of the input it's going to read and give up the remainder, so that the new range would contain approximately the given fraction of the amount of data in the current range.
Returns a BoundedSource representing the remainder.
Detailed description
Assuming the following sequence of calls:
    BoundedSource<T> initial = reader.getCurrentSource();
    BoundedSource<T> residual = reader.splitAtFraction(fraction);
    BoundedSource<T> primary = reader.getCurrentSource();
- The "primary" and "residual" sources, when read, should together cover the same set of records as "initial".
- The current reader should continue to be in a valid state, and continuing to read from it should, together with the records it already read, yield the same records as would have been read by "primary".
- The amount of data read by "primary" should ideally represent approximately the given fraction of the amount of data read by "initial".
This method should return null if the split cannot be performed for this fraction while satisfying the semantics above. E.g., a reader that reads a range of offsets in a file should return null if it is already past the position in its range corresponding to the given fraction. In this case, the method MUST have no effect (the reader must behave as if the method hadn't been called at all).
Statefulness
Since this method (if successful) affects the reader's source, in subsequent invocations "fraction" should be interpreted relative to the new current source.
Thread safety and blocking
This method will be called concurrently to other methods (however there will not be multiple concurrent invocations of this method itself), and it is critical for it to be implemented in a thread-safe way (otherwise data loss is possible).
It is also very important that this method always completes quickly. In particular, it should not perform or wait on any blocking operations such as I/O, RPCs etc. Violating this requirement may stall completion of the work item or even cause it to fail.
It is incorrect to make both this method and Source.Reader.start() / Source.Reader.advance() synchronized, because those methods can perform blocking operations, and then this method would have to wait for those calls to complete.
RangeTracker makes it easy to implement this method safely and correctly.
By default, returns null to indicate that splitting is not possible.
- Overrides:
splitAtFraction in class BoundedSource.BoundedReader<T>
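The primary/residual contract, the null-on-refusal rule, and the statefulness of "fraction" can be illustrated with a toy model over a half-open offset range. This is a sketch under assumptions, not Beam's implementation: ToyRange and ToySplitter are hypothetical names, every offset is treated as a valid split position, and splits that would leave an empty primary or residual are refused.

```java
/** Hypothetical immutable offset range [start, end); not a Beam class. */
final class ToyRange {
  final long start; // inclusive
  final long end; // exclusive

  ToyRange(long start, long end) {
    this.start = start;
    this.end = end;
  }
}

/** Hypothetical model of split bookkeeping; not Beam's implementation. */
final class ToySplitter {
  private ToyRange current; // replaced on a successful split, never mutated
  private long lastReturnedOffset = -1; // -1 means no record has been returned yet

  ToySplitter(ToyRange initial) {
    this.current = initial;
  }

  /** Short, non-blocking bookkeeping call made after a record has been read. */
  synchronized void recordReturned(long offset) {
    lastReturnedOffset = offset;
  }

  synchronized ToyRange getCurrentSource() {
    return current;
  }

  /** Returns the residual range, or null (leaving state untouched) if the split is refused. */
  synchronized ToyRange splitAtFraction(double fraction) {
    long size = current.end - current.start;
    long splitOffset = current.start + (long) Math.ceil(fraction * size);
    // Refuse splits at or before the last returned record, and splits that
    // would leave either the primary or the residual empty.
    if (splitOffset <= lastReturnedOffset
        || splitOffset <= current.start
        || splitOffset >= current.end) {
      return null;
    }
    ToyRange residual = new ToyRange(splitOffset, current.end);
    current = new ToyRange(current.start, splitOffset);
    return residual;
  }
}
```

With a range of [0, 100) and the last returned record at offset 40, a split at 0.25 is refused and changes nothing, while a split at 0.5 narrows the reader to [0, 50) and returns [50, 100). A second split at 0.5 is then interpreted relative to [0, 50) and refused, because offset 25 is already behind the reader. Only the bookkeeping is synchronized here; the blocking read itself would happen outside the lock.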
-
-