Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/main/config/run.properties.example
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,11 @@ queue.maxActiveDownloads = 10
# Limit the number files per queued Download part. Multiple Datasets will be combined into part
# Downloads based on their fileCount up to this limit. If a single Dataset has a fileCount
# greater than this limit, it will still be submitted in a part by itself.
queue.maxFileCount = 10000
queue.visit.maxPartFileCount = 10000

# Requests to the /queue/files endpoint will be rejected if they exceed this number of files
# Any chunking should be done clientside
queue.files.maxFileCount = 10000

# When queueing Downloads a positive priority will allow a User to proceed.
# Non-positive values will block that User from submitting a request to the queue.
Expand Down
90 changes: 68 additions & 22 deletions src/main/java/org/icatproject/topcat/IcatClient.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package org.icatproject.topcat;

import java.util.Map;
import java.util.Set;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.ArrayList;
Expand All @@ -21,6 +23,30 @@

public class IcatClient {

public class DatafilesResponse {
public final List<Long> ids = new ArrayList<>();
public final Set<String> missing = new HashSet<>();
public long totalSize = 0L;

/**
* Submit a query for Datafiles, then appends the ids, increments the size, and
* records any missing file locations.
*
* @param query Query to submit
* @throws TopcatException If the query returns not authorized, or not found.
*/
public void submitDatafilesQuery(String query)
throws TopcatException {
JsonArray jsonArray = submitQuery(query);
for (JsonObject jsonObject : jsonArray.getValuesAs(JsonObject.class)) {
JsonObject datafile = jsonObject.getJsonObject("Datafile");
ids.add(datafile.getJsonNumber("id").longValueExact());
missing.remove(datafile.getString("location"));
totalSize += datafile.getJsonNumber("fileSize").longValueExact();
}
}
}

private Logger logger = LoggerFactory.getLogger(IcatClient.class);

private HttpClient httpClient;
Expand Down Expand Up @@ -139,7 +165,7 @@ public String getFullName() throws TopcatException {
* @throws TopcatException
*/
public JsonArray getDatasets(String visitId) throws TopcatException {
String query = "SELECT dataset.id, dataset.fileCount from Dataset dataset";
String query = "SELECT dataset.id, dataset.fileCount, dataset.fileSize from Dataset dataset";
query += " WHERE dataset.investigation.visitId = '" + visitId + "' ORDER BY dataset.id";
return submitQuery(query);
}
Expand All @@ -153,45 +179,44 @@ public JsonArray getDatasets(String visitId) throws TopcatException {
* @throws TopcatException
* @throws UnsupportedEncodingException
*/
public List<Long> getDatafiles(List<String> files) throws TopcatException, UnsupportedEncodingException {
List<Long> datafileIds = new ArrayList<>();
public DatafilesResponse getDatafiles(List<String> files) throws TopcatException, UnsupportedEncodingException {
DatafilesResponse response = new DatafilesResponse();
if (files.size() == 0) {
// Ensure that we don't error when calling .next() below by returning early
return datafileIds;
return response;
}

// Total limit - "entityManager?sessionId=" - `sessionId` - "?query=" - `queryPrefix` - `querySuffix
// Limit is 1024 - 24 - 36 - 7 - 51 - 17
// Limit is 1024 - 24 - 36 - 7 - 48 - 17
int getUrlLimit = Integer.parseInt(Properties.getInstance().getProperty("getUrlLimit", "1024"));
int chunkLimit = getUrlLimit - 135;
String queryPrefix = "SELECT d.id from Datafile d WHERE d.location in (";
int chunkLimit = getUrlLimit - 132;
String queryPrefix = "SELECT d from Datafile d WHERE d.location in (";
String querySuffix = ") ORDER BY d.id";
ListIterator<String> iterator = files.listIterator();

String chunkedFiles = "'" + iterator.next() + "'";
String file = iterator.next();
String chunkedFiles = "'" + file + "'";
response.missing.add(file);
int chunkSize = URLEncoder.encode(chunkedFiles, "UTF8").length();
while (iterator.hasNext()) {
String file = "'" + iterator.next() + "'";
int encodedFileLength = URLEncoder.encode(file, "UTF8").length();
file = iterator.next();
String quotedFile = "'" + file + "'";
int encodedFileLength = URLEncoder.encode(quotedFile, "UTF8").length();
if (chunkSize + 3 + encodedFileLength > chunkLimit) {
JsonArray jsonArray = submitQuery(queryPrefix + chunkedFiles + querySuffix);
for (JsonNumber datafileIdJsonNumber : jsonArray.getValuesAs(JsonNumber.class)) {
datafileIds.add(datafileIdJsonNumber.longValueExact());
}
response.submitDatafilesQuery(queryPrefix + chunkedFiles + querySuffix);

chunkedFiles = file;
chunkedFiles = quotedFile;
chunkSize = encodedFileLength;
response.missing.add(file);
} else {
chunkedFiles += "," + file;
chunkedFiles += "," + quotedFile;
chunkSize += 3 + encodedFileLength; // 3 is size of , when encoded as %2C
response.missing.add(file);
}
}
JsonArray jsonArray = submitQuery(queryPrefix + chunkedFiles + querySuffix);
for (JsonNumber datafileIdJsonNumber : jsonArray.getValuesAs(JsonNumber.class)) {
datafileIds.add(datafileIdJsonNumber.longValueExact());
}
response.submitDatafilesQuery(queryPrefix + chunkedFiles + querySuffix);

return datafileIds;
return response;
}

/**
Expand All @@ -209,6 +234,21 @@ public long getDatasetFileCount(long datasetId) throws TopcatException {
return jsonArray.getJsonNumber(0).longValueExact();
}

/**
* Utility method to get the fileSize (not size) of a Dataset by SELECTing its
* child Datafiles. Ideally the fileSize field should be used, this is a
* fallback option if that field is not set.
*
* @param datasetId ICAT Dataset.id
* @return The total size of Datafiles in the specified Dataset
* @throws TopcatException
*/
public long getDatasetFileSize(long datasetId) throws TopcatException {
String query = "SELECT SUM(datafile.fileSize) FROM Datafile datafile WHERE datafile.dataset.id = " + datasetId;
JsonArray jsonArray = submitQuery(query);
return jsonArray.getJsonNumber(0).longValueExact();
}

/**
* Utility method for submitting an unencoded query to the entityManager
* endpoint, and returning the resultant JsonArray.
Expand Down Expand Up @@ -238,6 +278,11 @@ private JsonArray submitQuery(String query) throws TopcatException {
/**
* Gets a single Entity of the specified type, without any other conditions.
*
* NOTE: This function is written and intended for getting Investigation,
* Dataset or Datafile entities as part of the tests. It does not handle casing of
* entities containing multiple words, or querying for a specific instance of an
* entity.
*
* @param entityType Type of ICAT Entity to get
* @return A single ICAT Entity of the specified type as a JsonObject
* @throws TopcatException
Expand Down Expand Up @@ -391,7 +436,8 @@ public int getQueuePriority(String userName) throws TopcatException {
}
}

if (!userName.startsWith(Properties.getInstance().getProperty("anonUserName"))) {
String anonUserName = Properties.getInstance().getProperty("anonUserName");
if (anonUserName == null || !userName.startsWith(anonUserName)) {
// The anonymous cart username will end with the user's sessionId so cannot do .equals
return priorityMap.getAuthenticatedPriority();
} else {
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/icatproject/topcat/PriorityMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public PriorityMap() {
Properties properties = Properties.getInstance();

anonUserName = properties.getProperty("anonUserName", "");
if (anonUserName.equals("")) {
logger.warn("anonUserName not defined, cannot distinguish anonymous and authenticated users so "
+ "authenticated priority will be used as default level");
}
anonDownloadEnabled = Boolean.parseBoolean(properties.getProperty("anonDownloadEnabled", "true"));
String defaultString;
if (anonDownloadEnabled) {
Expand Down
66 changes: 35 additions & 31 deletions src/main/java/org/icatproject/topcat/StatusCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import jakarta.ejb.EJB;
import jakarta.ejb.Schedule;
import jakarta.ejb.Singleton;
import jakarta.json.JsonObject;
import jakarta.persistence.EntityManager;
import jakarta.persistence.PersistenceContext;
import jakarta.persistence.TypedQuery;
Expand Down Expand Up @@ -114,35 +115,35 @@ public void updateStatuses(int pollDelay, int pollIntervalWait, IdsClient inject
String queryString = selectString + " and " + notExpiredCondition + " and (" + isActiveCondition + ")";

TypedQuery<Download> query = em.createQuery(queryString, Download.class);
List<Download> downloads = query.getResultList();
List<Download> downloads = query.getResultList();

for (Download download : downloads) {
Date lastCheck = lastChecks.get(download.getId());
Date now = new Date();
long createdSecondsAgo = (now.getTime() - download.getCreatedAt().getTime()) / 1000;
Date lastCheck = lastChecks.get(download.getId());
Date now = new Date();
long createdSecondsAgo = (now.getTime() - download.getCreatedAt().getTime()) / 1000;
if (download.getStatus() == DownloadStatus.PREPARING) {
// If prepareDownload was called previously but caught an exception (other than
// TopcatException), we should not call it again immediately, but should impose
// a delay. See issue #462.
if (lastCheck == null) {
prepareDownload(download, injectedIdsClient);
} else {
long lastCheckSecondsAgo = (now.getTime() - lastCheck.getTime()) / 1000;
prepareDownload(download, injectedIdsClient);
} else {
long lastCheckSecondsAgo = (now.getTime() - lastCheck.getTime()) / 1000;
if (lastCheckSecondsAgo >= pollIntervalWait) {
prepareDownload(download, injectedIdsClient);
}
}
} else if (createdSecondsAgo >= pollDelay) {
prepareDownload(download, injectedIdsClient);
}
}
} else if (download.getPreparedId() != null && createdSecondsAgo >= pollDelay) {
if (lastCheck == null) {
performCheck(download, injectedIdsClient);
} else {
long lastCheckSecondsAgo = (now.getTime() - lastCheck.getTime()) / 1000;
performCheck(download, injectedIdsClient);
} else {
long lastCheckSecondsAgo = (now.getTime() - lastCheck.getTime()) / 1000;
if (lastCheckSecondsAgo >= pollIntervalWait) {
performCheck(download, injectedIdsClient);
}
performCheck(download, injectedIdsClient);
}
}
}
}
}
}

private void performCheck(Download download, IdsClient injectedIdsClient) {
Expand Down Expand Up @@ -250,12 +251,14 @@ private void prepareDownload(Download download, IdsClient injectedIdsClient, Str
String preparedId = idsClient.prepareData(sessionId, download.getInvestigationIds(), download.getDatasetIds(), download.getDatafileIds());
download.setPreparedId(preparedId);

try {
Long size = idsClient.getSize(sessionId, download.getInvestigationIds(), download.getDatasetIds(), download.getDatafileIds());
download.setSize(size);
} catch(Exception e) {
logger.error("prepareDownload: setting size to -1 as getSize threw exception: " + e.getMessage());
download.setSize(-1);
if (download.getSize() <= 0) {
try {
Long size = idsClient.getSize(sessionId, download.getInvestigationIds(), download.getDatasetIds(), download.getDatafileIds());
download.setSize(size);
} catch(Exception e) {
logger.error("prepareDownload: setting size to -1 as getSize threw exception: " + e.getMessage());
download.setSize(-1);
}
}

if (download.getIsTwoLevel() || !download.getTransport().matches("https|http")) {
Expand Down Expand Up @@ -285,19 +288,20 @@ private void prepareDownload(Download download, IdsClient injectedIdsClient, Str
* @param sessionIds Map from Facility to functional sessionId
* @param facilityName Name of ICAT Facility to get the sessionId for
* @return Functional ICAT sessionId
* @throws InternalException If the facilityName cannot be mapped to an ICAT url
* @throws BadRequestException If the login fails
* @throws Exception If the login fails
*/
private String getQueueSessionId(Map<String, String> sessionIds, String facilityName)
throws InternalException, BadRequestException {
throws Exception {
String sessionId = sessionIds.get(facilityName);
if (sessionId == null) {
IcatClient icatClient = new IcatClient(FacilityMap.getInstance().getIcatUrl(facilityName));
Properties properties = Properties.getInstance();
String plugin = properties.getProperty("queue.account." + facilityName + ".plugin");
String username = properties.getProperty("queue.account." + facilityName + ".username");
String password = properties.getProperty("queue.account." + facilityName + ".password");
sessionId = icatClient.login(plugin, username, password);
String jsonString = icatClient.login(plugin, username, password);
JsonObject jsonObject = Utils.parseJsonObject(jsonString);
sessionId = jsonObject.getString("sessionId");
sessionIds.put(facilityName, sessionId);
}
return sessionId;
Expand All @@ -315,7 +319,7 @@ private String getQueueSessionId(Map<String, String> sessionIds, String facility
*/
public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
if (maxActiveDownloads == 0) {
logger.debug("Preparing of queued jobs disabled by config, skipping");
logger.trace("Preparing of queued jobs disabled by config, skipping");
return;
}

Expand All @@ -332,7 +336,7 @@ public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
int activeDownloadsSize = activeDownloads.size();
if (activeDownloadsSize >= maxActiveDownloads) {
String format = "More downloads currently RESTORING {} than maxActiveDownloads {}, cannot prepare queued jobs";
logger.info(format, activeDownloadsSize, maxActiveDownloads);
logger.trace(format, activeDownloadsSize, maxActiveDownloads);
return;
}
availableDownloads -= activeDownloadsSize;
Expand All @@ -346,13 +350,13 @@ public void startQueuedDownloads(int maxActiveDownloads) throws Exception {
Map<String, String> sessionIds = new HashMap<>();
if (maxActiveDownloads <= 0) {
// No limits on how many to submit
logger.info("Preparing {} queued downloads", queuedDownloads.size());
logger.trace("Preparing {} queued downloads", queuedDownloads.size());
for (Download queuedDownload : queuedDownloads) {
queuedDownload.setStatus(DownloadStatus.PREPARING);
prepareDownload(queuedDownload, null, getQueueSessionId(sessionIds, queuedDownload.getFacilityName()));
}
} else {
logger.info("Preparing up to {} queued downloads", availableDownloads);
logger.trace("Preparing up to {} queued downloads", availableDownloads);
HashMap<Integer, List<Download>> mapping = new HashMap<>();
for (Download queuedDownload : queuedDownloads) {
String sessionId = getQueueSessionId(sessionIds, queuedDownload.getFacilityName());
Expand Down
Loading
Loading