@@ -155,7 +155,7 @@ public void setup() {
155155 Configuration .class ),
156156 "large" ,
157157 Configuration .fromJsonString (
158- "{\" rowsPerSecond\" :50000,\" numRecords\" :5000000,\" valueSizeBytes\" :1000,\" minutes\" :60,\" pipelineTimeout\" :240 ,\" runner\" :\" DataflowRunner\" }" ,
158+ "{\" rowsPerSecond\" :50000,\" numRecords\" :5000000,\" valueSizeBytes\" :1000,\" minutes\" :60,\" pipelineTimeout\" :180 ,\" runner\" :\" DataflowRunner\" }" ,
159159 Configuration .class ));
160160 } catch (IOException e ) {
161161 throw new RuntimeException (e );
@@ -178,6 +178,13 @@ public void testWriteAndRead() throws IOException, ParseException, InterruptedEx
178178 PipelineLauncher .LaunchInfo readInfo = readData ();
179179
180180 try {
181+ // Add monitoring for write job progress
182+ PipelineOperator .Result writeResult =
183+ pipelineOperator .waitUntilDone (
184+ createConfig (writeInfo , Duration .ofMinutes (configuration .pipelineTimeout )));
185+ assertNotEquals (PipelineOperator .Result .LAUNCH_FAILED , writeResult );
186+
187+ // Add monitoring for read job progress
181188 PipelineOperator .Result readResult =
182189 pipelineOperator .waitUntilDone (
183190 createConfig (readInfo , Duration .ofMinutes (configuration .pipelineTimeout )));
@@ -271,8 +278,12 @@ private PipelineLauncher.LaunchInfo generateDataAndWrite() throws IOException {
271278 .withProducerConfigUpdates (
272279 ImmutableMap .of (
273280 ProducerConfig .RETRIES_CONFIG , 10 ,
274- ProducerConfig .REQUEST_TIMEOUT_MS_CONFIG , 600000 ,
275- ProducerConfig .RETRY_BACKOFF_MS_CONFIG , 5000 ))
281+ ProducerConfig .REQUEST_TIMEOUT_MS_CONFIG , 300000 , // Reduced from 600000
282+ ProducerConfig .RETRY_BACKOFF_MS_CONFIG , 5000 ,
283+ ProducerConfig .DELIVERY_TIMEOUT_MS_CONFIG , 300000 , // Add delivery timeout
284+ ProducerConfig .BATCH_SIZE_CONFIG , 16384 , // Add batch size
285+ ProducerConfig .LINGER_MS_CONFIG , 100 , // Add linger time
286+ ProducerConfig .BUFFER_MEMORY_CONFIG , 33554432 )) // Add buffer memory
276287 .values ());
277288
278289 PipelineLauncher .LaunchConfig options =
@@ -287,6 +298,10 @@ private PipelineLauncher.LaunchInfo generateDataAndWrite() throws IOException {
287298 .addParameter ("numWorkers" , String .valueOf (configuration .numWorkers ))
288299 .addParameter ("maxNumWorkers" , String .valueOf (configuration .maxNumWorkers ))
289300 .addParameter ("experiments" , configuration .useDataflowRunnerV2 ? "use_runner_v2" : "" )
301+ .addParameter ("enableStreamingEngine" , "true" ) // Enable streaming engine
302+ .addParameter ("streamingMode" , "true" ) // Enable streaming mode
303+ .addParameter ("usePublicIps" , "false" ) // Use private IPs for better performance
304+ .addParameter ("subnetwork" , "regions/us-central1/subnetworks/default" ) // Use default subnet
290305 .build ();
291306
292307 return pipelineLauncher .launch (project , region , options );
@@ -298,7 +313,13 @@ private PipelineLauncher.LaunchInfo readData() throws IOException {
298313 KafkaIO .readBytes ()
299314 .withBootstrapServers (configuration .bootstrapServers )
300315 .withTopic (kafkaTopic )
301- .withConsumerConfigUpdates (ImmutableMap .of ("auto.offset.reset" , "earliest" ));
316+ .withConsumerConfigUpdates (ImmutableMap .of (
317+ "auto.offset.reset" , "earliest" ,
318+ "session.timeout.ms" , "30000" , // Add session timeout
319+ "heartbeat.interval.ms" , "10000" , // Add heartbeat interval
320+ "max.poll.interval.ms" , "300000" , // Add max poll interval
321+ "fetch.min.bytes" , "1" , // Add fetch min bytes
322+ "fetch.max.wait.ms" , "500" )); // Add fetch max wait
302323
303324 readPipeline
304325 .apply ("Read from Kafka" , readFromKafka )
@@ -311,6 +332,10 @@ private PipelineLauncher.LaunchInfo readData() throws IOException {
311332 .addParameter ("numWorkers" , String .valueOf (configuration .numWorkers ))
312333 .addParameter ("runner" , configuration .runner )
313334 .addParameter ("experiments" , configuration .useDataflowRunnerV2 ? "use_runner_v2" : "" )
335+ .addParameter ("enableStreamingEngine" , "true" ) // Enable streaming engine
336+ .addParameter ("streamingMode" , "true" ) // Enable streaming mode
337+ .addParameter ("usePublicIps" , "false" ) // Use private IPs for better performance
338+ .addParameter ("subnetwork" , "regions/us-central1/subnetworks/default" ) // Use default subnet
314339 .build ();
315340
316341 return pipelineLauncher .launch (project , region , options );
0 commit comments