Rework AsyncDoFn/FileDownloadDoFn to avoid outputting element outside of allowed timestamp bound #5866

@clairemcginty

See discussion here: #5806
And here: apache/beam#34902 (comment)
And here: apache/beam#36838 (comment)

On updating to Beam 2.69, AsyncDoFn/FileDownloadDoFn tests started failing with:

[info] - should propagate element metadata *** FAILED *** (45 milliseconds)
[info]   org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Cannot output with timestamp 1970-01-01T00:00:00.001Z. Output timestamps must be no earlier than the timestamp of the current input or timer (1970-01-01T00:00:00.008Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
[info]   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:377)
[info]   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:345)
[info]   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
[info]   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
[info]   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
[info]   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:442)
[info]   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:381)
[info]   at com.spotify.scio.ScioContext.execute(ScioContext.scala:671)
[info]   at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:658)
[info]   at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:646)
[info]   ...
[info]   Cause: java.lang.IllegalArgumentException: Cannot output with timestamp 1970-01-01T00:00:00.001Z. Output timestamps must be no earlier than the timestamp of the current input or timer (1970-01-01T00:00:00.008Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
[info]   at org.apache.beam.runners.core.SimpleDoFnRunner.checkTimestamp(SimpleDoFnRunner.java:263)
[info]   at org.apache.beam.runners.core.SimpleDoFnRunner.access$1300(SimpleDoFnRunner.java:89)
[info]   at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.lambda$outputWindowedValue$0(SimpleDoFnRunner.java:462)
[info]   at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:210)
[info]   at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWindowedValue(SimpleDoFnRunner.java:465)
[info]   at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:123)
[info]   at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:210)
[info]   at org.apache.beam.sdk.transforms.DoFn$OutputReceiver.outputWindowedValue(DoFn.java:416)
[info]   at com.spotify.scio.transforms.BaseAsyncLookupDoFn.lambda$processElement$0(BaseAsyncLookupDoFn.java:190)
[info]   at com.spotify.scio.transforms.BaseAsyncLookupDoFn.flush(BaseAsyncLookupDoFn.java:310)
[info]   ...
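
The arithmetic behind the error message can be checked by hand. The following is a minimal sketch of the lower-bound rule as the message states it (output timestamp no earlier than the current input timestamp minus the allowed skew). It is a standalone model, not Beam's `SimpleDoFnRunner` code; the class and method names are hypothetical, and the upper bound from the message is ignored.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical standalone model of the rule quoted in the error message.
public class TimestampSkewCheck {

    // Returns true when output >= currentInput - allowedSkew.
    public static boolean isAllowed(Instant output, Instant currentInput, Duration allowedSkew) {
        Instant earliestAllowed = currentInput.minus(allowedSkew);
        return !output.isBefore(earliestAllowed);
    }

    public static void main(String[] args) {
        // Timestamps taken from the failing test output above.
        Instant completed = Instant.ofEpochMilli(1); // element whose result is being output
        Instant current = Instant.ofEpochMilli(8);   // element currently in ProcessElement

        // With the default skew of 0 ms, the earlier timestamp is rejected.
        System.out.println(isAllowed(completed, current, Duration.ZERO));        // prints "false"

        // A skew of at least 7 ms would admit it.
        System.out.println(isAllowed(completed, current, Duration.ofMillis(7))); // prints "true"
    }
}
```

In this model, the failing case is an element with timestamp 1 ms being output while the element with timestamp 8 ms is the current input, which matches the two timestamps in the exception.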

Per the discussion in the linked PRs/issues, we worked around this by overriding the allowed timestamp skew in those DoFns. However, a better option may be to output completed elements only in FinishBundle, rather than in both ProcessElement and FinishBundle, which should (™️) avoid the timestamp check added to ProcessElement.
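
The FinishBundle-only idea can be sketched without Beam as a buffer that records each completed result together with its original input timestamp and only hands results out when the bundle ends. This is a rough illustration, not the actual `BaseAsyncLookupDoFn` change: `DeferredOutputBuffer` and its methods are hypothetical names, and the mapping of `drain()` onto a `@FinishBundle` method that calls `FinishBundleContext.output(value, timestamp, window)` is an assumption (window tracking is omitted here).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer: completions are recorded, never emitted, until drain().
public class DeferredOutputBuffer<T> {

    /** A completed result paired with the event-time timestamp of its input. */
    public static final class Timestamped<T> {
        public final T value;
        public final long timestampMillis;

        Timestamped(T value, long timestampMillis) {
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    private final List<Timestamped<T>> completed = new ArrayList<>();

    // Called when an async request finishes (possibly from another thread).
    // Nothing is output here, so no "current input" timestamp is involved.
    public synchronized void onComplete(T value, long inputTimestampMillis) {
        completed.add(new Timestamped<>(value, inputTimestampMillis));
    }

    // Called once at the end of the bundle; returns and clears all buffered results.
    public synchronized List<Timestamped<T>> drain() {
        List<Timestamped<T>> out = new ArrayList<>(completed);
        completed.clear();
        return out;
    }

    public static void main(String[] args) {
        DeferredOutputBuffer<String> buffer = new DeferredOutputBuffer<>();
        buffer.onComplete("first", 1L);  // completes while a later element is being processed
        buffer.onComplete("second", 8L);

        // End of bundle: each result is emitted with its own original timestamp.
        for (Timestamped<String> t : buffer.drain()) {
            System.out.println(t.value + " @ " + t.timestampMillis + " ms");
        }
    }
}
```

One trade-off of deferring everything this way is that all completed results stay in memory until the bundle ends, and downstream steps see no output until then.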
