Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
13fd29e
chore: gitignore .claude/ and docs/
debsahu May 4, 2026
943dc86
feat(dashboard): skeleton package with embed.FS
debsahu May 4, 2026
a8500b3
feat(dashboard): HMAC session cookie helpers
debsahu May 4, 2026
c933c2c
feat(dashboard): file-backed multi-user credential store
debsahu May 4, 2026
1af86f1
feat(dashboard): in-memory failed-login lockout tracker
debsahu May 4, 2026
bec438d
feat(dashboard): CSRF token helpers
debsahu May 4, 2026
3b37160
feat(dashboard): auth middleware (login, force-rotate, expiry, RBAC)
debsahu May 4, 2026
8ef257d
feat(rest): paginated response helper
debsahu May 4, 2026
fd108e6
feat(rest): paginated GET /clients with prefix search
debsahu May 4, 2026
c5dcb65
feat(rest): paginated GET /subscriptions with topic/client filters
debsahu May 4, 2026
7b02753
feat(rest): GET /topics tree endpoint
debsahu May 4, 2026
08c390c
feat(rest): DELETE /clients/{id}/subscriptions/{topic}
debsahu May 4, 2026
155cf8a
feat(rest): retained-message browser endpoints
debsahu May 4, 2026
3223032
feat(rest): session inspector endpoints + Server.Hooks() accessor
debsahu May 4, 2026
42d565d
feat(dashboard): SSE event hub
debsahu May 4, 2026
82f7132
feat(dashboard): SSE handler at /dashboard/events
debsahu May 4, 2026
9be7227
feat(dashboard): mqtt-hook to SSE event bridge
debsahu May 4, 2026
ae33ba5
feat(cluster/rest): mirror dashboard read endpoints
debsahu May 4, 2026
8e9ac0f
build(dashboard): Tailwind v3 build pipeline (no Node)
debsahu May 4, 2026
c61655b
feat(dashboard): layout, nav, theme toggle, vendored htmx
debsahu May 4, 2026
6488213
feat(dashboard): login + logout flow with lockout integration
debsahu May 4, 2026
c7fac7c
feat(dashboard): force-rotation password change flow
debsahu May 4, 2026
937daac
feat(dashboard): overview page with polling cards
debsahu May 4, 2026
001b3cb
feat(dashboard): clients table with pagination and search
debsahu May 4, 2026
36f3a30
feat(dashboard): blacklist page (admin-write, viewer-read)
debsahu May 4, 2026
63b7280
feat(dashboard): publish-message tool
debsahu May 4, 2026
c88813e
feat(dashboard): settings page with chroma syntax highlighting
debsahu May 4, 2026
477d7ad
feat(dashboard): users page with admin-only RBAC management
debsahu May 4, 2026
2c0402e
feat(dashboard): personal account page
debsahu May 4, 2026
b1cba45
feat(single): wire dashboard into the http listener
debsahu May 4, 2026
d37ca12
feat(cluster): wire dashboard into the http listener
debsahu May 4, 2026
e1d1531
feat(dashboard): SSE-driven recent events feed on overview
debsahu May 4, 2026
dfa1479
feat(dashboard): inline-SVG sparkline helper
debsahu May 4, 2026
467af47
feat(dashboard): client detail page with per-sub unsubscribe
debsahu May 4, 2026
3f24ef2
feat(dashboard): subscriptions, topics, retained, sessions pages
debsahu May 4, 2026
fa17923
feat(dashboard): redis-backed credential store
debsahu May 4, 2026
d16267c
feat(dashboard): redis-backed HMAC secret
debsahu May 4, 2026
7a06f7c
feat(dashboard): cluster page (cluster binary only)
debsahu May 4, 2026
bb2d332
feat(dashboard): cluster-wide event aggregation via redis pub/sub
debsahu May 4, 2026
49ef2c8
test(dashboard): cluster-wide event aggregation integration test
debsahu May 4, 2026
1c93750
feat(cluster): wire redis cred store, secret, and event bridge
debsahu May 4, 2026
b4ebf06
fix(dashboard): isolate per-page template trees so 'content' blocks d…
debsahu May 4, 2026
dd27400
fix(dashboard): add Flash field on detail-page data + $SYS filter tog…
debsahu May 4, 2026
85aee27
feat(dashboard): cluster status block on overview + mobile-friendly n…
debsahu May 4, 2026
790b0ea
feat(dashboard): Vercel theme + cluster topology graphic + leader fix
debsahu May 4, 2026
2d54d71
fix(dashboard): stop RateSampler and Bridge goroutines on shutdown
debsahu May 4, 2026
22ee912
refactor(dashboard): promote dashboard to top-level package
debsahu May 6, 2026
40aecf6
feat(dashboard): IoT-Blue palette + themed select chrome
debsahu May 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,11 @@ raft.db
.idea
.vscode
.claude/
docs/
/bin/
/data/
/cmd/single/data/
/cmd/cluster/data/
/single
/comqtt
/comqtt-cluster
19 changes: 19 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
TAILWIND_VERSION ?= v3.4.7
TAILWIND_BIN := bin/tailwindcss

