Skip to content

Commit 2ed2056

Browse files
authored
Merge pull request #825 from ydb-platform/copy-tables
* Added `table.Session.CopyTables` method
2 parents 17e76a4 + 38e3516 commit 2ed2056

File tree

8 files changed

+301
-1
lines changed

8 files changed

+301
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Added `table.Session.CopyTables` method
12
* Added `x-ydb-trace-id` header into grpc calls
23
* Improved topic reader logs
34
* Fixed `internal/xstring` package with deprecated warning in `go1.21` about `reflect.{String,Slice}Header`

internal/table/errors.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ var (
3434

3535
// errNodeIsNotObservable returned by a Client instance to indicate that required node is not observable
3636
errNodeIsNotObservable = xerrors.Wrap(errors.New("node is not observable"))
37+
38+
// errParamsRequired returned by a Client instance to indicate that required params is not defined
39+
errParamsRequired = xerrors.Wrap(errors.New("params required"))
3740
)
3841

3942
func isCreateSessionErrorRetriable(err error) bool {

internal/table/session.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package table
22

33
import (
44
"context"
5+
"fmt"
56
"net/url"
67
"strconv"
78
"sync"
@@ -531,7 +532,58 @@ func (s *session) CopyTable(
531532
}
532533
}
533534
_, err = s.tableService.CopyTable(ctx, &request)
534-
return xerrors.WithStackTrace(err)
535+
if err != nil {
536+
return xerrors.WithStackTrace(err)
537+
}
538+
return nil
539+
}
540+
541+
func copyTables(
542+
ctx context.Context,
543+
sessionID string,
544+
operationTimeout time.Duration,
545+
operationCancelAfter time.Duration,
546+
service interface {
547+
CopyTables(
548+
ctx context.Context, in *Ydb_Table.CopyTablesRequest, opts ...grpc.CallOption,
549+
) (*Ydb_Table.CopyTablesResponse, error)
550+
},
551+
opts ...options.CopyTablesOption,
552+
) (err error) {
553+
request := Ydb_Table.CopyTablesRequest{
554+
SessionId: sessionID,
555+
OperationParams: operation.Params(
556+
ctx,
557+
operationTimeout,
558+
operationCancelAfter,
559+
operation.ModeSync,
560+
),
561+
}
562+
for _, opt := range opts {
563+
if opt != nil {
564+
opt((*options.CopyTablesDesc)(&request))
565+
}
566+
}
567+
if len(request.Tables) == 0 {
568+
return xerrors.WithStackTrace(fmt.Errorf("no CopyTablesItem: %w", errParamsRequired))
569+
}
570+
_, err = service.CopyTables(ctx, &request)
571+
if err != nil {
572+
return xerrors.WithStackTrace(err)
573+
}
574+
return nil
575+
}
576+
577+
// CopyTables creates copy of table at given path.
578+
func (s *session) CopyTables(
579+
ctx context.Context,
580+
opts ...options.CopyTablesOption,
581+
) (err error) {
582+
err = copyTables(ctx, s.id, s.config.OperationTimeout(), s.config.OperationCancelAfter(), s.tableService, opts...)
583+
if err != nil {
584+
return xerrors.WithStackTrace(err)
585+
}
586+
return nil
535587
}
536588

537589
// Explain explains data query represented by text.

