Package org.apache.beam.sdk.testing
Class SourceTestUtils
- java.lang.Object
-
- org.apache.beam.sdk.testing.SourceTestUtils
-
public class SourceTestUtils extends java.lang.Object
Helper functions and test harnesses for checking the correctness of Source implementations.
Contains a few lightweight utilities (e.g. for reading items from a source or a reader, such as
readFromSource(org.apache.beam.sdk.io.BoundedSource<T>, org.apache.beam.sdk.options.PipelineOptions)
and readFromUnstartedReader(org.apache.beam.sdk.io.Source.Reader<T>)
), as well as heavyweight property-testing and stress-testing harnesses that help achieve a large amount of test coverage with little code. The most notable ones are:
- assertSourcesEqualReferenceSource(org.apache.beam.sdk.io.BoundedSource<T>, java.util.List<? extends org.apache.beam.sdk.io.BoundedSource<T>>, org.apache.beam.sdk.options.PipelineOptions)
helps test that the data read by the union of sources produced by BoundedSource.split(long, org.apache.beam.sdk.options.PipelineOptions)
is the same as the data read by the original source.
- If your source implements dynamic work rebalancing, use the
assertSplitAtFraction
family of functions. They test the behavior of BoundedSource.BoundedReader.splitAtFraction(double)
, in particular that various consistency properties are respected and that the total set of data read by the source is preserved when splits happen. Use assertSplitAtFractionBehavior(org.apache.beam.sdk.io.BoundedSource<T>, int, double, org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome, org.apache.beam.sdk.options.PipelineOptions)
to test individual cases of splitAtFraction
, and use assertSplitAtFractionExhaustive(org.apache.beam.sdk.io.BoundedSource<T>, org.apache.beam.sdk.options.PipelineOptions)
as a heavyweight stress test that includes concurrency. We strongly recommend using both.
For example usages, see the unit tests of classes such as AvroSource or TextSource.
Like PAssert, this class requires JUnit and Hamcrest to be present in the classpath.
-
-
Nested Class Summary
Nested Classes
Modifier and Type Class Description
static class
SourceTestUtils.ExpectedSplitOutcome
Expected outcome of BoundedSource.BoundedReader.splitAtFraction(double).
-
Constructor Summary
Constructors Constructor Description SourceTestUtils()
-
Method Summary
Modifier and Type Method Description
static <T> void
assertSourcesEqualReferenceSource(BoundedSource<T> referenceSource, java.util.List<? extends BoundedSource<T>> sources, PipelineOptions options)
Given a reference Source and a list of Sources, asserts that the union of the records read from the list of sources is equal to the records read from the reference source.
static <T> org.apache.beam.sdk.testing.SourceTestUtils.SplitAtFractionResult
assertSplitAtFractionBehavior(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, SourceTestUtils.ExpectedSplitOutcome expectedOutcome, PipelineOptions options)
Asserts that the source's reader either fails to splitAtFraction(fraction) after reading numItemsToReadBeforeSplit items, or succeeds in a way that is consistent according to assertSplitAtFractionSucceedsAndConsistent(org.apache.beam.sdk.io.BoundedSource<T>, int, double, org.apache.beam.sdk.options.PipelineOptions).
static <T> void
assertSplitAtFractionExhaustive(BoundedSource<T> source, PipelineOptions options)
Asserts that for each possible start position, BoundedSource.BoundedReader.splitAtFraction(double) at every interesting fraction (halfway between two fractions that differ by at least one item) can be called successfully and the results are consistent if a split succeeds.
static <T> void
assertSplitAtFractionFails(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options)
Asserts that the source's reader fails to splitAtFraction(fraction) after reading numItemsToReadBeforeSplit items.
static <T> void
assertSplitAtFractionSucceedsAndConsistent(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options)
Verifies some consistency properties of BoundedSource.BoundedReader.splitAtFraction(double) on the given source.
static <T> void
assertUnstartedReaderReadsSameAsItsSource(BoundedSource.BoundedReader<T> reader, PipelineOptions options)
Asserts that a Reader returns a Source that, when read from, produces the same records as the reader.
static <T> java.util.List<org.apache.beam.sdk.testing.SourceTestUtils.ReadableStructuralValue<T>>
createStructuralValues(Coder<T> coder, java.util.List<T> list)
Testing utilities below depend on standard assertions and matchers to compare elements read by sources.
static <T> java.util.List<T>
readFromSource(BoundedSource<T> source, PipelineOptions options)
Reads all elements from the given BoundedSource.
static <T> java.util.List<T>
readFromSplitsOfSource(BoundedSource<T> source, long desiredBundleSizeBytes, PipelineOptions options)
static <T> java.util.List<T>
readFromStartedReader(Source.Reader<T> reader)
Reads all elements from the given started Source.Reader.
static <T> java.util.List<T>
readFromUnstartedReader(Source.Reader<T> reader)
Reads all elements from the given unstarted Source.Reader.
static <T> java.util.List<T>
readNItemsFromStartedReader(Source.Reader<T> reader, int n)
Reads elements from a Source.Reader that has already had Source.Reader.start() called on it, until n elements are read.
static <T> java.util.List<T>
readNItemsFromUnstartedReader(Source.Reader<T> reader, int n)
Reads elements from a Source.Reader until n elements are read.
static <T> java.util.List<T>
readRemainingFromReader(Source.Reader<T> reader, boolean started)
Reads all remaining elements from a Source.Reader.
static <T> BoundedSource<T>
toUnsplittableSource(BoundedSource<T> boundedSource)
Returns an equivalent unsplittable BoundedSource<T>.
-
-
-
Method Detail
-
createStructuralValues
public static <T> java.util.List<org.apache.beam.sdk.testing.SourceTestUtils.ReadableStructuralValue<T>> createStructuralValues(Coder<T> coder, java.util.List<T> list) throws java.lang.Exception
Testing utilities below depend on standard assertions and matchers to compare elements read by sources. In general, the elements may not implement equals/hashCode properly; however, every source has a Coder, and every Coder can produce a Coder.structuralValue(T) whose equals/hashCode is consistent with equality of the encoded format. So we use this Coder.structuralValue(T) to compare elements read by sources.
- Throws:
java.lang.Exception
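The idea of comparing by encoded form can be illustrated without Beam. The sketch below is a toy model only: EncodedValue and toStructuralValues are hypothetical names, and each byte[] element is assumed to be its own encoding. It does not show how Coder.structuralValue(T) is implemented; it only shows why equality on an encoded form works where equals on the raw elements does not.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy illustration (not Beam code): compare elements by encoded bytes instead of equals(). */
final class StructuralValueSketch {

  /** Hypothetical stand-in for a structural value: equality is defined on the encoded bytes. */
  static final class EncodedValue {
    private final byte[] encoded;

    EncodedValue(byte[] encoded) {
      this.encoded = encoded.clone();
    }

    @Override
    public boolean equals(Object other) {
      return other instanceof EncodedValue
          && Arrays.equals(encoded, ((EncodedValue) other).encoded);
    }

    @Override
    public int hashCode() {
      return Arrays.hashCode(encoded);
    }
  }

  /** Wraps each element (assumed here to be its own encoding) in an EncodedValue. */
  static List<EncodedValue> toStructuralValues(List<byte[]> elements) {
    List<EncodedValue> result = new ArrayList<>();
    for (byte[] element : elements) {
      result.add(new EncodedValue(element));
    }
    return result;
  }

  public static void main(String[] args) {
    List<byte[]> first = Arrays.asList(
        "a".getBytes(StandardCharsets.UTF_8), "b".getBytes(StandardCharsets.UTF_8));
    List<byte[]> second = Arrays.asList(
        "a".getBytes(StandardCharsets.UTF_8), "b".getBytes(StandardCharsets.UTF_8));

    // Arrays use identity equality, so the raw lists do not compare equal.
    System.out.println(first.equals(second)); // prints false
    // Wrapped values compare by content, so the lists compare equal.
    System.out.println(toStructuralValues(first).equals(toStructuralValues(second))); // prints true
  }
}
```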
-
readFromSource
public static <T> java.util.List<T> readFromSource(BoundedSource<T> source, PipelineOptions options) throws java.io.IOException
Reads all elements from the given BoundedSource.
- Throws:
java.io.IOException
-
readFromSplitsOfSource
public static <T> java.util.List<T> readFromSplitsOfSource(BoundedSource<T> source, long desiredBundleSizeBytes, PipelineOptions options) throws java.lang.Exception
- Throws:
java.lang.Exception
-
readFromUnstartedReader
public static <T> java.util.List<T> readFromUnstartedReader(Source.Reader<T> reader) throws java.io.IOException
Reads all elements from the given unstarted Source.Reader.
- Throws:
java.io.IOException
-
readFromStartedReader
public static <T> java.util.List<T> readFromStartedReader(Source.Reader<T> reader) throws java.io.IOException
Reads all elements from the given started Source.Reader.
- Throws:
java.io.IOException
-
readNItemsFromUnstartedReader
public static <T> java.util.List<T> readNItemsFromUnstartedReader(Source.Reader<T> reader, int n) throws java.io.IOException
Reads elements from a Source.Reader until n elements are read.
- Throws:
java.io.IOException
-
readNItemsFromStartedReader
public static <T> java.util.List<T> readNItemsFromStartedReader(Source.Reader<T> reader, int n) throws java.io.IOException
Reads elements from a Source.Reader that has already had Source.Reader.start() called on it, until n elements are read.
- Throws:
java.io.IOException
-
readRemainingFromReader
public static <T> java.util.List<T> readRemainingFromReader(Source.Reader<T> reader, boolean started) throws java.io.IOException
Reads all remaining elements from a Source.Reader.
- Throws:
java.io.IOException
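A minimal sketch of the difference the started flag can make, written against a hypothetical ToyReader interface that mimics the start()/advance()/getCurrent() shape of Source.Reader. The sketch assumes, and labels in the code, that a started reader's current element has already been consumed by the caller, so reading resumes with advance(). It is an illustration under that assumption, not the library's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model (not Beam code) of reading the remaining items from a reader. */
final class ReadRemainingSketch {

  /** Hypothetical reader shaped like Source.Reader: start() once, then advance(). */
  interface ToyReader<T> {
    boolean start();

    boolean advance();

    T getCurrent();
  }

  /** Hypothetical list-backed reader. */
  static final class ListReader<T> implements ToyReader<T> {
    private final List<T> items;
    private int index = -1;

    ListReader(List<T> items) {
      this.items = items;
    }

    @Override
    public boolean start() {
      index = 0;
      return index < items.size();
    }

    @Override
    public boolean advance() {
      index++;
      return index < items.size();
    }

    @Override
    public T getCurrent() {
      return items.get(index);
    }
  }

  /**
   * Reads everything the reader has left. Assumption: if started is true, the caller has
   * already consumed the current element, so the first call made here is advance().
   */
  static <T> List<T> readRemaining(ToyReader<T> reader, boolean started) {
    List<T> result = new ArrayList<>();
    boolean more = started ? reader.advance() : reader.start();
    while (more) {
      result.add(reader.getCurrent());
      more = reader.advance();
    }
    return result;
  }

  public static void main(String[] args) {
    ToyReader<String> unstarted = new ListReader<>(Arrays.asList("a", "b", "c"));
    System.out.println(readRemaining(unstarted, false)); // prints [a, b, c]

    ToyReader<String> started = new ListReader<>(Arrays.asList("a", "b", "c"));
    started.start(); // the caller positions the reader on "a" and consumes it
    System.out.println(readRemaining(started, true)); // prints [b, c]
  }
}
```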
-
assertSourcesEqualReferenceSource
public static <T> void assertSourcesEqualReferenceSource(BoundedSource<T> referenceSource, java.util.List<? extends BoundedSource<T>> sources, PipelineOptions options) throws java.lang.Exception
Given a reference Source and a list of Sources, asserts that the union of the records read from the list of sources is equal to the records read from the reference source.
- Throws:
java.lang.Exception
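The property being asserted can be sketched with plain lists. In the toy model below, split and unionEqualsReference are hypothetical helpers, records are integers, and the comparison is assumed to ignore order; none of this is Beam code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy model (not Beam code) of checking that splits together read the reference records. */
final class UnionOfSplitsSketch {

  /** Hypothetical splitter: cuts the records into consecutive chunks of at most chunkSize. */
  static List<List<Integer>> split(List<Integer> records, int chunkSize) {
    if (chunkSize <= 0) {
      throw new IllegalArgumentException("chunkSize must be positive");
    }
    List<List<Integer>> chunks = new ArrayList<>();
    for (int from = 0; from < records.size(); from += chunkSize) {
      int to = Math.min(from + chunkSize, records.size());
      chunks.add(new ArrayList<>(records.subList(from, to)));
    }
    return chunks;
  }

  /** Returns true if all splits together hold the same records as the reference (order ignored). */
  static boolean unionEqualsReference(List<Integer> reference, List<List<Integer>> splits) {
    List<Integer> union = new ArrayList<>();
    for (List<Integer> part : splits) {
      union.addAll(part);
    }
    List<Integer> expected = new ArrayList<>(reference);
    Collections.sort(union);
    Collections.sort(expected);
    return union.equals(expected);
  }

  public static void main(String[] args) {
    List<Integer> reference = Arrays.asList(3, 1, 2, 5, 4);
    List<List<Integer>> splits = split(reference, 2);
    System.out.println(unionEqualsReference(reference, splits)); // prints true

    // Dropping a split loses records, so the check fails.
    System.out.println(unionEqualsReference(reference, splits.subList(0, 2))); // prints false
  }
}
```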
-
assertUnstartedReaderReadsSameAsItsSource
public static <T> void assertUnstartedReaderReadsSameAsItsSource(BoundedSource.BoundedReader<T> reader, PipelineOptions options) throws java.lang.Exception
Asserts that a Reader returns a Source that, when read from, produces the same records as the reader.
- Throws:
java.lang.Exception
-
assertSplitAtFractionBehavior
public static <T> org.apache.beam.sdk.testing.SourceTestUtils.SplitAtFractionResult assertSplitAtFractionBehavior(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, SourceTestUtils.ExpectedSplitOutcome expectedOutcome, PipelineOptions options) throws java.lang.Exception
Asserts that the source's reader either fails to splitAtFraction(fraction) after reading numItemsToReadBeforeSplit items, or succeeds in a way that is consistent according to assertSplitAtFractionSucceedsAndConsistent(org.apache.beam.sdk.io.BoundedSource<T>, int, double, org.apache.beam.sdk.options.PipelineOptions).
Returns SplitAtFractionResult.
- Throws:
java.lang.Exception
-
assertSplitAtFractionSucceedsAndConsistent
public static <T> void assertSplitAtFractionSucceedsAndConsistent(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options) throws java.lang.Exception
Verifies some consistency properties of BoundedSource.BoundedReader.splitAtFraction(double) on the given source. Equivalent to the following pseudocode:
  Reader reader = source.createReader();
  read N items from reader;
  Source residual = reader.splitAtFraction(splitFraction);
  Source primary = reader.getCurrentSource();
  assert: items in primary == items we read so far
                              + items we'll get by continuing to read from reader;
  assert: items in original source == items in primary + items in residual
- Throws:
java.lang.Exception
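The two assertions in the pseudocode can be made concrete with a toy model. The sketch below is not Beam code: RangeSource, RangeReader, and splitSucceedsAndIsConsistent are hypothetical, the source is an index range over an in-memory list, and the split rule (reject a split point behind the reader or one that leaves an empty residual) is an assumption chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Beam code) of the consistency properties of a dynamic split. */
final class SplitConsistencySketch {

  /** Hypothetical source: the half-open index range [start, end) of a shared list. */
  static final class RangeSource {
    final List<Integer> data;
    final int start;
    final int end;

    RangeSource(List<Integer> data, int start, int end) {
      this.data = data;
      this.start = start;
      this.end = end;
    }

    List<Integer> readAll() {
      return new ArrayList<>(data.subList(start, end));
    }
  }

  /** Hypothetical reader whose range can shrink while it is being read. */
  static final class RangeReader {
    private RangeSource current;
    private int next; // index of the next item to return

    RangeReader(RangeSource source) {
      this.current = source;
      this.next = source.start;
    }

    boolean hasNext() {
      return next < current.end;
    }

    int read() {
      return current.data.get(next++);
    }

    RangeSource getCurrentSource() {
      return current;
    }

    /** Returns the residual source, or null if the split is rejected (assumed rule). */
    RangeSource splitAtFraction(double fraction) {
      int length = current.end - current.start;
      int splitIndex = current.start + (int) Math.ceil(fraction * length);
      if (splitIndex < next || splitIndex >= current.end) {
        return null;
      }
      RangeSource residual = new RangeSource(current.data, splitIndex, current.end);
      current = new RangeSource(current.data, current.start, splitIndex);
      return residual;
    }
  }

  /** Mirrors the pseudocode: true if the split succeeded and both properties hold. */
  static boolean splitSucceedsAndIsConsistent(
      RangeSource source, int numItemsToReadBeforeSplit, double splitFraction) {
    RangeReader reader = new RangeReader(source);
    List<Integer> readByReader = new ArrayList<>();
    for (int i = 0; i < numItemsToReadBeforeSplit && reader.hasNext(); i++) {
      readByReader.add(reader.read());
    }
    RangeSource residual = reader.splitAtFraction(splitFraction);
    if (residual == null) {
      return false;
    }
    RangeSource primary = reader.getCurrentSource();
    while (reader.hasNext()) {
      readByReader.add(reader.read()); // continue reading after the split
    }
    List<Integer> primaryPlusResidual = primary.readAll();
    primaryPlusResidual.addAll(residual.readAll());
    return primary.readAll().equals(readByReader)
        && source.readAll().equals(primaryPlusResidual);
  }

  public static void main(String[] args) {
    List<Integer> data = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      data.add(i);
    }
    RangeSource source = new RangeSource(data, 0, 10);
    System.out.println(splitSucceedsAndIsConsistent(source, 3, 0.5)); // prints true
    System.out.println(splitSucceedsAndIsConsistent(source, 8, 0.5)); // prints false (split rejected)
  }
}
```

In this model a split at fraction 0.5 after reading 8 of 10 items is rejected because the split point lies behind the reader, which corresponds to the "fails to split" outcome that the other assertSplitAtFraction methods check for.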
-
assertSplitAtFractionFails
public static <T> void assertSplitAtFractionFails(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options) throws java.lang.Exception
Asserts that the source's reader fails to splitAtFraction(fraction) after reading numItemsToReadBeforeSplit items.
- Throws:
java.lang.Exception
-
assertSplitAtFractionExhaustive
public static <T> void assertSplitAtFractionExhaustive(BoundedSource<T> source, PipelineOptions options) throws java.lang.Exception
Asserts that for each possible start position, BoundedSource.BoundedReader.splitAtFraction(double) at every interesting fraction (halfway between two fractions that differ by at least one item) can be called successfully and the results are consistent if a split succeeds. Verifies multithreaded splitting as well.
- Throws:
java.lang.Exception
-
toUnsplittableSource
public static <T> BoundedSource<T> toUnsplittableSource(BoundedSource<T> boundedSource)
Returns an equivalent unsplittable BoundedSource<T>.
It forwards most methods to the given boundedSource, except:
- BoundedSource.split(long, org.apache.beam.sdk.options.PipelineOptions) rejects initial splitting by returning itself in a list.
- BoundedSource.BoundedReader.splitAtFraction(double) rejects dynamic splitting by returning null.
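The forwarding pattern described above can be sketched with a toy interface. ToySource, ListSource, and toUnsplittable below are hypothetical and only model initial splitting; a real reader's splitAtFraction would analogously return null. This is not the Beam implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy model (not Beam code) of wrapping a source so that it refuses to split. */
final class UnsplittableSketch {

  /** Hypothetical source with a read operation and an initial split operation. */
  interface ToySource {
    List<String> read();

    List<ToySource> split(int desiredBundleSize);
  }

  /** Hypothetical list-backed source that splits into consecutive chunks. */
  static final class ListSource implements ToySource {
    private final List<String> items;

    ListSource(List<String> items) {
      this.items = items;
    }

    @Override
    public List<String> read() {
      return new ArrayList<>(items);
    }

    @Override
    public List<ToySource> split(int desiredBundleSize) {
      if (desiredBundleSize <= 0) {
        throw new IllegalArgumentException("desiredBundleSize must be positive");
      }
      List<ToySource> parts = new ArrayList<>();
      for (int from = 0; from < items.size(); from += desiredBundleSize) {
        int to = Math.min(from + desiredBundleSize, items.size());
        parts.add(new ListSource(new ArrayList<>(items.subList(from, to))));
      }
      return parts;
    }
  }

  /** Forwards read() to the delegate, but answers split() with a list containing only itself. */
  static ToySource toUnsplittable(final ToySource delegate) {
    return new ToySource() {
      @Override
      public List<String> read() {
        return delegate.read();
      }

      @Override
      public List<ToySource> split(int desiredBundleSize) {
        return Collections.<ToySource>singletonList(this);
      }
    };
  }

  public static void main(String[] args) {
    ToySource source = new ListSource(Arrays.asList("a", "b", "c", "d"));
    ToySource unsplittable = toUnsplittable(source);

    System.out.println(source.split(2).size()); // prints 2
    System.out.println(unsplittable.split(2).size()); // prints 1
    System.out.println(unsplittable.read()); // prints [a, b, c, d]
  }
}
```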
-
-