-
Notifications
You must be signed in to change notification settings - Fork 107
Open
Description
When using service.Stream.Run:
https://github.com/redpanda-data/benthos/blob/main/public/service/stream.go#L72
func (s *Stream) Run(ctx context.Context) (err error) {
s.strmMut.Lock()
if s.strm != nil {
err = errors.New("stream has already been run")
} else {
s.strm, err = stream.New(s.conf, s.mgr,
stream.OptOnClose(func() {
s.shutSig.TriggerHasStopped()
}))
}
//...
}
stream.New is then called:
https://github.com/redpanda-data/benthos/blob/main/internal/stream/type.go#L53
func New(conf Config, mgr bundle.NewManagement, opts ...func(*Type)) (*Type, error) {
t := &Type{
conf: conf,
manager: mgr,
onClose: func() {},
closed: 0,
}
for _, opt := range opts {
opt(t)
}
if err := t.start(); err != nil {
// Bug here!
// When we get an error, we need to do some cleanup.
return nil, err
}
// ...
}
This is because stream.Type.start constructs all of the layers.
If the input layer is constructed successfully, its input loop starts running in the background.
If constructing the output layer then fails, the function simply returns and the input loop is leaked.
https://github.com/redpanda-data/benthos/blob/main/internal/stream/type.go#L141
func (t *Type) start() (err error) {
// Constructors
iMgr := t.manager.IntoPath("input")
// Bug here:
// the input loop starts as soon as the input component is constructed.
// https://github.com/redpanda-data/benthos/blob/main/internal/component/input/async_reader.go#L68
if t.inputLayer, err = iMgr.NewInput(t.conf.Input); err != nil {
return
}
if t.conf.Buffer.Type != "none" {
bMgr := t.manager.IntoPath("buffer")
if t.bufferLayer, err = bMgr.NewBuffer(t.conf.Buffer); err != nil {
return
}
}
if tLen := len(t.conf.Pipeline.Processors); tLen > 0 {
pMgr := t.manager.IntoPath("pipeline")
if t.pipelineLayer, err = pipeline.New(t.conf.Pipeline, pMgr); err != nil {
return
}
}
oMgr := t.manager.IntoPath("output")
if t.outputLayer, err = oMgr.NewOutput(t.conf.Output); err != nil {
return
}
// ...
}
In addition, the other layers start their loops when Consume is called.
We also need to handle their cleanup when an error occurs.
Metadata
Metadata
Assignees
Labels
No labels