Skip to content

Commit ba2012e

Browse files
authored
Merge branch 'master' into grzkv/improving_testing_and_bench_system
2 parents 186c4a5 + e7e763e commit ba2012e

File tree

20 files changed

+405
-28119
lines changed

20 files changed

+405
-28119
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ jobs:
1313
with:
1414
go-version: ${{ matrix.go-version }}
1515
- name: setup linter
16-
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sudo sh -s -- -b /usr/local/bin v1.45.0
16+
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sudo sh -s -- -b /usr/local/bin v1.46.2
1717
- name: checkout code
1818
uses: actions/checkout@v2
1919
- name: lint

cmd/nanotube/process.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func worker(wg *sync.WaitGroup, queue <-chan []byte, rules rules.Rules, rewrites
7373

7474
func proc(s []byte, rules rules.Rules, rewrites rewrites.Rewrites, shouldNormalize bool, shouldLog bool, lg *zap.Logger,
7575
metrics *metrics.Prom) { //nolint:golint,unparam
76-
r, err := rec.ParseRecBytes(s, shouldNormalize, shouldLog, time.Now, lg)
76+
r, err := rec.ParseRec(s, shouldNormalize, shouldLog, time.Now, lg)
7777
if err != nil {
7878
lg.Info("Error parsing incoming record", zap.String("record_str", string(s)),
7979
zap.Binary("record_base64", s), zap.Error(err))
@@ -89,7 +89,7 @@ func proc(s []byte, rules rules.Rules, rewrites rewrites.Rewrites, shouldNormali
8989
}
9090

9191
for _, rec := range recs {
92-
rules.RouteRecBytes(rec, lg)
92+
rules.RouteRec(rec, lg)
9393
}
9494
}
9595

pkg/conf/conf.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ func MakeDefault() Main {
215215
MainQueueSize: 1000,
216216
HostQueueSize: 1000,
217217
MainQueueBatchSize: 1000,
218-
BatchFlushPerdiodSec: 1,
218+
BatchFlushPerdiodSec: 5,
219219

220220
WorkerPoolSize: 0,
221221

pkg/in/batch.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,36 @@ type BatchChan struct {
1717
ms *metrics.Prom
1818
m sync.Mutex
1919
period time.Duration
20+
stop chan bool
2021
}
2122

2223
// NewBatchChan makes a new batched chan buffer.
23-
// It also starts a flushing goroutine in the background.
24+
// It also starts a flushing goroutine in the background if periodSec > 0.
2425
func NewBatchChan(q chan<- [][]byte, bufSize int, periodSec int, ms *metrics.Prom) *BatchChan {
2526
qb := &BatchChan{
2627
q: q,
2728
bufSize: bufSize,
2829
ms: ms,
2930
period: time.Second * time.Duration(periodSec),
31+
stop: make(chan bool),
3032
}
3133

32-
go qb.periodicFlush()
34+
if periodSec > 0 {
35+
go qb.periodicFlush()
36+
}
3337

3438
return qb
3539
}
3640

3741
func (qb *BatchChan) periodicFlush() {
3842
for {
39-
time.Sleep(qb.period)
40-
qb.Flush()
43+
select {
44+
case <-qb.stop:
45+
return
46+
default:
47+
time.Sleep(qb.period)
48+
qb.Flush()
49+
}
4150
}
4251
}
4352

@@ -65,6 +74,12 @@ func (qb *BatchChan) Flush() {
6574
qb.buf = [][]byte{}
6675
}
6776

77+
// Close closes the batch channel. It does not close the target channel.
78+
// Must be called exactly once for every new instance.
79+
func (qb *BatchChan) Close() {
80+
close(qb.stop)
81+
}
82+
6883
func (qb *BatchChan) sendToMainQBuf() {
6984
select {
7085
case qb.q <- qb.buf:

pkg/in/tcp.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ loop:
5757
}
5858

5959
// AcceptAndListenTCP listens for incoming TCP connections.
60+
// TODO: Remove after changing NK mechanics.
6061
func AcceptAndListenTCP(l net.Listener, queue chan<- []byte, term <-chan struct{},
6162
cfg *conf.Main, connWG *sync.WaitGroup, ms *metrics.Prom, lg *zap.Logger) {
6263
var wg sync.WaitGroup
@@ -101,6 +102,7 @@ loop:
101102
connWG.Done()
102103
}
103104

105+
// TODO: Remove after changing NK mechanics.
104106
func readFromConnectionTCP(wg *sync.WaitGroup, conn net.Conn, queue chan<- []byte, stop <-chan struct{}, cfg *conf.Main, ms *metrics.Prom, lg *zap.Logger) {
105107
defer wg.Done() // executed after the connection is closed
106108
defer func() {
@@ -145,6 +147,7 @@ func readFromConnectionTCPBuf(wg *sync.WaitGroup, conn net.Conn, queue chan<- []
145147
scanForRecordsTCPBuf(conn, queue, cfg, ms, lg)
146148
}
147149

150+
// TODO: Remove after changing NK mechanics.
148151
func scanForRecordsTCP(conn net.Conn, queue chan<- []byte, stop <-chan struct{}, cfg *conf.Main, ms *metrics.Prom, lg *zap.Logger) {
149152
sc := bufio.NewScanner(conn)
150153
in := make(chan []byte)
@@ -185,7 +188,11 @@ loop:
185188
func scanForRecordsTCPBuf(conn net.Conn, queue chan<- [][]byte, cfg *conf.Main, ms *metrics.Prom, lg *zap.Logger) {
186189
sc := bufio.NewScanner(conn)
187190

191+
buf := make([]byte, 2048)
192+
sc.Buffer(buf, bufio.MaxScanTokenSize)
193+
188194
qb := NewBatchChan(queue, int(cfg.MainQueueBatchSize), int(cfg.BatchFlushPerdiodSec), ms)
195+
defer qb.Close()
189196

190197
for sc.Scan() {
191198
rec := []byte{}
@@ -201,8 +208,6 @@ func scanForRecordsTCPBuf(conn net.Conn, queue chan<- [][]byte, cfg *conf.Main,
201208
qb.Push(rec)
202209
}
203210

204-
lg.Debug("finished scanning for records", zap.Stringer("addr", conn.RemoteAddr()))
205-
206211
qb.Flush()
207212
}
208213

pkg/in/udp.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ func ListenUDPBuf(conn net.PacketConn, queue chan [][]byte, stop <-chan struct{}
6666

6767
buf := make([]byte, 64*1024) // 64k is the max UDP datagram size
6868
qb := NewBatchChan(queue, int(cfg.MainQueueBatchSize), int(cfg.BatchFlushPerdiodSec), ms)
69+
defer qb.Close()
70+
6971
loop:
7072
for {
7173
select {

pkg/in/udp_test.go

Lines changed: 5 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,14 @@
11
package in
22

33
import (
4-
"bufio"
54
"bytes"
65
"fmt"
7-
"log"
86
"net"
9-
"os"
10-
"path/filepath"
117
"sync"
128
"testing"
139
"time"
1410

15-
"github.com/bookingcom/nanotube/pkg/conf"
16-
"github.com/bookingcom/nanotube/pkg/metrics"
17-
"go.uber.org/zap"
11+
"github.com/bookingcom/nanotube/pkg/test"
1812
)
1913

2014
type packetConnMock struct {
@@ -84,43 +78,12 @@ func (c *packetConnMock) SetWriteDeadline(t time.Time) error {
8478
return nil
8579
}
8680

87-
func setup(t *testing.T) (data [][]byte, ms *metrics.Prom, lg *zap.Logger) {
88-
fixturesPath := "testdata/"
89-
90-
in, err := os.Open(filepath.Join(fixturesPath, "in"))
81+
func TestUdpStreaming(t *testing.T) {
82+
data, ms, lg, err := test.Setup()
9183
if err != nil {
92-
t.Fatalf("error opening the in data file %v", err)
93-
}
94-
defer func() {
95-
err := in.Close()
96-
if err != nil {
97-
t.Fatalf("error closing in data test file: %v", err)
98-
}
99-
}()
100-
101-
scanner := bufio.NewScanner(in)
102-
for scanner.Scan() {
103-
token := scanner.Bytes()
104-
rec := make([]byte, len(token))
105-
copy(rec, token)
106-
data = append(data, rec)
107-
}
108-
109-
if err := scanner.Err(); err != nil {
110-
log.Fatalf("error while scan-reading the sample in metrics %v", err)
84+
t.Fatalf("failed to setup the test: %v", err)
11185
}
11286

113-
lg, _ = zap.NewProduction()
114-
115-
cfg := conf.MakeDefault()
116-
ms = metrics.New(&cfg)
117-
118-
return
119-
}
120-
121-
func TestUdpStreaming(t *testing.T) {
122-
data, ms, lg := setup(t)
123-
12487
stop := make(chan struct{})
12588
conn := &packetConnMock{
12689
traffic: data,
@@ -147,7 +110,7 @@ func TestUdpStreaming(t *testing.T) {
147110
wg.Wait()
148111
close(q)
149112

150-
err := <-errCh
113+
err = <-errCh
151114
if err != nil {
152115
t.Fatal(err)
153116
}

pkg/k8s/container.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ import (
2121
"go.uber.org/zap"
2222
)
2323

24-
// TODO: Move debug logging to appropriate zap.Debug level.
25-
2624
// Cont represents a container.
2725
type Cont struct {
2826
ID string

pkg/rec/rec_bytes.go

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,24 @@ import (
1010

1111
// RecBytes represents a single piece of data (a datapoint) that can be sent.
1212
type RecBytes struct { // nolint:revive
13-
Path []byte
14-
RawVal []byte // this is to avoid discrepancies in precision and formatting
15-
RawTime []byte // to avoid differences when encoding, and save time
16-
// Raw string // to avoid wasting time for serialization
13+
Path []byte
14+
Val []byte
15+
Time []byte
16+
1717
Received time.Time
1818
}
1919

20-
// ParseRecBytes parses a single datapoint record from a string. Makes sure it's valid.
20+
// ParseRec parses a single datapoint record from a string. Makes sure it's valid.
2121
// Performs normalizations.
22-
func ParseRecBytes(s []byte, normalize bool, shouldLog bool, nowF func() time.Time, lg *zap.Logger) (*RecBytes, error) {
22+
func ParseRec(s []byte, normalize bool, shouldLog bool, nowF func() time.Time, lg *zap.Logger) (*RecBytes, error) {
2323
pathStart, pathEnd, valStart, valEnd, timeStart, timeEnd, err := recFields(s)
2424
if err != nil {
2525
return nil, errors.Wrap(err, "failed to break record into fields")
2626
}
2727

2828
var path []byte
2929
if normalize {
30-
path, err = normalizePathBytes(s[pathStart:pathEnd])
30+
path, err = normalizePath(s[pathStart:pathEnd])
3131
if err != nil {
3232
return nil, errors.Wrap(err, "failed to normalize path")
3333
}
@@ -39,8 +39,8 @@ func ParseRecBytes(s []byte, normalize bool, shouldLog bool, nowF func() time.Ti
3939
Path: path,
4040
Received: nowF(),
4141
}
42-
res.RawVal = append(res.RawVal, s[valStart:valEnd]...)
43-
res.RawTime = append(res.RawTime, s[timeStart:timeEnd]...)
42+
res.Val = append(res.Val, s[valStart:valEnd]...)
43+
res.Time = append(res.Time, s[timeStart:timeEnd]...)
4444

4545
return &res, nil
4646
}
@@ -100,20 +100,19 @@ func isWhitespace(c byte) bool {
100100
// Serialize makes record into a string ready to be sent via TCP w/ line protocol.
101101
func (r *RecBytes) Serialize() []byte {
102102
// TODO (grzkv): Copy can be avoided if string was not changed
103-
res := make([]byte, 0, len(r.Path)+len(r.RawTime)+len(r.RawVal)+3)
103+
res := make([]byte, 0, len(r.Path)+len(r.Time)+len(r.Val)+3)
104104
res = append(res, r.Path...)
105105
res = append(res, ' ')
106-
res = append(res, r.RawVal...)
106+
res = append(res, r.Val...)
107107
res = append(res, ' ')
108-
res = append(res, r.RawTime...)
108+
res = append(res, r.Time...)
109109
res = append(res, '\n')
110110

111111
return res
112112
}
113113

114114
// normalizePath does path normalization as described in the docs
115-
// returns: (updated path, was any normalization done)
116-
func normalizePathBytes(s []byte) ([]byte, error) {
115+
func normalizePath(s []byte) ([]byte, error) {
117116
if len(s) == 0 {
118117
return []byte{}, nil
119118
}
@@ -181,21 +180,21 @@ func (r RecBytes) Copy() (*RecBytes, error) {
181180
cpy := &RecBytes{
182181
Received: r.Received,
183182
Path: make([]byte, len(r.Path)),
184-
RawVal: make([]byte, len(r.RawVal)),
185-
RawTime: make([]byte, len(r.RawTime)),
183+
Val: make([]byte, len(r.Val)),
184+
Time: make([]byte, len(r.Time)),
186185
}
187186

188187
n := copy(cpy.Path, r.Path)
189188
if n != len(r.Path) {
190189
return nil, errors.Errorf("did not copy full path, expected %d bytes, copied %d bytes", len(r.Path), n)
191190
}
192-
n = copy(cpy.RawVal, r.RawVal)
193-
if n != len(r.RawVal) {
194-
return nil, errors.Errorf("did not copy full value, expected %d bytes, copied %d bytes", len(r.RawVal), n)
191+
n = copy(cpy.Val, r.Val)
192+
if n != len(r.Val) {
193+
return nil, errors.Errorf("did not copy full value, expected %d bytes, copied %d bytes", len(r.Val), n)
195194
}
196-
n = copy(cpy.RawTime, r.RawTime)
197-
if n != len(r.RawTime) {
198-
return nil, errors.Errorf("did not copy full time, expected %d bytes, copied %d bytes", len(r.RawTime), n)
195+
n = copy(cpy.Time, r.Time)
196+
if n != len(r.Time) {
197+
return nil, errors.Errorf("did not copy full time, expected %d bytes, copied %d bytes", len(r.Time), n)
199198
}
200199

201200
return cpy, nil

pkg/rec/rec_fuzz.go

Lines changed: 0 additions & 15 deletions
This file was deleted.

0 commit comments

Comments
 (0)