Skip to content

Commit a3f1619

Browse files
authored
Prepare first release 2k22 (#516)
* minor fixes procedures * fixes #510: java.lang.NoClassDefFoundError: com/google/common/base/Preconditions * version bump * fixed KafkaNeo4jRecoveryTSE * test fixes * fix procedure test
1 parent e1245cc commit a3f1619

File tree

11 files changed

+102
-155
lines changed

11 files changed

+102
-155
lines changed

common/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@
88
<artifactId>neo4j-streams-common</artifactId>
99
<name>Neo4j Streams - Common</name>
1010
<description>Neo4j Streams - Commons Package</description>
11-
<version>4.1.0</version>
11+
<version>4.1.1</version>
1212
<packaging>jar</packaging>
1313

1414
<parent>
1515
<groupId>org.neo4j</groupId>
1616
<artifactId>neo4j-streams-parent</artifactId>
17-
<version>4.1.0</version>
17+
<version>4.1.1</version>
1818
</parent>
1919

2020
<dependencies>

consumer/pom.xml

Lines changed: 2 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@
66

77
<groupId>org.neo4j</groupId>
88
<artifactId>neo4j-streams-consumer</artifactId>
9-
<version>4.1.0</version>
9+
<version>4.1.1</version>
1010
<name>Neo4j Streams - Consumer</name>
1111
<description>Neo4j Streams - Kafka Consumer</description>
1212
<packaging>jar</packaging>
1313

1414
<parent>
1515
<groupId>org.neo4j</groupId>
1616
<artifactId>neo4j-streams-parent</artifactId>
17-
<version>4.1.0</version>
17+
<version>4.1.1</version>
1818
</parent>
1919

2020
<repositories>
@@ -59,79 +59,4 @@
5959
</dependency>
6060

6161
</dependencies>
62-
63-
<profiles>
64-
<profile>
65-
<id>default</id>
66-
<dependencies>
67-
<dependency>
68-
<groupId>com.neo4j</groupId>
69-
<artifactId>neo4j-causal-clustering</artifactId>
70-
<scope>test</scope>
71-
<exclusions>
72-
<exclusion>
73-
<groupId>com.fasterxml.jackson.core</groupId>
74-
<artifactId>jackson-databind</artifactId>
75-
</exclusion>
76-
</exclusions>
77-
</dependency>
78-
<dependency>
79-
<groupId>com.neo4j</groupId>
80-
<artifactId>neo4j-causal-clustering</artifactId>
81-
<type>test-jar</type>
82-
<scope>test</scope>
83-
<exclusions>
84-
<exclusion>
85-
<groupId>com.fasterxml.jackson.core</groupId>
86-
<artifactId>jackson-databind</artifactId>
87-
</exclusion>
88-
</exclusions>
89-
</dependency>
90-
</dependencies>
91-
<activation>
92-
<property>
93-
<name>!env.TRAVIS</name>
94-
</property>
95-
<activeByDefault>true</activeByDefault>
96-
</activation>
97-
</profile>
98-
<profile>
99-
<id>travis</id>
100-
<activation>
101-
<property>
102-
<name>env.TRAVIS</name>
103-
<value>true</value>
104-
</property>
105-
</activation>
106-
<build>
107-
<plugins>
108-
<plugin>
109-
<groupId>org.jetbrains.kotlin</groupId>
110-
<artifactId>kotlin-maven-plugin</artifactId>
111-
<version>${kotlin.version}</version>
112-
<executions>
113-
<execution>
114-
<id>test-compile</id>
115-
<goals>
116-
<goal>test-compile</goal>
117-
</goals>
118-
<configuration>
119-
<!--
120-
As Kotlin Maven Compiler does not provide testExcludes option
121-
we need to workaround specifying the test source dirs and excluding
122-
by hand the `enterprise` directory
123-
-->
124-
<sourceDirs>
125-
<sourceDir>${project.basedir}/src/test/kotlin/integrations</sourceDir>
126-
<sourceDir>${project.basedir}/src/test/kotlin/streams</sourceDir>
127-
</sourceDirs>
128-
</configuration>
129-
</execution>
130-
</executions>
131-
</plugin>
132-
</plugins>
133-
</build>
134-
</profile>
135-
</profiles>
136-
13762
</project>

consumer/src/main/kotlin/streams/procedures/QueueBasedSpliterator.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ class QueueBasedSpliterator<T> constructor(private val queue: BlockingQueue<T>,
2424

2525
override fun tryAdvance(action: Consumer<in T?>): Boolean {
2626
if (transactionIsTerminated(terminationGuard)) return false
27-
if (isEnd) return false
27+
if (isEnd()) return false
2828
action.accept(entry)
2929
entry = poll()
30-
return !isEnd
30+
return !isEnd()
3131
}
3232

3333
private fun transactionIsTerminated(terminationGuard: TerminationGuard): Boolean {
@@ -42,8 +42,7 @@ class QueueBasedSpliterator<T> constructor(private val queue: BlockingQueue<T>,
4242
}
4343
}
4444

45-
private val isEnd: Boolean
46-
private get() = entry == null || entry === tombstone
45+
private fun isEnd(): Boolean = entry == tombstone
4746

4847
private fun poll(): T? {
4948
return try {

consumer/src/main/kotlin/streams/procedures/StreamsSinkProcedures.kt

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,9 @@ class StreamsSinkProcedures {
134134

135135
private fun readData(topic: String, procedureConfig: Map<String, Any>, consumerConfig: Map<String, String>): Stream<StreamResult?> {
136136
val cfg = procedureConfig.mapValues { if (it.key != "partitions") it.value else mapOf(topic to it.value) }
137+
val maxRecords = cfg.getOrDefault("max.records", 1000).toString().toInt()
137138
val timeout = cfg.getOrDefault("timeout", 1000).toString().toLong()
138-
val data = ArrayBlockingQueue<StreamResult>(1000)
139+
val data = ArrayBlockingQueue<StreamResult>(maxRecords)
139140
val tombstone = StreamResult(emptyMap<String, Any>())
140141
GlobalScope.launch(Dispatchers.IO) {
141142
val consumer = createConsumer(consumerConfig, topic)
@@ -144,15 +145,19 @@ class StreamsSinkProcedures {
144145
val start = System.currentTimeMillis()
145146
while ((System.currentTimeMillis() - start) < timeout) {
146147
consumer.read(cfg) { _, topicData ->
147-
data.addAll(topicData.mapNotNull { it.value }.map { StreamResult(mapOf("data" to it)) })
148+
var elements = topicData.mapNotNull { it.value }.map { StreamResult(mapOf("data" to it)) }
149+
if (elements.size + data.size >= maxRecords) {
150+
elements = elements.subList(0, maxRecords - data.size - 1)
151+
}
152+
data.addAll(elements)
148153
}
149154
}
150-
data.add(tombstone)
151155
} catch (e: Exception) {
152156
if (log?.isDebugEnabled!!) {
153157
log?.error("Error while consuming data", e)
154158
}
155159
} finally {
160+
data.add(tombstone)
156161
consumer.stop()
157162
}
158163
}

consumer/src/test/kotlin/integrations/kafka/KafkaNeo4jRecoveryTSE.kt

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,20 @@ import org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric
77
import org.apache.commons.lang3.exception.ExceptionUtils.getRootCause
88
import org.apache.kafka.clients.producer.ProducerRecord
99
import org.assertj.core.api.Assertions.assertThat
10-
import org.junit.jupiter.api.Assertions.*
10+
import org.junit.jupiter.api.Assertions.assertEquals
11+
import org.junit.jupiter.api.Assertions.assertFalse
12+
import org.junit.jupiter.api.Assertions.assertNotNull
13+
import org.junit.jupiter.api.Assertions.assertThrows
14+
import org.junit.jupiter.api.Assertions.assertTrue
1115
import org.junit.jupiter.api.BeforeEach
1216
import org.junit.jupiter.api.Test
1317
import org.neo4j.configuration.Config
1418
import org.neo4j.configuration.Config.defaults
15-
import org.neo4j.configuration.GraphDatabaseSettings.*
19+
import org.neo4j.configuration.GraphDatabaseInternalSettings
20+
import org.neo4j.configuration.GraphDatabaseSettings.DEFAULT_DATABASE_NAME
21+
import org.neo4j.configuration.GraphDatabaseSettings.fail_on_missing_files
22+
import org.neo4j.configuration.GraphDatabaseSettings.logical_log_rotation_threshold
23+
import org.neo4j.configuration.GraphDatabaseSettings.preallocate_logical_logs
1624
import org.neo4j.dbms.DatabaseStateService
1725
import org.neo4j.dbms.api.DatabaseManagementService
1826
import org.neo4j.dbms.database.DatabaseStartAbortedException
@@ -21,16 +29,15 @@ import org.neo4j.graphdb.Label
2129
import org.neo4j.graphdb.RelationshipType.withName
2230
import org.neo4j.graphdb.schema.IndexType
2331
import org.neo4j.internal.helpers.collection.Iterables.count
24-
import org.neo4j.internal.index.label.RelationshipTypeScanStoreSettings.enable_relationship_type_scan_store
25-
import org.neo4j.internal.kernel.api.IndexQuery.fulltextSearch
2632
import org.neo4j.internal.kernel.api.IndexQueryConstraints.unconstrained
33+
import org.neo4j.internal.kernel.api.PropertyIndexQuery.fulltextSearch
2734
import org.neo4j.io.ByteUnit
2835
import org.neo4j.io.fs.DefaultFileSystemAbstraction
2936
import org.neo4j.io.layout.DatabaseLayout
3037
import org.neo4j.io.layout.Neo4jLayout
3138
import org.neo4j.io.pagecache.PageCache
39+
import org.neo4j.io.pagecache.context.CursorContext
3240
import org.neo4j.io.pagecache.tracing.DefaultPageCacheTracer
33-
import org.neo4j.io.pagecache.tracing.cursor.PageCursorTracer.NULL
3441
import org.neo4j.kernel.availability.CompositeDatabaseAvailabilityGuard
3542
import org.neo4j.kernel.database.DatabaseTracers
3643
import org.neo4j.kernel.database.DatabaseTracers.EMPTY
@@ -61,7 +68,6 @@ import org.neo4j.test.extension.Inject
6168
import org.neo4j.test.extension.Neo4jLayoutExtension
6269
import org.neo4j.test.extension.pagecache.PageCacheExtension
6370
import streams.utils.JSONUtils
64-
import java.io.IOException
6571
import java.lang.String.valueOf
6672
import java.nio.file.DirectoryStream
6773
import java.nio.file.Path
@@ -120,7 +126,15 @@ class KafkaNeo4jRecoveryTSE: KafkaEventSinkBaseTSE() {
120126

121127
@Test
122128
fun recoverEmptyDatabase() {
123-
createDatabase()
129+
val config = Config.newBuilder()
130+
.set(GraphDatabaseInternalSettings.skip_default_indexes_on_creation, true)
131+
.set(preallocate_logical_logs, false)
132+
.build()
133+
134+
managementService = TestDatabaseManagementServiceBuilder(neo4jLayout)
135+
.setConfig(config)
136+
.build()
137+
managementService!!.database(databaseLayout!!.databaseName) as GraphDatabaseAPI
124138
managementService!!.shutdown()
125139
sendKafkaEvents()
126140
RecoveryHelpers.removeLastCheckpointRecordFromLastLogFile(databaseLayout, fileSystem)
@@ -139,7 +153,7 @@ class KafkaNeo4jRecoveryTSE: KafkaEventSinkBaseTSE() {
139153
recoverDatabase()
140154
val recoveredDatabase: GraphDatabaseService = createDatabase()
141155
try {
142-
recoveredDatabase.beginTx().use { tx -> assertEquals(numberOfNodes, count(tx.allNodes)) }
156+
recoveredDatabase.beginTx().use { tx -> assertEquals(numberOfNodes.toLong(), count(tx.allNodes)) }
143157
} finally {
144158
managementService!!.shutdown()
145159
}
@@ -161,7 +175,7 @@ class KafkaNeo4jRecoveryTSE: KafkaEventSinkBaseTSE() {
161175
assertThat(pageCacheTracer.hits() + pageCacheTracer.faults()).isEqualTo(pageCacheTracer.pins())
162176
val recoveredDatabase: GraphDatabaseService = createDatabase()
163177
try {
164-
recoveredDatabase.beginTx().use { tx -> assertEquals(numberOfNodes, count(tx.allNodes)) }
178+
recoveredDatabase.beginTx().use { tx -> assertEquals(numberOfNodes.toLong(), count(tx.allNodes)) }
165179
} finally {
166180
managementService!!.shutdown()
167181
}
@@ -186,9 +200,9 @@ class KafkaNeo4jRecoveryTSE: KafkaEventSinkBaseTSE() {
186200
val recoveredDatabase: GraphDatabaseService = createDatabase()
187201
try {
188202
recoveredDatabase.beginTx().use { transaction ->
189-
assertEquals(numberOfNodes, count(transaction.allNodes))
190-
assertEquals(numberOfRelationships, count(transaction.allRelationships))
191-
assertEquals(numberOfRelationships, count(transaction.allRelationshipTypesInUse))
203+
assertEquals(numberOfNodes.toLong(), count(transaction.allNodes))
204+
assertEquals(numberOfRelationships.toLong(), count(transaction.allRelationships))
205+
assertEquals(numberOfRelationships.toLong(), count(transaction.allRelationshipTypesInUse))
192206
}
193207
} finally {
194208
managementService!!.shutdown()
@@ -216,10 +230,10 @@ class KafkaNeo4jRecoveryTSE: KafkaEventSinkBaseTSE() {
216230
val recoveredDatabase: GraphDatabaseService = createDatabase()
217231
try {
218232
recoveredDatabase.beginTx().use { transaction ->
219-
assertEquals(numberOfNodes, count(transaction.allNodes))
220-
assertEquals(numberOfRelationships, count(transaction.allRelationships))
221-
assertEquals(numberOfRelationships, count(transaction.allRelationshipTypesInUse))
222-
assertEquals(numberOfNodes, count(transaction.allPropertyKeys))
233+
assertEquals(numberOfNodes.toLong(), count(transaction.allNodes))
234+
assertEquals(numberOfRelationships.toLong(), count(transaction.allRelationships))
235+
assertEquals(numberOfRelationships.toLong(), count(transaction.allRelationshipTypesInUse))
236+
assertEquals(numberOfNodes.toLong(), count(transaction.allPropertyKeys))
223237
}
224238
} finally {
225239
managementService!!.shutdown()
@@ -259,10 +273,10 @@ class KafkaNeo4jRecoveryTSE: KafkaEventSinkBaseTSE() {
259273
val recoveredDatabase: GraphDatabaseService = createDatabase()
260274
try {
261275
recoveredDatabase.beginTx().use { transaction ->
262-
assertEquals(numberOfNodes, count(transaction.allNodes))
263-
assertEquals(numberOfRelationships, count(transaction.allRelationships))
264-
assertEquals(numberOfRelationships, count(transaction.allRelationshipTypesInUse))
265-
assertEquals(numberOfPropertyKeys, count(transaction.allPropertyKeys))
276+
assertEquals(numberOfNodes.toLong(), count(transaction.allNodes))
277+
assertEquals(numberOfRelationships.toLong(), count(transaction.allRelationships))
278+
assertEquals(numberOfRelationships.toLong(), count(transaction.allRelationshipTypesInUse))
279+
assertEquals(numberOfPropertyKeys.toLong(), count(transaction.allPropertyKeys))
266280
}
267281
} finally {
268282
managementService!!.shutdown()
@@ -300,9 +314,10 @@ class KafkaNeo4jRecoveryTSE: KafkaEventSinkBaseTSE() {
300314
recoveredDatabase.beginTx().use { transaction ->
301315
val ktx = (transaction as InternalTransaction).kernelTransaction()
302316
val index = ktx.schemaRead().indexGetForName(indexName)
317+
val indexReadSession = ktx.dataRead().indexReadSession(index)
303318
var relationshipsInIndex = 0
304-
ktx.cursors().allocateRelationshipIndexCursor(ktx.pageCursorTracer()).use { cursor ->
305-
ktx.dataRead().relationshipIndexSeek(index, cursor, unconstrained(), fulltextSearch("*"))
319+
ktx.cursors().allocateRelationshipValueIndexCursor(ktx.cursorContext(), ktx.memoryTracker()).use { cursor ->
320+
ktx.dataRead().relationshipIndexSeek(indexReadSession, cursor, unconstrained(), fulltextSearch("*"))
306321
while (cursor.next()) {
307322
relationshipsInIndex++
308323
}
@@ -395,7 +410,8 @@ class KafkaNeo4jRecoveryTSE: KafkaEventSinkBaseTSE() {
395410
pageCache,
396411
database.databaseLayout().metadataStore(),
397412
LAST_MISSING_STORE_FILES_RECOVERY_TIMESTAMP,
398-
NULL
413+
databaseLayout!!.databaseName,
414+
CursorContext.NULL
399415
)
400416
)
401417
managementService!!.shutdown()
@@ -558,7 +574,7 @@ class KafkaNeo4jRecoveryTSE: KafkaEventSinkBaseTSE() {
558574
}
559575
}
560576
monitors.addMonitorListener(recoveryMonitor)
561-
val service = builderWithRelationshipTypeScanStoreSet(layout.neo4jLayout)
577+
val service = createTestDatabaseBuilder(layout.neo4jLayout)
562578
.addExtension(guardExtensionFactory)
563579
.setMonitors(monitors).build()
564580
try {
@@ -599,16 +615,14 @@ class KafkaNeo4jRecoveryTSE: KafkaEventSinkBaseTSE() {
599615
}
600616

601617
private fun recoverDatabase(databaseTracers: DatabaseTracers) {
602-
val config =
603-
Config.newBuilder().set(enable_relationship_type_scan_store, enableRelationshipTypeScanStore()).build()
618+
val config = Config.newBuilder().build()
604619
assertTrue(isRecoveryRequired(databaseLayout, config))
605620
performRecovery(fileSystem, pageCache, databaseTracers, config, databaseLayout, INSTANCE)
606621
assertFalse(isRecoveryRequired(databaseLayout, config))
607622
}
608623

609624
private fun isRecoveryRequired(layout: DatabaseLayout?): Boolean {
610-
val config =
611-
Config.newBuilder().set(enable_relationship_type_scan_store, enableRelationshipTypeScanStore()).build()
625+
val config = Config.newBuilder().build()
612626
return isRecoveryRequired(layout, config)
613627
}
614628

@@ -625,7 +639,7 @@ class KafkaNeo4jRecoveryTSE: KafkaEventSinkBaseTSE() {
625639
private fun buildLogFiles(): LogFiles {
626640
return LogFilesBuilder
627641
.logFilesBasedOnlyBuilder(databaseLayout!!.transactionLogsDirectory, fileSystem)
628-
.withCommandReaderFactory(StorageEngineFactory.selectStorageEngine().commandReaderFactory())
642+
.withCommandReaderFactory(StorageEngineFactory.defaultStorageEngine().commandReaderFactory())
629643
.build()
630644
}
631645

@@ -670,7 +684,7 @@ class KafkaNeo4jRecoveryTSE: KafkaEventSinkBaseTSE() {
670684

671685
private fun createBuilder(logThreshold: Long) {
672686
if (builder == null) {
673-
builder = builderWithRelationshipTypeScanStoreSet()
687+
builder = createTestDatabaseBuilder()
674688
.setConfig(preallocate_logical_logs, false)
675689
.setConfig(logical_log_rotation_threshold, logThreshold)
676690
}
@@ -682,18 +696,17 @@ class KafkaNeo4jRecoveryTSE: KafkaEventSinkBaseTSE() {
682696
}
683697

684698
private fun forcedRecoveryManagement(): DatabaseManagementService {
685-
return builderWithRelationshipTypeScanStoreSet()
699+
return createTestDatabaseBuilder()
686700
.setConfig(fail_on_missing_files, false)
687701
.build()
688702
}
689703

690-
private fun builderWithRelationshipTypeScanStoreSet(): TestDatabaseManagementServiceBuilder {
691-
return builderWithRelationshipTypeScanStoreSet(neo4jLayout)
704+
private fun createTestDatabaseBuilder(): TestDatabaseManagementServiceBuilder {
705+
return createTestDatabaseBuilder(neo4jLayout)
692706
}
693707

694-
private fun builderWithRelationshipTypeScanStoreSet(neo4jLayout: Neo4jLayout?): TestDatabaseManagementServiceBuilder {
708+
private fun createTestDatabaseBuilder(neo4jLayout: Neo4jLayout?): TestDatabaseManagementServiceBuilder {
695709
return TestDatabaseManagementServiceBuilder(neo4jLayout)
696-
.setConfig(enable_relationship_type_scan_store, enableRelationshipTypeScanStore())
697710
}
698711

699712
private fun getDatabasePageCache(databaseAPI: GraphDatabaseAPI): PageCache {
@@ -708,7 +721,8 @@ class KafkaNeo4jRecoveryTSE: KafkaEventSinkBaseTSE() {
708721
restartedCache,
709722
databaseAPI.databaseLayout().metadataStore(),
710723
LAST_MISSING_STORE_FILES_RECOVERY_TIMESTAMP,
711-
NULL
724+
databaseLayout!!.databaseName,
725+
CursorContext.NULL
712726
)
713727
assertThat(record).isGreaterThan(0L)
714728
} finally {

0 commit comments

Comments
 (0)