Skip to content

Commit fe5fc55

Browse files
committed
Adds heartbeat to processes to handle multi-instance deployment
1 parent fdf3245 commit fe5fc55

File tree

8 files changed

+256
-8
lines changed

8 files changed

+256
-8
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.dspace.app.process;
2+
3+
import org.apache.logging.log4j.Logger;
4+
import org.dspace.core.Context;
5+
import org.dspace.scripts.factory.ScriptServiceFactory;
6+
import org.dspace.scripts.service.ProcessService;
7+
8+
/**
9+
* Helper class for updating the heartbeat of the currently running instance processes.
10+
*/
11+
public class ProcessHeartbeatUpdater {
12+
private static final Logger log = org.apache.logging.log4j.LogManager.getLogger(ProcessHeartbeatUpdater.class);
13+
14+
private static final ProcessService processService = ScriptServiceFactory.getInstance().getProcessService();
15+
16+
public static void updateProcessesHeartbeats() {
17+
try {
18+
Context context = new Context();
19+
context.turnOffAuthorisationSystem();
20+
processService.updateProcessesHeartbeat(context);
21+
context.commit();
22+
} catch (Exception e) {
23+
log.error("Error when trying to update processes heartbeats", e);
24+
}
25+
}
26+
}

dspace-api/src/main/java/org/dspace/content/dao/ProcessDAO.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.sql.SQLException;
1111
import java.time.Instant;
1212
import java.util.List;
13+
import java.util.UUID;
1314

1415
import org.dspace.content.ProcessStatus;
1516
import org.dspace.core.Context;
@@ -98,6 +99,24 @@ int countTotalWithParameters(Context context, ProcessQueryParameterContainer pro
9899
List<Process> findByStatusAndCreationTimeOlderThan(Context context, List<ProcessStatus> statuses, Instant date)
99100
throws SQLException;
100101

102+
/**
103+
* Find the processes with RUNNING or SCHEDULED status, from the specified instance or from any other instance but
104+
* with expired heartbeat.
105+
* @param context The relevant DSpace context
106+
* @param instanceId UUID of the instance to retrieve processes
107+
* @return The list of all Processes from the instance or with expired heartbeat
108+
* @throws SQLException If something goes wrong
109+
*/
110+
List<Process> findRunningByInstanceIdOrExpiredHeartbeat(Context context, UUID instanceId) throws SQLException;
111+
112+
/**
113+
* Updates the heartbeat of all RUNNING and SCHEDULED Processes of the specified instance.
114+
* @param context The relevant DSpace context
115+
* @param instanceId UUID of the instance to update processes heartbeats
116+
* @throws SQLException If something goes wrong
117+
*/
118+
void updateProcessesHeartbeat(Context context, UUID instanceId) throws SQLException;
119+
101120
/**
102121
* Returns a list of all Process objects in the database by the given user.
103122
*

dspace-api/src/main/java/org/dspace/content/dao/impl/ProcessDAOImpl.java

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,21 @@
1010
import static org.dspace.scripts.Process_.CREATION_TIME;
1111

1212
import java.sql.SQLException;
13+
14+
import java.text.ParseException;
1315
import java.time.Instant;
16+
import java.util.Date;
1417
import java.util.LinkedList;
1518
import java.util.List;
1619
import java.util.Map;
20+
import java.util.UUID;
1721

1822
import jakarta.persistence.criteria.CriteriaBuilder;
1923
import jakarta.persistence.criteria.CriteriaQuery;
2024
import jakarta.persistence.criteria.Predicate;
2125
import jakarta.persistence.criteria.Root;
2226
import org.apache.commons.lang3.Strings;
27+
import org.apache.logging.log4j.core.util.CronExpression;
2328
import org.dspace.content.ProcessStatus;
2429
import org.dspace.content.dao.ProcessDAO;
2530
import org.dspace.core.AbstractHibernateDAO;
@@ -28,6 +33,8 @@
2833
import org.dspace.scripts.Process;
2934
import org.dspace.scripts.ProcessQueryParameterContainer;
3035
import org.dspace.scripts.Process_;
36+
import org.dspace.services.factory.DSpaceServicesFactory;
37+
import org.hibernate.query.Query;
3138

3239
/**
3340
*
@@ -169,6 +176,62 @@ public List<Process> findByStatusAndCreationTimeOlderThan(Context context, List<
169176
return list(context, criteriaQuery, false, Process.class, -1, -1);
170177
}
171178

179+
@Override
180+
public List<Process> findRunningByInstanceIdOrExpiredHeartbeat(Context context, UUID instanceId)
181+
throws SQLException {
182+
183+
Query<Process> query = getHibernateSession(context).createNativeQuery(
184+
"""
185+
SELECT p.* FROM process AS p
186+
WHERE
187+
(
188+
p.instance_id = :instanceId
189+
OR EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - p.last_heartbeat)) > 2 * :seconds
190+
)
191+
AND p.status IN :status
192+
""",
193+
Process.class
194+
);
195+
196+
// Based on the Process heartbeat update cron, calculates how many seconds have passed between last execution
197+
// and the next. Processes with last heartbeats older than this period are considered expired.
198+
try {
199+
CronExpression cron = new CronExpression(
200+
DSpaceServicesFactory.getInstance().getConfigurationService().getProperty(
201+
"process-heartbeat.cron",
202+
"0 */1 * * * ?"
203+
)
204+
);
205+
final var prev = cron.getPrevFireTime(new Date());
206+
final var next = cron.getNextValidTimeAfter(new Date());
207+
final var mils = next.getTime() - prev.getTime();
208+
final var secs = mils / 1000;
209+
query.setParameter("seconds", secs);
210+
} catch (ParseException e) {
211+
// Defaults to 1min.
212+
query.setParameter("seconds", 60);
213+
}
214+
215+
query.setParameter("instanceId", instanceId);
216+
query.setParameter("status", List.of(ProcessStatus.RUNNING.name(), ProcessStatus.SCHEDULED.name()));
217+
return query.getResultList();
218+
}
219+
220+
@Override
221+
public void updateProcessesHeartbeat(Context context, UUID instanceId) throws SQLException {
222+
Query<Void> query = getHibernateSession(context).createNativeQuery(
223+
"""
224+
UPDATE process AS p
225+
SET p.last_heartbeat = CURRENT_TIMESTAMP
226+
WHERE p.instance_id = :instance_id AND p.status IN :status
227+
""",
228+
Void.class
229+
);
230+
query.setParameter("instanceId", instanceId);
231+
query.setParameter("status", List.of(ProcessStatus.RUNNING.name(), ProcessStatus.SCHEDULED.name()));
232+
query.executeUpdate();
233+
}
234+
172235
@Override
173236
public List<Process> findByUser(Context context, EPerson user, int limit, int offset) throws SQLException {
174237
CriteriaBuilder criteriaBuilder = getCriteriaBuilder(context);
@@ -197,5 +260,3 @@ public int countByUser(Context context, EPerson user) throws SQLException {
197260
}
198261

199262
}
200-
201-

