From 2e6ac8ff49eabcb1b62c1bd504338e7449f80c6e Mon Sep 17 00:00:00 2001 From: Kit Langton Date: Thu, 26 Mar 2026 14:41:00 -0400 Subject: [PATCH] fix(mcp): close transport on failed/timed-out connections (#19200) --- packages/opencode/src/mcp/index.ts | 523 +++++++++---------- packages/opencode/test/mcp/lifecycle.test.ts | 101 +++- 2 files changed, 350 insertions(+), 274 deletions(-) diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index d114550fcd..9bc6ef9dcf 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -24,7 +24,7 @@ import { BusEvent } from "../bus/bus-event" import { Bus } from "@/bus" import { TuiEvent } from "@/cli/cmd/tui/event" import open from "open" -import { Effect, Layer, Option, ServiceMap, Stream } from "effect" +import { Effect, Exit, Layer, Option, ServiceMap, Stream } from "effect" import { InstanceState } from "@/effect/instance-state" import { makeRuntime } from "@/effect/run-service" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" @@ -129,6 +129,8 @@ export namespace MCP { return typeof entry === "object" && entry !== null && "type" in entry } + const sanitize = (s: string) => s.replace(/[^a-zA-Z0-9_-]/g, "_") + // Convert MCP tool definition to AI SDK Tool type function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool { const inputSchema = mcpTool.inputSchema @@ -160,233 +162,48 @@ export namespace MCP { }) } - async function defs(key: string, client: MCPClient, timeout?: number) { - const result = await withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT).catch((err) => { - log.error("failed to get tools from client", { key, error: err }) - return undefined - }) - return result?.tools + function defs(key: string, client: MCPClient, timeout?: number) { + return Effect.tryPromise({ + try: () => withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT), + catch: (err) => err instanceof Error ? err : new Error(String(err)), + }).pipe( + Effect.map((result) => result.tools), + Effect.catch((err) => { + log.error("failed to get tools from client", { key, error: err }) + return Effect.succeed(undefined) + }), + ) } - async function fetchFromClient( + function fetchFromClient( clientName: string, client: Client, listFn: (c: Client) => Promise, label: string, - ): Promise | undefined> { - const items = await listFn(client).catch((e: any) => { - log.error(`failed to get ${label}`, { clientName, error: e.message }) - return undefined - }) - if (!items) return undefined - - const out: Record = {} - const sanitizedClient = clientName.replace(/[^a-zA-Z0-9_-]/g, "_") - for (const item of items) { - const sanitizedName = item.name.replace(/[^a-zA-Z0-9_-]/g, "_") - out[sanitizedClient + ":" + sanitizedName] = { ...item, client: clientName } - } - return out + ) { + return Effect.tryPromise({ + try: () => listFn(client), + catch: (e: any) => { + log.error(`failed to get ${label}`, { clientName, error: e.message }) + return e + }, + }).pipe( + Effect.map((items) => { + const out: Record = {} + const sanitizedClient = sanitize(clientName) + for (const item of items) { + out[sanitizedClient + ":" + sanitize(item.name)] = { ...item, client: clientName } + } + return out + }), + Effect.orElseSucceed(() => undefined), + ) } - async function create(key: string, mcp: Config.Mcp) { - if (mcp.enabled === false) { - log.info("mcp server disabled", { key }) - return { - mcpClient: undefined, - status: { status: "disabled" as const }, - } - } - - log.info("found", { key, type: mcp.type }) - let mcpClient: MCPClient | undefined - let status: Status | undefined = undefined - - if (mcp.type === "remote") { - // OAuth is enabled by default for remote servers unless explicitly disabled with oauth: false - const oauthDisabled = mcp.oauth === false - const oauthConfig = typeof mcp.oauth === "object" ? mcp.oauth : undefined - let authProvider: McpOAuthProvider | undefined - - if (!oauthDisabled) { - authProvider = new McpOAuthProvider( - key, - mcp.url, - { - clientId: oauthConfig?.clientId, - clientSecret: oauthConfig?.clientSecret, - scope: oauthConfig?.scope, - }, - { - onRedirect: async (url) => { - log.info("oauth redirect requested", { key, url: url.toString() }) - // Store the URL - actual browser opening is handled by startAuth - }, - }, - ) - } - - const transports: Array<{ name: string; transport: TransportWithAuth }> = [ - { - name: "StreamableHTTP", - transport: new StreamableHTTPClientTransport(new URL(mcp.url), { - authProvider, - requestInit: mcp.headers ? { headers: mcp.headers } : undefined, - }), - }, - { - name: "SSE", - transport: new SSEClientTransport(new URL(mcp.url), { - authProvider, - requestInit: mcp.headers ? { headers: mcp.headers } : undefined, - }), - }, - ] - - let lastError: Error | undefined - const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT - for (const { name, transport } of transports) { - try { - const client = new Client({ - name: "opencode", - version: Installation.VERSION, - }) - await withTimeout(client.connect(transport), connectTimeout) - mcpClient = client - log.info("connected", { key, transport: name }) - status = { status: "connected" } - break - } catch (error) { - lastError = error instanceof Error ? error : new Error(String(error)) - - // Handle OAuth-specific errors. - // The SDK throws UnauthorizedError when auth() returns 'REDIRECT', - // but may also throw plain Errors when auth() fails internally - // (e.g. during discovery, registration, or state generation). - // When an authProvider is attached, treat both cases as auth-related. - const isAuthError = - error instanceof UnauthorizedError || (authProvider && lastError.message.includes("OAuth")) - if (isAuthError) { - log.info("mcp server requires authentication", { key, transport: name }) - - // Check if this is a "needs registration" error - if (lastError.message.includes("registration") || lastError.message.includes("client_id")) { - status = { - status: "needs_client_registration" as const, - error: "Server does not support dynamic client registration. Please provide clientId in config.", - } - // Show toast for needs_client_registration - Bus.publish(TuiEvent.ToastShow, { - title: "MCP Authentication Required", - message: `Server "${key}" requires a pre-registered client ID. Add clientId to your config.`, - variant: "warning", - duration: 8000, - }).catch((e) => log.debug("failed to show toast", { error: e })) - } else { - // Store transport for later finishAuth call - pendingOAuthTransports.set(key, transport) - status = { status: "needs_auth" as const } - // Show toast for needs_auth - Bus.publish(TuiEvent.ToastShow, { - title: "MCP Authentication Required", - message: `Server "${key}" requires authentication. Run: opencode mcp auth ${key}`, - variant: "warning", - duration: 8000, - }).catch((e) => log.debug("failed to show toast", { error: e })) - } - break - } - - log.debug("transport connection failed", { - key, - transport: name, - url: mcp.url, - error: lastError.message, - }) - status = { - status: "failed" as const, - error: lastError.message, - } - } - } - } - - if (mcp.type === "local") { - const [cmd, ...args] = mcp.command - const cwd = Instance.directory - const transport = new StdioClientTransport({ - stderr: "pipe", - command: cmd, - args, - cwd, - env: { - ...process.env, - ...(cmd === "opencode" ? { BUN_BE_BUN: "1" } : {}), - ...mcp.environment, - }, - }) - transport.stderr?.on("data", (chunk: Buffer) => { - log.info(`mcp stderr: ${chunk.toString()}`, { key }) - }) - - const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT - try { - const client = new Client({ - name: "opencode", - version: Installation.VERSION, - }) - await withTimeout(client.connect(transport), connectTimeout) - mcpClient = client - status = { - status: "connected", - } - } catch (error) { - log.error("local mcp startup failed", { - key, - command: mcp.command, - cwd, - error: error instanceof Error ? error.message : String(error), - }) - status = { - status: "failed" as const, - error: error instanceof Error ? error.message : String(error), - } - } - } - - if (!status) { - status = { - status: "failed" as const, - error: "Unknown error", - } - } - - if (!mcpClient) { - return { - mcpClient: undefined, - status, - } - } - - const listed = await defs(key, mcpClient, mcp.timeout) - if (!listed) { - await mcpClient.close().catch((error) => { - log.error("Failed to close MCP client", { - error, - }) - }) - return { - mcpClient: undefined, - status: { status: "failed" as const, error: "Failed to get tools" }, - } - } - - log.info("create() successfully created client", { key, toolCount: listed.length }) - return { - mcpClient, - status, - defs: listed, - } + interface CreateResult { + mcpClient?: MCPClient + status: Status + defs?: MCPToolDef[] } // --- Effect Service --- @@ -431,6 +248,184 @@ export namespace MCP { Effect.gen(function* () { const spawner = yield* ChildProcessSpawner.ChildProcessSpawner const auth = yield* McpAuth.Service + const bus = yield* Bus.Service + + type Transport = StdioClientTransport | StreamableHTTPClientTransport | SSEClientTransport + + /** + * Connect a client via the given transport with resource safety: + * on failure the transport is closed; on success the caller owns it. + */ + const connectTransport = (transport: Transport, timeout: number) => + Effect.acquireUseRelease( + Effect.succeed(transport), + (t) => + Effect.tryPromise({ + try: () => { + const client = new Client({ name: "opencode", version: Installation.VERSION }) + return withTimeout(client.connect(t), timeout).then(() => client) + }, + catch: (e) => (e instanceof Error ? e : new Error(String(e))), + }), + (t, exit) => + Exit.isFailure(exit) + ? Effect.tryPromise(() => t.close()).pipe(Effect.ignore) + : Effect.void, + ) + + const DISABLED_RESULT: CreateResult = { status: { status: "disabled" } } + + const connectRemote = Effect.fn("MCP.connectRemote")(function* (key: string, mcp: Config.Mcp & { type: "remote" }) { + const oauthDisabled = mcp.oauth === false + const oauthConfig = typeof mcp.oauth === "object" ? mcp.oauth : undefined + let authProvider: McpOAuthProvider | undefined + + if (!oauthDisabled) { + authProvider = new McpOAuthProvider( + key, + mcp.url, + { + clientId: oauthConfig?.clientId, + clientSecret: oauthConfig?.clientSecret, + scope: oauthConfig?.scope, + }, + { + onRedirect: async (url) => { + log.info("oauth redirect requested", { key, url: url.toString() }) + }, + }, + ) + } + + const transports: Array<{ name: string; transport: TransportWithAuth }> = [ + { + name: "StreamableHTTP", + transport: new StreamableHTTPClientTransport(new URL(mcp.url), { + authProvider, + requestInit: mcp.headers ? { headers: mcp.headers } : undefined, + }), + }, + { + name: "SSE", + transport: new SSEClientTransport(new URL(mcp.url), { + authProvider, + requestInit: mcp.headers ? { headers: mcp.headers } : undefined, + }), + }, + ] + + const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT + let lastStatus: Status | undefined + + for (const { name, transport } of transports) { + const result = yield* connectTransport(transport, connectTimeout).pipe( + Effect.map((client) => ({ client, transportName: name })), + Effect.catch((error) => { + const lastError = error instanceof Error ? error : new Error(String(error)) + const isAuthError = + error instanceof UnauthorizedError || (authProvider && lastError.message.includes("OAuth")) + + if (isAuthError) { + log.info("mcp server requires authentication", { key, transport: name }) + + if (lastError.message.includes("registration") || lastError.message.includes("client_id")) { + lastStatus = { + status: "needs_client_registration" as const, + error: "Server does not support dynamic client registration. Please provide clientId in config.", + } + return bus.publish(TuiEvent.ToastShow, { + title: "MCP Authentication Required", + message: `Server "${key}" requires a pre-registered client ID. Add clientId to your config.`, + variant: "warning", + duration: 8000, + }).pipe(Effect.ignore, Effect.as(undefined)) + } else { + pendingOAuthTransports.set(key, transport) + lastStatus = { status: "needs_auth" as const } + return bus.publish(TuiEvent.ToastShow, { + title: "MCP Authentication Required", + message: `Server "${key}" requires authentication. Run: opencode mcp auth ${key}`, + variant: "warning", + duration: 8000, + }).pipe(Effect.ignore, Effect.as(undefined)) + } + } + + log.debug("transport connection failed", { + key, + transport: name, + url: mcp.url, + error: lastError.message, + }) + lastStatus = { status: "failed" as const, error: lastError.message } + return Effect.succeed(undefined) + }), + ) + if (result) { + log.info("connected", { key, transport: result.transportName }) + return { client: result.client as MCPClient | undefined, status: { status: "connected" } as Status } + } + // If this was an auth error, stop trying other transports + if (lastStatus?.status === "needs_auth" || lastStatus?.status === "needs_client_registration") break + } + + return { client: undefined as MCPClient | undefined, status: (lastStatus ?? { status: "failed", error: "Unknown error" }) as Status } + }) + + const connectLocal = Effect.fn("MCP.connectLocal")(function* (key: string, mcp: Config.Mcp & { type: "local" }) { + const [cmd, ...args] = mcp.command + const cwd = Instance.directory + const transport = new StdioClientTransport({ + stderr: "pipe", + command: cmd, + args, + cwd, + env: { + ...process.env, + ...(cmd === "opencode" ? { BUN_BE_BUN: "1" } : {}), + ...mcp.environment, + }, + }) + transport.stderr?.on("data", (chunk: Buffer) => { + log.info(`mcp stderr: ${chunk.toString()}`, { key }) + }) + + const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT + return yield* connectTransport(transport, connectTimeout).pipe( + Effect.map((client): { client: MCPClient | undefined; status: Status } => ({ client, status: { status: "connected" } })), + Effect.catch((error): Effect.Effect<{ client: MCPClient | undefined; status: Status }> => { + const msg = error instanceof Error ? error.message : String(error) + log.error("local mcp startup failed", { key, command: mcp.command, cwd, error: msg }) + return Effect.succeed({ client: undefined, status: { status: "failed", error: msg } }) + }), + ) + }) + + const create = Effect.fn("MCP.create")(function* (key: string, mcp: Config.Mcp) { + if (mcp.enabled === false) { + log.info("mcp server disabled", { key }) + return DISABLED_RESULT + } + + log.info("found", { key, type: mcp.type }) + + const { client: mcpClient, status } = mcp.type === "remote" + ? yield* connectRemote(key, mcp as Config.Mcp & { type: "remote" }) + : yield* connectLocal(key, mcp as Config.Mcp & { type: "local" }) + + if (!mcpClient) { + return { status } satisfies CreateResult + } + + const listed = yield* defs(key, mcpClient, mcp.timeout) + if (!listed) { + yield* Effect.tryPromise(() => mcpClient.close()).pipe(Effect.ignore) + return { status: { status: "failed", error: "Failed to get tools" } } satisfies CreateResult + } + + log.info("create() successfully created client", { key, toolCount: listed.length }) + return { mcpClient, status, defs: listed } satisfies CreateResult + }) const descendants = Effect.fnUntraced( function* (pid: number) { @@ -463,20 +458,20 @@ export namespace MCP { log.info("tools list changed notification received", { server: name }) if (s.clients[name] !== client || s.status[name]?.status !== "connected") return - const listed = await defs(name, client, timeout) + const listed = await Effect.runPromise(defs(name, client, timeout)) if (!listed) return if (s.clients[name] !== client || s.status[name]?.status !== "connected") return s.defs[name] = listed - await Bus.publish(ToolsChanged, { server: name }).catch((error) => - log.warn("failed to publish tools changed", { server: name, error }), - ) + await Effect.runPromise(bus.publish(ToolsChanged, { server: name }).pipe(Effect.ignore)) }) } + const getConfig = () => Effect.promise(() => Config.get()) + const cache = yield* InstanceState.make( Effect.fn("MCP.state")(function* () { - const cfg = yield* Effect.promise(() => Config.get()) + const cfg = yield* getConfig() const config = cfg.mcp ?? {} const s: State = { status: {}, @@ -498,13 +493,15 @@ export namespace MCP { return } - const result = yield* Effect.promise(() => create(key, mcp).catch(() => undefined)) + const result = yield* create(key, mcp).pipe( + Effect.catch(() => Effect.succeed(undefined)), + ) if (!result) return s.status[key] = result.status if (result.mcpClient) { s.clients[key] = result.mcpClient - s.defs[key] = result.defs + s.defs[key] = result.defs! watch(s, key, result.mcpClient, mcp.timeout) } }), @@ -542,14 +539,12 @@ export namespace MCP { const client = s.clients[name] delete s.defs[name] if (!client) return Effect.void - return Effect.promise(() => - client.close().catch((error: any) => log.error("failed to close MCP client", { name, error })), - ) + return Effect.tryPromise(() => client.close()).pipe(Effect.ignore) } const status = Effect.fn("MCP.status")(function* () { const s = yield* InstanceState.get(cache) - const cfg = yield* Effect.promise(() => Config.get()) + const cfg = yield* getConfig() const config = cfg.mcp ?? {} const result: Record = {} @@ -568,14 +563,7 @@ export namespace MCP { const createAndStore = Effect.fn("MCP.createAndStore")(function* (name: string, mcp: Config.Mcp) { const s = yield* InstanceState.get(cache) - const result = yield* Effect.promise(() => create(name, mcp)) - - if (!result) { - yield* closeClient(s, name) - delete s.clients[name] - s.status[name] = { status: "failed" as const, error: "unknown error" } - return s.status[name] - } + const result = yield* create(name, mcp) s.status[name] = result.status if (!result.mcpClient) { @@ -586,7 +574,7 @@ export namespace MCP { yield* closeClient(s, name) s.clients[name] = result.mcpClient - s.defs[name] = result.defs + s.defs[name] = result.defs! watch(s, name, result.mcpClient, mcp.timeout) return result.status }) @@ -616,7 +604,7 @@ export namespace MCP { const tools = Effect.fn("MCP.tools")(function* () { const result: Record = {} const s = yield* InstanceState.get(cache) - const cfg = yield* Effect.promise(() => Config.get()) + const cfg = yield* getConfig() const config = cfg.mcp ?? {} const defaultTimeout = cfg.experimental?.mcp_timeout @@ -639,9 +627,7 @@ export namespace MCP { const timeout = entry?.timeout ?? defaultTimeout for (const mcpTool of listed) { - const sanitizedClientName = clientName.replace(/[^a-zA-Z0-9_-]/g, "_") - const sanitizedToolName = mcpTool.name.replace(/[^a-zA-Z0-9_-]/g, "_") - result[sanitizedClientName + "_" + sanitizedToolName] = convertMcpTool(mcpTool, client, timeout) + result[sanitize(clientName) + "_" + sanitize(mcpTool.name)] = convertMcpTool(mcpTool, client, timeout) } }), { concurrency: "unbounded" }, @@ -649,30 +635,29 @@ export namespace MCP { return result }) - function collectFromConnected( + function collectFromConnected( s: State, - fetchFn: (clientName: string, client: Client) => Promise | undefined>, + listFn: (c: Client) => Promise, + label: string, ) { return Effect.forEach( Object.entries(s.clients).filter(([name]) => s.status[name]?.status === "connected"), ([clientName, client]) => - Effect.promise(async () => Object.entries((await fetchFn(clientName, client)) ?? {})), + fetchFromClient(clientName, client, listFn, label).pipe( + Effect.map((items) => Object.entries(items ?? {})), + ), { concurrency: "unbounded" }, - ).pipe(Effect.map((results) => Object.fromEntries(results.flat()))) + ).pipe(Effect.map((results) => Object.fromEntries(results.flat()))) } const prompts = Effect.fn("MCP.prompts")(function* () { const s = yield* InstanceState.get(cache) - return yield* collectFromConnected(s, (name, client) => - fetchFromClient(name, client, (c) => c.listPrompts().then((r) => r.prompts), "prompts"), - ) + return yield* collectFromConnected(s, (c) => c.listPrompts().then((r) => r.prompts), "prompts") }) const resources = Effect.fn("MCP.resources")(function* () { const s = yield* InstanceState.get(cache) - return yield* collectFromConnected(s, (name, client) => - fetchFromClient(name, client, (c) => c.listResources().then((r) => r.resources), "resources"), - ) + return yield* collectFromConnected(s, (c) => c.listResources().then((r) => r.resources), "resources") }) const withClient = Effect.fnUntraced(function* ( @@ -713,7 +698,7 @@ export namespace MCP { }) const getMcpConfig = Effect.fnUntraced(function* (mcpName: string) { - const cfg = yield* Effect.promise(() => Config.get()) + const cfg = yield* getConfig() const mcpConfig = cfg.mcp?.[mcpName] if (!mcpConfig || !isMcpConfigured(mcpConfig)) return undefined return mcpConfig @@ -750,19 +735,21 @@ export namespace MCP { const transport = new StreamableHTTPClientTransport(new URL(mcpConfig.url), { authProvider }) - return yield* Effect.promise(async () => { - try { + return yield* Effect.tryPromise({ + try: () => { const client = new Client({ name: "opencode", version: Installation.VERSION }) - await client.connect(transport) - return { authorizationUrl: "", oauthState } - } catch (error) { + return client.connect(transport).then(() => ({ authorizationUrl: "", oauthState })) + }, + catch: (error) => error, + }).pipe( + Effect.catch((error) => { if (error instanceof UnauthorizedError && capturedUrl) { pendingOAuthTransports.set(mcpName, transport) - return { authorizationUrl: capturedUrl.toString(), oauthState } + return Effect.succeed({ authorizationUrl: capturedUrl.toString(), oauthState }) } - throw error - } - }) + return Effect.die(error) + }), + ) }) const authenticate = Effect.fn("MCP.authenticate")(function* (mcpName: string) { @@ -791,7 +778,7 @@ export namespace MCP { ), Effect.catch(() => { log.warn("failed to open browser, user must open URL manually", { mcpName }) - return Effect.promise(() => Bus.publish(BrowserOpenFailed, { mcpName, url: authorizationUrl })) + return bus.publish(BrowserOpenFailed, { mcpName, url: authorizationUrl }).pipe(Effect.ignore) }), ) @@ -811,10 +798,7 @@ export namespace MCP { if (!transport) throw new Error(`No pending OAuth flow for MCP server: ${mcpName}`) const result = yield* Effect.tryPromise({ - try: async () => { - await transport.finishAuth(authorizationCode) - return true - }, + try: () => transport.finishAuth(authorizationCode).then(() => true as const), catch: (error) => { log.error("failed to finish oauth", { mcpName, error }) return error @@ -887,6 +871,7 @@ export namespace MCP { const defaultLayer = layer.pipe( Layer.provide(McpAuth.layer), + Layer.provide(Bus.layer), Layer.provide(CrossSpawnSpawner.layer), Layer.provide(AppFileSystem.defaultLayer), Layer.provide(NodeFileSystem.layer), diff --git a/packages/opencode/test/mcp/lifecycle.test.ts b/packages/opencode/test/mcp/lifecycle.test.ts index 2880c053f1..d57783f66c 100644 --- a/packages/opencode/test/mcp/lifecycle.test.ts +++ b/packages/opencode/test/mcp/lifecycle.test.ts @@ -19,9 +19,12 @@ interface MockClientState { const clientStates = new Map() let lastCreatedClientName: string | undefined let connectShouldFail = false +let connectShouldHang = false let connectError = "Mock transport cannot connect" // Tracks how many Client instances were created (detects leaks) let clientCreateCount = 0 +// Tracks how many times transport.close() is called across all mock transports +let transportCloseCount = 0 function getOrCreateClientState(name?: string): MockClientState { const key = name ?? "default" @@ -44,32 +47,41 @@ function getOrCreateClientState(name?: string): MockClientState { return state } -// Mock transport that succeeds or fails based on connectShouldFail +// Mock transport that succeeds or fails based on connectShouldFail / connectShouldHang class MockStdioTransport { stderr: null = null pid = 12345 constructor(_opts: any) {} async start() { + if (connectShouldHang) return new Promise(() => {}) // never resolves if (connectShouldFail) throw new Error(connectError) } - async close() {} + async close() { + transportCloseCount++ + } } class MockStreamableHTTP { constructor(_url: URL, _opts?: any) {} async start() { + if (connectShouldHang) return new Promise(() => {}) // never resolves if (connectShouldFail) throw new Error(connectError) } - async close() {} + async close() { + transportCloseCount++ + } async finishAuth() {} } class MockSSE { constructor(_url: URL, _opts?: any) {} async start() { - throw new Error("SSE fallback - not used in these tests") + if (connectShouldHang) return new Promise(() => {}) // never resolves + if (connectShouldFail) throw new Error(connectError) + } + async close() { + transportCloseCount++ } - async close() {} } mock.module("@modelcontextprotocol/sdk/client/stdio.js", () => ({ @@ -145,8 +157,10 @@ beforeEach(() => { clientStates.clear() lastCreatedClientName = undefined connectShouldFail = false + connectShouldHang = false connectError = "Mock transport cannot connect" clientCreateCount = 0 + transportCloseCount = 0 }) // Import after mocks @@ -658,3 +672,80 @@ test( }, ), ) + + +// ======================================================================== +// Test: transport leak — local stdio timeout (#19168) +// ======================================================================== + +test( + "local stdio transport is closed when connect times out (no process leak)", + withInstance({}, async () => { + lastCreatedClientName = "hanging-server" + getOrCreateClientState("hanging-server") + connectShouldHang = true + + const addResult = await MCP.add("hanging-server", { + type: "local", + command: ["node", "fake.js"], + timeout: 100, + }) + + const serverStatus = (addResult.status as any)["hanging-server"] ?? addResult.status + expect(serverStatus.status).toBe("failed") + expect(serverStatus.error).toContain("timed out") + // Transport must be closed to avoid orphaned child process + expect(transportCloseCount).toBeGreaterThanOrEqual(1) + }), +) + +// ======================================================================== +// Test: transport leak — remote timeout (#19168) +// ======================================================================== + +test( + "remote transport is closed when connect times out", + withInstance({}, async () => { + lastCreatedClientName = "hanging-remote" + getOrCreateClientState("hanging-remote") + connectShouldHang = true + + const addResult = await MCP.add("hanging-remote", { + type: "remote", + url: "http://localhost:9999/mcp", + timeout: 100, + oauth: false, + }) + + const serverStatus = (addResult.status as any)["hanging-remote"] ?? addResult.status + expect(serverStatus.status).toBe("failed") + // Transport must be closed to avoid leaked HTTP connections + expect(transportCloseCount).toBeGreaterThanOrEqual(1) + }), +) + +// ======================================================================== +// Test: transport leak — failed remote transports not closed (#19168) +// ======================================================================== + +test( + "failed remote transport is closed before trying next transport", + withInstance({}, async () => { + lastCreatedClientName = "fail-remote" + getOrCreateClientState("fail-remote") + connectShouldFail = true + connectError = "Connection refused" + + const addResult = await MCP.add("fail-remote", { + type: "remote", + url: "http://localhost:9999/mcp", + timeout: 5000, + oauth: false, + }) + + const serverStatus = (addResult.status as any)["fail-remote"] ?? addResult.status + expect(serverStatus.status).toBe("failed") + // Both StreamableHTTP and SSE transports should be closed + expect(transportCloseCount).toBeGreaterThanOrEqual(2) + }), +)