Skip to content

Commit b4e6c59

Browse files
Compactor: add per tenant compaction delete enabled flag (grafana#6410)
* Add per tenant compaction delete enabled flag Signed-off-by: Michel Hollands <[email protected]> * Remove changes in wrong place Signed-off-by: Michel Hollands <[email protected]> * Add compactor deletion enabled field Signed-off-by: Michel Hollands <[email protected]> * Use limit in compactor Signed-off-by: Michel Hollands <[email protected]> * Use http middleware and add test Signed-off-by: Michel Hollands <[email protected]> * Fix lint issue Signed-off-by: Michel Hollands <[email protected]> * Add changelog Signed-off-by: Michel Hollands <[email protected]> * Revert to default setting if no override Signed-off-by: Michel Hollands <[email protected]> * Add default value command line option Signed-off-by: Michel Hollands <[email protected]> * Update the docs Signed-off-by: Michel Hollands <[email protected]> * Enable access to deletion API for integration test Signed-off-by: Michel Hollands <[email protected]> * Rename flag to allow_deletes Signed-off-by: Michel Hollands <[email protected]> * Update per review comments Signed-off-by: Michel Hollands <[email protected]>
1 parent 8ee0d62 commit b4e6c59

File tree

9 files changed

+211
-10
lines changed

9 files changed

+211
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
## Main
22

3+
* [6410](https://github.com/grafana/loki/pull/6410) **MichelHollands**: Add support for per tenant delete API access enabling.
34
* [6372](https://github.com/grafana/loki/pull/6372) **splitice**: Add support for numbers in JSON fields.
45
* [6105](https://github.com/grafana/loki/pull/6105) **rutgerke** Export metrics for the Promtail journal target.
56
* [6099](https://github.com/grafana/loki/pull/6099) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage.

docs/sources/configuration/_index.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2372,6 +2372,10 @@ The `limits_config` block configures global and per-tenant limits in Loki.
23722372
# This also determines how cache keys are chosen when result caching is enabled
23732373
# CLI flag: -querier.split-queries-by-interval
23742374
[split_queries_by_interval: <duration> | default = 30m]
2375+
2376+
# When true, access to the deletion API is enabled.
2377+
# CLI flag: -compactor.allow_deletes
2378+
[allow_deletes: <boolean> | default = false]
23752379
```
23762380
23772381
## sigv4_config

docs/sources/operations/storage/logs-deletion.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ With `whole-stream-deletion`, all the log entries matching the query given in th
2424
With `filter-only`, log lines matching the query in the delete request are filtered out when querying Loki. They are not removed from the on-disk chunks.
2525
With `filter-and-delete`, log lines matching the query in the delete request are filtered out when querying Loki, and they are also removed from the on-disk chunks.
2626

27-
2827
A delete request may be canceled within a configurable cancellation period. Set the `delete_request_cancel_period` in the Compactor's YAML configuration or on the command line when invoking Loki. Its default value is 24h.
2928

29+
Access to the deletion API can be enabled per tenant via the `allow_deletes` setting.
30+
3031
## Compactor endpoints
3132

3233
The Compactor exposes endpoints to allow for the deletion of log entries from specified streams.

integration/loki_micro_services_delete_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
2626
"-boltdb.shipper.compactor.deletion-mode=filter-and-delete",
2727
// By default a minute is added to the delete request start time. This compensates for that.
2828
"-boltdb.shipper.compactor.delete-request-cancel-period=-60s",
29+
"-compactor.allow-deletes=true",
2930
)
3031
tIndexGateway = clu.AddComponent(
3132
"index-gateway",

pkg/loki/modules.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -873,10 +873,10 @@ func (t *Loki) initCompactor() (services.Service, error) {
873873
if t.Cfg.CompactorConfig.RetentionEnabled {
874874
switch t.compactor.DeleteMode() {
875875
case deletion.WholeStreamDeletion, deletion.FilterOnly, deletion.FilterAndDelete:
876-
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler)))
877-
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler)))
878-
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler)))
879-
t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler)))
876+
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("PUT", "POST").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.AddDeleteRequestHandler()))
877+
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetAllDeleteRequestsHandler()))
878+
t.Server.HTTP.Path("/loki/api/v1/delete").Methods("DELETE").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.CancelDeleteRequestHandler()))
879+
t.Server.HTTP.Path("/loki/api/v1/cache/generation_numbers").Methods("GET").Handler(t.HTTPAuthMiddleware.Wrap(t.compactor.DeleteRequestsHandler.GetCacheGenerationNumberHandler()))
880880
default:
881881
break
882882
}

pkg/storage/stores/shipper/compactor/compactor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ func (c *Compactor) initDeletes(r prometheus.Registerer, limits retention.Limits
265265
c.DeleteRequestsHandler = deletion.NewDeleteRequestHandler(
266266
c.deleteRequestsStore,
267267
c.cfg.DeleteRequestCancelPeriod,
268+
limits,
268269
r,
269270
)
270271

pkg/storage/stores/shipper/compactor/deletion/request_handler.go

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,30 +12,40 @@ import (
1212

1313
"github.com/grafana/dskit/tenant"
1414

15+
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention"
1516
"github.com/grafana/loki/pkg/util"
1617
util_log "github.com/grafana/loki/pkg/util/log"
1718
)
1819

20+
const deletionNotAvailableMsg = "deletion is not available for this tenant"
21+
1922
// DeleteRequestHandler provides handlers for delete requests
2023
type DeleteRequestHandler struct {
2124
deleteRequestsStore DeleteRequestsStore
2225
metrics *deleteRequestHandlerMetrics
26+
limits retention.Limits
2327
deleteRequestCancelPeriod time.Duration
2428
}
2529

2630
// NewDeleteRequestHandler creates a DeleteRequestHandler
27-
func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, registerer prometheus.Registerer) *DeleteRequestHandler {
31+
func NewDeleteRequestHandler(deleteStore DeleteRequestsStore, deleteRequestCancelPeriod time.Duration, limits retention.Limits, registerer prometheus.Registerer) *DeleteRequestHandler {
2832
deleteMgr := DeleteRequestHandler{
2933
deleteRequestsStore: deleteStore,
3034
deleteRequestCancelPeriod: deleteRequestCancelPeriod,
35+
limits: limits,
3136
metrics: newDeleteRequestHandlerMetrics(registerer),
3237
}
3338

3439
return &deleteMgr
3540
}
3641

3742
// AddDeleteRequestHandler handles addition of a new delete request
38-
func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
43+
func (dm *DeleteRequestHandler) AddDeleteRequestHandler() http.Handler {
44+
return dm.deletionMiddleware(http.HandlerFunc(dm.addDeleteRequestHandler))
45+
}
46+
47+
// AddDeleteRequestHandler handles addition of a new delete request
48+
func (dm *DeleteRequestHandler) addDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
3949
ctx := r.Context()
4050
userID, err := tenant.TenantID(ctx)
4151
if err != nil {
@@ -98,7 +108,12 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r
98108
}
99109

100110
// GetAllDeleteRequestsHandler handles get all delete requests
101-
func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) {
111+
func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler() http.Handler {
112+
return dm.deletionMiddleware(http.HandlerFunc(dm.getAllDeleteRequestsHandler))
113+
}
114+
115+
// GetAllDeleteRequestsHandler handles get all delete requests
116+
func (dm *DeleteRequestHandler) getAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) {
102117
ctx := r.Context()
103118
userID, err := tenant.TenantID(ctx)
104119
if err != nil {
@@ -120,7 +135,12 @@ func (dm *DeleteRequestHandler) GetAllDeleteRequestsHandler(w http.ResponseWrite
120135
}
121136

122137
// CancelDeleteRequestHandler handles delete request cancellation
123-
func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
138+
func (dm *DeleteRequestHandler) CancelDeleteRequestHandler() http.Handler {
139+
return dm.deletionMiddleware(http.HandlerFunc(dm.cancelDeleteRequestHandler))
140+
}
141+
142+
// CancelDeleteRequestHandler handles delete request cancellation
143+
func (dm *DeleteRequestHandler) cancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {
124144
ctx := r.Context()
125145
userID, err := tenant.TenantID(ctx)
126146
if err != nil {
@@ -163,7 +183,12 @@ func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter
163183
}
164184

165185
// GetCacheGenerationNumberHandler handles requests for a user's cache generation number
166-
func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler(w http.ResponseWriter, r *http.Request) {
186+
func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler() http.Handler {
187+
return dm.deletionMiddleware(http.HandlerFunc(dm.getCacheGenerationNumberHandler))
188+
}
189+
190+
// GetCacheGenerationNumberHandler handles requests for a user's cache generation number
191+
func (dm *DeleteRequestHandler) getCacheGenerationNumberHandler(w http.ResponseWriter, r *http.Request) {
167192
ctx := r.Context()
168193
userID, err := tenant.TenantID(ctx)
169194
if err != nil {
@@ -183,3 +208,30 @@ func (dm *DeleteRequestHandler) GetCacheGenerationNumberHandler(w http.ResponseW
183208
http.Error(w, fmt.Sprintf("Error marshalling response: %v", err), http.StatusInternalServerError)
184209
}
185210
}
211+
212+
func (dm *DeleteRequestHandler) deletionMiddleware(next http.Handler) http.Handler {
213+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
214+
ctx := r.Context()
215+
userID, err := tenant.TenantID(ctx)
216+
if err != nil {
217+
http.Error(w, err.Error(), http.StatusBadRequest)
218+
return
219+
}
220+
221+
allLimits := dm.limits.AllByUserID()
222+
userLimits, ok := allLimits[userID]
223+
if ok {
224+
if !userLimits.CompactorDeletionEnabled {
225+
http.Error(w, deletionNotAvailableMsg, http.StatusForbidden)
226+
return
227+
}
228+
} else {
229+
if !dm.limits.DefaultLimits().CompactorDeletionEnabled {
230+
http.Error(w, deletionNotAvailableMsg, http.StatusForbidden)
231+
return
232+
}
233+
}
234+
235+
next.ServeHTTP(w, r)
236+
})
237+
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package deletion
2+
3+
import (
4+
"net/http"
5+
"net/http/httptest"
6+
"path/filepath"
7+
"testing"
8+
"time"
9+
10+
"github.com/prometheus/common/model"
11+
"github.com/stretchr/testify/require"
12+
"github.com/weaveworks/common/user"
13+
14+
"github.com/grafana/loki/pkg/storage/chunk/client/local"
15+
"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
16+
"github.com/grafana/loki/pkg/validation"
17+
)
18+
19+
type retentionLimit struct {
20+
compactorDeletionEnabled bool
21+
retentionPeriod time.Duration
22+
streamRetention []validation.StreamRetention
23+
}
24+
25+
func (r retentionLimit) convertToValidationLimit() *validation.Limits {
26+
return &validation.Limits{
27+
CompactorDeletionEnabled: r.compactorDeletionEnabled,
28+
RetentionPeriod: model.Duration(r.retentionPeriod),
29+
StreamRetention: r.streamRetention,
30+
}
31+
}
32+
33+
type fakeLimits struct {
34+
defaultLimit retentionLimit
35+
perTenant map[string]retentionLimit
36+
}
37+
38+
func (f fakeLimits) RetentionPeriod(userID string) time.Duration {
39+
return f.perTenant[userID].retentionPeriod
40+
}
41+
42+
func (f fakeLimits) StreamRetention(userID string) []validation.StreamRetention {
43+
return f.perTenant[userID].streamRetention
44+
}
45+
46+
func (f fakeLimits) CompactorDeletionEnabled(userID string) bool {
47+
return f.perTenant[userID].compactorDeletionEnabled
48+
}
49+
50+
func (f fakeLimits) DefaultLimits() *validation.Limits {
51+
return f.defaultLimit.convertToValidationLimit()
52+
}
53+
54+
func (f fakeLimits) AllByUserID() map[string]*validation.Limits {
55+
res := make(map[string]*validation.Limits)
56+
for userID, ret := range f.perTenant {
57+
res[userID] = ret.convertToValidationLimit()
58+
}
59+
return res
60+
}
61+
62+
func TestDeleteRequestHandlerDeletionMiddleware(t *testing.T) {
63+
// build the store
64+
tempDir := t.TempDir()
65+
66+
workingDir := filepath.Join(tempDir, "working-dir")
67+
objectStorePath := filepath.Join(tempDir, "object-store")
68+
69+
objectClient, err := local.NewFSObjectClient(local.FSConfig{
70+
Directory: objectStorePath,
71+
})
72+
require.NoError(t, err)
73+
testDeleteRequestsStore, err := NewDeleteStore(workingDir, storage.NewIndexStorageClient(objectClient, ""))
74+
require.NoError(t, err)
75+
76+
// limits
77+
fl := &fakeLimits{
78+
perTenant: map[string]retentionLimit{
79+
"1": {compactorDeletionEnabled: true},
80+
"2": {compactorDeletionEnabled: false},
81+
},
82+
}
83+
84+
// Setup handler
85+
drh := NewDeleteRequestHandler(testDeleteRequestsStore, 10*time.Second, fl, nil)
86+
middle := drh.deletionMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
87+
88+
// User that has deletion enabled
89+
req := httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
90+
req = req.WithContext(user.InjectOrgID(req.Context(), "1"))
91+
92+
res := httptest.NewRecorder()
93+
middle.ServeHTTP(res, req)
94+
95+
require.Equal(t, http.StatusOK, res.Result().StatusCode)
96+
97+
// User that does not have deletion enabled
98+
req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
99+
req = req.WithContext(user.InjectOrgID(req.Context(), "2"))
100+
101+
res = httptest.NewRecorder()
102+
middle.ServeHTTP(res, req)
103+
104+
require.Equal(t, http.StatusForbidden, res.Result().StatusCode)
105+
106+
// User without override, this should use the default value which is false
107+
req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
108+
req = req.WithContext(user.InjectOrgID(req.Context(), "3"))
109+
110+
res = httptest.NewRecorder()
111+
middle.ServeHTTP(res, req)
112+
113+
require.Equal(t, http.StatusForbidden, res.Result().StatusCode)
114+
115+
// User without override, after the default value is set to true
116+
fl.defaultLimit.compactorDeletionEnabled = true
117+
118+
req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
119+
req = req.WithContext(user.InjectOrgID(req.Context(), "3"))
120+
121+
res = httptest.NewRecorder()
122+
middle.ServeHTTP(res, req)
123+
124+
require.Equal(t, http.StatusOK, res.Result().StatusCode)
125+
126+
// User header is not given
127+
req = httptest.NewRequest(http.MethodGet, "http://www.your-domain.com", nil)
128+
129+
res = httptest.NewRecorder()
130+
middle.ServeHTTP(res, req)
131+
132+
require.Equal(t, http.StatusBadRequest, res.Result().StatusCode)
133+
}

pkg/validation/limits.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ type Limits struct {
111111
RulerRemoteWriteQueueRetryOnRateLimit bool `yaml:"ruler_remote_write_queue_retry_on_ratelimit" json:"ruler_remote_write_queue_retry_on_ratelimit"`
112112
RulerRemoteWriteSigV4Config *sigv4.SigV4Config `yaml:"ruler_remote_write_sigv4_config" json:"ruler_remote_write_sigv4_config"`
113113

114+
CompactorDeletionEnabled bool `yaml:"allow_deletes" json:"allow_deletes"`
115+
114116
// Global and per tenant retention
115117
RetentionPeriod model.Duration `yaml:"retention_period" json:"retention_period"`
116118
StreamRetention []StreamRetention `yaml:"retention_stream,omitempty" json:"retention_stream,omitempty"`
@@ -193,6 +195,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
193195

194196
_ = l.QuerySplitDuration.Set("30m")
195197
f.Var(&l.QuerySplitDuration, "querier.split-queries-by-interval", "Split queries by an interval and execute in parallel, 0 disables it. This also determines how cache keys are chosen when result caching is enabled")
198+
199+
f.BoolVar(&l.CompactorDeletionEnabled, "compactor.allow-deletes", false, "Enable access to the deletion API.")
196200
}
197201

198202
// UnmarshalYAML implements the yaml.Unmarshaler interface.
@@ -532,6 +536,10 @@ func (o *Overrides) UnorderedWrites(userID string) bool {
532536
return o.getOverridesForUser(userID).UnorderedWrites
533537
}
534538

539+
func (o *Overrides) CompactorDeletionEnabled(userID string) bool {
540+
return o.getOverridesForUser(userID).CompactorDeletionEnabled
541+
}
542+
535543
func (o *Overrides) DefaultLimits() *Limits {
536544
return o.defaultLimits
537545
}

0 commit comments

Comments
 (0)