Skip to content

Commit 93886d8

Browse files
cbb330 and claude authored
Fix stale snapshot detection to return 409 instead of 400 (#425)
## Summary

- Fix stale snapshot detection during concurrent modifications to return HTTP 409 (Conflict) instead of HTTP 400 (Bad Request)
- Reclassify `ValidationException` with stale snapshot message to `CommitFailedException` (409) to allow client retry
- Ensure other `ValidationException` instances are handled as HTTP 400 Bad Request responses (e.g., attempting to delete a non-existent snapshot)

## Problem

When concurrent modifications occur during a transaction commit:

1. Client builds snapshots based on table version N (e.g., `lastSequenceNumber = 4`)
2. Client sends commit request with these snapshots in `SNAPSHOTS_JSON_KEY`
3. Meanwhile, another process commits version N+1 (e.g., `lastSequenceNumber = 5`)
4. Server calls `doRefresh()` which updates `current()` to version N+1
5. **Bug:** The snapshots in `SNAPSHOTS_JSON_KEY` are now stale (their sequence numbers are based on version N)
6. Iceberg's `TableMetadata.addSnapshot()` throws `ValidationException` → mapped to 400 Bad Request
7. Should return 409 Conflict so clients know to refresh and retry

## Solution

Let Iceberg's existing validation detect sequence number conflicts, then catch the `ValidationException` and reclassify it as `CommitFailedException` for the specific stale snapshot error pattern:

```java
} catch (ValidationException e) {
  // Stale snapshot errors are retryable - client should refresh and retry
  if (isStaleSnapshotError(e)) {
    throw new CommitFailedException(e);
  }
  throw new BadRequestException(e, e.getMessage());
}
```

This approach is simpler than pre-checking and leverages Iceberg's existing validation.

## Test Plan

- [x] Unit test `testStaleSnapshotErrorDetection()` verifies error detection logic
- [x] All existing internalcatalog tests pass
- [ ] Integration testing in staging environment

---------

Co-authored-by: Claude Opus 4.5 <[email protected]>
1 parent 1555bf5 commit 93886d8

File tree

4 files changed

+124
-2
lines changed

4 files changed

+124
-2
lines changed

iceberg/openhouse/internalcatalog/src/main/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperations.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,13 @@ updatedMtDataRef, io().newOutputFile(newMetadataLocation)),
383383
e);
384384
}
385385
throw new CommitFailedException(ioe);
386-
} catch (InvalidIcebergSnapshotException | IllegalArgumentException | ValidationException e) {
386+
} catch (InvalidIcebergSnapshotException | IllegalArgumentException e) {
387+
throw new BadRequestException(e, e.getMessage());
388+
} catch (ValidationException e) {
389+
// Stale snapshot errors are retryable - client should refresh and retry
390+
if (isStaleSnapshotError(e)) {
391+
throw new CommitFailedException(e);
392+
}
387393
throw new BadRequestException(e, e.getMessage());
388394
} catch (CommitFailedException e) {
389395
throw e;
@@ -556,6 +562,17 @@ private void failIfRetryUpdate(Map<String, String> properties) {
556562
}
557563
}
558564

