Skip to content

Commit b19f8c5

Browse files
dehaansakalleep
andauthored
cherry picks for 1.10.2 (grafana#4218)
* Update the walqueue to get fix for shard management (grafana#4211) (cherry picked from commit 41b6ea3) * fix: gcplog push message to support both message_id and messageId (grafana#4110) * Support both variants for push messages from gcp (cherry picked from commit 853309c) * Update changelog * Fix changelog --------- Co-authored-by: Karl Persson <23356117+kalleep@users.noreply.github.com>
1 parent dfa43ee commit b19f8c5

File tree

6 files changed

+117
-22
lines changed

6 files changed

+117
-22
lines changed

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,15 @@ This document contains a historical list of changes between releases. Only
77
changes that impact end-user behavior are listed; changes to documentation or
88
internal API changes are not present.
99

10+
v1.10.2
11+
-----------------
12+
13+
### Bugfixes
14+
15+
- Fix issue in `prometheus.write.queue` causing inability to increase shard count if existing WAL data was present on start. (@kgeckhart)
16+
17+
- Fix issue with `loki.source.gcplog` when push messages sent by gcp pub/sub only includes `messageId`. (@kalleep)
18+
1019
v1.10.1
1120
-----------------
1221

go.mod

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ require (
7676
github.com/grafana/snowflake-prometheus-exporter v0.0.0-20250627131542-0c2feac3a700
7777
github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0
7878
github.com/grafana/vmware_exporter v0.0.5-beta.0.20250218170317-73398ba08329
79-
github.com/grafana/walqueue v0.0.0-20250806200024-8857bde78364 // alloy-1.10 branch to cherry pick fixes w/o prom3 update
79+
github.com/grafana/walqueue v0.0.0-20250818155145-61b631a3f799 // alloy-1.10 branch to cherry pick fixes w/o prom3 update
8080
github.com/hashicorp/consul/api v1.32.1
8181
github.com/hashicorp/go-discover v1.1.0
8282
github.com/hashicorp/go-multierror v1.1.1
@@ -528,7 +528,7 @@ require (
528528
github.com/cyphar/filepath-securejoin v0.4.1 // indirect
529529
github.com/danieljoos/wincred v1.2.2 // indirect
530530
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
531-
github.com/deneonet/benc v1.1.8 // indirect
531+
github.com/deneonet/benc v1.1.7 // indirect
532532
github.com/dennwc/btrfs v0.0.0-20230312211831-a1f570bd01a1 // indirect
533533
github.com/dennwc/ioctl v1.0.0 // indirect
534534
github.com/dennwc/varint v1.0.0 // indirect
@@ -1102,3 +1102,6 @@ replace github.com/prometheus/procfs => github.com/prometheus/procfs v0.12.0
11021102

11031103
// Use v0.62.0 of prometheus/common for all dependencies until mongodb_exporter is updated to support 0.63.0
11041104
replace github.com/prometheus/common => github.com/prometheus/common v0.62.0
1105+
1106+
// Do not remove this until the bug breaking backwards compatibility is resolved & included in walqueue: https://github.com/deneonet/benc/issues/13
1107+
replace github.com/deneonet/benc => github.com/deneonet/benc v1.1.7

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -872,8 +872,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
872872
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
873873
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
874874
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
875-
github.com/deneonet/benc v1.1.8 h1:Qk9diyH0UcnduvCrZ62mBrwUeSZzte4kQxMbclVdhW4=
876-
github.com/deneonet/benc v1.1.8/go.mod h1:UCfkM5Od0B2huwv/ZItvtUb7QnALFt9YXtX8NXX4Lts=
875+
github.com/deneonet/benc v1.1.7 h1:0XPxTTVJZq/ulxXvMn2Mzjx5XquekVky3wX6eTgA0vA=
876+
github.com/deneonet/benc v1.1.7/go.mod h1:UCfkM5Od0B2huwv/ZItvtUb7QnALFt9YXtX8NXX4Lts=
877877
github.com/denisenkom/go-mssqldb v0.0.0-20180620032804-94c9c97e8c9f/go.mod h1:xN/JuLBIz4bjkxNmByTiV1IbhfnYb6oo99phBn4Eqhc=
878878
github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4/go.mod h1:zAg7JM8CkOJ43xKXIj7eRO9kmWm/TW578qo+oDO6tuM=
879879
github.com/denisenkom/go-mssqldb v0.0.0-20191128021309-1d7a30a10f73/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
@@ -1449,8 +1449,8 @@ github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 h1:bjh0PVYSVVFxzINqPF
14491449
github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0/go.mod h1:7t5XR+2IA8P2qggOAHTj/GCZfoLBle3OvNSYh1VkRBU=
14501450
github.com/grafana/vmware_exporter v0.0.5-beta.0.20250218170317-73398ba08329 h1:Rs4H1yv2Abk3xE82qpyhMGGA8rswAOA0HQZde/BYkFo=
14511451
github.com/grafana/vmware_exporter v0.0.5-beta.0.20250218170317-73398ba08329/go.mod h1:Z28219aViNlsLlPvuCnlgHDagRdZBAZ7JOnQg1b3eWg=
1452-
github.com/grafana/walqueue v0.0.0-20250806200024-8857bde78364 h1:KNk1ugoycr2YBY+jISAUWSl8u7u0DBMFzBTBmLl3juI=
1453-
github.com/grafana/walqueue v0.0.0-20250806200024-8857bde78364/go.mod h1:r5esBN3LfoYHk5JKD+o4/IBwuftA1uCmaqT6C8av21c=
1452+
github.com/grafana/walqueue v0.0.0-20250818155145-61b631a3f799 h1:PtCAPUoGn0FLxBwAs7AsV9Lse9ayQSzYMDUxQp1yYOM=
1453+
github.com/grafana/walqueue v0.0.0-20250818155145-61b631a3f799/go.mod h1:VBs5QI+CljQRsh+x7K8QamyDQ/nTOPqCRdk8h31Kb7Y=
14541454
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
14551455
github.com/grobie/gomemcache v0.0.0-20230213081705-239240bbc445 h1:FlKQKUYPZ5yDCN248M3R7x8yu2E3yEZ0H7aLomE4EoE=
14561456
github.com/grobie/gomemcache v0.0.0-20230213081705-239240bbc445/go.mod h1:L69/dBlPQlWkcnU76WgcppK5e4rrxzQdi6LhLnK/ytA=

internal/component/loki/source/gcplog/internal/gcplogtarget/push_target.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (p *PushTarget) push(w http.ResponseWriter, r *http.Request) {
7676
}
7777
defer cancel()
7878

79-
pushMessage := PushMessage{}
79+
pushMessage := PushMessageBody{}
8080
bs, err := io.ReadAll(r.Body)
8181
if err != nil {
8282
p.metrics.gcpPushErrors.WithLabelValues("read_error").Inc()

internal/component/loki/source/gcplog/internal/gcplogtarget/push_translation.go

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,24 +22,26 @@ import (
2222
// pipeline stages
2323
const ReservedLabelTenantID = "__tenant_id__"
2424

25-
// PushMessage is the POST body format sent by GCP PubSub push subscriptions.
25+
// PushMessageBody is the POST body format sent by GCP PubSub push subscriptions.
2626
// See https://cloud.google.com/pubsub/docs/push for details.
27+
type PushMessageBody struct {
28+
Message PushMessage `json:"message"`
29+
Subscription string `json:"subscription"`
30+
}
31+
2732
type PushMessage struct {
28-
Message struct {
29-
Attributes map[string]string `json:"attributes"`
30-
Data string `json:"data"`
31-
ID string `json:"message_id"`
32-
PublishTimestamp string `json:"publish_time"`
33-
} `json:"message"`
34-
Subscription string `json:"subscription"`
33+
Attributes map[string]string `json:"attributes"`
34+
Data string `json:"data"`
35+
MessageID string `json:"messageId"`
36+
DeprecatedMessageID string `json:"message_id"`
3537
}
3638

3739
// Validate checks that the required fields of a PushMessage are set.
38-
func (pm PushMessage) Validate() error {
40+
func (pm PushMessageBody) Validate() error {
3941
if pm.Message.Data == "" {
4042
return fmt.Errorf("push message has no data")
4143
}
42-
if pm.Message.ID == "" {
44+
if pm.ID() == "" {
4345
return fmt.Errorf("push message has no ID")
4446
}
4547
if pm.Subscription == "" {
@@ -48,14 +50,21 @@ func (pm PushMessage) Validate() error {
4850
return nil
4951
}
5052

53+
func (pm PushMessageBody) ID() string {
54+
if pm.Message.MessageID != "" {
55+
return pm.Message.MessageID
56+
}
57+
return pm.Message.DeprecatedMessageID
58+
}
59+
5160
// translate converts a GCP PushMessage into a loki.Entry. It parses the
5261
// push-specific labels and delegates the rest to parseGCPLogsEntry.
53-
func translate(m PushMessage, other model.LabelSet, useIncomingTimestamp bool, useFullLine bool, relabelConfigs []*relabel.Config, xScopeOrgID string) (loki.Entry, error) {
62+
func translate(m PushMessageBody, other model.LabelSet, useIncomingTimestamp bool, useFullLine bool, relabelConfigs []*relabel.Config, xScopeOrgID string) (loki.Entry, error) {
5463
// Collect all push-specific labels. Every one of them is first configured
5564
// as optional, and the user can relabel it if needed. The relabeling and
5665
// internal drop is handled in parseGCPLogsEntry.
5766
lbs := labels.NewBuilder(nil)
58-
lbs.Set("__gcp_message_id", m.Message.ID)
67+
lbs.Set("__gcp_message_id", m.ID())
5968
lbs.Set("__gcp_subscription_name", m.Subscription)
6069
for k, v := range m.Message.Attributes {
6170
lbs.Set(fmt.Sprintf("__gcp_attributes_%s", convertToLokiCompatibleLabel(k)), v)

internal/component/loki/source/gcplog/internal/gcplogtarget/push_translation_test.go

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@ package gcplogtarget
22

33
import (
44
"testing"
5+
6+
"github.com/grafana/regexp"
7+
"github.com/prometheus/common/model"
8+
"github.com/prometheus/prometheus/model/relabel"
9+
"github.com/stretchr/testify/require"
10+
11+
"github.com/grafana/alloy/internal/component/common/loki"
512
)
613

714
func TestConvertToLokiCompatibleLabel(t *testing.T) {
@@ -37,9 +44,76 @@ func TestConvertToLokiCompatibleLabel(t *testing.T) {
3744
}
3845
for _, tt := range tests {
3946
t.Run(tt.name, func(t *testing.T) {
40-
if got := convertToLokiCompatibleLabel(tt.args.label); got != tt.want {
41-
t.Errorf("convertToLokiCompatibleLabel() = %v, want %v", got, tt.want)
42-
}
47+
require.Equal(t, tt.want, convertToLokiCompatibleLabel(tt.args.label))
48+
})
49+
}
50+
}
51+
52+
const pushData = "eyJpbnNlcnRJZCI6IjRhZmZhODU4LWU1ZjItNDdmNy05MjU0LWU2MDliNWMwMTRkMCIsImxhYmVscyI6e30sImxvZ05hbWUiOiJwcm9qZWN0cy90ZXN0LXByb2plY3QvbG9ncy9jbG91ZGF1ZGl0Lmdvb2dsZWFwaXMuY29tJTJGZGF0YV9hY2Nlc3MiLCJyZWNlaXZlVGltZXN0YW1wIjoiMjAyMi0wOS0wNlQxODowNzo0My40MTc3MTQwNDZaIiwicmVzb3VyY2UiOnsibGFiZWxzIjp7ImNsdXN0ZXJfbmFtZSI6ImRldi11cy1jZW50cmFsLTQyIiwibG9jYXRpb24iOiJ1cy1jZW50cmFsMSIsInByb2plY3RfaWQiOiJ0ZXN0LXByb2plY3QifSwidHlwZSI6Ims4c19jbHVzdGVyIn0sInRpbWVzdGFtcCI6IjIwMjItMDktMDZUMTg6MDc6NDIuMzYzMTEzWiJ9Cg=="
53+
54+
func TestTranslate(t *testing.T) {
55+
type testCase struct {
56+
name string
57+
pm PushMessageBody
58+
expected loki.Entry
59+
}
60+
61+
tests := []testCase{
62+
{
63+
name: "deprecated message id",
64+
pm: PushMessageBody{
65+
Message: PushMessage{
66+
DeprecatedMessageID: "1",
67+
Data: pushData,
68+
},
69+
Subscription: "test",
70+
},
71+
expected: loki.Entry{
72+
Labels: model.LabelSet{
73+
"message_id": "1",
74+
},
75+
},
76+
},
77+
{
78+
name: "standard message id",
79+
pm: PushMessageBody{
80+
Message: PushMessage{
81+
MessageID: "1",
82+
Data: pushData,
83+
},
84+
Subscription: "test",
85+
},
86+
expected: loki.Entry{
87+
Labels: model.LabelSet{
88+
"message_id": "1",
89+
},
90+
},
91+
},
92+
}
93+
94+
rc := []*relabel.Config{
95+
{
96+
SourceLabels: model.LabelNames{"__gcp_message_id"},
97+
Regex: mustNewRegexp("(.*)"),
98+
Action: relabel.Replace,
99+
Replacement: "$1",
100+
TargetLabel: "message_id",
101+
},
102+
}
103+
104+
for _, tt := range tests {
105+
t.Run(tt.name, func(t *testing.T) {
106+
entry, err := translate(tt.pm, model.LabelSet{}, false, false, rc, "")
107+
require.NoError(t, err)
108+
require.EqualValues(t, tt.expected.Labels, entry.Labels)
43109
})
44110
}
45111
}
112+
113+
func mustNewRegexp(s string) relabel.Regexp {
114+
re, err := regexp.Compile("^(?:" + s + ")$")
115+
if err != nil {
116+
panic(err)
117+
}
118+
return relabel.Regexp{Regexp: re}
119+
}

0 commit comments

Comments
 (0)