Skip to content

Commit 63e9d29

Browse files
committed
Merge branch 'main' into altinity
2 parents b3555c9 + fb40108 commit 63e9d29

File tree

134 files changed

+2797
-1113
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

134 files changed

+2797
-1113
lines changed

.mapping.json

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,8 @@
817817
"pkg/connection/clickhouse/connection.go":"transfer_manager/go/pkg/connection/clickhouse/connection.go",
818818
"pkg/connection/clickhouse/host.go":"transfer_manager/go/pkg/connection/clickhouse/host.go",
819819
"pkg/connection/connections.go":"transfer_manager/go/pkg/connection/connections.go",
820+
"pkg/connection/greenplum/connection.go":"transfer_manager/go/pkg/connection/greenplum/connection.go",
821+
"pkg/connection/greenplum/host.go":"transfer_manager/go/pkg/connection/greenplum/host.go",
820822
"pkg/connection/mongo/connection.go":"transfer_manager/go/pkg/connection/mongo/connection.go",
821823
"pkg/connection/opensearch/connection.go":"transfer_manager/go/pkg/connection/opensearch/connection.go",
822824
"pkg/connection/opensearch/host.go":"transfer_manager/go/pkg/connection/opensearch/host.go",
@@ -1142,14 +1144,18 @@
11421144
"pkg/parsers/registry/protobuf/parser_config_proto_lb.go":"transfer_manager/go/pkg/parsers/registry/protobuf/parser_config_proto_lb.go",
11431145
"pkg/parsers/registry/protobuf/parser_proto.go":"transfer_manager/go/pkg/parsers/registry/protobuf/parser_proto.go",
11441146
"pkg/parsers/registry/protobuf/protoparser/gotest/extract_message.desc":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/gotest/extract_message.desc",
1145-
"pkg/parsers/registry/protobuf/protoparser/gotest/metrika-streaming-data/metrika_cloud_export_hit_log.desc":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/gotest/metrika-streaming-data/metrika_cloud_export_hit_log.desc",
1146-
"pkg/parsers/registry/protobuf/protoparser/gotest/metrika-streaming-data/metrika_cloud_export_hit_log.proto":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/gotest/metrika-streaming-data/metrika_cloud_export_hit_log.proto",
1147-
"pkg/parsers/registry/protobuf/protoparser/gotest/metrika-streaming-data/metrika_cloud_export_hit_log_data.bin":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/gotest/metrika-streaming-data/metrika_cloud_export_hit_log_data.bin",
1147+
"pkg/parsers/registry/protobuf/protoparser/gotest/metrika-data/metrika_hit_log.desc":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/gotest/metrika-data/metrika_hit_log.desc",
1148+
"pkg/parsers/registry/protobuf/protoparser/gotest/metrika-data/metrika_hit_log.proto":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/gotest/metrika-data/metrika_hit_log.proto",
1149+
"pkg/parsers/registry/protobuf/protoparser/gotest/metrika-data/metrika_hit_log_data.bin":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/gotest/metrika-data/metrika_hit_log_data.bin",
1150+
"pkg/parsers/registry/protobuf/protoparser/gotest/metrika-data/metrika_hit_protoseq.desc":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/gotest/metrika-data/metrika_hit_protoseq.desc",
1151+
"pkg/parsers/registry/protobuf/protoparser/gotest/metrika-data/metrika_hit_protoseq.proto":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/gotest/metrika-data/metrika_hit_protoseq.proto",
1152+
"pkg/parsers/registry/protobuf/protoparser/gotest/metrika-data/metrika_hit_protoseq_data.bin":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/gotest/metrika-data/metrika_hit_protoseq_data.bin",
11481153
"pkg/parsers/registry/protobuf/protoparser/gotest/proto-samples":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/gotest/proto-samples",
11491154
"pkg/parsers/registry/protobuf/protoparser/gotest/prototest":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/gotest/prototest",
11501155
"pkg/parsers/registry/protobuf/protoparser/gotest/prototest/std_data_types.pb.go":"",
11511156
"pkg/parsers/registry/protobuf/protoparser/gotest/prototest/std_data_types.proto":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/gotest/prototest/std_data_types.proto",
1152-
"pkg/parsers/registry/protobuf/protoparser/metrika_streaming_data_test.go":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/metrika_streaming_data_test.go",
1157+
"pkg/parsers/registry/protobuf/protoparser/metrika_hit_log_test.go":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/metrika_hit_log_test.go",
1158+
"pkg/parsers/registry/protobuf/protoparser/metrika_hit_protoseq_test.go":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/metrika_hit_protoseq_test.go",
11531159
"pkg/parsers/registry/protobuf/protoparser/proto_parser.go":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/proto_parser.go",
11541160
"pkg/parsers/registry/protobuf/protoparser/proto_parser_config.go":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/proto_parser_config.go",
11551161
"pkg/parsers/registry/protobuf/protoparser/proto_parser_config_test.go":"transfer_manager/go/pkg/parsers/registry/protobuf/protoparser/proto_parser_config_test.go",
@@ -1326,17 +1332,24 @@
13261332
"pkg/providers/clickhouse/query_builder_test.go":"transfer_manager/go/pkg/providers/clickhouse/query_builder_test.go",
13271333
"pkg/providers/clickhouse/recipe/chrecipe.go":"transfer_manager/go/pkg/providers/clickhouse/recipe/chrecipe.go",
13281334
"pkg/providers/clickhouse/schema.go":"transfer_manager/go/pkg/providers/clickhouse/schema.go",
1335+
"pkg/providers/clickhouse/schema/build_ddl_for_sink.go":"transfer_manager/go/pkg/providers/clickhouse/schema/build_ddl_for_sink.go",
13291336
"pkg/providers/clickhouse/schema/ddl.go":"transfer_manager/go/pkg/providers/clickhouse/schema/ddl.go",
13301337
"pkg/providers/clickhouse/schema/ddl_batch.go":"transfer_manager/go/pkg/providers/clickhouse/schema/ddl_batch.go",
13311338
"pkg/providers/clickhouse/schema/ddl_parser/clickhouse_lexer/clickhouse_lexer.go":"transfer_manager/go/pkg/providers/clickhouse/schema/ddl_parser/clickhouse_lexer/clickhouse_lexer.go",
13321339
"pkg/providers/clickhouse/schema/ddl_parser/clickhouse_lexer/lexer.go":"transfer_manager/go/pkg/providers/clickhouse/schema/ddl_parser/clickhouse_lexer/lexer.go",
13331340
"pkg/providers/clickhouse/schema/ddl_parser/clickhouse_lexer/readme.md":"transfer_manager/go/pkg/providers/clickhouse/schema/ddl_parser/clickhouse_lexer/readme.md",
13341341
"pkg/providers/clickhouse/schema/ddl_parser/ddl_parser.go":"transfer_manager/go/pkg/providers/clickhouse/schema/ddl_parser/ddl_parser.go",
1335-
"pkg/providers/clickhouse/schema/ddl_sink.go":"transfer_manager/go/pkg/providers/clickhouse/schema/ddl_sink.go",
13361342
"pkg/providers/clickhouse/schema/ddl_source.go":"transfer_manager/go/pkg/providers/clickhouse/schema/ddl_source.go",
1337-
"pkg/providers/clickhouse/schema/ddl_test.go":"transfer_manager/go/pkg/providers/clickhouse/schema/ddl_test.go",
13381343
"pkg/providers/clickhouse/schema/describe.go":"transfer_manager/go/pkg/providers/clickhouse/schema/describe.go",
1339-
"pkg/providers/clickhouse/schema/engine.go":"transfer_manager/go/pkg/providers/clickhouse/schema/engine.go",
1344+
"pkg/providers/clickhouse/schema/engines/abstract.go":"transfer_manager/go/pkg/providers/clickhouse/schema/engines/abstract.go",
1345+
"pkg/providers/clickhouse/schema/engines/build_ddl_for_sink.go":"transfer_manager/go/pkg/providers/clickhouse/schema/engines/build_ddl_for_sink.go",
1346+
"pkg/providers/clickhouse/schema/engines/build_ddl_for_sink_utils.go":"transfer_manager/go/pkg/providers/clickhouse/schema/engines/build_ddl_for_sink_utils.go",
1347+
"pkg/providers/clickhouse/schema/engines/build_ddl_for_sink_utils_test.go":"transfer_manager/go/pkg/providers/clickhouse/schema/engines/build_ddl_for_sink_utils_test.go",
1348+
"pkg/providers/clickhouse/schema/engines/const.go":"transfer_manager/go/pkg/providers/clickhouse/schema/engines/const.go",
1349+
"pkg/providers/clickhouse/schema/engines/factory.go":"transfer_manager/go/pkg/providers/clickhouse/schema/engines/factory.go",
1350+
"pkg/providers/clickhouse/schema/engines/merge_tree_family_engine.go":"transfer_manager/go/pkg/providers/clickhouse/schema/engines/merge_tree_family_engine.go",
1351+
"pkg/providers/clickhouse/schema/engines/replicated_engine.go":"transfer_manager/go/pkg/providers/clickhouse/schema/engines/replicated_engine.go",
1352+
"pkg/providers/clickhouse/schema/engines/util.go":"transfer_manager/go/pkg/providers/clickhouse/schema/engines/util.go",
13401353
"pkg/providers/clickhouse/sharding/sharder.go":"transfer_manager/go/pkg/providers/clickhouse/sharding/sharder.go",
13411354
"pkg/providers/clickhouse/sharding/sharding_model.go":"transfer_manager/go/pkg/providers/clickhouse/sharding/sharding_model.go",
13421355
"pkg/providers/clickhouse/sink.go":"transfer_manager/go/pkg/providers/clickhouse/sink.go",
@@ -2344,6 +2357,8 @@
23442357
"pkg/worker/tasks/load_snapshot_v2.go":"transfer_manager/go/pkg/worker/tasks/load_snapshot_v2.go",
23452358
"pkg/worker/tasks/load_snapshot_v2_test.go":"transfer_manager/go/pkg/worker/tasks/load_snapshot_v2_test.go",
23462359
"pkg/worker/tasks/load_snapshot_with_transformers_test.go":"transfer_manager/go/pkg/worker/tasks/load_snapshot_with_transformers_test.go",
2360+
"pkg/worker/tasks/local_table_part_provider.go":"transfer_manager/go/pkg/worker/tasks/local_table_part_provider.go",
2361+
"pkg/worker/tasks/remote_table_part_provider.go":"transfer_manager/go/pkg/worker/tasks/remote_table_part_provider.go",
23472362
"pkg/worker/tasks/remove_tables.go":"transfer_manager/go/pkg/worker/tasks/remove_tables.go",
23482363
"pkg/worker/tasks/reupload.go":"transfer_manager/go/pkg/worker/tasks/reupload.go",
23492364
"pkg/worker/tasks/s3coordinator/load_sharded_snapshot_test.go":"transfer_manager/go/pkg/worker/tasks/s3coordinator/load_sharded_snapshot_test.go",
@@ -2856,6 +2871,9 @@
28562871
"tests/e2e/pg2ch/alters/alters_test.go":"transfer_manager/go/tests/e2e/pg2ch/alters/alters_test.go",
28572872
"tests/e2e/pg2ch/alters/dump/ch/dump.sql":"transfer_manager/go/tests/e2e/pg2ch/alters/dump/ch/dump.sql",
28582873
"tests/e2e/pg2ch/alters/dump/pg/dump.sql":"transfer_manager/go/tests/e2e/pg2ch/alters/dump/pg/dump.sql",
2874+
"tests/e2e/pg2ch/alters_snapshot/alters_test.go":"transfer_manager/go/tests/e2e/pg2ch/alters_snapshot/alters_test.go",
2875+
"tests/e2e/pg2ch/alters_snapshot/dump/ch/dump.sql":"transfer_manager/go/tests/e2e/pg2ch/alters_snapshot/dump/ch/dump.sql",
2876+
"tests/e2e/pg2ch/alters_snapshot/dump/pg/dump.sql":"transfer_manager/go/tests/e2e/pg2ch/alters_snapshot/dump/pg/dump.sql",
28592877
"tests/e2e/pg2ch/alters_with_defaults/alters_test.go":"transfer_manager/go/tests/e2e/pg2ch/alters_with_defaults/alters_test.go",
28602878
"tests/e2e/pg2ch/alters_with_defaults/dump/ch/dump.sql":"transfer_manager/go/tests/e2e/pg2ch/alters_with_defaults/dump/ch/dump.sql",
28612879
"tests/e2e/pg2ch/alters_with_defaults/dump/pg/dump.sql":"transfer_manager/go/tests/e2e/pg2ch/alters_with_defaults/dump/pg/dump.sql",

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ require (
8383
go.uber.org/mock v0.5.2
8484
go.uber.org/zap v1.27.0
8585
go.ytsaurus.tech/library/go/core/log v0.0.4
86-
go.ytsaurus.tech/yt/go v0.0.26
87-
golang.org/x/crypto v0.39.0
86+
go.ytsaurus.tech/yt/go v0.0.27
87+
golang.org/x/crypto v0.38.0
8888
golang.org/x/exp v0.0.0-20250305212735-054e65f0b394
8989
golang.org/x/mod v0.25.0
9090
golang.org/x/net v0.41.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2608,8 +2608,8 @@ go.ytsaurus.tech/library/go/x/xreflect v0.0.3 h1:LCOjVDGKjKMTaFtn+iudhPAdvcjeJSX
26082608
go.ytsaurus.tech/library/go/x/xreflect v0.0.3/go.mod h1:D57na+z+EjaRuBo+nxgq6KPw5wfdHtO50MdcwBAzhq0=
26092609
go.ytsaurus.tech/library/go/x/xruntime v0.0.4 h1:VNstd2dkPZEN6nsJ3C+q/fVc4b2hajQ6ZYBS7+k7aBg=
26102610
go.ytsaurus.tech/library/go/x/xruntime v0.0.4/go.mod h1:fS4AUByc8QIHG06qxEjXYYs8B41eDh+yo2Q1Pk+msoA=
2611-
go.ytsaurus.tech/yt/go v0.0.26 h1:cxSY0rVV/eB6WXnnWCaU3QTMxGkRtkrQKMfmq3Lfq0Y=
2612-
go.ytsaurus.tech/yt/go v0.0.26/go.mod h1:2m5qZ+jpm5uUohx0A/33Pll9oYEndgOjhHZm+skLq/M=
2611+
go.ytsaurus.tech/yt/go v0.0.27 h1:UZ/WfsyzbPGyCsYdr858EtcmDwu8Vv4tiiY9i9usJcg=
2612+
go.ytsaurus.tech/yt/go v0.0.27/go.mod h1:Lm1+KyATKXVpbV1ZzuhrU1sX3sqcAiqXuXBpmvxliZM=
26132613
golang.org/x/crypto v0.0.0-20171113213409-9f005a07e0d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
26142614
golang.org/x/crypto v0.0.0-20180723164146-c126467f60eb/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
26152615
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=

internal/logger/otel_log.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ import (
1515
corezap "go.ytsaurus.tech/library/go/core/log/zap"
1616
)
1717

18+
const (
19+
KeyTransferID = "transfer_id"
20+
KeyDstType = "labels.dst_type"
21+
KeySrcType = "labels.src_type"
22+
KeyApp = "labels.app"
23+
)
24+
1825
var OtelLog log.Logger = &corezap.Logger{L: zap.NewNop()}
1926

2027
type OtelLoggerConfig struct {
@@ -54,7 +61,7 @@ func NewOtelLog(ctx context.Context, cfg OtelLoggerConfig) (lgr log.Logger, clos
5461
"",
5562
otelzap.WithLoggerProvider(logProvider),
5663
)
57-
l := log.With(corezap.NewWithCore(core))
64+
l := log.With(corezap.NewWithCore(core), log.String(KeyApp, cfg.ServiceName))
5865
OtelLog = l
5966
return l, func(ctx context.Context) {
6067
ctx, cancel := context.WithTimeout(ctx, time.Second*5)

library/go/slices/dedup.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package slices
22

33
import (
4+
"slices"
45
"sort"
56

67
"golang.org/x/exp/constraints"
7-
"golang.org/x/exp/slices"
88
)
99

1010
// Dedup removes duplicate values from slice.
@@ -13,15 +13,17 @@ func Dedup[E constraints.Ordered](s []E) []E {
1313
if len(s) < 2 {
1414
return s
1515
}
16+
1617
slices.Sort(s)
17-
tmp := s[:1]
18-
cur := s[0]
18+
19+
cur, tmp := s[0], s[:1]
1920
for i := 1; i < len(s); i++ {
2021
if s[i] != cur {
2122
tmp = append(tmp, s[i])
2223
cur = s[i]
2324
}
2425
}
26+
2527
return tmp
2628
}
2729

@@ -31,15 +33,17 @@ func DedupBools(a []bool) []bool {
3133
if len(a) < 2 {
3234
return a
3335
}
36+
3437
sort.Slice(a, func(i, j int) bool { return a[i] != a[j] })
35-
tmp := a[:1]
36-
cur := a[0]
38+
39+
cur, tmp := a[0], a[:1]
3740
for i := 1; i < len(a); i++ {
3841
if a[i] != cur {
3942
tmp = append(tmp, a[i])
4043
cur = a[i]
4144
}
4245
}
46+
4347
return tmp
4448
}
4549

library/go/slices/intersects.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,22 @@ func Intersection[E comparable](a, b []E) []E {
1414
p, s = b, a
1515
}
1616

17-
m := make(map[E]struct{})
17+
m := make(map[E]struct{}, len(s))
1818
for _, i := range s {
1919
m[i] = struct{}{}
2020
}
2121

22-
var res []E
22+
res := make([]E, 0, len(m))
2323
for _, v := range p {
2424
if _, exists := m[v]; exists {
2525
res = append(res, v)
2626
}
2727
}
2828

29+
if len(res) == 0 {
30+
return nil
31+
}
32+
2933
return res
3034
}
3135

library/go/slices/subtract.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package slices
22

33
// Subtract returns copy of slice a without elements of slice b.
44
func Subtract[T comparable](a, b []T) []T {
5-
set := make(map[T]struct{})
5+
set := make(map[T]struct{}, len(b))
66
for _, elem := range b {
77
set[elem] = struct{}{}
88
}
9+
910
return Filter(a, func(elem T) bool {
1011
_, ok := set[elem]
1112
return !ok

pkg/abstract/coordinator/coordinator_inmemory.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,20 @@ import (
1111
type CoordinatorInMemory struct {
1212
*CoordinatorNoOp
1313

14-
mu sync.Mutex
15-
state map[string]map[string]*TransferStateData
16-
17-
progress []*model.OperationTablePart
14+
mu sync.Mutex
15+
state map[string]map[string]*TransferStateData
16+
taskState map[string]string
17+
progress []*model.OperationTablePart
1818
}
1919

2020
func NewStatefulFakeClient() *CoordinatorInMemory {
2121
return &CoordinatorInMemory{
2222
CoordinatorNoOp: NewFakeClient(),
2323

24-
mu: sync.Mutex{},
25-
state: map[string]map[string]*TransferStateData{},
26-
27-
progress: nil,
24+
mu: sync.Mutex{},
25+
state: map[string]map[string]*TransferStateData{},
26+
taskState: map[string]string{},
27+
progress: nil,
2828
}
2929
}
3030

@@ -69,3 +69,20 @@ func (f *CoordinatorInMemory) RemoveTransferState(transferID string, stateKeys [
6969
}
7070
return nil
7171
}
72+
73+
func (f *CoordinatorInMemory) SetOperationState(taskID string, state string) error {
74+
f.mu.Lock()
75+
defer f.mu.Unlock()
76+
f.taskState[taskID] = state
77+
return nil
78+
}
79+
80+
func (f *CoordinatorInMemory) GetOperationState(taskID string) (string, error) {
81+
f.mu.Lock()
82+
defer f.mu.Unlock()
83+
state, ok := f.taskState[taskID]
84+
if !ok {
85+
return "", OperationStateNotFoundError
86+
}
87+
return state, nil
88+
}

pkg/abstract/coordinator/operation.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,7 @@ type Sharding interface {
6262
// and return the number of table parts for which the assignment was cleared
6363
ClearAssignedTablesParts(ctx context.Context, operationID string, workerIndex int) (int64, error)
6464
// UpdateOperationTablesParts update tables parts for operation
65-
// used to track more granular part progress
66-
//
67-
// Deprecated: used only in A2
65+
// used to track more granular part progress.
6866
UpdateOperationTablesParts(operationID string, tables []*model.OperationTablePart) error
6967
}
7068

pkg/abstract/model/transfer_operation_table_part.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,21 @@ func (t *OperationTablePart) String() string {
140140
return fmt.Sprintf("%v [%v/%v]%v", t.TableFQTN(), t.PartIndex+1, t.PartsCount, otherInfoString)
141141
}
142142

143+
func (t *OperationTablePart) StringWithoutFilter() string {
144+
otherInfo := []string{}
145+
if t.ETARows != 0 {
146+
otherInfo = append(otherInfo, fmt.Sprintf("ETARows: %v", t.ETARows))
147+
}
148+
if t.Offset != 0 {
149+
otherInfo = append(otherInfo, fmt.Sprintf("Offset: %v", t.Offset))
150+
}
151+
otherInfoString := ""
152+
if len(otherInfo) > 0 {
153+
otherInfoString = fmt.Sprintf(" (%v)", strings.Join(otherInfo, ", "))
154+
}
155+
return fmt.Sprintf("%v [%v/%v]%v", t.TableFQTN(), t.PartIndex+1, t.PartsCount, otherInfoString)
156+
}
157+
143158
func (t *OperationTablePart) Sharded() bool {
144159
return t.PartsCount > 1
145160
}

0 commit comments

Comments
 (0)