Skip to content

Commit e7e763e

Browse files
authored
Merge pull request #182 from bookingcom/grzkv/fuzz_and_refactoring
Misc improvements
2 parents c67bced + 0ae9b84 commit e7e763e

File tree

13 files changed

+146
-28063
lines changed

13 files changed

+146
-28063
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ jobs:
1313
with:
1414
go-version: ${{ matrix.go-version }}
1515
- name: setup linter
16-
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sudo sh -s -- -b /usr/local/bin v1.45.0
16+
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sudo sh -s -- -b /usr/local/bin v1.46.2
1717
- name: checkout code
1818
uses: actions/checkout@v2
1919
- name: lint

cmd/nanotube/process.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func worker(wg *sync.WaitGroup, queue <-chan []byte, rules rules.Rules, rewrites
7373

7474
func proc(s []byte, rules rules.Rules, rewrites rewrites.Rewrites, shouldNormalize bool, shouldLog bool, lg *zap.Logger,
7575
metrics *metrics.Prom) { //nolint:golint,unparam
76-
r, err := rec.ParseRecBytes(s, shouldNormalize, shouldLog, time.Now, lg)
76+
r, err := rec.ParseRec(s, shouldNormalize, shouldLog, time.Now, lg)
7777
if err != nil {
7878
lg.Info("Error parsing incoming record", zap.String("record_str", string(s)),
7979
zap.Binary("record_base64", s), zap.Error(err))
@@ -89,7 +89,7 @@ func proc(s []byte, rules rules.Rules, rewrites rewrites.Rewrites, shouldNormali
8989
}
9090

9191
for _, rec := range recs {
92-
rules.RouteRecBytes(rec, lg)
92+
rules.RouteRec(rec, lg)
9393
}
9494
}
9595

pkg/in/tcp.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ loop:
5757
}
5858

5959
// AcceptAndListenTCP listens for incoming TCP connections.
60+
// TODO: Remove after changing NK mechanics.
6061
func AcceptAndListenTCP(l net.Listener, queue chan<- []byte, term <-chan struct{},
6162
cfg *conf.Main, connWG *sync.WaitGroup, ms *metrics.Prom, lg *zap.Logger) {
6263
var wg sync.WaitGroup
@@ -101,6 +102,7 @@ loop:
101102
connWG.Done()
102103
}
103104

105+
// TODO: Remove after changing NK mechanics.
104106
func readFromConnectionTCP(wg *sync.WaitGroup, conn net.Conn, queue chan<- []byte, stop <-chan struct{}, cfg *conf.Main, ms *metrics.Prom, lg *zap.Logger) {
105107
defer wg.Done() // executed after the connection is closed
106108
defer func() {
@@ -145,6 +147,7 @@ func readFromConnectionTCPBuf(wg *sync.WaitGroup, conn net.Conn, queue chan<- []
145147
scanForRecordsTCPBuf(conn, queue, cfg, ms, lg)
146148
}
147149

150+
// TODO: Remove after changing NK mechanics.
148151
func scanForRecordsTCP(conn net.Conn, queue chan<- []byte, stop <-chan struct{}, cfg *conf.Main, ms *metrics.Prom, lg *zap.Logger) {
149152
sc := bufio.NewScanner(conn)
150153
in := make(chan []byte)
@@ -185,6 +188,9 @@ loop:
185188
func scanForRecordsTCPBuf(conn net.Conn, queue chan<- [][]byte, cfg *conf.Main, ms *metrics.Prom, lg *zap.Logger) {
186189
sc := bufio.NewScanner(conn)
187190

191+
buf := make([]byte, 2048)
192+
sc.Buffer(buf, bufio.MaxScanTokenSize)
193+
188194
qb := NewBatchChan(queue, int(cfg.MainQueueBatchSize), int(cfg.BatchFlushPerdiodSec), ms)
189195
defer qb.Close()
190196

pkg/k8s/container.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ import (
2121
"go.uber.org/zap"
2222
)
2323

24-
// TODO: Move debug logging to appropriate zap.Debug level.
25-
2624
// Cont represents a container.
2725
type Cont struct {
2826
ID string

pkg/rec/rec_bytes.go

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,24 @@ import (
1010

1111
// RecBytes represents a single piece of data (a datapoint) that can be sent.
1212
type RecBytes struct { // nolint:revive
13-
Path []byte
14-
RawVal []byte // this is to avoid discrepancies in precision and formatting
15-
RawTime []byte // to avoid differences when encoding, and save time
16-
// Raw string // to avoid wasting time for serialization
13+
Path []byte
14+
Val []byte
15+
Time []byte
16+
1717
Received time.Time
1818
}
1919

20-
// ParseRecBytes parses a single datapoint record from a string. Makes sure it's valid.
20+
// ParseRec parses a single datapoint record from a string. Makes sure it's valid.
2121
// Performs normalizations.
22-
func ParseRecBytes(s []byte, normalize bool, shouldLog bool, nowF func() time.Time, lg *zap.Logger) (*RecBytes, error) {
22+
func ParseRec(s []byte, normalize bool, shouldLog bool, nowF func() time.Time, lg *zap.Logger) (*RecBytes, error) {
2323
pathStart, pathEnd, valStart, valEnd, timeStart, timeEnd, err := recFields(s)
2424
if err != nil {
2525
return nil, errors.Wrap(err, "failed to break record into fields")
2626
}
2727

2828
var path []byte
2929
if normalize {
30-
path, err = normalizePathBytes(s[pathStart:pathEnd])
30+
path, err = normalizePath(s[pathStart:pathEnd])
3131
if err != nil {
3232
return nil, errors.Wrap(err, "failed to normalize path")
3333
}
@@ -39,8 +39,8 @@ func ParseRecBytes(s []byte, normalize bool, shouldLog bool, nowF func() time.Ti
3939
Path: path,
4040
Received: nowF(),
4141
}
42-
res.RawVal = append(res.RawVal, s[valStart:valEnd]...)
43-
res.RawTime = append(res.RawTime, s[timeStart:timeEnd]...)
42+
res.Val = append(res.Val, s[valStart:valEnd]...)
43+
res.Time = append(res.Time, s[timeStart:timeEnd]...)
4444

4545
return &res, nil
4646
}
@@ -100,20 +100,19 @@ func isWhitespace(c byte) bool {
100100
// Serialize makes record into a string ready to be sent via TCP w/ line protocol.
101101
func (r *RecBytes) Serialize() []byte {
102102
// TODO (grzkv): Copy can be avoided if string was not changed
103-
res := make([]byte, 0, len(r.Path)+len(r.RawTime)+len(r.RawVal)+3)
103+
res := make([]byte, 0, len(r.Path)+len(r.Time)+len(r.Val)+3)
104104
res = append(res, r.Path...)
105105
res = append(res, ' ')
106-
res = append(res, r.RawVal...)
106+
res = append(res, r.Val...)
107107
res = append(res, ' ')
108-
res = append(res, r.RawTime...)
108+
res = append(res, r.Time...)
109109
res = append(res, '\n')
110110

111111
return res
112112
}
113113

114114
// normalizePath does path normalization as described in the docs
115-
// returns: (updated path, was any normalization done)
116-
func normalizePathBytes(s []byte) ([]byte, error) {
115+
func normalizePath(s []byte) ([]byte, error) {
117116
if len(s) == 0 {
118117
return []byte{}, nil
119118
}
@@ -181,21 +180,21 @@ func (r RecBytes) Copy() (*RecBytes, error) {
181180
cpy := &RecBytes{
182181
Received: r.Received,
183182
Path: make([]byte, len(r.Path)),
184-
RawVal: make([]byte, len(r.RawVal)),
185-
RawTime: make([]byte, len(r.RawTime)),
183+
Val: make([]byte, len(r.Val)),
184+
Time: make([]byte, len(r.Time)),
186185
}
187186

188187
n := copy(cpy.Path, r.Path)
189188
if n != len(r.Path) {
190189
return nil, errors.Errorf("did not copy full path, expected %d bytes, copied %d bytes", len(r.Path), n)
191190
}
192-
n = copy(cpy.RawVal, r.RawVal)
193-
if n != len(r.RawVal) {
194-
return nil, errors.Errorf("did not copy full value, expected %d bytes, copied %d bytes", len(r.RawVal), n)
191+
n = copy(cpy.Val, r.Val)
192+
if n != len(r.Val) {
193+
return nil, errors.Errorf("did not copy full value, expected %d bytes, copied %d bytes", len(r.Val), n)
195194
}
196-
n = copy(cpy.RawTime, r.RawTime)
197-
if n != len(r.RawTime) {
198-
return nil, errors.Errorf("did not copy full time, expected %d bytes, copied %d bytes", len(r.RawTime), n)
195+
n = copy(cpy.Time, r.Time)
196+
if n != len(r.Time) {
197+
return nil, errors.Errorf("did not copy full time, expected %d bytes, copied %d bytes", len(r.Time), n)
199198
}
200199

201200
return cpy, nil

pkg/rec/rec_fuzz.go

Lines changed: 0 additions & 15 deletions
This file was deleted.

0 commit comments

Comments
 (0)