Skip to content

Commit 6fc2836

Browse files
feat(plugin-kafka): Enable case-sensitive support for Kafka connector (#26023)
## Description This PR adds support for case-sensitive identifiers in the Kafka connector. ## Motivation and Context <!---Why is this change required? What problem does it solve?--> <!---If it fixes an open issue, please link to the issue here.--> ## Impact <!---Describe any public API or user-facing feature change or any performance impact--> ## Test Plan <!---Please fill in how you tested your change--> ## Contributor checklist - [ ] Please make sure your submission complies with our [contributing guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md), in particular [code style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style) and [commit standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards). - [ ] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced. - [ ] Documented new properties (with its default value), SQL syntax, functions, or other functionality. - [ ] If release notes are required, they follow the [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines). - [ ] Adequate tests were added if applicable. - [ ] CI passed. ## Release Notes Please follow [release notes guidelines](https://github.com/prestodb/presto/wiki/Release-Notes-Guidelines) and fill in the release notes below. ``` ==RELEASE NOTE == Kafka Connector Changes * Add mixed case support for Kafka connector. ```
1 parent dc68abf commit 6fc2836

File tree

9 files changed

+375
-21
lines changed

9 files changed

+375
-21
lines changed

presto-docs/src/main/sphinx/connector/kafka.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ Property Name Description
7373
``kafka.truststore.password`` Password for the truststore file
7474
``kafka.truststore.type`` File format of the truststore file, defaults to ``JKS``
7575
``kafka.config.resources`` A comma-separated list of Kafka client configuration files. If a specialized authentication method is required, it can be specified in these additional Kafka client properties files. Example: `/etc/kafka-configuration.properties`
76+
``case-sensitive-name-matching`` Enable case-sensitive identifier support for schema,
77+
table, and column names for the connector. When disabled,
78+
names are matched case-insensitively using lowercase
79+
normalization. Default is ``false``.
7680
=================================== ==============================================================
7781

7882
``kafka.table-names``

presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaConnectorConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ public class KafkaConnectorConfig
7070
*/
7171
private List<File> resourceConfigFiles = ImmutableList.of();
7272

73+
private boolean caseSensitiveNameMatching;
74+
7375
@NotNull
7476
public String getDefaultSchema()
7577
{
@@ -173,4 +175,18 @@ public KafkaConnectorConfig setResourceConfigFiles(String files)
173175
.collect(toImmutableList());
174176
return this;
175177
}
178+
179+
public boolean isCaseSensitiveNameMatching()
180+
{
181+
return caseSensitiveNameMatching;
182+
}
183+
184+
@Config("case-sensitive-name-matching")
185+
@ConfigDescription("Enable case-sensitive matching of schema, table names across the connector. " +
186+
"When disabled, names are matched case-insensitively using lowercase normalization.")
187+
public KafkaConnectorConfig setCaseSensitiveNameMatching(boolean caseSensitiveNameMatchingEnabled)
188+
{
189+
this.caseSensitiveNameMatching = caseSensitiveNameMatchingEnabled;
190+
return this;
191+
}
176192
}

presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import static com.facebook.presto.kafka.KafkaHandleResolver.convertTableHandle;
5151
import static com.google.common.collect.ImmutableList.toImmutableList;
5252
import static java.lang.String.format;
53+
import static java.util.Locale.ROOT;
5354
import static java.util.Objects.requireNonNull;
5455

