Skip to content

Commit d2402e6

Browse files
conker84jexp
authored andcommitted
added event batch.size (#133)
simplified build process
1 parent c538bbf commit d2402e6

File tree

15 files changed

+221
-105
lines changed

15 files changed

+221
-105
lines changed

kafka-connect-neo4j/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ Build the project by running the following command:
88

99
$ mvn clean install
1010

11-
Inside the directory `<neo4j-streams>/kafka-connect-neo4j/target` you'll find a file named `kafka-connect-neo4j-<VERSION>.tar.gz`
11+
Inside the directory `<neo4j-streams>/kafka-connect-neo4j/target/component/packages` you'll find a file named `neo4j-kafka-connect-neo4j-<VERSION>.zip`
1212

1313
# Run with docker
1414

15-
Please refer to this file [readme.adoc](docker/readme.adoc)
15+
Please refer to this file [readme.adoc](doc/readme.adoc)
2.46 KB
Loading

kafka-connect-neo4j/docker/docker-compose.yml renamed to kafka-connect-neo4j/doc/docker-compose.yml

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,25 @@
22
version: '2'
33
services:
44
neo4j:
5-
image: neo4j:3.4
5+
image: neo4j:3.4-enterprise
66
hostname: neo4j
77
container_name: neo4j
88
ports:
9-
- "7474:7474"
10-
- "7687:7687"
9+
- "7474:7474"
10+
- "7687:7687"
1111
environment:
1212
NEO4J_kafka_zookeeper_connect: zookeeper:2181
1313
NEO4J_kafka_bootstrap_servers: broker:9093
1414
NEO4J_AUTH: neo4j/connect
1515
NEO4J_dbms_memory_heap_max__size: 8G
16+
NEO4J_ACCEPT_LICENSE_AGREEMENT: yes
1617

1718
zookeeper:
1819
image: confluentinc/cp-zookeeper
1920
hostname: zookeeper
2021
container_name: zookeeper
2122
ports:
22-
- "2181:2181"
23+
- "2181:2181"
2324
environment:
2425
ZOOKEEPER_CLIENT_PORT: 2181
2526
ZOOKEEPER_TICK_TIME: 2000
@@ -29,9 +30,9 @@ services:
2930
hostname: broker
3031
container_name: broker
3132
depends_on:
32-
- zookeeper
33+
- zookeeper
3334
ports:
34-
- "9092:9092"
35+
- "9092:9092"
3536
expose:
3637
- "9093"
3738
environment:
@@ -58,10 +59,10 @@ services:
5859
hostname: schema_registry
5960
container_name: schema_registry
6061
depends_on:
61-
- zookeeper
62-
- broker
62+
- zookeeper
63+
- broker
6364
ports:
64-
- "8081:8081"
65+
- "8081:8081"
6566
environment:
6667
SCHEMA_REGISTRY_HOST_NAME: schema_registry
6768
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
@@ -71,13 +72,13 @@ services:
7172
hostname: connect
7273
container_name: connect
7374
depends_on:
74-
- zookeeper
75-
- broker
76-
- schema_registry
75+
- zookeeper
76+
- broker
77+
- schema_registry
7778
ports:
78-
- "8083:8083"
79+
- "8083:8083"
7980
volumes:
80-
- ./plugins:/tmp/connect-plugins
81+
- ./plugins:/tmp/connect-plugins
8182
environment:
8283
CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
8384
CONNECT_REST_ADVERTISED_HOST_NAME: connect
@@ -105,12 +106,12 @@ services:
105106
hostname: control-center
106107
container_name: control-center
107108
depends_on:
108-
- zookeeper
109-
- broker
110-
- schema_registry
111-
- connect
109+
- zookeeper
110+
- broker
111+
- schema_registry
112+
- connect
112113
ports:
113-
- "9021:9021"
114+
- "9021:9021"
114115
environment:
115116
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9093'
116117
CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'

kafka-connect-neo4j/docker/readme.adoc renamed to kafka-connect-neo4j/doc/readme.adoc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,11 @@ Build the project by running the following command:
44

55
mvn clean install
66

7-
Inside the directory `<neo4j-streams>/kafka-connect-neo4j/target` you'll find a file named `kafka-connect-neo4j-<VERSION>.tar.gz`
8-
7+
Inside the directory `<neo4j-streams>/kafka-connect-neo4j/target/component/packages` you'll find a file named `neo4j-kafka-connect-neo4j-<VERSION>.zip`
98

109
= Configuring the stack
1110

12-
Create a directory `plugins` at the same level of the compose file and unzip the file `kafka-connect-neo4j-<VERSION>.tar.gz` inside it, then start the compose file
11+
Create a directory `plugins` at the same level of the compose file and unzip the file `neo4j-kafka-connect-neo4j-<VERSION>.zip` inside it, then start the compose file
1312

1413
docker-compose up -d
1514

kafka-connect-neo4j/pom.xml

Lines changed: 84 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,27 @@
1111
<name>kafka-connect-neo4j</name>
1212
<description>A Kafka Connect Connector for kafka-connect-neo4j</description>
1313

14-
<parent>
15-
<groupId>com.github.jcustenborder.kafka.connect</groupId>
16-
<artifactId>kafka-connect-parent</artifactId>
17-
<version>2.0.0-cp1</version>
18-
</parent>
1914
<issueManagement>
2015
<system>github</system>
2116
<url>https://github.com/neo4j-contrib/neo4j-streams/issues</url>
2217
</issueManagement>
2318

19+
<parent>
20+
<groupId>org.neo4j</groupId>
21+
<artifactId>neo4j-streams-parent</artifactId>
22+
<version>3.4.0</version>
23+
</parent>
24+
2425
<properties>
2526
<neo4j.java.driver.version>1.6.3</neo4j.java.driver.version>
2627
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
27-
<java.version>1.8</java.version>
28-
<kotlin.version>1.3.0</kotlin.version>
29-
<kotlin.coroutines.version>1.0.0</kotlin.coroutines.version>
30-
<kotlin.compiler.incremental>true</kotlin.compiler.incremental>
28+
3129
<easymock.version>4.0.1</easymock.version>
32-
<neo4j.version>3.4.7</neo4j.version>
33-
<neo4j.streams.version>3.4.0</neo4j.streams.version>
30+
31+
<avro.version>1.8.2</avro.version>
32+
<confluent.serializer.version>5.0.0</confluent.serializer.version>
33+
<confluent.connect.plugin.version>0.11.1</confluent.connect.plugin.version>
34+
<mvn.assembly.plugin.version>3.1.0</mvn.assembly.plugin.version>
3435
</properties>
3536

3637
<organization>
@@ -59,26 +60,47 @@
5960
<url>[email protected]:neo4j-contrib/neo4j-streams.git</url>
6061
</scm>
6162

63+
<repositories>
64+
<repository>
65+
<id>confluent</id>
66+
<url>http://packages.confluent.io/maven/</url>
67+
</repository>
68+
</repositories>
6269

6370
<dependencies>
71+
<dependency>
72+
<groupId>org.apache.kafka</groupId>
73+
<artifactId>connect-api</artifactId>
74+
<version>${kafka.version}</version>
75+
<scope>provided</scope>
76+
</dependency>
77+
<dependency>
78+
<groupId>com.github.jcustenborder.kafka.connect</groupId>
79+
<artifactId>connect-utils</artifactId>
80+
<version>0.3.141</version>
81+
</dependency>
82+
<dependency>
83+
<groupId>io.confluent</groupId>
84+
<artifactId>kafka-avro-serializer</artifactId>
85+
<version>${confluent.serializer.version}</version>
86+
</dependency>
87+
<dependency>
88+
<groupId>io.confluent</groupId>
89+
<artifactId>kafka-connect-maven-plugin</artifactId>
90+
<version>${confluent.connect.plugin.version}</version>
91+
</dependency>
92+
6493
<dependency>
6594
<groupId>org.neo4j</groupId>
6695
<artifactId>neo4j-streams-common</artifactId>
67-
<version>${neo4j.streams.version}</version>
96+
<version>${parent.version}</version>
6897
</dependency>
6998
<dependency>
7099
<groupId>org.neo4j.driver</groupId>
71100
<artifactId>neo4j-java-driver</artifactId>
72101
<version>${neo4j.java.driver.version}</version>
73102
</dependency>
74103

75-
76-
<dependency>
77-
<groupId>org.neo4j.test</groupId>
78-
<artifactId>neo4j-harness</artifactId>
79-
<version>${neo4j.version}</version>
80-
<scope>test</scope>
81-
</dependency>
82104
<dependency>
83105
<groupId>org.easymock</groupId>
84106
<artifactId>easymock</artifactId>
@@ -116,51 +138,66 @@
116138

117139
<plugins>
118140
<plugin>
119-
<artifactId>maven-shade-plugin</artifactId>
120-
<version>2.4.3</version>
141+
<groupId>org.apache.maven.plugins</groupId>
142+
<artifactId>maven-assembly-plugin</artifactId>
143+
<version>${mvn.assembly.plugin.version}</version>
144+
<configuration>
145+
<descriptorRefs>
146+
<descriptorRef>jar-with-dependencies</descriptorRef>
147+
</descriptorRefs>
148+
<outputDirectory>${basedir}/target/kafka-connect-neo4j/</outputDirectory>
149+
</configuration>
121150
<executions>
122151
<execution>
152+
<id>make-assembly</id>
123153
<phase>package</phase>
124154
<goals>
125-
<goal>shade</goal>
155+
<goal>single</goal>
126156
</goals>
127157
</execution>
128158
</executions>
129159
</plugin>
130160
<plugin>
131-
<artifactId>kotlin-maven-plugin</artifactId>
132-
<groupId>org.jetbrains.kotlin</groupId>
133-
<version>${kotlin.version}</version>
161+
<groupId>io.confluent</groupId>
162+
<artifactId>kafka-connect-maven-plugin</artifactId>
163+
<version>${confluent.connect.plugin.version}</version>
134164
<executions>
135165
<execution>
136-
<id>compile</id>
137-
<goals>
138-
<goal>compile</goal>
139-
</goals>
140-
</execution>
141-
142-
<execution>
143-
<id>test-compile</id>
144166
<goals>
145-
<goal>test-compile</goal>
167+
<goal>kafka-connect</goal>
146168
</goals>
169+
<configuration>
170+
<componentTypes>
171+
<componentType>sink</componentType>
172+
</componentTypes>
173+
<ownerUsername>neo4j</ownerUsername>
174+
<title>Kafka Connect Neo4j Sink</title>
175+
<documentationUrl>https://neo4j-contrib.github.io/neo4j-streams/</documentationUrl>
176+
<description>It's a basic Apache Kafka Connect SinkConnector which allows moving data from Kafka topics into Neo4j via Cypher templated queries.</description>
177+
<logo>assets/neo4j-logo.png</logo>
178+
<supportSummary>Support provided through community involvement.</supportSummary>
179+
<supportUrl>${project.issueManagement.url}</supportUrl>
180+
<confluentControlCenterIntegration>true</confluentControlCenterIntegration>
181+
<tags>
182+
<tag>neo4j</tag>
183+
<tag>nosql</tag>
184+
<tag>json</tag>
185+
<tag>graph</tag>
186+
<tag>nodes</tag>
187+
<tag>relationships</tag>
188+
<tag>cypher</tag>
189+
</tags>
190+
</configuration>
147191
</execution>
148192
</executions>
149-
<configuration>
150-
<jvmTarget>1.8</jvmTarget>
151-
</configuration>
152-
</plugin>
153-
<plugin>
154-
<artifactId>maven-surefire-plugin</artifactId>
155-
<version>2.20.1</version>
156-
<configuration>
157-
<includes>
158-
<include>**/*Test.*</include>
159-
<include>**/*IT.*</include>
160-
</includes>
161-
</configuration>
162193
</plugin>
163194
</plugins>
195+
<resources>
196+
<resource>
197+
<directory>src/main/resources</directory>
198+
<filtering>true</filtering>
199+
</resource>
200+
</resources>
164201
</build>
165202

166203
</project>

kafka-connect-neo4j/src/main/kotlin/streams/kafka/connect/sink/Neo4jService.kt

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ package streams.kafka.connect.sink
33
import kotlinx.coroutines.async
44
import kotlinx.coroutines.channels.ticker
55
import kotlinx.coroutines.coroutineScope
6-
import kotlinx.coroutines.delay
76
import kotlinx.coroutines.selects.whileSelect
87
import org.apache.kafka.common.config.ConfigException
9-
import org.neo4j.driver.v1.*
8+
import org.neo4j.driver.v1.AuthTokens
9+
import org.neo4j.driver.v1.Config
10+
import org.neo4j.driver.v1.Driver
11+
import org.neo4j.driver.v1.GraphDatabase
1012
import org.slf4j.Logger
1113
import org.slf4j.LoggerFactory
1214
import java.util.concurrent.TimeUnit
13-
import kotlin.random.Random
1415

1516

1617
class Neo4jService(private val config: Neo4jSinkConnectorConfig) {
@@ -58,37 +59,37 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig) {
5859
driver.close()
5960
}
6061

61-
private fun debug(message: String) {
62-
if (log.isDebugEnabled) {
63-
log.debug(message)
64-
}
65-
}
66-
67-
private suspend fun write(query: String, data: Map<String, Any>) {
62+
private fun write(query: String, data: Map<String, Any>) {
6863
val session = driver.session()
6964
session.writeTransaction {
7065
try {
7166
it.run(query, data)
7267
it.success()
73-
debug("Successfully executed query: `$query`, with data: `$data`")
68+
if (log.isDebugEnabled) {
69+
log.debug("Successfully executed query: `$query`, with data: `$data`")
70+
}
7471
} catch (e: Exception) {
75-
debug("Exception `${e.message}` while executing query: `$query`, with data: `$data`")
72+
if (log.isDebugEnabled) {
73+
log.debug("Exception `${e.message}` while executing query: `$query`, with data: `$data`")
74+
}
7675
it.failure()
7776
}
7877
it.close()
7978
}
8079
session.close()
8180
}
8281

83-
suspend fun writeData(data: Map<String, Map<String, Any>>) = coroutineScope {
82+
suspend fun writeData(data: Map<String, List<List<Map<String, Any>>>>) = coroutineScope {
8483
val timeout = config.batchTimeout
8584
val ticker = ticker(timeout)
86-
val deferredList = data.map { (query, events) ->
87-
async { write(query, events) }
85+
val deferredList = data.flatMap { (query, events) ->
86+
events.flatMap { it.map { async { write(query, it) } } }
8887
}
8988
whileSelect {
9089
ticker.onReceive {
91-
debug("Timeout $timeout occurred while executing queries")
90+
if (log.isDebugEnabled) {
91+
log.debug("Timeout $timeout occurred while executing queries")
92+
}
9293
deferredList.forEach { deferred -> deferred.cancel() }
9394
false // Stops the whileSelect
9495
}
@@ -99,4 +100,6 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig) {
99100
}
100101
}
101102

103+
104+
102105
}

0 commit comments

Comments
 (0)