Skip to content

Commit d4b74ab

Browse files
anisha2442Anisha Aggarwaljmwample
authored
Connection Counting (#196)
Add connection counting and printing --------- Co-authored-by: Anisha Aggarwal <[email protected]> Co-authored-by: jmwample <[email protected]>
1 parent 052af6a commit d4b74ab

File tree

2 files changed

+99
-11
lines changed

2 files changed

+99
-11
lines changed

cmd/application/conns.go

Lines changed: 97 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,12 @@ func (cm *connManager) handleNewTCPConn(regManager *cj.RegistrationManager, clie
190190
// buffer fill up and stopped ACKing after 8192 + (buffer size)
191191
// bytes for obfs4, as an example, that would be quite clear.
192192
_, err = io.Copy(io.Discard, clientConn)
193-
if errors.Is(err, syscall.ECONNRESET) {
193+
err = generalizeErr(err)
194+
if errors.Is(err, errConnReset) {
194195
// log reset error without client ip
195196
logger.Errorln("error occurred discarding data (read 0 B): rst")
196197
cm.discardToReset(asn, cc)
197-
} else if et, ok := err.(net.Error); ok && et.Timeout() {
198+
} else if errors.Is(err, errConnTimeout) {
198199
logger.Errorln("error occurred discarding data (read 0 B): timeout")
199200
cm.discardToTimeout(asn, cc)
200201
} else if err != nil {
@@ -222,11 +223,12 @@ readLoop:
222223
cj.Stat().ConnErr()
223224

224225
_, err = io.Copy(io.Discard, clientConn)
225-
if errors.Is(err, syscall.ECONNRESET) {
226+
err = generalizeErr(err)
227+
if errors.Is(err, errConnReset) {
226228
// log reset error without client ip
227229
logger.Errorf("error occurred discarding data (read %d B): rst\n", received.Len())
228230
cm.discardToReset(asn, cc)
229-
} else if et, ok := err.(net.Error); ok && et.Timeout() {
231+
} else if errors.Is(err, errConnTimeout) {
230232
logger.Errorf("error occurred discarding data (read %d B): timeout\n", received.Len())
231233
cm.discardToTimeout(asn, cc)
232234
} else if err != nil {
@@ -241,15 +243,16 @@ readLoop:
241243
}
242244

243245
n, err := clientConn.Read(buf[:])
246+
err = generalizeErr(err)
244247
if err != nil {
245-
if errors.Is(err, syscall.ECONNRESET) {
248+
if errors.Is(err, errConnReset) {
246249
logger.Errorf("got error while reading from connection, giving up after %d bytes: rst\n", received.Len()+n)
247250
if received.Len() == 0 {
248251
cm.createdToReset(asn, cc)
249252
} else {
250253
cm.readToReset(asn, cc)
251254
}
252-
} else if et, ok := err.(net.Error); ok && et.Timeout() {
255+
} else if errors.Is(err, errConnTimeout) {
253256
logger.Errorf("got error while reading from connection, giving up after %d bytes: timeout\n", received.Len()+n)
254257
if received.Len() == 0 {
255258
cm.createdToTimeout(asn, cc)
@@ -280,6 +283,7 @@ readLoop:
280283
transports:
281284
for i, t := range possibleTransports {
282285
reg, wrapped, err = t.WrapConnection(&received, clientConn, originalDstIP, regManager)
286+
err = generalizeErr(err)
283287
if errors.Is(err, transports.ErrTryAgain) {
284288
continue transports
285289
} else if errors.Is(err, transports.ErrNotTransport) {
@@ -364,6 +368,8 @@ type statCounts struct {
364368
numDiscardToClose int64 // Number of times connections have moved from Discard to Close
365369

366370
totalTransitions int64 // Number of all transitions tracked
371+
numNewConns int64 // Number new connections potentially handshaking
372+
numResolved int64 // Number connections that have reached a terminal state.
367373
}
368374

369375
type asnCounts struct {
@@ -385,7 +391,12 @@ func (c *connStats) PrintAndReset(logger *log.Logger) {
385391
// prevent div by 0 if thread starvation happens
386392
var epochDur float64 = math.Max(float64(time.Since(c.epochStart).Milliseconds()), 1)
387393

388-
logger.Infof("conn-stats: %d %d %d %d %d %.3f %d %.3f %d %.3f %d %.3f %d %.3f",
394+
numASNs := 0
395+
if c.geoIPMap != nil {
396+
numASNs = len(c.geoIPMap)
397+
}
398+
399+
logger.Infof("conn-stats: %d %d %d %d %d %.3f %d %.3f %d %.3f %d %.3f %d %.3f %d",
389400
atomic.LoadInt64(&c.numCreated),
390401
atomic.LoadInt64(&c.numReading),
391402
atomic.LoadInt64(&c.numChecking),
@@ -400,12 +411,13 @@ func (c *connStats) PrintAndReset(logger *log.Logger) {
400411
1000*float64(atomic.LoadInt64(&c.numErr))/epochDur,
401412
atomic.LoadInt64(&c.numClosed),
402413
1000*float64(atomic.LoadInt64(&c.numClosed))/epochDur,
414+
numASNs,
403415
)
404416

405417
for asn, counts := range c.geoIPMap {
406418
var tt float64 = math.Max(1, float64(atomic.LoadInt64(&counts.totalTransitions)))
407419

408-
logger.Infof("conn-stats-verbose: %d %s %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f",
420+
logger.Infof("conn-stats-verbose: %d %s %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %d %d %d %d",
409421
asn,
410422
counts.cc,
411423
atomic.LoadInt64(&counts.numCreatedToDiscard),
@@ -445,6 +457,10 @@ func (c *connStats) PrintAndReset(logger *log.Logger) {
445457
float64(atomic.LoadInt64(&counts.numDiscardToTimeout))/tt,
446458
float64(atomic.LoadInt64(&counts.numDiscardToError))/tt,
447459
float64(atomic.LoadInt64(&counts.numDiscardToClose))/tt,
460+
atomic.LoadInt64(&c.numNewConns),
461+
atomic.LoadInt64(&counts.numNewConns),
462+
atomic.LoadInt64(&c.numResolved),
463+
atomic.LoadInt64(&counts.numResolved),
448464
)
449465
}
450466

@@ -482,6 +498,8 @@ func (c *connStats) reset() {
482498
atomic.StoreInt64(&c.numDiscardToError, 0)
483499
atomic.StoreInt64(&c.numDiscardToClose, 0)
484500
atomic.StoreInt64(&c.totalTransitions, 0)
501+
atomic.StoreInt64(&c.numNewConns, 0)
502+
atomic.StoreInt64(&c.numResolved, 0)
485503

486504
c.geoIPMap = make(map[uint]*asnCounts)
487505

@@ -491,6 +509,7 @@ func (c *connStats) reset() {
491509
func (c *connStats) addCreated(asn uint, cc string) {
492510
// Overall tracking
493511
atomic.AddInt64(&c.numCreated, 1)
512+
atomic.AddInt64(&c.numNewConns, 1)
494513

495514
// GeoIP tracking
496515
if isValidCC(cc) {
@@ -502,6 +521,7 @@ func (c *connStats) addCreated(asn uint, cc string) {
502521
c.geoIPMap[asn].cc = cc
503522
}
504523
atomic.AddInt64(&c.geoIPMap[asn].numCreated, 1)
524+
atomic.AddInt64(&c.geoIPMap[asn].numNewConns, 1)
505525
}
506526
}
507527

@@ -557,6 +577,7 @@ func (c *connStats) createdToReset(asn uint, cc string) {
557577
atomic.AddInt64(&c.numReset, 1)
558578
atomic.AddInt64(&c.numCreatedToReset, 1)
559579
atomic.AddInt64(&c.totalTransitions, 1)
580+
atomic.AddInt64(&c.numResolved, 1)
560581

561582
// GeoIP tracking
562583
if isValidCC(cc) {
@@ -571,6 +592,7 @@ func (c *connStats) createdToReset(asn uint, cc string) {
571592
atomic.AddInt64(&c.geoIPMap[asn].numReset, 1)
572593
atomic.AddInt64(&c.geoIPMap[asn].numCreatedToReset, 1)
573594
atomic.AddInt64(&c.geoIPMap[asn].totalTransitions, 1)
595+
atomic.AddInt64(&c.geoIPMap[asn].numResolved, 1)
574596
}
575597
}
576598

@@ -580,6 +602,7 @@ func (c *connStats) createdToTimeout(asn uint, cc string) {
580602
atomic.AddInt64(&c.numTimeout, 1)
581603
atomic.AddInt64(&c.numCreatedToTimeout, 1)
582604
atomic.AddInt64(&c.totalTransitions, 1)
605+
atomic.AddInt64(&c.numResolved, 1)
583606

584607
// GeoIP tracking
585608
if isValidCC(cc) {
@@ -594,6 +617,7 @@ func (c *connStats) createdToTimeout(asn uint, cc string) {
594617
atomic.AddInt64(&c.geoIPMap[asn].numTimeout, 1)
595618
atomic.AddInt64(&c.geoIPMap[asn].numCreatedToTimeout, 1)
596619
atomic.AddInt64(&c.geoIPMap[asn].totalTransitions, 1)
620+
atomic.AddInt64(&c.geoIPMap[asn].numResolved, 1)
597621
}
598622
}
599623

@@ -603,6 +627,7 @@ func (c *connStats) createdToError(asn uint, cc string) {
603627
atomic.AddInt64(&c.numErr, 1)
604628
atomic.AddInt64(&c.numCreatedToError, 1)
605629
atomic.AddInt64(&c.totalTransitions, 1)
630+
atomic.AddInt64(&c.numResolved, 1)
606631

607632
// GeoIP tracking
608633
if isValidCC(cc) {
@@ -617,6 +642,7 @@ func (c *connStats) createdToError(asn uint, cc string) {
617642
atomic.AddInt64(&c.geoIPMap[asn].numErr, 1)
618643
atomic.AddInt64(&c.geoIPMap[asn].numCreatedToError, 1)
619644
atomic.AddInt64(&c.geoIPMap[asn].totalTransitions, 1)
645+
atomic.AddInt64(&c.geoIPMap[asn].numResolved, 1)
620646
}
621647
}
622648

@@ -649,6 +675,7 @@ func (c *connStats) readToTimeout(asn uint, cc string) {
649675
atomic.AddInt64(&c.numTimeout, 1)
650676
atomic.AddInt64(&c.numReadToTimeout, 1)
651677
atomic.AddInt64(&c.totalTransitions, 1)
678+
atomic.AddInt64(&c.numResolved, 1)
652679

653680
// GeoIP tracking
654681
if isValidCC(cc) {
@@ -663,6 +690,7 @@ func (c *connStats) readToTimeout(asn uint, cc string) {
663690
atomic.AddInt64(&c.geoIPMap[asn].numTimeout, 1)
664691
atomic.AddInt64(&c.geoIPMap[asn].numReadToTimeout, 1)
665692
atomic.AddInt64(&c.geoIPMap[asn].totalTransitions, 1)
693+
atomic.AddInt64(&c.geoIPMap[asn].numResolved, 1)
666694
}
667695
}
668696

@@ -672,6 +700,7 @@ func (c *connStats) readToReset(asn uint, cc string) {
672700
atomic.AddInt64(&c.numReset, 1)
673701
atomic.AddInt64(&c.numReadToReset, 1)
674702
atomic.AddInt64(&c.totalTransitions, 1)
703+
atomic.AddInt64(&c.numResolved, 1)
675704

676705
// GeoIP tracking
677706
if isValidCC(cc) {
@@ -686,6 +715,7 @@ func (c *connStats) readToReset(asn uint, cc string) {
686715
atomic.AddInt64(&c.geoIPMap[asn].numReset, 1)
687716
atomic.AddInt64(&c.geoIPMap[asn].numReadToReset, 1)
688717
atomic.AddInt64(&c.geoIPMap[asn].totalTransitions, 1)
718+
atomic.AddInt64(&c.geoIPMap[asn].numResolved, 1)
689719
}
690720
}
691721

@@ -695,6 +725,7 @@ func (c *connStats) readToError(asn uint, cc string) {
695725
atomic.AddInt64(&c.numErr, 1)
696726
atomic.AddInt64(&c.numReadToError, 1)
697727
atomic.AddInt64(&c.totalTransitions, 1)
728+
atomic.AddInt64(&c.numResolved, 1)
698729

699730
// GeoIP tracking
700731
if isValidCC(cc) {
@@ -709,6 +740,7 @@ func (c *connStats) readToError(asn uint, cc string) {
709740
atomic.AddInt64(&c.geoIPMap[asn].numErr, 1)
710741
atomic.AddInt64(&c.geoIPMap[asn].numReadToError, 1)
711742
atomic.AddInt64(&c.geoIPMap[asn].totalTransitions, 1)
743+
atomic.AddInt64(&c.geoIPMap[asn].numResolved, 1)
712744
}
713745
}
714746

@@ -764,6 +796,7 @@ func (c *connStats) checkToFound(asn uint, cc string) {
764796
atomic.AddInt64(&c.numFound, 1)
765797
atomic.AddInt64(&c.numCheckToFound, 1)
766798
atomic.AddInt64(&c.totalTransitions, 1)
799+
atomic.AddInt64(&c.numResolved, 1)
767800

768801
// GeoIP tracking
769802
if isValidCC(cc) {
@@ -778,6 +811,7 @@ func (c *connStats) checkToFound(asn uint, cc string) {
778811
atomic.AddInt64(&c.geoIPMap[asn].numFound, 1)
779812
atomic.AddInt64(&c.geoIPMap[asn].numCheckToFound, 1)
780813
atomic.AddInt64(&c.geoIPMap[asn].totalTransitions, 1)
814+
atomic.AddInt64(&c.geoIPMap[asn].numResolved, 1)
781815
}
782816
}
783817

@@ -787,6 +821,7 @@ func (c *connStats) checkToError(asn uint, cc string) {
787821
atomic.AddInt64(&c.numErr, 1)
788822
atomic.AddInt64(&c.numCheckToError, 1)
789823
atomic.AddInt64(&c.totalTransitions, 1)
824+
atomic.AddInt64(&c.numResolved, 1)
790825

791826
// GeoIP tracking
792827
if isValidCC(cc) {
@@ -801,6 +836,7 @@ func (c *connStats) checkToError(asn uint, cc string) {
801836
atomic.AddInt64(&c.geoIPMap[asn].numErr, 1)
802837
atomic.AddInt64(&c.geoIPMap[asn].numCheckToError, 1)
803838
atomic.AddInt64(&c.geoIPMap[asn].totalTransitions, 1)
839+
atomic.AddInt64(&c.geoIPMap[asn].numResolved, 1)
804840
}
805841
}
806842

@@ -833,6 +869,7 @@ func (c *connStats) discardToReset(asn uint, cc string) {
833869
atomic.AddInt64(&c.numReset, 1)
834870
atomic.AddInt64(&c.numDiscardToReset, 1)
835871
atomic.AddInt64(&c.totalTransitions, 1)
872+
atomic.AddInt64(&c.numResolved, 1)
836873

837874
// GeoIP tracking
838875
if isValidCC(cc) {
@@ -847,6 +884,7 @@ func (c *connStats) discardToReset(asn uint, cc string) {
847884
atomic.AddInt64(&c.geoIPMap[asn].numReset, 1)
848885
atomic.AddInt64(&c.geoIPMap[asn].numDiscardToReset, 1)
849886
atomic.AddInt64(&c.geoIPMap[asn].totalTransitions, 1)
887+
atomic.AddInt64(&c.geoIPMap[asn].numResolved, 1)
850888
}
851889
}
852890

@@ -856,6 +894,7 @@ func (c *connStats) discardToTimeout(asn uint, cc string) {
856894
atomic.AddInt64(&c.numTimeout, 1)
857895
atomic.AddInt64(&c.numDiscardToTimeout, 1)
858896
atomic.AddInt64(&c.totalTransitions, 1)
897+
atomic.AddInt64(&c.numResolved, 1)
859898

860899
// GeoIP tracking
861900
if isValidCC(cc) {
@@ -870,6 +909,7 @@ func (c *connStats) discardToTimeout(asn uint, cc string) {
870909
atomic.AddInt64(&c.geoIPMap[asn].numTimeout, 1)
871910
atomic.AddInt64(&c.geoIPMap[asn].numDiscardToTimeout, 1)
872911
atomic.AddInt64(&c.geoIPMap[asn].totalTransitions, 1)
912+
atomic.AddInt64(&c.geoIPMap[asn].numResolved, 1)
873913
}
874914
}
875915

@@ -879,6 +919,7 @@ func (c *connStats) discardToError(asn uint, cc string) {
879919
atomic.AddInt64(&c.numErr, 1)
880920
atomic.AddInt64(&c.numDiscardToError, 1)
881921
atomic.AddInt64(&c.totalTransitions, 1)
922+
atomic.AddInt64(&c.numResolved, 1)
882923

883924
// GeoIP tracking
884925
if isValidCC(cc) {
@@ -893,6 +934,7 @@ func (c *connStats) discardToError(asn uint, cc string) {
893934
atomic.AddInt64(&c.geoIPMap[asn].numErr, 1)
894935
atomic.AddInt64(&c.geoIPMap[asn].numDiscardToError, 1)
895936
atomic.AddInt64(&c.geoIPMap[asn].totalTransitions, 1)
937+
atomic.AddInt64(&c.geoIPMap[asn].numResolved, 1)
896938
}
897939
}
898940

@@ -902,6 +944,7 @@ func (c *connStats) discardToClose(asn uint, cc string) {
902944
atomic.AddInt64(&c.numClosed, 1)
903945
atomic.AddInt64(&c.numDiscardToClose, 1)
904946
atomic.AddInt64(&c.totalTransitions, 1)
947+
atomic.AddInt64(&c.numResolved, 1)
905948

906949
// GeoIP tracking
907950
if isValidCC(cc) {
@@ -916,9 +959,55 @@ func (c *connStats) discardToClose(asn uint, cc string) {
916959
atomic.AddInt64(&c.geoIPMap[asn].numClosed, 1)
917960
atomic.AddInt64(&c.geoIPMap[asn].numDiscardToClose, 1)
918961
atomic.AddInt64(&c.geoIPMap[asn].totalTransitions, 1)
962+
atomic.AddInt64(&c.geoIPMap[asn].numResolved, 1)
919963
}
920964
}
921965

922966
func isValidCC(cc string) bool {
923967
return cc != ""
924968
}
969+
970+
var (
971+
// errConnReset replaces the reset error in the halfpipe to remove ips and extra bytes
972+
errConnReset = errors.New("rst")
973+
974+
// errConnTimeout replaces the ip.timeout error in the halfpipe to remove ips and extra bytes
975+
errConnTimeout = errors.New("timeout")
976+
977+
// replaces refused error to prevent client IP logging
978+
errConnRefused = errors.New("refused")
979+
980+
// errUnreachable replaces unreachable error to prevent client IP logging
981+
errUnreachable = errors.New("unreachable")
982+
983+
// errConnAborted replaces aborted error to prevent client IP logging
984+
errConnAborted = errors.New("aborted")
985+
)
986+
987+
func generalizeErr(err error) error {
988+
switch {
989+
case err == nil:
990+
return nil
991+
case
992+
errors.Is(err, net.ErrClosed), // Errors indicating operation on something already closed.
993+
errors.Is(err, io.EOF),
994+
errors.Is(err, syscall.EPIPE),
995+
errors.Is(err, os.ErrClosed):
996+
return nil
997+
case errors.Is(err, syscall.ECONNRESET):
998+
return errConnReset
999+
case errors.Is(err, syscall.ECONNREFUSED):
1000+
return errConnRefused
1001+
case errors.Is(err, syscall.ECONNABORTED):
1002+
return errConnAborted
1003+
case errors.Is(err, syscall.EHOSTUNREACH):
1004+
return errUnreachable
1005+
default:
1006+
if errN, ok := err.(net.Error); ok && errN.Timeout() {
1007+
return errConnTimeout
1008+
}
1009+
}
1010+
1011+
// if it is not a well known error, return it
1012+
return err
1013+
}

cmd/application/conns_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package main
33
import (
44
"fmt"
55
"io"
6-
"io/ioutil"
76
golog "log"
87
"math"
98
"net"
@@ -127,7 +126,7 @@ func TestConnPrintAndReset(t *testing.T) {
127126

128127
func TestConnHandleConcurrent(t *testing.T) {
129128
// We don't actually care about what gets written
130-
logger := log.New(ioutil.Discard, "[TEST CONN STATS] ", golog.Ldate|golog.Lmicroseconds)
129+
logger := log.New(io.Discard, "[TEST CONN STATS] ", golog.Ldate|golog.Lmicroseconds)
131130

132131
testSubnetPath := conjurepath.Root + "/pkg/station/lib/test/phantom_subnets.toml"
133132
os.Setenv("PHANTOM_SUBNET_LOCATION", testSubnetPath)
@@ -194,7 +193,7 @@ func TestConnHandleConcurrent(t *testing.T) {
194193

195194
func TestConnForceRace(t *testing.T) {
196195
// We don't actually care about what gets written
197-
logger := log.New(ioutil.Discard, "[TEST CONN STATS] ", golog.Ldate|golog.Lmicroseconds)
196+
logger := log.New(io.Discard, "[TEST CONN STATS] ", golog.Ldate|golog.Lmicroseconds)
198197
cs := &connStats{geoIPMap: make(map[uint]*asnCounts)}
199198
exit := make(chan struct{})
200199

0 commit comments

Comments
 (0)