-
Notifications
You must be signed in to change notification settings - Fork 9
Open
Description
There is a race condition in the handleSub and handleUnsub methods, caused by the delay (and lack of synchronization) between checking the futures map and inserting the newly scheduled future into it. As a result, multiple scheduled polling tasks can be created for the same slave on certain (slower) hardware. The issue is easily resolved by synchronizing on the slave while the futures map is being maintained. The fix is pasted below.
/**
 * Registers {@code event} as a subscription on {@code slave} and, when no poll
 * is recorded for that slave in {@code futures}, schedules a repeating call to
 * {@code slave.readPoints()}.
 *
 * <p>The whole sequence runs while holding the monitor of {@code slave}, so the
 * lookup in {@code futures} and the following insert cannot interleave with
 * another subscribe/unsubscribe call for the same slave.
 *
 * <p>NOTE(review): the lock is per slave, while {@code futures} appears to be
 * shared between slaves; confirm that the map itself tolerates concurrent
 * access from calls made for different slaves.
 *
 * @param slave the slave whose points are to be polled; also used as the lock
 * @param event the node being subscribed
 */
private void handleSub(final SlaveNode slave, final Node event) {
    synchronized (slave) {
        slave.addToSub(event);
        final String subscribedMsg = String.format("Added subscription for slave %s, register %s", slave.node.getName(), event.getName());
        LOGGER.info(subscribedMsg);

        // Only the first subscription for a slave starts a poll; later ones reuse it.
        if (!futures.containsKey(slave)) {
            final ScheduledThreadPoolExecutor pool = slave.getDaemonThreadPool();
            final Runnable pollTask = new Runnable() {
                @Override
                public void run() {
                    slave.readPoints();
                }
            };
            // Initial delay 0: the first read is submitted immediately, then repeats
            // with slave.intervalInMs milliseconds between the end of one run and the next.
            final ScheduledFuture<?> pollHandle =
                    pool.scheduleWithFixedDelay(pollTask, 0, slave.intervalInMs, TimeUnit.MILLISECONDS);
            futures.put(slave, pollHandle);

            final String scheduledMsg = String.format("Scheduled poll for slave %s at %dms", slave.node.getName(), slave.intervalInMs);
            LOGGER.info(scheduledMsg);
        }
    }
}
/**
 * Removes {@code event} from the subscriptions of {@code slave} and, once the
 * slave reports that nothing is subscribed any more, cancels the poll recorded
 * for it in {@code futures} (if there is one).
 *
 * <p>Runs while holding the monitor of {@code slave}, the same lock taken when
 * a subscription is added, so removal of the map entry cannot interleave with
 * a concurrent subscribe for the same slave.
 *
 * @param slave the slave losing a subscription; also used as the lock
 * @param event the node being unsubscribed
 */
private void handleUnsub(SlaveNode slave, Node event) {
    synchronized (slave) {
        slave.removeFromSub(event);
        final String removedMsg = String.format("Removed subscription for slave %s, register %s", slave.node.getName(), event.getName());
        LOGGER.info(removedMsg);

        // Other subscriptions remain: keep polling.
        if (!slave.noneSubscribed()) {
            return;
        }

        final ScheduledFuture<?> pollHandle = futures.remove(slave);
        if (pollHandle == null) {
            // Nothing was scheduled for this slave, so there is nothing to cancel.
            return;
        }

        // cancel(false): do not interrupt a read that is already running.
        pollHandle.cancel(false);
        LOGGER.info(String.format("Cancelled poll for slave %s", slave.node.getName()));
    }
}
Metadata
Metadata
Assignees
Labels
No labels