Skip to content

Commit 43f5ae2

Browse files
committed
Poll when reached end of log
1 parent 189b9ed commit 43f5ae2

File tree

1 file changed

+32
-9
lines changed

1 file changed

+32
-9
lines changed

cmd/ctmon-ingest/main.go

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ const (
9090
dbBatchSize = 2000 // Number of entries to batch for database insertion
9191
dbBatchTimeout = 5 * time.Second // Max time to wait before flushing a partial batch
9292
logChannelBuffer = 5000 // Buffer size for the log entry channel
93+
pollingInterval = 5 * time.Second // Interval to poll when log reaches its end
9394
)
9495

9596
// CircuitBreaker tracks database connection health
@@ -135,13 +136,23 @@ func isRetryableError(err error) bool {
135136
strings.Contains(errorStr, "connection reset") ||
136137
strings.Contains(errorStr, "temporary failure") ||
137138
strings.Contains(errorStr, "i/o timeout") ||
138-
strings.Contains(errorStr, "network is unreachable")
139+
strings.Contains(errorStr, "network is unreachable") ||
140+
strings.Contains(errorStr, "context deadline exceeded") ||
141+
strings.HasPrefix(errorStr, "retryable http error ")
139142
}
140143

141144
func isRetryableHTTPStatus(statusCode int) bool {
142145
return statusCode >= 500 || statusCode == 429 || statusCode == 408
143146
}
144147

148+
func isEndOfLogError(err error) bool {
149+
if err == nil {
150+
return false
151+
}
152+
errorStr := err.Error()
153+
return strings.Contains(errorStr, "400 Bad Request")
154+
}
155+
145156
func fetchSTH(client *http.Client, logURL string) (*STHResponse, error) {
146157
if !strings.HasSuffix(logURL, "/") {
147158
logURL += "/"
@@ -198,7 +209,11 @@ func fetchEntriesWithRetry(client *http.Client, logURL string, start, end int64)
198209
break
199210
}
200211

201-
// Check if error is retryable
212+
// Check if error is retryable or end-of-log condition
213+
if isEndOfLogError(err) {
214+
// This is end-of-log, don't retry but return special error type
215+
return nil, fmt.Errorf("end_of_log: %w", err)
216+
}
202217
if !isRetryableError(err) {
203218
log.Printf("Non-retryable error, giving up: %v", err)
204219
break
@@ -415,7 +430,8 @@ func isRetryableDBError(err error) bool {
415430
strings.Contains(errorStr, "broken pipe") ||
416431
strings.Contains(errorStr, "connection lost") ||
417432
strings.Contains(errorStr, "server is not ready") ||
418-
strings.Contains(errorStr, "too many connections")
433+
strings.Contains(errorStr, "too many connections") ||
434+
strings.Contains(errorStr, "context deadline exceeded")
419435
}
420436

421437
func boolToUint8(b bool) uint8 {
@@ -782,17 +798,24 @@ func main() {
782798
log.Printf("Fetching entries from %s: %d to %d (batch size %d)", logID, currentIndex, endIndex, currentBatchSize)
783799

784800
getEntriesResp, err := fetchEntriesWithRetry(client, *logURLFlag, currentIndex, endIndex)
785-
if err != nil {
801+
if err != nil || len(getEntriesResp.Entries) == 0 {
802+
// Check if this is an end-of-log condition
803+
if (getEntriesResp != nil && len(getEntriesResp.Entries) == 0) || strings.Contains(err.Error(), "end_of_log:") {
804+
log.Printf("Reached end of log at index %d. Polling every %v for new entries...", currentIndex, pollingInterval)
805+
// Wait and then continue the loop to try again
806+
select {
807+
case <-time.After(pollingInterval):
808+
continue
809+
case <-done:
810+
log.Printf("Received shutdown signal during polling, stopping...")
811+
return
812+
}
813+
}
786814
log.Printf("Error fetching entries %d-%d after all retries: %v", currentIndex, endIndex, err)
787815
// On fetch error, we'll stop the main loop
788816
return
789817
}
790818

791-
if len(getEntriesResp.Entries) == 0 {
792-
log.Printf("No more entries returned by log at index %d. Stopping.", currentIndex)
793-
return
794-
}
795-
796819
for i, rawEntry := range getEntriesResp.Entries {
797820
entryActualIndex := currentIndex + int64(i)
798821
details, err := parseLogEntry(rawEntry, logID, entryActualIndex)

0 commit comments

Comments
 (0)