Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from collections.abc import AsyncIterable, AsyncIterator, Generator, Mapping, Sequence
from contextlib import suppress
from pathlib import Path
from contextlib import AbstractAsyncContextManager, AsyncExitStack, suppress
from typing import Protocol, cast

from agent_framework import (
Expand All @@ -25,12 +26,14 @@
SupportsAgentRun,
WorkflowAgent,
)
from agent_framework.exceptions import AgentFrameworkException
from azure.ai.agentserver.responses import (
ResponseContext,
ResponseEventStream,
ResponseProviderProtocol,
ResponsesServerOptions,
)
from azure.ai.agentserver.responses._id_generator import IdGenerator
from azure.ai.agentserver.responses.hosting import ResponsesAgentServerHost
from azure.ai.agentserver.responses.models import (
ApplyPatchToolCallItemParam,
Expand Down Expand Up @@ -108,11 +111,13 @@
ReasoningSummaryPartBuilder,
TextContentBuilder,
)
from mcp import McpError
from typing_extensions import Any

logger = logging.getLogger(__name__)


# region Approval Storage
class ApprovalStorage(Protocol):
"""Storage for saving function approval requests."""

Expand Down Expand Up @@ -247,6 +252,39 @@ def _checkpoint_storage_for_context(root: str, context_id: str) -> FileCheckpoin
return FileCheckpointStorage(storage_path)


# endregion Approval Storage

# Foundry Toolbox Auth integration
# Consent-URL error code returned by the Foundry MCP gateway when calling `/list`
CONSENT_ERROR_CODE = -32007


def consent_url_from_error(exc: BaseException) -> str | None:
"""Return the consent URL when ``exc`` wraps a Foundry MCP gateway consent error.

The Agent Framework MCP layer surfaces gateway consent failures by wrapping the underlying
``McpError`` inside an :class:`AgentFrameworkException` (typically a ``ToolExecutionException``
raised from ``MCPStreamableHTTPTool.__aenter__``). This helper inspects ``exc.args`` for a
wrapped ``McpError`` whose ``error.code`` is :data:`CONSENT_ERROR_CODE`; when found, the
consent link the gateway returned in ``error.message`` is returned. Returns ``None`` for
anything else, so callers can do ``if (url := consent_url_from_error(ex)) is None: raise``.

Args:
exc: The exception to inspect.

Returns:
The consent URL if ``exc`` wraps a consent ``McpError``, otherwise ``None``.
"""
inner_exception = next((arg for arg in exc.args if isinstance(arg, McpError)), None)
if inner_exception is not None and inner_exception.error.code == CONSENT_ERROR_CODE:
return inner_exception.error.message
return None


# endregion Foundry Toolbox Auth integration


# region ResponsesHostServer
class ResponsesHostServer(ResponsesAgentServerHost):
"""A responses server host for an agent."""

Expand Down Expand Up @@ -315,8 +353,43 @@ def __init__(
if self.config.is_hosted
else InMemoryFunctionApprovalStorage()
)
# Lazy agent lifecycle: the agent (and any MCP tools it owns) is entered on
# the first request rather than at server startup, so that authentication
# failures during MCP connect can be surfaced to the client as an
# `oauth_consent_request` stream event instead of crashing the server.
self._agent_stack: AsyncExitStack | None = None
self._agent_init_lock = asyncio.Lock()
self.shutdown_handler(self._cleanup_agent) # pyright: ignore[reportUnknownMemberType]
self.response_handler(self._handle_response) # pyright: ignore[reportUnknownMemberType]

async def _ensure_agent_ready(self) -> None:
"""Lazily enter the agent's async context exactly once.

On failure the partial exit stack is closed and ``_agent_stack`` is left
as ``None`` so a subsequent request (e.g. after the user completes OAuth
consent) can retry the connection.
"""
if self._agent_stack is not None:
return
async with self._agent_init_lock:
if self._agent_stack is not None:
return
stack = AsyncExitStack()
try:
if isinstance(self._agent, AbstractAsyncContextManager):
await stack.enter_async_context(self._agent)
except BaseException:
await stack.aclose()
raise
self._agent_stack = stack

async def _cleanup_agent(self) -> None:
"""Close the agent's async context. Registered as the server shutdown handler."""
stack = self._agent_stack
if stack is not None:
self._agent_stack = None
await stack.aclose()

