File tree Expand file tree Collapse file tree 2 files changed +11
-5
lines changed
e2e-tests/controller-spark
pipelines/batch/src/main/java/com/google/fhir/analytics Expand file tree Collapse file tree 2 files changed +11
-5
lines changed Original file line number Diff line number Diff line change @@ -227,10 +227,12 @@ function check_parquet() {
227227
228228 if [[ " ${isIncremental} " == " true" ]]
229229 then
230- # In case of incremental run, we will have two directories
231- # assuming batch run was executed before this.
230+ # In case of incremental run, we will have two directories (because a batch
231+ # run was executed first); one directory is for the first batch run and
232+ # the second is for the merge step of the incremental run. The second
233+ # directory has one more patient, hence the new totals.
232234 TOTAL_TEST_PATIENTS=$(( 2 * TOTAL_TEST_PATIENTS + 1 ))
233- TOTAL_VIEW_PATIENTS=108
235+ TOTAL_VIEW_PATIENTS=$(( 2 * TOTAL_VIEW_PATIENTS + 1 ))
234236 TOTAL_TEST_ENCOUNTERS=$(( 2 * TOTAL_TEST_ENCOUNTERS))
235237 TOTAL_TEST_OBS=$(( 2 * TOTAL_TEST_OBS))
236238 fi
Original file line number Diff line number Diff line change 11/*
2- * Copyright 2020-2024 Google LLC
2+ * Copyright 2020-2025 Google LLC
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
@@ -342,7 +342,11 @@ public static Pipeline writeMergedViewsPipeline(
342342 public void processElement (ProcessContext c ) {
343343 KV <String , CoGbkResult > e = c .element ();
344344 List <GenericRecord > lastRecords = new ArrayList <>();
345- e .getValue ().getAll (newDwh ).forEach (lastRecords ::add );
345+ Iterable <GenericRecord > iter = e .getValue ().getAll (newDwh );
346+ if (!iter .iterator ().hasNext ()) {
347+ iter = e .getValue ().getAll (oldDwh );
348+ }
349+ iter .forEach (lastRecords ::add );
346350 for (GenericRecord r : lastRecords ) {
347351 c .output (r );
348352 }
You can’t perform that action at this time.
0 commit comments