 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.JavaSparkContext$;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.dstream.ConstantInputDStream;
+import org.apache.spark.streaming.api.java.JavaInputDStream;
 import scala.Tuple2;
 import scala.collection.JavaConverters;
 
@@ -157,17 +158,14 @@ private static void translateImpulse(
             .parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
             .map(CoderHelpers.fromByteFunction(windowCoder));
 
-    final ConstantInputDStream<WindowedValue<byte[]>> inputDStream =
-        new ConstantInputDStream<>(
-            context.getStreamingContext().ssc(),
-            emptyByteArrayRDD.rdd(),
-            JavaSparkContext$.MODULE$.fakeClassTag());
-
-    final JavaDStream<WindowedValue<byte[]>> stream =
-        JavaDStream.fromDStream(inputDStream, JavaSparkContext$.MODULE$.fakeClassTag());
-
+    Queue<JavaRDD<WindowedValue<byte[]>>> rddQueue = new LinkedBlockingQueue<>();
+    rddQueue.offer(emptyByteArrayRDD);
+    JavaInputDStream<WindowedValue<byte[]>> emptyByteArrayStream =
+        context.getStreamingContext().queueStream(rddQueue, true /* oneAtATime */);
     UnboundedDataset<byte[]> output =
-        new UnboundedDataset<>(stream, Collections.singletonList(inputDStream.id()));
+        new UnboundedDataset<>(
+            emptyByteArrayStream,
+            Collections.singletonList(emptyByteArrayStream.inputDStream().id()));
 
     // Add watermark to holder and advance to infinity to ensure future watermarks can be updated
     GlobalWatermarkHolder.SparkWatermarks sparkWatermark =
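
The hunk above swaps ConstantInputDStream, which re-emits the same RDD in every micro-batch, for JavaStreamingContext.queueStream with oneAtATime = true, which emits each queued RDD in exactly one batch. A minimal standalone sketch of that API (not part of this translator; the local master, 1-second batch interval, class name, and byte[] element are all assumed for illustration) might look like:

import java.util.Collections;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class QueueStreamSketch {
  public static void main(String[] args) throws InterruptedException {
    // Assumed local setup, purely illustrative.
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("queue-stream-sketch");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

    // One RDD in the queue; with oneAtATime = true it is emitted in a single batch and
    // later batches see empty RDDs (ConstantInputDStream would re-emit it every batch).
    Queue<JavaRDD<byte[]>> rddQueue = new LinkedBlockingQueue<>();
    rddQueue.offer(jssc.sparkContext().parallelize(Collections.singletonList(new byte[0])));

    JavaInputDStream<byte[]> stream = jssc.queueStream(rddQueue, true /* oneAtATime */);
    stream.foreachRDD(rdd -> System.out.println("elements in batch: " + rdd.count()));

    jssc.start();
    jssc.awaitTerminationOrTimeout(5_000);
    jssc.stop();
  }
}
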
@@ -307,11 +305,9 @@ private static <T> void translateFlatten(
     List<Integer> streamSources = new ArrayList<>();
 
     if (inputsMap.isEmpty()) {
-      final JavaRDD<WindowedValue<T>> emptyRDD = context.getSparkContext().emptyRDD();
-      final SingleEmitInputDStream<WindowedValue<T>> singleEmitInputDStream =
-          new SingleEmitInputDStream<>(context.getStreamingContext().ssc(), emptyRDD.rdd());
-      unifiedStreams =
-          JavaDStream.fromDStream(singleEmitInputDStream, JavaSparkContext$.MODULE$.fakeClassTag());
+      Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+      q.offer(context.getSparkContext().emptyRDD());
+      unifiedStreams = context.getStreamingContext().queueStream(q);
     } else {
       List<JavaDStream<WindowedValue<T>>> dStreams = new ArrayList<>();
       for (String inputId : inputsMap.values()) {
@@ -322,13 +318,11 @@ private static <T> void translateFlatten(
           dStreams.add(unboundedDataset.getDStream());
         } else {
           // create a single RDD stream.
-          final SingleEmitInputDStream<WindowedValue<T>> singleEmitInputDStream =
-              new SingleEmitInputDStream<WindowedValue<T>>(
-                  context.getStreamingContext().ssc(), ((BoundedDataset) dataset).getRDD().rdd());
-          final JavaDStream<WindowedValue<T>> dStream =
-              JavaDStream.fromDStream(
-                  singleEmitInputDStream, JavaSparkContext$.MODULE$.fakeClassTag());
-
+          Queue<JavaRDD<WindowedValue<T>>> q = new LinkedBlockingQueue<>();
+          q.offer(((BoundedDataset) dataset).getRDD());
+          // TODO (https://github.com/apache/beam/issues/20426): this is not recoverable from
+          // checkpoint!
+          JavaDStream<WindowedValue<T>> dStream = context.getStreamingContext().queueStream(q);
           dStreams.add(dStream);
         }
       }
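
In this last hunk, translateFlatten adapts a BoundedDataset's RDD into a one-shot queue-backed DStream so it can be flattened together with unbounded inputs; as the TODO notes, queue-backed streams are not recoverable from a checkpoint. A rough standalone sketch of that pattern (assumed setup and names; the real translator unions the collected dStreams via the streaming context rather than pairwise as shown here):

import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class FlattenSketch {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("flatten-sketch");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

    // "Bounded" side: a single in-memory RDD adapted into a DStream that emits it once.
    Queue<JavaRDD<String>> boundedQueue = new LinkedBlockingQueue<>();
    boundedQueue.offer(jssc.sparkContext().parallelize(Arrays.asList("a", "b")));
    JavaDStream<String> boundedAsStream = jssc.queueStream(boundedQueue, true /* oneAtATime */);

    // "Unbounded" side: another queue stream stands in for a real streaming source here.
    Queue<JavaRDD<String>> liveQueue = new LinkedBlockingQueue<>();
    liveQueue.offer(jssc.sparkContext().parallelize(Arrays.asList("c")));
    JavaDStream<String> live = jssc.queueStream(liveQueue, true /* oneAtATime */);

    // Flatten both sides into a single DStream.
    JavaDStream<String> unified = boundedAsStream.union(live);
    unified.print();

    jssc.start();
    jssc.awaitTerminationOrTimeout(5_000);
    jssc.stop();
  }
}
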