Skip to content

Commit f114c29

Browse files
authored
fix AllUserStats during rolling updates on ingester (#7026)
* fix AllUserStats during rolling updates on ingester Signed-off-by: SungJin1212 <[email protected]> * Add dedicated column to expose # Queried Ingesters Signed-off-by: SungJin1212 <[email protected]> * fix test Signed-off-by: SungJin1212 <[email protected]> --------- Signed-off-by: SungJin1212 <[email protected]>
1 parent 651386b commit f114c29

File tree

7 files changed

+117
-17
lines changed

7 files changed

+117
-17
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
* [BUGFIX] Store Gateway: Avoid race condition by deduplicating entries in bucket stores user scan. #6863
9999
* [BUGFIX] Runtime-config: Change to check tenant limit validation when loading runtime config only for `all`, `distributor`, `querier`, and `ruler` targets. #6880
100100
* [BUGFIX] Frontend: Fix remote read snappy input due to request string logging when query stats enabled. #7025
101+
* [BUGFIX] Distributor: Fix the `/distributor/all_user_stats` api to work during rolling updates on ingesters. #7026
101102

102103
## 1.19.0 2025-02-27
103104

integration/api_endpoints_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,15 @@ import (
99
"net/http"
1010
"path/filepath"
1111
"testing"
12+
"time"
1213

14+
"github.com/prometheus/prometheus/model/labels"
1315
"github.com/stretchr/testify/assert"
1416
"github.com/stretchr/testify/require"
1517
"github.com/thanos-io/thanos/pkg/runutil"
1618

1719
"github.com/cortexproject/cortex/integration/e2e"
20+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
1821
"github.com/cortexproject/cortex/integration/e2ecortex"
1922
)
2023

@@ -85,3 +88,50 @@ func TestConfigAPIEndpoint(t *testing.T) {
8588
cortex2 := e2ecortex.NewSingleBinaryWithConfigFile("cortex-2", cortexConfigFile, configOverrides, "", 9009, 9095)
8689
require.NoError(t, s.StartAndWaitReady(cortex2))
8790
}
91+
92+
func Test_AllUserStats_WhenIngesterRollingUpdate(t *testing.T) {
93+
s, err := e2e.NewScenario(networkName)
94+
require.NoError(t, err)
95+
defer s.Close()
96+
97+
flags := BlocksStorageFlags()
98+
flags["-distributor.replication-factor"] = "3"
99+
flags["-distributor.sharding-strategy"] = "shuffle-sharding"
100+
flags["-distributor.ingestion-tenant-shard-size"] = "3"
101+
flags["-distributor.shard-by-all-labels"] = "true"
102+
103+
// Start dependencies.
104+
consul := e2edb.NewConsul()
105+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
106+
require.NoError(t, s.StartAndWaitReady(consul, minio))
107+
108+
// Start Cortex components.
109+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
110+
ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
111+
ingester2 := e2ecortex.NewIngester("ingester-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
112+
ingester3 := e2ecortex.NewIngester("ingester-3", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
113+
require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3))
114+
115+
// Wait until distributor has updated the ring.
116+
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
117+
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
118+
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
119+
120+
// stop ingester1 to emulate rolling update
121+
require.NoError(t, s.Stop(ingester1))
122+
123+
client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
124+
require.NoError(t, err)
125+
126+
now := time.Now()
127+
series, _ := generateSeries("series_1", now)
128+
res, err := client.Push(series)
129+
require.NoError(t, err)
130+
require.Equal(t, 200, res.StatusCode)
131+
132+
// QueriedIngesters is 2 since ingester1 has been stopped.
133+
userStats, err := client.AllUserStats()
134+
require.NoError(t, err)
135+
require.Len(t, userStats, 1)
136+
require.Equal(t, uint64(2), userStats[0].QueriedIngesters)
137+
}

integration/e2ecortex/client.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"go.opentelemetry.io/collector/pdata/pmetric"
3434
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
3535

36+
"github.com/cortexproject/cortex/pkg/ingester"
3637
"github.com/cortexproject/cortex/pkg/ruler"
3738
"github.com/cortexproject/cortex/pkg/util/backoff"
3839
)
@@ -115,6 +116,40 @@ func NewPromQueryClient(address string) (*Client, error) {
115116
return c, nil
116117
}
117118

