From 79fd87b36b2bf1d1175d98f15b0b8128ff65caa2 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Fri, 10 Oct 2025 00:54:15 +0530 Subject: [PATCH 1/2] Fix CassandraIO ReadFn to quote column names for reserved keywords --- .../apache/beam/sdk/io/cassandra/ReadFn.java | 10 + .../sdk/io/cassandra/CassandraIOTest.java | 362 ++++++++++++++++++ 2 files changed, 372 insertions(+) diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java index 678c72d42ff2..8f16e729bc86 100644 --- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java @@ -50,6 +50,7 @@ public void processElement(@Element Read read, OutputReceiver receiver) th session.getCluster().getMetadata().getKeyspace(read.keyspace().get()) .getTable(read.table().get()).getPartitionKey().stream() .map(ColumnMetadata::getName) + .map(ReadFn::quoteIdentifier) .collect(Collectors.joining(",")); String query = generateRangeQuery(read, partitionKey, read.ringRanges() != null); @@ -148,4 +149,13 @@ private static String buildInitialQuery(Read spec, Boolean hasRingRange) { private static String getJoinerClause(String queryString) { return queryString.toUpperCase().contains("WHERE") ? 
" AND " : " WHERE "; } + + static String quoteIdentifier(String identifier) { + if (identifier == null) { + return null; + } + // Escape any existing double quotes by doubling them + String escaped = identifier.replace("\"", "\"\""); + return "\"" + escaped + "\""; + } } diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java index 747f803ea46b..b040af994976 100644 --- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java +++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java @@ -844,4 +844,366 @@ private static RingRange fromEncodedKey(Metadata metadata, ByteBuffer... bb) { /** Simple Cassandra entity used in write tests. */ @Table(name = CASSANDRA_TABLE_WRITE, keyspace = CASSANDRA_KEYSPACE) static class ScientistWrite extends Scientist {} + + /** Test the quoteIdentifier utility method with various inputs. 
*/ + @Test + public void testQuoteIdentifier() { + // Test normal identifiers + assertEquals("\"normal_column\"", ReadFn.quoteIdentifier("normal_column")); + assertEquals("\"myTable\"", ReadFn.quoteIdentifier("myTable")); + assertEquals("\"column123\"", ReadFn.quoteIdentifier("column123")); + + // Test reserved keywords + assertEquals("\"true\"", ReadFn.quoteIdentifier("true")); + assertEquals("\"key\"", ReadFn.quoteIdentifier("key")); + assertEquals("\"select\"", ReadFn.quoteIdentifier("select")); + assertEquals("\"from\"", ReadFn.quoteIdentifier("from")); + assertEquals("\"where\"", ReadFn.quoteIdentifier("where")); + assertEquals("\"table\"", ReadFn.quoteIdentifier("table")); + assertEquals("\"keyspace\"", ReadFn.quoteIdentifier("keyspace")); + + // Test identifiers with existing quotes (should be escaped by doubling) + assertEquals("\"column\"\"with\"\"quotes\"", ReadFn.quoteIdentifier("column\"with\"quotes")); + assertEquals("\"single\"\"quote\"", ReadFn.quoteIdentifier("single\"quote")); + assertEquals("\"\"\"starts_with_quote\"", ReadFn.quoteIdentifier("\"starts_with_quote")); + assertEquals("\"ends_with_quote\"\"\"", ReadFn.quoteIdentifier("ends_with_quote\"")); + + // Test edge cases + assertEquals("\"\"", ReadFn.quoteIdentifier("")); + assertNull(ReadFn.quoteIdentifier(null)); + + // Test special characters that might be in identifiers + assertEquals("\"column with spaces\"", ReadFn.quoteIdentifier("column with spaces")); + assertEquals("\"column-with-dashes\"", ReadFn.quoteIdentifier("column-with-dashes")); + assertEquals("\"column.with.dots\"", ReadFn.quoteIdentifier("column.with.dots")); + } + + /** + * Test reading from a table with reserved keyword column names. This integration test verifies + * the complete fix works end-to-end. 
+ */ + @Test + public void testReadWithReservedKeywordColumns() throws Exception { + String reservedTableName = "reserved_keywords_table"; + + // Create table with reserved keyword column names + String createTableQuery = + String.format( + "CREATE TABLE IF NOT EXISTS %s.%s(" + + "\"true\" text, \"key\" text, \"select\" text, normal_column text, " + + "PRIMARY KEY (\"true\", \"key\")" + + ");", + CASSANDRA_KEYSPACE, reservedTableName); + + session.execute(createTableQuery); + + // Insert test data with reserved keyword column names + String insertQuery1 = + String.format( + "INSERT INTO %s.%s(\"true\", \"key\", \"select\", normal_column) " + + "VALUES ('true_value_1', 'key_value_1', 'select_value_1', 'normal_value_1');", + CASSANDRA_KEYSPACE, reservedTableName); + session.execute(insertQuery1); + + String insertQuery2 = + String.format( + "INSERT INTO %s.%s(\"true\", \"key\", \"select\", normal_column) " + + "VALUES ('true_value_2', 'key_value_2', 'select_value_2', 'normal_value_2');", + CASSANDRA_KEYSPACE, reservedTableName); + session.execute(insertQuery2); + + // Flush to ensure data is written + flushMemTablesAndRefreshSizeEstimates(); + + // Test reading with CassandraIO - this should work with the fix + PCollection<ReservedKeywordEntity> output = + pipeline.apply( + CassandraIO.<ReservedKeywordEntity>read() + .withHosts(Collections.singletonList(CASSANDRA_HOST)) + .withPort(cassandraPort) + .withKeyspace(CASSANDRA_KEYSPACE) + .withTable(reservedTableName) + .withCoder(SerializableCoder.of(ReservedKeywordEntity.class)) + .withEntity(ReservedKeywordEntity.class)); + + // Verify we can read the data successfully + PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(2L); + + PAssert.that(output) + .satisfies( + input -> { + List<ReservedKeywordEntity> entities = new ArrayList<>(); + input.forEach(entities::add); + + assertEquals(2, entities.size()); + + // Check that data was read correctly + boolean foundFirst = false, foundSecond = false; + for (ReservedKeywordEntity entity : entities) { + if 
("true_value_1".equals(entity.trueColumn)) { + assertEquals("key_value_1", entity.keyColumn); + assertEquals("select_value_1", entity.selectColumn); + assertEquals("normal_value_1", entity.normalColumn); + foundFirst = true; + } else if ("true_value_2".equals(entity.trueColumn)) { + assertEquals("key_value_2", entity.keyColumn); + assertEquals("select_value_2", entity.selectColumn); + assertEquals("normal_value_2", entity.normalColumn); + foundSecond = true; + } + } + + assertTrue("Should find first test record", foundFirst); + assertTrue("Should find second test record", foundSecond); + return null; + }); + + pipeline.run(); + + // Clean up test table + session.execute( + String.format("DROP TABLE IF EXISTS %s.%s", CASSANDRA_KEYSPACE, reservedTableName)); + } + + /** Test reading with a custom query that includes reserved keyword column names. */ + @Test + public void testReadWithCustomQueryAndReservedKeywords() throws Exception { + String customQueryTableName = "custom_query_test"; + + // Create table with reserved keyword column names + String createTableQuery = + String.format( + "CREATE TABLE IF NOT EXISTS %s.%s(" + + "\"from\" text, \"where\" text, data text, " + + "PRIMARY KEY (\"from\", \"where\")" + + ");", + CASSANDRA_KEYSPACE, customQueryTableName); + + session.execute(createTableQuery); + + // Insert test data + String insertQuery = + String.format( + "INSERT INTO %s.%s(\"from\", \"where\", data) " + + "VALUES ('source1', 'condition1', 'test_data');", + CASSANDRA_KEYSPACE, customQueryTableName); + session.execute(insertQuery); + + // Test with custom query that has WHERE clause - this tests the query building logic + String customQuery = + String.format( + "SELECT \"from\", \"where\", data FROM %s.%s WHERE \"from\"='source1'", + CASSANDRA_KEYSPACE, customQueryTableName); + + PCollection<CustomQueryEntity> output = + pipeline.apply( + CassandraIO.<CustomQueryEntity>read() + .withHosts(Collections.singletonList(CASSANDRA_HOST)) + .withPort(cassandraPort) + .withKeyspace(CASSANDRA_KEYSPACE) + 
.withTable(customQueryTableName) + .withQuery(customQuery) + .withCoder(SerializableCoder.of(CustomQueryEntity.class)) + .withEntity(CustomQueryEntity.class)); + + PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L); + + PAssert.that(output) + .satisfies( + input -> { + CustomQueryEntity entity = input.iterator().next(); + assertEquals("source1", entity.fromColumn); + assertEquals("condition1", entity.whereColumn); + assertEquals("test_data", entity.data); + return null; + }); + + pipeline.run(); + + // Clean up + session.execute( + String.format("DROP TABLE IF EXISTS %s.%s", CASSANDRA_KEYSPACE, customQueryTableName)); + } + + /** Test that the fix handles multiple partition key columns with reserved keywords. */ + @Test + public void testMultiplePartitionKeyReservedWords() throws Exception { + String multiPartitionTableName = "multi_partition_test"; + + // Create table with multiple partition key columns that are reserved keywords + String createTableQuery = + String.format( + "CREATE TABLE IF NOT EXISTS %s.%s(" + + "\"table\" text, \"index\" text, \"value\" text, data text, " + + "PRIMARY KEY ((\"table\", \"index\"), \"value\")" + + ");", + CASSANDRA_KEYSPACE, multiPartitionTableName); + + session.execute(createTableQuery); + + // Insert test data + String insertQuery = + String.format( + "INSERT INTO %s.%s(\"table\", \"index\", \"value\", data) " + + "VALUES ('table1', 'index1', 'value1', 'test_data');", + CASSANDRA_KEYSPACE, multiPartitionTableName); + session.execute(insertQuery); + + PCollection<MultiPartitionEntity> output = + pipeline.apply( + CassandraIO.<MultiPartitionEntity>read() + .withHosts(Collections.singletonList(CASSANDRA_HOST)) + .withPort(cassandraPort) + .withKeyspace(CASSANDRA_KEYSPACE) + .withTable(multiPartitionTableName) + .withCoder(SerializableCoder.of(MultiPartitionEntity.class)) + .withEntity(MultiPartitionEntity.class)); + + PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L); + + PAssert.that(output) + .satisfies( + input -> { + 
MultiPartitionEntity entity = input.iterator().next(); + assertEquals("table1", entity.tableColumn); + assertEquals("index1", entity.indexColumn); + assertEquals("value1", entity.valueColumn); + assertEquals("test_data", entity.data); + return null; + }); + + pipeline.run(); + + // Clean up + session.execute( + String.format("DROP TABLE IF EXISTS %s.%s", CASSANDRA_KEYSPACE, multiPartitionTableName)); + } + + /** Test that normal (non-reserved) identifiers still work correctly after the fix. */ + @Test + public void testNormalIdentifiersStillWork() throws Exception { + // This test uses the existing CASSANDRA_TABLE which has normal column names + // to ensure our changes don't break existing functionality + + PCollection<Scientist> output = + pipeline.apply( + CassandraIO.<Scientist>read() + .withHosts(Collections.singletonList(CASSANDRA_HOST)) + .withPort(cassandraPort) + .withKeyspace(CASSANDRA_KEYSPACE) + .withTable(CASSANDRA_TABLE) + .withCoder(SerializableCoder.of(Scientist.class)) + .withEntity(Scientist.class)); + + PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(NUM_ROWS); + + pipeline.run(); + } + + // Entity classes mapped to the reserved-keyword tables created by the read tests above. + + /** Test entity class for reserved keyword column names to verify identifier quoting. 
*/ + @Table(name = "reserved_keywords_table", keyspace = CASSANDRA_KEYSPACE) + static class ReservedKeywordEntity implements Serializable { + + @PartitionKey + @Column(name = "true") // Reserved keyword as column name + String trueColumn; + + @ClusteringColumn + @Column(name = "key") // Reserved keyword as column name + String keyColumn; + + @Column(name = "select") // Reserved keyword as column name + String selectColumn; + + @Column(name = "normal_column") // Normal column name + String normalColumn; + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ReservedKeywordEntity that = (ReservedKeywordEntity) o; + return Objects.equal(trueColumn, that.trueColumn) + && Objects.equal(keyColumn, that.keyColumn) + && Objects.equal(selectColumn, that.selectColumn) + && Objects.equal(normalColumn, that.normalColumn); + } + + @Override + public int hashCode() { + return Objects.hashCode(trueColumn, keyColumn, selectColumn, normalColumn); + } + + @Override + public String toString() { + return String.format( + "ReservedKeywordEntity{true='%s', key='%s', select='%s', normal='%s'}", + trueColumn, keyColumn, selectColumn, normalColumn); + } + } + + /** Test entity for custom query test with reserved keyword column names. 
*/ + @Table(name = "custom_query_test", keyspace = CASSANDRA_KEYSPACE) + static class CustomQueryEntity implements Serializable { + @PartitionKey + @Column(name = "from") + String fromColumn; + + @ClusteringColumn + @Column(name = "where") + String whereColumn; + + @Column String data; + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CustomQueryEntity that = (CustomQueryEntity) o; + return Objects.equal(fromColumn, that.fromColumn) + && Objects.equal(whereColumn, that.whereColumn) + && Objects.equal(data, that.data); + } + + @Override + public int hashCode() { + return Objects.hashCode(fromColumn, whereColumn, data); + } + } + + /** Test entity for multiple partition key test with reserved keywords. */ + @Table(name = "multi_partition_test", keyspace = CASSANDRA_KEYSPACE) + static class MultiPartitionEntity implements Serializable { + @PartitionKey(0) + @Column(name = "table") + String tableColumn; + + @PartitionKey(1) + @Column(name = "index") + String indexColumn; + + @ClusteringColumn + @Column(name = "value") + String valueColumn; + + @Column String data; + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MultiPartitionEntity that = (MultiPartitionEntity) o; + return Objects.equal(tableColumn, that.tableColumn) + && Objects.equal(indexColumn, that.indexColumn) + && Objects.equal(valueColumn, that.valueColumn) + && Objects.equal(data, that.data); + } + + @Override + public int hashCode() { + return Objects.hashCode(tableColumn, indexColumn, valueColumn, data); + } + } } From 8a217abded8b2cfb8a3584b2d747caa5689d15a6 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Fri, 10 Oct 2025 18:05:45 +0530 Subject: [PATCH 2/2] Minor fixes --- .../sdk/io/cassandra/CassandraIOTest.java | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git 
a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java index b040af994976..df52421db235 100644 --- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java +++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java @@ -1122,8 +1122,12 @@ static class ReservedKeywordEntity implements Serializable { @Override public boolean equals(@Nullable Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } ReservedKeywordEntity that = (ReservedKeywordEntity) o; return Objects.equal(trueColumn, that.trueColumn) && Objects.equal(keyColumn, that.keyColumn) @@ -1159,8 +1163,12 @@ static class CustomQueryEntity implements Serializable { @Override public boolean equals(@Nullable Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } CustomQueryEntity that = (CustomQueryEntity) o; return Objects.equal(fromColumn, that.fromColumn) && Objects.equal(whereColumn, that.whereColumn) @@ -1192,8 +1200,12 @@ static class MultiPartitionEntity implements Serializable { @Override public boolean equals(@Nullable Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } MultiPartitionEntity that = (MultiPartitionEntity) o; return Objects.equal(tableColumn, that.tableColumn) && Objects.equal(indexColumn, that.indexColumn)