Skip to content

Add ValkeyClient.subscribe functions that uses one connection and.reconnects if it is lost #131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from

Conversation

adam-fowler
Copy link
Collaborator

No description provided.

Copy link

github-actions bot commented Jul 15, 2025

✅ Pull request no significant performance differences ✅

Summary

New baseline 'pull_request' is WITHIN the 'main' baseline thresholds.

Full Benchmark Comparison

Comparing results between 'main' and 'pull_request'

Host 'd637131128cb' with 4 'x86_64' processors with 15 GB memory, running:
#18~24.04.1-Ubuntu SMP Sat Jun 28 04:46:03 UTC 2025

ValkeyBenchmarks

Client: GET benchmark metrics

Malloc (total): results within specified thresholds, fold down for details.

Malloc (total) * p0 p25 p50 p75 p90 p99 p100 Samples
main 79 79 80 83 84 84 84 6
pull_request 75 79 82 83 84 84 84 6
Δ -4 0 2 0 0 0 0 0
Improvement % 5 0 -2 0 0 0 0 0

Client: GET benchmark | parallel 20 | 20 concurrent connections metrics

Malloc (total): results within specified thresholds, fold down for details.

Malloc (total) * p0 p25 p50 p75 p90 p99 p100 Samples
main 92 97 98 101 105 110 110 24
pull_request 92 96 99 101 105 111 111 25
Δ 0 -1 1 0 0 1 1 1
Improvement % 0 1 -1 0 0 -1 -1 1

Connection: GET benchmark metrics

Malloc (total): results within specified thresholds, fold down for details.

Malloc (total) * p0 p25 p50 p75 p90 p99 p100 Samples
main 4 4 4 4 4 4 4 8
pull_request 4 4 4 4 4 4 4 8
Δ 0 0 0 0 0 0 0 0
Improvement % 0 0 0 0 0 0 0 0

Connection: Pipeline benchmark metrics

Malloc (total): results within specified thresholds, fold down for details.

Malloc (total) * p0 p25 p50 p75 p90 p99 p100 Samples
main 37 37 37 37 37 37 37 5
pull_request 36 37 37 37 37 37 37 5
Δ -1 0 0 0 0 0 0 0
Improvement % 3 0 0 0 0 0 0 0

HashSlot – {user}.whatever metrics

Malloc (total): results within specified thresholds, fold down for details.

Malloc (total) * p0 p25 p50 p75 p90 p99 p100 Samples
main 0 0 0 0 0 0 0 18
pull_request 0 0 0 0 0 0 0 18
Δ 0 0 0 0 0 0 0 0
Improvement % 0 0 0 0 0 0 0 0

ValkeyCommandEncoder – Command with 7 words metrics

Malloc (total): results within specified thresholds, fold down for details.

Malloc (total) * p0 p25 p50 p75 p90 p99 p100 Samples
main 0 0 0 0 0 0 0 747
pull_request 0 0 0 0 0 0 0 753
Δ 0 0 0 0 0 0 0 6
Improvement % 0 0 0 0 0 0 0 6

ValkeyCommandEncoder – Simple GET metrics

Malloc (total): results within specified thresholds, fold down for details.

Malloc (total) * p0 p25 p50 p75 p90 p99 p100 Samples
main 0 0 0 0 0 0 0 1904
pull_request 0 0 0 0 0 0 0 1912
Δ 0 0 0 0 0 0 0 8
Improvement % 0 0 0 0 0 0 0 8

ValkeyCommandEncoder – Simple MGET 15 keys metrics

Malloc (total): results within specified thresholds, fold down for details.

Malloc (total) * p0 p25 p50 p75 p90 p99 p100 Samples
main 0 0 0 0 0 0 0 364
pull_request 0 0 0 0 0 0 0 364
Δ 0 0 0 0 0 0 0 0
Improvement % 0 0 0 0 0 0 0 0

@adam-fowler adam-fowler requested a review from yaxing July 15, 2025 16:04
@adam-fowler
Copy link
Collaborator Author

Was thinking I could probably extend this to the cluster client as well

@@ -328,15 +328,13 @@ public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
/// create a BSD sockets based bootstrap
private static func createSocketsBootstrap(eventLoopGroup: EventLoopGroup) -> ClientBootstrap {
ClientBootstrap(group: eventLoopGroup)
.channelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The client doesn't need half closure, and it meant we were leaving connections open if the database was closed

Copy link
Collaborator

@fabianfett fabianfett left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3 nits and 1 question

@adam-fowler
Copy link
Collaborator Author

3 nits and 1 question

What's the question

@adam-fowler adam-fowler requested a review from fabianfett July 18, 2025 14:09
process: (AsyncMapSequence<ValkeySubscription, ValkeyKey>) async throws -> sending Value
) async throws -> sending Value {
try await self.subscribe(to: [ValkeySubscriptions.invalidateChannel]) { subscription in
try await self.subscribe(to: [ValkeySubscriptions.invalidateChannel], isolation: isolation) { subscription in
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for a follow up: can we use CollectionOfOne here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this then requires some Collection through a whole series of functions. Can we do this in a separate PR

let value: Value
do {
value = try await process(stream)
try Task.checkCancellation()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it becomes borderline impossible to exit this closure without CancellationError, is this the behavior we want?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not totally sure what you mean here. Its up to the contents of the closure how they exit.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the point with this check it isn't anymore.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw. we must make sure that the unsubscribe job runs outside of this task, as this will always close the connection otherwise. I think we'll need to do a

await Task {
  // cancellation shield
  self.unsubscribe()
}.get()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The checkCancellation was there to ensure a CancellationError was thrown if the Task was cancelled. AsyncStream doesn't throw an error when it is cancelled.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the unsubscribe into its own unstructured Task

@adam-fowler
Copy link
Collaborator Author

Updated to only use one connection for subscriptions

@adam-fowler adam-fowler changed the title Add ValkeyClient.subscribe functions that reconnect if connection is lost Add ValkeyClient.subscribe functions that uses one connection and.reconnects if it is lost Aug 6, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants