[OPIK-4842] [BE] Refactor retention jobs to Quartz, extract estimation#5898
Conversation
Split the single RetentionPolicyJob (Managed pattern) into two independent Quartz jobs:

- RetentionSlidingWindowJob: regular sliding-window cycle, runs every (24*60)/executionsPerDay minutes (default 30 min), fraction-based workspace sharding
- RetentionCatchUpJob: progressive historical deletion, runs on its own schedule (default 60 min), separate distributed lock

Both follow the established Quartz pattern (Job + InterruptableJob, @DisallowConcurrentExecution, bestEffortLock) and are registered via OpikGuiceyLifecycleEventListener.

Benefits:

- Separate locks: catch-up never blocks regular retention
- Independent schedules: catch-up can run less frequently
- Either can be disabled independently
- Follows codebase conventions (TraceThreadsClosingJob pattern)
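The interval arithmetic above can be sketched in isolation. This is an illustrative helper, not the actual Opik config API; the class and method names are hypothetical:

```java
// Hypothetical sketch: deriving the sliding-window trigger interval from
// executionsPerDay, as described in the commit message. Not the real Opik code.
public class RetentionIntervalSketch {

    // (24 * 60) / executionsPerDay, e.g. 48 executions/day -> every 30 minutes
    static int intervalMinutes(int executionsPerDay) {
        if (executionsPerDay <= 0 || executionsPerDay > 24 * 60) {
            throw new IllegalArgumentException("executionsPerDay must be in [1, 1440]");
        }
        return (24 * 60) / executionsPerDay;
    }

    public static void main(String[] args) {
        System.out.println(intervalMinutes(48)); // 30 (the default mentioned above)
        System.out.println(intervalMinutes(24)); // 60
    }
}
```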
Move velocity estimation from the synchronous HTTP rule creation endpoint into a dedicated RetentionEstimationJob (3rd Quartz job):

- RetentionEstimationJob: runs every 5 min (configurable), finds rules with catchUpDone=false and no velocity, estimates velocity + cursor for each, and updates the rule in MySQL
- RetentionRuleService.create(): no longer calls ClickHouse during rule creation. Saves the rule with velocity=null, cursor=null, catchUpDone=false; the estimation job picks it up within minutes.
- RetentionEstimationService: extracted from RetentionRuleServiceImpl; contains estimateVelocity, scoutFirstDataCursor, isTooManyRowsException

This fixes the blocking HTTP thread concern raised in the PR #5820 review: the scouting loop could make up to ~18 sequential ClickHouse queries for huge workspaces; it now runs in a background job instead.
After rebase onto main, use the new bestEffortLock overload with holdUntilExpiry=true on all three retention jobs. The lock is held for the full interval (30min/5min/45min) so that with N instances, only one execution happens per interval, not N sequential ones.
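The holdUntilExpiry=true semantics can be modeled on a toy timeline. This is an assumption-laden sketch, not the Opik LockService: it only shows why holding the lock for the full interval yields one execution per interval instead of N sequential ones:

```java
// Toy model of holdUntilExpiry=true: a lock acquired at time t stays held until
// t + interval even after the work finishes, so of N instances firing within one
// interval, only the first executes; the rest skip rather than queue behind it.
public class HoldUntilExpirySketch {

    /** Given ascending firing times (ms) across N instances, count actual runs. */
    static int executions(long[] firingTimes, long intervalMillis) {
        int runs = 0;
        long heldUntil = Long.MIN_VALUE;
        for (long t : firingTimes) {
            if (t >= heldUntil) {               // lock expired: this firing wins
                runs++;
                heldUntil = t + intervalMillis; // held for the full interval
            }                                    // otherwise: skipped, not queued
        }
        return runs;
    }

    public static void main(String[] args) {
        // 3 instances fire within the same 30-min interval, then again next interval
        long[] fires = {0, 1_000, 2_000, 1_800_000, 1_801_000, 1_802_000};
        System.out.println(executions(fires, 1_800_000)); // 2: one run per interval
    }
}
```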
Force-pushed from c3b7074 to e4d4859.
…DEBUG

- Remove the catchUp.enabled check from rule creation so rules are always marked for catch-up when applyToPast=true, surviving temporary job disablement
- Convert findUnestimatedCatchUpRules SQL to a text block
- Demote SpanDAO.estimateVelocityForRetention log to DEBUG
- Demote TraceDAO.scoutFirstDayWithData log to DEBUG
…nds, fix reactor thread blocking

- Remove InterruptableJob, AtomicBoolean interrupted, and @DisallowConcurrentExecution from all 3 retention jobs. Concurrency is guarded by the Redis lock (holdUntilExpiry), not Quartz. doJob() returns immediately via subscribe(), freeing the Quartz thread.
- Remove unused lockTimeoutSeconds from RetentionConfig and CatchUpConfig.
- Add subscribeOn(boundedElastic) to the estimation job to avoid blocking a reactor thread (estimatePendingRules calls .block() on DAO chains internally).
- Retention deletes and estimation are idempotent: incomplete work during shutdown is safely retried on the next cycle.
```java
@Override
public void doJob(JobExecutionContext context) {
    // estimatePendingRules() calls .block() on DAO reactive chains internally,
    // so we use subscribeOn(boundedElastic) to avoid blocking a reactor thread.
    lockService.bestEffortLock(
            RUN_LOCK,
            Mono.fromRunnable(estimationService::estimatePendingRules)
                    .subscribeOn(Schedulers.boundedElastic()),
            Mono.fromRunnable(() -> log.debug(
                    "Retention estimation: could not acquire lock, another instance is running")),
            Duration.ofMinutes(config.getCatchUp().getEstimationIntervalMinutes()),
            Duration.ZERO,
```
lockService.bestEffortLock(...).subscribe(...) is duplicated across the retention jobs. Should we extract a shared helper, e.g. AbstractRetentionJob or RetentionJobRunner.runWithLock(lock, workMono, interval, failLog, successLog), to centralize the lock/subscribe/logging boilerplate?
Finding type: Code Dedup and Conventions | Severity: 🟢 Low
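A minimal sketch of the helper the reviewer suggests, under loose assumptions: the real version would wrap lockService.bestEffortLock(...).subscribe(...), but here the lock is a toy in-memory map so the API shape is the point, not the implementation. All names besides RetentionJobRunner/runWithLock (taken from the comment above) are hypothetical:

```java
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

// Sketch of a shared runner that centralizes the lock/run/log boilerplate the
// three retention jobs currently duplicate. Toy lock, illustrative only.
public class RetentionJobRunner {
    private final ConcurrentMap<String, Long> held = new ConcurrentHashMap<>();

    public void runWithLock(String lock, Runnable work, Duration holdFor,
                            Consumer<String> onSkip, Consumer<String> onDone) {
        long now = System.currentTimeMillis();
        Long expiry = held.get(lock);
        if (expiry != null && expiry > now) {
            onSkip.accept(lock); // another holder: log at DEBUG and return
            return;
        }
        held.put(lock, now + holdFor.toMillis()); // held until expiry, not released
        work.run();
        onDone.accept(lock);
    }

    public static void main(String[] args) {
        RetentionJobRunner runner = new RetentionJobRunner();
        runner.runWithLock("estimation", () -> System.out.println("estimating"),
                Duration.ofMinutes(5),
                lock -> System.out.println("skipped: " + lock),
                lock -> System.out.println("done: " + lock));
    }
}
```

Each job would then reduce to one runWithLock call with its own lock name, work, and interval.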
Backend Tests - Integration Group 11: 39 files, 39 suites, 3m 36s ⏱️. Results for commit 8d0198e.
💡 suggestion | Observability: None of the three new jobs emit any metrics. Given that these are deletion jobs with direct data impact, adding instrumentation would make operational monitoring significantly easier.

🤖 Review posted via /review-github-pr
- Add @DisallowConcurrentExecution, InterruptableJob, and .block() to all 3 jobs
- Add LIMIT 10 to findUnestimatedCatchUpRules to bound work per cycle
- Move isTooManyRowsException to RetentionUtils
- Rename CatchUpConfig.getInterval() to getCatchUpInterval() for clarity
- Expand idx_catch_up_pending to cover all equality + ORDER BY columns
- Standardize lock-not-acquired logs to DEBUG across all jobs
- Consolidate 3 retention setup methods into a single setRetentionJobs()
…bility

- Add a run counter and duration histogram to all 3 retention jobs
- Add domain-specific counters to services (workspaces processed, rules processed/completed, rules estimated, velocity values)
- Add lightweight pre-delete row counts (SELECT count) in SpanDAO/TraceDAO for observability: an upper-bound ceiling with >99% precision; excludes the expensive experiment_items exclusion subquery
- Bump migration from 000061 to 000062 (collision with the workspace rule project index migration on main)
📋 PR Linter Failed ❌ Missing Section. The description is missing the
Addressed in 7bfe150. Here's what was implemented:

Job-level metrics (all 3 jobs):

Domain-level metrics:

Skipped: per-workspace tags on row counters (high-cardinality risk) and per-query duration histograms for estimation (would need deeper plumbing for marginal value).
…-delete ordering

- FIFO ordering (ORDER BY created_at ASC) prevents a consistently failing rule from starving other pending rules
- Document why counts run sequentially before deletes (not in parallel): the metric must reflect what's about to be removed, and the cost is minimal via the ClickHouse primary key index
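The starvation point can be shown in a few lines. A toy sketch, with hypothetical names; it only models oldest-first processing where a failing rule is skipped for the tick rather than retried in place:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative model of FIFO catch-up processing: rules are handled oldest-first
// (ORDER BY created_at ASC), and a rule that keeps failing is logged and skipped
// for this tick, so it cannot starve younger pending rules.
public class FifoCatchUpSketch {
    record PendingRule(String id, long createdAt, boolean alwaysFails) {}

    /** Returns the ids successfully processed in one tick, oldest first. */
    static List<String> runTick(List<PendingRule> pending) {
        List<String> processed = new ArrayList<>();
        pending.stream()
                .sorted(Comparator.comparingLong(PendingRule::createdAt))
                .forEach(rule -> {
                    if (rule.alwaysFails()) {
                        return; // skip; retried next tick, the rest still run
                    }
                    processed.add(rule.id());
                });
        return processed;
    }

    public static void main(String[] args) {
        List<PendingRule> rules = List.of(
                new PendingRule("broken", 1L, true),
                new PendingRule("old", 2L, false),
                new PendingRule("young", 3L, false));
        System.out.println(runTick(rules)); // [old, young]
    }
}
```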
…onale

Sequential counts avoid overloading ClickHouse, and the collected metrics help assess query cost over time.
Without the SETTINGS clause in the SQL template, the log_comment placeholder from getSTWithLogComment was never injected into the query, making these counts invisible in ClickHouse query logs.
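A minimal illustration of the fix, with an assumed query shape (this is not the literal Opik template): the comment only reaches ClickHouse's system.query_log if the template ends with a SETTINGS clause that carries it.

```sql
-- Count query with the log_comment injected via SETTINGS; without this trailing
-- clause the substituted comment is never sent, so the count queries cannot be
-- found in system.query_log by comment.
SELECT count()
FROM spans
WHERE workspace_id = :workspace_id
  AND start_time < :cutoff
SETTINGS log_comment = :log_comment;
```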
…weight

catch_up_velocity is always a range predicate (<, >=, BETWEEN) in finder queries, so MySQL's B-tree can't use columns after it for ORDER BY. The new index (catch_up_done, enabled, apply_to_past, catch_up_cursor) satisfies both the filter and the sort from a single index scan. Velocity filtering happens post-index with negligible cost on a small table.
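Sketching the index shape described above (column names taken from the commit message; the DDL and finder query are illustrative, not the actual migration):

```sql
-- Equality columns first, then the ORDER BY column, so one index scan satisfies
-- both filter and sort. catch_up_velocity is deliberately omitted: as a range
-- predicate it would cut off index usability for every column after it.
CREATE INDEX idx_catch_up_pending
    ON retention_rules (catch_up_done, enabled, apply_to_past, catch_up_cursor);

-- A finder of this shape can walk the index in order, no filesort needed:
SELECT id FROM retention_rules
WHERE catch_up_done = FALSE AND enabled = TRUE AND apply_to_past = TRUE
ORDER BY catch_up_cursor
LIMIT 10;
```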
Details
Refactors the retention job infrastructure from the Managed+Flux.interval pattern to proper Quartz jobs, and extracts velocity estimation from the HTTP request handler into a background job.
Three independent Quartz jobs:
- RetentionSlidingWindowJob: regular sliding-window cycle, runs every (24*60)/`executionsPerDay` minutes (default 30 min), fraction-based workspace sharding
- RetentionEstimationJob: runs every 5 min (configurable via `catchUp.estimationIntervalMinutes`). Fixes the blocking HTTP thread concern from the PR [OPIK-4842] [BE] Catch-up job for apply-to-past retention rules #5820 review
- RetentionCatchUpJob: progressive historical deletion on its own schedule (configurable via `catchUp.intervalMinutes`)
- All three jobs take the Redis `bestEffortLock` with `holdUntilExpiry=true`; this prevents redundant runs across multiple instances
- `@DisallowConcurrentExecution` + `InterruptableJob` for Quartz-level safety; the Redis lock is the primary distributed guard
- `doJob()` uses `.block()` with try/catch for proper error handling and graceful shutdown via `AtomicBoolean interrupted`
- Rule creation saves `velocity=null, cursor=null, catchUpDone=false`; the estimation job picks it up within 5 minutes
- `runCounter` (success/skipped_lock/error) and `runDuration` histogram on all 3 jobs
- Sliding window: `workspacesProcessed` counter, `rowsToDelete` counter (by table)
- Catch-up: `rulesProcessed` (by tier: small/medium/large), `rulesCompleted`, `rowsToDelete` (by table)
- Estimation: `rulesEstimated` counter, `velocityValues` histogram
- Lightweight `SELECT count()` before each delete batch for observability: an upper-bound ceiling with >99% precision (excludes the expensive experiment_items exclusion subquery to avoid join cost)
- Job setup consolidated into `setRetentionJobs()` in the lifecycle listener
- `findUnestimatedCatchUpRules` limited to 10 per tick
- `isTooManyRowsException` moved to RetentionUtils
- `getCatchUpInterval()` renamed to avoid shadowing the parent `getInterval()`

Depends on: PR #5820 (catch-up job), already merged to main.
Change checklist
Issues
AI-WATERMARK
AI-WATERMARK: yes
Testing
Commands run:
Results: 20 tests, 0 failures, 0 errors
Scenarios validated:
Documentation
Retention documentation will come in a future task.