Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 62 additions & 111 deletions internal/certificatetransparency/ct-watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (w *Watcher) watchNewLogs() {
}
}

// updateLogs checks the transparency log list for new logs and adds new 0workers for those to the watcher.
// updateLogs checks the transparency log list for new logs and adds new workers for those to the watcher.
func (w *Watcher) updateLogs() {
// Get a list of urls of all CT logs
logList, err := getAllLogs()
Expand All @@ -105,78 +105,90 @@ func (w *Watcher) updateLogs() {
return
}

w.addNewlyAvailableLogs(logList)

if *config.AppConfig.General.DropOldLogs {
w.dropRemovedLogs(logList)
}
}

// addNewlyAvailableLogs checks the transparency log list for new Log servers and adds workers for those to the watcher.
func (w *Watcher) addNewlyAvailableLogs(logList loglist3.LogList) {
log.Println("Checking for new ct logs...")

// Track all URLs that should be monitored after reconciliation
monitoredURLs := make(map[string]struct{})
newCTs := 0

w.workersMu.Lock()
defer w.workersMu.Unlock()
newCTs := 0

// Check the ct log list for new, unwatched logs
// For each CT log, create a worker and start downloading certs
for _, operator := range logList.Operators {
// Iterate over each log of the operator
for _, transparencyLog := range operator.Logs {
newURL := normalizeCtlogURL(transparencyLog.URL)
url := transparencyLog.URL
desc := transparencyLog.Description
normURL := normalizeCtlogURL(url)

if transparencyLog.State.LogStatus() == loglist3.RetiredLogStatus {
log.Printf("Skipping retired CT log: %s\n", newURL)
log.Printf("Skipping retired CT log: %s\n", normURL)
continue
}

// Check if the log is already being watched
alreadyWatched := false

for _, ctWorker := range w.workers {
workerURL := normalizeCtlogURL(ctWorker.ctURL)
if workerURL == newURL {
alreadyWatched = true
break
}
monitoredURLs[normURL] = struct{}{}
if w.addLogIfNew(operator.Name, desc, url) {
newCTs++
}
}
}

// If the log is already being watched, continue
if alreadyWatched {
continue
}
log.Printf("New ct logs found: %d\n", newCTs)

w.wg.Add(1)
newCTs++

// Metrics are initialized with 0.
// Only if recovery is enabled, it is initialized with the last saved index.
lastCTIndex := metrics.GetCTIndex(normalizeCtlogURL(transparencyLog.URL))
ctWorker := worker{
name: transparencyLog.Description,
operatorName: operator.Name,
ctURL: transparencyLog.URL,
entryChan: w.certChan,
ctIndex: lastCTIndex,
// Optionally stop workers for logs not in the monitoredURLs set
if *config.AppConfig.General.DropOldLogs {
removed := 0
for _, ctWorker := range w.workers {
normURL := normalizeCtlogURL(ctWorker.ctURL)
if _, ok := monitoredURLs[normURL]; !ok {
log.Printf("Stopping worker. CT URL not found in LogList or retired: '%s'\n", ctWorker.ctURL)
ctWorker.stop()
removed++
}
w.workers = append(w.workers, &ctWorker)
metrics.Init(operator.Name, normalizeCtlogURL(transparencyLog.URL))

// Start a goroutine for each worker
go func() {
defer w.wg.Done()
ctWorker.startDownloadingCerts(w.context)
w.discardWorker(&ctWorker)
}()
}
log.Printf("Removed ct logs: %d\n", removed)
}

log.Printf("New ct logs found: %d\n", newCTs)
log.Printf("Currently monitored ct logs: %d\n", len(w.workers))
}

// addLogIfNew checks if a log is already being watched and adds it if not.
// Returns true if a new log was added, false otherwise.
func (w *Watcher) addLogIfNew(operatorName, description, url string) bool {
normURL := normalizeCtlogURL(url)

// Check if the log is already being watched
for _, ctWorker := range w.workers {
workerURL := normalizeCtlogURL(ctWorker.ctURL)
if workerURL == normURL {
return false
}
}

// Log is not being watched, so add it
w.wg.Add(1)

lastCTIndex := metrics.GetCTIndex(normURL)
ctWorker := worker{
name: description,
operatorName: operatorName,
ctURL: url,
entryChan: w.certChan,
ctIndex: lastCTIndex,
}
w.workers = append(w.workers, &ctWorker)
metrics.Init(operatorName, normURL)

// Start a goroutine for each worker
go func() {
defer w.wg.Done()
ctWorker.startDownloadingCerts(w.context)
w.discardWorker(&ctWorker)
}()

return true
}

// discardWorker removes a worker from the watcher's list of workers.
// This needs to be done when a worker stops.
func (w *Watcher) discardWorker(worker *worker) {
Expand All @@ -193,67 +205,6 @@ func (w *Watcher) discardWorker(worker *worker) {
}
}

// dropRemovedLogs checks if any of the currently monitored logs are no longer in the log list or are retired.
// If they are not, the CT Logs are probably no longer relevant and the corresponding workers will be stopped.
func (w *Watcher) dropRemovedLogs(logList loglist3.LogList) {
removedCTs := 0

// Iterate over all workers and check if they are still in the logList
// If they are not, the CT Logs are probably no longer relevant.
// We should stop the worker if that didn't already happen.
for _, ctWorker := range w.workers {
workerURL := normalizeCtlogURL(ctWorker.ctURL)

onLogList := false
for _, operator := range logList.Operators {
if ctWorker.operatorName != operator.Name {
// This operator is not the one we're looking for
continue
}

// Iterate over each log of the operator
for _, transparencyLog := range operator.Logs {
// Remove retired logs from the list
if transparencyLog.State.LogStatus() == loglist3.RetiredLogStatus {
// Skip retired logs
continue
}

// Check if the log is already being watched
logListURL := normalizeCtlogURL(transparencyLog.URL)
if workerURL == logListURL {
onLogList = true
break
}
}

// Prevent further loop iterations
if onLogList {
break
}
}

// Make sure to not drop logs that are defined locally in the additional logs list
for _, additionalLogConfig := range config.AppConfig.General.AdditionalLogs {
additionalLogListURL := normalizeCtlogURL(additionalLogConfig.URL)
if workerURL == additionalLogListURL {
onLogList = true
break
}
}

// If the log is not in the loglist, stop the worker
if !onLogList {
log.Printf("Stopping worker. CT URL not found in LogList or retired: '%s'\n", ctWorker.ctURL)
removedCTs++
ctWorker.stop()
}
}

log.Printf("Removed ct logs: %d\n", removedCTs)
log.Printf("Currently monitored ct logs: %d\n", len(w.workers))
}

// Stop stops the watcher.
func (w *Watcher) Stop() {
log.Printf("Stopping watcher\n")
Expand Down
Loading