Class TestPipeline
- java.lang.Object
  - org.apache.beam.sdk.Pipeline
    - org.apache.beam.sdk.testing.TestPipeline

All Implemented Interfaces:
org.junit.rules.TestRule
public class TestPipeline extends Pipeline implements org.junit.rules.TestRule
A creator of test pipelines that can be used inside of tests and that can be configured to run locally or against a remote pipeline runner.

It is recommended to tag hand-selected tests for this purpose using the ValidatesRunner Category annotation, because each test run against a pipeline runner uses resources of that pipeline runner.

In order to run tests on a pipeline runner, the following conditions must be met:
- System property "beamTestPipelineOptions" must contain a JSON array of pipeline options. For example:

      [ "--runner=TestDataflowRunner",
        "--project=mygcpproject",
        "--stagingLocation=gs://mygcsbucket/path" ]

  Note that the set of required pipeline options is specific to the pipeline runner.
- Jars containing the SDK and test classes must be available on the classpath.
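In a build, the property is usually passed as a JVM flag (for example `-DbeamTestPipelineOptions='[...]'`). The sketch below sets the property programmatically instead and reads it back through `testingPipelineOptions()`. It assumes the Beam Java SDK is on the classpath. The class name `TestOptionsFromSystemProperty` and the option values are hypothetical. It uses `--jobName` and `--tempLocation` because those options exist on every `PipelineOptions`.

```java
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;

public class TestOptionsFromSystemProperty {
  /** Sets beamTestPipelineOptions (hypothetical values), builds test options, then restores the property. */
  static PipelineOptions loadOptions() {
    String key = TestPipeline.PROPERTY_BEAM_TEST_PIPELINE_OPTIONS;
    String previous = System.getProperty(key);
    // Same shape as the documented example: a JSON array of "--name=value" strings.
    System.setProperty(key, "[ \"--jobName=my-test-job\", \"--tempLocation=/tmp/beam-test\" ]");
    try {
      return TestPipeline.testingPipelineOptions();
    } finally {
      // The property is JVM-wide, so put back whatever was there before.
      if (previous == null) {
        System.clearProperty(key);
      } else {
        System.setProperty(key, previous);
      }
    }
  }
}
```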
Use PAssert for tests, as it integrates with this test harness in both direct and remote execution modes. For example:

    @Rule public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    @Category(NeedsRunner.class)
    public void myPipelineTest() throws Exception {
      final PCollection<String> pCollection = pipeline.apply(...);
      PAssert.that(pCollection).containsInAnyOrder(...);
      pipeline.run();
    }
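Here is a complete, runnable variant of that pattern with hypothetical input data. It assumes the Beam Java SDK, JUnit 4 and the direct runner (`beam-runners-direct-java`) are on the test classpath, so that the pipeline runs locally when `beamTestPipelineOptions` is unset. The class name `UppercaseTest` and the values are illustrative only.

```java
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class UppercaseTest {
  // Declared as a JUnit rule so TestPipeline can manage the pipeline around each test.
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  @Category(NeedsRunner.class)
  public void uppercasesEveryElement() {
    PCollection<String> output =
        pipeline
            .apply(Create.of("a", "b", "c"))
            .apply(MapElements.into(TypeDescriptors.strings()).via((String s) -> s.toUpperCase()));

    // PCollections are unordered, so compare contents without regard to order.
    PAssert.that(output).containsInAnyOrder("A", "B", "C");
    pipeline.run();
  }
}
```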
Pipeline runners are required to throw an AssertionError containing the message from the PAssert that failed.

See also the Testing documentation section.
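The sketch below shows what that contract looks like from the test's side. A deliberately wrong PAssert makes `run()` throw an `AssertionError`, which the test catches. It assumes JUnit 4 and the direct runner on the classpath. The class name `FailingAssertionTest` is hypothetical.

```java
import static org.junit.Assert.fail;

import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class FailingAssertionTest {
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  @Category(NeedsRunner.class)
  public void failedPAssertSurfacesAsAssertionError() {
    PCollection<Integer> numbers = pipeline.apply(Create.of(1, 2, 3));
    // Deliberately wrong expectation: the element 3 is missing.
    PAssert.that(numbers).containsInAnyOrder(1, 2);
    try {
      pipeline.run();
    } catch (AssertionError expected) {
      // run() unwraps the runner's failure and rethrows the AssertionError itself.
      return;
    }
    fail("expected the PAssert to fail");
  }
}
```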
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description):
- static class TestPipeline.AbandonedNodeException
  An exception thrown in case an abandoned PTransform is detected, that is, a PTransform that has not been run.
- static class TestPipeline.PipelineRunMissingException
  An exception thrown in case a test finishes without invoking Pipeline.run().
- static interface TestPipeline.TestValueProviderOptions
  Implementation detail of newProvider(T), do not use.

Nested classes/interfaces inherited from class org.apache.beam.sdk.Pipeline
Pipeline.PipelineExecutionException, Pipeline.PipelineVisitor
Field Summary
Fields (Modifier and Type, Field, Description):
- static java.lang.String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS
  System property used to set TestPipelineOptions.
Method Summary
Methods (Modifier and Type, Method, Description):
- org.junit.runners.model.Statement apply(org.junit.runners.model.Statement statement, org.junit.runner.Description description)
- static TestPipeline create()
  Creates and returns a new test pipeline.
- TestPipeline enableAbandonedNodeEnforcement(boolean enable)
  Enables the abandoned node detection.
- TestPipeline enableAutoRunIfMissing(boolean enable)
  If enabled, a pipeline.run() statement will be added automatically in case it is missing in the test.
- static TestPipeline fromOptions(PipelineOptions options)
- PipelineOptions getOptions()
- <T> ValueProvider<T> newProvider(T runtimeValue)
  Returns a new ValueProvider that is inaccessible before run(), but will be accessible while the pipeline runs.
- PipelineResult run()
  Runs this TestPipeline, unwrapping any AssertionError that is raised during testing.
- PipelineResult run(PipelineOptions options)
  Like run(), but with the given, potentially modified, options.
- PipelineResult runWithAdditionalOptionArgs(java.util.List<java.lang.String> additionalArgs)
  Runs this TestPipeline with additional command-line pipeline option arguments.
- static PipelineOptions testingPipelineOptions()
  Creates PipelineOptions for testing.
- java.lang.String toString()
- static void verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pipelineResult)
  Verifies that all PAsserts in the pipeline have been executed and were successful.

Methods inherited from class org.apache.beam.sdk.Pipeline
apply, apply, applyTransform, applyTransform, begin, create, forTransformHierarchy, getCoderRegistry, getSchemaRegistry, replaceAll, setCoderRegistry, traverseTopologically
Field Detail
-
PROPERTY_BEAM_TEST_PIPELINE_OPTIONS
public static final java.lang.String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS
System property used to set TestPipelineOptions.
See Also: Constant Field Values
Method Detail
-
create
public static TestPipeline create()
Creates and returns a new test pipeline.
Use PAssert to add tests, then call Pipeline.run() to execute the pipeline and check the tests.
-
fromOptions
public static TestPipeline fromOptions(PipelineOptions options)
-
getOptions
public PipelineOptions getOptions()
- Overrides: getOptions in class Pipeline
-
apply
public org.junit.runners.model.Statement apply(org.junit.runners.model.Statement statement, org.junit.runner.Description description)
- Specified by: apply in interface org.junit.rules.TestRule
-
run
public PipelineResult run()
Runs this TestPipeline, unwrapping any AssertionError that is raised during testing.
-
runWithAdditionalOptionArgs
public PipelineResult runWithAdditionalOptionArgs(java.util.List<java.lang.String> additionalArgs)
Runs this TestPipeline with additional command-line pipeline option arguments.
This is useful when using PipelineOptions.as(Class) directly would introduce a circular dependency. Most of the logic is similar to testingPipelineOptions().
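A minimal call-shape sketch follows. The description above only says that the logic mirrors `testingPipelineOptions()`. This example therefore assumes the extra arguments must name options Beam already knows about, and it uses the standard `--jobName`. The class name `AdditionalArgsTest` and the job name are hypothetical. JUnit 4 and the direct runner are assumed to be on the classpath.

```java
import java.util.Collections;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class AdditionalArgsTest {
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  @Category(NeedsRunner.class)
  public void runsWithExtraArgs() {
    PAssert.that(pipeline.apply(Create.of("x"))).containsInAnyOrder("x");
    // Used in place of run(): the extra "--name=value" args feed the options for this run.
    pipeline.runWithAdditionalOptionArgs(Collections.singletonList("--jobName=extra-args-job"));
  }
}
```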
-
run
public PipelineResult run(PipelineOptions options)
Like run(), but with the given, potentially modified, options.
-
newProvider
public <T> ValueProvider<T> newProvider(T runtimeValue)
Returns a new ValueProvider that is inaccessible before run(), but will be accessible while the pipeline runs.
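newProvider is useful when testing code that takes a ValueProvider. In the sketch below, a provider created at construction time is fed into `Create.ofProvider` and is read only once the pipeline runs. The class name `RuntimeValueTest` and the value `"hello"` are hypothetical. JUnit 4 and the direct runner are assumed.

```java
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class RuntimeValueTest {
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  @Category(NeedsRunner.class)
  public void readsValueOnlyAtRuntime() {
    // Created during pipeline construction; its value is meant to be read while the pipeline runs.
    ValueProvider<String> greeting = pipeline.newProvider("hello");
    PCollection<String> output =
        pipeline.apply(Create.ofProvider(greeting, StringUtf8Coder.of()));
    PAssert.that(output).containsInAnyOrder("hello");
    pipeline.run();
  }
}
```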
-
enableAbandonedNodeEnforcement
public TestPipeline enableAbandonedNodeEnforcement(boolean enable)
Enables the abandoned node detection. Abandoned nodes are PTransforms, PAsserts included, that were not executed by the pipeline runner. Abandoned nodes are most likely to occur in one of the following scenarios:
- Lack of a pipeline.run() statement at the end of a test.
- Addition of PTransforms after the pipeline has already run.
Abandoned node detection is automatically enabled when a real pipeline runner (i.e. not a CrashingRunner) and/or a NeedsRunner or a ValidatesRunner annotation are detected.
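Detection can get in the way of construction-only tests that never intend to run the pipeline. The sketch below turns it off explicitly and only inspects the pipeline graph. The class name `ConstructionOnlyTest` and the transform name `"CreateWords"` are hypothetical. JUnit 4 and the Beam SDK are assumed.

```java
import static org.junit.Assert.assertSame;

import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class ConstructionOnlyTest {
  // Detection off: this test builds a graph but deliberately never runs it.
  @Rule
  public final transient TestPipeline pipeline =
      TestPipeline.create().enableAbandonedNodeEnforcement(false);

  @Test
  public void buildsGraphWithoutRunning() {
    PCollection<String> words = pipeline.apply("CreateWords", Create.of("a", "b"));
    // Only the construction result is checked; no pipeline.run() follows.
    assertSame(pipeline, words.getPipeline());
  }
}
```

With detection left on and a real runner configured, the same test would be expected to fail after the test body, since it ends without invoking `Pipeline.run()` (compare `TestPipeline.PipelineRunMissingException`).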
-
enableAutoRunIfMissing
public TestPipeline enableAutoRunIfMissing(boolean enable)
If enabled, a pipeline.run() statement will be added automatically in case it is missing in the test.
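The sketch below relies on the automatic run instead of an explicit `pipeline.run()`. It sets the enforcement mode before enabling auto-run. That ordering is an assumption: some Beam versions may require an enforcement mode to exist before `enableAutoRunIfMissing` is called. The class name `AutoRunTest` is hypothetical. JUnit 4 and the direct runner are assumed.

```java
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class AutoRunTest {
  @Rule
  public final transient TestPipeline pipeline =
      TestPipeline.create().enableAbandonedNodeEnforcement(true).enableAutoRunIfMissing(true);

  @Test
  @Category(NeedsRunner.class)
  public void assertionsRunWithoutExplicitRun() {
    PAssert.that(pipeline.apply(Create.of(1, 2, 3))).containsInAnyOrder(1, 2, 3);
    // No pipeline.run() here: the rule is expected to run the pipeline after this method returns.
  }
}
```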
-
testingPipelineOptions
public static PipelineOptions testingPipelineOptions()
Creates PipelineOptions for testing.
-
verifyPAssertsSucceeded
public static void verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pipelineResult)
Verifies that all PAsserts in the pipeline have been executed and were successful.
Note that this only runs for runners that support Metrics. Runners that do not should verify this in some other way. See: https://issues.apache.org/jira/browse/BEAM-2001
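Because the method is static, it can also be applied to an ordinary `Pipeline` outside a JUnit rule. The sketch below does that with options from `testingPipelineOptions()`. It assumes the direct runner, which blocks in `run()` by default. The class and method names are hypothetical.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;

public class VerifyPAssertsExample {
  /** Runs a plain Pipeline containing one PAssert, then checks that the PAssert executed and passed. */
  static PipelineResult.State runAndVerify() {
    Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
    PAssert.that(p.apply(Create.of("a", "b"))).containsInAnyOrder("a", "b");

    PipelineResult result = p.run();
    PipelineResult.State state = result.waitUntilFinish();

    // Fails if some PAssert did not execute successfully; does nothing on runners without Metrics support.
    TestPipeline.verifyPAssertsSucceeded(p, result);
    return state;
  }
}
```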