diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index baedbb90aba90..033ec20c52e59 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -443,6 +443,7 @@ type KeyedStream struct { HashKey uint32 HashKeyNoShard uint64 Stream logproto.Stream + Policy string } // TODO taken from Cortex, see if we can refactor out an usable interface. @@ -532,33 +533,34 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe shouldDiscoverGenericFields := fieldDetector.shouldDiscoverGenericFields() shardStreamsCfg := d.validator.ShardStreams(tenantID) - maybeShardByRate := func(stream logproto.Stream, pushSize int) { + maybeShardByRate := func(stream logproto.Stream, pushSize int, policy string) { if shardStreamsCfg.Enabled { - streams = append(streams, d.shardStream(stream, pushSize, tenantID)...) + streams = append(streams, d.shardStream(stream, pushSize, tenantID, policy)...) return } streams = append(streams, KeyedStream{ HashKey: lokiring.TokenFor(tenantID, stream.Labels), HashKeyNoShard: stream.Hash, Stream: stream, + Policy: policy, }) } - maybeShardStreams := func(stream logproto.Stream, labels labels.Labels, pushSize int) { + maybeShardStreams := func(stream logproto.Stream, labels labels.Labels, pushSize int, policy string) { if !shardStreamsCfg.TimeShardingEnabled { - maybeShardByRate(stream, pushSize) + maybeShardByRate(stream, pushSize, policy) return } ignoreRecentFrom := now.Add(-shardStreamsCfg.TimeShardingIgnoreRecent) streamsByTime, ok := shardStreamByTime(stream, labels, d.ingesterCfg.MaxChunkAge/2, ignoreRecentFrom) if !ok { - maybeShardByRate(stream, pushSize) + maybeShardByRate(stream, pushSize, policy) return } for _, ts := range streamsByTime { - maybeShardByRate(ts.Stream, ts.linesTotalLen) + maybeShardByRate(ts.Stream, ts.linesTotalLen, policy) } } @@ -702,7 +704,7 @@ func (d *Distributor) PushWithResolver(ctx context.Context, req *logproto.PushRe continue } - maybeShardStreams(stream, lbs, pushSize) + maybeShardStreams(stream, lbs, pushSize, policy) } return nil }() @@ -997,13 +999,13 @@ func shardStreamByTime(stream logproto.Stream, lbls labels.Labels, timeShardLen // streams and their associated keys for hashing to ingesters. // // The number of shards is limited by the number of entries. 
-func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID string) []KeyedStream { +func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID string, policy string) []KeyedStream { shardStreamsCfg := d.validator.ShardStreams(tenantID) logger := log.With(util_log.WithUserID(tenantID, d.logger), "stream", stream.Labels) shardCount := d.shardCountFor(logger, &stream, pushSize, tenantID, shardStreamsCfg) if shardCount <= 1 { - return []KeyedStream{{HashKey: lokiring.TokenFor(tenantID, stream.Labels), HashKeyNoShard: stream.Hash, Stream: stream}} + return []KeyedStream{{HashKey: lokiring.TokenFor(tenantID, stream.Labels), HashKeyNoShard: stream.Hash, Stream: stream, Policy: policy}} } d.streamShardCount.Inc() @@ -1011,11 +1013,11 @@ func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID level.Info(logger).Log("msg", "sharding request", "shard_count", shardCount) } - return d.divideEntriesBetweenShards(tenantID, shardCount, shardStreamsCfg, stream) + return d.divideEntriesBetweenShards(tenantID, shardCount, shardStreamsCfg, stream, policy) } -func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards int, shardStreamsCfg shardstreams.Config, stream logproto.Stream) []KeyedStream { - derivedStreams := d.createShards(stream, totalShards, tenantID, shardStreamsCfg) +func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards int, shardStreamsCfg shardstreams.Config, stream logproto.Stream, policy string) []KeyedStream { + derivedStreams := d.createShards(stream, totalShards, tenantID, shardStreamsCfg, policy) for i := 0; i < len(stream.Entries); i++ { streamIndex := i % len(derivedStreams) @@ -1026,7 +1028,7 @@ func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards in return derivedStreams } -func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tenantID string, shardStreamsCfg shardstreams.Config) []KeyedStream { +func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tenantID string, shardStreamsCfg shardstreams.Config, policy string) []KeyedStream { var ( streamLabels = labelTemplate(stream.Labels, d.logger) streamPattern = streamLabels.String() @@ -1050,6 +1052,7 @@ func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tena HashKey: lokiring.TokenFor(tenantID, shard.Labels), HashKeyNoShard: stream.Hash, Stream: shard, + Policy: policy, }) if shardStreamsCfg.LoggingEnabled { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 1da5e5ab01105..153dbe697afa1 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -861,7 +861,7 @@ func TestStreamShard(t *testing.T) { shardTracker: NewShardTracker(), } - derivedStreams := d.shardStream(baseStream, tc.streamSize, "fake") + derivedStreams := d.shardStream(baseStream, tc.streamSize, "fake", "") require.Len(t, derivedStreams, tc.wantDerivedStreamSize) for _, s := range derivedStreams { @@ -906,7 +906,7 @@ func TestStreamShardAcrossCalls(t *testing.T) { shardTracker: NewShardTracker(), } - derivedStreams := d.shardStream(baseStream, streamRate, "fake") + derivedStreams := d.shardStream(baseStream, streamRate, "fake", "") require.Len(t, derivedStreams, 2) for i, s := range derivedStreams { @@ -917,7 +917,7 @@ func TestStreamShardAcrossCalls(t *testing.T) { require.Equal(t, lbls.Get(ingester.ShardLbName), fmt.Sprint(i)) } - derivedStreams = d.shardStream(baseStream, streamRate, "fake") + 
derivedStreams = d.shardStream(baseStream, streamRate, "fake", "") require.Len(t, derivedStreams, 2) for i, s := range derivedStreams { @@ -1245,7 +1245,7 @@ func BenchmarkShardStream(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - d.shardStream(stream, 0, "fake") //nolint:errcheck + d.shardStream(stream, 0, "fake", "") //nolint:errcheck } }) @@ -1255,7 +1255,7 @@ func BenchmarkShardStream(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - d.shardStream(stream, 0, "fake") //nolint:errcheck + d.shardStream(stream, 0, "fake", "") //nolint:errcheck } }) @@ -1265,7 +1265,7 @@ func BenchmarkShardStream(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - d.shardStream(stream, 0, "fake") //nolint:errcheck + d.shardStream(stream, 0, "fake", "") //nolint:errcheck } }) @@ -1275,7 +1275,7 @@ func BenchmarkShardStream(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - d.shardStream(stream, 0, "fake") //nolint:errcheck + d.shardStream(stream, 0, "fake", "") //nolint:errcheck } }) } diff --git a/pkg/distributor/ingest_limits.go b/pkg/distributor/ingest_limits.go index 25290593c8a66..77d5c7522a2f0 100644 --- a/pkg/distributor/ingest_limits.go +++ b/pkg/distributor/ingest_limits.go @@ -168,8 +168,9 @@ func newExceedsLimitsRequest(tenant string, streams []KeyedStream) (*proto.Excee for _, stream := range streams { entriesSize, structuredMetadataSize := calculateStreamSizes(stream.Stream) streamMetadata = append(streamMetadata, &proto.StreamMetadata{ - StreamHash: stream.HashKeyNoShard, - TotalSize: entriesSize + structuredMetadataSize, + StreamHash: stream.HashKeyNoShard, + TotalSize: entriesSize + structuredMetadataSize, + IngestionPolicy: stream.Policy, }) } return &proto.ExceedsLimitsRequest{ diff --git a/pkg/limits/consumer.go b/pkg/limits/consumer.go index 0481c5d67ac1c..3a08023d942fb 100644 --- a/pkg/limits/consumer.go +++ b/pkg/limits/consumer.go @@ -28,6 +28,7 @@ type consumer struct { client kafkaConsumer partitionManager *partitionManager usage *usageStore + limits Limits // readinessCheck checks if a waiting or replaying partition can be // switched to ready. readinessCheck partitionReadinessCheck @@ -50,6 +51,7 @@ func newConsumer( client kafkaConsumer, partitionManager *partitionManager, usage *usageStore, + limits Limits, readinessCheck partitionReadinessCheck, zone string, logger log.Logger, @@ -59,6 +61,7 @@ func newConsumer( client: client, partitionManager: partitionManager, usage: usage, + limits: limits, readinessCheck: readinessCheck, zone: zone, logger: logger, diff --git a/pkg/limits/consumer_test.go b/pkg/limits/consumer_test.go index d984ba19d6239..5caf91fb7b955 100644 --- a/pkg/limits/consumer_test.go +++ b/pkg/limits/consumer_test.go @@ -51,10 +51,10 @@ func TestConsumer_ProcessRecords(t *testing.T) { m.SetReplaying(1, 1000) // Create a usage store, we will use this to check if the record // was stored. 
- u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, reg) + u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, &mockLimits{}, reg) require.NoError(t, err) u.clock = clock - c := newConsumer(&kafka, m, u, newOffsetReadinessCheck(m), "zone1", + c := newConsumer(&kafka, m, u, &mockLimits{}, newOffsetReadinessCheck(m), "zone1", log.NewNopLogger(), prometheus.NewRegistry()) ctx := context.Background() require.NoError(t, c.pollFetches(ctx)) @@ -101,10 +101,10 @@ func TestConsumer_ProcessRecords(t *testing.T) { m.SetReady(1) // Create a usage store, we will use this to check if the record // was discarded. - u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, reg) + u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, &mockLimits{}, reg) require.NoError(t, err) u.clock = clock - c := newConsumer(&kafka, m, u, newOffsetReadinessCheck(m), "zone1", + c := newConsumer(&kafka, m, u, &mockLimits{}, newOffsetReadinessCheck(m), "zone1", log.NewNopLogger(), prometheus.NewRegistry()) ctx := context.Background() require.NoError(t, c.pollFetches(ctx)) @@ -180,10 +180,10 @@ func TestConsumer_ReadinessCheck(t *testing.T) { // has been consumed. m.SetReplaying(1, 2) // We don't need the usage store for this test. - u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, reg) + u, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, &mockLimits{}, reg) require.NoError(t, err) u.clock = clock - c := newConsumer(&kafka, m, u, newOffsetReadinessCheck(m), "zone1", + c := newConsumer(&kafka, m, u, &mockLimits{}, newOffsetReadinessCheck(m), "zone1", log.NewNopLogger(), prometheus.NewRegistry()) // The first poll should fetch the first record. ctx := context.Background() diff --git a/pkg/limits/http.go b/pkg/limits/http.go index 5e3a204573827..db4cca229e50c 100644 --- a/pkg/limits/http.go +++ b/pkg/limits/http.go @@ -10,9 +10,10 @@ import ( ) type httpTenantLimitsResponse struct { - Tenant string `json:"tenant"` - Streams uint64 `json:"streams"` - Rate float64 `json:"rate"` + Tenant string `json:"tenant"` + Streams uint64 `json:"streams"` + PerPolicyStreams map[string]uint64 `json:"per_policy_streams"` + Rate float64 `json:"rate"` } // ServeHTTP implements the http.Handler interface. 
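Reviewer note: the per_policy_streams field added above changes the JSON shape of the tenant limits endpoint. The small standalone program below is only a sketch of what the resulting payload could look like; the tenant name, policy names, and counts are invented for illustration and are not taken from this PR.

package main

import (
	"encoding/json"
	"fmt"
)

// tenantLimitsResponse mirrors the extended httpTenantLimitsResponse struct
// above; all values used below are invented for illustration only.
type tenantLimitsResponse struct {
	Tenant           string            `json:"tenant"`
	Streams          uint64            `json:"streams"`
	PerPolicyStreams map[string]uint64 `json:"per_policy_streams"`
	Rate             float64           `json:"rate"`
}

func main() {
	out, _ := json.MarshalIndent(tenantLimitsResponse{
		Tenant:  "tenant1",
		Streams: 12,
		PerPolicyStreams: map[string]uint64{
			"":              9, // streams that matched no ingestion policy
			"high-priority": 3,
		},
		Rate: 1024.5,
	}, "", "  ")
	fmt.Println(string(out))
}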
@@ -24,11 +25,13 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } var streams, sumBuckets uint64 + perPolicyStreams := make(map[string]uint64) s.usage.IterTenant(tenant, func(_ string, _ int32, stream streamUsage) { streams++ for _, bucket := range stream.rateBuckets { sumBuckets += bucket.size } + perPolicyStreams[stream.policy]++ }) rate := float64(sumBuckets) / s.cfg.ActiveWindow.Seconds() @@ -44,8 +47,9 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Use util.WriteJSONResponse to write the JSON response util.WriteJSONResponse(w, httpTenantLimitsResponse{ - Tenant: tenant, - Streams: streams, - Rate: rate, + Tenant: tenant, + Streams: streams, + PerPolicyStreams: perPolicyStreams, + Rate: rate, }) } diff --git a/pkg/limits/http_test.go b/pkg/limits/http_test.go index fdac617280c7d..a4f29a80c0d6a 100644 --- a/pkg/limits/http_test.go +++ b/pkg/limits/http_test.go @@ -17,7 +17,7 @@ import ( func TestIngestLimits_ServeHTTP(t *testing.T) { clock := quartz.NewMock(t) - store, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, prometheus.NewRegistry()) + store, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 1, &mockLimits{}, prometheus.NewRegistry()) require.NoError(t, err) store.clock = clock store.setForTests("tenant1", streamUsage{ diff --git a/pkg/limits/limit.go b/pkg/limits/limit.go index 83d7a54a45826..7d88e32ba8d43 100644 --- a/pkg/limits/limit.go +++ b/pkg/limits/limit.go @@ -18,6 +18,7 @@ type Limits interface { IngestionRateBytes(userID string) float64 IngestionBurstSizeBytes(userID string) int MaxGlobalStreamsPerUser(userID string) int + PolicyMaxGlobalStreamsPerUser(userID, policy string) int } type limitsChecker struct { @@ -73,7 +74,7 @@ func (c *limitsChecker) ExceedsLimits(ctx context.Context, req *proto.ExceedsLim } streams = streams[:valid] - toProduce, accepted, rejected, err := c.store.UpdateCond(req.Tenant, streams, c.clock.Now(), c.limits) + toProduce, accepted, rejected, err := c.store.UpdateCond(req.Tenant, streams, c.clock.Now()) if err != nil { return nil, err } diff --git a/pkg/limits/mock_test.go b/pkg/limits/mock_test.go index fe4027b046ef9..5472fd4e65543 100644 --- a/pkg/limits/mock_test.go +++ b/pkg/limits/mock_test.go @@ -13,17 +13,27 @@ type mockLimits struct { } func (m *mockLimits) MaxGlobalStreamsPerUser(_ string) int { - return m.MaxGlobalStreams + if m.MaxGlobalStreams != 0 { + return m.MaxGlobalStreams + } + return 1000 } func (m *mockLimits) IngestionRateBytes(_ string) float64 { - return m.IngestionRate + if m.IngestionRate != 0 { + return m.IngestionRate + } + return 0 } func (m *mockLimits) IngestionBurstSizeBytes(_ string) int { return 1000 } +func (m *mockLimits) PolicyMaxGlobalStreamsPerUser(_ string, _ string) int { + return 0 +} + // mockKafka mocks a [kgo.Client]. The zero value is usable. type mockKafka struct { fetches []kgo.Fetches diff --git a/pkg/limits/proto/limits.pb.go b/pkg/limits/proto/limits.pb.go index fb9e7481b4556..6bb74ceaaea3e 100644 --- a/pkg/limits/proto/limits.pb.go +++ b/pkg/limits/proto/limits.pb.go @@ -255,6 +255,8 @@ func (m *GetAssignedPartitionsResponse) GetAssignedPartitions() map[int32]int64 type StreamMetadata struct { StreamHash uint64 `protobuf:"varint,1,opt,name=streamHash,proto3" json:"streamHash,omitempty"` TotalSize uint64 `protobuf:"varint,2,opt,name=totalSize,proto3" json:"totalSize,omitempty"` + // The resolved ingestion policy for this stream if any. 
May be used to override some ingestion limits for this stream such as the max streams allowed. + IngestionPolicy string `protobuf:"bytes,3,opt,name=ingestionPolicy,proto3" json:"ingestionPolicy,omitempty"` } func (m *StreamMetadata) Reset() { *m = StreamMetadata{} } @@ -303,6 +305,13 @@ func (m *StreamMetadata) GetTotalSize() uint64 { return 0 } +func (m *StreamMetadata) GetIngestionPolicy() string { + if m != nil { + return m.IngestionPolicy + } + return "" +} + type StreamMetadataRecord struct { Zone string `protobuf:"bytes,1,opt,name=zone,proto3" json:"zone,omitempty"` Tenant string `protobuf:"bytes,2,opt,name=tenant,proto3" json:"tenant,omitempty"` @@ -376,38 +385,39 @@ func init() { func init() { proto.RegisterFile("pkg/limits/proto/limits.proto", fileDescriptor_aaed9e7d5298ac0f) } var fileDescriptor_aaed9e7d5298ac0f = []byte{ - // 487 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x92, 0xbf, 0x6e, 0x13, 0x41, - 0x10, 0xc6, 0x6f, 0xed, 0x38, 0x21, 0x03, 0x41, 0x68, 0xb1, 0xe1, 0x64, 0x9c, 0x95, 0x75, 0x50, - 0xb8, 0xb2, 0x85, 0x49, 0x81, 0x10, 0x0d, 0x48, 0xe6, 0x8f, 0x14, 0x23, 0xb4, 0x79, 0x00, 0xb4, - 0xe4, 0x46, 0xe6, 0x94, 0xf3, 0x9e, 0xb9, 0x9d, 0x43, 0x24, 0x15, 0x8f, 0xc0, 0x63, 0xf0, 0x1c, - 0x54, 0x94, 0x2e, 0x28, 0x52, 0xe2, 0x73, 0x43, 0x99, 0x47, 0x40, 0xd9, 0x5b, 0x83, 0xed, 0xd8, - 0x81, 0x82, 0xca, 0x3b, 0x3b, 0x9f, 0xbf, 0xb9, 0x6f, 0xe7, 0x07, 0xbb, 0xa3, 0xa3, 0x41, 0x27, - 0x8e, 0x86, 0x11, 0x99, 0xce, 0x28, 0x4d, 0x28, 0x71, 0x45, 0xdb, 0x16, 0xbc, 0x62, 0x7f, 0x82, - 0x37, 0x50, 0xed, 0x7d, 0x3c, 0x44, 0x0c, 0xcd, 0xbe, 0xed, 0x4a, 0x7c, 0x9f, 0xa1, 0x21, 0x7e, - 0x0b, 0x36, 0x09, 0xb5, 0xd2, 0xe4, 0xb3, 0x26, 0x6b, 0x6d, 0x4b, 0x57, 0xf1, 0x0e, 0x6c, 0x19, - 0x4a, 0x51, 0x0d, 0x8d, 0x5f, 0x6a, 0x96, 0x5b, 0x57, 0xbb, 0xb5, 0xc2, 0xaf, 0x7d, 0x60, 0x6f, - 0xfb, 0x48, 0x2a, 0x54, 0xa4, 0xe4, 0x4c, 0x15, 0xf4, 0xa1, 0xb6, 0x34, 0xc0, 0x8c, 0x12, 0x6d, - 0x90, 0xef, 0xc1, 0x56, 0x8a, 0x26, 0x8b, 0xc9, 0xf8, 0xcc, 0x3a, 0xd5, 0x9d, 0xd3, 0xb2, 0x3c, - 0x8b, 0x49, 0xce, 0xa4, 0x41, 0x1f, 0x6e, 0xae, 0xe8, 0x73, 0x01, 0x50, 0x0c, 0x7c, 0xa1, 0xcc, - 0x3b, 0xfb, 0xc9, 0x1b, 0x72, 0xee, 0xe6, 0x3c, 0x4e, 0x8a, 0xca, 0x24, 0xda, 0x2f, 0x35, 0x59, - 0x6b, 0x47, 0xba, 0x2a, 0x10, 0xd0, 0x78, 0x8e, 0xf4, 0xc4, 0x98, 0x68, 0xa0, 0x31, 0x7c, 0xad, - 0x52, 0x8a, 0x28, 0x4a, 0xf4, 0xec, 0x19, 0x82, 0xef, 0x0c, 0x76, 0xd7, 0x08, 0x5c, 0x8c, 0x18, - 0xb8, 0xba, 0xd0, 0x75, 0x89, 0x1e, 0xbb, 0x44, 0x97, 0x3a, 0xb4, 0x2f, 0xb6, 0x7a, 0x9a, 0xd2, - 0x63, 0xb9, 0xc2, 0xb7, 0xde, 0x83, 0xdb, 0x6b, 0xe4, 0xfc, 0x06, 0x94, 0x8f, 0xf0, 0xd8, 0x66, - 0xaf, 0xc8, 0xf3, 0x23, 0xaf, 0x42, 0xe5, 0x83, 0x8a, 0x33, 0xb4, 0x99, 0xcb, 0xb2, 0x28, 0x1e, - 0x95, 0x1e, 0xb2, 0xe0, 0x15, 0x5c, 0x5f, 0xdc, 0xd7, 0x5f, 0x1f, 0xb0, 0x01, 0xdb, 0x94, 0x90, - 0x8a, 0x0f, 0xa2, 0x93, 0xc2, 0x6f, 0x43, 0xfe, 0xb9, 0x08, 0x32, 0xa8, 0x2e, 0xed, 0x1f, 0x0f, - 0x93, 0x34, 0xe4, 0x1c, 0x36, 0x4e, 0x12, 0x8d, 0x8e, 0x21, 0x7b, 0x9e, 0x23, 0xab, 0xb4, 0x40, - 0xd6, 0x7d, 0xb8, 0x32, 0x74, 0xff, 0xf6, 0xcb, 0x4d, 0xb6, 0x1e, 0xad, 0xdf, 0xb2, 0x6e, 0x08, - 0xd5, 0x97, 0x7a, 0x80, 0x86, 0x0a, 0x16, 0x9e, 0xa5, 0x89, 0x26, 0xd4, 0x21, 0xdf, 0x87, 0x9d, - 0x05, 0x48, 0xf8, 0x9d, 0xd5, 0x68, 0xd9, 0x1d, 0xd7, 0x1b, 0x6b, 0xb8, 0xb3, 0xdb, 0x09, 0xbc, - 0xee, 0x57, 0x06, 0xd7, 0xe6, 0xc7, 0xfc, 0x5f, 0x7b, 0x1e, 0x42, 0x6d, 0x25, 0x1f, 0xfc, 0xee, - 0xe5, 0xf4, 0x14, 0xee, 0xf7, 0xfe, 0x05, 0xb1, 0xc0, 0x7b, 0xba, 0x37, 0x9e, 0x08, 0xef, 0x74, - 0x22, 0xbc, 
0xb3, 0x89, 0x60, 0x9f, 0x72, 0xc1, 0xbe, 0xe4, 0x82, 0x7d, 0xcb, 0x05, 0x1b, 0xe7, - 0x82, 0xfd, 0xc8, 0x05, 0xfb, 0x99, 0x0b, 0xef, 0x2c, 0x17, 0xec, 0xf3, 0x54, 0x78, 0xe3, 0xa9, - 0xf0, 0x4e, 0xa7, 0xc2, 0x7b, 0xbb, 0x69, 0xcd, 0x1f, 0xfc, 0x0a, 0x00, 0x00, 0xff, 0xff, 0x36, - 0x1b, 0xa0, 0x70, 0x4c, 0x04, 0x00, 0x00, + // 503 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x92, 0x4d, 0x6f, 0x12, 0x41, + 0x18, 0xc7, 0x77, 0xa0, 0xb4, 0xf6, 0xd1, 0xaa, 0x19, 0x41, 0x37, 0x48, 0x27, 0x64, 0xf5, 0xc0, + 0x09, 0x22, 0xf6, 0x60, 0x8c, 0x17, 0x4d, 0xf0, 0x25, 0x29, 0x49, 0x33, 0xfd, 0x00, 0x66, 0x64, + 0x9f, 0xe0, 0xa6, 0xcb, 0x0c, 0xee, 0xcc, 0x9a, 0xd2, 0x93, 0x1f, 0xc1, 0x8f, 0xe1, 0xe7, 0xf0, + 0xe4, 0x91, 0x83, 0x87, 0x1e, 0x65, 0xb9, 0x78, 0xec, 0x47, 0x30, 0xcc, 0x0e, 0x0a, 0x14, 0xaa, + 0x07, 0x4f, 0x3b, 0xcf, 0xcb, 0xfe, 0x9f, 0xe7, 0x3f, 0xf3, 0x83, 0xfd, 0xe1, 0x49, 0xbf, 0x15, + 0x47, 0x83, 0xc8, 0xe8, 0xd6, 0x30, 0x51, 0x46, 0xb9, 0xa0, 0x69, 0x03, 0x5a, 0xb2, 0x9f, 0xe0, + 0x2d, 0x94, 0x3b, 0xa7, 0x3d, 0xc4, 0x50, 0x1f, 0xda, 0x2a, 0xc7, 0x0f, 0x29, 0x6a, 0x43, 0xef, + 0xc2, 0xb6, 0x41, 0x29, 0xa4, 0xf1, 0x49, 0x9d, 0x34, 0x76, 0xb9, 0x8b, 0x68, 0x0b, 0x76, 0xb4, + 0x49, 0x50, 0x0c, 0xb4, 0x5f, 0xa8, 0x17, 0x1b, 0xd7, 0xdb, 0x95, 0x5c, 0xaf, 0x79, 0x6c, 0xb3, + 0x5d, 0x34, 0x22, 0x14, 0x46, 0xf0, 0x79, 0x57, 0xd0, 0x85, 0xca, 0xca, 0x00, 0x3d, 0x54, 0x52, + 0x23, 0x3d, 0x80, 0x9d, 0x04, 0x75, 0x1a, 0x1b, 0xed, 0x13, 0xab, 0x54, 0x75, 0x4a, 0xab, 0xed, + 0x69, 0x6c, 0xf8, 0xbc, 0x35, 0xe8, 0xc2, 0x9d, 0x35, 0x75, 0xca, 0x00, 0xf2, 0x81, 0xaf, 0x85, + 0x7e, 0x6f, 0x57, 0xde, 0xe2, 0x0b, 0x99, 0x99, 0x9d, 0x04, 0x85, 0x56, 0xd2, 0x2f, 0xd4, 0x49, + 0x63, 0x8f, 0xbb, 0x28, 0x60, 0x50, 0x7b, 0x85, 0xe6, 0xb9, 0xd6, 0x51, 0x5f, 0x62, 0x78, 0x24, + 0x12, 0x13, 0x99, 0x48, 0xc9, 0xf9, 0x35, 0x04, 0xdf, 0x09, 0xec, 0x6f, 0x68, 0x70, 0x36, 0x62, + 0xa0, 0xe2, 0x52, 0xd5, 0x39, 0x7a, 0xe6, 0x1c, 0x5d, 0xa9, 0xd0, 0xbc, 0x5c, 0xea, 0x48, 0x93, + 0x8c, 0xf8, 0x1a, 0xdd, 0x6a, 0x07, 0xee, 0x6d, 0x68, 0xa7, 0xb7, 0xa1, 0x78, 0x82, 0x23, 0xeb, + 0xbd, 0xc4, 0x67, 0x47, 0x5a, 0x86, 0xd2, 0x47, 0x11, 0xa7, 0x68, 0x3d, 0x17, 0x79, 0x1e, 0x3c, + 0x2d, 0x3c, 0x21, 0xc1, 0x29, 0xdc, 0x5c, 0x7e, 0xaf, 0xbf, 0x5e, 0x60, 0x0d, 0x76, 0x8d, 0x32, + 0x22, 0x3e, 0x8e, 0xce, 0x72, 0xbd, 0x2d, 0xfe, 0x27, 0x41, 0x1b, 0x70, 0x2b, 0x92, 0x7d, 0xd4, + 0xb3, 0x75, 0x8e, 0x54, 0x1c, 0xf5, 0x46, 0x7e, 0xd1, 0x62, 0xb3, 0x9a, 0x0e, 0x52, 0x28, 0xaf, + 0x90, 0x82, 0x3d, 0x95, 0x84, 0x94, 0xc2, 0xd6, 0x99, 0x92, 0xe8, 0x68, 0xb3, 0xe7, 0x05, 0x06, + 0x0b, 0x4b, 0x0c, 0x3e, 0x82, 0x6b, 0x03, 0xf7, 0xb7, 0x1d, 0xb3, 0x11, 0xc2, 0xdf, 0x6d, 0xed, + 0x10, 0xca, 0x6f, 0xec, 0x26, 0x39, 0x35, 0x2f, 0x13, 0x25, 0x0d, 0xca, 0x90, 0x1e, 0xc2, 0xde, + 0x12, 0x4e, 0xf4, 0xfe, 0x7a, 0x08, 0x2d, 0x0d, 0xd5, 0xda, 0x06, 0x42, 0xed, 0x3b, 0x06, 0x5e, + 0xfb, 0x2b, 0x81, 0x1b, 0x8b, 0x63, 0xfe, 0xaf, 0x3c, 0x0d, 0xa1, 0xb2, 0x96, 0x24, 0xfa, 0xe0, + 0x6a, 0xce, 0x72, 0xf5, 0x87, 0xff, 0x02, 0x63, 0xe0, 0xbd, 0x38, 0x18, 0x4f, 0x98, 0x77, 0x3e, + 0x61, 0xde, 0xc5, 0x84, 0x91, 0x4f, 0x19, 0x23, 0x5f, 0x32, 0x46, 0xbe, 0x65, 0x8c, 0x8c, 0x33, + 0x46, 0x7e, 0x64, 0x8c, 0xfc, 0xcc, 0x98, 0x77, 0x91, 0x31, 0xf2, 0x79, 0xca, 0xbc, 0xf1, 0x94, + 0x79, 0xe7, 0x53, 0xe6, 0xbd, 0xdb, 0xb6, 0xe2, 0x8f, 0x7f, 0x05, 0x00, 0x00, 0xff, 0xff, 0xd3, + 0xc4, 0x41, 0xd4, 0x76, 0x04, 0x00, 0x00, } func (this *ExceedsLimitsRequest) Equal(that interface{}) bool { @@ -573,6 +583,9 @@ func (this 
*StreamMetadata) Equal(that interface{}) bool { if this.TotalSize != that1.TotalSize { return false } + if this.IngestionPolicy != that1.IngestionPolicy { + return false + } return true } func (this *StreamMetadataRecord) Equal(that interface{}) bool { @@ -676,10 +689,11 @@ func (this *StreamMetadata) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 6) + s := make([]string, 0, 7) s = append(s, "&proto.StreamMetadata{") s = append(s, "StreamHash: "+fmt.Sprintf("%#v", this.StreamHash)+",\n") s = append(s, "TotalSize: "+fmt.Sprintf("%#v", this.TotalSize)+",\n") + s = append(s, "IngestionPolicy: "+fmt.Sprintf("%#v", this.IngestionPolicy)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -1089,6 +1103,13 @@ func (m *StreamMetadata) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.IngestionPolicy) > 0 { + i -= len(m.IngestionPolicy) + copy(dAtA[i:], m.IngestionPolicy) + i = encodeVarintLimits(dAtA, i, uint64(len(m.IngestionPolicy))) + i-- + dAtA[i] = 0x1a + } if m.TotalSize != 0 { i = encodeVarintLimits(dAtA, i, uint64(m.TotalSize)) i-- @@ -1249,6 +1270,10 @@ func (m *StreamMetadata) Size() (n int) { if m.TotalSize != 0 { n += 1 + sovLimits(uint64(m.TotalSize)) } + l = len(m.IngestionPolicy) + if l > 0 { + n += 1 + l + sovLimits(uint64(l)) + } return n } @@ -1357,6 +1382,7 @@ func (this *StreamMetadata) String() string { s := strings.Join([]string{`&StreamMetadata{`, `StreamHash:` + fmt.Sprintf("%v", this.StreamHash) + `,`, `TotalSize:` + fmt.Sprintf("%v", this.TotalSize) + `,`, + `IngestionPolicy:` + fmt.Sprintf("%v", this.IngestionPolicy) + `,`, `}`, }, "") return s @@ -1950,6 +1976,38 @@ func (m *StreamMetadata) Unmarshal(dAtA []byte) error { break } } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field IngestionPolicy", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLimits + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthLimits + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthLimits + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.IngestionPolicy = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipLimits(dAtA[iNdEx:]) diff --git a/pkg/limits/proto/limits.proto b/pkg/limits/proto/limits.proto index 5cd39ac881e45..9b9c5f7786b97 100644 --- a/pkg/limits/proto/limits.proto +++ b/pkg/limits/proto/limits.proto @@ -34,6 +34,8 @@ message GetAssignedPartitionsResponse { message StreamMetadata { uint64 streamHash = 1; uint64 totalSize = 2; + // The resolved ingestion policy for this stream if any. May be used to override some ingestion limits for this stream such as the max streams allowed. 
+ string ingestionPolicy = 3; } message StreamMetadataRecord { diff --git a/pkg/limits/service.go b/pkg/limits/service.go index cff3a96d82c7e..f8a513674d678 100644 --- a/pkg/limits/service.go +++ b/pkg/limits/service.go @@ -82,7 +82,7 @@ func New(cfg Config, limits Limits, logger log.Logger, reg prometheus.Registerer if err != nil { return nil, fmt.Errorf("failed to create partition manager: %w", err) } - s.usage, err = newUsageStore(cfg.ActiveWindow, cfg.RateWindow, cfg.BucketSize, cfg.NumPartitions, reg) + s.usage, err = newUsageStore(cfg.ActiveWindow, cfg.RateWindow, cfg.BucketSize, cfg.NumPartitions, limits, reg) if err != nil { return nil, fmt.Errorf("failed to create usage store: %w", err) } @@ -143,6 +143,7 @@ func New(cfg Config, limits Limits, logger log.Logger, reg prometheus.Registerer s.kafkaReader, s.partitionManager, s.usage, + limits, newOffsetReadinessCheck(s.partitionManager), cfg.LifecyclerConfig.Zone, logger, diff --git a/pkg/limits/store.go b/pkg/limits/store.go index bc462c960d218..a144995993c84 100644 --- a/pkg/limits/store.go +++ b/pkg/limits/store.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "hash/fnv" + "math" "sync" "time" @@ -38,6 +39,20 @@ var ( // iterateFunc is a closure called for each stream. type iterateFunc func(tenant string, partition int32, stream streamUsage) +// getPolicyBucketAndLimit determines which policy bucket to use and the max streams limit +// for a given tenant and policy. Returns the policy bucket name and the max streams limit. +// The policy bucket will be the input policy name only if the max streams limit is overridden for the policy. +func (s *usageStore) getPolicyBucketAndLimit(tenant, policy string) (policyBucket string, maxStreams uint64) { + defaultMaxStreams := uint64(math.Max(float64(s.limits.MaxGlobalStreamsPerUser(tenant)/s.numPartitions), 1)) + + if policy != "" { + if policyMaxStreams := s.limits.PolicyMaxGlobalStreamsPerUser(tenant, policy); policyMaxStreams > 0 { + return policy, uint64(math.Max(float64(policyMaxStreams/s.numPartitions), 1)) // Use policy-specific bucket + } + } + return "", defaultMaxStreams // Use default bucket +} + // usageStore stores per-tenant stream usage data. type usageStore struct { activeWindow time.Duration @@ -47,13 +62,16 @@ type usageStore struct { numPartitions int stripes []map[string]tenantUsage locks []stripeLock + limits Limits // Used for tests. clock quartz.Clock } // tenantUsage contains the per-partition stream usage for a tenant. -type tenantUsage map[int32]map[uint64]streamUsage +// The structure is: partition -> policy -> streamHash -> streamUsage +// Policy "" represents streams that don't match any specific policy. +type tenantUsage map[int32]map[string]map[uint64]streamUsage // streamUsage represents the metadata for a stream loaded from the kafka topic. // It contains the minimal information to count per tenant active streams and @@ -68,6 +86,7 @@ type streamUsage struct { // implementing rate limits to a later date in the future. totalSize uint64 rateBuckets []rateBucket + policy string // The ingestion policy for this stream if any } // RateBucket represents the bytes received during a specific time interval @@ -84,7 +103,7 @@ type stripeLock struct { } // newUsageStore returns a new UsageStore.
-func newUsageStore(activeWindow, rateWindow, bucketSize time.Duration, numPartitions int, reg prometheus.Registerer) (*usageStore, error) { +func newUsageStore(activeWindow, rateWindow, bucketSize time.Duration, numPartitions int, limits Limits, reg prometheus.Registerer) (*usageStore, error) { s := &usageStore{ activeWindow: activeWindow, rateWindow: rateWindow, @@ -93,6 +112,7 @@ func newUsageStore(activeWindow, rateWindow, bucketSize time.Duration, numPartit numPartitions: numPartitions, stripes: make([]map[string]tenantUsage, numStripes), locks: make([]stripeLock, numStripes), + limits: limits, clock: quartz.NewReal(), } for i := range s.stripes { @@ -116,14 +136,16 @@ func (s *usageStore) Iter(f iterateFunc) { ) s.forEachRLock(func(i int) { for tenant, partitions := range s.stripes[i] { - for partition, streams := range partitions { - for _, stream := range streams { - if withinActiveWindow(stream.lastSeenAt) { - stream.rateBuckets = getActiveRateBuckets( - stream.rateBuckets, - withinRateWindow, - ) - f(tenant, partition, stream) + for partition, policies := range partitions { + for _, streams := range policies { + for _, stream := range streams { + if withinActiveWindow(stream.lastSeenAt) { + stream.rateBuckets = getActiveRateBuckets( + stream.rateBuckets, + withinRateWindow, + ) + f(tenant, partition, stream) + } } } } @@ -143,14 +165,16 @@ func (s *usageStore) IterTenant(tenant string, f iterateFunc) { withinRateWindow = s.newRateWindowFunc(now) ) s.withRLock(tenant, func(i int) { - for partition, streams := range s.stripes[i][tenant] { - for _, stream := range streams { - if withinActiveWindow(stream.lastSeenAt) { - stream.rateBuckets = getActiveRateBuckets( - stream.rateBuckets, - withinRateWindow, - ) - f(tenant, partition, stream) + for partition, policies := range s.stripes[i][tenant] { + for _, streams := range policies { + for _, stream := range streams { + if withinActiveWindow(stream.lastSeenAt) { + stream.rateBuckets = getActiveRateBuckets( + stream.rateBuckets, + withinRateWindow, + ) + f(tenant, partition, stream) + } } } } @@ -187,30 +211,35 @@ func (s *usageStore) Update(tenant string, metadata *proto.StreamMetadata, seenA return errOutsideActiveWindow } partition := s.getPartitionForHash(metadata.StreamHash) + policyBucket, _ := s.getPolicyBucketAndLimit(tenant, metadata.IngestionPolicy) s.withLock(tenant, func(i int) { - s.update(i, tenant, partition, metadata, seenAt) + s.update(i, tenant, partition, policyBucket, metadata, seenAt) }) return nil } -func (s *usageStore) UpdateCond(tenant string, metadata []*proto.StreamMetadata, seenAt time.Time, limits Limits) ([]*proto.StreamMetadata, []*proto.StreamMetadata, []*proto.StreamMetadata, error) { +func (s *usageStore) UpdateCond(tenant string, metadata []*proto.StreamMetadata, seenAt time.Time) ([]*proto.StreamMetadata, []*proto.StreamMetadata, []*proto.StreamMetadata, error) { if !s.withinActiveWindow(seenAt.UnixNano()) { return nil, nil, nil, errOutsideActiveWindow } var ( - now = s.clock.Now() - toProduce = make([]*proto.StreamMetadata, 0, len(metadata)) - accepted = make([]*proto.StreamMetadata, 0, len(metadata)) - rejected = make([]*proto.StreamMetadata, 0, len(metadata)) - cutoff = seenAt.Add(-s.activeWindow).UnixNano() - maxStreams = uint64(limits.MaxGlobalStreamsPerUser(tenant) / s.numPartitions) + now = s.clock.Now() + toProduce = make([]*proto.StreamMetadata, 0, len(metadata)) + accepted = make([]*proto.StreamMetadata, 0, len(metadata)) + rejected = make([]*proto.StreamMetadata, 0, len(metadata)) + cutoff 
= seenAt.Add(-s.activeWindow).UnixNano() ) s.withLock(tenant, func(i int) { for _, m := range metadata { partition := s.getPartitionForHash(m.StreamHash) - s.checkInitMap(i, tenant, partition) - streams := s.stripes[i][tenant][partition] + + // Determine which policy bucket to use and the max streams limit + policyBucket, maxStreamsForStream := s.getPolicyBucketAndLimit(tenant, m.IngestionPolicy) + + s.checkInitMap(i, tenant, partition, policyBucket) + streams := s.stripes[i][tenant][partition][policyBucket] stream, ok := streams[m.StreamHash] + // If the stream does not exist, or exists but has expired, // we need to check if accepting it would exceed the maximum // stream limit. @@ -226,17 +255,18 @@ func (s *usageStore) UpdateCond(tenant string, metadata []*proto.StreamMetadata, // in the partition which is O(N) instead of O(1). Instead, // we accept that expired streams will be counted towards the // limit until evicted. - numStreams := uint64(len(s.stripes[i][tenant][partition])) - if numStreams >= maxStreams { + numStreams := uint64(len(s.stripes[i][tenant][partition][policyBucket])) + + if numStreams >= maxStreamsForStream { rejected = append(rejected, m) continue } } - s.update(i, tenant, partition, m, seenAt) + s.update(i, tenant, partition, policyBucket, m, seenAt) // Hard-coded produce cutoff of 1 minute. produceCutoff := now.Add(-time.Minute).UnixNano() if stream.lastProducedAt < produceCutoff { - s.setLastProducedAt(i, tenant, partition, m.StreamHash, now) + s.setLastProducedAt(i, tenant, partition, m.StreamHash, policyBucket, now) toProduce = append(toProduce, m) } accepted = append(accepted, m) @@ -251,11 +281,13 @@ func (s *usageStore) Evict() map[string]int { evicted := make(map[string]int) s.forEachLock(func(i int) { for tenant, partitions := range s.stripes[i] { - for partition, streams := range partitions { - for streamHash, stream := range streams { - if stream.lastSeenAt < cutoff { - delete(s.stripes[i][tenant][partition], streamHash) - evicted[tenant]++ + for partition, policies := range partitions { + for policy, streams := range policies { + for streamHash, stream := range streams { + if stream.lastSeenAt < cutoff { + delete(s.stripes[i][tenant][partition][policy], streamHash) + evicted[tenant]++ + } } } } @@ -295,11 +327,13 @@ func (s *usageStore) Collect(metrics chan<- prometheus.Metric) { // streams for each tenants. s.forEachRLock(func(i int) { for tenant, partitions := range s.stripes[i] { - for _, streams := range partitions { - for _, stream := range streams { - total[tenant]++ - if stream.lastSeenAt >= cutoff { - active[tenant]++ + for _, policies := range partitions { + for _, streams := range policies { + for _, stream := range streams { + total[tenant]++ + if stream.lastSeenAt >= cutoff { + active[tenant]++ + } } } } @@ -328,24 +362,32 @@ func (s *usageStore) get(i int, tenant string, partition int32, streamHash uint6 if !ok { return } - streams, ok := partitions[partition] + policies, ok := partitions[partition] if !ok { return } - stream, ok = streams[streamHash] - return + // Search across all policies for this stream + // Note that in most cases, there will only be one item on this list (empty policy, ""). + // and at most just a few items. One idea to speed this up would be to keep a cache of tenant-partition-streamHash -> policy for those streams that have a matching policy. 
+ for _, streams := range policies { + if stream, ok = streams[streamHash]; ok { + return + } + } + return streamUsage{}, false } -func (s *usageStore) update(i int, tenant string, partition int32, metadata *proto.StreamMetadata, seenAt time.Time) { - s.checkInitMap(i, tenant, partition) - streamHash, _ := metadata.StreamHash, metadata.TotalSize +func (s *usageStore) update(i int, tenant string, partition int32, policyBucket string, metadata *proto.StreamMetadata, seenAt time.Time) { + s.checkInitMap(i, tenant, partition, policyBucket) + streamHash := metadata.StreamHash // Get the stats for the stream. - stream, ok := s.stripes[i][tenant][partition][streamHash] + stream, ok := s.stripes[i][tenant][partition][policyBucket][streamHash] cutoff := seenAt.Add(-s.activeWindow).UnixNano() // If the stream does not exist, or it has expired, reset it. if !ok || stream.lastSeenAt < cutoff { stream.hash = streamHash stream.totalSize = 0 + stream.policy = metadata.IngestionPolicy // stream.rateBuckets = make([]rateBucket, s.numBuckets) } seenAtUnixNano := seenAt.UnixNano() @@ -370,13 +412,13 @@ func (s *usageStore) update(i int, tenant string, partition int32, metadata *pro // } // bucket.size += totalSize // stream.rateBuckets[bucketIdx] = bucket - s.stripes[i][tenant][partition][streamHash] = stream + s.stripes[i][tenant][partition][policyBucket][streamHash] = stream } -func (s *usageStore) setLastProducedAt(i int, tenant string, partition int32, streamHash uint64, now time.Time) { - stream := s.stripes[i][tenant][partition][streamHash] +func (s *usageStore) setLastProducedAt(i int, tenant string, partition int32, streamHash uint64, policy string, now time.Time) { + stream := s.stripes[i][tenant][partition][policy][streamHash] stream.lastProducedAt = now.UnixNano() - s.stripes[i][tenant][partition][streamHash] = stream + s.stripes[i][tenant][partition][policy][streamHash] = stream } // forEachRLock executes fn with a shared lock for each stripe. @@ -456,12 +498,15 @@ func (s *usageStore) newRateWindowFunc(now time.Time) func(t int64) bool { // checkInitMap checks if the maps for the tenant and partition are // initialized, and if not, initializes them. It must not be called without // the stripe lock for i. 
-func (s *usageStore) checkInitMap(i int, tenant string, partition int32) { +func (s *usageStore) checkInitMap(i int, tenant string, partition int32, policy string) { if _, ok := s.stripes[i][tenant]; !ok { s.stripes[i][tenant] = make(tenantUsage) } if _, ok := s.stripes[i][tenant][partition]; !ok { - s.stripes[i][tenant][partition] = make(map[uint64]streamUsage) + s.stripes[i][tenant][partition] = make(map[string]map[uint64]streamUsage) + } + if _, ok := s.stripes[i][tenant][partition][policy]; !ok { + s.stripes[i][tenant][partition][policy] = make(map[uint64]streamUsage) } } @@ -476,8 +521,8 @@ func (s *usageStore) getForTests(tenant string, streamHash uint64) (streamUsage, func (s *usageStore) setForTests(tenant string, stream streamUsage) { partition := s.getPartitionForHash(stream.hash) s.withLock(tenant, func(i int) { - s.checkInitMap(i, tenant, partition) - s.stripes[i][tenant][partition][stream.hash] = stream + s.checkInitMap(i, tenant, partition, stream.policy) + s.stripes[i][tenant][partition][stream.policy][stream.hash] = stream }) } diff --git a/pkg/limits/store_bench_test.go b/pkg/limits/store_bench_test.go index 6970d4687cf8d..c02a992a36ed3 100644 --- a/pkg/limits/store_bench_test.go +++ b/pkg/limits/store_bench_test.go @@ -45,7 +45,7 @@ func BenchmarkUsageStore_Store(b *testing.B) { } for _, bm := range benchmarks { - s, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, bm.numPartitions, prometheus.NewRegistry()) + s, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, bm.numPartitions, &mockLimits{}, prometheus.NewRegistry()) require.NoError(b, err) b.Run(fmt.Sprintf("%s_create", bm.name), func(b *testing.B) { now := time.Now() @@ -62,7 +62,7 @@ func BenchmarkUsageStore_Store(b *testing.B) { TotalSize: 1500, }} - _, _, _, err := s.UpdateCond(tenant, metadata, updateTime, nil) + _, _, _, err := s.UpdateCond(tenant, metadata, updateTime) require.NoError(b, err) } }) @@ -82,12 +82,12 @@ func BenchmarkUsageStore_Store(b *testing.B) { TotalSize: 1500, }} - _, _, _, err := s.UpdateCond(tenant, metadata, updateTime, nil) + _, _, _, err := s.UpdateCond(tenant, metadata, updateTime) require.NoError(b, err) } }) - s, err = newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, bm.numPartitions, prometheus.NewRegistry()) + s, err = newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, bm.numPartitions, &mockLimits{}, prometheus.NewRegistry()) require.NoError(b, err) // Run parallel benchmark @@ -106,7 +106,7 @@ func BenchmarkUsageStore_Store(b *testing.B) { TotalSize: 1500, }} - _, _, _, err := s.UpdateCond(tenant, metadata, updateTime, nil) + _, _, _, err := s.UpdateCond(tenant, metadata, updateTime) require.NoError(b, err) i++ } @@ -128,7 +128,7 @@ func BenchmarkUsageStore_Store(b *testing.B) { TotalSize: 1500, }} - _, _, _, err := s.UpdateCond(tenant, metadata, updateTime, nil) + _, _, _, err := s.UpdateCond(tenant, metadata, updateTime) require.NoError(b, err) i++ } diff --git a/pkg/limits/store_test.go b/pkg/limits/store_test.go index 85b7492b34ca5..d6cc0bab58055 100644 --- a/pkg/limits/store_test.go +++ b/pkg/limits/store_test.go @@ -14,7 +14,7 @@ import ( func TestUsageStore_Iter(t *testing.T) { t.Run("iterates all streams", func(t *testing.T) { - s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 10, prometheus.NewRegistry()) + s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 10, &mockLimits{}, prometheus.NewRegistry()) require.NoError(t, err) 
clock := quartz.NewMock(t) s.clock = clock @@ -39,7 +39,7 @@ func TestUsageStore_Iter(t *testing.T) { }) t.Run("does not iterate expired streams", func(t *testing.T) { - s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, prometheus.NewRegistry()) + s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, &mockLimits{}, prometheus.NewRegistry()) require.NoError(t, err) clock := quartz.NewMock(t) s.clock = clock @@ -61,7 +61,7 @@ func TestUsageStore_Iter(t *testing.T) { func TestUsageStore_IterTenant(t *testing.T) { t.Run("iterates all streams for tenant", func(t *testing.T) { - s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 10, prometheus.NewRegistry()) + s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 10, &mockLimits{}, prometheus.NewRegistry()) require.NoError(t, err) clock := quartz.NewMock(t) s.clock = clock @@ -96,7 +96,7 @@ func TestUsageStore_IterTenant(t *testing.T) { }) t.Run("does not iterate expired streams", func(t *testing.T) { - s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, prometheus.NewRegistry()) + s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, &mockLimits{}, prometheus.NewRegistry()) require.NoError(t, err) clock := quartz.NewMock(t) s.clock = clock @@ -117,7 +117,7 @@ func TestUsageStore_IterTenant(t *testing.T) { } func TestUsageStore_Update(t *testing.T) { - s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, prometheus.NewRegistry()) + s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, &mockLimits{}, prometheus.NewRegistry()) require.NoError(t, err) clock := quartz.NewMock(t) s.clock = clock @@ -137,7 +137,7 @@ func TestUsageStore_Update(t *testing.T) { // buckets are implemented as a circular list, when we reach the end of // list the next bucket is the start of the list. // func TestUsageStore_UpdateRateBuckets(t *testing.T) { -// s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, prometheus.NewRegistry()) +// s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, &mockLimits{}, prometheus.NewRegistry()) // require.NoError(t, err) // clock := quartz.NewMock(t) // s.clock = clock @@ -197,7 +197,7 @@ func TestUsageStore_Update(t *testing.T) { // This test asserts that rate buckets are not updated while the TODOs are // in place. 
func TestUsageStore_RateBucketsAreNotUsed(t *testing.T) { - s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, prometheus.NewRegistry()) + s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, &mockLimits{}, prometheus.NewRegistry()) require.NoError(t, err) clock := quartz.NewMock(t) s.clock = clock @@ -337,15 +337,14 @@ func TestUsageStore_UpdateCond(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - s, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, test.numPartitions, prometheus.NewRegistry()) + s, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, test.numPartitions, &mockLimits{MaxGlobalStreams: test.maxGlobalStreams}, prometheus.NewRegistry()) require.NoError(t, err) clock := quartz.NewMock(t) s.clock = clock for _, stream := range test.seed { require.NoError(t, s.Update("tenant", stream, clock.Now())) } - limits := mockLimits{MaxGlobalStreams: test.maxGlobalStreams} - toProduce, accepted, rejected, err := s.UpdateCond("tenant", test.streams, clock.Now(), &limits) + toProduce, accepted, rejected, err := s.UpdateCond("tenant", test.streams, clock.Now()) require.NoError(t, err) require.ElementsMatch(t, test.expectedToProduce, toProduce) require.ElementsMatch(t, test.expectedAccepted, accepted) @@ -355,16 +354,15 @@ func TestUsageStore_UpdateCond(t *testing.T) { } func TestUsageStore_UpdateCond_ToProduce(t *testing.T) { - s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, prometheus.NewRegistry()) + s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, &mockLimits{}, prometheus.NewRegistry()) require.NoError(t, err) clock := quartz.NewMock(t) s.clock = clock - limits := mockLimits{MaxGlobalStreams: 10} metadata1 := []*proto.StreamMetadata{{ StreamHash: 0x1, TotalSize: 100, }} - toProduce, accepted, rejected, err := s.UpdateCond("tenant", metadata1, clock.Now(), &limits) + toProduce, accepted, rejected, err := s.UpdateCond("tenant", metadata1, clock.Now()) require.NoError(t, err) require.Empty(t, rejected) require.Len(t, accepted, 1) @@ -372,7 +370,7 @@ func TestUsageStore_UpdateCond_ToProduce(t *testing.T) { // Another update for the same stream in the same minute should not produce // a new record. clock.Advance(time.Second) - toProduce, accepted, rejected, err = s.UpdateCond("tenant", metadata1, clock.Now(), &limits) + toProduce, accepted, rejected, err = s.UpdateCond("tenant", metadata1, clock.Now()) require.NoError(t, err) require.Empty(t, rejected) require.Empty(t, toProduce) @@ -382,14 +380,14 @@ func TestUsageStore_UpdateCond_ToProduce(t *testing.T) { StreamHash: 0x2, TotalSize: 100, }} - toProduce, accepted, rejected, err = s.UpdateCond("tenant", metadata2, clock.Now(), &limits) + toProduce, accepted, rejected, err = s.UpdateCond("tenant", metadata2, clock.Now()) require.NoError(t, err) require.Empty(t, rejected) require.Len(t, accepted, 1) require.Equal(t, metadata2, toProduce) // Move the clock forward and metadata1 should be produced again. 
clock.Advance(time.Minute) - toProduce, accepted, rejected, err = s.UpdateCond("tenant", metadata1, clock.Now(), &limits) + toProduce, accepted, rejected, err = s.UpdateCond("tenant", metadata1, clock.Now()) require.NoError(t, err) require.Empty(t, rejected) require.Len(t, accepted, 1) @@ -397,7 +395,7 @@ func TestUsageStore_UpdateCond_ToProduce(t *testing.T) { } func TestUsageStore_Evict(t *testing.T) { - s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, prometheus.NewRegistry()) + s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, &mockLimits{}, prometheus.NewRegistry()) require.NoError(t, err) clock := quartz.NewMock(t) s.clock = clock @@ -430,7 +428,7 @@ func TestUsageStore_Evict(t *testing.T) { func TestUsageStore_EvictPartitions(t *testing.T) { // Create a store with 10 partitions. - s, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 10, prometheus.NewRegistry()) + s, err := newUsageStore(DefaultActiveWindow, DefaultRateWindow, DefaultBucketSize, 10, &mockLimits{}, prometheus.NewRegistry()) require.NoError(t, err) clock := quartz.NewMock(t) s.clock = clock @@ -450,6 +448,202 @@ func TestUsageStore_EvictPartitions(t *testing.T) { require.ElementsMatch(t, expected, actual) } +func TestUsageStore_PolicyBasedStreamLimits(t *testing.T) { + t.Run("policy-specific stream limits override default limits", func(t *testing.T) { + // Create a mockLimits with policy-specific overrides + limits := &mockLimits{ + MaxGlobalStreams: 10, // Default limit: 10 streams + } + + // Add policy-specific limits + policyLimits := &mockLimitsWithPolicy{ + mockLimits: *limits, + policyLimits: map[string]int{ + "high-priority": 5, // Policy-specific limit: 5 streams + "low-priority": 3, // Policy-specific limit: 3 streams + }, + } + + s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, policyLimits, prometheus.NewRegistry()) + require.NoError(t, err) + clock := quartz.NewMock(t) + s.clock = clock + + // Test 1: Default streams (no policy) should use default limit (10) + defaultStreams := []*proto.StreamMetadata{ + {StreamHash: 0x1, TotalSize: 100}, // Should be accepted + {StreamHash: 0x2, TotalSize: 100}, // Should be accepted + {StreamHash: 0x3, TotalSize: 100}, // Should be accepted + {StreamHash: 0x4, TotalSize: 100}, // Should be accepted + {StreamHash: 0x5, TotalSize: 100}, // Should be accepted + {StreamHash: 0x6, TotalSize: 100}, // Should be accepted + {StreamHash: 0x7, TotalSize: 100}, // Should be accepted + {StreamHash: 0x8, TotalSize: 100}, // Should be accepted + {StreamHash: 0x9, TotalSize: 100}, // Should be accepted + {StreamHash: 0xA, TotalSize: 100}, // Should be accepted (10th stream) + {StreamHash: 0xB, TotalSize: 100}, // Should be rejected (11th stream) + } + + toProduce, accepted, rejected, err := s.UpdateCond("tenant", defaultStreams, clock.Now()) + require.NoError(t, err) + require.Len(t, accepted, 10) // First 10 streams accepted + require.Len(t, rejected, 1) // 11th stream rejected + require.Len(t, toProduce, 10) // First 10 streams should be produced + }) + + t.Run("streams with different policies tracked separately", func(t *testing.T) { + policyLimits := &mockLimitsWithPolicy{ + mockLimits: mockLimits{MaxGlobalStreams: 10}, + policyLimits: map[string]int{ + "high-priority": 3, // 3 streams allowed + "low-priority": 2, // 2 streams allowed + }, + } + + s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, policyLimits, prometheus.NewRegistry()) + require.NoError(t, err) + 
clock := quartz.NewMock(t) + s.clock = clock + + // Test high-priority policy streams + highPriorityStreams := []*proto.StreamMetadata{ + {StreamHash: 0x1, TotalSize: 100, IngestionPolicy: "high-priority"}, // Accepted + {StreamHash: 0x2, TotalSize: 100, IngestionPolicy: "high-priority"}, // Accepted + {StreamHash: 0x3, TotalSize: 100, IngestionPolicy: "high-priority"}, // Accepted + {StreamHash: 0x4, TotalSize: 100, IngestionPolicy: "high-priority"}, // Rejected (4th stream) + } + + toProduce, accepted, rejected, err := s.UpdateCond("tenant", highPriorityStreams, clock.Now()) + require.NoError(t, err) + require.Len(t, accepted, 3) // First 3 streams accepted + require.Len(t, rejected, 1) // 4th stream rejected + require.Len(t, toProduce, 3) + + // Test low-priority policy streams (should be tracked separately) + lowPriorityStreams := []*proto.StreamMetadata{ + {StreamHash: 0x5, TotalSize: 100, IngestionPolicy: "low-priority"}, // Accepted + {StreamHash: 0x6, TotalSize: 100, IngestionPolicy: "low-priority"}, // Accepted + {StreamHash: 0x7, TotalSize: 100, IngestionPolicy: "low-priority"}, // Rejected (3rd stream) + } + + toProduce, accepted, rejected, err = s.UpdateCond("tenant", lowPriorityStreams, clock.Now()) + require.NoError(t, err) + require.Len(t, accepted, 2) // First 2 streams accepted + require.Len(t, rejected, 1) // 3rd stream rejected + require.Len(t, toProduce, 2) + }) + + t.Run("default streams tracked separately from policy streams", func(t *testing.T) { + policyLimits := &mockLimitsWithPolicy{ + mockLimits: mockLimits{MaxGlobalStreams: 5}, // Default limit: 5 streams + policyLimits: map[string]int{ + "special-policy": 3, // Policy limit: 3 streams + }, + } + + s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, policyLimits, prometheus.NewRegistry()) + require.NoError(t, err) + clock := quartz.NewMock(t) + s.clock = clock + + // First, add some default streams (no policy) + defaultStreams := []*proto.StreamMetadata{ + {StreamHash: 0x1, TotalSize: 100}, // Default stream - accepted + {StreamHash: 0x2, TotalSize: 100}, // Default stream - accepted + {StreamHash: 0x3, TotalSize: 100}, // Default stream - accepted + {StreamHash: 0x4, TotalSize: 100}, // Default stream - accepted + {StreamHash: 0x5, TotalSize: 100}, // Default stream - accepted + {StreamHash: 0x6, TotalSize: 100}, // Default stream - rejected (6th stream) + } + + toProduce, accepted, rejected, err := s.UpdateCond("tenant", defaultStreams, clock.Now()) + require.NoError(t, err) + require.Len(t, accepted, 5) // First 5 default streams accepted + require.Len(t, rejected, 1) // 6th default stream rejected + require.Len(t, toProduce, 5) + + // Now add policy streams (should be tracked separately and not affect default stream count) + policyStreams := []*proto.StreamMetadata{ + {StreamHash: 0x7, TotalSize: 100, IngestionPolicy: "special-policy"}, // Policy stream - accepted + {StreamHash: 0x8, TotalSize: 100, IngestionPolicy: "special-policy"}, // Policy stream - accepted + {StreamHash: 0x9, TotalSize: 100, IngestionPolicy: "special-policy"}, // Policy stream - accepted + {StreamHash: 0xA, TotalSize: 100, IngestionPolicy: "special-policy"}, // Policy stream - rejected (4th policy stream) + } + + toProduce, accepted, rejected, err = s.UpdateCond("tenant", policyStreams, clock.Now()) + require.NoError(t, err) + require.Len(t, accepted, 3) // First 3 policy streams accepted + require.Len(t, rejected, 1) // 4th policy stream rejected + require.Len(t, toProduce, 3) + + // Verify that we can still add 
more default streams (they're tracked separately) + moreDefaultStreams := []*proto.StreamMetadata{ + {StreamHash: 0xB, TotalSize: 100}, // This should still be rejected because we already have 5 default streams + } + + toProduce, accepted, rejected, err = s.UpdateCond("tenant", moreDefaultStreams, clock.Now()) + require.NoError(t, err) + require.Len(t, accepted, 0) // No more default streams can be accepted + require.Len(t, rejected, 1) // This default stream is rejected + require.Len(t, toProduce, 0) + }) + + t.Run("policies without custom limits use default bucket", func(t *testing.T) { + policyLimits := &mockLimitsWithPolicy{ + mockLimits: mockLimits{MaxGlobalStreams: 4}, // Default limit: 4 streams + policyLimits: map[string]int{ + "custom-policy": 2, // Only this policy has a custom limit + }, + } + + s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, policyLimits, prometheus.NewRegistry()) + require.NoError(t, err) + clock := quartz.NewMock(t) + s.clock = clock + + // Streams with "custom-policy" should use the custom limit (2 streams) + customPolicyStreams := []*proto.StreamMetadata{ + {StreamHash: 0x1, TotalSize: 100, IngestionPolicy: "custom-policy"}, // Accepted + {StreamHash: 0x2, TotalSize: 100, IngestionPolicy: "custom-policy"}, // Accepted + {StreamHash: 0x3, TotalSize: 100, IngestionPolicy: "custom-policy"}, // Rejected (3rd stream) + } + + toProduce, accepted, rejected, err := s.UpdateCond("tenant", customPolicyStreams, clock.Now()) + require.NoError(t, err) + require.Len(t, accepted, 2) // First 2 custom policy streams accepted + require.Len(t, rejected, 1) // 3rd custom policy stream rejected + require.Len(t, toProduce, 2) + + // Streams with "other-policy" (no custom limit) should use default bucket and count against default limit + otherPolicyStreams := []*proto.StreamMetadata{ + {StreamHash: 0x4, TotalSize: 100, IngestionPolicy: "other-policy"}, // Should go to default bucket + {StreamHash: 0x5, TotalSize: 100, IngestionPolicy: "other-policy"}, // Should go to default bucket + {StreamHash: 0x6, TotalSize: 100, IngestionPolicy: "other-policy"}, // Should go to default bucket + {StreamHash: 0x7, TotalSize: 100, IngestionPolicy: "other-policy"}, // Should go to default bucket + {StreamHash: 0x8, TotalSize: 100, IngestionPolicy: "other-policy"}, // Should be rejected (5th stream in default bucket) + } + + toProduce, accepted, rejected, err = s.UpdateCond("tenant", otherPolicyStreams, clock.Now()) + require.NoError(t, err) + require.Len(t, accepted, 4) // First 4 other policy streams accepted (using default bucket) + require.Len(t, rejected, 1) // 5th other policy stream rejected + require.Len(t, toProduce, 4) + }) +} + +// mockLimitsWithPolicy extends mockLimits to support policy-specific limits +type mockLimitsWithPolicy struct { + mockLimits + policyLimits map[string]int +} + +func (m *mockLimitsWithPolicy) PolicyMaxGlobalStreamsPerUser(_, policy string) int { + if limit, exists := m.policyLimits[policy]; exists { + return limit + } + return 0 // No custom limit for this policy +} + func newRateBuckets(rateWindow, bucketSize time.Duration) []rateBucket { return make([]rateBucket, int(rateWindow/bucketSize)) }
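For a quick mental model of the limit resolution added in pkg/limits/store.go: a stream is only placed in a named policy bucket when PolicyMaxGlobalStreamsPerUser returns an override for that policy; otherwise it shares the default ("") bucket and the tenant-wide MaxGlobalStreamsPerUser limit, with both limits split across partitions and floored at one stream per partition. The standalone program below is a sketch of that rule under assumed limit values and policy names; the policyLimits type and resolveBucket function are illustrative only and are not part of this change.

package main

import "fmt"

// policyLimits is a stand-in for the Limits interface used by the usage store.
// The per-tenant and per-policy values below are made up for illustration.
type policyLimits struct {
	defaultMaxStreams int            // stands in for MaxGlobalStreamsPerUser
	perPolicy         map[string]int // stands in for PolicyMaxGlobalStreamsPerUser
}

// resolveBucket restates the bucketing rule sketched above: a stream gets its
// own policy bucket only when that policy has a max-streams override, otherwise
// it falls back to the default ("") bucket and the tenant-wide limit. Limits
// are divided across partitions, with a floor of 1 per partition.
func resolveBucket(l policyLimits, policy string, numPartitions int) (bucket string, maxStreams int) {
	perPartition := func(limit int) int {
		v := limit / numPartitions
		if v < 1 {
			return 1
		}
		return v
	}
	if policy != "" {
		if override, ok := l.perPolicy[policy]; ok && override > 0 {
			return policy, perPartition(override)
		}
	}
	return "", perPartition(l.defaultMaxStreams)
}

func main() {
	limits := policyLimits{
		defaultMaxStreams: 1000,
		perPolicy:         map[string]int{"high-priority": 100},
	}
	for _, policy := range []string{"", "high-priority", "other-policy"} {
		bucket, max := resolveBucket(limits, policy, 10)
		fmt.Printf("policy=%q -> bucket=%q maxStreamsPerPartition=%d\n", policy, bucket, max)
	}
}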