Skip to content

Commit 79cabc7

Browse files
committed
add Prometheus query client, move Loki client to framework, expose base obs URLs
1 parent b34ccb3 commit 79cabc7

File tree

5 files changed

+1029
-0
lines changed

5 files changed

+1029
-0
lines changed

framework/loki/loki.go

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
package loki
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"os"
8+
"time"
9+
10+
"github.com/go-resty/resty/v2"
11+
12+
"github.com/smartcontractkit/chainlink-testing-framework/framework"
13+
)
14+
15+
// APIError is a custom error type for handling non-200 responses from the Loki API
16+
type APIError struct {
17+
StatusCode int
18+
Message string
19+
}
20+
21+
// Implement the `Error` interface for APIError
22+
func (e *APIError) Error() string {
23+
return fmt.Sprintf("Loki API error: %s (status code: %d)", e.Message, e.StatusCode)
24+
}
25+
26+
// BasicAuth holds the authentication details for Loki
27+
type BasicAuth struct {
28+
Login string
29+
Password string
30+
}
31+
32+
// Response represents the structure of the response from Loki
33+
type Response struct {
34+
Data struct {
35+
Result []struct {
36+
Stream map[string]string `json:"stream"`
37+
Values [][]interface{} `json:"values"`
38+
} `json:"result"`
39+
} `json:"data"`
40+
}
41+
42+
// LogEntry represents a single log entry with a timestamp and raw log message
43+
type LogEntry struct {
44+
Timestamp string
45+
Log string
46+
}
47+
48+
// Client represents a client to interact with Loki for querying logs
49+
type Client struct {
50+
BaseURL string
51+
TenantID string
52+
BasicAuth BasicAuth
53+
QueryParams QueryParams
54+
RestyClient *resty.Client
55+
}
56+
57+
// QueryParams holds the parameters required for querying Loki
58+
type QueryParams struct {
59+
Query string
60+
StartTime time.Time
61+
EndTime time.Time
62+
Limit int
63+
}
64+
65+
// NewQueryClient creates a new Loki client with the given parameters, initializes a logger, and configures Resty with debug mode
66+
func NewQueryClient(baseURL, tenantID string, auth BasicAuth, queryParams QueryParams) *Client {
67+
framework.L.Info().
68+
Str("BaseURL", baseURL).
69+
Str("TenantID", tenantID).
70+
Msg("Initializing Loki Client")
71+
72+
// Set debug mode for Resty if RESTY_DEBUG is enabled
73+
isDebug := os.Getenv("RESTY_DEBUG") == "true"
74+
75+
restyClient := resty.New().
76+
SetDebug(isDebug)
77+
78+
return &Client{
79+
BaseURL: baseURL,
80+
TenantID: tenantID,
81+
BasicAuth: auth,
82+
QueryParams: queryParams,
83+
RestyClient: restyClient,
84+
}
85+
}
86+
87+
// QueryRange queries Loki logs based on the query parameters and returns the raw log entries
88+
func (lc *Client) QueryRange(ctx context.Context) ([]LogEntry, error) {
89+
// Log request details
90+
framework.L.Info().
91+
Str("Query", lc.QueryParams.Query).
92+
Str("StartTime", lc.QueryParams.StartTime.Format(time.RFC3339Nano)).
93+
Str("EndTime", lc.QueryParams.EndTime.Format(time.RFC3339Nano)).
94+
Int("Limit", lc.QueryParams.Limit).
95+
Msg("Making request to Loki API")
96+
97+
// Start tracking request duration
98+
start := time.Now()
99+
100+
// Build query parameters
101+
params := map[string]string{
102+
"query": lc.QueryParams.Query,
103+
"start": lc.QueryParams.StartTime.Format(time.RFC3339Nano),
104+
"end": lc.QueryParams.EndTime.Format(time.RFC3339Nano),
105+
"limit": fmt.Sprintf("%d", lc.QueryParams.Limit),
106+
}
107+
108+
// Send request using the pre-configured Resty client
109+
resp, err := lc.RestyClient.R().
110+
SetContext(ctx).
111+
SetHeader("X-Scope-OrgID", lc.TenantID).
112+
SetBasicAuth(lc.BasicAuth.Login, lc.BasicAuth.Password).
113+
SetQueryParams(params).
114+
Get(lc.BaseURL + "/loki/api/v1/query_range")
115+
116+
// Track request duration
117+
duration := time.Since(start)
118+
119+
if err != nil {
120+
framework.L.Error().Err(err).Dur("duration", duration).Msg("Error querying Loki")
121+
return nil, err
122+
}
123+
124+
// Log non-200 responses
125+
if resp.StatusCode() != 200 {
126+
bodySnippet := string(resp.Body())
127+
if len(bodySnippet) > 200 {
128+
bodySnippet = bodySnippet[:200] + "..."
129+
}
130+
framework.L.Error().
131+
Int("StatusCode", resp.StatusCode()).
132+
Dur("duration", duration).
133+
Str("ResponseBody", bodySnippet).
134+
Msg("Loki API returned non-200 status")
135+
return nil, &APIError{
136+
StatusCode: resp.StatusCode(),
137+
Message: "unexpected status code from Loki API",
138+
}
139+
}
140+
141+
// Log successful response
142+
framework.L.Info().
143+
Int("StatusCode", resp.StatusCode()).
144+
Dur("duration", duration).
145+
Msg("Successfully queried Loki API")
146+
147+
// Parse the response into the Response struct
148+
var lokiResp Response
149+
if err := json.Unmarshal(resp.Body(), &lokiResp); err != nil {
150+
framework.L.Error().Err(err).Msg("Error decoding response from Loki")
151+
return nil, err
152+
}
153+
154+
// Extract log entries from the response
155+
logEntries := lc.extractRawLogEntries(lokiResp)
156+
157+
// Log the number of entries retrieved
158+
framework.L.Info().Int("LogEntries", len(logEntries)).Msg("Successfully retrieved logs from Loki")
159+
160+
return logEntries, nil
161+
}
162+
163+
// extractRawLogEntries processes the Response and returns raw log entries
164+
func (lc *Client) extractRawLogEntries(lokiResp Response) []LogEntry {
165+
var logEntries []LogEntry
166+
167+
for _, result := range lokiResp.Data.Result {
168+
for _, entry := range result.Values {
169+
if len(entry) != 2 {
170+
framework.L.Error().Interface("Log entry", entry).Msgf("Error parsing log entry. Expected 2 elements, got %d", len(entry))
171+
continue
172+
}
173+
var timestamp string
174+
if entry[0] == nil {
175+
framework.L.Error().Msg("Error parsing timestamp. Entry at index 0, that should be a timestamp, is nil")
176+
continue
177+
}
178+
if timestampString, ok := entry[0].(string); ok {
179+
timestamp = timestampString
180+
} else if timestampInt, ok := entry[0].(int); ok {
181+
timestamp = fmt.Sprintf("%d", timestampInt)
182+
} else if timestampFloat, ok := entry[0].(float64); ok {
183+
timestamp = fmt.Sprintf("%f", timestampFloat)
184+
} else {
185+
framework.L.Error().Msgf("Error parsing timestamp. Expected string, int, or float64, got %T", entry[0])
186+
continue
187+
}
188+
logLine := entry[1].(string)
189+
logEntries = append(logEntries, LogEntry{
190+
Timestamp: timestamp,
191+
Log: logLine,
192+
})
193+
}
194+
}
195+
196+
return logEntries
197+
}