.PHONY: dashboard
dashboard: $(TAILWIND_BIN)
$(TAILWIND_BIN) -c dashboard/web/tailwind.config.js \
-i dashboard/web/input.css \
-o dashboard/static/tailwind.css --minify

$(TAILWIND_BIN):
@mkdir -p bin
@case "$$(uname -s)-$$(uname -m)" in \
Darwin-arm64) URL="https://github.com/tailwindlabs/tailwindcss/releases/download/$(TAILWIND_VERSION)/tailwindcss-macos-arm64";; \
Darwin-x86_64) URL="https://github.com/tailwindlabs/tailwindcss/releases/download/$(TAILWIND_VERSION)/tailwindcss-macos-x64";; \
Linux-x86_64) URL="https://github.com/tailwindlabs/tailwindcss/releases/download/$(TAILWIND_VERSION)/tailwindcss-linux-x64";; \
Linux-aarch64) URL="https://github.com/tailwindlabs/tailwindcss/releases/download/$(TAILWIND_VERSION)/tailwindcss-linux-arm64";; \
*) echo "unsupported platform: $$(uname -s)-$$(uname -m)" >&2; exit 1;; \
esac; \
curl -L -o $@ $$URL && chmod +x $@
33 changes: 33 additions & 0 deletions cluster/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,39 @@ func (a *Agent) GetMemberList() []discovery.Member {
return a.membership.Members()
}

// Leader returns the node name of the current Raft leader, or "" if no
// leader is known. Used by integrations (REST handlers, dashboard) that
// need to display cluster topology.
//
// Tries the raft-reported ID first; falls back to matching the leader's
// host address against the membership list. This handles setups where
// raft.LocalID and the discovery node name differ.
func (a *Agent) Leader() string {
if a.raftPeer == nil {
return ""
}
addr, id := a.raftPeer.GetLeader()
if id != "" {
return id
}
if addr == "" {
return ""
}
host := addr
for i := len(addr) - 1; i >= 0; i-- {
if addr[i] == ':' {
host = addr[:i]
break
}
}
for _, m := range a.membership.Members() {
if m.Addr == host {
return m.Name
}
}
return ""
}

