Skip to content

Commit 3637f53

Browse files
authored
Merge pull request #1491 from UgnineSirdis/bulk-upsert-from-csv
Support bulk upsert from csv, arrow and ydb internal formats
2 parents e5f377a + f90d5a1 commit 3637f53

File tree

11 files changed

+574
-17
lines changed

11 files changed

+574
-17
lines changed

CHANGELOG.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
* Support bulk upsert from csv, arrow and ydb internal formats in table client
2+
13
## v3.82.0
2-
* Fixed error on experimental TopicListener.Close
4+
* Fixed error on experimental `TopicListener.Close`
35
* Disabled reporting of `ydb_go_sdk_query_session_count` when metrics are disabled
46
* Disabled reporting of `ydb_go_sdk_ydb_query_session_create_latency` histogram metrics when metrics are disabled
5-
* Allowed skip column for `ScanStruct` by tag `-`
7+
* Allowed skip column for `ScanStruct` by tag `-`
68

79
## v3.81.4
810
* Returned `topicwriter.ErrQueueLimitExceed`, accidentally removed at `v3.81.0`
@@ -14,15 +16,15 @@
1416
* Removed `experimantal` comment for query service client
1517

1618
## v3.81.1
17-
* Fixed nil pointer dereference panic on failed `ydb.Open`
19+
* Fixed nil pointer dereference panic on failed `ydb.Open`
1820
* Added ip discovery. Server can show own ip address and target hostname in the ListEndpoint message. These fields are used to bypass DNS resolving.
1921

2022
## v3.81.0
2123
* Added error ErrMessagesPutToInternalQueueBeforeError to topic writer
2224
* Added write to topics within transactions
2325

2426
## v3.80.10
25-
* Added `ydb.WithSessionPoolSessionUsageLimit()` option for limitation max count of session usage
27+
* Added `ydb.WithSessionPoolSessionUsageLimit()` option for limitation max count of session usage
2628
* Refactored experimental topic iterators in `topicsugar` package
2729

2830
## v3.80.9

