From f14b23e04703d256348909b7334272be60abde0a Mon Sep 17 00:00:00 2001
From: Hoxo <29162640+Hoxo@users.noreply.github.com>
Date: Sat, 27 May 2023 23:43:59 +0300
Subject: [PATCH 1/9] task queue draft
---
.../lzy/allocator/alloc/AllocateVmAction.java | 2 +-
.../allocator/alloc/DeleteSessionAction.java | 8 -
.../alloc/MountDynamicDiskAction.java | 2 +-
.../alloc/UnmountDynamicDiskAction.java | 2 +-
.../db/allocator/migrations/V8__task.sql | 21 ++
lzy/long-running/pom.xml | 5 +
.../lzy/longrunning/OperationRunnerBase.java | 13 +-
.../ai/lzy/longrunning/task/DaoTaskQueue.java | 100 ++++++++
.../java/ai/lzy/longrunning/task/Task.java | 74 ++++++
.../lzy/longrunning/task/TaskAwareAction.java | 58 +++++
.../ai/lzy/longrunning/task/TaskExecutor.java | 63 +++++
.../ai/lzy/longrunning/task/TaskQueue.java | 19 ++
.../ai/lzy/longrunning/task/TaskResolver.java | 5 +
.../ai/lzy/longrunning/task/dao/TaskDao.java | 29 +++
.../lzy/longrunning/task/dao/TaskDaoImpl.java | 227 ++++++++++++++++++
.../ai/lzy/longrunning/TaskDaoImplTest.java | 139 +++++++++++
.../test/resources/db/migrations/V1__task.sql | 18 ++
.../src/test/resources/log4j2.yaml | 28 +++
.../lzy/model/db/test/DatabaseTestUtils.java | 23 ++
19 files changed, 823 insertions(+), 13 deletions(-)
create mode 100644 lzy/allocator/src/main/resources/db/allocator/migrations/V8__task.sql
create mode 100644 lzy/long-running/src/main/java/ai/lzy/longrunning/task/DaoTaskQueue.java
create mode 100644 lzy/long-running/src/main/java/ai/lzy/longrunning/task/Task.java
create mode 100644 lzy/long-running/src/main/java/ai/lzy/longrunning/task/TaskAwareAction.java
create mode 100644 lzy/long-running/src/main/java/ai/lzy/longrunning/task/TaskExecutor.java
create mode 100644 lzy/long-running/src/main/java/ai/lzy/longrunning/task/TaskQueue.java
create mode 100644 lzy/long-running/src/main/java/ai/lzy/longrunning/task/TaskResolver.java
create mode 100644 lzy/long-running/src/main/java/ai/lzy/longrunning/task/dao/TaskDao.java
create mode 100644 lzy/long-running/src/main/java/ai/lzy/longrunning/task/dao/TaskDaoImpl.java
create mode 100644 lzy/long-running/src/test/java/ai/lzy/longrunning/TaskDaoImplTest.java
create mode 100644 lzy/long-running/src/test/resources/db/migrations/V1__task.sql
create mode 100644 lzy/long-running/src/test/resources/log4j2.yaml
diff --git a/lzy/allocator/src/main/java/ai/lzy/allocator/alloc/AllocateVmAction.java b/lzy/allocator/src/main/java/ai/lzy/allocator/alloc/AllocateVmAction.java
index 0f1f0befc2..bb6625d4e8 100644
--- a/lzy/allocator/src/main/java/ai/lzy/allocator/alloc/AllocateVmAction.java
+++ b/lzy/allocator/src/main/java/ai/lzy/allocator/alloc/AllocateVmAction.java
@@ -67,7 +67,7 @@ protected void notifyExpired() {
}
@Override
- protected void notifyFinished() {
+ protected void notifyFinished(@Nullable Throwable t) {
allocationContext.metrics().runningAllocations.labels(vm.poolLabel()).dec();
if (deleteVmAction != null) {
diff --git a/lzy/allocator/src/main/java/ai/lzy/allocator/alloc/DeleteSessionAction.java b/lzy/allocator/src/main/java/ai/lzy/allocator/alloc/DeleteSessionAction.java
index 1e7ed69067..e77d8d09d4 100644
--- a/lzy/allocator/src/main/java/ai/lzy/allocator/alloc/DeleteSessionAction.java
+++ b/lzy/allocator/src/main/java/ai/lzy/allocator/alloc/DeleteSessionAction.java
@@ -46,14 +46,6 @@ protected boolean isInjectedError(Error e) {
return e instanceof InjectedFailures.TerminateException;
}
- @Override
- protected void notifyExpired() {
- }
-
- @Override
- protected void notifyFinished() {
- }
-
@Override
protected void onExpired(TransactionHandle tx) {
throw new RuntimeException("Unexpected, sessionId: '%s', op: '%s'".formatted(sessionId, id()));
diff --git a/lzy/allocator/src/main/java/ai/lzy/allocator/alloc/MountDynamicDiskAction.java b/lzy/allocator/src/main/java/ai/lzy/allocator/alloc/MountDynamicDiskAction.java
index 97378fd0bd..f097e53c79 100644
--- a/lzy/allocator/src/main/java/ai/lzy/allocator/alloc/MountDynamicDiskAction.java
+++ b/lzy/allocator/src/main/java/ai/lzy/allocator/alloc/MountDynamicDiskAction.java
@@ -68,7 +68,7 @@ protected boolean isInjectedError(Error e) {
}
@Override
- protected void notifyFinished() {
+ protected void notifyFinished(@Nullable Throwable t) {
if (unmountAction != null) {
log().error("{} Failed to mount dynamic disk", logPrefix());
try {
diff --git a/lzy/allocator/src/main/java/ai/lzy/allocator/alloc/UnmountDynamicDiskAction.java b/lzy/allocator/src/main/java/ai/lzy/allocator/alloc/UnmountDynamicDiskAction.java
index 8294a2aec9..84a8d5c064 100644
--- a/lzy/allocator/src/main/java/ai/lzy/allocator/alloc/UnmountDynamicDiskAction.java
+++ b/lzy/allocator/src/main/java/ai/lzy/allocator/alloc/UnmountDynamicDiskAction.java
@@ -59,7 +59,7 @@ public static String description(@Nullable Vm vm, DynamicMount mount) {
}
@Override
- protected void notifyFinished() {
+ protected void notifyFinished(@Nullable Throwable t) {
log().info("{} Finished unmounting volume {}", logPrefix(), dynamicMount.id());
}
diff --git a/lzy/allocator/src/main/resources/db/allocator/migrations/V8__task.sql b/lzy/allocator/src/main/resources/db/allocator/migrations/V8__task.sql
new file mode 100644
index 0000000000..7a5e930a57
--- /dev/null
+++ b/lzy/allocator/src/main/resources/db/allocator/migrations/V8__task.sql
@@ -0,0 +1,21 @@
+CREATE TYPE task_status AS ENUM ('PENDING', 'RUNNING', 'FAILED', 'FINISHED');
+
+CREATE TYPE task_type AS ENUM ('UNMOUNT', 'MOUNT');
+
+CREATE TABLE IF NOT EXISTS task(
+ id BIGSERIAL NOT NULL,
+ name TEXT NOT NULL,
+ entity_id TEXT NOT NULL,
+ type task_type NOT NULL,
+ status task_status NOT NULL,
+ created_at TIMESTAMP NOT NULL,
+ updated_at TIMESTAMP NOT NULL,
+ metadata JSONB NOT NULL,
+ operation_id TEXT,
+ worker_id TEXT,
+ lease_till TIMESTAMP,
+ PRIMARY KEY (id),
+ FOREIGN KEY (operation_id) REFERENCES operation(id)
+);
+
+CREATE INDEX IF NOT EXISTS task_status_entity_id_idx ON task(status, entity_id, id);
diff --git a/lzy/long-running/pom.xml b/lzy/long-running/pom.xml
index 7930f9992d..f295c97eb7 100644
--- a/lzy/long-running/pom.xml
+++ b/lzy/long-running/pom.xml
@@ -45,6 +45,11 @@
junit
test
+
+ io.zonky.test
+ embedded-postgres
+ test
+
diff --git a/lzy/long-running/src/main/java/ai/lzy/longrunning/OperationRunnerBase.java b/lzy/long-running/src/main/java/ai/lzy/longrunning/OperationRunnerBase.java
index 434392b044..15d4e70543 100644
--- a/lzy/long-running/src/main/java/ai/lzy/longrunning/OperationRunnerBase.java
+++ b/lzy/long-running/src/main/java/ai/lzy/longrunning/OperationRunnerBase.java
@@ -55,6 +55,7 @@ protected final void execute() {
}
for (var step : steps()) {
+ beforeStep();
final var stepResult = step.get();
switch (stepResult.code()) {
case ALREADY_DONE -> { }
@@ -83,7 +84,7 @@ protected final void execute() {
}
}
} catch (Throwable e) {
- notifyFinished();
+ notifyFinished(e);
if (e instanceof Error err && isInjectedError(err)) {
log.error("{} Terminated by InjectedFailure exception: {}", logPrefix, e.getMessage());
} else {
@@ -98,6 +99,10 @@ protected final void execute() {
}
}
+ protected void beforeStep() {
+
+ }
+
protected Map prepareLogContext() {
var ctx = super.prepareLogContext();
ctx.put(LogContextKey.OPERATION_ID, id);
@@ -272,7 +277,11 @@ protected void onNotFound(@Nullable TransactionHandle tx) throws SQLException {
protected void onCompletedOutside(Operation op, @Nullable TransactionHandle tx) throws SQLException {
}
- protected void notifyFinished() {
+ private void notifyFinished() {
+ notifyFinished(null);
+ }
+
+ protected void notifyFinished(@Nullable Throwable t) {
}
protected final void failOperation(Status status, @Nullable TransactionHandle tx) throws SQLException {
diff --git a/lzy/long-running/src/main/java/ai/lzy/longrunning/task/DaoTaskQueue.java b/lzy/long-running/src/main/java/ai/lzy/longrunning/task/DaoTaskQueue.java
new file mode 100644
index 0000000000..0183170792
--- /dev/null
+++ b/lzy/long-running/src/main/java/ai/lzy/longrunning/task/DaoTaskQueue.java
@@ -0,0 +1,100 @@
+package ai.lzy.longrunning.task;
+
+import ai.lzy.longrunning.task.dao.TaskDao;
+import com.google.common.collect.Queues;
+import jakarta.annotation.Nullable;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Queue;
+
+import static ai.lzy.model.db.DbHelper.withRetries;
+
+public class DaoTaskQueue implements TaskQueue {
+
+ private static final Logger LOG = LogManager.getLogger(TaskExecutor.class);
+
+ private final TaskDao taskDao;
+ private final int maxQueueSize;
+ private final Duration leaseTime;
+ private final String instanceId;
+ private final Queue queue;
+
+ public DaoTaskQueue(TaskDao taskDao, int maxQueueSize, Duration leaseTime, String instanceId) {
+ this.taskDao = taskDao;
+ this.maxQueueSize = maxQueueSize;
+ this.leaseTime = leaseTime;
+ this.instanceId = instanceId;
+ this.queue = Queues.newConcurrentLinkedQueue();
+ }
+
+ private void loadNextBatch() {
+ var toLoad = capacity();
+ if (toLoad > 0) {
+ var tasks = loadPendingTasks(toLoad);
+ queue.addAll(tasks);
+ }
+ }
+
+ private int capacity() {
+ return maxQueueSize - queue.size();
+ }
+
+ private List loadPendingTasks(int toLoad) {
+ try {
+ return withRetries(LOG, () -> taskDao.lockPendingBatch(instanceId, leaseTime, toLoad, null));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void add(Task task) {
+ try {
+ withRetries(LOG, () -> taskDao.insert(task, null));
+ } catch (Exception e) {
+ throw new RuntimeException();
+ }
+ }
+
+ @Nullable
+ @Override
+ public Task pollNext() {
+ var next = queue.poll();
+ if (next == null) {
+ loadNextBatch();
+ next = queue.poll();
+ }
+ return next;
+ }
+
+ public List pollRemaining() {
+ if (!queue.isEmpty()) {
+ var result = queue.stream().toList();
+ queue.clear();
+ return result;
+ }
+ //don't load to queue, just return loaded tasks
+ return loadPendingTasks(capacity());
+ }
+
+ @Override
+ public void update(long id, Task.Update update) {
+ try {
+ withRetries(LOG, () -> taskDao.update(id, update, null));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void updateLease(Task task, Duration duration) {
+ try {
+ withRetries(LOG, () -> taskDao.updateLease(task.id(), duration, null));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/lzy/long-running/src/main/java/ai/lzy/longrunning/task/Task.java b/lzy/long-running/src/main/java/ai/lzy/longrunning/task/Task.java
new file mode 100644
index 0000000000..5b7d27cbc7
--- /dev/null
+++ b/lzy/long-running/src/main/java/ai/lzy/longrunning/task/Task.java
@@ -0,0 +1,74 @@
+package ai.lzy.longrunning.task;
+
+import jakarta.annotation.Nullable;
+
+import java.time.Instant;
+import java.util.Map;
+
+public record Task(
+ long id,
+ String name,
+ String entityId,
+ String type,
+ Status status,
+ Instant createdAt,
+ Instant updatedAt,
+ Map metadata,
+ @Nullable
+ String operationId,
+ @Nullable
+ String workerId,
+ @Nullable
+ Instant leaseTill
+) {
+ public static Task createPending(String name, String entityId, String type, Map metadata) {
+ return new Task(-1, name, entityId, type, Status.PENDING, Instant.now(), Instant.now(),
+ metadata, null, null, null);
+ }
+
+ public enum Status {
+ PENDING,
+ RUNNING,
+ FAILED,
+ FINISHED,
+ }
+
+ public record Update(
+ @Nullable Status status,
+ @Nullable Map metadata,
+ @Nullable String operationId
+ ) {
+ public boolean isEmpty() {
+ return status == null && metadata == null && operationId == null;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private Status status;
+ private Map metadata;
+ private String operationId;
+
+ public Builder status(Status status) {
+ this.status = status;
+ return this;
+ }
+
+ public Builder metadata(Map metadata) {
+ this.metadata = metadata;
+ return this;
+ }
+
+ public Builder operationId(String operationId) {
+ this.operationId = operationId;
+ return this;
+ }
+
+ public Update build() {
+ return new Update(status, metadata, operationId);
+ }
+ }
+ }
+}
diff --git a/lzy/long-running/src/main/java/ai/lzy/longrunning/task/TaskAwareAction.java b/lzy/long-running/src/main/java/ai/lzy/longrunning/task/TaskAwareAction.java
new file mode 100644
index 0000000000..6c9d97fdd6
--- /dev/null
+++ b/lzy/long-running/src/main/java/ai/lzy/longrunning/task/TaskAwareAction.java
@@ -0,0 +1,58 @@
+package ai.lzy.longrunning.task;
+
+import ai.lzy.longrunning.OperationRunnerBase;
+import ai.lzy.longrunning.OperationsExecutor;
+import ai.lzy.longrunning.dao.OperationDao;
+import ai.lzy.model.db.Storage;
+import jakarta.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Map;
+
+public abstract class TaskAwareAction extends OperationRunnerBase {
+ private final Task task;
+ private final TaskQueue queue;
+ private final Duration leaseDuration;
+
+ public TaskAwareAction(Task task, TaskQueue queue, Duration leaseDuration, String opId, String desc,
+ Storage storage, OperationDao operationsDao, OperationsExecutor executor)
+ {
+ super(opId, desc, storage, operationsDao, executor);
+ this.task = task;
+ this.queue = queue;
+ this.leaseDuration = leaseDuration;
+ }
+
+ @Override
+ protected Map prepareLogContext() {
+ var ctx = super.prepareLogContext();
+ ctx.put("task_id", String.valueOf(task.id()));
+ ctx.put("task_type", task.type());
+ ctx.put("task_name", task.name());
+ ctx.put("task_entity_id", task.entityId());
+ return ctx;
+ }
+
+ protected Task task() {
+ return task;
+ }
+
+ @Override
+ protected void beforeStep() {
+ super.beforeStep();
+ queue.updateLease(task, leaseDuration);
+ }
+
+ @Override
+ protected void notifyFinished(@Nullable Throwable t) {
+ super.notifyFinished(t);
+
+ var builder = Task.Update.builder();
+ if (t != null) {
+ builder.status(Task.Status.FAILED);
+ } else {
+ builder.status(Task.Status.FINISHED);
+ }
+ queue.update(task.id(), builder.build());
+ }
+}
diff --git a/lzy/long-running/src/main/java/ai/lzy/longrunning/task/TaskExecutor.java b/lzy/long-running/src/main/java/ai/lzy/longrunning/task/TaskExecutor.java
new file mode 100644
index 0000000000..f0f07eff7a
--- /dev/null
+++ b/lzy/long-running/src/main/java/ai/lzy/longrunning/task/TaskExecutor.java
@@ -0,0 +1,63 @@
+package ai.lzy.longrunning.task;
+
+import ai.lzy.longrunning.OperationsExecutor;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TaskExecutor {
+
+ private static final Logger LOG = LogManager.getLogger(TaskExecutor.class);
+
+ private final TaskQueue taskQueue;
+ private final OperationsExecutor operationsExecutor;
+ private final TaskResolver resolver;
+ private final ScheduledExecutorService scheduler;
+ private final Duration initialDelay;
+ private final Duration executionDelay;
+
+ private volatile boolean started = false;
+ private volatile boolean disabled = false;
+
+ public TaskExecutor(TaskQueue taskQueue, OperationsExecutor operationsExecutor, TaskResolver resolver,
+ Duration initialDelay, Duration executionDelay)
+ {
+ this.taskQueue = taskQueue;
+ this.operationsExecutor = operationsExecutor;
+ this.resolver = resolver;
+ this.initialDelay = initialDelay;
+ this.executionDelay = executionDelay;
+ this.scheduler = Executors.newSingleThreadScheduledExecutor();
+
+ }
+
+ //todo support worker failures - retry locked tasks after restart
+ public void start() {
+ if (started) {
+ throw new IllegalStateException("Task executor has already started!");
+ }
+ started = true;
+ scheduler.scheduleWithFixedDelay(() -> {
+ try {
+ for (Task task : taskQueue.pollRemaining()) {
+ if (disabled) {
+ return;
+ }
+ var resolvedAction = resolver.resolve(task);
+ operationsExecutor.startNew(resolvedAction);
+ }
+ } catch (Exception e) {
+ LOG.error("Got exception while scheduling task", e);
+ }
+ }, initialDelay.toMillis(), executionDelay.toMillis(), TimeUnit.MILLISECONDS);
+ }
+
+ public void shutdown() {
+ disabled = true;
+ scheduler.shutdown();
+ }
+}
diff --git a/lzy/long-running/src/main/java/ai/lzy/longrunning/task/TaskQueue.java b/lzy/long-running/src/main/java/ai/lzy/longrunning/task/TaskQueue.java
new file mode 100644
index 0000000000..f76e58d592
--- /dev/null
+++ b/lzy/long-running/src/main/java/ai/lzy/longrunning/task/TaskQueue.java
@@ -0,0 +1,19 @@
+package ai.lzy.longrunning.task;
+
+import jakarta.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.List;
+
+public interface TaskQueue {
+ void add(Task task);
+
+ @Nullable
+ Task pollNext();
+
+ List pollRemaining();
+
+ void update(long id, Task.Update update);
+
+ void updateLease(Task task, Duration duration);
+}
diff --git a/lzy/long-running/src/main/java/ai/lzy/longrunning/task/TaskResolver.java b/lzy/long-running/src/main/java/ai/lzy/longrunning/task/TaskResolver.java
new file mode 100644
index 0000000000..8193886c51
--- /dev/null
+++ b/lzy/long-running/src/main/java/ai/lzy/longrunning/task/TaskResolver.java
@@ -0,0 +1,5 @@
+package ai.lzy.longrunning.task;
+
+public interface TaskResolver {
+ TaskAwareAction resolve(Task task);
+}
diff --git a/lzy/long-running/src/main/java/ai/lzy/longrunning/task/dao/TaskDao.java b/lzy/long-running/src/main/java/ai/lzy/longrunning/task/dao/TaskDao.java
new file mode 100644
index 0000000000..31a92ee5d6
--- /dev/null
+++ b/lzy/long-running/src/main/java/ai/lzy/longrunning/task/dao/TaskDao.java
@@ -0,0 +1,29 @@
+package ai.lzy.longrunning.task.dao;
+
+import ai.lzy.longrunning.task.Task;
+import ai.lzy.model.db.TransactionHandle;
+import jakarta.annotation.Nullable;
+
+import java.sql.SQLException;
+import java.time.Duration;
+import java.util.List;
+
+public interface TaskDao {
+
+ @Nullable
+ Task get(long id, @Nullable TransactionHandle tx) throws SQLException;
+
+ @Nullable
+ Task update(long id, Task.Update update, @Nullable TransactionHandle tx) throws SQLException;
+
+ @Nullable
+ Task updateLease(long id, Duration duration, @Nullable TransactionHandle tx) throws SQLException;
+
+ @Nullable
+ Task insert(Task task, @Nullable TransactionHandle tx) throws SQLException;
+
+ void delete(long id, @Nullable TransactionHandle tx) throws SQLException;
+
+ List lockPendingBatch(String ownerId, Duration leaseTime, int batchSize, @Nullable TransactionHandle tx)
+ throws SQLException;
+}
diff --git a/lzy/long-running/src/main/java/ai/lzy/longrunning/task/dao/TaskDaoImpl.java b/lzy/long-running/src/main/java/ai/lzy/longrunning/task/dao/TaskDaoImpl.java
new file mode 100644
index 0000000000..53065a517c
--- /dev/null
+++ b/lzy/long-running/src/main/java/ai/lzy/longrunning/task/dao/TaskDaoImpl.java
@@ -0,0 +1,227 @@
+package ai.lzy.longrunning.task.dao;
+
+import ai.lzy.longrunning.task.Task;
+import ai.lzy.model.db.DbOperation;
+import ai.lzy.model.db.Storage;
+import ai.lzy.model.db.TransactionHandle;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import jakarta.annotation.Nullable;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class TaskDaoImpl implements TaskDao {
+
+ private static final String FIELDS = "id, name, entity_id, type, status, created_at, updated_at, metadata," +
+ " operation_id, worker_id, lease_till";
+ public static final String SELECT_QUERY = "SELECT %s FROM task WHERE id = ?".formatted(FIELDS);
+ public static final String INSERT_QUERY = """
+ INSERT INTO task (name, entity_id, type, status, created_at, updated_at, metadata, operation_id)
+ VALUES (?, ?, cast(? as task_type), cast(? as task_status), now(), now(), cast(? as jsonb), ?)
+ RETURNING %s;
+ """.formatted(FIELDS);
+
+ //in first nested request we gather all tasks that either locked or free.
+ //in second nested request we filter result of previous request to get only free tasks
+ // and select only specific amount.
+ private static final String LOCK_PENDING_BATCH_QUERY = """
+ UPDATE task
+ SET status = 'RUNNING', worker_id = ?, updated_at = now(), lease_till = now() + cast(? as interval)
+ WHERE id IN (
+ SELECT id
+ FROM task
+ WHERE id IN (
+ SELECT DISTINCT ON (entity_id) id
+ FROM task
+ WHERE status IN ('PENDING', 'RUNNING')
+ ORDER BY entity_id, id
+ ) AND status = 'PENDING'
+ LIMIT ?
+ )
+ RETURNING %s;
+ """.formatted(FIELDS);
+ public static final String DELETE_QUERY = "DELETE FROM task WHERE id = ?";
+ public static final String UPDATE_LEASE_QUERY = """
+ UPDATE task
+ SET lease_till = now() + cast(? as interval)
+ WHERE id = ?
+ RETURNING %s
+ """.formatted(FIELDS);
+ private static final TypeReference