feat(router): add ability efficiently unmarshall events (Cosmo Streams) #2352
Conversation
Co-authored-by: Ludwig Bedacht <[email protected]> Co-authored-by: StarpTech <[email protected]> Co-authored-by: Dominik Korittki <[email protected]>
The merge conflict was on the go.mod files in router and router-tests, because a new upstream engine version was used. I ran `go get` to pull the latest engine version from the topic branch, which includes the latest engine changes plus Cosmo Streams.
Co-authored-by: Alessandro Pagnin <[email protected]>
The updateSubscription method runs once per subscription client, in separate goroutines. The events slice passed into this function is shared between all of these goroutines. The hook execution in line 107 can return either the original events slice or a new deep copy, if events were modified by a hook. After a hook call returned, we cleaned up the events slice by removing nil elements, thus modifying it. If the hook call prior to that returned the original events slice instead of a deep copy, this caused a race condition, since we operated on the original slice with its shared underlying array. To prevent this I removed the cleanup of nil elements and instead filter them out in the only place where they could hurt: the subscription update call to the engine. This way the updateSubscription function does not write to the slice at all, only reads it. The only writes can happen inside the hook call, and our hook interface design should prevent hooks from modifying the original slice; they are expected to make a deep copy first.
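The read-only filtering described above can be sketched as follows. This is a minimal illustration, not the router's actual code; the `Event` interface, `textEvent`, and `nonNilEvents` names are assumptions made for the example:

```go
package main

import "fmt"

// Event stands in for a streams event; the real type lives in the router.
type Event interface{ Payload() []byte }

type textEvent string

func (t textEvent) Payload() []byte { return []byte(t) }

// nonNilEvents returns a fresh slice holding only the non-nil events.
// The shared input slice is only read, never written, so concurrent
// subscription updaters can safely share the same backing array.
func nonNilEvents(events []Event) []Event {
	filtered := make([]Event, 0, len(events))
	for _, ev := range events {
		if ev != nil {
			filtered = append(filtered, ev)
		}
	}
	return filtered
}

func main() {
	shared := []Event{textEvent("a"), nil, textEvent("b")}
	fmt.Println(len(nonNilEvents(shared))) // prints 2
	fmt.Println(len(shared))               // shared slice is untouched, prints 3
}
```

Because the filtered slice has its own backing array, each goroutine can build it right before the engine update call without synchronizing with the others.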
It might not be guaranteed that there are no subscription updaters still running when the event updater's Close() function is called. There is no logic waiting for the channel to close, and since the garbage collector cleans up orphaned channels, there is no actual need to close the channel here.
The MarshalJSONTemplate method creates JSON-like output, but it is not actually valid JSON: it contains placeholders delimited by dollar signs. For this reason we cannot use a JSON marshaller to render the output and resorted to building the string manually. This could become a security problem, because we do not really validate the input used to create the output; hypothetically, an attacker could escape the string in that function and inject custom fields into the output. To prevent this we now marshal as many fields as we can. The only remaining field, p.Event.Data, does not contain user-supplied data, so we are safe not escaping it.
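The idea of marshalling individual fields while splicing the trusted placeholder in verbatim can be sketched like this. The `buildTemplate` function and its fields are illustrative assumptions, not the actual MarshalJSONTemplate implementation:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// buildTemplate sketches the hardening approach: every user-influenced
// field is passed through json.Marshal so it cannot escape its JSON
// string literal, while the trusted placeholder section is spliced in
// verbatim.
func buildTemplate(topic string, rawDataPlaceholder string) (string, error) {
	// json.Marshal escapes quotes, backslashes and control characters,
	// closing the injection vector for attacker-controlled values.
	topicJSON, err := json.Marshal(topic)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf(`{"topic":%s,"data":%s}`, topicJSON, rawDataPlaceholder), nil
}

func main() {
	// A malicious topic cannot break out of its string literal:
	// the quotes in the input come out escaped.
	out, _ := buildTemplate(`evil","admin":true,"x":"`, "$$data$$")
	fmt.Println(out)
}
```

The attempted injection is neutralized because the embedded quotes are escaped inside the topic string, so no new top-level JSON fields appear.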
If a hook returns an error, we stop processing hooks and use that error for further error handling.
The core.NewHttpGraphqlError() method returns an error that does not fit the OnPublishEvent hook very well. Instead we return a generic error, which leads to the same result; the error handling does not behave differently because of it.
If this hook returns an error, we do not want the error to be logged; the other Cosmo Streams hooks don't log it either. The reason is that we want hook developers to decide which log level to use for errors, or whether to log an error at all. This error handling is already in place for the other two hooks but was apparently forgotten for this one.
This makes the OnPublishEvent hook more consistent with the OnReceiveEvent hook regarding nil event filtering.
core.NewHttpGraphqlError() is not really the best way to describe an error in hook processing: we have to set HTTP-specific codes and status texts, which do not fit subscriptions well. I created a new error for this called StreamHookError, which lets you pass a message. It can be returned from a hook:

```go
return core.StreamHandlerError{Message: "my hook error"}
```

and it gets sent to the subscription client:

```json
{"id":"1","type":"error","payload":[{"message":"my hook error"}]}
```

Afterwards we close the connection; this behaviour remains unchanged.
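A minimal sketch of what such an error type could look like follows. The actual type lives in the router's core package; this standalone version is an assumption for illustration only:

```go
package main

import "fmt"

// StreamHandlerError sketches a hook error that carries only a message,
// with none of the HTTP-specific codes and status texts that
// NewHttpGraphqlError requires.
type StreamHandlerError struct {
	Message string
}

// Error implements the error interface, so the struct can be returned
// from a hook wherever an error is expected.
func (e StreamHandlerError) Error() string { return e.Message }

func main() {
	var err error = StreamHandlerError{Message: "my hook error"}
	fmt.Println(err) // prints: my hook error
}
```

Because the struct satisfies the error interface, existing error-handling paths need no changes; only the message reaches the subscription client.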
Removed a switch case that is no longer reachable. Also removed an early return for the case where a type assertion fails; this way we ensure the response is actually sent either way.
This actually introduces a bug where subscription clients will occasionally get disconnected after receiving messages when we deal with lots of connecting and disconnecting clients. It depends on two contexts that are regularly checked inside the Kafka topic poller; if one of them is cancelled, the connections get closed. These contexts are meant to handle the lifecycle of the Kafka adapter, not subscriptions, so we should not close the subscription updater when the poller is cancelled. After this commit the router behaves like it did before Cosmo Streams. All of this can be re-evaluated later, as there may be room for improvement, but for now it is better not to change this behaviour.
Router image scan passed ✅ No security vulnerabilities found in the image.
The Cosmo Streams hook OnReceiveEvents is executed once for each subscriber, but the events stay the same during that time. Hook developers therefore have to unmarshal the same event over and over again, which can quickly become a performance bottleneck.
We should give hook developers a way to unmarshal events efficiently, while ensuring that the handler interface does not change.
In order to achieve that, this pull request introduces a new method on events: Decode(v). It accepts a pointer parameter `v` into which it unmarshals the event's payload, similar to how `json.Unmarshal` works. To make it efficient, we cache the unmarshal results inside the event struct; subsequent calls to Decode on the same event return the cached result. We store results in a `sync.Map` keyed by the type of `v`, which lets us remember the result per target type, since there could be multiple. I also made sure to avoid copying the cache on Clone(), because a `sync.Map` cannot be copied. Also, when SetData() is called we clear the cache to avoid stale results. Unfortunately, the `Data` field on a `MutableEvent` is publicly accessible, so the data can change without a cache clear, and we can't make the field private, since the router code needs it to be public at the moment. We could improve on this in the future.
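The caching scheme can be sketched as below. This is a self-contained illustration of the approach, not the router's actual implementation; the `event` struct, `newEvent`, and the reflection details are assumptions made for the example:

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
	"sync"
)

// event sketches the caching idea behind Decode: unmarshal results are
// memoized per target type in a sync.Map, so repeated Decode calls from
// many subscribers do the JSON work only once per type.
type event struct {
	data  []byte
	cache *sync.Map // reflect.Type -> pointer to decoded value
}

func newEvent(data []byte) *event {
	return &event{data: data, cache: &sync.Map{}}
}

// Decode unmarshals the event payload into v, like json.Unmarshal
// (v must be a non-nil pointer), but serves repeated calls for the
// same target type from the cache.
func (e *event) Decode(v any) error {
	t := reflect.TypeOf(v)
	if cached, ok := e.cache.Load(t); ok {
		// Copy the cached value into the caller's pointer.
		reflect.ValueOf(v).Elem().Set(reflect.ValueOf(cached).Elem())
		return nil
	}
	if err := json.Unmarshal(e.data, v); err != nil {
		return err
	}
	// Store a copy keyed by the pointer type for later calls, so the
	// caller cannot mutate the cached value through v.
	cp := reflect.New(t.Elem())
	cp.Elem().Set(reflect.ValueOf(v).Elem())
	e.cache.Store(t, cp.Interface())
	return nil
}

// SetData replaces the payload and clears the cache so no stale
// decoded values survive. A fresh sync.Map is used because sync.Map
// must not be copied after first use.
func (e *event) SetData(data []byte) {
	e.data = data
	e.cache = &sync.Map{}
}

func main() {
	ev := newEvent([]byte(`{"id":"1"}`))
	var a, b struct {
		ID string `json:"id"`
	}
	_ = ev.Decode(&a)
	_ = ev.Decode(&b) // second call is served from the cache
	fmt.Println(a.ID, b.ID) // prints: 1 1
}
```

Keeping the cache behind a pointer also makes it easy for a Clone() implementation to give the copy its own fresh map instead of copying the `sync.Map`.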