Skip to content

Commit c98b5c5

Browse files
authored
fix: Log warning instead of erroring out on PK component validation failure (#2039)
#### Summary Follow-up to #2037. After giving it another thought, instead of erroring out it might be better to log a warning (for starters) so we can fix the issues instead of breaking tables in the AWS plugin. Even when a PK component has a null value, the CQ ID will still be calculated with the null value, so the data can be inserted (as opposed to a missing PK, where in the case of DB destinations the insertion will fail). --- Use the following steps to ensure your PR is ready to be reviewed - [ ] Read the [contribution guidelines](../blob/main/CONTRIBUTING.md) 🧑‍🎓 - [ ] Run `go fmt` to format your code 🖊 - [ ] Lint your changes via `golangci-lint run` 🚨 (install golangci-lint [here](https://golangci-lint.run/usage/install/#local-installation)) - [ ] Update or add tests 🧪 - [ ] Ensure the status checks below are successful ✅
1 parent 13437c2 commit c98b5c5

File tree

4 files changed

+39
-18
lines changed

4 files changed

+39
-18
lines changed

scheduler/queue/worker.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,15 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien
148148
return
149149
}
150150
if err := resolvedResource.Validate(); err != nil {
151-
tableMetrics := w.metrics.TableClient[table.Name][client.ID()]
152-
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
153-
atomic.AddUint64(&tableMetrics.Errors, 1)
154-
return
151+
switch err.(type) {
152+
case *schema.PKError:
153+
tableMetrics := w.metrics.TableClient[table.Name][client.ID()]
154+
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
155+
atomic.AddUint64(&tableMetrics.Errors, 1)
156+
return
157+
case *schema.PKComponentError:
158+
w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
159+
}
155160
}
156161
select {
157162
case resourcesChan <- resolvedResource:

scheduler/scheduler_dfs.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,15 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
189189
return
190190
}
191191
if err := resolvedResource.Validate(); err != nil {
192-
tableMetrics := s.metrics.TableClient[table.Name][client.ID()]
193-
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
194-
atomic.AddUint64(&tableMetrics.Errors, 1)
195-
return
192+
switch err.(type) {
193+
case *schema.PKError:
194+
tableMetrics := s.metrics.TableClient[table.Name][client.ID()]
195+
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
196+
atomic.AddUint64(&tableMetrics.Errors, 1)
197+
return
198+
case *schema.PKComponentError:
199+
s.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
200+
}
196201
}
197202
select {
198203
case resourcesChan <- resolvedResource:

schema/resource.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,22 @@ func (r *Resource) storeCQID(value uuid.UUID) error {
123123
return r.Set(CqIDColumn.Name, b)
124124
}
125125

126+
type PKError struct {
127+
MissingPKs []string
128+
}
129+
130+
func (e *PKError) Error() string {
131+
return fmt.Sprintf("missing primary key on columns: %v", e.MissingPKs)
132+
}
133+
134+
type PKComponentError struct {
135+
MissingPKComponents []string
136+
}
137+
138+
func (e *PKComponentError) Error() string {
139+
return fmt.Sprintf("missing primary key component on columns: %v", e.MissingPKComponents)
140+
}
141+
126142
// Validates that all primary keys have values.
127143
func (r *Resource) Validate() error {
128144
var missingPks []string
@@ -136,10 +152,10 @@ func (r *Resource) Validate() error {
136152
}
137153
}
138154
if len(missingPks) > 0 {
139-
return fmt.Errorf("missing primary key on columns: %v", missingPks)
155+
return &PKError{MissingPKs: missingPks}
140156
}
141157
if len(missingPKComponents) > 0 {
142-
return fmt.Errorf("missing primary key component on columns: %v", missingPKComponents)
158+
return &PKComponentError{MissingPKComponents: missingPKComponents}
143159
}
144160
return nil
145161
}

schema/resource_test.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package schema
22

33
import (
4-
"errors"
54
"testing"
65

76
"github.com/apache/arrow-go/v18/arrow"
@@ -39,12 +38,12 @@ func TestResource_Validate(t *testing.T) {
3938
{
4039
name: "invalid resource with primary keys",
4140
resource: NewResourceData(&Table{Name: "test", Columns: ColumnList{{Name: "col1", Type: arrow.BinaryTypes.String, PrimaryKey: true}}}, nil, nil),
42-
err: errors.New(`missing primary key on columns: [col1]`),
41+
err: &PKError{MissingPKs: []string{"col1"}},
4342
},
4443
{
4544
name: "invalid resource with primary key components",
4645
resource: NewResourceData(&Table{Name: "test", Columns: ColumnList{{Name: "col1", Type: arrow.BinaryTypes.String, PrimaryKeyComponent: true}}}, nil, nil),
47-
err: errors.New(`missing primary key component on columns: [col1]`),
46+
err: &PKComponentError{MissingPKComponents: []string{"col1"}},
4847
},
4948
}
5049
for _, tt := range tests {
@@ -53,11 +52,7 @@ func TestResource_Validate(t *testing.T) {
5352
require.NoError(t, tt.valueSetter(tt.resource))
5453
}
5554
validationError := tt.resource.Validate()
56-
if tt.err == nil {
57-
require.NoError(t, validationError)
58-
} else {
59-
require.ErrorContains(t, validationError, tt.err.Error())
60-
}
55+
require.Equal(t, tt.err, validationError)
6156
})
6257
}
6358
}

0 commit comments

Comments
 (0)