Bug Description
In contrib/valkey-io/valkey-go/v2, the dedicatedClient wrapper struct does not properly delegate Do(), DoMulti(), Receive(), and Close() to the underlying valkey.DedicatedClient. These methods are unintentionally promoted from the embedded *client, causing them to execute against the original cluster client instead of the dedicated connection.
This makes Client.Dedicated() and Client.Dedicate() completely non-functional when using the tracing wrapper — any command executed inside a Dedicated callback goes through the shared connection pool, not the dedicated wire.
Root Cause
In valkey.go, the dedicatedClient struct is defined as:
```go
type dedicatedClient struct {
	*client                                // embedded: promotes Do(), DoMulti(), Receive(), Close()
	dedicatedClient valkey.DedicatedClient // named field: methods are NOT promoted
}
```
Because dedicatedClient (the field) is a named field, its methods are not promoted. Meanwhile, *client is an embedded (anonymous) field, so its methods, including Do(), DoMulti(), Receive(), and Close(), are promoted to dedicatedClient.
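The promotion rule can be reproduced without valkey-go. The following is a minimal sketch; the names `Conn`, `pool`, `wire`, `tracingClient`, and `buggyDedicated` are hypothetical stand-ins and not part of dd-trace-go or valkey-go. It also shows why the compiler does not catch the problem: the promoted method is enough to satisfy the interface.

```go
package main

import "fmt"

// Conn is a hypothetical stand-in for the command interface.
type Conn interface {
	Do(cmd string) string
}

// pool and wire are hypothetical targets that tag their replies so the
// caller can tell which one handled the command.
type pool struct{}

func (pool) Do(cmd string) string { return "pool:" + cmd }

type wire struct{}

func (wire) Do(cmd string) string { return "wire:" + cmd }

// tracingClient mirrors the shape of the wrapper's *client: Do forwards
// to its inner Conn.
type tracingClient struct {
	inner Conn
}

func (c *tracingClient) Do(cmd string) string { return c.inner.Do(cmd) }

// buggyDedicated mirrors the struct from the issue. Do is promoted from the
// embedded *tracingClient, so the named field is never consulted.
type buggyDedicated struct {
	*tracingClient
	dedicated Conn
}

// The promoted Do satisfies Conn, so this assertion compiles even though
// no method delegates to the dedicated field.
var _ Conn = (*buggyDedicated)(nil)

func main() {
	shared := &tracingClient{inner: pool{}}
	buggy := &buggyDedicated{tracingClient: shared, dedicated: wire{}}
	fmt.Println(buggy.Do("PING")) // prints "pool:PING"
}
```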
Only SetPubSubHooks() is explicitly overridden:
```go
func (c *dedicatedClient) SetPubSubHooks(hooks valkey.PubSubHooks) <-chan error {
	return c.dedicatedClient.SetPubSubHooks(hooks)
}
```
The promoted *client.Do() calls c.client.Do(), which is the original valkey.Client (the cluster/standalone client), not the dedicated connection:
```go
func (c *client) Do(ctx context.Context, cmd valkey.Completed) valkey.ValkeyResult {
	span, ctx := c.startSpan(ctx, processCommand(&cmd))
	resp := c.client.Do(ctx, cmd) // ← this is the original cluster client, NOT the dedicated connection
	// ...
}
```
Impact
Any use of Dedicated() with the DD tracing wrapper is broken:
| Method | Override? | Actual target |
|---|---|---|
| SetPubSubHooks() | ✅ Yes | Dedicated connection |
| Do() | ❌ No (promoted) | Original cluster client |
| DoMulti() | ❌ No (promoted) | Original cluster client |
| Receive() | ❌ No (promoted) | Original cluster client |
| Close() | ❌ No (promoted) | Closes entire client |
| B() | ❌ No (promoted) | Cluster client (harmless: Builder is stateless) |
Pub/Sub is completely broken
This is the most critical impact. The standard valkey-go pattern for sharded Pub/Sub using Dedicated + SetPubSubHooks silently fails:
```go
client.Dedicated(func(dc valkey.DedicatedClient) error {
	// ✅ Hooks are registered on the dedicated wire
	wait := dc.SetPubSubHooks(valkey.PubSubHooks{
		OnMessage: func(msg valkey.PubSubMessage) {
			fmt.Println("received:", msg.Message) // Never called!
		},
	})
	// ❌ BUG: SSUBSCRIBE is sent through the cluster client's connection pool,
	// NOT through the dedicated wire where hooks are registered.
	if err := dc.Do(ctx, dc.B().Ssubscribe().Channel("ch").Build()).Error(); err != nil {
		return err
	}
	// Blocks forever: the dedicated wire never subscribed, so no messages arrive.
	select {
	case err := <-wait:
		return err
	case <-ctx.Done():
		return nil
	}
})
```
The subscription command (SSUBSCRIBE) goes to a random pool connection, while SetPubSubHooks listens on the dedicated wire. Messages are never delivered.
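The mismatch can be illustrated with a toy model. This is only a sketch under a simplifying assumption: hooks fire solely for messages pushed to the connection that issued the subscribe command. The `conn` type and the `publish` helper are hypothetical and do not reflect valkey-go internals.

```go
package main

import "fmt"

// conn is a hypothetical, simplified model of a single connection.
type conn struct {
	subscribed map[string]bool
	onMessage  func(payload string)
}

func newConn() *conn { return &conn{subscribed: make(map[string]bool)} }

func (c *conn) subscribe(channel string) { c.subscribed[channel] = true }

// push models the server sending a published message to this connection.
// It reports whether a hook was invoked.
func (c *conn) push(channel, payload string) bool {
	if !c.subscribed[channel] || c.onMessage == nil {
		return false
	}
	c.onMessage(payload)
	return true
}

// publish models the server offering a message to every connection and
// returns how many hooks were invoked.
func publish(channel, payload string, conns ...*conn) int {
	invoked := 0
	for _, c := range conns {
		if c.push(channel, payload) {
			invoked++
		}
	}
	return invoked
}

func main() {
	dedicated, pooled := newConn(), newConn()
	dedicated.onMessage = func(p string) { fmt.Println("received:", p) }

	// Buggy routing: hooks live on the dedicated connection, but the
	// subscribe command lands on a pooled one.
	pooled.subscribe("ch")
	fmt.Println("hooks invoked:", publish("ch", "hello", dedicated, pooled)) // 0

	// Fixed routing: the subscribe command reaches the dedicated connection.
	dedicated.subscribe("ch")
	fmt.Println("hooks invoked:", publish("ch", "hello", dedicated, pooled)) // 1
}
```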
Dedicated transactions are also broken
```go
client.Dedicated(func(dc valkey.DedicatedClient) error {
	// These commands go through the cluster client pool, NOT a single dedicated connection.
	// There is no atomicity or connection pinning guarantee.
	dc.Do(ctx, dc.B().Multi().Build())
	dc.Do(ctx, dc.B().Set().Key("k").Value("v").Build())
	dc.Do(ctx, dc.B().Exec().Build())
	return nil
})
```
Suggested Fix
Add explicit method overrides on dedicatedClient that delegate to c.dedicatedClient:
```go
func (c *dedicatedClient) Do(ctx context.Context, cmd valkey.Completed) valkey.ValkeyResult {
	span, ctx := c.startSpan(ctx, processCommand(&cmd))
	resp := c.dedicatedClient.Do(ctx, cmd)
	setClientCacheTags(span, resp)
	c.finishSpan(span, resp.Error())
	return resp
}

func (c *dedicatedClient) DoMulti(ctx context.Context, multi ...valkey.Completed) []valkey.ValkeyResult {
	span, ctx := c.startSpan(ctx, processCommandMulti(multi))
	resp := c.dedicatedClient.DoMulti(ctx, multi...)
	c.finishSpan(span, c.firstError(resp))
	return resp
}

func (c *dedicatedClient) Receive(ctx context.Context, subscribe valkey.Completed, fn func(msg valkey.PubSubMessage)) error {
	span, ctx := c.startSpan(ctx, processCommand(&subscribe))
	err := c.dedicatedClient.Receive(ctx, subscribe, fn)
	c.finishSpan(span, err)
	return err
}

func (c *dedicatedClient) Close() {
	c.dedicatedClient.Close()
}
```
Environment
- github.com/DataDog/dd-trace-go/contrib/valkey-io/valkey-go/v2 v2.5.0 (also confirmed in v2.0.1)
- github.com/valkey-io/valkey-go v1.0.62
- Go 1.25.3
- Valkey cluster mode
Workaround
Use client.Receive() instead of client.Dedicated() + SetPubSubHooks() for Pub/Sub. The Receive() method on *client correctly delegates to the underlying client:
```go
// Works correctly through the DD wrapper
err := client.Receive(ctx, client.B().Ssubscribe().Channel("ch").Build(), func(msg valkey.PubSubMessage) {
	fmt.Println("received:", msg.Message)
})
```
Note: there is no known workaround for dedicated transactions (MULTI/EXEC).
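A regression test for the suggested overrides could use recording fakes to assert that nothing reaches the shared client. The sketch below is self-contained and uses hypothetical stand-ins (`commander`, `recorder`, `wrapper`, `dedicatedWrapper`, `checkDelegation`) with simplified signatures; a real test would substitute the actual valkey.Client and valkey.DedicatedClient interfaces.

```go
package main

import "fmt"

// commander is a hypothetical stand-in for the subset of methods under test.
type commander interface {
	Do(cmd string) error
	Close()
}

// recorder is a fake that logs every call it receives.
type recorder struct {
	calls []string
}

func (r *recorder) Do(cmd string) error {
	r.calls = append(r.calls, "Do "+cmd)
	return nil
}

func (r *recorder) Close() { r.calls = append(r.calls, "Close") }

// wrapper mirrors the tracing *client: it forwards to its inner commander.
type wrapper struct {
	inner commander
}

func (w *wrapper) Do(cmd string) error { return w.inner.Do(cmd) }
func (w *wrapper) Close()              { w.inner.Close() }

// dedicatedWrapper mirrors the fixed dedicatedClient: explicit methods shadow
// the promoted ones and delegate to the dedicated field.
type dedicatedWrapper struct {
	*wrapper
	dedicated commander
}

func (d *dedicatedWrapper) Do(cmd string) error { return d.dedicated.Do(cmd) }
func (d *dedicatedWrapper) Close()              { d.dedicated.Close() }

// checkDelegation issues a MULTI/SET/EXEC sequence followed by Close and
// returns an error if any call reached the shared target.
func checkDelegation(c commander, shared, dedicated *recorder) error {
	for _, cmd := range []string{"MULTI", "SET k v", "EXEC"} {
		if err := c.Do(cmd); err != nil {
			return err
		}
	}
	c.Close()
	if len(shared.calls) != 0 {
		return fmt.Errorf("shared client received %v", shared.calls)
	}
	if len(dedicated.calls) != 4 {
		return fmt.Errorf("dedicated connection received %d calls, want 4", len(dedicated.calls))
	}
	return nil
}

func main() {
	shared, dedicated := &recorder{}, &recorder{}
	c := &dedicatedWrapper{wrapper: &wrapper{inner: shared}, dedicated: dedicated}
	if err := checkDelegation(c, shared, dedicated); err != nil {
		fmt.Println("FAIL:", err)
		return
	}
	fmt.Println("ok")
}
```

Removing the explicit Do and Close methods from `dedicatedWrapper` makes the check fail, because the promoted methods route every call to the shared recorder.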