Skip to content

Commit e204b84

Browse files
authored
Merge pull request #5 from numbleroot/feature/gokit-logger
Start to integrate go-kit's logger
2 parents 1a3c278 + 7dd7456 commit e204b84

36 files changed

+2811
-211
lines changed

comm/connection.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package comm
22

33
import (
44
"fmt"
5-
"log"
5+
stdlog "log"
66
"time"
77

88
"crypto/tls"
@@ -70,7 +70,7 @@ func ReliableSend(conn *tls.Conn, text string, remoteAddr string, tlsConfig *tls
7070
_, err = fmt.Fprintf(conn, "%s\r\n", text)
7171
for err != nil {
7272

73-
log.Printf("[comm.ReliableSend] Sending to node '%s' failed, trying to recover...\n", remoteAddr)
73+
stdlog.Printf("[comm.ReliableSend] Sending to node '%s' failed, trying to recover...\n", remoteAddr)
7474

7575
// Define an error we can deal with.
7676
okError := fmt.Sprintf("write tcp %s->%s: write: broken pipe", conn.LocalAddr().String(), remoteAddr)
@@ -83,7 +83,7 @@ func ReliableSend(conn *tls.Conn, text string, remoteAddr string, tlsConfig *tls
8383
return fmt.Errorf("could not reestablish connection with '%s': %s", remoteAddr, err.Error())
8484
}
8585

86-
log.Printf("[comm.ReliableSend] Reconnected to '%s'.\n", remoteAddr)
86+
stdlog.Printf("[comm.ReliableSend] Reconnected to '%s'.\n", remoteAddr)
8787

8888
// Resend message.
8989
_, err = fmt.Fprintf(conn, "%s\r\n", text)

comm/receiver-sender_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package comm_test
22

33
import (
44
"fmt"
5-
"log"
5+
stdlog "log"
66
"testing"
77
"time"
88

@@ -79,7 +79,7 @@ func TestSenderReceiver(t *testing.T) {
7979
updMsg := <-n1ApplyCRDTUpdChan
8080

8181
// Log message.
82-
log.Printf("[comm_test.TestSenderReceiver] %s: Would apply update from message here: %s\n", n1, updMsg)
82+
stdlog.Printf("[comm_test.TestSenderReceiver] %s: Would apply update from message here: %s\n", n1, updMsg)
8383

8484
// Signal success.
8585
n1DoneCRDTUpdChan <- struct{}{}
@@ -102,7 +102,7 @@ func TestSenderReceiver(t *testing.T) {
102102
updMsg := <-n2ApplyCRDTUpdChan
103103

104104
// Log message.
105-
log.Printf("[comm_test.TestSenderReceiver] %s: Would apply update from message here: %s\n", n2, updMsg)
105+
stdlog.Printf("[comm_test.TestSenderReceiver] %s: Would apply update from message here: %s\n", n2, updMsg)
106106

107107
// Signal success.
108108
n2DoneCRDTUpdChan <- struct{}{}

comm/receiver.go

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"bytes"
66
"fmt"
77
"io"
8-
"log"
8+
stdlog "log"
99
"net"
1010
"os"
1111
"strconv"
@@ -222,7 +222,7 @@ func (recv *Receiver) Shutdown(downRecv chan struct{}) {
222222
// Wait for signal.
223223
<-downRecv
224224

225-
log.Printf("[comm.Shutdown] Receiver: shutting down...\n")
225+
stdlog.Printf("[comm.Shutdown] Receiver: shutting down...\n")
226226

227227
// Instruct other goroutines to shutdown.
228228
recv.shutdown <- struct{}{}
@@ -243,7 +243,7 @@ func (recv *Receiver) Shutdown(downRecv chan struct{}) {
243243
recv.socket.Close()
244244
recv.lock.Unlock()
245245

246-
log.Printf("[comm.Shutdown] Receiver: done!\n")
246+
stdlog.Printf("[comm.Shutdown] Receiver: done!\n")
247247
}
248248

249249
// IncVClockEntry waits for an incoming name of a node on
@@ -291,7 +291,7 @@ func (recv *Receiver) IncVClockEntry() {
291291
// Save updated vector clock to log file.
292292
err := recv.SaveVClockEntries()
293293
if err != nil {
294-
log.Fatalf("[comm.IncVClockEntry] Saving updated vector clock to file failed: %s\n", err.Error())
294+
stdlog.Fatalf("[comm.IncVClockEntry] Saving updated vector clock to file failed: %s\n", err.Error())
295295
}
296296

297297
// Send back the updated vector clock on other
@@ -401,11 +401,11 @@ func (recv *Receiver) StoreIncMsgs(conn net.Conn) {
401401
if err != nil {
402402

403403
if err.Error() == "EOF" {
404-
log.Printf("[comm.StoreIncMsgs] Reading from closed connection. Ignoring.\n")
404+
stdlog.Printf("[comm.StoreIncMsgs] Reading from closed connection. Ignoring.\n")
405405
return
406406
}
407407

408-
log.Fatalf("[comm.StoreIncMsgs] Error while reading sync message: %s\n", err.Error())
408+
stdlog.Fatalf("[comm.StoreIncMsgs] Error while reading sync message: %s\n", err.Error())
409409
}
410410

411411
// Remove trailing characters denoting line end.
@@ -418,20 +418,20 @@ func (recv *Receiver) StoreIncMsgs(conn net.Conn) {
418418
// Write it to message log file.
419419
_, err = recv.writeLog.WriteString(msgRaw)
420420
if err != nil {
421-
log.Fatalf("[comm.StoreIncMsgs] Writing to CRDT log file failed with: %s\n", err.Error())
421+
stdlog.Fatalf("[comm.StoreIncMsgs] Writing to CRDT log file failed with: %s\n", err.Error())
422422
}
423423

424424
// Append a newline symbol to just written line.
425425
newline := []byte("\n")
426426
_, err = recv.writeLog.Write(newline)
427427
if err != nil {
428-
log.Fatalf("[comm.StoreIncMsgs] Appending a newline symbol to CRDT log file failed with: %s\n", err.Error())
428+
stdlog.Fatalf("[comm.StoreIncMsgs] Appending a newline symbol to CRDT log file failed with: %s\n", err.Error())
429429
}
430430

431431
// Save to stable storage.
432432
err = recv.writeLog.Sync()
433433
if err != nil {
434-
log.Fatalf("[comm.StoreIncMsgs] Syncing CRDT log file to stable storage failed with: %s\n", err.Error())
434+
stdlog.Fatalf("[comm.StoreIncMsgs] Syncing CRDT log file to stable storage failed with: %s\n", err.Error())
435435
}
436436

437437
// Unlock mutex.
@@ -485,7 +485,7 @@ func (recv *Receiver) ApplyStoredMsgs() {
485485
// http://stackoverflow.com/a/30948278
486486
info, err := recv.updLog.Stat()
487487
if err != nil {
488-
log.Fatalf("[comm.ApplyStoredMsgs] Could not get CRDT log file information: %s\n", err.Error())
488+
stdlog.Fatalf("[comm.ApplyStoredMsgs] Could not get CRDT log file information: %s\n", err.Error())
489489
}
490490

491491
// Store accessed file size for multiple use.
@@ -501,7 +501,7 @@ func (recv *Receiver) ApplyStoredMsgs() {
501501
// Save current position of head for later use.
502502
curOffset, err := recv.updLog.Seek(0, os.SEEK_CUR)
503503
if err != nil {
504-
log.Fatalf("[comm.ApplyStoredMsgs] Error while retrieving current head position in CRDT log file: %s\n", err.Error())
504+
stdlog.Fatalf("[comm.ApplyStoredMsgs] Error while retrieving current head position in CRDT log file: %s\n", err.Error())
505505
}
506506

507507
// Calculate size of needed buffer.
@@ -515,7 +515,7 @@ func (recv *Receiver) ApplyStoredMsgs() {
515515
// Reset position to beginning of file.
516516
_, err = recv.updLog.Seek(0, os.SEEK_SET)
517517
if err != nil {
518-
log.Fatalf("[comm.ApplyStoredMsgs] Could not reset position in CRDT log file: %s\n", err.Error())
518+
stdlog.Fatalf("[comm.ApplyStoredMsgs] Could not reset position in CRDT log file: %s\n", err.Error())
519519
}
520520

521521
// Unlock log file mutex.
@@ -537,13 +537,13 @@ func (recv *Receiver) ApplyStoredMsgs() {
537537
// Copy contents of log file to prepared buffer.
538538
_, err = io.Copy(buf, recv.updLog)
539539
if err != nil {
540-
log.Fatalf("[comm.ApplyStoredMsgs] Could not copy CRDT log file contents to buffer: %s\n", err.Error())
540+
stdlog.Fatalf("[comm.ApplyStoredMsgs] Could not copy CRDT log file contents to buffer: %s\n", err.Error())
541541
}
542542

543543
// Read current message at head position from log file.
544544
msgRaw, err := buf.ReadString('\n')
545545
if (err != nil) && (err != io.EOF) {
546-
log.Fatalf("[comm.ApplyStoredMsgs] Error during extraction of first line in CRDT log file: %s\n", err.Error())
546+
stdlog.Fatalf("[comm.ApplyStoredMsgs] Error during extraction of first line in CRDT log file: %s\n", err.Error())
547547
}
548548

549549
// Save length of just read message for later use.
@@ -552,7 +552,7 @@ func (recv *Receiver) ApplyStoredMsgs() {
552552
// Parse sync message.
553553
msg, err := Parse(msgRaw)
554554
if err != nil {
555-
log.Fatalf("[comm.ApplyStoredMsgs] Error while parsing sync message: %s\n", err.Error())
555+
stdlog.Fatalf("[comm.ApplyStoredMsgs] Error while parsing sync message: %s\n", err.Error())
556556
}
557557

558558
// Initially, set apply indicator to true. This means,
@@ -611,51 +611,51 @@ func (recv *Receiver) ApplyStoredMsgs() {
611611
// Save updated vector clock to log file.
612612
err := recv.SaveVClockEntries()
613613
if err != nil {
614-
log.Fatalf("[comm.ApplyStoredMsgs] Saving updated vector clock to file failed: %s\n", err.Error())
614+
stdlog.Fatalf("[comm.ApplyStoredMsgs] Saving updated vector clock to file failed: %s\n", err.Error())
615615
}
616616

617617
// Reset head position to curOffset saved at beginning of loop.
618618
_, err = recv.updLog.Seek(curOffset, os.SEEK_SET)
619619
if err != nil {
620-
log.Fatal(err)
620+
stdlog.Fatal(err)
621621
}
622622

623623
// Copy reduced buffer contents back to current position
624624
// of CRDT log file, effectively deleting the read line.
625625
newNumOfBytes, err := io.Copy(recv.updLog, buf)
626626
if err != nil {
627-
log.Fatalf("[comm.ApplyStoredMsgs] Error during copying buffer contents back to CRDT log file: %s\n", err.Error())
627+
stdlog.Fatalf("[comm.ApplyStoredMsgs] Error during copying buffer contents back to CRDT log file: %s\n", err.Error())
628628
}
629629

630630
// Now, truncate log file size to (curOffset + newNumOfBytes),
631631
// reducing the file size by length of handled message.
632632
err = recv.updLog.Truncate((curOffset + newNumOfBytes))
633633
if err != nil {
634-
log.Fatalf("[comm.ApplyStoredMsgs] Could not truncate CRDT log file: %s\n", err.Error())
634+
stdlog.Fatalf("[comm.ApplyStoredMsgs] Could not truncate CRDT log file: %s\n", err.Error())
635635
}
636636

637637
// Sync changes to stable storage.
638638
err = recv.updLog.Sync()
639639
if err != nil {
640-
log.Fatalf("[comm.ApplyStoredMsgs] Syncing CRDT log file to stable storage failed with: %s\n", err.Error())
640+
stdlog.Fatalf("[comm.ApplyStoredMsgs] Syncing CRDT log file to stable storage failed with: %s\n", err.Error())
641641
}
642642

643643
// Reset position to beginning of file because the
644644
// chances are high that we now can proceed in order
645645
// of CRDT message log file.
646646
_, err = recv.updLog.Seek(0, os.SEEK_SET)
647647
if err != nil {
648-
log.Fatalf("[comm.ApplyStoredMsgs] Could not reset position in CRDT log file: %s\n", err.Error())
648+
stdlog.Fatalf("[comm.ApplyStoredMsgs] Could not reset position in CRDT log file: %s\n", err.Error())
649649
}
650650
} else {
651651

652-
log.Printf("[comm.ApplyStoredMsgs] Message was out of order. Next.\n")
652+
stdlog.Printf("[comm.ApplyStoredMsgs] Message was out of order. Next.\n")
653653

654654
// Set position of head to byte after just read message,
655655
// effectively delaying execution of that message.
656656
_, err = recv.updLog.Seek((curOffset + msgRawLength), os.SEEK_SET)
657657
if err != nil {
658-
log.Fatalf("[comm.ApplyStoredMsgs] Error while moving position in CRDT log file to next line: %s\n", err.Error())
658+
stdlog.Fatalf("[comm.ApplyStoredMsgs] Error while moving position in CRDT log file to next line: %s\n", err.Error())
659659
}
660660
}
661661

0 commit comments

Comments
 (0)