internal/table/session_test.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package table
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"reflect"
78
"testing"
@@ -14,13 +15,16 @@ import (
1415
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
1516
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Scheme"
1617
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table"
18+
"google.golang.org/grpc"
1719
"google.golang.org/protobuf/proto"
20+
"google.golang.org/protobuf/types/known/durationpb"
1821

1922
"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
2023
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
2124
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
2225
"github.com/ydb-platform/ydb-go-sdk/v3/internal/value"
2326
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
27+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
2428
"github.com/ydb-platform/ydb-go-sdk/v3/table"
2529
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
2630
"github.com/ydb-platform/ydb-go-sdk/v3/table/types"
@@ -624,3 +628,138 @@ func TestDescribeTableRegression(t *testing.T) {
624628

625629
assert.Equal(t, exp, act)
626630
}
631+
632+
var errUnexpectedRequest = errors.New("unexpected request")
633+
634+
type copyTablesMock struct {
635+
*Ydb_Table.CopyTablesRequest
636+
}
637+
638+
func (mock *copyTablesMock) CopyTables(
639+
_ context.Context, in *Ydb_Table.CopyTablesRequest, opts ...grpc.CallOption,
640+
) (*Ydb_Table.CopyTablesResponse, error) {
641+
if in.String() == mock.String() {
642+
return &Ydb_Table.CopyTablesResponse{}, nil
643+
}
644+
return nil, fmt.Errorf("%w: %s, exp: %s", errUnexpectedRequest, in, mock.String())
645+
}
646+
647+
func Test_copyTables(t *testing.T) {
648+
ctx := xtest.Context(t)
649+
for _, tt := range []struct {
650+
sessionID string
651+
operationTimeout time.Duration
652+
operationCancelAfter time.Duration
653+
service *copyTablesMock
654+
opts []options.CopyTablesOption
655+
err error
656+
}{
657+
{
658+
sessionID: "1",
659+
operationTimeout: time.Second,
660+
operationCancelAfter: time.Second,
661+
service: &copyTablesMock{
662+
CopyTablesRequest: &Ydb_Table.CopyTablesRequest{
663+
SessionId: "1",
664+
Tables: []*Ydb_Table.CopyTableItem{
665+
{
666+
SourcePath: "from",
667+
DestinationPath: "to",
668+
OmitIndexes: true,
669+
},
670+
},
671+
OperationParams: &Ydb_Operations.OperationParams{
672+
OperationMode: Ydb_Operations.OperationParams_SYNC,
673+
OperationTimeout: durationpb.New(time.Second),
674+
CancelAfter: durationpb.New(time.Second),
675+
},
676+
},
677+
},
678+
opts: []options.CopyTablesOption{
679+
options.CopyTablesItem("from", "to", true),
680+
},
681+
err: nil,
682+
},
683+
{
684+
sessionID: "2",
685+
operationTimeout: 2 * time.Second,
686+
operationCancelAfter: 2 * time.Second,
687+
service: &copyTablesMock{
688+
CopyTablesRequest: &Ydb_Table.CopyTablesRequest{
689+
SessionId: "2",
690+
Tables: []*Ydb_Table.CopyTableItem{
691+
{
692+
SourcePath: "from1",
693+
DestinationPath: "to1",
694+
OmitIndexes: true,
695+
},
696+
{
697+
SourcePath: "from2",
698+
DestinationPath: "to2",
699+
OmitIndexes: false,
700+
},
701+
{
702+
SourcePath: "from3",
703+
DestinationPath: "to3",
704+
OmitIndexes: true,
705+
},
706+
},
707+
OperationParams: &Ydb_Operations.OperationParams{
708+
OperationMode: Ydb_Operations.OperationParams_SYNC,
709+
OperationTimeout: durationpb.New(2 * time.Second),
710+
CancelAfter: durationpb.New(2 * time.Second),
711+
},
712+
},
713+
},
714+
opts: []options.CopyTablesOption{
715+
options.CopyTablesItem("from1", "to1", true),
716+
options.CopyTablesItem("from2", "to2", false),
717+
options.CopyTablesItem("from3", "to3", true),
718+
},
719+
err: nil,
720+
},
721+
{
722+
sessionID: "3",
723+
operationTimeout: time.Second,
724+
operationCancelAfter: time.Second,
725+
service: &copyTablesMock{
726+
CopyTablesRequest: &Ydb_Table.CopyTablesRequest{
727+
SessionId: "1",
728+
Tables: []*Ydb_Table.CopyTableItem{
729+
{
730+
SourcePath: "from",
731+
DestinationPath: "to",
732+
OmitIndexes: true,
733+
},
734+
},
735+
OperationParams: &Ydb_Operations.OperationParams{
736+
OperationMode: Ydb_Operations.OperationParams_SYNC,
737+
OperationTimeout: durationpb.New(time.Second),
738+
CancelAfter: durationpb.New(time.Second),
739+
},
740+
},
741+
},
742+
opts: []options.CopyTablesOption{
743+
options.CopyTablesItem("from1", "to1", true),
744+
},
745+
err: errUnexpectedRequest,
746+
},
747+
{
748+
sessionID: "4",
749+
operationTimeout: time.Second,
750+
operationCancelAfter: time.Second,
751+
service: &copyTablesMock{},
752+
opts: nil,
753+
err: errParamsRequired,
754+
},
755+
} {
756+
t.Run("", func(t *testing.T) {
757+
err := copyTables(ctx, tt.sessionID, tt.operationTimeout, tt.operationCancelAfter, tt.service, tt.opts...)
758+
if tt.err != nil {
759+
require.ErrorIs(t, err, tt.err)
760+
} else {
761+
require.NoError(t, err)
762+
}
763+
})
764+
}
765+
}

table/example_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,3 +394,28 @@ func Example_scanQueryWithCompression() {
394394
fmt.Printf("unexpected error: %v", err)
395395
}
396396
}
397+
398+
func Example_copyTables() {
399+
ctx := context.TODO()
400+
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
401+
if err != nil {
402+
fmt.Printf("failed connect: %v", err)
403+
return
404+
}
405+
defer db.Close(ctx) // cleanup resources
406+
err = db.Table().Do(ctx,
407+
func(ctx context.Context, s table.Session) (err error) {
408+
return s.CopyTables(ctx,
409+
options.CopyTablesItem(
410+
path.Join(db.Name(), "from", "series"),
411+
path.Join(db.Name(), "to", "series"),
412+
true,
413+
),
414+
)
415+
},
416+
table.WithIdempotent(),
417+
)
418+
if err != nil {
419+
fmt.Printf("unexpected error: %v", err)
420+
}
421+
}

