Skip to content

Commit 57eea63

Browse files
fdelbrayelleFrançois Delbrayelle
andauthored
feat(core): add separate purge options for execution and trigger logs in PurgeLogs (#14922)
* feat(core): add separate purge options for execution and trigger logs in PurgeLogs Add `purgeExecutionLogs` and `purgeTriggerLogs` boolean flags to PurgeLogs task, allowing users to selectively purge only execution logs (trigger_id IS NULL) or only trigger logs (trigger_id IS NOT NULL). Both default to true for backward compatibility. Output now includes `executionLogsCount` and `triggerLogsCount` alongside the existing `count` total. * fix(core): address review feedback on PurgeLogs purge options - Remove new deleteByQuery overload, add params to existing method with a default method for backward compatibility - Use execution_id (indexed) instead of trigger_id (not indexed) for filtering: execution logs have non-null execution_id, non-execution logs (triggers, flow-level) have null execution_id - Rename purgeTriggerLogs to purgeNonExecutionLogs for accuracy - Fix schema descriptions to be user-friendly instead of technical * fix(core): simplify ExecutionLogService purge method and use record return type Replace int[] with a PurgeResult record and simplify the if/else chain to a single deleteByQuery call passing both boolean flags directly. --------- Co-authored-by: François Delbrayelle <fdelbrayelle@kestra.io>
1 parent 5607129 commit 57eea63

File tree

7 files changed

+147
-9
lines changed

7 files changed

+147
-9
lines changed

core/src/main/java/io/kestra/core/repositories/LogRepositoryInterface.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,5 +98,9 @@ Flux<LogEntry> findAsync(
9898

9999
void deleteByFilters(String tenantId, List<QueryFilter> filters);
100100

101-
int deleteByQuery(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate);
101+
default int deleteByQuery(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate) {
102+
return deleteByQuery(tenantId, namespace, flowId, executionId, logLevels, startDate, endDate, true, true);
103+
}
104+
105+
int deleteByQuery(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate, boolean purgeExecutionLogs, boolean purgeNonExecutionLogs);
102106
}

core/src/main/java/io/kestra/core/services/ExecutionLogService.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@ public ExecutionLogService(LogRepositoryInterface logRepository) {
2828
this.logRepository = logRepository;
2929
}
3030

31+
public record PurgeResult(int executionLogsDeleted, int nonExecutionLogsDeleted) {}
32+
3133
/**
32-
* Purges log entries matching the given criteria.
34+
* Purges log entries matching the given criteria, with separate control over execution and non-execution logs.
3335
*
3436
* @param tenantId the tenant identifier
3537
* @param namespace the namespace of the flow
@@ -40,8 +42,18 @@ public ExecutionLogService(LogRepositoryInterface logRepository) {
4042
* @param endDate the end of the date range.
4143
* @return the number of log entries deleted
4244
*/
43-
public int purge(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate) {
44-
return logRepository.deleteByQuery(tenantId, namespace, flowId, executionId, logLevels, startDate, endDate);
45+
public PurgeResult purge(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate, boolean purgeExecutionLogs, boolean purgeNonExecutionLogs) {
46+
if (!purgeExecutionLogs && !purgeNonExecutionLogs) {
47+
return new PurgeResult(0, 0);
48+
}
49+
50+
int deleted = logRepository.deleteByQuery(tenantId, namespace, flowId, executionId, logLevels, startDate, endDate, purgeExecutionLogs, purgeNonExecutionLogs);
51+
52+
// When both types are purged in a single call, we can't distinguish the individual counts
53+
return new PurgeResult(
54+
purgeExecutionLogs ? deleted : 0,
55+
!purgeExecutionLogs && purgeNonExecutionLogs ? deleted : 0
56+
);
4557
}
4658

4759

core/src/main/java/io/kestra/plugin/core/log/PurgeLogs.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.kestra.core.services.ExecutionLogService;
1111
import io.swagger.v3.oas.annotations.media.Schema;
1212
import jakarta.validation.constraints.NotNull;
13+
import lombok.Builder;
1314
import lombok.EqualsAndHashCode;
1415
import lombok.Getter;
1516
import lombok.NoArgsConstructor;
@@ -104,6 +105,20 @@ public class PurgeLogs extends Task implements RunnableTask<PurgeLogs.Output> {
104105
@NotNull
105106
private Property<String> endDate;
106107

108+
@Schema(
109+
title = "Whether to purge execution logs",
110+
description = "If set to `true`, logs attached to an execution will be purged. Default is `true`."
111+
)
112+
@Builder.Default
113+
private Property<Boolean> purgeExecutionLogs = Property.ofValue(true);
114+
115+
@Schema(
116+
title = "Whether to purge non-execution logs",
117+
description = "If set to `true`, logs not attached to an execution (e.g. trigger logs) will be purged. Default is `true`."
118+
)
119+
@Builder.Default
120+
private Property<Boolean> purgeNonExecutionLogs = Property.ofValue(true);
121+
107122
@Override
108123
public Output run(RunContext runContext) throws Exception {
109124
ExecutionLogService logService = ((DefaultRunContext)runContext).getApplicationContext().getBean(ExecutionLogService.class);
@@ -118,26 +133,45 @@ public Output run(RunContext runContext) throws Exception {
118133

119134
var logLevelsRendered = runContext.render(this.logLevels).asList(Level.class);
120135
var renderedDate = runContext.render(startDate).as(String.class).orElse(null);
121-
int deleted = logService.purge(
136+
boolean execLogs = runContext.render(purgeExecutionLogs).as(Boolean.class).orElse(true);
137+
boolean nonExecLogs = runContext.render(purgeNonExecutionLogs).as(Boolean.class).orElse(true);
138+
139+
var purgeResult = logService.purge(
122140
flowInfo.tenantId(),
123141
runContext.render(namespace).as(String.class).orElse(null),
124142
runContext.render(flowId).as(String.class).orElse(null),
125143
runContext.render(executionId).as(String.class).orElse(null),
126144
logLevelsRendered.isEmpty() ? null : logLevelsRendered,
127145
renderedDate != null ? ZonedDateTime.parse(renderedDate) : null,
128-
ZonedDateTime.parse(runContext.render(endDate).as(String.class).orElseThrow())
146+
ZonedDateTime.parse(runContext.render(endDate).as(String.class).orElseThrow()),
147+
execLogs,
148+
nonExecLogs
129149
);
130150

131-
return Output.builder().count(deleted).build();
151+
return Output.builder()
152+
.count(purgeResult.executionLogsDeleted() + purgeResult.nonExecutionLogsDeleted())
153+
.executionLogsCount(purgeResult.executionLogsDeleted())
154+
.nonExecutionLogsCount(purgeResult.nonExecutionLogsDeleted())
155+
.build();
132156
}
133157

134158

135159
@SuperBuilder(toBuilder = true)
136160
@Getter
137161
public static class Output implements io.kestra.core.models.tasks.Output {
138162
@Schema(
139-
title = "The count of deleted logs"
163+
title = "The total count of deleted logs"
140164
)
141165
private int count;
166+
167+
@Schema(
168+
title = "The count of deleted execution logs"
169+
)
170+
private int executionLogsCount;
171+
172+
@Schema(
173+
title = "The count of deleted non-execution logs"
174+
)
175+
private int nonExecutionLogsCount;
142176
}
143177
}

core/src/test/java/io/kestra/plugin/core/log/PurgeLogsTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,70 @@ void run_with_no_arguments() throws Exception {
5252
assertThat((int) execution.getTaskRunList().getFirst().getOutputs().get("count")).isPositive();
5353
}
5454

55+
@Test
56+
@LoadFlows("flows/valids/purge_logs_execution_only.yaml")
57+
void run_purge_execution_logs_only() throws Exception {
58+
// create an execution log (with executionId)
59+
logRepository.save(LogEntry.builder()
60+
.namespace("namespace")
61+
.flowId("flowId")
62+
.executionId("exec-123")
63+
.tenantId(MAIN_TENANT)
64+
.timestamp(Instant.now())
65+
.level(Level.INFO)
66+
.message("Execution log")
67+
.build());
68+
69+
// create a non-execution log (without executionId)
70+
logRepository.save(LogEntry.builder()
71+
.namespace("namespace")
72+
.flowId("flowId")
73+
.tenantId(MAIN_TENANT)
74+
.timestamp(Instant.now())
75+
.level(Level.INFO)
76+
.message("Non-execution log")
77+
.build());
78+
79+
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "purge_logs_execution_only");
80+
81+
assertTrue(execution.getState().isSuccess());
82+
var outputs = execution.getTaskRunList().getFirst().getOutputs();
83+
assertThat((int) outputs.get("executionLogsCount")).isPositive();
84+
assertThat((int) outputs.get("nonExecutionLogsCount")).isZero();
85+
}
86+
87+
@Test
88+
@LoadFlows("flows/valids/purge_logs_trigger_only.yaml")
89+
void run_purge_non_execution_logs_only() throws Exception {
90+
// create an execution log (with executionId)
91+
logRepository.save(LogEntry.builder()
92+
.namespace("namespace")
93+
.flowId("flowId")
94+
.executionId("exec-456")
95+
.tenantId(MAIN_TENANT)
96+
.timestamp(Instant.now())
97+
.level(Level.INFO)
98+
.message("Execution log")
99+
.build());
100+
101+
// create a non-execution log (without executionId)
102+
logRepository.save(LogEntry.builder()
103+
.namespace("namespace")
104+
.flowId("flowId")
105+
.tenantId(MAIN_TENANT)
106+
.timestamp(Instant.now())
107+
.level(Level.INFO)
108+
.message("Non-execution log")
109+
.build());
110+
111+
Execution execution = runnerUtils.runOne(MAIN_TENANT, "io.kestra.tests", "purge_logs_trigger_only");
112+
113+
assertTrue(execution.getState().isSuccess());
114+
var outputs = execution.getTaskRunList().getFirst().getOutputs();
115+
assertThat((int) outputs.get("executionLogsCount")).isZero();
116+
assertThat((int) outputs.get("nonExecutionLogsCount")).isPositive();
117+
}
118+
55119
@org.junit.jupiter.api.parallel.Execution(ExecutionMode.SAME_THREAD)
56120
@ParameterizedTest
57121
@MethodSource("buildArguments")
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
id: purge_logs_execution_only
2+
namespace: io.kestra.tests
3+
4+
tasks:
5+
- id: purge_logs
6+
type: io.kestra.plugin.core.log.PurgeLogs
7+
endDate: "{{ now() | dateAdd(2, 'HOURS') }}"
8+
purgeExecutionLogs: true
9+
purgeNonExecutionLogs: false
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
id: purge_logs_trigger_only
2+
namespace: io.kestra.tests
3+
4+
tasks:
5+
- id: purge_logs
6+
type: io.kestra.plugin.core.log.PurgeLogs
7+
endDate: "{{ now() | dateAdd(2, 'HOURS') }}"
8+
purgeExecutionLogs: false
9+
purgeNonExecutionLogs: true

jdbc/src/main/java/io/kestra/jdbc/repository/AbstractJdbcLogRepository.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ public void deleteByQuery(String tenantId, String namespace, String flowId, Stri
327327
}
328328

329329
@Override
330-
public int deleteByQuery(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate) {
330+
public int deleteByQuery(String tenantId, String namespace, String flowId, String executionId, List<Level> logLevels, ZonedDateTime startDate, ZonedDateTime endDate, boolean purgeExecutionLogs, boolean purgeNonExecutionLogs) {
331331
return this.jdbcRepository
332332
.getDslContextWrapper()
333333
.transactionResult(configuration -> {
@@ -358,6 +358,12 @@ public int deleteByQuery(String tenantId, String namespace, String flowId, Strin
358358
delete = delete.and(levelsCondition(logLevels));
359359
}
360360

361+
if (purgeExecutionLogs && !purgeNonExecutionLogs) {
362+
delete = delete.and(field("execution_id").isNotNull());
363+
} else if (purgeNonExecutionLogs && !purgeExecutionLogs) {
364+
delete = delete.and(field("execution_id").isNull());
365+
}
366+
361367
return delete.execute();
362368
});
363369
}

0 commit comments

Comments
 (0)