Skip to content

Commit 85cda69

Browse files
authored
Merge pull request #143 from codex-team/master
Update prod
2 parents 7a8795d + 23800ad commit 85cda69

File tree

4 files changed

+292
-0
lines changed

4 files changed

+292
-0
lines changed

pkg/redis/client.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package redis
33
import (
44
"context"
55
"fmt"
6+
"strings"
67
"sync"
78
"time"
89

@@ -236,3 +237,131 @@ func (r *RedisClient) UpdateRateLimit(projectID string, eventsLimit int64, event
236237
// Script returns 1 if rate limit is not exceeded, 0 if it is
237238
return result.(int64) == 1, nil
238239
}
240+
241+
// TSCreateIfNotExists creates a RedisTimeSeries key if it doesn't exist.
242+
// It sets optional retention policy and attaches labels.
243+
func (r *RedisClient) TSCreateIfNotExists(
244+
key string,
245+
labels map[string]string,
246+
retention time.Duration,
247+
) error {
248+
exists, err := r.rdb.Exists(r.ctx, key).Result()
249+
if err != nil {
250+
return err
251+
}
252+
if exists > 0 {
253+
return nil // already exists
254+
}
255+
256+
labelArgs := []interface{}{"LABELS"}
257+
for k, v := range labels {
258+
labelArgs = append(labelArgs, k, v)
259+
}
260+
261+
args := []interface{}{key}
262+
if retention > 0 {
263+
args = append(args, "RETENTION", int64(retention/time.Millisecond))
264+
}
265+
args = append(args, labelArgs...)
266+
267+
res := r.rdb.Do(r.ctx, append([]interface{}{"TS.CREATE"}, args...)...)
268+
return res.Err()
269+
}
270+
271+
// TSIncrBy increments a RedisTimeSeries key with labels and timestamp.
272+
// Uses RedisTimeSeries command TS.INCRBY.
273+
func (r *RedisClient) TSIncrBy(
274+
key string,
275+
value int64,
276+
timestamp int64,
277+
labels map[string]string,
278+
) error {
279+
// Prepare label arguments
280+
labelArgs := []interface{}{"LABELS"}
281+
for k, v := range labels {
282+
labelArgs = append(labelArgs, k, v)
283+
}
284+
285+
if timestamp == 0 {
286+
timestamp = time.Now().UnixNano() / int64(time.Millisecond)
287+
}
288+
289+
args := []interface{}{key, value, "TIMESTAMP", timestamp}
290+
args = append(args, labelArgs...)
291+
292+
cmdArgs := append([]interface{}{"TS.INCRBY"}, args...)
293+
res := r.rdb.Do(r.ctx, cmdArgs...)
294+
return res.Err()
295+
}
296+
297+
// SafeTSIncrBy ensures that a TS key exists and increments it safely.
298+
// Automatically creates the time series if it doesn't exist.
299+
func (r *RedisClient) SafeTSIncrBy(
300+
key string,
301+
value int64,
302+
labels map[string]string,
303+
retention time.Duration,
304+
) error {
305+
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
306+
307+
err := r.TSIncrBy(key, value, timestamp, labels)
308+
if err != nil && strings.Contains(err.Error(), "TSDB: key does not exist") {
309+
log.Warnf("TS key %s does not exist, creating it...", key)
310+
if err2 := r.TSCreateIfNotExists(key, labels, retention); err2 != nil {
311+
return fmt.Errorf("failed to create TS: %w", err2)
312+
}
313+
return r.TSIncrBy(key, value, timestamp, labels)
314+
}
315+
return err
316+
}
317+
318+
// DeleteKey deletes a key from Redis
319+
func (r *RedisClient) DeleteKey(key string) error {
320+
res := r.rdb.Del(r.ctx, key)
321+
return res.Err()
322+
}
323+
324+
// TSAdd adds a sample to a time series
325+
func (r *RedisClient) TSAdd(
326+
key string,
327+
value int64,
328+
timestamp int64,
329+
labels map[string]string,
330+
) error {
331+
// Prepare label arguments
332+
labelArgs := []interface{}{"LABELS"}
333+
for k, v := range labels {
334+
labelArgs = append(labelArgs, k, v)
335+
}
336+
337+
if timestamp == 0 {
338+
timestamp = time.Now().UnixNano() / int64(time.Millisecond)
339+
}
340+
341+
args := []interface{}{key, timestamp, value, "ON_DUPLICATE", "SUM"}
342+
args = append(args, labelArgs...)
343+
344+
cmdArgs := append([]interface{}{"TS.ADD"}, args...)
345+
res := r.rdb.Do(r.ctx, cmdArgs...)
346+
return res.Err()
347+
}
348+
349+
// SafeTSAdd ensures that a TS key exists and adds a sample safely
350+
func (r *RedisClient) SafeTSAdd(
351+
key string,
352+
value int64,
353+
labels map[string]string,
354+
retention time.Duration,
355+
) error {
356+
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
357+
358+
err := r.TSAdd(key, value, timestamp, labels)
359+
if err != nil && strings.Contains(err.Error(), "TSDB: key does not exist") {
360+
log.Warnf("TS key %s does not exist, creating it...", key)
361+
if err2 := r.TSCreateIfNotExists(key, labels, retention); err2 != nil {
362+
return fmt.Errorf("failed to create TS: %w", err2)
363+
}
364+
return r.TSAdd(key, value, timestamp, labels)
365+
}
366+
return err
367+
}

pkg/server/errorshandler/handler.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,9 @@ func (handler *Handler) process(body []byte) ResponseMessage {
107107
// increment processed errors counter
108108
handler.ErrorsProcessed.Inc()
109109

110+
// record project metrics
111+
handler.recordProjectMetrics(projectId, "events-accepted")
112+
110113
return ResponseMessage{200, false, "OK"}
111114
}
112115

@@ -126,3 +129,38 @@ func GetQueueCache(nonDefaultQueues []string) map[string]bool {
126129
}
127130
return cache
128131
}
132+
133+
// getTimeSeriesKey generates a Redis TimeSeries key for project metrics
134+
func getTimeSeriesKey(projectId, metricType, granularity string) string {
135+
return fmt.Sprintf("ts:project-%s:%s:%s", metricType, projectId, granularity)
136+
}
137+
138+
// recordProjectMetrics records project metrics to Redis TimeSeries
139+
// metricType can be: "events-accepted", "events-rate-limited", etc.
140+
func (handler *Handler) recordProjectMetrics(projectId, metricType string) {
141+
minutelyKey := getTimeSeriesKey(projectId, metricType, "minutely")
142+
hourlyKey := getTimeSeriesKey(projectId, metricType, "hourly")
143+
dailyKey := getTimeSeriesKey(projectId, metricType, "daily")
144+
145+
labels := map[string]string{
146+
"type": "error",
147+
"status": metricType,
148+
"project": projectId,
149+
}
150+
151+
// minutely: store for 24 hours
152+
// Use TS.ADD with ON_DUPLICATE SUM to accumulate events within the same timestamp
153+
if err := handler.RedisClient.SafeTSAdd(minutelyKey, 1, labels, 24*time.Hour); err != nil {
154+
log.Errorf("failed to add minutely TS for %s: %v", metricType, err)
155+
}
156+
157+
// hourly: store for 7 days
158+
if err := handler.RedisClient.SafeTSAdd(hourlyKey, 1, labels, 7*24*time.Hour); err != nil {
159+
log.Errorf("failed to add hourly TS for %s: %v", metricType, err)
160+
}
161+
162+
// daily: store for 90 days
163+
if err := handler.RedisClient.SafeTSAdd(dailyKey, 1, labels, 90*24*time.Hour); err != nil {
164+
log.Errorf("failed to add daily TS for %s: %v", metricType, err)
165+
}
166+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package errorshandler
2+
3+
import (
4+
"fmt"
5+
"time"
6+
7+
log "github.com/sirupsen/logrus"
8+
)
9+
10+
// GenerateTestTimeSeriesData - generates test data for minutely, hourly, and daily time series
11+
// Automatically deletes existing keys before generating new data
12+
// Usage: handler.GenerateTestTimeSeriesData(projectId)
13+
func (handler *Handler) GenerateTestTimeSeriesData(projectId string) error {
14+
metricType := "events-accepted"
15+
minutelyKey := getTimeSeriesKey(projectId, metricType, "minutely")
16+
hourlyKey := getTimeSeriesKey(projectId, metricType, "hourly")
17+
dailyKey := getTimeSeriesKey(projectId, metricType, "daily")
18+
19+
// Delete existing keys to avoid accumulation
20+
log.Infof("Deleting existing test data keys for project %s...", projectId)
21+
if err := handler.RedisClient.DeleteKey(minutelyKey); err != nil {
22+
log.Warnf("Failed to delete minutely key: %v", err)
23+
}
24+
if err := handler.RedisClient.DeleteKey(hourlyKey); err != nil {
25+
log.Warnf("Failed to delete hourly key: %v", err)
26+
}
27+
if err := handler.RedisClient.DeleteKey(dailyKey); err != nil {
28+
log.Warnf("Failed to delete daily key: %v", err)
29+
}
30+
31+
labels := map[string]string{
32+
"type": "error",
33+
"status": "test",
34+
"project": projectId,
35+
}
36+
37+
now := time.Now()
38+
39+
// Minutely data: last 24 hours (1440 minutes)
40+
log.Infof("Generating minutely test data for project %s...", projectId)
41+
minuteStart := now.Add(-24 * time.Hour)
42+
for t := minuteStart; t.Before(now); t = t.Add(1 * time.Minute) {
43+
// Hash-based pseudo-random: 0-10 events per minute with realistic peaks/valleys
44+
hash := (t.Unix() * 2654435761) ^ 0xdeadbeef
45+
eventsCount := int64((hash % 11))
46+
// Use the minute timestamp for all events in this minute
47+
// ON_DUPLICATE SUM will accumulate them
48+
timestamp := t.UnixNano() / int64(time.Millisecond)
49+
if eventsCount > 0 {
50+
if err := handler.RedisClient.TSAdd(minutelyKey, eventsCount, timestamp, labels); err != nil {
51+
return fmt.Errorf("failed to add minutely test data: %w", err)
52+
}
53+
}
54+
}
55+
56+
// Hourly data: last 7 days (168 hours)
57+
log.Infof("Generating hourly test data for project %s...", projectId)
58+
hourStart := now.Add(-7 * 24 * time.Hour)
59+
for t := hourStart; t.Before(now); t = t.Add(1 * time.Hour) {
60+
// Hash-based pseudo-random: 5-95 events per hour
61+
hash := (t.Unix() * 2654435761) ^ 0xcafebabe
62+
eventsCount := int64(5 + (hash % 90))
63+
// Use the hour timestamp for all events in this hour
64+
timestamp := t.UnixNano() / int64(time.Millisecond)
65+
if err := handler.RedisClient.TSAdd(hourlyKey, eventsCount, timestamp, labels); err != nil {
66+
return fmt.Errorf("failed to add hourly test data: %w", err)
67+
}
68+
}
69+
70+
// Daily data: last 90 days
71+
log.Infof("Generating daily test data for project %s...", projectId)
72+
dayStart := now.Add(-90 * 24 * time.Hour)
73+
for t := dayStart; t.Before(now); t = t.Add(24 * time.Hour) {
74+
// Hash-based pseudo-random: 100-1900 events per day
75+
hash := (t.Unix() * 2654435761) ^ 0xbaadf00d
76+
eventsCount := int64(100 + (hash % 1800))
77+
// Use the day timestamp for all events in this day
78+
timestamp := t.UnixNano() / int64(time.Millisecond)
79+
if err := handler.RedisClient.TSAdd(dailyKey, eventsCount, timestamp, labels); err != nil {
80+
return fmt.Errorf("failed to add daily test data: %w", err)
81+
}
82+
}
83+
84+
log.Infof("Test data generation completed for project %s", projectId)
85+
return nil
86+
}

pkg/server/server.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package server
22

33
import (
4+
"encoding/json"
45
"errors"
56
"fmt"
67
"net/http"
@@ -144,6 +145,8 @@ func (s *Server) handler(ctx *fasthttp.RequestCtx) {
144145
s.ReleaseHandler.HandleHTTP(ctx)
145146
case "/api/0/envelope/":
146147
s.ErrorsHandler.HandleSentry(ctx)
148+
// case "/test/generate-timeseries":
149+
// s.HandleGenerateTestTimeSeries(ctx)
147150
default:
148151
ctx.Error("Not found", fasthttp.StatusNotFound)
149152
}
@@ -171,3 +174,39 @@ func (s *Server) UpdateBlacklist() error {
171174

172175
return nil
173176
}
177+
178+
// HandleGenerateTestTimeSeries - endpoint for generating test time series data
179+
// POST /test/generate-timeseries with JSON body: {"projectId": "67d4adeccf25fa00ab563c32"}
180+
func (s *Server) HandleGenerateTestTimeSeries(ctx *fasthttp.RequestCtx) {
181+
if !ctx.IsPost() {
182+
ctx.Error("Method not allowed", fasthttp.StatusMethodNotAllowed)
183+
return
184+
}
185+
186+
// Parse request body
187+
var reqBody struct {
188+
ProjectId string `json:"projectId"`
189+
}
190+
191+
if err := json.Unmarshal(ctx.PostBody(), &reqBody); err != nil {
192+
ctx.Error("Invalid JSON", fasthttp.StatusBadRequest)
193+
return
194+
}
195+
196+
if reqBody.ProjectId == "" {
197+
ctx.Error("projectId is required", fasthttp.StatusBadRequest)
198+
return
199+
}
200+
201+
log.Infof("Generating test time series data for project %s", reqBody.ProjectId)
202+
203+
// Generate test data
204+
if err := s.ErrorsHandler.GenerateTestTimeSeriesData(reqBody.ProjectId); err != nil {
205+
log.Errorf("Failed to generate test data: %v", err)
206+
ctx.Error(fmt.Sprintf("Failed to generate test data: %v", err), fasthttp.StatusInternalServerError)
207+
return
208+
}
209+
210+
ctx.SetStatusCode(fasthttp.StatusOK)
211+
ctx.SetBodyString(`{"success": true, "message": "Test data generated successfully"}`)
212+
}

0 commit comments

Comments
 (0)