Skip to content

Commit 8022130

Browse files
authored
Merge pull request #1803 from ydb-platform/issue_1511
Split bulk for bulkupsert
2 parents 64cd01d + 60e9c35 commit 8022130

File tree

6 files changed

+203
-12
lines changed

6 files changed

+203
-12
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added the ability to send a `BulkUpsert` request that exceeds `GrpcMaxMessageSize`: row-based requests are now split into several smaller requests
2+
13
## v3.110.0
24
* Added read partitions in parallel for topic listener.
35

driver.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,8 @@ func (d *Driver) connect(ctx context.Context) (err error) {
470470
// prepend common params from root config
471471
[]tableConfig.Option{
472472
tableConfig.With(d.config.Common),
473+
474+
tableConfig.WithMaxRequestMessageSize(d.config.GrpcMaxMessageSize()),
473475
},
474476
d.tableOptions...,
475477
)...,

internal/table/client.go

Lines changed: 93 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@ package table
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/jonboulle/clockwork"
78
"github.com/ydb-platform/ydb-go-genproto/Ydb_Table_V1"
89
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
910
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table"
1011
"google.golang.org/grpc"
12+
"google.golang.org/protobuf/proto"
1113

1214
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
1315
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
@@ -299,7 +301,7 @@ func (c *Client) BulkUpsert(
299301
retry.WithTrace(&trace.Retry{
300302
OnRetry: func(info trace.RetryLoopStartInfo) func(trace.RetryLoopDoneInfo) {
301303
return func(info trace.RetryLoopDoneInfo) {
302-
attempts = info.Attempts
304+
attempts += max(info.Attempts-1, 0) // `max` guarded against negative values
303305
}
304306
},
305307
}),
@@ -309,32 +311,111 @@ func (c *Client) BulkUpsert(
309311
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Client).BulkUpsert"),
310312
)
311313
defer func() {
312-
onDone(finalErr, attempts)
314+
onDone(finalErr, attempts+1)
313315
}()
314316

315317
request, err := data.ToYDB(tableName)
316318
if err != nil {
317319
return xerrors.WithStackTrace(err)
318320
}
319321

