Skip to content

Commit 79677c0

Browse files
bodom0015robkooper
andauthored
HOTFIX: Close channel after submitting via EventSinkService (#215)
* Close channel after submitting via EventSinkService * Add a null check, report error if no channels * Update CHANGELOG.md * remove duplicate requirement Co-authored-by: Rob Kooper <[email protected]>
1 parent 83aca55 commit 79677c0

File tree

3 files changed

+22
-9
lines changed

3 files changed

+22
-9
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
66

77
## 1.17.0 - 2021-04-29
88

9+
### Fixed
10+
- Close channel after submitting via RabbitMQMessageService
11+
912
### Added
1013
- Endpoint `/api/datasets/createfrombag` to ingest datasets in BagIt format. Includes basic dataset metadata, files,
1114
folders and technical metadata. Downloading datasets now includes extra Datacite and Clowder metadata.

app/services/rabbitmq/RabbitMQMessageService.scala

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -231,15 +231,26 @@ class RabbitMQMessageService extends MessageService {
231231
/** Submit a message to broker. */
232232
override def submit(exchange: String, routing_key: String, message: JsValue, exchange_type: String = "topic") = {
233233
connect()
234-
val tempChannel = connection.get.createChannel()
235-
tempChannel.exchangeDeclare(exchange, exchange_type, true)
236-
237-
// If a routing_key (queue name) was provided, ensure that the queue exists
238-
if (routing_key != "") {
239-
tempChannel.queueDeclare(routing_key, true, false, false, null)
240-
tempChannel.queueBind(routing_key, exchange, routing_key)
234+
var tempChannel: Channel = null
235+
try {
236+
tempChannel = connection.get.createChannel()
237+
if (tempChannel != null) {
238+
tempChannel.exchangeDeclare(exchange, exchange_type, true)
239+
240+
// If a routing_key (queue name) was provided, ensure that the queue exists
241+
if (routing_key != "") {
242+
tempChannel.queueDeclare(routing_key, true, false, false, null)
243+
tempChannel.queueBind(routing_key, exchange, routing_key)
244+
}
245+
tempChannel.basicPublish(exchange, routing_key, null, message.toString.getBytes)
246+
} else {
247+
Logger.error("Error: no channels available to submit message")
248+
}
249+
} finally {
250+
if (tempChannel != null) {
251+
tempChannel.close()
252+
}
241253
}
242-
tempChannel.basicPublish(exchange, routing_key, null, message.toString.getBytes)
243254
}
244255

245256
/**

doc/src/sphinx/requirements.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ pytz==2021.1
1616
recommonmark==0.6.0
1717
requests==2.25.1
1818
snowballstemmer==2.1.0
19-
sphinx-rtd-theme==0.5.0
2019
sphinx==3.1.2
2120
sphinxcontrib-applehelp==1.0.2
2221
sphinxcontrib-devhelp==1.0.2

0 commit comments

Comments
 (0)