Skip to content

Commit e407984

Browse files
committed
Added logic for primary Clowder instance which handles multiple extractors problem and multiple indexing issue
1 parent 86ba0e3 commit e407984

File tree

5 files changed

+31
-19
lines changed

5 files changed

+31
-19
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](http://keepachangelog.com/)
66
and this project adheres to [Semantic Versioning](http://semver.org/).
77

8+
## Unreleased
9+
- Fixes bug where same extractor shows up multiple times and all Clowder instances index db on reindex [#327](https://github.com/clowder-framework/clowder/issues/327)
10+
811
## 1.20.1 - 2022-04-04
912

1013
### Fixed

app/services/mongodb/MongoDBQueueService.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ trait MongoDBQueueService {
9393

9494
// start pool to being processing queue actions
9595
def listen() = {
96-
if (queueTimer == null) {
96+
//only if this is the primary clowder instance
97+
if (queueTimer == null && configuration.getBoolean("clowder.primary").getOrElse(true)) {
9798
// TODO: Need to make these in a separate pool
9899
queueTimer = Akka.system().scheduler.schedule(0 seconds, 5 millis) {
99100
getNextQueuedAction match {

app/services/rabbitmq/RabbitMQMessageService.scala

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -128,24 +128,27 @@ class RabbitMQMessageService extends MessageService {
128128
new MsgConsumer(channel.get, event_filter.get)
129129
)
130130

131-
// Start actor to listen to extractor heartbeats
132-
Logger.info("Starting extractor heartbeat listener")
133-
// create fanout exchange if it doesn't already exist
134-
channel.get.exchangeDeclare("extractors", "fanout", true)
135-
// anonymous queue
136-
val heartbeatsQueue = channel.get.queueDeclare().getQueue
137-
// bind queue to exchange
138-
channel.get.queueBind(heartbeatsQueue, "extractors", "*")
139-
extractorsHeartbeats = Some(Akka.system.actorOf(
140-
Props(new ExtractorsHeartbeats(channel.get, heartbeatsQueue)), name = "ExtractorsHeartbeats"
141-
))
142-
Logger.debug("Initializing a MsgConsumer for the ExtractorsHeartbeats")
143-
channel.get.basicConsume(
144-
heartbeatsQueue,
145-
false, // do not auto ack
146-
"ExtractorsHeartbeats", // tagging the consumer is important if you want to stop it later
147-
new MsgConsumer(channel.get, extractorsHeartbeats.get)
148-
)
131+
//register new extractor only if this is the primary clowder instance
132+
if (configuration.getBoolean("clowder.primary").getOrElse(true)) {
133+
// Start actor to listen to extractor heartbeats
134+
Logger.info("Starting extractor heartbeat listener")
135+
// create fanout exchange if it doesn't already exist
136+
channel.get.exchangeDeclare("extractors", "fanout", true)
137+
// anonymous queue
138+
val heartbeatsQueue = channel.get.queueDeclare().getQueue
139+
// bind queue to exchange
140+
channel.get.queueBind(heartbeatsQueue, "extractors", "*")
141+
extractorsHeartbeats = Some(Akka.system.actorOf(
142+
Props(new ExtractorsHeartbeats(channel.get, heartbeatsQueue)), name = "ExtractorsHeartbeats"
143+
))
144+
Logger.debug("Initializing a MsgConsumer for the ExtractorsHeartbeats")
145+
channel.get.basicConsume(
146+
heartbeatsQueue,
147+
false, // do not auto ack
148+
"ExtractorsHeartbeats", // tagging the consumer is important if you want to stop it later
149+
new MsgConsumer(channel.get, extractorsHeartbeats.get)
150+
)
151+
}
149152

150153
// Setup Actor to submit new extractions to broker
151154
extractQueue = Some(Akka.system.actorOf(Props(new PublishDirectActor(channel = channel.get,

conf/application.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ mongodbURI = "mongodb://127.0.0.1:27017/clowder"
190190
# All requests to extractors and any other external process is send
191191
# using rabbitmq, this will setup the connection information.
192192
clowder.rabbitmq.uri="amqp://guest:guest@localhost:5672/%2f"
193+
clowder.primary=true
193194
#clowder.rabbitmq.managmentPort=15672
194195
#clowder.rabbitmq.exchange=clowder
195196
# Following variable will use the url when sending messages over rabbitmq. This can be used in a docker

docker/custom.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ smtp.host=${?SMTP_SERVER}
3737
service.byteStorage=services.filesystem.DiskByteStorageService
3838
service.byteStorage=${?CLOWDER_STORAGE}
3939

40+
#primary Clowder instance
41+
clowder.primary=true
42+
clowder.primary=${?CLOWDER_PRIMARY}
43+
4044
# location in case of services.filesystem.DiskByteStorageService
4145
clowder.diskStorage.path="/home/clowder/data"
4246
clowder.diskStorage.path=${?CLOWDER_DISKPATH}

0 commit comments

Comments
 (0)