Class AvroIO


  • public class AvroIO
    extends java.lang.Object
    PTransforms for reading and writing Avro files.

    Reading Avro files

    To read a PCollection from one or more Avro files with the same schema known at pipeline construction time, use read(java.lang.Class<T>), using AvroIO.Read.from(org.apache.beam.sdk.options.ValueProvider<java.lang.String>) to specify the filename or filepattern to read from. If the filepatterns to be read are themselves in a PCollection you can use FileIO to match them and readFiles(java.lang.Class<T>) to read them. If the schema is unknown at pipeline construction time, use parseGenericRecords(org.apache.beam.sdk.transforms.SerializableFunction<org.apache.avro.generic.GenericRecord, T>) or parseFilesGenericRecords(org.apache.beam.sdk.transforms.SerializableFunction<org.apache.avro.generic.GenericRecord, T>).

    Many configuration options below apply to several or all of these transforms.

    See FileSystems for information on supported file systems and filepatterns.

    Filepattern expansion and watching

    By default, the filepatterns are expanded only once. AvroIO.Read.watchForNewFiles(org.joda.time.Duration, org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition<java.lang.String, ?>, boolean) or the combination of FileIO.Match.continuously(Duration, TerminationCondition) and readFiles(Class) allow streaming of new files matching the filepattern(s).

    By default, read(java.lang.Class<T>) prohibits filepatterns that match no files, and readFiles(Class) allows them in case the filepattern contains a glob wildcard character. Use AvroIO.Read.withEmptyMatchTreatment(org.apache.beam.sdk.io.fs.EmptyMatchTreatment) or FileIO.Match.withEmptyMatchTreatment(EmptyMatchTreatment) plus readFiles(Class) to configure this behavior.
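
    For example, a minimal sketch of allowing an empty match instead of failing (the class and bucket path are reused from the examples below):

     PCollection<AvroAutoGenClass> records =
         p.apply(AvroIO.read(AvroAutoGenClass.class)
             .from("gs://my_bucket/path/to/records-*.avro")
             .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW));
     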

    Reading records of a known schema

    To read specific records, such as Avro-generated classes, use read(Class). To read GenericRecords, use readGenericRecords(Schema) which takes a Schema object, or readGenericRecords(String) which takes an Avro schema in a JSON-encoded string form. An exception will be thrown if a record doesn't match the specified schema. Likewise, to read a PCollection of filepatterns, apply FileIO matching plus readFilesGenericRecords(org.apache.avro.Schema).

    For example:

    
     Pipeline p = ...;
    
     // Read Avro-generated classes from files on GCS
     PCollection<AvroAutoGenClass> records =
         p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"));
    
     // Read GenericRecords of the given schema from files on GCS
     Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
     PCollection<GenericRecord> records =
         p.apply(AvroIO.readGenericRecords(schema)
                    .from("gs://my_bucket/path/to/records-*.avro"));
     

    Reading records of an unknown schema

    To read records from files whose schema is unknown at pipeline construction time or differs between files, use parseGenericRecords(org.apache.beam.sdk.transforms.SerializableFunction<org.apache.avro.generic.GenericRecord, T>) - in this case, you will need to specify a parsing function for converting each GenericRecord into a value of your custom type. Likewise, to read a PCollection of filepatterns with unknown schema, use FileIO matching plus parseFilesGenericRecords(SerializableFunction).

    For example:

    
     Pipeline p = ...;
    
     PCollection<Foo> records =
         p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
           public Foo apply(GenericRecord record) {
             // If needed, access the schema of the record using record.getSchema()
             return ...;
           }
         }).from("gs://my_bucket/path/to/records-*.avro"));
     

    Reading from a PCollection of filepatterns

    
     Pipeline p = ...;
    
     PCollection<String> filepatterns = p.apply(...);
     PCollection<AvroAutoGenClass> records =
         filepatterns.apply(AvroIO.readAll(AvroAutoGenClass.class));
     PCollection<AvroAutoGenClass> recordsFromFiles =
         filepatterns
             .apply(FileIO.matchAll())
             .apply(FileIO.readMatches())
             .apply(AvroIO.readFiles(AvroAutoGenClass.class));
     PCollection<GenericRecord> genericRecords =
         filepatterns.apply(AvroIO.readAllGenericRecords(schema));
     PCollection<Foo> parsedRecords =
         filepatterns
             .apply(FileIO.matchAll())
             .apply(FileIO.readMatches())
             .apply(AvroIO.parseFilesGenericRecords(new SerializableFunction...));
     

    Streaming new files matching a filepattern

    
     Pipeline p = ...;
    
     PCollection<AvroAutoGenClass> records = p.apply(AvroIO
         .read(AvroAutoGenClass.class)
         .from("gs://my_bucket/path/to/records-*.avro")
         .watchForNewFiles(
           // Check for new files every minute
           Duration.standardMinutes(1),
           // Stop watching the filepattern if no new files appear within an hour
           Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
     
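
    The same continuous-watching behavior can also be expressed with FileIO matching plus readFiles(Class); a sketch under the same assumptions as the example above:

     PCollection<AvroAutoGenClass> records = p
         .apply(FileIO.match()
             .filepattern("gs://my_bucket/path/to/records-*.avro")
             .continuously(
                 Duration.standardMinutes(1),
                 Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))))
         .apply(FileIO.readMatches())
         .apply(AvroIO.readFiles(AvroAutoGenClass.class));
     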

    Reading a very large number of files

    If it is known that the filepattern will match a very large number of files (e.g. tens of thousands or more), use AvroIO.Read.withHintMatchesManyFiles() for better performance and scalability. Note that it may decrease performance if the filepattern matches only a small number of files.
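
    For example (a minimal sketch reusing the filepattern from the examples above):

     PCollection<AvroAutoGenClass> records =
         p.apply(AvroIO.read(AvroAutoGenClass.class)
             .from("gs://my_bucket/path/to/records-*.avro")
             .withHintMatchesManyFiles());
     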

    Inferring Beam schemas from Avro files

    If you want to use SQL or schema-based operations on an Avro-based PCollection, you must configure the read transform to infer the Beam schema and automatically set up the Beam-related coders by doing:

    
     PCollection<AvroAutoGenClass> records =
         p.apply(AvroIO.read(...).from(...).withBeamSchemas(true));
     

    Inferring Beam schemas from Avro PCollections

    If you created an Avro-based PCollection by other means, e.g. reading records from Kafka or as the output of another PTransform, you may be interested in making your PCollection schema-aware so you can use the Schema-based APIs or Beam's SqlTransform.

    If you are using Avro specific records (generated classes from an Avro schema), you can register a schema provider for the specific Avro class to make any PCollection of these objects schema-aware.

    
     pipeline.getSchemaRegistry().registerSchemaProvider(AvroAutoGenClass.class, AvroAutoGenClass.getClassSchema());
     
    You can also manually set an Avro-backed Schema coder for a PCollection using AvroUtils.schemaCoder(Class, Schema) to make it schema-aware.
    
     PCollection<AvroAutoGenClass> records = ...;
     AvroCoder<AvroAutoGenClass> coder = (AvroCoder<AvroAutoGenClass>) records.getCoder();
     records.setCoder(AvroUtils.schemaCoder(coder.getType(), coder.getSchema()));
     

    If you are using GenericRecords, you may need to set a specific Beam schema coder for each PCollection to match its internal Avro schema.

    
     org.apache.avro.Schema avroSchema = ...;
     PCollection<GenericRecord> records = ...;
     records.setCoder(AvroUtils.schemaCoder(avroSchema));
     

    Writing Avro files

    To write a PCollection to one or more Avro files, use AvroIO.Write, using AvroIO.write().to(String) to specify the output filename prefix. The default DefaultFilenamePolicy will use this prefix, in conjunction with a ShardNameTemplate (set via AvroIO.Write.withShardNameTemplate(String)) and optional filename suffix (set via AvroIO.Write.withSuffix(String)), to generate output filenames in a sharded way. You can override this default write filename policy using AvroIO.Write.to(FileBasedSink.FilenamePolicy) to specify a custom file naming policy.
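
    For example, a sketch that combines the default policy's prefix with an explicit shard name template and suffix, given a PCollection<AvroAutoGenClass> records (the output location is illustrative):

     records.apply(AvroIO.write(AvroAutoGenClass.class)
         .to("gs://my_bucket/path/to/events")
         .withShardNameTemplate("-SSS-of-NNN")
         .withSuffix(".avro"));
     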

    By default, AvroIO.Write produces output files that are compressed using the CodecFactory.snappyCodec(). This default can be changed or overridden using AvroIO.Write.withCodec(org.apache.avro.file.CodecFactory).
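
    For example, a minimal sketch switching from the snappy default to deflate compression (the output location is illustrative):

     records.apply(AvroIO.write(AvroAutoGenClass.class)
         .to("gs://my_bucket/path/to/events")
         .withCodec(CodecFactory.deflateCodec(6))
         .withSuffix(".avro"));
     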

    Writing specific or generic records

    To write specific records, such as Avro-generated classes, use write(Class). To write GenericRecords, use either writeGenericRecords(Schema) which takes a Schema object, or writeGenericRecords(String) which takes a schema in a JSON-encoded string form. An exception will be thrown if a record doesn't match the specified schema.

    For example:

    
     // A simple Write to a local file (only runs locally):
     PCollection<AvroAutoGenClass> records = ...;
     records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));
    
     // A Write to a sharded GCS file (runs locally and using remote execution):
     Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
     PCollection<GenericRecord> records = ...;
     records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
         .to("gs://my_bucket/path/to/numbers")
         .withSuffix(".avro"));
     

    Writing windowed or unbounded data

    By default, all input is put into the global window before writing. If per-window writes are desired - for example, when using a streaming runner - AvroIO.Write.withWindowedWrites() will cause windowing and triggering to be preserved. When producing windowed writes with a streaming runner that supports triggers, the number of output shards must be set explicitly using AvroIO.Write.withNumShards(int); some runners may set this for you to a runner-chosen value, so you may not need to set it yourself. A FileBasedSink.FilenamePolicy must be set, and unique windows and triggers must produce unique filenames.
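
    For example, a sketch of a windowed write on a streaming pipeline; the window size, shard count, temp directory, and MyWindowedFilenamePolicy are illustrative, and the policy must produce unique filenames per window and pane:

     PCollection<AvroAutoGenClass> windowed = records.apply(
         Window.into(FixedWindows.of(Duration.standardMinutes(5))));
     windowed.apply(AvroIO.write(AvroAutoGenClass.class)
         // MyWindowedFilenamePolicy is a hypothetical FileBasedSink.FilenamePolicy.
         .to(new MyWindowedFilenamePolicy())
         .withTempDirectory(FileBasedSink.convertToFileResourceIfPossible("gs://my_bucket/temp"))
         .withWindowedWrites()
         .withNumShards(3));
     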

    Writing data to multiple destinations

    The following shows a more complex example of AvroIO.Write usage, generating dynamic file destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user events (e.g. actions on a website) is written out to Avro files. Each event contains the user id as an integer field. We want events for each user to go into a specific directory for that user, and each user's data should be written with a specific schema for that user; a side input is used, so the schema can be calculated in a different stage.

    
     // This is the user class that controls dynamic destinations for this avro write. The input to
     // AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order
     // to have dynamic schemas). Everything is per userid, so we define a dynamic destination type
     // of Integer.
     class UserDynamicAvroDestinations
         extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> {
       private final PCollectionView<Map<Integer, String>> userToSchemaMap;
       public UserDynamicAvroDestinations(PCollectionView<Map<Integer, String>> userToSchemaMap) {
         this.userToSchemaMap = userToSchemaMap;
       }
       public GenericRecord formatRecord(UserEvent record) {
         return formatUserRecord(record, getSchema(record.getUserId()));
       }
       public Schema getSchema(Integer userId) {
         return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId));
       }
       public Integer getDestination(UserEvent record) {
         return record.getUserId();
       }
       public Integer getDefaultDestination() {
         return 0;
       }
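       // Note: baseDir is assumed to be a String field or constant defined elsewhere.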
       public FilenamePolicy getFilenamePolicy(Integer userId) {
         return DefaultFilenamePolicy.fromParams(new Params().withBaseFilename(baseDir + "/user-"
         + userId + "/events"));
       }
       public List<PCollectionView<?>> getSideInputs() {
         return ImmutableList.<PCollectionView<?>>of(userToSchemaMap);
       }
     }
     PCollection<UserEvent> events = ...;
     PCollectionView<Map<Integer, String>> userToSchemaMap = events.apply(
         "ComputePerUserSchemas", new ComputePerUserSchemas());
     events.apply("WriteAvros", AvroIO.<Integer>writeCustomTypeToGenericRecords()
         .to(new UserDynamicAvroDestinations(userToSchemaMap)));