Skip to content

Commit 30d5ca9

Browse files
author
Przemyslaw Stepien
committed
feat: Add handling of error messages to sdk
1 parent d220f63 commit 30d5ca9

File tree

7 files changed

+48
-2
lines changed

7 files changed

+48
-2
lines changed

internal/servers/plugin/v3/plugin.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,17 @@ func (s *Server) Sync(req *pb.Sync_Request, stream pb.Plugin_SyncServer) error {
259259
WhereClause: whereClause,
260260
},
261261
}
262+
case *message.SyncError:
263+
if req.WithErrorMessages {
264+
pbMsg.Message = &pb.Sync_Response_Error{
265+
Error: &pb.Sync_MessageError{
266+
TableName: m.TableName,
267+
Error: m.Error,
268+
},
269+
}
270+
} else {
271+
continue
272+
}
262273
default:
263274
return status.Errorf(codes.Internal, "unknown message type: %T", msg)
264275
}

message/sync_message.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,13 @@ type SyncDeleteRecord struct {
121121
func (m SyncDeleteRecord) GetTable() *schema.Table {
122122
return &schema.Table{Name: m.TableName}
123123
}
124+
125+
type SyncError struct {
126+
syncBaseMessage
127+
TableName string
128+
Error string
129+
}
130+
131+
func (e SyncError) GetTable() *schema.Table {
132+
return &schema.Table{Name: e.TableName}
133+
}

scheduler/queue/scheduler.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55

66
"github.com/cloudquery/plugin-sdk/v4/caser"
7+
"github.com/cloudquery/plugin-sdk/v4/message"
78
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
89
"github.com/cloudquery/plugin-sdk/v4/schema"
910
"github.com/google/uuid"
@@ -32,6 +33,8 @@ type Scheduler struct {
3233
metrics *metrics.Metrics
3334
invocationID string
3435
seed int64
36+
// message channel for sending SyncError messages
37+
msgChan chan<- message.SyncMessage
3538
}
3639

3740
type Option func(*Scheduler)
@@ -77,7 +80,7 @@ func NewShuffleQueueScheduler(logger zerolog.Logger, m *metrics.Metrics, seed in
7780
return scheduler
7881
}
7982

80-
func (d *Scheduler) Sync(ctx context.Context, tableClients []WorkUnit, resolvedResources chan<- *schema.Resource) {
83+
func (d *Scheduler) Sync(ctx context.Context, tableClients []WorkUnit, resolvedResources chan<- *schema.Resource, msgChan chan<- message.SyncMessage) {
8184
if len(tableClients) == 0 {
8285
return
8386
}
@@ -102,6 +105,7 @@ func (d *Scheduler) Sync(ctx context.Context, tableClients []WorkUnit, resolvedR
102105
d.invocationID,
103106
d.deterministicCQID,
104107
d.metrics,
108+
msgChan,
105109
).work(ctx, activeWorkSignal)
106110
return nil
107111
})

scheduler/queue/worker.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/cloudquery/plugin-sdk/v4/caser"
1212
"github.com/cloudquery/plugin-sdk/v4/helpers"
13+
"github.com/cloudquery/plugin-sdk/v4/message"
1314
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
1415
"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
1516
"github.com/cloudquery/plugin-sdk/v4/schema"
@@ -29,6 +30,8 @@ type worker struct {
2930
invocationID string
3031
deterministicCQID bool
3132
metrics *metrics.Metrics
33+
// message channel for sending SyncError messages
34+
msgChan chan<- message.SyncMessage
3235
}
3336

3437
func (w *worker) work(ctx context.Context, activeWorkSignal *activeWorkSignal) {
@@ -51,6 +54,7 @@ func newWorker(
5154
invocationID string,
5255
deterministicCQID bool,
5356
m *metrics.Metrics,
57+
msgChan chan<- message.SyncMessage,
5458
) *worker {
5559
return &worker{
5660
jobs: jobs,
@@ -61,6 +65,7 @@ func newWorker(
6165
deterministicCQID: deterministicCQID,
6266
invocationID: invocationID,
6367
metrics: m,
68+
msgChan: msgChan,
6469
}
6570
}
6671

@@ -105,6 +110,12 @@ func (w *worker) resolveTable(ctx context.Context, table *schema.Table, client s
105110
logger.Error().Err(err).Msg("table resolver finished with error")
106111
tableMetrics.OtelErrorsAdd(ctx, 1)
107112
atomic.AddUint64(&tableMetrics.Errors, 1)
113+
// Send SyncError message
114+
syncErrorMsg := &message.SyncError{
115+
TableName: table.Name,
116+
Error: err.Error(),
117+
}
118+
w.msgChan <- syncErrorMsg
108119
return
109120
}
110121
}()

scheduler/scheduler.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ type syncClient struct {
139139
metrics *metrics.Metrics
140140
logger zerolog.Logger
141141
invocationID string
142+
// message channel for sending SyncError messages
143+
msgChan chan<- message.SyncMessage
142144

143145
shard *shard
144146
}
@@ -213,6 +215,7 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s
213215
scheduler: s,
214216
logger: s.logger,
215217
invocationID: s.invocationID,
218+
msgChan: res,
216219
}
217220
for _, opt := range opts {
218221
opt(syncClient)

scheduler/scheduler_dfs.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/cloudquery/plugin-sdk/v4/helpers"
12+
"github.com/cloudquery/plugin-sdk/v4/message"
1213
"github.com/cloudquery/plugin-sdk/v4/scheduler/batchsender"
1314
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
1415
"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
@@ -119,6 +120,12 @@ func (s *syncClient) resolveTableDfs(ctx context.Context, table *schema.Table, c
119120
logger.Error().Err(err).Msg("table resolver finished with error")
120121
tableMetrics.OtelErrorsAdd(ctx, 1)
121122
atomic.AddUint64(&tableMetrics.Errors, 1)
123+
// Send SyncError message
124+
syncErrorMsg := &message.SyncError{
125+
TableName: table.Name,
126+
Error: err.Error(),
127+
}
128+
s.msgChan <- syncErrorMsg
122129
return
123130
}
124131
}()

scheduler/scheduler_shuffle_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,5 @@ func (s *syncClient) syncShuffleQueue(ctx context.Context, resolvedResources cha
4545
Client: tc.client,
4646
})
4747
}
48-
scheduler.Sync(ctx, queueClients, resolvedResources)
48+
scheduler.Sync(ctx, queueClients, resolvedResources, s.msgChan)
4949
}

0 commit comments

Comments
 (0)