Class FileBasedSource.FileBasedReader<T>
- java.lang.Object
  - org.apache.beam.sdk.io.Source.Reader<T>
    - org.apache.beam.sdk.io.BoundedSource.BoundedReader<T>
      - org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader<T>
        - org.apache.beam.sdk.io.FileBasedSource.FileBasedReader<T>
- All Implemented Interfaces:
  java.lang.AutoCloseable
- Direct Known Subclasses:
  BlockBasedSource.BlockBasedReader, CompressedSource.CompressedReader
- Enclosing class:
  FileBasedSource<T>
public abstract static class FileBasedSource.FileBasedReader<T> extends OffsetBasedSource.OffsetBasedReader<T>
A reader that implements code common to readers of FileBasedSources.

Seekability

This reader uses a ReadableByteChannel created for the file represented by the corresponding source to efficiently move to the correct starting position defined in the source. Subclasses of this reader should implement startReading(java.nio.channels.ReadableByteChannel) to get access to this channel. If the source corresponding to the reader is for a subrange of a file, the ReadableByteChannel provided is guaranteed to be an instance of the type SeekableByteChannel, which the subclass may use to traverse back in the channel to determine the correct starting position.

Reading Records
Sequential reading is implemented using readNextRecord().

FileBasedReader implements "reading a range [A, B)" in the following way:
- OffsetBasedSource.OffsetBasedReader.start() opens the file.
- OffsetBasedSource.OffsetBasedReader.start() seeks the SeekableByteChannel to A (reading offset ranges for non-seekable files is not supported) and calls startReading().
- OffsetBasedSource.OffsetBasedReader.start() calls OffsetBasedSource.OffsetBasedReader.advance() once, which, via readNextRecord(), locates the first record that is at a split point AND whose offset is at or after A. If this record is at or after B, OffsetBasedSource.OffsetBasedReader.advance() returns false and reading is finished.
- If the previous advance() call returned true, sequential reading starts and advance() is called repeatedly. Each advance() call invokes readNextRecord() on the subclass and stops (returns false) if the new record is at a split point AND the offset of the new record is at or after B.

Thread Safety
Since this class implements Source.Reader, it guarantees thread safety. Abstract methods defined here will not be accessed by more than one thread concurrently.
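The [A, B) procedure described under Reading Records can be checked against a small in-memory model. The sketch below is a simplified, hypothetical simulation (RangeReadModel, Rec, and readRange are invented names, and no Beam classes are used): each record carries an offset and a split-point flag, "seeking" to A skips earlier records, the first advance skips forward to the first split point, and reading stops at the first split point at or after B.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of FileBasedReader's [a, b) range reading; not Beam code. */
final class RangeReadModel {
  /** A record at a byte offset; splitPoint marks records where a range may begin. */
  static final class Rec {
    final long offset;
    final boolean splitPoint;
    final String value;

    Rec(long offset, boolean splitPoint, String value) {
      this.offset = offset;
      this.splitPoint = splitPoint;
      this.value = value;
    }
  }

  private RangeReadModel() {}

  /** Returns the values a reader of [a, b) would emit; records must be sorted by offset. */
  static List<String> readRange(List<Rec> records, long a, long b) {
    List<String> out = new ArrayList<>();
    int next = 0;
    // start() seeks the channel to a: records before offset a are never seen.
    while (next < records.size() && records.get(next).offset < a) {
      next++;
    }
    // The first advance() locates the first split-point record at or after a.
    while (next < records.size() && !records.get(next).splitPoint) {
      next++;
    }
    // Each advance() reads one record and stops at a split point at or after b.
    while (next < records.size()) {
      Rec r = records.get(next++);
      if (r.splitPoint && r.offset >= b) {
        break;
      }
      out.add(r.value);
    }
    return out;
  }
}
```

For records a@0 (split point), a2@5 (not a split point), b@10 and c@20 (split points), reading [0, 5) yields [a, a2] and reading [5, 30) yields [b, c]. The non-split-point record at offset 5 stays with the range that owns the preceding split point, so adjacent ranges together return every record exactly once.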
Field Summary

Fields inherited from class org.apache.beam.sdk.io.BoundedSource.BoundedReader
SPLIT_POINTS_UNKNOWN

Constructor Summary

- FileBasedReader(FileBasedSource<T> source): Subclasses should not perform IO operations in the constructor.
Method Summary

- protected boolean advanceImpl(): Advances to the next record and returns true, or returns false if there is no next record.
- boolean allowsDynamicSplitting(): Whether this reader should allow dynamic splitting of the offset ranges.
- void close(): Closes any ReadableByteChannel created for the current reader.
- FileBasedSource<T> getCurrentSource(): Returns a Source describing the same input that this Reader currently reads (including items already read).
- protected abstract boolean readNextRecord(): Reads the next record from the channel provided by startReading(java.nio.channels.ReadableByteChannel).
- protected boolean startImpl(): Initializes the OffsetBasedSource.OffsetBasedReader and advances to the first record, returning true if there is a record available to be read.
- protected abstract void startReading(java.nio.channels.ReadableByteChannel channel): Performs any initialization of the subclass of FileBasedReader that involves IO operations.
Methods inherited from class org.apache.beam.sdk.io.OffsetBasedSource.OffsetBasedReader
advance, getCurrentOffset, getFractionConsumed, getSplitPointsConsumed, getSplitPointsRemaining, isAtSplitPoint, isDone, isStarted, splitAtFraction, start

Methods inherited from class org.apache.beam.sdk.io.BoundedSource.BoundedReader
getCurrentTimestamp

Methods inherited from class org.apache.beam.sdk.io.Source.Reader
getCurrent

Constructor Detail
FileBasedReader

public FileBasedReader(FileBasedSource<T> source)

Subclasses should not perform IO operations in the constructor. All IO operations should be delayed until the startReading(java.nio.channels.ReadableByteChannel) method is invoked.

Method Detail
getCurrentSource

public FileBasedSource<T> getCurrentSource()

Description copied from class: BoundedSource.BoundedReader

Returns a Source describing the same input that this Reader currently reads (including items already read).

Usage

Reader subclasses can use this method for convenience to access unchanging properties of the source being read. Alternatively, they can cache these properties in the constructor.

The framework will call this method in the course of dynamic work rebalancing, e.g. after a successful BoundedSource.BoundedReader.splitAtFraction(double) call.

Mutability and thread safety
Remember that Source objects must always be immutable. However, the return value of this function may be affected by dynamic work rebalancing, happening asynchronously via BoundedSource.BoundedReader.splitAtFraction(double), meaning it can return a different Source object. The returned object itself will still be immutable. Callers must take care not to rely on properties of the returned source that may be asynchronously changed as a result of this process (e.g. do not cache an end offset when reading a file).

Implementation
For convenience, subclasses should usually return the most concrete subclass of Source possible. In practice, the implementation of this method should nearly always be one of the following:
- Source that inherits from a base class that already implements BoundedSource.BoundedReader.getCurrentSource(): delegate to the base class. In this case, it is almost always an error for the subclass to maintain its own copy of the source.

    public FooReader(FooSource<T> source) {
      super(source);
    }

    @Override
    public FooSource<T> getCurrentSource() {
      // The base class tracks the current source; only narrow its type here.
      return (FooSource<T>) super.getCurrentSource();
    }

- Source that does not support dynamic work rebalancing: return a private final variable.

    private final FooSource<T> source;

    public FooReader(FooSource<T> source) {
      this.source = source;
    }

    @Override
    public FooSource<T> getCurrentSource() {
      // Never reassigned, so no synchronization is needed.
      return source;
    }

- BoundedSource.BoundedReader that explicitly supports dynamic work rebalancing: maintain a variable pointing to an immutable source object, and protect it with synchronization.

    private FooSource<T> source; // guarded by "this"

    public FooReader(FooSource<T> source) {
      this.source = source;
    }

    @Override
    public synchronized FooSource<T> getCurrentSource() {
      return source;
    }

    @Override
    public synchronized FooSource<T> splitAtFraction(double fraction) {
      ...
      FooSource<T> primary = ...;
      FooSource<T> residual = ...;
      // Swap in a new immutable object rather than mutating the old one.
      this.source = primary;
      return residual;
    }
- Overrides:
  getCurrentSource in class OffsetBasedSource.OffsetBasedReader<T>
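The third pattern can be exercised without Beam. The sketch below uses hypothetical RangeSource and RebalancingReader classes (not Beam types) to show the key property: splitAtFraction publishes a new immutable primary source instead of mutating the old one, so a caller that obtained a source earlier still holds a consistent snapshot. The split-offset arithmetic is a simplification; a real reader would also have to reject split points it has already read past.

```java
/** Hypothetical immutable source covering the offset range [start, end); not a Beam type. */
final class RangeSource {
  final long start;
  final long end;

  RangeSource(long start, long end) {
    this.start = start;
    this.end = end;
  }
}

/** Hypothetical reader following the "explicitly supports dynamic work rebalancing" pattern. */
final class RebalancingReader {
  // Always points to an immutable RangeSource; guarded by "this".
  private RangeSource source;

  RebalancingReader(RangeSource source) {
    this.source = source;
  }

  synchronized RangeSource getCurrentSource() {
    return source;
  }

  /** Returns the residual range, or null if the split would leave an empty half. */
  synchronized RangeSource splitAtFraction(double fraction) {
    long splitOffset = source.start + (long) ((source.end - source.start) * fraction);
    if (splitOffset <= source.start || splitOffset >= source.end) {
      return null;
    }
    RangeSource primary = new RangeSource(source.start, splitOffset);
    RangeSource residual = new RangeSource(splitOffset, source.end);
    this.source = primary; // replace the reference; never mutate the old object
    return residual;
  }
}
```

Splitting [0, 100) at 0.25 leaves the reader with [0, 25) and returns [25, 100), while a RangeSource obtained before the split still reports an end of 100.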
startImpl

protected final boolean startImpl() throws java.io.IOException

Description copied from class: OffsetBasedSource.OffsetBasedReader

Initializes the OffsetBasedSource.OffsetBasedReader and advances to the first record, returning true if there is a record available to be read. This method will be invoked exactly once and may perform expensive setup operations that are needed to initialize the reader.

This function is the OffsetBasedReader implementation of Source.Reader.start(). The key difference is that the implementor can ignore the possibility that it should no longer produce the first record, either because it has exceeded the original endOffset assigned to the reader, or because a concurrent call to OffsetBasedSource.OffsetBasedReader.splitAtFraction(double) has changed the source to shrink the offset range being read.
- Specified by:
  startImpl in class OffsetBasedSource.OffsetBasedReader<T>
- Throws:
  java.io.IOException
- See Also:
  Source.Reader.start()
advanceImpl

protected final boolean advanceImpl() throws java.io.IOException

Description copied from class: OffsetBasedSource.OffsetBasedReader

Advances to the next record and returns true, or returns false if there is no next record.

This function is the OffsetBasedReader implementation of Source.Reader.advance(). The key difference is that the implementor can ignore the possibility that it should no longer produce the next record, either because it has exceeded the original endOffset assigned to the reader, or because a concurrent call to OffsetBasedSource.OffsetBasedReader.splitAtFraction(double) has changed the source to shrink the offset range being read.
- Specified by:
  advanceImpl in class OffsetBasedSource.OffsetBasedReader<T>
- Throws:
  java.io.IOException
- See Also:
  Source.Reader.advance()
close

public void close() throws java.io.IOException

Closes any ReadableByteChannel created for the current reader. This implementation is idempotent. Any close() method introduced by a subclass must be idempotent and must call the close() method in FileBasedReader.
- Specified by:
  close in interface java.lang.AutoCloseable
- Specified by:
  close in class Source.Reader<T>
- Throws:
  java.io.IOException
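The close() contract (idempotent, with any subclass close() also calling the base close()) can be sketched with plain java.nio types. BaseChannelReader and LinesReader below are hypothetical stand-ins for FileBasedReader and one of its subclasses, not Beam classes; the try/finally ensures the base channel is released even if the subclass's own cleanup throws.

```java
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;

/** Hypothetical stand-in for the base reader: owns the channel and closes it at most once. */
class BaseChannelReader implements AutoCloseable {
  private ReadableByteChannel channel;

  void open(ReadableByteChannel channel) {
    this.channel = channel;
  }

  @Override
  public void close() throws IOException {
    if (channel != null) {
      ReadableByteChannel toClose = channel;
      channel = null; // later calls become no-ops
      toClose.close();
    }
  }
}

/** Hypothetical subclass with an extra resource of its own. */
class LinesReader extends BaseChannelReader {
  private boolean ownResourcesClosed;
  int ownCloseCount; // counts real cleanups, for demonstration only

  @Override
  public void close() throws IOException {
    try {
      if (!ownResourcesClosed) {
        ownResourcesClosed = true;
        ownCloseCount++; // release subclass-specific resources here
      }
    } finally {
      super.close(); // always delegate to the base class, as the contract requires
    }
  }
}
```

Calling close() twice on a LinesReader closes the channel once and runs the subclass cleanup once; the second call does nothing.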
allowsDynamicSplitting

public boolean allowsDynamicSplitting()

Description copied from class: OffsetBasedSource.OffsetBasedReader

Whether this reader should allow dynamic splitting of the offset ranges.

True by default. Override this to return false if the reader cannot support dynamic splitting correctly. If this returns false, OffsetBasedSource.OffsetBasedReader.splitAtFraction(double) will refuse all split requests.
- Overrides:
  allowsDynamicSplitting in class OffsetBasedSource.OffsetBasedReader<T>
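A minimal sketch of the documented behavior: when allowsDynamicSplitting() is overridden to return false, every split request is refused. SplitGateReader and NonSplittableReader are hypothetical classes, not Beam types, and the fraction-to-offset arithmetic is simplified.

```java
/** Hypothetical model of how allowsDynamicSplitting() gates split requests. */
class SplitGateReader {
  private final long start;
  private long end;

  SplitGateReader(long start, long end) {
    this.start = start;
    this.end = end;
  }

  /** True by default, mirroring the documented default. */
  boolean allowsDynamicSplitting() {
    return true;
  }

  /** Returns the residual start offset, or -1 if the request is refused. */
  final synchronized long splitAtFraction(double fraction) {
    if (!allowsDynamicSplitting()) {
      return -1; // refuse all split requests
    }
    long split = start + (long) ((end - start) * fraction);
    if (split <= start || split >= end) {
      return -1; // would leave an empty primary or residual
    }
    end = split; // this reader keeps [start, split)
    return split;
  }
}

/** Hypothetical reader that cannot support dynamic splitting correctly. */
class NonSplittableReader extends SplitGateReader {
  NonSplittableReader(long start, long end) {
    super(start, end);
  }

  @Override
  boolean allowsDynamicSplitting() {
    return false;
  }
}
```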
startReading

protected abstract void startReading(java.nio.channels.ReadableByteChannel channel) throws java.io.IOException

Performs any initialization of the subclass of FileBasedReader that involves IO operations. This method is invoked only once; before that invocation, the base class seeks the channel to the source's starting offset.

The provided ReadableByteChannel is for the file represented by the source of this reader. The subclass may use the channel to build a higher-level IO abstraction, e.g., a BufferedReader or an XML parser.

If the corresponding source is for a subrange of a file, channel is guaranteed to be an instance of the type SeekableByteChannel.

After this method is invoked, the base class will not read data from the channel or adjust the position of the channel. However, the base class is responsible for properly closing the channel.
- Parameters:
  channel - a byte channel representing the file backing the reader.
- Throws:
  java.io.IOException
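For a subrange read, the SeekableByteChannel lets a subclass step back to decide where its first record begins. The sketch below assumes newline-delimited records (an assumption for illustration; FileBasedReader itself imposes no record format) and uses a hypothetical helper, firstRecordStart, rather than any Beam API. If the byte just before the range start is '\n', a record begins exactly at the start; otherwise the partial record is skipped, since it belongs to the previous range.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;

/** Hypothetical helper for newline-delimited files; not part of Beam. */
final class RecordBoundaries {
  private RecordBoundaries() {}

  /**
   * Returns the offset of the first record beginning at or after start, and leaves the
   * channel positioned there. Returns the file size if no record begins at or after start.
   * Assumes 0 <= start <= channel.size().
   */
  static long firstRecordStart(SeekableByteChannel channel, long start) throws IOException {
    if (start == 0) {
      channel.position(0);
      return 0; // a file's first byte always begins a record
    }
    // Traverse back one byte: a '\n' there means a record begins exactly at start.
    channel.position(start - 1);
    ByteBuffer oneByte = ByteBuffer.allocate(1);
    long pos = start - 1;
    while (true) {
      oneByte.clear();
      if (channel.read(oneByte) <= 0) {
        return channel.size(); // end of file: no record starts at or after start
      }
      pos++;
      if (oneByte.get(0) == '\n') {
        return pos; // the byte after the delimiter begins the next record
      }
    }
  }
}
```

For the file "abc\ndef\nghi\n", a range starting at 4 begins at 4 (the previous byte is '\n'), while a range starting at 5 skips the rest of "def" and begins at 8.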
readNextRecord

protected abstract boolean readNextRecord() throws java.io.IOException

Reads the next record from the channel provided by startReading(java.nio.channels.ReadableByteChannel). The methods Source.Reader.getCurrent(), OffsetBasedSource.OffsetBasedReader.getCurrentOffset(), and OffsetBasedSource.OffsetBasedReader.isAtSplitPoint() should return the corresponding information about the record read by the last invocation of this method.

Note that this method will be called the same way for reading the first record in the source (file or offset range in the file) and for reading subsequent records. It is up to the subclass to do anything special for locating and reading the first record, if necessary.
- Returns:
  true if a record was successfully read, false if the end of the channel was reached before successfully reading a new record.
- Throws:
  java.io.IOException
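The readNextRecord() contract, where each successful call updates what getCurrent(), getCurrentOffset(), and isAtSplitPoint() report, can be sketched as a standalone line reader. LineRecordReaderSketch is hypothetical and does not extend Beam's FileBasedReader. It assumes single-byte characters, '\n'-only line terminators (so byte offsets follow from line lengths), and a channel already positioned at a record boundary; its startReading wraps the channel in a BufferedReader, and its constructor performs no IO.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

/** Hypothetical line-per-record reader illustrating the readNextRecord() contract. */
class LineRecordReaderSketch {
  private final long startOffset; // offset the channel was positioned at before startReading
  private BufferedReader in;
  private long nextOffset;
  private long currentOffset = -1;
  private String current;

  LineRecordReaderSketch(long startOffset) {
    this.startOffset = startOffset; // no IO in the constructor
  }

  /** Builds a higher-level abstraction over the channel; called once before any read. */
  void startReading(ReadableByteChannel channel) {
    in = new BufferedReader(Channels.newReader(channel, "US-ASCII"));
    nextOffset = startOffset;
  }

  /** Reads one line; returns false at end of input. Assumes ASCII and '\n' terminators. */
  boolean readNextRecord() throws IOException {
    String line = in.readLine();
    if (line == null) {
      return false;
    }
    current = line;
    currentOffset = nextOffset;
    nextOffset += line.length() + 1; // +1 for the '\n' delimiter
    return true;
  }

  String getCurrent() {
    return current;
  }

  long getCurrentOffset() {
    return currentOffset;
  }

  boolean isAtSplitPoint() {
    return true; // assumption: every line start is a valid split point
  }
}
```

Reading "ab\ncde\n" from a channel positioned at offset 100 yields "ab" at offset 100, then "cde" at offset 103, then false.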