-
Notifications
You must be signed in to change notification settings - Fork 902
websocket: Initial refactor for websocket handling to support context propagation #2066
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- Updated websocket read and handle functions to accept context.Context for better cancellation and timeout management. - Modified test cases to pass context when invoking websocket handling methods. - Enhanced data handler to use a message relay for improved message processing. - Adjusted various functions across the exchange package to ensure context is utilized where necessary.
|
@codex please review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements a significant refactor of the websocket handling system to support context propagation throughout the websocket data flow. The primary motivation is to enable comprehensive metrics and tracing from websocket readers through to external consumers, eliminating circular dependencies and improving the overall architecture.
Key changes include:
- Replaces direct channel usage with a new
message.Relayabstraction for websocket data handling - Adds context parameter propagation through all websocket handling methods
- Removes the
observeDatamethod which was causing overhead by moving data through unnecessary channels
Reviewed Changes
Copilot reviewed 69 out of 69 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| exchanges/trade/trade_types.go | Updates Trade struct to use message.Relay instead of direct channel |
| exchanges/trade/trade.go | Modifies Setup and Update methods to work with message.Relay and context |
| exchanges/sharedtestvalues/sharedtestvalues.go | Updates test websocket creation to use message.Relay |
| exchanges/fill/fill_types.go | Changes Fills struct to use message.Relay for data handling |
| exchanges/fill/fill.go | Updates fill processing methods to support context and message.Relay |
| exchanges/exchange.go | Updates websocket-related method calls to include context parameters |
| Multiple exchange websocket files | Extensive updates to websocket handlers adding context parameters and using message.Relay.Send() |
| } | ||
| oSide, err = order.StringToOrderSide(notification.Data[i].OrderMode) | ||
|
|
||
| oSide, err = order.StringToOrderSide(strings.ReplaceAll(notification.Data[i].OrderMode, "MODE_", "")) |
Copilot
AI
Sep 22, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] This string replacement logic appears to be handling a specific data format transformation. Consider extracting this into a named constant or helper function for better maintainability and to document the purpose of removing the 'MODE_' prefix.
| oSide, err = order.StringToOrderSide(strings.ReplaceAll(notification.Data[i].OrderMode, "MODE_", "")) | |
| oSide, err = order.StringToOrderSide(stripOrderModePrefix(notification.Data[i].OrderMode)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was just to fix a test, the other implementations don't seem to have the same handling which leads me to believe the test websocket payload might be wrong but this is all out of scope.
| } | ||
| errCode, err := jsonparser.GetInt(respRaw, "code") | ||
| if err != nil { | ||
| log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", e.Name, common.ErrParsingWSField, err, respRaw) | ||
| } | ||
| return fmt.Errorf("WS auth subscription error; Status: %s Error Code: %d", status, errCode) |
Copilot
AI
Sep 22, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code will always return an error even when authentication succeeds. The error return should be conditional - it should only return an error when the status indicates failure, not success.
| } | |
| errCode, err := jsonparser.GetInt(respRaw, "code") | |
| if err != nil { | |
| log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", e.Name, common.ErrParsingWSField, err, respRaw) | |
| } | |
| return fmt.Errorf("WS auth subscription error; Status: %s Error Code: %d", status, errCode) | |
| } else if status == "ERROR" { | |
| errCode, err := jsonparser.GetInt(respRaw, "code") | |
| if err != nil { | |
| log.Errorf(log.ExchangeSys, "%s %s 'code': %s from message: %s", e.Name, common.ErrParsingWSField, err, respRaw) | |
| } | |
| return fmt.Errorf("WS auth subscription error; Status: %s Error Code: %d", status, errCode) | |
| } | |
| // For any other status, do not return an error, just log it for visibility | |
| log.Warnf(log.ExchangeSys, "%s received unhandled WS auth status: %s from message: %s", e.Name, status, respRaw) | |
| return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
status return might not ever be "ERROR" and this doesn't take in an unhandled status event; as this is out of scope I will defer this.
| for i := range snapBundle { | ||
| if positionData, ok := snapBundle[i].([]any); ok { | ||
| e.wsHandleOrder(positionData) | ||
| return e.wsHandleOrder(ctx, positionData) |
Copilot
AI
Sep 22, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This return statement breaks out of the loop after processing only the first position. The loop should continue processing all positions and only return after the loop completes or if an error occurs.
| return e.wsHandleOrder(ctx, positionData) | |
| if err := e.wsHandleOrder(ctx, positionData); err != nil { | |
| return err | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This I will do.
| ctx := account.DeployCredentialsToContext(t.Context(), &account.Credentials{ | ||
| Key: "testkey", | ||
| Secret: base64.StdEncoding.EncodeToString([]byte("testsecret")), | ||
| }) |
Copilot
AI
Sep 22, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] Consider extracting the test credentials creation into a helper function since this pattern may be reused across multiple test functions. This would improve maintainability and reduce code duplication.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unless others want me to do this I will leave this open.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Codex Review: Here are some suggestions.
About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you open a pull request for review, mark a draft as ready, or comment "@codex review". If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex fix this CI failure" or "@codex address that feedback".
exchange/websocket/buffer/buffer.go
Outdated
|
|
||
| // Publish all state changes, disregarding verbosity or sync requirements. | ||
| holder.ob.Publish() | ||
| o.dataHandler <- holder.ob | ||
| o.dataHandler.Send(ctx, holder.ob) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[P1] Handle relay Send errors to avoid dropping websocket data
The new message.Relay reports a full buffer via an error, but both Orderbook.LoadSnapshot and Orderbook.Update call o.dataHandler.Send(ctx, holder.ob) and ignore the return value. When the buffer is full these sends will now fail silently, so consumers never see the published order book state even though internal structures were updated. Previously the channel send blocked until consumers drained the queue, so no data was lost. The same pattern also exists in exchanges/trade/trade.go and exchanges/fill/fill.go. Consider propagating or at least logging the error so backpressure can be surfaced and the update retried or dropped explicitly.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔥
engine/rpcserver.go
Outdated
| return nil, fmt.Errorf("websocket not supported for exchange %s", r.Exchange) | ||
| } | ||
|
|
||
| err = w.SetProxyAddress(r.Proxy) | ||
| err = w.SetProxyAddress(ctx, r.Proxy) | ||
| if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[P1] Do not reuse RPC request context for websocket lifetime
The RPC handlers now pass the request’s ctx into w.Enable and w.SetProxyAddress. Connect stores this context and uses it for future reconnects inside the connection monitor. gRPC cancels the request context as soon as the RPC returns, so any later reconnect attempt will run with a canceled context and fail immediately with context canceled, leaving the websocket permanently disconnected after the first dropout or proxy change. These long‑lived operations should use a background context or derive a new context instead of the short‑lived RPC context.
Useful? React with 👍 / 👎.
|
@codex please review |
|
Codex Review: Didn't find any major issues. What shall we delve into next? About Codex in GitHubYour team has set up Codex to review pull requests in this repo. Reviews are triggered when you open a pull request for review, mark a draft as ready, or comment "@codex review". If Codex has suggestions, it will comment; otherwise it will react with 👍. Codex can also answer questions or update the PR. Try commenting "@codex fix this CI failure" or "@codex address that feedback". |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #2066 +/- ##
==========================================
- Coverage 42.13% 42.07% -0.06%
==========================================
Files 445 446 +1
Lines 143218 142919 -299
==========================================
- Hits 60341 60133 -208
+ Misses 75749 75639 -110
- Partials 7128 7147 +19
🚀 New features to boost your workflow:
|
gloriousCode
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I appreciate context usage, but the nolint leaves me uncertain. I will leave the AI output for you to consider
Co-authored-by: Gareth Kirwan <[email protected]>
Yeah I like it. All the values that are put in by the library are captured across this boundary by freezecontext so you don't need to set the required values you want in common, it will auto scale. Anything else are dropped and it keeps its speed. Will see if @thrasher- or @gloriousCode wants this for this PR. Then I will drop it in. Or we can open another PR for it after. |
gloriousCode
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have attempted to improve things. I haven't come up short, but I haven't smashed face either
Happy as is, it works. Can always adjust it once the patch is more fleshed out and in a different PR |
gbjk
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good work.
Couple of nits and some pedantry about package naming.
Otherwise I think I'll be 👍 on the next round.
engine/rpcserver.go
Outdated
| // rpcContextToLongLivedSession converts a short-lived incoming context to a long-lived outgoing context, this is due | ||
| // to the incoming context being cancelled when the RPC call completes. | ||
| func (s *RPCServer) rpcContextToLongLivedSession(ctx context.Context) context.Context { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🐒 Optional nitpick:
Okay; I kinda get it, except LongLivedSession seems ... well weird.
We're basically just stripping out the lifecycle from the context, right?
So ... isn't this just a very fancy context.WithoutCancel ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah good point, any other naming suggestions and I will change it, and should this function live somewhere like common?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean... if it's context.WithoutCancel you don't need this function... right?
Just drop in replace it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well I feel like an idiot. 😆
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch: ca24dc1
exchange/message/message.go
Outdated
| @@ -0,0 +1,48 @@ | |||
| package message | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚧 Change request:
exchange/message feels like a very overloaded term and it feels like you're domain-squatting it a bit here.
I get that this could represent any message queue.
But right now we don't have an idea of any other type of messages here.
And this is fundamentally a relay, not a message thing.
So I'd like to see this in exchange/websocket/relay/relay.go instead.
But I'm also thinking it's even better in exchange/websocket/buffer/relay.go 🎉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My reasoning for placing this in exchange/message was that the relay is intentionally generic — it doesn’t depend on WebSockets or exchange code, and its API (message.Relay) is transport-agnostic by design.
If we move it into websocket/relay, it implicitly couples the type to a subsystem it doesn’t rely on and might make it harder to reuse later for other messaging paths (e.g., FIX, -- these are all a strech --> RPC, internal event pumps, REST batch queues).
That said, I’m happy to relocate it if the consensus is that the Relay should remain WebSocket-specific for now.
Also Buffer naming I think will be subject to change because no websocket orderbooks should buffer events or sort them.
Also what about exchange/stream/relay.go?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think exchange/stream/relay.go is the best. Removes the websocket coupling since it's protocol agnostic and I agree that message is too generic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exchange/stream/relay.gois good 👍
🥃 Your reasoning about buffer was backwards. Just because websocket orderbooks are in a package called buffer doesn't mean that nothing else can be, or that when they move out we couldn't leave a relay in there. But that's just an aside.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was more meaning, its going to change, then we will have to change the package name for all relay instances and it doesn't really reflect buffer that's all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done: db647bf
Co-authored-by: Gareth Kirwan <[email protected]>
gbjk
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good work.
One change request, but optional.
|
This PR is stale because it has been open 21 days with no activity. Please provide an update on the progress of this PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is looking good. I've been trying to break things in various ways. I'll continue tomorrow later. For now I thought I'd give you one thing which I think is solid to do
gloriousCode
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tACK! Thanks for working through the issues
thrasher-
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just needs a rebase and then looks good to go
thrasher-
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for making those changes. NOTE: This gets closer to achieving @romanornr's happy place with faster ctrl + C's
Adds package
messagefor the websocket data handler so there is no circular dependenciesSendSendtakes in context that can be consumed for future metrics/trace. e.g. external consumer using signal (orderbook change) can get metrics all the way back from the websocket reader routine.This removes
observeDatamethod on websocket manager. This was just moving data to another channel just to indicate blockage which had a bit of overhead, the above design eliminates the need for that mechanism.Tries to keep things contained in websocket only a few context.TODO's and these will be opened up in a future PR:
This complements issue #2026 for a context based OTEL solution
Also complements issue #2068
This also will help trace along with profiling #1951 as it seems to be just a garbage collection spike, optionally can track GC cycles on full trace for that signal.
Type of change
Please delete options that are not relevant and add an
xin[]as item is complete.How has this been tested
Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration and
also consider improving test coverage whilst working on a certain feature or package.
Checklist