Skip to content

Commit c478fb5

Browse files
authored
Merge pull request #31 from Altinity/catalog-retry-policy
patch: Catalog retry policy
2 parents d7c28e0 + dbac8c9 commit c478fb5

File tree

1 file changed

+361
-0
lines changed

1 file changed

+361
-0
lines changed
Lines changed: 361 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,361 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg;
20+
21+
import java.util.Locale;
22+
import java.util.UUID;
23+
import java.util.concurrent.atomic.AtomicReference;
24+
import java.util.function.Function;
25+
import java.util.function.Predicate;
26+
import org.apache.iceberg.encryption.EncryptionManager;
27+
import org.apache.iceberg.exceptions.AlreadyExistsException;
28+
import org.apache.iceberg.exceptions.CommitFailedException;
29+
import org.apache.iceberg.exceptions.NoSuchTableException;
30+
import org.apache.iceberg.exceptions.NotFoundException;
31+
import org.apache.iceberg.io.FileIO;
32+
import org.apache.iceberg.io.LocationProvider;
33+
import org.apache.iceberg.io.OutputFile;
34+
import org.apache.iceberg.relocated.com.google.common.base.Objects;
35+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
36+
import org.apache.iceberg.util.LocationUtil;
37+
import org.apache.iceberg.util.Tasks;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
40+
import software.amazon.awssdk.core.exception.SdkServiceException;
41+
42+
public abstract class BaseMetastoreTableOperations extends BaseMetastoreOperations
43+
implements TableOperations {
44+
private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreTableOperations.class);
45+
46+
public static final String TABLE_TYPE_PROP = "table_type";
47+
public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";
48+
public static final String METADATA_LOCATION_PROP = "metadata_location";
49+
public static final String PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location";
50+
51+
private static final String METADATA_FOLDER_NAME = "metadata";
52+
53+
private TableMetadata currentMetadata = null;
54+
private String currentMetadataLocation = null;
55+
private boolean shouldRefresh = true;
56+
private int version = -1;
57+
58+
protected BaseMetastoreTableOperations() {}
59+
60+
/**
61+
* The full name of the table used for logging purposes only. For example for HiveTableOperations
62+
* it is catalogName + "." + database + "." + table.
63+
*
64+
* @return The full name
65+
*/
66+
protected abstract String tableName();
67+
68+
@Override
69+
public TableMetadata current() {
70+
if (shouldRefresh) {
71+
return refresh();
72+
}
73+
return currentMetadata;
74+
}
75+
76+
public String currentMetadataLocation() {
77+
return currentMetadataLocation;
78+
}
79+
80+
public int currentVersion() {
81+
return version;
82+
}
83+
84+
@Override
85+
public TableMetadata refresh() {
86+
boolean currentMetadataWasAvailable = currentMetadata != null;
87+
try {
88+
doRefresh();
89+
} catch (NoSuchTableException e) {
90+
if (currentMetadataWasAvailable) {
91+
LOG.warn("Could not find the table during refresh, setting current metadata to null", e);
92+
shouldRefresh = true;
93+
}
94+
95+
currentMetadata = null;
96+
currentMetadataLocation = null;
97+
version = -1;
98+
throw e;
99+
}
100+
return current();
101+
}
102+
103+
protected void doRefresh() {
104+
throw new UnsupportedOperationException("Not implemented: doRefresh");
105+
}
106+
107+
@Override
108+
public void commit(TableMetadata base, TableMetadata metadata) {
109+
// if the metadata is already out of date, reject it
110+
if (base != current()) {
111+
if (base != null) {
112+
throw new CommitFailedException("Cannot commit: stale table metadata");
113+
} else {
114+
// when current is non-null, the table exists. but when base is null, the commit is trying
115+
// to create the table
116+
throw new AlreadyExistsException("Table already exists: %s", tableName());
117+
}
118+
}
119+
// if the metadata is not changed, return early
120+
if (base == metadata) {
121+
LOG.info("Nothing to commit.");
122+
return;
123+
}
124+
125+
long start = System.currentTimeMillis();
126+
doCommit(base, metadata);
127+
CatalogUtil.deleteRemovedMetadataFiles(io(), base, metadata);
128+
requestRefresh();
129+
130+
LOG.info(
131+
"Successfully committed to table {} in {} ms",
132+
tableName(),
133+
System.currentTimeMillis() - start);
134+
}
135+
136+
protected void doCommit(TableMetadata base, TableMetadata metadata) {
137+
throw new UnsupportedOperationException("Not implemented: doCommit");
138+
}
139+
140+
protected void requestRefresh() {
141+
this.shouldRefresh = true;
142+
}
143+
144+
protected void disableRefresh() {
145+
this.shouldRefresh = false;
146+
}
147+
148+
protected String writeNewMetadataIfRequired(boolean newTable, TableMetadata metadata) {
149+
return newTable && metadata.metadataFileLocation() != null
150+
? metadata.metadataFileLocation()
151+
: writeNewMetadata(metadata, currentVersion() + 1);
152+
}
153+
154+
protected String writeNewMetadata(TableMetadata metadata, int newVersion) {
155+
String newTableMetadataFilePath = newTableMetadataFilePath(metadata, newVersion);
156+
OutputFile newMetadataLocation = io().newOutputFile(newTableMetadataFilePath);
157+
158+
// write the new metadata
159+
// use overwrite to avoid negative caching in S3. this is safe because the metadata location is
160+
// always unique because it includes a UUID.
161+
TableMetadataParser.overwrite(metadata, newMetadataLocation);
162+
163+
return newMetadataLocation.location();
164+
}
165+
166+
protected void refreshFromMetadataLocation(String newLocation) {
167+
refreshFromMetadataLocation(newLocation, null, 20);
168+
}
169+
170+
protected void refreshFromMetadataLocation(String newLocation, int numRetries) {
171+
refreshFromMetadataLocation(newLocation, null, numRetries);
172+
}
173+
174+
protected void refreshFromMetadataLocation(
175+
String newLocation, Predicate<Exception> shouldRetry, int numRetries) {
176+
refreshFromMetadataLocation(
177+
newLocation,
178+
shouldRetry,
179+
numRetries,
180+
metadataLocation -> TableMetadataParser.read(io(), metadataLocation));
181+
}
182+
183+
protected void refreshFromMetadataLocation(
184+
String newLocation,
185+
Predicate<Exception> shouldRetry,
186+
int numRetries,
187+
Function<String, TableMetadata> metadataLoader) {
188+
// use null-safe equality check because new tables have a null metadata location
189+
if (!Objects.equal(currentMetadataLocation, newLocation)) {
190+
LOG.info("Refreshing table metadata from new version: {}", newLocation);
191+
192+
if (shouldRetry == null) {
193+
shouldRetry =
194+
e ->
195+
e instanceof SdkServiceException
196+
&& ((SdkServiceException) e).isRetryableException();
197+
}
198+
199+
AtomicReference<TableMetadata> newMetadata = new AtomicReference<>();
200+
Tasks.foreach(newLocation)
201+
.retry(numRetries)
202+
.exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */)
203+
.throwFailureWhenFinished()
204+
.stopRetryOn(NotFoundException.class) // overridden if shouldRetry is non-null
205+
.shouldRetryTest(shouldRetry)
206+
.run(metadataLocation -> newMetadata.set(metadataLoader.apply(metadataLocation)));
207+
208+
String newUUID = newMetadata.get().uuid();
209+
if (currentMetadata != null && currentMetadata.uuid() != null && newUUID != null) {
210+
Preconditions.checkState(
211+
newUUID.equals(currentMetadata.uuid()),
212+
"Table UUID does not match: current=%s != refreshed=%s",
213+
currentMetadata.uuid(),
214+
newUUID);
215+
}
216+
217+
this.currentMetadata = newMetadata.get();
218+
this.currentMetadataLocation = newLocation;
219+
this.version = parseVersion(newLocation);
220+
}
221+
this.shouldRefresh = false;
222+
}
223+
224+
private String metadataFileLocation(TableMetadata metadata, String filename) {
225+
String metadataLocation = metadata.properties().get(TableProperties.WRITE_METADATA_LOCATION);
226+
227+
if (metadataLocation != null) {
228+
return String.format("%s/%s", LocationUtil.stripTrailingSlash(metadataLocation), filename);
229+
} else {
230+
return String.format("%s/%s/%s", metadata.location(), METADATA_FOLDER_NAME, filename);
231+
}
232+
}
233+
234+
@Override
235+
public String metadataFileLocation(String filename) {
236+
return metadataFileLocation(current(), filename);
237+
}
238+
239+
@Override
240+
public LocationProvider locationProvider() {
241+
return LocationProviders.locationsFor(current().location(), current().properties());
242+
}
243+
244+
@Override
245+
public TableOperations temp(TableMetadata uncommittedMetadata) {
246+
return new TableOperations() {
247+
@Override
248+
public TableMetadata current() {
249+
return uncommittedMetadata;
250+
}
251+
252+
@Override
253+
public TableMetadata refresh() {
254+
throw new UnsupportedOperationException(
255+
"Cannot call refresh on temporary table operations");
256+
}
257+
258+
@Override
259+
public void commit(TableMetadata base, TableMetadata metadata) {
260+
throw new UnsupportedOperationException("Cannot call commit on temporary table operations");
261+
}
262+
263+
@Override
264+
public String metadataFileLocation(String fileName) {
265+
return BaseMetastoreTableOperations.this.metadataFileLocation(
266+
uncommittedMetadata, fileName);
267+
}
268+
269+
@Override
270+
public LocationProvider locationProvider() {
271+
return LocationProviders.locationsFor(
272+
uncommittedMetadata.location(), uncommittedMetadata.properties());
273+
}
274+
275+
@Override
276+
public FileIO io() {
277+
return BaseMetastoreTableOperations.this.io();
278+
}
279+
280+
@Override
281+
public EncryptionManager encryption() {
282+
return BaseMetastoreTableOperations.this.encryption();
283+
}
284+
285+
@Override
286+
public long newSnapshotId() {
287+
return BaseMetastoreTableOperations.this.newSnapshotId();
288+
}
289+
};
290+
}
291+
292+
/**
293+
* Attempt to load the table and see if any current or past metadata location matches the one we
294+
* were attempting to set. This is used as a last resort when we are dealing with exceptions that
295+
* may indicate the commit has failed but are not proof that this is the case. Past locations must
296+
* also be searched on the chance that a second committer was able to successfully commit on top
297+
* of our commit.
298+
*
299+
* @param newMetadataLocation the path of the new commit file
300+
* @param config metadata to use for configuration
301+
* @return Commit Status of Success, Failure or Unknown
302+
*/
303+
protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
304+
return CommitStatus.valueOf(
305+
checkCommitStatus(
306+
tableName(),
307+
newMetadataLocation,
308+
config.properties(),
309+
() -> checkCurrentMetadataLocation(newMetadataLocation))
310+
.name());
311+
}
312+
313+
/**
314+
* Validate if the new metadata location is the current metadata location or present within
315+
* previous metadata files.
316+
*
317+
* @param newMetadataLocation newly written metadata location
318+
* @return true if the new metadata location is the current metadata location or present within
319+
* previous metadata files.
320+
*/
321+
private boolean checkCurrentMetadataLocation(String newMetadataLocation) {
322+
TableMetadata metadata = refresh();
323+
String currentMetadataFileLocation = metadata.metadataFileLocation();
324+
return currentMetadataFileLocation.equals(newMetadataLocation)
325+
|| metadata.previousFiles().stream()
326+
.anyMatch(log -> log.file().equals(newMetadataLocation));
327+
}
328+
329+
private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
330+
String codecName =
331+
meta.property(
332+
TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT);
333+
String fileExtension = TableMetadataParser.getFileExtension(codecName);
334+
return metadataFileLocation(
335+
meta,
336+
String.format(Locale.ROOT, "%05d-%s%s", newVersion, UUID.randomUUID(), fileExtension));
337+
}
338+
339+
/**
340+
* Parse the version from table metadata file name.
341+
*
342+
* @param metadataLocation table metadata file location
343+
* @return version of the table metadata file in success case and -1 if the version is not
344+
* parsable (as a sign that the metadata is not part of this catalog)
345+
*/
346+
private static int parseVersion(String metadataLocation) {
347+
int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
348+
int versionEnd = metadataLocation.indexOf('-', versionStart);
349+
if (versionEnd < 0) {
350+
// found filesystem table's metadata
351+
return -1;
352+
}
353+
354+
try {
355+
return Integer.parseInt(metadataLocation.substring(versionStart, versionEnd));
356+
} catch (NumberFormatException e) {
357+
LOG.warn("Unable to parse version from metadata location: {}", metadataLocation, e);
358+
return -1;
359+
}
360+
}
361+
}

0 commit comments

Comments
 (0)