Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (opt *poptions) applyOptions(gg *GroupGraph, opts ...ProcessorOption) error
}

if globalConfig.Producer.RequiredAcks == sarama.NoResponse {
return fmt.Errorf("Processors do not work with `Config.Producer.RequiredAcks==sarama.NoResponse`, as it uses the response's offset to store the value")
return fmt.Errorf("processors do not work with `Config.Producer.RequiredAcks==sarama.NoResponse`, as it uses the response's offset to store the value")
}

if opt.builders.producer == nil {
Expand Down
34 changes: 18 additions & 16 deletions partition_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error {
join := join
pp.runnerGroup.Go(func() error {
defer pp.state.SetState(PPStateStopping)
return join.CatchupForever(runnerCtx, false)
return join.CatchupForever(runnerCtx, true)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this makes the join-table try to reconnect while the processor is running (together with the other table a few lines below)

})
}

Expand All @@ -274,7 +274,7 @@ func (pp *PartitionProcessor) Start(setupCtx, ctx context.Context) error {
// (b) run the processor table in catchup mode so it keeps updating it's state.
case runModePassive:
if pp.table != nil {
err = pp.table.CatchupForever(runnerCtx, false)
err = pp.table.CatchupForever(runnerCtx, true)
}
default:
err = fmt.Errorf("processor has invalid run mode")
Expand All @@ -298,16 +298,16 @@ func (pp *PartitionProcessor) Stop() error {
pp.state.SetState(PPStateStopping)
defer pp.state.SetState(PPStateStopped)

close(pp.input)
close(pp.visitInput)

if pp.cancelRunnerGroup != nil {
pp.cancelRunnerGroup()
}

// wait for the runner to be done
runningErrs := multierror.Append(pp.runnerGroup.Wait().ErrorOrNil())

close(pp.input)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

channels are now closed only after the runner-group is done — visitors attach themselves to the runner-group to guarantee this ordering.

close(pp.visitInput)

// close all the tables
stopErrg, _ := multierr.NewErrGroup(context.Background())
for _, join := range pp.joins {
Expand Down Expand Up @@ -637,15 +637,6 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta

var wg sync.WaitGroup

// drains the channel and drops out when closed.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there was actually no point in distinguishing between draining until close and draining until empty, because this function is the one writing to the channel.
If two visitors are started at the same time and one of them panics or is stopped, it will drain the other's messages too — but that issue existed before this change, so we'll ignore it here :)

// This is done when the processor shuts down during visit
// and makes sure the waitgroup is fully counted down.
drainUntilClose := func() {
for range pp.visitInput {
wg.Done()
}
}

// drains the input channel until there are no more items.
// does not wait for close, because the channel stays open for the next visit
drainUntilEmpty := func() {
Expand All @@ -662,6 +653,17 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
}
}

// register a channel that will close once the visitor itself is done.
visitDone := make(chan struct{})
defer close(visitDone)

// start a goroutine in the processor's runner-errgroup that prevents the broker from shutting down
// while the visitor is running.
pp.runnerGroup.Go(func() error {
<-visitDone
return nil
})

defer it.Release()

stopping, doneWaitingForStop := pp.stopping()
Expand All @@ -673,7 +675,7 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
wg.Add(1)
select {
case <-stopping:
drainUntilClose()
drainUntilEmpty()
wg.Done()
return ErrVisitAborted
case <-ctx.Done():
Expand Down Expand Up @@ -703,7 +705,7 @@ func (pp *PartitionProcessor) VisitValues(ctx context.Context, name string, meta
}()
select {
case <-stopping:
drainUntilClose()
drainUntilEmpty()
return ErrVisitAborted
case <-ctx.Done():
drainUntilEmpty()
Expand Down
4 changes: 2 additions & 2 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,11 +421,11 @@ func (g *Processor) handleSessionErrors(ctx, sessionCtx context.Context, session
)

if errors.As(err, &errProc) {
g.log.Debugf("error processing message (non-transient), shutting down processor: %v", err)
g.log.Printf("error processing message (non-transient), shutting down processor: %v", err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's not log those important errors at debug level.

sessionCtxCancel()
}
if errors.As(err, &errSetup) {
g.log.Debugf("setup error (non-transient), shutting down processor: %v", err)
g.log.Printf("setup error (non-transient), shutting down processor: %v", err)
sessionCtxCancel()
}
}
Expand Down
3 changes: 3 additions & 0 deletions systemtest/proc_disconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func TestProcessorShutdown_KafkaDisconnect(t *testing.T) {
brokers := initSystemTest(t)
var (
topic = goka.Stream(fmt.Sprintf("goka_systemtest_proc_shutdown_disconnect-%d", time.Now().Unix()))
join = goka.Stream(fmt.Sprintf("goka_systemtest_proc_shutdown_disconnect-%d-join", time.Now().Unix()))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding join tables to the tests so we can exercise the reconnecting-joins change from above.

group = goka.Group(topic)
)

Expand All @@ -29,6 +30,7 @@ func TestProcessorShutdown_KafkaDisconnect(t *testing.T) {
tmgr, err := goka.DefaultTopicManagerBuilder(brokers)
require.NoError(t, err)
require.NoError(t, tmgr.EnsureStreamExists(string(topic), 10))
require.NoError(t, tmgr.EnsureTableExists(string(join), 10))

// emit values
errg.Go(func() error {
Expand Down Expand Up @@ -69,6 +71,7 @@ func TestProcessorShutdown_KafkaDisconnect(t *testing.T) {
ctx.SetValue(msg)
}
}),
goka.Join(goka.Table(join), new(codec.String)),
goka.Persist(new(codec.Int64)),
),
goka.WithConsumerGroupBuilder(goka.ConsumerGroupBuilderWithConfig(cfg)),
Expand Down
5 changes: 5 additions & 0 deletions systemtest/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ func TestRebalance(t *testing.T) {
var (
group = goka.Group(fmt.Sprintf("goka-systemtest-rebalance-%d", time.Now().Unix()))
inputStream string = string(group) + "-input"
joinTable string = string(group) + "-join"
basepath = "/tmp/goka-rebalance-test"
)

Expand All @@ -320,6 +321,9 @@ func TestRebalance(t *testing.T) {
err = tm.EnsureStreamExists(inputStream, 20)
require.NoError(t, err)

err = tm.EnsureTableExists(joinTable, 20)
require.NoError(t, err)

em, err := goka.NewEmitter(brokers, goka.Stream(inputStream), new(codec.String))
require.NoError(t, err)

Expand All @@ -338,6 +342,7 @@ func TestRebalance(t *testing.T) {
goka.DefineGroup(
group,
goka.Input(goka.Stream(inputStream), new(codec.String), func(ctx goka.Context, msg interface{}) { ctx.SetValue(msg) }),
goka.Join(goka.Table(joinTable), new(codec.String)),
goka.Persist(new(codec.String)),
),
goka.WithRecoverAhead(),
Expand Down
2 changes: 1 addition & 1 deletion topic_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func checkBroker(broker Broker, config *sarama.Config) error {
}

err := broker.Open(config)
if err != nil {
if err != nil && !errors.Is(err, sarama.ErrAlreadyConnected) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to the sarama docs, Open may return this error if the broker is already connected, and that is not an actual failure.

return fmt.Errorf("error opening broker connection: %v", err)
}
connected, err := broker.Connected()
Expand Down