@@ -228,7 +228,8 @@ public void setup() {
228228 public void processElement (ProcessContext c , @ StateId (ROW_COUNTER ) ValueState <Integer > counter )
229229 throws Exception {
230230 int current = firstNonNull (counter .read (), 0 );
231- // We update schema early on to leave a healthy amount of time for the StreamWriter to recognize it,
231+ // We update schema early on to leave a healthy amount of time for the StreamWriter to
232+ // recognize it,
232233 // ensuring that subsequent writers are created with the updated schema.
233234 if (current == SCHEMA_UPDATE_TRIGGER ) {
234235 for (Map .Entry <String , String > entry : newSchemas .entrySet ()) {
@@ -246,8 +247,10 @@ public void processElement(ProcessContext c, @StateId(ROW_COUNTER) ValueState<In
246247 while (System .currentTimeMillis () - startTime < timeoutMillis ) {
247248 schemaPropagated = true ;
248249 for (Map .Entry <String , String > entry : newSchemas .entrySet ()) {
249- TableSchema currentSchema = bqClient .getTableResource (projectId , datasetId , entry .getKey ()).getSchema ();
250- TableSchema expectedSchema = BigQueryHelpers .fromJsonString (entry .getValue (), TableSchema .class );
250+ TableSchema currentSchema =
251+ bqClient .getTableResource (projectId , datasetId , entry .getKey ()).getSchema ();
252+ TableSchema expectedSchema =
253+ BigQueryHelpers .fromJsonString (entry .getValue (), TableSchema .class );
251254 if (currentSchema .getFields ().size () != expectedSchema .getFields ().size ()) {
252255 schemaPropagated = false ;
253256 break ;
@@ -261,7 +264,9 @@ public void processElement(ProcessContext c, @StateId(ROW_COUNTER) ValueState<In
261264 if (!schemaPropagated ) {
262265 LOG .warn ("Schema update did not propagate fully within the timeout." );
263266 } else {
264- LOG .info ("Schema update propagated fully within the timeout - {}." , System .currentTimeMillis () - startTime );
267+ LOG .info (
268+ "Schema update propagated fully within the timeout - {}." ,
269+ System .currentTimeMillis () - startTime );
265270 // wait for streams to recognize the new schema
266271 Thread .sleep (STREAM_RECOGNITION_DELAY_MS );
267272 }
@@ -409,14 +414,19 @@ private void runStreamingPipelineWithSchemaChange(
409414 // recognize the new schema. Apply on relevant tests.
410415 boolean waitLonger = changeTableSchema && (useAutoSchemaUpdate || !useInputSchema );
411416 if (method == Write .Method .STORAGE_WRITE_API ) {
412- write = write .withTriggeringFrequency (Duration .standardSeconds (waitLonger ? LONG_WAIT_SECONDS : 1 ));
417+ write =
418+ write .withTriggeringFrequency (
419+ Duration .standardSeconds (waitLonger ? LONG_WAIT_SECONDS : 1 ));
413420 }
414421
415422 // set up and build pipeline
416423 Instant start = new Instant (0 );
417- Duration interval = waitLonger ? Duration .standardSeconds (LONG_WAIT_SECONDS ) : Duration .millis (1 );
424+ Duration interval =
425+ waitLonger ? Duration .standardSeconds (LONG_WAIT_SECONDS ) : Duration .millis (1 );
418426 Duration stop =
419- waitLonger ? Duration .standardSeconds ((TOTAL_N - 1 ) * LONG_WAIT_SECONDS ) : Duration .millis (TOTAL_N - 1 );
427+ waitLonger
428+ ? Duration .standardSeconds ((TOTAL_N - 1 ) * LONG_WAIT_SECONDS )
429+ : Duration .millis (TOTAL_N - 1 );
420430 Function <Instant , Long > getIdFromInstant =
421431 waitLonger
422432 ? (Function <Instant , Long > & Serializable )
@@ -666,17 +676,21 @@ public void runDynamicDestinationsWithAutoSchemaUpdate(boolean useAtLeastOnce) t
666676 write =
667677 write
668678 .withMethod (Write .Method .STORAGE_WRITE_API )
669- .withTriggeringFrequency (Duration .standardSeconds (changeTableSchema ? LONG_WAIT_SECONDS : 1 ));
679+ .withTriggeringFrequency (
680+ Duration .standardSeconds (changeTableSchema ? LONG_WAIT_SECONDS : 1 ));
670681 }
671682
672683 int numRows = TOTAL_N ;
673684 // set up and build pipeline
674685 Instant start = new Instant (0 );
675686 // We give a healthy waiting period between each element to give Storage API streams a chance to
676687 // recognize the new schema. Apply on relevant tests.
677- Duration interval = changeTableSchema ? Duration .standardSeconds (LONG_WAIT_SECONDS ) : Duration .millis (1 );
688+ Duration interval =
689+ changeTableSchema ? Duration .standardSeconds (LONG_WAIT_SECONDS ) : Duration .millis (1 );
678690 Duration stop =
679- changeTableSchema ? Duration .standardSeconds ((numRows - 1 ) * LONG_WAIT_SECONDS ) : Duration .millis (numRows - 1 );
691+ changeTableSchema
692+ ? Duration .standardSeconds ((numRows - 1 ) * LONG_WAIT_SECONDS )
693+ : Duration .millis (numRows - 1 );
680694 Function <Instant , Long > getIdFromInstant =
681695 changeTableSchema
682696 ? (Function <Instant , Long > & Serializable )
0 commit comments