Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion redhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ package redhub

import (
"bytes"
"context"
"errors"
"sync"
"time"

Expand Down Expand Up @@ -203,6 +205,10 @@ type RedHub struct {
handler func(cmd resp.Command, out []byte) ([]byte, Action)
redHubBufMap map[gnet.Conn]*connBuffer
connSync *sync.RWMutex
mu sync.Mutex
addr string
running bool
engine gnet.Engine
}

// connBuffer holds the buffer and commands for each connection.
Expand Down Expand Up @@ -253,6 +259,9 @@ func NewRedHub(
// The engine parameter provides access to server-wide operations.
// Typically returns gnet.None to indicate normal startup.
func (rs *RedHub) OnBoot(eng gnet.Engine) (action gnet.Action) {
rs.mu.Lock()
rs.engine = eng
rs.mu.Unlock()
return gnet.None
}

Expand Down Expand Up @@ -427,5 +436,34 @@ func ListenAndServe(addr string, options Options, rh *RedHub) error {
opts = append(opts, gnet.WithEdgeTriggeredIO(true))
}

return gnet.Run(rh, addr, opts...)
rh.mu.Lock()
rh.addr = addr
rh.running = true
rh.mu.Unlock()

err := gnet.Run(rh, addr, opts...)

rh.mu.Lock()
rh.running = false
rh.mu.Unlock()

return err
}

// Close gracefully shuts down the RedHub server.
//
// This method stops the server and closes all active connections. It is safe to call
// multiple times. If the server is not currently running, it returns an error.
//
// Returns an error if the server is not running or if the shutdown fails.
func (rs *RedHub) Close() error {
rs.mu.Lock()
defer rs.mu.Unlock()

if !rs.running {
return errors.New("server not running")
}

rs.running = false
return rs.engine.Stop(context.Background())
}
65 changes: 65 additions & 0 deletions redhub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,68 @@ func TestShutdownAction(t *testing.T) {
action := rh.OnTraffic(mock)
assert.Equal(t, gnet.Close, action)
}

func TestClose_NotRunning(t *testing.T) {
rh := NewRedHub(nil, nil, func(cmd resp.Command, out []byte) ([]byte, Action) {
return out, None
})

err := rh.Close()
assert.Error(t, err)
assert.Contains(t, err.Error(), "server not running")
}

func TestClose_Integration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}

rh := NewRedHub(
func(c *Conn) (out []byte, action Action) {
return nil, None
},
func(c *Conn, err error) (action Action) {
return None
},
func(cmd resp.Command, out []byte) ([]byte, Action) {
return resp.AppendString(out, "OK"), None
},
)

// Start server in a goroutine
serverErr := make(chan error, 1)
go func() {
serverErr <- ListenAndServe("tcp://127.0.0.1:16379", Options{Multicore: false}, rh)
}()

// Wait for server to start
time.Sleep(100 * time.Millisecond)

// Test that server is running
conn, err := net.DialTimeout("tcp", "127.0.0.1:16379", time.Second)
assert.NoError(t, err)
assert.NotNil(t, conn)
conn.Close()

// Close the server
err = rh.Close()
assert.NoError(t, err)

// Wait a moment for server to stop
time.Sleep(200 * time.Millisecond)

// Verify connection fails after close
conn, err = net.DialTimeout("tcp", "127.0.0.1:16379", 200*time.Millisecond)
if err == nil {
conn.Close()
t.Error("Expected connection error after server close")
}

// Verify server goroutine returns gracefully (no error when stopped via Close)
select {
case err := <-serverErr:
assert.NoError(t, err)
case <-time.After(2 * time.Second):
t.Error("Server did not stop within timeout")
}
}
Loading