Skip to content

Commit b310424

Browse files
authored
fixes #472: Fix race condition on startup 4.2.x (#473)
1 parent 3b19274 commit b310424

File tree

2 files changed

+16
-13
lines changed

2 files changed

+16
-13
lines changed

consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ import org.neo4j.kernel.internal.GraphDatabaseAPI
99
import org.neo4j.kernel.lifecycle.Lifecycle
1010
import org.neo4j.kernel.lifecycle.LifecycleAdapter
1111
import org.neo4j.logging.internal.LogService
12-
import streams.config.StreamsConfig
1312
import streams.extensions.isSystemDb
13+
import java.util.concurrent.atomic.AtomicReference
1414

1515
class StreamsEventSinkExtensionFactory : ExtensionFactory<StreamsEventSinkExtensionFactory.Dependencies>(ExtensionType.DATABASE,"Streams.Consumer") {
1616

@@ -29,21 +29,22 @@ class StreamsEventSinkExtensionFactory : ExtensionFactory<StreamsEventSinkExtens
2929
private val db = dependencies.graphdatabaseAPI()
3030
private val logService = dependencies.log()
3131
private val streamsLog = logService.getUserLog(StreamsEventLifecycle::class.java)
32-
private val availabilityListener: StreamsEventSinkAvailabilityListener? = if (db.isSystemDb()) {
32+
private val availabilityListener: AtomicReference<StreamsEventSinkAvailabilityListener> = AtomicReference(null)
33+
34+
private fun createStreamsEventSinkAvailabilityListener() = if (db.isSystemDb()) {
3335
null
3436
} else {
3537
StreamsEventSinkAvailabilityListener(dependencies)
3638
}
3739

3840
override fun start() {
39-
availabilityListener?.let {
40-
dependencies.availabilityGuard().addListener(availabilityListener)
41-
}
41+
availabilityListener.updateAndGet { it ?: createStreamsEventSinkAvailabilityListener() }
42+
?.let { dependencies.availabilityGuard().addListener(it) }
4243
}
4344

4445
override fun stop() {
4546
try {
46-
availabilityListener?.let {
47+
availabilityListener.getAndSet(null)?.let {
4748
it.shutdown()
4849
dependencies.availabilityGuard().removeListener(it)
4950
}

producer/src/main/kotlin/streams/StreamsExtensionFactory.kt

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import org.neo4j.kernel.lifecycle.Lifecycle
99
import org.neo4j.kernel.lifecycle.LifecycleAdapter
1010
import org.neo4j.logging.internal.LogService
1111
import streams.extensions.isSystemDb
12+
import java.util.concurrent.atomic.AtomicReference
1213

1314
class StreamsExtensionFactory : ExtensionFactory<StreamsExtensionFactory.Dependencies>(ExtensionType.DATABASE,"Streams.Producer") {
1415
override fun newInstance(context: ExtensionContext, dependencies: Dependencies): Lifecycle {
@@ -26,26 +27,27 @@ class StreamsExtensionFactory : ExtensionFactory<StreamsExtensionFactory.Depende
2627
}
2728

2829
class StreamsEventRouterLifecycle(private val availabilityGuard: AvailabilityGuard,
29-
db: GraphDatabaseAPI,
30-
log: LogService): LifecycleAdapter() {
30+
private val db: GraphDatabaseAPI,
31+
private val log: LogService): LifecycleAdapter() {
3132

3233
private val streamsLog = log.getUserLog(StreamsEventRouterLifecycle::class.java)
3334

34-
private val streamsEventRouterAvailabilityListener: StreamsEventRouterAvailabilityListener? = if (db.isSystemDb()) {
35+
private val availabilityListener: AtomicReference<StreamsEventRouterAvailabilityListener> = AtomicReference(null)
36+
37+
private fun createStreamsEventRouterAvailabilityListener() = if (db.isSystemDb()) {
3538
null
3639
} else {
3740
StreamsEventRouterAvailabilityListener(db, log)
3841
}
3942

4043
override fun start() {
41-
streamsEventRouterAvailabilityListener?.also {
42-
availabilityGuard.addListener(it)
43-
}
44+
availabilityListener.updateAndGet { it ?: createStreamsEventRouterAvailabilityListener() }
45+
?.let { availabilityGuard.addListener(it) }
4446
}
4547

4648
override fun stop() {
4749
try {
48-
streamsEventRouterAvailabilityListener?.also {
50+
availabilityListener.getAndSet(null)?.let {
4951
it.shutdown()
5052
availabilityGuard.removeListener(it)
5153
}

0 commit comments

Comments
 (0)