Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions pkg/test/integration/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,20 @@ func (t *testTransport) RoundTrip(req *http.Request) (*http.Response, error) {

func (t *testTransport) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
var err error
switch addr {
case "push:80":
addr, err = t.c.pickHealthyComponent("distributor")
if err != nil {
return nil, err
if t.c.microservices {
switch addr {
case "push:80":
addr, err = t.c.pickHealthyComponent("distributor")
case "querier:80":
addr, err = t.c.pickHealthyComponent("query-frontend", "querier")
default:
return nil, fmt.Errorf("unknown addr %s", addr)
}
case "querier:80":
addr, err = t.c.pickHealthyComponent("query-frontend", "querier")
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown addr %s", addr)
} else {
addr, err = t.c.pickHealthyComponent("all")
}
if err != nil {
return nil, err
}

return t.defaultDialContext(ctx, network, addr)
Expand Down Expand Up @@ -135,6 +136,7 @@ type Cluster struct {
v2 bool // is this a v2 cluster
debuginfodURL string // debuginfod URL for symbolization
expectedComponents []string // number of expected components
microservices bool

tmpDir string
httpClient *http.Client
Expand Down
19 changes: 17 additions & 2 deletions pkg/test/integration/cluster/cluster_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,20 @@ import (
"path/filepath"
)

func WithV1SingleTarget() ClusterOption {
return func(c *Cluster) {
c.v2 = false
c.microservices = false
c.expectedComponents = []string{
"all",
}
}
}

func WithV1() ClusterOption {
return func(c *Cluster) {
c.v2 = false
c.microservices = true
c.expectedComponents = []string{
"distributor",
"distributor",
Expand Down Expand Up @@ -58,8 +69,6 @@ func (c *Cluster) v1Prepare(_ context.Context, memberlistJoin []string) error {
fmt.Sprintf("-blocks-storage.bucket-store.sync-dir=%s", syncDir),
fmt.Sprintf("-compactor.data-dir=%s", compactorDir),
fmt.Sprintf("-pyroscopedb.data-path=%s", dataDir),
"-distributor.replication-factor=3",
"-store-gateway.sharding-ring.replication-factor=3",
"-query-scheduler.ring.instance-id="+comp.nodeName(),
"-query-scheduler.ring.instance-addr="+listenAddr,
"-store-gateway.sharding-ring.instance-id="+comp.nodeName(),
Expand All @@ -70,6 +79,12 @@ func (c *Cluster) v1Prepare(_ context.Context, memberlistJoin []string) error {
"-ingester.lifecycler.ID="+comp.nodeName(),
"-ingester.min-ready-duration=0",
)
if c.microservices {
comp.flags = append(comp.flags,
"-distributor.replication-factor=3",
"-store-gateway.sharding-ring.replication-factor=3",
)
}

// handle memberlist join
for _, m := range memberlistJoin {
Expand Down
1 change: 1 addition & 0 deletions pkg/test/integration/cluster/cluster_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
func WithV2() ClusterOption {
return func(c *Cluster) {
c.v2 = true
c.microservices = true
c.expectedComponents = []string{
"distributor",
"distributor",
Expand Down
23 changes: 22 additions & 1 deletion pkg/test/integration/microservices_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,28 @@ import (
// of the services and runs the same queries again to check if the cluster is still
// able to respond to queries.
func TestMicroServicesIntegrationV1(t *testing.T) {
c := cluster.NewMicroServiceCluster()
tests := []struct {
name string
opts []cluster.ClusterOption
}{
{
name: "v1-microservices",
opts: []cluster.ClusterOption{cluster.WithV1()},
},
{
name: "v1-single-tareget",
opts: []cluster.ClusterOption{cluster.WithV1SingleTarget()},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testMicroServicesIntegrationV1(t, tt.opts)
})
}
}

func testMicroServicesIntegrationV1(t *testing.T, opts []cluster.ClusterOption) {
c := cluster.NewMicroServiceCluster(opts...)
ctx := context.Background()

require.NoError(t, c.Prepare(ctx))
Expand Down
Loading