async def _handle_response(
self,
request: CreateResponse,
Expand Down Expand Up @@ -359,45 +432,76 @@ async def _handle_inner_agent(
else:
run_kwargs["options"] = chat_options

if not is_streaming_request:
# Run the agent in non-streaming mode
response = await self._agent.run(stream=False, **run_kwargs) # type: ignore[reportUnknownMemberType]

for message in response.messages:
for content in message.contents:
async for item in _to_outputs(
response_event_stream,
content,
approval_storage=self._approval_storage,
):
yield item

# Lazy-enter the agent (and any MCP tools it owns). The MCP client wraps gateway
# consent failures (and other connection-time errors) in AgentFrameworkException; if
# one of those is a consent error we surface the consent link to the client through
# the already-opened response stream instead of crashing the request. Other exception
# types propagate normally so the host can handle / log them.
try:
await self._ensure_agent_ready()
except AgentFrameworkException as ex:
consent_url = consent_url_from_error(ex)
if consent_url is None:
raise
logger.warning("OAuth consent required for Foundry MCP gateway.")
oauth_item = OAuthConsentRequestOutputItem(
id=IdGenerator.new_id("oacr"),
consent_link=consent_url,
server_label="Foundry Toolbox",
)
builder = response_event_stream.add_output_item(oauth_item.id)
yield builder.emit_added(oauth_item)
yield builder.emit_done(oauth_item)
yield response_event_stream.emit_completed()
return

# Track the current active output item builder for streaming;
# lazily created on matching content, closed when a different type arrives.
tracker = _OutputItemTracker(response_event_stream)
tracker: _OutputItemTracker | None = _OutputItemTracker(response_event_stream) if is_streaming_request else None

# Run the agent in streaming mode
async for update in self._agent.run(stream=True, **run_kwargs): # type: ignore[reportUnknownMemberType]
for content in update.contents:
for event in tracker.handle(content):
try:
if not is_streaming_request:
# Run the agent in non-streaming mode
response = await self._agent.run(stream=False, **run_kwargs) # type: ignore[reportUnknownMemberType]

for message in response.messages:
for content in message.contents:
async for item in _to_outputs(
response_event_stream,
content,
approval_storage=self._approval_storage,
):
yield item
yield response_event_stream.emit_completed()
else:
if tracker is None: # pragma: no cover - defensive, set above
raise RuntimeError("Streaming tracker was not initialized.")
# Run the agent in streaming mode
async for update in self._agent.run(stream=True, **run_kwargs): # type: ignore[reportUnknownMemberType]
for content in update.contents:
for event in tracker.handle(content):
yield event
if tracker.needs_async:
async for item in _to_outputs(
response_event_stream,
content,
approval_storage=self._approval_storage,
):
yield item
tracker.needs_async = False

# Close any remaining active builder
for event in tracker.close():
yield event
if tracker.needs_async:
async for item in _to_outputs(
response_event_stream,
content,
approval_storage=self._approval_storage,
):
yield item
tracker.needs_async = False

# Close any remaining active builder
for event in tracker.close():
yield event

yield response_event_stream.emit_completed()
yield response_event_stream.emit_completed()
except Exception:
# Drain any in-progress streaming builder before emitting consent
# so the resulting stream stays well-formed.
if tracker is not None:
for event in tracker.close():
yield event
yield response_event_stream.emit_completed()
raise

async def _handle_inner_workflow(
self,
Expand Down Expand Up @@ -429,6 +533,11 @@ async def _handle_inner_workflow(
if not isinstance(self._agent, WorkflowAgent):
raise RuntimeError("Agent is not a workflow agent.")

# Workflow agents are not async context managers in any built-in path,
# but call _ensure_agent_ready for symmetry with the regular path so
# any future async resources owned by the workflow are entered here.
await self._ensure_agent_ready()

# Determine the latest checkpoint (if any) so we can resume the
# workflow's prior state for this turn. The directory is keyed by
# the inbound context id (conversation_id when set, otherwise
Expand Down Expand Up @@ -551,6 +660,8 @@ async def _delete_not_latest_checkpoints(checkpoint_storage: FileCheckpointStora
await checkpoint_storage.delete(checkpoint.checkpoint_id)


# endregion ResponsesHostServer

# region Active Builder State


Expand Down
Loading
Loading