Skip to content

Commit 6a6a996

Browse files
authored
Merge branch 'main' into feature/add-optional-time-range-manifest
2 parents eb0e2e4 + f55e3e4 commit 6a6a996

File tree

14 files changed

+384
-240
lines changed

14 files changed

+384
-240
lines changed

CHANGES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ Release Notes.
1919
- Add replication integration test for measure.
2020
- Activate the property repair mechanism by default.
2121
- Add snapshot time retention policy to ensure the snapshot only can be deleted after the configured minimum age(time).
22+
- **Breaking Change**: Change the data storage path structure for property model:
23+
- From: `<data-dir>/property/data/shard-<id>/...`
24+
- To: `<data-dir>/property/data/<group>/shard-<id>/...`
2225

2326
### Bug Fixes
2427

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Licensed to Apache Software Foundation (ASF) under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Apache Software Foundation (ASF) licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package lifecycle
19+
20+
import (
21+
"context"
22+
23+
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
24+
)
25+
26+
func (l *lifecycleService) GetCurrentNode(_ context.Context, _ *databasev1.GetCurrentNodeRequest) (*databasev1.GetCurrentNodeResponse, error) {
27+
return &databasev1.GetCurrentNodeResponse{
28+
Node: l.currentNode,
29+
}, nil
30+
}

banyand/backup/lifecycle/service.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"google.golang.org/grpc/credentials/insecure"
4242
"google.golang.org/grpc/health"
4343
"google.golang.org/grpc/health/grpc_health_v1"
44+
"google.golang.org/protobuf/types/known/timestamppb"
4445

4546
"github.com/apache/skywalking-banyandb/api/common"
4647
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -70,6 +71,7 @@ var _ service = (*lifecycleService)(nil)
7071

