Skip to content

Commit 4c86707

Browse files
authored
feat: support nanosecond MID (phase 2) (#291)
1 parent 602c42a commit 4c86707

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+594
-189
lines changed

asyncsearcher/async_searcher.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func newAsyncSearchInfo(r AsyncSearchRequest, list fracmanager.List) asyncSearch
183183
}
184184
ctx, cancel := context.WithCancel(context.Background())
185185
return asyncSearchInfo{
186-
Version: infoVersion1,
186+
Version: infoVersion2,
187187
Finished: false,
188188
Error: "",
189189
CanceledAt: time.Time{},
@@ -591,10 +591,10 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) {
591591
if info.Version == 0 {
592592
info.Version = infoVersion1
593593
}
594-
if info.Version == infoVersion2 {
595-
info.Request.Params.From = seq.NanosToMID(uint64(info.Request.Params.From))
596-
info.Request.Params.To = seq.NanosToMID(uint64(info.Request.Params.To))
597-
info.Version = infoVersion1
594+
if info.Version == infoVersion1 {
595+
info.Request.Params.From = seq.MillisToMID(uint64(info.Request.Params.From))
596+
info.Request.Params.To = seq.MillisToMID(uint64(info.Request.Params.To))
597+
info.Version = infoVersion2
598598
}
599599

600600
info.merged.Store(areQPRsMerged[requestID])

asyncsearcher/encoding.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ var availableVersions = map[qprBinVersion]struct{}{
2727
}
2828

2929
func marshalQPR(q *seq.QPR, dst []byte) []byte {
30-
dst = append(dst, uint8(qprBinVersion1))
30+
dst = append(dst, uint8(qprBinVersion2))
3131

3232
blocksLenPos := len(dst)
3333
dst = append(dst, make([]byte, 8)...)
@@ -238,8 +238,8 @@ func unmarshalIDsDelta(dst seq.IDSources, block []byte, version qprBinVersion) (
238238
block = block[hintSize:]
239239

240240
var midValue seq.MID
241-
if version == qprBinVersion2 {
242-
midValue = seq.NanosToMID(uint64(mid))
241+
if version == qprBinVersion1 {
242+
midValue = seq.MillisToMID(uint64(mid))
243243
} else {
244244
midValue = seq.MID(mid)
245245
}
@@ -296,8 +296,8 @@ func unmarshalHistogram(src []byte, version qprBinVersion) (map[seq.MID]uint64,
296296
}
297297

298298
var midValue seq.MID
299-
if version == qprBinVersion2 {
300-
midValue = seq.NanosToMID(uint64(mid))
299+
if version == qprBinVersion1 {
300+
midValue = seq.MillisToMID(uint64(mid))
301301
} else {
302302
midValue = seq.MID(mid)
303303
}
@@ -460,8 +460,8 @@ func unmarshalAggregatableSamples(q *seq.AggregatableSamples, src []byte, versio
460460
src = tail
461461

462462
var midValue seq.MID
463-
if version == qprBinVersion2 {
464-
midValue = seq.NanosToMID(uint64(mid))
463+
if version == qprBinVersion1 {
464+
midValue = seq.MillisToMID(uint64(mid))
465465
} else {
466466
midValue = seq.MID(mid)
467467
}

asyncsearcher/encoding_test.go

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -92,34 +92,34 @@ func TestQPRMarshalUnmarshal(t *testing.T) {
9292
}
9393
}
9494

95-
// TestQPRVersion2Compatibility tests that it's possible to unmarshall and read version 2 async search result encoded.
96-
// MIDs in IDs and a histogram must be converted to milliseconds
97-
func TestQPRVersion2Compatibility(t *testing.T) {
95+
// TestQPRVersion1Compatibility tests that it's possible to unmarshall and read version 1 async search result encoded.
96+
// MIDs in IDs and a histogram must be converted from millis to nanos
97+
func TestQPRVersion1Compatibility(t *testing.T) {
9898
qpr := seq.QPR{
9999
IDs: seq.IDSources{
100100
{
101-
ID: seq.ID{MID: 1761812502573000000, RID: 34734732392},
101+
ID: seq.ID{MID: seq.MID(1761812502573), RID: 34734732392},
102102
},
103103
},
104104
Histogram: map[seq.MID]uint64{
105-
1761812502573000000: 433,
106-
1761812502463000000: 743,
105+
seq.MID(1761812502573): 433,
106+
seq.MID(1761812502463): 743,
107107
},
108108
Aggs: []seq.AggregatableSamples{
109109
{
110110
SamplesByBin: map[seq.AggBin]*seq.SamplesContainer{
111111
{Token: "_not_exists"}: {
112112
Total: 1,
113113
},
114-
{Token: "seq-db store", MID: seq.MID(1761812502953000000)}: {
114+
{Token: "seq-db store", MID: seq.MID(1761812502953)}: {
115115
Min: 3,
116116
Max: 5,
117117
Sum: 794,
118118
Total: 1,
119119
NotExists: 7,
120120
Samples: []float64{324},
121121
},
122-
{Token: "seq-db store", MID: seq.MID(1761812502456000000)}: {
122+
{Token: "seq-db store", MID: seq.MID(1761812502456)}: {
123123
Min: 2,
124124
Max: 6,
125125
Sum: 544,
@@ -132,27 +132,30 @@ func TestQPRVersion2Compatibility(t *testing.T) {
132132
},
133133
},
134134
}
135+
135136
rawQPR := marshalQPR(&qpr, nil)
136-
rawQPR[0] = uint8(qprBinVersion2)
137-
var outQpr seq.QPR
138-
tail, err := unmarshalQPR(&outQpr, rawQPR, math.MaxInt)
137+
rawQPR[0] = uint8(qprBinVersion1)
138+
139+
var outQPR seq.QPR
140+
tail, err := unmarshalQPR(&outQPR, rawQPR, math.MaxInt)
139141
require.NoError(t, err)
140142
require.Equal(t, 0, len(tail))
141-
require.Equal(t, seq.MID(1761812502573), outQpr.IDs[0].ID.MID, "mid doesn't match, should convert to milliseconds")
142143

143-
require.Equal(t, 2, len(outQpr.Histogram))
144-
require.Equal(t, uint64(433), outQpr.Histogram[seq.MID(1761812502573)], "histogram bucket doesn't match")
145-
require.Equal(t, uint64(743), outQpr.Histogram[seq.MID(1761812502463)], "histogram bucket doesn't match")
144+
require.Equal(t, seq.MID(1761812502573000000), outQPR.IDs[0].ID.MID, "mid doesn't match, should convert to nanoseconds")
145+
146+
require.Len(t, outQPR.Histogram, 2)
147+
require.Equal(t, uint64(433), outQPR.Histogram[seq.MID(1761812502573000000)], "histogram bucket doesn't match")
148+
require.Equal(t, uint64(743), outQPR.Histogram[seq.MID(1761812502463000000)], "histogram bucket doesn't match")
146149

147-
require.Equal(t, 1, len(outQpr.Aggs), "should have one AggregatableSamples")
148-
agg := outQpr.Aggs[0]
149-
require.Equal(t, 3, len(agg.SamplesByBin), "should have 3 samples in bin")
150+
require.Len(t, outQPR.Aggs, 1, "should have one AggregatableSamples")
151+
agg := outQPR.Aggs[0]
152+
require.Len(t, agg.SamplesByBin, 3, "should have 3 samples in bin")
150153

151154
notExistsBin := seq.AggBin{Token: "_not_exists"}
152155
require.Equal(t, int64(1), agg.SamplesByBin[notExistsBin].Total, "bucket doesn't match")
153-
bin1 := seq.AggBin{Token: "seq-db store", MID: seq.MID(1761812502953)}
156+
bin1 := seq.AggBin{Token: "seq-db store", MID: seq.MID(1761812502953000000)}
154157
require.Equal(t, int64(1), agg.SamplesByBin[bin1].Total, "bucket doesn't match")
155-
bin2 := seq.AggBin{Token: "seq-db store", MID: seq.MID(1761812502456)}
158+
bin2 := seq.AggBin{Token: "seq-db store", MID: seq.MID(1761812502456000000)}
156159
require.Equal(t, int64(2), agg.SamplesByBin[bin2].Total, "bucket doesn't match")
157160
}
158161

cmd/distribution/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,8 @@ func main() {
183183
zap.String("name", info.Name()),
184184
zap.String("ver", info.Ver),
185185
zap.Uint32("docs_total", info.DocsTotal),
186-
zap.String("from", util.MsTsToESFormat(uint64(info.From))),
187-
zap.String("to", util.MsTsToESFormat(uint64(info.To))),
186+
zap.String("from", util.NsTsToESFormat(uint64(info.From))),
187+
zap.String("to", util.NsTsToESFormat(uint64(info.To))),
188188
zap.String("creation_time", util.MsTsToESFormat(info.CreationTime)),
189189
)
190190
}

config/frac_version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@ const (
1111
BinaryDataV2
1212
)
1313

14-
const CurrentFracVersion = BinaryDataV1
14+
const CurrentFracVersion = BinaryDataV2

docs/en/internal/common.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
## Common
44

55
* ID: MID-RID .
6-
* MID - milliseconds part of ID, generated (extracted from doc) by ingestor before sending to store.
6+
* MID - nanoseconds part of ID, generated (extracted from doc) by ingestor before sending to store.
77
* RID - random part of ID, generated by ingestor before sending to store.
88
* docParam - link of ID and block position, position of doc in block.
99
* Only active fraction has meta file. It is used for restoring index in memory and in process of sealing fraction it is used to form index file.

docs/en/internal/search.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ Some basic overview of nodes:
2626
>
2727
> **ID** (document ID) - full id of a document, that you can use on proxy to find this specific doc. Consists of two parts: mid and rid.
2828
>
29-
> **MID** (milliseconds ID) - timestamp of a document. This is a timestamp, that log was written into stdout on the machine, not when it came into seq-db, meaning that it can be quite old relative to the time on the seq-db machine.
29+
> **MID** (nanoseconds ID) - timestamp of a document. This is a timestamp, that log was written into stdout on the machine, not when it came into seq-db, meaning that it can be quite old relative to the time on the seq-db machine.
3030
>
3131
> **RID** (random ID) - random part of an id
3232

docs/ru/internal/common.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
## Common
44

55
* ID: MID-RID .
6-
* MID - milliseconds part of ID, generated (extracted from doc) by ingestor before sending to store.
6+
* MID - nanoseconds part of ID, generated (extracted from doc) by ingestor before sending to store.
77
* RID - random part of ID, generated by ingestor before sending to store.
88
* docParam - link of ID and block position, position of doc in block.
99
* Only active fraction has meta file. It is used for restoring index in memory and in process of sealing fraction it is used to form index file.

docs/ru/internal/search.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ Some basic overview of nodes:
2626
>
2727
> **ID** (document ID) - full id of a document, that you can use on proxy to find this specific doc. Consists of two parts: mid and rid.
2828
>
29-
> **MID** (milliseconds ID) - timestamp of a document. This is a timestamp, that log was written into stdout on the machine, not when it came into seq-db, meaning that it can be quite old relative to the time on the seq-db machine.
29+
> **MID** (nanoseconds ID) - timestamp of a document. This is a timestamp, that log was written into stdout on the machine, not when it came into seq-db, meaning that it can be quite old relative to the time on the seq-db machine.
3030
>
3131
> **RID** (random ID) - random part of an id
3232

frac/common/info.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package common
22

33
import (
4+
"encoding/json"
45
"fmt"
56
"math"
67
"path"
@@ -80,7 +81,7 @@ func (s *Info) BuildDistribution(mids []uint64) {
8081
}
8182

8283
func (s *Info) InitEmptyDistribution() bool {
83-
from := time.UnixMilli(int64(s.From))
84+
from := s.From.Time()
8485
creationTime := time.UnixMilli(int64(s.CreationTime))
8586
if creationTime.Sub(from) < DistributionSpreadThreshold { // no big spread in past
8687
return false
@@ -117,3 +118,35 @@ func (s *Info) IsIntersecting(from, to seq.MID) bool {
117118
// check with distribution
118119
return s.Distribution.IsIntersecting(from, to)
119120
}
121+
122+
// MarshalJSON implements custom JSON marshaling to always store From and To in milliseconds
123+
func (s *Info) MarshalJSON() ([]byte, error) {
124+
type TmpInfo Info // type alias to avoid infinite recursion
125+
126+
tmp := TmpInfo(*s)
127+
128+
// We convert "from" and "to" to milliseconds in order to guarantee we can rollback on deploy.
129+
// When converting nanos to millis we must round "from" down (floor) and round "to" up (ceiling).
130+
// This guarantees that a fraction time range (checked on search with Contains and IsIntersecting methods) is not narrowed down,
131+
// and we do not lose messages on search.
132+
tmp.From = seq.MID(seq.MIDToMillis(s.From))
133+
tmp.To = seq.MID(seq.MIDToCeilingMillis(s.To))
134+
135+
return json.Marshal(tmp)
136+
}
137+
138+
// UnmarshalJSON implements custom JSON unmarshaling to convert From and To from milliseconds to nanoseconds
139+
func (s *Info) UnmarshalJSON(data []byte) error {
140+
type TmpInfo Info // type alias to avoid infinite recursion
141+
var tmp TmpInfo
142+
143+
err := json.Unmarshal(data, &tmp)
144+
if err != nil {
145+
return err
146+
}
147+
148+
*s = Info(tmp)
149+
s.From = seq.MillisToMID(uint64(tmp.From))
150+
s.To = seq.MillisToMID(uint64(tmp.To))
151+
return nil
152+
}

0 commit comments

Comments
 (0)