Class TextIO


  • public class TextIO
    extends java.lang.Object
    PTransforms for reading and writing text files.

    Reading text files

    To read a PCollection from one or more text files, use TextIO.read() to instantiate a transform and TextIO.Read.from(String) to specify the path of the file(s) to be read. Alternatively, if the filenames to be read are themselves in a PCollection, you can use FileIO to match them and readFiles() to read them.

    read() returns a PCollection of Strings, each corresponding to one line of an input UTF-8 text file. Lines are delimited by '\n', '\r', or '\r\n', or by a custom delimiter specified with TextIO.Read.withDelimiter(byte[]).
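
    The default line-splitting rule can be illustrated without a pipeline. The following is a minimal plain-Java sketch of that rule, not Beam's implementation: the class and method names (LineSplitSketch, splitLines) are hypothetical, and the choice not to emit an extra empty line after a trailing delimiter is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: models the default delimiters only ('\n', '\r', "\r\n").
final class LineSplitSketch {

  static List<String> splitLines(String text) {
    List<String> lines = new ArrayList<>();
    StringBuilder current = new StringBuilder();
    for (int i = 0; i < text.length(); i++) {
      char c = text.charAt(i);
      if (c == '\n' || c == '\r') {
        // A delimiter ends the current line, even if that line is empty.
        lines.add(current.toString());
        current.setLength(0);
        // Treat "\r\n" as a single delimiter by skipping the '\n'.
        if (c == '\r' && i + 1 < text.length() && text.charAt(i + 1) == '\n') {
          i++;
        }
      } else {
        current.append(c);
      }
    }
    // Assumption: a final line without a trailing delimiter is still emitted,
    // and a trailing delimiter does not produce an extra empty line.
    if (current.length() > 0) {
      lines.add(current.toString());
    }
    return lines;
  }

  public static void main(String[] args) {
    // Mixed delimiters in one input.
    System.out.println(splitLines("a\r\nb\rc\nd"));
  }
}
```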

    Filepattern expansion and watching

    By default, the filepatterns are expanded only once. To stream new files matching the filepattern(s) as they appear, use either TextIO.Read.watchForNewFiles(Duration, TerminationCondition<String, ?>, boolean) or the combination of FileIO.Match.continuously(Duration, TerminationCondition) and readFiles().

    By default, read() prohibits filepatterns that match no files, whereas readFiles() allows them if the filepattern contains a glob wildcard character. Use TextIO.Read.withEmptyMatchTreatment(EmptyMatchTreatment) or FileIO.Match.withEmptyMatchTreatment(EmptyMatchTreatment) plus readFiles() to configure this behavior.
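
    The decision described above can be modeled in a few lines of plain Java. This is a sketch of the rule only, not Beam code: the enum values are assumed to mirror the constants of EmptyMatchTreatment, the names EmptyMatchSketch, hasWildcard and emptyMatchAllowed are hypothetical, and treating '*', '?' and '{' as the glob wildcard characters is an assumption of the sketch.

```java
// Hypothetical model of the empty-match rule; does not depend on Beam.
final class EmptyMatchSketch {

  // Assumed to mirror the constants of Beam's EmptyMatchTreatment.
  enum Treatment { ALLOW, DISALLOW, ALLOW_IF_WILDCARD }

  // Assumption: these three characters mark a filepattern as a glob.
  static boolean hasWildcard(String filepattern) {
    return filepattern.indexOf('*') >= 0
        || filepattern.indexOf('?') >= 0
        || filepattern.indexOf('{') >= 0;
  }

  // Returns true if a filepattern that matched no files is acceptable.
  static boolean emptyMatchAllowed(String filepattern, Treatment treatment) {
    switch (treatment) {
      case ALLOW:
        return true;
      case DISALLOW:
        return false;
      default: // ALLOW_IF_WILDCARD
        return hasWildcard(filepattern);
    }
  }

  public static void main(String[] args) {
    // Default described for read(): an empty match is an error.
    System.out.println(emptyMatchAllowed("/data/input.txt", Treatment.DISALLOW));
    // Default described for readFiles(): tolerated only for glob patterns.
    System.out.println(emptyMatchAllowed("/data/*.txt", Treatment.ALLOW_IF_WILDCARD));
  }
}
```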

    Example 1: reading a file or filepattern.

    
     Pipeline p = ...;
    
     // A simple Read of a local file (only runs locally):
     PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt"));
     

    Example 2: reading a PCollection of filenames.

    
     Pipeline p = ...;
    
     // E.g. the filenames might be computed from other data in the pipeline, or
     // read from a data source.
     PCollection<String> filenames = ...;
    
     // Read all files in the collection.
     PCollection<String> lines =
         filenames
             .apply(FileIO.matchAll())
             .apply(FileIO.readMatches())
             .apply(TextIO.readFiles());
     

    Example 3: streaming new files matching a filepattern.

    
     Pipeline p = ...;
    
     PCollection<String> lines = p.apply(TextIO.read()
         .from("/local/path/to/files/*")
         .watchForNewFiles(
           // Check for new files every minute
           Duration.standardMinutes(1),
           // Stop watching the filepattern if no new files appear within an hour
           Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
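
    The polling behavior in Example 3 can be pictured with a small simulation. The sketch below is plain Java with a simulated clock and is not how Beam implements watching: WatchSketch and watch are hypothetical names, each inner list stands for what the filepattern matches at one poll, and stopping once the idle time reaches the limit (rather than strictly exceeds it) is an assumption of the sketch.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of "poll periodically, stop after an idle period".
final class WatchSketch {

  // Returns files in the order they were first seen across the polls.
  static List<String> watch(
      List<List<String>> listings, Duration pollInterval, Duration idleLimit) {
    Set<String> seen = new LinkedHashSet<>();
    Duration now = Duration.ZERO;      // simulated clock
    Duration lastNew = Duration.ZERO;  // time of the last newly seen file
    for (List<String> listing : listings) {
      boolean foundNew = false;
      for (String file : listing) {
        if (seen.add(file)) {
          foundNew = true; // first time this file matched the pattern
        }
      }
      if (foundNew) {
        lastNew = now;
      } else if (now.minus(lastNew).compareTo(idleLimit) >= 0) {
        break; // no new files for at least idleLimit: stop watching
      }
      now = now.plus(pollInterval);
    }
    return new ArrayList<>(seen);
  }

  public static void main(String[] args) {
    List<List<String>> listings = Arrays.asList(
        Arrays.asList("a"),
        Arrays.asList("a", "b"),
        Arrays.asList("a", "b"),
        Arrays.asList("a", "b"),
        Arrays.asList("a", "b", "c")); // arrives after watching has stopped
    System.out.println(watch(listings, Duration.ofMinutes(1), Duration.ofMinutes(2)));
  }
}
```

    In this simulation the file "c" is never read, because it appears only after the idle limit has already ended the watch. A longer termination duration trades a longer-running pipeline for a smaller chance of missing late files.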
     

    Reading a very large number of files

    If it is known that the filepattern will match a very large number of files (e.g. tens of thousands or more), use TextIO.Read.withHintMatchesManyFiles() for better performance and scalability. Note that it may decrease performance if the filepattern matches only a small number of files.

    Writing text files

    To write a PCollection to one or more text files, use TextIO.write() and call TextIO.Write.to(String) to specify the output prefix of the files to write.

    For example:

    
     // A simple Write to a local file (only runs locally):
     PCollection<String> lines = ...;
     lines.apply(TextIO.write().to("/path/to/file.txt"));
    
     // Same as above (reusing lines), only with Gzip compression:
     lines.apply(TextIO.write().to("/path/to/file.txt")
          .withSuffix(".txt")
          .withCompression(Compression.GZIP));
     

    Any existing files with the same names as generated output files will be overwritten.

    If you want better control over how filenames are generated than the default policy allows, a custom FileBasedSink.FilenamePolicy can also be set using TextIO.Write.to(FilenamePolicy).
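
    To make "output prefix" concrete, the sketch below composes sharded filenames from a prefix and a suffix. It assumes that the default policy inserts a shard template of the form -SSSSS-of-NNNNN between the two; that template, and the names ShardNameSketch and shardName, are assumptions of this illustration rather than something stated above, and windowed writes or compression may add further parts to the name.

```java
import java.util.Locale;

// Hypothetical illustration of prefix + shard template + suffix naming.
final class ShardNameSketch {

  // Assumes a template of the form "-SSSSS-of-NNNNN" (zero-padded to 5 digits).
  static String shardName(String prefix, int shard, int numShards, String suffix) {
    return String.format(Locale.ROOT, "%s-%05d-of-%05d%s", prefix, shard, numShards, suffix);
  }

  public static void main(String[] args) {
    // One name per shard for a write split into three shards.
    for (int shard = 0; shard < 3; shard++) {
      System.out.println(shardName("/path/to/file", shard, 3, ".txt"));
    }
  }
}
```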

    Advanced features

    TextIO supports all features of FileIO.write() and FileIO.writeDynamic(), such as writing windowed/unbounded data, writing data to multiple destinations, and so on, by providing a TextIO.Sink via sink().

    For example, to write events of different types to different filenames:

    
     PCollection<Event> events = ...;
     events.apply(FileIO.<EventType, Event>writeDynamic()
           .by(Event::getTypeName)
           .via(TextIO.sink(), Event::toString)
           .to(type -> nameFilesUsingWindowPaneAndShard(".../events/" + type + "/data", ".txt")));
     

    For backwards compatibility, TextIO also supports the legacy FileBasedSink.DynamicDestinations interface for advanced features via TextIO.Write.to(DynamicDestinations).