Skip to content

Commit aa7a4ee

Browse files
authored
e2e: test resync doesn't clear custom configurations (#3538)
tests fix in #3435
1 parent 0582ce5 commit aa7a4ee

File tree

1 file changed

+29
-7
lines changed

1 file changed

+29
-7
lines changed

flow/e2e/api_test.go

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,23 +109,32 @@ func (s APITestSuite) waitForActiveSlotForPostgresMirror(env WorkflowRun, conn *
109109
})
110110
}
111111

112-
// checkCatalogTableMapping checks the table mappings in the catalog for a given flow
113-
func (s APITestSuite) checkCatalogTableMapping(
112+
func (s APITestSuite) loadConfigFromCatalog(
114113
ctx context.Context,
115114
conn *pgx.Conn,
116115
flowName string,
117-
expectedSourceTableNames []string,
118-
) (bool, error) {
116+
) (*protos.FlowConnectionConfigs, error) {
119117
var configBytes sql.RawBytes
120118
if err := conn.QueryRow(ctx,
121119
"SELECT config_proto FROM flows WHERE name = $1", flowName,
122120
).Scan(&configBytes); err != nil {
123-
return false, err
121+
return nil, err
124122
}
125123

126124
var config protos.FlowConnectionConfigs
127-
if err := proto.Unmarshal(configBytes, &config); err != nil {
128-
return false, err
125+
return &config, proto.Unmarshal(configBytes, &config)
126+
}
127+
128+
// checkCatalogTableMapping checks the table mappings in the catalog for a given flow
129+
func (s APITestSuite) checkCatalogTableMapping(
130+
ctx context.Context,
131+
conn *pgx.Conn,
132+
flowName string,
133+
expectedSourceTableNames []string,
134+
) (bool, error) {
135+
config, err := s.loadConfigFromCatalog(ctx, conn, flowName)
136+
if err != nil {
137+
return false, fmt.Errorf("failed to load config from catalog: %w", err)
129138
}
130139

131140
if len(config.TableMappings) != len(expectedSourceTableNames) {
@@ -587,6 +596,12 @@ func (s APITestSuite) TestResyncCompleted() {
587596
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
588597
flowConnConfig.DoInitialSnapshot = true
589598
flowConnConfig.InitialSnapshotOnly = true
599+
flowConnConfig.SnapshotNumRowsPerPartition = 3
600+
flowConnConfig.SnapshotMaxParallelWorkers = 7
601+
flowConnConfig.SnapshotNumTablesInParallel = 13
602+
flowConnConfig.IdleTimeoutSeconds = 9
603+
flowConnConfig.MaxBatchSize = 5040
604+
// if true, then the flow will be resynced
590605
response, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
591606
require.NoError(s.t, err)
592607
require.NotNil(s.t, response)
@@ -633,6 +648,13 @@ func (s APITestSuite) TestResyncCompleted() {
633648
env, err = GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, flowConnConfig.FlowJobName)
634649
require.NoError(s.t, err)
635650
EnvWaitForFinished(s.t, env, time.Minute)
651+
652+
// check that custom config options persist across resync
653+
config, err := s.loadConfigFromCatalog(s.t.Context(), s.pg.PostgresConnector.Conn(), flowConnConfig.FlowJobName)
654+
require.NoError(s.t, err)
655+
flowConnConfig.Resync = true // this gets left true after resync
656+
config.Env = nil // env is modified by API
657+
require.EqualExportedValues(s.t, flowConnConfig, config)
636658
}
637659

638660
func (s APITestSuite) TestDropCompleted() {

0 commit comments

Comments
 (0)