Skip to content

Commit c7fa3e8

Browse files
authored
Support connector hint passthrough (#127)
* Support connector hint passthrough * touch up README * revert not needed change * Use connector name for hints
1 parent 9bf7075 commit c7fa3e8

File tree

5 files changed

+39
-6
lines changed

5 files changed

+39
-6
lines changed

README.md

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ In this case, any jobs created with this template will get deployed as `FlinkSes
148148

149149
### Configuration
150150

151-
The ``{{ }}`` sections you see in the templates are variable placeholders that will be filled in by the Deployer.
151+
The `{{ }}` sections you see in the templates are variable placeholders that will be filled in by the Deployer.
152152
See [Template.java](hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java) for how to specify templates.
153153

154154
While Deployers are extensible, today the primary deployer is to Kubernetes. These deployers
@@ -180,10 +180,25 @@ This can be done by adding hints as JDBC properties.
180180

181181
Hints are key-value pairs separated by an equals sign. Multiple hints are separated by a comma.
182182

183-
For example, to specify the number of kafka partitions and the flink parallelism, you could add the following hints to the query:
183+
There are two ways to use hints.
184+
185+
1. Template hints can be used to override the `{{ }}` template specifications.
186+
187+
For example, to specify the number of kafka partitions and the flink parallelism, you could add the following hints to the connection:
184188
```
185189
jdbc:hoptimator://hints=kafka.partitions=4,flink.parallelism=2
186190
```
187191
These fields can then be added to templates as `{{kafka.partitions}}` or `{{flink.parallelism}}` where applicable.
188192

193+
2. Connector hints allow the user to pass configurations directly through to an Engine (e.g. Flink).
194+
195+
Connector hints must be formatted as follows `<engine-connector-name>.<source|sink>.<configName>`
196+
197+
For example, to set a Kafka group id and startup mode to be used by Flink, you could add the following hints to the connection:
198+
```
199+
jdbc:hoptimator://hints=kafka.source.properties.group.id=4,kafka.sink.sink.parallelism=2
200+
```
201+
Field `properties.group.id` will be applied if the `kafka` connector is used by a source, and `sink.parallelism`
202+
if the `kafka` connector is used by a sink.
203+
189204
Note that hints are simply recommendations, if the planner plans a different pipeline, they will be ignored.

hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sConnector.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.stream.Collectors;
1111

1212
import com.linkedin.hoptimator.Connector;
13+
import com.linkedin.hoptimator.Sink;
1314
import com.linkedin.hoptimator.Source;
1415
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplate;
1516
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplateList;
@@ -46,6 +47,16 @@ public Map<String, String> configure() throws SQLException {
4647
.collect(Collectors.joining("\n"));
4748
Properties props = new Properties();
4849
try {
50+
// Preload configs in order to check for 'connector'
51+
props.load(new StringReader(configs));
52+
53+
// The order here is intentional. Connection options that allow overrides will have already been overridden above
54+
// by hints. Connection options that do not allow overrides, should not be overridden by hints.
55+
// If this were allowed, this can lead to incorrect behavior if hints attempt to override essential properties,
56+
// e.g. 'connector' or 'topic' for kafka
57+
if (props.containsKey("connector")) {
58+
props.putAll(getConnectorHints(props.getProperty("connector")));
59+
}
4960
props.load(new StringReader(configs));
5061
} catch (IOException e) {
5162
throw new SQLException(e);
@@ -55,4 +66,11 @@ public Map<String, String> configure() throws SQLException {
5566
map.put(k, props.getProperty(k)));
5667
return map;
5768
}
69+
70+
private Map<String, String> getConnectorHints(String connectorName) {
71+
String connectorHintPrefix = connectorName + "." + (source instanceof Sink ? "sink" : "source");
72+
return source.options().entrySet().stream()
73+
.filter(e -> e.getKey().startsWith(connectorHintPrefix))
74+
.collect(Collectors.toMap(e -> e.getKey().substring(connectorHintPrefix.length() + 1), Map.Entry::getValue));
75+
}
5876
}

hoptimator-kafka/src/test/java/com/linkedin/hoptimator/kafka/TestSqlScripts.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@ public class TestSqlScripts extends QuidemTestBase {
1111
@Test
1212
@Tag("integration")
1313
public void kafkaDdlScript() throws Exception {
14-
run("kafka-ddl.id", "hints=kafka.partitions=4,flink.parallelism=2");
14+
run("kafka-ddl.id", "hints=kafka.partitions=4,flink.parallelism=2,kafka.source.k1=v1,kafka.sink.k2=v2");
1515
}
1616
}

hoptimator-kafka/src/test/resources/kafka-ddl.id

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ spec:
1212
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
1313
args:
1414
- CREATE DATABASE IF NOT EXISTS `KAFKA` WITH ()
15-
- CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-2', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json')
15+
- CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'k1'='v1', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-2', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json')
1616
- CREATE DATABASE IF NOT EXISTS `KAFKA` WITH ()
17-
- CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-1', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json')
17+
- CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'k2'='v2', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-1', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json')
1818
- INSERT INTO `KAFKA`.`existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `KAFKA`.`existing-topic-2`
1919
jarURI: file:///opt/hoptimator-flink-runner.jar
2020
parallelism: 2

hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ private String formatMapAsString(Map<String, String> configMap) {
107107
private String formatPropertiesAsString(Properties props) {
108108
StringBuilder stringBuilder = new StringBuilder();
109109
for (String key : props.stringPropertyNames()) {
110-
stringBuilder.append(key).append(": ").append(props.getProperty(key)).append("\n");
110+
stringBuilder.append(key).append(": '").append(props.getProperty(key)).append("'\n");
111111
}
112112
return stringBuilder.toString();
113113
}

0 commit comments

Comments
 (0)