Skip to content

Commit c2c67f6

Browse files
Revert separation of polling threads, process queue singly #75
1 parent c378617 commit c2c67f6

File tree

4 files changed

+77
-79
lines changed

4 files changed

+77
-79
lines changed

src/main/java/org/icatproject/topcat/FacilityMap.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public String getDownloadUrl( String facility, String downloadType ) throws Inte
113113
url = properties.getProperty( "facility." + facility + ".downloadType." + downloadType, "" );
114114
if( url.length() == 0 ){
115115
// No such property, so fall back to the facility idsUrl
116-
logger.info("FacilityMap.getDownloadUrl: no specific property for facility '"
116+
logger.trace("FacilityMap.getDownloadUrl: no specific property for facility '"
117117
+ facility + "' and download type '" + downloadType + "'; returning idsUrl instead" );
118118
url = this.getIdsUrl(facility);
119119
}

src/main/java/org/icatproject/topcat/StatusCheck.java

Lines changed: 49 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import java.util.concurrent.atomic.AtomicBoolean;
1313

1414
import jakarta.ejb.EJB;
15+
import jakarta.ejb.Lock;
16+
import jakarta.ejb.LockType;
1517
import jakarta.ejb.Schedule;
1618
import jakarta.ejb.Singleton;
1719
import jakarta.json.JsonObject;
@@ -51,7 +53,6 @@ public class StatusCheck {
5153
private static final Logger logger = LoggerFactory.getLogger(StatusCheck.class);
5254
private Map<Long, Date> lastChecks = new HashMap<Long, Date>();
5355
private AtomicBoolean busy = new AtomicBoolean(false);
54-
private AtomicBoolean busyQueue = new AtomicBoolean(false);
5556

5657
@PersistenceContext(unitName="topcat")
5758
EntityManager em;
@@ -61,10 +62,16 @@ public class StatusCheck {
6162

6263
@Resource(name = "mail/topcat")
6364
private Session mailSession;
64-
65+
66+
/**
67+
* poll thread will be WRITE locked, which is the default behaviour for Singletons
68+
* All write operations should go in this function, we do not want other WRITE locked
69+
* threads (e.g. for queuing) to block traditional user cart submissions.
70+
*/
71+
@Lock(LockType.WRITE)
6572
@Schedule(hour = "*", minute = "*", second = "*")
6673
private void poll() {
67-
74+
6875
// Observation: glassfish may already prevent multiple executions, and may even
6976
// count the attempt as an error, so it is possible that the use of a semaphore
7077
// here is redundant.
@@ -81,7 +88,12 @@ private void poll() {
8188
// For testing, separate out the poll body into its own method
8289
// And allow test configurations to disable scheduled status checks
8390
if (!Boolean.valueOf(properties.getProperty("test.disableDownloadStatusChecks", "false"))) {
84-
updateStatuses(pollDelay, pollIntervalWait, null);
91+
boolean downloadsUpdated = updateStatuses(pollDelay, pollIntervalWait, null);
92+
if (!downloadsUpdated) {
93+
// Only process a Download from the queue if there was no work to do for Cart based Downloads
94+
int maxActiveDownloads = Integer.valueOf(properties.getProperty("queue.maxActiveDownloads", "1"));
95+
startQueuedDownload(maxActiveDownloads);
96+
}
8597
}
8698

8799
} catch (Exception e) {
@@ -91,48 +103,21 @@ private void poll() {
91103
}
92104
}
93105

94-
@Schedule(hour = "*", minute = "*/10", second = "0")
95-
private void pollQueue() {
96-
97-
// Observation: glassfish may already prevent multiple executions, and may even
98-
// count the attempt as an error, so it is possible that the use of a semaphore
99-
// here is redundant.
100-
101-
if (!busyQueue.compareAndSet(false, true)) {
102-
return;
103-
}
104-
105-
try {
106-
Properties properties = Properties.getInstance();
107-
int maxActiveDownloads = Integer.valueOf(properties.getProperty("queue.maxActiveDownloads", "1"));
108-
109-
// For testing, separate out the poll body into its own method
110-
// And allow test configurations to disable scheduled status checks
111-
if (!Boolean.valueOf(properties.getProperty("test.disableDownloadStatusChecks", "false"))) {
112-
startQueuedDownloads(maxActiveDownloads);
113-
}
114-
115-
} catch (Exception e) {
116-
logger.error(e.getMessage());
117-
} finally {
118-
busyQueue.set(false);
119-
}
120-
}
121-
122106
/**
123107
* Update the status of each relevant download.
124108
*
125109
* @param pollDelay minimum time to wait before initial
126110
* preparation/check
127111
* @param pollIntervalWait minimum time between checks
128112
* @param injectedIdsClient optional (possibly mock) IdsClient
113+
* @return Whether any Downloads to update were found and prepared
129114
* @throws Exception
130115
*/
131-
public void updateStatuses(int pollDelay, int pollIntervalWait, IdsClient injectedIdsClient) throws Exception {
116+
public boolean updateStatuses(int pollDelay, int pollIntervalWait, IdsClient injectedIdsClient) throws Exception {
132117

133118
// This method is intended for testing, but we are forced to make it public
134119
// rather than protected.
135-
120+
boolean statusesUpdated = false;
136121
String selectString = "select download from Download download where download.isDeleted != true";
137122
String notExpiredCondition = "download.status != org.icatproject.topcat.domain.DownloadStatus.EXPIRED";
138123
String preparingCondition = "download.status = org.icatproject.topcat.domain.DownloadStatus.PREPARING";
@@ -144,6 +129,10 @@ public void updateStatuses(int pollDelay, int pollIntervalWait, IdsClient inject
144129
TypedQuery<Download> query = em.createQuery(queryString, Download.class);
145130
List<Download> downloads = query.getResultList();
146131

132+
if (downloads.size() == 0) {
133+
return statusesUpdated;
134+
}
135+
147136
for (Download download : downloads) {
148137
Date lastCheck = lastChecks.get(download.getId());
149138
Date now = new Date();
@@ -154,10 +143,12 @@ public void updateStatuses(int pollDelay, int pollIntervalWait, IdsClient inject
154143
// a delay. See issue #462.
155144
if (lastCheck == null) {
156145
prepareDownload(download, injectedIdsClient);
146+
statusesUpdated = true;
157147
} else {
158148
long lastCheckSecondsAgo = (now.getTime() - lastCheck.getTime()) / 1000;
159149
if (lastCheckSecondsAgo >= pollIntervalWait) {
160150
prepareDownload(download, injectedIdsClient);
151+
statusesUpdated = true;
161152
}
162153
}
163154
} else if (download.getPreparedId() != null && createdSecondsAgo >= pollDelay) {
@@ -170,7 +161,9 @@ public void updateStatuses(int pollDelay, int pollIntervalWait, IdsClient inject
170161
}
171162
}
172163
}
173-
}
164+
}
165+
166+
return statusesUpdated;
174167
}
175168

176169
private void performCheck(Download download, IdsClient injectedIdsClient) {
@@ -371,15 +364,15 @@ private String getQueueSessionId(Map<String, String> sessionIds, String facility
371364
}
372365

373366
/**
374-
* Prepares Downloads which are QUEUED up to the maxActiveDownloads limit.
367+
* Prepares up to one Download which is QUEUED, up to the maxActiveDownloads limit.
375368
* Downloads will be prepared in order of priority, with all Downloads from
376369
* Users with a value of 1 being prepared first, then 2 and so on.
377370
*
378371
* @param maxActiveDownloads Limit on the number of concurrent jobs with
379372
* RESTORING status
380373
* @throws Exception
381374
*/
382-
public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
375+
public void startQueuedDownload(int maxActiveDownloads) throws Exception {
383376
if (maxActiveDownloads == 0) {
384377
logger.trace("Preparing of queued jobs disabled by config, skipping");
385378
return;
@@ -408,17 +401,20 @@ public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
408401
queuedQueryString += " order by download.createdAt";
409402
TypedQuery<Download> queuedDownloadsQuery = em.createQuery(queuedQueryString, Download.class);
410403
List<Download> queuedDownloads = queuedDownloadsQuery.getResultList();
404+
int queueSize = queuedDownloads.size();
405+
if (queueSize == 0) {
406+
return;
407+
}
411408

412409
Map<String, String> sessionIds = new HashMap<>();
413410
if (maxActiveDownloads <= 0) {
414411
// No limits on how many to submit
415-
logger.trace("Preparing {} queued downloads", queuedDownloads.size());
416-
for (Download queuedDownload : queuedDownloads) {
417-
queuedDownload.setStatus(DownloadStatus.PREPARING);
418-
prepareDownload(queuedDownload, null, getQueueSessionId(sessionIds, queuedDownload.getFacilityName()));
419-
}
412+
logger.trace("Preparing 1 out of {} queued downloads", queueSize);
413+
Download queuedDownload = queuedDownloads.get(0);
414+
queuedDownload.setStatus(DownloadStatus.PREPARING);
415+
prepareDownload(queuedDownload, null, getQueueSessionId(sessionIds, queuedDownload.getFacilityName()));
420416
} else {
421-
logger.trace("Preparing up to {} queued downloads", availableDownloads);
417+
logger.trace("Preparing 1 out of {} queued downloads as {} spaces available", queueSize, availableDownloads);
422418
HashMap<Integer, List<Download>> mapping = new HashMap<>();
423419
for (Download queuedDownload : queuedDownloads) {
424420
String sessionId = getQueueSessionId(sessionIds, queuedDownload.getFacilityName());
@@ -429,33 +425,26 @@ public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
429425
// Highest priority, prepare now
430426
queuedDownload.setStatus(DownloadStatus.PREPARING);
431427
prepareDownload(queuedDownload, null, sessionId);
432-
availableDownloads -= 1;
433-
if (availableDownloads <= 0) {
434-
return;
435-
}
428+
return;
436429
} else {
437430
// Lower priority, add to mapping
438431
mapping.putIfAbsent(priority, new ArrayList<>());
439432
mapping.get(priority).add(queuedDownload);
440433
}
441434
}
435+
436+
// Get the highest priority encountered
442437
List<Integer> keyList = new ArrayList<>();
443438
for (Object key : mapping.keySet().toArray()) {
444439
keyList.add((Integer) key);
445440
}
446-
Collections.sort(keyList);
447-
for (int key : keyList) {
448-
// Prepare from mapping in priority order
449-
List<Download> downloadList = mapping.get(key);
450-
for (Download download : downloadList) {
451-
download.setStatus(DownloadStatus.PREPARING);
452-
prepareDownload(download, null, getQueueSessionId(sessionIds, download.getFacilityName()));
453-
availableDownloads -= 1;
454-
if (availableDownloads <= 0) {
455-
return;
456-
}
457-
}
458-
}
441+
int priority = Collections.min(keyList);
442+
443+
// Prepare the first Download at this priority level
444+
List<Download> downloadList = mapping.get(priority);
445+
Download download = downloadList.get(0);
446+
download.setStatus(DownloadStatus.PREPARING);
447+
prepareDownload(download, null, getQueueSessionId(sessionIds, download.getFacilityName()));
459448
}
460449
}
461450

src/test/java/org/icatproject/topcat/StatusCheckTest.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -753,8 +753,8 @@ public void testExceptionDelays() throws Exception {
753753

754754
@Test
755755
@Transactional
756-
public void testStartQueuedDownloadsNegative() throws Exception {
757-
System.out.println("DEBUG testStartQueuedDownloadsNegative");
756+
public void testStartQueuedDownloadNegative() throws Exception {
757+
System.out.println("DEBUG testStartQueuedDownloadNegative");
758758
Long downloadId1 = null;
759759
Long downloadId2 = null;
760760
try {
@@ -766,15 +766,15 @@ public void testStartQueuedDownloadsNegative() throws Exception {
766766
downloadId1 = dummyDownload1.getId();
767767
downloadId2 = dummyDownload2.getId();
768768

769-
statusCheck.startQueuedDownloads(-1);
769+
statusCheck.startQueuedDownload(-1);
770770

771771
Download postDownload1 = TestHelpers.getDummyDownload(downloadId1, downloadRepository);
772772
Download postDownload2 = TestHelpers.getDummyDownload(downloadId2, downloadRepository);
773773

774774
assertEquals(DownloadStatus.RESTORING, postDownload1.getStatus());
775775
assertNotNull(postDownload1.getPreparedId());
776-
assertEquals(DownloadStatus.RESTORING, postDownload2.getStatus());
777-
assertNotNull(postDownload2.getPreparedId());
776+
assertEquals(DownloadStatus.QUEUED, postDownload2.getStatus());
777+
assertNull(postDownload2.getPreparedId());
778778
} finally {
779779
// clean up
780780
TestHelpers.deleteDummyDownload(downloadId1, downloadRepository);
@@ -784,15 +784,15 @@ public void testStartQueuedDownloadsNegative() throws Exception {
784784

785785
@Test
786786
@Transactional
787-
public void testStartQueuedDownloadsZero() throws Exception {
787+
public void testStartQueuedDownloadZero() throws Exception {
788788
Long downloadId = null;
789789
try {
790790
String transport = "http";
791791
Download dummyDownload = TestHelpers.createDummyDownload("DummyUserName", null, transport, true,
792792
DownloadStatus.QUEUED, false, downloadRepository);
793793
downloadId = dummyDownload.getId();
794794

795-
statusCheck.startQueuedDownloads(0);
795+
statusCheck.startQueuedDownload(0);
796796

797797
// Download status should still be QUEUED, as we unqueued a max of 0 downloads
798798

@@ -808,8 +808,8 @@ public void testStartQueuedDownloadsZero() throws Exception {
808808

809809
@Test
810810
@Transactional
811-
public void testStartQueuedDownloadsNonZero() throws Exception {
812-
System.out.println("DEBUG testStartQueuedDownloadsNonZero");
811+
public void testStartQueuedDownloadNonZero() throws Exception {
812+
System.out.println("DEBUG testStartQueuedDownloadNonZero");
813813
Long downloadId1 = null;
814814
Long downloadId2 = null;
815815
try {
@@ -821,7 +821,7 @@ public void testStartQueuedDownloadsNonZero() throws Exception {
821821
downloadId1 = dummyDownload1.getId();
822822
downloadId2 = dummyDownload2.getId();
823823

824-
statusCheck.startQueuedDownloads(1);
824+
statusCheck.startQueuedDownload(1);
825825

826826
Download postDownload1 = TestHelpers.getDummyDownload(downloadId1, downloadRepository);
827827
Download postDownload2 = TestHelpers.getDummyDownload(downloadId2, downloadRepository);
@@ -839,7 +839,7 @@ public void testStartQueuedDownloadsNonZero() throws Exception {
839839

840840
@Test
841841
@Transactional
842-
public void testStartQueuedDownloadsNonZeroRestoringDownload() throws Exception {
842+
public void testStartQueuedDownloadNonZeroRestoringDownload() throws Exception {
843843
Long downloadId1 = null;
844844
Long downloadId2 = null;
845845
try {
@@ -851,7 +851,7 @@ public void testStartQueuedDownloadsNonZeroRestoringDownload() throws Exception
851851
downloadId1 = dummyDownload1.getId();
852852
downloadId2 = dummyDownload2.getId();
853853

854-
statusCheck.startQueuedDownloads(1);
854+
statusCheck.startQueuedDownload(1);
855855

856856
// Should not schedule the second Download, as we already have 1 which is
857857
// RESTORING

tools/datagateway_admin

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -300,19 +300,28 @@ def queue_files():
300300
transport = input("Enter transport mechanism: ")
301301
email = input("Enter email to notify upon completion: ")
302302
local_file = input("Enter path to local file containing newline delimited file locations: ")
303-
with open(local_file) as f:
304-
files = [l.strip() for l in f.readlines()]
305-
303+
i = 1
304+
files = []
306305
data = {
307306
"facilityName": facility_name,
308307
"sessionId": session_id,
309308
"transport": transport,
310309
"email": email,
311-
"files": files,
312310
}
313311
url = topcat_url + "/user/queue/files"
314-
print(requests.post(url=url, data=data, verify=verifySsl).text)
315-
312+
with open(local_file) as f:
313+
for line in f.readlines():
314+
files.append(line.strip())
315+
if len(files) >= 10000:
316+
data["files"] = files
317+
data["fileName"] = f"{facility_name}_files_part_{i}"
318+
print(requests.post(url=url, data=data, verify=verifySsl).text)
319+
i += 1
320+
files = []
321+
322+
if files:
323+
data["files"] = files
324+
print(requests.post(url=url, data=data, verify=verifySsl).text)
316325

317326

318327
def get_all_queued_downloads():

0 commit comments

Comments
 (0)