-
Notifications
You must be signed in to change notification settings - Fork 180
Description
goka newbie here --
I have a goka setup working that allows me to batch actions coming in under a certain key and submit if the length of actions in the batch exceeds 10, but I also want to add a timer functionality that runs in a loop externally and dumps the batch of actions every 30 seconds. I think I can make this happen with the use of a control message like this:
func (uq *userActionQueue) userActionsProcessing(ctx goka.Context, msg any) {
if err != nil {
return
}
batch := uq.getOrCreateBatch(ctx)
// If a control message is identified, submit batch
controlMsg, ok := msg.(*ControlMessage)
if ok && controlMsg.Action == TimedSubmitBatchAction {
uq.submitBatch(ctx.Context(), batch)
batch.Clear()
ctx.SetValue(batch)
return
}
event, ok := msg.(*UserActionsEvent)
if !ok {
return
}
// Store item, check length of batch and potentially submit
...
I want to do something like this for the timer:
go func() {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
keys := []string{"?"}
for range ticker.C {
for _, key := range keys {
submitBatchEvent := ControlMessage{Action: TimedSubmitBatchAction}
err := uq.userActionEmitter.EmitSync(key, submitBatchEvent)
if err != nil {
return
}
}
}
}()
but obviously, this requires me to have a list of the keys for which this processor instance is responsible. I know I can get information about the keys from a view using the group table topic, but won't that give me all of the keys across all processors? Clearly the processor knows which keys it is responsible for because it is able to load up that local state after a crash, so I am wondering if I can get access to this list somehow? But I could also be misunderstanding how this works.