565+
/**
566+
* Checks if a ValidationException is due to a stale snapshot (sequence number conflict). This
567+
* happens during concurrent modifications and should be retryable (409), not a bad request (400).
568+
*/
569+
private boolean isStaleSnapshotError(ValidationException e) {
570+
String msg = e.getMessage();
571+
return msg != null
572+
&& msg.contains("Cannot add snapshot with sequence number")
573+
&& msg.contains("older than last sequence number");
574+
}
575+
559576
/**
560577
* Process all schema-related operations including client schema (for new tables), evolved schema
561578
* (for updates), and intermediate schemas (for replication scenarios). This consolidates all

iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/IcebergTestUtil.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public final class IcebergTestUtil {
1616
private static final String SNAPSHOTS_FILE = "serialized_snapshots.json";
1717
private static final String EXTRA_SNAPSHOTS_FILE = "extra_serialized_snapshots.json";
1818
private static final String WAP_SNAPSHOTS_FILE = "wap_serialized_snapshots.json";
19+
private static final String STALE_SNAPSHOT_FILE = "stale_snapshot.json";
1920

2021
private IcebergTestUtil() {}
2122

@@ -31,6 +32,10 @@ public static List<Snapshot> getWapSnapshots() throws IOException {
3132
return loadSnapshots(WAP_SNAPSHOTS_FILE);
3233
}
3334

35+
public static List<Snapshot> getStaleSnapshots() throws IOException {
36+
return loadSnapshots(STALE_SNAPSHOT_FILE);
37+
}
38+
3439
private static List<Snapshot> loadSnapshots(String snapshotFile) throws IOException {
3540
InputStream inputStream =
3641
IcebergTestUtil.class.getClassLoader().getResourceAsStream(snapshotFile);

iceberg/openhouse/internalcatalog/src/test/java/com/linkedin/openhouse/internal/catalog/OpenHouseInternalTableOperationsTest.java

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.iceberg.exceptions.BadRequestException;
5353
import org.apache.iceberg.exceptions.CommitFailedException;
5454
import org.apache.iceberg.exceptions.CommitStateUnknownException;
55+
import org.apache.iceberg.exceptions.ValidationException;
5556
import org.apache.iceberg.hadoop.HadoopFileIO;
5657
import org.apache.iceberg.io.FileIO;
5758
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -77,7 +78,7 @@ public class OpenHouseInternalTableOperationsTest {
7778
Types.NestedField.required(2, "ts", Types.TimestampType.withoutZone())),
7879
PartitionSpec.unpartitioned(),
7980
getTempLocation(),
80-
ImmutableMap.of());
81+
ImmutableMap.of("format-version", "2"));
8182
@Mock private HouseTableRepository mockHouseTableRepository;
8283
@Mock private HouseTableMapper mockHouseTableMapper;
8384
@Mock private HouseTable mockHouseTable;
@@ -1718,4 +1719,100 @@ void testMultipleDiffCommitWithWAPSnapshots() throws IOException {
17181719
Mockito.verify(mockHouseTableRepository, Mockito.times(1)).save(Mockito.eq(mockHouseTable));
17191720
}
17201721
}
1722+
1723+
/**
1724+
* Tests that stale snapshot detection returns 409 Conflict instead of 400 Bad Request.
1725+
*
1726+
* <p>This test reproduces the production scenario where:
1727+
*
1728+
* <ol>
1729+
* <li>Table has 4 existing snapshots with lastSequenceNumber = 4
1730+
* <li>A concurrent modification occurs during commit
1731+
* <li>Client tries to add a snapshot with sequenceNumber = 4 (now stale)
1732+
* <li>Before fix: ValidationException → BadRequestException (400)
1733+
* <li>After fix: Stale snapshot detected → CommitFailedException (409)
1734+
* </ol>
1735+
*
1736+
* <p>The 409 response tells clients to refresh and retry, while 400 incorrectly suggests the
1737+
* request was invalid.
1738+
*/
1739+
/**
1740+
* First verifies that Iceberg's validation actually throws for stale snapshots in format v2, then
1741+
* tests the OpenHouse doCommit path.
1742+
*/
1743+
@Test
1744+
void testStaleSnapshotDuringConcurrentModificationReturns409NotBadRequest() throws IOException {
1745+
// Build base metadata with all 4 test snapshots (lastSequenceNumber = 4)
1746+
List<Snapshot> existingSnapshots = IcebergTestUtil.getSnapshots();
1747+
TableMetadata tempMetadata = BASE_TABLE_METADATA;
1748+
for (Snapshot snapshot : existingSnapshots) {
1749+
tempMetadata =
1750+
TableMetadata.buildFrom(tempMetadata)
1751+
.setBranchSnapshot(snapshot, SnapshotRef.MAIN_BRANCH)
1752+
.build();
1753+
}
1754+
final TableMetadata baseMetadata = tempMetadata;
1755+
1756+
// Verify base metadata has format version 2 and lastSequenceNumber = 4
1757+
Assertions.assertEquals(2, baseMetadata.formatVersion(), "Format version should be 2");
1758+
Assertions.assertEquals(4, baseMetadata.lastSequenceNumber(), "lastSequenceNumber should be 4");
1759+
1760+
// Load stale snapshot with sequenceNumber = 4 (same as lastSequenceNumber)
1761+
List<Snapshot> staleSnapshots = IcebergTestUtil.getStaleSnapshots();
1762+
Snapshot staleSnapshot = staleSnapshots.get(0);
1763+
Assertions.assertEquals(
1764+
4, staleSnapshot.sequenceNumber(), "Stale snapshot should have sequenceNumber = 4");
1765+
1766+
// Verify stale snapshot has a parent (required for Iceberg 1.5+ validation)
1767+
Assertions.assertNotNull(
1768+
staleSnapshot.parentId(), "Stale snapshot must have parentId for validation to trigger");
1769+
1770+
// FIRST: Verify Iceberg's validation works directly
1771+
TableMetadata.Builder directBuilder = TableMetadata.buildFrom(baseMetadata);
1772+
ValidationException icebergException =
1773+
Assertions.assertThrows(
1774+
ValidationException.class,
1775+
() -> directBuilder.addSnapshot(staleSnapshot),
1776+
"Iceberg should throw ValidationException for stale snapshot");
1777+
Assertions.assertTrue(
1778+
icebergException.getMessage().contains("Cannot add snapshot with sequence number"),
1779+
"Iceberg should report sequence number issue: " + icebergException.getMessage());
1780+
1781+
// NOW test the full doCommit path
1782+
List<Snapshot> allSnapshots = new ArrayList<>(existingSnapshots);
1783+
allSnapshots.addAll(staleSnapshots);
1784+
1785+
Map<String, String> properties = new HashMap<>(baseMetadata.properties());
1786+
properties.put(
1787+
CatalogConstants.SNAPSHOTS_JSON_KEY, SnapshotsUtil.serializedSnapshots(allSnapshots));
1788+
properties.put(
1789+
CatalogConstants.SNAPSHOTS_REFS_KEY,
1790+
SnapshotsUtil.serializeMap(
1791+
IcebergTestUtil.createMainBranchRefPointingTo(existingSnapshots.get(3))));
1792+
properties.put(getCanonicalFieldName("tableLocation"), TEST_LOCATION);
1793+
1794+
final TableMetadata metadataWithStaleSnapshot = baseMetadata.replaceProperties(properties);
1795+
1796+
try (MockedStatic<TableMetadataParser> ignoreWriteMock =
1797+
Mockito.mockStatic(TableMetadataParser.class)) {
1798+
1799+
// Should throw CommitFailedException (409), not BadRequestException (400)
1800+
Exception exception =
1801+
Assertions.assertThrows(
1802+
CommitFailedException.class,
1803+
() ->
1804+
openHouseInternalTableOperations.doCommit(
1805+
baseMetadata, metadataWithStaleSnapshot),
1806+
"Stale snapshot should return 409 Conflict (CommitFailedException), not 400 "
1807+
+ "(BadRequestException). 400 suggests invalid request, 409 suggests retry.");
1808+
1809+
String message = exception.getMessage().toLowerCase();
1810+
Assertions.assertTrue(
1811+
message.contains("stale")
1812+
|| message.contains("sequence")
1813+
|| message.contains("concurrent"),
1814+
"Exception message should indicate stale snapshot or sequence number issue: "
1815+
+ exception.getMessage());
1816+
}
1817+
}
17211818
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[
2+
"{ \"snapshot-id\" : 9999888777666555, \"parent-snapshot-id\" : 4151407017102313398, \"sequence-number\" : 4, \"timestamp-ms\" : 1767258030812, \"summary\" : { \"operation\" : \"append\", \"spark.app.id\" : \"local-1767258030000\", \"added-data-files\" : \"1\", \"added-records\" : \"1\", \"added-files-size\" : \"673\", \"changed-partition-count\" : \"1\", \"total-records\" : \"1\", \"total-files-size\" : \"673\", \"total-data-files\" : \"1\", \"total-delete-files\" : \"0\", \"total-position-deletes\" : \"0\", \"total-equality-deletes\" : \"0\" }, \"manifest-list\" : \"/data/openhouse/tracking_live_daily/SponsoredDocumentPageImpressionEvent/metadata/snap-9999888777666555-1-aa0dcbb9-707f-4f53-9df8-394bad8563f2.avro\", \"schema-id\" : 0}"
3+
]

0 commit comments

Comments
 (0)