diff --git a/.gitignore b/.gitignore
index ef7dc1f..52ecf19 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@ target
/release
.devbox
/.envrc
+*.credentials
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 51855b0..10604b8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,7 +5,24 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
-## [Unreleased](https://github.com/altinity/ice/compare/v0.4.0...master)
+## [Unreleased](https://github.com/altinity/ice/compare/v0.5.0...master)
+
+## [0.5.0](https://github.com/altinity/ice/compare/v0.4.0...v0.5.0)
+
+### Added
+- ice: Support for `ice insert` in watch mode. See [examples/s3watch](examples/s3watch) for details.
+- ice-rest-catalog: `MANIFEST_COMPACTION`, `DATA_COMPACTION`, `SNAPSHOT_CLEANUP` and `ORPHAN_CLEANUP` maintenance routines. These
+can be enabled via the `maintenance` section of ice-rest-catalog.yaml or run ad hoc via `ice-rest-catalog perform-maintenance` (illustrated below).
+- ice: `ice delete` command.
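+
+For example (illustrative invocations of the new commands; option and table names here are examples, not defaults):
+
+```shell
+# ad-hoc dry run of a single maintenance routine
+ice-rest-catalog perform-maintenance --job=SNAPSHOT_CLEANUP --dry-run
+
+# delete data from a table; --dry-run=true (the default) only prints the partitions that would be deleted
+ice delete nyc.taxis --dry-run=false
+```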
+
+### Changed
+- ice: `ice delete-table` no longer deletes data unless `--purge` is specified.
+- ice-rest-catalog: catalog maintenance settings moved to the new `maintenance` config section; `snapshotTTLInDays` is superseded by `maintenance.maxSnapshotAgeHours` (see the example below).
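+
+  A minimal illustrative `ice-rest-catalog.yaml` fragment (key names follow the new `MaintenanceConfig` fields; values shown are illustrative):
+
+  ```yaml
+  maintenanceSchedule: every day 00:00
+  maintenance:
+    maxSnapshotAgeHours: 120 # previously snapshotTTLInDays: 5
+    minSnapshotsToKeep: 1
+  ```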
+
+### Fixed
+- ice: Partitioning metadata missing when data is inserted with `--no-copy` or `--s3-copy-object`.
+- ice: `NULLS_FIRST`/`NULLS_LAST` being ignored when sorting.
+- ice: Path construction in `localfileio`.
## [0.4.0](https://github.com/altinity/ice/compare/v0.3.1...v0.4.0)
diff --git a/examples/s3watch/.gitignore b/examples/s3watch/.gitignore
new file mode 100644
index 0000000..e8d7728
--- /dev/null
+++ b/examples/s3watch/.gitignore
@@ -0,0 +1,5 @@
+/.ice-rest-catalog.yaml
+/.terraform.lock.hcl
+/terraform.tfstate
+/terraform.tfstate.backup
+/tf.export
diff --git a/examples/s3watch/.ice-rest-catalog.envsubst.yaml b/examples/s3watch/.ice-rest-catalog.envsubst.yaml
new file mode 100644
index 0000000..c646ba5
--- /dev/null
+++ b/examples/s3watch/.ice-rest-catalog.envsubst.yaml
@@ -0,0 +1,6 @@
+uri: jdbc:sqlite:file:data/ice-rest-catalog/db.sqlite?journal_mode=WAL&synchronous=OFF&journal_size_limit=500
+warehouse: s3://${CATALOG_S3_BUCKET_NAME}
+s3:
+ region: ${AWS_REGION}
+bearerTokens:
+- value: foo
diff --git a/examples/s3watch/.ice.yaml b/examples/s3watch/.ice.yaml
new file mode 100644
index 0000000..b81bc0a
--- /dev/null
+++ b/examples/s3watch/.ice.yaml
@@ -0,0 +1,3 @@
+uri: http://localhost:5000
+bearerToken: foo
+httpCacheDir: data/ice/http/cache
diff --git a/examples/s3watch/README.md b/examples/s3watch/README.md
new file mode 100644
index 0000000..c6bbef0
--- /dev/null
+++ b/examples/s3watch/README.md
@@ -0,0 +1,108 @@
+# examples/s3watch
+
+This example demonstrates how ice can be used to continuously add files to the catalog as they are uploaded to an S3
+bucket. It works by having ice listen for S3 object-creation events delivered via an SQS queue.
+
+1. Allocate AWS resources.
+
+```shell
+devbox shell
+
+# auth into AWS
+#
+# either create a file named "aws.credentials" containing
+#
+# export AWS_ACCESS_KEY_ID=...
+# export AWS_SECRET_ACCESS_KEY=...
+# export AWS_SESSION_TOKEN=...
+# export AWS_REGION=us-west-2
+#
+# and then load it as shown below or use any other method
+source aws.credentials
+
+# create s3 bucket + configure notification queue
+terraform init
+terraform apply
+
+# save terraform output for easy loading
+echo $"
+export CATALOG_S3_BUCKET_NAME=$(terraform output -raw s3_bucket_name)
+export CATALOG_SQS_QUEUE_URL=$(terraform output -raw sqs_queue_url)
+" > tf.export
+```
+
+2. Start Iceberg REST Catalog.
+
+```shell
+devbox shell
+
+source aws.credentials
+source tf.export
+
+# generate config
+cat .ice-rest-catalog.envsubst.yaml | \
+ envsubst -no-unset -no-empty > .ice-rest-catalog.yaml
+
+# run
+ice-rest-catalog
+```
+
+3. Start `ice insert` in watch mode.
+
+```shell
+devbox shell
+
+source aws.credentials # for sqs:ReceiveMessage
+source tf.export
+
+# run
+ice insert flowers.iris -p --no-copy --skip-duplicates \
+ s3://$CATALOG_S3_BUCKET_NAME/flowers/iris/external-data/*.parquet \
+ --watch="$CATALOG_SQS_QUEUE_URL"
+```
+
+4. Put some data into the S3 bucket any way you want, e.g. using `aws s3 cp`.
+
+```shell
+devbox shell
+
+source aws.credentials
+source tf.export
+
+# upload data to s3
+aws s3 cp iris.parquet s3://$CATALOG_S3_BUCKET_NAME/flowers/iris/external-data/
+```
+
+5. Query data from ClickHouse.
+
+```shell
+devbox shell
+
+source tf.export
+
+clickhouse local -q $"
+SET allow_experimental_database_iceberg = 1;
+
+-- (re)create iceberg db
+DROP DATABASE IF EXISTS ice;
+
+CREATE DATABASE ice
+ ENGINE = DataLakeCatalog('http://localhost:5000')
+ SETTINGS catalog_type = 'rest',
+ auth_header = 'Authorization: Bearer foo',
+ warehouse = 's3://${CATALOG_S3_BUCKET_NAME}';
+
+select count(*) from ice.\`flowers.iris\`;
+"
+```
+
+6. Clean up.
+
+```shell
+devbox shell
+
+source aws.credentials
+
+terraform destroy
+rm -rf data/
+```
diff --git a/examples/s3watch/devbox.json b/examples/s3watch/devbox.json
new file mode 100644
index 0000000..b23fac7
--- /dev/null
+++ b/examples/s3watch/devbox.json
@@ -0,0 +1,17 @@
+{
+ "$schema": "https://raw.githubusercontent.com/jetify-com/devbox/0.10.7/.schema/devbox.schema.json",
+ "packages": [
+ "awscli2@latest",
+ "terraform@1.12.2",
+ "jdk-headless@21.0.7+6"
+ ],
+ "env": {
+ "AT": "ice:examples/s3watch"
+ },
+ "shell": {
+ "init_hook": [
+ "export PATH=$(pwd):$(pwd)/.devbox/bin:$PATH",
+ "[ -f .devbox/bin/clickhouse ] || (curl https://clickhouse.com/ | sh && mv clickhouse .devbox/bin/)"
+ ]
+ }
+}
diff --git a/examples/s3watch/devbox.lock b/examples/s3watch/devbox.lock
new file mode 100644
index 0000000..89d22e8
--- /dev/null
+++ b/examples/s3watch/devbox.lock
@@ -0,0 +1,156 @@
+{
+ "lockfile_version": "1",
+ "packages": {
+ "awscli2@latest": {
+ "last_modified": "2025-08-08T08:05:48Z",
+ "resolved": "github:NixOS/nixpkgs/a3f3e3f2c983e957af6b07a1db98bafd1f87b7a1#awscli2",
+ "source": "devbox-search",
+ "version": "2.28.1",
+ "systems": {
+ "aarch64-darwin": {
+ "outputs": [
+ {
+ "name": "out",
+ "path": "/nix/store/rgavcd6nf9ycmm53r86js1n8zv6k5717-awscli2-2.28.1",
+ "default": true
+ },
+ {
+ "name": "dist",
+ "path": "/nix/store/bcqbw14w0689hslsnd9r1vqh8445n5d3-awscli2-2.28.1-dist"
+ }
+ ],
+ "store_path": "/nix/store/rgavcd6nf9ycmm53r86js1n8zv6k5717-awscli2-2.28.1"
+ },
+ "aarch64-linux": {
+ "outputs": [
+ {
+ "name": "out",
+ "path": "/nix/store/agnnh10lr6xhvvyy9k74bbsiwaxd18ma-awscli2-2.28.1",
+ "default": true
+ },
+ {
+ "name": "dist",
+ "path": "/nix/store/bjn51pqs4q12ajiq8idsv4pxfi85zqyv-awscli2-2.28.1-dist"
+ }
+ ],
+ "store_path": "/nix/store/agnnh10lr6xhvvyy9k74bbsiwaxd18ma-awscli2-2.28.1"
+ },
+ "x86_64-darwin": {
+ "outputs": [
+ {
+ "name": "out",
+ "path": "/nix/store/fy6lwzj66lhbzqx4023jpkl78l5sb3l4-awscli2-2.28.1",
+ "default": true
+ },
+ {
+ "name": "dist",
+ "path": "/nix/store/72zij4gvx5rhr4l8jn0d4bas35brf19h-awscli2-2.28.1-dist"
+ }
+ ],
+ "store_path": "/nix/store/fy6lwzj66lhbzqx4023jpkl78l5sb3l4-awscli2-2.28.1"
+ },
+ "x86_64-linux": {
+ "outputs": [
+ {
+ "name": "out",
+ "path": "/nix/store/a830f5ksp5fa8v8fl0bw29amwazwbf50-awscli2-2.28.1",
+ "default": true
+ },
+ {
+ "name": "dist",
+ "path": "/nix/store/6ddshy6z4h24brknz0a1ahjdvbm7yl6s-awscli2-2.28.1-dist"
+ }
+ ],
+ "store_path": "/nix/store/a830f5ksp5fa8v8fl0bw29amwazwbf50-awscli2-2.28.1"
+ }
+ }
+ },
+ "github:NixOS/nixpkgs/nixpkgs-unstable": {
+ "resolved": "github:NixOS/nixpkgs/32f313e49e42f715491e1ea7b306a87c16fe0388?lastModified=1755268003&narHash=sha256-nNaeJjo861wFR0tjHDyCnHs1rbRtrMgxAKMoig9Sj%2Fw%3D"
+ },
+ "jdk-headless@21.0.7+6": {
+ "last_modified": "2025-07-28T17:09:23Z",
+ "resolved": "github:NixOS/nixpkgs/648f70160c03151bc2121d179291337ad6bc564b#jdk21_headless",
+ "source": "devbox-search",
+ "version": "21.0.7+6",
+ "systems": {
+ "aarch64-linux": {
+ "outputs": [
+ {
+ "name": "out",
+ "path": "/nix/store/sr7lg8i0c9f999klq8j7zpajwhx5y8j5-openjdk-headless-21.0.7+6",
+ "default": true
+ },
+ {
+ "name": "debug",
+ "path": "/nix/store/7j7r7k6rj9lb33xsmk1rn00dj8kq1ajw-openjdk-headless-21.0.7+6-debug"
+ }
+ ],
+ "store_path": "/nix/store/sr7lg8i0c9f999klq8j7zpajwhx5y8j5-openjdk-headless-21.0.7+6"
+ },
+ "x86_64-linux": {
+ "outputs": [
+ {
+ "name": "out",
+ "path": "/nix/store/b8272lgswnw20fkd3b8av2ghqi60m725-openjdk-headless-21.0.7+6",
+ "default": true
+ },
+ {
+ "name": "debug",
+ "path": "/nix/store/cjziiakfgvacvhdjg91bdcnz4zyh1q84-openjdk-headless-21.0.7+6-debug"
+ }
+ ],
+ "store_path": "/nix/store/b8272lgswnw20fkd3b8av2ghqi60m725-openjdk-headless-21.0.7+6"
+ }
+ }
+ },
+ "terraform@1.12.2": {
+ "last_modified": "2025-07-28T17:09:23Z",
+ "resolved": "github:NixOS/nixpkgs/648f70160c03151bc2121d179291337ad6bc564b#terraform",
+ "source": "devbox-search",
+ "version": "1.12.2",
+ "systems": {
+ "aarch64-darwin": {
+ "outputs": [
+ {
+ "name": "out",
+ "path": "/nix/store/40gjbhfk5r4njbvkny3jcn8dz9slr138-terraform-1.12.2",
+ "default": true
+ }
+ ],
+ "store_path": "/nix/store/40gjbhfk5r4njbvkny3jcn8dz9slr138-terraform-1.12.2"
+ },
+ "aarch64-linux": {
+ "outputs": [
+ {
+ "name": "out",
+ "path": "/nix/store/bvi0sf7qi75lk7aczgwp1bq811py9rj0-terraform-1.12.2",
+ "default": true
+ }
+ ],
+ "store_path": "/nix/store/bvi0sf7qi75lk7aczgwp1bq811py9rj0-terraform-1.12.2"
+ },
+ "x86_64-darwin": {
+ "outputs": [
+ {
+ "name": "out",
+ "path": "/nix/store/km3x95bvr81n0lpyfr7nf00vifl3q6hy-terraform-1.12.2",
+ "default": true
+ }
+ ],
+ "store_path": "/nix/store/km3x95bvr81n0lpyfr7nf00vifl3q6hy-terraform-1.12.2"
+ },
+ "x86_64-linux": {
+ "outputs": [
+ {
+ "name": "out",
+ "path": "/nix/store/vxazqkmsxlc6fgx3kl53jxiwjszghwrm-terraform-1.12.2",
+ "default": true
+ }
+ ],
+ "store_path": "/nix/store/vxazqkmsxlc6fgx3kl53jxiwjszghwrm-terraform-1.12.2"
+ }
+ }
+ }
+ }
+}
diff --git a/examples/s3watch/iris.parquet b/examples/s3watch/iris.parquet
new file mode 100644
index 0000000..028c64c
Binary files /dev/null and b/examples/s3watch/iris.parquet differ
diff --git a/examples/s3watch/main.tf b/examples/s3watch/main.tf
new file mode 100644
index 0000000..18500fb
--- /dev/null
+++ b/examples/s3watch/main.tf
@@ -0,0 +1,67 @@
+terraform {
+ required_providers {
+ aws = {
+ source = "hashicorp/aws"
+ version = "~> 6.0"
+ }
+ }
+}
+
+provider "aws" {}
+
+locals {
+ sqs_queue_prefix = "ice-s3watch"
+ s3_bucket_prefix = "ice-s3watch"
+}
+
+resource "aws_s3_bucket" "this" {
+ bucket_prefix = local.s3_bucket_prefix
+ force_destroy = true
+}
+
+resource "aws_sqs_queue" "this" {
+ name_prefix = local.sqs_queue_prefix
+}
+
+resource "aws_sqs_queue_policy" "this" {
+ queue_url = aws_sqs_queue.this.id
+ policy = data.aws_iam_policy_document.queue.json
+}
+
+data "aws_iam_policy_document" "queue" {
+ statement {
+ effect = "Allow"
+
+ principals {
+ type = "*"
+ identifiers = ["*"]
+ }
+
+ actions = ["sqs:SendMessage"]
+ resources = [aws_sqs_queue.this.arn]
+
+ condition {
+ test = "ArnEquals"
+ variable = "aws:SourceArn"
+ values = [aws_s3_bucket.this.arn]
+ }
+ }
+}
+
+resource "aws_s3_bucket_notification" "this" {
+ bucket = aws_s3_bucket.this.id
+
+ queue {
+ queue_arn = aws_sqs_queue.this.arn
+ events = ["s3:ObjectCreated:*"]
+ filter_suffix = ".parquet"
+ }
+}
+
+output "s3_bucket_name" {
+ value = aws_s3_bucket.this.id
+}
+
+output "sqs_queue_url" {
+ value = aws_sqs_queue.this.id
+}
diff --git a/examples/scratch/README.md b/examples/scratch/README.md
index 454a808..e2e50bb 100644
--- a/examples/scratch/README.md
+++ b/examples/scratch/README.md
@@ -50,9 +50,6 @@ ice create-table flowers.iris_no_copy --schema-from-parquet=file://iris.parquet
local-mc cp iris.parquet local/bucket1/flowers/iris_no_copy/
ice insert flowers.iris_no_copy --no-copy s3://bucket1/flowers/iris_no_copy/iris.parquet
-# delete partition(By default --dry-run=true is enabled to print the list of partitions that will be deleted)
-ice delete nyc.taxis --dry-run=false
-
# inspect
ice describe
diff --git a/ice-rest-catalog/pom.xml b/ice-rest-catalog/pom.xml
index c0f700f..dd1b71b 100644
--- a/ice-rest-catalog/pom.xml
+++ b/ice-rest-catalog/pom.xml
@@ -13,9 +13,6 @@
ice-rest-catalog
-    <jetty.version>11.0.25</jetty.version>
-    <jakarta.version>6.1.0</jakarta.version>
-    <prometheus.version>1.3.6</prometheus.version>
3.46.1.0
0.8.5
1.70.0
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java
index a0fcc5d..c010a8c 100644
--- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/Main.java
@@ -9,15 +9,24 @@
*/
package com.altinity.ice.rest.catalog;
+import com.altinity.ice.internal.io.Matcher;
+import com.altinity.ice.internal.jetty.DebugServer;
+import com.altinity.ice.internal.jetty.PlainErrorHandler;
+import com.altinity.ice.internal.jetty.ServerConfig;
import com.altinity.ice.internal.picocli.VersionProvider;
import com.altinity.ice.internal.strings.Strings;
import com.altinity.ice.rest.catalog.internal.auth.Session;
import com.altinity.ice.rest.catalog.internal.aws.CredentialsProvider;
import com.altinity.ice.rest.catalog.internal.config.Config;
+import com.altinity.ice.rest.catalog.internal.config.MaintenanceConfig;
import com.altinity.ice.rest.catalog.internal.etcd.EtcdCatalog;
-import com.altinity.ice.rest.catalog.internal.jetty.PlainErrorHandler;
-import com.altinity.ice.rest.catalog.internal.jetty.ServerConfig;
+import com.altinity.ice.rest.catalog.internal.maintenance.DataCompaction;
+import com.altinity.ice.rest.catalog.internal.maintenance.MaintenanceJob;
+import com.altinity.ice.rest.catalog.internal.maintenance.MaintenanceRunner;
import com.altinity.ice.rest.catalog.internal.maintenance.MaintenanceScheduler;
+import com.altinity.ice.rest.catalog.internal.maintenance.ManifestCompaction;
+import com.altinity.ice.rest.catalog.internal.maintenance.OrphanCleanup;
+import com.altinity.ice.rest.catalog.internal.maintenance.SnapshotCleanup;
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogAdapter;
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogAuthorizationHandler;
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogHandler;
@@ -26,25 +35,21 @@
import com.altinity.ice.rest.catalog.internal.rest.RESTCatalogServlet;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.net.HostAndPort;
-import io.prometheus.metrics.exporter.servlet.jakarta.PrometheusMetricsServlet;
import io.prometheus.metrics.instrumentation.jvm.JvmMetrics;
-import jakarta.servlet.http.HttpServlet;
-import jakarta.servlet.http.HttpServletRequest;
-import jakarta.servlet.http.HttpServletResponse;
import java.io.IOException;
-import java.io.PrintWriter;
-import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
@@ -85,6 +90,114 @@ public String configFile() {
private Main() {}
+ // TODO: ice-rest-catalog maintenance list-orphan-files/list-data-compaction-candidates
+ @CommandLine.Command(name = "perform-maintenance", description = "Run maintenance job(s).")
+ void performMaintenance(
+ @CommandLine.Option(
+ names = "--job",
+ description =
+ "MANIFEST_COMPACTION, DATA_COMPACTION, SNAPSHOT_CLEANUP, ORPHAN_CLEANUP (all by default)")
+ MaintenanceConfig.Job[] modes,
+ @CommandLine.Option(
+ names = "--max-snapshot-age-hours",
+ description = "Max snapshot age in hours (5 days by default)")
+ Integer maxSnapshotAgeHours,
+ @CommandLine.Option(
+ names = "--min-snapshots-to-keep",
+ description = "Minimum number of snapshots to keep (1 by default)")
+ Integer minSnapshotsToKeep,
+ @CommandLine.Option(
+ names = "--target-file-size-mb",
+ description = "The target file size for the table in MB (64 min; 512 by default)")
+ Integer targetFileSizeMB,
+ @CommandLine.Option(
+ names = "--min-input-files",
+ description =
+ "The minimum number of files to be compacted if a table partition size is smaller than the target file size (5 by default)")
+ Integer minInputFiles,
+ @CommandLine.Option(
+ names = "--data-compaction-candidate-min-age-in-hours",
+ description =
+ "How long to wait before considering file for data compaction (1 day by default; -1 to disable)")
+ Integer dataCompactionCandidateMinAgeInHours,
+ @CommandLine.Option(
+ names = "--orphan-file-retention-period-in-days",
+ description =
+ "The number of days to retain orphan files before deleting them (3 by default; -1 to disable)")
+ Integer orphanFileRetentionPeriodInDays,
+ @CommandLine.Option(
+ names = "--orphan-whitelist",
+ description = "Orphan whitelist (defaults to */metadata/*, */data/*)")
+ String[] orphanWhitelist,
+ @CommandLine.Option(names = "--dry-run", description = "Print changes without applying them")
+ Boolean dryRun,
+ @CommandLine.Option(
+ names = "--schedule",
+ description =
+ "Maintenance schedule in https://github.com/shyiko/skedule?tab=readme-ov-file#format format, e.g. \"every day 00:00\". Empty schedule means one time run (default)")
+ String schedule,
+ @CommandLine.Option(
+ names = "--debug-addr",
+ description = "host:port (0.0.0.0:5001 by default)")
+ String debugAddr)
+ throws IOException, InterruptedException {
+ var config = Config.load(configFile());
+
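+ // CLI options, when provided, take precedence over the corresponding values from the
+ // maintenance section of the config file.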
+ MaintenanceConfig maintenanceConfig = config.maintenance();
+ maintenanceConfig =
+ new MaintenanceConfig(
+ modes != null ? modes : maintenanceConfig.jobs(),
+ Objects.requireNonNullElse(
+ maxSnapshotAgeHours, maintenanceConfig.maxSnapshotAgeHours()),
+ Objects.requireNonNullElse(minSnapshotsToKeep, maintenanceConfig.minSnapshotsToKeep()),
+ Objects.requireNonNullElse(targetFileSizeMB, maintenanceConfig.targetFileSizeMB()),
+ Objects.requireNonNullElse(minInputFiles, maintenanceConfig.minInputFiles()),
+ Objects.requireNonNullElse(
+ dataCompactionCandidateMinAgeInHours,
+ maintenanceConfig.dataCompactionCandidateMinAgeInHours()),
+ Objects.requireNonNullElse(
+ orphanFileRetentionPeriodInDays,
+ maintenanceConfig.orphanFileRetentionPeriodInDays()),
+ Objects.requireNonNullElse(orphanWhitelist, maintenanceConfig.orphanWhitelist()),
+ Objects.requireNonNullElse(dryRun, maintenanceConfig.dryRun()));
+
+ var icebergConfig = config.toIcebergConfig();
+ logger.debug(
+ "Iceberg configuration: {}",
+ icebergConfig.entrySet().stream()
+ .map(e -> !e.getKey().contains("key") ? e.getKey() + "=" + e.getValue() : e.getKey())
+ .sorted()
+ .collect(Collectors.joining(", ")));
+
+ var catalog = loadCatalog(config, icebergConfig);
+
+ MaintenanceRunner maintenanceRunner = newMaintenanceRunner(catalog, maintenanceConfig);
+ String activeSchedule =
+ !Strings.isNullOrEmpty(schedule) ? schedule : config.maintenanceSchedule();
+ if (Strings.isNullOrEmpty(activeSchedule)) {
+ maintenanceRunner.run();
+ } else {
+ // TODO: ensure all http handlers are hooked in
+ JvmMetrics.builder().register();
+
+ HostAndPort debugHostAndPort =
+ HostAndPort.fromString(
+ !Strings.isNullOrEmpty(debugAddr) ? debugAddr : config.debugAddr());
+ Server debugServer =
+ DebugServer.create(debugHostAndPort.getHost(), debugHostAndPort.getPort());
+ try {
+ debugServer.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e); // TODO: find a better one
+ }
+ logger.info("Serving http://{}/{metrics,healtz,livez,readyz}", debugHostAndPort);
+
+ startMaintenanceScheduler(activeSchedule, maintenanceRunner);
+
+ debugServer.join();
+ }
+ }
+
private static Server createServer(
String host, int port, Catalog catalog, Config config, Map<String, String> icebergConfig) {
var s = createBaseServer(catalog, config, icebergConfig, true);
@@ -227,43 +340,6 @@ private static RESTCatalogAuthorizationHandler createAuthorizationHandler(
return new RESTCatalogAuthorizationHandler(tokens, anonymousSession);
}
- private static Server createDebugServer(String host, int port) {
- var mux = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
- mux.insertHandler(new GzipHandler());
-
- mux.addServlet(new ServletHolder(new PrometheusMetricsServlet()), "/metrics");
- var h =
- new ServletHolder(
- new HttpServlet() {
- @Override
- protected void doGet(HttpServletRequest req, HttpServletResponse resp)
- throws IOException {
- resp.setStatus(HttpServletResponse.SC_OK);
- resp.setContentType(MimeTypes.Type.TEXT_PLAIN.asString());
- resp.setCharacterEncoding(StandardCharsets.UTF_8.name());
- try (PrintWriter w = resp.getWriter()) {
- w.write("OK");
- }
- }
- });
- mux.addServlet(h, "/healthz");
-
- // TODO: provide proper impl
- mux.addServlet(h, "/livez");
- mux.addServlet(h, "/readyz");
-
- var s = new Server();
- overrideJettyDefaults(s);
- s.setHandler(mux);
-
- ServerConnector connector = new ServerConnector(s);
- connector.setHost(host);
- connector.setPort(port);
- s.addConnector(connector);
-
- return s;
- }
-
private static void overrideJettyDefaults(Server s) {
ServerConfig.setQuiet(s);
s.setErrorHandler(new PlainErrorHandler());
@@ -280,7 +356,11 @@ public Integer call() throws Exception {
.map(e -> !e.getKey().contains("key") ? e.getKey() + "=" + e.getValue() : e.getKey())
.sorted()
.collect(Collectors.joining(", ")));
+
+ var catalog = loadCatalog(config, icebergConfig);
+
ObjectMapper om = new ObjectMapper();
+
for (Config.Token t : config.bearerTokens()) {
if (Strings.isNullOrEmpty(t.name())) {
logger.info(
@@ -299,29 +379,18 @@ public Integer call() throws Exception {
om.writeValueAsString(config.anonymousAccess().accessConfig()));
}
- // FIXME: remove
- if (config.s3() != null) {
- var awsRegion = config.s3().region();
- if (!awsRegion.isEmpty()) {
- System.setProperty("aws.region", awsRegion);
- }
+ // Initialize and start the maintenance scheduler
+ if (!Strings.isNullOrEmpty(config.maintenanceSchedule())) {
+ MaintenanceRunner maintenanceRunner = newMaintenanceRunner(catalog, config.maintenance());
+ startMaintenanceScheduler(config.maintenanceSchedule(), maintenanceRunner);
+ // TODO: http endpoint to trigger
+ } else {
+ logger.info("Catalog maintenance disabled (no maintenance schedule specified)");
}
// TODO: ensure all http handlers are hooked in
JvmMetrics.builder().register();
- String catalogName = config.name();
- String catalogImpl = icebergConfig.get(CatalogProperties.CATALOG_IMPL);
- Catalog catalog;
- if (EtcdCatalog.class.getName().equals(catalogImpl)) {
- catalog = newEctdCatalog(catalogName, icebergConfig);
- } else {
- catalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergConfig, null);
- }
-
- // Initialize and start the maintenance scheduler
- initializeMaintenanceScheduler(catalog, config);
-
// TODO: replace with uds (jetty-unixdomain-server is all that is needed here but in ice you'll
// need to implement custom org.apache.iceberg.rest.RESTClient)
if (!Strings.isNullOrEmpty(config.adminAddr())) {
@@ -345,25 +414,76 @@ public Integer call() throws Exception {
// FIXME: exception here does not terminate the process
HostAndPort debugHostAndPort = HostAndPort.fromString(config.debugAddr());
- createDebugServer(debugHostAndPort.getHost(), debugHostAndPort.getPort()).start();
+ DebugServer.create(debugHostAndPort.getHost(), debugHostAndPort.getPort()).start();
logger.info("Serving http://{}/{metrics,healtz,livez,readyz}", debugHostAndPort);
httpServer.join();
return 0;
}
- private void initializeMaintenanceScheduler(Catalog catalog, Config config) {
- if (Strings.isNullOrEmpty(config.maintenanceSchedule())) {
+ private Catalog loadCatalog(Config config, Map<String, String> icebergConfig) throws IOException {
+ // FIXME: remove
+ if (config.s3() != null) {
+ var awsRegion = config.s3().region();
+ if (!awsRegion.isEmpty()) {
+ System.setProperty("aws.region", awsRegion);
+ }
+ }
+
+ String catalogName = config.name();
+ String catalogImpl = icebergConfig.get(CatalogProperties.CATALOG_IMPL);
+ Catalog catalog;
+ if (EtcdCatalog.class.getName().equals(catalogImpl)) {
+ catalog = newEctdCatalog(catalogName, icebergConfig);
+ } else {
+ catalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergConfig, null);
+ }
+ return catalog;
+ }
+
+ private MaintenanceRunner newMaintenanceRunner(Catalog catalog, MaintenanceConfig config) {
+ final boolean dryRun = config.dryRun();
+ MaintenanceConfig.Job[] jobNames =
+ Objects.requireNonNullElse(config.jobs(), MaintenanceConfig.Job.values());
+ List<MaintenanceJob> jobs =
+ Arrays.stream(jobNames)
+ .distinct()
+ .map(
+ maintenanceMode ->
+ (MaintenanceJob)
+ switch (maintenanceMode) {
+ case DATA_COMPACTION ->
+ new DataCompaction(
+ config.targetFileSizeMB() * 1024L * 1024L,
+ config.minInputFiles(),
+ TimeUnit.HOURS.toMillis(
+ config.dataCompactionCandidateMinAgeInHours()),
+ dryRun);
+ case MANIFEST_COMPACTION -> new ManifestCompaction(dryRun);
+ case ORPHAN_CLEANUP ->
+ new OrphanCleanup(
+ TimeUnit.DAYS.toMillis(config.orphanFileRetentionPeriodInDays()),
+ Matcher.from(config.orphanWhitelist()),
+ dryRun);
+ case SNAPSHOT_CLEANUP ->
+ new SnapshotCleanup(
+ config.maxSnapshotAgeHours(),
+ config.minSnapshotsToKeep(),
+ dryRun);
+ })
+ .toList();
+ return new MaintenanceRunner(catalog, jobs);
+ }
+
+ private void startMaintenanceScheduler(String schedule, MaintenanceRunner maintenanceRunner) {
+ if (Strings.isNullOrEmpty(schedule)) {
logger.info("Catalog maintenance disabled (no maintenance schedule specified)");
return;
}
try {
- MaintenanceScheduler scheduler =
- new MaintenanceScheduler(
- catalog, config.maintenanceSchedule(), config.snapshotTTLInDays());
+ MaintenanceScheduler scheduler = new MaintenanceScheduler(schedule, maintenanceRunner);
scheduler.startScheduledMaintenance();
- logger.info(
- "Maintenance scheduler initialized with schedule: {}", config.maintenanceSchedule());
+ logger.info("Maintenance scheduler initialized with schedule: {}", schedule);
} catch (Exception e) {
logger.error("Failed to initialize maintenance scheduler", e);
throw new RuntimeException(e);
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java
index 9ba4fce..9f69ed2 100644
--- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/Config.java
@@ -50,10 +50,13 @@ public record Config(
@JsonPropertyDescription("Settings for warehouse=s3://...") S3 s3,
Token[] bearerTokens,
@JsonPropertyDescription("Anonymous access configuration") AnonymousAccess anonymousAccess,
+
+ // TODO: per-table maintenance config
+
@JsonPropertyDescription(
"Maintenance schedule in https://github.com/shyiko/skedule?tab=readme-ov-file#format format, e.g. \"every day 00:00\". Empty schedule disables automatic maintenance (default)")
String maintenanceSchedule,
- @JsonPropertyDescription("TTL for snapshots in days.") int snapshotTTLInDays,
+ @JsonPropertyDescription("Maintenance config") MaintenanceConfig maintenance,
@JsonPropertyDescription(
"(experimental) Extra properties to include in loadTable REST response.")
Map<String, String> loadTableProperties,
@@ -78,7 +81,7 @@ public Config(
Token[] bearerTokens,
AnonymousAccess anonymousAccess,
String maintenanceSchedule,
- int snapshotTTLInDays,
+ MaintenanceConfig maintenance,
Map<String, String> loadTableProperties,
@JsonProperty("iceberg") Map<String, String> icebergProperties) {
this.addr = Strings.orDefault(addr, DEFAULT_ADDR);
@@ -93,7 +96,9 @@ public Config(
this.anonymousAccess =
Objects.requireNonNullElse(anonymousAccess, new AnonymousAccess(false, null));
this.maintenanceSchedule = maintenanceSchedule;
- this.snapshotTTLInDays = snapshotTTLInDays;
+ this.maintenance =
+ Objects.requireNonNullElseGet(
+ maintenance, () -> new MaintenanceConfig(null, 0, 0, 0, 0, 0, 0, null, false));
this.loadTableProperties = Objects.requireNonNullElse(loadTableProperties, Map.of());
this.icebergProperties = Objects.requireNonNullElse(icebergProperties, Map.of());
}
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/MaintenanceConfig.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/MaintenanceConfig.java
new file mode 100644
index 0000000..85efae9
--- /dev/null
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/config/MaintenanceConfig.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.rest.catalog.internal.config;
+
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import org.apache.iceberg.TableProperties;
+
+public record MaintenanceConfig(
+ @JsonPropertyDescription(
+ "MANIFEST_COMPACTION, DATA_COMPACTION, SNAPSHOT_CLEANUP, ORPHAN_CLEANUP")
+ Job[] jobs,
+ @JsonPropertyDescription("Max snapshot age in hours (5 days by default).")
+ int maxSnapshotAgeHours,
+ @JsonPropertyDescription("Minimum number of snapshots to keep (1 by default).")
+ int minSnapshotsToKeep,
+ @JsonPropertyDescription("The target file size for the table in MB (64 min; 512 by default).")
+ int targetFileSizeMB,
+ @JsonPropertyDescription(
+ "The minimum number of files to be compacted if a table partition size is smaller than the target file size (5 by default).")
+ int minInputFiles,
+ @JsonPropertyDescription(
+ "How long to wait before considering file for data compaction (3 hours by default; -1 to disable).")
+ int dataCompactionCandidateMinAgeInHours,
+ @JsonPropertyDescription(
+ "The number of days to retain orphan files before deleting them (3 by default; -1 to disable).")
+ int orphanFileRetentionPeriodInDays,
+ @JsonPropertyDescription("Orphan whitelist (defaults to */metadata/*, */data/*)")
+ String[] orphanWhitelist,
+ @JsonPropertyDescription("Print changes without applying them") boolean dryRun) {
+
+ public enum Job {
+ MANIFEST_COMPACTION,
+ DATA_COMPACTION,
+ SNAPSHOT_CLEANUP,
+ ORPHAN_CLEANUP;
+ }
+
+ public MaintenanceConfig(
+ Job[] jobs,
+ int maxSnapshotAgeHours,
+ int minSnapshotsToKeep,
+ int targetFileSizeMB,
+ int minInputFiles,
+ int dataCompactionCandidateMinAgeInHours,
+ int orphanFileRetentionPeriodInDays,
+ String[] orphanWhitelist,
+ boolean dryRun) {
+ this.jobs = jobs;
+ this.maxSnapshotAgeHours =
+ maxSnapshotAgeHours > 0
+ ? maxSnapshotAgeHours
+ : (int) (TableProperties.MAX_SNAPSHOT_AGE_MS_DEFAULT / 1000 / 60 / 60); // 5 days
+ this.minSnapshotsToKeep =
+ minSnapshotsToKeep > 0
+ ? minSnapshotsToKeep
+ : TableProperties.MIN_SNAPSHOTS_TO_KEEP_DEFAULT; // 1
+ this.targetFileSizeMB = targetFileSizeMB >= 64 ? targetFileSizeMB : 512;
+ this.minInputFiles = minInputFiles > 0 ? minInputFiles : 5;
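+ // For the two age settings below: -1 disables the check (stored as 0), zero/unset falls
+ // back to the default (3 hours / 3 days), and any positive value is used as-is.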
+ this.dataCompactionCandidateMinAgeInHours =
+ dataCompactionCandidateMinAgeInHours > 0
+ ? dataCompactionCandidateMinAgeInHours
+ : dataCompactionCandidateMinAgeInHours == -1 ? 0 : 3;
+ this.orphanFileRetentionPeriodInDays =
+ orphanFileRetentionPeriodInDays > 0
+ ? orphanFileRetentionPeriodInDays
+ : orphanFileRetentionPeriodInDays == -1 ? 0 : 3;
+ this.orphanWhitelist =
+ orphanWhitelist == null ? new String[] {"*/metadata/*", "*/data/*"} : orphanWhitelist;
+ this.dryRun = dryRun;
+ }
+}
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/DataCompaction.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/DataCompaction.java
new file mode 100644
index 0000000..3656465
--- /dev/null
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/DataCompaction.java
@@ -0,0 +1,270 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.rest.catalog.internal.maintenance;
+
+import com.altinity.ice.cli.internal.cmd.Insert;
+import com.altinity.ice.cli.internal.iceberg.RecordComparator;
+import com.altinity.ice.cli.internal.iceberg.parquet.Metadata;
+import com.altinity.ice.internal.iceberg.io.SchemeFileIO;
+import com.altinity.ice.internal.strings.Strings;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.Stream;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFiles;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
+import org.apache.iceberg.relocated.com.google.common.collect.PeekingIterator;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO: avoid full file scans
+public record DataCompaction(
+ long targetFileSizeInBytes, int minInputFiles, long olderThanMillis, boolean dryRun)
+ implements MaintenanceJob {
+
+ private static final Logger logger = LoggerFactory.getLogger(DataCompaction.class);
+
+ public DataCompaction {
+ Preconditions.checkArgument(targetFileSizeInBytes > 0);
+ }
+
+ @Override
+ public void perform(Table table) throws IOException {
+ FileIO tableIO = table.io();
+
+ SchemeFileIO schemeFileIO;
+ if (tableIO instanceof SchemeFileIO) {
+ schemeFileIO = (SchemeFileIO) tableIO;
+ } else {
+ throw new UnsupportedOperationException("SchemeFileIO is required for S3 locations");
+ }
+
+ var cutOffTimestamp = System.currentTimeMillis() - olderThanMillis;
+
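+ // Greedy pass over the table scan: files smaller than the target size are grouped by
+ // partition; whenever the running total of small-file sizes exceeds the target, the current
+ // partition's group is merged into a single file. Groups left over after the scan are merged
+ // if they contain at least minInputFiles files.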
+ long sizeSoFar = 0;
+ Map<StructLike, List<DataFile>> filesSoFarGroupedByPartition = new HashMap<>();
+ // TODO: avoid unnecessary work by skipping already considered partitions
+ try (CloseableIterable<FileScanTask> scan = table.newScan().planFiles()) {
+ for (FileScanTask planFile : scan) {
+ if (planFile.sizeBytes() >= targetFileSizeInBytes) {
+ continue;
+ }
+
+ if (olderThanMillis > 0) {
+ long createdAt = createdAt(schemeFileIO, planFile.file().location());
+ if (createdAt > cutOffTimestamp) {
+ continue;
+ }
+ }
+
+ sizeSoFar += planFile.sizeBytes();
+ StructLike partition = planFile.partition();
+ List<DataFile> filesSoFar =
+ filesSoFarGroupedByPartition.computeIfAbsent(
+ partition, structLike -> new ArrayList<>(5));
+ filesSoFar.add(planFile.file());
+
+ // TODO: support custom strategies; greedy is not optimal
+ // TODO: planFile.deletes().getFirst()
+
+ if (sizeSoFar > targetFileSizeInBytes) {
+ merge(table, filesSoFar, table.sortOrder(), partition, dryRun);
+ sizeSoFar = 0;
+ filesSoFar.clear();
+ }
+ }
+ }
+
+ filesSoFarGroupedByPartition.forEach(
+ (partition, dataFiles) -> {
+ if (dataFiles.size() >= minInputFiles) {
+ try {
+ merge(table, dataFiles, table.sortOrder(), partition, dryRun);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ });
+ }
+
+ private static long createdAt(SchemeFileIO tableIO, String location) {
+ Iterator<FileInfo> iter = tableIO.listPrefix(location).iterator();
+ if (!iter.hasNext()) {
+ throw new NotFoundException(location);
+ }
+ FileInfo next = iter.next();
+ if (!location.equals(next.location())) {
+ // FIXME
+ throw new IllegalStateException("listPrefix(" + location + ") matched " + next.location());
+ }
+ return next.createdAtMillis();
+ }
+
+ private void merge(
+ Table table,
+ List<DataFile> dataFiles,
+ SortOrder sortOrder,
+ StructLike partition,
+ boolean dryRun)
+ throws IOException {
+
+ FileIO tableIO = table.io();
+ Schema tableSchema = table.schema();
+ PartitionSpec tableSpec = table.spec();
+
+ Transaction tx = table.newTransaction();
+
+ String dstPath = Insert.DataFileNamingStrategy.defaultDataLocation(table);
+ String dstDataFile =
+ new Insert.DataFileNamingStrategy.Default(dstPath, System.currentTimeMillis() + "-")
+ .get("comp");
+
+ logger.info(
+ "Combining {} into {}",
+ dataFiles.stream().map(ContentFile::location).toList(),
+ dstDataFile);
+
+ if (dryRun) {
+ return;
+ }
+
+ OutputFile outputFile =
+ tableIO.newOutputFile(Strings.replacePrefix(dstDataFile, "s3://", "s3a://"));
+
+ Parquet.WriteBuilder writeBuilder =
+ Parquet.write(outputFile)
+ .overwrite(false)
+ .createWriterFunc(GenericParquetWriter::buildWriter)
+ .schema(tableSchema);
+
+ long dataFileSizeInBytes = 0;
+
+ if (sortOrder.isSorted()) {
+ // k-way merge sort
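+ // Each input data file is expected to already be sorted by the table sort order; a priority
+ // queue of per-file peeking iterators then yields records in globally sorted order.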
+
+ List<CloseableIterable<Record>> inputs = new ArrayList<>(dataFiles.size());
+ for (DataFile dataFile : dataFiles) {
+ Parquet.ReadBuilder readBuilder =
+ Parquet.read(tableIO.newInputFile(dataFile.location()))
+ .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s))
+ .project(tableSchema)
+ .reuseContainers();
+ inputs.add(readBuilder.build());
+ }
+ List<PeekingIterator<Record>> iterators =
+ inputs.stream().map(CloseableIterable::iterator).map(Iterators::peekingIterator).toList();
+
+ PriorityQueue<PeekingIterator<Record>> heap =
+ new PriorityQueue<>(
+ Comparator.comparing(
+ PeekingIterator::peek, new RecordComparator(sortOrder, tableSchema)));
+
+ for (PeekingIterator<Record> it : iterators) {
+ if (it.hasNext()) heap.add(it);
+ }
+
+ FileAppender<Record> writer = null;
+ try {
+ writer = writeBuilder.build();
+
+ while (!heap.isEmpty()) {
+ PeekingIterator<Record> it = heap.poll();
+ Record next = it.next();
+ writer.add(next);
+ if (it.hasNext()) heap.add(it);
+ }
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
+ }
+
+ dataFileSizeInBytes = writer.length();
+ } else {
+
+ FileAppender<Record> writer = null;
+ try {
+ writer = writeBuilder.build();
+
+ for (DataFile dataFile : dataFiles) {
+ Parquet.ReadBuilder rb =
+ Parquet.read(tableIO.newInputFile(dataFile.location()))
+ .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s))
+ .project(tableSchema)
+ .reuseContainers();
+
+ try (CloseableIterable<Record> r = rb.build()) {
+ writer.addAll(r);
+ }
+ }
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
+ }
+ dataFileSizeInBytes = writer.length();
+ }
+
+ AppendFiles appendOp = tx.newAppend();
+
+ ParquetMetadata metadata = Metadata.read(outputFile.toInputFile());
+
+ MetricsConfig metricsConfig = MetricsConfig.forTable(table);
+ Metrics metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig);
+ DataFile dataFileObj =
+ new DataFiles.Builder(tableSpec)
+ .withPath(outputFile.location())
+ .withFormat("PARQUET")
+ .withFileSizeInBytes(dataFileSizeInBytes)
+ .withMetrics(metrics)
+ .withPartition(partition)
+ .build();
+ appendOp.appendFile(dataFileObj);
+ appendOp.commit();
+
+ DeleteFiles delOp = tx.newDelete();
+ dataFiles.forEach(delOp::deleteFile);
+ delOp.commit();
+
+ tx.commitTransaction();
+ }
+}
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceJob.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceJob.java
new file mode 100644
index 0000000..3a05b33
--- /dev/null
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceJob.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.rest.catalog.internal.maintenance;
+
+import java.io.IOException;
+import org.apache.iceberg.Table;
+
+public interface MaintenanceJob {
+
+ void perform(Table table) throws IOException;
+}
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceRunner.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceRunner.java
new file mode 100644
index 0000000..e47dd33
--- /dev/null
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceRunner.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.rest.catalog.internal.maintenance;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+public record MaintenanceRunner(Catalog catalog, Collection<MaintenanceJob> jobs) {
+
+ private static final Logger logger = LoggerFactory.getLogger(MaintenanceRunner.class);
+
+ public void run() throws IOException {
+ List<Namespace> namespaces;
+ if (catalog instanceof SupportsNamespaces nsCatalog) {
+ namespaces = nsCatalog.listNamespaces();
+ } else {
+ throw new UnsupportedOperationException("Catalog does not support namespace operations");
+ }
+
+ logger.info("Performing catalog maintenance");
+ for (Namespace namespace : namespaces) {
+ List<TableIdentifier> tables = catalog.listTables(namespace);
+ for (TableIdentifier tableIdent : tables) {
+ Table table = catalog.loadTable(tableIdent);
+ logger.info("Performing maintenance on table: {}", table.name());
+ MDC.put("msgContext", table.name() + ": ");
+ try {
+ for (MaintenanceJob job : jobs) {
+ job.perform(table);
+ }
+ } finally {
+ MDC.remove("msgContext");
+ }
+ }
+ }
+ logger.info("Catalog maintenance completed");
+ }
+}
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java
index 954fd66..2840415 100644
--- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/MaintenanceScheduler.java
@@ -11,38 +11,30 @@
import com.github.shyiko.skedule.Schedule;
import java.time.ZonedDateTime;
-import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.SupportsNamespaces;
-import org.apache.iceberg.catalog.TableIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MaintenanceScheduler {
private static final Logger logger = LoggerFactory.getLogger(MaintenanceScheduler.class);
- private final Catalog catalog;
+ private final MaintenanceRunner maintenanceRunner;
private final AtomicBoolean isMaintenanceMode = new AtomicBoolean(false);
private final ScheduledExecutorService executor;
private final Schedule schedule;
private final Object taskLock = new Object();
private ScheduledFuture<?> currentTask;
- private final Integer snapshotExpirationDays;
- public MaintenanceScheduler(Catalog catalog, String schedule, int snapshotExpirationDays) {
- this.catalog = catalog;
+ public MaintenanceScheduler(String schedule, MaintenanceRunner maintenanceRunner) {
+ this.maintenanceRunner = maintenanceRunner;
this.executor = new ScheduledThreadPoolExecutor(1);
((ScheduledThreadPoolExecutor) executor).setRemoveOnCancelPolicy(true);
this.schedule = Schedule.parse(schedule);
- this.snapshotExpirationDays = snapshotExpirationDays;
}
public void startScheduledMaintenance() {
@@ -91,40 +83,7 @@ public void performMaintenance() {
logger.info("Starting scheduled maintenance task");
setMaintenanceMode(true);
- if (catalog != null) {
- logger.info("Performing maintenance on catalog: {}", catalog.name());
- List<Namespace> namespaces;
- if (catalog instanceof SupportsNamespaces nsCatalog) {
- namespaces = nsCatalog.listNamespaces();
- for (Namespace ns : namespaces) {
- logger.debug("Namespace: " + ns);
- }
- } else {
- logger.error("Catalog does not support namespace operations.");
- return;
- }
-
- for (Namespace namespace : namespaces) {
- List<TableIdentifier> tables = catalog.listTables(namespace);
- for (TableIdentifier tableIdent : tables) {
- long olderThanMillis =
- System.currentTimeMillis() - TimeUnit.DAYS.toMillis(snapshotExpirationDays);
- Table table = catalog.loadTable(tableIdent);
-
- // Check if table has any snapshots before performing maintenance
- if (table.currentSnapshot() == null) {
- logger.warn("Table {} has no snapshots, skipping maintenance", tableIdent);
- continue;
- }
-
- table.rewriteManifests().rewriteIf(manifest -> true).commit();
- table.expireSnapshots().expireOlderThan(olderThanMillis).commit();
- }
- }
- logger.info("Maintenance operations completed for catalog: {}", catalog.name());
- } else {
- logger.warn("No catalog available for maintenance operations");
- }
+ maintenanceRunner.run();
logger.info("Scheduled maintenance task completed successfully");
} catch (Exception e) {
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/ManifestCompaction.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/ManifestCompaction.java
new file mode 100644
index 0000000..4f48823
--- /dev/null
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/ManifestCompaction.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.rest.catalog.internal.maintenance;
+
+import org.apache.iceberg.RewriteManifests;
+import org.apache.iceberg.Table;
+
+public record ManifestCompaction(boolean dryRun) implements MaintenanceJob {
+
+ @Override
+ public void perform(Table table) {
+ // FIXME: generates new files even when there are no changes
+ RewriteManifests op = table.rewriteManifests();
+ if (dryRun) {
+ op.apply();
+ } else {
+ op.commit();
+ }
+ }
+}
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanCleanup.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanCleanup.java
new file mode 100644
index 0000000..3e46543
--- /dev/null
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/OrphanCleanup.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.rest.catalog.internal.maintenance;
+
+import com.altinity.ice.internal.iceberg.io.SchemeFileIO;
+import com.altinity.ice.internal.io.Matcher;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Stream;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.PartitionStatisticsFile;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.FileInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public record OrphanCleanup(long olderThanMillis, Matcher whitelist, boolean dryRun)
+ implements MaintenanceJob {
+
+ private static final Logger logger = LoggerFactory.getLogger(OrphanCleanup.class);
+
+ @Override
+ public void perform(Table table) throws IOException {
+ String location = table.location();
+
+ logger.info("Searching for orphaned files at {}", location);
+
+ Set<String> orphanedFiles = listOrphanedFiles(table, location, olderThanMillis);
+
+ int excluded = 0;
+ Iterator<String> iterator = orphanedFiles.iterator();
+ while (iterator.hasNext()) {
+ String orphanedFile = iterator.next();
+ if (!whitelist.test(orphanedFile)) {
+ iterator.remove();
+ excluded++;
+ }
+ }
+
+ logger.info("Found {} orphaned file(s) ({} excluded)", orphanedFiles.size(), excluded);
+
+ if (orphanedFiles.isEmpty()) {
+ return;
+ }
+
+ if (!dryRun) {
+ FileIO tableIO = table.io();
+
+ int numThreads = Math.min(8, orphanedFiles.size());
+ try (ExecutorService executor = Executors.newFixedThreadPool(numThreads)) {
+ orphanedFiles.forEach(
+ file ->
+ executor.submit(
+ () -> {
+ try {
+ logger.info("Deleting {}", file);
+ tableIO.deleteFile(file);
+ return file;
+ } catch (Exception e) {
+ logger.warn("Failed to delete file {}", file, e);
+ return null;
+ }
+ }));
+ }
+ } else {
+ orphanedFiles.stream().sorted().forEach(file -> logger.info("To be deleted: {}", file));
+ }
+ }
+
+ public Set<String> listOrphanedFiles(Table table, String location, long olderThanMillis) {
+ FileIO tableIO = table.io();
+
+ SchemeFileIO schemeFileIO;
+ if (tableIO instanceof SchemeFileIO) {
+ schemeFileIO = (SchemeFileIO) tableIO;
+ } else {
+ throw new UnsupportedOperationException("SchemeFileIO is required for S3 locations");
+ }
+
+ Set<String> allFiles = new HashSet<>();
+
+ var cutOffTimestamp = System.currentTimeMillis() - olderThanMillis;
+
+ Iterable<FileInfo> fileInfos = schemeFileIO.listPrefix(location);
+ for (FileInfo fileInfo : fileInfos) {
+ if (olderThanMillis == 0 || fileInfo.createdAtMillis() < cutOffTimestamp) {
+ allFiles.add(fileInfo.location());
+ }
+ }
+
+ Set<String> knownFiles = listKnownFiles(table);
+ allFiles.removeAll(knownFiles);
+
+ return allFiles;
+ }
+
+ private Set<String> listKnownFiles(Table table) {
+ Set<String> knownFiles = new HashSet<>();
+
+ FileIO tableIO = table.io();
+
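+ // Walk the metadata log starting from the current metadata file, collecting every file the
+ // table still references: metadata files, manifest lists, manifests, data files, and
+ // (partition) statistics files. Anything else under the table location is an orphan candidate.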
+ Deque<String> metadataFilesToScan = new ArrayDeque<>();
+ Set<String> scannedMetadataFiles = new HashSet<>();
+
+ TableMetadata current = ((BaseTable) table).operations().current();
+ metadataFilesToScan.add(current.metadataFileLocation());
+
+ while (!metadataFilesToScan.isEmpty()) {
+ String metadataFileLocation = metadataFilesToScan.poll();
+ if (!scannedMetadataFiles.add(metadataFileLocation)) {
+ continue; // already scanned
+ }
+
+ knownFiles.add(metadataFileLocation);
+
+ TableMetadata meta = TableMetadataParser.read(tableIO, metadataFileLocation);
+
+ for (Snapshot snapshot : meta.snapshots()) {
+ knownFiles.add(snapshot.manifestListLocation());
+
+ Stream.concat(
+ snapshot.dataManifests(tableIO).stream(),
+ snapshot.deleteManifests(tableIO).stream())
+ .forEach(
+ manifest -> {
+ knownFiles.add(manifest.path());
+
+ try (CloseableIterable<DataFile> files = ManifestFiles.read(manifest, tableIO)) {
+ for (DataFile file : files) {
+ knownFiles.add(file.location());
+ }
+ } catch (NotFoundException e) {
+ // ignore
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ }
+
+ for (StatisticsFile statsFile : meta.statisticsFiles()) {
+ knownFiles.add(statsFile.path());
+ }
+
+ for (PartitionStatisticsFile psFile : meta.partitionStatisticsFiles()) {
+ knownFiles.add(psFile.path());
+ }
+
+ for (TableMetadata.MetadataLogEntry prev : meta.previousFiles()) {
+ metadataFilesToScan.add(prev.file());
+ }
+ }
+ return knownFiles;
+ }
+}
diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/SnapshotCleanup.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/SnapshotCleanup.java
new file mode 100644
index 0000000..f5c5967
--- /dev/null
+++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/SnapshotCleanup.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.rest.catalog.internal.maintenance;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public record SnapshotCleanup(long maxSnapshotAgeHours, int minSnapshotsToKeep, boolean dryRun)
+ implements MaintenanceJob {
+
+ private static final Logger logger = LoggerFactory.getLogger(SnapshotCleanup.class);
+
+ @Override
+ public void perform(Table table) {
+ if (table.currentSnapshot() == null) {
+ logger.warn("Table {} has no snapshots, skipping maintenance", table.name());
+ return;
+ }
+
+ ExpireSnapshots op = table.expireSnapshots();
+
+ long ttlInMs = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(maxSnapshotAgeHours);
+ if (ttlInMs > 0) {
+ op.expireOlderThan(ttlInMs);
+ }
+ if (minSnapshotsToKeep > 0) {
+ op.retainLast(minSnapshotsToKeep);
+ }
+ if (dryRun) {
+ op.apply();
+ } else {
+ op.commit();
+ }
+ }
+}
diff --git a/ice/pom.xml b/ice/pom.xml
index 49aa73f..feb357a 100644
--- a/ice/pom.xml
+++ b/ice/pom.xml
@@ -155,6 +155,17 @@
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>sqs</artifactId>
+      <version>${aws.java.sdk.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
<groupId>software.amazon.awssdk</groupId>
@@ -412,6 +423,50 @@
+
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+      <version>${jetty.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+      <version>${jetty.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>jakarta.servlet</groupId>
+      <artifactId>jakarta.servlet-api</artifactId>
+      <version>${jakarta.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>prometheus-metrics-core</artifactId>
+      <version>${prometheus.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>prometheus-metrics-instrumentation-jvm</artifactId>
+      <version>${prometheus.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>prometheus-metrics-exporter-servlet-jakarta</artifactId>
+      <version>${prometheus.version}</version>
+    </dependency>
<groupId>info.picocli</groupId>
diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java
index 2444fe8..6e5a9f5 100644
--- a/ice/src/main/java/com/altinity/ice/cli/Main.java
+++ b/ice/src/main/java/com/altinity/ice/cli/Main.java
@@ -18,15 +18,18 @@
import com.altinity.ice.cli.internal.cmd.DeleteTable;
import com.altinity.ice.cli.internal.cmd.Describe;
import com.altinity.ice.cli.internal.cmd.Insert;
+import com.altinity.ice.cli.internal.cmd.InsertWatch;
import com.altinity.ice.cli.internal.cmd.Scan;
import com.altinity.ice.cli.internal.config.Config;
import com.altinity.ice.cli.internal.iceberg.rest.RESTCatalogFactory;
+import com.altinity.ice.internal.jetty.DebugServer;
import com.altinity.ice.internal.picocli.VersionProvider;
import com.altinity.ice.internal.strings.Strings;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import io.prometheus.metrics.instrumentation.jvm.JvmMetrics;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -36,9 +39,11 @@
import java.util.List;
import java.util.Scanner;
import java.util.stream.Collectors;
+import org.apache.curator.shaded.com.google.common.net.HostAndPort;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTCatalog;
+import org.eclipse.jetty.server.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.AutoComplete;
@@ -270,11 +275,27 @@ void insert(
description =
"Sort order, e.g. [{\"column\":\"name\", \"desc\":false, \"nullFirst\":false}]")
String sortOrderJson,
+ @CommandLine.Option(
+ names = {"--assume-sorted"},
+ description = "Skip data sorting. Assume it's already sorted.")
+ boolean assumeSorted,
@CommandLine.Option(
names = {"--thread-count"},
description = "Number of threads to use for inserting data",
defaultValue = "-1")
- int threadCount)
+ int threadCount,
+ @CommandLine.Option(
+ names = {"--watch"},
+ description = "Event queue. Supported: AWS SQS")
+ String watch,
+ @CommandLine.Option(
+ names = {"--watch-fire-once"},
+ description = "")
+ boolean watchFireOnce,
+ @CommandLine.Option(
+ names = {"--watch-debug-addr"},
+ description = "")
+ String watchDebugAddr)
throws IOException, InterruptedException {
if (s3NoSignRequest && s3CopyObject) {
throw new UnsupportedOperationException(
@@ -305,7 +326,9 @@ void insert(
}
TableIdentifier tableId = TableIdentifier.parse(name);
- if (createTableIfNotExists) {
+ boolean watchMode = !Strings.isNullOrEmpty(watch);
+
+ if (createTableIfNotExists && !watchMode) {
CreateTable.run(
catalog,
tableId,
@@ -315,23 +338,46 @@ void insert(
s3NoSignRequest,
partitions,
sortOrders);
+ } // in watch mode, table creation is delayed until the first matching file is seen
+
+ Insert.Options options =
+ Insert.Options.builder()
+ .dataFileNamingStrategy(dataFileNamingStrategy)
+ .skipDuplicates(skipDuplicates)
+ .noCommit(noCommit)
+ .noCopy(noCopy)
+ .forceNoCopy(forceNoCopy)
+ .forceTableAuth(forceTableAuth)
+ .s3NoSignRequest(s3NoSignRequest)
+ .s3CopyObject(s3CopyObject)
+ .assumeSorted(assumeSorted)
+ .retryListFile(retryList)
+ .partitionList(partitions)
+ .sortOrderList(sortOrders)
+ .threadCount(
+ threadCount < 1 ? Runtime.getRuntime().availableProcessors() : threadCount)
+ .build();
+
+ if (!watchMode) {
+ Insert.run(catalog, tableId, dataFiles, options);
+ } else {
+ if (!Strings.isNullOrEmpty(watchDebugAddr)) {
+ JvmMetrics.builder().register();
+
+ HostAndPort debugHostAndPort = HostAndPort.fromString(watchDebugAddr);
+ Server debugServer =
+ DebugServer.create(debugHostAndPort.getHost(), debugHostAndPort.getPort());
+ try {
+ debugServer.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e); // TODO: use a more specific exception type
+ }
+ logger.info("Serving http://{}/{metrics,healthz,livez,readyz}", debugHostAndPort);
+ }
+
+ InsertWatch.run(
+ catalog, tableId, dataFiles, watch, watchFireOnce, createTableIfNotExists, options);
}
- Insert.run(
- catalog,
- tableId,
- dataFiles,
- dataFileNamingStrategy,
- skipDuplicates,
- noCommit,
- noCopy,
- forceNoCopy,
- forceTableAuth,
- s3NoSignRequest,
- s3CopyObject,
- retryList,
- partitions,
- sortOrders,
- threadCount < 1 ? Runtime.getRuntime().availableProcessors() : threadCount);
}
}
@@ -387,10 +433,14 @@ void deleteTable(
@CommandLine.Option(
names = {"-p"},
description = "Ignore not found")
- boolean ignoreNotFound)
+ boolean ignoreNotFound,
+ @CommandLine.Option(
+ names = {"--purge"},
+ description = "Delete data")
+ boolean purge)
throws IOException {
try (RESTCatalog catalog = loadCatalog()) {
- DeleteTable.run(catalog, TableIdentifier.parse(name), ignoreNotFound);
+ DeleteTable.run(catalog, TableIdentifier.parse(name), ignoreNotFound, purge);
}
}
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/DeleteTable.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/DeleteTable.java
index 2ca9ff6..0772ee6 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/DeleteTable.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/DeleteTable.java
@@ -22,9 +22,10 @@ public final class DeleteTable {
private DeleteTable() {}
- public static void run(RESTCatalog catalog, TableIdentifier nsTable, boolean ignoreNotFound)
+ public static void run(
+ RESTCatalog catalog, TableIdentifier nsTable, boolean ignoreNotFound, boolean purge)
throws IOException {
- var purge = true; // FIXME
+ // TODO: exclude files outside of table location during purge
if (!catalog.dropTable(nsTable, purge)) {
if (!ignoreNotFound) {
throw new NotFoundException(String.format("Table %s not found", nsTable));
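
With the new signature, purging is opt-in; a sketch of the call shapes under that reading (the table name is a placeholder):

```java
import com.altinity.ice.cli.internal.cmd.DeleteTable;
import java.io.IOException;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTCatalog;

final class DeleteTableSketch {
  static void drop(RESTCatalog catalog) throws IOException {
    TableIdentifier id = TableIdentifier.parse("ns.events"); // placeholder
    DeleteTable.run(catalog, id, /* ignoreNotFound */ true, /* purge */ false); // keeps data files
    // DeleteTable.run(catalog, id, true, true); // --purge: also deletes data
  }
}
```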
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
index 9e88bdc..353ece4 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
@@ -20,15 +20,9 @@
import com.altinity.ice.cli.internal.s3.S3;
import com.altinity.ice.internal.strings.Strings;
import java.io.IOException;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -51,7 +45,6 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
-import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReplaceSortOrder;
@@ -64,7 +57,6 @@
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
-import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
@@ -100,41 +92,13 @@ private Insert() {}
// TODO: refactor
public static void run(
- RESTCatalog catalog,
- TableIdentifier nsTable,
- String[] files,
- DataFileNamingStrategy.Name dataFileNamingStrategy,
- boolean skipDuplicates,
- boolean noCommit,
- boolean noCopy,
- boolean forceNoCopy,
- boolean forceTableAuth,
- boolean s3NoSignRequest,
- boolean s3CopyObject,
- @Nullable String retryListFile,
- @Nullable List partitionList,
- @Nullable List sortOrderList,
- int threadCount)
+ RESTCatalog catalog, TableIdentifier nsTable, String[] files, Options options)
throws IOException, InterruptedException {
if (files.length == 0) {
// no work to be done
return;
}
- Options options =
- Options.builder()
- .skipDuplicates(skipDuplicates)
- .noCommit(noCommit)
- .noCopy(noCopy)
- .forceNoCopy(forceNoCopy)
- .forceTableAuth(forceTableAuth)
- .s3NoSignRequest(s3NoSignRequest)
- .s3CopyObject(s3CopyObject)
- .threadCount(threadCount)
- .build();
- // FIXME: refactor: move to builder
- final Options finalOptions =
- options.forceNoCopy() ? options.toBuilder().noCopy(true).build() : options;
Table table = catalog.loadTable(nsTable);
// Create transaction and pass it to updatePartitionAndSortOrderMetadata
@@ -142,14 +106,14 @@ public static void run(
try (FileIO tableIO = table.io()) {
final Supplier<S3Client> s3ClientSupplier;
- if (finalOptions.forceTableAuth()) {
+ if (options.forceTableAuth()) {
if (!(tableIO instanceof S3FileIO)) {
throw new UnsupportedOperationException(
"--force-table-auth is currently only supported for s3:// tables");
}
s3ClientSupplier = ((S3FileIO) tableIO)::client;
} else {
- s3ClientSupplier = () -> S3.newClient(finalOptions.s3NoSignRequest());
+ s3ClientSupplier = () -> S3.newClient(options.s3NoSignRequest());
}
Lazy<S3Client> s3ClientLazy = new Lazy<>(s3ClientSupplier);
try {
@@ -186,15 +150,16 @@ public static void run(
var tableEmpty = tableDataFiles.isEmpty();
// TODO: move to update-table
var tablePartitionSpec =
- syncTablePartitionSpec(txn, table, tableSchema, tableEmpty, partitionList);
- var tableSortOrder = syncTableSortOrder(txn, table, tableSchema, tableEmpty, sortOrderList);
+ syncTablePartitionSpec(txn, table, tableSchema, tableEmpty, options.partitionList);
+ var tableSortOrder =
+ syncTableSortOrder(txn, table, tableSchema, tableEmpty, options.sortOrderList);
if (tablePartitionSpec.isPartitioned() || tableSortOrder.isSorted()) {
updateWriteDistributionModeIfNotSet(txn, table);
}
String dstPath = DataFileNamingStrategy.defaultDataLocation(table);
DataFileNamingStrategy dstDataFileSource =
- switch (dataFileNamingStrategy) {
+ switch (options.dataFileNamingStrategy) {
case DEFAULT ->
new DataFileNamingStrategy.Default(dstPath, System.currentTimeMillis() + "-");
case PRESERVE_ORIGINAL -> new DataFileNamingStrategy.PreserveOriginal(dstPath);
@@ -205,12 +170,12 @@ public static void run(
try (FileIO inputIO = Input.newIO(filesExpanded.getFirst(), table, s3ClientLazy);
RetryLog retryLog =
- retryListFile != null && !retryListFile.isEmpty()
- ? new RetryLog(retryListFile)
+ options.retryListFile != null && !options.retryListFile.isEmpty()
+ ? new RetryLog(options.retryListFile)
: null) {
boolean atLeastOneFileAppended = false;
- int numThreads = Math.min(finalOptions.threadCount(), filesExpanded.size());
+ int numThreads = Math.min(options.threadCount(), filesExpanded.size());
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
try {
var futures = new ArrayList<Future<List<DataFile>>>();
@@ -234,11 +199,11 @@ public static void run(
tableIO,
inputIO,
tableDataFiles,
- finalOptions,
+ options,
s3ClientLazy,
dstDataFileSource,
tableSchemaCopy,
- dataFileNamingStrategy,
+ options.dataFileNamingStrategy,
file);
} catch (Exception e) {
if (retryLog != null) {
@@ -271,7 +236,7 @@ public static void run(
executor.awaitTermination(1, TimeUnit.MINUTES);
}
- if (!finalOptions.noCommit()) {
+ if (!options.noCommit()) {
// TODO: log
if (atLeastOneFileAppended) {
appendOp.commit();
@@ -281,9 +246,10 @@ public static void run(
if (retryLog != null) {
retryLog.commit();
}
-
- // Commit transaction.
- txn.commitTransaction();
+ if (atLeastOneFileAppended) {
+ // Commit transaction.
+ txn.commitTransaction();
+ }
} else {
logger.warn("Table commit skipped (--no-commit)");
}
@@ -366,8 +332,8 @@ private static void updateWriteDistributionModeIfNotSet(Transaction txn, Table t
private static List<DataFile> processFile(
RESTCatalog catalog,
Table table,
- PartitionSpec tableSpec,
- SortOrder tableOrderSpec,
+ PartitionSpec partitionSpec,
+ SortOrder sortOrder,
FileIO tableIO,
FileIO inputIO,
Set tableDataFiles,
@@ -397,13 +363,41 @@ private static List processFile(
InputFile inputFile = Input.newFile(file, catalog, inputIO == null ? tableIO : inputIO);
ParquetMetadata metadata = Metadata.read(inputFile);
+ boolean sorted = options.assumeSorted;
+ if (!sorted && sortOrder.isSorted()) {
+ sorted = Sorting.isSorted(inputFile, tableSchema, sortOrder);
+ if (!sorted) {
+ if (options.noCopy || options.s3CopyObject) {
+ throw new BadRequestException(
+ String.format("%s does not appear to be sorted", inputFile.location()));
+ }
+ logger.warn(
+ "{} does not appear to be sorted. Falling back to full scan (slow)",
+ inputFile.location());
+ }
+ }
+
+ PartitionKey partitionKey = null;
+ if (partitionSpec.isPartitioned()) {
+ partitionKey = Partitioning.inferPartitionKey(metadata, partitionSpec);
+ if (partitionKey == null) {
+ if (options.noCopy || options.s3CopyObject) {
+ throw new BadRequestException(
+ String.format(
+ "Cannot infer partition key of %s from the metadata", inputFile.location()));
+ }
+ logger.warn(
+ "{} does not appear to be partitioned. Falling back to full scan (slow)",
+ inputFile.location());
+ }
+ }
+
MessageType type = metadata.getFileMetaData().getSchema();
Schema fileSchema = ParquetSchemaUtil.convert(type); // nameMapping applied (when present)
if (!sameSchema(table, fileSchema)) {
throw new BadRequestException(
String.format("%s's schema doesn't match table's schema", file));
}
- // assuming datafiles can be anywhere when table.location() is empty
var noCopyPossible = file.startsWith(table.location()) || options.forceNoCopy();
// TODO: check before uploading anything
if (options.noCopy() && !noCopyPossible) {
@@ -440,21 +434,23 @@ private static List processFile(
s3ClientLazy.getValue().copyObject(copyReq);
dataFileSizeInBytes = inputFile.getLength();
dataFile = dstDataFile;
- } else if (tableSpec.isPartitioned()) {
- return copyParquetWithPartition(
- file, tableSchema, tableSpec, tableOrderSpec, tableIO, inputFile, dstDataFileSource);
- } else if (tableOrderSpec.isSorted()) {
+ } else if (partitionSpec.isPartitioned() && partitionKey == null) {
+ return copyPartitionedAndSorted(
+ file, tableSchema, partitionSpec, sortOrder, tableIO, inputFile, dstDataFileSource);
+ } else if (sortOrder.isSorted() && !sorted) {
return Collections.singletonList(
- copyParquetWithSortOrder(
+ copySorted(
file,
- Strings.replacePrefix(dstDataFileSource.get(file), "s3://", "s3a://"),
+ dstDataFileSource.get(file),
tableSchema,
- tableSpec,
- tableOrderSpec,
+ partitionSpec,
+ sortOrder,
tableIO,
inputFile,
- dataFileNamingStrategy));
+ dataFileNamingStrategy,
+ partitionKey));
} else {
+ // Table isn't partitioned or sorted. Copy as is.
String dstDataFile = dstDataFileSource.get(file);
if (checkNotExists.apply(dstDataFile)) {
return Collections.emptyList();
@@ -462,13 +458,14 @@ private static List processFile(
OutputFile outputFile =
tableIO.newOutputFile(Strings.replacePrefix(dstDataFile, "s3://", "s3a://"));
// TODO: support transferTo below (note that compression, etc. might be different)
- // try (var d = outputFile.create()) { try (var s = inputFile.newStream()) {
- // s.transferTo(d); }}
+ // try (var d = outputFile.create()) {
+ // try (var s = inputFile.newStream()) { s.transferTo(d); }
+ // }
Parquet.ReadBuilder readBuilder =
Parquet.read(inputFile)
.createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s))
- .project(tableSchema); // TODO: ?
- // TODO: reuseContainers?
+ .project(tableSchema)
+ .reuseContainers();
Parquet.WriteBuilder writeBuilder =
Parquet.write(outputFile)
.overwrite(dataFileNamingStrategy == DataFileNamingStrategy.Name.PRESERVE_ORIGINAL)
@@ -484,96 +481,60 @@ private static List processFile(
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
Metrics metrics = ParquetUtil.footerMetrics(metadata, Stream.empty(), metricsConfig);
DataFile dataFileObj =
- new DataFiles.Builder(tableSpec)
+ new DataFiles.Builder(partitionSpec)
.withPath(dataFile)
.withFormat("PARQUET")
.withFileSizeInBytes(dataFileSizeInBytes)
.withMetrics(metrics)
+ .withPartition(partitionKey)
.build();
return Collections.singletonList(dataFileObj);
}
- private static List<DataFile> copyParquetWithPartition(
+ private static List<DataFile> copyPartitionedAndSorted(
String file,
Schema tableSchema,
- PartitionSpec tableSpec,
- SortOrder tableOrderSpec,
+ PartitionSpec partitionSpec,
+ SortOrder sortOrder,
FileIO tableIO,
InputFile inputFile,
DataFileNamingStrategy dstDataFileSource)
throws IOException {
- logger.info("{}: partitioning{}", file, tableOrderSpec.isSorted() ? "+sorting" : "");
+ logger.info("{}: partitioning{}", file, sortOrder.isSorted() ? "+sorting" : "");
- GenericAppenderFactory appenderFactory = new GenericAppenderFactory(tableSchema, tableSpec);
+ GenericAppenderFactory appenderFactory = new GenericAppenderFactory(tableSchema, partitionSpec);
- PartitionKey partitionKeyMold = new PartitionKey(tableSpec, tableSchema);
- Map<PartitionKey, List<Record>> partitionedRecords = new HashMap<>();
-
- Parquet.ReadBuilder readBuilder =
- Parquet.read(inputFile)
- .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s))
- .project(tableSchema);
-
- try (CloseableIterable<Record> records = readBuilder.build()) {
- for (Record record : records) {
- Record partitionRecord = GenericRecord.create(tableSchema);
- for (Types.NestedField field : tableSchema.columns()) {
- partitionRecord.setField(field.name(), record.getField(field.name()));
- }
- for (PartitionField field : tableSpec.fields()) {
- String fieldName = tableSchema.findField(field.sourceId()).name();
- Object value = partitionRecord.getField(fieldName);
- if (value != null) {
- String transformName = field.transform().toString();
- switch (transformName) {
- case "day", "month", "hour":
- long micros = toPartitionMicros(value);
- partitionRecord.setField(fieldName, micros);
- break;
- case "identity":
- break;
- default:
- throw new UnsupportedOperationException(
- "unexpected transform value: " + transformName);
- }
- }
- }
-
- // Partition based on converted values
- partitionKeyMold.partition(partitionRecord);
- PartitionKey partitionKey = partitionKeyMold.copy();
-
- // Store the original record (without converted timestamp fields)
- partitionedRecords.computeIfAbsent(partitionKey, k -> new ArrayList<>()).add(record);
- }
- }
-
- List<DataFile> dataFiles = new ArrayList<>();
+ // FIXME: stream to reduce memory usage
+ Map<PartitionKey, List<Record>> partitionedRecords =
+ Partitioning.partition(inputFile, tableSchema, partitionSpec);
// Create a comparator based on table.sortOrder()
RecordComparator comparator =
- tableOrderSpec.isSorted() ? new RecordComparator(tableOrderSpec, tableSchema) : null;
+ sortOrder.isSorted() ? new RecordComparator(sortOrder, tableSchema) : null;
+
+ List<DataFile> dataFiles = new ArrayList<>(partitionedRecords.size());
// Write sorted records for each partition
for (Map.Entry<PartitionKey, List<Record>> entry : partitionedRecords.entrySet()) {
PartitionKey partKey = entry.getKey();
List<Record> records = entry.getValue();
+ entry.setValue(List.of()); // allow "records" to be GC'd once we're done with them
// Sort records within the partition
if (comparator != null) {
records.sort(comparator);
}
- String dstDataFile = dstDataFileSource.get(tableSpec, partKey, file);
- OutputFile outFile =
+ String dstDataFile = dstDataFileSource.get(partitionSpec, partKey, file);
+ OutputFile outputFile =
tableIO.newOutputFile(Strings.replacePrefix(dstDataFile, "s3://", "s3a://"));
long fileSizeInBytes;
Metrics metrics;
try (FileAppender<Record> appender =
- appenderFactory.newAppender(outFile, FileFormat.PARQUET)) {
- for (Record rec : records) {
- appender.add(rec);
+ appenderFactory.newAppender(outputFile, FileFormat.PARQUET)) {
+ for (Record record : records) {
+ appender.add(record);
}
appender.close();
fileSizeInBytes = appender.length();
@@ -582,59 +543,35 @@ private static List copyParquetWithPartition(
logger.info("{}: adding data file: {}", file, dstDataFile);
dataFiles.add(
- DataFiles.builder(tableSpec)
- .withPath(outFile.location())
+ DataFiles.builder(partitionSpec)
+ .withPath(outputFile.location())
.withFileSizeInBytes(fileSizeInBytes)
- .withPartition(partKey)
.withFormat(FileFormat.PARQUET)
.withMetrics(metrics)
+ .withPartition(partKey)
.build());
}
return dataFiles;
}
- public static long toPartitionMicros(Object tsValue) {
- switch (tsValue) {
- case Long l -> {
- return l;
- }
- case String s -> {
- LocalDateTime ldt = LocalDateTime.parse(s);
- return ldt.toInstant(ZoneOffset.UTC).toEpochMilli() * 1000L;
- }
- case LocalDate localDate -> {
- return localDate.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli() * 1000L;
- }
- case LocalDateTime localDateTime -> {
- return localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli() * 1000L;
- }
- case OffsetDateTime offsetDateTime -> {
- return offsetDateTime.toInstant().toEpochMilli() * 1000L;
- }
- case Instant instant -> {
- return instant.toEpochMilli() * 1000L;
- }
- default ->
- throw new UnsupportedOperationException("unexpected value type: " + tsValue.getClass());
- }
- }
-
- private static DataFile copyParquetWithSortOrder(
+ private static DataFile copySorted(
String file,
String dstDataFile,
Schema tableSchema,
- PartitionSpec tableSpec,
- SortOrder tableOrderSpec,
+ PartitionSpec partitionSpec,
+ SortOrder sortOrder,
FileIO tableIO,
InputFile inputFile,
- DataFileNamingStrategy.Name dataFileNamingStrategy)
+ DataFileNamingStrategy.Name dataFileNamingStrategy,
+ PartitionKey partitionKey)
throws IOException {
logger.info("{}: copying (sorted) to {}", file, dstDataFile);
long start = System.currentTimeMillis();
- OutputFile outputFile = tableIO.newOutputFile(dstDataFile);
+ OutputFile outputFile =
+ tableIO.newOutputFile(Strings.replacePrefix(dstDataFile, "s3://", "s3a://"));
Parquet.ReadBuilder readBuilder =
Parquet.read(inputFile)
@@ -650,8 +587,8 @@ private static DataFile copyParquetWithSortOrder(
}
// Sort
- if (!tableOrderSpec.isUnsorted()) {
- records.sort(new RecordComparator(tableOrderSpec, tableSchema));
+ if (!sortOrder.isUnsorted()) {
+ records.sort(new RecordComparator(sortOrder, tableSchema));
}
// Write sorted records to outputFile
@@ -678,11 +615,12 @@ private static DataFile copyParquetWithSortOrder(
file,
(System.currentTimeMillis() - start) / 1000);
- return new DataFiles.Builder(tableSpec)
- .withPath(dstDataFile)
+ return new DataFiles.Builder(partitionSpec)
+ .withPath(outputFile.location())
.withFormat("PARQUET")
.withFileSizeInBytes(fileSizeInBytes)
.withMetrics(metrics)
+ .withPartition(partitionKey)
.build();
}
@@ -773,6 +711,7 @@ public String get(PartitionSpec spec, StructLike partitionData, String file) {
}
public record Options(
+ DataFileNamingStrategy.Name dataFileNamingStrategy,
boolean skipDuplicates,
boolean noCommit,
boolean noCopy,
@@ -780,25 +719,18 @@ public record Options(
boolean forceTableAuth,
boolean s3NoSignRequest,
boolean s3CopyObject,
+ boolean assumeSorted,
+ @Nullable String retryListFile,
+ @Nullable List partitionList,
+ @Nullable List sortOrderList,
int threadCount) {
public static Builder builder() {
return new Builder();
}
- public Builder toBuilder() {
- return builder()
- .skipDuplicates(skipDuplicates)
- .noCommit(noCommit)
- .noCopy(noCopy)
- .forceNoCopy(forceNoCopy)
- .forceTableAuth(forceTableAuth)
- .s3NoSignRequest(s3NoSignRequest)
- .s3CopyObject(s3CopyObject)
- .threadCount(threadCount);
- }
-
public static final class Builder {
+ DataFileNamingStrategy.Name dataFileNamingStrategy;
private boolean skipDuplicates;
private boolean noCommit;
private boolean noCopy;
@@ -806,10 +738,19 @@ public static final class Builder {
private boolean forceTableAuth;
private boolean s3NoSignRequest;
private boolean s3CopyObject;
+ private boolean assumeSorted;
+ String retryListFile;
+ List partitionList = List.of();
+ List sortOrderList = List.of();
private int threadCount = Runtime.getRuntime().availableProcessors();
private Builder() {}
+ public Builder dataFileNamingStrategy(DataFileNamingStrategy.Name dataFileNamingStrategy) {
+ this.dataFileNamingStrategy = dataFileNamingStrategy;
+ return this;
+ }
+
public Builder skipDuplicates(boolean skipDuplicates) {
this.skipDuplicates = skipDuplicates;
return this;
@@ -845,6 +786,26 @@ public Builder s3CopyObject(boolean s3CopyObject) {
return this;
}
+ public Builder assumeSorted(boolean assumeSorted) {
+ this.assumeSorted = assumeSorted;
+ return this;
+ }
+
+ public Builder retryListFile(String retryListFile) {
+ this.retryListFile = retryListFile;
+ return this;
+ }
+
+ public Builder partitionList(List partitionList) {
+ this.partitionList = partitionList;
+ return this;
+ }
+
+ public Builder sortOrderList(List sortOrderList) {
+ this.sortOrderList = sortOrderList;
+ return this;
+ }
+
public Builder threadCount(int threadCount) {
this.threadCount = threadCount;
return this;
@@ -852,13 +813,18 @@ public Builder threadCount(int threadCount) {
public Options build() {
return new Options(
+ dataFileNamingStrategy,
skipDuplicates,
noCommit,
- noCopy,
+ forceNoCopy || noCopy,
forceNoCopy,
forceTableAuth,
s3NoSignRequest,
s3CopyObject,
+ assumeSorted,
+ retryListFile,
+ partitionList,
+ sortOrderList,
threadCount);
}
}
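
Since `Insert.run` now takes a single `Options` value, callers assemble it through the builder shown above. A minimal sketch, assuming it compiles alongside `Insert` so ice's internal types resolve; the flag values, table name, and input path are illustrative only.

```java
package com.altinity.ice.cli.internal.cmd; // assumption: placed next to Insert

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTCatalog;

final class InsertOptionsSketch {
  static void insertOne(RESTCatalog catalog) throws Exception {
    Insert.Options options =
        Insert.Options.builder()
            .dataFileNamingStrategy(DataFileNamingStrategy.Name.DEFAULT)
            .skipDuplicates(true)
            .noCopy(true) // build() also turns this on whenever forceNoCopy(true) is set
            .threadCount(Runtime.getRuntime().availableProcessors())
            .build();
    Insert.run(
        catalog,
        TableIdentifier.parse("ns.events"), // placeholder table
        new String[] {"s3://bucket/data/part-0001.parquet"}, // placeholder input
        options);
  }
}
```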
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/InsertWatch.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/InsertWatch.java
new file mode 100644
index 0000000..934f6a7
--- /dev/null
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/InsertWatch.java
@@ -0,0 +1,239 @@
+/*
+ * Copyright (c) 2025 Altinity Inc and/or its affiliates. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package com.altinity.ice.cli.internal.cmd;
+
+import com.altinity.ice.internal.io.Matcher;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+
+public class InsertWatch {
+
+ private static final Logger logger = LoggerFactory.getLogger(InsertWatch.class);
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ public static void run(
+ RESTCatalog catalog,
+ TableIdentifier nsTable,
+ String[] input,
+ String sqsQueueURL,
+ boolean terminateAfterOneBatch,
+ boolean createTableIfNotExists,
+ Insert.Options options)
+ throws IOException, InterruptedException {
+
+ if (!options.noCopy() || !options.skipDuplicates()) {
+ throw new IllegalArgumentException(
+ "--watch currently requires --no-copy and --skip-duplicates");
+ }
+
+ if (input.length == 0) {
+ throw new IllegalArgumentException("At least one input required");
+ }
+
+ var matchers = Arrays.stream(input).map(Matcher::from).toList();
+
+ final SqsClient sqs = SqsClient.builder().build();
+ ReceiveMessageRequest req =
+ ReceiveMessageRequest.builder()
+ .queueUrl(sqsQueueURL)
+ .maxNumberOfMessages(10) // 10 is max
+ .waitTimeSeconds(20) // 20 is max
+ .build();
+
+ ReceiveMessageRequest tailReq =
+ ReceiveMessageRequest.builder()
+ .queueUrl(sqsQueueURL)
+ .maxNumberOfMessages(10) // 10 is max
+ .waitTimeSeconds(0)
+ .build();
+
+ logger.info("Pulling messages from {}", sqsQueueURL);
+
+ Supplier<Duration> backoff = () -> Duration.ofSeconds(20);
+ Runnable resetBackoff =
+ () -> {
+ // TODO: implement
+ };
+
+ boolean createTableExecuted = false;
+
+ //noinspection LoopConditionNotUpdatedInsideLoop
+ do {
+ List<Message> batch = new LinkedList<>();
+ try {
+ var messages = sqs.receiveMessage(req).messages();
+ batch.addAll(messages);
+ } catch (SdkException e) {
+ if (!e.retryable()) {
+ throw e; // TODO: should we really?
+ }
+ Duration delay = backoff.get();
+ logger.error("Failed to pull messages from the SQS queue (retry in {})", delay, e);
+ Thread.sleep(delay);
+ continue;
+ }
+ if (!batch.isEmpty()) {
+ try {
+ var maxBatchSize = 100; // FIXME: make configurable
+
+ List<Message> tailMessages;
+ do {
+ tailMessages = sqs.receiveMessage(tailReq).messages();
+ batch.addAll(tailMessages);
+ } while (!tailMessages.isEmpty() && batch.size() < maxBatchSize);
+
+ logger.info("Processing {} message(s)", batch.size());
+ // FIXME: handle files not found
+
+ var insertBatch = filter(batch, matchers);
+ if (!insertBatch.isEmpty()) {
+ logger.info("Inserting {}", insertBatch);
+
+ if (createTableIfNotExists && !createTableExecuted) {
+ if (!catalog.tableExists(nsTable)) {
+ CreateTable.run(
+ catalog,
+ nsTable,
+ insertBatch.iterator().next(),
+ null,
+ true,
+ options.s3NoSignRequest(),
+ null,
+ null);
+ }
+ createTableExecuted = true;
+ }
+
+ Insert.run(catalog, nsTable, insertBatch.toArray(String[]::new), options);
+ }
+
+ confirmProcessed(sqs, sqsQueueURL, batch);
+ } catch (InterruptedException e) {
+ // terminate
+ Thread.currentThread().interrupt();
+ throw e;
+ } catch (Exception e) {
+ Duration delay = backoff.get();
+ logger.error("Failed to process batch of messages (retry in {})", delay, e);
+ Thread.sleep(delay);
+ continue;
+ }
+ }
+ resetBackoff.run();
+ } while (!terminateAfterOneBatch);
+ }
+
+ private static Collection<String> filter(List<Message> messages, Collection<Matcher> matchers) {
+ Collection<String> r = new LinkedHashSet<>();
+ for (Message message : messages) {
+ // Message body() example:
+ //
+ // {
+ // "Records": [
+ // {
+ // "eventTime": "2024-07-29T21:12:30.123Z",
+ // "eventName": "ObjectCreated:Put",
+ // "s3": {
+ // "bucket": {
+ // "name": "my-bucket"
+ // },
+ // "object": {
+ // "key": "path/to/my-object.txt",
+ // "size": 12345
+ // }
+ // }
+ // }
+ // ]
+ // }
+ JsonNode root;
+ try {
+ root = objectMapper.readTree(message.body());
+ } catch (JsonProcessingException e) {
+ logger.error("Failed to parse message#{} body", message.messageId(), e);
+ // TODO: dlq?
+ continue;
+ }
+ // TODO: use type
+ for (JsonNode record : root.path("Records")) {
+ String eventName = record.path("eventName").asText();
+ String bucketName = record.at("/s3/bucket/name").asText();
+ String objectKey = record.at("/s3/object/key").asText();
+ var target = String.format("s3://%s/%s", bucketName, objectKey);
+ switch (eventName) {
+ case "ObjectCreated:Put":
+ // TODO: exclude metadata/data dirs by default
+ if (matchers.stream().anyMatch(matcher -> matcher.test(target))) {
+ r.add(target);
+ }
+ break;
+ default:
+ if (logger.isTraceEnabled()) {
+ logger.trace("Message skipped: {} {}", eventName, target);
+ }
+ }
+ }
+ }
+ return r;
+ }
+
+ private static void confirmProcessed(SqsClient sqs, String sqsQueueURL, List<Message> messages) {
+ int failedCount = 0;
+ int len = messages.size();
+ for (int i = 0; i < len; i = i + 10) {
+ List<Message> batch = messages.subList(i, Math.min(i + 10, len));
+ DeleteMessageBatchResponse res = deleteMessageBatch(sqs, sqsQueueURL, batch);
+ if (res.hasFailed()) {
+ List<BatchResultErrorEntry> failed = res.failed();
+ failedCount += failed.size();
+ }
+ }
+ if (failedCount > 0) {
+ // TODO: pick a better exception class
+ throw new RuntimeException(String.format("Failed to confirm %d message(s)", failedCount));
+ }
+ }
+
+ private static DeleteMessageBatchResponse deleteMessageBatch(
+ SqsClient sqs, String sqsQueueURL, List<Message> messages) {
+ return sqs.deleteMessageBatch(
+ DeleteMessageBatchRequest.builder()
+ .queueUrl(sqsQueueURL)
+ .entries(
+ messages.stream()
+ .map(
+ m ->
+ DeleteMessageBatchRequestEntry.builder()
+ .id(m.messageId())
+ .receiptHandle(m.receiptHandle())
+ .build())
+ .toList())
+ .build());
+ }
+}
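
The watch loop consumes standard S3 event notification JSON; below is a self-contained sketch of the same Jackson traversal `filter` uses, run against a hand-written message body (bucket and key are made up for illustration).

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class S3EventParseSketch {
  public static void main(String[] args) throws Exception {
    String body =
        "{\"Records\":[{\"eventName\":\"ObjectCreated:Put\","
            + "\"s3\":{\"bucket\":{\"name\":\"my-bucket\"},"
            + "\"object\":{\"key\":\"landing/part-0001.parquet\"}}}]}";
    JsonNode root = new ObjectMapper().readTree(body);
    for (JsonNode record : root.path("Records")) {
      String target =
          String.format(
              "s3://%s/%s",
              record.at("/s3/bucket/name").asText(), record.at("/s3/object/key").asText());
      // prints: ObjectCreated:Put -> s3://my-bucket/landing/part-0001.parquet
      System.out.println(record.path("eventName").asText() + " -> " + target);
    }
  }
}
```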
diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/iceberg/Partitioning.java b/ice/src/main/java/com/altinity/ice/cli/internal/iceberg/Partitioning.java
index 960dcb7..4fadf2f 100644
--- a/ice/src/main/java/com/altinity/ice/cli/internal/iceberg/Partitioning.java
+++ b/ice/src/main/java/com/altinity/ice/cli/internal/iceberg/Partitioning.java
@@ -10,12 +10,38 @@
package com.altinity.ice.cli.internal.iceberg;
import com.altinity.ice.cli.Main;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.UpdatePartitionSpec;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.util.SerializableFunction;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
public final class Partitioning {
@@ -93,4 +119,187 @@ public static void apply(UpdatePartitionSpec op, List columns
}
}
}
+
+ // TODO: fall back to the file path when statistics are not available
+ public static @Nullable PartitionKey inferPartitionKey(
+ ParquetMetadata metadata, PartitionSpec spec) {
+ Schema schema = spec.schema();
+
+ List<BlockMetaData> blocks = metadata.getBlocks();
+
+ Record partitionRecord = GenericRecord.create(schema);
+
+ for (PartitionField field : spec.fields()) {
+ int sourceId = field.sourceId();
+ String sourceName = schema.findField(sourceId).name();
+ Type type = schema.findField(sourceId).type();
+
+ Object value = null;
+ Object valueTransformed = null;
+ boolean same = true;
+
+ for (BlockMetaData block : blocks) {
+ Statistics<?> stats =
+ block.getColumns().stream()
+ .filter(c -> c.getPath().toDotString().equals(sourceName))
+ .findFirst()
+ .map(ColumnChunkMetaData::getStatistics)
+ .orElse(null);
+
+ if (stats == null
+ || !stats.hasNonNullValue()
+ || stats.genericGetMin() == null
+ || stats.genericGetMax() == null) {
+ same = false;
+ break;
+ }
+
+ Transform