Skip to content

Commit 737bdef

Browse files
authored
makes clearer pg timestamps should not be used for ordering txs (#2832)
1 parent 5acbcfb commit 737bdef

File tree

6 files changed

+66
-26
lines changed

6 files changed

+66
-26
lines changed

internal/datastore/postgres/postgres.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,7 @@ func (pgd *pgDatastore) ReadWriteTx(
503503
return nil, spiceerrors.MustBugf("could not cast timestamp to uint64")
504504
}
505505

506-
return postgresRevision{snapshot: newSnapshot.markComplete(newXID.Uint64), optionalTxID: newXID, optionalNanosTimestamp: nanosTimestamp}, nil
506+
return postgresRevision{snapshot: newSnapshot.markComplete(newXID.Uint64), optionalTxID: newXID, optionalInexactNanosTimestamp: nanosTimestamp}, nil
507507
}
508508

509509
if !config.DisableRetries {

internal/datastore/postgres/postgres_shared_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,7 @@ func ReadWriteTxReturnsOptionalRevisionFields(t *testing.T, ds datastore.Datasto
455455
require.NoError(err)
456456
pgRev, ok := rev.(postgresRevision)
457457
require.True(ok)
458-
require.Positive(pgRev.optionalNanosTimestamp, "revision timestamp should be set")
458+
require.Positive(pgRev.optionalInexactNanosTimestamp, "revision timestamp should be set")
459459
require.True(pgRev.optionalTxID.Valid, "revision txid be set")
460460
}
461461

@@ -977,8 +977,8 @@ func OverlappingRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest)
977977
5 * time.Second,
978978
0,
979979
[]postgresRevision{
980-
{optionalTxID: NewXid8(3), snapshot: pgSnapshot{xmin: 1, xmax: 4, xipList: []uint64{2}}, optionalNanosTimestamp: uint64((time.Second * 1) * time.Nanosecond)},
981-
{optionalTxID: NewXid8(2), snapshot: pgSnapshot{xmin: 1, xmax: 4, xipList: []uint64{3}}, optionalNanosTimestamp: uint64((time.Second * 2) * time.Nanosecond)},
980+
{optionalTxID: NewXid8(3), snapshot: pgSnapshot{xmin: 1, xmax: 4, xipList: []uint64{2}}, optionalInexactNanosTimestamp: uint64((time.Second * 1) * time.Nanosecond)},
981+
{optionalTxID: NewXid8(2), snapshot: pgSnapshot{xmin: 1, xmax: 4, xipList: []uint64{3}}, optionalInexactNanosTimestamp: uint64((time.Second * 2) * time.Nanosecond)},
982982
},
983983
2, 0,
984984
},
@@ -1022,7 +1022,7 @@ func OverlappingRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest)
10221022
stmt := sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
10231023
insertTxn := stmt.Insert(schema.TableTransaction).Columns(schema.ColXID, schema.ColSnapshot, schema.ColTimestamp)
10241024

1025-
ts := time.Unix(0, int64(rev.optionalNanosTimestamp)) //nolint:gosec
1025+
ts := time.Unix(0, int64(rev.optionalInexactNanosTimestamp)) //nolint:gosec
10261026
sql, args, err := insertTxn.Values(rev.optionalTxID, rev.snapshot, ts).ToSql()
10271027
require.NoError(err)
10281028

internal/datastore/postgres/revisions.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,8 @@ func parseRevisionProto(revisionStr string) (datastore.Revision, error) {
229229
xmax: xmax,
230230
xipList: xips,
231231
},
232-
optionalTxID: xid8{Uint64: decoded.OptionalTxid, Valid: decoded.OptionalTxid != 0},
233-
optionalNanosTimestamp: decoded.OptionalTimestamp,
232+
optionalTxID: xid8{Uint64: decoded.OptionalTxid, Valid: decoded.OptionalTxid != 0},
233+
optionalInexactNanosTimestamp: decoded.OptionalTimestamp,
234234
}, nil
235235
}
236236

@@ -318,10 +318,14 @@ func createNewTransaction(ctx context.Context, tx pgx.Tx, metadata map[string]an
318318
}
319319

320320
type postgresRevision struct {
321-
snapshot pgSnapshot
322-
optionalTxID xid8
323-
optionalNanosTimestamp uint64
324-
optionalMetadata dscommon.TransactionMetadata
321+
snapshot pgSnapshot
322+
optionalTxID xid8
323+
// timestamps cannot be used for ordering transactions in PG, but are useful to get a rough estimate of
324+
// when did the transaction happen (e.g. for lag-computation purposes on Watch API consumers).
325+
// For ordering purposes, only the snapshot values should be used (and it's still possible for two snapshots to
326+
// be overlapping)
327+
optionalInexactNanosTimestamp uint64
328+
optionalMetadata dscommon.TransactionMetadata
325329
}
326330

