Skip to content

Commit 01fe534

Browse files
Add index gateway to the read path (grafana#6385)
1 parent 9ff6218 commit 01fe534

File tree

3 files changed

+109
-38
lines changed

3 files changed

+109
-38
lines changed

pkg/loki/loki.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ func (t *Loki) setupModuleManager() error {
511511
IngesterQuerier: {Ring},
512512
IndexGatewayRing: {RuntimeConfig, Server, MemberlistKV},
513513
All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor},
514-
Read: {QueryScheduler, QueryFrontend, Querier, Ruler, Compactor},
514+
Read: {QueryScheduler, QueryFrontend, Querier, Ruler, Compactor, IndexGateway},
515515
Write: {Ingester, Distributor},
516516
MemberlistKV: {Server},
517517
}

pkg/loki/modules.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -902,6 +902,12 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
902902
}
903903

904904
func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) {
905+
// IndexGateway runs by default on read target, and should always assume
906+
// ring mode when run in this way.
907+
if t.isModuleActive(Read) {
908+
t.Cfg.IndexGateway.Mode = indexgateway.RingMode
909+
}
910+
905911
if t.Cfg.IndexGateway.Mode != indexgateway.RingMode {
906912
return
907913
}

pkg/loki/modules_test.go

Lines changed: 102 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/grafana/dskit/flagext"
99
"github.com/prometheus/common/model"
10+
"github.com/stretchr/testify/assert"
1011
"github.com/stretchr/testify/require"
1112

1213
"github.com/prometheus/client_golang/prometheus"
@@ -16,6 +17,8 @@ import (
1617
"github.com/grafana/loki/pkg/storage/config"
1718
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
1819
"github.com/grafana/loki/pkg/storage/stores/shipper"
20+
21+
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
1922
)
2023

2124
func Test_calculateMaxLookBack(t *testing.T) {
@@ -144,44 +147,8 @@ func TestMultiKVSetup(t *testing.T) {
144147
t.Run(target, func(t *testing.T) {
145148
prepareGlobalMetricsRegistry(t)
146149

147-
cfg := Config{}
148-
cfg.SchemaConfig = config.SchemaConfig{
149-
Configs: []config.PeriodConfig{
150-
{
151-
IndexType: config.StorageTypeInMemory,
152-
ObjectType: config.StorageTypeFileSystem,
153-
RowShards: 16,
154-
Schema: "v11",
155-
From: config.DayTime{
156-
Time: model.Now(),
157-
},
158-
},
159-
},
160-
}
161-
flagext.DefaultValues(&cfg)
162-
// Set to 0 to find any free port.
163-
cfg.Server.HTTPListenPort = 0
164-
cfg.Server.GRPCListenPort = 0
165-
cfg.Target = []string{target}
166-
167-
// Must be set, otherwise MultiKV config provider will not be set.
150+
cfg := minimalWorkingConfig(t, dir, target)
168151
cfg.RuntimeConfig.LoadPath = filepath.Join(dir, "config.yaml")
169-
170-
// This would be overwritten by the default values setting.
171-
cfg.StorageConfig = storage.Config{
172-
FSConfig: local.FSConfig{Directory: dir},
173-
BoltDBShipperConfig: shipper.Config{
174-
Config: indexshipper.Config{
175-
SharedStoreType: config.StorageTypeFileSystem,
176-
ActiveIndexDirectory: dir,
177-
CacheLocation: dir,
178-
Mode: indexshipper.ModeWriteOnly,
179-
},
180-
},
181-
}
182-
cfg.Ruler.Config.StoreConfig.Type = config.StorageTypeLocal
183-
cfg.Ruler.Config.StoreConfig.Local.Directory = dir
184-
185152
c, err := New(cfg)
186153
require.NoError(t, err)
187154

@@ -193,3 +160,101 @@ func TestMultiKVSetup(t *testing.T) {
193160
})
194161
}
195162
}
163+
164+
func TestIndexGatewayRingMode_when_TargetIsRead(t *testing.T) {
165+
dir := t.TempDir()
166+
167+
t.Run("IndexGateway always set to ring mode when running as part of read target", func(t *testing.T) {
168+
cfg := minimalWorkingConfig(t, dir, Read)
169+
c, err := New(cfg)
170+
require.NoError(t, err)
171+
172+
services, err := c.ModuleManager.InitModuleServices(Read)
173+
defer func() {
174+
for _, service := range services {
175+
service.StopAsync()
176+
}
177+
}()
178+
179+
require.NoError(t, err)
180+
assert.Equal(t, c.Cfg.IndexGateway.Mode, indexgateway.RingMode)
181+
})
182+
183+
t.Run("When IndexGateway is running independent of Read target", func(t *testing.T) {
184+
t.Run("IndexGateway respects configured simple mode", func(t *testing.T) {
185+
cfg := minimalWorkingConfig(t, dir, IndexGatewayRing)
186+
cfg.IndexGateway.Mode = indexgateway.SimpleMode
187+
c, err := New(cfg)
188+
require.NoError(t, err)
189+
190+
services, err := c.ModuleManager.InitModuleServices(IndexGateway)
191+
defer func() {
192+
for _, service := range services {
193+
service.StopAsync()
194+
}
195+
}()
196+
197+
require.NoError(t, err)
198+
assert.Equal(t, c.Cfg.IndexGateway.Mode, indexgateway.SimpleMode)
199+
})
200+
201+
t.Run("IndexGateway respects configured ring mode", func(t *testing.T) {
202+
cfg := minimalWorkingConfig(t, dir, IndexGatewayRing)
203+
cfg.IndexGateway.Mode = indexgateway.RingMode
204+
c, err := New(cfg)
205+
require.NoError(t, err)
206+
207+
services, err := c.ModuleManager.InitModuleServices(IndexGateway)
208+
defer func() {
209+
for _, service := range services {
210+
service.StopAsync()
211+
}
212+
}()
213+
214+
require.NoError(t, err)
215+
assert.Equal(t, c.Cfg.IndexGateway.Mode, indexgateway.RingMode)
216+
})
217+
218+
})
219+
220+
}
221+
222+
func minimalWorkingConfig(t *testing.T, dir, target string) Config {
223+
prepareGlobalMetricsRegistry(t)
224+
225+
cfg := Config{}
226+
cfg.SchemaConfig = config.SchemaConfig{
227+
Configs: []config.PeriodConfig{
228+
{
229+
IndexType: config.StorageTypeInMemory,
230+
ObjectType: config.StorageTypeFileSystem,
231+
RowShards: 16,
232+
Schema: "v11",
233+
From: config.DayTime{
234+
Time: model.Now(),
235+
},
236+
},
237+
},
238+
}
239+
flagext.DefaultValues(&cfg)
240+
// Set to 0 to find any free port.
241+
cfg.Server.HTTPListenPort = 0
242+
cfg.Server.GRPCListenPort = 0
243+
cfg.Target = []string{target}
244+
245+
// This would be overwritten by the default values setting.
246+
cfg.StorageConfig = storage.Config{
247+
FSConfig: local.FSConfig{Directory: dir},
248+
BoltDBShipperConfig: shipper.Config{
249+
Config: indexshipper.Config{
250+
SharedStoreType: config.StorageTypeFileSystem,
251+
ActiveIndexDirectory: dir,
252+
CacheLocation: dir,
253+
Mode: indexshipper.ModeWriteOnly,
254+
},
255+
},
256+
}
257+
cfg.Ruler.Config.StoreConfig.Type = config.StorageTypeLocal
258+
cfg.Ruler.Config.StoreConfig.Local.Directory = dir
259+
return cfg
260+
}

0 commit comments

Comments
 (0)