dspace-api/src/main/java/org/dspace/scripts/Process.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.time.Instant;
1111
import java.util.ArrayList;
1212
import java.util.List;
13+
import java.util.UUID;
1314

1415
import jakarta.persistence.CascadeType;
1516
import jakarta.persistence.Column;
@@ -34,6 +35,7 @@
3435
import org.dspace.eperson.EPerson;
3536
import org.dspace.eperson.Group;
3637
import org.hibernate.Length;
38+
import org.hibernate.annotations.CreationTimestamp;
3739

3840
/**
3941
* This class is the DB Entity representation of the Process object to be stored in the Database
@@ -90,6 +92,12 @@ public class Process implements ReloadableEntity<Integer> {
9092
@Column(name = "creation_time", nullable = false)
9193
private Instant creationTime;
9294

95+
@Column(name = "last_heartbeat")
96+
private Instant lastHeartbeat;
97+
98+
@Column(name = "instance_id")
99+
private UUID instanceId;
100+
93101
public static final String BITSTREAM_TYPE_METADATAFIELD = "dspace.process.filetype";
94102
public static final String OUTPUT_TYPE = "script_output";
95103

@@ -235,6 +243,22 @@ public void setGroups(List<Group> groups) {
235243
this.groups = groups;
236244
}
237245

246+
public UUID getInstanceId() {
247+
return instanceId;
248+
}
249+
250+
public void setInstanceId(UUID instanceId) {
251+
this.instanceId = instanceId;
252+
}
253+
254+
public Instant getLastHeartbeat() {
255+
return lastHeartbeat;
256+
}
257+
258+
public void setLastHeartbeat(Instant lastHeartbeat) {
259+
this.lastHeartbeat = lastHeartbeat;
260+
}
261+
238262
/**
239263
* Return <code>true</code> if <code>other</code> is the same Process
240264
* as this object, <code>false</code> otherwise
@@ -255,6 +279,7 @@ public boolean equals(Object other) {
255279
.append(this.getParameters(), ((Process) other).getParameters())
256280
.append(this.getCreationTime(), ((Process) other).getCreationTime())
257281
.append(this.getEPerson(), ((Process) other).getEPerson())
282+
.append(this.getInstanceId(), ((Process) other).getInstanceId())
258283
.isEquals());
259284
}
260285

@@ -270,6 +295,7 @@ public int hashCode() {
270295
.append(this.getParameters())
271296
.append(this.getCreationTime())
272297
.append(this.getEPerson())
298+
.append(this.getInstanceId())
273299
.toHashCode();
274300
}
275301
}

dspace-api/src/main/java/org/dspace/scripts/ProcessServiceImpl.java

Lines changed: 87 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
import java.io.FileWriter;
1414
import java.io.IOException;
1515
import java.io.InputStream;
16+
import java.nio.file.Files;
17+
import java.nio.file.Path;
18+
import java.nio.file.Paths;
1619
import java.sql.SQLException;
1720
import java.time.Instant;
1821
import java.time.format.DateTimeFormatter;
@@ -23,7 +26,9 @@
2326
import java.util.List;
2427
import java.util.Optional;
2528
import java.util.Set;
29+
import java.util.UUID;
2630
import java.util.regex.Pattern;
31+
import java.util.stream.Stream;
2732

2833
import org.apache.commons.collections4.ListUtils;
2934
import org.apache.commons.io.FileUtils;
@@ -48,6 +53,7 @@
4853
import org.dspace.eperson.Group;
4954
import org.dspace.scripts.service.ProcessService;
5055
import org.dspace.services.ConfigurationService;
56+
import org.dspace.services.factory.DSpaceServicesFactory;
5157
import org.springframework.beans.factory.annotation.Autowired;
5258

5359
/**
@@ -85,6 +91,8 @@ public Process create(Context context, EPerson ePerson, String scriptName,
8591
process.setName(scriptName);
8692
process.setParameters(DSpaceCommandLineParameter.concatenate(parameters));
8793
process.setCreationTime(Instant.now());
94+
process.setLastHeartbeat(process.getCreationTime());
95+
process.setInstanceId(getOrGenerateInstanceId());
8896
Optional.ofNullable(specialGroups)
8997
.ifPresent(sg -> {
9098
// we use a set to be sure no duplicated special groups are stored with process
@@ -327,24 +335,48 @@ public List<Process> findByStatusAndCreationTimeOlderThan(Context context, List<
327335
return this.processDAO.findByStatusAndCreationTimeOlderThan(context, statuses, date);
328336
}
329337

338+
@Override
339+
public List<Process> findRunningByInstanceIdOrExpiredHeartbeat(Context context) throws SQLException {
340+
return this.processDAO.findRunningByInstanceIdOrExpiredHeartbeat(context, getOrGenerateInstanceId());
341+
}
342+
343+
@Override
344+
public void updateProcessesHeartbeat(Context context) throws SQLException {
345+
this.processDAO.updateProcessesHeartbeat(context, getOrGenerateInstanceId());
346+
}
347+
330348
@Override
331349
public int countByUser(Context context, EPerson user) throws SQLException {
332350
return processDAO.countByUser(context, user);
333351
}
334352

335353
@Override
336354
public void failRunningProcesses(Context context) throws SQLException, IOException, AuthorizeException {
337-
List<Process> processesToBeFailed = findByStatusAndCreationTimeOlderThan(
338-
context, List.of(ProcessStatus.RUNNING, ProcessStatus.SCHEDULED), Instant.now());
355+
356+
List<Process> processesToBeFailed = findRunningByInstanceIdOrExpiredHeartbeat(context);
357+
339358
for (Process process : processesToBeFailed) {
340359
context.setCurrentUser(process.getEPerson());
341360
// Fail the process.
342-
log.info("Process with ID {} did not complete before tomcat shutdown, failing it now.", process.getID());
361+
UUID currentInstanceId = getOrGenerateInstanceId();
362+
363+
String message = String.format(
364+
"""
365+
Process with ID %s (created in instance '%s') did not complete before tomcat shutdown.
366+
Failing it now from %s.
367+
""",
368+
process.getID(),
369+
process.getInstanceId(),
370+
currentInstanceId.equals(process.getInstanceId())
371+
? "the same instance '" + currentInstanceId + "' (instance restarted)"
372+
: "another instance '" + currentInstanceId + "' (expired heartbeat)"
373+
);
374+
375+
log.info(message);
376+
343377
fail(context, process);
344378
// But still attach its log to the process.
345-
appendLog(process.getID(), process.getName(),
346-
"Process did not complete before tomcat shutdown.",
347-
ProcessLogLevel.ERROR);
379+
appendLog(process.getID(), process.getName(), message, ProcessLogLevel.ERROR);
348380
createLogBitstream(context, process);
349381
}
350382
}
@@ -374,4 +406,53 @@ private File getLogsDirectory() {
374406
}
375407
return logsDir;
376408
}
409+
410+
/**
411+
* Gets, or creates if not exists, the folder that stores the file that identifies the instance.
412+
* @return Path to the foder containing the instance file.
413+
* @throws IOException If something goes wrong
414+
*/
415+
private Path getOrCreateInstanceIdFolder() throws IOException {
416+
Path instanceFolder = Paths.get(
417+
DSpaceServicesFactory.getInstance().getConfigurationService().getProperty("dspace.dir") + "/instance"
418+
);
419+
if (!Files.exists(instanceFolder)) {
420+
Files.createDirectories(instanceFolder);
421+
} else if (!Files.isDirectory(instanceFolder)) {
422+
// Não deveria ocorrer.
423+
throw new RuntimeException();
424+
}
425+
return instanceFolder;
426+
}
427+
428+
/**
429+
* Get, or creates if not exists, the UUID that identifies the currently running instance. This UUID is stored
430+
* in '[dspace.dir]/instance/[uuid]'.
431+
* @return
432+
*/
433+
private UUID getOrGenerateInstanceId() {
434+
UUID instanceId = null;
435+
try {
436+
Path instanceFolder = getOrCreateInstanceIdFolder();
437+
438+
// Gets UUID from file if exists, or creates a new one
439+
try (Stream<Path> filesStream = Files.walk(instanceFolder).filter(Files::isRegularFile)) {
440+
List<Path> instanceFiles = filesStream.toList();
441+
if (instanceFiles.isEmpty()) {
442+
// No file exists yet, creates a new UUID and a file named by it
443+
instanceId = UUID.randomUUID();
444+
Path instanceFile = instanceFolder.resolve(instanceId.toString());
445+
Files.createFile(instanceFile);
446+
} else if (instanceFiles.size() == 1) {
447+
// If file exists, should be named as a valid UUID
448+
instanceId = UUID.fromString(instanceFiles.get(0).getFileName().toString());
449+
} else {
450+
// Should not occur
451+
throw new IOException("Found more than one instance id file");
452+
}
453+
}
454+
} catch (Exception ignored) {
455+
}
456+
return instanceId;
457+
}
377458
}

dspace-api/src/main/java/org/dspace/scripts/service/ProcessService.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,21 @@ void createLogBitstream(Context context, Process process)
256256
List<Process> findByStatusAndCreationTimeOlderThan(Context context, List<ProcessStatus> statuses, Instant date)
257257
throws SQLException;
258258

259+
/**
260+
* Find the processes with RUNNING or SCHEDULED status, from this currently running instance or from any other
261+
* instance but with expired heartbeat.
262+
* @param context The relevant DSpace context
263+
* @throws SQLException If something goes wrong
264+
*/
265+
List<Process> findRunningByInstanceIdOrExpiredHeartbeat(Context context) throws SQLException;
266+
267+
/**
268+
* Updates the heartbeat of all RUNNING and SCHEDULED Processes of the currently running instance.
269+
* @param context The relevant DSpace context
270+
* @throws SQLException If something goes wrong
271+
*/
272+
public void updateProcessesHeartbeat(Context context) throws SQLException;
273+
259274
/**
260275
* Returns a list of all Process objects in the database by the given user.
261276
*

0 commit comments

Comments
 (0)