Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/main/config/run.properties.example
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,11 @@ queue.maxActiveDownloads = 10
# Limit the number of files per queued Download part. Multiple Datasets will be combined into part
# Downloads based on their fileCount up to this limit. If a single Dataset has a fileCount
# greater than this limit, it will still be submitted in a part by itself.
queue.maxFileCount = 10000
queue.visit.maxPartFileCount = 10000

# Requests to the /queue/files endpoint will be rejected if they exceed this number of files.
# Any chunking should be done client-side.
queue.files.maxFileCount = 10000

# When queueing Downloads a positive priority will allow a User to proceed.
# Non-positive values will block that User from submitting a request to the queue.
Expand Down
72 changes: 55 additions & 17 deletions src/main/java/org/icatproject/topcat/IcatClient.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package org.icatproject.topcat;

import java.util.Map;
import java.util.Set;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.ArrayList;
Expand All @@ -21,6 +23,12 @@

public class IcatClient {

// Aggregate result of IcatClient.getDatafiles: the Datafile ids that were
// found, the requested locations that matched no Datafile, and the total
// fileSize (bytes) of all matched files.
// NOTE(review): this class reads no IcatClient instance state, so it could be
// declared static to avoid carrying a hidden enclosing-instance reference —
// confirm no caller relies on the non-static form.
public class DatafilesResponse {
// ICAT Datafile.id for each requested location that was found
public final List<Long> ids = new ArrayList<>();
// requested locations for which no Datafile entity was returned
public final Set<String> missing = new HashSet<>();
// running sum of Datafile.fileSize over all found files, in bytes
public long totalSize = 0L;
}

private Logger logger = LoggerFactory.getLogger(IcatClient.class);

private HttpClient httpClient;
Expand Down Expand Up @@ -139,7 +147,7 @@ public String getFullName() throws TopcatException {
* @throws TopcatException
*/
public JsonArray getDatasets(String visitId) throws TopcatException {
    // Select fileSize alongside id and fileCount so callers can compute
    // Download sizes without issuing extra queries per Dataset.
    String query = "SELECT dataset.id, dataset.fileCount, dataset.fileSize from Dataset dataset";
    // NOTE(review): visitId is spliced into the JPQL string unescaped — this
    // assumes it contains no single quotes; confirm it is validated upstream.
    query += " WHERE dataset.investigation.visitId = '" + visitId + "' ORDER BY dataset.id";
    return submitQuery(query);
}
Expand All @@ -153,45 +161,56 @@ public JsonArray getDatasets(String visitId) throws TopcatException {
* @throws TopcatException
* @throws UnsupportedEncodingException
*/
public List<Long> getDatafiles(List<String> files) throws TopcatException, UnsupportedEncodingException {
List<Long> datafileIds = new ArrayList<>();
public DatafilesResponse getDatafiles(List<String> files) throws TopcatException, UnsupportedEncodingException {
DatafilesResponse response = new DatafilesResponse();
if (files.size() == 0) {
// Ensure that we don't error when calling .next() below by returning early
return datafileIds;
return response;
}

// Total limit - "entityManager?sessionId=" - `sessionId` - "?query=" - `queryPrefix` - `querySuffix
// Limit is 1024 - 24 - 36 - 7 - 51 - 17
// Limit is 1024 - 24 - 36 - 7 - 48 - 17
int getUrlLimit = Integer.parseInt(Properties.getInstance().getProperty("getUrlLimit", "1024"));
int chunkLimit = getUrlLimit - 135;
String queryPrefix = "SELECT d.id from Datafile d WHERE d.location in (";
int chunkLimit = getUrlLimit - 132;
String queryPrefix = "SELECT d from Datafile d WHERE d.location in (";
String querySuffix = ") ORDER BY d.id";
ListIterator<String> iterator = files.listIterator();

String chunkedFiles = "'" + iterator.next() + "'";
String file = iterator.next();
String chunkedFiles = "'" + file + "'";
response.missing.add(file);
int chunkSize = URLEncoder.encode(chunkedFiles, "UTF8").length();
while (iterator.hasNext()) {
String file = "'" + iterator.next() + "'";
int encodedFileLength = URLEncoder.encode(file, "UTF8").length();
file = iterator.next();
String quotedFile = "'" + file + "'";
int encodedFileLength = URLEncoder.encode(quotedFile, "UTF8").length();
if (chunkSize + 3 + encodedFileLength > chunkLimit) {
JsonArray jsonArray = submitQuery(queryPrefix + chunkedFiles + querySuffix);
for (JsonNumber datafileIdJsonNumber : jsonArray.getValuesAs(JsonNumber.class)) {
datafileIds.add(datafileIdJsonNumber.longValueExact());
for (JsonObject jsonObject : jsonArray.getValuesAs(JsonObject.class)) {
JsonObject datafile = jsonObject.getJsonObject("Datafile");
response.ids.add(datafile.getJsonNumber("id").longValueExact());
response.missing.remove(datafile.getString("location"));
response.totalSize += datafile.getJsonNumber("fileSize").longValueExact();
}

chunkedFiles = file;
chunkedFiles = quotedFile;
chunkSize = encodedFileLength;
response.missing.add(file);
} else {
chunkedFiles += "," + file;
chunkedFiles += "," + quotedFile;
chunkSize += 3 + encodedFileLength; // 3 is size of , when encoded as %2C
response.missing.add(file);
}
}
JsonArray jsonArray = submitQuery(queryPrefix + chunkedFiles + querySuffix);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These 7 lines are a copy of those just above, so should go in a separate method to avoid duplication. There was some duplication previously but it is now worse with the use of the new response object and the additional values being populated.

for (JsonNumber datafileIdJsonNumber : jsonArray.getValuesAs(JsonNumber.class)) {
datafileIds.add(datafileIdJsonNumber.longValueExact());
for (JsonObject jsonObject : jsonArray.getValuesAs(JsonObject.class)) {
JsonObject datafile = jsonObject.getJsonObject("Datafile");
response.ids.add(datafile.getJsonNumber("id").longValueExact());
response.missing.remove(datafile.getString("location"));
response.totalSize += datafile.getJsonNumber("fileSize").longValueExact();
}

return datafileIds;
return response;
}

/**
Expand All @@ -209,6 +228,25 @@ public long getDatasetFileCount(long datasetId) throws TopcatException {
return jsonArray.getJsonNumber(0).longValueExact();
}

/**
* Utility method to get the fileSize (not size) of a Dataset by SELECTing its
* child Datafiles. Ideally the fileSize field should be used, this is a
* fallback option if that field is not set.
*
* @param datasetId ICAT Dataset.id
* @return The total size of Datafiles in the specified Dataset
* @throws TopcatException
*/
public long getDatasetFileSize(long datasetId) throws TopcatException {
String query = "SELECT datafile.fileSize FROM Datafile datafile WHERE datafile.dataset.id = " + datasetId;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why you didn't use a SELECT SUM query here? I think the ICAT API supports them.

JsonArray jsonArray = submitQuery(query);
long size = 0L;
for (JsonNumber number : jsonArray.getValuesAs(JsonNumber.class)) {
size += number.longValueExact();
}
return size;
}

/**
* Utility method for submitting an unencoded query to the entityManager
* endpoint, and returning the resultant JsonArray.
Expand Down
66 changes: 35 additions & 31 deletions src/main/java/org/icatproject/topcat/StatusCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import jakarta.ejb.EJB;
import jakarta.ejb.Schedule;
import jakarta.ejb.Singleton;
import jakarta.json.JsonObject;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import jakarta.persistence.TypedQuery;
Expand Down Expand Up @@ -114,35 +115,35 @@ public void updateStatuses(int pollDelay, int pollIntervalWait, IdsClient inject
String queryString = selectString + " and " + notExpiredCondition + " and (" + isActiveCondition + ")";

TypedQuery<Download> query = em.createQuery(queryString, Download.class);
List<Download> downloads = query.getResultList();
List<Download> downloads = query.getResultList();

for (Download download : downloads) {
Date lastCheck = lastChecks.get(download.getId());
Date now = new Date();
long createdSecondsAgo = (now.getTime() - download.getCreatedAt().getTime()) / 1000;
Date lastCheck = lastChecks.get(download.getId());
Date now = new Date();
long createdSecondsAgo = (now.getTime() - download.getCreatedAt().getTime()) / 1000;
if (download.getStatus() == DownloadStatus.PREPARING) {
// If prepareDownload was called previously but caught an exception (other than
// TopcatException), we should not call it again immediately, but should impose
// a delay. See issue #462.
if (lastCheck == null) {
prepareDownload(download, injectedIdsClient);
} else {
long lastCheckSecondsAgo = (now.getTime() - lastCheck.getTime()) / 1000;
prepareDownload(download, injectedIdsClient);
} else {
long lastCheckSecondsAgo = (now.getTime() - lastCheck.getTime()) / 1000;
if (lastCheckSecondsAgo >= pollIntervalWait) {
prepareDownload(download, injectedIdsClient);
}
}
} else if (createdSecondsAgo >= pollDelay) {
prepareDownload(download, injectedIdsClient);
}
}
} else if (download.getPreparedId() != null && createdSecondsAgo >= pollDelay) {
if (lastCheck == null) {
performCheck(download, injectedIdsClient);
} else {
long lastCheckSecondsAgo = (now.getTime() - lastCheck.getTime()) / 1000;
performCheck(download, injectedIdsClient);
} else {
long lastCheckSecondsAgo = (now.getTime() - lastCheck.getTime()) / 1000;
if (lastCheckSecondsAgo >= pollIntervalWait) {
performCheck(download, injectedIdsClient);
}
performCheck(download, injectedIdsClient);
}
}
}
}
}
}

private void performCheck(Download download, IdsClient injectedIdsClient) {
Expand Down Expand Up @@ -250,12 +251,14 @@ private void prepareDownload(Download download, IdsClient injectedIdsClient, Str
String preparedId = idsClient.prepareData(sessionId, download.getInvestigationIds(), download.getDatasetIds(), download.getDatafileIds());
download.setPreparedId(preparedId);

try {
Long size = idsClient.getSize(sessionId, download.getInvestigationIds(), download.getDatasetIds(), download.getDatafileIds());
download.setSize(size);
} catch(Exception e) {
logger.error("prepareDownload: setting size to -1 as getSize threw exception: " + e.getMessage());
download.setSize(-1);
if (download.getSize() <= 0) {
try {
Long size = idsClient.getSize(sessionId, download.getInvestigationIds(), download.getDatasetIds(), download.getDatafileIds());
download.setSize(size);
} catch(Exception e) {
logger.error("prepareDownload: setting size to -1 as getSize threw exception: " + e.getMessage());
download.setSize(-1);
}
}

if (download.getIsTwoLevel() || !download.getTransport().matches("https|http")) {
Expand Down Expand Up @@ -285,19 +288,20 @@ private void prepareDownload(Download download, IdsClient injectedIdsClient, Str
* @param sessionIds Map from Facility to functional sessionId
* @param facilityName Name of ICAT Facility to get the sessionId for
* @return Functional ICAT sessionId
* @throws InternalException If the facilityName cannot be mapped to an ICAT url
* @throws BadRequestException If the login fails
* @throws Exception If the login fails
*/
private String getQueueSessionId(Map<String, String> sessionIds, String facilityName)
throws InternalException, BadRequestException {
throws Exception {
String sessionId = sessionIds.get(facilityName);
if (sessionId == null) {
IcatClient icatClient = new IcatClient(FacilityMap.getInstance().getIcatUrl(facilityName));
Properties properties = Properties.getInstance();
String plugin = properties.getProperty("queue.account." + facilityName + ".plugin");
String username = properties.getProperty("queue.account." + facilityName + ".username");
String password = properties.getProperty("queue.account." + facilityName + ".password");
sessionId = icatClient.login(plugin, username, password);
String jsonString = icatClient.login(plugin, username, password);
JsonObject jsonObject = Utils.parseJsonObject(jsonString);
sessionId = jsonObject.getString("sessionId");
sessionIds.put(facilityName, sessionId);
}
return sessionId;
Expand All @@ -315,7 +319,7 @@ private String getQueueSessionId(Map<String, String> sessionIds, String facility
*/
public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
if (maxActiveDownloads == 0) {
logger.debug("Preparing of queued jobs disabled by config, skipping");
logger.trace("Preparing of queued jobs disabled by config, skipping");
return;
}

Expand All @@ -332,7 +336,7 @@ public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
int activeDownloadsSize = activeDownloads.size();
if (activeDownloadsSize >= maxActiveDownloads) {
String format = "More downloads currently RESTORING {} than maxActiveDownloads {}, cannot prepare queued jobs";
logger.info(format, activeDownloadsSize, maxActiveDownloads);
logger.trace(format, activeDownloadsSize, maxActiveDownloads);
return;
}
availableDownloads -= activeDownloadsSize;
Expand All @@ -346,13 +350,13 @@ public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
Map<String, String> sessionIds = new HashMap<>();
if (maxActiveDownloads <= 0) {
// No limits on how many to submit
logger.info("Preparing {} queued downloads", queuedDownloads.size());
logger.trace("Preparing {} queued downloads", queuedDownloads.size());
for (Download queuedDownload : queuedDownloads) {
queuedDownload.setStatus(DownloadStatus.PREPARING);
prepareDownload(queuedDownload, null, getQueueSessionId(sessionIds, queuedDownload.getFacilityName()));
}
} else {
logger.info("Preparing up to {} queued downloads", availableDownloads);
logger.trace("Preparing up to {} queued downloads", availableDownloads);
HashMap<Integer, List<Download>> mapping = new HashMap<>();
for (Download queuedDownload : queuedDownloads) {
String sessionId = getQueueSessionId(sessionIds, queuedDownload.getFacilityName());
Expand Down
Loading
Loading