|
| 1 | +package main |
| 2 | + |
import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"io/fs"
	"net/http"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/smartcontractkit/chainlink-testing-framework/framework"
	"go.uber.org/ratelimit"
)
| 18 | + |
// L is the package-wide structured logger, aliased from the
// chainlink-testing-framework logger for brevity at call sites.
var L = framework.L
| 20 | + |
// LokiPushRequest represents the payload format expected by Loki's push API
// (POST /loki/api/v1/push): one or more labeled streams of log entries.
type LokiPushRequest struct {
	Streams []LokiStream `json:"streams"`
}

// LokiStream represents one log stream. Stream holds the label set that
// identifies the stream; Values holds the entries as [timestamp, line]
// pairs, where the timestamp is a string of Unix nanoseconds as required
// by the Loki push API.
type LokiStream struct {
	Stream map[string]string `json:"stream"`
	Values [][2]string       `json:"values"`
}
| 31 | + |
const (
	// lokiURL is the local Loki push endpoint that receives uploaded chunks.
	lokiURL = "http://localhost:3030/loki/api/v1/push"
	// grafanaURL and grafanaURL2 are the URL-encoded prefix and suffix of a
	// Grafana Explore deep link. The unique job ID is spliced between them
	// to form a link that runs the LogQL query {job="<jobID>"} over the
	// last 6 hours against the local Loki datasource.
	grafanaURL  = "http://localhost:3000/explore?panes=%7B%22V0P%22:%7B%22datasource%22:%22P8E80F9AEF21F6940%22,%22queries%22:%5B%7B%22refId%22:%22A%22,%22expr%22:%22%7Bjob%3D%5C%22"
	grafanaURL2 = "%5C%22%7D%22,%22queryType%22:%22range%22,%22datasource%22:%7B%22type%22:%22loki%22,%22uid%22:%22P8E80F9AEF21F6940%22%7D,%22editorMode%22:%22code%22%7D%5D,%22range%22:%7B%22from%22:%22now-6h%22,%22to%22:%22now%22%7D%7D%7D&schemaVersion=1&orgId=1"
)
| 37 | + |
| 38 | +// processAndUploadDir traverses the given directory recursively and |
| 39 | +// processes every file (ignoring directories) by calling processAndUploadLog. |
| 40 | +func processAndUploadDir(dirPath string, limiter ratelimit.Limiter, chunks int, jobID string) error { |
| 41 | + return filepath.Walk(dirPath, func(path string, info os.FileInfo, err error) error { |
| 42 | + if err != nil { |
| 43 | + L.Error().Err(err).Msgf("Error accessing %s", path) |
| 44 | + return nil |
| 45 | + } |
| 46 | + if info.IsDir() { |
| 47 | + return nil |
| 48 | + } |
| 49 | + L.Info().Msgf("Processing file: %s", path) |
| 50 | + f, err := os.Open(path) |
| 51 | + if err != nil { |
| 52 | + L.Error().Err(err).Msgf("Error opening file %s", path) |
| 53 | + return nil |
| 54 | + } |
| 55 | + defer f.Close() |
| 56 | + |
| 57 | + if err := processAndUploadLog(path, f, limiter, chunks, jobID); err != nil { |
| 58 | + L.Error().Err(err).Msgf("Error processing file %s", path) |
| 59 | + // Continue processing other files even if one fails. |
| 60 | + } |
| 61 | + return nil |
| 62 | + }) |
| 63 | +} |
| 64 | + |
| 65 | +// processAndUploadLog reads log lines from the provided reader, |
| 66 | +// splits them into chunks (if more than 10,000 lines) and uploads each chunk concurrently. |
| 67 | +func processAndUploadLog(source string, r io.Reader, limiter ratelimit.Limiter, chunks int, jobID string) error { |
| 68 | + scanner := bufio.NewScanner(r) |
| 69 | + var values [][2]string |
| 70 | + baseTime := time.Now() |
| 71 | + |
| 72 | + // Read all log lines; each line gets a unique timestamp. |
| 73 | + for scanner.Scan() { |
| 74 | + line := scanner.Text() |
| 75 | + ts := baseTime.UnixNano() |
| 76 | + values = append(values, [2]string{fmt.Sprintf("%d", ts), line}) |
| 77 | + baseTime = baseTime.Add(time.Nanosecond) |
| 78 | + } |
| 79 | + if err := scanner.Err(); err != nil { |
| 80 | + return fmt.Errorf("error scanning logs from %s: %w", source, err) |
| 81 | + } |
| 82 | + |
| 83 | + totalLines := len(values) |
| 84 | + if totalLines == 0 { |
| 85 | + L.Info().Msgf("No log lines found in %s", source) |
| 86 | + return nil |
| 87 | + } |
| 88 | + // Use one chunk if there are 10,000 or fewer lines. |
| 89 | + if totalLines <= 10000 { |
| 90 | + chunks = 1 |
| 91 | + } |
| 92 | + if chunks > totalLines { |
| 93 | + chunks = totalLines |
| 94 | + } |
| 95 | + chunkSize := totalLines / chunks |
| 96 | + remainder := totalLines % chunks |
| 97 | + L.Debug().Int("total_lines", totalLines). |
| 98 | + Int("chunks", chunks). |
| 99 | + Msgf("Starting chunk processing for %s", source) |
| 100 | + var wg sync.WaitGroup |
| 101 | + errCh := make(chan error, chunks) |
| 102 | + start := 0 |
| 103 | + for i := 0; i < chunks; i++ { |
| 104 | + extra := 0 |
| 105 | + if i < remainder { |
| 106 | + extra = 1 |
| 107 | + } |
| 108 | + end := start + chunkSize + extra |
| 109 | + chunkValues := values[start:end] |
| 110 | + startLine := start + 1 |
| 111 | + endLine := end |
| 112 | + start = end |
| 113 | + |
| 114 | + // Use the unique jobID as the "job" label. |
| 115 | + labels := map[string]string{ |
| 116 | + "job": jobID, |
| 117 | + "chunk": fmt.Sprintf("%d", i+1), |
| 118 | + "source": source, |
| 119 | + } |
| 120 | + reqBody := LokiPushRequest{ |
| 121 | + Streams: []LokiStream{ |
| 122 | + { |
| 123 | + Stream: labels, |
| 124 | + Values: chunkValues, |
| 125 | + }, |
| 126 | + }, |
| 127 | + } |
| 128 | + data, err := json.Marshal(reqBody) |
| 129 | + if err != nil { |
| 130 | + return fmt.Errorf("error marshaling JSON for chunk %d: %w", i+1, err) |
| 131 | + } |
| 132 | + chunkMB := float64(len(data)) / (1024 * 1024) |
| 133 | + L.Debug().Int("chunk", i+1). |
| 134 | + Float64("chunk_size_MB", chunkMB). |
| 135 | + Int("start_line", startLine). |
| 136 | + Int("end_line", endLine). |
| 137 | + Msg("Prepared chunk for upload") |
| 138 | + |
| 139 | + wg.Add(1) |
| 140 | + go func(chunkNum, sLine, eLine int, payload []byte, sizeMB float64) { |
| 141 | + defer wg.Done() |
| 142 | + const maxRetries = 50 |
| 143 | + const retryDelay = 1 * time.Second |
| 144 | + |
| 145 | + var resp *http.Response |
| 146 | + var attempt int |
| 147 | + var err error |
| 148 | + for attempt = 1; attempt <= maxRetries; attempt++ { |
| 149 | + limiter.Take() |
| 150 | + resp, err = http.Post(lokiURL, "application/json", bytes.NewReader(payload)) |
| 151 | + if err != nil { |
| 152 | + L.Error().Err(err).Int("attempt", attempt). |
| 153 | + Int("chunk", chunkNum). |
| 154 | + Float64("chunk_size_MB", sizeMB). |
| 155 | + Msg("Error sending POST request") |
| 156 | + time.Sleep(retryDelay) |
| 157 | + continue |
| 158 | + } |
| 159 | + |
| 160 | + body, _ := io.ReadAll(resp.Body) |
| 161 | + resp.Body.Close() |
| 162 | + |
| 163 | + if resp.StatusCode == 429 { |
| 164 | + L.Debug().Int("attempt", attempt). |
| 165 | + Int("chunk", chunkNum). |
| 166 | + Float64("chunk_size_MB", sizeMB). |
| 167 | + Msg("Received 429, retrying...") |
| 168 | + time.Sleep(retryDelay) |
| 169 | + continue |
| 170 | + } |
| 171 | + |
| 172 | + if resp.StatusCode/100 != 2 { |
| 173 | + err = fmt.Errorf("loki error: %s - %s", resp.Status, body) |
| 174 | + L.Error().Err(err).Int("chunk", chunkNum). |
| 175 | + Float64("chunk_size_MB", sizeMB). |
| 176 | + Msg("Chunk upload failed") |
| 177 | + time.Sleep(retryDelay) |
| 178 | + continue |
| 179 | + } |
| 180 | + |
| 181 | + L.Info().Int("chunk", chunkNum). |
| 182 | + Float64("chunk_size_MB", sizeMB). |
| 183 | + Msg("Successfully uploaded chunk") |
| 184 | + return |
| 185 | + } |
| 186 | + errCh <- fmt.Errorf("max retries reached for chunk %d; last error: %v", chunkNum, err) |
| 187 | + }(i+1, startLine, endLine, data, chunkMB) |
| 188 | + } |
| 189 | + |
| 190 | + wg.Wait() |
| 191 | + close(errCh) |
| 192 | + if len(errCh) > 0 { |
| 193 | + return <-errCh |
| 194 | + } |
| 195 | + |
| 196 | + return nil |
| 197 | +} |
0 commit comments