Skip to content

Commit 40346a4

Browse files
committed
2nd iteration of refactoring
Signed-off-by: Martin Buchleitner <mabunixda@gmail.com>
1 parent a719d80 commit 40346a4

File tree

3 files changed

+154
-69
lines changed

3 files changed

+154
-69
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
all: clean fmt wattpilot_shell wattpilot_exporter
1+
all: clean fmt test wattpilot_shell wattpilot_exporter
22

33
preprocess: fmt
44
go generate ./...

wattpilot.go

Lines changed: 122 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
)
2626

2727
const (
28-
ContextTimeout = 30 // seconds
28+
ContextTimeout = 60 // seconds
2929
ReconnectTimeout = 5 // seconds
3030

3131
EventTypeHello = "hello"
@@ -74,11 +74,14 @@ type Wattpilot struct {
7474
hashedpassword string
7575
isInitialized bool
7676
isConnected bool
77+
isConnecting atomic.Bool // New: To prevent concurrent reconnection attempts
7778
data map[string]interface{}
7879
eventHandler map[string]eventFunc
7980

80-
sendResponse chan string
81-
interrupt chan os.Signal
81+
sendResponse chan string
82+
interrupt chan os.Signal
83+
reconnectChan chan struct{} // New: To trigger immediate reconnection
84+
managerOnce sync.Once // New: To ensure connectionManager starts only once
8285

8386
notify *Pubsub
8487
logger *logrus.Logger
@@ -92,10 +95,11 @@ func New(host string, password string) *Wattpilot {
9295
password: password,
9396
hashedpassword: "",
9497

95-
connected: make(chan bool),
96-
initialized: make(chan bool),
97-
sendResponse: make(chan string),
98-
interrupt: make(chan os.Signal),
98+
connected: make(chan bool, 1),
99+
initialized: make(chan bool, 1),
100+
sendResponse: make(chan string),
101+
interrupt: make(chan os.Signal, 1),
102+
reconnectChan: make(chan struct{}, 1),
99103

100104
conn: nil,
101105
isConnected: false,
@@ -114,7 +118,7 @@ func New(host string, password string) *Wattpilot {
114118
}
115119
}
116120

117-
signal.Notify(w.interrupt, os.Interrupt) // Notify the interrupt channel for SIGINT
121+
signal.Notify(w.interrupt, os.Interrupt, syscall.SIGTERM)
118122

119123
w.eventHandler = map[string]eventFunc{
120124
EventTypeHello: w.onEventHello,
@@ -387,126 +391,177 @@ func (w *Wattpilot) RequestStatusUpdate() error {
387391
}
388392

389393
func (w *Wattpilot) Connect() error {
394+
if w.IsInitialized() {
395+
return nil
396+
}
397+
398+
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Info("Connecting...")
399+
400+
w.managerOnce.Do(func() {
401+
go w.connectionManager()
402+
})
403+
404+
if w.connectAndWait(30 * time.Second) {
405+
return nil
406+
}
407+
408+
return errors.New("failed to connect within timeout")
409+
}
410+
411+
func (w *Wattpilot) connectAndWait(timeout time.Duration) bool {
412+
// Non-blocking trigger
413+
select {
414+
case w.reconnectChan <- struct{}{}:
415+
default:
416+
}
417+
418+
// Wait for initialization
419+
select {
420+
case <-w.initialized:
421+
return true
422+
case <-time.After(timeout):
423+
return w.IsInitialized()
424+
}
425+
}
390426

391-
if w.isConnected || w.isInitialized {
392-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Debug("Already Connected")
427+
func (w *Wattpilot) connectImpl() error {
428+
if w.isConnected {
393429
return nil
394430
}
395431

396-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Info("Connecting")
397-
var err error
432+
ctx, cancel := context.WithTimeout(context.Background(), ContextTimeout*time.Second)
433+
defer cancel()
398434

399-
conn, _, err := websocket.Dial(context.Background(), fmt.Sprintf("ws://%s/ws", w.host), nil)
435+
conn, _, err := websocket.Dial(ctx, fmt.Sprintf("ws://%s/ws", w.host), nil)
400436
if err != nil {
401437
return err
402438
}
403439
w.conn = conn
404440

405-
go w.processLoop(context.Background())
406441
go w.receiveHandler()
407442

408-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Trace("Waiting on initial handshake and authentication")
409-
w.isConnected = <-w.connected
410-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Trace("Connection is ", w.isConnected)
411-
if !w.isConnected {
412-
return errors.New("could not connect")
443+
// Wait for handshake
444+
select {
445+
case w.isConnected = <-w.connected:
446+
if !w.isConnected {
447+
w.disconnectImpl()
448+
return errors.New("authentication failed")
449+
}
450+
case <-ctx.Done():
451+
w.disconnectImpl()
452+
return errors.New("connection handshake timeout")
413453
}
414454

415-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Trace("Connected - waiting for initializiation...")
416-
417-
<-w.initialized
418-
419-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Trace("Connected - and initializiated")
455+
// Wait for initialization
456+
select {
457+
case <-w.initialized:
458+
// isInitialized is set by onEventFullStatus
459+
case <-ctx.Done():
460+
w.disconnectImpl()
461+
return errors.New("initialization timeout")
462+
}
420463

421464
return nil
422465
}
423466

424467
func (w *Wattpilot) reconnect() {
468+
if !w.isConnecting.CompareAndSwap(false, true) {
469+
return // Already reconnecting
470+
}
471+
defer w.isConnecting.Store(false)
425472

426473
if w.isConnected {
427-
err := w.RequestStatusUpdate()
428-
if err == nil && w.isInitialized {
429-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Info("reconnect - valid connection")
430-
return
474+
if err := w.RequestStatusUpdate(); err == nil {
475+
return // Healthy
431476
}
432-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Error("Full Status Update failed: ", err)
477+
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Warn("Health check failed, reconnecting.")
433478
w.disconnectImpl()
434479
}
435480

436-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Debug("Reconnecting..")
437-
438-
if err := w.Connect(); err != nil {
439-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Debug("Reconnect failure: ", err)
440-
return
481+
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Info("Attempting to connect...")
482+
if err := w.connectImpl(); err != nil {
483+
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Error("Failed to connect: ", err)
484+
} else {
485+
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Info("Successfully reconnected.")
441486
}
442-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Info("Successfully reconnected")
443-
444487
}
445488

446489
func (w *Wattpilot) Disconnect() {
447-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Info("Going to disconnect...")
448-
w.disconnectImpl()
490+
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Info("Disconnecting client...")
449491
w.interrupt <- syscall.SIGINT
450492
}
451493

452494
func (w *Wattpilot) disconnectImpl() {
453-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Info("Disconnecting...")
495+
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Info("Closing connection...")
454496

455-
if w.conn != nil {
456-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Trace("Closing connection...")
457-
if err := (*w.conn).CloseNow(); err != nil {
458-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Trace("Error on closing connection: ", err)
459-
}
497+
if w.conn == nil {
498+
return // Already disconnected
460499
}
461500

462-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Trace("Closed Connection")
501+
if err := (*w.conn).CloseNow(); err != nil {
502+
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Trace("Error on closing connection: ", err)
503+
}
463504

464505
w.isInitialized = false
465506
w.isConnected = false
466507
w.conn = nil
467508
w.data = make(map[string]interface{})
509+
510+
// Drain channels to prevent blocking future operations
511+
select {
512+
case <-w.connected:
513+
default:
514+
}
515+
select {
516+
case <-w.initialized:
517+
default:
518+
}
468519
}
469520

470-
func (w *Wattpilot) processLoop(ctx context.Context) {
521+
func (w *Wattpilot) connectionManager() {
522+
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Info("Starting connection manager.")
471523

472-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Info("Starting processing loop...")
473-
delayDuration := time.Duration(time.Second * ContextTimeout)
474-
delay := time.NewTimer(delayDuration)
524+
healthCheckTicker := time.NewTicker(time.Second * ContextTimeout) // Use for health checks
525+
defer healthCheckTicker.Stop()
475526

476527
for {
477528
select {
478-
case <-delay.C:
479-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Debug("Hello...")
480-
481-
delay.Reset(delayDuration)
482-
483-
if !w.isInitialized {
484-
w.disconnectImpl()
529+
case <-healthCheckTicker.C:
530+
if !w.isConnected || !w.isInitialized {
531+
// Connection is not healthy, attempt to reconnect
532+
select {
533+
case w.reconnectChan <- struct{}{}:
534+
default:
535+
}
485536
}
486-
w.reconnect()
487-
488-
case <-ctx.Done():
537+
case <-w.reconnectChan:
538+
time.Sleep(ReconnectTimeout * time.Second) // Wait a bit before reconnecting
539+
go w.reconnect()
489540
case <-w.interrupt:
490-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Trace("Stopping process loop...")
541+
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Info("Stopping connection manager.")
491542
w.disconnectImpl()
492-
if !delay.Stop() {
493-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Trace("Waiting on delay...")
494-
// <-delay.C
495-
}
496-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Trace("Stopped process loop...")
497543
return
498544
}
499545
}
500546
}
501547

502548
func (w *Wattpilot) receiveHandler() {
503-
504549
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Info("Starting receive handler...")
505550

506551
for {
552+
if w.conn == nil {
553+
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Info("Connection is nil, stopping receive handler.")
554+
return
555+
}
507556
_, msg, err := w.conn.Read(context.Background())
508557
if err != nil {
509-
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Info("Stopping receive handler...")
558+
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Warn("Connection read error: ", err)
559+
w.disconnectImpl()
560+
select {
561+
case w.reconnectChan <- struct{}{}:
562+
default:
563+
}
564+
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Info("Stopping receive handler.")
510565
return
511566
}
512567
data := make(map[string]interface{})
@@ -527,7 +582,6 @@ func (w *Wattpilot) receiveHandler() {
527582
funcCall(data)
528583
w.logger.WithFields(logrus.Fields{"wattpilot": w.host}).Trace("done ", msgType)
529584
}
530-
531585
}
532586

533587
func (w *Wattpilot) onEventHello(message map[string]interface{}) {

wattpilot_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package wattpilot
22

33
import (
4+
"context"
5+
"os"
46
"testing"
7+
"time"
58

69
"github.com/sirupsen/logrus"
710
"github.com/stretchr/testify/assert"
@@ -44,3 +47,31 @@ func TestAlias(t *testing.T) {
4447
assert.NotNil(t, w.Alias())
4548
assert.Equal(t, "acs", w.LookupAlias("accessState"))
4649
}
50+
51+
func TestConnect(t *testing.T) {
52+
host := os.Getenv("WATTPILOT_HOST")
53+
pwd := os.Getenv("WATTPILOT_PASSWORD")
54+
if host == "" || pwd == "" {
55+
t.Skip("WATTPILOT_HOST and WATTPILOT_PASSWORD environment variables not set. Skipping integration test.")
56+
}
57+
58+
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
59+
defer cancel()
60+
61+
w := New(host, pwd)
62+
63+
done := make(chan error, 1)
64+
go func() {
65+
done <- w.Connect()
66+
}()
67+
68+
select {
69+
case err := <-done:
70+
assert.NoError(t, err, "Connect should not return an error")
71+
assert.True(t, w.IsInitialized(), "Wattpilot should be initialized after successful connection")
72+
case <-ctx.Done():
73+
assert.Fail(t, "Test timed out after 60 seconds", ctx.Err())
74+
}
75+
76+
w.Disconnect()
77+
}

0 commit comments

Comments
 (0)