5556
/**
@@ -63,6 +64,7 @@ public class KafkaMetadata
6364
private final String connectorId;
6465
private final boolean hideInternalColumns;
6566
private final TableDescriptionSupplier tableDescriptionSupplier;
67+
private final boolean caseSensitiveNameMatching;
6668

6769
@Inject
6870
public KafkaMetadata(
@@ -75,6 +77,7 @@ public KafkaMetadata(
7577
requireNonNull(kafkaConnectorConfig, "kafkaConfig is null");
7678
this.hideInternalColumns = kafkaConnectorConfig.isHideInternalColumns();
7779
this.tableDescriptionSupplier = requireNonNull(tableDescriptionSupplier, "tableDescriptionSupplier is null");
80+
this.caseSensitiveNameMatching = kafkaConnectorConfig.isCaseSensitiveNameMatching();
7881
}
7982

8083
@Override
@@ -305,4 +308,9 @@ private Optional<KafkaTopicDescription> getTopicDescription(SchemaTableName sche
305308
{
306309
return tableDescriptionSupplier.getTopicDescription(schemaTableName);
307310
}
311+
@Override
312+
public String normalizeIdentifier(ConnectorSession session, String identifier)
313+
{
314+
return caseSensitiveNameMatching ? identifier : identifier.toLowerCase(ROOT);
315+
}
308316
}

presto-kafka/src/test/java/com/facebook/presto/kafka/KafkaQueryRunner.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,28 +58,29 @@ private KafkaQueryRunner()
5858
public static DistributedQueryRunner createKafkaQueryRunner(EmbeddedKafka embeddedKafka, TpchTable<?>... tables)
5959
throws Exception
6060
{
61-
return createKafkaQueryRunner(embeddedKafka, ImmutableList.copyOf(tables));
61+
List<SchemaTableName> extraTables = ImmutableList.of();
62+
return createKafkaQueryRunner(embeddedKafka, ImmutableList.copyOf(tables), ImmutableMap.of(), extraTables);
6263
}
6364

64-
public static DistributedQueryRunner createKafkaQueryRunner(EmbeddedKafka embeddedKafka, Iterable<TpchTable<?>> tables)
65+
public static DistributedQueryRunner createKafkaQueryRunner(EmbeddedKafka embeddedKafka, Iterable<TpchTable<?>> tables, Map<String, String> connectorProperties, List<SchemaTableName> extraTables)
6566
throws Exception
6667
{
6768
DistributedQueryRunner queryRunner = null;
6869
try {
6970
queryRunner = new DistributedQueryRunner(createSession(), 2);
7071

7172
queryRunner.installPlugin(new TpchPlugin());
72-
queryRunner.createCatalog("tpch", "tpch");
73+
queryRunner.createCatalog("tpch", "tpch", connectorProperties);
7374

7475
embeddedKafka.start();
7576

7677
for (TpchTable<?> table : tables) {
7778
embeddedKafka.createTopics(kafkaTopicName(table));
7879
}
7980

80-
Map<SchemaTableName, KafkaTopicDescription> topicDescriptions = createTpchTopicDescriptions(queryRunner.getCoordinator().getMetadata(), tables, embeddedKafka);
81+
Map<SchemaTableName, KafkaTopicDescription> topicDescriptions = createTpchTopicDescriptions(queryRunner.getCoordinator().getMetadata(), tables, embeddedKafka, extraTables);
8182

82-
installKafkaPlugin(embeddedKafka, queryRunner, topicDescriptions);
83+
installKafkaPlugin(embeddedKafka, queryRunner, topicDescriptions, connectorProperties);
8384

8485
TestingPrestoClient prestoClient = queryRunner.getRandomClient();
8586

@@ -111,7 +112,7 @@ private static String kafkaTopicName(TpchTable<?> table)
111112
return TPCH_SCHEMA + "." + table.getTableName().toLowerCase(ENGLISH);
112113
}
113114

114-
private static Map<SchemaTableName, KafkaTopicDescription> createTpchTopicDescriptions(Metadata metadata, Iterable<TpchTable<?>> tables, EmbeddedKafka embeddedKafka)
115+
private static Map<SchemaTableName, KafkaTopicDescription> createTpchTopicDescriptions(Metadata metadata, Iterable<TpchTable<?>> tables, EmbeddedKafka embeddedKafka, List<SchemaTableName> extraTables)
115116
throws Exception
116117
{
117118
JsonCodec<KafkaTopicDescription> topicDescriptionJsonCodec = new CodecSupplier<>(KafkaTopicDescription.class, metadata).get();
@@ -121,7 +122,15 @@ private static Map<SchemaTableName, KafkaTopicDescription> createTpchTopicDescri
121122
String tableName = table.getTableName();
122123
SchemaTableName tpchTable = new SchemaTableName(TPCH_SCHEMA, tableName);
123124

124-
topicDescriptions.put(loadTpchTopicDescription(topicDescriptionJsonCodec, tpchTable.toString(), tpchTable));
125+
topicDescriptions.put(loadTpchTopicDescription(topicDescriptionJsonCodec, tpchTable.toString(), tpchTable, table.getTableName()));
126+
}
127+
128+
for (SchemaTableName extra : extraTables) {
129+
topicDescriptions.put(loadTpchTopicDescription(
130+
topicDescriptionJsonCodec,
131+
extra.getTableName(),
132+
extra,
133+
extra.getTableName().toLowerCase() + "_upper"));
125134
}
126135

127136
List<String> tableNames = new ArrayList<>(4);
@@ -173,7 +182,7 @@ public static void main(String[] args)
173182
throws Exception
174183
{
175184
Logging.initialize();
176-
DistributedQueryRunner queryRunner = createKafkaQueryRunner(EmbeddedKafka.createEmbeddedKafka(), TpchTable.getTables());
185+
DistributedQueryRunner queryRunner = createKafkaQueryRunner(EmbeddedKafka.createEmbeddedKafka(), TpchTable.getTables(), ImmutableMap.of(), ImmutableList.of());
177186
Thread.sleep(10);
178187
Logger log = Logger.get(KafkaQueryRunner.class);
179188
log.info("======== SERVER STARTED ========");

presto-kafka/src/test/java/com/facebook/presto/kafka/TestKafkaConnectorConfig.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ public void testDefaults()
3333
.setHideInternalColumns(true)
3434
.setMaxPartitionFetchBytes(1048576)
3535
.setMaxPollRecords(500)
36-
.setResourceConfigFiles(""));
36+
.setResourceConfigFiles("")
37+
.setCaseSensitiveNameMatching(false));
3738
}
3839

3940
@Test
@@ -51,6 +52,7 @@ public void testExplicitPropertyMappings()
5152
.put("kafka.max-partition-fetch-bytes", "1024")
5253
.put("kafka.max-poll-records", "1000")
5354
.put("kafka.config.resources", tempFile1 + "," + tempFile2)
55+
.put("case-sensitive-name-matching", "true")
5456
.build();
5557

5658
KafkaConnectorConfig expected = new KafkaConnectorConfig()
@@ -61,7 +63,8 @@ public void testExplicitPropertyMappings()
6163
.setHideInternalColumns(false)
6264
.setMaxPartitionFetchBytes(1024)
6365
.setMaxPollRecords(1000)
64-
.setResourceConfigFiles(tempFile1 + "," + tempFile2);
66+
.setResourceConfigFiles(tempFile1 + "," + tempFile2)
67+
.setCaseSensitiveNameMatching(true);
6568

6669
ConfigAssertions.assertFullMapping(properties, expected);
6770
}

presto-kafka/src/test/java/com/facebook/presto/kafka/TestKafkaDistributed.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import com.facebook.presto.kafka.util.EmbeddedKafka;
1717
import com.facebook.presto.testing.QueryRunner;
1818
import com.facebook.presto.tests.AbstractTestQueries;
19+
import com.google.common.collect.ImmutableList;
20+
import com.google.common.collect.ImmutableMap;
1921
import io.airlift.tpch.TpchTable;
2022
import org.testng.annotations.AfterClass;
2123
import org.testng.annotations.Test;
@@ -36,7 +38,7 @@ protected QueryRunner createQueryRunner()
3638
throws Exception
3739
{
3840
this.embeddedKafka = createEmbeddedKafka();
39-
return createKafkaQueryRunner(embeddedKafka, TpchTable.getTables());
41+
return createKafkaQueryRunner(embeddedKafka, TpchTable.getTables(), ImmutableMap.of(), ImmutableList.of());
4042
}
4143

4244
@AfterClass(alwaysRun = true)

0 commit comments

Comments
 (0)