@@ -20,6 +20,7 @@ package table
2020import (
2121 "bytes"
2222 "context"
23+ "encoding/json"
2324 "errors"
2425 "io"
2526 "io/fs"
@@ -135,6 +136,10 @@ func manifestSize(t *testing.T, version int, spec iceberg.PartitionSpec, schema
135136}
136137
137138func newTestDataFile (t * testing.T , spec iceberg.PartitionSpec , path string , partition map [int ]any ) iceberg.DataFile {
139+ return newTestDataFileWithCount (t , spec , path , partition , 1 )
140+ }
141+
142+ func newTestDataFileWithCount (t * testing.T , spec iceberg.PartitionSpec , path string , partition map [int ]any , count int64 ) iceberg.DataFile {
138143 t .Helper ()
139144
140145 builder , err := iceberg .NewDataFileBuilder (
@@ -145,8 +150,8 @@ func newTestDataFile(t *testing.T, spec iceberg.PartitionSpec, path string, part
145150 partition ,
146151 nil ,
147152 nil ,
148- 1 ,
149- 1 ,
153+ count ,
154+ count ,
150155 )
151156 require .NoError (t , err , "new data file builder" )
152157
@@ -299,6 +304,91 @@ func TestCommitV3RowLineageDeltaIncludesExistingRows(t *testing.T) {
299304 require .Equal (t , int64 (3 ), meta2 .NextRowID (), "next-row-id = first-row-id + assigned delta (1+2)" )
300305}
301306
307+ func readManifestListFromPath (t * testing.T , fs iceio.IO , path string ) []iceberg.ManifestFile {
308+ t .Helper ()
309+
310+ f , err := fs .Open (path )
311+ require .NoError (t , err , "open manifest list: %s" , path )
312+ defer f .Close ()
313+
314+ list , err := iceberg .ReadManifestList (f )
315+ require .NoError (t , err , "read manifest list: %s" , path )
316+
317+ return list
318+ }
319+
320+ func manifestFirstRowIDForSnapshot (t * testing.T , manifests []iceberg.ManifestFile , snapshotID int64 ) int64 {
321+ t .Helper ()
322+
323+ type manifestRowLineage struct {
324+ AddedSnapshotID int64 `json:"AddedSnapshotID"`
325+ FirstRowID * int64 `json:"FirstRowId"`
326+ }
327+
328+ for _ , manifest := range manifests {
329+ raw , err := json .Marshal (manifest )
330+ require .NoError (t , err , "marshal manifest" )
331+
332+ var decoded manifestRowLineage
333+ require .NoError (t , json .Unmarshal (raw , & decoded ), "unmarshal manifest row-lineage fields" )
334+
335+ if decoded .AddedSnapshotID == snapshotID {
336+ require .NotNil (t , decoded .FirstRowID , "first_row_id must be persisted for v3 data manifests" )
337+
338+ return * decoded .FirstRowID
339+ }
340+ }
341+
342+ require .Failf (t , "missing manifest for snapshot" , "snapshot-id=%d" , snapshotID )
343+
344+ return 0
345+ }
346+
347+ // TestCommitV3RowLineagePersistsManifestFirstRowID verifies that snapshot producer
348+ // writes first_row_id to manifest list entries using the snapshot's start row-id.
349+ func TestCommitV3RowLineagePersistsManifestFirstRowID (t * testing.T ) {
350+ spec := iceberg .NewPartitionSpec ()
351+ ident := Identifier {"db" , "tbl" }
352+ txn , memIO := createTestTransactionWithMemIO (t , spec )
353+ txn .meta .formatVersion = 3
354+
355+ // Use multi-row files to make row-range starts obvious.
356+ sp1 := newFastAppendFilesProducer (OpAppend , txn , memIO , nil , nil )
357+ sp1 .appendDataFile (newTestDataFileWithCount (t , spec , "file://data-1.parquet" , nil , 3 ))
358+ updates1 , reqs1 , err := sp1 .commit ()
359+ require .NoError (t , err , "first commit should succeed" )
360+ addSnap1 , ok := updates1 [0 ].(* addSnapshotUpdate )
361+ require .True (t , ok , "first update must be AddSnapshot" )
362+ require .Equal (t , int64 (0 ), * addSnap1 .Snapshot .FirstRowID , "snapshot first-row-id for commit 1" )
363+
364+ manifests1 := readManifestListFromPath (t , memIO , addSnap1 .Snapshot .ManifestList )
365+ currentManifestFirstRowID1 := manifestFirstRowIDForSnapshot (t , manifests1 , addSnap1 .Snapshot .SnapshotID )
366+ require .Equal (t , * addSnap1 .Snapshot .FirstRowID , currentManifestFirstRowID1 ,
367+ "persisted manifest first_row_id must match snapshot first-row-id for current commit" )
368+
369+ err = txn .apply (updates1 , reqs1 )
370+ require .NoError (t , err , "first apply should succeed" )
371+ meta1 , err := txn .meta .Build ()
372+ require .NoError (t , err )
373+ require .Equal (t , int64 (3 ), meta1 .NextRowID ())
374+
375+ tbl2 := New (ident , meta1 , "metadata.json" , func (context.Context ) (iceio.IO , error ) { return memIO , nil }, nil )
376+ txn2 := tbl2 .NewTransaction ()
377+ txn2 .meta .formatVersion = 3
378+ sp2 := newFastAppendFilesProducer (OpAppend , txn2 , memIO , nil , nil )
379+ sp2 .appendDataFile (newTestDataFileWithCount (t , spec , "file://data-2.parquet" , nil , 5 ))
380+ updates2 , _ , err := sp2 .commit ()
381+ require .NoError (t , err , "second commit should succeed" )
382+ addSnap2 , ok := updates2 [0 ].(* addSnapshotUpdate )
383+ require .True (t , ok , "first update must be AddSnapshot" )
384+ require .Equal (t , int64 (3 ), * addSnap2 .Snapshot .FirstRowID , "snapshot first-row-id for commit 2" )
385+
386+ manifests2 := readManifestListFromPath (t , memIO , addSnap2 .Snapshot .ManifestList )
387+ currentManifestFirstRowID2 := manifestFirstRowIDForSnapshot (t , manifests2 , addSnap2 .Snapshot .SnapshotID )
388+ require .Equal (t , * addSnap2 .Snapshot .FirstRowID , currentManifestFirstRowID2 ,
389+ "persisted manifest first_row_id must match snapshot first-row-id for current commit" )
390+ }
391+
302392func TestSnapshotProducerManifestsClosesWriterOnError (t * testing.T ) {
303393 spec := partitionedSpec ()
304394 schema := simpleSchema ()
0 commit comments