Skip to content

Commit 3fbd408

Browse files
authored
Update Conns Test (#201)
Handle closed errors and add created to close transition --------- authored-by: Anisha Aggarwal <[email protected]>
1 parent 2582d97 commit 3fbd408

File tree

1 file changed

+68
-3
lines changed

1 file changed

+68
-3
lines changed

cmd/application/conns.go

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,14 +221,15 @@ func (cm *connManager) handleNewTCPConn(regManager *cj.RegistrationManager, clie
221221
} else if errors.Is(err, errConnTimeout) {
222222
logger.Errorln("error occurred discarding data (read 0 B): timeout")
223223
cm.discardToTimeout(asn, cc, isIPv4)
224+
} else if errors.Is(err, errConnClosed) {
225+
cm.discardToClose(asn, cc, isIPv4)
224226
} else if err != nil {
225227
//Log any other error
226228
logger.Errorln("error occurred discarding data (read 0 B):", err)
227229
cm.discardToError(asn, cc, isIPv4)
228230
} else {
229231
cm.discardToClose(asn, cc, isIPv4)
230232
}
231-
232233
return
233234
}
234235

@@ -254,6 +255,8 @@ readLoop:
254255
} else if errors.Is(err, errConnTimeout) {
255256
logger.Errorf("error occurred discarding data (read %d B): timeout\n", received.Len())
256257
cm.discardToTimeout(asn, cc, isIPv4)
258+
} else if errors.Is(err, errConnClosed) {
259+
cm.discardToClose(asn, cc, isIPv4)
257260
} else if err != nil {
258261
//Log any other error
259262
logger.Errorf("error occurred discarding data (read %d B): %v\n", received.Len(), err)
@@ -282,6 +285,13 @@ readLoop:
282285
} else {
283286
cm.readToTimeout(asn, cc, isIPv4)
284287
}
288+
} else if errors.Is(err, errConnClosed) {
289+
logger.Errorf("got error while reading from connection, giving up after %d bytes: closed\n", received.Len()+n)
290+
if received.Len() == 0 {
291+
cm.createdToClose(asn, cc, isIPv4)
292+
} else {
293+
cm.readToError(asn, cc, isIPv4)
294+
}
285295
} else {
286296
logger.Errorf("got error while reading from connection, giving up after %d bytes: %v\n", received.Len()+n, err)
287297
if received.Len() == 0 {
@@ -373,6 +383,7 @@ type statCounts struct {
373383
numCreatedToReset int64 // Number of times connections have moved from Created to Reset
374384
numCreatedToTimeout int64 // Number of times connections have moved from Created to Timeout
375385
numCreatedToError int64 // Number of times connections have moved from Created to Error
386+
numCreatedToClose int64 // Number of times connections have moved from Created to Closed
376387

377388
numReadToCheck int64 // Number of times connections have moved from Read to Check
378389
numReadToTimeout int64 // Number of times connections have moved from Read to Timeout
@@ -473,7 +484,7 @@ func (c *connStats) PrintAndReset(logger *log.Logger) {
473484
}
474485
for asn, counts := range val {
475486
var tt = math.Max(1, float64(atomic.LoadInt64(&counts.totalTransitions)))
476-
logger.Infof("conn-stats-verbose (IPv%d): %d %s %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %d %d %d %d",
487+
logger.Infof("conn-stats-verbose (IPv%d): %d %s %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %d %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f %d %d %d %d",
477488
ip_ver,
478489
asn,
479490
counts.cc,
@@ -482,6 +493,7 @@ func (c *connStats) PrintAndReset(logger *log.Logger) {
482493
atomic.LoadInt64(&counts.numCreatedToReset),
483494
atomic.LoadInt64(&counts.numCreatedToTimeout),
484495
atomic.LoadInt64(&counts.numCreatedToError),
496+
atomic.LoadInt64(&counts.numCreatedToClose),
485497
atomic.LoadInt64(&counts.numReadToCheck),
486498
atomic.LoadInt64(&counts.numReadToTimeout),
487499
atomic.LoadInt64(&counts.numReadToReset),
@@ -872,6 +884,56 @@ func (c *connStats) createdToError(asn uint, cc string, isIPv4 bool) {
872884
}
873885
}
874886

887+
func (c *connStats) createdToClose(asn uint, cc string, isIPv4 bool) {
888+
if isIPv4 {
889+
// Overall tracking
890+
atomic.AddInt64(&c.ipv4.numCreated, -1)
891+
atomic.AddInt64(&c.ipv4.numClosed, 1)
892+
atomic.AddInt64(&c.ipv4.numCreatedToClose, 1)
893+
atomic.AddInt64(&c.ipv4.totalTransitions, 1)
894+
atomic.AddInt64(&c.ipv4.numResolved, 1)
895+
896+
// GeoIP tracking
897+
if isValidCC(cc) {
898+
c.m.Lock()
899+
defer c.m.Unlock()
900+
if _, ok := c.v4geoIPMap[asn]; !ok {
901+
// We haven't seen asn before, so add it to the map
902+
c.v4geoIPMap[asn] = &asnCounts{}
903+
c.v4geoIPMap[asn].cc = cc
904+
}
905+
atomic.AddInt64(&c.v4geoIPMap[asn].numCreated, -1)
906+
atomic.AddInt64(&c.v4geoIPMap[asn].numClosed, 1)
907+
atomic.AddInt64(&c.v4geoIPMap[asn].numCreatedToClose, 1)
908+
atomic.AddInt64(&c.v4geoIPMap[asn].totalTransitions, 1)
909+
atomic.AddInt64(&c.v4geoIPMap[asn].numResolved, 1)
910+
}
911+
} else {
912+
// Overall tracking
913+
atomic.AddInt64(&c.ipv6.numCreated, -1)
914+
atomic.AddInt64(&c.ipv6.numClosed, 1)
915+
atomic.AddInt64(&c.ipv6.numCreatedToClose, 1)
916+
atomic.AddInt64(&c.ipv6.totalTransitions, 1)
917+
atomic.AddInt64(&c.ipv6.numResolved, 1)
918+
919+
// GeoIP tracking
920+
if isValidCC(cc) {
921+
c.m.Lock()
922+
defer c.m.Unlock()
923+
if _, ok := c.v6geoIPMap[asn]; !ok {
924+
// We haven't seen asn before, so add it to the map
925+
c.v6geoIPMap[asn] = &asnCounts{}
926+
c.v6geoIPMap[asn].cc = cc
927+
}
928+
atomic.AddInt64(&c.v6geoIPMap[asn].numCreated, -1)
929+
atomic.AddInt64(&c.v6geoIPMap[asn].numClosed, 1)
930+
atomic.AddInt64(&c.v6geoIPMap[asn].numCreatedToClose, 1)
931+
atomic.AddInt64(&c.v6geoIPMap[asn].totalTransitions, 1)
932+
atomic.AddInt64(&c.v6geoIPMap[asn].numResolved, 1)
933+
}
934+
}
935+
}
936+
875937
func (c *connStats) readToCheck(asn uint, cc string, isIPv4 bool) {
876938
if isIPv4 {
877939
// Overall tracking
@@ -1525,6 +1587,9 @@ var (
15251587

15261588
// errConnAborted replaces aborted error to prevent client IP logging
15271589
errConnAborted = errors.New("aborted")
1590+
1591+
// errConnClosed replaces closed errors to prevent client IP logging
1592+
errConnClosed = errors.New("closed")
15281593
)
15291594

15301595
func generalizeErr(err error) error {
@@ -1536,7 +1601,7 @@ func generalizeErr(err error) error {
15361601
errors.Is(err, io.EOF),
15371602
errors.Is(err, syscall.EPIPE),
15381603
errors.Is(err, os.ErrClosed):
1539-
return nil
1604+
return errConnClosed
15401605
case errors.Is(err, syscall.ECONNRESET):
15411606
return errConnReset
15421607
case errors.Is(err, syscall.ECONNREFUSED):

0 commit comments

Comments
 (0)