From 385217f145d79e8bfc63fe04bb4c48782577ea89 Mon Sep 17 00:00:00 2001 From: Arunodoy18 Date: Thu, 20 Nov 2025 15:20:14 +0530 Subject: [PATCH 1/3] BEAM-13231: Add glob wildcard escaping utilities to FileSystems This commit adds support for escaping and unescaping glob wildcard characters in file path specifications, addressing the issue where files with literal glob metacharacters (*, ?, {, }) in their names cannot be matched. Changes: - Added escapeGlobWildcards(String spec) method to escape glob metacharacters by prefixing them with backslash - Added unescapeGlobWildcards(String spec) method to remove backslash prefixes from escaped glob characters - Added comprehensive test cases for both methods including round-trip testing These utilities provide the foundation for allowing users to treat glob metacharacters as literals when they appear in actual filenames. Fixes BEAM-13231 --- .../org/apache/beam/sdk/io/FileSystems.java | 36 +++++++++++++ .../apache/beam/sdk/io/FileSystemsTest.java | 50 +++++++++++++++++++ 2 files changed, 86 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index 7e2940a2c35b..2edc5c7ba45b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -78,6 +78,8 @@ public class FileSystems { private static final Pattern FILE_SCHEME_PATTERN = Pattern.compile("(?[a-zA-Z][-a-zA-Z0-9+.]*):/.*"); private static final Pattern GLOB_PATTERN = Pattern.compile("[*?{}]"); + private static final Pattern ESCAPED_GLOB_PATTERN = Pattern.compile("\\\\[*?{}]"); + private static final String GLOB_ESCAPE_PREFIX = "\\"; private static final AtomicReference> FILESYSTEM_REVISION = new AtomicReference<>(); @@ -92,6 +94,40 @@ public static boolean hasGlobWildcard(String spec) { return GLOB_PATTERN.matcher(spec).find(); } + /** + * Escapes glob wildcard 
characters in the given spec so they are treated as literals. + * + * <p>This method escapes the characters '*', '?', '{', and '}' by prefixing them with a + * backslash, allowing them to be treated as literal characters in a file path rather than as + * glob wildcards. + * + * <p>Example: {@code escapeGlobWildcards("file*.txt")} returns {@code "file\\*.txt"} + * + * @param spec the file path specification to escape + * @return the escaped specification + */ + public static String escapeGlobWildcards(String spec) { + checkNotNull(spec, "spec cannot be null"); + return spec.replaceAll("([*?{}])", "\\\\$1"); + } + + /** + * Unescapes glob wildcard characters in the given spec that were previously escaped with {@link + * #escapeGlobWildcards(String)}. + * + *

<p>This method removes the backslash prefix from escaped glob characters ('*', '?', '{', '}'), + * restoring them to their unescaped form. + * + *

Example: {@code unescapeGlobWildcards("file\\*.txt")} returns {@code "file*.txt"} + * + * @param spec the file path specification to unescape + * @return the unescaped specification + */ + public static String unescapeGlobWildcards(String spec) { + checkNotNull(spec, "spec cannot be null"); + return spec.replaceAll("\\\\([*?{}])", "$1"); + } + /** * This is the entry point to convert user-provided specs to {@link ResourceId ResourceIds}. * Callers should use {@link #match} to resolve users specs ambiguities before calling other diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java index 34567309c7d0..a513c1b1f462 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java @@ -354,6 +354,56 @@ private void createFileWithContent(Path path, String content) throws Exception { } } + @Test + public void testEscapeGlobWildcards() { + // Test escaping asterisk + assertEquals("file\\*.txt", FileSystems.escapeGlobWildcards("file*.txt")); + + // Test escaping question mark + assertEquals("file\\?.txt", FileSystems.escapeGlobWildcards("file?.txt")); + + // Test escaping braces + assertEquals("file\\{1,2\\}.txt", FileSystems.escapeGlobWildcards("file{1,2}.txt")); + + // Test escaping multiple characters + assertEquals("\\*\\?\\{\\}.txt", FileSystems.escapeGlobWildcards("*?{}.txt")); + + // Test string with no glob characters + assertEquals("file.txt", FileSystems.escapeGlobWildcards("file.txt")); + + // Test empty string + assertEquals("", FileSystems.escapeGlobWildcards("")); + } + + @Test + public void testUnescapeGlobWildcards() { + // Test unescaping asterisk + assertEquals("file*.txt", FileSystems.unescapeGlobWildcards("file\\*.txt")); + + // Test unescaping question mark + assertEquals("file?.txt", FileSystems.unescapeGlobWildcards("file\\?.txt")); + + // Test 
unescaping braces + assertEquals("file{1,2}.txt", FileSystems.unescapeGlobWildcards("file\\{1,2\\}.txt")); + + // Test unescaping multiple characters + assertEquals("*?{}.txt", FileSystems.unescapeGlobWildcards("\\*\\?\\{\\}.txt")); + + // Test string with no escaped characters + assertEquals("file.txt", FileSystems.unescapeGlobWildcards("file.txt")); + + // Test empty string + assertEquals("", FileSystems.unescapeGlobWildcards("")); + } + + @Test + public void testEscapeUnescapeRoundTrip() { + String original = "file*test?.txt"; + String escaped = FileSystems.escapeGlobWildcards(original); + String unescaped = FileSystems.unescapeGlobWildcards(escaped); + assertEquals(original, unescaped); + } + private LocalResourceId toLocalResourceId(String str) throws Exception { boolean isDirectory; if (SystemUtils.IS_OS_WINDOWS) { From bf37c0f44a4b710577e16653b33ed3d488ff1948 Mon Sep 17 00:00:00 2001 From: Arunodoy18 Date: Sat, 22 Nov 2025 21:04:00 +0530 Subject: [PATCH 2/3] [BEAM-31422] Fix IllegalStateException in STORAGE_WRITE_API with CDC When CDC (Change Data Capture) is enabled with STORAGE_WRITE_API method, the system was incorrectly using PENDING streams instead of the default stream, causing an IllegalStateException due to checkState validation. Changes: - Fixed StorageApiWriteUnshardedRecords to use default stream when CDC is enabled - Added comprehensive test case to prevent regression CDC requires default streams because PENDING streams don't support the RowMutationInformation functionality needed for upserts and deletes. 
Fixes #31422 --- .../StorageApiWriteUnshardedRecords.java | 2 +- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 45 +++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index f6d10b47ccf2..14f93a95fa6d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -219,7 +219,7 @@ public PCollectionTuple expand(PCollection write = + BigQueryIO.write() + .to("project-id:dataset-id.table-id") + .withSchema(schema) + .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) + .withRowMutationInformationFn( + (Row row) -> { + return RowMutationInformation.of( + RowMutationInformation.MutationType.UPSERT, row.getValue("id").toString()); + }) + .withTestServices(fakeBqServices) + .withoutValidation(); + + // Create test data with CDC-style updates + Schema beamSchema = Schema.builder().addInt32Field("id").addStringField("name").build(); + + List testData = + ImmutableList.of( + Row.withSchema(beamSchema).addValues(1, "Alice").build(), + Row.withSchema(beamSchema).addValues(2, "Bob").build(), + Row.withSchema(beamSchema).addValues(1, "Alice Updated").build() // Update row with id=1 + ); + + // This should not throw an IllegalStateException + PCollection input = p.apply(Create.of(testData).withRowSchema(beamSchema)); + + WriteResult result = input.apply("WriteCdcToBQ", write); + + p.run(); // Should complete successfully without IllegalStateException + } } From 095049274f122c7561bcb0b0f9a7fc2a12d09abf Mon Sep 17 00:00:00 2001 From: Arunodoy18 Date: Sun, 23 Nov 2025 02:40:15 +0530 Subject: [PATCH 3/3] Fix 
triggeringFrequency documentation and validation for STORAGE_API_AT_LEAST_ONCE - Updated JavaDoc to reflect that triggeringFrequency applies to FILE_LOADS, STORAGE_WRITE_API, and STORAGE_API_AT_LEAST_ONCE methods - Fixed validation logic in expand() to require triggeringFrequency for STORAGE_API_AT_LEAST_ONCE when writing unbounded PCollections - Removed conflicting warning that incorrectly stated STORAGE_API_AT_LEAST_ONCE ignores triggeringFrequency - Added comprehensive test cases to verify proper validation behavior: * Test failure when triggeringFrequency is missing for unbounded collections * Test success when triggeringFrequency is provided for unbounded collections * Test success for bounded collections without triggeringFrequency requirement - Updated error messages to include all three supported methods This ensures consistent behavior across all BigQuery write methods that support triggered writes for unbounded collections. --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 24 +++---- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 69 ++++++++++++++++++- 2 files changed, 79 insertions(+), 14 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 7aef1bd1ce02..7589270c3646 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -3330,8 +3330,9 @@ public Write withLoadJobProjectId(ValueProvider loadJobProjectId) { /** * Choose the frequency at which file writes are triggered. * - *

<p>This is only applicable when the write method is set to {@link Method#FILE_LOADS} or - * {@link Method#STORAGE_WRITE_API}, and only when writing an unbounded {@link PCollection}. + *

<p>This is only applicable when the write method is set to {@link Method#FILE_LOADS}, {@link + * Method#STORAGE_WRITE_API}, or {@link Method#STORAGE_API_AT_LEAST_ONCE}, and only when writing + * an unbounded {@link PCollection}. + * *

Every triggeringFrequency duration, a BigQuery load job will be generated for all the data * written since the last load job. BigQuery has limits on how many load jobs can be triggered @@ -3736,19 +3737,22 @@ public WriteResult expand(PCollection input) { BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); Write.Method method = resolveMethod(input); if (input.isBounded() == IsBounded.UNBOUNDED) { - if (method == Write.Method.FILE_LOADS || method == Write.Method.STORAGE_WRITE_API) { + if (method == Write.Method.FILE_LOADS + || method == Write.Method.STORAGE_WRITE_API + || method == Write.Method.STORAGE_API_AT_LEAST_ONCE) { Duration triggeringFrequency = - (method == Write.Method.STORAGE_WRITE_API) + (method == Write.Method.STORAGE_WRITE_API + || method == Write.Method.STORAGE_API_AT_LEAST_ONCE) ? getStorageApiTriggeringFrequency(bqOptions) : getTriggeringFrequency(); checkArgument( triggeringFrequency != null, - "When writing an unbounded PCollection via FILE_LOADS or STORAGE_WRITE_API, " + "When writing an unbounded PCollection via FILE_LOADS, STORAGE_WRITE_API, or STORAGE_API_AT_LEAST_ONCE, " + "triggering frequency must be specified"); } else { checkArgument( getTriggeringFrequency() == null, - "Triggering frequency can be specified only when writing via FILE_LOADS or STORAGE_WRITE_API, but the method was %s.", + "Triggering frequency can be specified only when writing via FILE_LOADS, STORAGE_WRITE_API, or STORAGE_API_AT_LEAST_ONCE, but the method was %s.", method); } if (method != Method.FILE_LOADS) { @@ -3757,13 +3761,7 @@ public WriteResult expand(PCollection input) { "Number of file shards can be specified only when writing via FILE_LOADS, but the method was %s.", method); } - if (method == Method.STORAGE_API_AT_LEAST_ONCE - && getStorageApiTriggeringFrequency(bqOptions) != null) { - LOG.warn( - "Storage API triggering frequency option will be ignored is it can only be specified only " - + "when writing via 
STORAGE_WRITE_API, but the method was {}.", - method); - } + if (getAutoSharding()) { if (method == Method.STORAGE_WRITE_API && getStorageApiNumStreams(bqOptions) > 0) { LOG.warn( diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index c23aff222d99..324f520ea4b2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -2632,7 +2632,8 @@ public void testStreamingWriteValidateFailsWithoutTriggeringFrequency() { Method method = useStorageApi ? Method.STORAGE_WRITE_API : Method.FILE_LOADS; thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("unbounded PCollection via FILE_LOADS or STORAGE_WRITE_API"); + thrown.expectMessage( + "unbounded PCollection via FILE_LOADS, STORAGE_WRITE_API, or STORAGE_API_AT_LEAST_ONCE"); thrown.expectMessage("triggering frequency must be specified"); p.getOptions().as(BigQueryOptions.class).setStorageWriteApiTriggeringFrequencySec(null); @@ -2646,6 +2647,72 @@ public void testStreamingWriteValidateFailsWithoutTriggeringFrequency() { .withCreateDisposition(CreateDisposition.CREATE_NEVER)); } + @Test + public void testStreamingWriteValidateFailsWithoutTriggeringFrequencyForStorageApiAtLeastOnce() { + assumeTrue(useStreaming); + assumeTrue(useStorageApiApproximate); // Test STORAGE_API_AT_LEAST_ONCE specifically + p.enableAbandonedNodeEnforcement(false); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "unbounded PCollection via FILE_LOADS, STORAGE_WRITE_API, or STORAGE_API_AT_LEAST_ONCE"); + thrown.expectMessage("triggering frequency must be specified"); + + 
p.getOptions().as(BigQueryOptions.class).setStorageWriteApiTriggeringFrequencySec(null); + p.apply(Create.empty(INPUT_RECORD_CODER)) + .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED) + .apply( + BigQueryIO.write() + .withAvroFormatFunction(r -> new GenericData.Record(r.getSchema())) + .to("dataset.table") + .withMethod(Method.STORAGE_API_AT_LEAST_ONCE) + .withCreateDisposition(CreateDisposition.CREATE_NEVER)); + } + + @Test + public void testStreamingWriteValidateSucceedsWithTriggeringFrequencyForStorageApiAtLeastOnce() { + assumeTrue(useStreaming); + assumeTrue(useStorageApiApproximate); // Test STORAGE_API_AT_LEAST_ONCE specifically + p.enableAbandonedNodeEnforcement(false); + + // This should not throw - STORAGE_API_AT_LEAST_ONCE with triggering frequency should be valid + p.getOptions().as(BigQueryOptions.class).setStorageWriteApiTriggeringFrequencySec(30); + p.apply(Create.empty(INPUT_RECORD_CODER)) + .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED) + .apply( + BigQueryIO.write() + .withAvroFormatFunction(r -> new GenericData.Record(r.getSchema())) + .to("dataset.table") + .withMethod(Method.STORAGE_API_AT_LEAST_ONCE) + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withTestServices(fakeBqServices) + .withoutValidation()); + // Should validate without throwing + p.run(); + } + + @Test + public void testBoundedWriteValidateSucceedsWithoutTriggeringFrequencyForStorageApiAtLeastOnce() { + assumeTrue(!useStreaming); // Test bounded PCollection + assumeTrue(useStorageApiApproximate); // Test STORAGE_API_AT_LEAST_ONCE specifically + + // Bounded collections should not require triggering frequency even for + // STORAGE_API_AT_LEAST_ONCE + p.getOptions().as(BigQueryOptions.class).setStorageWriteApiTriggeringFrequencySec(null); + p.apply(Create.empty(INPUT_RECORD_CODER)) + .setIsBoundedInternal(PCollection.IsBounded.BOUNDED) + .apply( + BigQueryIO.write() + .withAvroFormatFunction(r -> new GenericData.Record(r.getSchema())) + 
.to("dataset.table") + .withMethod(Method.STORAGE_API_AT_LEAST_ONCE) + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withTestServices(fakeBqServices) + .withoutValidation()); + // Should validate without throwing + p.run(); + } + @Test public void testBigQueryIOGetName() { assertEquals(