Skip to content

Commit 46fe5e0

Browse files
Added append only, upsert only modes for polaris-synchronizer, and put modification aware behind opt-in flag. (#11)
* Added append only and remove strategies and put modification aware behind configurable flag * Rename to diffOnly * Changed test
1 parent bba9a30 commit 46fe5e0

File tree

9 files changed

+454
-343
lines changed

9 files changed

+454
-343
lines changed

polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/PolarisSynchronizer.java

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,20 +61,24 @@ public class PolarisSynchronizer {
6161

6262
private final boolean haltOnFailure;
6363

64+
private final boolean diffOnly;
65+
6466
public PolarisSynchronizer(
6567
Logger clientLogger,
6668
boolean haltOnFailure,
6769
SynchronizationPlanner synchronizationPlanner,
6870
PolarisService source,
6971
PolarisService target,
70-
ETagManager etagManager) {
72+
ETagManager etagManager,
73+
boolean diffOnly) {
7174
this.clientLogger =
7275
clientLogger == null ? LoggerFactory.getLogger(PolarisSynchronizer.class) : clientLogger;
7376
this.haltOnFailure = haltOnFailure;
7477
this.syncPlanner = synchronizationPlanner;
7578
this.source = source;
7679
this.target = target;
7780
this.etagManager = etagManager;
81+
this.diffOnly = diffOnly;
7882
}
7983

8084
/**
@@ -1035,18 +1039,25 @@ public void syncNamespaces(
10351039
try {
10361040
Map<String, String> sourceNamespaceMetadata =
10371041
sourceIcebergCatalogService.loadNamespaceMetadata(namespace);
1038-
Map<String, String> targetNamespaceMetadata =
1039-
targetIcebergCatalogService.loadNamespaceMetadata(namespace);
10401042

1041-
if (sourceNamespaceMetadata.equals(targetNamespaceMetadata)) {
1042-
clientLogger.info(
1043-
"Namespace metadata for namespace {} in namespace {} for catalog {} was not modified, skipping. - {}/{}",
1044-
namespace,
1045-
parentNamespace,
1046-
catalogName,
1047-
++syncsCompleted,
1048-
totalSyncsToComplete);
1049-
continue;
1043+
if (this.diffOnly) {
1044+
// if only configured to migrate the diff between the source and the target Polaris,
1045+
// then we can load the target namespace metadata and perform a comparison to discontinue early
1046+
// if we notice the metadata did not change
1047+
1048+
Map<String, String> targetNamespaceMetadata =
1049+
targetIcebergCatalogService.loadNamespaceMetadata(namespace);
1050+
1051+
if (sourceNamespaceMetadata.equals(targetNamespaceMetadata)) {
1052+
clientLogger.info(
1053+
"Namespace metadata for namespace {} in namespace {} for catalog {} was not modified, skipping. - {}/{}",
1054+
namespace,
1055+
parentNamespace,
1056+
catalogName,
1057+
++syncsCompleted,
1058+
totalSyncsToComplete);
1059+
continue;
1060+
}
10501061
}
10511062

10521063
targetIcebergCatalogService.setNamespaceProperties(namespace, sourceNamespaceMetadata);
@@ -1206,7 +1217,7 @@ public void syncTables(
12061217
try {
12071218
Table table;
12081219

1209-
if (sourceIcebergCatalogService instanceof PolarisIcebergCatalogService polarisCatalogService) {
1220+
if (this.diffOnly && sourceIcebergCatalogService instanceof PolarisIcebergCatalogService polarisCatalogService) {
12101221
String etag = etagManager.getETag(catalogName, tableId);
12111222
table = polarisCatalogService.loadTable(tableId, etag);
12121223
} else {
Lines changed: 75 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -33,29 +33,58 @@
3333
import org.apache.polaris.tools.sync.polaris.planning.plan.SynchronizationPlan;
3434

3535
/**
36-
* Sync planner that attempts to create total parity between the source and target Polaris
37-
* instances. This involves creating new entities, overwriting entities that exist on both source
38-
* and target, and removing entities that exist only on the target.
36+
* Planner that implements the base level strategy that can be applied to synchronize the source and target.
37+
* Can be configured at different levels of modification.
3938
*/
40-
public class SourceParitySynchronizationPlanner implements SynchronizationPlanner {
39+
public class BaseStrategyPlanner implements SynchronizationPlanner {
4140

4241
/**
43-
* Sort entities from the source into create, overwrite, and remove categories
42+
* The strategy to employ when using {@link BaseStrategyPlanner}.
43+
*/
44+
public enum Strategy {
45+
46+
/**
47+
* Only create entities that exist on source but don't already exist on the target
48+
*/
49+
CREATE_ONLY,
50+
51+
/**
52+
* Create entities that do not exist on the target, and overwrite existing ones with the same name/identifier
53+
*/
54+
CREATE_AND_OVERWRITE,
55+
56+
/**
57+
* Create entities that exist on the source and not target, update entities that exist on both, remove entities
58+
* from the target that do not exist on the source.
59+
*/
60+
REPLICATE
61+
62+
}
63+
64+
private final Strategy strategy;
65+
66+
public BaseStrategyPlanner(Strategy strategy) {
67+
this.strategy = strategy;
68+
}
69+
70+
/**
71+
* Sort entities from the source into create, overwrite, remove, and skip categories
4472
* on the basis of which identifiers exist on the source and target Polaris.
4573
* Identifiers that are both on the source and target instance will be marked
46-
* for overwrite. Entities that are only on the source instance will be marked for
47-
* creation. Entities that are only on the target instance will be marked for deletion.
74+
* for overwrite if overwriting is enabled. Entities that are only on the source instance
75+
* will be marked for creation. Entities that are only on the target instance will be marked for deletion
76+
* only if the {@link Strategy#REPLICATE} strategy is used.
4877
* @param entitiesOnSource the entities from the source
4978
* @param entitiesOnTarget the entities from the target
50-
* @param supportOverwrites true if "overwriting" the entity is necessary. Most grant record entities do not need overwriting.
79+
* @param requiresOverwrites true if "overwriting" the entity is necessary. Most grant record entities do not need overwriting.
5180
* @param entityIdentifierSupplier consumes an entity and returns an identifying representation of that entity
5281
* @return a {@link SynchronizationPlan} with the entities sorted based on the source parity strategy
5382
* @param <T> the type of the entity
5483
*/
5584
private <T> SynchronizationPlan<T> sortOnIdentifier(
5685
Collection<T> entitiesOnSource,
5786
Collection<T> entitiesOnTarget,
58-
boolean supportOverwrites,
87+
boolean requiresOverwrites,
5988
Function<T, Object> entityIdentifierSupplier
6089
) {
6190
Set<Object> sourceEntityIdentifiers = entitiesOnSource.stream().map(entityIdentifierSupplier).collect(Collectors.toSet());
@@ -65,11 +94,28 @@ private <T> SynchronizationPlan<T> sortOnIdentifier(
6594

6695
for (T entityOnSource : entitiesOnSource) {
6796
Object sourceEntityId = entityIdentifierSupplier.apply(entityOnSource);
97+
6898
if (targetEntityIdentifiers.contains(sourceEntityId)) {
69-
if (supportOverwrites) {
99+
// If an entity with this identifier exists on both the source and the target
100+
101+
if (strategy == Strategy.CREATE_ONLY) {
102+
// if the same entity identifier is on the source and target,
103+
// but we only permit creates, skip it
104+
plan.skipEntity(entityOnSource);
105+
} else {
70106
// if the same entity identifier is on the source and the target,
71107
// overwrite the one on the target with the one on the source
72-
plan.overwriteEntity(entityOnSource);
108+
109+
if (requiresOverwrites) {
110+
// If the entity requires a drop-and-recreate to perform an overwrite.
111+
// By contrast, some grant records can be "created" repeatedly even if they already exist; for example,
112+
// catalog roles can be assigned the same principal role many times
113+
plan.overwriteEntity(entityOnSource);
114+
} else {
115+
// if the entity is not a type that requires "overwriting" in the sense of
116+
// dropping and recreating, then just create it again
117+
plan.createEntity(entityOnSource);
118+
}
73119
}
74120
} else {
75121
// if the entity identifier only exists on the source, that means
@@ -89,7 +135,15 @@ private <T> SynchronizationPlan<T> sortOnIdentifier(
89135
// or a catalog role was revoked from a principal role, in which case the target
90136
// should reflect this change when the tool is run multiple times, because we don't
91137
// want to take chances with over-extending privileges
92-
plan.removeEntity(entityOnTarget);
138+
139+
if (strategy == Strategy.REPLICATE) {
140+
plan.removeEntity(entityOnTarget);
141+
} else {
142+
// skip children here because an entity that would otherwise be removed
143+
// does not exist on the source, so there are no child
144+
// entities to sync
145+
plan.skipEntityAndSkipChildren(entityOnTarget);
146+
}
93147
}
94148
}
95149

@@ -99,7 +153,7 @@ private <T> SynchronizationPlan<T> sortOnIdentifier(
99153
@Override
100154
public SynchronizationPlan<Principal> planPrincipalSync(
101155
List<Principal> principalsOnSource, List<Principal> principalsOnTarget) {
102-
return sortOnIdentifier(principalsOnSource, principalsOnTarget, /* supportsOverwrites */ true, Principal::getName);
156+
return sortOnIdentifier(principalsOnSource, principalsOnTarget, /* requiresOverwrites */ true, Principal::getName);
103157
}
104158

105159
@Override
@@ -111,7 +165,7 @@ public SynchronizationPlan<PrincipalRole> planAssignPrincipalsToPrincipalRolesSy
111165
return sortOnIdentifier(
112166
assignedPrincipalRolesOnSource,
113167
assignedPrincipalRolesOnTarget,
114-
/* supportsOverwrites */ false, // do not need to overwrite an assignment of a principal role to a principal
168+
/* requiresOverwrites */ false, // do not need to overwrite an assignment of a principal role to a principal
115169
PrincipalRole::getName
116170
);
117171
}
@@ -123,15 +177,15 @@ public SynchronizationPlan<PrincipalRole> planPrincipalRoleSync(
123177
return sortOnIdentifier(
124178
principalRolesOnSource,
125179
principalRolesOnTarget,
126-
/* supportsOverwrites */ true,
180+
/* requiresOverwrites */ true,
127181
PrincipalRole::getName
128182
);
129183
}
130184

131185
@Override
132186
public SynchronizationPlan<Catalog> planCatalogSync(
133187
List<Catalog> catalogsOnSource, List<Catalog> catalogsOnTarget) {
134-
return sortOnIdentifier(catalogsOnSource, catalogsOnTarget, /* supportsOverwrites */ true, Catalog::getName);
188+
return sortOnIdentifier(catalogsOnSource, catalogsOnTarget, /* requiresOverwrites */ true, Catalog::getName);
135189
}
136190

137191
@Override
@@ -140,7 +194,7 @@ public SynchronizationPlan<CatalogRole> planCatalogRoleSync(
140194
List<CatalogRole> catalogRolesOnSource,
141195
List<CatalogRole> catalogRolesOnTarget) {
142196
return sortOnIdentifier(
143-
catalogRolesOnSource, catalogRolesOnTarget, /* supportsOverwrites */ true, CatalogRole::getName);
197+
catalogRolesOnSource, catalogRolesOnTarget, /* requiresOverwrites */ true, CatalogRole::getName);
144198
}
145199

146200
@Override
@@ -152,7 +206,7 @@ public SynchronizationPlan<GrantResource> planGrantSync(
152206
return sortOnIdentifier(
153207
grantsOnSource,
154208
grantsOnTarget,
155-
/* supportsOverwrites */ false,
209+
/* requiresOverwrites */ false,
156210
grant -> grant // grants can just be compared by the entire generated object
157211
);
158212
}
@@ -166,7 +220,7 @@ public SynchronizationPlan<PrincipalRole> planAssignPrincipalRolesToCatalogRoles
166220
return sortOnIdentifier(
167221
assignedPrincipalRolesOnSource,
168222
assignedPrincipalRolesOnTarget,
169-
/* supportsOverwrites */ false,
223+
/* requiresOverwrites */ false,
170224
PrincipalRole::getName
171225
);
172226
}
@@ -177,7 +231,7 @@ public SynchronizationPlan<Namespace> planNamespaceSync(
177231
Namespace namespace,
178232
List<Namespace> namespacesOnSource,
179233
List<Namespace> namespacesOnTarget) {
180-
return sortOnIdentifier(namespacesOnSource, namespacesOnTarget, /* supportsOverwrites */ true, ns -> ns);
234+
return sortOnIdentifier(namespacesOnSource, namespacesOnTarget, /* requiresOverwrites */ true, ns -> ns);
181235
}
182236

183237
@Override
@@ -187,6 +241,6 @@ public SynchronizationPlan<TableIdentifier> planTableSync(
187241
Set<TableIdentifier> tablesOnSource,
188242
Set<TableIdentifier> tablesOnTarget) {
189243
return sortOnIdentifier(
190-
tablesOnSource, tablesOnTarget, /* supportsOverwrites */ true, tableId -> tableId);
244+
tablesOnSource, tablesOnTarget, /* requiresOverwrites */ true, tableId -> tableId);
191245
}
192246
}

polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/planning/SynchronizationPlanner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public interface PlannerWrapper {
5353

5454
private final List<PlannerWrapper> plannerWrappers = new ArrayList<>();
5555

56-
private SynchronizationPlannerBuilder(SourceParitySynchronizationPlanner innermost) {
56+
private SynchronizationPlannerBuilder(BaseStrategyPlanner innermost) {
5757
this.innermost = innermost;
5858
}
5959

@@ -90,7 +90,7 @@ public SynchronizationPlanner build() {
9090
}
9191
}
9292

93-
static SynchronizationPlannerBuilder builder(SourceParitySynchronizationPlanner innermost) {
93+
static SynchronizationPlannerBuilder builder(BaseStrategyPlanner innermost) {
9494
return new SynchronizationPlannerBuilder(innermost);
9595
}
9696

0 commit comments

Comments
 (0)