Skip to content

Commit 7414d64

Browse files
authored
Merge pull request #17 from Altinity/1-ice-rest-catalog-automate-catalog-maintenance
Added quartz scheduler to schedule maintenance job.
2 parents 85940bb + 76eb25b commit 7414d64

File tree

5 files changed

+178
-0
lines changed

5 files changed

+178
-0
lines changed

examples/scratch/.ice-rest-catalog.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ s3.access-key-id: "miniouser"
55
s3.secret-access-key: "miniopassword"
66
ice.s3.region: minio
77
ice.token: foo
8+
ice.maintenance.snapshot.expiration.days: 20

ice-rest-catalog/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,12 @@
237237
<version>7.9.0</version>
238238
<scope>test</scope>
239239
</dependency>
240+
<dependency>
241+
<groupId>com.github.shyiko.skedule</groupId>
242+
<artifactId>skedule</artifactId>
243+
<version>0.4.0</version>
244+
<classifier>kalvanized</classifier>
245+
</dependency>
240246
</dependencies>
241247

242248
<build>

ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.altinity.ice.rest.catalog.internal.config.Config;
77
import com.altinity.ice.rest.catalog.internal.jetty.PlainErrorHandler;
88
import com.altinity.ice.rest.catalog.internal.jetty.ServerConfig;
9+
import com.altinity.ice.rest.catalog.internal.maintenance.MaintenanceScheduler;
910
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogAdapter;
1011
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogAuthorizationHandler;
1112
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogHandler;
@@ -66,6 +67,12 @@ public String[] getVersion() {
6667
description = "/path/to/config.yaml ($CWD/.ice-rest-catalog.yaml by default)")
6768
String configFile;
6869

70+
@CommandLine.Option(
71+
names = "--maintenance-interval",
72+
description =
73+
"Maintenance interval in human-friendly format (e.g. 'every day', 'every monday 09:00'). Leave empty to disable maintenance.")
74+
private String maintenanceInterval;
75+
6976
private Main() {}
7077

