29 changes: 16 additions & 13 deletions pkg/distributor/distributor.go
@@ -443,6 +443,7 @@ type KeyedStream struct {
 	HashKey        uint32
 	HashKeyNoShard uint64
 	Stream         logproto.Stream
+	Policy         string
 }

 // TODO taken from Cortex, see if we can refactor out an usable interface.
@@ -532,33 +533,34 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
 	shouldDiscoverGenericFields := fieldDetector.shouldDiscoverGenericFields()

 	shardStreamsCfg := d.validator.ShardStreams(tenantID)
-	maybeShardByRate := func(stream logproto.Stream, pushSize int) {
+	maybeShardByRate := func(stream logproto.Stream, pushSize int, policy string) {
 		if shardStreamsCfg.Enabled {
-			streams = append(streams, d.shardStream(stream, pushSize, tenantID)...)
+			streams = append(streams, d.shardStream(stream, pushSize, tenantID, policy)...)
 			return
 		}
 		streams = append(streams, KeyedStream{
 			HashKey:        lokiring.TokenFor(tenantID, stream.Labels),
 			HashKeyNoShard: stream.Hash,
 			Stream:         stream,
+			Policy:         policy,
 		})
 	}

-	maybeShardStreams := func(stream logproto.Stream, labels labels.Labels, pushSize int) {
+	maybeShardStreams := func(stream logproto.Stream, labels labels.Labels, pushSize int, policy string) {
 		if !shardStreamsCfg.TimeShardingEnabled {
-			maybeShardByRate(stream, pushSize)
+			maybeShardByRate(stream, pushSize, policy)
 			return
 		}

 		ignoreRecentFrom := now.Add(-shardStreamsCfg.TimeShardingIgnoreRecent)
 		streamsByTime, ok := shardStreamByTime(stream, labels, d.ingesterCfg.MaxChunkAge/2, ignoreRecentFrom)
 		if !ok {
-			maybeShardByRate(stream, pushSize)
+			maybeShardByRate(stream, pushSize, policy)
 			return
 		}

 		for _, ts := range streamsByTime {
-			maybeShardByRate(ts.Stream, ts.linesTotalLen)
+			maybeShardByRate(ts.Stream, ts.linesTotalLen, policy)
 		}
 	}

@@ -702,7 +704,7 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe
				continue
			}

-			maybeShardStreams(stream, lbs, pushSize)
+			maybeShardStreams(stream, lbs, pushSize, policy)
		}
		return nil
	}()
@@ -997,25 +999,25 @@ func shardStreamByTime(stream logproto.Stream, lbls labels.Labels, timeShardLen
 // streams and their associated keys for hashing to ingesters.
 //
 // The number of shards is limited by the number of entries.
-func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID string) []KeyedStream {
+func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID string, policy string) []KeyedStream {
 	shardStreamsCfg := d.validator.ShardStreams(tenantID)
 	logger := log.With(util_log.WithUserID(tenantID, d.logger), "stream", stream.Labels)
 	shardCount := d.shardCountFor(logger, &stream, pushSize, tenantID, shardStreamsCfg)

 	if shardCount <= 1 {
-		return []KeyedStream{{HashKey: lokiring.TokenFor(tenantID, stream.Labels), HashKeyNoShard: stream.Hash, Stream: stream}}
+		return []KeyedStream{{HashKey: lokiring.TokenFor(tenantID, stream.Labels), HashKeyNoShard: stream.Hash, Stream: stream, Policy: policy}}
 	}

 	d.streamShardCount.Inc()
 	if shardStreamsCfg.LoggingEnabled {
 		level.Info(logger).Log("msg", "sharding request", "shard_count", shardCount)
 	}

-	return d.divideEntriesBetweenShards(tenantID, shardCount, shardStreamsCfg, stream)
+	return d.divideEntriesBetweenShards(tenantID, shardCount, shardStreamsCfg, stream, policy)
 }

-func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards int, shardStreamsCfg shardstreams.Config, stream logproto.Stream) []KeyedStream {
-	derivedStreams := d.createShards(stream, totalShards, tenantID, shardStreamsCfg)
+func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards int, shardStreamsCfg shardstreams.Config, stream logproto.Stream, policy string) []KeyedStream {
+	derivedStreams := d.createShards(stream, totalShards, tenantID, shardStreamsCfg, policy)

 	for i := 0; i < len(stream.Entries); i++ {
 		streamIndex := i % len(derivedStreams)
@@ -1026,7 +1028,7 @@ func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards in
 	return derivedStreams
 }

-func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tenantID string, shardStreamsCfg shardstreams.Config) []KeyedStream {
+func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tenantID string, shardStreamsCfg shardstreams.Config, policy string) []KeyedStream {
 	var (
 		streamLabels  = labelTemplate(stream.Labels, d.logger)
 		streamPattern = streamLabels.String()
@@ -1050,6 +1052,7 @@ func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tena
 			HashKey:        lokiring.TokenFor(tenantID, shard.Labels),
 			HashKeyNoShard: stream.Hash,
 			Stream:         shard,
+			Policy:         policy,
 		})

 		if shardStreamsCfg.LoggingEnabled {
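To make the intent of the distributor.go change concrete outside the PR, here is a minimal, self-contained Go sketch of the pattern applied above: the resolved ingestion policy is threaded through every sharding helper so that each derived shard carries the policy of its parent stream. The types and the shard function below are illustrative stand-ins, not the Loki definitions (the real code hashes with lokiring.TokenFor and shards via createShards).

package main

import "fmt"

// Illustrative stand-ins for the Loki types, not the real definitions.
type Stream struct {
	Labels string
}

type KeyedStream struct {
	HashKey uint32
	Stream  Stream
	Policy  string // carried with the stream so per-policy limits can be enforced downstream
}

// shard splits a stream into n shards and copies the policy onto every
// shard, mirroring how the diff passes policy from shardStream down
// through divideEntriesBetweenShards into createShards.
func shard(s Stream, n int, policy string) []KeyedStream {
	out := make([]KeyedStream, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, KeyedStream{
			HashKey: uint32(i), // stand-in; the real code derives this from tenant and labels
			Stream:  Stream{Labels: fmt.Sprintf("%s, shard=%d", s.Labels, i)},
			Policy:  policy, // every shard inherits its parent stream's policy
		})
	}
	return out
}

func main() {
	for _, ks := range shard(Stream{Labels: `{app="api"}`}, 3, "finance") {
		fmt.Printf("%s -> policy %q\n", ks.Stream.Labels, ks.Policy)
	}
}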
14 changes: 7 additions & 7 deletions pkg/distributor/distributor_test.go
@@ -861,7 +861,7 @@ func TestStreamShard(t *testing.T) {
 				shardTracker: NewShardTracker(),
 			}

-			derivedStreams := d.shardStream(baseStream, tc.streamSize, "fake")
+			derivedStreams := d.shardStream(baseStream, tc.streamSize, "fake", "")
 			require.Len(t, derivedStreams, tc.wantDerivedStreamSize)

 			for _, s := range derivedStreams {
@@ -906,7 +906,7 @@ func TestStreamShardAcrossCalls(t *testing.T) {
 		shardTracker: NewShardTracker(),
 	}

-	derivedStreams := d.shardStream(baseStream, streamRate, "fake")
+	derivedStreams := d.shardStream(baseStream, streamRate, "fake", "")
 	require.Len(t, derivedStreams, 2)

 	for i, s := range derivedStreams {
@@ -917,7 +917,7 @@ func TestStreamShardAcrossCalls(t *testing.T) {
 		require.Equal(t, lbls.Get(ingester.ShardLbName), fmt.Sprint(i))
 	}

-	derivedStreams = d.shardStream(baseStream, streamRate, "fake")
+	derivedStreams = d.shardStream(baseStream, streamRate, "fake", "")
 	require.Len(t, derivedStreams, 2)

 	for i, s := range derivedStreams {
@@ -1245,7 +1245,7 @@ func BenchmarkShardStream(b *testing.B) {

 		b.ResetTimer()
 		for n := 0; n < b.N; n++ {
-			d.shardStream(stream, 0, "fake") //nolint:errcheck
+			d.shardStream(stream, 0, "fake", "") //nolint:errcheck
 		}
 	})

@@ -1255,7 +1255,7 @@

 		b.ResetTimer()
 		for n := 0; n < b.N; n++ {
-			d.shardStream(stream, 0, "fake") //nolint:errcheck
+			d.shardStream(stream, 0, "fake", "") //nolint:errcheck
 		}
 	})

@@ -1265,7 +1265,7 @@

 		b.ResetTimer()
 		for n := 0; n < b.N; n++ {
-			d.shardStream(stream, 0, "fake") //nolint:errcheck
+			d.shardStream(stream, 0, "fake", "") //nolint:errcheck
 		}
 	})

@@ -1275,7 +1275,7 @@

 		b.ResetTimer()
 		for n := 0; n < b.N; n++ {
-			d.shardStream(stream, 0, "fake") //nolint:errcheck
+			d.shardStream(stream, 0, "fake", "") //nolint:errcheck
 		}
 	})
}
5 changes: 3 additions & 2 deletions pkg/distributor/ingest_limits.go
@@ -168,8 +168,9 @@ func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*proto.Excee
 	for _, stream := range streams {
 		entriesSize, structuredMetadataSize := calculateStreamSizes(stream.Stream)
 		streamMetadata = append(streamMetadata, &proto.StreamMetadata{
-			StreamHash: stream.HashKeyNoShard,
-			TotalSize:  entriesSize + structuredMetadataSize,
+			StreamHash:      stream.HashKeyNoShard,
+			TotalSize:       entriesSize + structuredMetadataSize,
+			IngestionPolicy: stream.Policy,
 		})
 	}
 	return &proto.ExceedsLimitsRequest{
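The ingest_limits.go change is what carries the distributor work over the wire: the resolved policy now travels with each stream's metadata to the limits service. A hedged sketch of the resulting request shape, using plain structs in place of the generated proto types; only the field names are taken from the diff.

package main

import "fmt"

// Plain-struct stand-ins for the proto messages; not the generated types.
type StreamMetadata struct {
	StreamHash      uint64
	TotalSize       uint64
	IngestionPolicy string
}

type ExceedsLimitsRequest struct {
	Tenant  string
	Streams []*StreamMetadata
}

func main() {
	// Each stream reports its resolved policy next to its size, so the
	// limits service can count streams per policy rather than only per tenant.
	req := ExceedsLimitsRequest{
		Tenant: "tenant1",
		Streams: []*StreamMetadata{
			{StreamHash: 0xbeef, TotalSize: 1024, IngestionPolicy: "finance"},
			{StreamHash: 0xcafe, TotalSize: 2048, IngestionPolicy: ""}, // empty string: no policy matched
		},
	}
	for _, s := range req.Streams {
		fmt.Printf("hash=%#x size=%d policy=%q\n", s.StreamHash, s.TotalSize, s.IngestionPolicy)
	}
}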
3 changes: 3 additions & 0 deletions pkg/limits/consumer.go
@@ -28,6 +28,7 @@ type consumer struct {
 	client           kafkaConsumer
 	partitionManager *partitionManager
 	usage            *usageStore
+	limits           Limits
 	// readinessCheck checks if a waiting or replaying partition can be
 	// switched to ready.
 	readinessCheck partitionReadinessCheck
@@ -50,6 +51,7 @@ func newConsumer(
 	client kafkaConsumer,
 	partitionManager *partitionManager,
 	usage *usageStore,
+	limits Limits,
 	readinessCheck partitionReadinessCheck,
 	zone string,
 	logger log.Logger,
@@ -59,6 +61,7 @@
 		client:           client,
 		partitionManager: partitionManager,
 		usage:            usage,
+		limits:           limits,
 		readinessCheck:   readinessCheck,
 		zone:             zone,
 		logger:           logger,
12 changes: 6 additions & 6 deletions pkg/limits/consumer_test.go
@@ -51,10 +51,10 @@ func TestConsumer_ProcessRecords(t *testing.T) {
 		m.SetReplaying(1, 1000)
 		// Create a usage store, we will use this to check if the record
 		// was stored.
-		u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, reg)
+		u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, &mockLimits{}, reg)
 		require.NoError(t, err)
 		u.clock = clock
-		c := newConsumer(&kafka, m, u, newOffsetReadinessCheck(m), "zone1",
+		c := newConsumer(&kafka, m, u, &mockLimits{}, newOffsetReadinessCheck(m), "zone1",
 			log.NewNopLogger(), prometheus.NewRegistry())
 		ctx := context.Background()
 		require.NoError(t, c.pollFetches(ctx))
@@ -101,10 +101,10 @@ func TestConsumer_ProcessRecords(t *testing.T) {
 		m.SetReady(1)
 		// Create a usage store, we will use this to check if the record
 		// was discarded.
-		u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, reg)
+		u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, &mockLimits{}, reg)
 		require.NoError(t, err)
 		u.clock = clock
-		c := newConsumer(&kafka, m, u, newOffsetReadinessCheck(m), "zone1",
+		c := newConsumer(&kafka, m, u, &mockLimits{}, newOffsetReadinessCheck(m), "zone1",
 			log.NewNopLogger(), prometheus.NewRegistry())
 		ctx := context.Background()
 		require.NoError(t, c.pollFetches(ctx))
@@ -180,10 +180,10 @@ func TestConsumer_ReadinessCheck(t *testing.T) {
 	// has been consumed.
 	m.SetReplaying(1, 2)
 	// We don't need the usage store for this test.
-	u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, reg)
+	u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, &mockLimits{}, reg)
 	require.NoError(t, err)
 	u.clock = clock
-	c := newConsumer(&kafka, m, u, newOffsetReadinessCheck(m), "zone1",
+	c := newConsumer(&kafka, m, u, &mockLimits{}, newOffsetReadinessCheck(m), "zone1",
 		log.NewNopLogger(), prometheus.NewRegistry())
 	// The first poll should fetch the first record.
 	ctx := context.Background()
16 changes: 10 additions & 6 deletions pkg/limits/http.go
@@ -10,9 +10,10 @@ import (
 )

 type httpTenantLimitsResponse struct {
-	Tenant  string  `json:"tenant"`
-	Streams uint64  `json:"streams"`
-	Rate    float64 `json:"rate"`
+	Tenant           string            `json:"tenant"`
+	Streams          uint64            `json:"streams"`
+	PerPolicyStreams map[string]uint64 `json:"per_policy_streams"`
+	Rate             float64           `json:"rate"`
 }

 // ServeHTTP implements the http.Handler interface.
@@ -24,11 +25,13 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	var streams, sumBuckets uint64
+	perPolicyStreams := make(map[string]uint64)
 	s.usage.IterTenant(tenant, func(_ string, _ int32, stream streamUsage) {
 		streams++
 		for _, bucket := range stream.rateBuckets {
 			sumBuckets += bucket.size
 		}
+		perPolicyStreams[stream.policy]++
 	})
 	rate := float64(sumBuckets) / s.cfg.ActiveWindow.Seconds()
Expand All @@ -44,8 +47,9 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// Use util.WriteJSONResponse to write the JSON response
util.WriteJSONResponse(w, httpTenantLimitsResponse{
Tenant: tenant,
Streams: streams,
Rate: rate,
Tenant: tenant,
Streams: streams,
PerPolicyStreams: perPolicyStreams,
Rate: rate,
})
}
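The handler change above folds a per-policy tally into the existing per-tenant iteration. Below is a self-contained sketch of that aggregation and the JSON it produces, with a stand-in for the usage store's stream type; the real handler iterates via s.usage.IterTenant rather than over a slice.

package main

import (
	"encoding/json"
	"fmt"
)

// streamUsage is a stand-in for the usage store's per-stream record.
type streamUsage struct {
	policy string
}

type response struct {
	Tenant           string            `json:"tenant"`
	Streams          uint64            `json:"streams"`
	PerPolicyStreams map[string]uint64 `json:"per_policy_streams"`
}

func main() {
	usage := []streamUsage{{"finance"}, {"finance"}, {"ops"}, {""}}

	var streams uint64
	perPolicy := make(map[string]uint64)
	for _, s := range usage {
		streams++
		perPolicy[s.policy]++ // streams with no matching policy land under the "" key
	}

	out, err := json.Marshal(response{Tenant: "tenant1", Streams: streams, PerPolicyStreams: perPolicy})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
	// {"tenant":"tenant1","streams":4,"per_policy_streams":{"":1,"finance":2,"ops":1}}
}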
2 changes: 1 addition & 1 deletion pkg/limits/http_test.go
@@ -17,7 +17,7 @@ import (

 func TestIngestLimits_ServeHTTP(t *testing.T) {
 	clock := quartz.NewMock(t)
-	store, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, prometheus.NewRegistry())
+	store, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, &mockLimits{}, prometheus.NewRegistry())
 	require.NoError(t, err)
 	store.clock = clock
 	store.setForTests("tenant1", streamUsage{
3 changes: 2 additions & 1 deletion pkg/limits/limit.go
@@ -18,6 +18,7 @@ type Limits interface {
 	IngestionRateBytes(userID string) float64
 	IngestionBurstSizeBytes(userID string) int
 	MaxGlobalStreamsPerUser(userID string) int
+	PolicyMaxGlobalStreamsPerUser(userID, policy string) int
 }

 type limitsChecker struct {
@@ -73,7 +74,7 @@ func (c *limitsChecker) ExceedsLimits(ctx context.Context, req *proto.ExceedsLim
 	}
 	streams = streams[:valid]

-	toProduce, accepted, rejected, err := c.store.UpdateCond(req.Tenant, streams, c.clock.Now(), c.limits)
+	toProduce, accepted, rejected, err := c.store.UpdateCond(req.Tenant, streams, c.clock.Now())
 	if err != nil {
 		return nil, err
 	}
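The diff adds PolicyMaxGlobalStreamsPerUser to the Limits interface (and moves limits into the usage store, which is why UpdateCond no longer takes them), but it does not show how the per-policy limit is resolved against the tenant-wide one. Here is a plausible sketch, assuming a return of 0 means "no per-policy override" — consistent with the mock below returning 0, but an assumption; the real resolution rules live in the usage store and are not part of this excerpt.

package main

import "fmt"

// Limits mirrors the two methods relevant here; everything else is illustrative.
type Limits interface {
	MaxGlobalStreamsPerUser(userID string) int
	PolicyMaxGlobalStreamsPerUser(userID, policy string) int
}

type staticLimits struct {
	global    int
	perPolicy map[string]int
}

func (l staticLimits) MaxGlobalStreamsPerUser(string) int { return l.global }

func (l staticLimits) PolicyMaxGlobalStreamsPerUser(_, policy string) int {
	return l.perPolicy[policy] // 0 when no override is configured (assumption)
}

// effectiveLimit picks the per-policy limit when one is set, otherwise
// falls back to the tenant-wide limit.
func effectiveLimit(l Limits, user, policy string) int {
	if v := l.PolicyMaxGlobalStreamsPerUser(user, policy); v > 0 {
		return v
	}
	return l.MaxGlobalStreamsPerUser(user)
}

func main() {
	l := staticLimits{global: 1000, perPolicy: map[string]int{"finance": 50}}
	fmt.Println(effectiveLimit(l, "tenant1", "finance")) // 50
	fmt.Println(effectiveLimit(l, "tenant1", "ops"))     // 1000
}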
14 changes: 12 additions & 2 deletions pkg/limits/mock_test.go
@@ -13,17 +13,27 @@ type mockLimits struct {
 }

 func (m *mockLimits) MaxGlobalStreamsPerUser(_ string) int {
-	return m.MaxGlobalStreams
+	if m.MaxGlobalStreams != 0 {
+		return m.MaxGlobalStreams
+	}
+	return 1000
 }

 func (m *mockLimits) IngestionRateBytes(_ string) float64 {
-	return m.IngestionRate
+	if m.IngestionRate != 0 {
+		return m.IngestionRate
+	}
+	return 0
 }

 func (m *mockLimits) IngestionBurstSizeBytes(_ string) int {
 	return 1000
 }

+func (m *mockLimits) PolicyMaxGlobalStreamsPerUser(_ string, _ string) int {
+	return 0
+}
+
 // mockKafka mocks a [kgo.Client]. The zero value is usable.
 type mockKafka struct {
 	fetches []kgo.Fetches
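One design note on the mock_test.go change: the mock now treats a zero-value field as "use a default", so the many existing tests that construct &mockLimits{} get a permissive stream limit without setting anything, while tests that care can still pin an explicit value. A minimal sketch of that pattern; the 1000 default mirrors the diff, everything else is illustrative.

package main

import "fmt"

// A zero-value field means "use a sane default" — the pattern the PR's mock adopts.
type mockLimits struct {
	MaxGlobalStreams int
}

func (m *mockLimits) MaxGlobalStreamsPerUser(string) int {
	if m.MaxGlobalStreams != 0 {
		return m.MaxGlobalStreams
	}
	return 1000 // default for tests that never set the field
}

func main() {
	fmt.Println((&mockLimits{}).MaxGlobalStreamsPerUser("t"))                    // 1000 (default)
	fmt.Println((&mockLimits{MaxGlobalStreams: 5}).MaxGlobalStreamsPerUser("t")) // 5 (explicit)
}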