From 5f38a997472f74c3f52f3cc5f7edd32b6ea27e51 Mon Sep 17 00:00:00 2001 From: Owen Devereaux Date: Sun, 22 Mar 2026 17:24:43 -0400 Subject: [PATCH] fix: handle ClosedResourceError when transport closes mid-request Backports fixes from #2306 to v1.x to address issue #2328. When the transport closes while handlers are processing requests (e.g., stdin EOF during a long-running tool call), the server could crash with ClosedResourceError when trying to send a response through the already-closed write stream. This fix: 1. Wraps the message loop in a try/finally that cancels in-flight handlers when the transport closes, preventing them from attempting to respond 2. Catches BrokenResourceError and ClosedResourceError when calling message.respond() and logs instead of crashing 3. Properly re-raises transport-close cancellation to let the task group handle it (vs client-initiated cancellation which already sent a response) 4. Uses list() snapshot when iterating _response_streams in the finally block to avoid 'dictionary changed size during iteration' errors Fixes #2328 --- src/mcp/server/lowlevel/server.py | 67 +++++++---- src/mcp/shared/session.py | 6 +- tests/server/test_cancel_handling.py | 173 +++++++++++++++++++++++++++ 3 files changed, 221 insertions(+), 25 deletions(-) diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 67453624c..2dd1a8277 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -671,16 +671,23 @@ async def run( await stack.enter_async_context(task_support.run()) async with anyio.create_task_group() as tg: - async for message in session.incoming_messages: - logger.debug("Received message: %s", message) - - tg.start_soon( - self._handle_message, - message, - session, - lifespan_context, - raise_exceptions, - ) + try: + async for message in session.incoming_messages: + logger.debug("Received message: %s", message) + + tg.start_soon( + self._handle_message, + message, + session, + lifespan_context, + raise_exceptions, + ) + finally: + # Transport closed: cancel in-flight handlers. Without this the + # TG join waits for them, and when they eventually try to + # respond they hit a closed write stream (the session's + # _receive_loop closed it when the read stream ended). + tg.cancel_scope.cancel() async def _handle_message( self, @@ -763,12 +770,18 @@ async def _handle_request( response = await handler(req) except McpError as err: # pragma: no cover response = err.error - except anyio.get_cancelled_exc_class(): # pragma: no cover - logger.info( - "Request %s cancelled - duplicate response suppressed", - message.request_id, - ) - return + except anyio.get_cancelled_exc_class(): + if message.cancelled: + # Client sent CancelledNotification; responder.cancel() already + # sent an error response, so skip the duplicate. + logger.info( + "Request %s cancelled - duplicate response suppressed", + message.request_id, + ) + return + # Transport-close cancellation from the TG in run(); re-raise so the + # TG swallows its own cancellation. + raise except Exception as err: # pragma: no cover if raise_exceptions: raise err @@ -777,16 +790,24 @@ async def _handle_request( # Reset the global state after we are done if token is not None: # pragma: no branch request_ctx.reset(token) - - await message.respond(response) else: # pragma: no cover - await message.respond( - types.ErrorData( - code=types.METHOD_NOT_FOUND, - message="Method not found", - ) + response = types.ErrorData( + code=types.METHOD_NOT_FOUND, + message="Method not found", ) + try: + await message.respond(response) + except (anyio.BrokenResourceError, anyio.ClosedResourceError): + # Transport closed between handler unblocking and respond. Happens + # when _receive_loop's finally wakes a handler blocked on + # send_request: the handler runs to respond() before run()'s TG + # cancel fires, but after the write stream closed. Closed if our + # end closed (_receive_loop's async-with exit); Broken if the peer + # end closed first (streamable_http terminate()). + logger.debug("Response for %s dropped - transport closed", message.request_id) + return + logger.debug("Response sent") async def _handle_notification(self, notify: Any): diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 3033acd0e..35a83fcf1 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -108,7 +108,7 @@ def __exit__( ) -> None: """Exit the context manager, performing cleanup and notifying completion.""" try: - if self._completed: # pragma: no branch + if self._completed: self._on_complete(self) finally: self._entered = False @@ -445,7 +445,9 @@ async def _receive_loop(self) -> None: finally: # after the read stream is closed, we need to send errors # to any pending requests - for id, stream in self._response_streams.items(): + # Snapshot: stream.send() wakes the waiter, whose finally pops + # from _response_streams before the next __next__() call. + for id, stream in list(self._response_streams.items()): error = ErrorData(code=CONNECTION_CLOSED, message="Connection closed") try: await stream.send(JSONRPCError(jsonrpc="2.0", id=id, error=error)) diff --git a/tests/server/test_cancel_handling.py b/tests/server/test_cancel_handling.py index 47c49bb62..bcb172e64 100644 --- a/tests/server/test_cancel_handling.py +++ b/tests/server/test_cancel_handling.py @@ -108,3 +108,176 @@ async def first_request(): assert isinstance(content, types.TextContent) assert content.text == "Call number: 2" assert call_count == 2 + + +@pytest.mark.anyio +async def test_server_cancels_in_flight_handlers_on_transport_close(): + """When the transport closes mid-request, server.run() must cancel in-flight + handlers rather than join on them. + + Without the cancel, the task group waits for the handler, which then tries + to respond through a write stream that _receive_loop already closed, + raising ClosedResourceError and crashing server.run() with exit code 1. + + This drives server.run() with raw memory streams because InMemoryTransport + wraps it in its own finally-cancel (_memory.py) which masks the bug. + """ + from mcp.shared.message import SessionMessage + from mcp.types import ( + LATEST_PROTOCOL_VERSION, + ClientCapabilities, + Implementation, + InitializeRequestParams, + JSONRPCNotification, + JSONRPCRequest, + ) + + handler_started = anyio.Event() + handler_cancelled = anyio.Event() + server_run_returned = anyio.Event() + + server = Server("test") + + @server.call_tool() + async def handle_call_tool(name: str, arguments: dict | None) -> list[types.TextContent]: + handler_started.set() + try: + await anyio.sleep_forever() + finally: + handler_cancelled.set() + # unreachable: sleep_forever only exits via cancellation + raise AssertionError # pragma: no cover + + to_server, server_read = anyio.create_memory_object_stream[SessionMessage | Exception](10) + server_write, from_server = anyio.create_memory_object_stream[SessionMessage](10) + + async def run_server(): + await server.run(server_read, server_write, server.create_initialization_options()) + server_run_returned.set() + + init_req = JSONRPCRequest( + jsonrpc="2.0", + id=1, + method="initialize", + params=InitializeRequestParams( + protocolVersion=LATEST_PROTOCOL_VERSION, + capabilities=ClientCapabilities(), + clientInfo=Implementation(name="test", version="1.0"), + ).model_dump(by_alias=True, mode="json", exclude_none=True), + ) + initialized = JSONRPCNotification(jsonrpc="2.0", method="notifications/initialized") + call_req = JSONRPCRequest( + jsonrpc="2.0", + id=2, + method="tools/call", + params=CallToolRequestParams(name="slow", arguments={}).model_dump(by_alias=True, mode="json"), + ) + + with anyio.fail_after(5): + async with anyio.create_task_group() as tg, to_server, server_read, server_write, from_server: + tg.start_soon(run_server) + + await to_server.send(SessionMessage(message=types.JSONRPCMessage(init_req))) + await from_server.receive() # init response + await to_server.send(SessionMessage(message=types.JSONRPCMessage(initialized))) + await to_server.send(SessionMessage(message=types.JSONRPCMessage(call_req))) + + await handler_started.wait() + + # Close the server's input stream — this is what stdin EOF does. + # server.run()'s incoming_messages loop ends, finally-cancel fires, + # handler gets CancelledError, server.run() returns. + await to_server.aclose() + + await server_run_returned.wait() + + assert handler_cancelled.is_set() + + +@pytest.mark.anyio +async def test_server_handles_transport_close_with_pending_server_to_client_requests(): + """When the transport closes while handlers are blocked on server→client + requests (sampling, roots, elicitation), server.run() must still exit cleanly. + + Two bugs covered: + 1. _receive_loop's finally iterates _response_streams with await checkpoints + inside; the woken handler's send_request finally pops from that dict + before the next __next__() — RuntimeError: dictionary changed size. + 2. The woken handler's MCPError is caught in _handle_request, which falls + through to respond() against a write stream _receive_loop already closed. + """ + from mcp.shared.message import SessionMessage + from mcp.types import ( + LATEST_PROTOCOL_VERSION, + ClientCapabilities, + Implementation, + InitializeRequestParams, + JSONRPCNotification, + JSONRPCRequest, + ) + + handlers_started = 0 + both_started = anyio.Event() + server_run_returned = anyio.Event() + + server = Server("test") + + @server.call_tool() + async def handle_call_tool(name: str, arguments: dict | None) -> list[types.TextContent]: + nonlocal handlers_started + handlers_started += 1 + if handlers_started == 2: + both_started.set() + # Blocks on send_request waiting for a client response that never comes. + # _receive_loop's finally will wake this with CONNECTION_CLOSED. + await server.request_context.session.list_roots() + raise AssertionError # pragma: no cover + + to_server, server_read = anyio.create_memory_object_stream[SessionMessage | Exception](10) + server_write, from_server = anyio.create_memory_object_stream[SessionMessage](10) + + async def run_server(): + await server.run(server_read, server_write, server.create_initialization_options()) + server_run_returned.set() + + init_req = JSONRPCRequest( + jsonrpc="2.0", + id=1, + method="initialize", + params=InitializeRequestParams( + protocolVersion=LATEST_PROTOCOL_VERSION, + capabilities=ClientCapabilities(), + clientInfo=Implementation(name="test", version="1.0"), + ).model_dump(by_alias=True, mode="json", exclude_none=True), + ) + initialized = JSONRPCNotification(jsonrpc="2.0", method="notifications/initialized") + + with anyio.fail_after(5): + async with anyio.create_task_group() as tg, to_server, server_read, server_write, from_server: + tg.start_soon(run_server) + + await to_server.send(SessionMessage(message=types.JSONRPCMessage(init_req))) + await from_server.receive() # init response + await to_server.send(SessionMessage(message=types.JSONRPCMessage(initialized))) + + # Two tool calls → two handlers → two _response_streams entries. + for rid in (2, 3): + call_req = JSONRPCRequest( + jsonrpc="2.0", + id=rid, + method="tools/call", + params=CallToolRequestParams(name="t", arguments={}).model_dump(by_alias=True, mode="json"), + ) + await to_server.send(SessionMessage(message=types.JSONRPCMessage(call_req))) + + await both_started.wait() + # Drain the two roots/list requests so send_request's _write_stream.send() + # completes and both handlers are parked at response_stream_reader.receive(). + await from_server.receive() + await from_server.receive() + + await to_server.aclose() + + # Without the fixes: RuntimeError (dict mutation) or ClosedResourceError + # (respond after write-stream close) escapes run_server and this hangs. + await server_run_returned.wait()