Skip to content

Commit 3ef955b

Browse files
committed
Added a background trigger to periodically start msgInLog processing. Also, some clean-ups.
1 parent a3c0e47 commit 3ef955b

File tree

3 files changed

+52
-8
lines changed

3 files changed

+52
-8
lines changed

comm/connection.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,9 @@ func ReliableConnect(remoteAddr string, tlsConfig *tls.Config, retry int) (*tls.
4141
return c, nil
4242
}
4343

44-
// ReliableSend attempts to transmit a message inbetween
45-
// pluto nodes. If the first attempt fails, the node will
46-
// try to reconnect and resend the message until successfully
47-
// transmitted.
44+
// ReliableSend attempts to transmit a message between pluto
45+
// nodes. If the first attempt fails, the node will try to
46+
// reconnect and resend the message until successfully transmitted.
4847
func ReliableSend(conn *tls.Conn, text string, remoteAddr string, tlsConfig *tls.Config, timeout int, retry int) error {
4948

5049
// TODO: Make this routine first check for closed

comm/receiver.go

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"strconv"
1212
"strings"
1313
"sync"
14+
"time"
1415

1516
"io/ioutil"
1617
)
@@ -123,6 +124,10 @@ func InitReceiver(name string, logFilePath string, vclockLogPath string, socket
123124
// initial run to check if log file contains elements.
124125
recv.msgInLog <- struct{}{}
125126

127+
// Start triggering msgInLog events periodically.
128+
recv.wg.Add(1)
129+
go recv.TriggerMsgApplier()
130+
126131
// Accept incoming messages in background.
127132
recv.wg.Add(1)
128133
go recv.AcceptIncMsgs()
@@ -223,6 +228,7 @@ func (recv *Receiver) Shutdown(downRecv chan struct{}) {
223228
recv.shutdown <- struct{}{}
224229
recv.shutdown <- struct{}{}
225230
recv.shutdown <- struct{}{}
231+
recv.shutdown <- struct{}{}
226232

227233
// Close involved channels.
228234
close(recv.incVClock)
@@ -336,6 +342,45 @@ func (recv *Receiver) AcceptIncMsgs() error {
336342
}
337343
}
338344

345+
// TriggerMsgApplier starts a timer that triggers
346+
// an msgInLog event when duration elapsed. Supposed
347+
// to routinely poke the ApplyStoredMsgs into checking
348+
// for unprocessed messages in log.
349+
func (recv *Receiver) TriggerMsgApplier() {
350+
351+
// Specify duration to wait between triggers.
352+
triggerD := 2 * time.Second
353+
354+
// Create a timer that waits for one second
355+
// to elapse and then fires.
356+
triggerT := time.NewTimer(triggerD)
357+
358+
for {
359+
360+
select {
361+
362+
// Check if a shutdown signal was sent.
363+
case <-recv.shutdown:
364+
365+
// Call done handler of wait group for this
366+
// routine on exiting this function.
367+
defer recv.wg.Done()
368+
return
369+
370+
case <-triggerT.C:
371+
372+
// If buffered channel indicating an arrived
373+
// msg is not full yet, make it full.
374+
if len(recv.msgInLog) < 1 {
375+
recv.msgInLog <- struct{}{}
376+
}
377+
378+
// Renew timer.
379+
triggerT.Reset(triggerD)
380+
}
381+
}
382+
}
383+
339384
// StoreIncMsgs takes received message string and saves
340385
// it into incoming CRDT message log file.
341386
func (recv *Receiver) StoreIncMsgs(conn net.Conn) {
@@ -358,9 +403,9 @@ func (recv *Receiver) StoreIncMsgs(conn net.Conn) {
358403
if err.Error() == "EOF" {
359404
log.Printf("[comm.StoreIncMsgs] Reading from closed connection. Ignoring.\n")
360405
return
361-
} else {
362-
log.Fatalf("[comm.StoreIncMsgs] Error while reading sync message: %s\n", err.Error())
363406
}
407+
408+
log.Fatalf("[comm.StoreIncMsgs] Error while reading sync message: %s\n", err.Error())
364409
}
365410

366411
// Remove trailing characters denoting line end.

imap/distributor-commands.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ func (distr *Distributor) Capability(c *Connection, req *Request) bool {
3939
return false
4040
}
4141

42-
// TODO: Change returned capabilites based on IMAP state of
43-
// connection, e.g. more capabilites if authenticated.
42+
// TODO: Change returned capabilities based on IMAP state of
43+
// connection, e.g. more capabilities if authenticated.
4444

4545
return true
4646
}

0 commit comments

Comments
 (0)