Skip to content

Commit d59a8d1

Browse files
authored
UnrollSetMany now respects gRPC size limits, too (#589)
* UnrollSetMany now respects gRPC size limits, too * const lives in one place
1 parent 8bac1bd commit d59a8d1

File tree

6 files changed

+206
-58
lines changed

6 files changed

+206
-58
lines changed

pkg/dotc1z/session_store.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@ type SessionStore interface {
1717

1818
var _ sessions.SessionStore = (*C1File)(nil)
1919

20-
// The default gRPC message size limit is 4MB (we subtract 30KB for general overhead, which is overkill).
21-
// Unfortunately, this layer has to be aware of the size limit to avoid exceeding the size limit
22-
// because the client does not know the size of the items it requests.
23-
const sessionStoreSizeLimit = 4163584
2420
const sessionStoreTableVersion = "1"
2521
const sessionStoreTableName = "connector_sessions"
2622
const sessionStoreTableSchema = `
@@ -308,7 +304,7 @@ func (c *C1File) GetMany(ctx context.Context, keys []string, opt ...sessions.Ses
308304
key := r.key
309305

310306
netItemSize := len(value) + 10 // 10 is extra padding for overhead.
311-
if messageSize+netItemSize <= sessionStoreSizeLimit {
307+
if messageSize+netItemSize <= sessions.MaxSessionStoreSizeLimit {
312308
messageSize += netItemSize
313309
ret[key] = value
314310
} else {
@@ -331,7 +327,7 @@ func (c *C1File) GetAll(ctx context.Context, pageToken string, opt ...sessions.S
331327
}
332328

333329
result := make(map[string][]byte)
334-
messageSizeRemaining := sessionStoreSizeLimit
330+
messageSizeRemaining := sessions.MaxSessionStoreSizeLimit
335331
for {
336332
items, nextPageToken, itemsSize, err := c.getAllChunk(ctx, pageToken, messageSizeRemaining, bag)
337333
if err != nil {

pkg/dotc1z/session_store_test.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -225,10 +225,10 @@ func TestC1FileSessionStore_GetMany(t *testing.T) {
225225
require.Empty(t, unprocessedKeys)
226226
})
227227

228-
// Test 1: Size limit enforcement - values exceed 4163584 bytes
228+
// Test 1: Size limit enforcement - values exceed MaxSessionStoreSizeLimit
229229
t.Run("GetMany size limit enforcement", func(t *testing.T) {
230-
// Create values that total more than 4163584 bytes
231-
// Each value is ~1MB, so 5 values = ~5MB > 4163584 bytes
230+
// Create values that total more than MaxSessionStoreSizeLimit
231+
// Each value is ~1MB, so 5 values = ~5MB > MaxSessionStoreSizeLimit
232232
keys := []string{"size-limit-key1", "size-limit-key2", "size-limit-key3", "size-limit-key4", "size-limit-key5"}
233233
for i, key := range keys {
234234
// Each value is approximately 1MB (1048576 bytes)
@@ -244,12 +244,12 @@ func TestC1FileSessionStore_GetMany(t *testing.T) {
244244
require.NotEmpty(t, result, "should have some results")
245245
require.NotEmpty(t, unprocessedKeys, "should have some unprocessed keys")
246246

247-
// Verify total size of result is <= 4163584
247+
// Verify total size of result is <= MaxSessionStoreSizeLimit
248248
totalSize := 0
249249
for _, value := range result {
250250
totalSize += len(value)
251251
}
252-
require.LessOrEqual(t, totalSize, sessionStoreSizeLimit, "result size should be within limit")
252+
require.LessOrEqual(t, totalSize, sessions.MaxSessionStoreSizeLimit, "result size should be within limit")
253253

254254
// Verify all requested keys are either in result or unprocessedKeys
255255
allKeys := make(map[string]bool)
@@ -310,11 +310,11 @@ func TestC1FileSessionStore_GetMany(t *testing.T) {
310310
require.True(t, hasLargeUnprocessed, "at least one large key should be unprocessed")
311311
})
312312

313-
// Test 4: Exact boundary at 4163584 bytes
313+
// Test 4: Exact boundary at MaxSessionStoreSizeLimit
314314
t.Run("GetMany exact boundary at limit", func(t *testing.T) {
315-
// Create values that sum to exactly 4163584 bytes
315+
// Create values that sum to exactly MaxSessionStoreSizeLimit
316316
keys := []string{"1", "2", "3", "4"}
317-
valueSize := (sessionStoreSizeLimit - 4 - (20 * 4)) / 4 // 1040896 bytes each - (4) bytes for key and 20 bytes for value
317+
valueSize := (sessions.MaxSessionStoreSizeLimit - 4 - (20 * 4)) / 4 // 1040896 bytes each - (4) bytes for key and 20 bytes for value
318318

319319
for i, key := range keys {
320320
value := bytes.Repeat([]byte{byte(i)}, valueSize)
@@ -330,7 +330,7 @@ func TestC1FileSessionStore_GetMany(t *testing.T) {
330330
for _, value := range result {
331331
totalSize += len(value)
332332
}
333-
require.LessOrEqual(t, totalSize, sessionStoreSizeLimit, "total size should be within limit")
333+
require.LessOrEqual(t, totalSize, sessions.MaxSessionStoreSizeLimit, "total size should be within limit")
334334
require.Empty(t, unprocessedKeys, "all keys should fit at exact boundary")
335335
require.Len(t, result, len(keys), "all keys should be in result")
336336
})
@@ -370,7 +370,7 @@ func TestC1FileSessionStore_GetMany(t *testing.T) {
370370
keys := make([]string, numKeys)
371371
for i := range numKeys {
372372
keys[i] = fmt.Sprintf("completeness-key-%d", i)
373-
// Each value is ~500KB, so 10 values = ~5MB > 4163584 bytes
373+
// Each value is ~500KB, so 10 values = ~5MB > MaxSessionStoreSizeLimit
374374
value := bytes.Repeat([]byte{byte(i)}, 500000)
375375
err := c1zFile.Set(ctx, keys[i], value, sessions.WithSyncID(syncID))
376376
require.NoError(t, err)
@@ -449,7 +449,7 @@ func TestC1FileSessionStore_GetMany(t *testing.T) {
449449
for _, value := range result {
450450
totalSize += len(value)
451451
}
452-
require.LessOrEqual(t, totalSize, sessionStoreSizeLimit, "result size should be within limit")
452+
require.LessOrEqual(t, totalSize, sessions.MaxSessionStoreSizeLimit, "result size should be within limit")
453453

454454
// Large values should be in unprocessedKeys (or at least some)
455455
require.NotEmpty(t, unprocessedKeys, "should have unprocessed keys for large values")
@@ -802,7 +802,7 @@ func TestC1FileSessionStore_GetAll(t *testing.T) {
802802
require.NoError(t, err)
803803

804804
// Create items that will exceed size limit
805-
// Each value is ~500KB, so 10 items = ~5MB > 4163584 bytes
805+
// Each value is ~500KB, so 10 items = ~5MB > MaxSessionStoreSizeLimit
806806
for i := range 10 {
807807
key := fmt.Sprintf("size-limit-key-%d", i)
808808
value := bytes.Repeat([]byte{byte(i)}, 500000)
@@ -826,7 +826,7 @@ func TestC1FileSessionStore_GetAll(t *testing.T) {
826826
for _, value := range result {
827827
pageSize += len(value)
828828
}
829-
require.LessOrEqual(t, pageSize, sessionStoreSizeLimit, "page size should be within limit")
829+
require.LessOrEqual(t, pageSize, sessions.MaxSessionStoreSizeLimit, "page size should be within limit")
830830

831831
maps.Copy(all, result)
832832

@@ -1168,7 +1168,7 @@ func TestC1FileSessionStore_GetAll(t *testing.T) {
11681168
for _, value := range result {
11691169
pageSize += len(value)
11701170
}
1171-
require.LessOrEqual(t, pageSize, sessionStoreSizeLimit, "each page should be within size limit")
1171+
require.LessOrEqual(t, pageSize, sessions.MaxSessionStoreSizeLimit, "each page should be within size limit")
11721172

11731173
maps.Copy(all, result)
11741174

@@ -1401,7 +1401,7 @@ func TestC1FileSessionStore_GetAll(t *testing.T) {
14011401
for _, value := range result {
14021402
pageSize += len(value)
14031403
}
1404-
require.LessOrEqual(t, pageSize, sessionStoreSizeLimit, "page size should be within limit")
1404+
require.LessOrEqual(t, pageSize, sessions.MaxSessionStoreSizeLimit, "page size should be within limit")
14051405

14061406
maps.Copy(all, result)
14071407

@@ -1501,7 +1501,7 @@ func TestC1FileSessionStore_GetAll(t *testing.T) {
15011501
for _, value := range result {
15021502
pageSize += len(value)
15031503
}
1504-
require.LessOrEqual(t, pageSize, sessionStoreSizeLimit, "chunk should respect size limit")
1504+
require.LessOrEqual(t, pageSize, sessions.MaxSessionStoreSizeLimit, "chunk should respect size limit")
15051505
// With 500KB items, should have ~8 items per chunk, not 100
15061506
if len(result) > 0 {
15071507
require.Less(t, len(result), 100, "chunk should be limited by size, not item count")
@@ -1529,7 +1529,7 @@ func TestC1FileSessionStore_GetAll(t *testing.T) {
15291529

15301530
// Create scenario where size limit and empty pageToken could both apply
15311531
// Create items that will exactly fill the size limit
1532-
itemSize := sessionStoreSizeLimit / 5 // ~832KB per item, 5 items = ~4MB
1532+
itemSize := sessions.MaxSessionStoreSizeLimit / 5 // ~832KB per item, 5 items = ~4MB
15331533
for i := range 5 {
15341534
key := fmt.Sprintf("terminate-key-%d", i)
15351535
value := bytes.Repeat([]byte{byte(i)}, itemSize)
@@ -1811,7 +1811,7 @@ func TestC1FileSessionStore_Performance(t *testing.T) {
18111811
for _, value := range items {
18121812
itemsSize += len(value)
18131813
}
1814-
require.Less(t, itemsSize, sessionStoreSizeLimit)
1814+
require.Less(t, itemsSize, sessions.MaxSessionStoreSizeLimit)
18151815
maps.Copy(all, items)
18161816

18171817
log.Printf("itemsSize: %d, items: %d, nextPageToken: %s, pageToken: %s", itemsSize, len(items), nextPageToken, pageToken)

pkg/session/json_session.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,25 @@ func GetManyJSON[T any](ctx context.Context, ss sessions.SessionStore, keys []st
3030
}
3131

3232
func SetManyJSON[T any](ctx context.Context, ss sessions.SessionStore, items map[string]T, opt ...sessions.SessionStoreOption) error {
33-
bytesMap := make(map[string][]byte)
34-
35-
for key, item := range items {
36-
bytes, err := json.Marshal(item)
37-
if err != nil {
38-
return fmt.Errorf("failed to marshal item for key %s: %w", key, err)
33+
// Lazy iterator that marshals items on demand, yielding (item, error) pairs
34+
sizedItems := func(yield func(SizedItem[[]byte], error) bool) {
35+
for key, item := range items {
36+
bytes, err := json.Marshal(item)
37+
if err != nil {
38+
yield(SizedItem[[]byte]{}, fmt.Errorf("failed to marshal item for key %s: %w", key, err))
39+
return
40+
}
41+
if !yield(SizedItem[[]byte]{
42+
Key: key,
43+
Value: bytes,
44+
Size: len(key) + len(bytes) + 20,
45+
}, nil) {
46+
return
47+
}
3948
}
40-
bytesMap[key] = bytes
4149
}
4250

43-
return UnrollSetMany(ctx, ss, bytesMap, opt...)
51+
return UnrollSetMany(ctx, ss, sizedItems, opt...)
4452
}
4553

4654
func GetJSON[T any](ctx context.Context, ss sessions.SessionStore, key string, opt ...sessions.SessionStoreOption) (T, bool, error) {

pkg/session/session.go

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import (
99
"github.com/conductorone/baton-sdk/pkg/types/sessions"
1010
)
1111

12-
const MaxKeysPerRequest = 100
13-
1412
func Chunk[T any](items []T, chunkSize int) iter.Seq[[]T] {
1513
return func(yield func([]T) bool) {
1614
for i := 0; i < len(items); i += chunkSize {
@@ -33,7 +31,7 @@ func UnrollGetMany[T any](ctx context.Context, ss GetManyable[T], keys []string,
3331
}
3432

3533
// TODO(Kans): parallelize this?
36-
for keyChunk := range Chunk(keys, MaxKeysPerRequest) {
34+
for keyChunk := range Chunk(keys, sessions.MaxKeysPerRequest) {
3735
// For each chunk, unroll any unprocessed keys until all are processed
3836
remainingKeys := keyChunk
3937
for {
@@ -66,31 +64,50 @@ type SetManyable[T any] interface {
6664
SetMany(ctx context.Context, values map[string]T, opt ...sessions.SessionStoreOption) error
6765
}
6866

69-
func UnrollSetMany[T any](ctx context.Context, ss SetManyable[T], items map[string]T, opt ...sessions.SessionStoreOption) error {
70-
if len(items) == 0 {
71-
return nil
72-
}
73-
if len(items) <= MaxKeysPerRequest {
74-
return ss.SetMany(ctx, items, opt...)
75-
}
67+
// SizedItem represents a key-value pair with its size in bytes.
68+
type SizedItem[T any] struct {
69+
Key string
70+
Value T
71+
Size int // size in bytes of key + value
72+
}
7673

77-
keys := make([]string, 0, len(items))
78-
for key := range items {
79-
keys = append(keys, key)
80-
}
74+
// UnrollSetMany takes an iterator of sized items and batches them into SetMany calls,
75+
// respecting both MaxKeysPerRequest and MaxSessionStoreSizeLimit.
76+
// The iterator yields (item, error) pairs; iteration stops on the first error.
77+
func UnrollSetMany[T any](ctx context.Context, ss SetManyable[T], items iter.Seq2[SizedItem[T], error], opt ...sessions.SessionStoreOption) error {
78+
currentChunk := make(map[string]T)
79+
currentSize := 0
8180

82-
// TODO(Kans): parallelize this?
83-
for keyChunk := range Chunk(keys, MaxKeysPerRequest) {
84-
some := make(map[string]T)
85-
for _, key := range keyChunk {
86-
some[key] = items[key]
81+
flush := func() error {
82+
if len(currentChunk) == 0 {
83+
return nil
84+
}
85+
err := ss.SetMany(ctx, currentChunk, opt...)
86+
if err != nil {
87+
return err
8788
}
88-
err := ss.SetMany(ctx, some, opt...)
89+
currentChunk = make(map[string]T)
90+
currentSize = 0
91+
return nil
92+
}
93+
94+
for item, err := range items {
8995
if err != nil {
9096
return err
9197
}
98+
99+
// Flush if adding this item would exceed either limit
100+
if len(currentChunk) >= sessions.MaxKeysPerRequest || (currentSize+item.Size >= sessions.MaxSessionStoreSizeLimit && len(currentChunk) > 0) {
101+
if err := flush(); err != nil {
102+
return err
103+
}
104+
}
105+
106+
currentChunk[item.Key] = item.Value
107+
currentSize += item.Size
92108
}
93-
return nil
109+
110+
return flush()
94111
}
95112

96113
type GetAllable[T any] interface {

0 commit comments

Comments
 (0)