Skip to content

Commit 7f53f51

Browse files
feat: add Close() method to RedHub server
Add a Close() method to allow graceful shutdown of the RedHub server. This addresses issue #14 and provides a similar API to tidwall/redcon. Changes: - Add mu, addr, and running fields to RedHub struct for server state - Add Close() method that gracefully shuts down the server - Modify ListenAndServe to track server state - Add unit tests for Close() functionality - Add integration test for Close() method The Close() method: - Returns error if server is not running - Calls gnet.Stop() to gracefully shutdown - Is thread-safe with mutex protection - Can be called from any goroutine
1 parent 5f6b6da commit 7f53f51

File tree

2 files changed

+104
-1
lines changed

2 files changed

+104
-1
lines changed

redhub.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ package redhub
5656

5757
import (
5858
"bytes"
59+
"context"
60+
"errors"
5961
"sync"
6062
"time"
6163

@@ -203,6 +205,10 @@ type RedHub struct {
203205
handler func(cmd resp.Command, out []byte) ([]byte, Action)
204206
redHubBufMap map[gnet.Conn]*connBuffer
205207
connSync *sync.RWMutex
208+
mu sync.Mutex
209+
addr string
210+
running bool
211+
engine gnet.Engine
206212
}
207213

208214
// connBuffer holds the buffer and commands for each connection.
@@ -253,6 +259,9 @@ func NewRedHub(
253259
// The engine parameter provides access to server-wide operations.
254260
// Typically returns gnet.None to indicate normal startup.
255261
func (rs *RedHub) OnBoot(eng gnet.Engine) (action gnet.Action) {
262+
rs.mu.Lock()
263+
rs.engine = eng
264+
rs.mu.Unlock()
256265
return gnet.None
257266
}
258267

@@ -427,5 +436,34 @@ func ListenAndServe(addr string, options Options, rh *RedHub) error {
427436
opts = append(opts, gnet.WithEdgeTriggeredIO(true))
428437
}
429438

430-
return gnet.Run(rh, addr, opts...)
439+
rh.mu.Lock()
440+
rh.addr = addr
441+
rh.running = true
442+
rh.mu.Unlock()
443+
444+
err := gnet.Run(rh, addr, opts...)
445+
446+
rh.mu.Lock()
447+
rh.running = false
448+
rh.mu.Unlock()
449+
450+
return err
451+
}
452+
453+
// Close gracefully shuts down the RedHub server.
454+
//
455+
// This method stops the server and closes all active connections. It is safe to call
456+
// multiple times. If the server is not currently running, it returns an error.
457+
//
458+
// Returns an error if the server is not running or if the shutdown fails.
459+
func (rs *RedHub) Close() error {
460+
rs.mu.Lock()
461+
defer rs.mu.Unlock()
462+
463+
if !rs.running {
464+
return errors.New("server not running")
465+
}
466+
467+
rs.running = false
468+
return rs.engine.Stop(context.Background())
431469
}

redhub_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,3 +310,68 @@ func TestShutdownAction(t *testing.T) {
310310
action := rh.OnTraffic(mock)
311311
assert.Equal(t, gnet.Close, action)
312312
}
313+
314+
func TestClose_NotRunning(t *testing.T) {
315+
rh := NewRedHub(nil, nil, func(cmd resp.Command, out []byte) ([]byte, Action) {
316+
return out, None
317+
})
318+
319+
err := rh.Close()
320+
assert.Error(t, err)
321+
assert.Contains(t, err.Error(), "server not running")
322+
}
323+
324+
func TestClose_Integration(t *testing.T) {
325+
if testing.Short() {
326+
t.Skip("skipping integration test in short mode")
327+
}
328+
329+
rh := NewRedHub(
330+
func(c *Conn) (out []byte, action Action) {
331+
return nil, None
332+
},
333+
func(c *Conn, err error) (action Action) {
334+
return None
335+
},
336+
func(cmd resp.Command, out []byte) ([]byte, Action) {
337+
return resp.AppendString(out, "OK"), None
338+
},
339+
)
340+
341+
// Start server in a goroutine
342+
serverErr := make(chan error, 1)
343+
go func() {
344+
serverErr <- ListenAndServe("tcp://127.0.0.1:16379", Options{Multicore: false}, rh)
345+
}()
346+
347+
// Wait for server to start
348+
time.Sleep(100 * time.Millisecond)
349+
350+
// Test that server is running
351+
conn, err := net.DialTimeout("tcp", "127.0.0.1:16379", time.Second)
352+
assert.NoError(t, err)
353+
assert.NotNil(t, conn)
354+
conn.Close()
355+
356+
// Close the server
357+
err = rh.Close()
358+
assert.NoError(t, err)
359+
360+
// Wait a moment for server to stop
361+
time.Sleep(200 * time.Millisecond)
362+
363+
// Verify connection fails after close
364+
conn, err = net.DialTimeout("tcp", "127.0.0.1:16379", 200*time.Millisecond)
365+
if err == nil {
366+
conn.Close()
367+
t.Error("Expected connection error after server close")
368+
}
369+
370+
// Verify server goroutine returns gracefully (no error when stopped via Close)
371+
select {
372+
case err := <-serverErr:
373+
assert.NoError(t, err)
374+
case <-time.After(2 * time.Second):
375+
t.Error("Server did not stop within timeout")
376+
}
377+
}

0 commit comments

Comments
 (0)