diff --git a/examples/scratch/.ice-rest-catalog.yaml b/examples/scratch/.ice-rest-catalog.yaml index 2f076b93..f5c73af9 100644 --- a/examples/scratch/.ice-rest-catalog.yaml +++ b/examples/scratch/.ice-rest-catalog.yaml @@ -5,3 +5,4 @@ s3.access-key-id: "miniouser" s3.secret-access-key: "miniopassword" ice.s3.region: minio ice.token: foo +ice.maintenance.snapshot.expiration.days: 20 diff --git a/ice-rest-catalog/pom.xml b/ice-rest-catalog/pom.xml index 64a08a47..a0a63f22 100644 --- a/ice-rest-catalog/pom.xml +++ b/ice-rest-catalog/pom.xml @@ -237,6 +237,12 @@ 7.9.0 test + + com.github.shyiko.skedule + skedule + 0.4.0 + kalvanized + diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java index 55afe574..c399d8fd 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java @@ -6,6 +6,7 @@ import com.altinity.ice.rest.catalog.internal.config.Config; import com.altinity.ice.rest.catalog.internal.jetty.PlainErrorHandler; import com.altinity.ice.rest.catalog.internal.jetty.ServerConfig; +import com.altinity.ice.rest.catalog.internal.maintenance.MaintenanceScheduler; import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogAdapter; import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogAuthorizationHandler; import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogHandler; @@ -66,6 +67,12 @@ public String[] getVersion() { description = "/path/to/config.yaml ($CWD/.ice-rest-catalog.yaml by default)") String configFile; + @CommandLine.Option( + names = "--maintenance-interval", + description = + "Maintenance interval in human-friendly format (e.g. 'every day', 'every monday 09:00'). Leave empty to disable maintenance.") + private String maintenanceInterval; + private Main() {} private static Server createServer(int port, Catalog catalog, Map config) { @@ -276,6 +283,9 @@ public Integer call() throws Exception { Catalog catalog = CatalogUtil.buildIcebergCatalog("rest_backend", config, null); + // Initialize and start the maintenance scheduler + initializeMaintenanceScheduler(catalog, config); + // TODO: replace with uds (jetty-unixdomain-server is all that is needed here but in ice you'll // need to implement custom org.apache.iceberg.rest.RESTClient) String adminPort = config.get(Config.OPTION_ADMIN_PORT); @@ -297,6 +307,23 @@ public Integer call() throws Exception { return 0; } + private void initializeMaintenanceScheduler(Catalog catalog, Map config) { + if (maintenanceInterval == null || maintenanceInterval.trim().isEmpty()) { + logger.info("Maintenance scheduler is disabled (no maintenance interval specified)"); + return; + } + + try { + MaintenanceScheduler scheduler = + new MaintenanceScheduler(catalog, config, maintenanceInterval); + scheduler.startScheduledMaintenance(); + logger.info("Maintenance scheduler initialized with interval: {}", maintenanceInterval); + } catch (Exception e) { + logger.error("Failed to initialize maintenance scheduler", e); + throw new RuntimeException(e); + } + } + public static void main(String[] args) throws Exception { int exitCode = new CommandLine(new Main()).execute(args); System.exit(exitCode); diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java index 83a953b0..26a7d54d 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java @@ -41,6 +41,8 @@ private Config() {} public static final String OPTION_ANONYMOUS_ACCESS = "ice.anonymous.access"; public static final String OPTION_ANONYMOUS_ACCESS_CONFIG = "ice.anonymous.access.config"; // format: param=value&... + public static final String OPTION_SNAPSHOT_EXPIRATION_DAYS = + "ice.maintenance.snapshot.expiration.days"; // TODO: return Config, not Map // https://py.iceberg.apache.org/configuration/#setting-configuration-values diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java new file mode 100644 index 00000000..3fc06b39 --- /dev/null +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java @@ -0,0 +1,142 @@ +package com.altinity.ice.rest.catalog.internal.maintenance; + +import com.altinity.ice.rest.catalog.internal.config.Config; +import com.github.shyiko.skedule.Schedule; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MaintenanceScheduler { + private static final Logger logger = LoggerFactory.getLogger(MaintenanceScheduler.class); + private static final int DEFAULT_EXPIRATION_DAYS = 30; + + private final Catalog catalog; + private final AtomicBoolean isMaintenanceMode = new AtomicBoolean(false); + private final ScheduledExecutorService executor; + private final Schedule schedule; + private final Object taskLock = new Object(); + + private ScheduledFuture currentTask; + private final Integer snapshotExpirationDays; + + public MaintenanceScheduler( + Catalog catalog, Map config, String maintenanceInterval) { + this.catalog = catalog; + this.executor = new ScheduledThreadPoolExecutor(1); + ((ScheduledThreadPoolExecutor) executor).setRemoveOnCancelPolicy(true); + this.schedule = Schedule.parse(maintenanceInterval); + if (config.containsKey(Config.OPTION_SNAPSHOT_EXPIRATION_DAYS)) { + this.snapshotExpirationDays = + Integer.parseInt(config.get(Config.OPTION_SNAPSHOT_EXPIRATION_DAYS)); + } else { + this.snapshotExpirationDays = DEFAULT_EXPIRATION_DAYS; + } + } + + public void startScheduledMaintenance() { + scheduleNextMaintenance(); + } + + public void stopScheduledMaintenance() { + synchronized (taskLock) { + if (currentTask != null) { + currentTask.cancel(false); + } + executor.shutdown(); + } + } + + private void scheduleNextMaintenance() { + synchronized (taskLock) { + if (currentTask != null) { + currentTask.cancel(false); + } + + ZonedDateTime now = ZonedDateTime.now(); + ZonedDateTime next = schedule.next(now); + + long delay = next.toEpochSecond() - now.toEpochSecond(); + currentTask = + executor.schedule( + () -> { + performMaintenance(); + scheduleNextMaintenance(); // Schedule next run + }, + delay, + TimeUnit.SECONDS); + + logger.info("Next maintenance scheduled for: {}", next); + } + } + + public void performMaintenance() { + if (isMaintenanceMode.get()) { + logger.info("Skipping maintenance task as system is already in maintenance mode"); + return; + } + + try { + logger.info("Starting scheduled maintenance task"); + setMaintenanceMode(true); + + if (catalog != null) { + logger.info("Performing maintenance on catalog: {}", catalog.name()); + List namespaces; + if (catalog instanceof SupportsNamespaces) { + SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog; + namespaces = nsCatalog.listNamespaces(); + for (Namespace ns : namespaces) { + logger.debug("Namespace: " + ns); + } + } else { + logger.error("Catalog does not support namespace operations."); + return; + } + + for (Namespace namespace : namespaces) { + List tables = catalog.listTables(namespace); + for (TableIdentifier tableIdent : tables) { + long olderThanMillis = + System.currentTimeMillis() - TimeUnit.DAYS.toMillis(snapshotExpirationDays); + Table table = catalog.loadTable(tableIdent); + + // Check if table has any snapshots before performing maintenance + if (table.currentSnapshot() == null) { + logger.warn("Table {} has no snapshots, skipping maintenance", tableIdent); + continue; + } + + table.rewriteManifests().rewriteIf(manifest -> true).commit(); + table.expireSnapshots().expireOlderThan(olderThanMillis).commit(); + } + } + logger.info("Maintenance operations completed for catalog: {}", catalog.name()); + } else { + logger.warn("No catalog available for maintenance operations"); + } + + logger.info("Scheduled maintenance task completed successfully"); + } catch (Exception e) { + logger.error("Error during scheduled maintenance task", e); + } finally { + setMaintenanceMode(false); + } + } + + private void setMaintenanceMode(boolean enabled) { + isMaintenanceMode.set(enabled); + logger.info("Maintenance mode {}", enabled ? "enabled" : "disabled"); + } +}