Skip to content

Commit 9ec199b

Browse files
authored
chore: remove loki imports in loki.source.api (#4939)
* vendor loghttp functions we need to parse loki http requests
1 parent c564d09 commit 9ec199b

File tree

5 files changed

+291
-22
lines changed

5 files changed

+291
-22
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package loghttp
2+
3+
// KEEP IN SYNC WITH:
4+
// https://github.com/grafana/loki/blob/main/pkg/loghttp/labels.go
5+
// Local modifications should be minimized.
6+
7+
import (
8+
"sort"
9+
"strconv"
10+
"strings"
11+
12+
"github.com/grafana/jsonparser"
13+
)
14+
15+
// LabelSet is a key/value pair mapping of labels
16+
type LabelSet map[string]string
17+
18+
func (l *LabelSet) UnmarshalJSON(data []byte) error {
19+
if *l == nil {
20+
*l = make(LabelSet)
21+
}
22+
return jsonparser.ObjectEach(data, func(key, val []byte, _ jsonparser.ValueType, _ int) error {
23+
v, err := jsonparser.ParseString(val)
24+
if err != nil {
25+
return err
26+
}
27+
k, err := jsonparser.ParseString(key)
28+
if err != nil {
29+
return err
30+
}
31+
(*l)[k] = v
32+
return nil
33+
})
34+
}
35+
36+
// String implements the Stringer interface. It returns a formatted/sorted set of label key/value pairs.
37+
func (l LabelSet) String() string {
38+
var b strings.Builder
39+
40+
keys := make([]string, 0, len(l))
41+
for k := range l {
42+
keys = append(keys, k)
43+
}
44+
sort.Strings(keys)
45+
46+
b.WriteByte('{')
47+
for i, k := range keys {
48+
if i > 0 {
49+
b.WriteByte(',')
50+
b.WriteByte(' ')
51+
}
52+
b.WriteString(k)
53+
b.WriteByte('=')
54+
b.WriteString(strconv.Quote(l[k]))
55+
}
56+
b.WriteByte('}')
57+
return b.String()
58+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package loghttp
2+
3+
// KEEP IN SYNC WITH:
4+
// https://github.com/grafana/loki/blob/main/pkg/loghttp/query.go
5+
// Local modifications should be minimized.
6+
7+
import (
8+
"time"
9+
10+
"github.com/grafana/jsonparser"
11+
"github.com/grafana/loki/pkg/push"
12+
)
13+
14+
// PushRequest models a log stream push but is unmarshalled to proto push format.
15+
type PushRequest struct {
16+
Streams []Stream `json:"streams"`
17+
}
18+
19+
// Stream helps with unmarshalling of each log stream for push request.
20+
type Stream push.Stream
21+
22+
func (s *Stream) UnmarshalJSON(data []byte) error {
23+
err := jsonparser.ObjectEach(data, func(key, val []byte, ty jsonparser.ValueType, _ int) error {
24+
switch string(key) {
25+
case "stream":
26+
var labels LabelSet
27+
if err := labels.UnmarshalJSON(val); err != nil {
28+
return err
29+
}
30+
s.Labels = labels.String()
31+
case "values":
32+
if ty == jsonparser.Null {
33+
return nil
34+
}
35+
entries, err := unmarshalHTTPToLogProtoEntries(val)
36+
if err != nil {
37+
return err
38+
}
39+
s.Entries = entries
40+
}
41+
return nil
42+
})
43+
return err
44+
}
45+
46+
func unmarshalHTTPToLogProtoEntries(data []byte) ([]push.Entry, error) {
47+
var (
48+
entries []push.Entry
49+
parseError error
50+
)
51+
if _, err := jsonparser.ArrayEach(data, func(value []byte, ty jsonparser.ValueType, _ int, err error) {
52+
if err != nil || parseError != nil {
53+
return
54+
}
55+
if ty == jsonparser.Null {
56+
return
57+
}
58+
e, err := unmarshalHTTPToLogProtoEntry(value)
59+
if err != nil {
60+
parseError = err
61+
return
62+
}
63+
entries = append(entries, e)
64+
}); err != nil {
65+
parseError = err
66+
}
67+
68+
if parseError != nil {
69+
return nil, parseError
70+
}
71+
72+
return entries, nil
73+
}
74+
75+
func unmarshalHTTPToLogProtoEntry(data []byte) (push.Entry, error) {
76+
var (
77+
i int
78+
parseError error
79+
e push.Entry
80+
)
81+
_, err := jsonparser.ArrayEach(data, func(value []byte, t jsonparser.ValueType, _ int, _ error) {
82+
// assert that both items in array are of type string
83+
if (i == 0 || i == 1) && t != jsonparser.String {
84+
parseError = jsonparser.MalformedStringError
85+
return
86+
} else if i == 2 && t != jsonparser.Object {
87+
parseError = jsonparser.MalformedObjectError
88+
return
89+
}
90+
switch i {
91+
case 0: // timestamp
92+
ts, err := jsonparser.ParseInt(value)
93+
if err != nil {
94+
parseError = err
95+
return
96+
}
97+
e.Timestamp = time.Unix(0, ts)
98+
case 1: // value
99+
v, err := jsonparser.ParseString(value)
100+
if err != nil {
101+
parseError = err
102+
return
103+
}
104+
e.Line = v
105+
case 2: // structuredMetadata
106+
var structuredMetadata []push.LabelAdapter
107+
err := jsonparser.ObjectEach(value, func(key, val []byte, dataType jsonparser.ValueType, _ int) error {
108+
if dataType != jsonparser.String {
109+
return jsonparser.MalformedStringError
110+
}
111+
// Parse the string to properly handle escaped characters like newlines
112+
parsedVal, err := jsonparser.ParseString(val)
113+
if err != nil {
114+
return err
115+
}
116+
structuredMetadata = append(structuredMetadata, push.LabelAdapter{
117+
Name: string(key),
118+
Value: parsedVal,
119+
})
120+
return nil
121+
})
122+
if err != nil {
123+
parseError = err
124+
return
125+
}
126+
e.StructuredMetadata = structuredMetadata
127+
}
128+
i++
129+
})
130+
if parseError != nil {
131+
return e, parseError
132+
}
133+
return e, err
134+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package loghttp
2+
3+
import (
4+
"compress/flate"
5+
"compress/gzip"
6+
"fmt"
7+
"io"
8+
"mime"
9+
"net/http"
10+
11+
"github.com/grafana/loki/pkg/push"
12+
13+
"github.com/grafana/alloy/internal/loki/util"
14+
)
15+
16+
var (
17+
contentType = http.CanonicalHeaderKey("Content-Type")
18+
contentEnc = http.CanonicalHeaderKey("Content-Encoding")
19+
)
20+
21+
const applicationJSON = "application/json"
22+
23+
// ParsePushRequest returns push.PushRequest from http.Request body, deserialized according to specified content type.
24+
func ParsePushRequest(r *http.Request, maxRecvMsgSize int) (*push.PushRequest, error) {
25+
// Body
26+
var body io.Reader
27+
contentEncoding := r.Header.Get(contentEnc)
28+
switch contentEncoding {
29+
case "", "snappy":
30+
body = r.Body
31+
case "gzip":
32+
gzipReader, err := gzip.NewReader(r.Body)
33+
if err != nil {
34+
return nil, err
35+
}
36+
defer gzipReader.Close()
37+
body = gzipReader
38+
case "deflate":
39+
flateReader := flate.NewReader(r.Body)
40+
defer flateReader.Close()
41+
body = flateReader
42+
default:
43+
return nil, fmt.Errorf("Content-Encoding %q not supported", contentEncoding)
44+
}
45+
46+
contentType := r.Header.Get(contentType)
47+
contentType, _ /* params */, err := mime.ParseMediaType(contentType)
48+
if err != nil {
49+
return nil, err
50+
}
51+
52+
var req push.PushRequest
53+
switch contentType {
54+
case applicationJSON:
55+
if err = decodePushRequest(body, &req); err != nil {
56+
return nil, err
57+
}
58+
default:
59+
// When no content-type header is set or when it is set to
60+
// `application/x-protobuf`: expect snappy compression.
61+
if err := util.ParseProtoReader(r.Context(), body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy); err != nil {
62+
return nil, err
63+
}
64+
}
65+
66+
return &req, nil
67+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package loghttp
2+
3+
// KEEP IN SYNC WITH:
4+
// https://github.com/grafana/loki/blob/main/pkg/util/unmarshal/unmarshal.go
5+
// Local modifications should be minimized.
6+
7+
import (
8+
"io"
9+
"unsafe"
10+
11+
"github.com/grafana/loki/pkg/push"
12+
jsoniter "github.com/json-iterator/go"
13+
)
14+
15+
// decodePushRequest directly decodes json to a push.PushRequest
16+
func decodePushRequest(b io.Reader, r *push.PushRequest) error {
17+
var request PushRequest
18+
19+
if err := jsoniter.NewDecoder(b).Decode(&request); err != nil {
20+
return err
21+
}
22+
23+
*r = push.PushRequest{
24+
Streams: *(*[]push.Stream)(unsafe.Pointer(&request.Streams)), //#nosec G103 -- Just preventing an allocation, safe, there's no chance of an incorrect type cast here. -- nosemgrep: use-of-unsafe-block
25+
}
26+
27+
return nil
28+
}

internal/component/loki/source/api/internal/lokipush/push_api_server.go

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,6 @@ import (
1313
"github.com/grafana/dskit/tenant"
1414
"github.com/grafana/dskit/user"
1515
lokipush "github.com/grafana/loki/pkg/push"
16-
"github.com/grafana/loki/v3/pkg/loghttp/push"
17-
"github.com/grafana/loki/v3/pkg/util/constants"
18-
util_log "github.com/grafana/loki/v3/pkg/util/log"
1916
"github.com/prometheus/client_golang/prometheus"
2017
"github.com/prometheus/common/model"
2118
"github.com/prometheus/prometheus/model/labels"
@@ -26,6 +23,7 @@ import (
2623
"github.com/grafana/alloy/internal/component/common/loki/client"
2724
fnet "github.com/grafana/alloy/internal/component/common/net"
2825
frelabel "github.com/grafana/alloy/internal/component/common/relabel"
26+
loghttp2 "github.com/grafana/alloy/internal/component/loki/source/api/internal/loghttp"
2927
"github.com/grafana/alloy/internal/runtime/logging/level"
3028
)
3129

@@ -189,24 +187,8 @@ func (s *PushAPIServer) getRelabelRules() []*relabel.Config {
189187
return newRules
190188
}
191189

192-
// NOTE: This code is copied from Promtail (https://github.com/grafana/loki/commit/47e2c5884f443667e64764f3fc3948f8f11abbb8) with changes kept to the minimum.
193-
// Only the HTTP handler functions are copied to allow for Alloy-specific server configuration and lifecycle management.
194190
func (s *PushAPIServer) handleLoki(w http.ResponseWriter, r *http.Request) {
195-
logger := util_log.WithContext(r.Context(), util_log.Logger)
196-
tenantID, _ := tenant.TenantID(r.Context())
197-
req, _, err := push.ParseRequest(
198-
logger,
199-
tenantID,
200-
int(s.maxSendMessageSize),
201-
r,
202-
push.EmptyLimits{},
203-
nil,
204-
push.ParseLokiRequest,
205-
nil, // usage tracker
206-
nil,
207-
"",
208-
constants.Loki,
209-
)
191+
req, err := loghttp2.ParsePushRequest(r, int(s.maxSendMessageSize))
210192
if err != nil {
211193
level.Warn(s.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
212194
http.Error(w, err.Error(), http.StatusBadRequest)
@@ -221,6 +203,8 @@ func (s *PushAPIServer) handleLoki(w http.ResponseWriter, r *http.Request) {
221203
var (
222204
entries []loki.Entry
223205
lastErr error
206+
207+
tenantID, _ = tenant.TenantID(r.Context())
224208
)
225209
for _, stream := range req.Streams {
226210
ls, err := promql_parser.ParseMetric(stream.Labels)
@@ -299,8 +283,6 @@ func (s *PushAPIServer) handleLoki(w http.ResponseWriter, r *http.Request) {
299283
w.WriteHeader(http.StatusNoContent)
300284
}
301285

302-
// NOTE: This code is copied from Promtail (https://github.com/grafana/loki/commit/47e2c5884f443667e64764f3fc3948f8f11abbb8) with changes kept to the minimum.
303-
// Only the HTTP handler functions are copied to allow for Alloy-specific server configuration and lifecycle management.
304286
func (s *PushAPIServer) handlePlaintext(w http.ResponseWriter, r *http.Request) {
305287
defer r.Body.Close()
306288
body := bufio.NewReader(r.Body)

0 commit comments

Comments
 (0)