Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,4 @@ Users can now overwrite the input and output attributes of spans created by in

- Added utility to set input and output data for any active span in a trace

[0.1.86]: https://github.com/KeyValueSoftwareSystems/netra-sdk-py/tree/main
[0.1.89]: https://github.com/KeyValueSoftwareSystems/netra-sdk-py/tree/main
69 changes: 69 additions & 0 deletions netra/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,12 @@ def shutdown(cls) -> None:
meter_provider.shutdown()
except Exception:
pass
# Close simulation HTTP client
if hasattr(cls, "simulation") and cls.simulation is not None:
try:
cls.simulation.close()
except Exception:
pass

@classmethod
def get_meter(cls, name: str = "netra", version: Optional[str] = None) -> otel_metrics.Meter:
Expand Down Expand Up @@ -409,6 +415,47 @@ def set_custom_event(cls, event_name: str, attributes: Any) -> None:
else:
logger.warning("Both event_name and attributes must be provided for custom events.")

@classmethod
def record_exception(
    cls,
    exception: BaseException,
    attributes: Optional[Dict[str, Any]] = None,
) -> None:
    """Attach a handled exception to the currently active span.

    Call this from an ``except`` block when the error is dealt with locally
    and therefore never propagates far enough for the SDK's automatic
    capture logic to see it.

    The work is delegated to ``SessionManager.record_exception``, which adds
    a standard OpenTelemetry exception event (type, message, stacktrace),
    marks the span status as ERROR, and records the ``netra.error_message``
    attribute so the result is consistent with the rest of the SDK.

    Args:
        exception: The exception instance to record. Anything that is not a
            ``BaseException`` is rejected with an error log and ignored.
        attributes: Optional extra attributes to attach to the exception
            event.

    Example::

        @workflow
        def process_order(order_id: str) -> str:
            try:
                result = call_payment_api(order_id)
            except PaymentError as exc:
                Netra.record_exception(exc)
                return "fallback_result"
            return result
    """
    if isinstance(exception, BaseException):
        SessionManager.record_exception(exception, attributes=attributes)
        return

    # Reject non-exception inputs loudly instead of forwarding them.
    logger.error(
        "record_exception: exception must be a BaseException instance, got %s",
        type(exception),
    )

@classmethod
def add_conversation(cls, conversation_type: ConversationType, role: str, content: Any) -> None:
"""
Expand Down Expand Up @@ -461,6 +508,28 @@ def set_root_output(cls, value: Any) -> None:
"""
SessionManager.set_root_output(value)

@classmethod
def set_root_output_stream(cls, value: Any) -> Any:
    """
    Wrap a stream so its accumulated output lands on the root span once iteration finishes.

    Iterate over the returned proxy in place of the original stream::

        stream = Netra.set_root_output_stream(stream)
        for chunk in stream:
            ...

    Both sync and async iterables are supported. When there is no active trace
    context, or *value* is not iterable, *value* is handed back untouched.

    Args:
        value: The stream to wrap (Netra-instrumented or any generic iterable).

    Returns:
        The wrapping proxy, or *value* itself when wrapping is not possible.
    """
    # All wrapping logic lives in SessionManager; this is a thin public facade.
    wrapped_stream = SessionManager.set_root_output_stream(value)
    return wrapped_stream

