Skip to content

Commit 9d9f0d7

Browse files
conker84mneedham
authored andcommitted
fixes #167 Added the AvailabilityGuard in the StreamsEventSinkExtensionFactory (#168)
1 parent 7e36185 commit 9d9f0d7

File tree

4 files changed

+21
-6
lines changed

4 files changed

+21
-6
lines changed

producer/src/main/kotlin/streams/StreamsConstraintsService.kt

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,19 @@ import java.util.*
1212
import java.util.concurrent.ConcurrentHashMap
1313

1414
class StreamsConstraintsService(private val db: GraphDatabaseService, private val poolInterval: Long): Closeable {
15-
override fun close() = runBlocking {
16-
job.cancelAndJoin()
17-
}
1815

1916
private val nodeConstraints = ConcurrentHashMap<String, Set<Constraint>>()
2017
private val relConstraints = ConcurrentHashMap<String, Set<Constraint>>()
2118

22-
private val job: Job
19+
private lateinit var job: Job
20+
21+
override fun close() = runBlocking {
22+
if (::job.isInitialized) {
23+
job.cancelAndJoin()
24+
}
25+
}
2326

24-
init {
27+
fun start() {
2528
job = GlobalScope.launch(Dispatchers.IO) {
2629
while (isActive) {
2730
StreamsUtils.ignoreExceptions({

producer/src/main/kotlin/streams/StreamsExtensionFactory.kt

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package streams
22

3+
import org.neo4j.kernel.AvailabilityGuard
34
import org.neo4j.kernel.configuration.Config
45
import org.neo4j.kernel.extension.ExtensionType
56
import org.neo4j.kernel.extension.KernelExtensionFactory
@@ -17,18 +18,20 @@ class StreamsExtensionFactory : KernelExtensionFactory<StreamsExtensionFactory.D
1718
val configuration = dependencies.config()
1819
val streamHandler = StreamsEventRouterFactory.getStreamsEventRouter(log, configuration)
1920
val streamsEventRouterConfiguration = StreamsEventRouterConfiguration.from(configuration.raw)
20-
return StreamsEventRouterLifecycle(db, streamHandler, streamsEventRouterConfiguration, log)
21+
return StreamsEventRouterLifecycle(db, streamHandler, streamsEventRouterConfiguration, dependencies.availabilityGuard(), log)
2122
}
2223

2324
interface Dependencies {
2425
fun graphdatabaseAPI(): GraphDatabaseAPI
2526
fun log(): LogService
2627
fun config(): Config
28+
fun availabilityGuard(): AvailabilityGuard
2729
}
2830
}
2931

3032
class StreamsEventRouterLifecycle(val db: GraphDatabaseAPI, val streamHandler: StreamsEventRouter,
3133
val streamsEventRouterConfiguration: StreamsEventRouterConfiguration,
34+
private val availabilityGuard: AvailabilityGuard,
3235
private val log: LogService): LifecycleAdapter() {
3336
private val streamsLog = log.getUserLog(StreamsEventRouterLifecycle::class.java)
3437
private lateinit var txHandler: StreamsTransactionEventHandler
@@ -53,6 +56,13 @@ class StreamsEventRouterLifecycle(val db: GraphDatabaseAPI, val streamHandler: S
5356
streamsConstraintsService = StreamsConstraintsService(db, streamsEventRouterConfiguration.schemaPollingInterval)
5457
txHandler = StreamsTransactionEventHandler(streamHandler, streamsConstraintsService, streamsEventRouterConfiguration)
5558
db.registerTransactionEventHandler(txHandler)
59+
availabilityGuard.addListener(object: AvailabilityGuard.AvailabilityListener {
60+
override fun unavailable() {}
61+
62+
override fun available() {
63+
streamsConstraintsService.start()
64+
}
65+
})
5666
}
5767
}
5868

producer/src/test/kotlin/streams/StreamsTransactionEventHandlerRelTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class StreamsTransactionEventHandlerRelTest {
4040
Mockito.`when`(schemaMock.constraints).thenReturn(emptyList())
4141
Mockito.`when`(dbMock.schema()).thenReturn(schemaMock)
4242
streamsConstraintsService = StreamsConstraintsService(dbMock, 0)
43+
streamsConstraintsService.start()
4344
handler = StreamsTransactionEventHandler(MockStreamsEventRouter(),
4445
streamsConstraintsService, StreamsEventRouterConfiguration())
4546
MockStreamsEventRouter.reset()

producer/src/test/kotlin/streams/StreamsTransactionEventHandlerTest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class StreamsTransactionEventHandlerTest {
3838
Mockito.`when`(schemaMock.getConstraints(Mockito.any(RelationshipType::class.java))).thenReturn(emptyList())
3939
Mockito.`when`(dbMock.schema()).thenReturn(schemaMock)
4040
streamsConstraintsService = StreamsConstraintsService(dbMock, 0)
41+
streamsConstraintsService.start()
4142
handler = StreamsTransactionEventHandler(MockStreamsEventRouter(),
4243
streamsConstraintsService, StreamsEventRouterConfiguration())
4344
MockStreamsEventRouter.reset()

0 commit comments

Comments
 (0)