Skip to content

Commit bba9a30

Browse files
Add filter for catalog name to Polaris synchronizer/migrator (#10)
* Add catalog name filter planner * Add builder for SynchronizationPlanner * Change formatting * Fix builder * Change builder contract
1 parent ab310d5 commit bba9a30

File tree

4 files changed

+231
-5
lines changed

4 files changed

+231
-5
lines changed
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.polaris.tools.sync.polaris.planning;
21+
22+
import org.apache.polaris.core.admin.model.Catalog;
23+
import org.apache.polaris.tools.sync.polaris.planning.plan.SynchronizationPlan;
24+
25+
import java.util.ArrayList;
26+
import java.util.HashMap;
27+
import java.util.List;
28+
import java.util.Map;
29+
30+
/**
31+
* Planner that filters out catalogs from the source and target on the basis of a RegEx before
32+
* they are passed to an encapsulated planner.
33+
*/
34+
public class CatalogNameFilterPlanner extends DelegatedPlanner implements SynchronizationPlanner {
35+
36+
private final String catalogNameFilterPattern;
37+
38+
public CatalogNameFilterPlanner(String regex, SynchronizationPlanner delegate) {
39+
super(delegate);
40+
this.catalogNameFilterPattern = regex;
41+
}
42+
43+
@Override
44+
public SynchronizationPlan<Catalog> planCatalogSync(List<Catalog> catalogsOnSource, List<Catalog> catalogsOnTarget) {
45+
List<Catalog> filteredSourceCatalogs = new ArrayList<>();
46+
47+
// store the names of the catalogs we skip so that we don't also mark target catalogs with the same name
48+
// twice
49+
Map<String, Catalog> skippedSourceCatalogsByName = new HashMap<>();
50+
51+
for (Catalog catalog : catalogsOnSource) {
52+
if (catalog.getName().matches(catalogNameFilterPattern)) {
53+
filteredSourceCatalogs.add(catalog);
54+
} else {
55+
skippedSourceCatalogsByName.put(catalog.getName(), catalog);
56+
}
57+
}
58+
59+
List<Catalog> filteredTargetCatalogs = new ArrayList<>();
60+
61+
List<Catalog> skippedTargetCatalogs = new ArrayList<>();
62+
63+
for (Catalog catalog : catalogsOnTarget) {
64+
if (catalog.getName().matches(catalogNameFilterPattern)) {
65+
filteredTargetCatalogs.add(catalog);
66+
} else if (!skippedSourceCatalogsByName.containsKey(catalog.getName())) {
67+
// if we already skipped a catalog with the same name on the source, we don't want to mark it as
68+
// skipped again, but we do want to mark catalogs that aren't on the source but were instead filtered
69+
// out solely from the target
70+
skippedTargetCatalogs.add(catalog);
71+
}
72+
}
73+
74+
SynchronizationPlan<Catalog> delegatedPlan =
75+
delegate.planCatalogSync(filteredSourceCatalogs, filteredTargetCatalogs);
76+
77+
skippedSourceCatalogsByName.values().forEach(delegatedPlan::skipEntityAndSkipChildren);
78+
skippedTargetCatalogs.forEach(delegatedPlan::skipEntityAndSkipChildren);
79+
80+
return delegatedPlan;
81+
}
82+
}

polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/planning/SynchronizationPlanner.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.polaris.tools.sync.polaris.planning;
2020

21+
import java.util.ArrayList;
2122
import java.util.List;
2223
import java.util.Set;
2324
import org.apache.iceberg.catalog.Namespace;
@@ -35,6 +36,64 @@
3536
*/
3637
public interface SynchronizationPlanner {
3738

39+
class SynchronizationPlannerBuilder {
40+
41+
@FunctionalInterface
42+
public interface PlannerWrapper {
43+
44+
/**
45+
* Wrap a provided {@link SynchronizationPlanner} by another {@link SynchronizationPlanner}.
46+
* @param planner the planner to wrap
47+
* @return a wrapped planner
48+
*/
49+
SynchronizationPlanner wrap(SynchronizationPlanner planner);
50+
}
51+
52+
private final SynchronizationPlanner innermost;
53+
54+
private final List<PlannerWrapper> plannerWrappers = new ArrayList<>();
55+
56+
private SynchronizationPlannerBuilder(SourceParitySynchronizationPlanner innermost) {
57+
this.innermost = innermost;
58+
}
59+
60+
/**
61+
* Wrap the current chain of planners.
62+
* @param outer the planner to wrap by
63+
*/
64+
public SynchronizationPlannerBuilder wrapBy(PlannerWrapper outer) {
65+
plannerWrappers.add(outer);
66+
return this;
67+
}
68+
69+
/**
70+
* Wrap the current chain of planners if the condition is true.
71+
* @param condition if true, will wrap the current chain of planners by the provided outer planner
72+
* @param outer the planner to wrap by
73+
*/
74+
public SynchronizationPlannerBuilder conditionallyWrapBy(boolean condition, PlannerWrapper outer) {
75+
if (condition) {
76+
plannerWrappers.add(outer);
77+
}
78+
return this;
79+
}
80+
81+
/**
82+
* Build the chained set of planners.
83+
*/
84+
public SynchronizationPlanner build() {
85+
SynchronizationPlanner current = innermost;
86+
for (PlannerWrapper plannerWrapper : plannerWrappers) {
87+
current = plannerWrapper.wrap(current);
88+
}
89+
return current;
90+
}
91+
}
92+
93+
static SynchronizationPlannerBuilder builder(SourceParitySynchronizationPlanner innermost) {
94+
return new SynchronizationPlannerBuilder(innermost);
95+
}
96+
3897
SynchronizationPlan<Principal> planPrincipalSync(
3998
List<Principal> principalsOnSource, List<Principal> principalsOnTarget);
4099

@@ -77,4 +136,4 @@ SynchronizationPlan<TableIdentifier> planTableSync(
77136
Namespace namespace,
78137
Set<TableIdentifier> tablesOnSource,
79138
Set<TableIdentifier> tablesOnTarget);
80-
}
139+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.polaris.tools.sync.polaris;
21+
22+
import org.apache.polaris.core.admin.model.Catalog;
23+
import org.apache.polaris.tools.sync.polaris.planning.CatalogNameFilterPlanner;
24+
import org.apache.polaris.tools.sync.polaris.planning.NoOpSyncPlanner;
25+
import org.apache.polaris.tools.sync.polaris.planning.SynchronizationPlanner;
26+
import org.apache.polaris.tools.sync.polaris.planning.plan.SynchronizationPlan;
27+
import org.junit.jupiter.api.Assertions;
28+
import org.junit.jupiter.api.Test;
29+
30+
import java.util.List;
31+
32+
public class CatalogNameFilterPlannerTest {
33+
34+
private static final Catalog CATALOG_1 = new Catalog().name("catalog-1");
35+
36+
private static final Catalog CATALOG_2 = new Catalog().name("catalog-2");
37+
38+
@Test
39+
public void testFiltersOutCatalog() {
40+
SynchronizationPlanner planner = new CatalogNameFilterPlanner(
41+
"^catalog-1$", new NoOpSyncPlanner());
42+
43+
SynchronizationPlan<Catalog> plan
44+
= planner.planCatalogSync(List.of(CATALOG_1, CATALOG_2), List.of());
45+
46+
Assertions.assertTrue(plan.entitiesToSkipAndSkipChildren().contains(CATALOG_2));
47+
Assertions.assertFalse(plan.entitiesToSkipAndSkipChildren().contains(CATALOG_1));
48+
}
49+
50+
@Test
51+
public void onlyMarksSourceCatalogForFilteringWhenCatalogIsOnSourceAndTarget() {
52+
SynchronizationPlanner planner = new CatalogNameFilterPlanner(
53+
"^something that doesn't match the catalog name$", new NoOpSyncPlanner());
54+
55+
SynchronizationPlan<Catalog> plan
56+
= planner.planCatalogSync(List.of(CATALOG_1), List.of(CATALOG_1));
57+
58+
Assertions.assertTrue(plan.entitiesToSkipAndSkipChildren().contains(CATALOG_1));
59+
60+
// ensure only marks the source catalog, doesn't also mark target catalog as well
61+
Assertions.assertEquals(1, plan.entitiesToSkipAndSkipChildren().size());
62+
}
63+
64+
@Test
65+
public void marksTargetCatalogWhenSourceCatalogDoesNotExist() {
66+
SynchronizationPlanner planner = new CatalogNameFilterPlanner(
67+
"^something that doesn't match either catalog name$", new NoOpSyncPlanner());
68+
69+
SynchronizationPlan<Catalog> plan
70+
= planner.planCatalogSync(List.of(CATALOG_1), List.of(CATALOG_2));
71+
72+
Assertions.assertTrue(plan.entitiesToSkipAndSkipChildren().contains(CATALOG_1));
73+
Assertions.assertTrue(plan.entitiesToSkipAndSkipChildren().contains(CATALOG_2));
74+
}
75+
}

polaris-synchronizer/cli/src/main/java/org/apache/polaris/tools/sync/polaris/SyncPolarisCommand.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.Callable;
2323
import org.apache.polaris.tools.sync.polaris.catalog.ETagManager;
2424
import org.apache.polaris.tools.sync.polaris.planning.AccessControlAwarePlanner;
25+
import org.apache.polaris.tools.sync.polaris.planning.CatalogNameFilterPlanner;
2526
import org.apache.polaris.tools.sync.polaris.planning.ModificationAwarePlanner;
2627
import org.apache.polaris.tools.sync.polaris.planning.SourceParitySynchronizationPlanner;
2728
import org.apache.polaris.tools.sync.polaris.planning.SynchronizationPlanner;
@@ -90,11 +91,20 @@ public class SyncPolarisCommand implements Callable<Integer> {
9091
)
9192
private boolean haltOnFailure;
9293

94+
@CommandLine.Option(
95+
names = {"--catalog-name-regex"},
96+
description = "If specified, only catalogs with names that match the provided RegEx will be staged for " +
97+
"synchronization. This applies to catalogs on both the source and target."
98+
)
99+
private String catalogNameRegex;
100+
93101
@Override
94102
public Integer call() throws Exception {
95-
SynchronizationPlanner sourceParityPlanner = new SourceParitySynchronizationPlanner();
96-
SynchronizationPlanner modificationAwareSourceParityPlanner = new ModificationAwarePlanner(sourceParityPlanner);
97-
SynchronizationPlanner accessControlAwarePlanner = new AccessControlAwarePlanner(modificationAwareSourceParityPlanner);
103+
SynchronizationPlanner planner = SynchronizationPlanner.builder(new SourceParitySynchronizationPlanner())
104+
.wrapBy(ModificationAwarePlanner::new)
105+
.conditionallyWrapBy(catalogNameRegex != null, p -> new CatalogNameFilterPlanner(catalogNameRegex, p))
106+
.wrapBy(AccessControlAwarePlanner::new)
107+
.build();
98108

99109
// auto generate omnipotent principals with write access on the target, read only access on source
100110
sourceProperties.put(PolarisApiService.ICEBERG_WRITE_ACCESS_PROPERTY, Boolean.toString(false));
@@ -111,7 +121,7 @@ public Integer call() throws Exception {
111121
new PolarisSynchronizer(
112122
consoleLog,
113123
haltOnFailure,
114-
accessControlAwarePlanner,
124+
planner,
115125
source,
116126
target,
117127
etagManager);

0 commit comments

Comments
 (0)