@classmethod
def start_span(
cls,
Expand Down
6 changes: 4 additions & 2 deletions netra/instrumentation/agno/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -941,7 +941,7 @@ def set_request_attributes(
span.set_attribute("input", input_content)


def set_response_attributes(span: Span, response: Any) -> None:
def set_response_attributes(span: Span, response: Any) -> Optional[str]:
"""Set response-side span attributes from an Agno response object.

Writes token usage, output content, response ID, and output type.
Expand All @@ -951,7 +951,7 @@ def set_response_attributes(span: Span, response: Any) -> None:
response: The Agno response object (RunResponse, TeamRunOutput, etc.).
"""
if not span.is_recording():
return
return None

usage = extract_token_usage(response)
if usage:
Expand All @@ -965,6 +965,8 @@ def set_response_attributes(span: Span, response: Any) -> None:
if response_id:
span.set_attribute(ATTR_RESPONSE_ID, response_id)

return output


def sanitize_headers(raw_headers: List[Tuple[bytes, bytes]]) -> Dict[str, str]:
"""Convert ASGI raw header pairs to a dict with sensitive values redacted.
Expand Down
14 changes: 12 additions & 2 deletions netra/instrumentation/agno/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ def _set_common_span_attributes(span: Span, entity_type: str) -> None:
class _BaseStreamWrapper:
"""Shared base for all span streaming wrappers."""

_netra_stream_wrapper = True

def __init__(self, span: Span, response: Any, ctx_token: Any = None) -> None:
"""Initialise the streaming wrapper.

Expand Down Expand Up @@ -222,10 +224,14 @@ class _AgentStreamOutputMixin:

def _set_output_on_success(self) -> None:
"""Write accumulated run content and token usage to the span before it closes."""
self._netra_output = ""
if self._last_response is not None:
set_response_attributes(self._span, self._last_response)
output = set_response_attributes(self._span, self._last_response)
self._netra_output = output if output else ""
if self._content_chunks:
self._span.set_attribute("output", "".join(self._content_chunks))
output = "".join(self._content_chunks)
self._span.set_attribute("output", output)
self._netra_output = output


class _LlmStreamOutputMixin:
Expand All @@ -239,9 +245,11 @@ class _LlmStreamOutputMixin:
def _set_output_on_success(self) -> None:
"""Write accumulated LLM content, token usage, and timing metrics to the span."""
output_str = None
self._netra_output = ""
if self._content_chunks:
content = "".join(self._content_chunks)
output_str = json.dumps([{"role": "assistant", "content": content}])
self._netra_output = content
elif self._tool_calls:
try:
tc_serialized = serialize_value(self._tool_calls, clean=True)
Expand All @@ -251,10 +259,12 @@ def _set_output_on_success(self) -> None:
except (json.JSONDecodeError, ValueError):
tc_data = tc_serialized
output_str = json.dumps([{"role": "assistant", "tool_calls": tc_data}])
self._netra_output = tc_serialized
except Exception as e:
logger.debug("netra.instrumentation.agno: failed to serialize tool_calls for LLM output: %s", e)
elif self._last_response is not None:
output_str = format_response_as_output(self._last_response)
self._netra_output = output_str if output_str else ""
if output_str:
self._span.set_attribute("output", output_str)
set_llm_completion_attributes(self._span, output_str)
Expand Down
24 changes: 24 additions & 0 deletions netra/instrumentation/cerebras/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def _detect_streaming(args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> bool:
class StreamingWrapper(ObjectProxy): # type: ignore[misc]
"""Wrapper for streaming responses"""

_netra_stream_wrapper = True

def __init__(self, span: Span, response: Iterator[Any], request_kwargs: Dict[str, Any]) -> None:
super().__init__(response)
self._span = span
Expand All @@ -59,6 +61,15 @@ def _ensure_choice(self, index: int) -> None:
else:
self._complete_response["choices"].append({"text": ""})

def _extract_content_text(self) -> str:
"""Extract the plain text content from the accumulated response."""
parts = []
for choice in self._complete_response.get("choices", []):
msg = choice.get("message", {})
if content := msg.get("content"):
parts.append(content)
return "".join(parts)

def __iter__(self) -> Iterator[Any]:
return self

Expand Down Expand Up @@ -129,13 +140,16 @@ def _finalize_span(self) -> None:
"""Finalize span when streaming is complete"""
record_span_timing(self._span, LLM_RESPONSE_DURATION)
set_response_attributes(self._span, self._complete_response)
self._netra_output = self._extract_content_text()
self._span.set_status(Status(StatusCode.OK))
self._span.end()


class AsyncStreamingWrapper(ObjectProxy): # type: ignore[misc]
"""Async wrapper for streaming responses"""

_netra_stream_wrapper = True

def __init__(self, span: Span, response: AsyncIterator[Any], request_kwargs: Dict[str, Any]) -> None:
super().__init__(response)
self._span = span
Expand All @@ -155,6 +169,15 @@ def _ensure_choice(self, index: int) -> None:
else:
self._complete_response["choices"].append({"text": ""})

def _extract_content_text(self) -> str:
"""Extract the plain text content from the accumulated response."""
parts = []
for choice in self._complete_response.get("choices", []):
msg = choice.get("message", {})
if content := msg.get("content"):
parts.append(content)
return "".join(parts)

def __aiter__(self) -> AsyncIterator[Any]:
return self

Expand Down Expand Up @@ -227,6 +250,7 @@ def _finalize_span(self) -> None:
"""Finalize span when streaming is complete"""
record_span_timing(self._span, LLM_RESPONSE_DURATION)
set_response_attributes(self._span, self._complete_response)
self._netra_output = self._extract_content_text()
self._span.set_status(Status(StatusCode.OK))
self._span.end()

Expand Down
6 changes: 6 additions & 0 deletions netra/instrumentation/google_genai/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ async def wrapper(wrapped: Callable[..., Any], instance: Any, args: Tuple[Any, .


class StreamingWrapper:
_netra_stream_wrapper = True

def __init__(self, span: Span, response: Iterator[Any]) -> None:
self._span = span
self._buffer: dict[Any, Any] = {"chunk": None, "content": ""}
Expand Down Expand Up @@ -272,11 +274,14 @@ def _process_chunk(self, chunk: Any) -> None:
def _finalize_span(self) -> None:
    """Close out the span once the stream is exhausted.

    Records the response duration, writes response attributes from the
    buffered data, exposes the accumulated text as ``_netra_output``, marks
    the span OK, and ends it.
    """
    span = self._span
    record_span_timing(span, LLM_RESPONSE_DURATION)
    set_response_attributes(span, self._buffer)

    # Expose the accumulated text; fall back to "" if the buffer is not a dict.
    if isinstance(self._buffer, dict):
        self._netra_output = self._buffer.get("content", "")
    else:
        self._netra_output = ""

    span.set_status(Status(StatusCode.OK))
    span.end()


class AsyncStreamingWrapper:
_netra_stream_wrapper = True

def __init__(self, span: Span, response: AsyncIterator[Any]) -> None:
self._span = span
self._buffer: dict[Any, Any] = {"chunk": None, "content": ""}
Expand Down Expand Up @@ -313,5 +318,6 @@ def _process_chunk(self, chunk: Any) -> None:
def _finalize_span(self) -> None:
    """Close out the span once the async stream is exhausted.

    Records the response duration, writes response attributes from the
    buffered data, exposes the accumulated text as ``_netra_output``, marks
    the span OK, and ends it.
    """
    span = self._span
    record_span_timing(span, LLM_RESPONSE_DURATION)
    set_response_attributes(span, self._buffer)

    # Expose the accumulated text; fall back to "" if the buffer is not a dict.
    if isinstance(self._buffer, dict):
        self._netra_output = self._buffer.get("content", "")
    else:
        self._netra_output = ""

    span.set_status(Status(StatusCode.OK))
    span.end()
24 changes: 24 additions & 0 deletions netra/instrumentation/groq/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
class StreamingWrapper(ObjectProxy): # type: ignore[misc]
"""Wrapper for streaming responses (OpenAI-style)."""

_netra_stream_wrapper = True

def __init__(self, span: Span, response: Iterator[Any], request_kwargs: Dict[str, Any]) -> None:
super().__init__(response)
self._span = span
Expand All @@ -43,6 +45,15 @@ def _ensure_choice(self, index: int) -> None:
else:
self._complete_response["choices"].append({"text": ""})

def _extract_content_text(self) -> str:
"""Extract the plain text content from the accumulated response."""
parts = []
for choice in self._complete_response.get("choices", []):
msg = choice.get("message", {})
if content := msg.get("content"):
parts.append(content)
return "".join(parts)

def __iter__(self) -> Iterator[Any]:
return self

Expand Down Expand Up @@ -98,13 +109,16 @@ def _process_chunk(self, chunk: Any) -> None:
def _finalize_span(self) -> None:
    """Close out the span once the stream is exhausted.

    Records the response duration, writes response attributes from the
    accumulated response, exposes the extracted text as ``_netra_output``,
    marks the span OK, and ends it.
    """
    span = self._span
    record_span_timing(span, LLM_RESPONSE_DURATION)
    set_response_attributes(span, self._complete_response)

    # Publish the plain-text output before the span is closed.
    streamed_text = self._extract_content_text()
    self._netra_output = streamed_text

    span.set_status(Status(StatusCode.OK))
    span.end()


class AsyncStreamingWrapper(ObjectProxy): # type: ignore[misc]
"""Async wrapper for streaming responses (OpenAI-style)."""

_netra_stream_wrapper = True

def __init__(self, span: Span, response: AsyncIterator[Any], request_kwargs: Dict[str, Any]) -> None:
super().__init__(response)
self._span = span
Expand All @@ -122,6 +136,15 @@ def _ensure_choice(self, index: int) -> None:
else:
self._complete_response["choices"].append({"text": ""})

def _extract_content_text(self) -> str:
"""Extract the plain text content from the accumulated response."""
parts = []
for choice in self._complete_response.get("choices", []):
msg = choice.get("message", {})
if content := msg.get("content"):
parts.append(content)
return "".join(parts)

def __aiter__(self) -> AsyncIterator[Any]:
return self

Expand Down Expand Up @@ -177,6 +200,7 @@ def _process_chunk(self, chunk: Any) -> None:
def _finalize_span(self) -> None:
    """Close out the span once the async stream is exhausted.

    Records the response duration, writes response attributes from the
    accumulated response, exposes the extracted text as ``_netra_output``,
    marks the span OK, and ends it.
    """
    span = self._span
    record_span_timing(span, LLM_RESPONSE_DURATION)
    set_response_attributes(span, self._complete_response)

    # Publish the plain-text output before the span is closed.
    streamed_text = self._extract_content_text()
    self._netra_output = streamed_text

    span.set_status(Status(StatusCode.OK))
    span.end()

Expand Down
Loading