Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ice-rest-catalog/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,12 @@
<version>7.9.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.shyiko.skedule</groupId>
<artifactId>skedule</artifactId>
<version>0.4.0</version>
<classifier>kalvanized</classifier>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.altinity.ice.rest.catalog.internal.config.Config;
import com.altinity.ice.rest.catalog.internal.jetty.PlainErrorHandler;
import com.altinity.ice.rest.catalog.internal.jetty.ServerConfig;
import com.altinity.ice.rest.catalog.internal.maintenance.MaintenanceScheduler;
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogAdapter;
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogAuthorizationHandler;
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogHandler;
Expand Down Expand Up @@ -66,6 +67,13 @@ public String[] getVersion() {
description = "/path/to/config.yaml ($CWD/.ice-rest-catalog.yaml by default)")
String configFile;

@CommandLine.Option(
names = "--maintenance-interval",
description =
"Maintenance interval in human-friendly format (e.g. 'every day', 'every monday 09:00')",
defaultValue = "every day 00:00")
private String maintenanceInterval;

private Main() {}

private static Server createServer(int port, Catalog catalog, Map<String, String> config) {
Expand Down Expand Up @@ -276,6 +284,9 @@ public Integer call() throws Exception {

Catalog catalog = CatalogUtil.buildIcebergCatalog("rest_backend", config, null);

// Initialize and start the maintenance scheduler
initializeMaintenanceScheduler(catalog, config);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be nice to have maintenance optional (e.g. disable when maintenanceInterval is empty)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic added inside the function

  private void initializeMaintenanceScheduler(Catalog catalog, Map<String, String> config) {
    if (maintenanceInterval == null || maintenanceInterval.trim().isEmpty()) {
      logger.info("Maintenance scheduler is disabled (no maintenance interval specified)");
      return;
    ```


// TODO: replace with uds (jetty-unixdomain-server is all that is needed here but in ice you'll
// need to implement custom org.apache.iceberg.rest.RESTClient)
String adminPort = config.get(Config.OPTION_ADMIN_PORT);
Expand All @@ -297,6 +308,17 @@ public Integer call() throws Exception {
return 0;
}

private void initializeMaintenanceScheduler(Catalog catalog, Map<String, String> config) {
try {
MaintenanceScheduler scheduler = new MaintenanceScheduler(catalog, config);
scheduler.setMaintenanceSchedule(maintenanceInterval);
scheduler.startScheduledMaintenance();
} catch (Exception e) {
logger.error("Failed to initialize maintenance scheduler", e);
throw new RuntimeException(e);
}
}

public static void main(String[] args) throws Exception {
int exitCode = new CommandLine(new Main()).execute(args);
System.exit(exitCode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ private Config() {}
public static final String OPTION_ANONYMOUS_ACCESS = "ice.anonymous.access";
public static final String OPTION_ANONYMOUS_ACCESS_CONFIG =
"ice.anonymous.access.config"; // format: param=value&...
public static final String OPTION_SNAPSHOT_EXPIRATION_DAYS =
"ice.maintenance.snapshot.expiration.days";

// TODO: return Config, not Map
// https://py.iceberg.apache.org/configuration/#setting-configuration-values
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package com.altinity.ice.rest.catalog.internal.maintenance;

import com.altinity.ice.rest.catalog.internal.config.Config;
import com.github.shyiko.skedule.Schedule;
import java.time.LocalTime;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MaintenanceScheduler {
private static final Logger logger = LoggerFactory.getLogger(MaintenanceScheduler.class);
private static final int DEFAULT_EXPIRATION_DAYS = 30;

private final Catalog catalog;
private final AtomicBoolean isMaintenanceMode = new AtomicBoolean(false);
private final ScheduledExecutorService executor;
private final Map<String, String> config;
private ScheduledFuture<?> currentTask;
private Schedule schedule;

public MaintenanceScheduler(Catalog catalog, Map<String, String> config) {
this.catalog = catalog;
this.config = config;
this.executor = new ScheduledThreadPoolExecutor(1);
((ScheduledThreadPoolExecutor) executor).setRemoveOnCancelPolicy(true);
// Default schedule: every day at midnight
this.schedule = Schedule.at(LocalTime.MIDNIGHT).everyDay();
}

public void startScheduledMaintenance() {
scheduleNextMaintenance();
}

public void stopScheduledMaintenance() {
if (currentTask != null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this doesn't look thread-safe

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added synchronization on object

currentTask.cancel(false);
}
executor.shutdown();
}

private void scheduleNextMaintenance() {
if (currentTask != null) {
currentTask.cancel(false);
}

ZonedDateTime now = ZonedDateTime.now();
ZonedDateTime next = schedule.next(now);

long delay = next.toEpochSecond() - now.toEpochSecond();
currentTask =
executor.schedule(
() -> {
performMaintenance();
scheduleNextMaintenance(); // Schedule next run
},
delay,
TimeUnit.SECONDS);

logger.info("Next maintenance scheduled for: {}", next);
}

public void setMaintenanceSchedule(String scheduleExpression) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest moving this to constructor (and making schedule final). Otherwise we have an issue with thread-safety here.

this.schedule = Schedule.parse(scheduleExpression);
scheduleNextMaintenance();
}

public void setMaintenanceMode(boolean enabled) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be private to avoid misuse

isMaintenanceMode.set(enabled);
logger.info("Maintenance mode {}", enabled ? "enabled" : "disabled");
}

public void performMaintenance() {
if (isMaintenanceMode.get()) {
logger.info("Skipping maintenance task as system is already in maintenance mode");
return;
}

try {
logger.info("Starting scheduled maintenance task");
setMaintenanceMode(true);

if (catalog != null) {
logger.info("Performing maintenance on catalog: {}", catalog.name());
List<Namespace> namespaces;
if (catalog instanceof SupportsNamespaces) {
SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
namespaces = nsCatalog.listNamespaces();
for (Namespace ns : namespaces) {
logger.debug("Namespace: " + ns);
}
} else {
logger.error("Catalog does not support namespace operations.");
return;
}

for (Namespace namespace : namespaces) {
List<TableIdentifier> tables = catalog.listTables(namespace);
for (TableIdentifier tableIdent : tables) {
int expirationDays = DEFAULT_EXPIRATION_DAYS;
String configuredDays = config.get(Config.OPTION_SNAPSHOT_EXPIRATION_DAYS);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be better done in a constructor (with invalid value resulting in an exception)

if (configuredDays != null) {
try {
expirationDays = Integer.parseInt(configuredDays);
logger.debug("Using configured snapshot expiration days: {}", expirationDays);
} catch (NumberFormatException e) {
logger.warn(
"Invalid value for {}: {}. Using default of {} days",
Config.OPTION_SNAPSHOT_EXPIRATION_DAYS,
configuredDays,
DEFAULT_EXPIRATION_DAYS);
}
}
long olderThanMillis =
System.currentTimeMillis() - TimeUnit.DAYS.toMillis(expirationDays);
Table table = catalog.loadTable(tableIdent);
table.expireSnapshots().expireOlderThan(olderThanMillis).commit();
table.rewriteManifests().rewriteIf(manifest -> true).commit();
}
}
logger.info("Maintenance operations completed for catalog: {}", catalog.name());
} else {
logger.warn("No catalog available for maintenance operations");
}

logger.info("Scheduled maintenance task completed successfully");
} catch (Exception e) {
logger.error("Error during scheduled maintenance task", e);
} finally {
setMaintenanceMode(false);
}
}
}