Skip to content

Commit 0474b9f

Browse files
authored
fix: Fix Transform hang in CLI sync (#2001)
When CLI runs sync_v3 and a transformer is in the spec, if the transformer errors at transform time, the CLI can hang forever. This fix makes sure that the `.Transform()` function always returns after an error. Tested 20 times: ```bash Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the 
transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: EOF exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = 
Internal desc = failing at the transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test (cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the transformer stage according to spec requirements exit status 1 Loading spec(s) from cmd/testdata/transformer-errors.yml Starting sync for: test 
(cloudquery/[email protected]) -> [test (cloudquery/[email protected])] Error: failed to sync v3 source test: rpc error: code = Internal desc = failing at the transformer stage according to spec requirements exit status 1 ```
1 parent 88a7833 commit 0474b9f

File tree

2 files changed

+29
-29
lines changed

2 files changed

+29
-29
lines changed

internal/servers/plugin/v3/plugin.go

Lines changed: 26 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -402,23 +402,21 @@ func (s *Server) Write(stream pb.Plugin_WriteServer) error {
402402

403403
func (s *Server) Transform(stream pb.Plugin_TransformServer) error {
404404
var (
405-
recvRecords = make(chan arrow.Record)
406-
sendRecords = make(chan arrow.Record)
407-
pluginStopsWriter = make(chan struct{})
408-
doneReading = false
409-
ctx = stream.Context()
410-
eg, gctx = errgroup.WithContext(ctx)
405+
recvRecords = make(chan arrow.Record)
406+
sendRecords = make(chan arrow.Record)
407+
ctx = stream.Context()
408+
eg, gctx = errgroup.WithContext(ctx)
411409
)
412410

413411
// Run the plugin's transform with both channels.
414412
//
415413
// When the plugin is done, it must return with either an error or nil.
416414
// The plugin must not close either channel.
417415
eg.Go(func() error {
418-
err := s.Plugin.Transform(gctx, recvRecords, sendRecords)
419-
close(pluginStopsWriter)
420-
doneReading = true
421-
return err
416+
if err := s.Plugin.Transform(gctx, recvRecords, sendRecords); err != nil {
417+
return status.Error(codes.Internal, err.Error())
418+
}
419+
return nil
422420
})
423421

424422
// Write transformed records from transformer to destination.
@@ -429,21 +427,16 @@ func (s *Server) Transform(stream pb.Plugin_TransformServer) error {
429427
// The reading never closes the writer, because it's up to the Plugin to decide when to finish
430428
// writing, regardless of if the reading finished.
431429
eg.Go(func() error {
432-
for {
433-
select {
434-
case record := <-sendRecords:
435-
recordBytes, err := pb.RecordToBytes(record)
436-
if err != nil {
437-
return status.Errorf(codes.Internal, "failed to convert record to bytes: %v", err)
438-
}
439-
440-
if err := stream.Send(&pb.Transform_Response{Record: recordBytes}); err != nil {
441-
return fmt.Errorf("error sending response: %w", err)
442-
}
443-
case <-pluginStopsWriter:
444-
return nil
430+
for record := range sendRecords {
431+
recordBytes, err := pb.RecordToBytes(record)
432+
if err != nil {
433+
return status.Errorf(codes.Internal, "failed to convert record to bytes: %v", err)
434+
}
435+
if err := stream.Send(&pb.Transform_Response{Record: recordBytes}); err != nil {
436+
return status.Errorf(codes.Internal, "error sending response: %v", err)
445437
}
446438
}
439+
return nil
447440
})
448441

449442
// Read records from source to transformer
@@ -463,18 +456,23 @@ func (s *Server) Transform(stream pb.Plugin_TransformServer) error {
463456
}
464457
if err != nil {
465458
close(recvRecords)
466-
return fmt.Errorf("Error receiving request: %v", err)
467-
}
468-
if doneReading {
469-
return nil
459+
return status.Errorf(codes.Internal, "Error receiving request: %v", err)
470460
}
471461
record, err := pb.NewRecordFromBytes(req.Record)
472462
if err != nil {
473463
close(recvRecords)
474464
return status.Errorf(codes.InvalidArgument, "failed to create record: %v", err)
475465
}
476466

477-
recvRecords <- record
467+
select {
468+
case recvRecords <- record:
469+
case <-gctx.Done():
470+
close(recvRecords)
471+
return status.Errorf(codes.Canceled, "context done: %v", gctx.Err())
472+
case <-ctx.Done():
473+
close(recvRecords)
474+
return status.Errorf(codes.Canceled, "context done: %v", ctx.Err())
475+
}
478476
}
479477
})
480478

plugin/plugin_transformer.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,9 @@ type TransformerClient interface {
1212
}
1313

1414
func (p *Plugin) Transform(ctx context.Context, recvRecords <-chan arrow.Record, sendRecords chan<- arrow.Record) error {
15-
return p.client.Transform(ctx, recvRecords, sendRecords)
15+
err := p.client.Transform(ctx, recvRecords, sendRecords)
16+
close(sendRecords)
17+
return err
1618
}
1719
func (p *Plugin) TransformSchema(ctx context.Context, old *arrow.Schema) (*arrow.Schema, error) {
1820
return p.client.TransformSchema(ctx, old)

0 commit comments

Comments
 (0)