File tree Expand file tree Collapse file tree 1 file changed +5
-2
lines changed
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/status Expand file tree Collapse file tree 1 file changed +5
-2
lines changed Original file line number Diff line number Diff line change @@ -65,8 +65,6 @@ public BeamFnStatusClient(
6565 PipelineOptions options ,
6666 Cache <?, ?> cache ) {
6767 this .channel = channelFactory .apply (apiServiceDescriptor );
68- this .outboundObserver =
69- BeamFnWorkerStatusGrpc .newStub (channel ).workerStatus (new InboundObserver ());
7068 this .processBundleCache = processBundleCache ;
7169 this .memoryMonitor = MemoryMonitor .fromOptions (options );
7270 this .cache = cache ;
@@ -76,6 +74,11 @@ public BeamFnStatusClient(
7674 thread .setPriority (Thread .MIN_PRIORITY );
7775 thread .setName ("MemoryMonitor" );
7876 thread .start ();
77+
78+ // Start the rpc after all the initialization is complete as the InboundObserver
79+ // may be called any time after this.
80+ this .outboundObserver =
81+ BeamFnWorkerStatusGrpc .newStub (channel ).workerStatus (new InboundObserver ());
7982 }
8083
8184 @ Override
You can’t perform that action at this time.
0 commit comments