Skip to content

Commit 17448ff

Browse files
committed
improve processor stability
1 parent a0e150f commit 17448ff

File tree

6 files changed

+30
-20
lines changed

6 files changed

+30
-20
lines changed

options.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ func (opt *poptions) applyOptions(gg *GroupGraph, opts ...ProcessorOption) error
363363
}
364364

365365
if globalConfig.Producer.RequiredAcks == sarama.NoResponse {
366-
return fmt.Errorf("Processors do not work with `Config.Producer.RequiredAcks==sarama.NoResponse`, as it uses the response's offset to store the value")
366+
return fmt.Errorf("processors do not work with `Config.Producer.RequiredAcks==sarama.NoResponse`, as it uses the response's offset to store the value")
367367
}
368368

369369
if opt.builders.producer == nil {

partition_processor.go

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error {
257257
join := join
258258
pp.runnerGroup.Go(func() error {
259259
defer pp.state.SetState(PPStateStopping)
260-
return join.CatchupForever(runnerCtx, false)
260+
return join.CatchupForever(runnerCtx, true)
261261
})
262262
}
263263

@@ -274,7 +274,7 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error {
274274
// (b) run the processor table in catchup mode so it keeps updating its state.
275275
case runModePassive:
276276
if pp.table != nil {
277-
err = pp.table.CatchupForever(runnerCtx, false)
277+
err = pp.table.CatchupForever(runnerCtx, true)
278278
}
279279
default:
280280
err = fmt.Errorf("processor has invalid run mode")
@@ -298,16 +298,16 @@ func (pp *PartitionProcessor) Stop() error {
298298
pp.state.SetState(PPStateStopping)
299299
defer pp.state.SetState(PPStateStopped)
300300

301-
close(pp.input)
302-
close(pp.visitInput)
303-
304301
if pp.cancelRunnerGroup != nil {
305302
pp.cancelRunnerGroup()
306303
}
307304

308305
// wait for the runner to be done
309306
runningErrs := multierror.Append(pp.runnerGroup.Wait().ErrorOrNil())
310307

308+
close(pp.input)
309+
close(pp.visitInput)
310+
311311
// close all the tables
312312
stopErrg, _ := multierr.NewErrGroup(context.Background())
313313
for _, join := range pp.joins {
@@ -637,15 +637,6 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
637637

638638
var wg sync.WaitGroup
639639

640-
// drains the channel and drops out when closed.
641-
// This is done when the processor shuts down during visit
642-
// and makes sure the waitgroup is fully counted down.
643-
drainUntilClose := func() {
644-
for range pp.visitInput {
645-
wg.Done()
646-
}
647-
}
648-
649640
// drains the input channel until there are no more items.
650641
// does not wait for close, because the channel stays open for the next visit
651642
drainUntilEmpty := func() {
@@ -662,6 +653,17 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
662653
}
663654
}
664655

656+
// register a channel that will close once the visitor itself is done.
657+
visitDone := make(chan struct{})
658+
defer close(visitDone)
659+
660+
// start a goroutine in the processor's runner-errgroup that prevents the broker from shutting down
661+
// while the visitor is running.
662+
pp.runnerGroup.Go(func() error {
663+
<-visitDone
664+
return nil
665+
})
666+
665667
defer it.Release()
666668

667669
stopping, doneWaitingForStop := pp.stopping()
@@ -673,7 +675,7 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
673675
wg.Add(1)
674676
select {
675677
case <-stopping:
676-
drainUntilClose()
678+
drainUntilEmpty()
677679
wg.Done()
678680
return ErrVisitAborted
679681
case <-ctx.Done():
@@ -703,7 +705,7 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
703705
}()
704706
select {
705707
case <-stopping:
706-
drainUntilClose()
708+
drainUntilEmpty()
707709
return ErrVisitAborted
708710
case <-ctx.Done():
709711
drainUntilEmpty()

processor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -421,11 +421,11 @@ func (g *Processor) handleSessionErrors(ctx, sessionCtx context.Context, session
421421
)
422422

423423
if errors.As(err, &errProc) {
424-
g.log.Debugf("error processing message (non-transient), shutting down processor: %v", err)
424+
g.log.Printf("error processing message (non-transient), shutting down processor: %v", err)
425425
sessionCtxCancel()
426426
}
427427
if errors.As(err, &errSetup) {
428-
g.log.Debugf("setup error (non-transient), shutting down processor: %v", err)
428+
g.log.Printf("setup error (non-transient), shutting down processor: %v", err)
429429
sessionCtxCancel()
430430
}
431431
}

systemtest/proc_disconnect_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ func TestProcessorShutdown_KafkaDisconnect(t *testing.T) {
1818
brokers := initSystemTest(t)
1919
var (
2020
topic = goka.Stream(fmt.Sprintf("goka_systemtest_proc_shutdown_disconnect-%d", time.Now().Unix()))
21+
join = goka.Stream(fmt.Sprintf("goka_systemtest_proc_shutdown_disconnect-%d-join", time.Now().Unix()))
2122
group = goka.Group(topic)
2223
)
2324

@@ -29,6 +30,7 @@ func TestProcessorShutdown_KafkaDisconnect(t *testing.T) {
2930
tmgr, err := goka.DefaultTopicManagerBuilder(brokers)
3031
require.NoError(t, err)
3132
require.NoError(t, tmgr.EnsureStreamExists(string(topic), 10))
33+
require.NoError(t, tmgr.EnsureTableExists(string(join), 10))
3234

3335
// emit values
3436
errg.Go(func() error {
@@ -69,6 +71,7 @@ func TestProcessorShutdown_KafkaDisconnect(t *testing.T) {
6971
ctx.SetValue(msg)
7072
}
7173
}),
74+
goka.Join(goka.Table(join), new(codec.String)),
7275
goka.Persist(new(codec.Int64)),
7376
),
7477
goka.WithConsumerGroupBuilder(goka.ConsumerGroupBuilderWithConfig(cfg)),

systemtest/processor_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,7 @@ func TestRebalance(t *testing.T) {
305305
var (
306306
group = goka.Group(fmt.Sprintf("goka-systemtest-rebalance-%d", time.Now().Unix()))
307307
inputStream string = string(group) + "-input"
308+
joinTable string = string(group) + "-join"
308309
basepath = "/tmp/goka-rebalance-test"
309310
)
310311

@@ -320,6 +321,9 @@ func TestRebalance(t *testing.T) {
320321
err = tm.EnsureStreamExists(inputStream, 20)
321322
require.NoError(t, err)
322323

324+
err = tm.EnsureTableExists(joinTable, 20)
325+
require.NoError(t, err)
326+
323327
em, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.String))
324328
require.NoError(t, err)
325329

@@ -338,6 +342,7 @@ func TestRebalance(t *testing.T) {
338342
goka.DefineGroup(
339343
group,
340344
goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }),
345+
goka.Join(goka.Table(joinTable), new(codec.String)),
341346
goka.Persist(new(codec.String)),
342347
),
343348
goka.WithRecoverAhead(),

topic_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func checkBroker(broker Broker, config *sarama.Config) error {
8989
}
9090

9191
err := broker.Open(config)
92-
if err != nil {
92+
if err != nil && !errors.Is(err, sarama.ErrAlreadyConnected) {
9393
return fmt.Errorf("error opening broker connection: %v", err)
9494
}
9595
connected, err := broker.Connected()

0 commit comments

Comments (0)