
improve processor stability#473

Merged
frairon merged 1 commit into master from processor-stability on Feb 26, 2025

Conversation

Contributor

@frairon frairon commented Feb 25, 2025

What this PR tries to improve

Stability of processors in the face of a restarting/unstable Kafka cluster.

Background

We're facing the issue that our Kafka cluster restarts or rebalances from time to time, which makes all processors restart. Since the processors will rebalance anyway, this PR uses reconnecting views for the join/lookup tables.

 pp.runnerGroup.Go(func() error {
 	defer pp.state.SetState(PPStateStopping)
-	return join.CatchupForever(runnerCtx, false)
+	return join.CatchupForever(runnerCtx, true)
Contributor Author
this makes the join-table try to reconnect while the processor is running (together with the other table some lines below)

// wait for the runner to be done
runningErrs := multierror.Append(pp.runnerGroup.Wait().ErrorOrNil())

close(pp.input)
Contributor Author

channels are now closed after the runner-group is done --> visitors attach to the runner-group for this.


var wg sync.WaitGroup

// drains the channel and drops out when closed.
Contributor Author

there was actually no point in distinguishing between draining until close and draining until empty, because this function is the one writing to the channel.
In case two visitors are started at the same time and one of them panics or is stopped, it'll drain the other's messages too - but that issue existed before, so we'll ignore it here :)


 if errors.As(err, &errProc) {
-	g.log.Debugf("error processing message (non-transient), shutting down processor: %v", err)
+	g.log.Printf("error processing message (non-transient), shutting down processor: %v", err)
Contributor Author

let's not log those important errors at debug level.


 err := broker.Open(config)
-if err != nil {
+if err != nil && !errors.Is(err, sarama.ErrAlreadyConnected) {
Contributor Author

according to the docs, Open might return this if it's already connected, and that's not an error.

brokers := initSystemTest(t)
var (
topic = goka.Stream(fmt.Sprintf("goka_systemtest_proc_shutdown_disconnect-%d", time.Now().Unix()))
join = goka.Stream(fmt.Sprintf("goka_systemtest_proc_shutdown_disconnect-%d-join", time.Now().Unix()))
Contributor Author

adding some join tables to the tests so we can exercise the reconnecting-joins change from above.

@frairon frairon force-pushed the processor-stability branch from 2de9c0e to 17448ff Compare February 25, 2025 08:00
@frairon frairon merged commit 3a26dac into master Feb 26, 2025
5 checks passed
@frairon frairon deleted the processor-stability branch February 26, 2025 05:36
