@@ -20,6 +20,8 @@ import (
2020 "errors"
2121 "fmt"
2222 "io"
23+ "strings"
24+ "time"
2325
2426 "github.com/google/uuid"
2527 "github.com/higress-group/proxy-wasm-go-sdk/proxywasm"
@@ -114,7 +116,11 @@ type RedisClusterClient[C Cluster] struct {
114116}
115117
116118type redisOption struct {
117- dataBase int
119+ dataBase int
120+ bufferFlushTimeout int // in milliseconds, default 3ms when not set
121+ maxBufferSizeBeforeFlush int // in bytes, default 1024 bytes when not set
122+ bufferFlushTimeoutSet bool // flag to indicate if bufferFlushTimeout was explicitly set
123+ maxBufferSizeBeforeFlushSet bool // flag to indicate if maxBufferSizeBeforeFlush was explicitly set
118124}
119125
120126type optionFunc func (* redisOption )
@@ -125,6 +131,32 @@ func WithDataBase(dataBase int) optionFunc {
125131 }
126132}
127133
134+ func WithBufferFlushTimeout (timeout time.Duration ) optionFunc {
135+ return func (o * redisOption ) {
136+ o .bufferFlushTimeout = int (timeout .Milliseconds ())
137+ o .bufferFlushTimeoutSet = true
138+ }
139+ }
140+
141+ func WithMaxBufferSizeBeforeFlush (size int ) optionFunc {
142+ return func (o * redisOption ) {
143+ o .maxBufferSizeBeforeFlush = size
144+ o .maxBufferSizeBeforeFlushSet = true
145+ }
146+ }
147+
148+ // WithDisableBuffer is a convenience function that disables all buffering by setting
149+ // both buffer_flush_timeout and max_buffer_size_before_flush to 0.
150+ // This is useful for latency-sensitive scenarios where immediate query execution is required.
151+ func WithDisableBuffer () optionFunc {
152+ return func (o * redisOption ) {
153+ o .bufferFlushTimeout = 0
154+ o .bufferFlushTimeoutSet = true
155+ o .maxBufferSizeBeforeFlush = 0
156+ o .maxBufferSizeBeforeFlushSet = true
157+ }
158+ }
159+
128160func NewRedisClusterClient [C Cluster ](cluster C ) * RedisClusterClient [C ] {
129161 return & RedisClusterClient [C ]{
130162 cluster : cluster ,
@@ -221,12 +253,42 @@ func (c *RedisClusterClient[C]) Ready() bool {
221253}
222254
223255func (c * RedisClusterClient [C ]) Init (username , password string , timeout int64 , opts ... optionFunc ) error {
256+ // Apply user options
224257 for _ , opt := range opts {
225258 opt (& c .option )
226259 }
260+
227261 clusterName := c .cluster .ClusterName ()
262+
263+ // Build query parameters based on options
264+ params := make ([]string , 0 , 3 )
265+
266+ // Add database parameter if configured
228267 if c .option .dataBase != 0 {
229- clusterName = fmt .Sprintf ("%s?db=%d" , clusterName , c .option .dataBase )
268+ params = append (params , fmt .Sprintf ("db=%d" , c .option .dataBase ))
269+ }
270+
271+ // Add buffer_flush_timeout parameter
272+ // Default: 3ms if not explicitly set by user
273+ // If explicitly set to 0, use 0 (disable timeout-based flushing)
274+ bufferTimeout := c .option .bufferFlushTimeout
275+ if ! c .option .bufferFlushTimeoutSet {
276+ bufferTimeout = 3 // default 3ms when not configured
277+ }
278+ params = append (params , fmt .Sprintf ("buffer_flush_timeout=%d" , bufferTimeout ))
279+
280+ // Add max_buffer_size_before_flush parameter
281+ // Default: 1024 bytes if not explicitly set by user
282+ // If explicitly set to 0, use 0 (disable buffering, send immediately)
283+ bufferSize := c .option .maxBufferSizeBeforeFlush
284+ if ! c .option .maxBufferSizeBeforeFlushSet {
285+ bufferSize = 1024 // default 1024 bytes when not configured
286+ }
287+ params = append (params , fmt .Sprintf ("max_buffer_size_before_flush=%d" , bufferSize ))
288+
289+ // Append all parameters to cluster name
290+ if len (params ) > 0 {
291+ clusterName = fmt .Sprintf ("%s?%s" , clusterName , strings .Join (params , "&" ))
230292 }
231293
232294 // Always set checkReadyFunc to support re-authentication
0 commit comments