Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 69 additions & 6 deletions banyand/metadata/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/dns"
"github.com/apache/skywalking-banyandb/banyand/metadata/discovery/dns"
"github.com/apache/skywalking-banyandb/banyand/metadata/discovery/file"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/logger"
Expand All @@ -52,6 +53,8 @@ const (
NodeDiscoveryModeEtcd = "etcd"
// NodeDiscoveryModeDNS represents DNS-based node discovery mode.
NodeDiscoveryModeDNS = "dns"
// NodeDiscoveryModeFile represents file-based node discovery mode.
NodeDiscoveryModeFile = "file"
)

const flagEtcdUsername = "etcd-username"
Expand All @@ -76,6 +79,7 @@ func NewClient(toRegisterNode, forceRegisterNode bool) (Service, error) {
type clientService struct {
schemaRegistry schema.Registry
dnsDiscovery *dns.Service
fileDiscovery *file.Service
closer *run.Closer
nodeInfo *databasev1.Node
etcdTLSCertFile string
Expand All @@ -86,6 +90,7 @@ type clientService struct {
etcdTLSKeyFile string
namespace string
nodeDiscoveryMode string
filePath string
dnsSRVAddresses []string
endpoints []string
registryTimeout time.Duration
Expand All @@ -94,6 +99,7 @@ type clientService struct {
dnsFetchInterval time.Duration
grpcTimeout time.Duration
etcdFullSyncInterval time.Duration
fileFetchInterval time.Duration
nodeInfoMux sync.Mutex
forceRegisterNode bool
toRegisterNode bool
Expand All @@ -118,7 +124,7 @@ func (s *clientService) FlagSet() *run.FlagSet {

// node discovery configuration
fs.StringVar(&s.nodeDiscoveryMode, "node-discovery-mode", NodeDiscoveryModeEtcd,
"Node discovery mode: 'etcd' for etcd-based discovery, 'dns' for DNS-based discovery")
"Node discovery mode: 'etcd' for etcd-based discovery, 'dns' for DNS-based discovery, 'file' for file-based discovery")
fs.StringSliceVar(&s.dnsSRVAddresses, "node-discovery-dns-srv-addresses", []string{},
"DNS SRV addresses for node discovery (e.g., _grpc._tcp.banyandb.svc.cluster.local)")
fs.DurationVar(&s.dnsFetchInitInterval, "node-discovery-dns-fetch-init-interval", 5*time.Second,
Expand All @@ -133,13 +139,19 @@ func (s *clientService) FlagSet() *run.FlagSet {
"Enable TLS for DNS discovery gRPC connections")
fs.StringSliceVar(&s.dnsCACertPaths, "node-discovery-dns-ca-certs", []string{},
"Comma-separated list of CA certificate files to verify DNS discovered nodes (one per SRV address, in same order)")
fs.StringVar(&s.filePath, "node-discovery-file-path", "",
"File path for static node configuration (file mode only)")
fs.DurationVar(&s.fileFetchInterval, "node-discovery-file-fetch-interval", 20*time.Second,
"Fetch file interval for nodes in file discovery mode")
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flag name uses "fetch-interval" but the documentation calls it "retry-interval". The flag should be renamed to "node-discovery-file-retry-interval" to match the documentation at line 311, or the documentation should be updated to use "fetch-interval".

Suggested change
fs.DurationVar(&s.fileFetchInterval, "node-discovery-file-fetch-interval", 20*time.Second,
"Fetch file interval for nodes in file discovery mode")
fs.DurationVar(&s.fileFetchInterval, "node-discovery-file-retry-interval", 20*time.Second,
"Retry interval for reading node configuration file in file discovery mode")

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flag description should be changed to: "Interval to poll the discovery file and retry failed nodes in file discovery mode".


return fs
}

func (s *clientService) Validate() error {
if s.nodeDiscoveryMode != NodeDiscoveryModeEtcd && s.nodeDiscoveryMode != NodeDiscoveryModeDNS {
return fmt.Errorf("invalid node-discovery-mode: %s, must be '%s' or '%s'", s.nodeDiscoveryMode, NodeDiscoveryModeEtcd, NodeDiscoveryModeDNS)
if s.nodeDiscoveryMode != NodeDiscoveryModeEtcd && s.nodeDiscoveryMode != NodeDiscoveryModeDNS &&
s.nodeDiscoveryMode != NodeDiscoveryModeFile {
return fmt.Errorf("invalid node-discovery-mode: %s, must be '%s', '%s', or '%s'",
s.nodeDiscoveryMode, NodeDiscoveryModeEtcd, NodeDiscoveryModeDNS, NodeDiscoveryModeFile)
}

// Validate etcd endpoints (always required for schema storage, regardless of node discovery mode)
Expand All @@ -163,6 +175,16 @@ func (s *clientService) Validate() error {
}
}

// Validate file mode specific requirements
if s.nodeDiscoveryMode == NodeDiscoveryModeFile {
if s.filePath == "" {
return errors.New("file mode requires non-empty file path")
}
if _, err := os.Stat(s.filePath); err != nil {
return fmt.Errorf("file path validation failed: %w", err)
}
}

return nil
}

Expand Down Expand Up @@ -230,8 +252,23 @@ func (s *clientService) PreRun(ctx context.Context) error {
}
}

// skip node registration if DNS mode is enabled or node registration is disabled
if !s.toRegisterNode || s.nodeDiscoveryMode == NodeDiscoveryModeDNS {
if s.nodeDiscoveryMode == NodeDiscoveryModeFile {
l.Info().Str("file-path", s.filePath).Msg("Initializing file-based node discovery")

var createErr error
s.fileDiscovery, createErr = file.NewService(file.Config{
FilePath: s.filePath,
GRPCTimeout: s.grpcTimeout,
FetchInterval: s.fileFetchInterval,
})
if createErr != nil {
return fmt.Errorf("failed to create file discovery service: %w", createErr)
}
}

// skip node registration if DNS/file mode is enabled or node registration is disabled
if !s.toRegisterNode || s.nodeDiscoveryMode == NodeDiscoveryModeDNS ||
s.nodeDiscoveryMode == NodeDiscoveryModeFile {
Comment on lines +281 to +283
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says "skip node registration if DNS/file mode is enabled" but the actual condition only skips if the mode is DNS or File AND toRegisterNode is true. The comment should be more precise: "skip node registration if DNS/file mode is enabled (these modes don't support manual registration) or node registration is disabled".

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This review comment is incorrect; the current condition is right as written. Node registration is skipped if registration is not requested, or if the discovery mode is DNS, or if the discovery mode is file.

return nil
}
val := ctx.Value(common.ContextNodeKey)
Expand Down Expand Up @@ -299,6 +336,13 @@ func (s *clientService) Serve() run.StopNotify {
}
}

// Start file discovery
if s.fileDiscovery != nil {
if err := s.fileDiscovery.Start(s.closer.Ctx()); err != nil {
logger.GetLogger(s.Name()).Error().Err(err).Msg("failed to start file discovery")
}
}

return s.closer.CloseNotify()
}

Expand All @@ -312,6 +356,12 @@ func (s *clientService) GracefulStop() {
}
}

if s.fileDiscovery != nil {
if err := s.fileDiscovery.Close(); err != nil {
logger.GetLogger(s.Name()).Error().Err(err).Msg("failed to close file discovery")
}
}

if s.schemaRegistry != nil {
if err := s.schemaRegistry.Close(); err != nil {
logger.GetLogger(s.Name()).Error().Err(err).Msg("failed to close schema registry")
Expand All @@ -320,6 +370,10 @@ func (s *clientService) GracefulStop() {
}

func (s *clientService) RegisterHandler(name string, kind schema.Kind, handler schema.EventHandler) {
if kind == schema.KindNode && s.fileDiscovery != nil {
s.fileDiscovery.RegisterHandler(name, handler)
return
}
if kind == schema.KindNode && s.dnsDiscovery != nil {
s.dnsDiscovery.RegisterHandler(name, handler)
return
Expand Down Expand Up @@ -360,6 +414,10 @@ func (s *clientService) NodeRegistry() schema.Node {
if s.dnsDiscovery != nil {
return s.dnsDiscovery
}
// If file discovery is enabled, use it instead of etcd
if s.fileDiscovery != nil {
return s.fileDiscovery
}
// Otherwise use etcd schema registry
return s.schemaRegistry
Copy link

Copilot AI Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order of checking discovery services in NodeRegistry() is inconsistent with the order in RegisterHandler(). In RegisterHandler, file discovery is checked before DNS discovery (lines 373-379), but in NodeRegistry, DNS discovery is checked before file discovery (lines 414-422). This inconsistency could lead to confusion. Consider using a consistent priority order across all methods.

Copilot uses AI. Check for mistakes.
}
Expand All @@ -374,6 +432,11 @@ func (s *clientService) SetMetricsRegistry(omr observability.MetricsRegistry) {
factory := observability.RootScope.SubScope("metadata").SubScope("dns_discovery")
s.dnsDiscovery.SetMetrics(omr.With(factory))
}
// initialize file discovery with metrics if it exists
if s.fileDiscovery != nil {
factory := observability.RootScope.SubScope("metadata").SubScope("file_discovery")
s.fileDiscovery.SetMetrics(omr.With(factory))
}
}

func (s *clientService) Name() string {
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (

commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/dns"
"github.com/apache/skywalking-banyandb/banyand/metadata/discovery/dns"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
Expand Down
Loading
Loading