table/options/options.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -787,6 +787,21 @@ type (
787787
CopyTableOption func(*CopyTableDesc)
788788
)
789789

790+
type (
791+
CopyTablesDesc Ydb_Table.CopyTablesRequest
792+
CopyTablesOption func(*CopyTablesDesc)
793+
)
794+
795+
func CopyTablesItem(src, dst string, omitIndexes bool) CopyTablesOption {
796+
return func(desc *CopyTablesDesc) {
797+
desc.Tables = append(desc.Tables, &Ydb_Table.CopyTableItem{
798+
SourcePath: src,
799+
DestinationPath: dst,
800+
OmitIndexes: omitIndexes,
801+
})
802+
}
803+
}
804+
790805
type (
791806
ExecuteSchemeQueryDesc Ydb_Table.ExecuteSchemeQueryRequest
792807
ExecuteSchemeQueryOption func(*ExecuteSchemeQueryDesc)

table/table.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ type Session interface {
118118
opts ...options.CopyTableOption,
119119
) (err error)
120120

121+
CopyTables(
122+
ctx context.Context,
123+
opts ...options.CopyTablesOption,
124+
) (err error)
125+
121126
Explain(
122127
ctx context.Context,
123128
query string,
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
//go:build integration
2+
// +build integration
3+
4+
package integration
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"path"
10+
"testing"
11+
12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
13+
"github.com/ydb-platform/ydb-go-sdk/v3/table"
14+
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
15+
)
16+
17+
func TestCopyTables(t *testing.T) {
18+
var (
19+
ctx = xtest.Context(t)
20+
scope = newScope(t)
21+
db = scope.Driver()
22+
fromPath = scope.TablePath()
23+
dirPath, _ = path.Split(fromPath)
24+
toPath = path.Join(dirPath, "renamed")
25+
)
26+
// copy tables
27+
err := db.Table().Do(ctx,
28+
func(ctx context.Context, s table.Session) (err error) {
29+
return s.CopyTables(ctx,
30+
options.CopyTablesItem(
31+
fromPath,
32+
toPath,
33+
true,
34+
),
35+
)
36+
},
37+
table.WithIdempotent(),
38+
)
39+
if err != nil {
40+
t.Fatal(err)
41+
}
42+
// describe table in destination path
43+
err = db.Table().Do(ctx,
44+
func(ctx context.Context, s table.Session) (err error) {
45+
d, err := s.DescribeTable(ctx, toPath)
46+
if err != nil {
47+
return err
48+
}
49+
fmt.Printf("Table `%s`:\n", toPath)
50+
for _, c := range d.Columns {
51+
fmt.Printf(" - `%s` %s\n", c.Name, c.Type.Yql())
52+
}
53+
return nil
54+
},
55+
table.WithIdempotent(),
56+
)
57+
if err != nil {
58+
t.Fatal(err)
59+
}
60+
}

0 commit comments

Comments
 (0)