Transports in the Model Context Protocol (MCP) provide the foundation for communication between clients and servers. A transport handles the underlying mechanics of how messages are sent and received.
MCP uses JSON-RPC 2.0 as its wire format. The transport layer is responsible for converting MCP protocol messages into JSON-RPC format for transmission and converting received JSON-RPC messages back into MCP protocol messages.
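To make the wire format concrete, the following is a minimal sketch of the three JSON-RPC 2.0 message shapes a transport carries: requests, notifications, and responses. The helper names (`make_request`, `make_notification`, `classify`, `encode`, `decode`) are hypothetical and are not part of any MCP SDK; only the `jsonrpc`, `id`, `method`, `params`, `result`, and `error` fields come from JSON-RPC 2.0 itself.

```python
import json


def make_request(request_id, method, params=None):
    """Build a JSON-RPC 2.0 request. Requests carry an id and expect a response."""
    message = {"jsonrpc": "2.0", "id": request_id, "method": method}
    if params is not None:
        message["params"] = params
    return message


def make_notification(method, params=None):
    """Build a JSON-RPC 2.0 notification. It has no id, so no response is expected."""
    message = {"jsonrpc": "2.0", "method": method}
    if params is not None:
        message["params"] = params
    return message


def classify(message):
    """Return 'request', 'notification', or 'response' for a decoded message."""
    if message.get("jsonrpc") != "2.0":
        raise ValueError("not a JSON-RPC 2.0 message")
    if "method" in message:
        return "request" if "id" in message else "notification"
    if "id" in message and ("result" in message or "error" in message):
        return "response"
    raise ValueError("unrecognized JSON-RPC message shape")


def encode(message):
    """Serialize a message to compact JSON text for transmission."""
    return json.dumps(message, separators=(",", ":"))


def decode(text):
    """Parse received JSON text back into a message dictionary."""
    return json.loads(text)
```

A transport would call something like `encode` before writing to its channel and `decode` plus `classify` on each received message before handing it to the protocol layer.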
The stdio transport enables communication through standard input and output streams. This is particularly useful for local integrations and command-line tools.
Use stdio when:
Building command-line tools
Implementing local integrations
Needing simple process communication
Working with shell scripts
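TypeScript (server):

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";

const server = new Server(
  { name: "example-server", version: "1.0.0" },
  { capabilities: {} }
);

// Exchange messages with the parent process over stdin/stdout.
const transport = new StdioServerTransport();
await server.connect(transport);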
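TypeScript (client):

import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

const client = new Client(
  { name: "example-client", version: "1.0.0" },
  { capabilities: {} }
);

// Spawn the server as a child process and talk to it over its stdin/stdout.
const transport = new StdioClientTransport({
  command: "./server",
  args: ["--option", "value"],
});
await client.connect(transport);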
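Python (server):

import anyio
from mcp.server import Server
from mcp.server.stdio import stdio_server

app = Server("example-server")

async def main():
    # stdio_server() yields a (read_stream, write_stream) pair bound to stdin/stdout.
    async with stdio_server() as streams:
        await app.run(streams[0], streams[1], app.create_initialization_options())

anyio.run(main)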
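Python (client):

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

params = StdioServerParameters(command="./server", args=["--option", "value"])

async def main():
    # stdio_client() launches the server process and yields its (read, write) streams.
    async with stdio_client(params) as streams:
        async with ClientSession(streams[0], streams[1]) as session:
            await session.initialize()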
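The SDK transports above hide how messages are framed on the streams. As a rough sketch, assuming the newline-delimited framing that the MCP specification describes for stdio (one JSON-RPC message per line, with no embedded newlines), reading and writing could look like the following. The function names `write_message` and `read_messages` are hypothetical, and any text stream can stand in for stdin/stdout.

```python
import io
import json


def write_message(stream, message):
    """Write one JSON-RPC message as a single line of JSON."""
    # json.dumps without indent emits one line; newlines inside string
    # values are escaped as \n, so the frame never contains a raw newline.
    stream.write(json.dumps(message) + "\n")
    stream.flush()


def read_messages(stream):
    """Yield one decoded JSON-RPC message per non-empty line of the stream."""
    for line in stream:
        line = line.strip()
        if line:
            yield json.loads(line)


# Usage with an in-memory stream standing in for a pipe.
pipe = io.StringIO()
write_message(pipe, {"jsonrpc": "2.0", "id": 1, "method": "ping"})
pipe.seek(0)
first = next(read_messages(pipe))
```

In a real server the streams would be `sys.stdin` and `sys.stdout`, which is why a stdio server should keep its logging off stdout.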
The SSE Transport has been replaced with a more
flexible Streamable HTTP transport. Refer to the Specification
and latest SDKs for the most recent information.
The Streamable HTTP transport uses HTTP POST requests for client-to-server communication and optional Server-Sent Events (SSE) streams for server-to-client communication.
Client-to-Server Communication: Every JSON-RPC message from client to server is sent as a new HTTP POST request to the MCP endpoint.
Server Responses: The server can respond with either:
    A single JSON response (Content-Type: application/json)
    An SSE stream (Content-Type: text/event-stream) for multiple messages
Server-to-Client Communication: Servers can send requests and notifications to clients via:
    SSE streams initiated by client requests
    SSE streams from HTTP GET requests to the MCP endpoint
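Because a POST can be answered in either of the two ways listed above, a client has to branch on the response's Content-Type. The sketch below shows one way to do that over an already-received body. The function names `parse_sse_data` and `messages_from_response` are hypothetical, the SSE parsing handles only `data:` fields, and a real client would read the stream incrementally rather than from a complete string.

```python
import json


def parse_sse_data(body):
    """Yield the data payload of each complete SSE event in body.

    Events are terminated by a blank line. An unterminated final event is
    dropped. Other fields (event:, id:, retry:) and comment lines are
    ignored in this sketch.
    """
    data_lines = []
    for line in body.splitlines():
        if line == "":
            if data_lines:
                yield "\n".join(data_lines)
                data_lines = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # A single leading space after the colon is not part of the value.
            if value.startswith(" "):
                value = value[1:]
            data_lines.append(value)


def messages_from_response(content_type, body):
    """Return the JSON-RPC messages carried by one HTTP response."""
    media_type = content_type.split(";", 1)[0].strip().lower()
    if media_type == "application/json":
        return [json.loads(body)]
    if media_type == "text/event-stream":
        return [json.loads(data) for data in parse_sse_data(body)]
    raise ValueError(f"unexpected Content-Type: {content_type!r}")
```

Returning a list in both cases lets the caller treat a single JSON response and a multi-message stream uniformly.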
TypeScript (server):

import express from "express";
import { Server } from "@modelcontextprotocol/sdk/server/index.js";

const app = express();
app.use(express.json()); // parse JSON bodies so req.body is populated

const server = new Server(
  { name: "example-server", version: "1.0.0" },
  { capabilities: {} }
);

// Schematic only: handleRequest() and needsStreaming stand in for
// application-specific dispatch logic and are not defined in this snippet.

// The MCP endpoint handles both POST and GET.
app.post("/mcp", async (req, res) => {
  // Handle the JSON-RPC request
  const response = await server.handleRequest(req.body);

  // Return a single response or an SSE stream
  if (needsStreaming) {
    res.setHeader("Content-Type", "text/event-stream");
    // Send SSE events...
  } else {
    res.json(response);
  }
});

app.get("/mcp", (req, res) => {
  // Optional: support server-initiated SSE streams
  res.setHeader("Content-Type", "text/event-stream");
  // Send server notifications/requests...
});

app.listen(3000);
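TypeScript (client):

import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";

const client = new Client(
  { name: "example-client", version: "1.0.0" },
  { capabilities: {} }
);

// Point the transport at the server's single MCP endpoint.
const transport = new StreamableHTTPClientTransport(
  new URL("http://localhost:3000/mcp")
);
await client.connect(transport);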
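Python (client):

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def main():
    async with streamablehttp_client("http://localhost:8000/mcp") as transport:
        # transport[0] and transport[1] are the read and write streams.
        async with ClientSession(transport[0], transport[1]) as session:
            await session.initialize()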
SSE as a standalone transport was defined in protocol version 2024-11-05 and is now deprecated.
It has been replaced by Streamable HTTP, which incorporates SSE as an optional
streaming mechanism. For backwards compatibility information, see the
backwards compatibility section below.
The legacy SSE transport enabled server-to-client streaming with HTTP POST requests for client-to-server communication.
The deprecated SSE transport had similar security considerations to Streamable HTTP, particularly regarding DNS rebinding attacks. These same protections should be applied when using SSE streams within the Streamable HTTP transport.
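One common protection against DNS rebinding is to validate the Origin header of incoming requests against an allowlist of expected hosts. The sketch below illustrates that check in isolation. The names `ALLOWED_HOSTS` and `is_origin_allowed` are hypothetical, the allowlist assumes a server intended only for local use, and accepting requests with no Origin header is a policy assumption that a stricter deployment may want to reverse.

```python
from urllib.parse import urlsplit

# Hypothetical allowlist for a server that should only be reached locally.
ALLOWED_HOSTS = {"localhost", "127.0.0.1", "::1"}


def is_origin_allowed(origin, allowed_hosts=ALLOWED_HOSTS):
    """Return True if the Origin header value names an allowed host.

    origin is the raw header value, or None when the header is absent.
    """
    if origin is None:
        # Assumption: non-browser clients usually send no Origin header.
        return True
    parts = urlsplit(origin)
    if parts.scheme not in ("http", "https"):
        # Covers the literal "null" origin and other opaque values.
        return False
    return parts.hostname in allowed_hosts
```

A request handler would call `is_origin_allowed` before opening an SSE stream or processing a POST, and reject the request otherwise. Binding the server to 127.0.0.1 rather than all interfaces and requiring authentication complement this check.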
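TypeScript (server):

import express from "express";
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";

const app = express();

const server = new Server(
  { name: "example-server", version: "1.0.0" },
  { capabilities: {} }
);

// Holds the most recent SSE connection; this example serves one client at a time.
let transport: SSEServerTransport | null = null;

app.get("/sse", (req, res) => {
  // Open the SSE stream and direct the client to POST its messages to /messages.
  transport = new SSEServerTransport("/messages", res);
  server.connect(transport);
});

app.post("/messages", (req, res) => {
  if (transport) {
    transport.handlePostMessage(req, res);
  }
});

app.listen(3000);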
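TypeScript (client):

import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";

const client = new Client(
  { name: "example-client", version: "1.0.0" },
  { capabilities: {} }
);

// Connect to the server's SSE endpoint.
const transport = new SSEClientTransport(new URL("http://localhost:3000/sse"));
await client.connect(transport);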
MCP supports custom transports for specific needs. A transport implementation only needs to conform to the Transport interface.
You can implement custom transports for:
Custom network protocols
Specialized communication channels
Integration with existing systems
Performance optimization
interface Transport {
  // Start processing messages
  start(): Promise<void>;

  // Send a JSON-RPC message
  send(message: JSONRPCMessage): Promise<void>;

  // Close the connection
  close(): Promise<void>;

  // Callbacks
  onclose?: () => void;
  onerror?: (error: Error) => void;
  onmessage?: (message: JSONRPCMessage) => void;
}
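To show how the pieces of this interface fit together, here is a small in-memory loopback transport that mirrors its shape (`start`, `send`, `close`, and the three callbacks). It is written in Python with only the standard library's asyncio so that it runs on its own; it is a sketch of the interface's contract, not the Python SDK's stream-based transport API. The names `LoopbackTransport`, `create_pair`, and `demo` are hypothetical.

```python
import asyncio


class LoopbackTransport:
    """In-memory transport: messages sent here arrive at the peer's onmessage."""

    def __init__(self):
        self.peer = None
        self.onmessage = None
        self.onclose = None
        self.onerror = None
        self._queue = asyncio.Queue()
        self._task = None

    async def start(self):
        """Start processing incoming messages in a background task."""
        self._task = asyncio.create_task(self._pump())

    async def _pump(self):
        while True:
            message = await self._queue.get()
            if message is None:  # sentinel queued by close()
                break
            try:
                if self.onmessage:
                    self.onmessage(message)
            except Exception as exc:
                # Report handler failures instead of killing the pump.
                if self.onerror:
                    self.onerror(exc)

    async def send(self, message):
        """Deliver a JSON-RPC message to the peer's incoming queue."""
        await self.peer._queue.put(message)

    async def close(self):
        """Drain pending messages, stop the pump, then fire onclose."""
        await self._queue.put(None)
        if self._task:
            await self._task
        if self.onclose:
            self.onclose()


def create_pair():
    """Create two transports wired to each other."""
    a, b = LoopbackTransport(), LoopbackTransport()
    a.peer, b.peer = b, a
    return a, b


async def demo():
    client, server = create_pair()
    received = []
    server.onmessage = received.append
    await client.start()
    await server.start()
    await client.send({"jsonrpc": "2.0", "id": 1, "method": "ping"})
    await server.close()
    await client.close()
    return received
```

Running `asyncio.run(demo())` returns the list of messages the server side received. A transport like this can be handy for testing protocol logic without spawning processes or opening sockets.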
Note that while MCP Servers are often implemented with asyncio, we recommend
implementing low-level interfaces like transports with anyio for wider compatibility.
from contextlib import asynccontextmanager

import anyio
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from mcp.types import JSONRPCMessage

@asynccontextmanager
async def create_transport(
    read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception],
    write_stream: MemoryObjectSendStream[JSONRPCMessage],
):
    """
    Transport interface for MCP.

    Args:
        read_stream: Stream to read incoming messages from
        write_stream: Stream to write outgoing messages to
    """
    async with anyio.create_task_group() as tg:
        try:
            # Start processing messages. process_messages is a placeholder
            # for your own coroutine function.
            tg.start_soon(process_messages, read_stream)

            # Send messages
            async with write_stream:
                yield write_stream
        except Exception:
            # Handle errors
            raise
        finally:
            # Clean up
            tg.cancel_scope.cancel()
            await write_stream.aclose()
            await read_stream.aclose()
import logging
from contextlib import asynccontextmanager

import anyio
# ASGI types, assumed here to come from Starlette
from starlette.types import Receive, Scope, Send

logger = logging.getLogger(__name__)

@asynccontextmanager
async def example_transport(scope: Scope, receive: Receive, send: Send):
    try:
        # Create streams for bidirectional communication
        read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
        write_stream, write_stream_reader = anyio.create_memory_object_stream(0)

        async def message_handler():
            try:
                async with read_stream_writer:
                    # Message handling logic
                    pass
            except Exception as exc:
                logger.error(f"Failed to handle message: {exc}")
                raise

        async with anyio.create_task_group() as tg:
            tg.start_soon(message_handler)
            try:
                # Yield streams for communication
                yield read_stream, write_stream
            except Exception as exc:
                logger.error(f"Transport error: {exc}")
                raise
            finally:
                tg.cancel_scope.cancel()
                await write_stream.aclose()
                await read_stream.aclose()
    except Exception as exc:
        logger.error(f"Failed to initialize transport: {exc}")
        raise