internal/table/client.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@ import (
44
"context"
55

66
"github.com/jonboulle/clockwork"
7+
"github.com/ydb-platform/ydb-go-genproto/Ydb_Table_V1"
8+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table"
79
"google.golang.org/grpc"
810

11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
912
"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
1013
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1114
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
@@ -27,6 +30,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config
2730
return &Client{
2831
clock: config.Clock(),
2932
config: config,
33+
client: Ydb_Table_V1.NewTableServiceClient(cc),
3034
cc: cc,
3135
build: func(ctx context.Context) (s *session, err error) {
3236
return newSession(ctx, cc, config)
@@ -86,6 +90,7 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, config *config.Config
8690
type Client struct {
8791
// read-only fields
8892
config *config.Config
93+
client Ydb_Table_V1.TableServiceClient
8994
build sessionBuilder
9095
cc grpc.ClientConnInterface
9196
clock clockwork.Clock
@@ -264,6 +269,54 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O
264269
}, config.RetryOptions...)
265270
}
266271

272+
func (c *Client) BulkUpsert(
273+
ctx context.Context,
274+
tableName string,
275+
data table.BulkUpsertData,
276+
opts ...table.Option,
277+
) (finalErr error) {
278+
if c == nil {
279+
return xerrors.WithStackTrace(errNilClient)
280+
}
281+
282+
if c.isClosed() {
283+
return xerrors.WithStackTrace(errClosedClient)
284+
}
285+
286+
a := allocator.New()
287+
defer a.Free()
288+
289+
config := c.retryOptions(opts...)
290+
config.RetryOptions = append(config.RetryOptions, retry.WithIdempotent(true))
291+
292+
attempts, onDone := 0, trace.TableOnBulkUpsert(config.Trace, &ctx,
293+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Client).BulkUpsert"),
294+
)
295+
defer func() {
296+
onDone(finalErr, attempts)
297+
}()
298+
299+
request := Ydb_Table.BulkUpsertRequest{
300+
Table: tableName,
301+
}
302+
finalErr = data.ApplyBulkUpsertRequest(a, (*table.BulkUpsertRequest)(&request))
303+
if finalErr != nil {
304+
return finalErr
305+
}
306+
307+
finalErr = retry.Retry(ctx,
308+
func(ctx context.Context) (err error) {
309+
attempts++
310+
_, err = c.client.BulkUpsert(ctx, &request)
311+
312+
return err
313+
},
314+
config.RetryOptions...,
315+
)
316+
317+
return xerrors.WithStackTrace(finalErr)
318+
}
319+
267320
func executeTxOperation(ctx context.Context, c *Client, op table.TxOperation, tx table.Transaction) (err error) {
268321
if panicCallback := c.config.PanicCallback(); panicCallback != nil {
269322
defer func() {

internal/table/session.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1261,12 +1261,11 @@ func (s *session) BulkUpsert(ctx context.Context, table string, rows value.Value
12611261
onDone = trace.TableOnSessionBulkUpsert(
12621262
s.config.Trace(), &ctx,
12631263
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*session).BulkUpsert"),
1264-
s,
12651264
)
12661265
)
12671266
defer func() {
12681267
defer a.Free()
1269-
onDone(err)
1268+
onDone(err, 1)
12701269
}()
12711270

12721271
for _, opt := range opts {

table/table.go

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import (
44
"context"
55
"time"
66

7+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Formats"
78
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table"
89

10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
911
"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
1012
"github.com/ydb-platform/ydb-go-sdk/v3/internal/params"
1113
"github.com/ydb-platform/ydb-go-sdk/v3/internal/tx"
@@ -68,6 +70,12 @@ type Client interface {
6870
// If op TxOperation returns a non-nil error, the transaction will be rolled back
6971
// Warning: if the context has no deadline or cancellation func, then DoTx can run indefinitely
7072
DoTx(ctx context.Context, op TxOperation, opts ...Option) error
73+
74+
// BulkUpsert upserts a batch of rows non-transactionally.
75+
//
76+
// Returns success only when all rows were successfully upserted. In case of an error some rows might
77+
// be upserted and some might not.
78+
BulkUpsert(ctx context.Context, table string, data BulkUpsertData, opts ...Option) error
7179
}
7280

7381
type SessionStatus = string
@@ -179,6 +187,7 @@ type Session interface {
179187
opts ...options.ExecuteScanQueryOption,
180188
) (_ result.StreamResult, err error)
181189

190+
// Deprecated: use Client instance instead.
182191
BulkUpsert(
183192
ctx context.Context,
184193
table string,
@@ -578,3 +587,205 @@ func (opt traceOption) ApplyTableOption(opts *Options) {
578587
func WithTrace(t trace.Table) traceOption { //nolint:gocritic
579588
return traceOption{t: &t}
580589
}
590+
591+
type (
592+
BulkUpsertRequest Ydb_Table.BulkUpsertRequest
593+
)
594+
595+
// BulkUpsertData is the payload accepted by Client.BulkUpsert. An
// implementation writes itself into the bulk upsert request.
type BulkUpsertData interface {
596+
// ApplyBulkUpsertRequest fills req from the payload. The allocator a is
// made available for building the request; a non-nil error reports that
// the payload could not be applied.
ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error
597+
}
598+
599+
type bulkUpsertRows struct {
600+
Rows value.Value
601+
}
602+
603+
func (data bulkUpsertRows) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error {
604+
req.Rows = value.ToYDB(data.Rows, a)
605+
606+
return nil
607+
}
608+
609+
func NewBulkUpsertRows(rows value.Value) bulkUpsertRows {
610+
return bulkUpsertRows{
611+
Rows: rows,
612+
}
613+
}
614+
615+
type bulkUpsertCsv struct {
616+
Data []byte
617+
Options []CsvFormatOption
618+
}
619+
620+
type CsvFormatOption interface {
621+
ApplyCsvFormatOption(req *BulkUpsertRequest) (err error)
622+
}
623+
624+
func (data bulkUpsertCsv) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error {
625+
req.Data = data.Data
626+
627+
var err error
628+
for _, opt := range data.Options {
629+
if opt != nil {
630+
err = opt.ApplyCsvFormatOption(req)
631+
if err != nil {
632+
return err
633+
}
634+
}
635+
}
636+
637+
return err
638+
}
639+
640+
func NewBulkUpsertCsv(data []byte, opts ...CsvFormatOption) bulkUpsertCsv {
641+
return bulkUpsertCsv{
642+
Data: data,
643+
Options: opts,
644+
}
645+
}
646+
647+
func ensureCsvDataFormatSettings(req *BulkUpsertRequest) (format *Ydb_Formats.CsvSettings) {
648+
if settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_CsvSettings); ok {
649+
if settings.CsvSettings == nil {
650+
settings.CsvSettings = &Ydb_Formats.CsvSettings{}
651+
}
652+
653+
return settings.CsvSettings
654+
}
655+
656+
req.DataFormat = &Ydb_Table.BulkUpsertRequest_CsvSettings{
657+
CsvSettings: &Ydb_Formats.CsvSettings{},
658+
}
659+
660+
settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_CsvSettings)
661+
if !ok {
662+
return nil
663+
}
664+
665+
return settings.CsvSettings
666+
}
667+
668+
type csvHeaderOption struct{}
669+
670+
func (opt *csvHeaderOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error {
671+
ensureCsvDataFormatSettings(req).Header = true
672+
673+
return nil
674+
}
675+
676+
// First not skipped line is a CSV header (list of column names).
677+
func WithCsvHeader() CsvFormatOption {
678+
return &csvHeaderOption{}
679+
}
680+
681+
type csvNullValueOption struct {
682+
Value []byte
683+
}
684+
685+
func (opt *csvNullValueOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error {
686+
ensureCsvDataFormatSettings(req).NullValue = opt.Value
687+
688+
return nil
689+
}
690+
691+
// String value that would be interpreted as NULL.
692+
func WithCsvNullValue(value []byte) CsvFormatOption {
693+
return &csvNullValueOption{value}
694+
}
695+
696+
type csvDelimiterOption struct {
697+
Value []byte
698+
}
699+
700+
func (opt *csvDelimiterOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error {
701+
ensureCsvDataFormatSettings(req).Delimiter = opt.Value
702+
703+
return nil
704+
}
705+
706+
// Fields delimiter in CSV file. It's "," if not set.
707+
func WithCsvDelimiter(value []byte) CsvFormatOption {
708+
return &csvDelimiterOption{value}
709+
}
710+
711+
type csvSkipRowsOption struct {
712+
Count uint32
713+
}
714+
715+
func (opt *csvSkipRowsOption) ApplyCsvFormatOption(req *BulkUpsertRequest) error {
716+
ensureCsvDataFormatSettings(req).SkipRows = opt.Count
717+
718+
return nil
719+
}
720+
721+
// Number of rows to skip before CSV data. It should be present only in the first upsert of CSV file.
722+
func WithCsvSkipRows(count uint32) CsvFormatOption {
723+
return &csvSkipRowsOption{count}
724+
}
725+
726+
type bulkUpsertArrow struct {
727+
Data []byte
728+
Options []ArrowFormatOption
729+
}
730+
731+
type ArrowFormatOption interface {
732+
ApplyArrowFormatOption(req *BulkUpsertRequest) (err error)
733+
}
734+
735+
func (data bulkUpsertArrow) ApplyBulkUpsertRequest(a *allocator.Allocator, req *BulkUpsertRequest) error {
736+
req.Data = data.Data
737+
738+
var err error
739+
for _, opt := range data.Options {
740+
if opt != nil {
741+
err = opt.ApplyArrowFormatOption(req)
742+
if err != nil {
743+
return err
744+
}
745+
}
746+
}
747+
748+
return err
749+
}
750+
751+
func NewBulkUpsertArrow(data []byte, opts ...ArrowFormatOption) bulkUpsertArrow {
752+
return bulkUpsertArrow{
753+
Data: data,
754+
Options: opts,
755+
}
756+
}
757+
758+
func ensureArrowDataFormatSettings(req *BulkUpsertRequest) (format *Ydb_Formats.ArrowBatchSettings) {
759+
if settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_ArrowBatchSettings); ok {
760+
if settings.ArrowBatchSettings == nil {
761+
settings.ArrowBatchSettings = &Ydb_Formats.ArrowBatchSettings{}
762+
}
763+
764+
return settings.ArrowBatchSettings
765+
}
766+
767+
req.DataFormat = &Ydb_Table.BulkUpsertRequest_ArrowBatchSettings{
768+
ArrowBatchSettings: &Ydb_Formats.ArrowBatchSettings{},
769+
}
770+
771+
settings, ok := req.DataFormat.(*Ydb_Table.BulkUpsertRequest_ArrowBatchSettings)
772+
if !ok {
773+
return nil
774+
}
775+
776+
return settings.ArrowBatchSettings
777+
}
778+
779+
type arrowSchemaOption struct {
780+
Schema []byte
781+
}
782+
783+
func (opt *arrowSchemaOption) ApplyArrowFormatOption(req *BulkUpsertRequest) error {
784+
ensureArrowDataFormatSettings(req).Schema = opt.Schema
785+
786+
return nil
787+
}
788+
789+
func WithArrowSchema(schema []byte) ArrowFormatOption {
790+
return &arrowSchemaOption{schema}
791+
}

tests/integration/helpers_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ func (scope *scopeT) Folder() string {
180180
scope.Require.NoError(sugar.RemoveRecursive(scope.Ctx, driver, folderPath))
181181
}
182182
}
183-
scope.Logf("Createing folder done: %v", folderPath)
183+
scope.Logf("Creating folder done: %v", folderPath)
184184
return fixenv.NewGenericResultWithCleanup(folderPath, clean), nil
185185
}
186186
return fixenv.CacheResult(scope.Env, f)
@@ -310,7 +310,7 @@ func (scope *scopeT) TableName(opts ...func(t *tableNameParams)) string {
310310
createTableQueryTemplate: `
311311
PRAGMA TablePathPrefix("{{.TablePathPrefix}}");
312312
CREATE TABLE {{.TableName}} (
313-
id Int64 NOT NULL,
313+
id Int64 NOT NULL,
314314
val Text,
315315
PRIMARY KEY (id)
316316
)

0 commit comments

Comments
 (0)