Skip to content

Commit 00f24eb

Browse files
bkaravan and benjirewis authored
RSDK-9964 Resilient FTDC and directory parsing (#5220)
Co-authored-by: Benji Rewis <[email protected]>
1 parent c0458e6 commit 00f24eb

File tree

5 files changed

+198
-45
lines changed

5 files changed

+198
-45
lines changed

ftdc/custom_format.go

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@ import (
1616
"go.viam.com/rdk/logging"
1717
)
1818

19-
// epsilon is a small value for determining whether a float is 0.0.
20-
const epsilon = 1e-9
19+
const (
20+
// epsilon is a small value for determining whether a float is 0.0.
21+
epsilon = 1e-9
22+
// nsInADay is the number of nanoseconds in a day.
23+
nsInADay = 8.64e13
24+
)
2125

2226
type schema struct {
2327
// A `Datum`'s data is a map[string]any. Even if two datums have maps with the same keys, we do
@@ -360,19 +364,27 @@ func (flatDatum *FlatDatum) asDatum() datum {
360364
return ret
361365
}
362366

363-
// Parse reads the entire contents from `rawReader` and returns a list of `Datum`. If an error
364-
// occurs, the []Datum parsed up until the place of the error will be returned, in addition to a
365-
// non-nil error.
366-
func Parse(rawReader io.Reader) ([]FlatDatum, error) {
367+
// Parse reads the entire contents from `rawReader` and returns a list of `FlatDatum` and the
368+
// last timestamp that was read. If an error occurs, the []FlatDatum parsed up until the place
369+
// of the error will be returned, in addition to the last-read timestamp and a non-nil
370+
// error. The last-read timestamp is useful for determining the timestamp of the file
371+
// boundary.
372+
func Parse(rawReader io.Reader) ([]FlatDatum, int64, error) {
367373
logger := logging.NewLogger("")
368374
logger.SetLevel(logging.ERROR)
369375

370376
return ParseWithLogger(rawReader, logger)
371377
}
372378

373-
// ParseWithLogger parses with a logger for output.
374-
func ParseWithLogger(rawReader io.Reader, logger logging.Logger) ([]FlatDatum, error) {
375-
ret := make([]FlatDatum, 0)
379+
// ParseWithLogger parses with a logger for output. It returns a slice of flat datums and
380+
// the last timestamp that was read. The latter is useful for determining the timestamp of
381+
// the file boundary.
382+
func ParseWithLogger(rawReader io.Reader, logger logging.Logger) (
383+
ret []FlatDatum,
384+
lastTimestampRead int64,
385+
retErr error,
386+
) {
387+
ret = make([]FlatDatum, 0)
376388

377389
// prevValues are the previous values used for producing the diff bits. This is overwritten when
378390
// a new metrics reading is made, and nilled out when the schema changes.
@@ -390,7 +402,10 @@ func ParseWithLogger(rawReader io.Reader, logger logging.Logger) ([]FlatDatum, e
390402
break
391403
}
392404

393-
return ret, err
405+
if schema == nil {
406+
retErr = errors.New("could not read first byte")
407+
return
408+
}
394409
}
395410

396411
// If the first bit of the first byte is `1`, the next block of data is a schema
@@ -416,14 +431,19 @@ func ParseWithLogger(rawReader io.Reader, logger logging.Logger) ([]FlatDatum, e
416431
prevValues = nil
417432
continue
418433
} else if schema == nil {
419-
return nil, errors.New("first byte of FTDC data must be the magic 0x1 representing a new schema")
434+
retErr = errors.New("first byte of FTDC data must be the magic 0x1 representing a new schema")
435+
return
420436
}
421437

422438
// This FTDC document is a metric document. Read the "diff bits" that describe which metrics
423439
// have changed since the prior metric document. Note, the reader is positioned on the
424440
// "packed byte" where the first bit is not a diff bit. `readDiffBits` must account for
425441
// that.
426-
diffedFieldsIndexes := readDiffBits(reader, schema)
442+
diffedFieldsIndexes, err := readDiffBits(reader, schema)
443+
if err != nil {
444+
logger.Debugw("Error reading diff bits. Returning.", "error", err.Error())
445+
return
446+
}
427447
logger.Debugw("Diff bits",
428448
"changedFieldIndexes", diffedFieldsIndexes,
429449
"changedFieldNames", schema.FieldNamesForIndexes(diffedFieldsIndexes))
@@ -432,16 +452,26 @@ func ParseWithLogger(rawReader io.Reader, logger logging.Logger) ([]FlatDatum, e
432452
var dataTime int64
433453
if err = binary.Read(reader, binary.BigEndian, &dataTime); err != nil {
434454
logger.Debugw("Error reading time", "error", err)
435-
return ret, err
455+
retErr = err
456+
return
436457
}
437458
logger.Debugw("Read time", "time", dataTime, "seconds", dataTime/1e9)
438459

460+
// If the time is lower than the previously recorded timestamp, or significantly
461+
// further ahead than the previous recorded timestamp (> a day ahead), we are in a bad
462+
// state and need to return.
463+
if lastTimestampRead != 0 && (dataTime < lastTimestampRead || dataTime > (lastTimestampRead+nsInADay)) {
464+
return
465+
}
466+
lastTimestampRead = dataTime
467+
439468
// Read the payload. There will be one float32 value for each diff bit set to `1`, i.e:
440469
// `len(diffedFieldsIndexes)`.
441470
data, err := readData(reader, schema, diffedFieldsIndexes, prevValues)
442471
if err != nil {
443472
logger.Debugw("Error reading data", "error", err)
444-
return ret, err
473+
retErr = err
474+
return
445475
}
446476
logger.Debugw("Read data", "data", data)
447477

@@ -458,7 +488,7 @@ func ParseWithLogger(rawReader io.Reader, logger logging.Logger) ([]FlatDatum, e
458488
logger.Debugw("Hydrated data", "data", ret[len(ret)-1].Readings)
459489
}
460490

461-
return ret, nil
491+
return
462492
}
463493

464494
func flatDatumsToDatums(inp []FlatDatum) []datum {
@@ -527,7 +557,7 @@ func readSchema(reader *bufio.Reader) (*schema, *bufio.Reader) {
527557
// metrics that have changed. Note that the first byte of the input reader is "packed" with the
528558
// schema bit. Thus the first byte can represent 7 metrics and the remaining bytes can each
529559
// represent 8 metrics.
530-
func readDiffBits(reader *bufio.Reader, schema *schema) []int {
560+
func readDiffBits(reader *bufio.Reader, schema *schema) ([]int, error) {
531561
// 1 diff bit per metric + 1 bit for the packed "schema bit".
532562
numBits := len(schema.fieldOrder) + 1
533563

@@ -539,7 +569,7 @@ func readDiffBits(reader *bufio.Reader, schema *schema) []int {
539569
diffBytes := make([]byte, numBytes)
540570
_, err := io.ReadFull(reader, diffBytes)
541571
if err != nil {
542-
panic(err)
572+
return nil, err
543573
}
544574

545575
var ret []int
@@ -561,7 +591,7 @@ func readDiffBits(reader *bufio.Reader, schema *schema) []int {
561591
}
562592
}
563593

564-
return ret
594+
return ret, nil
565595
}
566596

567597
// readData returns the "hydrated" metrics for a data reading. For example, if there are ten metrics

ftdc/custom_format_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,12 @@ func TestCustomFormatRoundtripBasic(t *testing.T) {
6060
datumV2.Data["s2"].(*Basic).Foo = 3
6161
ftdc.writeDatum(datumV2)
6262

63-
parsed, err := Parse(serializedData)
63+
parsed, lastTimestampRead, err := Parse(serializedData)
6464
test.That(t, err, test.ShouldBeNil)
65+
66+
// Last datum parsed had a `Time` of 3.
67+
test.That(t, lastTimestampRead, test.ShouldEqual, 3)
68+
6569
logger.Info("Parsed data:", parsed)
6670

6771
// There are four datapoints in total.
@@ -120,8 +124,12 @@ func TestCustomFormatRoundtripRich(t *testing.T) {
120124
ftdc.writeDatum(datumV2)
121125
}
122126

123-
flatDatums, err := Parse(serializedData)
127+
flatDatums, lastTimestampRead, err := Parse(serializedData)
124128
test.That(t, err, test.ShouldBeNil)
129+
130+
// Last datum parsed had a `Time` of 19 (2*numDatumsPerSchema-1).
131+
test.That(t, lastTimestampRead, test.ShouldEqual, 2*numDatumsPerSchema-1)
132+
125133
datums := flatDatumsToDatums(flatDatums)
126134
logger.Info("Parsed data:", datums)
127135

ftdc/ftdc_test.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func TestCopeWithChangingSchema(t *testing.T) {
7171
err = ftdc.writeDatum(datum)
7272
test.That(t, err, test.ShouldBeNil)
7373

74-
datums, err := Parse(ftdcData)
74+
datums, _ /*variable lastTimestampRead*/, err := Parse(ftdcData)
7575
test.That(t, err, test.ShouldBeNil)
7676
test.That(t, len(datums), test.ShouldEqual, 2)
7777

@@ -109,7 +109,7 @@ func TestCopeWithSubtleSchemaChange(t *testing.T) {
109109
err = ftdc.writeDatum(datum)
110110
test.That(t, err, test.ShouldBeNil)
111111

112-
datums, err := Parse(ftdcData)
112+
datums, _ /*variable lastTimestampRead*/, err := Parse(ftdcData)
113113
test.That(t, err, test.ShouldBeNil)
114114
test.That(t, len(datums), test.ShouldEqual, 2)
115115

@@ -160,7 +160,7 @@ func TestMapStatser(t *testing.T) {
160160
test.That(t, err, test.ShouldBeNil)
161161

162162
// Verify the contents of the ftdc data.
163-
datums, err := ParseWithLogger(ftdcData, logger)
163+
datums, _ /*variable lastTimestampRead*/, err := ParseWithLogger(ftdcData, logger)
164164
test.That(t, err, test.ShouldBeNil)
165165

166166
test.That(t, len(datums), test.ShouldEqual, 2)
@@ -223,7 +223,7 @@ func TestNestedStructs(t *testing.T) {
223223
err = ftdc.writeDatum(datum)
224224
test.That(t, err, test.ShouldBeNil)
225225

226-
flatDatums, err := ParseWithLogger(ftdcData, logger)
226+
flatDatums, _ /*variable lastTimestampRead*/, err := ParseWithLogger(ftdcData, logger)
227227
test.That(t, err, test.ShouldBeNil)
228228
datums := flatDatumsToDatums(flatDatums)
229229
test.That(t, len(datums), test.ShouldEqual, 2)
@@ -315,10 +315,14 @@ func TestCountingBytes(t *testing.T) {
315315
// Temporarily set the log level to INFO to avoid spammy logs. Debug logs during parsing are
316316
// only interesting when parsing fails.
317317
logger.SetLevel(logging.INFO)
318-
datums, err := ParseWithLogger(ftdcFile, logger)
318+
datums, lastTimestampRead, err := ParseWithLogger(ftdcFile, logger)
319319
logger.SetLevel(logging.DEBUG)
320320
test.That(t, err, test.ShouldBeNil)
321321

322+
// lastTimestampRead must be in the range [0, 1000).
323+
test.That(t, lastTimestampRead, test.ShouldBeGreaterThanOrEqualTo, 0)
324+
test.That(t, lastTimestampRead, test.ShouldBeLessThan, 1000)
325+
322326
for _, flatDatum := range datums {
323327
// Each datum contains two metrics: `foo.X` and `foo.Y`. The "time" must be between [0,
324328
// 1000).

0 commit comments

Comments
 (0)