Skip to content

Commit 4cb89b5

Browse files
committed
alter schema in pg-sink
Added: * add new columns on * add/remove enum values * create new enums few nits: * cache inmemory table definition * make sql (ddl or describe) only if given changeItem schema differ from cached * when new table created during replication Required columns are now translated as "not null" commit_hash:6eb0e235d5a0e23e9b9e82c465dc585d885d3f37
1 parent c7dfcb3 commit 4cb89b5

File tree

7 files changed

+1108
-80
lines changed

7 files changed

+1108
-80
lines changed

.mapping.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1686,6 +1686,8 @@
16861686
"pkg/providers/postgres/publisher_polling.go":"transfer_manager/go/pkg/providers/postgres/publisher_polling.go",
16871687
"pkg/providers/postgres/publisher_replication.go":"transfer_manager/go/pkg/providers/postgres/publisher_replication.go",
16881688
"pkg/providers/postgres/publisher_test.go":"transfer_manager/go/pkg/providers/postgres/publisher_test.go",
1689+
"pkg/providers/postgres/queries.go":"transfer_manager/go/pkg/providers/postgres/queries.go",
1690+
"pkg/providers/postgres/queries_test.go":"transfer_manager/go/pkg/providers/postgres/queries_test.go",
16891691
"pkg/providers/postgres/schema.go":"transfer_manager/go/pkg/providers/postgres/schema.go",
16901692
"pkg/providers/postgres/sequence.go":"transfer_manager/go/pkg/providers/postgres/sequence.go",
16911693
"pkg/providers/postgres/sequencer/lsn_transaction.go":"transfer_manager/go/pkg/providers/postgres/sequencer/lsn_transaction.go",
@@ -2965,6 +2967,8 @@
29652967
"tests/e2e/pg2pg/access/check_db_test.go":"transfer_manager/go/tests/e2e/pg2pg/access/check_db_test.go",
29662968
"tests/e2e/pg2pg/access/dump/dump.sql":"transfer_manager/go/tests/e2e/pg2pg/access/dump/dump.sql",
29672969
"tests/e2e/pg2pg/all_types/check_db_test.go":"transfer_manager/go/tests/e2e/pg2pg/all_types/check_db_test.go",
2970+
"tests/e2e/pg2pg/alters/alters_test.go":"transfer_manager/go/tests/e2e/pg2pg/alters/alters_test.go",
2971+
"tests/e2e/pg2pg/alters/dump/pg/dump.sql":"transfer_manager/go/tests/e2e/pg2pg/alters/dump/pg/dump.sql",
29682972
"tests/e2e/pg2pg/bytea_key/check_db_test.go":"transfer_manager/go/tests/e2e/pg2pg/bytea_key/check_db_test.go",
29692973
"tests/e2e/pg2pg/bytea_key/init_source/dump.sql":"transfer_manager/go/tests/e2e/pg2pg/bytea_key/init_source/dump.sql",
29702974
"tests/e2e/pg2pg/bytea_key/init_target/dump.sql":"transfer_manager/go/tests/e2e/pg2pg/bytea_key/init_target/dump.sql",

pkg/abstract/changeitem/table_schema.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,21 @@ import (
88
)
99

1010
type TableSchema struct {
11-
columns TableColumns
12-
hash string
11+
tableID TableID
12+
fastTableSchema FastTableSchema
13+
columns TableColumns
14+
hash string
15+
}
16+
17+
func (s *TableSchema) TableID() TableID {
18+
if s.tableID.Name == "" && len(s.columns) != 0 {
19+
return s.columns[0].TableID()
20+
}
21+
return s.tableID
22+
}
23+
24+
func (s *TableSchema) SetTableID(TableID TableID) {
25+
s.tableID = TableID
1326
}
1427