7172
type lifecycleService struct {
7273
databasev1.UnimplementedClusterStateServiceServer
74+
databasev1.UnimplementedNodeQueryServiceServer
7375
metadata metadata.Repo
7476
omr observability.MetricsRegistry
7577
pm protector.Memory
@@ -79,6 +81,7 @@ type lifecycleService struct {
7981
grpcServer *grpclib.Server
8082
httpSrv *http.Server
8183
tlsReloader *pkgtls.Reloader
84+
currentNode *databasev1.Node
8285
clientCloser context.CancelFunc
8386
stopCh chan struct{}
8487
measureRoot string
@@ -165,6 +168,16 @@ func (l *lifecycleService) Validate() error {
165168
return errors.New("missing key file when TLS is enabled")
166169
}
167170
}
171+
l.currentNode = &databasev1.Node{
172+
Metadata: &commonv1.Metadata{
173+
Name: l.lifecycleGRPCAddr,
174+
},
175+
GrpcAddress: l.lifecycleGRPCAddr,
176+
HttpAddress: l.lifecycleHTTPAddr,
177+
Roles: make([]databasev1.Role, 0),
178+
Labels: common.ParseNodeFlags(),
179+
CreatedAt: timestamppb.Now(),
180+
}
168181
}
169182
return nil
170183
}
@@ -309,6 +322,7 @@ func (l *lifecycleService) startServers() {
309322

310323
l.grpcServer = grpclib.NewServer(opts...)
311324
databasev1.RegisterClusterStateServiceServer(l.grpcServer, l)
325+
databasev1.RegisterNodeQueryServiceServer(l.grpcServer, l)
312326
grpc_health_v1.RegisterHealthServer(l.grpcServer, health.NewServer())
313327

314328
// Setup HTTP server

banyand/property/db.go

Lines changed: 121 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,21 @@ var (
5252
propertyScope = observability.RootScope.SubScope("property")
5353
)
5454

55+
type groupShards struct {
56+
shards atomic.Pointer[[]*shard]
57+
group string
58+
location string
59+
mu sync.RWMutex
60+
}
61+
5562
type database struct {
5663
metadata metadata.Repo
5764
omr observability.MetricsRegistry
5865
lfs fs.FileSystem
5966
lock fs.File
6067
logger *logger.Logger
6168
repairScheduler *repairScheduler
62-
sLst atomic.Pointer[[]*shard]
69+
groups sync.Map
6370
location string
6471
repairBaseDir string
6572
flushInterval time.Duration
@@ -129,18 +136,29 @@ func (db *database) load(ctx context.Context) error {
129136
if db.closed.Load() {
130137
return errors.New("database is closed")
131138
}
132-
return walkDir(db.location, "shard-", func(suffix string) error {
133-
id, err := strconv.Atoi(suffix)
134-
if err != nil {
135-
return err
139+
for _, groupDir := range lfs.ReadDir(db.location) {
140+
if !groupDir.IsDir() {
141+
continue
136142
}
137-
_, err = db.loadShard(ctx, common.ShardID(id))
138-
return err
139-
})
143+
groupName := groupDir.Name()
144+
groupPath := filepath.Join(db.location, groupName)
145+
walkErr := walkDir(groupPath, "shard-", func(suffix string) error {
146+
id, parseErr := strconv.Atoi(suffix)
147+
if parseErr != nil {
148+
return parseErr
149+
}
150+
_, loadErr := db.loadShard(ctx, groupName, common.ShardID(id))
151+
return loadErr
152+
})
153+
if walkErr != nil {
154+
return walkErr
155+
}
156+
}
157+
return nil
140158
}
141159

142160
func (db *database) update(ctx context.Context, shardID common.ShardID, id []byte, property *propertyv1.Property) error {
143-
sd, err := db.loadShard(ctx, shardID)
161+
sd, err := db.loadShard(ctx, property.Metadata.Group, shardID)
144162
if err != nil {
145163
return err
146164
}
@@ -152,14 +170,18 @@ func (db *database) update(ctx context.Context, shardID common.ShardID, id []byt
152170
}
153171

154172
func (db *database) delete(ctx context.Context, docIDs [][]byte) error {
155-
sLst := db.sLst.Load()
156-
if sLst == nil {
157-
return nil
158-
}
159173
var err error
160-
for _, s := range *sLst {
161-
multierr.AppendInto(&err, s.delete(ctx, docIDs))
162-
}
174+
db.groups.Range(func(_, value any) bool {
175+
gs := value.(*groupShards)
176+
sLst := gs.shards.Load()
177+
if sLst == nil {
178+
return true
179+
}
180+
for _, s := range *sLst {
181+
multierr.AppendInto(&err, s.delete(ctx, docIDs))
182+
}
183+
return true
184+
})
163185
return err
164186
}
165187

@@ -168,14 +190,18 @@ func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([]
168190
if err != nil {
169191
return nil, err
170192
}
171-
sLst := db.sLst.Load()
172-
if sLst == nil {
193+
requestedGroups := make(map[string]bool, len(req.Groups))
194+
for _, g := range req.Groups {
195+
requestedGroups[g] = true
196+
}
197+
shards := db.collectGroupShards(requestedGroups)
198+
if len(shards) == 0 {
173199
return nil, nil
174200
}
175201

176202
if req.OrderBy == nil {
177203
var res []*queryProperty
178-
for _, s := range *sLst {
204+
for _, s := range shards {
179205
r, searchErr := s.search(ctx, iq, nil, int(req.Limit))
180206
if searchErr != nil {
181207
return nil, searchErr
@@ -185,8 +211,8 @@ func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([]
185211
return res, nil
186212
}
187213

188-
iters := make([]sort.Iterator[*queryProperty], 0, len(*sLst))
189-
for _, s := range *sLst {
214+
iters := make([]sort.Iterator[*queryProperty], 0, len(shards))
215+
for _, s := range shards {
190216
// Each shard returns pre-sorted results (via SeriesSort)
191217
r, searchErr := s.search(ctx, iq, req.OrderBy, int(req.Limit))
192218
if searchErr != nil {
@@ -216,34 +242,77 @@ func (db *database) query(ctx context.Context, req *propertyv1.QueryRequest) ([]
216242
return result, nil
217243
}
218244

219-
func (db *database) loadShard(ctx context.Context, id common.ShardID) (*shard, error) {
245+
func (db *database) collectGroupShards(requestedGroups map[string]bool) []*shard {
246+
var shards []*shard
247+
db.groups.Range(func(key, value any) bool {
248+
groupName := key.(string)
249+
if len(requestedGroups) > 0 {
250+
if _, ok := requestedGroups[groupName]; !ok {
251+
return true
252+
}
253+
}
254+
gs := value.(*groupShards)
255+
sLst := gs.shards.Load()
256+
if sLst == nil {
257+
return true
258+
}
259+
shards = append(shards, *sLst...)
260+
return true
261+
})
262+
return shards
263+
}
264+
265+
func (db *database) loadShard(ctx context.Context, group string, id common.ShardID) (*shard, error) {
220266
if db.closed.Load() {
221267
return nil, errors.New("database is closed")
222268
}
223-
if s, ok := db.getShard(id); ok {
269+
if s, ok := db.getShard(group, id); ok {
224270
return s, nil
225271
}
226272
db.mu.Lock()
227273
defer db.mu.Unlock()
228-
if s, ok := db.getShard(id); ok {
274+
if s, ok := db.getShard(group, id); ok {
229275
return s, nil
230276
}
231-
sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, db.logger), id, int64(db.flushInterval.Seconds()),
277+
278+
gs := db.getOrCreateGroupShards(group)
279+
sd, err := db.newShard(context.WithValue(ctx, logger.ContextKey, db.logger),
280+
group, id, int64(db.flushInterval.Seconds()),
232281
int64(db.expireDelete.Seconds()), db.repairBaseDir, db.repairTreeSlotCount)
233282
if err != nil {
234283
return nil, err
235284
}
236-
sLst := db.sLst.Load()
237-
if sLst == nil {
238-
sLst = &[]*shard{}
285+
286+
gs.mu.Lock()
287+
sLst := gs.shards.Load()
288+
var oldList []*shard
289+
if sLst != nil {
290+
oldList = *sLst
239291
}
240-
*sLst = append(*sLst, sd)
241-
db.sLst.Store(sLst)
292+
newList := make([]*shard, len(oldList)+1)
293+
copy(newList, oldList)
294+
newList[len(oldList)] = sd
295+
gs.shards.Store(&newList)
296+
gs.mu.Unlock()
242297
return sd, nil
243298
}
244299

245-
func (db *database) getShard(id common.ShardID) (*shard, bool) {
246-
sLst := db.sLst.Load()
300+
func (db *database) getOrCreateGroupShards(group string) *groupShards {
301+
gs := &groupShards{
302+
group: group,
303+
location: filepath.Join(db.location, group),
304+
}
305+
actual, _ := db.groups.LoadOrStore(group, gs)
306+
return actual.(*groupShards)
307+
}
308+
309+
func (db *database) getShard(group string, id common.ShardID) (*shard, bool) {
310+
value, ok := db.groups.Load(group)
311+
if !ok {
312+
return nil, false
313+
}
314+
gs := value.(*groupShards)
315+
sLst := gs.shards.Load()
247316
if sLst == nil {
248317
return nil, false
249318
}
@@ -262,13 +331,18 @@ func (db *database) close() error {
262331
if db.repairScheduler != nil {
263332
db.repairScheduler.close()
264333
}
265-
sLst := db.sLst.Load()
266334
var err error
267-
if sLst != nil {
335+
db.groups.Range(func(_, value any) bool {
336+
gs := value.(*groupShards)
337+
sLst := gs.shards.Load()
338+
if sLst == nil {
339+
return true
340+
}
268341
for _, s := range *sLst {
269342
multierr.AppendInto(&err, s.close())
270343
}
271-
}
344+
return true
345+
})
272346
db.lock.Close()
273347
return err
274348
}
@@ -277,17 +351,21 @@ func (db *database) collect() {
277351
if db.closed.Load() {
278352
return
279353
}
280-
sLst := db.sLst.Load()
281-
if sLst == nil {
282-
return
283-
}
284-
for _, s := range *sLst {
285-
s.store.CollectMetrics()
286-
}
354+
db.groups.Range(func(_, value any) bool {
355+
gs := value.(*groupShards)
356+
sLst := gs.shards.Load()
357+
if sLst == nil {
358+
return true
359+
}
360+
for _, s := range *sLst {
361+
s.store.CollectMetrics()
362+
}
363+
return true
364+
})
287365
}
288366

289367
func (db *database) repair(ctx context.Context, id []byte, shardID uint64, property *propertyv1.Property, deleteTime int64) error {
290-
s, err := db.loadShard(ctx, common.ShardID(shardID))
368+
s, err := db.loadShard(ctx, property.Metadata.Group, common.ShardID(shardID))
291369
if err != nil {
292370
return errors.WithMessagef(err, "failed to load shard %d", id)
293371
}

0 commit comments

Comments
 (0)