Skip to content

Commit e6be87d

Browse files
authored
Add an optimized QueueLeaves implementation for single-leaf batches in the PostgreSQL storage backend (#3769)
* Add an optimized QueueLeaves implementation for single-leaf batches in the PostgreSQL storage backend
* Add tests
* Update CHANGELOG.md
* Add missing CHANGELOG.md entry for PR #3752
1 parent 0678d5c commit e6be87d

File tree

3 files changed

+81
-1
lines changed

3 files changed

+81
-1
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
### Storage
1212

1313
* For PostgreSQL, explicitly create index on SequencedLeafData(TreeId, LeafIdentityHash) by @robstradling in https://github.com/google/trillian/pull/3695
14+
* Fix error checking for QueueLeaves and AddSequencedLeaves in the PostgreSQL storage backend by @robstradling in https://github.com/google/trillian/pull/3752
15+
* Add an optimized QueueLeaves implementation for single-leaf batches in the PostgreSQL storage backend by @robstradling in https://github.com/google/trillian/pull/3769
1416
* Improve PostgreSQL functions by @robstradling in https://github.com/google/trillian/pull/3770
1517

1618
### Misc

storage/postgresql/log_storage.go

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,15 @@ import (
4242
)
4343

4444
const (
45+
queueLeafSQL = "WITH insert_leaf AS (" +
46+
"INSERT INTO LeafData (TreeId,LeafIdentityHash,LeafValue,ExtraData,QueueTimestampNanos) " +
47+
"VALUES ($1,$2,$3,$4,$5) " +
48+
"ON CONFLICT DO NOTHING " +
49+
"RETURNING *" +
50+
") " +
51+
"INSERT INTO Unsequenced (TreeId,Bucket,LeafIdentityHash,MerkleLeafHash,QueueTimestampNanos,QueueID) " +
52+
"SELECT TreeId,0,LeafIdentityHash,$6,QueueTimestampNanos,$7 " +
53+
"FROM insert_leaf"
4554
createTempQueueLeavesTable = "CREATE TEMP TABLE TempQueueLeaves (" +
4655
" TreeId BIGINT," +
4756
" LeafIdentityHash BYTEA," +
@@ -295,7 +304,14 @@ func (m *postgreSQLLogStorage) QueueLeaves(ctx context.Context, tree *trillian.T
295304
if err != nil {
296305
return nil, err
297306
}
298-
existing, err := tx.QueueLeaves(ctx, leaves, queueTimestamp)
307+
308+
// Queue the leaf (or leaves), using a more efficient implementation when the batch size is 1.
309+
var existing []*trillian.LogLeaf
310+
if len(leaves) == 1 {
311+
existing, err = tx.QueueLeaf(ctx, leaves[0], queueTimestamp)
312+
} else {
313+
existing, err = tx.QueueLeaves(ctx, leaves, queueTimestamp)
314+
}
299315
if err != nil {
300316
return nil, err
301317
}
@@ -389,6 +405,55 @@ func (t *logTreeTX) DequeueLeaves(ctx context.Context, limit int, cutoffTime tim
389405
return leaves, nil
390406
}
391407

408+
func (t *logTreeTX) QueueLeaf(ctx context.Context, leaf *trillian.LogLeaf, queueTimestamp time.Time) ([]*trillian.LogLeaf, error) {
409+
t.mu.Lock()
410+
defer t.mu.Unlock()
411+
412+
// Prepare details to store, but don't accept leaf if invalid.
413+
if len(leaf.LeafIdentityHash) != t.hashSizeBytes {
414+
return nil, fmt.Errorf("queued leaf must have a leaf ID hash of length %d", t.hashSizeBytes)
415+
}
416+
leaf.QueueTimestamp = timestamppb.New(queueTimestamp)
417+
if err := leaf.QueueTimestamp.CheckValid(); err != nil {
418+
return nil, fmt.Errorf("got invalid queue timestamp: %w", err)
419+
}
420+
qTimestamp := leaf.QueueTimestamp.AsTime()
421+
args := queueArgs(t.treeID, leaf.LeafIdentityHash, qTimestamp)
422+
label := labelForTX(t)
423+
424+
// Create the leaf data record and work queue entry, unless the leaf already exists.
425+
existingLeaves := make([]*trillian.LogLeaf, 1)
426+
result, err := t.tx.Exec(ctx, queueLeafSQL, t.treeID, leaf.LeafIdentityHash, leaf.LeafValue, leaf.ExtraData, args[0], leaf.MerkleLeafHash, args[1])
427+
if err != nil {
428+
klog.Warningf("Failed to queue leaf: %s", err)
429+
return nil, postgresqlToGRPC(err)
430+
}
431+
queuedCounter.Add(1, label)
432+
if result.RowsAffected() > 0 {
433+
// Leaf did not already exist.
434+
return existingLeaves, nil
435+
}
436+
437+
var toRetrieve [][]byte
438+
toRetrieve = append(toRetrieve, leaf.LeafIdentityHash)
439+
queuedDupCounter.Inc(label)
440+
results, err := t.getLeafDataByIdentityHash(ctx, toRetrieve)
441+
if err != nil {
442+
return nil, fmt.Errorf("failed to retrieve existing leaf: %v", err)
443+
}
444+
if len(results) != len(toRetrieve) {
445+
return nil, fmt.Errorf("failed to retrieve existing leaf: got %d, want %d", len(results), len(toRetrieve))
446+
}
447+
// Replace the requested leaf with the actual leaf.
448+
if bytes.Equal(results[0].LeafIdentityHash, leaf.LeafIdentityHash) {
449+
existingLeaves[0] = results[0]
450+
} else {
451+
return nil, fmt.Errorf("failed to find existing leaf for hash %x", leaf.LeafIdentityHash)
452+
}
453+
454+
return existingLeaves, nil
455+
}
456+
392457
func (t *logTreeTX) QueueLeaves(ctx context.Context, leaves []*trillian.LogLeaf, queueTimestamp time.Time) ([]*trillian.LogLeaf, error) {
393458
t.mu.Lock()
394459
defer t.mu.Unlock()

storage/postgresql/log_storage_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ func TestQueueDuplicateLeaf(t *testing.T) {
137137
leaves2 := createTestLeaves(int64(count), 12)
138138
leaves3 := createTestLeaves(3, 100)
139139
leaves4 := createTestLeaves(3, 105)
140+
leaves5 := createTestLeaves(1, 110)
140141

141142
// Note that tests accumulate queued leaves on top of each other.
142143
tests := []struct {
@@ -165,6 +166,18 @@ func TestQueueDuplicateLeaf(t *testing.T) {
165166
leaves: []*trillian.LogLeaf{leaves3[0], leaves3[0], leaves4[1], leaves3[1], leaves4[2]},
166167
want: []*trillian.LogLeaf{leaves3[0], leaves3[0], leaves4[1], leaves3[1], leaves4[2]},
167168
},
169+
{
170+
// single leaf (dup)
171+
desc: "[100]",
172+
leaves: []*trillian.LogLeaf{leaves3[0]},
173+
want: []*trillian.LogLeaf{leaves3[0]},
174+
},
175+
{
176+
// single leaf (new)
177+
desc: "[110]",
178+
leaves: []*trillian.LogLeaf{leaves5[0]},
179+
want: []*trillian.LogLeaf{leaves5[0]},
180+
},
168181
}
169182

170183
for _, test := range tests {

0 commit comments

Comments
 (0)