From 385217f145d79e8bfc63fe04bb4c48782577ea89 Mon Sep 17 00:00:00 2001 From: Arunodoy18 Date: Thu, 20 Nov 2025 15:20:14 +0530 Subject: [PATCH 1/6] BEAM-13231: Add glob wildcard escaping utilities to FileSystems This commit adds support for escaping and unescaping glob wildcard characters in file path specifications, addressing the issue where files with literal glob metacharacters (*, ?, {, }) in their names cannot be matched. Changes: - Added escapeGlobWildcards(String spec) method to escape glob metacharacters by prefixing them with backslash - Added unescapeGlobWildcards(String spec) method to remove backslash prefixes from escaped glob characters - Added comprehensive test cases for both methods including round-trip testing These utilities provide the foundation for allowing users to treat glob metacharacters as literals when they appear in actual filenames. Fixes BEAM-13231 --- .../org/apache/beam/sdk/io/FileSystems.java | 36 +++++++++++++ .../apache/beam/sdk/io/FileSystemsTest.java | 50 +++++++++++++++++++ 2 files changed, 86 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index 7e2940a2c35b..2edc5c7ba45b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -78,6 +78,8 @@ public class FileSystems { private static final Pattern FILE_SCHEME_PATTERN = Pattern.compile("(?[a-zA-Z][-a-zA-Z0-9+.]*):/.*"); private static final Pattern GLOB_PATTERN = Pattern.compile("[*?{}]"); + private static final Pattern ESCAPED_GLOB_PATTERN = Pattern.compile("\\\\[*?{}]"); + private static final String GLOB_ESCAPE_PREFIX = "\\"; private static final AtomicReference> FILESYSTEM_REVISION = new AtomicReference<>(); @@ -92,6 +94,40 @@ public static boolean hasGlobWildcard(String spec) { return GLOB_PATTERN.matcher(spec).find(); } + /** + * Escapes glob wildcard characters in the given spec so they are treated as literals. + * + *

<p>This method escapes the characters '*', '?', '{', and '}' by prefixing them with a + * backslash, allowing them to be treated as literal characters in a file path rather than as + * glob wildcards. + * + *

<p>Example: {@code escapeGlobWildcards("file*.txt")} returns {@code "file\\*.txt"} + * + * @param spec the file path specification to escape + * @return the escaped specification + */ + public static String escapeGlobWildcards(String spec) { + checkNotNull(spec, "spec cannot be null"); + return spec.replaceAll("([*?{}])", "\\\\$1"); + } + + /** + * Unescapes glob wildcard characters in the given spec that were previously escaped with {@link + * #escapeGlobWildcards(String)}. + * + *

<p>This method removes the backslash prefix from escaped glob characters ('*', '?', '{', '}'), + * restoring them to their unescaped form. + * + *

Example: {@code unescapeGlobWildcards("file\\*.txt")} returns {@code "file*.txt"} + * + * @param spec the file path specification to unescape + * @return the unescaped specification + */ + public static String unescapeGlobWildcards(String spec) { + checkNotNull(spec, "spec cannot be null"); + return spec.replaceAll("\\\\([*?{}])", "$1"); + } + /** * This is the entry point to convert user-provided specs to {@link ResourceId ResourceIds}. * Callers should use {@link #match} to resolve users specs ambiguities before calling other diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java index 34567309c7d0..a513c1b1f462 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java @@ -354,6 +354,56 @@ private void createFileWithContent(Path path, String content) throws Exception { } } + @Test + public void testEscapeGlobWildcards() { + // Test escaping asterisk + assertEquals("file\\*.txt", FileSystems.escapeGlobWildcards("file*.txt")); + + // Test escaping question mark + assertEquals("file\\?.txt", FileSystems.escapeGlobWildcards("file?.txt")); + + // Test escaping braces + assertEquals("file\\{1,2\\}.txt", FileSystems.escapeGlobWildcards("file{1,2}.txt")); + + // Test escaping multiple characters + assertEquals("\\*\\?\\{\\}.txt", FileSystems.escapeGlobWildcards("*?{}.txt")); + + // Test string with no glob characters + assertEquals("file.txt", FileSystems.escapeGlobWildcards("file.txt")); + + // Test empty string + assertEquals("", FileSystems.escapeGlobWildcards("")); + } + + @Test + public void testUnescapeGlobWildcards() { + // Test unescaping asterisk + assertEquals("file*.txt", FileSystems.unescapeGlobWildcards("file\\*.txt")); + + // Test unescaping question mark + assertEquals("file?.txt", FileSystems.unescapeGlobWildcards("file\\?.txt")); + + // Test unescaping braces + assertEquals("file{1,2}.txt", FileSystems.unescapeGlobWildcards("file\\{1,2\\}.txt")); + + // Test unescaping multiple characters + assertEquals("*?{}.txt", FileSystems.unescapeGlobWildcards("\\*\\?\\{\\}.txt")); + + // Test string with no escaped characters + assertEquals("file.txt", FileSystems.unescapeGlobWildcards("file.txt")); + + // Test empty string + assertEquals("", FileSystems.unescapeGlobWildcards("")); + } + + @Test + public void testEscapeUnescapeRoundTrip() { + String original = "file*test?.txt"; + String escaped = FileSystems.escapeGlobWildcards(original); + String unescaped = FileSystems.unescapeGlobWildcards(escaped); + assertEquals(original, unescaped); + } + private LocalResourceId toLocalResourceId(String str) throws Exception { boolean isDirectory; if (SystemUtils.IS_OS_WINDOWS) { From bf37c0f44a4b710577e16653b33ed3d488ff1948 Mon Sep 17 00:00:00 2001 From: Arunodoy18 Date: Sat, 22 Nov 2025 21:04:00 +0530 Subject: [PATCH 2/6] [BEAM-31422] Fix IllegalStateException in STORAGE_WRITE_API with CDC When CDC (Change Data Capture) is enabled with STORAGE_WRITE_API method, the system was incorrectly using PENDING streams instead of the default stream, causing an IllegalStateException due to checkState validation. Changes: - Fixed StorageApiWriteUnshardedRecords to use default stream when CDC is enabled - Added comprehensive test case to prevent regression CDC requires default streams because PENDING streams don't support the RowMutationInformation functionality needed for upserts and deletes. 
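As an illustration (a sketch only; the table and field names are placeholders and the
configuration mirrors the test added in this commit), a CDC-style write such as:

    // Sketch: schema/table handling simplified relative to the actual test.
    BigQueryIO.<Row>write()
        .to("project-id:dataset-id.table-id")
        .useBeamSchema()
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .withRowMutationInformationFn(
            row ->
                RowMutationInformation.of(
                    RowMutationInformation.MutationType.UPSERT,
                    row.getValue("id").toString()))

previously hit the checkState because a PENDING stream was created for it; with this change
it is routed to the default stream instead.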
Fixes #31422 --- .../StorageApiWriteUnshardedRecords.java | 2 +- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 45 +++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index f6d10b47ccf2..14f93a95fa6d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -219,7 +219,7 @@ public PCollectionTuple expand(PCollection write = + BigQueryIO.write() + .to("project-id:dataset-id.table-id") + .withSchema(schema) + .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) + .withRowMutationInformationFn( + (Row row) -> { + return RowMutationInformation.of( + RowMutationInformation.MutationType.UPSERT, row.getValue("id").toString()); + }) + .withTestServices(fakeBqServices) + .withoutValidation(); + + // Create test data with CDC-style updates + Schema beamSchema = Schema.builder().addInt32Field("id").addStringField("name").build(); + + List testData = + ImmutableList.of( + Row.withSchema(beamSchema).addValues(1, "Alice").build(), + Row.withSchema(beamSchema).addValues(2, "Bob").build(), + Row.withSchema(beamSchema).addValues(1, "Alice Updated").build() // Update row with id=1 + ); + + // This should not throw an IllegalStateException + PCollection input = p.apply(Create.of(testData).withRowSchema(beamSchema)); + + WriteResult result = input.apply("WriteCdcToBQ", write); + + p.run(); // Should complete successfully without IllegalStateException + } } From 095049274f122c7561bcb0b0f9a7fc2a12d09abf Mon Sep 17 00:00:00 2001 From: Arunodoy18 Date: Sun, 23 Nov 2025 02:40:15 +0530 Subject: [PATCH 3/6] Fix triggeringFrequency documentation and validation for STORAGE_API_AT_LEAST_ONCE - Updated JavaDoc to reflect that triggeringFrequency applies to FILE_LOADS, STORAGE_WRITE_API, and STORAGE_API_AT_LEAST_ONCE methods - Fixed validation logic in expand() to require triggeringFrequency for STORAGE_API_AT_LEAST_ONCE when writing unbounded PCollections - Removed conflicting warning that incorrectly stated STORAGE_API_AT_LEAST_ONCE ignores triggeringFrequency - Added comprehensive test cases to verify proper validation behavior: * Test failure when triggeringFrequency is missing for unbounded collections * Test success when triggeringFrequency is provided for unbounded collections * Test success for bounded collections without triggeringFrequency requirement - Updated error messages to include all three supported methods This ensures consistent behavior across all BigQuery write methods that support triggered writes for unbounded collections. 
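Under this change, an unbounded STORAGE_API_AT_LEAST_ONCE write is expected to carry a
triggering frequency, for example via the pipeline option exercised in the new tests
(a sketch; the table name and input PCollection are placeholders):

    // Sketch: triggering frequency supplied through BigQueryOptions, as in the tests.
    BigQueryOptions options = pipeline.getOptions().as(BigQueryOptions.class);
    options.setStorageWriteApiTriggeringFrequencySec(30);

    unboundedRows.apply(
        BigQueryIO.<Row>write()
            .to("dataset.table")
            .useBeamSchema()
            .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE));

Leaving the frequency unset for such a pipeline now fails validation with the updated
IllegalArgumentException message rather than being silently ignored.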
--- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 24 +++---- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 69 ++++++++++++++++++- 2 files changed, 79 insertions(+), 14 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 7aef1bd1ce02..7589270c3646 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -3330,8 +3330,9 @@ public Write withLoadJobProjectId(ValueProvider<String> loadJobProjectId) { /** * Choose the frequency at which file writes are triggered. * - *

<p>This is only applicable when the write method is set to {@link Method#FILE_LOADS} or - * {@link Method#STORAGE_WRITE_API}, and only when writing an unbounded {@link PCollection}. + *

<p>This is only applicable when the write method is set to {@link Method#FILE_LOADS}, {@link + * Method#STORAGE_WRITE_API}, or {@link Method#STORAGE_API_AT_LEAST_ONCE}, and only when writing + * an unbounded {@link PCollection}. * *

Every triggeringFrequency duration, a BigQuery load job will be generated for all the data * written since the last load job. BigQuery has limits on how many load jobs can be triggered @@ -3736,19 +3737,22 @@ public WriteResult expand(PCollection input) { BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); Write.Method method = resolveMethod(input); if (input.isBounded() == IsBounded.UNBOUNDED) { - if (method == Write.Method.FILE_LOADS || method == Write.Method.STORAGE_WRITE_API) { + if (method == Write.Method.FILE_LOADS + || method == Write.Method.STORAGE_WRITE_API + || method == Write.Method.STORAGE_API_AT_LEAST_ONCE) { Duration triggeringFrequency = - (method == Write.Method.STORAGE_WRITE_API) + (method == Write.Method.STORAGE_WRITE_API + || method == Write.Method.STORAGE_API_AT_LEAST_ONCE) ? getStorageApiTriggeringFrequency(bqOptions) : getTriggeringFrequency(); checkArgument( triggeringFrequency != null, - "When writing an unbounded PCollection via FILE_LOADS or STORAGE_WRITE_API, " + "When writing an unbounded PCollection via FILE_LOADS, STORAGE_WRITE_API, or STORAGE_API_AT_LEAST_ONCE, " + "triggering frequency must be specified"); } else { checkArgument( getTriggeringFrequency() == null, - "Triggering frequency can be specified only when writing via FILE_LOADS or STORAGE_WRITE_API, but the method was %s.", + "Triggering frequency can be specified only when writing via FILE_LOADS, STORAGE_WRITE_API, or STORAGE_API_AT_LEAST_ONCE, but the method was %s.", method); } if (method != Method.FILE_LOADS) { @@ -3757,13 +3761,7 @@ public WriteResult expand(PCollection input) { "Number of file shards can be specified only when writing via FILE_LOADS, but the method was %s.", method); } - if (method == Method.STORAGE_API_AT_LEAST_ONCE - && getStorageApiTriggeringFrequency(bqOptions) != null) { - LOG.warn( - "Storage API triggering frequency option will be ignored is it can only be specified only " - + "when writing via STORAGE_WRITE_API, but the method was {}.", - method); - } + if (getAutoSharding()) { if (method == Method.STORAGE_WRITE_API && getStorageApiNumStreams(bqOptions) > 0) { LOG.warn( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index c23aff222d99..324f520ea4b2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -2632,7 +2632,8 @@ public void testStreamingWriteValidateFailsWithoutTriggeringFrequency() { Method method = useStorageApi ? 
Method.STORAGE_WRITE_API : Method.FILE_LOADS; thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("unbounded PCollection via FILE_LOADS or STORAGE_WRITE_API"); + thrown.expectMessage( + "unbounded PCollection via FILE_LOADS, STORAGE_WRITE_API, or STORAGE_API_AT_LEAST_ONCE"); thrown.expectMessage("triggering frequency must be specified"); p.getOptions().as(BigQueryOptions.class).setStorageWriteApiTriggeringFrequencySec(null); @@ -2646,6 +2647,72 @@ public void testStreamingWriteValidateFailsWithoutTriggeringFrequency() { .withCreateDisposition(CreateDisposition.CREATE_NEVER)); } + @Test + public void testStreamingWriteValidateFailsWithoutTriggeringFrequencyForStorageApiAtLeastOnce() { + assumeTrue(useStreaming); + assumeTrue(useStorageApiApproximate); // Test STORAGE_API_AT_LEAST_ONCE specifically + p.enableAbandonedNodeEnforcement(false); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "unbounded PCollection via FILE_LOADS, STORAGE_WRITE_API, or STORAGE_API_AT_LEAST_ONCE"); + thrown.expectMessage("triggering frequency must be specified"); + + p.getOptions().as(BigQueryOptions.class).setStorageWriteApiTriggeringFrequencySec(null); + p.apply(Create.empty(INPUT_RECORD_CODER)) + .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED) + .apply( + BigQueryIO.write() + .withAvroFormatFunction(r -> new GenericData.Record(r.getSchema())) + .to("dataset.table") + .withMethod(Method.STORAGE_API_AT_LEAST_ONCE) + .withCreateDisposition(CreateDisposition.CREATE_NEVER)); + } + + @Test + public void testStreamingWriteValidateSucceedsWithTriggeringFrequencyForStorageApiAtLeastOnce() { + assumeTrue(useStreaming); + assumeTrue(useStorageApiApproximate); // Test STORAGE_API_AT_LEAST_ONCE specifically + p.enableAbandonedNodeEnforcement(false); + + // This should not throw - STORAGE_API_AT_LEAST_ONCE with triggering frequency should be valid + p.getOptions().as(BigQueryOptions.class).setStorageWriteApiTriggeringFrequencySec(30); + p.apply(Create.empty(INPUT_RECORD_CODER)) + .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED) + .apply( + BigQueryIO.write() + .withAvroFormatFunction(r -> new GenericData.Record(r.getSchema())) + .to("dataset.table") + .withMethod(Method.STORAGE_API_AT_LEAST_ONCE) + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withTestServices(fakeBqServices) + .withoutValidation()); + // Should validate without throwing + p.run(); + } + + @Test + public void testBoundedWriteValidateSucceedsWithoutTriggeringFrequencyForStorageApiAtLeastOnce() { + assumeTrue(!useStreaming); // Test bounded PCollection + assumeTrue(useStorageApiApproximate); // Test STORAGE_API_AT_LEAST_ONCE specifically + + // Bounded collections should not require triggering frequency even for + // STORAGE_API_AT_LEAST_ONCE + p.getOptions().as(BigQueryOptions.class).setStorageWriteApiTriggeringFrequencySec(null); + p.apply(Create.empty(INPUT_RECORD_CODER)) + .setIsBoundedInternal(PCollection.IsBounded.BOUNDED) + .apply( + BigQueryIO.write() + .withAvroFormatFunction(r -> new GenericData.Record(r.getSchema())) + .to("dataset.table") + .withMethod(Method.STORAGE_API_AT_LEAST_ONCE) + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withTestServices(fakeBqServices) + .withoutValidation()); + // Should validate without throwing + p.run(); + } + @Test public void testBigQueryIOGetName() { assertEquals( From 16062d3a7819239b078982a4ada4af37877881e1 Mon Sep 17 00:00:00 2001 From: Arunodoy18 Date: Mon, 24 Nov 2025 13:00:47 +0530 Subject: [PATCH 4/6] Add Python side input 
pattern for slowly updating global window - Added Python implementation of slowly updating global window side inputs pattern - Uses PeriodicSequence instead of GenerateSequence for broader compatibility - Implements Latest.Globally().without_defaults() as mentioned in issue #35934 - Added test coverage for the new pattern - Updated documentation to include Python example alongside Java Fixes missing Python side input pattern in documentation. Addresses issue #35934 by using Latest.Globally().without_defaults() for non-global windowing scenarios and ensuring Dataflow compatibility. --- .../apache_beam/examples/snippets/snippets.py | 67 +++++++++++++++++++ .../examples/snippets/snippets_test.py | 14 ++++ .../en/documentation/patterns/side-inputs.md | 2 +- 3 files changed, 82 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index f6bf5e5d44ec..c3d8fc25bcac 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -1451,6 +1451,73 @@ def cross_join(left, rights): return pipeline, result +def side_input_slow_update_global_window(): + # [START SideInputPatternSlowUpdateGlobalWindowSnip1] + import apache_beam as beam + from apache_beam.transforms.periodicsequence import PeriodicSequence + from apache_beam import pvalue + from apache_beam.transforms import window + from apache_beam.transforms.trigger import Repeatedly, AfterProcessingTime + from apache_beam.utils.timestamp import MAX_TIMESTAMP + import time + import logging + + def placeholder_external_service_read_test_data(element): + """Placeholder function that represents an external service generating test data.""" + # Replace with actual external service call + return { + 'Key_A': time.strftime('%H:%M:%S', time.localtime(time.time())) + } + + # Create pipeline + pipeline = beam.Pipeline() + + # Create a side input that updates every 5 seconds. + # View as an iterable, not singleton, so that if we happen to trigger more + # than once before Latest.Globally is computed we can handle both elements. + side_input = ( + pipeline + | 'SideInputSequence' >> PeriodicSequence( + start=time.time(), + stop=MAX_TIMESTAMP, # Run indefinitely + interval=5) # Update every 5 seconds + | 'FetchExternalData' >> beam.Map(placeholder_external_service_read_test_data) + | 'GlobalWindow' >> beam.WindowInto( + window.GlobalWindows(), + trigger=Repeatedly(AfterProcessingTime(delay=0)), + accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING) + | 'LatestValue' >> beam.combiners.Latest.Globally().without_defaults() + | 'ViewAsIterable' >> beam.pvalue.AsIter()) + + # Consume side input. Use PeriodicSequence for test data. + # Use a real source (like PubSubIO or KafkaIO) in production. 
+ main_result = ( + pipeline + | 'MainSequence' >> PeriodicSequence( + start=time.time() - 1, + stop=MAX_TIMESTAMP, + interval=1) # Generate every 1 second + | 'FixedWindow' >> beam.WindowInto(window.FixedWindows(1)) + | 'SumGlobally' >> beam.CombineGlobally(sum).without_defaults() + | 'ProcessWithSideInput' >> beam.FlatMap( + lambda element, side_data: [ + { + 'main_value': element, + 'side_input_key_a': list(side_data)[0].get('Key_A', 'N/A') + if side_data else 'No side input', + 'timestamp': time.strftime('%H:%M:%S') + } + ], + side_data=side_input) + | 'LogResults' >> beam.Map( + lambda result: logging.info( + f"Value is {result['main_value']} with timestamp {result['timestamp']}, " + f"using key A from side input with time {result['side_input_key_a']}.") or result)) + # [END SideInputPatternSlowUpdateGlobalWindowSnip1] + + return pipeline, main_result + + def bigqueryio_deadletter(): # [START BigQueryIODeadLetter] diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index d7dd5e6af191..4f25bbd7450f 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -1469,6 +1469,20 @@ def test_side_input_slow_update(self): os.unlink(src_file_pattern + str(first_ts + interval * i)) +class SideInputGlobalWindowTest(unittest.TestCase): + """Tests for side input pattern with global windows.""" + def test_side_input_slow_update_global_window(self): + """Test the global window side input pattern with mock data.""" + # This test validates that the pattern can be constructed without external dependencies + try: + pipeline, result = snippets.side_input_slow_update_global_window() + self.assertIsNotNone(pipeline) + self.assertIsNotNone(result) + except ImportError as e: + # Skip test if PeriodicSequence is not available in this environment + self.skipTest(f"PeriodicSequence not available: {e}") + + class ValueProviderInfoTest(unittest.TestCase): """Tests for accessing value provider info after run.""" def test_accessing_valueprovider_info_after_run(self): diff --git a/website/www/site/content/en/documentation/patterns/side-inputs.md b/website/www/site/content/en/documentation/patterns/side-inputs.md index 136eeef29ada..675503a32baf 100644 --- a/website/www/site/content/en/documentation/patterns/side-inputs.md +++ b/website/www/site/content/en/documentation/patterns/side-inputs.md @@ -48,7 +48,7 @@ For instance, the following code sample uses a `Map` to create a `DoFn`. The `Ma {{< /highlight >}} {{< highlight py >}} -No sample present. 
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SideInputPatternSlowUpdateGlobalWindowSnip1 >}} {{< /highlight >}} From ac7dbbc21bce0e821cfa86388da38fef5e6e30b1 Mon Sep 17 00:00:00 2001 From: Arunodoy18 Date: Thu, 27 Nov 2025 11:37:35 +0530 Subject: [PATCH 5/6] Improve SQL transform documentation for calcite_connection_properties in YAML - Add comprehensive SQL transform examples showing calcite_connection_properties usage - Create dedicated sql/ directory under yaml/examples/transforms with 5 examples: - sql_basic_example.yaml: Basic SQL without special configuration - sql_postgresql_functions.yaml: PostgreSQL functions like SPLIT_PART - sql_bigquery_functions.yaml: BigQuery syntax and functions - sql_mysql_functions.yaml: MySQL date/string functions - sql_advanced_configuration.yaml: Multiple configuration options - Add detailed README.md explaining calcite_connection_properties options - Update yaml/tests/sql.yaml with calcite_connection_properties test cases - Update examples/README.md to reference new SQL documentation This addresses the issue where calcite_connection_properties configuration was 'tricky to get right' by providing clear examples and documentation for different SQL dialects and use cases. Fixes: SQL options in YAML/xlang pipelines need better documentation --- .../apache_beam/yaml/examples/README.md | 12 ++ .../yaml/examples/transforms/sql/README.md | 104 ++++++++++++++++++ .../sql/sql_advanced_configuration.yaml | 87 +++++++++++++++ .../transforms/sql/sql_basic_example.yaml | 50 +++++++++ .../sql/sql_bigquery_functions.yaml | 65 +++++++++++ .../transforms/sql/sql_mysql_functions.yaml | 66 +++++++++++ .../sql/sql_postgresql_functions.yaml | 60 ++++++++++ sdks/python/apache_beam/yaml/tests/sql.yaml | 96 ++++++++++++++++ 8 files changed, 540 insertions(+) create mode 100644 sdks/python/apache_beam/yaml/examples/transforms/sql/README.md create mode 100644 sdks/python/apache_beam/yaml/examples/transforms/sql/sql_advanced_configuration.yaml create mode 100644 sdks/python/apache_beam/yaml/examples/transforms/sql/sql_basic_example.yaml create mode 100644 sdks/python/apache_beam/yaml/examples/transforms/sql/sql_bigquery_functions.yaml create mode 100644 sdks/python/apache_beam/yaml/examples/transforms/sql/sql_mysql_functions.yaml create mode 100644 sdks/python/apache_beam/yaml/examples/transforms/sql/sql_postgresql_functions.yaml diff --git a/sdks/python/apache_beam/yaml/examples/README.md b/sdks/python/apache_beam/yaml/examples/README.md index 55fd19bd8c40..2218dd686159 100644 --- a/sdks/python/apache_beam/yaml/examples/README.md +++ b/sdks/python/apache_beam/yaml/examples/README.md @@ -100,6 +100,18 @@ These examples leverage the built-in mapping transforms including `MapToFields`, `Filter` and `Explode`. More information can be found about mapping transforms [here](https://beam.apache.org/documentation/sdks/yaml-udf/). 
+### SQL + +Examples that demonstrate SQL transforms with various database dialect configurations: + +- [Basic SQL Transform](transforms/sql/sql_basic_example.yaml) - Simple SQL queries without special configuration +- [PostgreSQL Functions](transforms/sql/sql_postgresql_functions.yaml) - Using PostgreSQL-specific functions like SPLIT_PART +- [BigQuery Functions](transforms/sql/sql_bigquery_functions.yaml) - BigQuery syntax and functions with proper calcite_connection_properties +- [MySQL Functions](transforms/sql/sql_mysql_functions.yaml) - MySQL-specific date and string functions +- [Advanced Configuration](transforms/sql/sql_advanced_configuration.yaml) - Multiple calcite_connection_properties options + +These examples show how to use the `calcite_connection_properties` pipeline option to configure SQL transforms for different database dialects and enable dialect-specific functions and syntax. + ### IO #### Spanner diff --git a/sdks/python/apache_beam/yaml/examples/transforms/sql/README.md b/sdks/python/apache_beam/yaml/examples/transforms/sql/README.md new file mode 100644 index 000000000000..265f3e46ab62 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/sql/README.md @@ -0,0 +1,104 @@ +# SQL Transform calcite_connection_properties Configuration Guide + +This directory contains examples demonstrating how to use `calcite_connection_properties` in Beam YAML pipelines to configure SQL transforms for different database dialects and use cases. + +## Overview + +The `calcite_connection_properties` option in pipeline options allows you to configure Apache Calcite's SQL parser and function library to support database-specific SQL syntax and functions. This is particularly useful when you need to use SQL functions or syntax that are specific to certain databases like PostgreSQL, BigQuery, MySQL, or Oracle. + +## Configuration Options + +The most commonly used `calcite_connection_properties` include: + +### Function Libraries (`fun`) +- `"standard"` - Standard SQL functions (default) +- `"postgresql"` - PostgreSQL-specific functions (e.g., SPLIT_PART, STRING_AGG) +- `"bigquery"` - BigQuery-specific functions (e.g., FORMAT_TIMESTAMP, ARRAY_TO_STRING) +- `"mysql"` - MySQL-specific functions (e.g., DATEDIFF, SUBSTRING_INDEX) +- `"oracle"` - Oracle-specific functions (e.g., NVL, SUBSTR) + +### Lexical Rules (`lex`) +- `"standard"` - Standard SQL lexical rules (default) +- `"big_query"` - BigQuery lexical rules and syntax +- `"mysql"` - MySQL lexical rules +- `"oracle"` - Oracle lexical rules + +### Other Properties +- `conformance` - SQL conformance level ("LENIENT", "STRICT", etc.) +- `caseSensitive` - Whether identifiers are case sensitive ("true"/"false") +- `quotedCasing` - How to handle quoted identifiers ("UNCHANGED", "TO_UPPER", "TO_LOWER") +- `unquotedCasing` - How to handle unquoted identifiers + +## Usage Patterns + +### Basic Configuration +```yaml +options: + calcite_connection_properties: + fun: "postgresql" +``` + +### Advanced Configuration +```yaml +options: + calcite_connection_properties: + fun: "bigquery" + lex: "big_query" + conformance: "LENIENT" + caseSensitive: "false" +``` + +## Examples in this Directory + +1. **sql_basic_example.yaml** - Basic SQL transform without special configuration +2. **sql_postgresql_functions.yaml** - Using PostgreSQL functions like SPLIT_PART +3. **sql_bigquery_functions.yaml** - BigQuery syntax and functions +4. **sql_mysql_functions.yaml** - MySQL-specific date and string functions +5. 
**sql_advanced_configuration.yaml** - Multiple configuration options + +## Common Use Cases + +### PostgreSQL Functions +Useful for string manipulation and array operations: +```yaml +options: + calcite_connection_properties: + fun: "postgresql" +``` + +### BigQuery Compatibility +For BigQuery-style syntax and functions: +```yaml +options: + calcite_connection_properties: + fun: "bigquery" + lex: "big_query" +``` + +### Lenient SQL Parsing +For more flexible SQL parsing: +```yaml +options: + calcite_connection_properties: + conformance: "LENIENT" +``` + +## Important Notes + +- These properties affect only the SQL parsing and function availability, not the actual data processing semantics +- Some database-specific functions may not be available depending on the Calcite version used +- Always test your SQL queries with the intended configuration before deploying to production +- The `calcite_connection_properties` must be specified in the pipeline `options` section, not in individual transform configurations + +## Troubleshooting + +If you encounter SQL parsing errors: + +1. Check that the function you're using is supported by the specified function library +2. Verify that the lexical rules (`lex`) match your SQL syntax style +3. Try using `conformance: "LENIENT"` for more flexible parsing +4. Refer to the Apache Calcite documentation for supported functions in each dialect + +For more information about Beam SQL and supported functions, see: +- [Beam SQL Documentation](https://beam.apache.org/documentation/dsls/sql/overview/) +- [Apache Calcite SQL Reference](https://calcite.apache.org/docs/reference.html) \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_advanced_configuration.yaml b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_advanced_configuration.yaml new file mode 100644 index 000000000000..85e6348696a0 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_advanced_configuration.yaml @@ -0,0 +1,87 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Advanced SQL Transform Configuration Examples +# This example demonstrates multiple calcite_connection_properties and their effects. 
+ +pipeline: + transforms: + - type: Create + name: CreateComplexData + config: + elements: + - {id: 1, name: "Product A", price: 29.99, tags: ["electronics", "gadget"], metadata: '{"brand": "TechCorp", "warranty": 12}'} + - {id: 2, name: "Product B", price: 15.50, tags: ["books", "fiction"], metadata: '{"author": "John Doe", "pages": 320}'} + - {id: 3, name: "Product C", price: 199.99, tags: ["electronics", "computer"], metadata: '{"brand": "CompuTech", "warranty": 24}'} + + # Example 1: Standard SQL with strict conformance + - type: Sql + name: StandardSQL + input: CreateComplexData + config: + query: | + SELECT + id, + name, + price, + CASE + WHEN price < 20 THEN 'Budget' + WHEN price < 100 THEN 'Mid-range' + ELSE 'Premium' + END as price_category + FROM PCOLLECTION + WHERE price > 10 + ORDER BY price + + # Example 2: Using Oracle-style functions + - type: Sql + name: OracleStyleSQL + input: CreateComplexData + config: + query: | + SELECT + id, + name, + price, + -- Oracle-style string functions + SUBSTR(name, 1, 10) as short_name, + LENGTH(name) as name_length, + NVL(name, 'Unknown') as safe_name + FROM PCOLLECTION + + - type: LogForTesting + input: StandardSQL + + - type: LogForTesting + input: OracleStyleSQL + +# Multiple calcite_connection_properties can be configured: +# - conformance: Controls SQL conformance level (LENIENT, STRICT, etc.) +# - caseSensitive: Whether identifiers are case sensitive +# - quotedCasing: How to handle quoted identifiers (UNCHANGED, TO_UPPER, TO_LOWER) +# - unquotedCasing: How to handle unquoted identifiers +# - fun: SQL function library (standard, oracle, mysql, postgresql, bigquery, etc.) +# - lex: Lexical analysis rules (standard, oracle, mysql, big_query, etc.) +options: + calcite_connection_properties: + conformance: "LENIENT" + fun: "oracle" + lex: "oracle" + caseSensitive: "false" + quotedCasing: "UNCHANGED" + unquotedCasing: "TO_UPPER" + streaming: false \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_basic_example.yaml b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_basic_example.yaml new file mode 100644 index 000000000000..3fb5f5acafa3 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_basic_example.yaml @@ -0,0 +1,50 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Basic SQL Transform Example +# This example demonstrates basic SQL transform usage with default Calcite configuration. 
+ +pipeline: + transforms: + - type: Create + name: CreateData + config: + elements: + - {id: 1, name: "Alice", age: 30, city: "Seattle"} + - {id: 2, name: "Bob", age: 25, city: "Portland"} + - {id: 3, name: "Charlie", age: 35, city: "San Francisco"} + - {id: 4, name: "Diana", age: 28, city: "Seattle"} + + - type: Sql + name: FilterAndGroup + input: CreateData + config: + query: | + SELECT + city, + COUNT(*) as person_count, + AVG(age) as avg_age + FROM PCOLLECTION + WHERE age >= 25 + GROUP BY city + ORDER BY city + + - type: LogForTesting + input: FilterAndGroup + +options: + streaming: false \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_bigquery_functions.yaml b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_bigquery_functions.yaml new file mode 100644 index 000000000000..bab20413af77 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_bigquery_functions.yaml @@ -0,0 +1,65 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# SQL Transform with BigQuery Functions and Syntax +# This example demonstrates using BigQuery-specific SQL syntax and functions. +# The calcite_connection_properties enable BigQuery function library and lexical rules. 
+ +pipeline: + transforms: + - type: Create + name: CreateSalesData + config: + elements: + - {transaction_id: "txn_001", customer_id: 101, amount: 250.75, timestamp: "2024-01-15T10:30:00Z", product_categories: ["electronics", "accessories"]} + - {transaction_id: "txn_002", customer_id: 102, amount: 89.99, timestamp: "2024-01-15T11:45:00Z", product_categories: ["books", "education"]} + - {transaction_id: "txn_003", customer_id: 103, amount: 1250.00, timestamp: "2024-01-15T14:20:00Z", product_categories: ["electronics", "computers"]} + - {transaction_id: "txn_004", customer_id: 101, amount: 45.50, timestamp: "2024-01-16T09:15:00Z", product_categories: ["food", "groceries"]} + + - type: Sql + name: AnalyzeSalesData + input: CreateSalesData + config: + query: | + SELECT + customer_id, + COUNT(*) as transaction_count, + SUM(amount) as total_spent, + AVG(amount) as avg_transaction_amount, + -- BigQuery-style date/time functions + FORMAT_TIMESTAMP('%Y-%m-%d', PARSE_TIMESTAMP('%Y-%m-%dT%H:%M:%SZ', timestamp)) as transaction_date, + -- BigQuery array functions (when available) + ARRAY_TO_STRING(product_categories, ', ') as categories_str, + -- Conditional aggregation using BigQuery syntax + COUNTIF(amount > 100) as high_value_transactions, + -- BigQuery mathematical functions + ROUND(amount, 2) as rounded_amount + FROM PCOLLECTION + GROUP BY customer_id, transaction_date, categories_str, rounded_amount + ORDER BY customer_id, total_spent DESC + + - type: LogForTesting + input: AnalyzeSalesData + +# Configure Calcite to use BigQuery function library and syntax +# 'fun': 'bigquery' enables BigQuery-specific functions +# 'lex': 'big_query' enables BigQuery lexical rules and syntax +options: + calcite_connection_properties: + fun: "bigquery" + lex: "big_query" + streaming: false \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_mysql_functions.yaml b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_mysql_functions.yaml new file mode 100644 index 000000000000..523ea41b743c --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_mysql_functions.yaml @@ -0,0 +1,66 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# SQL Transform with MySQL Functions and Syntax +# This example shows how to configure calcite_connection_properties for MySQL-specific SQL features. 
+ +pipeline: + transforms: + - type: Create + name: CreateUserData + config: + elements: + - {user_id: 1, username: "alice_123", registration_date: "2024-01-15", last_login: "2024-01-20 14:30:15", profile_data: '{"age": 30, "city": "Seattle"}'} + - {user_id: 2, username: "bob.jones", registration_date: "2024-01-10", last_login: "2024-01-19 09:45:22", profile_data: '{"age": 25, "city": "Portland"}'} + - {user_id: 3, username: "charlie_brown", registration_date: "2024-01-05", last_login: "2024-01-21 16:20:33", profile_data: '{"age": 35, "city": "San Francisco"}'} + + - type: Sql + name: ProcessUserData + input: CreateUserData + config: + query: | + SELECT + user_id, + username, + registration_date, + last_login, + -- MySQL date/time functions + DATEDIFF(last_login, registration_date) as days_since_registration, + DATE_FORMAT(last_login, '%Y-%m-%d') as login_date, + DATE_FORMAT(last_login, '%H:%i:%s') as login_time, + -- MySQL string functions + SUBSTRING_INDEX(username, '_', 1) as username_prefix, + CASE + WHEN LOCATE('_', username) > 0 THEN 'has_underscore' + WHEN LOCATE('.', username) > 0 THEN 'has_dot' + ELSE 'alphanumeric_only' + END as username_format, + -- MySQL conditional functions + IF(DATEDIFF(CURRENT_DATE, registration_date) < 7, 'new_user', 'existing_user') as user_type + FROM PCOLLECTION + ORDER BY days_since_registration DESC + + - type: LogForTesting + input: ProcessUserData + +# Configure Calcite for MySQL SQL dialect +# This enables MySQL-specific functions like DATEDIFF, SUBSTRING_INDEX, LOCATE, etc. +options: + calcite_connection_properties: + fun: "mysql" + lex: "mysql" + streaming: false \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_postgresql_functions.yaml b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_postgresql_functions.yaml new file mode 100644 index 000000000000..7c9dadbeb91a --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/sql/sql_postgresql_functions.yaml @@ -0,0 +1,60 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# SQL Transform with PostgreSQL Functions +# This example shows how to use calcite_connection_properties to enable PostgreSQL-specific SQL functions. +# The 'fun' property tells Calcite to use PostgreSQL function library, enabling functions like SPLIT_PART. 
+ +pipeline: + transforms: + - type: Create + name: CreateEmailData + config: + elements: + - {id: 1, email: "alice@example.com", full_name: "Alice Smith"} + - {id: 2, email: "bob.jones@company.org", full_name: "Bob Jones"} + - {id: 3, email: "charlie_brown@test.net", full_name: "Charlie Brown"} + - {id: 4, email: "diana.wilson@work.co", full_name: "Diana Wilson"} + + - type: Sql + name: ExtractEmailParts + input: CreateEmailData + config: + query: | + SELECT + id, + full_name, + email, + SPLIT_PART(email, '@', 1) as username, + SPLIT_PART(email, '@', 2) as domain, + CASE + WHEN SPLIT_PART(email, '@', 2) LIKE '%.com' THEN 'Commercial' + WHEN SPLIT_PART(email, '@', 2) LIKE '%.org' THEN 'Organization' + ELSE 'Other' + END as domain_type + FROM PCOLLECTION + ORDER BY domain + + - type: LogForTesting + input: ExtractEmailParts + +# Pipeline options demonstrate how to configure calcite_connection_properties +# The 'fun' property enables PostgreSQL function library in Calcite SQL parser +options: + calcite_connection_properties: + fun: "postgresql" + streaming: false \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/tests/sql.yaml b/sdks/python/apache_beam/yaml/tests/sql.yaml index 0040a2790c54..a598fffc4181 100644 --- a/sdks/python/apache_beam/yaml/tests/sql.yaml +++ b/sdks/python/apache_beam/yaml/tests/sql.yaml @@ -93,3 +93,99 @@ pipelines: - type: PyTransform config: constructor: apache_beam.transforms.util.LogElements + + # Test calcite_connection_properties with PostgreSQL functions + - pipeline: + type: chain + transforms: + - type: Create + name: CreateEmailData + config: + elements: + - {email: "alice@example.com", id: 1} + - {email: "bob@test.org", id: 2} + + - type: Sql + name: PostgreSQLFunctions + config: + query: | + SELECT + id, + email, + SPLIT_PART(email, '@', 1) as username, + SPLIT_PART(email, '@', 2) as domain + FROM PCOLLECTION + + - type: AssertEqual + config: + elements: + - {id: 1, email: "alice@example.com", username: "alice", domain: "example.com"} + - {id: 2, email: "bob@test.org", username: "bob", domain: "test.org"} + + options: + calcite_connection_properties: + fun: "postgresql" + + # Test calcite_connection_properties with BigQuery syntax + - pipeline: + type: chain + transforms: + - type: Create + name: CreateArrayData + config: + elements: + - {id: 1, tags: ["tag1", "tag2", "tag3"]} + - {id: 2, tags: ["tagA", "tagB"]} + + - type: Sql + name: BigQueryArrayFunctions + config: + query: | + SELECT + id, + ARRAY_TO_STRING(tags, '|') as tags_string, + ARRAY_LENGTH(tags) as tag_count + FROM PCOLLECTION + + - type: AssertEqual + config: + elements: + - {id: 1, tags_string: "tag1|tag2|tag3", tag_count: 3} + - {id: 2, tags_string: "tagA|tagB", tag_count: 2} + + options: + calcite_connection_properties: + fun: "bigquery" + lex: "big_query" + + # Test calcite_connection_properties with case sensitivity settings + - pipeline: + type: chain + transforms: + - type: Create + name: CreateCaseTestData + config: + elements: + - {ID: 1, Name: "Alice", Email: "alice@test.com"} + - {ID: 2, Name: "Bob", Email: "bob@test.com"} + + - type: Sql + name: CaseSensitiveSQL + config: + query: | + SELECT + id as lower_id, + name as lower_name, + email as lower_email + FROM PCOLLECTION + + - type: AssertEqual + config: + elements: + - {lower_id: 1, lower_name: "Alice", lower_email: "alice@test.com"} + - {lower_id: 2, lower_name: "Bob", lower_email: "bob@test.com"} + + options: + calcite_connection_properties: + caseSensitive: "false" + unquotedCasing: "TO_LOWER" From 
a7ec0c41b142561dbe8f5c4bfdde6594be3c180c Mon Sep 17 00:00:00 2001 From: Arunodoy18 Date: Sat, 29 Nov 2025 13:10:35 +0530 Subject: [PATCH 6/6] Fix YAML structure for calcite_connection_properties test cases - Move options block to correct transform-level location - Fix indentation issues that caused precommit validation failures --- sdks/python/apache_beam/yaml/tests/sql.yaml | 31 +++++++++------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/sdks/python/apache_beam/yaml/tests/sql.yaml b/sdks/python/apache_beam/yaml/tests/sql.yaml index a598fffc4181..f1dae0fa4db5 100644 --- a/sdks/python/apache_beam/yaml/tests/sql.yaml +++ b/sdks/python/apache_beam/yaml/tests/sql.yaml @@ -107,6 +107,9 @@ pipelines: - type: Sql name: PostgreSQLFunctions + options: + calcite_connection_properties: + fun: "postgresql" config: query: | SELECT @@ -120,13 +123,7 @@ pipelines: config: elements: - {id: 1, email: "alice@example.com", username: "alice", domain: "example.com"} - - {id: 2, email: "bob@test.org", username: "bob", domain: "test.org"} - - options: - calcite_connection_properties: - fun: "postgresql" - - # Test calcite_connection_properties with BigQuery syntax + - {id: 2, email: "bob@test.org", username: "bob", domain: "test.org"} # Test calcite_connection_properties with BigQuery syntax - pipeline: type: chain transforms: @@ -139,6 +136,10 @@ pipelines: - type: Sql name: BigQueryArrayFunctions + options: + calcite_connection_properties: + fun: "bigquery" + lex: "big_query" config: query: | SELECT @@ -152,11 +153,6 @@ pipelines: elements: - {id: 1, tags_string: "tag1|tag2|tag3", tag_count: 3} - {id: 2, tags_string: "tagA|tagB", tag_count: 2} - - options: - calcite_connection_properties: - fun: "bigquery" - lex: "big_query" # Test calcite_connection_properties with case sensitivity settings - pipeline: @@ -171,6 +167,10 @@ pipelines: - type: Sql name: CaseSensitiveSQL + options: + calcite_connection_properties: + caseSensitive: "false" + unquotedCasing: "TO_LOWER" config: query: | SELECT @@ -183,9 +183,4 @@ pipelines: config: elements: - {lower_id: 1, lower_name: "Alice", lower_email: "alice@test.com"} - - {lower_id: 2, lower_name: "Bob", lower_email: "bob@test.com"} - - options: - calcite_connection_properties: - caseSensitive: "false" - unquotedCasing: "TO_LOWER" + - {lower_id: 2, lower_name: "Bob", lower_email: "bob@test.com"} \ No newline at end of file