119+
func (c *Client) AllUserStats() ([]ingester.UserIDStats, error) {
120+
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s/distributor/all_user_stats", c.distributorAddress), nil)
121+
if err != nil {
122+
return nil, err
123+
}
124+
req.Header.Set("Accept", "application/json")
125+
126+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
127+
defer cancel()
128+
129+
// Execute HTTP request
130+
res, err := c.httpClient.Do(req.WithContext(ctx))
131+
if err != nil {
132+
return nil, err
133+
}
134+
defer res.Body.Close()
135+
if res.StatusCode != http.StatusOK {
136+
return nil, err
137+
}
138+
139+
bodyBytes, err := io.ReadAll(res.Body)
140+
if err != nil {
141+
return nil, err
142+
}
143+
144+
userStats := make([]ingester.UserIDStats, 0)
145+
err = json.Unmarshal(bodyBytes, &userStats)
146+
if err != nil {
147+
return nil, err
148+
}
149+
150+
return userStats, nil
151+
}
152+
118153
// Push the input timeseries to the remote endpoint
119154
func (c *Client) Push(timeseries []prompb.TimeSeries, metadata ...prompb.MetricMetadata) (*http.Response, error) {
120155
// Create write request

pkg/distributor/distributor.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1603,26 +1603,31 @@ func (d *Distributor) UserStats(ctx context.Context) (*ingester.UserStats, error
16031603

16041604
// AllUserStats returns statistics about all users.
16051605
// Note it does not divide by the ReplicationFactor like UserStats()
1606-
func (d *Distributor) AllUserStats(ctx context.Context) ([]ingester.UserIDStats, error) {
1606+
func (d *Distributor) AllUserStats(ctx context.Context) ([]ingester.UserIDStats, int, error) {
16071607
// Add up by user, across all responses from ingesters
16081608
perUserTotals := make(map[string]ingester.UserStats)
1609+
queriedIngesterNum := 0
16091610

16101611
req := &ingester_client.UserStatsRequest{}
16111612
ctx = user.InjectOrgID(ctx, "1") // fake: ingester insists on having an org ID
1612-
// Not using d.ForReplicationSet(), so we can fail after first error.
16131613
replicationSet, err := d.ingestersRing.GetAllHealthy(ring.Read)
16141614
if err != nil {
1615-
return nil, err
1615+
return nil, 0, err
16161616
}
16171617
for _, ingester := range replicationSet.Instances {
16181618
client, err := d.ingesterPool.GetClientFor(ingester.Addr)
16191619
if err != nil {
1620-
return nil, err
1620+
return nil, 0, err
16211621
}
16221622
resp, err := client.(ingester_client.IngesterClient).AllUserStats(ctx, req)
16231623
if err != nil {
1624-
return nil, err
1624+
// During an ingester rolling update, an ingester might be temporarily
1625+
// in stopping or starting state. Therefore, returning an error would
1626+
// cause the API to fail during the update. This is an expected error in
1627+
// that scenario, we continue the loop to work API.
1628+
continue
16251629
}
1630+
queriedIngesterNum++
16261631
for _, u := range resp.Stats {
16271632
s := perUserTotals[u.UserId]
16281633
s.IngestionRate += u.Data.IngestionRate
@@ -1631,6 +1636,7 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]ingester.UserIDStats,
16311636
s.NumSeries += u.Data.NumSeries
16321637
s.ActiveSeries += u.Data.ActiveSeries
16331638
s.LoadedBlocks += u.Data.LoadedBlocks
1639+
s.QueriedIngesters += 1
16341640
perUserTotals[u.UserId] = s
16351641
}
16361642
}
@@ -1647,22 +1653,23 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]ingester.UserIDStats,
16471653
NumSeries: stats.NumSeries,
16481654
ActiveSeries: stats.ActiveSeries,
16491655
LoadedBlocks: stats.LoadedBlocks,
1656+
QueriedIngesters: stats.QueriedIngesters,
16501657
},
16511658
})
16521659
}
16531660

1654-
return response, nil
1661+
return response, queriedIngesterNum, nil
16551662
}
16561663

16571664
// AllUserStatsHandler shows stats for all users.
16581665
func (d *Distributor) AllUserStatsHandler(w http.ResponseWriter, r *http.Request) {
1659-
stats, err := d.AllUserStats(r.Context())
1666+
stats, queriedIngesterNum, err := d.AllUserStats(r.Context())
16601667
if err != nil {
16611668
http.Error(w, err.Error(), http.StatusInternalServerError)
16621669
return
16631670
}
16641671

1665-
ingester.AllUserStatsRender(w, r, stats, d.ingestersRing.ReplicationFactor())
1672+
ingester.AllUserStatsRender(w, r, stats, d.ingestersRing.ReplicationFactor(), queriedIngesterNum)
16661673
}
16671674