7178
private static Server createServer(int port, Catalog catalog, Map<String, String> config) {
@@ -276,6 +283,9 @@ public Integer call() throws Exception {
276283

277284
Catalog catalog = CatalogUtil.buildIcebergCatalog("rest_backend", config, null);
278285

286+
// Initialize and start the maintenance scheduler
287+
initializeMaintenanceScheduler(catalog, config);
288+
279289
// TODO: replace with uds (jetty-unixdomain-server is all that is needed here but in ice you'll
280290
// need to implement custom org.apache.iceberg.rest.RESTClient)
281291
String adminPort = config.get(Config.OPTION_ADMIN_PORT);
@@ -297,6 +307,23 @@ public Integer call() throws Exception {
297307
return 0;
298308
}
299309

310+
private void initializeMaintenanceScheduler(Catalog catalog, Map<String, String> config) {
311+
if (maintenanceInterval == null || maintenanceInterval.trim().isEmpty()) {
312+
logger.info("Maintenance scheduler is disabled (no maintenance interval specified)");
313+
return;
314+
}
315+
316+
try {
317+
MaintenanceScheduler scheduler =
318+
new MaintenanceScheduler(catalog, config, maintenanceInterval);
319+
scheduler.startScheduledMaintenance();
320+
logger.info("Maintenance scheduler initialized with interval: {}", maintenanceInterval);
321+
} catch (Exception e) {
322+
logger.error("Failed to initialize maintenance scheduler", e);
323+
throw new RuntimeException(e);
324+
}
325+
}
326+
300327
public static void main(String[] args) throws Exception {
301328
int exitCode = new CommandLine(new Main()).execute(args);
302329
System.exit(exitCode);

ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ private Config() {}
4141
public static final String OPTION_ANONYMOUS_ACCESS = "ice.anonymous.access";
4242
public static final String OPTION_ANONYMOUS_ACCESS_CONFIG =
4343
"ice.anonymous.access.config"; // format: param=value&...
44+
public static final String OPTION_SNAPSHOT_EXPIRATION_DAYS =
45+
"ice.maintenance.snapshot.expiration.days";
4446

4547
// TODO: return Config, not Map
4648
// https://py.iceberg.apache.org/configuration/#setting-configuration-values
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package com.altinity.ice.rest.catalog.internal.maintenance;
2+
3+
import com.altinity.ice.rest.catalog.internal.config.Config;
4+
import com.github.shyiko.skedule.Schedule;
5+
import java.time.ZonedDateTime;
6+
import java.util.List;
7+
import java.util.Map;
8+
import java.util.concurrent.ScheduledExecutorService;
9+
import java.util.concurrent.ScheduledFuture;
10+
import java.util.concurrent.ScheduledThreadPoolExecutor;
11+
import java.util.concurrent.TimeUnit;
12+
import java.util.concurrent.atomic.AtomicBoolean;
13+
import org.apache.iceberg.Table;
14+
import org.apache.iceberg.catalog.Catalog;
15+
import org.apache.iceberg.catalog.Namespace;
16+
import org.apache.iceberg.catalog.SupportsNamespaces;
17+
import org.apache.iceberg.catalog.TableIdentifier;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
21+
public class MaintenanceScheduler {
22+
private static final Logger logger = LoggerFactory.getLogger(MaintenanceScheduler.class);
23+
private static final int DEFAULT_EXPIRATION_DAYS = 30;
24+
25+
private final Catalog catalog;
26+
private final AtomicBoolean isMaintenanceMode = new AtomicBoolean(false);
27+
private final ScheduledExecutorService executor;
28+
private final Schedule schedule;
29+
private final Object taskLock = new Object();
30+
31+
private ScheduledFuture<?> currentTask;
32+
private final Integer snapshotExpirationDays;
33+
34+
public MaintenanceScheduler(
35+
Catalog catalog, Map<String, String> config, String maintenanceInterval) {
36+
this.catalog = catalog;
37+
this.executor = new ScheduledThreadPoolExecutor(1);
38+
((ScheduledThreadPoolExecutor) executor).setRemoveOnCancelPolicy(true);
39+
this.schedule = Schedule.parse(maintenanceInterval);
40+
if (config.containsKey(Config.OPTION_SNAPSHOT_EXPIRATION_DAYS)) {
41+
this.snapshotExpirationDays =
42+
Integer.parseInt(config.get(Config.OPTION_SNAPSHOT_EXPIRATION_DAYS));
43+
} else {
44+
this.snapshotExpirationDays = DEFAULT_EXPIRATION_DAYS;
45+
}
46+
}
47+
48+
public void startScheduledMaintenance() {
49+
scheduleNextMaintenance();
50+
}
51+
52+
public void stopScheduledMaintenance() {
53+
synchronized (taskLock) {
54+
if (currentTask != null) {
55+
currentTask.cancel(false);
56+
}
57+
executor.shutdown();
58+
}
59+
}
60+
61+
private void scheduleNextMaintenance() {
62+
synchronized (taskLock) {
63+
if (currentTask != null) {
64+
currentTask.cancel(false);
65+
}
66+
67+
ZonedDateTime now = ZonedDateTime.now();
68+
ZonedDateTime next = schedule.next(now);
69+
70+
long delay = next.toEpochSecond() - now.toEpochSecond();
71+
currentTask =
72+
executor.schedule(
73+
() -> {
74+
performMaintenance();
75+
scheduleNextMaintenance(); // Schedule next run
76+
},
77+
delay,
78+
TimeUnit.SECONDS);
79+
80+
logger.info("Next maintenance scheduled for: {}", next);
81+
}
82+
}
83+
84+
public void performMaintenance() {
85+
if (isMaintenanceMode.get()) {
86+
logger.info("Skipping maintenance task as system is already in maintenance mode");
87+
return;
88+
}
89+
90+
try {
91+
logger.info("Starting scheduled maintenance task");
92+
setMaintenanceMode(true);
93+
94+
if (catalog != null) {
95+
logger.info("Performing maintenance on catalog: {}", catalog.name());
96+
List<Namespace> namespaces;
97+
if (catalog instanceof SupportsNamespaces) {
98+
SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
99+
namespaces = nsCatalog.listNamespaces();
100+
for (Namespace ns : namespaces) {
101+
logger.debug("Namespace: " + ns);
102+
}
103+
} else {
104+
logger.error("Catalog does not support namespace operations.");
105+
return;
106+
}
107+
108+
for (Namespace namespace : namespaces) {
109+
List<TableIdentifier> tables = catalog.listTables(namespace);
110+
for (TableIdentifier tableIdent : tables) {
111+
long olderThanMillis =
112+
System.currentTimeMillis() - TimeUnit.DAYS.toMillis(snapshotExpirationDays);
113+
Table table = catalog.loadTable(tableIdent);
114+
115+
// Check if table has any snapshots before performing maintenance
116+
if (table.currentSnapshot() == null) {
117+
logger.warn("Table {} has no snapshots, skipping maintenance", tableIdent);
118+
continue;
119+
}
120+
121+
table.rewriteManifests().rewriteIf(manifest -> true).commit();
122+
table.expireSnapshots().expireOlderThan(olderThanMillis).commit();
123+
}
124+
}
125+
logger.info("Maintenance operations completed for catalog: {}", catalog.name());
126+
} else {
127+
logger.warn("No catalog available for maintenance operations");
128+
}
129+
130+
logger.info("Scheduled maintenance task completed successfully");
131+
} catch (Exception e) {
132+
logger.error("Error during scheduled maintenance task", e);
133+
} finally {
134+
setMaintenanceMode(false);
135+
}
136+
}
137+
138+
private void setMaintenanceMode(boolean enabled) {
139+
isMaintenanceMode.set(enabled);
140+
logger.info("Maintenance mode {}", enabled ? "enabled" : "disabled");
141+
}
142+
}

0 commit comments

Comments
 (0)