diff --git a/src/go/cmd/strelka-frontend/main.go b/src/go/cmd/strelka-frontend/main.go index 2a695ce4..5a1cb99f 100644 --- a/src/go/cmd/strelka-frontend/main.go +++ b/src/go/cmd/strelka-frontend/main.go @@ -62,12 +62,7 @@ var ( loggerOnce sync.Once ) -func DebugLog(msg string, fields ...zap.Field) { - // The value here doesn't matter, we'll just check for its existence - if _, enableDebugLogging := os.LookupEnv("ENABLE_VERBOSE_LOGGING"); !enableDebugLogging { - return - } - +func getLogger() *zap.Logger { loggerOnce.Do(func() { encoderConfig := zap.NewProductionEncoderConfig() encoderConfig.MessageKey = "message" @@ -88,8 +83,29 @@ func DebugLog(msg string, fields ...zap.Field) { loggerInstance = logger }) + return loggerInstance +} + +func Infof(format string, v ...any) { + logger := getLogger() + if logger != nil { + logger.Sugar().Infof(format, v...) + } else { + log.Printf(format, v...) + } +} + +func Trace(msg string, fields ...zap.Field) { + // The value here doesn't matter, we'll just check for its existence + if _, enableTraceLogging := os.LookupEnv("ENABLE_TRACE_LOGGING"); !enableTraceLogging { + return + } + + traceMsg := fmt.Sprintf("[TRACE] %s", msg) if loggerInstance != nil { - loggerInstance.Info(msg, fields...) + loggerInstance.Info(traceMsg, fields...) + } else { + log.Println(traceMsg) } } @@ -217,7 +233,7 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error { } } - startQueueTask := time.Now() + startQueueScanFileTaskTime := time.Now() if err := s.coordinator.cli.ZAdd( stream.Context(), "tasks", @@ -226,19 +242,19 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error { Member: id, }, ).Err(); err != nil { - DebugLog("[VERBOSE] Failed to queue task", + Trace("ScanFile task failed to queue", zap.String("request_id", req.Id), zap.String("strelka_id", id), - zap.Duration("took", time.Since(startQueueTask)), + zap.Int64("queue_time_ms", time.Since(startQueueScanFileTaskTime).Milliseconds()), zap.Error(err), ) return fmt.Errorf("sending task: %w", err) } else { - DebugLog("[VERBOSE] Queued task", + Trace("ScanFile task queued", zap.String("request_id", req.Id), zap.String("strelka_id", id), - zap.Time("deadline", deadline), - zap.Duration("took", time.Since(startQueueTask)), + zap.Int64("deadline", deadline.Unix()), + zap.Int64("queue_time_ms", time.Since(startQueueScanFileTaskTime).Milliseconds()), ) } @@ -249,21 +265,36 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error { (*tx).Del(stream.Context(), sha) } - startAwaitResponse := time.Now() + startResponseLoopTime := time.Now() + lastResponseTime := time.Now() + redisErrorCount := 0 for { if err := stream.Context().Err(); err != nil { - DebugLog("[VERBOSE] Stream error", + Trace("stream closed", zap.String("request_id", req.Id), zap.String("strelka_id", id), - zap.Duration("waited", time.Since(startAwaitResponse)), + zap.Int64("since_last_response_ms", time.Since(lastResponseTime).Milliseconds()), + zap.Int64("since_entering_loop_ms", time.Since(startResponseLoopTime).Milliseconds()), + zap.Int("redis_error_count", redisErrorCount), zap.Error(err), ) return fmt.Errorf("context closed: %w", err) } + startPopTime := time.Now() res, err := s.coordinator.cli.BLPop(stream.Context(), 5*time.Second, keye).Result() if err != nil { if err != redis.Nil { + redisErrorCount += 1 + Trace("error performing response pop", + zap.String("request_id", req.Id), + zap.String("strelka_id", id), + zap.Int64("since_last_response_ms", time.Since(lastResponseTime).Milliseconds()), + zap.Int64("since_entering_loop_ms", time.Since(startResponseLoopTime).Milliseconds()), + zap.Int64("response_pop_took_ms", time.Since(startPopTime).Milliseconds()), + zap.Int("redis_error_count", redisErrorCount), + zap.Error(err), + ) // Delay to prevent fast looping over errors time.Sleep(250 * time.Millisecond) } @@ -276,19 +307,23 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error { lpop := res[1] if lpop == "FIN" { - DebugLog("[VERBOSE] FIN response received", + Trace("FIN response received", zap.String("request_id", req.Id), zap.String("strelka_id", id), - zap.Duration("waited", time.Since(startAwaitResponse)), + zap.Int64("since_last_response_ms", time.Since(lastResponseTime).Milliseconds()), + zap.Int64("since_entering_loop_ms", time.Since(startResponseLoopTime).Milliseconds()), + zap.Int("redis_error_count", redisErrorCount), ) break } - DebugLog("[VERBOSE] Response received", + Trace("response received", zap.String("request_id", req.Id), zap.String("strelka_id", id), - zap.Duration("waited", time.Since(startAwaitResponse)), + zap.Int64("since_last_response_ms", time.Since(lastResponseTime).Milliseconds()), + zap.Int64("since_entering_loop_ms", time.Since(startResponseLoopTime).Milliseconds()), ) + lastResponseTime = time.Now() if tx != nil { (*tx).RPush(stream.Context(), sha, lpop) @@ -318,20 +353,20 @@ func (s *server) ScanFile(stream strelka.Frontend_ScanFileServer) error { s.responses <- resp - startSendResponse := time.Now() + startSendResponseTime := time.Now() if err := stream.Send(resp); err != nil { - DebugLog("[VERBOSE] Error sending response", + Trace("error sending response to caller", zap.String("request_id", req.Id), zap.String("strelka_id", id), - zap.Duration("took", time.Since(startSendResponse)), + zap.Int64("send_response_took_ms", time.Since(startSendResponseTime).Milliseconds()), zap.Error(err), ) return fmt.Errorf("send stream: %w", err) } else { - DebugLog("[VERBOSE] Sent response", + Trace("response sent to caller", zap.String("request_id", req.Id), zap.String("strelka_id", id), - zap.Duration("took", time.Since(startSendResponse)), + zap.Int64("send_response_took_ms", time.Since(startSendResponseTime).Milliseconds()), ) } } @@ -416,7 +451,7 @@ func (s *server) CompileYara(stream strelka.Frontend_CompileYaraServer) error { if err != nil { if err != redis.Nil { // Delay to prevent fast looping over errors - log.Printf("err: %v\n", err) + Infof("err: %v\n", err) time.Sleep(250 * time.Millisecond) } continue @@ -536,7 +571,7 @@ func (s *server) SyncYara(stream strelka.Frontend_SyncYaraServer) error { if err != nil { if err != redis.Nil { // Delay to prevent fast looping over errors - log.Printf("err: %v\n", err) + Infof("err: %v\n", err) time.Sleep(250 * time.Millisecond) } continue @@ -684,7 +719,7 @@ func (s *server) SyncYaraV2(stream strelka.Frontend_SyncYaraV2Server) error { if err != nil { if err != redis.Nil { // Delay to prevent fast looping over errors - log.Printf("err: %v\n", err) + Infof("err: %v\n", err) time.Sleep(250 * time.Millisecond) } continue @@ -870,17 +905,17 @@ func main() { go func() { rpc.LogResponses(responses, conf.Response.Log) }() - log.Printf("responses will be logged to %v", conf.Response.Log) + Infof("responses will be logged to %v", conf.Response.Log) } else if conf.Response.Report != 0 { go func() { rpc.ReportResponses(responses, conf.Response.Report) }() - log.Printf("responses will be reported every %v", conf.Response.Report) + Infof("responses will be reported every %v", conf.Response.Report) } else { go func() { rpc.DiscardResponses(responses) }() - log.Println("responses will be discarded") + Infof("responses will be discarded") } cd := redis.NewClient(&redis.Options{ @@ -923,16 +958,16 @@ func main() { } go func() { - log.Printf("Waiting for shutdown\n") + Infof("Waiting for shutdown\n") <-shutdownWorkersSig st := time.Now() - log.Printf("Received shutdown signal, attempting graceful shutdown\n") + Infof("Received shutdown signal, attempting graceful shutdown\n") s.GracefulStop() - log.Printf("Graceful shutdown completed in %v\n", time.Since(st)) + Infof("Graceful shutdown completed in %v\n", time.Since(st)) }() strelka.RegisterFrontendServer(s, opts) grpc_health_v1.RegisterHealthServer(s, opts) err = s.Serve(listen) - log.Printf("Shutting down. Serve err: %v\n", err) + Infof("Shutting down. Serve err: %v\n", err) } diff --git a/src/python/bin/strelka-backend b/src/python/bin/strelka-backend index c0b95cb2..7d7875e1 100644 --- a/src/python/bin/strelka-backend +++ b/src/python/bin/strelka-backend @@ -33,20 +33,33 @@ from pythonjsonlogger.json import JsonFormatter shutdown_event = threading.Event() -def init_verbose_logging(): - if 'ENABLE_VERBOSE_LOGGING' in os.environ: - logger = logging.getLogger('debugging') - logger.setLevel(logging.INFO) - logHandler = logging.StreamHandler() - formatter = JsonFormatter('%(message)s %(levelname)s %(asctime)s') - logHandler.setFormatter(formatter) - logger.addHandler(logHandler) +def enable_json_logging(): + # Get root logger + logger = logging.getLogger() + # Log to stdout (it logs to stderr by default) + handler = logging.StreamHandler(stream=sys.stdout) -def debug_log(msg, extra=None): - if 'ENABLE_VERBOSE_LOGGING' in os.environ: - logger = logging.getLogger('debugging') - logger.info(msg, extra=extra) + # Set up the JSON formatter for the handler + formatter = JsonFormatter('%(message)s %(levelname)s %(asctime)s') + handler.setFormatter(formatter) + + # Override the default handlers and attach our JSON handler + logger.handlers = [] + logger.addHandler(handler) + + +def trace(msg, extra=None): + if 'ENABLE_TRACE_LOGGING' in os.environ: + logging.info(f'[TRACE] {msg}', extra=extra) + + +def trace_scanner(scanner, msg, extra=None): + if 'ENABLE_SCANNER_TRACE_LOGGING' in os.environ: + logging.info(f'[TRACE][SCANNER] {msg}', extra={ + 'scanner': scanner, + **extra + }) class Backend(object): @@ -100,9 +113,14 @@ class Backend(object): if time.time() >= work_expire: break + start_pop_time = datetime.now() task = self.coordinator.bzpopmin(['tasks', 'tasks_compile_yara', 'tasks_compile_and_sync_yara'], timeout=5) + end_pop_time = datetime.now() + receive_time_ms = (end_pop_time - start_pop_time).total_seconds() * 1000 if task is None: - debug_log('[VERBOSE] Received no tasks') + trace('received no tasks', extra={ + 'receive_time_ms': receive_time_ms + }) continue (queue_name, root_id, expire_at) = task @@ -111,9 +129,10 @@ class Backend(object): timeout = math.ceil(expire_at - time.time()) if timeout <= 0: - debug_log('[VERBOSE] Received expired task', extra={ + trace('received expired task', extra={ 'strelka_id': root_id, - 'expires_at': expire_at + 'deadline': expire_at, + 'receive_time_ms': receive_time_ms }) continue @@ -124,39 +143,39 @@ class Backend(object): break if queue_name == b'tasks': - debug_log('[VERBOSE] Received task', extra={ + trace('received scan file task', extra={ 'strelka_id': root_id, - 'expires_at': expire_at + 'deadline': expire_at, + 'receive_time_ms': receive_time_ms }) file = strelka.File(pointer=root_id) try: with interruptingcow.timeout(timeout, strelka.RequestTimeout): - start_distribute = datetime.now() + start_scan_time = datetime.now() self.distribute(root_id, file, expire_at) - - end_distribute = datetime.now() - debug_log('[VERBOSE] Scanning complete', extra={ + end_scan_time = datetime.now() + trace('full scan complete', extra={ 'strelka_id': root_id, - 'expires_at': expire_at, - 'took_ms': (end_distribute - start_distribute).total_seconds() * 1000 + 'deadline': expire_at, + 'full_scan_took_ms': (end_scan_time - start_scan_time).total_seconds() * 1000 }) - start_emit_result = datetime.now() + start_send_fin_time = datetime.now() p = self.coordinator.pipeline(transaction=False) p.rpush(f'event:{root_id}', 'FIN') p.expireat(f'event:{root_id}', expire_at) p.execute() - end_emit_result = datetime.now() - debug_log('[VERBOSE] Result event emitted', extra={ + end_send_fin_time = datetime.now() + trace('FIN event emitted', extra={ 'strelka_id': root_id, - 'expires_at': expire_at, - 'took_ms': (end_emit_result - start_emit_result).total_seconds() * 1000 + 'deadline': expire_at, + 'fin_emit_took_ms': (end_send_fin_time - start_send_fin_time).total_seconds() * 1000 }) except strelka.RequestTimeout: - debug_log('[VERBOSE] Task timeout', extra={ + trace('scan timed out', extra={ 'strelka_id': root_id }) logging.debug(f'request {root_id} timed out') @@ -315,12 +334,15 @@ class Backend(object): try: with interruptingcow.timeout(self.limits.get('distribution'), exception=strelka.DistributionTimeout): + start_file_scan_time = datetime.now() if file.depth > self.limits.get('max_depth'): logging.info(f'request {root_id} exceeded maximum depth') return data = b'' legacy_yara_data = b'' + + start_pop_data_time = datetime.now() while 1: pop = self.coordinator.lpop(f'data:{file.pointer}') if pop is None: @@ -332,6 +354,7 @@ class Backend(object): # take this path, and we wish to evaluate each against the # same set of yara rules. legacy_yara_data = self.coordinator.get(f'yara:{root_id}') # backcompat + end_pop_data_time = datetime.now() file.add_flavors({'mime': self.taste_mime(data)}) file.add_flavors({'yara': self.taste_yara(data)}) @@ -382,9 +405,9 @@ class Backend(object): for scanner in scanner_list: try: + start_scanner_time = datetime.now() name = scanner['name'] options = scanner.get('options', {}) - if name == 'ScanYara': yara_cache_key = self.coordinator.get(f'yara_cache_key:{root_id}') if yara_cache_key: @@ -417,6 +440,12 @@ class Backend(object): **scan, **s, } + end_scanner_time = datetime.now() + trace_scanner(name, 'scan completed', extra={ + 'strelka_id': root_id, + 'deadline': expire_at, + 'scanner_took_ms': (end_scanner_time - start_scanner_time).total_seconds() * 1000 + }) except ModuleNotFoundError: logging.exception(f'scanner {name} not found') @@ -429,10 +458,20 @@ class Backend(object): **{'scan': scan}, **{'backend': {'release_version': os.environ.get('RELEASE_VERSION', '')}}, } + end_scan_file_time = datetime.now() + start_emit_result_time = datetime.now() p.rpush(f'event:{root_id}', strelka.format_event(event)) p.expireat(f'event:{root_id}', expire_at) p.execute() + end_emit_result_time = datetime.now() + trace('file scan complete', extra={ + 'strelka_id': root_id, + 'deadline': expire_at, + 'data_collection_took_ms': (end_pop_data_time - start_pop_data_time).total_seconds() * 1000, + 'file_scan_took_ms': (end_scan_file_time - start_file_scan_time).total_seconds() * 1000, + 'result_emit_took_ms': (end_emit_result_time - start_emit_result_time).total_seconds() * 1000 + }) except strelka.DistributionTimeout: logging.exception(f'node {file.uid} timed out') @@ -532,7 +571,9 @@ def main(): log_cfg_path = backend_cfg.get('logging_cfg') with open(log_cfg_path) as f: logging.config.dictConfig(yaml.safe_load(f.read())) - init_verbose_logging() + if 'ENABLE_JSON_LOGGING' in os.environ: + enable_json_logging() + logging.info(f'using backend configuration {backend_cfg_path}') try: