Skip to content

Commit 359df51

Browse files
authored
Merge pull request #348 from clowder-framework/ddey2/327
Added logic for primary Clowder instance which handles multiple extra…
2 parents 121649a + 15f74c3 commit 359df51

File tree

5 files changed

+35
-19
lines changed

5 files changed

+35
-19
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
1111
- swagger lint action
1212
- When downloading a file with a `'` in the name it would save the file as blob
1313
- Fix for a rare race condition with masonry where tiles could end up overlapping in space page.
14+
- Fixes bug where same extractor shows up multiple times and all Clowder instances index db on reindex [#327](https://github.com/clowder-framework/clowder/issues/327)
1415

1516
### Changed
1617
- Changed `Enabled By SuperAdmin` to read `Enabled by Server Admin` [#344](https://github.com/clowder-framework/clowder/issues/344)

app/services/mongodb/MongoDBQueueService.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ trait MongoDBQueueService {
9393

9494
// start pool to being processing queue actions
9595
def listen() = {
96-
if (queueTimer == null) {
96+
//only if this is the primary clowder instance
97+
if (queueTimer == null && configuration.getBoolean("clowder.primary").getOrElse(true)) {
9798
// TODO: Need to make these in a separate pool
9899
queueTimer = Akka.system().scheduler.schedule(0 seconds, 5 millis) {
99100
getNextQueuedAction match {

app/services/rabbitmq/RabbitMQMessageService.scala

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -128,24 +128,27 @@ class RabbitMQMessageService extends MessageService {
128128
new MsgConsumer(channel.get, event_filter.get)
129129
)
130130

131-
// Start actor to listen to extractor heartbeats
132-
Logger.info("Starting extractor heartbeat listener")
133-
// create fanout exchange if it doesn't already exist
134-
channel.get.exchangeDeclare("extractors", "fanout", true)
135-
// anonymous queue
136-
val heartbeatsQueue = channel.get.queueDeclare().getQueue
137-
// bind queue to exchange
138-
channel.get.queueBind(heartbeatsQueue, "extractors", "*")
139-
extractorsHeartbeats = Some(Akka.system.actorOf(
140-
Props(new ExtractorsHeartbeats(channel.get, heartbeatsQueue)), name = "ExtractorsHeartbeats"
141-
))
142-
Logger.debug("Initializing a MsgConsumer for the ExtractorsHeartbeats")
143-
channel.get.basicConsume(
144-
heartbeatsQueue,
145-
false, // do not auto ack
146-
"ExtractorsHeartbeats", // tagging the consumer is important if you want to stop it later
147-
new MsgConsumer(channel.get, extractorsHeartbeats.get)
148-
)
131+
//register new extractor only if this is the primary clowder instance
132+
if (configuration.getBoolean("clowder.primary").getOrElse(true)) {
133+
// Start actor to listen to extractor heartbeats
134+
Logger.info("Starting extractor heartbeat listener")
135+
// create fanout exchange if it doesn't already exist
136+
channel.get.exchangeDeclare("extractors", "fanout", true)
137+
// anonymous queue
138+
val heartbeatsQueue = channel.get.queueDeclare().getQueue
139+
// bind queue to exchange
140+
channel.get.queueBind(heartbeatsQueue, "extractors", "*")
141+
extractorsHeartbeats = Some(Akka.system.actorOf(
142+
Props(new ExtractorsHeartbeats(channel.get, heartbeatsQueue)), name = "ExtractorsHeartbeats"
143+
))
144+
Logger.debug("Initializing a MsgConsumer for the ExtractorsHeartbeats")
145+
channel.get.basicConsume(
146+
heartbeatsQueue,
147+
false, // do not auto ack
148+
"ExtractorsHeartbeats", // tagging the consumer is important if you want to stop it later
149+
new MsgConsumer(channel.get, extractorsHeartbeats.get)
150+
)
151+
}
149152

150153
// Setup Actor to submit new extractions to broker
151154
extractQueue = Some(Akka.system.actorOf(Props(new PublishDirectActor(channel = channel.get,

conf/application.conf

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,13 @@ api.version="beta"
183183
# mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]
184184
mongodbURI = "mongodb://127.0.0.1:27017/clowder"
185185

186+
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
187+
# Clowder Primary Instance
188+
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
189+
# This is required if there are multiple clowder instances. This config variable indicates
190+
# the primary clowder instance who takes care of special actions like registering new extractors,
191+
# indexing db on a reindex etc. The default value is true which means only one instance of clowder running.
192+
clowder.primary=true
186193

187194
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
188195
# RabbitMQ

docker/custom.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ smtp.host=${?SMTP_SERVER}
3737
service.byteStorage=services.filesystem.DiskByteStorageService
3838
service.byteStorage=${?CLOWDER_STORAGE}
3939

40+
#primary Clowder instance
41+
clowder.primary=true
42+
clowder.primary=${?CLOWDER_PRIMARY}
43+
4044
# location in case of services.filesystem.DiskByteStorageService
4145
clowder.diskStorage.path="/home/clowder/data"
4246
clowder.diskStorage.path=${?CLOWDER_DISKPATH}

0 commit comments

Comments
 (0)