Skip to content

Commit 9e9a388

Browse files
committed
Overlay iceberg-core-1.8.1 for BaseMetastoreTableOperations::refreshFromMetadataLocation patch
1 parent d7c28e0 commit 9e9a388

File tree

1 file changed

+353
-0
lines changed

1 file changed

+353
-0
lines changed
Lines changed: 353 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,353 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg;
20+
21+
import java.util.Locale;
22+
import java.util.UUID;
23+
import java.util.concurrent.atomic.AtomicReference;
24+
import java.util.function.Function;
25+
import java.util.function.Predicate;
26+
import org.apache.iceberg.encryption.EncryptionManager;
27+
import org.apache.iceberg.exceptions.AlreadyExistsException;
28+
import org.apache.iceberg.exceptions.CommitFailedException;
29+
import org.apache.iceberg.exceptions.NoSuchTableException;
30+
import org.apache.iceberg.exceptions.NotFoundException;
31+
import org.apache.iceberg.io.FileIO;
32+
import org.apache.iceberg.io.LocationProvider;
33+
import org.apache.iceberg.io.OutputFile;
34+
import org.apache.iceberg.relocated.com.google.common.base.Objects;
35+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
36+
import org.apache.iceberg.util.LocationUtil;
37+
import org.apache.iceberg.util.Tasks;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
40+
41+
public abstract class BaseMetastoreTableOperations extends BaseMetastoreOperations
42+
implements TableOperations {
43+
private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreTableOperations.class);
44+
45+
public static final String TABLE_TYPE_PROP = "table_type";
46+
public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";
47+
public static final String METADATA_LOCATION_PROP = "metadata_location";
48+
public static final String PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location";
49+
50+
private static final String METADATA_FOLDER_NAME = "metadata";
51+
52+
private TableMetadata currentMetadata = null;
53+
private String currentMetadataLocation = null;
54+
private boolean shouldRefresh = true;
55+
private int version = -1;
56+
57+
protected BaseMetastoreTableOperations() {}
58+
59+
/**
60+
* The full name of the table used for logging purposes only. For example for HiveTableOperations
61+
* it is catalogName + "." + database + "." + table.
62+
*
63+
* @return The full name
64+
*/
65+
protected abstract String tableName();
66+
67+
@Override
68+
public TableMetadata current() {
69+
if (shouldRefresh) {
70+
return refresh();
71+
}
72+
return currentMetadata;
73+
}
74+
75+
public String currentMetadataLocation() {
76+
return currentMetadataLocation;
77+
}
78+
79+
public int currentVersion() {
80+
return version;
81+
}
82+
83+
@Override
84+
public TableMetadata refresh() {
85+
boolean currentMetadataWasAvailable = currentMetadata != null;
86+
try {
87+
doRefresh();
88+
} catch (NoSuchTableException e) {
89+
if (currentMetadataWasAvailable) {
90+
LOG.warn("Could not find the table during refresh, setting current metadata to null", e);
91+
shouldRefresh = true;
92+
}
93+
94+
currentMetadata = null;
95+
currentMetadataLocation = null;
96+
version = -1;
97+
throw e;
98+
}
99+
return current();
100+
}
101+
102+
protected void doRefresh() {
103+
throw new UnsupportedOperationException("Not implemented: doRefresh");
104+
}
105+
106+
@Override
107+
public void commit(TableMetadata base, TableMetadata metadata) {
108+
// if the metadata is already out of date, reject it
109+
if (base != current()) {
110+
if (base != null) {
111+
throw new CommitFailedException("Cannot commit: stale table metadata");
112+
} else {
113+
// when current is non-null, the table exists. but when base is null, the commit is trying
114+
// to create the table
115+
throw new AlreadyExistsException("Table already exists: %s", tableName());
116+
}
117+
}
118+
// if the metadata is not changed, return early
119+
if (base == metadata) {
120+
LOG.info("Nothing to commit.");
121+
return;
122+
}
123+
124+
long start = System.currentTimeMillis();
125+
doCommit(base, metadata);
126+
CatalogUtil.deleteRemovedMetadataFiles(io(), base, metadata);
127+
requestRefresh();
128+
129+
LOG.info(
130+
"Successfully committed to table {} in {} ms",
131+
tableName(),
132+
System.currentTimeMillis() - start);
133+
}
134+
135+
protected void doCommit(TableMetadata base, TableMetadata metadata) {
136+
throw new UnsupportedOperationException("Not implemented: doCommit");
137+
}
138+
139+
protected void requestRefresh() {
140+
this.shouldRefresh = true;
141+
}
142+
143+
protected void disableRefresh() {
144+
this.shouldRefresh = false;
145+
}
146+
147+
protected String writeNewMetadataIfRequired(boolean newTable, TableMetadata metadata) {
148+
return newTable && metadata.metadataFileLocation() != null
149+
? metadata.metadataFileLocation()
150+
: writeNewMetadata(metadata, currentVersion() + 1);
151+
}
152+
153+
protected String writeNewMetadata(TableMetadata metadata, int newVersion) {
154+
String newTableMetadataFilePath = newTableMetadataFilePath(metadata, newVersion);
155+
OutputFile newMetadataLocation = io().newOutputFile(newTableMetadataFilePath);
156+
157+
// write the new metadata
158+
// use overwrite to avoid negative caching in S3. this is safe because the metadata location is
159+
// always unique because it includes a UUID.
160+
TableMetadataParser.overwrite(metadata, newMetadataLocation);
161+
162+
return newMetadataLocation.location();
163+
}
164+
165+
protected void refreshFromMetadataLocation(String newLocation) {
166+
refreshFromMetadataLocation(newLocation, null, 20);
167+
}
168+
169+
protected void refreshFromMetadataLocation(String newLocation, int numRetries) {
170+
refreshFromMetadataLocation(newLocation, null, numRetries);
171+
}
172+
173+
protected void refreshFromMetadataLocation(
174+
String newLocation, Predicate<Exception> shouldRetry, int numRetries) {
175+
refreshFromMetadataLocation(
176+
newLocation,
177+
shouldRetry,
178+
numRetries,
179+
metadataLocation -> TableMetadataParser.read(io(), metadataLocation));
180+
}
181+
182+
protected void refreshFromMetadataLocation(
183+
String newLocation,
184+
Predicate<Exception> shouldRetry,
185+
int numRetries,
186+
Function<String, TableMetadata> metadataLoader) {
187+
// use null-safe equality check because new tables have a null metadata location
188+
if (!Objects.equal(currentMetadataLocation, newLocation)) {
189+
LOG.info("Refreshing table metadata from new version: {}", newLocation);
190+
191+
AtomicReference<TableMetadata> newMetadata = new AtomicReference<>();
192+
Tasks.foreach(newLocation)
193+
.retry(numRetries)
194+
.exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */)
195+
.throwFailureWhenFinished()
196+
.stopRetryOn(NotFoundException.class) // overridden if shouldRetry is non-null
197+
.shouldRetryTest(shouldRetry)
198+
.run(metadataLocation -> newMetadata.set(metadataLoader.apply(metadataLocation)));
199+
200+
String newUUID = newMetadata.get().uuid();
201+
if (currentMetadata != null && currentMetadata.uuid() != null && newUUID != null) {
202+
Preconditions.checkState(
203+
newUUID.equals(currentMetadata.uuid()),
204+
"Table UUID does not match: current=%s != refreshed=%s",
205+
currentMetadata.uuid(),
206+
newUUID);
207+
}
208+
209+
this.currentMetadata = newMetadata.get();
210+
this.currentMetadataLocation = newLocation;
211+
this.version = parseVersion(newLocation);
212+
}
213+
this.shouldRefresh = false;
214+
}
215+
216+
private String metadataFileLocation(TableMetadata metadata, String filename) {
217+
String metadataLocation = metadata.properties().get(TableProperties.WRITE_METADATA_LOCATION);
218+
219+
if (metadataLocation != null) {
220+
return String.format("%s/%s", LocationUtil.stripTrailingSlash(metadataLocation), filename);
221+
} else {
222+
return String.format("%s/%s/%s", metadata.location(), METADATA_FOLDER_NAME, filename);
223+
}
224+
}
225+
226+
@Override
227+
public String metadataFileLocation(String filename) {
228+
return metadataFileLocation(current(), filename);
229+
}
230+
231+
@Override
232+
public LocationProvider locationProvider() {
233+
return LocationProviders.locationsFor(current().location(), current().properties());
234+
}
235+
236+
@Override
237+
public TableOperations temp(TableMetadata uncommittedMetadata) {
238+
return new TableOperations() {
239+
@Override
240+
public TableMetadata current() {
241+
return uncommittedMetadata;
242+
}
243+
244+
@Override
245+
public TableMetadata refresh() {
246+
throw new UnsupportedOperationException(
247+
"Cannot call refresh on temporary table operations");
248+
}
249+
250+
@Override
251+
public void commit(TableMetadata base, TableMetadata metadata) {
252+
throw new UnsupportedOperationException("Cannot call commit on temporary table operations");
253+
}
254+
255+
@Override
256+
public String metadataFileLocation(String fileName) {
257+
return BaseMetastoreTableOperations.this.metadataFileLocation(
258+
uncommittedMetadata, fileName);
259+
}
260+
261+
@Override
262+
public LocationProvider locationProvider() {
263+
return LocationProviders.locationsFor(
264+
uncommittedMetadata.location(), uncommittedMetadata.properties());
265+
}
266+
267+
@Override
268+
public FileIO io() {
269+
return BaseMetastoreTableOperations.this.io();
270+
}
271+
272+
@Override
273+
public EncryptionManager encryption() {
274+
return BaseMetastoreTableOperations.this.encryption();
275+
}
276+
277+
@Override
278+
public long newSnapshotId() {
279+
return BaseMetastoreTableOperations.this.newSnapshotId();
280+
}
281+
};
282+
}
283+
284+
/**
285+
* Attempt to load the table and see if any current or past metadata location matches the one we
286+
* were attempting to set. This is used as a last resort when we are dealing with exceptions that
287+
* may indicate the commit has failed but are not proof that this is the case. Past locations must
288+
* also be searched on the chance that a second committer was able to successfully commit on top
289+
* of our commit.
290+
*
291+
* @param newMetadataLocation the path of the new commit file
292+
* @param config metadata to use for configuration
293+
* @return Commit Status of Success, Failure or Unknown
294+
*/
295+
protected CommitStatus checkCommitStatus(String newMetadataLocation, TableMetadata config) {
296+
return CommitStatus.valueOf(
297+
checkCommitStatus(
298+
tableName(),
299+
newMetadataLocation,
300+
config.properties(),
301+
() -> checkCurrentMetadataLocation(newMetadataLocation))
302+
.name());
303+
}
304+
305+
/**
306+
* Validate if the new metadata location is the current metadata location or present within
307+
* previous metadata files.
308+
*
309+
* @param newMetadataLocation newly written metadata location
310+
* @return true if the new metadata location is the current metadata location or present within
311+
* previous metadata files.
312+
*/
313+
private boolean checkCurrentMetadataLocation(String newMetadataLocation) {
314+
TableMetadata metadata = refresh();
315+
String currentMetadataFileLocation = metadata.metadataFileLocation();
316+
return currentMetadataFileLocation.equals(newMetadataLocation)
317+
|| metadata.previousFiles().stream()
318+
.anyMatch(log -> log.file().equals(newMetadataLocation));
319+
}
320+
321+
private String newTableMetadataFilePath(TableMetadata meta, int newVersion) {
322+
String codecName =
323+
meta.property(
324+
TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT);
325+
String fileExtension = TableMetadataParser.getFileExtension(codecName);
326+
return metadataFileLocation(
327+
meta,
328+
String.format(Locale.ROOT, "%05d-%s%s", newVersion, UUID.randomUUID(), fileExtension));
329+
}
330+
331+
/**
332+
* Parse the version from table metadata file name.
333+
*
334+
* @param metadataLocation table metadata file location
335+
* @return version of the table metadata file in success case and -1 if the version is not
336+
* parsable (as a sign that the metadata is not part of this catalog)
337+
*/
338+
private static int parseVersion(String metadataLocation) {
339+
int versionStart = metadataLocation.lastIndexOf('/') + 1; // if '/' isn't found, this will be 0
340+
int versionEnd = metadataLocation.indexOf('-', versionStart);
341+
if (versionEnd < 0) {
342+
// found filesystem table's metadata
343+
return -1;
344+
}
345+
346+
try {
347+
return Integer.parseInt(metadataLocation.substring(versionStart, versionEnd));
348+
} catch (NumberFormatException e) {
349+
LOG.warn("Unable to parse version from metadata location: {}", metadataLocation, e);
350+
return -1;
351+
}
352+
}
353+
}

0 commit comments

Comments
 (0)