func getRaftPeerAddr(member *discovery.Member) string {
// using serf
if raftPort, ok := member.Tags[discovery.TagRaftPort]; ok {
Expand Down
116 changes: 106 additions & 10 deletions cluster/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,24 @@ func New(agent *cs.Agent) *rest {

func (s *rest) GenHandlers() map[string]rt.Handler {
return map[string]rt.Handler{
"GET /api/v1/node/config": s.viewConfig,
"DELETE /api/v1/node/{name}": s.leave,
"GET /api/v1/cluster/nodes": s.getNodes,
"POST /api/v1/cluster/nodes": s.join,
"POST /api/v1/cluster/peers": s.addRaftPeer,
"DELETE /api/v1/cluster/peers/{name}": s.removeRaftPeer,
"GET /api/v1/cluster/stat/online": s.getOnlineCount,
"GET /api/v1/cluster/clients/{id}": s.getClient,
"POST /api/v1/cluster/blacklist/{id}": s.kickClient,
"DELETE /api/v1/cluster/blacklist/{id}": s.blanchClient,
"GET /api/v1/node/config": s.viewConfig,
"DELETE /api/v1/node/{name}": s.leave,
"GET /api/v1/cluster/nodes": s.getNodes,
"POST /api/v1/cluster/nodes": s.join,
"POST /api/v1/cluster/peers": s.addRaftPeer,
"DELETE /api/v1/cluster/peers/{name}": s.removeRaftPeer,
"GET /api/v1/cluster/stat/online": s.getOnlineCount,
"GET /api/v1/cluster/clients/{id}": s.getClient,
"POST /api/v1/cluster/blacklist/{id}": s.kickClient,
"DELETE /api/v1/cluster/blacklist/{id}": s.blanchClient,
"GET /api/v1/cluster/clients": s.listClients,
"GET /api/v1/cluster/subscriptions": s.listSubscriptions,
"GET /api/v1/cluster/topics": s.topicsTree,
"DELETE /api/v1/cluster/clients/{id}/subscriptions/{topic}": s.unsubscribeClient,
"GET /api/v1/cluster/retained": s.listRetained,
"DELETE /api/v1/cluster/retained/{topic}": s.clearRetained,
"GET /api/v1/cluster/sessions": s.listSessions,
"DELETE /api/v1/cluster/sessions/{id}": s.clearSession,
}
}

Expand Down Expand Up @@ -167,6 +175,94 @@ func (s *rest) blanchClient(w http.ResponseWriter, r *http.Request) {
rt.Ok(w, rs)
}

// listClients fan out client listing to every node in the cluster
// GET api/v1/cluster/clients
func (s *rest) listClients(w http.ResponseWriter, r *http.Request) {
path := rt.MqttListClientsPath
if r.URL.RawQuery != "" {
path += "?" + r.URL.RawQuery
}
urls := genUrls(s.agent.GetMemberList(), path)
rs := fetchM(HttpGet, urls, nil)
rt.Ok(w, rs)
}

// listSubscriptions fan out subscription listing to every node in the cluster
// GET api/v1/cluster/subscriptions
func (s *rest) listSubscriptions(w http.ResponseWriter, r *http.Request) {
path := rt.MqttListSubscriptionsPath
if r.URL.RawQuery != "" {
path += "?" + r.URL.RawQuery
}
urls := genUrls(s.agent.GetMemberList(), path)
rs := fetchM(HttpGet, urls, nil)
rt.Ok(w, rs)
}

// topicsTree fan out topic tree retrieval to every node in the cluster
// GET api/v1/cluster/topics
func (s *rest) topicsTree(w http.ResponseWriter, r *http.Request) {
urls := genUrls(s.agent.GetMemberList(), rt.MqttTopicsTreePath)
rs := fetchM(HttpGet, urls, nil)
rt.Ok(w, rs)
}

// unsubscribeClient fan out unsubscribe to every node in the cluster
// DELETE api/v1/cluster/clients/{id}/subscriptions/{topic}
func (s *rest) unsubscribeClient(w http.ResponseWriter, r *http.Request) {
cid := r.PathValue("id")
topic := r.PathValue("topic")
path := strings.Replace(rt.MqttUnsubscribeClientPath, "{id}", cid, 1)
path = strings.Replace(path, "{topic}", topic, 1)
urls := genUrls(s.agent.GetMemberList(), path)
rs := fetchM(HttpDelete, urls, nil)
rt.Ok(w, rs)
}

// listRetained fan out retained message listing to every node in the cluster
// GET api/v1/cluster/retained
func (s *rest) listRetained(w http.ResponseWriter, r *http.Request) {
path := rt.MqttListRetainedPath
if r.URL.RawQuery != "" {
path += "?" + r.URL.RawQuery
}
urls := genUrls(s.agent.GetMemberList(), path)
rs := fetchM(HttpGet, urls, nil)
rt.Ok(w, rs)
}

// clearRetained fan out retained clear to every node in the cluster
// DELETE api/v1/cluster/retained/{topic}
func (s *rest) clearRetained(w http.ResponseWriter, r *http.Request) {
topic := r.PathValue("topic")
path := strings.Replace(rt.MqttClearRetainedPath, "{topic}", topic, 1)
urls := genUrls(s.agent.GetMemberList(), path)
rs := fetchM(HttpDelete, urls, nil)
rt.Ok(w, rs)
}

// listSessions fan out session listing to every node in the cluster
// GET api/v1/cluster/sessions
func (s *rest) listSessions(w http.ResponseWriter, r *http.Request) {
path := rt.MqttListSessionsPath
if r.URL.RawQuery != "" {
path += "?" + r.URL.RawQuery
}
urls := genUrls(s.agent.GetMemberList(), path)
rs := fetchM(HttpGet, urls, nil)
rt.Ok(w, rs)
}

// clearSession fan out session clear to every node in the cluster
// DELETE api/v1/cluster/sessions/{id}
func (s *rest) clearSession(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
path := strings.Replace(rt.MqttClearSessionPath, "{id}", id, 1)
urls := genUrls(s.agent.GetMemberList(), path)
rs := fetchM(HttpDelete, urls, nil)
rt.Ok(w, rs)
}

// genUrls generate urls
func genUrls(ms []discovery.Member, path string) []string {
urls := make([]string, len(ms))
Expand Down
25 changes: 25 additions & 0 deletions cmd/cluster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
coredis "github.com/wind-c/comqtt/v2/cluster/storage/redis"
"github.com/wind-c/comqtt/v2/config"
mqtt "github.com/wind-c/comqtt/v2/mqtt"
"github.com/wind-c/comqtt/v2/dashboard"
"github.com/wind-c/comqtt/v2/mqtt/hooks/auth"
"github.com/wind-c/comqtt/v2/mqtt/listeners"
mqttRt "github.com/wind-c/comqtt/v2/mqtt/rest"
Expand Down Expand Up @@ -144,6 +145,29 @@ func realMain(ctx context.Context) error {
csHls := csRt.New(agent).GenHandlers()
mqHls := mqttRt.New(server).GenHandlers()
maps.Copy(csHls, mqHls)

dashCleanup := func() {}
if cfg.Dashboard.Enabled {
dashRedis := redis.NewClient(&redis.Options{
Addr: cfg.Redis.Options.Addr,
Password: cfg.Redis.Options.Password,
DB: cfg.Redis.Options.DB,
})
dashRoutes, cleanup, err := dashboard.Routes(dashboard.Options{
Server: server,
Cluster: true,
ClusterAgent: agent,
Redis: dashRedis,
Secret: cfg.Dashboard.DecodeSecret(),
PasswordExpiryDays: cfg.Dashboard.PasswordExpiryDays,
})
if err != nil {
return fmt.Errorf("dashboard routes: %w", err)
}
dashCleanup = cleanup
maps.Copy(csHls, dashRoutes)
}

http := listeners.NewHTTP("stats", cfg.Mqtt.HTTP, nil, csHls)
onError(server.AddListener(http), "add http listener")

Expand All @@ -165,6 +189,7 @@ func realMain(ctx context.Context) error {
case <-ctx.Done():
server.Log.Warn("caught signal, stopping...")
}
dashCleanup()
agent.Stop()
server.Close()
return nil
Expand Down
23 changes: 22 additions & 1 deletion cmd/single/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/wind-c/comqtt/v2/cluster/log"
"github.com/wind-c/comqtt/v2/config"
"github.com/wind-c/comqtt/v2/mqtt"
"github.com/wind-c/comqtt/v2/dashboard"
"github.com/wind-c/comqtt/v2/mqtt/hooks/auth"
"github.com/wind-c/comqtt/v2/mqtt/hooks/storage/badger"
"github.com/wind-c/comqtt/v2/mqtt/hooks/storage/bolt"
Expand Down Expand Up @@ -135,7 +136,26 @@ func realMain(ctx context.Context) error {
onError(server.AddListener(ws), "add websocket listener")

// add http listener
http := listeners.NewHTTP("stats", cfg.Mqtt.HTTP, nil, rest.New(server).GenHandlers())
restHandlers := rest.New(server).GenHandlers()

dashCleanup := func() {}
if cfg.Dashboard.Enabled {
dashRoutes, cleanup, err := dashboard.Routes(dashboard.Options{
Server: server,
Cluster: false,
Secret: cfg.Dashboard.DecodeSecret(),
PasswordExpiryDays: cfg.Dashboard.PasswordExpiryDays,
})
if err != nil {
return fmt.Errorf("dashboard routes: %w", err)
}
dashCleanup = cleanup
for path, h := range dashRoutes {
restHandlers[path] = h
}
}

http := listeners.NewHTTP("stats", cfg.Mqtt.HTTP, nil, restHandlers)
onError(server.AddListener(http), "add http listener")

errCh := make(chan error, 1)
Expand All @@ -155,6 +175,7 @@ func realMain(ctx context.Context) error {
case <-ctx.Done():
log.Warn("caught signal, stopping...")
}
dashCleanup()
server.Close()
log.Info("main.go finished")
return nil
Expand Down
33 changes: 31 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
tls2 "crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/base64"
"encoding/pem"
"errors"
"math/big"
Expand Down Expand Up @@ -67,7 +68,9 @@ var (
)

func New() *Config {
return &Config{}
return &Config{
Dashboard: Dashboard{Enabled: true},
}
}

func Load(yamlFile string) (*Config, error) {
Expand All @@ -79,7 +82,9 @@ func Load(yamlFile string) (*Config, error) {
}

func parse(buf []byte) (*Config, error) {
conf := &Config{}
// Pre-populate defaults so omitted YAML keys keep them. yaml.v3
// overwrites only the fields that are present in the document.
conf := New()
err := yaml.Unmarshal(buf, conf)
if err != nil {
return nil, err
Expand All @@ -97,9 +102,33 @@ type Config struct {
Cluster Cluster `yaml:"cluster"`
Redis redis `yaml:"redis"`
Log log.Options `yaml:"log"`
Dashboard Dashboard `yaml:"dashboard"`
PprofEnable bool `yaml:"pprof-enable"`
}

// Dashboard holds the v1 web-dashboard wiring choices. Defaults are applied
// in dashboard.Options.applyDefaults; these YAML fields let operators
// opt out (Enabled=false), pin a session secret across restarts, or change
// the password expiry policy without code changes.
type Dashboard struct {
Enabled bool `yaml:"enabled"`
SessionSecret string `yaml:"session-secret"`
PasswordExpiryDays int `yaml:"password-expiry-days"`
}

// DecodeSecret returns the SessionSecret as raw bytes. It accepts either a
// base64-encoded string or a raw secret. Returns nil if SessionSecret is
// empty (in which case the dashboard auto-generates and persists one).
func (d *Dashboard) DecodeSecret() []byte {
if d.SessionSecret == "" {
return nil
}
if b, err := base64.StdEncoding.DecodeString(d.SessionSecret); err == nil && len(b) >= 16 {
return b
}
return []byte(d.SessionSecret)
}

type auth struct {
Way uint `yaml:"way"`
Datasource uint `yaml:"datasource"`
Expand Down
Loading
Loading