16681675
func (d *Distributor) ServeHTTP(w http.ResponseWriter, req *http.Request) {

pkg/ingester/http_admin.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ const tpl = `
2525
{{if (gt .ReplicationFactor 0)}}
2626
<p><b>NB stats do not account for replication factor, which is currently set to {{ .ReplicationFactor }}</b></p>
2727
{{end}}
28+
<p><b> These stats were aggregated from {{ .QueriedIngesterNum }} ingesters.</b></p>
29+
2830
<form action="" method="POST">
2931
<input type="hidden" name="csrf_token" value="$__CSRF_TOKEN_PLACEHOLDER__">
3032
<table border="1">
@@ -37,6 +39,7 @@ const tpl = `
3739
<th>Total Ingest Rate</th>
3840
<th>API Ingest Rate</th>
3941
<th>Rule Ingest Rate</th>
42+
<th># Queried Ingesters</th>
4043
</tr>
4144
</thead>
4245
<tbody>
@@ -49,6 +52,7 @@ const tpl = `
4952
<td align='right'>{{ printf "%.2f" .UserStats.IngestionRate }}</td>
5053
<td align='right'>{{ printf "%.2f" .UserStats.APIIngestionRate }}</td>
5154
<td align='right'>{{ printf "%.2f" .UserStats.RuleIngestionRate }}</td>
55+
<td align='right'>{{ .UserStats.QueriedIngesters }}</td>
5256
</tr>
5357
{{ end }}
5458
</tbody>
@@ -87,10 +91,11 @@ type UserStats struct {
8791
RuleIngestionRate float64 `json:"RuleIngestionRate"`
8892
ActiveSeries uint64 `json:"activeSeries"`
8993
LoadedBlocks uint64 `json:"loadedBlocks"`
94+
QueriedIngesters uint64 `json:"queriedIngesters"`
9095
}
9196

9297
// AllUserStatsRender render data for all users or return in json format.
93-
func AllUserStatsRender(w http.ResponseWriter, r *http.Request, stats []UserIDStats, rf int) {
98+
func AllUserStatsRender(w http.ResponseWriter, r *http.Request, stats []UserIDStats, rf, queriedIngesterNum int) {
9499
sort.Sort(UserStatsByTimeseries(stats))
95100

96101
if encodings, found := r.Header["Accept"]; found &&
@@ -102,12 +107,14 @@ func AllUserStatsRender(w http.ResponseWriter, r *http.Request, stats []UserIDSt
102107
}
103108

104109
util.RenderHTTPResponse(w, struct {
105-
Now time.Time `json:"now"`
106-
Stats []UserIDStats `json:"stats"`
107-
ReplicationFactor int `json:"replicationFactor"`
110+
Now time.Time `json:"now"`
111+
Stats []UserIDStats `json:"stats"`
112+
ReplicationFactor int `json:"replicationFactor"`
113+
QueriedIngesterNum int `json:"queriedIngesterNum"`
108114
}{
109-
Now: time.Now(),
110-
Stats: stats,
111-
ReplicationFactor: rf,
115+
Now: time.Now(),
116+
Stats: stats,
117+
ReplicationFactor: rf,
118+
QueriedIngesterNum: queriedIngesterNum,
112119
}, UserStatsTmpl, r)
113120
}

pkg/ingester/http_admin_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func TestUserStatsPageRendered(t *testing.T) {
2424
},
2525
},
2626
}
27-
AllUserStatsRender(res, req, userStats, 3)
27+
AllUserStatsRender(res, req, userStats, 3, 3)
2828
assert.Equal(t, http.StatusOK, res.Code)
2929
body := res.Body.String()
3030
assert.Regexp(t, "<td.+123.+/td>", body)

pkg/ingester/ingester.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2143,7 +2143,7 @@ func (i *Ingester) userStats() []UserIDStats {
21432143
func (i *Ingester) AllUserStatsHandler(w http.ResponseWriter, r *http.Request) {
21442144
stats := i.userStats()
21452145

2146-
AllUserStatsRender(w, r, stats, 0)
2146+
AllUserStatsRender(w, r, stats, 0, 0)
21472147
}
21482148

21492149
// AllUserStats returns ingestion statistics for all users known to this ingester.

0 commit comments

Comments
 (0)