framework/loki/loki_test.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package loki
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net/http"
7+
"net/http/httptest"
8+
"os"
9+
"testing"
10+
"time"
11+
12+
"github.com/stretchr/testify/assert"
13+
)
14+
15+
// TestLokiClient_QueryLogs tests the Client's ability to query Loki logs
16+
func TestLokiClient_SuccessfulQuery(t *testing.T) {
17+
// Create a mock Loki server using httptest
18+
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
19+
assert.Equal(t, "/loki/api/v1/query_range", r.URL.Path)
20+
w.WriteHeader(http.StatusOK)
21+
_, err := w.Write([]byte(`{
22+
"data": {
23+
"result": [
24+
{
25+
"stream": {"namespace": "test"},
26+
"values": [
27+
["1234567890", "Log message 1"],
28+
["1234567891", "Log message 2"]
29+
]
30+
}
31+
]
32+
}
33+
}`))
34+
assert.NoError(t, err)
35+
}))
36+
defer mockServer.Close()
37+
38+
// Create a BasicAuth object for testing
39+
auth := BasicAuth{
40+
Login: "test-login",
41+
Password: "test-password",
42+
}
43+
44+
// Set the query parameters
45+
queryParams := QueryParams{
46+
Query: `{namespace="test"}`,
47+
StartTime: time.Now().Add(-1 * time.Hour),
48+
EndTime: time.Now(),
49+
Limit: 100,
50+
}
51+
52+
// Create the Loki client with the mock server URL
53+
lokiClient := NewQueryClient(mockServer.URL, "test-tenant", auth, queryParams)
54+
55+
// Query logs
56+
logEntries, err := lokiClient.QueryRange(context.Background())
57+
assert.NoError(t, err)
58+
assert.Len(t, logEntries, 2)
59+
60+
// Verify the content of the log entries
61+
assert.Equal(t, "1234567890", logEntries[0].Timestamp)
62+
assert.Equal(t, "Log message 1", logEntries[0].Log)
63+
assert.Equal(t, "1234567891", logEntries[1].Timestamp)
64+
assert.Equal(t, "Log message 2", logEntries[1].Log)
65+
}
66+
67+
func TestLokiClient_AuthenticationFailure(t *testing.T) {
68+
// Create a mock Loki server
69+
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
70+
assert.Equal(t, "/loki/api/v1/query_range", r.URL.Path)
71+
w.WriteHeader(http.StatusUnauthorized) // Simulate authentication failure
72+
}))
73+
defer mockServer.Close()
74+
75+
// Create a Loki client with incorrect credentials
76+
auth := BasicAuth{
77+
Login: "wrong-login",
78+
Password: "wrong-password",
79+
}
80+
queryParams := QueryParams{
81+
Query: `{namespace="test"}`,
82+
StartTime: time.Now().Add(-1 * time.Hour),
83+
EndTime: time.Now(),
84+
Limit: 100,
85+
}
86+
lokiClient := NewQueryClient(mockServer.URL, "test-tenant", auth, queryParams)
87+
88+
// Query logs and expect an error
89+
logEntries, err := lokiClient.QueryRange(context.Background())
90+
assert.Nil(t, logEntries)
91+
assert.Error(t, err)
92+
var lokiErr *APIError
93+
if errors.As(err, &lokiErr) {
94+
assert.Equal(t, http.StatusUnauthorized, lokiErr.StatusCode)
95+
} else {
96+
t.Fatalf("Expected APIError, got %v", err)
97+
}
98+
}
99+
100+
func TestLokiClient_InternalServerError(t *testing.T) {
101+
// Create a mock Loki server
102+
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
103+
assert.Equal(t, "/loki/api/v1/query_range", r.URL.Path)
104+
w.WriteHeader(http.StatusInternalServerError) // Simulate server error
105+
_, err := w.Write([]byte(`{"message": "internal server error"}`)) // Error message in the response body
106+
assert.NoError(t, err)
107+
}))
108+
defer mockServer.Close()
109+
110+
// Create a Loki client
111+
auth := BasicAuth{
112+
Login: "test-login",
113+
Password: "test-password",
114+
}
115+
queryParams := QueryParams{
116+
Query: `{namespace="test"}`,
117+
StartTime: time.Now().Add(-1 * time.Hour),
118+
EndTime: time.Now(),
119+
Limit: 100,
120+
}
121+
lokiClient := NewQueryClient(mockServer.URL, "test-tenant", auth, queryParams)
122+
123+
// Query logs and expect an error
124+
logEntries, err := lokiClient.QueryRange(context.Background())
125+
assert.Nil(t, logEntries)
126+
assert.Error(t, err)
127+
var lokiErr *APIError
128+
if errors.As(err, &lokiErr) {
129+
assert.Equal(t, http.StatusInternalServerError, lokiErr.StatusCode)
130+
} else {
131+
t.Fatalf("Expected APIError, got %v", err)
132+
}
133+
}
134+
135+
func TestLokiClient_DebugMode(t *testing.T) {
136+
// Set the RESTY_DEBUG environment variable
137+
os.Setenv("RESTY_DEBUG", "true")
138+
defer os.Unsetenv("RESTY_DEBUG") // Clean up after the test
139+
140+
// Create a mock Loki server
141+
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
142+
assert.Equal(t, "/loki/api/v1/query_range", r.URL.Path)
143+
w.WriteHeader(http.StatusOK)
144+
_, err := w.Write([]byte(`{
145+
"data": {
146+
"result": [
147+
{
148+
"stream": {"namespace": "test"},
149+
"values": [
150+
["1234567890", "Log message 1"],
151+
["1234567891", "Log message 2"]
152+
]
153+
}
154+
]
155+
}
156+
}`))
157+
assert.NoError(t, err)
158+
}))
159+
defer mockServer.Close()
160+
161+
// Create a Loki client
162+
auth := BasicAuth{
163+
Login: "test-login",
164+
Password: "test-password",
165+
}
166+
queryParams := QueryParams{
167+
Query: `{namespace="test"}`,
168+
StartTime: time.Now().Add(-1 * time.Hour),
169+
EndTime: time.Now(),
170+
Limit: 100,
171+
}
172+
lokiClient := NewQueryClient(mockServer.URL, "test-tenant", auth, queryParams)
173+
174+
// Query logs
175+
logEntries, err := lokiClient.QueryRange(context.Background())
176+
assert.NoError(t, err)
177+
assert.Len(t, logEntries, 2)
178+
179+
// Check if debug mode was enabled
180+
assert.True(t, lokiClient.RestyClient.Debug)
181+
}

