Class FileIO
java.lang.Object
    org.apache.beam.sdk.io.FileIO
public class FileIO extends java.lang.Object
General-purpose transforms for working with files: listing files (matching), reading and writing.

Matching filepatterns

match() and matchAll() match filepatterns (respectively either a single filepattern or a PCollection thereof) and return the files that match them as PCollections of MatchResult.Metadata. Configuration options for them are in FileIO.MatchConfiguration and include features such as treatment of filepatterns that don't match anything and continuous incremental matching of filepatterns (watching for new files).

Example: Watching a single filepattern for new files
This example matches a single filepattern repeatedly every 30 seconds, continuously returns new matched files as an unbounded PCollection<Metadata>, and stops if no new files appear for 1 hour.

    PCollection<Metadata> matches = p.apply(FileIO.match()
        .filepattern("...")
        .continuously(
            Duration.standardSeconds(30),
            afterTimeSinceNewOutput(Duration.standardHours(1))));
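The "stop after 1 hour without new files" rule can be pictured with a small plain-Java model. This is only a sketch of the idea, not Beam's termination-condition code: the class name IdleTerminationSketch, the onPoll method, and the choice to stop exactly when the idle time reaches the limit are all assumptions made for illustration.

```java
import java.time.Duration;
import java.time.Instant;

/** Plain-Java model of "stop when no new output has appeared for a given time". */
final class IdleTerminationSketch {
    private final Duration idleLimit;
    private Instant lastNewOutput;

    IdleTerminationSketch(Instant start, Duration idleLimit) {
        this.lastNewOutput = start;
        this.idleLimit = idleLimit;
    }

    /**
     * Records the result of one poll.
     *
     * @return true if watching should stop after this poll
     */
    boolean onPoll(Instant now, int newFiles) {
        if (newFiles > 0) {
            lastNewOutput = now; // new output resets the idle clock
            return false;
        }
        // Assumption: the condition fires once the idle time reaches the limit.
        return Duration.between(lastNewOutput, now).compareTo(idleLimit) >= 0;
    }
}
```

In this model a poll that finds new files never stops the watch, so a filepattern that keeps receiving files is watched indefinitely.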
Example: Matching a PCollection of filepatterns arriving from Kafka
This example reads filepatterns from Kafka and matches each one as it arrives, again producing an unbounded PCollection<Metadata>, and fails if a filepattern doesn't match anything.

    PCollection<String> filepatterns = p.apply(KafkaIO.read()...);
    PCollection<Metadata> matches = filepatterns.apply(FileIO.matchAll()
        .withEmptyMatchTreatment(DISALLOW));
Reading files
readMatches() converts each result of match() or matchAll() to a FileIO.ReadableFile that is convenient for reading a file's contents, optionally decompressing it.

Example: Returning filenames and contents of compressed files matching a filepattern

This example matches a single filepattern and returns KVs of filenames and their contents as String, decompressing each file with GZIP.

    PCollection<KV<String, String>> filesAndContents = p
        .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
        // withCompression can be omitted - by default compression is detected from the filename.
        .apply(FileIO.readMatches().withCompression(GZIP))
        .apply(MapElements
            // uses imports from TypeDescriptors
            .into(kvs(strings(), strings()))
            .via((ReadableFile f) -> {
              try {
                return KV.of(
                    f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String());
              } catch (IOException ex) {
                throw new RuntimeException("Failed to read the file", ex);
              }
            }));
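As a rough picture of what "decompress with GZIP and read fully as a UTF-8 string" amounts to, the following plain-JDK sketch does the same thing on an in-memory byte array. It does not use or reproduce Beam's ReadableFile; the class GzipReadSketch and both of its methods are hypothetical names introduced for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class GzipReadSketch {
    /** Decompresses GZIP bytes to the end and decodes the result as UTF-8. */
    static String readFullyAsUtf8(byte[] compressed) {
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException ex) {
            throw new UncheckedIOException("Failed to read the file", ex);
        }
    }

    /** Demo helper: GZIP-compresses a string (encoded as UTF-8) in memory. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException ex) {
            throw new UncheckedIOException("Failed to compress", ex);
        }
        return bytes.toByteArray();
    }
}
```

Like the pipeline example, the sketch loads the whole file into memory, which is only reasonable for files that fit comfortably in a worker's heap.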
Writing files
write() and writeDynamic() write elements from a PCollection of a given type to files, using a given FileIO.Sink to write a set of elements to each file. The collection can be bounded or unbounded - in either case, writing happens by default per window and pane, and the amount of data in each window and pane is finite, so a finite number of files ("shards") are written for each window and pane. There are several aspects to this process:

- How many shards are generated per pane: This is controlled by sharding, using FileIO.Write.withNumShards(int) or FileIO.Write.withSharding(org.apache.beam.sdk.transforms.PTransform<org.apache.beam.sdk.values.PCollection<UserT>, org.apache.beam.sdk.values.PCollectionView<java.lang.Integer>>). The default is runner-specific, so the number of shards will vary based on runner behavior, though at least 1 shard will always be produced for every non-empty pane. Note that setting a fixed number of shards can hurt performance: it adds an additional GroupByKey to the pipeline. However, it is required to set it when writing an unbounded PCollection due to BEAM-1438 and similar behavior in other runners.
- How the shards are named: This is controlled by a FileIO.Write.FileNaming: filenames can depend on a variety of inputs, e.g. the window, the pane, total number of shards, the current file's shard index, and compression. Controlling the file naming is described in the section File naming below.
- Which elements go into which shard: Elements within a pane get distributed into different shards created for that pane arbitrarily, though FileIO.Write attempts to make shards approximately evenly sized. For more control over which elements go into which files, consider using dynamic destinations (see below).
- How a given set of elements is written to a shard: This is controlled by the FileIO.Sink, e.g. AvroIO.sink(java.lang.Class<ElementT>) will generate Avro files. The FileIO.Sink controls the format of a single file: how to open a file, how to write each element to it, and how to close the file - but it does not control the set of files or which elements go where. Elements are written to a shard in an arbitrary order. FileIO.Write can additionally compress the generated files using FileIO.Write.withCompression(org.apache.beam.sdk.io.Compression).
- How all of the above can be element-dependent: This is controlled by dynamic destinations. It is possible to have different groups of elements use different policies for naming files and for configuring the FileIO.Sink. See "dynamic destinations" below.
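The division of labor described above, where the sink only formats a single file while the writer decides which elements reach it, can be sketched without Beam. The SimpleSink interface below is a hypothetical stand-in that mirrors the open/write/flush shape of a sink; it is not FileIO.Sink itself, and writeShard is an illustrative driver rather than anything FileIO.Write does internally.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;

final class SinkLifecycleSketch {
    /** Hypothetical stand-in mirroring the open/write/flush shape of a sink. */
    interface SimpleSink<T> {
        void open(WritableByteChannel channel);
        void write(T element);
        void flush();
    }

    /** Writes one line per element; knows nothing about shards or file names. */
    static final class LineSink implements SimpleSink<String> {
        private PrintWriter writer;

        @Override
        public void open(WritableByteChannel channel) {
            writer = new PrintWriter(
                new OutputStreamWriter(Channels.newOutputStream(channel), StandardCharsets.UTF_8));
        }

        @Override
        public void write(String element) {
            writer.print(element);
            writer.print('\n'); // fixed separator keeps output platform-independent
        }

        @Override
        public void flush() {
            writer.flush();
        }
    }

    /** Plays the writer's role: it chooses the elements, the sink formats them. */
    static <T> String writeShard(SimpleSink<T> sink, List<T> elements) {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        sink.open(Channels.newChannel(file));
        for (T element : elements) {
            sink.write(element);
        }
        sink.flush();
        return new String(file.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

Because the sink never sees more than one channel, the same sink class can be reused for every shard, window and destination.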
File naming
The names of generated files are produced by a FileIO.Write.FileNaming. The default naming strategy is to name files in the format $prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix, where:

- $prefix is set by FileIO.Write.withPrefix(java.lang.String); the default is "output".
- $start and $end are boundaries of the window of data being written, formatted in ISO 8601 format (YYYY-mm-ddTHH:MM:SSZZZ). The window is omitted in case this is the global window.
- $pane is the index of the pane within the window. The pane is omitted in case it is known to be the only pane for this window.
- $shard is the index of the current shard being written, out of the $numShards total shards written for the current pane. Both are formatted using 5 digits (or more if necessary according to $numShards) and zero-padded.
- $suffix is set by FileIO.Write.withSuffix(java.lang.String); the default is empty.
- $compressionSuffix is based on the default extension for the chosen compression type.
For example:
data-2017-12-01T19:00:00Z-2017-12-01T20:00:00Z-2-00010-of-00050.txt.gz
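The pattern above can be reproduced with a few lines of plain Java. This is a sketch of the documented format only, not Beam's implementation: the class DefaultNamingSketch and its methods are hypothetical, the window boundaries are passed in as already-formatted strings, and a null window or pane stands for "omitted".

```java
import java.util.Locale;

final class DefaultNamingSketch {
    /** Zero-pads to 5 digits, or more when numShards itself needs more. */
    static String pad(int value, int numShards) {
        int width = Math.max(5, Integer.toString(numShards).length());
        return String.format(Locale.ROOT, "%0" + width + "d", value);
    }

    /**
     * Builds $prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix.
     * Pass null for start/end (global window) or pane (only pane) to omit them.
     */
    static String fileName(
            String prefix, String start, String end, Integer pane,
            int shard, int numShards, String suffix, String compressionSuffix) {
        StringBuilder name = new StringBuilder(prefix);
        if (start != null && end != null) {
            name.append('-').append(start).append('-').append(end);
        }
        if (pane != null) {
            name.append('-').append(pane);
        }
        name.append('-').append(pad(shard, numShards))
            .append("-of-").append(pad(numShards, numShards));
        return name.append(suffix).append(compressionSuffix).toString();
    }
}
```

Calling fileName("data", "2017-12-01T19:00:00Z", "2017-12-01T20:00:00Z", 2, 10, 50, ".txt", ".gz") yields the example filename shown above.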
Alternatively, one can specify a custom naming strategy using FileIO.Write.withNaming(Write.FileNaming).

If FileIO.Write.to(java.lang.String) is specified, then the filenames produced by the FileIO.Write.FileNaming are resolved relative to that directory.

When using dynamic destinations via writeDynamic() (see below), specifying a custom naming strategy is required, using FileIO.Write.withNaming(SerializableFunction) or FileIO.Write.withNaming(Contextful). In those, pass a function that creates a FileIO.Write.FileNaming for the requested group ("destination"). You can either implement a custom FileIO.Write.FileNaming, or use FileIO.Write.defaultNaming(java.lang.String, java.lang.String) to configure the default naming strategy with a prefix and suffix as per above.

Dynamic destinations
If the elements in the input collection can be partitioned into groups that should be treated differently, FileIO.Write supports different treatment per group ("destination"). It can use different file naming strategies for different groups, and can differently configure the FileIO.Sink, e.g. write different elements to Avro files in different directories with different schemas.

This feature is supported by writeDynamic(). Use FileIO.Write.by(org.apache.beam.sdk.transforms.SerializableFunction<UserT, DestinationT>) to specify how to partition the elements into groups ("destinations"). Then elements will be grouped by destination, and FileIO.Write.withNaming(Contextful) and FileIO.Write.via(Contextful) will be applied separately within each group, i.e. different groups will be written using the file naming strategies returned by FileIO.Write.withNaming(Contextful) and using sinks returned by FileIO.Write.via(Contextful) for the respective destinations. Note that currently sharding cannot be destination-dependent: every window/pane for every destination will use the same number of shards specified via FileIO.Write.withNumShards(int) or FileIO.Write.withSharding(org.apache.beam.sdk.transforms.PTransform<org.apache.beam.sdk.values.PCollection<UserT>, org.apache.beam.sdk.values.PCollectionView<java.lang.Integer>>).

Writing custom types to sinks
Normally, when writing a collection of a custom type using a FileIO.Sink that takes a different type (for example, writing a PCollection<Event> to a text-based Sink<String>), one can simply apply a ParDo or MapElements to convert the custom type to the sink's output type.

However, when using dynamic destinations, in many such cases the destination needs to be extracted from the original type, so such a conversion is not possible. For example, one might write events of a custom class Event to a text sink, using the event's "type" as a destination. In that case, specify an output function in FileIO.Write.via(Contextful, Contextful) or FileIO.Write.via(Contextful, Sink).

Example: Writing CSV files
    class CSVSink implements FileIO.Sink<List<String>> {
      private String header;
      // PrintWriter is not serializable, so it is transient and created in open().
      private transient PrintWriter writer;

      public CSVSink(List<String> colNames) {
        this.header = Joiner.on(",").join(colNames);
      }

      @Override
      public void open(WritableByteChannel channel) throws IOException {
        writer = new PrintWriter(Channels.newOutputStream(channel));
        writer.println(header);
      }

      @Override
      public void write(List<String> element) throws IOException {
        writer.println(Joiner.on(",").join(element));
      }

      @Override
      public void flush() throws IOException {
        writer.flush();
      }
    }

    PCollection<BankTransaction> transactions = ...;
    // Convert transactions to strings before writing them to the CSV sink.
    transactions.apply(MapElements
            .into(TypeDescriptors.lists(TypeDescriptors.strings()))
            .via(tx -> Arrays.asList(tx.getUser(), tx.getAmount())))
        .apply(FileIO.<List<String>>write()
            .via(new CSVSink(Arrays.asList("user", "amount")))
            .to(".../path/to/")
            .withPrefix("transactions")
            .withSuffix(".csv"));
Example: Writing CSV files to different directories and with different headers
    enum TransactionType {
      DEPOSIT,
      WITHDRAWAL,
      TRANSFER,
      ...

      List<String> getFieldNames();
      List<String> getAllFields(BankTransaction tx);
    }

    PCollection<BankTransaction> transactions = ...;
    transactions.apply(FileIO.<TransactionType, BankTransaction>writeDynamic()
        .by(BankTransaction::getTypeName)
        // Convert the data to be written to CSVSink. via(Contextful, Contextful)
        // takes Contextful arguments, so the lambdas are wrapped with Contextful.fn.
        .via(Contextful.fn((BankTransaction tx) -> tx.getTypeName().getAllFields(tx)),
             Contextful.fn((TransactionType type) -> new CSVSink(type.getFieldNames())))
        .to(".../path/to/")
        .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
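The grouping that by(...) and withNaming(...) perform for dynamic destinations can be pictured in plain Java as "group elements by a destination function, then derive a file prefix per group". The sketch below is only a mental model: DynamicDestinationSketch, groupByDestination and prefixFor are hypothetical names, and Beam performs the grouping per window and pane inside the pipeline rather than on an in-memory list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class DynamicDestinationSketch {
    /** Partitions elements into groups keyed by their destination. */
    static <T, D> Map<D, List<T>> groupByDestination(List<T> elements, Function<T, D> by) {
        Map<D, List<T>> groups = new LinkedHashMap<>();
        for (T element : elements) {
            groups.computeIfAbsent(by.apply(element), k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    /** Per-destination file prefix, in the spirit of the naming function above. */
    static <D> String prefixFor(D destination) {
        return destination + "-transactions";
    }
}
```

Each resulting group would then be written with its own naming strategy and its own sink, while the number of shards stays the same for all groups.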
-
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
static class | FileIO.Match | Implementation of match().
static class | FileIO.MatchAll | Implementation of matchAll().
static class | FileIO.MatchConfiguration | Describes configuration for matching filepatterns, such as EmptyMatchTreatment and continuous watching for matching files.
static class | FileIO.ReadableFile | A utility class for accessing a potentially compressed file.
static class | FileIO.ReadMatches | Implementation of readMatches().
static interface | FileIO.Sink<ElementT> | Specifies how to write elements to individual files in write() and writeDynamic().
static class | FileIO.Write<DestinationT,UserT> | Implementation of write() and writeDynamic().
-
Constructor Summary
Constructors
Constructor | Description
FileIO() |
-
Method Summary
Modifier and Type | Method | Description
static FileIO.Match | match() | Matches a filepattern using FileSystems.match(java.util.List<java.lang.String>) and produces a collection of matched resources (both files and directories) as MatchResult.Metadata.
static FileIO.MatchAll | matchAll() | Like match(), but matches each filepattern in a collection of filepatterns.
static FileIO.ReadMatches | readMatches() | Converts each result of match() or matchAll() to a FileIO.ReadableFile which can be used to read the contents of each file, optionally decompressing it.
static <InputT> FileIO.Write<java.lang.Void,InputT> | write() | Writes elements to files using a FileIO.Sink.
static <DestT,InputT> FileIO.Write<DestT,InputT> | writeDynamic() | Writes elements to files using a FileIO.Sink and grouping the elements using "dynamic destinations".
-
-
-
Method Detail
-
match
public static FileIO.Match match()
Matches a filepattern using FileSystems.match(java.util.List<java.lang.String>) and produces a collection of matched resources (both files and directories) as MatchResult.Metadata.

By default, matches the filepattern once and produces a bounded PCollection. To continuously watch the filepattern for new matches, use FileIO.Match.continuously(Duration, TerminationCondition) - this will produce an unbounded PCollection.

By default, a filepattern matching no resources is treated according to EmptyMatchTreatment.DISALLOW. To configure this behavior, use FileIO.Match.withEmptyMatchTreatment(org.apache.beam.sdk.io.fs.EmptyMatchTreatment).

Returned MatchResult.Metadata are deduplicated by filename. For example, if this transform observes a file with the same name several times with different metadata (e.g. because the file is growing), it will emit the metadata the first time this file is observed, and will ignore future changes to this file.
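The "first observation wins" behavior described above can be modeled in a few lines of plain Java. This is a sketch of the semantics only, not of how Beam tracks state: FirstSeenDedupSketch and its FileInfo class are hypothetical stand-ins, with FileInfo playing the role of MatchResult.Metadata reduced to a name and a size.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class FirstSeenDedupSketch {
    /** Hypothetical stand-in for file metadata: just a name and a size. */
    static final class FileInfo {
        final String name;
        final long sizeBytes;

        FileInfo(String name, long sizeBytes) {
            this.name = name;
            this.sizeBytes = sizeBytes;
        }
    }

    private final Set<String> seenNames = new HashSet<>();

    /** Returns only the files whose names have not been observed before. */
    List<FileInfo> onPoll(List<FileInfo> observed) {
        List<FileInfo> fresh = new ArrayList<>();
        for (FileInfo file : observed) {
            // Set.add returns false for a known name, so later changes are ignored.
            if (seenNames.add(file.name)) {
                fresh.add(file);
            }
        }
        return fresh;
    }
}
```

A practical consequence is that a file which is still growing when first observed is emitted with its early metadata, so writers that publish files atomically (for example by renaming into place) fit this behavior best.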
-
matchAll
public static FileIO.MatchAll matchAll()
Like match(), but matches each filepattern in a collection of filepatterns.

Resources are not deduplicated between filepatterns, i.e. if the same resource matches multiple filepatterns, it will be produced multiple times.

By default, a filepattern matching no resources is treated according to EmptyMatchTreatment.ALLOW_IF_WILDCARD. To configure this behavior, use FileIO.MatchAll.withEmptyMatchTreatment(org.apache.beam.sdk.io.fs.EmptyMatchTreatment).
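The difference between the two defaults (DISALLOW for match(), ALLOW_IF_WILDCARD for matchAll()) can be summarized as a small decision function. The sketch below uses a local Treatment enum rather than Beam's EmptyMatchTreatment; it also includes an ALLOW value that the text above does not mention, and it approximates "contains a wildcard" as "contains a '*' character", which is an assumption.

```java
final class EmptyMatchSketch {
    /** Local mirror of the treatments, for illustration only; not Beam's enum. */
    enum Treatment { ALLOW, DISALLOW, ALLOW_IF_WILDCARD }

    /** Returns true if a filepattern that matched nothing should be accepted. */
    static boolean emptyMatchAllowed(Treatment treatment, String filepattern) {
        switch (treatment) {
            case ALLOW:
                return true;
            case DISALLOW:
                return false;
            case ALLOW_IF_WILDCARD:
                // Assumption: a wildcard is approximated as the presence of '*'.
                return filepattern.contains("*");
            default:
                throw new IllegalArgumentException("Unknown treatment: " + treatment);
        }
    }
}
```

Under this model, an empty result for "/data/*.csv" is accepted by ALLOW_IF_WILDCARD, while an empty result for the literal path "/data/input.csv" is treated as an error.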
-
readMatches
public static FileIO.ReadMatches readMatches()
Converts each result of match() or matchAll() to a FileIO.ReadableFile which can be used to read the contents of each file, optionally decompressing it.
-
write
public static <InputT> FileIO.Write<java.lang.Void,InputT> write()
Writes elements to files using a FileIO.Sink. See class-level documentation.
-
writeDynamic
public static <DestT,InputT> FileIO.Write<DestT,InputT> writeDynamic()
Writes elements to files using a FileIO.Sink and grouping the elements using "dynamic destinations". See class-level documentation.
-
-