Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions python/fi_instrumentation/otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import signal
import sys
import threading
import uuid
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, Type, Union
Expand Down Expand Up @@ -401,6 +402,7 @@ def __init__(
transport: Transport = Transport.HTTP,
):
self._active_spans = {}
self._shutdown_lock = threading.Lock()

if span_exporter is None:
if transport == Transport.HTTP:
Expand Down Expand Up @@ -437,27 +439,21 @@ def on_end(self, span: Any) -> None:
def shutdown(self) -> None:
"""Override shutdown to ensure all active spans get exported"""
try:
# Process any spans that haven't been ended
if self._active_spans:
print(f"Ending {len(self._active_spans)} active spans during shutdown")

# Create a copy to avoid modification during iteration
active_spans = list(self._active_spans.values())

# End all active spans and mark them as leaked
for span in active_spans:
if hasattr(span, "is_recording") and span.is_recording():
try:
# Mark the span as leaked
span.set_attribute("gen_ai.span.leaked", True)
span.end()
except Exception as e:
pass

# Clear the tracking dictionary
self._active_spans.clear()
with self._shutdown_lock:
if self._active_spans:
logger.warning("Ending %d active spans during shutdown", len(self._active_spans))
active_spans = list(self._active_spans.values())

for span in active_spans:
if hasattr(span, "is_recording") and span.is_recording():
try:
span.set_attribute("gen_ai.span.leaked", True)
span.end()
except Exception:
pass

self._active_spans.clear()
finally:
# Call the parent shutdown method
super().shutdown()


Expand Down Expand Up @@ -567,7 +563,9 @@ def shutdown(self) -> None:
if hasattr(self, "_session") and self._session:
self._session.close()
except Exception as e:
print(f"Error during shutdown: {e}")
logger.error("Error during gRPC exporter shutdown: %s", e)
finally:
super().shutdown()


class HTTPSpanExporter(_HTTPSpanExporter):
Expand Down Expand Up @@ -608,7 +606,9 @@ def shutdown(self) -> None:
if hasattr(self, "_session") and self._session:
self._session.close()
except Exception as e:
print(f"Error during shutdown: {e}")
logger.error("Error during HTTP exporter shutdown: %s", e)
finally:
super().shutdown()


def _exporter_transport(exporter: SpanExporter) -> str:
Expand Down