Commit 373a528

feat: metastore auto-join (#4474)
* Add -metastore.auto-join for easier scale out
* Add test for metastore auto-join
* Improve log messages
1 parent 330f7f3 commit 373a528

5 files changed: +213 −74 lines changed


pkg/metastore/metastore.go

Lines changed: 12 additions & 10 deletions
@@ -74,8 +74,9 @@ type Metastore struct {
 	reg     prometheus.Registerer
 	health  health.Service

-	raft *raftnode.Node
-	fsm  *fsm.FSM
+	raft           *raftnode.Node
+	fsm            *fsm.FSM
+	raftNodeClient raftnodepb.RaftNodeServiceClient

 	bucket    objstore.Bucket
 	placement *placement.Manager
@@ -116,13 +117,14 @@ func New(
 	placementMgr *placement.Manager,
 ) (*Metastore, error) {
 	m := &Metastore{
-		config:    config,
-		overrides: overrides,
-		logger:    logger,
-		reg:       reg,
-		health:    healthService,
-		bucket:    bucket,
-		placement: placementMgr,
+		config:         config,
+		overrides:      overrides,
+		logger:         logger,
+		reg:            reg,
+		health:         healthService,
+		bucket:         bucket,
+		placement:      placementMgr,
+		raftNodeClient: client,
 	}

 	var err error
@@ -192,7 +194,7 @@ func (m *Metastore) buildRaftNode() (err error) {
 	// (via FSM.Restore), if it is present. Otherwise, when no snapshots
 	// available, the state must be initialized explicitly via FSM.Init before
 	// we call raft.Init, which starts applying the raft log.
-	if m.raft, err = raftnode.NewNode(m.logger, m.config.Raft, m.reg, m.fsm); err != nil {
+	if m.raft, err = raftnode.NewNode(m.logger, m.config.Raft, m.reg, m.fsm, m.raftNodeClient); err != nil {
 		return fmt.Errorf("failed to create raft node: %w", err)
 	}
pkg/metastore/raftnode/node.go

Lines changed: 22 additions & 6 deletions
@@ -30,6 +30,7 @@ type Config struct {

 	BootstrapPeers       []string `yaml:"bootstrap_peers"`
 	BootstrapExpectPeers int      `yaml:"bootstrap_expect_peers"`
+	AutoJoin             bool     `yaml:"auto_join"`

 	ServerID    string `yaml:"server_id"`
 	BindAddress string `yaml:"bind_address"`
@@ -68,6 +69,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {

 	f.Var((*flagext.StringSlice)(&cfg.BootstrapPeers), prefix+"bootstrap-peers", "")
 	f.IntVar(&cfg.BootstrapExpectPeers, prefix+"bootstrap-expect-peers", 1, "Expected number of peers including the local node.")
+	f.BoolVar(&cfg.AutoJoin, prefix+"auto-join", false, "If enabled, new nodes (without a state) will try to join an existing cluster on startup.")

 	f.StringVar(&cfg.ServerID, prefix+"server-id", "localhost:9099", "")
 	f.StringVar(&cfg.BindAddress, prefix+"bind-address", "localhost:9099", "")
@@ -109,20 +111,24 @@ type Node struct {

 	observer *Observer
 	service  *RaftNodeService
+
+	raftNodeClient raftnodepb.RaftNodeServiceClient
 }

 func NewNode(
 	logger log.Logger,
 	config Config,
 	reg prometheus.Registerer,
 	fsm raft.FSM,
+	raftNodeClient raftnodepb.RaftNodeServiceClient,
 ) (_ *Node, err error) {
 	n := Node{
-		logger:  logger,
-		config:  config,
-		metrics: newMetrics(reg),
-		reg:     reg,
-		fsm:     fsm,
+		logger:         logger,
+		config:         config,
+		metrics:        newMetrics(reg),
+		reg:            reg,
+		fsm:            fsm,
+		raftNodeClient: raftNodeClient,
 	}

 	defer func() {
@@ -176,7 +182,17 @@ func (n *Node) Init() (err error) {
 		return fmt.Errorf("failed to check for existing state: %w", err)
 	}
 	if !hasState {
-		level.Warn(n.logger).Log("msg", "no existing state found, trying to bootstrap cluster")
+		if n.config.AutoJoin {
+			level.Info(n.logger).Log("msg", "no existing state found and auto-join is enabled, trying to join existing raft cluster...")
+			if err = n.tryAutoJoin(); err != nil {
+				level.Warn(n.logger).Log("msg", "failed to auto-join raft cluster", "err", err)
+			} else {
+				level.Info(n.logger).Log("msg", "successfully joined existing raft cluster")
+				return nil
+			}
+		}
+
+		level.Info(n.logger).Log("msg", "no existing state found and auto-join is disabled, bootstrapping raft cluster...")
 		if err = n.bootstrap(); err != nil {
 			return fmt.Errorf("failed to bootstrap cluster: %w", err)
 		}
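
Note on the flag name: the commit title refers to "-metastore.auto-join", but the option is registered through RegisterFlagsWithPrefix, so the effective name depends on the prefix; the integration test further below passes "-metastore.raft.auto-join=true". A minimal, standalone sketch (not part of this commit) of how the registration above composes with that assumed prefix:

    package main

    import (
        "flag"
        "fmt"
    )

    func main() {
        fs := flag.NewFlagSet("metastore", flag.ExitOnError)
        prefix := "metastore.raft." // assumed prefix, matching the flag used in the integration test below

        // Mirrors the BoolVar registration added to Config.RegisterFlagsWithPrefix above.
        var autoJoin bool
        fs.BoolVar(&autoJoin, prefix+"auto-join", false,
            "If enabled, new nodes (without a state) will try to join an existing cluster on startup.")

        // A freshly added replica would be started with this flag set.
        _ = fs.Parse([]string{"-metastore.raft.auto-join=true"})
        fmt.Println("auto-join enabled:", autoJoin) // prints: auto-join enabled: true
    }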

pkg/metastore/raftnode/node_bootstrap.go

Lines changed: 31 additions & 0 deletions
@@ -15,6 +15,7 @@ import (
 	"github.com/hashicorp/raft"

 	"github.com/grafana/pyroscope/pkg/metastore/discovery"
+	"github.com/grafana/pyroscope/pkg/metastore/raftnode/raftnodepb"
 )

 func (n *Node) bootstrap() error {
@@ -68,6 +69,36 @@ func (n *Node) bootstrapPeersWithRetries() (peers []raft.Server, err error) {
 	return nil, fmt.Errorf("failed to resolve bootstrap peers after %d retries %w", backOff.NumRetries(), err)
 }

+const autoJoinTimeout = 10 * time.Second
+
+func (n *Node) tryAutoJoin() error {
+	// we can only auto-join if there is a real raft cluster running
+	ctx, cancel := context.WithTimeout(context.Background(), autoJoinTimeout)
+	defer cancel()
+
+	readIndexResp, err := n.raftNodeClient.ReadIndex(ctx, &raftnodepb.ReadIndexRequest{})
+	if err != nil {
+		return fmt.Errorf("failed to get current term for auto-join: %w", err)
+	}
+
+	logger := log.With(n.logger,
+		"server_id", n.config.ServerID,
+		"advertise_address", n.config.AdvertiseAddress)
+
+	// try to join the cluster via the leader
+	level.Info(logger).Log("msg", "attempting to join existing cluster", "current_term", readIndexResp.Term)
+	_, err = n.raftNodeClient.AddNode(ctx, &raftnodepb.AddNodeRequest{
+		ServerId:    n.config.AdvertiseAddress,
+		CurrentTerm: readIndexResp.Term,
+	})
+
+	if err != nil {
+		return fmt.Errorf("failed to auto-join cluster: %w", err)
+	}
+
+	return nil
+}
+
 func (n *Node) bootstrapPeers(prov *dns.Provider) ([]raft.Server, error) {
 	// The peer list always includes the local node.
 	peers := make([]raft.Server, 0, len(n.config.BootstrapPeers)+1)
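
The join path boils down to two RPCs against the existing cluster: ReadIndex, which only succeeds when a functional cluster with a reachable leader exists (hence the "real raft cluster running" comment), and AddNode, which asks the leader to add the new server at the observed term. A minimal external sketch of the same sequence, assuming an insecure metastore gRPC endpoint at localhost:9095 and a hypothetical advertise address for the joining node (both placeholders, not taken from this commit):

    package main

    import (
        "context"
        "fmt"
        "log"
        "time"

        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"

        "github.com/grafana/pyroscope/pkg/metastore/raftnode/raftnodepb"
    )

    func main() {
        // Dial an existing metastore (address is a placeholder).
        cc, err := grpc.NewClient("localhost:9095", grpc.WithTransportCredentials(insecure.NewCredentials()))
        if err != nil {
            log.Fatal(err)
        }
        client := raftnodepb.NewRaftNodeServiceClient(cc)

        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()

        // Step 1: ReadIndex fails unless the cluster has a leader, which makes it
        // a safe precondition before attempting to join.
        read, err := client.ReadIndex(ctx, &raftnodepb.ReadIndexRequest{})
        if err != nil {
            log.Fatalf("cluster not ready: %v", err)
        }

        // Step 2: ask the leader to add the new server under its advertise address.
        _, err = client.AddNode(ctx, &raftnodepb.AddNodeRequest{
            ServerId:    "new-metastore-3:9099", // hypothetical advertise address
            CurrentTerm: read.Term,
        })
        if err != nil {
            log.Fatalf("join failed: %v", err)
        }
        fmt.Println("joined existing raft cluster at term", read.Term)
    }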

pkg/test/integration/cluster/cluster_v2.go

Lines changed: 110 additions & 58 deletions
@@ -101,79 +101,88 @@ func (c *Cluster) v2Prepare(_ context.Context, memberlistJoin []string) error {
 	metastoreLeader := c.metastoreExpectedLeader()

 	for _, comp := range c.Components {
-		dataDir := c.dataDir(comp)
+		if err := c.v2PrepareComponent(comp, metastoreLeader); err != nil {
+			return err
+		}
+
+		// handle memberlist join
+		for _, m := range memberlistJoin {
+			comp.flags = append(comp.flags, fmt.Sprintf("-memberlist.join=%s", m))
+		}
+	}

-		comp.cfg.V2 = true
-		comp.flags = c.commonFlags(comp)
+	return nil
+}
+
+func (c *Cluster) v2PrepareComponent(comp *Component, metastoreLeader *Component) error {
+	dataDir := c.dataDir(comp)
+
+	comp.cfg.V2 = true
+	comp.flags = c.commonFlags(comp)

+	comp.flags = append(comp.flags,
+		"-enable-query-backend=true",
+		"-write-path=segment-writer",
+		"-metastore.min-ready-duration=0",
+		fmt.Sprintf("-metastore.address=%s:%d/%s", listenAddr, metastoreLeader.grpcPort, metastoreLeader.nodeName()),
+	)
+
+	if c.debuginfodURL != "" && comp.Target == "query-frontend" {
 		comp.flags = append(comp.flags,
-			"-enable-query-backend=true",
-			"-write-path=segment-writer",
-			"-metastore.min-ready-duration=0",
-			fmt.Sprintf("-metastore.address=%s:%d/%s", listenAddr, metastoreLeader.grpcPort, metastoreLeader.nodeName()),
+			fmt.Sprintf("-symbolizer.debuginfod-url=%s", c.debuginfodURL),
+			"-symbolizer.enabled=true",
 		)
+	}

-		if c.debuginfodURL != "" && comp.Target == "query-frontend" {
-			comp.flags = append(comp.flags,
-				fmt.Sprintf("-symbolizer.debuginfod-url=%s", c.debuginfodURL),
-				"-symbolizer.enabled=true",
-			)
-		}
+	if comp.Target == "segment-writer" {
+		comp.flags = append(comp.flags,
+			"-segment-writer.num-tokens=1",
+			"-segment-writer.min-ready-duration=0",
+			"-segment-writer.lifecycler.addr="+listenAddr,
+			"-segment-writer.lifecycler.ID="+comp.nodeName(),
+			"-segment-writer.heartbeat-period=1s",
+		)
+	}

-		if comp.Target == "segment-writer" {
-			comp.flags = append(comp.flags,
-				"-segment-writer.num-tokens=1",
-				"-segment-writer.min-ready-duration=0",
-				"-segment-writer.lifecycler.addr="+listenAddr,
-				"-segment-writer.lifecycler.ID="+comp.nodeName(),
-				"-segment-writer.heartbeat-period=1s",
-			)
-		}
+	if comp.Target == "compaction-worker" {
+		comp.flags = append(comp.flags,
+			"-compaction-worker.job-concurrency=20",
+			"-compaction-worker.job-poll-interval=1s",
+		)
+	}

-		if comp.Target == "compaction-worker" {
+	// register query-backends in the frontend and themselves
+	if comp.Target == "query-frontend" || comp.Target == "query-backend" {
+		for _, compidx := range c.perTarget["query-backend"] {
 			comp.flags = append(comp.flags,
-				"-compaction-worker.job-concurrency=20",
-				"-compaction-worker.job-poll-interval=1s",
+				fmt.Sprintf("-query-backend.address=%s:%d", listenAddr, c.Components[compidx].grpcPort),
 			)
 		}
+	}

-		// register query-backends in the frontend and themselves
-		if comp.Target == "query-frontend" || comp.Target == "query-backend" {
-			for _, compidx := range c.perTarget["query-backend"] {
-				comp.flags = append(comp.flags,
-					fmt.Sprintf("-query-backend.address=%s:%d", listenAddr, c.Components[compidx].grpcPort),
-				)
-			}
+	// handle metastore folders and ports
+	if comp.Target == "metastore" {
+		cfgPath, err := c.metastoreConfig()
+		if err != nil {
+			return err
 		}
+		comp.flags = append(comp.flags,
+			fmt.Sprint("-config.file=", cfgPath),
+			fmt.Sprintf("-metastore.data-dir=%s", dataDir+"../metastore-ephemeral"),
+			fmt.Sprintf("-metastore.raft.dir=%s", dataDir+"../metastore-raft"),
+			fmt.Sprintf("-metastore.raft.snapshots-dir=%s", dataDir+"../metastore-snapshots"),
+			fmt.Sprintf("-metastore.raft.bind-address=%s:%d", listenAddr, comp.raftPort),
+			fmt.Sprintf("-metastore.raft.advertise-address=%s:%d", listenAddr, comp.raftPort),
+			fmt.Sprintf("-metastore.raft.server-id=%s", comp.nodeName()),
+			fmt.Sprintf("-metastore.raft.bootstrap-expect-peers=%d", len(c.perTarget[comp.Target])),
+		)

-		// handle metastore folders and ports
-		if comp.Target == "metastore" {
-			cfgPath, err := c.metastoreConfig()
-			if err != nil {
-				return err
-			}
+		// add bootstrap peers
+		for _, compidx := range c.perTarget[comp.Target] {
+			peer := c.Components[compidx]
 			comp.flags = append(comp.flags,
-				fmt.Sprint("-config.file=", cfgPath),
-				fmt.Sprintf("-metastore.data-dir=%s", dataDir+"../metastore-ephemeral"),
-				fmt.Sprintf("-metastore.raft.dir=%s", dataDir+"../metastore-raft"),
-				fmt.Sprintf("-metastore.raft.snapshots-dir=%s", dataDir+"../metastore-snapshots"),
-				fmt.Sprintf("-metastore.raft.bind-address=%s:%d", listenAddr, comp.raftPort),
-				fmt.Sprintf("-metastore.raft.advertise-address=%s:%d", listenAddr, comp.raftPort),
-				fmt.Sprintf("-metastore.raft.server-id=%s", comp.nodeName()),
-				fmt.Sprintf("-metastore.raft.bootstrap-expect-peers=%d", len(c.perTarget[comp.Target])),
+				fmt.Sprintf("-metastore.raft.bootstrap-peers=%s:%d/%s", listenAddr, peer.raftPort, peer.nodeName()),
 			)
-
-			// add bootstrap peers
-			for _, compidx := range c.perTarget[comp.Target] {
-				peer := c.Components[compidx]
-				comp.flags = append(comp.flags,
-					fmt.Sprintf("-metastore.raft.bootstrap-peers=%s:%d/%s", listenAddr, peer.raftPort, peer.nodeName()),
-				)
-			}
-		}
-		// handle memberlist join
-		for _, m := range memberlistJoin {
-			comp.flags = append(comp.flags, fmt.Sprintf("-memberlist.join=%s", m))
 		}
 	}

@@ -236,3 +245,46 @@ func (comp *Component) metastoreReadyCheck(ctx context.Context, metastores []*Co
 	})
 	return err
 }
+
+func (c *Cluster) GetMetastoreRaftNodeClient() (raftnodepb.RaftNodeServiceClient, error) {
+	leader := c.metastoreExpectedLeader()
+	opts := []grpc.DialOption{
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+	}
+	cc, err := grpc.NewClient(fmt.Sprintf("127.0.0.1:%d", leader.grpcPort), opts...)
+	if err != nil {
+		return nil, err
+	}
+
+	return raftnodepb.NewRaftNodeServiceClient(cc), nil
+}
+
+func (c *Cluster) AddMetastoreWithAutoJoin(ctx context.Context) error {
+	leader := c.metastoreExpectedLeader()
+
+	comp := newComponent("metastore")
+	comp.replica = len(c.perTarget["metastore"])
+	c.Components = append(c.Components, comp)
+	c.perTarget["metastore"] = append(c.perTarget["metastore"], len(c.Components)-1)
+
+	if err := c.v2PrepareComponent(comp, leader); err != nil {
+		return err
+	}
+	comp.flags = append(comp.flags, "-metastore.raft.auto-join=true")
+
+	p, err := comp.start(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to start component: %w", err)
+	}
+	comp.p = p
+
+	c.wg.Add(1)
+	go func() {
+		defer c.wg.Done()
+		if err := p.Run(); err != nil {
+			fmt.Printf("metastore with auto-join stopped with error: %v\n", err)
+		}
+	}()
+
+	return nil
+}

pkg/test/integration/microservices_test.go

Lines changed: 38 additions & 0 deletions
@@ -19,6 +19,7 @@ import (
 	querierv1 "github.com/grafana/pyroscope/api/gen/proto/go/querier/v1"
 	"github.com/grafana/pyroscope/api/gen/proto/go/querier/v1/querierv1connect"
 	typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
+	"github.com/grafana/pyroscope/pkg/metastore/raftnode/raftnodepb"
 	"github.com/grafana/pyroscope/pkg/pprof/testhelper"
 	"github.com/grafana/pyroscope/pkg/tenant"
 	"github.com/grafana/pyroscope/pkg/test/integration/cluster"
@@ -163,6 +164,43 @@ func TestMicroServicesIntegrationV2(t *testing.T) {

 }

+// TestMetastoreAutoJoin tests that a new metastore node can join an existing cluster
+// using the auto-join feature without requiring bootstrap configuration.
+func TestMetastoreAutoJoin(t *testing.T) {
+	c := cluster.NewMicroServiceCluster(cluster.WithV2())
+	ctx := context.Background()
+
+	require.NoError(t, c.Prepare(ctx))
+	for _, comp := range c.Components {
+		t.Log(comp.String())
+	}
+
+	require.NoError(t, c.Start(ctx))
+	defer func() {
+		waitStopped := c.Stop()
+		require.NoError(t, waitStopped(ctx))
+	}()
+
+	client, err := c.GetMetastoreRaftNodeClient()
+	require.NoError(t, err)
+	nodeInfo, err := client.NodeInfo(ctx, &raftnodepb.NodeInfoRequest{})
+	require.NoError(t, err)
+	require.Equal(t, 3, len(nodeInfo.Node.Peers), "initial cluster should have 3 peers")
+
+	err = c.AddMetastoreWithAutoJoin(ctx)
+	require.NoError(t, err)
+
+	require.Eventually(t, func() bool {
+		nodeInfo, err := client.NodeInfo(ctx, &raftnodepb.NodeInfoRequest{})
+		if err != nil {
+			t.Logf("Failed to get node info: %v", err)
+			return false
+		}
+		t.Logf("Current peer count: %d", len(nodeInfo.Node.Peers))
+		return len(nodeInfo.Node.Peers) == 4
+	}, 30*time.Second, 1*time.Second, "new metastore should join cluster")
+}
+
 func newTestCtx(x interface {
 	PushClient() pushv1connect.PusherServiceClient
 	QueryClient() querierv1connect.QuerierServiceClient
