Skip to content

Commit 372f389

Browse files
committed
Rebase v 3.5 to the last version of master branch (#106)
1 parent c7d26dd commit 372f389

File tree

18 files changed

+349
-56
lines changed

18 files changed

+349
-56
lines changed

common/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88
<artifactId>neo4j-streams-common</artifactId>
99
<name>Neo4j Streams - Common</name>
1010
<description>Neo4j Streams - Commons Package</description>
11-
<version>3.4.0</version>
11+
<version>3.5.0</version>
1212
<packaging>jar</packaging>
1313

1414
<parent>
1515
<groupId>org.neo4j</groupId>
1616
<artifactId>neo4j-streams-parent</artifactId>
17-
<version>3.4.0</version>
17+
<version>3.5.0</version>
1818
</parent>
1919

2020
</project>

common/src/main/kotlin/streams/utils/Neo4jUtils.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package streams.utils
22

33
import org.neo4j.graphdb.QueryExecutionException
4-
import org.neo4j.kernel.impl.logging.LogService
54
import org.neo4j.kernel.internal.GraphDatabaseAPI
5+
import org.neo4j.logging.internal.LogService
66
import java.lang.reflect.InvocationTargetException
77

88
object Neo4jUtils {

common/src/test/kotlin/streams/utils/Neo4jUtilsTest.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package streams.utils
22

3-
import org.junit.*
4-
import org.neo4j.kernel.impl.logging.LogService
3+
import org.junit.AfterClass
4+
import org.junit.BeforeClass
5+
import org.junit.Test
56
import org.neo4j.kernel.internal.GraphDatabaseAPI
67
import org.neo4j.test.TestGraphDatabaseFactory
78
import kotlin.test.assertTrue

consumer/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,22 @@
66

77
<groupId>org.neo4j</groupId>
88
<artifactId>neo4j-streams-consumer</artifactId>
9-
<version>3.4.0</version>
9+
<version>3.5.0</version>
1010
<name>Neo4j Streams - Consumer</name>
1111
<description>Neo4j Streams - Kafka Consumer</description>
1212
<packaging>jar</packaging>
1313

1414
<parent>
1515
<groupId>org.neo4j</groupId>
1616
<artifactId>neo4j-streams-parent</artifactId>
17-
<version>3.4.0</version>
17+
<version>3.5.0</version>
1818
</parent>
1919

2020
<dependencies>
2121
<dependency>
2222
<groupId>org.neo4j</groupId>
2323
<artifactId>neo4j-streams-common</artifactId>
24-
<version>3.4.0</version>
24+
<version>3.5.0</version>
2525
</dependency>
2626
</dependencies>
2727

consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
package streams
22

3-
import org.neo4j.kernel.AvailabilityGuard
3+
import org.neo4j.kernel.availability.AvailabilityGuard
4+
import org.neo4j.kernel.availability.AvailabilityListener
45
import org.neo4j.kernel.configuration.Config
6+
import org.neo4j.kernel.extension.ExtensionType
57
import org.neo4j.kernel.extension.KernelExtensionFactory
6-
import org.neo4j.kernel.impl.logging.LogService
78
import org.neo4j.kernel.impl.spi.KernelContext
89
import org.neo4j.kernel.internal.GraphDatabaseAPI
910
import org.neo4j.kernel.lifecycle.Lifecycle
1011
import org.neo4j.kernel.lifecycle.LifecycleAdapter
12+
import org.neo4j.logging.internal.LogService
1113
import streams.procedures.StreamsSinkProcedures
12-
import streams.utils.Neo4jUtils
1314
import streams.utils.StreamsUtils
1415

15-
class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSinkExtensionFactory.Dependencies>("Streams.Consumer") {
16+
class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSinkExtensionFactory.Dependencies>(ExtensionType.DATABASE,"Streams.Consumer") {
1617

1718
override fun newInstance(context: KernelContext, dependencies: Dependencies): Lifecycle {
1819
return StreamsEventLifecycle(dependencies)
@@ -27,43 +28,29 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
2728

2829
class StreamsEventLifecycle(private val dependencies: StreamsEventSinkExtensionFactory.Dependencies): LifecycleAdapter() {
2930
private val db = dependencies.graphdatabaseAPI()
30-
private val logService = dependencies.log()
31+
private val log = dependencies.log()
3132
private val configuration = dependencies.config()
32-
private var streamsLog = logService.getUserLog(StreamsEventLifecycle::class.java)
33+
private var streamsLog = log.getUserLog(StreamsEventLifecycle::class.java)
3334

3435
private lateinit var eventSink: StreamsEventSink
3536

3637
override fun start() {
3738
try {
38-
dependencies.availabilityGuard().addListener(object: AvailabilityGuard.AvailabilityListener {
39+
dependencies.availabilityGuard().addListener(object: AvailabilityListener {
3940
override fun unavailable() {}
4041

4142
override fun available() {
42-
streamsLog.info("Initialising the Streams Sink module")
4343
val streamsSinkConfiguration = StreamsSinkConfiguration.from(configuration)
4444
val streamsTopicService = StreamsTopicService(db, streamsSinkConfiguration.topics)
45-
val streamsQueryExecution = StreamsEventSinkQueryExecution(streamsTopicService, db, logService.getUserLog(StreamsEventSinkQueryExecution::class.java))
46-
47-
// Create and start the Sink
48-
eventSink = StreamsEventSinkFactory
49-
.getStreamsEventSink(configuration,
50-
streamsQueryExecution,
51-
streamsTopicService,
52-
logService.getUserLog(StreamsEventSinkFactory::class.java))
53-
eventSink.start()
54-
if (Neo4jUtils.isWriteableInstance(db)) {
55-
if (streamsLog.isDebugEnabled) {
56-
streamsLog.debug("Subscribed topics with queries: $${streamsTopicService.getAll()}")
57-
} else {
58-
streamsLog.info("Subscribed topics: ${streamsTopicService.getTopics()}")
59-
}
60-
}
61-
62-
// Register required services for the Procedures
45+
val streamsQueryExecution = StreamsEventSinkQueryExecution(streamsTopicService, db, log.getUserLog(StreamsEventSinkQueryExecution::class.java))
6346
StreamsSinkProcedures.registerStreamsSinkConfiguration(streamsSinkConfiguration)
47+
eventSink = StreamsEventSinkFactory.getStreamsEventSink(configuration,
48+
streamsQueryExecution,
49+
streamsTopicService,
50+
log.getUserLog(StreamsEventSinkFactory::class.java))
51+
eventSink.start()
6452
StreamsSinkProcedures.registerStreamsEventConsumerFactory(eventSink.getEventConsumerFactory())
6553
StreamsSinkProcedures.registerStreamsEventSinkConfigMapper(eventSink.getEventSinkConfigMapper())
66-
streamsLog.info("Streams Sink module initialised")
6754
}
6855

6956
})

distribution/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
<parent>
1414
<groupId>org.neo4j</groupId>
1515
<artifactId>neo4j-streams-parent</artifactId>
16-
<version>3.4.0</version>
16+
<version>3.5.0</version>
1717
</parent>
1818

1919
<!-- Include here all the required dependencies of the final package -->

doc/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@
66

77
<groupId>org.neo4j</groupId>
88
<artifactId>neo4j-streams-docs</artifactId>
9-
<version>3.4.0</version>
9+
<version>3.5.0</version>
1010
<name>Neo4j Streams - Docs</name>
1111
<description>Neo4j Streams - Documentation</description>
1212

1313
<parent>
1414
<groupId>org.neo4j</groupId>
1515
<artifactId>neo4j-streams-parent</artifactId>
16-
<version>3.4.0</version>
16+
<version>3.5.0</version>
1717
</parent>
1818

1919
<properties>
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"name": "Neo4jSinkConnector",
3+
"config": {
4+
"topics": "my-topic",
5+
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
6+
"errors.retry.timeout": "-1",
7+
"errors.retry.delay.max.ms": "1000",
8+
"errors.tolerance": "all",
9+
"errors.log.enable": true,
10+
"errors.log.include.messages": true,
11+
"neo4j.server.uri": "bolt://neo4j:7687",
12+
"neo4j.authentication.basic.username": "neo4j",
13+
"neo4j.authentication.basic.password": "connect",
14+
"neo4j.encryption.enabled": false,
15+
"neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
16+
}
17+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"name": "Neo4jSinkConnector",
3+
"config": {
4+
"topics": "my-topic",
5+
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
6+
"errors.retry.timeout": "-1",
7+
"errors.retry.delay.max.ms": "1000",
8+
"errors.tolerance": "all",
9+
"errors.log.enable": true,
10+
"errors.log.include.messages": true,
11+
"neo4j.server.uri": "bolt://neo4j:7687",
12+
"neo4j.authentication.basic.username": "neo4j",
13+
"neo4j.authentication.basic.password": "connect",
14+
"neo4j.encryption.enabled": false,
15+
"neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
16+
}
17+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
---
2+
version: '2'
3+
services:
4+
neo4j:
5+
image: neo4j:3.4
6+
hostname: neo4j
7+
container_name: neo4j
8+
ports:
9+
- "7474:7474"
10+
- "7687:7687"
11+
environment:
12+
NEO4J_kafka_zookeeper_connect: zookeeper:2181
13+
NEO4J_kafka_bootstrap_servers: broker:9093
14+
NEO4J_AUTH: neo4j/connect
15+
NEO4J_dbms_memory_heap_max__size: 8G
16+
17+
zookeeper:
18+
image: confluentinc/cp-zookeeper
19+
hostname: zookeeper
20+
container_name: zookeeper
21+
ports:
22+
- "2181:2181"
23+
environment:
24+
ZOOKEEPER_CLIENT_PORT: 2181
25+
ZOOKEEPER_TICK_TIME: 2000
26+
27+
broker:
28+
image: confluentinc/cp-enterprise-kafka
29+
hostname: broker
30+
container_name: broker
31+
depends_on:
32+
- zookeeper
33+
ports:
34+
- "9092:9092"
35+
expose:
36+
- "9093"
37+
environment:
38+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
39+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
40+
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
41+
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
42+
43+
# workaround if we change to a custom name the schema_registry fails to start
44+
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
45+
46+
KAFKA_BROKER_ID: 1
47+
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
48+
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
49+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
50+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
51+
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
52+
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
53+
CONFLUENT_METRICS_ENABLE: 'true'
54+
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
55+
56+
schema_registry:
57+
image: confluentinc/cp-schema-registry
58+
hostname: schema_registry
59+
container_name: schema_registry
60+
depends_on:
61+
- zookeeper
62+
- broker
63+
ports:
64+
- "8081:8081"
65+
environment:
66+
SCHEMA_REGISTRY_HOST_NAME: schema_registry
67+
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
68+
69+
connect:
70+
image: confluentinc/cp-kafka-connect
71+
hostname: connect
72+
container_name: connect
73+
depends_on:
74+
- zookeeper
75+
- broker
76+
- schema_registry
77+
ports:
78+
- "8083:8083"
79+
volumes:
80+
- ./plugins:/tmp/connect-plugins
81+
environment:
82+
CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
83+
CONNECT_REST_ADVERTISED_HOST_NAME: connect
84+
CONNECT_REST_PORT: 8083
85+
CONNECT_GROUP_ID: compose-connect-group
86+
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
87+
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
88+
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
89+
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
90+
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
91+
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
92+
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
93+
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
94+
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
95+
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
96+
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
97+
CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
98+
CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
99+
CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
100+
CONNECT_PLUGIN_PATH: /usr/share/java,/tmp/connect-plugins
101+
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
102+
103+
control-center:
104+
image: confluentinc/cp-enterprise-control-center
105+
hostname: control-center
106+
container_name: control-center
107+
depends_on:
108+
- zookeeper
109+
- broker
110+
- schema_registry
111+
- connect
112+
ports:
113+
- "9021:9021"
114+
environment:
115+
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9093'
116+
CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
117+
CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
118+
CONTROL_CENTER_REPLICATION_FACTOR: 1
119+
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
120+
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
121+
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
122+
PORT: 9021

0 commit comments

Comments
 (0)