Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/org/icatproject/topcat/FacilityMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public String getDownloadUrl( String facility, String downloadType ) throws Inte
url = properties.getProperty( "facility." + facility + ".downloadType." + downloadType, "" );
if( url.length() == 0 ){
// No such property, so fall back to the facility idsUrl
logger.info("FacilityMap.getDownloadUrl: no specific property for facility '"
logger.trace("FacilityMap.getDownloadUrl: no specific property for facility '"
+ facility + "' and download type '" + downloadType + "'; returning idsUrl instead" );
url = this.getIdsUrl(facility);
}
Expand Down
109 changes: 49 additions & 60 deletions src/main/java/org/icatproject/topcat/StatusCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.util.concurrent.atomic.AtomicBoolean;

import jakarta.ejb.EJB;
import jakarta.ejb.Lock;
import jakarta.ejb.LockType;
import jakarta.ejb.Schedule;
import jakarta.ejb.Singleton;
import jakarta.json.JsonObject;
Expand Down Expand Up @@ -51,7 +53,6 @@ public class StatusCheck {
private static final Logger logger = LoggerFactory.getLogger(StatusCheck.class);
private Map<Long, Date> lastChecks = new HashMap<Long, Date>();
private AtomicBoolean busy = new AtomicBoolean(false);
private AtomicBoolean busyQueue = new AtomicBoolean(false);

@PersistenceContext(unitName="topcat")
EntityManager em;
Expand All @@ -61,10 +62,16 @@ public class StatusCheck {

@Resource(name = "mail/topcat")
private Session mailSession;


/**
* poll thread will be WRITE locked, which is the default behaviour for Singletons
* All write operations should go in this function, we do not want other WRITE locked
* threads (e.g. for queuing) to block traditional user cart submissions.
*/
@Lock(LockType.WRITE)
@Schedule(hour = "*", minute = "*", second = "*")
private void poll() {

// Observation: glassfish may already prevent multiple executions, and may even
// count the attempt as an error, so it is possible that the use of a semaphore
// here is redundant.
Expand All @@ -81,7 +88,12 @@ private void poll() {
// For testing, separate out the poll body into its own method
// And allow test configurations to disable scheduled status checks
if (!Boolean.valueOf(properties.getProperty("test.disableDownloadStatusChecks", "false"))) {
updateStatuses(pollDelay, pollIntervalWait, null);
boolean downloadsUpdated = updateStatuses(pollDelay, pollIntervalWait, null);
if (!downloadsUpdated) {
// Only process a Download from the queue if there was no work to do for Cart based Downloads
int maxActiveDownloads = Integer.valueOf(properties.getProperty("queue.maxActiveDownloads", "1"));
startQueuedDownload(maxActiveDownloads);
}
}

} catch (Exception e) {
Expand All @@ -91,48 +103,21 @@ private void poll() {
}
}

@Schedule(hour = "*", minute = "*/10", second = "0")
private void pollQueue() {

// Observation: glassfish may already prevent multiple executions, and may even
// count the attempt as an error, so it is possible that the use of a semaphore
// here is redundant.

if (!busyQueue.compareAndSet(false, true)) {
return;
}

try {
Properties properties = Properties.getInstance();
int maxActiveDownloads = Integer.valueOf(properties.getProperty("queue.maxActiveDownloads", "1"));

// For testing, separate out the poll body into its own method
// And allow test configurations to disable scheduled status checks
if (!Boolean.valueOf(properties.getProperty("test.disableDownloadStatusChecks", "false"))) {
startQueuedDownloads(maxActiveDownloads);
}

} catch (Exception e) {
logger.error(e.getMessage());
} finally {
busyQueue.set(false);
}
}

/**
* Update the status of each relevant download.
*
* @param pollDelay minimum time to wait before initial
* preparation/check
* @param pollIntervalWait minimum time between checks
* @param injectedIdsClient optional (possibly mock) IdsClient
* @return Whether any Downloads to update were found and prepared
* @throws Exception
*/
public void updateStatuses(int pollDelay, int pollIntervalWait, IdsClient injectedIdsClient) throws Exception {
public boolean updateStatuses(int pollDelay, int pollIntervalWait, IdsClient injectedIdsClient) throws Exception {

// This method is intended for testing, but we are forced to make it public
// rather than protected.

boolean statusesUpdated = false;
String selectString = "select download from Download download where download.isDeleted != true";
String notExpiredCondition = "download.status != org.icatproject.topcat.domain.DownloadStatus.EXPIRED";
String preparingCondition = "download.status = org.icatproject.topcat.domain.DownloadStatus.PREPARING";
Expand All @@ -144,6 +129,10 @@ public void updateStatuses(int pollDelay, int pollIntervalWait, IdsClient inject
TypedQuery<Download> query = em.createQuery(queryString, Download.class);
List<Download> downloads = query.getResultList();

if (downloads.size() == 0) {
return statusesUpdated;
}

for (Download download : downloads) {
Date lastCheck = lastChecks.get(download.getId());
Date now = new Date();
Expand All @@ -154,10 +143,12 @@ public void updateStatuses(int pollDelay, int pollIntervalWait, IdsClient inject
// a delay. See issue #462.
if (lastCheck == null) {
prepareDownload(download, injectedIdsClient);
statusesUpdated = true;
} else {
long lastCheckSecondsAgo = (now.getTime() - lastCheck.getTime()) / 1000;
if (lastCheckSecondsAgo >= pollIntervalWait) {
prepareDownload(download, injectedIdsClient);
statusesUpdated = true;
}
}
} else if (download.getPreparedId() != null && createdSecondsAgo >= pollDelay) {
Expand All @@ -170,7 +161,9 @@ public void updateStatuses(int pollDelay, int pollIntervalWait, IdsClient inject
}
}
}
}
}

return statusesUpdated;
}

private void performCheck(Download download, IdsClient injectedIdsClient) {
Expand Down Expand Up @@ -371,15 +364,15 @@ private String getQueueSessionId(Map<String, String> sessionIds, String facility
}

/**
* Prepares Downloads which are QUEUED up to the maxActiveDownloads limit.
* Prepares up to one Download which is QUEUED, up to the maxActiveDownloads limit.
* Downloads will be prepared in order of priority, with all Downloads from
* Users with a value of 1 being prepared first, then 2 and so on.
*
* @param maxActiveDownloads Limit on the number of concurrent jobs with
* RESTORING status
* @throws Exception
*/
public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
public void startQueuedDownload(int maxActiveDownloads) throws Exception {
if (maxActiveDownloads == 0) {
logger.trace("Preparing of queued jobs disabled by config, skipping");
return;
Expand Down Expand Up @@ -408,17 +401,20 @@ public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
queuedQueryString += " order by download.createdAt";
TypedQuery<Download> queuedDownloadsQuery = em.createQuery(queuedQueryString, Download.class);
List<Download> queuedDownloads = queuedDownloadsQuery.getResultList();
int queueSize = queuedDownloads.size();
if (queueSize == 0) {
return;
}

Map<String, String> sessionIds = new HashMap<>();
if (maxActiveDownloads <= 0) {
// No limits on how many to submit
logger.trace("Preparing {} queued downloads", queuedDownloads.size());
for (Download queuedDownload : queuedDownloads) {
queuedDownload.setStatus(DownloadStatus.PREPARING);
prepareDownload(queuedDownload, null, getQueueSessionId(sessionIds, queuedDownload.getFacilityName()));
}
logger.trace("Preparing 1 out of {} queued downloads", queueSize);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless I'm missing something, I think it would be good to have this at info level. It's only going to be output when a queued item is being moved out of the queue (I think) so it would be useful to be able to trace which bit of logic is taking it out of the queue.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah when this was trying to start all the Downloads it could (rather than one at a time) we logged before iterating over the list, which may have been empty. Which meant logging even when there was no work to do and we demoted this to trace. Now, because we return early if the list is empty, we only log when there is work to do so can increase this back to info I think.

Download queuedDownload = queuedDownloads.get(0);
queuedDownload.setStatus(DownloadStatus.PREPARING);
prepareDownload(queuedDownload, null, getQueueSessionId(sessionIds, queuedDownload.getFacilityName()));
} else {
logger.trace("Preparing up to {} queued downloads", availableDownloads);
logger.trace("Preparing 1 out of {} queued downloads as {} spaces available", queueSize, availableDownloads);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise I think this would be useful at info level

HashMap<Integer, List<Download>> mapping = new HashMap<>();
for (Download queuedDownload : queuedDownloads) {
String sessionId = getQueueSessionId(sessionIds, queuedDownload.getFacilityName());
Expand All @@ -429,33 +425,26 @@ public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
// Highest priority, prepare now
queuedDownload.setStatus(DownloadStatus.PREPARING);
prepareDownload(queuedDownload, null, sessionId);
availableDownloads -= 1;
if (availableDownloads <= 0) {
return;
}
return;
} else {
// Lower priority, add to mapping
mapping.putIfAbsent(priority, new ArrayList<>());
mapping.get(priority).add(queuedDownload);
}
}

// Get the highest priority encountered
List<Integer> keyList = new ArrayList<>();
for (Object key : mapping.keySet().toArray()) {
keyList.add((Integer) key);
}
Collections.sort(keyList);
for (int key : keyList) {
// Prepare from mapping in priority order
List<Download> downloadList = mapping.get(key);
for (Download download : downloadList) {
download.setStatus(DownloadStatus.PREPARING);
prepareDownload(download, null, getQueueSessionId(sessionIds, download.getFacilityName()));
availableDownloads -= 1;
if (availableDownloads <= 0) {
return;
}
}
}
int priority = Collections.min(keyList);

// Prepare the first Download at this priority level
List<Download> downloadList = mapping.get(priority);
Download download = downloadList.get(0);
download.setStatus(DownloadStatus.PREPARING);
prepareDownload(download, null, getQueueSessionId(sessionIds, download.getFacilityName()));
}
}

Expand Down
24 changes: 12 additions & 12 deletions src/test/java/org/icatproject/topcat/StatusCheckTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -753,8 +753,8 @@ public void testExceptionDelays() throws Exception {

@Test
@Transactional
public void testStartQueuedDownloadsNegative() throws Exception {
System.out.println("DEBUG testStartQueuedDownloadsNegative");
public void testStartQueuedDownloadNegative() throws Exception {
System.out.println("DEBUG testStartQueuedDownloadNegative");
Long downloadId1 = null;
Long downloadId2 = null;
try {
Expand All @@ -766,15 +766,15 @@ public void testStartQueuedDownloadsNegative() throws Exception {
downloadId1 = dummyDownload1.getId();
downloadId2 = dummyDownload2.getId();

statusCheck.startQueuedDownloads(-1);
statusCheck.startQueuedDownload(-1);

Download postDownload1 = TestHelpers.getDummyDownload(downloadId1, downloadRepository);
Download postDownload2 = TestHelpers.getDummyDownload(downloadId2, downloadRepository);

assertEquals(DownloadStatus.RESTORING, postDownload1.getStatus());
assertNotNull(postDownload1.getPreparedId());
assertEquals(DownloadStatus.RESTORING, postDownload2.getStatus());
assertNotNull(postDownload2.getPreparedId());
assertEquals(DownloadStatus.QUEUED, postDownload2.getStatus());
assertNull(postDownload2.getPreparedId());
} finally {
// clean up
TestHelpers.deleteDummyDownload(downloadId1, downloadRepository);
Expand All @@ -784,15 +784,15 @@ public void testStartQueuedDownloadsNegative() throws Exception {

@Test
@Transactional
public void testStartQueuedDownloadsZero() throws Exception {
public void testStartQueuedDownloadZero() throws Exception {
Long downloadId = null;
try {
String transport = "http";
Download dummyDownload = TestHelpers.createDummyDownload("DummyUserName", null, transport, true,
DownloadStatus.QUEUED, false, downloadRepository);
downloadId = dummyDownload.getId();

statusCheck.startQueuedDownloads(0);
statusCheck.startQueuedDownload(0);

// Download status should still be QUEUED, as we unqueued a max of 0 downloads

Expand All @@ -808,8 +808,8 @@ public void testStartQueuedDownloadsZero() throws Exception {

@Test
@Transactional
public void testStartQueuedDownloadsNonZero() throws Exception {
System.out.println("DEBUG testStartQueuedDownloadsNonZero");
public void testStartQueuedDownloadNonZero() throws Exception {
System.out.println("DEBUG testStartQueuedDownloadNonZero");
Long downloadId1 = null;
Long downloadId2 = null;
try {
Expand All @@ -821,7 +821,7 @@ public void testStartQueuedDownloadsNonZero() throws Exception {
downloadId1 = dummyDownload1.getId();
downloadId2 = dummyDownload2.getId();

statusCheck.startQueuedDownloads(1);
statusCheck.startQueuedDownload(1);

Download postDownload1 = TestHelpers.getDummyDownload(downloadId1, downloadRepository);
Download postDownload2 = TestHelpers.getDummyDownload(downloadId2, downloadRepository);
Expand All @@ -839,7 +839,7 @@ public void testStartQueuedDownloadsNonZero() throws Exception {

@Test
@Transactional
public void testStartQueuedDownloadsNonZeroRestoringDownload() throws Exception {
public void testStartQueuedDownloadNonZeroRestoringDownload() throws Exception {
Long downloadId1 = null;
Long downloadId2 = null;
try {
Expand All @@ -851,7 +851,7 @@ public void testStartQueuedDownloadsNonZeroRestoringDownload() throws Exception
downloadId1 = dummyDownload1.getId();
downloadId2 = dummyDownload2.getId();

statusCheck.startQueuedDownloads(1);
statusCheck.startQueuedDownload(1);

// Should not schedule the second Download, as we already have 1 which is
// RESTORING
Expand Down
21 changes: 15 additions & 6 deletions tools/datagateway_admin
Original file line number Diff line number Diff line change
Expand Up @@ -300,19 +300,28 @@ def queue_files():
transport = input("Enter transport mechanism: ")
email = input("Enter email to notify upon completion: ")
local_file = input("Enter path to local file containing newline delimited file locations: ")
with open(local_file) as f:
files = [l.strip() for l in f.readlines()]

i = 1
files = []
data = {
"facilityName": facility_name,
"sessionId": session_id,
"transport": transport,
"email": email,
"files": files,
}
url = topcat_url + "/user/queue/files"
print(requests.post(url=url, data=data, verify=verifySsl).text)

with open(local_file) as f:
for line in f.readlines():
files.append(line.strip())
if len(files) >= 10000:
data["files"] = files
data["fileName"] = f"{facility_name}_files_part_{i}"
print(requests.post(url=url, data=data, verify=verifySsl).text)
i += 1
files = []

if files:
data["files"] = files
print(requests.post(url=url, data=data, verify=verifySsl).text)


def get_all_queued_downloads():
Expand Down