@@ -183,7 +183,7 @@ public void testWriteAndRead() throws IOException, ParseException, InterruptedEx
183183 pipelineOperator .waitUntilDone (
184184 createConfig (writeInfo , Duration .ofMinutes (configuration .pipelineTimeout )));
185185 assertNotEquals (PipelineOperator .Result .LAUNCH_FAILED , writeResult );
186-
186+
187187 // Add monitoring for read job progress
188188 PipelineOperator .Result readResult =
189189 pipelineOperator .waitUntilDone (
@@ -300,7 +300,8 @@ private PipelineLauncher.LaunchInfo generateDataAndWrite() throws IOException {
300300 .addParameter ("experiments" , configuration .useDataflowRunnerV2 ? "use_runner_v2" : "" )
301301 .addParameter ("enableStreamingEngine" , "true" ) // Enable streaming engine
302302 .addParameter ("usePublicIps" , "false" ) // Use private IPs for better performance
303- .addParameter ("subnetwork" , "regions/us-central1/subnetworks/default" ) // Use default subnet
303+ .addParameter (
304+ "subnetwork" , "regions/us-central1/subnetworks/default" ) // Use default subnet
304305 .build ();
305306
306307 return pipelineLauncher .launch (project , region , options );
@@ -312,13 +313,14 @@ private PipelineLauncher.LaunchInfo readData() throws IOException {
312313 KafkaIO .readBytes ()
313314 .withBootstrapServers (configuration .bootstrapServers )
314315 .withTopic (kafkaTopic )
315- .withConsumerConfigUpdates (ImmutableMap .of (
316- "auto.offset.reset" , "earliest" ,
317- "session.timeout.ms" , "30000" , // Add session timeout
318- "heartbeat.interval.ms" , "10000" , // Add heartbeat interval
319- "max.poll.interval.ms" , "300000" , // Add max poll interval
320- "fetch.min.bytes" , "1" , // Add fetch min bytes
321- "fetch.max.wait.ms" , "500" )); // Add fetch max wait
316+ .withConsumerConfigUpdates (
317+ ImmutableMap .of (
318+ "auto.offset.reset" , "earliest" ,
319+ "session.timeout.ms" , "30000" , // Add session timeout
320+ "heartbeat.interval.ms" , "10000" , // Add heartbeat interval
321+ "max.poll.interval.ms" , "300000" , // Add max poll interval
322+ "fetch.min.bytes" , "1" , // Add fetch min bytes
323+ "fetch.max.wait.ms" , "500" )); // Add fetch max wait
322324
323325 readPipeline
324326 .apply ("Read from Kafka" , readFromKafka )
@@ -333,7 +335,8 @@ private PipelineLauncher.LaunchInfo readData() throws IOException {
333335 .addParameter ("experiments" , configuration .useDataflowRunnerV2 ? "use_runner_v2" : "" )
334336 .addParameter ("enableStreamingEngine" , "true" ) // Enable streaming engine
335337 .addParameter ("usePublicIps" , "false" ) // Use private IPs for better performance
336- .addParameter ("subnetwork" , "regions/us-central1/subnetworks/default" ) // Use default subnet
338+ .addParameter (
339+ "subnetwork" , "regions/us-central1/subnetworks/default" ) // Use default subnet
337340 .build ();
338341
339342 return pipelineLauncher .launch (project , region , options );
0 commit comments