diff --git a/compress.go b/compress.go index de9290c..ddf44fe 100644 --- a/compress.go +++ b/compress.go @@ -120,7 +120,7 @@ func (whisper *Whisper) WriteHeaderCompressed() (err error) { i += FreeCompressedArchiveInfoSize - mixSpecSize if FreeCompressedArchiveInfoSize < mixSpecSize { - panic("out of FreeCompressedArchiveInfoSize") // a panic that should never happens + panic("out of FreeCompressedArchiveInfoSize") // a panic that should never happen } } @@ -137,7 +137,11 @@ func (whisper *Whisper) WriteHeaderCompressed() (err error) { i += copy(b[i:], archive.buffer) } } - + for _, oooArchive := range whisper.oooArchives { + i += packInt(b, oooArchive.offset, i) + i += packInt(b, oooArchive.secondsPerPoint, i) + i += packInt(b, oooArchive.numberOfPoints, i) + } whisper.crc32 = crc32(b, 0) packInt(b, int(whisper.crc32), whisper.crc32Offset()) @@ -152,6 +156,7 @@ func (whisper *Whisper) WriteHeaderCompressed() (err error) { } func (whisper *Whisper) readHeaderCompressed() (err error) { + // TODO check if we can do everything in one read if _, err := whisper.file.Seek(int64(len(compressedMagicString)), 0); err != nil { return err } @@ -290,7 +295,16 @@ func (whisper *Whisper) readHeaderCompressed() (err error) { return fmt.Errorf("unable to read archive %d buffer: readed = %d want = %d", i, readed, arc.bufferSize) } } - + // reading oooArchives info, works only for single archive metrics + if whisper.compVersion >= CompVersionLongOOOSingleArchive && len(whisper.archives) == 1 { + b = make([]byte, ArchiveInfoSize) + readed, err = whisper.file.Read(b) + if err != nil || readed != ArchiveInfoSize { + err = fmt.Errorf("unable to read ooo archive metadata: %s", err) + return + } + whisper.oooArchives = append(whisper.oooArchives, unpackArchiveInfo(b)) + } return nil } @@ -473,6 +487,7 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points baseIntervalsPerUnit, currentUnit, minInterval := archive.getBufferInfo() bufferUnitPointsCount := whisper.bufferUnitPointsCount(archive) + var oooDataPoints []dataPoint for aindex := 0; aindex < len(alignedPoints); { dp := alignedPoints[aindex] dpBaseInterval := archive.AggregateInterval(dp.interval) @@ -482,6 +497,7 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points if minInterval != 0 && dpBaseInterval < minInterval { // TODO: check against cblock pn1.interval? archive.stats.discard.oldInterval++ aindex++ + oooDataPoints = append(oooDataPoints, dp) continue } @@ -559,6 +575,13 @@ func (whisper *Whisper) archiveUpdateManyCompressed(archive *archiveInfo, points } } } + if len(oooDataPoints) > 0 && len(whisper.oooArchives) != 0 && whisper.oooArchives[0].SecondsPerPoint() == archive.SecondsPerPoint() { + // updating OOO archive, no propagation because OOO archive exits only for single archive + if err := whisper.archiveUpdateManyDataPoints(whisper.oooArchives[0], oooDataPoints, false); err != nil { + return err + } + archive.stats.discard.oldInterval -= uint32(len(oooDataPoints)) + } return nil } diff --git a/compress_test.go b/compress_test.go index d913bbe..b3c9045 100644 --- a/compress_test.go +++ b/compress_test.go @@ -772,7 +772,6 @@ func TestCompressedWhisperBufferOOOWrite(t *testing.T) { points = append(points, p) } } - if diff := cmp.Diff(points, []TimeSeriesPoint{ {Time: 1544476080, Value: 666}, {Time: 1544476140, Value: 666}, {Time: 1544476200, Value: 666}, {Time: 1544476260, Value: 666}, {Time: 1544476320, Value: 666}, {Time: 1544477340, Value: 666}, @@ -842,7 +841,7 @@ func TestCompressedWhisperSingleRetentionOutOfOrderWrite(t *testing.T) { } cwhisper.UpdateMany(points) - // buffer us flushed, can't accept OOO data not within the buffer + // buffer is flushed, can accept OOO data in OOO archive only cwhisper.UpdateMany([]*TimeSeriesPoint{ {Value: 1000, Time: now + 1}, @@ -854,7 +853,7 @@ func TestCompressedWhisperSingleRetentionOutOfOrderWrite(t *testing.T) { } if got, want := data.Points(), []TimeSeriesPoint{ {Time: now + 0, Value: 1}, - {Time: now + 1, Value: 0}, + {Time: now + 1, Value: 1000}, {Time: now + 2, Value: 1}, }; !reflect.DeepEqual(got, want) { t.Errorf("data.Points() = %v; want %v", got, want) @@ -926,6 +925,63 @@ func TestCompressTo(t *testing.T) { } } +func TestCompressToSingleArchive(t *testing.T) { + fpath := "compress_to_single_archive.wsp" + os.Remove(fpath) + + whisper, err := CreateWithOptions( + fpath, + []*Retention{ + {secondsPerPoint: 1, numberOfPoints: 172800}, // 1s:2d + }, + Average, + 0, + &Options{Compressed: false, PointsPerBlock: 7200, InMemory: true}, + ) + if err != nil { + panic(err) + } + whisper.Close() + archive := whisper.archives[0] + var ps []*TimeSeriesPoint + for i := 0; i < archive.numberOfPoints; i++ { + start := Now().Add(time.Second * time.Duration(archive.secondsPerPoint*i) * -1) + ps = append(ps, &TimeSeriesPoint{ + // Time: int(start.Add(time.Duration(i) * time.Second).Unix()), + Time: int(start.Unix()), + // Value: float64(i), + // Value: 2000.0 + float64(rand.Intn(100000))/100.0, // skipcq: GSC-G404 + // Value: rand.NormFloat64(), // skipcq: GSC-G404 + Value: float64(rand.Intn(100000)), // skipcq: GSC-G404 + }) + } + whisper, err = OpenWithOptions(fpath, &Options{InMemory: true}) + if err != nil { + t.Fatal(err) + } + if err := whisper.UpdateMany(ps); err != nil { + t.Fatal(err) + } + if err := whisper.Close(); err != nil { + t.Fatal(err) + } + whisper.file.(*memFile).dumpOnDisk(fpath) + + whisper, err = OpenWithOptions(fpath, &Options{}) + if err != nil { + t.Fatal(err) + } + os.Remove(fpath + ".cwsp") + if err := whisper.CompressTo(fpath + ".cwsp"); err != nil { + t.Fatal(err) + } + + t.Log("go", "run", "cmd/compare.go", "-v", fpath, fpath+".cwsp") + output, err := Compare(fpath, fpath+".cwsp", 0, false, "", false, false, 2) + if err != nil { + t.Fatalf("%s: %s", err, output) + } +} func TestRandomReadWrite(t *testing.T) { // os.Remove("test_random_read_write.wsp") fileTs := time.Now().Unix() diff --git a/debug.go b/debug.go index ec337e7..0e0f9f8 100644 --- a/debug.go +++ b/debug.go @@ -94,7 +94,6 @@ func (whisper *Whisper) Dump(all, showDecompressionInfo bool) { } fmt.Printf("archives.%d.retention: %s%s\n", i, arc.Retention, agg) } - for i, arc := range whisper.archives { fmt.Printf("\nArchive %d info:\n", i) if whisper.compressed { @@ -103,11 +102,18 @@ func (whisper *Whisper) Dump(all, showDecompressionInfo bool) { arc.dumpInfoStandard() } } + if len(whisper.oooArchives) > 0 { + fmt.Printf("\nOOO archive %d info:\n", 0) + whisper.oooArchives[0].dumpInfoStandard() + } if !all { return } - + if len(whisper.oooArchives) > 0 { + fmt.Printf("\nOOO archive %d data:\n", 0) + whisper.dumpDataPointsStandard(whisper.oooArchives[0]) + } for i, arc := range whisper.archives { fmt.Printf("\nArchive %d data:\n", i) if whisper.compressed { diff --git a/whisper.go b/whisper.go index c758ec2..3609a68 100644 --- a/whisper.go +++ b/whisper.go @@ -1,5 +1,5 @@ /* - Package whisper implements Graphite's Whisper database format +Package whisper implements Graphite's Whisper database format */ package whisper @@ -45,7 +45,12 @@ const ( ) const ( BufferUnitPointsCountSingleArchive = 60 - CompVersion = 2 // with added archive buffer for single archive + CompVersionBufferForSingleArchive = 2 + CompVersionLongOOOSingleArchive = 3 + // CompVersion = 1 - initial compressed whisper format https://github.com/bom-d-van/go-whisper/blob/master/doc/compressed.md + // CompVersion = 2 - compressed whisper format with added buffer for single archive + // CompVersion = 3 - compressed whisper format with OOO archives + CompVersion = 3 ) // Note: 4 bytes long in Whisper Header, 1 byte long in Archive Header @@ -147,7 +152,7 @@ type file interface { } /* - Represents a Whisper database file. +Represents a Whisper database file. */ type Whisper struct { // file *os.File @@ -158,6 +163,7 @@ type Whisper struct { maxRetention int xFilesFactor float32 archives []*archiveInfo + oooArchives []*archiveInfo compressed bool compVersion uint8 @@ -175,10 +181,10 @@ type Whisper struct { } /* - A retention level. +A retention level. - Retention levels describe a given archive in the database. How detailed it is and how far back - it records. +Retention levels describe a given archive in the database. How detailed it is and how far back +it records. */ type Retention struct { secondsPerPoint int @@ -190,10 +196,10 @@ type Retention struct { } /* - Describes a time series in a file. +Describes a time series in a file. - The only addition this type has over a Retention is the offset at which it exists within the - whisper file. +The only addition this type has over a Retention is the offset at which it exists within the +whisper file. */ type archiveInfo struct { Retention @@ -298,13 +304,13 @@ func parseRetentionPart(retentionPart string) (int, error) { } /* - Parse a retention definition as you would find in the storage-schemas.conf of a Carbon install. - Note that this only parses a single retention definition, if you have multiple definitions (separated by a comma) - you will have to split them yourself. +Parse a retention definition as you would find in the storage-schemas.conf of a Carbon install. +Note that this only parses a single retention definition, if you have multiple definitions (separated by a comma) +you will have to split them yourself. - ParseRetentionDef("10s:14d") Retention{10, 120960} +ParseRetentionDef("10s:14d") Retention{10, 120960} - See: http://graphite.readthedocs.org/en/1.0/config-carbon.html#storage-schemas-conf +See: http://graphite.readthedocs.org/en/1.0/config-carbon.html#storage-schemas-conf */ func ParseRetentionDef(retentionDef string) (*Retention, error) { parts := strings.Split(retentionDef, ":") @@ -358,7 +364,7 @@ func (whisper *Whisper) fileReadAt(b []byte, off int64) error { } /* - Create a new Whisper database file and write it's header. +Create a new Whisper database file and write it's header. */ func Create(path string, retentions Retentions, aggregationMethod AggregationMethod, xFilesFactor float32) (whisper *Whisper, err error) { return CreateWithOptions(path, retentions, aggregationMethod, xFilesFactor, &Options{ @@ -370,7 +376,8 @@ func Create(path string, retentions Retentions, aggregationMethod AggregationMet // CreateWithOptions is more customizable create function // // avgCompressedPointSize specification order: -// Options.PointSize < Retention.avgCompressedPointSize < Options.MixAggregationSpecs.AvgCompressedPointSize +// +// Options.PointSize < Retention.avgCompressedPointSize < Options.MixAggregationSpecs.AvgCompressedPointSize func CreateWithOptions(path string, retentions Retentions, aggregationMethod AggregationMethod, xFilesFactor float32, options *Options) (whisper *Whisper, err error) { if options == nil { options = &Options{} @@ -430,35 +437,20 @@ func CreateWithOptions(path string, retentions Retentions, aggregationMethod Agg // Set the archive info for i, retention := range retentions { - archive := &archiveInfo{Retention: *retention} - - if archive.avgCompressedPointSize == 0 { - archive.avgCompressedPointSize = whisper.avgCompressedPointSize - } - if archive.blockCount == 0 { - archive.blockCount = whisper.blockCount(archive) - } - - if whisper.aggregationMethod == Mix && i > 0 { - for i, spec := range options.MixAggregationSpecs { - narchive := *archive - narchive.aggregationSpec = &MixAggregationSpec{Method: spec.Method, Percentile: spec.Percentile} - ssp := narchive.secondsPerPoint - sindex := i % len(options.MixAggregationSpecs) - if msizes := options.MixAvgCompressedPointSizes; msizes != nil && - msizes[ssp] != nil && - isGoodFloat32(msizes[ssp][sindex]) { - narchive.avgCompressedPointSize = msizes[ssp][sindex] - } - - whisper.archives = append(whisper.archives, &narchive) - } - } else { - whisper.archives = append(whisper.archives, archive) - } + addMixAggregationSpec := whisper.aggregationMethod == Mix && i > 0 + whisper.archives = append(whisper.archives, NewArchiveInfo(retention, whisper, addMixAggregationSpec, options)...) } + if whisper.compressed && len(retentions) == 1 { // long OOO works only for single archive metrics + whisper.oooArchives = append(whisper.oooArchives, NewArchiveInfo(retentions[0], whisper, false, options)...) + } offset := whisper.MetadataSize() + + // we write oooArchives for compressed whisper before the compressed archives + if whisper.compressed && len(retentions) == 1 { + whisper.oooArchives[0].offset = offset + offset += whisper.oooArchives[0].Retention.Size() + } for i, retention := range retentions { if !whisper.compressed { archive := whisper.archives[i] @@ -513,21 +505,69 @@ func CreateWithOptions(path string, retentions Retentions, aggregationMethod Agg // // compressed format ignores sparse flag if options.Sparse && !options.Compressed { - if _, err = whisper.file.Seek(int64(whisper.Size()-1), 0); err != nil { - return nil, err - } - if _, err = whisper.file.Write([]byte{0}); err != nil { + if err = whisper.allocateSparse(int64(whisper.Size() - 1)); err != nil { return nil, err } } else { - if err := allocateDiskSpace(whisper.file, whisper.Size()-whisper.MetadataSize()); err != nil { - return nil, err + if len(whisper.oooArchives) > 0 { + // allocate place for sparse ooo archive + lastOOOArchiveByte := whisper.oooArchives[0].offset + whisper.oooArchives[0].Size() - 1 + if err = whisper.allocateSparse(int64(lastOOOArchiveByte)); err != nil { + return nil, err + } + // allocate space for cwhisper + if err := allocateDiskSpace(whisper.file, whisper.Size()-whisper.MetadataSize()-whisper.oooArchives[0].Size()); err != nil { + return nil, err + } + } else { + if err := allocateDiskSpace(whisper.file, whisper.Size()-whisper.MetadataSize()); err != nil { + return nil, err + } } } return whisper, nil } +func (whisper *Whisper) allocateSparse(lastByteOffset int64) error { + if _, err := whisper.file.Seek(lastByteOffset, 0); err != nil { + return err + } + if _, err := whisper.file.Write([]byte{0}); err != nil { + return err + } + return nil +} + +func NewArchiveInfo(retention *Retention, whisper *Whisper, addMixAggregationSpec bool, options *Options) []*archiveInfo { + archive := &archiveInfo{Retention: *retention} + var archives []*archiveInfo + + if archive.avgCompressedPointSize == 0 { + archive.avgCompressedPointSize = whisper.avgCompressedPointSize + } + if archive.blockCount == 0 { + archive.blockCount = whisper.blockCount(archive) + } + if addMixAggregationSpec { + for i, spec := range options.MixAggregationSpecs { + narchive := *archive + narchive.aggregationSpec = &MixAggregationSpec{Method: spec.Method, Percentile: spec.Percentile} + ssp := narchive.secondsPerPoint + sindex := i % len(options.MixAggregationSpecs) + if msizes := options.MixAvgCompressedPointSizes; msizes != nil && + msizes[ssp] != nil && + isGoodFloat32(msizes[ssp][sindex]) { + narchive.avgCompressedPointSize = msizes[ssp][sindex] + } + archives = append(archives, &narchive) + } + return archives + } + archives = append(archives, archive) + return archives +} + func isGoodFloat32(n float32) bool { return !math.IsNaN(float64(n)) && n > 0.0 } @@ -583,7 +623,7 @@ func validateRetentions(retentions Retentions) error { } /* - Open an existing Whisper database and read it's header +Open an existing Whisper database and read it's header */ func Open(path string) (whisper *Whisper, err error) { return OpenWithOptions(path, &Options{ @@ -698,14 +738,14 @@ func (whisper *Whisper) initMetaInfo() { prevArc := whisper.archives[i-1] prevArc.next = arc - if whisper.aggregationMethod != Mix && (whisper.compVersion == 1 || whisper.compVersion == 2) { + if whisper.aggregationMethod != Mix { prevArc.bufferSize = arc.secondsPerPoint / prevArc.secondsPerPoint * PointSize * bufferCount prevArc.buffer = make([]byte, prevArc.bufferSize) } } // for OOO write for short time - if len(whisper.archives) == 1 && whisper.compVersion == 2 { - whisper.archives[0].bufferSize = 60 * PointSize * bufferCount + if len(whisper.archives) == 1 && whisper.compVersion >= CompVersionBufferForSingleArchive { + whisper.archives[0].bufferSize = BufferUnitPointsCountSingleArchive * PointSize * bufferCount whisper.archives[0].buffer = make([]byte, whisper.archives[0].bufferSize) } } @@ -734,14 +774,14 @@ func (whisper *Whisper) crc32Offset() int { } /* - Close the whisper file +Close the whisper file */ func (whisper *Whisper) Close() error { return whisper.file.Close() } /* - Calculate the total number of bytes the Whisper file should be according to the metadata. +Calculate the total number of bytes the Whisper file should be according to the metadata. */ func (whisper *Whisper) Size() int { size := whisper.MetadataSize() @@ -752,15 +792,18 @@ func (whisper *Whisper) Size() int { size += archive.Size() } } + if len(whisper.oooArchives) > 0 { + size += whisper.oooArchives[0].Size() + } return size } /* - Calculate the number of bytes the metadata section will be. +Calculate the number of bytes the metadata section will be. */ func (whisper *Whisper) MetadataSize() int { if whisper.compressed { - return len(compressedMagicString) + VersionSize + CompressedMetadataSize + (CompressedArchiveInfoSize * len(whisper.archives)) + whisper.blockRangesSize() + whisper.bufferSize() + return len(compressedMagicString) + VersionSize + CompressedMetadataSize + (CompressedArchiveInfoSize * len(whisper.archives)) + whisper.blockRangesSize() + whisper.bufferSize() + ArchiveInfoSize*len(whisper.oooArchives) } return MetadataSize + (ArchiveInfoSize * len(whisper.archives)) @@ -813,10 +856,10 @@ func (whisper *Whisper) Retentions() []Retention { } /* - Update a value in the database. +Update a value in the database. - If the timestamp is in the future or outside of the maximum retention it will - fail immediately. +If the timestamp is in the future or outside of the maximum retention it will +fail immediately. */ func (whisper *Whisper) Update(value float64, timestamp int) (err error) { // recover panics and return as error @@ -879,7 +922,7 @@ func (whisper *Whisper) UpdateMany(points []*TimeSeriesPoint) (err error) { } /* - Returns updated amount of out-of-order discarded points since opening whisper file +Returns updated amount of out-of-order discarded points since opening whisper file */ func (whisper *Whisper) GetDiscardedPointsSinceOpen() uint32 { var discardedPointsNow uint32 @@ -909,7 +952,7 @@ func (whisper *Whisper) UpdateManyForArchive(points []*TimeSeriesPoint, targetRe } }() - // sort the points, newest first + // sort the points, the newest first reversePoints(points) sort.Stable(timeSeriesPointsNewestFirst{points}) @@ -932,14 +975,14 @@ func (whisper *Whisper) UpdateManyForArchive(points []*TimeSeriesPoint, targetRe reversePoints(currentPoints) if whisper.compressed { // Backfilling lower archives is not allowed/supported for mix - // aggreation policy with the current api, a new api/parameter is - // needed to specify which aggregaton target to backfill. + // aggregation policy with the current api, a new api/parameter is + // needed to specify which aggregation target to backfill. if whisper.aggregationMethod == Mix && i > 0 { break } // TODO: add a new options to update data points in smaller chunks if - // it exceeeds certain size, so extension could be triggered + // it exceeds certain size, so extension could be triggered // properly: ChunkUpdateSize err = whisper.archiveUpdateManyCompressed(archive, currentPoints) } else { @@ -1087,9 +1130,9 @@ func packSequences(archive *archiveInfo, points []dataPoint) (intervals []int, p } /* - Calculate the offset for a given interval in an archive +Calculate the offset for a given interval in an archive - This method retrieves the baseInterval and the +This method retrieves the baseInterval and the */ func (whisper *Whisper) getPointOffset(start int, archive *archiveInfo) int64 { baseInterval := whisper.getBaseInterval(archive) @@ -1100,7 +1143,7 @@ func (whisper *Whisper) getPointOffset(start int, archive *archiveInfo) int64 { } func (whisper *Whisper) getBaseInterval(archive *archiveInfo) int { - if whisper.compressed { + if whisper.compressed && archive.buffer != nil { return unpackInt(archive.buffer) } @@ -1230,7 +1273,7 @@ func (whisper *Whisper) checkSeriesEmptyAt(start, length int64, fromTime, untilT } /* - Calculate the starting time for a whisper db. +Calculate the starting time for a whisper db. */ func (whisper *Whisper) StartTime() int { now := int(Now().Unix()) // TODO: danger of 2030 something overflow @@ -1238,7 +1281,7 @@ func (whisper *Whisper) StartTime() int { } /* - Fetch a TimeSeries for a given time span from the file. +Fetch a TimeSeries for a given time span from the file. */ func (whisper *Whisper) Fetch(fromTime, untilTime int) (timeSeries *TimeSeries, err error) { return whisper.FetchByAggregation(fromTime, untilTime, nil) @@ -1290,76 +1333,98 @@ func (whisper *Whisper) FetchByAggregation(fromTime, untilTime int, spec *MixAgg func (whisper *Whisper) fetchFromArchive(archive *archiveInfo, fromTime, untilTime int) (timeSeries *TimeSeries, err error) { fromInterval := archive.Interval(fromTime) untilInterval := archive.Interval(untilTime) - + // Zero-length time range: always include the next point + if fromInterval == untilInterval { + untilInterval += archive.SecondsPerPoint() + } + step := archive.secondsPerPoint var series []dataPoint if whisper.compressed { series, err = whisper.fetchCompressed(int64(fromInterval), int64(untilInterval), archive) if err != nil { return nil, err } + var values []float64 + if len(whisper.oooArchives) > 0 { + // math.NaN() with OOO values + values, err = whisper.fetch(untilInterval, fromInterval, step, whisper.oooArchives[0]) + if err != nil { + return nil, err + } + } else { + irange := untilInterval - fromInterval + values = make([]float64, irange/archive.secondsPerPoint) - irange := untilInterval - fromInterval - values := make([]float64, irange/archive.secondsPerPoint) - - for i := range values { - values[i] = math.NaN() + for i := range values { + values[i] = math.NaN() + } } + // fetched data can be out of order (because of buffer) // so, let's sort it out - step := archive.secondsPerPoint for _, dPoint := range series { index := (dPoint.interval - fromInterval) / archive.secondsPerPoint if index >= len(values) || index < 0 { continue } + if !math.IsNaN(values[index]) { + // OOO data in values[index], has higher priority than cwhisper data + continue + } values[index] = dPoint.value } return &TimeSeries{fromInterval, untilInterval, step, values}, nil } else { - baseInterval := whisper.getBaseInterval(archive) - - if baseInterval == 0 { - step := archive.secondsPerPoint - points := (untilInterval - fromInterval) / step - values := make([]float64, points) - for i := range values { - values[i] = math.NaN() - } - return &TimeSeries{fromInterval, untilInterval, step, values}, nil - } - - // Zero-length time range: always include the next point - if fromInterval == untilInterval { - untilInterval += archive.SecondsPerPoint() - } - - fromOffset := archive.PointOffset(baseInterval, fromInterval) - untilOffset := archive.PointOffset(baseInterval, untilInterval) - - series, err = whisper.readSeries(fromOffset, untilOffset, archive) + values, err := whisper.fetch(untilInterval, fromInterval, step, archive) if err != nil { return nil, err } + return &TimeSeries{fromInterval, untilInterval, step, values}, nil + } +} - values := make([]float64, len(series)) +func (whisper *Whisper) fetch(untilInterval int, fromInterval int, step int, archive *archiveInfo) ([]float64, error) { + var series []dataPoint + baseInterval := whisper.getBaseInterval(archive) + if baseInterval == 0 { + points := (untilInterval - fromInterval) / step + values := make([]float64, points) for i := range values { values[i] = math.NaN() } - currentInterval := fromInterval - step := archive.secondsPerPoint + return values, nil + } - for i, dPoint := range series { - if dPoint.interval == currentInterval { - values[i] = dPoint.value - } - currentInterval += step + // Zero-length time range: always include the next point + if fromInterval == untilInterval { + untilInterval += archive.SecondsPerPoint() + } + + fromOffset := archive.PointOffset(baseInterval, fromInterval) + untilOffset := archive.PointOffset(baseInterval, untilInterval) + + series, err := whisper.readSeries(fromOffset, untilOffset, archive) + if err != nil { + return nil, err + } + + values := make([]float64, len(series)) + for i := range values { + values[i] = math.NaN() + } + currentInterval := fromInterval + + for i, dPoint := range series { + if dPoint.interval == currentInterval { + values[i] = dPoint.value } - return &TimeSeries{fromInterval, untilInterval, step, values}, nil + currentInterval += step } + return values, nil } /* - Check a TimeSeries has a points for a given time span from the file. +Check a TimeSeries has a points for a given time span from the file. */ func (whisper *Whisper) CheckEmpty(fromTime, untilTime int) (exist bool, err error) { now := int(Now().Unix()) // TODO: danger of 2030 something overflow @@ -1724,8 +1789,8 @@ func getFirstDataPointStrict(b []byte) dataPoint { } /* - Implementation of modulo that works like Python - Thanks @timmow for this +Implementation of modulo that works like Python +Thanks @timmow for this */ func mod(a, b int) int { return a - (b * int(math.Floor(float64(a)/float64(b))))