1528
func (s *TableSchema) Copy() *TableSchema {
@@ -31,7 +44,11 @@ func (s *TableSchema) ColumnNames() []string {
3144
}
3245

3346
func (s *TableSchema) FastColumns() FastTableSchema {
34-
return MakeFastTableSchema(s.columns)
47+
if s.fastTableSchema != nil {
48+
return s.fastTableSchema
49+
}
50+
s.fastTableSchema = MakeFastTableSchema(s.columns)
51+
return s.fastTableSchema
3552
}
3653

3754
func (s *TableSchema) Hash() (string, error) {
@@ -65,5 +82,10 @@ func NewTableSchema(columns []ColSchema) *TableSchema {
6582
return &TableSchema{
6683
columns: columns,
6784
hash: "",
85+
tableID: TableID{
86+
Namespace: "",
87+
Name: "",
88+
},
89+
fastTableSchema: nil,
6890
}
6991
}

pkg/providers/postgres/queries.go

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package postgres
2+
3+
import (
4+
"fmt"
5+
"strconv"
6+
"strings"
7+
8+
"github.com/transferia/transferia/library/go/core/xerrors"
9+
"github.com/transferia/transferia/pkg/abstract"
10+
)
11+
12+
func CreateTableQuery(fullTableName string, schema []abstract.ColSchema) (string, error) {
13+
if err := prepareOriginalTypes(schema); err != nil {
14+
return "", xerrors.Errorf("failed to prepare original types for parsing: %w", err)
15+
}
16+
17+
var primaryKeys []string
18+
b := strings.Builder{}
19+
b.WriteString(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (`, fullTableName))
20+
for idx, col := range schema {
21+
var queryType string
22+
if col.OriginalType != "" {
23+
queryType = strings.TrimPrefix(col.OriginalType, "pg:")
24+
queryType = strings.ReplaceAll(queryType, "USER-DEFINED", "TEXT")
25+
} else {
26+
var err error
27+
queryType, err = DataToOriginal(col.DataType)
28+
if err != nil {
29+
return "", xerrors.Errorf("failed to convert column %q to original type: %w", col.ColumnName, err)
30+
}
31+
}
32+
if strings.HasPrefix(col.Expression, "pg:") {
33+
queryType += " " + strings.TrimPrefix(col.Expression, "pg:")
34+
}
35+
if col.Required {
36+
queryType += " NOT NULL"
37+
}
38+
b.WriteString(fmt.Sprintf(`"%v" %v`, col.ColumnName, queryType))
39+
if col.IsKey() {
40+
primaryKeys = append(primaryKeys, fmt.Sprintf(`"%s"`, col.ColumnName))
41+
}
42+
if idx < len(schema)-1 {
43+
b.WriteString(",")
44+
}
45+
}
46+
if len(primaryKeys) > 0 {
47+
b.WriteString(fmt.Sprintf(", primary key (%v)", strings.Join(primaryKeys, ",")))
48+
}
49+
b.WriteString(")")
50+
51+
return b.String(), nil
52+
}
53+
54+
func createEnumQuery(col abstract.ColSchema) (string, error) {
55+
col, err := prepareOriginalType(col)
56+
if err != nil {
57+
return "", xerrors.Errorf("createEnumQuery:failed to prepare original types for parsing: %w", err)
58+
}
59+
typeName, err := strconv.Unquote(strings.TrimPrefix(col.OriginalType, "pg:"))
60+
if err != nil {
61+
return "", xerrors.Errorf("createEnumQuery:failed to unquote %s: %w", col.OriginalType, err)
62+
}
63+
allVals := GetPropertyEnumAllValues(&col)
64+
65+
buf := strings.Builder{}
66+
for i, v := range allVals {
67+
if i > 0 {
68+
buf.WriteString(",")
69+
}
70+
buf.WriteString(fmt.Sprintf("'%s'", v))
71+
}
72+
return fmt.Sprintf(`
73+
DO $$
74+
BEGIN
75+
IF NOT EXISTS (
76+
SELECT
77+
1
78+
FROM
79+
pg_type
80+
WHERE
81+
typname = '%s') THEN
82+
CREATE TYPE "%s" AS ENUM (%s);
83+
END IF;
84+
END;
85+
$$;`, typeName, typeName, buf.String()), nil
86+
}
87+
88+
func addEnumValsQuery(currentCol, col abstract.ColSchema) ([]string, error) {
89+
col, err := prepareOriginalType(col)
90+
if err != nil {
91+
return nil, xerrors.Errorf("addColsQuery:failed to prepare original types for parsing: %w", err)
92+
}
93+
currValsMap := make(map[string]int)
94+
currVals := GetPropertyEnumAllValues(&currentCol)
95+
for i, val := range currVals {
96+
currValsMap[val] = i
97+
}
98+
99+
newValsMap := make(map[string]int)
100+
newVals := GetPropertyEnumAllValues(&col)
101+
for i, val := range newVals {
102+
newValsMap[val] = i
103+
}
104+
res := make([]string, 0, len(newVals))
105+
106+
var toAdd []struct {
107+
val string
108+
before string
109+
}
110+
var toRemove []string
111+
112+
for i, val := range newVals {
113+
if _, exists := currValsMap[val]; !exists {
114+
var before string
115+
if i < len(newVals)-1 && len(currVals) > 0 {
116+
before = newVals[i+1]
117+
}
118+
toAdd = append(toAdd, struct {
119+
val string
120+
before string
121+
}{val, before})
122+
}
123+
}
124+
125+
for val := range currValsMap {
126+
if _, exists := newValsMap[val]; !exists {
127+
toRemove = append(toRemove, val)
128+
}
129+
}
130+
131+
if len(toAdd) == 0 && len(toRemove) == 0 {
132+
return nil, nil
133+
}
134+
135+
typeName := strings.TrimPrefix(col.OriginalType, "pg:")
136+
137+
for _, add := range toAdd {
138+
139+
if add.before != "" {
140+
res = append(res, fmt.Sprintf(`ALTER TYPE %s ADD VALUE IF NOT EXISTS '%s' BEFORE '%s'`,
141+
typeName, add.val, add.before))
142+
} else {
143+
res = append(res, fmt.Sprintf(`ALTER TYPE %s ADD VALUE IF NOT EXISTS '%s'`,
144+
typeName, add.val))
145+
}
146+
}
147+
148+
for _, val := range toRemove {
149+
res = append(res, fmt.Sprintf(`ALTER TYPE %s DROP VALUE IF EXISTS '%s'`, typeName, val))
150+
}
151+
152+
return res, nil
153+
}
154+
155+
func addColsQuery(ftn string, added []abstract.ColSchema) (string, error) {
156+
if err := prepareOriginalTypes(added); err != nil {
157+
return "", xerrors.Errorf("addColsQuery:failed to prepare original types for parsing: %w", err)
158+
}
159+
160+
b := strings.Builder{}
161+
b.WriteString(fmt.Sprintf(`ALTER TABLE %s `, ftn))
162+
for idx, col := range added {
163+
var queryType string
164+
if col.OriginalType != "" {
165+
queryType = strings.TrimPrefix(col.OriginalType, "pg:")
166+
queryType = strings.ReplaceAll(queryType, "USER-DEFINED", "TEXT")
167+
} else {
168+
var err error
169+
queryType, err = DataToOriginal(col.DataType)
170+
if err != nil {
171+
return "", xerrors.Errorf("addColsQuery:failed to convert column %q to original type: %w", col.ColumnName, err)
172+
}
173+
}
174+
if strings.HasPrefix(col.Expression, "pg:") {
175+
queryType += " " + strings.TrimPrefix(col.Expression, "pg:")
176+
}
177+
if col.Required {
178+
queryType += " NOT NULL"
179+
}
180+
b.WriteString(fmt.Sprintf(`ADD COLUMN IF NOT EXISTS "%v" %v`, col.ColumnName, queryType))
181+
if idx < len(added)-1 {
182+
b.WriteString(",")
183+
}
184+
}
185+
return b.String(), nil
186+
}

0 commit comments

Comments
 (0)