Skip to content

Commit 84dd72f

Browse files
committed
prefer master for dblog
commit_hash:09845e67ac55a4a96de01e6a03964927350cb9ad
1 parent 4cb89b5 commit 84dd72f

File tree

10 files changed

+282
-21
lines changed

10 files changed

+282
-21
lines changed

pkg/dblog/tests/utils_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ import (
1818
var (
1919
storage = CreateMockStorage()
2020

21-
isSupporterKeyType = func(keyType string) bool {
21+
isSupporterKeyType = func(keyType string) dblog.TypeSupport {
2222
if keyType == "pg:text" || keyType == "pg:int" {
23-
return true
23+
return dblog.TypeSupported
2424
}
2525

26-
return false
26+
return dblog.TypeUnsupported
2727
}
2828

2929
converter = func(val interface{}, colSchema abstract.ColSchema) (string, error) {

pkg/dblog/utils.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ const (
2222
emptySQLTuple = "()"
2323
)
2424

25+
type TypeSupport int
26+
27+
const (
28+
TypeSupported TypeSupport = 1
29+
TypeUnsupported TypeSupport = 2
30+
TypeUnknown TypeSupport = 3
31+
)
32+
2533
type ChangeItemConverter func(val interface{}, colSchema abstract.ColSchema) (string, error)
2634

2735
func InferChunkSize(storage abstract.SampleableStorage, tableID abstract.TableID, chunkSizeInBytes uint64) (uint64, error) {
@@ -77,7 +85,7 @@ func PKeysToStringArr(item *abstract.ChangeItem, primaryKey []string, converter
7785
fastTableSchema := changeitem.MakeFastTableSchema(item.TableSchema.Columns())
7886
var columnNamesIndices map[string]int
7987

80-
keysChanged := item.KeysChanged()
88+
keysChanged := item.KeysChanged() || item.Kind == abstract.DeleteKind
8189
if keysChanged {
8290
columnNamesIndices = make(map[string]int, len(item.OldKeys.KeyNames))
8391

@@ -114,7 +122,7 @@ func ResolvePrimaryKeyColumns(
114122
ctx context.Context,
115123
storage abstract.Storage,
116124
tableID abstract.TableID,
117-
IsSupportedKeyType func(keyType string) bool,
125+
checkTypeCompatibility func(keyType string) TypeSupport,
118126
) ([]string, error) {
119127

120128
schema, err := storage.TableSchema(ctx, tableID)
@@ -127,10 +135,13 @@ func ResolvePrimaryKeyColumns(
127135
for _, column := range schema.Columns() {
128136
if column.PrimaryKey {
129137
primaryKey = append(primaryKey, column.ColumnName)
130-
}
131-
132-
if !IsSupportedKeyType(column.OriginalType) {
133-
return nil, xerrors.Errorf("unsupported by data-transfer incremental snapshot")
138+
switch checkTypeCompatibility(column.OriginalType) {
139+
case TypeSupported:
140+
case TypeUnsupported:
141+
return nil, abstract.NewFatalError(xerrors.Errorf("unsupported by data-transfer incremental snapshot type: %s, column: %s", column.OriginalType, column.ColumnName))
142+
case TypeUnknown:
143+
return nil, abstract.NewFatalError(xerrors.Errorf("unknown type: %s, column: %s", column.OriginalType, column.ColumnName))
144+
}
134145
}
135146
}
136147

pkg/providers/postgres/dblog/storage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (s *Storage) Ping() error {
5959
}
6060

6161
func (s *Storage) LoadTable(ctx context.Context, tableDescr abstract.TableDescription, pusher abstract.Pusher) error {
62-
pkColNames, err := dblog.ResolvePrimaryKeyColumns(ctx, s.pgStorage, tableDescr.ID(), IsSupportedKeyType)
62+
pkColNames, err := dblog.ResolvePrimaryKeyColumns(ctx, s.pgStorage, tableDescr.ID(), CheckTypeCompatibility)
6363
if err != nil {
6464
return xerrors.Errorf("unable to get primary key: %w", err)
6565
}

pkg/providers/postgres/dblog/supported_key_type.go

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package dblog
33
import (
44
"strings"
55

6+
"github.com/transferia/transferia/pkg/dblog"
67
"github.com/transferia/transferia/pkg/util/set"
78
)
89

@@ -11,18 +12,21 @@ var supportedTypesArr = []string{
1112
"bit",
1213
"varbit",
1314

15+
"real",
1416
"smallint",
1517
"smallserial",
1618
"integer",
1719
"serial",
1820
"bigint",
1921
"bigserial",
2022
"oid",
23+
"tid",
2124

2225
"double precision",
2326

2427
"char",
2528
"varchar",
29+
"name",
2630

2731
"character",
2832
"character varying",
@@ -62,11 +66,72 @@ var supportedTypesArr = []string{
6266
"cidr",
6367
"macaddr",
6468
"citext",
69+
70+
"_jsonb",
71+
"_numeric",
72+
"_text",
73+
"_timestamp",
74+
"_timestamptz",
75+
"_uuid",
76+
"_varchar",
77+
"_bool",
78+
"_bpchar",
79+
"_bytea",
80+
"_cidr",
81+
"_date",
82+
"_float4",
83+
"_float8",
84+
"_inet",
85+
"_int2",
86+
"_int4",
87+
"_int8",
88+
"bpchar",
89+
"float4",
90+
"float8",
91+
"int2",
92+
"int4",
93+
"int8",
94+
"bool",
95+
"nummultirange",
96+
"int4multirange",
97+
"int8multirange",
98+
}
99+
100+
var unsupportedTypesArr = []string{
101+
"json", // could not identify a comparison function
102+
"hstore", // has no default operator class for access method "btree"
103+
"ltree", // has no default operator class for access method "btree"
104+
"point", // has no default operator class for access method "btree"
105+
"polygon", // has no default operator class for access method "btree"
106+
"line", // has no default operator class for access method "btree"
107+
"circle", // has no default operator class for access method "btree"
108+
"box", // has no default operator class for access method "btree"
109+
"path", // has no default operator class for access method "btree"
110+
"aclitem", // could not identify a comparison function
111+
"_aclitem", // could not identify a comparison function
112+
"cid", // has no default operator class for access method "btree"
113+
"record", // pseudo-type
114+
"unknown", // pseudo-type
115+
"xid", // has no default operator class for access method "btree"
116+
"lseg", // has no default operator class for access method "btree"
117+
"_json", // could not identify a comparison function
118+
"_tsrange", // failed to execute SELECT: ERROR: malformed range literal: "[2023-01-01 00:00:00" (SQLSTATE 22P02). Need to rewrite where statement
119+
"_tstzrange", // failed to execute SELECT: ERROR: malformed range literal: "[2023-01-01 00:00:00Z" (SQLSTATE 22P02). Need to rewrite where statement
65120
}
66121

67-
var supportedTypes = set.New(supportedTypesArr...)
122+
var supportedTypesSet = set.New(supportedTypesArr...)
123+
var unsupportedTypesSet = set.New(unsupportedTypesArr...)
68124

69-
func IsSupportedKeyType(keyType string) bool {
125+
func CheckTypeCompatibility(keyType string) dblog.TypeSupport {
70126
normalKeyType := strings.Split(keyType, "(")[0]
71-
return supportedTypes.Contains(strings.TrimPrefix(normalKeyType, "pg:"))
127+
normalKeyType = strings.TrimPrefix(normalKeyType, "pg:")
128+
129+
switch {
130+
case supportedTypesSet.Contains(normalKeyType):
131+
return dblog.TypeSupported
132+
case unsupportedTypesSet.Contains(normalKeyType):
133+
return dblog.TypeUnsupported
134+
default:
135+
return dblog.TypeUnknown
136+
}
72137
}

pkg/providers/postgres/dblog/tests/alltypes/check_all_types_test.go

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"os"
7+
"strings"
78
"testing"
89

910
"github.com/jackc/pgx/v4"
@@ -13,6 +14,7 @@ import (
1314
"github.com/transferia/transferia/pkg/dblog"
1415
"github.com/transferia/transferia/pkg/dblog/tablequery"
1516
"github.com/transferia/transferia/pkg/providers/postgres"
17+
pg_dblog "github.com/transferia/transferia/pkg/providers/postgres/dblog"
1618
"github.com/transferia/transferia/pkg/providers/postgres/pgrecipe"
1719
"github.com/transferia/transferia/tests/helpers"
1820
)
@@ -39,12 +41,14 @@ var (
3941
"bigint",
4042
"bigserial",
4143
"oid",
44+
"tid",
4245

4346
"real",
4447
"double precision",
4548

4649
"char",
4750
"varchar",
51+
"name",
4852

4953
"character",
5054
"character varying",
@@ -86,6 +90,40 @@ var (
8690
"cidr",
8791
"macaddr",
8892
"citext",
93+
94+
"_jsonb",
95+
"_numeric",
96+
"_text",
97+
"_timestamp",
98+
"_timestamptz",
99+
// "_tsrange", failed to execute SELECT: ERROR: malformed range literal: "[2023-01-01 00:00:00" (SQLSTATE 22P02)
100+
// "_tstzrange", failed to execute SELECT: ERROR: malformed range literal: "[2023-01-01 00:00:00Z" (SQLSTATE 22P02)
101+
"_uuid",
102+
"_varchar",
103+
// "_aclitem", could not identify a comparison function for type aclitem
104+
"_bool",
105+
"_bpchar",
106+
"_bytea",
107+
"_cidr",
108+
"_date",
109+
"_float4",
110+
"_float8",
111+
"_inet",
112+
"_int2",
113+
"_int4",
114+
"_int8",
115+
"bpchar",
116+
"float4",
117+
"float8",
118+
"int2",
119+
"int4",
120+
"int8",
121+
"bool",
122+
123+
// pg 14+
124+
"nummultirange",
125+
"int4multirange",
126+
"int8multirange",
89127
}
90128
)
91129

@@ -110,7 +148,14 @@ func TestIncrementalSnapshot(t *testing.T) {
110148
tx, err := conn.BeginTx(context.TODO(), repeatableReadWriteTxOptions)
111149
require.NoError(t, err)
112150

151+
tableNames := make([]string, 0, len(postgresTypes))
152+
113153
for _, pgType := range postgresTypes {
154+
require.Equal(t, dblog.TypeSupported, pg_dblog.CheckTypeCompatibility(pgType), "pgType: %s", pgType)
155+
156+
tableName := createTableNameForType(pgType)
157+
tableNames = append(tableNames, tableName)
158+
114159
_, err := tx.Exec(context.TODO(), createTableWithPkTypeQuery(pgType))
115160
require.NoError(t, err)
116161
}
@@ -133,6 +178,7 @@ func TestIncrementalSnapshot(t *testing.T) {
133178

134179
"char_pk_table": {"'A'", "'B'"},
135180
"varchar_pk_table": {"'alpha'", "'beta'"},
181+
"name_pk_table": {"'A'", "'B'"},
136182

137183
"character_pk_table": {"'A'", "'B'"},
138184
"character varying_pk_table": {"'alpha'", "'beta'"},
@@ -168,10 +214,56 @@ func TestIncrementalSnapshot(t *testing.T) {
168214
"cidr_pk_table": {"'192.168.1.0/24'", "'192.168.2.0/24'"},
169215
"macaddr_pk_table": {"'08:00:2b:01:02:03'", "'08:00:2b:01:02:04'"},
170216
"citext_pk_table": {"'example'", "'test'"},
217+
218+
"tid_pk_table": {"'(0,1)'", "'(0,2)'"},
219+
"bpchar_pk_table": {"'A'", "'B'"},
220+
"float4_pk_table": {"'1.1'", "'2.2'"},
221+
"float8_pk_table": {"'1.1'", "'2.2'"},
222+
"int2_pk_table": {"'1'", "'2'"},
223+
"int4_pk_table": {"'1'", "'2'"},
224+
"int8_pk_table": {"'100'", "'200'"},
225+
"bool_pk_table": {"'false'", "'true'"},
226+
227+
// pg 14+
228+
"nummultirange_pk_table": {"'{(15e-1,25e-1), (25e-1,35e-1)}'", "'{(20e-1,30e-1), (30e-1,40e-1)}'"},
229+
"int4multirange_pk_table": {"'{[3,7), [8,9)}'", "'{[4,8), [9,10)}'"},
230+
"int8multirange_pk_table": {"'{[1,100), [200,300)}'", "'{[100,200), [300,400)}'"},
231+
232+
"_jsonb_pk_table": {"'{1, 2, 3}'", "'{4, 5, 6}'"},
233+
"_numeric_pk_table": {"ARRAY['1.1', '2.2']::numeric[]", "ARRAY['3.3', '4.4']::numeric[]"},
234+
"_text_pk_table": {"ARRAY['alpha', 'beta']::text[]", "ARRAY['gamma', 'delta']::text[]"},
235+
"_timestamp_pk_table": {"ARRAY['2023-01-01 00:00:00', '2023-01-02 00:00:00']::timestamp[]", "ARRAY['2023-01-03 00:00:00']::timestamp[]"},
236+
"_timestamptz_pk_table": {"'{2023-01-01 00:00:00+03, 2023-01-02 00:00:00+03}'", "'{2023-01-03 00:00:00+03, 2023-01-04 00:00:00+03}'"},
237+
// failed to execute SELECT: ERROR: malformed range literal: "[2023-01-01 00:00:00" (SQLSTATE 22P02)
238+
// "_tsrange_pk_table": {"'{\"[2023-01-01 00:00:00,2023-01-01 01:00:00)\"}'::tsrange[]", "'{\"[2023-01-02 00:00:00,2023-01-02 01:00:00)\"}'::tsrange[]"}
239+
// "_tstzrange_pk_table": {"'{\"[2023-01-01 00:00:00Z,2023-01-01 01:00:00Z)\"}'", "'{\"[2023-01-02 00:00:00Z,2023-01-02 01:00:00Z)\"}'"},
240+
"_uuid_pk_table": {"'{550e8400-e29b-41d4-a716-446655440000, 550e8400-e29b-41d4-a716-446655440001}'", "'{550e8400-e29b-41d4-a716-446655440002, 550e8400-e29b-41d4-a716-446655440003}'"},
241+
"_varchar_pk_table": {"'{alpha, beta}'", "'{gamma, delta}'"},
242+
"_bool_pk_table": {"'{false, true}'", "'{true, true}'"},
243+
"_bpchar_pk_table": {"'{A, B}'", "'{C, D}'"},
244+
"_bytea_pk_table": {"'{\\x00, \\xff}'", "'{\\xaa}'"},
245+
"_cidr_pk_table": {"'{192.168.1.0/24, 192.168.2.0/24}'", "'{192.168.3.0/24}'"},
246+
"_date_pk_table": {"'{2023-01-01, 2023-01-02}'", "'{2023-01-03}'"},
247+
"_float4_pk_table": {"'{1.1, 2.2}'", "'{3.3, 4.4}'"},
248+
"_float8_pk_table": {"'{1.1, 2.2}'", "'{3.3, 4.4}'"},
249+
"_inet_pk_table": {"'{192.168.1.1, 192.168.1.2}'", "'{192.168.1.3}'"},
250+
"_int2_pk_table": {"'{1, 2}'", "'{3, 4}'"},
251+
"_int4_pk_table": {"'{1, 2}'", "'{3, 4}'"},
252+
"_int8_pk_table": {"'{100, 200}'", "'{300, 400}'"},
253+
}
254+
255+
keys := make([]string, 0, len(inserts))
256+
tableNamesInInserts := make([]string, 0, len(inserts))
257+
for key := range inserts {
258+
keys = append(keys, strings.TrimSuffix(key, "_pk_table"))
259+
tableNamesInInserts = append(tableNamesInInserts, key)
171260
}
172261

173-
for tableName, insert := range inserts {
174-
_, err := tx.Exec(context.TODO(), insertQueryValues(tableName, addIdxToPk(insert)))
262+
require.ElementsMatch(t, postgresTypes, keys)
263+
require.ElementsMatch(t, tableNamesInInserts, tableNames)
264+
265+
for _, tableName := range tableNames {
266+
_, err := tx.Exec(context.TODO(), insertQueryValues(tableName, addIdxToPk(inserts[tableName])))
175267
require.NoError(t, err)
176268
}
177269

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
CREATE EXTENSION citext;
2+
CREATE EXTENSION ltree;
3+
CREATE EXTENSION hstore;
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
CREATE TABLE __test_num_table
22
(
33
id SERIAL PRIMARY KEY,
4-
num INT
4+
num INT,
5+
unsupported_pk_type json default null
56
);
67

7-
INSERT INTO __test_num_table (num)
8-
SELECT generate_series(1, 10);
8+
INSERT INTO __test_num_table (num, unsupported_pk_type)
9+
SELECT generate_series(1, 10), '{"a": "b"}';
910

1011

1112
alter table __test_num_table replica identity full;

pkg/providers/postgres/model_pg_source.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,9 +471,10 @@ func (s *PgSource) isPreferReplica(transfer *model.Transfer) bool {
471471
// - It can be used only on heterogeneous transfers - because "for homo there are some technical restrictions" (https://github.com/transferia/transferia/review/4059241/details#comment-5973004)
472472
// There are some issues with reading sequence values from replica
473473
// - It can be used only on SNAPSHOT_ONLY transfer - because we can't take a consistent slot on master and a snapshot on replica
474+
// - It can be used only when DBLog is disabled - because DBLog requires a master connection to insert/update records in the signal table
474475
//
475476
// When 'PreferReplica' is true - reading happens from synchronous replica
476-
return !s.IsHomo && transfer != nil && (transfer.SnapshotOnly() || !transfer.IncrementOnly())
477+
return !s.IsHomo && transfer != nil && (transfer.SnapshotOnly() || !transfer.IncrementOnly()) && !s.DBLogEnabled
477478
}
478479

479480
func (s *PgSource) ToStorageParams(transfer *model.Transfer) *PgStorageParams {

0 commit comments

Comments
 (0)