322+
chunks := make([]*Ydb_Table.BulkUpsertRequest, 0, 1)
323+
324+
// We must send requests in chunks to avoid exceeding the maximum message size
325+
chunks, err = chunkBulkUpsertRequest(chunks, request, c.config.MaxRequestMessageSize())
326+
if err != nil {
327+
return err
328+
}
329+
330+
return c.sendBulkUpsertRequest(ctx, chunks, config.RetryOptions...)
331+
}
332+
333+
func (c *Client) sendBulkUpsertRequest(
334+
ctx context.Context,
335+
requests []*Ydb_Table.BulkUpsertRequest,
336+
retryOpts ...retry.Option,
337+
) error {
320338
client := Ydb_Table_V1.NewTableServiceClient(c.cc)
321339

322-
err = retry.Retry(ctx,
323-
func(ctx context.Context) (err error) {
324-
attempts++
325-
_, err = client.BulkUpsert(ctx, request)
340+
for _, request := range requests {
341+
err := retry.Retry(ctx,
342+
func(ctx context.Context) (err error) {
343+
_, err = client.BulkUpsert(ctx, request)
326344

327-
return err
328-
},
329-
config.RetryOptions...,
330-
)
331-
if err != nil {
332-
return xerrors.WithStackTrace(err)
345+
return err
346+
},
347+
retryOpts...,
348+
)
349+
if err != nil {
350+
return xerrors.WithStackTrace(err)
351+
}
333352
}
334353

335354
return nil
336355
}
337356

357+
// chunkBulkUpsertRequest splits a bulk upsert request into smaller chunks if it exceeds the maximum message size.
358+
// It recursively divides the request into smaller parts, ensuring each chunk is within the size limit.
359+
// Returns a slice of chunked bulk upsert requests or an error if the request cannot be split.
360+
func chunkBulkUpsertRequest(
361+
dst []*Ydb_Table.BulkUpsertRequest,
362+
req *Ydb_Table.BulkUpsertRequest,
363+
maxBytes int,
364+
) ([]*Ydb_Table.BulkUpsertRequest, error) {
365+
reqSize := proto.Size(req)
366+
367+
// not exceed the maximum size -> ret original request
368+
if reqSize <= maxBytes {
369+
return append(dst, req), nil
370+
}
371+
372+
// not a row bulk upsert request -> ret original request
373+
if req.GetRows() == nil || req.GetRows().GetValue() == nil {
374+
return nil, xerrors.WithStackTrace(
375+
xerrors.Wrap(
376+
fmt.Errorf("ydb: request size (%d bytes) exceeds maximum size (%d bytes) "+
377+
" but cannot be chunked (only row-based bulk upserts support chunking)", reqSize, maxBytes)))
378+
}
379+
380+
n := len(req.GetRows().GetValue().GetItems())
381+
if n == 0 {
382+
return dst, nil
383+
}
384+
385+
// we cannot split one item and one item is too big
386+
if n == 1 {
387+
return nil, xerrors.WithStackTrace(
388+
xerrors.Wrap(
389+
fmt.Errorf("ydb: single row size (%d bytes) exceeds maximum request size (%d bytes) "+
390+
"- row is too large to process", reqSize, maxBytes)))
391+
}
392+
393+
left, right := splitBulkUpsertRequestAt(req, n/2)
394+
395+
dst, err := chunkBulkUpsertRequest(dst, left, maxBytes)
396+
if err != nil {
397+
return nil, err
398+
}
399+
400+
return chunkBulkUpsertRequest(dst, right, maxBytes)
401+
}
402+
403+
// splitBulkUpsertRequestAt splits a bulk upsert request into two parts at the specified position.
404+
// It divides the request's items into two separate requests, with the first request containing
405+
// items from the start up to the specified position, and the second request containing the remaining items.
406+
// Returns two modified bulk upsert requests with their respective item sets.
407+
func splitBulkUpsertRequestAt(req *Ydb_Table.BulkUpsertRequest, pos int) (_, _ *Ydb_Table.BulkUpsertRequest) {
408+
items := req.GetRows().GetValue().GetItems() // save original items
409+
req.Rows.Value.Items = nil
410+
411+
right := proto.Clone(req).(*Ydb_Table.BulkUpsertRequest) //nolint:forcetypeassert
412+
413+
req.Rows.Value.Items = items[:pos]
414+
right.Rows.Value.Items = items[pos:]
415+
416+
return req, right
417+
}
418+
338419
func makeReadRowsRequest(
339420
sessionID string,
340421
path string,

internal/table/client_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
1113
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table"
1214
"google.golang.org/grpc"
1315
"google.golang.org/protobuf/proto"
@@ -19,6 +21,7 @@ import (
1921
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
2022
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
2123
"github.com/ydb-platform/ydb-go-sdk/v3/table"
24+
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
2225
"github.com/ydb-platform/ydb-go-sdk/v3/testutil"
2326
)
2427

@@ -80,6 +83,54 @@ func TestRaceWgClosed(t *testing.T) {
8083
}, xtest.StopAfter(27*time.Second))
8184
}
8285

86+
func TestChunkBulkUpsertRequest(t *testing.T) {
87+
t.Run("empty request", func(t *testing.T) {
88+
input := newTestBulkRequest(t, 0)
89+
got, err := chunkBulkUpsertRequest(nil, input, 100)
90+
require.NoError(t, err)
91+
assert.Len(t, got, 1)
92+
assert.Equal(t, input, got[0])
93+
})
94+
95+
t.Run("one chunk greater than maxSize", func(t *testing.T) {
96+
input := newTestBulkRequest(t, 1)
97+
_, err := chunkBulkUpsertRequest(nil, input, 10)
98+
assert.Error(t, err)
99+
})
100+
101+
t.Run("one request", func(t *testing.T) {
102+
input := newTestBulkRequest(t, 50)
103+
got, err := chunkBulkUpsertRequest(nil, input, 100)
104+
require.NoError(t, err)
105+
assert.Len(t, got, 2)
106+
assert.Less(t, proto.Size(got[0]), 100)
107+
assert.Less(t, proto.Size(got[1]), 100)
108+
})
109+
110+
t.Run("zero max size", func(t *testing.T) {
111+
input := newTestBulkRequest(t, 50)
112+
_, err := chunkBulkUpsertRequest(nil, input, 0)
113+
assert.Error(t, err)
114+
})
115+
}
116+
117+
func newTestBulkRequest(t *testing.T, itemsLen int) *Ydb_Table.BulkUpsertRequest {
118+
t.Helper()
119+
120+
rows := make([]types.Value, itemsLen)
121+
122+
for i := range itemsLen {
123+
rows[i] = types.StructValue()
124+
}
125+
126+
req, err := table.BulkUpsertDataRows(
127+
types.ListValue(rows...),
128+
).ToYDB("testTable")
129+
require.NoError(t, err)
130+
131+
return req
132+
}
133+
83134
// okHandler is a stub handler that ignores its argument and always
// answers with an empty protobuf message and no error.
var okHandler = func(any) (proto.Message, error) {
	var resp proto.Message = &emptypb.Empty{}

	return resp, nil
}

