diff --git a/python/fi_instrumentation/otel.py b/python/fi_instrumentation/otel.py index 60abc74..1d8cd80 100644 --- a/python/fi_instrumentation/otel.py +++ b/python/fi_instrumentation/otel.py @@ -5,6 +5,7 @@ import os import signal import sys +import threading import uuid from enum import Enum from typing import Any, Dict, List, Optional, Tuple, Type, Union @@ -401,6 +402,7 @@ def __init__( transport: Transport = Transport.HTTP, ): self._active_spans = {} + self._shutdown_lock = threading.Lock() if span_exporter is None: if transport == Transport.HTTP: @@ -437,27 +439,21 @@ def on_end(self, span: Any) -> None: def shutdown(self) -> None: """Override shutdown to ensure all active spans get exported""" try: - # Process any spans that haven't been ended - if self._active_spans: - print(f"Ending {len(self._active_spans)} active spans during shutdown") - - # Create a copy to avoid modification during iteration - active_spans = list(self._active_spans.values()) - - # End all active spans and mark them as leaked - for span in active_spans: - if hasattr(span, "is_recording") and span.is_recording(): - try: - # Mark the span as leaked - span.set_attribute("gen_ai.span.leaked", True) - span.end() - except Exception as e: - pass - - # Clear the tracking dictionary - self._active_spans.clear() + with self._shutdown_lock: + if self._active_spans: + logger.warning("Ending %d active spans during shutdown", len(self._active_spans)) + active_spans = list(self._active_spans.values()) + + for span in active_spans: + if hasattr(span, "is_recording") and span.is_recording(): + try: + span.set_attribute("gen_ai.span.leaked", True) + span.end() + except Exception: + pass + + self._active_spans.clear() finally: - # Call the parent shutdown method super().shutdown() @@ -567,7 +563,9 @@ def shutdown(self) -> None: if hasattr(self, "_session") and self._session: self._session.close() except Exception as e: - print(f"Error during shutdown: {e}") + logger.error("Error during gRPC exporter shutdown: %s", e) + finally: + super().shutdown() class HTTPSpanExporter(_HTTPSpanExporter): @@ -608,7 +606,9 @@ def shutdown(self) -> None: if hasattr(self, "_session") and self._session: self._session.close() except Exception as e: - print(f"Error during shutdown: {e}") + logger.error("Error during HTTP exporter shutdown: %s", e) + finally: + super().shutdown() def _exporter_transport(exporter: SpanExporter) -> str: