Class FileIO


  • public class FileIO
    extends java.lang.Object
    General-purpose transforms for working with files: listing files (matching), reading and writing.

    Matching filepatterns

    match() and matchAll() match filepatterns (respectively either a single filepattern or a PCollection thereof) and return the files that match them as PCollections of MatchResult.Metadata. Configuration options for them are in FileIO.MatchConfiguration and include features such as treatment of filepatterns that don't match anything and continuous incremental matching of filepatterns (watching for new files).

    Example: Watching a single filepattern for new files

    This example matches a single filepattern every 30 seconds, continuously returns newly matched files as an unbounded PCollection<Metadata>, and stops if no new files appear for 1 hour.

    
     PCollection<Metadata> matches = p.apply(FileIO.match()
         .filepattern("...")
         .continuously(
           Duration.standardSeconds(30), afterTimeSinceNewOutput(Duration.standardHours(1))));
     

    Example: Matching a PCollection of filepatterns arriving from Kafka

    This example reads filepatterns from Kafka and matches each one as it arrives, again producing an unbounded PCollection<Metadata>, and failing if a filepattern doesn't match anything.

    
     PCollection<String> filepatterns = p.apply(KafkaIO.read()...);
    
     PCollection<Metadata> matches = filepatterns.apply(FileIO.matchAll()
         .withEmptyMatchTreatment(DISALLOW));
     

    Reading files

    readMatches() converts each result of match() or matchAll() to a FileIO.ReadableFile that is convenient for reading a file's contents, optionally decompressing it.

    Example: Returning filenames and contents of compressed files matching a filepattern

    This example matches a single filepattern and returns KVs of filenames and their contents as String, decompressing each file with GZIP.

    
     PCollection<KV<String, String>> filesAndContents = p
         .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
         // withCompression can be omitted - by default compression is detected from the filename.
         .apply(FileIO.readMatches().withCompression(GZIP))
         .apply(MapElements
             // uses imports from TypeDescriptors
             .into(kvs(strings(), strings()))
             .via((ReadableFile f) -> {
               try {
                 return KV.of(
                     f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String());
               } catch (IOException ex) {
                 throw new RuntimeException("Failed to read the file", ex);
               }
             }));
     

    Writing files

    write() and writeDynamic() write elements from a PCollection of a given type to files, using a given FileIO.Sink to write a set of elements to each file. The collection can be bounded or unbounded; in either case, writing happens by default per window and pane, and because the amount of data in each window and pane is finite, a finite number of files ("shards") is written for each window and pane. There are several aspects to this process (a minimal end-to-end sketch follows the list below):

    • How many shards are generated per pane: This is controlled by sharding, using FileIO.Write.withNumShards(int) or FileIO.Write.withSharding(org.apache.beam.sdk.transforms.PTransform<org.apache.beam.sdk.values.PCollection<UserT>, org.apache.beam.sdk.values.PCollectionView<java.lang.Integer>>). The default is runner-specific, so the number of shards will vary based on runner behavior, though at least 1 shard will always be produced for every non-empty pane. Note that setting a fixed number of shards can hurt performance: it adds an additional GroupByKey to the pipeline. However, it is required to set it when writing an unbounded PCollection due to BEAM-1438 and similar behavior in other runners.
    • How the shards are named: This is controlled by a FileIO.Write.FileNaming: filenames can depend on a variety of inputs, e.g. the window, the pane, total number of shards, the current file's shard index, and compression. Controlling the file naming is described in the section File naming below.
    • Which elements go into which shard: Elements within a pane get distributed into different shards created for that pane arbitrarily, though FileIO.Write attempts to make shards approximately evenly sized. For more control over which elements go into which files, consider using dynamic destinations (see below).
    • How a given set of elements is written to a shard: This is controlled by the FileIO.Sink, e.g. AvroIO.sink(java.lang.Class<ElementT>) will generate Avro files. The FileIO.Sink controls the format of a single file: how to open a file, how to write each element to it, and how to close the file - but it does not control the set of files or which elements go where. Elements are written to a shard in an arbitrary order. FileIO.Write can additionally compress the generated files using FileIO.Write.withCompression(org.apache.beam.sdk.io.Compression).
    • How all of the above can be element-dependent: This is controlled by dynamic destinations. It is possible to have different groups of elements use different policies for naming files and for configuring the FileIO.Sink. See "dynamic destinations" below.
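
    For example, tying these pieces together, a minimal write of a PCollection<String> might look like the sketch below; the output directory is an illustrative placeholder, and TextIO.sink() (from TextIO) is used as the FileIO.Sink:

     PCollection<String> lines = ...;
     lines.apply(FileIO.<String>write()
         // The Sink controls the format of each individual file; TextIO.sink() writes one element per line.
         .via(TextIO.sink())
         // Filenames produced by the default naming are resolved relative to this directory.
         .to(".../path/to/")
         // Fixed sharding: 4 files per window/pane (adds a GroupByKey).
         .withNumShards(4)
         // Compress each generated file; the compression suffix is appended by the default naming.
         .withCompression(Compression.GZIP));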

    File naming

    The names of generated files are produced by a FileIO.Write.FileNaming. The default naming strategy is to name files in the format: $prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix, where:

    • $prefix is set by FileIO.Write.withPrefix(java.lang.String), the default is "output".
    • $start and $end are the boundaries of the window of data being written, formatted as ISO 8601 timestamps (yyyy-MM-ddTHH:mm:ssZZZ). The window is omitted if this is the global window.
    • $pane is the index of the pane within the window. The pane is omitted if it is known to be the only pane for this window.
    • $shard is the index of the current shard being written, out of the $numShards total shards written for the current pane. Both are formatted using 5 digits (or more if necessary according to $numShards) and zero-padded.
    • $suffix is set by FileIO.Write.withSuffix(java.lang.String), the default is empty.
    • $compressionSuffix is based on the default extension for the chosen compression type.

    For example: data-2017-12-01T19:00:00Z-2017-12-01T20:00:00Z-2-00010-of-00050.txt.gz

    Alternatively, one can specify a custom naming strategy using FileIO.Write.withNaming(Write.FileNaming).
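
    For instance, a custom FileIO.Write.FileNaming can be supplied as a lambda over the window, pane, shard count, shard index and compression; the filename scheme below is purely illustrative:

     // Illustrative custom naming: encodes the window's maximum timestamp, the pane index and the
     // shard, and appends the compression's suggested suffix (e.g. ".gz").
     FileIO.Write.FileNaming naming =
         (window, pane, numShards, shardIndex, compression) ->
             String.format(
                 "events-%s-pane-%d-%05d-of-%05d%s",
                 window.maxTimestamp(), pane.getIndex(), shardIndex, numShards,
                 compression.getSuggestedSuffix());

     PCollection<String> lines = ...;
     lines.apply(FileIO.<String>write()
         .via(TextIO.sink())
         .to(".../path/to/")  // illustrative output directory
         .withNaming(naming));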

    If FileIO.Write.to(java.lang.String) is specified, then the filenames produced by the FileIO.Write.FileNaming are resolved relative to that directory.

    When using dynamic destinations via writeDynamic() (see below), specifying a custom naming strategy is required, using FileIO.Write.withNaming(SerializableFunction) or FileIO.Write.withNaming(Contextful). In those, pass a function that creates a FileIO.Write.FileNaming for the requested group ("destination"). You can either implement a custom FileIO.Write.FileNaming, or use FileIO.Write.defaultNaming(java.lang.String, java.lang.String) to configure the default naming strategy with a prefix and suffix as per above.

    Dynamic destinations

    If the elements in the input collection can be partitioned into groups that should be treated differently, FileIO.Write supports different treatment per group ("destination"). It can use different file naming strategies for different groups, and can differently configure the FileIO.Sink, e.g. write different elements to Avro files in different directories with different schemas.

    This feature is supported by writeDynamic(). Use FileIO.Write.by(org.apache.beam.sdk.transforms.SerializableFunction<UserT, DestinationT>) to specify how to partition the elements into groups ("destinations"). Elements are then grouped by destination, and FileIO.Write.withNaming(Contextful) and FileIO.Write.via(Contextful) are applied separately within each group, i.e. each destination is written using its own file naming strategy and its own sink. Note that sharding currently cannot be destination-dependent: every window/pane of every destination will use the same number of shards specified via FileIO.Write.withNumShards(int) or FileIO.Write.withSharding(org.apache.beam.sdk.transforms.PTransform<org.apache.beam.sdk.values.PCollection<UserT>, org.apache.beam.sdk.values.PCollectionView<java.lang.Integer>>). For a complete example, see "Writing CSV files to different directories and with different headers" below; a simpler sketch using a single shared sink follows.
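
    In this sketch (the grouping function and output directory are illustrative assumptions), a PCollection<String> is partitioned by the first character of each line, and each destination gets its own filename prefix while reusing TextIO.sink() for every destination:

     PCollection<String> lines = ...;
     lines.apply(FileIO.<String, String>writeDynamic()
         // Destination = first character of the line (illustrative grouping).
         .by(line -> line.isEmpty() ? "other" : line.substring(0, 1))
         // The destination coder cannot be inferred from a lambda, so set it explicitly.
         .withDestinationCoder(StringUtf8Coder.of())
         // The same sink is used for every destination; only the naming differs per group here.
         .via(TextIO.sink())
         .to(".../path/to/")
         // Per-destination naming: e.g. files for destination "a" get the prefix "group-a".
         .withNaming(dest -> FileIO.Write.defaultNaming("group-" + dest, ".txt")));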

    Writing custom types to sinks

    Normally, when writing a collection of a custom type using a FileIO.Sink that takes a different type (for example, writing a PCollection<Event> to a text-based Sink<String>), one can simply apply a ParDo or MapElements to convert the custom type to the sink's output type.

    However, when using dynamic destinations, in many such cases the destination needs to be extracted from the original type, so such a conversion is not possible. For example, one might write events of a custom class Event to a text sink, using the event's "type" as a destination. In that case, specify an output function in FileIO.Write.via(Contextful, Contextful) or FileIO.Write.via(Contextful, Sink).

    Example: Writing CSV files

    
     class CSVSink implements FileIO.Sink<List<String>> {
       private String header;
       private PrintWriter writer;
    
       public CSVSink(List<String> colNames) {
         this.header = Joiner.on(",").join(colNames);
       }
    
       public void open(WritableByteChannel channel) throws IOException {
         writer = new PrintWriter(Channels.newOutputStream(channel));
         writer.println(header);
       }
    
       public void write(List<String> element) throws IOException {
         writer.println(Joiner.on(",").join(element));
       }
    
       public void flush() throws IOException {
         writer.flush();
       }
     }
    
     PCollection<BankTransaction> transactions = ...;
     // Convert transactions to strings before writing them to the CSV sink.
     transactions.apply(MapElements
             .into(TypeDescriptors.lists(TypeDescriptors.strings()))
             .via(tx -> Arrays.asList(tx.getUser(), tx.getAmount())))
         .apply(FileIO.<List<String>>write()
             .via(new CSVSink(Arrays.asList("user", "amount")))
             .to(".../path/to/")
             .withPrefix("transactions")
             .withSuffix(".csv"));
     

    Example: Writing CSV files to different directories and with different headers

    
     enum TransactionType {
       DEPOSIT,
       WITHDRAWAL,
       TRANSFER,
       ...
    
       List<String> getFieldNames();
       List<String> getAllFields(BankTransaction tx);
     }
    
     PCollection<BankTransaction> transactions = ...;
     transactions.apply(FileIO.<TransactionType, BankTransaction>writeDynamic()
         .by(BankTransaction::getTypeName)  // Partition by TransactionType
         .via(tx -> tx.getTypeName().getAllFields(tx),  // Convert the data to be written to CSVSink
              type -> new CSVSink(type.getFieldNames()))
         .to(".../path/to/")
         .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));