diff --git a/src/main/java/org/icatproject/topcat/FacilityMap.java b/src/main/java/org/icatproject/topcat/FacilityMap.java index 204fe95d..4be63de2 100644 --- a/src/main/java/org/icatproject/topcat/FacilityMap.java +++ b/src/main/java/org/icatproject/topcat/FacilityMap.java @@ -113,7 +113,7 @@ public String getDownloadUrl( String facility, String downloadType ) throws Inte url = properties.getProperty( "facility." + facility + ".downloadType." + downloadType, "" ); if( url.length() == 0 ){ // No such property, so fall back to the facility idsUrl - logger.info("FacilityMap.getDownloadUrl: no specific property for facility '" + logger.trace("FacilityMap.getDownloadUrl: no specific property for facility '" + facility + "' and download type '" + downloadType + "'; returning idsUrl instead" ); url = this.getIdsUrl(facility); } diff --git a/src/main/java/org/icatproject/topcat/StatusCheck.java b/src/main/java/org/icatproject/topcat/StatusCheck.java index 84313e07..344cc3e4 100644 --- a/src/main/java/org/icatproject/topcat/StatusCheck.java +++ b/src/main/java/org/icatproject/topcat/StatusCheck.java @@ -12,6 +12,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import jakarta.ejb.EJB; +import jakarta.ejb.Lock; +import jakarta.ejb.LockType; import jakarta.ejb.Schedule; import jakarta.ejb.Singleton; import jakarta.json.JsonObject; @@ -51,7 +53,6 @@ public class StatusCheck { private static final Logger logger = LoggerFactory.getLogger(StatusCheck.class); private Map lastChecks = new HashMap(); private AtomicBoolean busy = new AtomicBoolean(false); - private AtomicBoolean busyQueue = new AtomicBoolean(false); @PersistenceContext(unitName="topcat") EntityManager em; @@ -61,10 +62,16 @@ public class StatusCheck { @Resource(name = "mail/topcat") private Session mailSession; - + + /** + * poll thread will be WRITE locked, which is the default behaviour for Singletons + * All write operations should go in this function, we do not want other WRITE locked + * threads (e.g. for queuing) to block traditional user cart submissions. + */ + @Lock(LockType.WRITE) @Schedule(hour = "*", minute = "*", second = "*") private void poll() { - + // Observation: glassfish may already prevent multiple executions, and may even // count the attempt as an error, so it is possible that the use of a semaphore // here is redundant. @@ -81,7 +88,12 @@ private void poll() { // For testing, separate out the poll body into its own method // And allow test configurations to disable scheduled status checks if (!Boolean.valueOf(properties.getProperty("test.disableDownloadStatusChecks", "false"))) { - updateStatuses(pollDelay, pollIntervalWait, null); + boolean downloadsUpdated = updateStatuses(pollDelay, pollIntervalWait, null); + if (!downloadsUpdated) { + // Only process a Download from the queue if there was no work to do for Cart based Downloads + int maxActiveDownloads = Integer.valueOf(properties.getProperty("queue.maxActiveDownloads", "1")); + startQueuedDownload(maxActiveDownloads); + } } } catch (Exception e) { @@ -91,34 +103,6 @@ private void poll() { } } - @Schedule(hour = "*", minute = "*/10", second = "0") - private void pollQueue() { - - // Observation: glassfish may already prevent multiple executions, and may even - // count the attempt as an error, so it is possible that the use of a semaphore - // here is redundant. - - if (!busyQueue.compareAndSet(false, true)) { - return; - } - - try { - Properties properties = Properties.getInstance(); - int maxActiveDownloads = Integer.valueOf(properties.getProperty("queue.maxActiveDownloads", "1")); - - // For testing, separate out the poll body into its own method - // And allow test configurations to disable scheduled status checks - if (!Boolean.valueOf(properties.getProperty("test.disableDownloadStatusChecks", "false"))) { - startQueuedDownloads(maxActiveDownloads); - } - - } catch (Exception e) { - logger.error(e.getMessage()); - } finally { - busyQueue.set(false); - } - } - /** * Update the status of each relevant download. * @@ -126,13 +110,14 @@ private void pollQueue() { * preparation/check * @param pollIntervalWait minimum time between checks * @param injectedIdsClient optional (possibly mock) IdsClient + * @return Whether any Downloads to update were found and prepared * @throws Exception */ - public void updateStatuses(int pollDelay, int pollIntervalWait, IdsClient injectedIdsClient) throws Exception { + public boolean updateStatuses(int pollDelay, int pollIntervalWait, IdsClient injectedIdsClient) throws Exception { // This method is intended for testing, but we are forced to make it public // rather than protected. - + boolean statusesUpdated = false; String selectString = "select download from Download download where download.isDeleted != true"; String notExpiredCondition = "download.status != org.icatproject.topcat.domain.DownloadStatus.EXPIRED"; String preparingCondition = "download.status = org.icatproject.topcat.domain.DownloadStatus.PREPARING"; @@ -144,6 +129,10 @@ public void updateStatuses(int pollDelay, int pollIntervalWait, IdsClient inject TypedQuery query = em.createQuery(queryString, Download.class); List downloads = query.getResultList(); + if (downloads.size() == 0) { + return statusesUpdated; + } + for (Download download : downloads) { Date lastCheck = lastChecks.get(download.getId()); Date now = new Date(); @@ -154,10 +143,12 @@ public void updateStatuses(int pollDelay, int pollIntervalWait, IdsClient inject // a delay. See issue #462. if (lastCheck == null) { prepareDownload(download, injectedIdsClient); + statusesUpdated = true; } else { long lastCheckSecondsAgo = (now.getTime() - lastCheck.getTime()) / 1000; if (lastCheckSecondsAgo >= pollIntervalWait) { prepareDownload(download, injectedIdsClient); + statusesUpdated = true; } } } else if (download.getPreparedId() != null && createdSecondsAgo >= pollDelay) { @@ -170,7 +161,9 @@ public void updateStatuses(int pollDelay, int pollIntervalWait, IdsClient inject } } } - } + } + + return statusesUpdated; } private void performCheck(Download download, IdsClient injectedIdsClient) { @@ -371,7 +364,7 @@ private String getQueueSessionId(Map sessionIds, String facility } /** - * Prepares Downloads which are QUEUED up to the maxActiveDownloads limit. + * Prepares up to one Download which is QUEUED, up to the maxActiveDownloads limit. * Downloads will be prepared in order of priority, with all Downloads from * Users with a value of 1 being prepared first, then 2 and so on. * @@ -379,7 +372,7 @@ private String getQueueSessionId(Map sessionIds, String facility * RESTORING status * @throws Exception */ - public void startQueuedDownloads(int maxActiveDownloads) throws Exception { + public void startQueuedDownload(int maxActiveDownloads) throws Exception { if (maxActiveDownloads == 0) { logger.trace("Preparing of queued jobs disabled by config, skipping"); return; @@ -408,17 +401,20 @@ public void startQueuedDownloads(int maxActiveDownloads) throws Exception { queuedQueryString += " order by download.createdAt"; TypedQuery queuedDownloadsQuery = em.createQuery(queuedQueryString, Download.class); List queuedDownloads = queuedDownloadsQuery.getResultList(); + int queueSize = queuedDownloads.size(); + if (queueSize == 0) { + return; + } Map sessionIds = new HashMap<>(); if (maxActiveDownloads <= 0) { // No limits on how many to submit - logger.trace("Preparing {} queued downloads", queuedDownloads.size()); - for (Download queuedDownload : queuedDownloads) { - queuedDownload.setStatus(DownloadStatus.PREPARING); - prepareDownload(queuedDownload, null, getQueueSessionId(sessionIds, queuedDownload.getFacilityName())); - } + logger.info("Preparing 1 out of {} queued downloads", queueSize); + Download queuedDownload = queuedDownloads.get(0); + queuedDownload.setStatus(DownloadStatus.PREPARING); + prepareDownload(queuedDownload, null, getQueueSessionId(sessionIds, queuedDownload.getFacilityName())); } else { - logger.trace("Preparing up to {} queued downloads", availableDownloads); + logger.info("Preparing 1 out of {} queued downloads as {} spaces available", queueSize, availableDownloads); HashMap> mapping = new HashMap<>(); for (Download queuedDownload : queuedDownloads) { String sessionId = getQueueSessionId(sessionIds, queuedDownload.getFacilityName()); @@ -429,33 +425,26 @@ public void startQueuedDownloads(int maxActiveDownloads) throws Exception { // Highest priority, prepare now queuedDownload.setStatus(DownloadStatus.PREPARING); prepareDownload(queuedDownload, null, sessionId); - availableDownloads -= 1; - if (availableDownloads <= 0) { - return; - } + return; } else { // Lower priority, add to mapping mapping.putIfAbsent(priority, new ArrayList<>()); mapping.get(priority).add(queuedDownload); } } + + // Get the highest priority encountered List keyList = new ArrayList<>(); for (Object key : mapping.keySet().toArray()) { keyList.add((Integer) key); } - Collections.sort(keyList); - for (int key : keyList) { - // Prepare from mapping in priority order - List downloadList = mapping.get(key); - for (Download download : downloadList) { - download.setStatus(DownloadStatus.PREPARING); - prepareDownload(download, null, getQueueSessionId(sessionIds, download.getFacilityName())); - availableDownloads -= 1; - if (availableDownloads <= 0) { - return; - } - } - } + int priority = Collections.min(keyList); + + // Prepare the first Download at this priority level + List downloadList = mapping.get(priority); + Download download = downloadList.get(0); + download.setStatus(DownloadStatus.PREPARING); + prepareDownload(download, null, getQueueSessionId(sessionIds, download.getFacilityName())); } } diff --git a/src/test/java/org/icatproject/topcat/StatusCheckTest.java b/src/test/java/org/icatproject/topcat/StatusCheckTest.java index 91abee98..2f7d9a4f 100644 --- a/src/test/java/org/icatproject/topcat/StatusCheckTest.java +++ b/src/test/java/org/icatproject/topcat/StatusCheckTest.java @@ -753,8 +753,8 @@ public void testExceptionDelays() throws Exception { @Test @Transactional - public void testStartQueuedDownloadsNegative() throws Exception { - System.out.println("DEBUG testStartQueuedDownloadsNegative"); + public void testStartQueuedDownloadNegative() throws Exception { + System.out.println("DEBUG testStartQueuedDownloadNegative"); Long downloadId1 = null; Long downloadId2 = null; try { @@ -766,15 +766,15 @@ public void testStartQueuedDownloadsNegative() throws Exception { downloadId1 = dummyDownload1.getId(); downloadId2 = dummyDownload2.getId(); - statusCheck.startQueuedDownloads(-1); + statusCheck.startQueuedDownload(-1); Download postDownload1 = TestHelpers.getDummyDownload(downloadId1, downloadRepository); Download postDownload2 = TestHelpers.getDummyDownload(downloadId2, downloadRepository); assertEquals(DownloadStatus.RESTORING, postDownload1.getStatus()); assertNotNull(postDownload1.getPreparedId()); - assertEquals(DownloadStatus.RESTORING, postDownload2.getStatus()); - assertNotNull(postDownload2.getPreparedId()); + assertEquals(DownloadStatus.QUEUED, postDownload2.getStatus()); + assertNull(postDownload2.getPreparedId()); } finally { // clean up TestHelpers.deleteDummyDownload(downloadId1, downloadRepository); @@ -784,7 +784,7 @@ public void testStartQueuedDownloadsNegative() throws Exception { @Test @Transactional - public void testStartQueuedDownloadsZero() throws Exception { + public void testStartQueuedDownloadZero() throws Exception { Long downloadId = null; try { String transport = "http"; @@ -792,7 +792,7 @@ public void testStartQueuedDownloadsZero() throws Exception { DownloadStatus.QUEUED, false, downloadRepository); downloadId = dummyDownload.getId(); - statusCheck.startQueuedDownloads(0); + statusCheck.startQueuedDownload(0); // Download status should still be QUEUED, as we unqueued a max of 0 downloads @@ -808,8 +808,8 @@ public void testStartQueuedDownloadsZero() throws Exception { @Test @Transactional - public void testStartQueuedDownloadsNonZero() throws Exception { - System.out.println("DEBUG testStartQueuedDownloadsNonZero"); + public void testStartQueuedDownloadNonZero() throws Exception { + System.out.println("DEBUG testStartQueuedDownloadNonZero"); Long downloadId1 = null; Long downloadId2 = null; try { @@ -821,7 +821,7 @@ public void testStartQueuedDownloadsNonZero() throws Exception { downloadId1 = dummyDownload1.getId(); downloadId2 = dummyDownload2.getId(); - statusCheck.startQueuedDownloads(1); + statusCheck.startQueuedDownload(1); Download postDownload1 = TestHelpers.getDummyDownload(downloadId1, downloadRepository); Download postDownload2 = TestHelpers.getDummyDownload(downloadId2, downloadRepository); @@ -839,7 +839,7 @@ public void testStartQueuedDownloadsNonZero() throws Exception { @Test @Transactional - public void testStartQueuedDownloadsNonZeroRestoringDownload() throws Exception { + public void testStartQueuedDownloadNonZeroRestoringDownload() throws Exception { Long downloadId1 = null; Long downloadId2 = null; try { @@ -851,7 +851,7 @@ public void testStartQueuedDownloadsNonZeroRestoringDownload() throws Exception downloadId1 = dummyDownload1.getId(); downloadId2 = dummyDownload2.getId(); - statusCheck.startQueuedDownloads(1); + statusCheck.startQueuedDownload(1); // Should not schedule the second Download, as we already have 1 which is // RESTORING diff --git a/tools/datagateway_admin b/tools/datagateway_admin index 2fe1018e..01f2ed9e 100644 --- a/tools/datagateway_admin +++ b/tools/datagateway_admin @@ -157,14 +157,15 @@ def list_file_locations(): def prepare_download(): for download_id in input_download_ids(): - requests.post( + response = requests.put( f"{topcat_url}/admin/download/{download_id}/prepare", - params={ + data={ "facilityName": facility_name, "sessionId": session_id, }, verify=verifySsl, ) + print(response.status_code, response.text) def expire_download(): @@ -173,7 +174,7 @@ def expire_download(): def _expire_download(download_id): - requests.put( + response = requests.put( topcat_url + "/admin/download/" + download_id + "/status", data={ "facilityName": facility_name, @@ -182,6 +183,7 @@ def _expire_download(download_id): }, verify=verifySsl, ) + print(response.status_code, response.text) def expire_all_pending_downloads(): @@ -194,12 +196,8 @@ def expire_all_pending_downloads(): "sessionId": session_id, "queryOffset": query }, verify=verifySsl).text) - for download in downloads: - requests.put(topcat_url + "/admin/download/" + str(download["id"]) + "/status", data={ - "facilityName": facility_name, - "sessionId": session_id, - "value": "EXPIRED" - }, verify=verifySsl) + for download in downloads: + _expire_download(download_id=download["id"]) def manage_download_types(): download_types = list(settingsJson["accessMethods"]) @@ -300,19 +298,28 @@ def queue_files(): transport = input("Enter transport mechanism: ") email = input("Enter email to notify upon completion: ") local_file = input("Enter path to local file containing newline delimited file locations: ") - with open(local_file) as f: - files = [l.strip() for l in f.readlines()] - + i = 1 + files = [] data = { "facilityName": facility_name, "sessionId": session_id, "transport": transport, "email": email, - "files": files, } url = topcat_url + "/user/queue/files" - print(requests.post(url=url, data=data, verify=verifySsl).text) - + with open(local_file) as f: + for line in f.readlines(): + files.append(line.strip()) + if len(files) >= 10000: + data["files"] = files + data["fileName"] = f"{facility_name}_files_part_{i}" + print(requests.post(url=url, data=data, verify=verifySsl).text) + i += 1 + files = [] + + if files: + data["files"] = files + print(requests.post(url=url, data=data, verify=verifySsl).text) def get_all_queued_downloads(): @@ -328,14 +335,15 @@ def get_all_queued_downloads(): return requests.get(topcat_url + "/admin/downloads", params=params, verify=verifySsl).text -def prepare_download(download_id): +def start_download(download_id): data = { "facilityName": facility_name, "sessionId": session_id, "value": "PREPARING" } url = topcat_url + "/admin/download/" + download_id + "/status" - requests.put(url=url, data=data, verify=verifySsl) + response = requests.put(url=url, data=data, verify=verifySsl) + print(response.status_code, response.text) def show_all_queued_downloads(): @@ -344,14 +352,14 @@ def show_all_queued_downloads(): def start_queued_download(): for download_id in input_download_ids(): - prepare_download(download_id=download_id) + start_download(download_id=download_id) def start_queued_downloads(): text = get_all_queued_downloads() downloads = json.loads(text) - for download in downloads: - prepare_download(str(download["id"])) + for download in downloads: + start_download(str(download["id"])) def requeue_download(): @@ -362,7 +370,8 @@ def requeue_download(): "value": "QUEUED" } url = topcat_url + "/admin/download/" + download_id + "/status" - requests.put(url=url, data=data, verify=verifySsl) + response = requests.put(url=url, data=data, verify=verifySsl) + print(response.status_code, response.text) def expire_all_queued_downloads():