Skip to content

Commit f55e534

Browse files
authored
AMQ-9808: Dead lock when using CRON schedule with AMQ_SCHEDULED_REPEAT (#1536)
* AMQ-9808: Dead lock when using CRON schedule with AMQ_SCHEDULED_REPEAT * AMQ-9808: Consistency and better exception handling
1 parent 359d953 commit f55e534

File tree

2 files changed

+41
-15
lines changed

2 files changed

+41
-15
lines changed

activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,16 @@ private void doReschedule(final String jobId, long executionTime, long nextExecu
268268
this.store.store(update);
269269
}
270270

271+
private void doSchedule(final List<Closure> toSchedule) {
272+
for (Closure closure : toSchedule) {
273+
try {
274+
closure.run();
275+
} catch (final Exception e) {
276+
LOG.warn("Failed to schedule job", e);
277+
}
278+
}
279+
}
280+
271281
private void doRemove(final List<Closure> toRemove) throws IOException {
272282
for (Closure closure : toRemove) {
273283
closure.run();
@@ -727,6 +737,7 @@ protected void mainLoop() {
727737
// needed before firing the job event.
728738
List<Closure> toRemove = new ArrayList<>();
729739
List<Closure> toReschedule = new ArrayList<>();
740+
List<Closure> toSchedule = new ArrayList<>();
730741
try {
731742
this.store.readLockIndex();
732743

@@ -776,12 +787,18 @@ protected void mainLoop() {
776787
// we have a separate schedule to run at this time
777788
// so the cron job is used to set of a separate schedule
778789
// hence we won't fire the original cron job to the
779-
// listeners but we do need to start a separate schedule
780-
String jobId = ID_GENERATOR.generateId();
781-
ByteSequence payload = getPayload(job.getLocation());
782-
schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
783-
waitTime = job.getDelay() != 0 ? job.getDelay() : job.getPeriod();
784-
this.scheduleTime.setWaitTime(waitTime);
790+
// listeners, but we do need to start a separate schedule
791+
toSchedule.add(() -> {
792+
try {
793+
String jobId = ID_GENERATOR.generateId();
794+
ByteSequence payload = getPayload(job.getLocation());
795+
schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
796+
} catch (Exception e) {
797+
LOG.warn("Failed to schedule cron follow-up job", e);
798+
}
799+
});
800+
long wait = job.getDelay() != 0 ? job.getDelay() : job.getPeriod();
801+
this.scheduleTime.setWaitTime(wait);
785802
}
786803
} else {
787804
toRemove.add(() -> doRemove(executionTime, job.getJobId()));
@@ -797,6 +814,10 @@ protected void mainLoop() {
797814
} finally {
798815
this.store.readUnlockIndex();
799816

817+
// deferred execution of all jobs to be scheduled to avoid deadlock with indexLock
818+
doSchedule(toSchedule);
819+
820+
// now reschedule all jobs that need rescheduling
800821
doReschedule(toReschedule);
801822

802823
// now remove all jobs that have not been rescheduled,
@@ -805,6 +826,7 @@ protected void mainLoop() {
805826
}
806827

807828
this.scheduleTime.pause();
829+
808830
} catch (Exception ioe) {
809831
LOG.error("{} Failed to schedule job", this.name, ioe);
810832
try {

activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,6 @@
1616
*/
1717
package org.apache.activemq.broker.scheduler;
1818

19-
import static org.junit.Assert.assertEquals;
20-
import static org.junit.Assert.assertNotNull;
21-
22-
import java.util.List;
23-
import java.util.concurrent.CountDownLatch;
24-
import java.util.concurrent.TimeUnit;
25-
import java.util.concurrent.atomic.AtomicInteger;
26-
2719
import jakarta.jms.Connection;
2820
import jakarta.jms.JMSException;
2921
import jakarta.jms.Message;
@@ -32,7 +24,6 @@
3224
import jakarta.jms.MessageProducer;
3325
import jakarta.jms.Session;
3426
import jakarta.jms.TextMessage;
35-
3627
import org.apache.activemq.ActiveMQConnectionFactory;
3728
import org.apache.activemq.ScheduledMessage;
3829
import org.apache.activemq.store.kahadb.disk.journal.Location;
@@ -48,6 +39,14 @@
4839
import org.slf4j.Logger;
4940
import org.slf4j.LoggerFactory;
5041

42+
import java.util.List;
43+
import java.util.concurrent.CountDownLatch;
44+
import java.util.concurrent.TimeUnit;
45+
import java.util.concurrent.atomic.AtomicInteger;
46+
47+
import static org.junit.Assert.assertEquals;
48+
import static org.junit.Assert.assertNotNull;
49+
5150
public class JmsSchedulerTest extends JobSchedulerTestSupport {
5251

5352
private static final Logger LOG = LoggerFactory.getLogger(JmsSchedulerTest.class);
@@ -230,6 +229,11 @@ public void append(LogEvent event) {
230229
numberOfDiscardedJobs.incrementAndGet();
231230
}
232231
}
232+
233+
@Override
234+
public boolean isStarted() {
235+
return true; // false in DefaultTestAppender so Log4j will discard this appender
236+
}
233237
};
234238

235239
registerLogAppender(appender);

0 commit comments

Comments
 (0)