diff --git a/packages/opencode/src/mcp/index.ts b/packages/opencode/src/mcp/index.ts index d1275b1b96..70cab2ffff 100644 --- a/packages/opencode/src/mcp/index.ts +++ b/packages/opencode/src/mcp/index.ts @@ -24,7 +24,7 @@ import { BusEvent } from "../bus/bus-event" import { Bus } from "@/bus" import { TuiEvent } from "@/cli/cmd/tui/event" import open from "open" -import { Effect, Layer, Option, ServiceMap, Stream } from "effect" +import { Effect, Exit, Layer, Option, ServiceMap, Stream } from "effect" import { InstanceState } from "@/effect/instance-state" import { makeRuntime } from "@/effect/run-service" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" @@ -160,12 +160,17 @@ export namespace MCP { }) } - async function defs(key: string, client: MCPClient, timeout?: number) { - const result = await withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT).catch((err) => { - log.error("failed to get tools from client", { key, error: err }) - return undefined - }) - return result?.tools + function defs(key: string, client: MCPClient, timeout?: number) { + return Effect.tryPromise({ + try: () => withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT), + catch: (err) => err instanceof Error ? err : new Error(String(err)), + }).pipe( + Effect.map((result) => result.tools), + Effect.catch((err) => { + log.error("failed to get tools from client", { key, error: err }) + return Effect.succeed(undefined) + }), + ) } async function fetchFromClient( @@ -191,24 +196,34 @@ export namespace MCP { type Transport = StdioClientTransport | StreamableHTTPClientTransport | SSEClientTransport - /** Try to connect a client via the given transport; closes the transport on failure. */ - async function tryConnect(transport: Transport, timeout: number): Promise { - const client = new Client({ name: "opencode", version: Installation.VERSION }) - try { - await withTimeout(client.connect(transport), timeout) - return client - } catch (error) { - await transport.close().catch(() => {}) - throw error - } - } + /** + * Connect a client via the given transport with resource safety: + * on failure the transport is closed; on success the caller owns it. + */ + const connectTransport = (transport: Transport, timeout: number) => + Effect.acquireUseRelease( + Effect.succeed(transport), + (t) => + Effect.tryPromise({ + try: () => { + const client = new Client({ name: "opencode", version: Installation.VERSION }) + return withTimeout(client.connect(t), timeout).then(() => client) + }, + catch: (e) => (e instanceof Error ? e : new Error(String(e))), + }), + (t, exit) => + Exit.isFailure(exit) + ? Effect.promise(() => t.close()).pipe(Effect.ignore) + : Effect.void, + ) - async function create(key: string, mcp: Config.Mcp) { + const create = Effect.fn("MCP.create")(function* (key: string, mcp: Config.Mcp) { if (mcp.enabled === false) { log.info("mcp server disabled", { key }) return { - mcpClient: undefined, - status: { status: "disabled" as const }, + mcpClient: undefined as MCPClient | undefined, + status: { status: "disabled" as const } as Status, + defs: undefined as MCPToolDef[] | undefined, } } @@ -257,70 +272,64 @@ export namespace MCP { }, ] - let lastError: Error | undefined const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT for (const { name, transport } of transports) { - try { - mcpClient = await tryConnect(transport, connectTimeout) - log.info("connected", { key, transport: name }) + const result = yield* connectTransport(transport, connectTimeout).pipe( + Effect.map((client) => ({ client, transportName: name })), + Effect.catch((error) => { + const lastError = error instanceof Error ? error : new Error(String(error)) + + // Handle OAuth-specific errors. + const isAuthError = + error instanceof UnauthorizedError || (authProvider && lastError.message.includes("OAuth")) + if (isAuthError) { + log.info("mcp server requires authentication", { key, transport: name }) + + if (lastError.message.includes("registration") || lastError.message.includes("client_id")) { + status = { + status: "needs_client_registration" as const, + error: "Server does not support dynamic client registration. Please provide clientId in config.", + } + Bus.publish(TuiEvent.ToastShow, { + title: "MCP Authentication Required", + message: `Server "${key}" requires a pre-registered client ID. Add clientId to your config.`, + variant: "warning", + duration: 8000, + }).catch((e) => log.debug("failed to show toast", { error: e })) + } else { + pendingOAuthTransports.set(key, transport) + status = { status: "needs_auth" as const } + Bus.publish(TuiEvent.ToastShow, { + title: "MCP Authentication Required", + message: `Server "${key}" requires authentication. Run: opencode mcp auth ${key}`, + variant: "warning", + duration: 8000, + }).catch((e) => log.debug("failed to show toast", { error: e })) + } + } else { + log.debug("transport connection failed", { + key, + transport: name, + url: mcp.url, + error: lastError.message, + }) + status = { + status: "failed" as const, + error: lastError.message, + } + } + + return Effect.succeed(undefined) + }), + ) + if (result) { + mcpClient = result.client + log.info("connected", { key, transport: result.transportName }) status = { status: "connected" } break - } catch (error) { - lastError = error instanceof Error ? error : new Error(String(error)) - - // Handle OAuth-specific errors. - // The SDK throws UnauthorizedError when auth() returns 'REDIRECT', - // but may also throw plain Errors when auth() fails internally - // (e.g. during discovery, registration, or state generation). - // When an authProvider is attached, treat both cases as auth-related. - const isAuthError = - error instanceof UnauthorizedError || (authProvider && lastError.message.includes("OAuth")) - if (isAuthError) { - log.info("mcp server requires authentication", { key, transport: name }) - - // Check if this is a "needs registration" error - if (lastError.message.includes("registration") || lastError.message.includes("client_id")) { - // tryConnect already closed the transport - status = { - status: "needs_client_registration" as const, - error: "Server does not support dynamic client registration. Please provide clientId in config.", - } - // Show toast for needs_client_registration - Bus.publish(TuiEvent.ToastShow, { - title: "MCP Authentication Required", - message: `Server "${key}" requires a pre-registered client ID. Add clientId to your config.`, - variant: "warning", - duration: 8000, - }).catch((e) => log.debug("failed to show toast", { error: e })) - } else { - // Store transport for later finishAuth call - // Note: tryConnect closed the transport, but the SDK's finishAuth - // only needs the authProvider reference which survives close() - pendingOAuthTransports.set(key, transport) - status = { status: "needs_auth" as const } - // Show toast for needs_auth - Bus.publish(TuiEvent.ToastShow, { - title: "MCP Authentication Required", - message: `Server "${key}" requires authentication. Run: opencode mcp auth ${key}`, - variant: "warning", - duration: 8000, - }).catch((e) => log.debug("failed to show toast", { error: e })) - } - break - } - - // tryConnect already closed the failed transport - log.debug("transport connection failed", { - key, - transport: name, - url: mcp.url, - error: lastError.message, - }) - status = { - status: "failed" as const, - error: lastError.message, - } } + // If this was an auth error, stop trying other transports + if ((status as Status | undefined)?.status === "needs_auth" || (status as Status | undefined)?.status === "needs_client_registration") break } } @@ -343,23 +352,25 @@ export namespace MCP { }) const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT - try { - mcpClient = await tryConnect(transport, connectTimeout) - status = { - status: "connected", - } - } catch (error) { - // tryConnect already closed the transport (kills orphaned child process) - log.error("local mcp startup failed", { - key, - command: mcp.command, - cwd, - error: error instanceof Error ? error.message : String(error), - }) - status = { - status: "failed" as const, - error: error instanceof Error ? error.message : String(error), - } + const result = yield* connectTransport(transport, connectTimeout).pipe( + Effect.catch((error) => { + const msg = error instanceof Error ? error.message : String(error) + log.error("local mcp startup failed", { + key, + command: mcp.command, + cwd, + error: msg, + }) + status = { + status: "failed" as const, + error: msg, + } + return Effect.succeed(undefined) + }), + ) + if (result) { + mcpClient = result + status = { status: "connected" } } } @@ -372,31 +383,29 @@ export namespace MCP { if (!mcpClient) { return { - mcpClient: undefined, + mcpClient: undefined as MCPClient | undefined, status, + defs: undefined as MCPToolDef[] | undefined, } } - const listed = await defs(key, mcpClient, mcp.timeout) + const listed = yield* defs(key, mcpClient, mcp.timeout) if (!listed) { - await mcpClient.close().catch((error) => { - log.error("Failed to close MCP client", { - error, - }) - }) + yield* Effect.promise(() => mcpClient!.close()).pipe(Effect.ignore) return { - mcpClient: undefined, - status: { status: "failed" as const, error: "Failed to get tools" }, + mcpClient: undefined as MCPClient | undefined, + status: { status: "failed" as const, error: "Failed to get tools" } as Status, + defs: undefined as MCPToolDef[] | undefined, } } log.info("create() successfully created client", { key, toolCount: listed.length }) return { - mcpClient, + mcpClient: mcpClient as MCPClient | undefined, status, - defs: listed, + defs: listed as MCPToolDef[] | undefined, } - } + }) // --- Effect Service --- @@ -472,7 +481,7 @@ export namespace MCP { log.info("tools list changed notification received", { server: name }) if (s.clients[name] !== client || s.status[name]?.status !== "connected") return - const listed = await defs(name, client, timeout) + const listed = await Effect.runPromise(defs(name, client, timeout)) if (!listed) return if (s.clients[name] !== client || s.status[name]?.status !== "connected") return @@ -507,13 +516,15 @@ export namespace MCP { return } - const result = yield* Effect.promise(() => create(key, mcp).catch(() => undefined)) + const result = yield* create(key, mcp).pipe( + Effect.catch(() => Effect.succeed(undefined)), + ) if (!result) return s.status[key] = result.status if (result.mcpClient) { s.clients[key] = result.mcpClient - s.defs[key] = result.defs + s.defs[key] = result.defs! watch(s, key, result.mcpClient, mcp.timeout) } }), @@ -577,14 +588,7 @@ export namespace MCP { const createAndStore = Effect.fn("MCP.createAndStore")(function* (name: string, mcp: Config.Mcp) { const s = yield* InstanceState.get(cache) - const result = yield* Effect.promise(() => create(name, mcp)) - - if (!result) { - yield* closeClient(s, name) - delete s.clients[name] - s.status[name] = { status: "failed" as const, error: "unknown error" } - return s.status[name] - } + const result = yield* create(name, mcp) s.status[name] = result.status if (!result.mcpClient) { @@ -595,7 +599,7 @@ export namespace MCP { yield* closeClient(s, name) s.clients[name] = result.mcpClient - s.defs[name] = result.defs + s.defs[name] = result.defs! watch(s, name, result.mcpClient, mcp.timeout) return result.status })