diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index 7e2940a2c35b..2edc5c7ba45b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -78,6 +78,8 @@ public class FileSystems {
   private static final Pattern FILE_SCHEME_PATTERN =
       Pattern.compile("(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*):/.*");
   private static final Pattern GLOB_PATTERN = Pattern.compile("[*?{}]");
+  private static final Pattern ESCAPED_GLOB_PATTERN = Pattern.compile("\\\\[*?{}]");
+  private static final String GLOB_ESCAPE_PREFIX = "\\";
 
   private static final AtomicReference> FILESYSTEM_REVISION =
       new AtomicReference<>();
@@ -92,6 +94,40 @@ public static boolean hasGlobWildcard(String spec) {
     return GLOB_PATTERN.matcher(spec).find();
   }
 
+  /**
+   * Escapes glob wildcard characters in the given spec so they are treated as literals.
+   *
+   * <p>This method escapes the characters '*', '?', '{', and '}' by prefixing them with a
+   * backslash, allowing them to be treated as literal characters in a file path rather than as
+   * glob wildcards.
+   *
+   * <p>Example: {@code escapeGlobWildcards("file*.txt")} returns {@code "file\\*.txt"}
+   *
+   * @param spec the file path specification to escape
+   * @return the escaped specification
+   */
+  public static String escapeGlobWildcards(String spec) {
+    checkNotNull(spec, "spec cannot be null");
+    return spec.replaceAll("([*?{}])", "\\\\$1");
+  }
+
+  /**
+   * Unescapes glob wildcard characters in the given spec that were previously escaped with {@link
+   * #escapeGlobWildcards(String)}.
+   *
+   * <p>This method removes the backslash prefix from escaped glob characters ('*', '?', '{', '}'),
+   * restoring them to their unescaped form.
+   *
+   * <p>Example: {@code unescapeGlobWildcards("file\\*.txt")} returns {@code "file*.txt"}
+   *
+   * @param spec the file path specification to unescape
+   * @return the unescaped specification
+   */
+  public static String unescapeGlobWildcards(String spec) {
+    checkNotNull(spec, "spec cannot be null");
+    return spec.replaceAll("\\\\([*?{}])", "$1");
+  }
+
   /**
    * This is the entry point to convert user-provided specs to {@link ResourceId ResourceIds}.
    * Callers should use {@link #match} to resolve users specs ambiguities before calling other
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
index 34567309c7d0..a513c1b1f462 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileSystemsTest.java
@@ -354,6 +354,56 @@ private void createFileWithContent(Path path, String content) throws Exception {
     }
   }
 
+  @Test
+  public void testEscapeGlobWildcards() {
+    // Test escaping asterisk
+    assertEquals("file\\*.txt", FileSystems.escapeGlobWildcards("file*.txt"));
+
+    // Test escaping question mark
+    assertEquals("file\\?.txt", FileSystems.escapeGlobWildcards("file?.txt"));
+
+    // Test escaping braces
+    assertEquals("file\\{1,2\\}.txt", FileSystems.escapeGlobWildcards("file{1,2}.txt"));
+
+    // Test escaping multiple characters
+    assertEquals("\\*\\?\\{\\}.txt", FileSystems.escapeGlobWildcards("*?{}.txt"));
+
+    // Test string with no glob characters
+    assertEquals("file.txt", FileSystems.escapeGlobWildcards("file.txt"));
+
+    // Test empty string
+    assertEquals("", FileSystems.escapeGlobWildcards(""));
+  }
+
+  @Test
+  public void testUnescapeGlobWildcards() {
+    // Test unescaping asterisk
+    assertEquals("file*.txt", FileSystems.unescapeGlobWildcards("file\\*.txt"));
+
+    // Test unescaping question mark
+    assertEquals("file?.txt", FileSystems.unescapeGlobWildcards("file\\?.txt"));
+
+    // Test unescaping braces
+    assertEquals("file{1,2}.txt", FileSystems.unescapeGlobWildcards("file\\{1,2\\}.txt"));
+
+    // Test unescaping multiple characters
+    assertEquals("*?{}.txt", FileSystems.unescapeGlobWildcards("\\*\\?\\{\\}.txt"));
+
+    // Test string with no escaped characters
+    assertEquals("file.txt", FileSystems.unescapeGlobWildcards("file.txt"));
+
+    // Test empty string
+    assertEquals("", FileSystems.unescapeGlobWildcards(""));
+  }
+
+  @Test
+  public void testEscapeUnescapeRoundTrip() {
+    String original = "file*test?.txt";
+    String escaped = FileSystems.escapeGlobWildcards(original);
+    String unescaped = FileSystems.unescapeGlobWildcards(escaped);
+    assertEquals(original, unescaped);
+  }
+
   private LocalResourceId toLocalResourceId(String str) throws Exception {
     boolean isDirectory;
     if (SystemUtils.IS_OS_WINDOWS) {
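Reviewer note (not part of the patch): to make the intended contract of the new helpers concrete, here is a minimal round-trip sketch. It only calls the two methods added above; the sample GCS path and class name are made up for illustration.

import org.apache.beam.sdk.io.FileSystems;

public class GlobEscapeRoundTripSketch {
  public static void main(String[] args) {
    // A spec that contains glob metacharacters that should be treated literally.
    String literalSpec = "gs://my-bucket/reports/daily{2024-01-01}.csv";

    // escapeGlobWildcards prefixes each of '*', '?', '{', '}' with a backslash.
    String escaped = FileSystems.escapeGlobWildcards(literalSpec);
    System.out.println(escaped); // gs://my-bucket/reports/daily\{2024-01-01\}.csv

    // unescapeGlobWildcards removes the prefix again, so the round trip is lossless.
    String restored = FileSystems.unescapeGlobWildcards(escaped);
    System.out.println(restored.equals(literalSpec)); // true
  }
}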

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 7aef1bd1ce02..7589270c3646 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -3330,8 +3330,9 @@ public Write withLoadJobProjectId(ValueProvider loadJobProjectId) {
     /**
      * Choose the frequency at which file writes are triggered.
      *
-     * <p>This is only applicable when the write method is set to {@link Method#FILE_LOADS} or
-     * {@link Method#STORAGE_WRITE_API}, and only when writing an unbounded {@link PCollection}.
+     * <p>This is only applicable when the write method is set to {@link Method#FILE_LOADS}, {@link
+     * Method#STORAGE_WRITE_API}, or {@link Method#STORAGE_API_AT_LEAST_ONCE}, and only when writing
+     * an unbounded {@link PCollection}.
      *
      * <p>Every triggeringFrequency duration, a BigQuery load job will be generated for all the data
      * written since the last load job. BigQuery has limits on how many load jobs can be triggered
@@ -3736,19 +3737,22 @@ public WriteResult expand(PCollection input) {
       BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
       Write.Method method = resolveMethod(input);
       if (input.isBounded() == IsBounded.UNBOUNDED) {
-        if (method == Write.Method.FILE_LOADS || method == Write.Method.STORAGE_WRITE_API) {
+        if (method == Write.Method.FILE_LOADS
+            || method == Write.Method.STORAGE_WRITE_API
+            || method == Write.Method.STORAGE_API_AT_LEAST_ONCE) {
           Duration triggeringFrequency =
-              (method == Write.Method.STORAGE_WRITE_API)
+              (method == Write.Method.STORAGE_WRITE_API
+                      || method == Write.Method.STORAGE_API_AT_LEAST_ONCE)
                   ? getStorageApiTriggeringFrequency(bqOptions)
                   : getTriggeringFrequency();
           checkArgument(
               triggeringFrequency != null,
-              "When writing an unbounded PCollection via FILE_LOADS or STORAGE_WRITE_API, "
+              "When writing an unbounded PCollection via FILE_LOADS, STORAGE_WRITE_API, or STORAGE_API_AT_LEAST_ONCE, "
                   + "triggering frequency must be specified");
         } else {
           checkArgument(
               getTriggeringFrequency() == null,
-              "Triggering frequency can be specified only when writing via FILE_LOADS or STORAGE_WRITE_API, but the method was %s.",
+              "Triggering frequency can be specified only when writing via FILE_LOADS, STORAGE_WRITE_API, or STORAGE_API_AT_LEAST_ONCE, but the method was %s.",
               method);
         }
         if (method != Method.FILE_LOADS) {
@@ -3757,13 +3761,7 @@ public WriteResult expand(PCollection input) {
               "Number of file shards can be specified only when writing via FILE_LOADS, but the method was %s.",
               method);
         }
-        if (method == Method.STORAGE_API_AT_LEAST_ONCE
-            && getStorageApiTriggeringFrequency(bqOptions) != null) {
-          LOG.warn(
-              "Storage API triggering frequency option will be ignored is it can only be specified only "
-                  + "when writing via STORAGE_WRITE_API, but the method was {}.",
-              method);
-        }
+
         if (getAutoSharding()) {
           if (method == Method.STORAGE_WRITE_API && getStorageApiNumStreams(bqOptions) > 0) {
             LOG.warn(
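Reviewer note (not part of the patch): with the validation change above, an unbounded STORAGE_API_AT_LEAST_ONCE write is now checked for a triggering frequency, the same way STORAGE_WRITE_API already was. A minimal sketch of a pipeline that satisfies the new check is below; the table name is a placeholder, the table is assumed to already exist (CREATE_NEVER), and the unbounded GenerateSequence source is only used for illustration.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.joda.time.Duration;

public class AtLeastOnceTriggeringFrequencySketch {
  public static void main(String[] args) {
    BigQueryOptions options = PipelineOptionsFactory.fromArgs(args).as(BigQueryOptions.class);
    // Without this, expand() now rejects an unbounded write that uses
    // STORAGE_API_AT_LEAST_ONCE, just as it already did for STORAGE_WRITE_API.
    options.setStorageWriteApiTriggeringFrequencySec(30);

    Pipeline p = Pipeline.create(options);
    p.apply(
            "UnboundedInput",
            GenerateSequence.from(0).withRate(10, Duration.standardSeconds(1)))
        .apply(
            "WriteAtLeastOnce",
            BigQueryIO.<Long>write()
                .to("my-project:my_dataset.my_table") // placeholder table
                .withFormatFunction(i -> new TableRow().set("value", i))
                .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
    p.run();
  }
}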
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index f6d10b47ccf2..14f93a95fa6d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -219,7 +219,7 @@ public PCollectionTuple expand(PCollectionwrite()
+                .withAvroFormatFunction(r -> new GenericData.Record(r.getSchema()))
+                .to("dataset.table")
+                .withMethod(Method.STORAGE_API_AT_LEAST_ONCE)
+                .withCreateDisposition(CreateDisposition.CREATE_NEVER));
+  }
+
+  @Test
+  public void testStreamingWriteValidateSucceedsWithTriggeringFrequencyForStorageApiAtLeastOnce() {
+    assumeTrue(useStreaming);
+    assumeTrue(useStorageApiApproximate); // Test STORAGE_API_AT_LEAST_ONCE specifically
+    p.enableAbandonedNodeEnforcement(false);
+
+    // This should not throw - STORAGE_API_AT_LEAST_ONCE with triggering frequency should be valid
+    p.getOptions().as(BigQueryOptions.class).setStorageWriteApiTriggeringFrequencySec(30);
+    p.apply(Create.empty(INPUT_RECORD_CODER))
+        .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED)
+        .apply(
+            BigQueryIO.write()
+                .withAvroFormatFunction(r -> new GenericData.Record(r.getSchema()))
+                .to("dataset.table")
+                .withMethod(Method.STORAGE_API_AT_LEAST_ONCE)
+                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+                .withTestServices(fakeBqServices)
+                .withoutValidation());
+    // Should validate without throwing
+    p.run();
+  }
+
+  @Test
+  public void testBoundedWriteValidateSucceedsWithoutTriggeringFrequencyForStorageApiAtLeastOnce() {
+    assumeTrue(!useStreaming); // Test bounded PCollection
+    assumeTrue(useStorageApiApproximate); // Test STORAGE_API_AT_LEAST_ONCE specifically
+
+    // Bounded collections should not require triggering frequency even for
+    // STORAGE_API_AT_LEAST_ONCE
+    p.getOptions().as(BigQueryOptions.class).setStorageWriteApiTriggeringFrequencySec(null);
+    p.apply(Create.empty(INPUT_RECORD_CODER))
+        .setIsBoundedInternal(PCollection.IsBounded.BOUNDED)
+        .apply(
+            BigQueryIO.write()
+                .withAvroFormatFunction(r -> new GenericData.Record(r.getSchema()))
+                .to("dataset.table")
+                .withMethod(Method.STORAGE_API_AT_LEAST_ONCE)
+                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+                .withTestServices(fakeBqServices)
+                .withoutValidation());
+    // Should validate without throwing
+    p.run();
+  }
+
   @Test
   public void testBigQueryIOGetName() {
     assertEquals(
@@ -4924,4 +4991,49 @@ public void testCustomGcsTempLocationNull() throws Exception {
         fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
         containsInAnyOrder(new TableRow().set("name", "a"), new TableRow().set("name", "b")));
   }
+
+  @Test
+  public void testCdcWithStorageWriteApiDoesNotThrowIllegalStateException() throws Exception {
+    // Test for issue #31422: CDC with STORAGE_WRITE_API should not throw IllegalStateException
+    assumeTrue(useStorageApi);
+    assumeTrue(!useStorageApiApproximate); // Test STORAGE_WRITE_API specifically
+
+    TableSchema schema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("id").setType("INTEGER"),
+                    new TableFieldSchema().setName("name").setType("STRING")));
+
+    // Create a write transform with CDC enabled using RowMutationInformation
+    BigQueryIO.Write write =
+        BigQueryIO.write()
+            .to("project-id:dataset-id.table-id")
+            .withSchema(schema)
+            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
+            .withRowMutationInformationFn(
+                (Row row) -> {
+                  return RowMutationInformation.of(
+                      RowMutationInformation.MutationType.UPSERT, row.getValue("id").toString());
+                })
+            .withTestServices(fakeBqServices)
+            .withoutValidation();
+
+    // Create test data with CDC-style updates
+    Schema beamSchema = Schema.builder().addInt32Field("id").addStringField("name").build();
+
+    List<Row> testData =
+        ImmutableList.of(
+            Row.withSchema(beamSchema).addValues(1, "Alice").build(),
+            Row.withSchema(beamSchema).addValues(2, "Bob").build(),
+            Row.withSchema(beamSchema).addValues(1, "Alice Updated").build() // Update row with id=1
+            );
+
+    // This should not throw an IllegalStateException
+    PCollection<Row> input = p.apply(Create.of(testData).withRowSchema(beamSchema));
+
+    WriteResult result = input.apply("WriteCdcToBQ", write);
+
+    p.run(); // Should complete successfully without IllegalStateException
+  }
 }
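Reviewer note (not part of the patch): outside the test harness, the CDC path exercised by testCdcWithStorageWriteApiDoesNotThrowIllegalStateException would look roughly like the sketch below. The destination table is a placeholder and is assumed to already exist with a primary key on id; the change-sequence value simply mirrors the test, whereas a real CDC source would supply its own ordering value.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;

public class CdcUpsertSketch {
  public static void main(String[] args) {
    Schema schema = Schema.builder().addInt32Field("id").addStringField("name").build();
    List<Row> updates =
        Arrays.asList(
            Row.withSchema(schema).addValues(1, "Alice").build(),
            Row.withSchema(schema).addValues(1, "Alice Updated").build());

    Pipeline p = Pipeline.create();
    p.apply(Create.of(updates).withRowSchema(schema))
        .apply(
            BigQueryIO.<Row>write()
                .to("my-project:my_dataset.my_table") // placeholder destination
                .useBeamSchema()
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
                .withRowMutationInformationFn(
                    row ->
                        RowMutationInformation.of(
                            RowMutationInformation.MutationType.UPSERT,
                            row.getValue("id").toString()))
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
    p.run();
  }
}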