Skip to content

Commit 746c951

Browse files
authored
fix(resource-manager): ensure ingestion and media queues are always flushed (#1401)
1 parent 2470af7 commit 746c951

File tree

1 file changed

+5
-9
lines changed

1 file changed

+5
-9
lines changed

langfuse/_client/resource_manager.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -398,11 +398,9 @@ def _stop_and_join_consumer_threads(self) -> None:
398398

399399
def flush(self) -> None:
400400
tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider())
401-
if isinstance(tracer_provider, otel_trace_api.ProxyTracerProvider):
402-
return
403-
404-
tracer_provider.force_flush()
405-
langfuse_logger.debug("Successfully flushed OTEL tracer provider")
401+
if not isinstance(tracer_provider, otel_trace_api.ProxyTracerProvider):
402+
tracer_provider.force_flush()
403+
langfuse_logger.debug("Successfully flushed OTEL tracer provider")
406404

407405
self._score_ingestion_queue.join()
408406
langfuse_logger.debug("Successfully flushed score ingestion queue")
@@ -415,10 +413,8 @@ def shutdown(self) -> None:
415413
atexit.unregister(self.shutdown)
416414

417415
tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider())
418-
if isinstance(tracer_provider, otel_trace_api.ProxyTracerProvider):
419-
return
420-
421-
tracer_provider.force_flush()
416+
if not isinstance(tracer_provider, otel_trace_api.ProxyTracerProvider):
417+
tracer_provider.force_flush()
422418

423419
self._stop_and_join_consumer_threads()
424420

0 commit comments

Comments
 (0)