internal/table/config/config.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,13 @@ func WithIgnoreTruncated() Option {
162162
}
163163
}
164164

165+
// WithMaxRequestMessageSize sets the maximum size of request message in bytes.
166+
func WithMaxRequestMessageSize(maxMessageSize int) Option {
167+
return func(c *Config) {
168+
c.maxRequestMessageSize = maxMessageSize
169+
}
170+
}
171+
165172
// ExecuteDataQueryOverQueryService overrides Execute handle with query service execute with materialized result
166173
func ExecuteDataQueryOverQueryService(b bool) Option {
167174
return func(c *Config) {
@@ -202,6 +209,8 @@ type Config struct {
202209
useQuerySession bool
203210
executeDataQueryOverQueryService bool
204211

212+
maxRequestMessageSize int
213+
205214
trace *trace.Table
206215

207216
clock clockwork.Clock
@@ -304,6 +313,13 @@ func (c *Config) DeleteTimeout() time.Duration {
304313
return c.deleteTimeout
305314
}
306315

316+
// MaxRequestMessageSize returns the maximum size in bytes for a single request message.
317+
//
318+
// If the value is exceeded, the request will be split into several parts.
319+
func (c *Config) MaxRequestMessageSize() int {
320+
return c.maxRequestMessageSize
321+
}
322+
307323
func defaults() *Config {
308324
return &Config{
309325
poolLimit: DefaultSessionPoolSizeLimit,

tests/integration/table_bulk_upsert_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"fmt"
99
"os"
10+
"strings"
1011
"testing"
1112

1213
"github.com/stretchr/testify/require"
@@ -15,6 +16,7 @@ import (
1516
"github.com/ydb-platform/ydb-go-sdk/v3/table"
1617
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
1718
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
19+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1820
)
1921

2022
func TestTableBulkUpsertSession(t *testing.T) {
@@ -75,6 +77,43 @@ func TestTableBulkUpsert(t *testing.T) {
7577
}
7678
}
7779

80+
func TestTableBulkUpsertGrpcMaxMessageSize(t *testing.T) {
81+
const MB = 1024 * 1024
82+
83+
var (
84+
scope = newScope(t)
85+
driver = scope.Driver(ydb.WithGrpcMaxMessageSize(10 * MB))
86+
tablePath = scope.TablePath()
87+
)
88+
89+
s := strings.Repeat("a", 5*MB)
90+
91+
var rows []types.Value
92+
for i := int64(0); i < 10; i++ { // => 50MB the whole request
93+
rows = append(rows, types.StructValue(
94+
types.StructFieldValue("id", types.Int64Value(i)),
95+
types.StructFieldValue("val", types.TextValue(s)),
96+
))
97+
}
98+
99+
attemptsTraceChecker := trace.Table{
100+
OnBulkUpsert: func(_ trace.TableBulkUpsertStartInfo) func(trace.TableBulkUpsertDoneInfo) {
101+
return func(info trace.TableBulkUpsertDoneInfo) {
102+
scope.Require.Equal(1, info.Attempts, "expected 1 attempt for successful request")
103+
}
104+
},
105+
}
106+
107+
err := driver.Table().BulkUpsert(scope.Ctx, tablePath, table.BulkUpsertDataRows(
108+
types.ListValue(rows...),
109+
), table.WithTrace(attemptsTraceChecker))
110+
scope.Require.NoError(err)
111+
112+
for i := int64(0); i < 10; i++ {
113+
assertIdValue(scope.Ctx, t, tablePath, i, s)
114+
}
115+
}
116+
78117
func TestTableCsvBulkUpsert(t *testing.T) {
79118
var (
80119
scope = newScope(t)

0 commit comments

Comments
 (0)