framework/observability.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ import (
1313
var EmbeddedObservabilityFiles embed.FS
1414

1515
const (
16+
LocalGrafanaBaseURL = "http://localhost:3000"
17+
LocalLokiBaseURL = "http://localhost:3030"
18+
LocalPrometheusBaseURL = "http://localhost:9099"
1619
LocalCLNodeErrorsURL = "http://localhost:3000/d/a7de535b-3e0f-4066-bed7-d505b6ec9ef1/cl-node-errors?orgId=1&refresh=5s"
1720
LocalWorkflowEngineURL = "http://localhost:3000/d/ce589a98-b4be-4f80-bed1-bc62f3e4414a/workflow-engine?orgId=1&refresh=5s&from=now-15m&to=now"
1821
LocalLogsURL = "http://localhost:3000/explore?panes=%7B%22qZw%22:%7B%22datasource%22:%22P8E80F9AEF21F6940%22,%22queries%22:%5B%7B%22refId%22:%22A%22,%22expr%22:%22%7Bjob%3D%5C%22ctf%5C%22%7D%22,%22queryType%22:%22range%22,%22datasource%22:%7B%22type%22:%22loki%22,%22uid%22:%22P8E80F9AEF21F6940%22%7D,%22editorMode%22:%22code%22%7D%5D,%22range%22:%7B%22from%22:%22now-15m%22,%22to%22:%22now%22%7D%7D%7D&schemaVersion=1&orgId=1"

0 commit comments

Comments
 (0)