327331
func (pr postgresRevision) ByteSortable() bool {
@@ -374,13 +378,14 @@ func (pr postgresRevision) OptionalTransactionID() (xid8, bool) {
374378
}
375379

376380
// OptionalNanosTimestamp returns a unix epoch timestamp in nanos representing the time at which the transaction committed
377-
// as defined by the Postgres primary. This is not guaranteed to be monotonically increasing
381+
// as defined by the Postgres primary. This is not guaranteed to be monotonically increasing, and thus should
382+
// not be used for ordering transactions.
378383
func (pr postgresRevision) OptionalNanosTimestamp() (uint64, bool) {
379-
if pr.optionalNanosTimestamp == 0 {
384+
if pr.optionalInexactNanosTimestamp == 0 {
380385
return 0, false
381386
}
382387

383-
return pr.optionalNanosTimestamp, true
388+
return pr.optionalInexactNanosTimestamp, true
384389
}
385390

386391
// MarshalBinary creates a version of the snapshot that uses relative encoding
@@ -409,7 +414,7 @@ func (pr postgresRevision) MarshalBinary() ([]byte, error) {
409414
RelativeXmax: relativeXmax - xminInt,
410415
RelativeXips: relativeXips,
411416
OptionalTxid: pr.optionalTxID.Uint64,
412-
OptionalTimestamp: pr.optionalNanosTimestamp,
417+
OptionalTimestamp: pr.optionalInexactNanosTimestamp,
413418
}
414419

415420
return protoRevision.MarshalVT()

internal/datastore/postgres/revisions_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@ func TestRevisionSerDe(t *testing.T) {
8181
require := require.New(t)
8282

8383
rev := postgresRevision{
84-
snapshot: tc.snapshot,
85-
optionalTxID: xid8{Uint64: tc.optionalTxID, Valid: tc.optionalTxID > 0},
86-
optionalNanosTimestamp: tc.optionalNanoTS,
84+
snapshot: tc.snapshot,
85+
optionalTxID: xid8{Uint64: tc.optionalTxID, Valid: tc.optionalTxID > 0},
86+
optionalInexactNanosTimestamp: tc.optionalNanoTS,
8787
}
8888
serialized := rev.String()
8989
require.Equal(tc.expectedStr, serialized)
@@ -99,7 +99,7 @@ func TestTxIDTimestampAvailable(t *testing.T) {
9999
// Timestamps should be non-negative
100100
testTimestamp := safecast.RequireConvert[uint64](t, time.Now().Unix())
101101
snapshot := snap(0, 5, 1)
102-
pgr := postgresRevision{snapshot: snapshot, optionalTxID: NewXid8(1), optionalNanosTimestamp: testTimestamp}
102+
pgr := postgresRevision{snapshot: snapshot, optionalTxID: NewXid8(1), optionalInexactNanosTimestamp: testTimestamp}
103103
receivedTimestamp, ok := pgr.OptionalNanosTimestamp()
104104
require.True(t, ok)
105105
require.Equal(t, receivedTimestamp, testTimestamp)

internal/datastore/postgres/watch.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,9 @@ func (pgd *pgDatastore) Watch(
173173
currentTxn = newTxns[len(newTxns)-1]
174174
for _, newTx := range newTxns {
175175
currentTxn = postgresRevision{
176-
snapshot: currentTxn.snapshot.markComplete(newTx.optionalTxID.Uint64),
177-
optionalTxID: currentTxn.optionalTxID,
178-
optionalNanosTimestamp: currentTxn.optionalNanosTimestamp,
176+
snapshot: currentTxn.snapshot.markComplete(newTx.optionalTxID.Uint64),
177+
optionalTxID: currentTxn.optionalTxID,
178+
optionalInexactNanosTimestamp: currentTxn.optionalInexactNanosTimestamp,
179179
}
180180
}
181181

@@ -229,10 +229,10 @@ func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRev
229229
}
230230

231231
ids = append(ids, postgresRevision{
232-
snapshot: nextSnapshot.markComplete(nextXID.Uint64),
233-
optionalTxID: nextXID,
234-
optionalNanosTimestamp: nanosTimestamp,
235-
optionalMetadata: metadata,
232+
snapshot: nextSnapshot.markComplete(nextXID.Uint64),
233+
optionalTxID: nextXID,
234+
optionalInexactNanosTimestamp: nanosTimestamp,
235+
optionalMetadata: metadata,
236236
})
237237
}
238238
if rows.Err() != nil {

0 commit comments

Comments
 (0)