Commit 519dd34

Merge branch '0.4.x' into pr_merge_from_0_3_x_to_0_4_x
2 parents: 0be8e33 + c1b441d

File tree

10 files changed: +403 -41 lines

Makefile

Lines changed: 2 additions & 2 deletions

```diff
@@ -8,10 +8,10 @@ SHELL=/bin/bash -o pipefail
 
 check-dependency = $(if $(shell command -v $(1)),,$(error Make sure $(1) is installed))
 
-CP_VERSION ?= 5.5.0
+CP_VERSION ?= 6.0.1
 OPERATOR_VERSION ?= 0
 
-KAFKA_CONNECT_DATAGEN_VERSION ?= 0.3.2
+KAFKA_CONNECT_DATAGEN_VERSION ?= 0.4.0
 AGGREGATE_VERSION = $(KAFKA_CONNECT_DATAGEN_VERSION)-$(CP_VERSION)
 OPERATOR_AGGREGATE_VERSION = $(AGGREGATE_VERSION).$(OPERATOR_VERSION)
 
```
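Taken together, these two bumps change the default Docker image tag, since the Makefile computes the aggregate version as `KAFKA_CONNECT_DATAGEN_VERSION-CP_VERSION`. A quick sanity check, using the `build-docker-from-local` target the README documents (the explicit variable overrides shown here are optional, since they now match the defaults):

```bash
# With the new defaults the aggregate version resolves to 0.4.0-6.0.1;
# both variables can still be overridden on the command line as before.
make build-docker-from-local CP_VERSION=6.0.1 KAFKA_CONNECT_DATAGEN_VERSION=0.4.0
```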

README.md

Lines changed: 14 additions & 13 deletions

````diff
@@ -14,7 +14,7 @@
 # Versions
 
 There are multiple [released versions](https://github.com/confluentinc/kafka-connect-datagen/releases) of this connector, starting with `0.1.0`.
-The instructions below use version `0.3.2` as an example, but you can substitute any of the other released versions.
+The instructions below use version `0.4.0` as an example, but you can substitute any of the other released versions.
 In fact, unless specified otherwise, we recommend using the latest released version to get all of the features and bug fixes.
 
 # Usage
@@ -28,7 +28,7 @@ Using the [Confluent Hub Client](https://docs.confluent.io/current/connect/manag
 To install a specific release version you can run:
 
 ```bash
-confluent-hub install confluentinc/kafka-connect-datagen:0.3.2
+confluent-hub install confluentinc/kafka-connect-datagen:0.4.0
 ```
 
 or to install the latest released version:
@@ -40,12 +40,12 @@ confluent-hub install confluentinc/kafka-connect-datagen:latest
 ### Build connector from latest code
 
 Alternatively, you may build and install the `kafka-connect-datagen` connector from latest code.
-Here we use `v0.3.2` to reference the git tag for the `0.3.2` version, but the same pattern works for all released versions.
+Here we use `v0.4.0` to reference the git tag for the `0.4.0` version, but the same pattern works for all released versions.
 
 ```bash
-git checkout v0.3.2
+git checkout v0.4.0
 mvn clean package
-confluent-hub install target/components/packages/confluentinc-kafka-connect-datagen-0.3.2.zip
+confluent-hub install target/components/packages/confluentinc-kafka-connect-datagen-0.4.0.zip
 ```
 
 ### Run connector in local install
@@ -70,7 +70,7 @@ You can create a Docker image packaged with the locally built source by running
 make build-docker-from-local CP_VERSION=5.5.0
 ```
 
-This will build the connector from source and create a local image with an aggregate version number. The aggregate version number is the kafka-connect-datagen connector version number and the Confluent Platform version number separated with a `-`. The local kafka-connect-datagen version number is defined in the `pom.xml` file, and the Confluent Platform version defined in the [Makefile](Makfile). An example of the aggregate version number might be: `0.3.2-5.5.0`.
+This will build the connector from source and create a local image with an aggregate version number. The aggregate version number is the kafka-connect-datagen connector version number and the Confluent Platform version number separated with a `-`. The local kafka-connect-datagen version number is defined in the `pom.xml` file, and the Confluent Platform version defined in the [Makefile](Makfile). An example of the aggregate version number might be: `0.4.0-6.0.1`.
 
 Alternatively, you can install the `kafka-connect-datagen` connector from [Confluent Hub](https://www.confluent.io/connector/kafka-connect-datagen/) into a Docker image by running:
 ```bash
@@ -80,9 +80,9 @@ make build-docker-from-released CP_VERSION=5.5.0
 The [Makefile](Makefile) contains some default variables that affect the version numbers of both the installed `kafka-connect-datagen` as well as the base Confluent Platform version. The variables are located near the top of the [Makefile](Makefile) with the following names and current default values:
 
 ```bash
-CP_VERSION ?= 5.5.0
+CP_VERSION ?= 6.0.1
 
-KAFKA_CONNECT_DATAGEN_VERSION ?= 0.3.2
+KAFKA_CONNECT_DATAGEN_VERSION ?= 0.4.0
 ```
 These values can be overriden with variable declarations before the `make` command. For example:
 
@@ -123,9 +123,10 @@ Parameter | Description | Default
 `kafka.topic` | Topic to write to |
 `max.interval` | Max interval between messages (ms) | 500
 `iterations` | Number of messages to send from each task, or less than 1 for unlimited | -1
-`schema.filename` | Filename of schema to use
+`schema.string` | The literal JSON-encoded Avro schema to use. Cannot be set with `schema.filename` or `quickstart`
+`schema.filename` | Filename of schema to use. Cannot be set with `schema.string` or `quickstart`
 `schema.keyfield` | Name of field to use as the message key
-`quickstart` | Name of [quickstart](https://github.com/confluentinc/kafka-connect-datagen/tree/master/src/main/resources) to use
+`quickstart` | Name of [quickstart](https://github.com/confluentinc/kafka-connect-datagen/tree/master/src/main/resources) to use. Cannot be set with `schema.string` or `schema.filename`
 
 ## Sample configurations
 
@@ -140,7 +141,7 @@ For an example of using the the Protobuf converter with kafka-connect-datagen, s
 ## Use a bundled schema specification
 
 There are a few quickstart schema specifications bundled with `kafka-connect-datagen`, and they are listed in this [directory](https://github.com/confluentinc/kafka-connect-datagen/tree/master/src/main/resources).
-To use one of these bundled schema, refer to [this mapping](https://github.com/confluentinc/kafka-connect-datagen/blob/master/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java#L66-L73) and in the configuration file, set the parameter `quickstart` to the associated name.
+To use one of these bundled schema, refer to [this mapping](https://github.com/confluentinc/kafka-connect-datagen/blob/master/src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java#L75-L86) and in the configuration file, set the parameter `quickstart` to the associated name.
 For example:
 
 ```bash
@@ -246,8 +247,8 @@ To release new versions of the Docker images to Dockerhub (https://hub.docker.co
 The [Makefile](Makefile) contains some default variables that affect the version numbers of both the installed `kafka-connect-datagen` as well as the base Confluent Platform version. The variables are located near the top of the [Makefile](Makefile) with the following names and current default values:
 
 ```bash
-CP_VERSION ?= 5.5.0
-KAFKA_CONNECT_DATAGEN_VERSION ?= 0.3.2
+CP_VERSION ?= 6.0.1
+KAFKA_CONNECT_DATAGEN_VERSION ?= 0.4.0
 OPERATOR_VERSION ?= 0 # Operator is a 'rev' version appended at the end of the CP version, like so: 5.5.0.0
 ```
````
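Since `schema.string` first appears in this commit, a hedged end-to-end sketch may help: assuming a Kafka Connect worker on its default port 8083, and the new sample config (added as a new file below) saved under a hypothetical name `datagen-pageviews.json`, the connector can be registered through Connect's standard REST API:

```bash
# Hypothetical file name; the payload is the sample schema.string config
# added in this commit. Assumes a Connect worker on the default port 8083.
curl -s -X POST -H "Content-Type: application/json" \
  --data @datagen-pageviews.json \
  http://localhost:8083/connectors
```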

Lines changed: 13 additions & 0 deletions

```diff
@@ -0,0 +1,13 @@
+{
+  "name": "datagen",
+  "config": {
+    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
+    "kafka.topic": "test1",
+    "schema.string": "{\"type\": \"record\", \"namespace\": \"ksql\", \"name\": \"pageviews\", \"fields\": [{\"type\": {\"type\": \"long\", \"format_as_time\": \"unix_long\", \"arg.properties\": {\"iteration\": {\"start\": 1, \"step\": 10}}}, \"name\": \"viewtime\"}, {\"type\": {\"type\": \"string\", \"arg.properties\": {\"regex\": \"User_[1-9]{0,1}\"}}, \"name\": \"userid\"}, {\"type\": {\"type\": \"string\", \"arg.properties\": {\"regex\": \"Page_[1-9][0-9]?\"}}, \"name\": \"pageid\"}]}",
+    "schema.keyfield": "pageid",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "tasks.max": "1"
+  }
+}
```
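For readability only, the escaped `schema.string` value above unescapes to the following Avro schema (the connector config itself needs the single-line escaped form shown in the diff):

```json
{
  "type": "record",
  "namespace": "ksql",
  "name": "pageviews",
  "fields": [
    { "type": { "type": "long", "format_as_time": "unix_long",
                "arg.properties": { "iteration": { "start": 1, "step": 10 } } },
      "name": "viewtime" },
    { "type": { "type": "string", "arg.properties": { "regex": "User_[1-9]{0,1}" } },
      "name": "userid" },
    { "type": { "type": "string", "arg.properties": { "regex": "Page_[1-9][0-9]?" } },
      "name": "pageid" }
  ]
}
```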

docs/pull_request_template.md

Lines changed: 28 additions & 0 deletions

```diff
@@ -0,0 +1,28 @@
+## Problem
+
+
+## Solution
+
+
+<!--- Mark x in the box. -->
+##### Does this solution apply anywhere else?
+- [ ] yes
+- [ ] no
+
+##### If yes, where?
+
+
+## Test Strategy
+
+
+<!--- Mark x in the box for all that apply. -->
+##### Testing done:
+- [ ] Unit tests
+- [ ] Integration tests
+- [ ] System tests
+- [ ] Manual tests
+
+## Release Plan
+<!--- Describe the release plan for this feature. -->
+<!-- Are you backporting or merging to master? -->
+<!-- If you are reverting or rolling back, is it safe? -->
```

pom.xml

Lines changed: 34 additions & 3 deletions

```diff
@@ -25,16 +25,17 @@
 
   <groupId>io.confluent.kafka.connect</groupId>
   <artifactId>kafka-connect-datagen</artifactId>
-  <version>0.3.4-SNAPSHOT</version>
+  <version>0.4.2-SNAPSHOT</version>
   <packaging>jar</packaging>
 
 
   <properties>
     <connect-runtime-version>2.0.0</connect-runtime-version>
     <confluent.avro.generator.version>0.3.1</confluent.avro.generator.version>
     <junit.version>4.12</junit.version>
+    <guava.version>32.0.1-jre</guava.version>
     <avro.version>1.11.3</avro.version>
-    <licenses.version>5.4.0</licenses.version>
+    <jackson.version>2.15.2</jackson.version>
     <maven.release.plugin.version>2.5.3</maven.release.plugin.version>
   </properties>
 
@@ -60,7 +61,7 @@
     <connection>scm:git:git://github.com/confluentinc/kafka-connect-datagen.git</connection>
     <developerConnection>scm:git:git@github.com:confluentinc/kafka-connect-datagen.git</developerConnection>
     <url>https://github.com/confluentinc/kafka-connect-datagen</url>
-    <tag>0.3.x</tag>
+    <tag>0.4.x</tag>
   </scm>
 
   <developers>
@@ -86,6 +87,35 @@
     </pluginRepository>
   </pluginRepositories>
 
+
+  <!-- pin transitive dependencies for CVEs -->
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>com.google.guava</groupId>
+        <artifactId>guava</artifactId>
+        <version>${guava.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.httpcomponents</groupId>
+        <artifactId>httpclient</artifactId>
+        <version>${httpclient.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.fasterxml.jackson</groupId>
+        <artifactId>jackson-bom</artifactId>
+        <version>${jackson.version}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.xerial.snappy</groupId>
+        <artifactId>snappy-java</artifactId>
+        <version>1.1.10.3</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.kafka</groupId>
@@ -113,6 +143,7 @@
       <version>${junit.version}</version>
       <scope>test</scope>
     </dependency>
+
    <dependency>
      <groupId>io.confluent.avro</groupId>
      <artifactId>avro-random-generator</artifactId>
```
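Two observations on the pins: the `httpclient` entry references `${httpclient.version}`, a property not visible in these hunks (presumably defined elsewhere in the POM), and `dependencyManagement` only constrains versions when the artifacts are actually pulled in transitively. A standard way to confirm what resolves, using the stock maven-dependency-plugin:

```bash
# Prints the resolved versions of the pinned artifacts so the CVE pins
# can be verified against the dependency tree.
mvn dependency:tree -Dincludes=com.google.guava:guava
mvn dependency:tree -Dincludes=org.xerial.snappy:snappy-java
```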

src/main/java/io/confluent/kafka/connect/datagen/DatagenConnector.java

Lines changed: 31 additions & 0 deletions

```diff
@@ -16,13 +16,17 @@
 
 package io.confluent.kafka.connect.datagen;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.Config;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.slf4j.Logger;
@@ -34,6 +38,10 @@ public class DatagenConnector extends SourceConnector {
   private DatagenConnectorConfig config;
   private Map<String, String> props;
 
+  @VisibleForTesting
+  static final String SCHEMA_SOURCE_ERR =
+      "Must set exactly one of " + String.join(", ", DatagenConnectorConfig.schemaSourceKeys());
+
   @Override
   public String version() {
     return VersionUtil.getVersion();
@@ -76,4 +84,27 @@ public void stop() {
   public ConfigDef config() {
     return DatagenConnectorConfig.conf();
   }
+
+  @Override
+  public Config validate(Map<String, String> connectorConfigs) {
+    Config config = super.validate(connectorConfigs);
+    validateSchemaSource(config);
+    return config;
+  }
+
+  private void validateSchemaSource(Config config) {
+    List<ConfigValue> schemaSources = config.configValues().stream()
+        .filter(v -> DatagenConnectorConfig.isExplicitlySetSchemaSource(v.name(), v.value()))
+        .collect(Collectors.toList());
+    if (schemaSources.size() > 1) {
+      for (ConfigValue v : schemaSources) {
+        v.addErrorMessage(SCHEMA_SOURCE_ERR);
+      }
+    }
+    if (schemaSources.size() == 0) {
+      config.configValues().stream()
+          .filter(v -> DatagenConnectorConfig.schemaSourceKeys().contains(v.name()))
+          .forEach(v -> v.addErrorMessage(SCHEMA_SOURCE_ERR));
+    }
+  }
 }
```
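A hedged sketch of how the new `validate()` override surfaces the conflict (not part of this commit; the class name, the `pageviews` quickstart value, and the file path are illustrative). Setting two schema sources should attach `SCHEMA_SOURCE_ERR` to both offending `ConfigValue`s:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;
import io.confluent.kafka.connect.datagen.DatagenConnector;

public class ValidateSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("kafka.topic", "test1");
    props.put("quickstart", "pageviews");              // schema source #1
    props.put("schema.filename", "/tmp/schema.avsc");  // schema source #2: conflict

    // validate() delegates to super.validate(), then adds SCHEMA_SOURCE_ERR
    // to every explicitly set schema-source ConfigValue.
    Config result = new DatagenConnector().validate(props);
    for (ConfigValue v : result.configValues()) {
      if (!v.errorMessages().isEmpty()) {
        System.out.println(v.name() + " -> " + v.errorMessages());
      }
    }
  }
}
```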

src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java

Lines changed: 16 additions & 0 deletions

```diff
@@ -16,6 +16,8 @@
 
 package io.confluent.kafka.connect.datagen;
 
+import com.google.common.collect.ImmutableList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.common.config.AbstractConfig;
@@ -32,6 +34,8 @@ public class DatagenConnectorConfig extends AbstractConfig {
   public static final String ITERATIONS_CONF = "iterations";
   private static final String ITERATIONS_DOC = "Number of messages to send from each task, "
       + "or less than 1 for unlimited";
+  public static final String SCHEMA_STRING_CONF = "schema.string";
+  private static final String SCHEMA_STRING_DOC = "The literal JSON-encoded Avro schema to use";
   public static final String SCHEMA_FILENAME_CONF = "schema.filename";
   private static final String SCHEMA_FILENAME_DOC = "Filename of schema to use";
   public static final String SCHEMA_KEYFIELD_CONF = "schema.keyfield";
@@ -56,6 +60,7 @@ public static ConfigDef conf() {
     .define(KAFKA_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_TOPIC_DOC)
     .define(MAXINTERVAL_CONF, Type.LONG, 500L, Importance.HIGH, MAXINTERVAL_DOC)
     .define(ITERATIONS_CONF, Type.INT, -1, Importance.HIGH, ITERATIONS_DOC)
+    .define(SCHEMA_STRING_CONF, Type.STRING, "", Importance.HIGH, SCHEMA_STRING_DOC)
     .define(SCHEMA_FILENAME_CONF, Type.STRING, "", Importance.HIGH, SCHEMA_FILENAME_DOC)
     .define(SCHEMA_KEYFIELD_CONF, Type.STRING, "", Importance.HIGH, SCHEMA_KEYFIELD_DOC)
     .define(QUICKSTART_CONF, Type.STRING, "", Importance.HIGH, QUICKSTART_DOC)
@@ -90,5 +95,16 @@ public Long getRandomSeed() {
     return this.getLong(RANDOM_SEED_CONF);
   }
 
+  public String getSchemaString() {
+    return this.getString(SCHEMA_STRING_CONF);
+  }
+
+  public static List<String> schemaSourceKeys() {
+    return ImmutableList.of(SCHEMA_STRING_CONF, SCHEMA_FILENAME_CONF, QUICKSTART_CONF);
+  }
+
+  public static boolean isExplicitlySetSchemaSource(String key, Object value) {
+    return schemaSourceKeys().contains(key) && !("".equals(value));
+  }
 }
```
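For clarity, the intended semantics of the new helper: a config value counts as an explicitly set schema source only when its key is one of the three source keys and its value is not the empty-string default. An illustrative sketch (the class name is hypothetical; run with `java -ea` so the asserts fire):

```java
import io.confluent.kafka.connect.datagen.DatagenConnectorConfig;

public class SchemaSourceSketch {
  public static void main(String[] args) {
    // A known schema-source key with a non-empty value counts.
    assert DatagenConnectorConfig.isExplicitlySetSchemaSource(
        "schema.string", "{\"type\": \"record\"}");
    // The empty string is the declared default, so it does not count.
    assert !DatagenConnectorConfig.isExplicitlySetSchemaSource("schema.filename", "");
    // Keys outside schemaSourceKeys() never count.
    assert !DatagenConnectorConfig.isExplicitlySetSchemaSource("kafka.topic", "test1");
  }
}
```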
