Skip to content

Commit 11fddcc

Browse files
fix: remove stdout stream handling for share (#167)
* fix: remove stdout stream handling for share * test: remove fnctl test * fix: send err when validation error --------- Co-authored-by: cnie
1 parent 37addeb commit 11fddcc

File tree

3 files changed

+47
-113
lines changed

3 files changed

+47
-113
lines changed

src/mcpm/commands/share.py

Lines changed: 45 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22
Share command for MCPM - Share a single MCP server through a tunnel
33
"""
44

5-
import os
65
import secrets
7-
import select
86
import shlex
97
import shutil
108
import signal
@@ -27,19 +25,6 @@ def find_mcp_proxy() -> Optional[str]:
2725
return shutil.which("mcp-proxy")
2826

2927

30-
def make_non_blocking(file_obj):
31-
"""Make a file object non-blocking."""
32-
if os.name == 'posix':
33-
import fcntl
34-
35-
fd = file_obj.fileno()
36-
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
37-
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
38-
# On other platforms (e.g., Windows), we rely on the behavior of select()
39-
# and the non-blocking nature of readline() on Popen streams,
40-
# or the existing try-except for IOError/OSError.
41-
42-
4328
def wait_for_random_port(process: subprocess.Popen, timeout: int = 20) -> Optional[int]:
4429
"""
4530
Wait for mcp-proxy to output the random port information.
@@ -74,39 +59,26 @@ def wait_for_random_port(process: subprocess.Popen, timeout: int = 20) -> Option
7459
console.print(f"[red]Error output:[/]\n{stderr_output}")
7560
sys.exit(1)
7661

77-
# Use select to wait for data to be available without blocking
78-
readable = []
79-
if process.stdout:
80-
readable.append(process.stdout)
81-
if process.stderr:
82-
readable.append(process.stderr)
83-
84-
if readable:
85-
# Wait for up to 1 second for output
86-
r, _, _ = select.select(readable, [], [], 1.0)
87-
88-
# Process available output
89-
for stream in r:
90-
try:
91-
line = stream.readline()
92-
if line:
93-
print(line.rstrip())
94-
95-
# Check for port information
96-
if "Uvicorn running on http://" in line:
97-
try:
98-
url_part = line.split("Uvicorn running on ")[1].split(" ")[0]
99-
actual_port = int(url_part.split(":")[-1].strip())
100-
port_found = True
101-
console.print(
102-
f"[cyan]mcp-proxy SSE server running on port [bold]{actual_port}[/bold][/]"
103-
)
104-
break
105-
except (ValueError, IndexError):
106-
pass
107-
except (IOError, OSError):
108-
# Resource temporarily unavailable - this is normal for non-blocking IO
109-
pass
62+
# Process available output
63+
try:
64+
if process.stderr:
65+
line = process.stderr.readline()
66+
if line:
67+
console.print(line.rstrip())
68+
69+
# Check for port information
70+
if "Uvicorn running on http://" in line:
71+
try:
72+
url_part = line.split("Uvicorn running on ")[1].split(" ")[0]
73+
actual_port = int(url_part.split(":")[-1].strip())
74+
port_found = True
75+
console.print(f"[cyan]mcp-proxy SSE server running on port [bold]{actual_port}[/bold][/]")
76+
break
77+
except (ValueError, IndexError):
78+
pass
79+
except (IOError, OSError):
80+
# Resource temporarily unavailable - this is normal for non-blocking IO
81+
pass
11082
else:
11183
# No streams to read from, just wait a bit
11284
time.sleep(0.5)
@@ -148,12 +120,6 @@ def start_mcp_proxy(command: str, port: Optional[int] = None) -> Tuple[subproces
148120
console.print(f"[cyan]Running command: [bold]{' '.join(cmd_parts)}[/bold][/]")
149121
process = subprocess.Popen(cmd_parts, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1)
150122

151-
# Make stdout and stderr non-blocking
152-
if process.stdout:
153-
make_non_blocking(process.stdout)
154-
if process.stderr:
155-
make_non_blocking(process.stderr)
156-
157123
# If port is None, we need to parse the output to find the random port
158124
actual_port = port
159125
if not actual_port:
@@ -352,45 +318,31 @@ def signal_handler(sig, frame):
352318
if tunnel:
353319
tunnel.kill()
354320
break
355-
356-
# Use select to check for available output without blocking
357-
readable = []
358-
if server_process.stdout:
359-
readable.append(server_process.stdout)
360-
if server_process.stderr:
361-
readable.append(server_process.stderr)
362-
363-
if readable:
364-
# Wait for up to 1 second for output
365-
r, _, _ = select.select(readable, [], [], 1.0)
366-
367-
# Process available output
368-
for stream in r:
369-
try:
370-
line = stream.readline()
371-
if line:
372-
line_str = line.rstrip()
373-
print(line_str)
374-
last_activity_time = time.time()
375-
376-
# Check for error messages
377-
error_msg = monitor_for_errors(line_str)
378-
if error_msg and error_msg not in error_messages:
379-
console.print(f"[bold red]Error:[/] {error_msg}")
380-
error_messages.append(error_msg)
381-
server_error_detected = True
382-
383-
# If this is a critical error and we have retries left, restart
384-
if "Protocol initialization error" in error_msg and retries_left > 0:
385-
console.print(
386-
f"[yellow]Will attempt to restart ({retries_left} retries left)[/]"
387-
)
388-
# Break out of the loop to trigger a restart
389-
server_process.terminate()
390-
break
391-
except (IOError, OSError):
392-
# Resource temporarily unavailable - this is normal for non-blocking IO
393-
pass
321+
# Process available output
322+
try:
323+
if server_process.stderr:
324+
line = server_process.stderr.readline()
325+
if line:
326+
line_str = line.rstrip()
327+
console.print(line_str)
328+
last_activity_time = time.time()
329+
330+
# Check for error messages
331+
error_msg = monitor_for_errors(line_str)
332+
if error_msg and error_msg not in error_messages:
333+
console.print(f"[bold red]Error:[/] {error_msg}")
334+
error_messages.append(error_msg)
335+
server_error_detected = True
336+
337+
# If this is a critical error and we have retries left, restart
338+
if "Protocol initialization error" in error_msg and retries_left > 0:
339+
console.print(f"[yellow]Will attempt to restart ({retries_left} retries left)[/]")
340+
# Break out of the loop to trigger a restart
341+
server_process.terminate()
342+
break
343+
except (IOError, OSError):
344+
# Resource temporarily unavailable - this is normal for non-blocking IO
345+
pass
394346
else:
395347
# No streams to read from, just wait a bit
396348
time.sleep(0.5)

src/mcpm/router/transport.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ async def handle_post_message(self, scope: Scope, receive: Receive, send: Send):
220220
response = Response("Could not parse message", status_code=400)
221221
await response(scope, receive, send)
222222
try:
223-
await writer.send(SessionMessage(message=err))
223+
await writer.send(err)
224224
except (BrokenPipeError, ConnectionError, OSError) as pipe_err:
225225
logger.warning(f"Failed to send error due to pipe issue: {pipe_err}")
226226
return
@@ -240,7 +240,7 @@ async def handle_post_message(self, scope: Scope, receive: Receive, send: Send):
240240
logger.warning(f"Connection error when sending message to session {session_id}: {e}")
241241
self._read_stream_writers.pop(session_id, None)
242242
self._session_id_to_identifier.pop(session_id, None)
243-
243+
244244
# Implicitly return None. The original 'return response' is removed.
245245
return
246246

tests/test_share.py

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
from src.mcpm.commands.share import (
1111
find_mcp_proxy,
12-
make_non_blocking,
1312
monitor_for_errors,
1413
share,
1514
terminate_process,
@@ -33,23 +32,6 @@ def test_find_mcp_proxy_not_found(self, monkeypatch):
3332

3433
assert find_mcp_proxy() is None
3534

36-
@patch("fcntl.fcntl")
37-
def test_make_non_blocking(self, mock_fcntl):
38-
"""Test making a file object non-blocking"""
39-
# Create a mock file object
40-
mock_file = Mock()
41-
mock_file.fileno.return_value = 42
42-
43-
# Set up mock fcntl return values
44-
mock_fcntl.return_value = 0
45-
46-
# Call the function
47-
make_non_blocking(mock_file)
48-
49-
# Verify that functions were called correctly
50-
mock_file.fileno.assert_called_once()
51-
assert mock_fcntl.call_count == 2
52-
5335
def test_monitor_for_errors_with_known_error(self):
5436
"""Test error detection with a known error pattern"""
5537
error_line = "Error: RuntimeError: Received request before initialization was complete"

0 commit comments

Comments
 (0)