refactor(mcp): convert create/defs/tryConnect to Effect-native patterns

Replace the async tryConnect helper with connectTransport using
Effect.acquireUseRelease for guaranteed transport cleanup on failure.
Convert create() from async function to Effect.gen and defs() to
return an Effect. Callers now yield* directly instead of wrapping
in Effect.promise.
worktree-agent-a682f34a
Kit Langton 2026-03-25 22:11:39 -04:00
parent ebdecf2ec7
commit 7965300ce5
1 changed files with 125 additions and 121 deletions

View File

@ -24,7 +24,7 @@ import { BusEvent } from "../bus/bus-event"
import { Bus } from "@/bus"
import { TuiEvent } from "@/cli/cmd/tui/event"
import open from "open"
import { Effect, Layer, Option, ServiceMap, Stream } from "effect"
import { Effect, Exit, Layer, Option, ServiceMap, Stream } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRuntime } from "@/effect/run-service"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
@ -160,12 +160,17 @@ export namespace MCP {
})
}
async function defs(key: string, client: MCPClient, timeout?: number) {
const result = await withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT).catch((err) => {
log.error("failed to get tools from client", { key, error: err })
return undefined
})
return result?.tools
function defs(key: string, client: MCPClient, timeout?: number) {
return Effect.tryPromise({
try: () => withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT),
catch: (err) => err instanceof Error ? err : new Error(String(err)),
}).pipe(
Effect.map((result) => result.tools),
Effect.catch((err) => {
log.error("failed to get tools from client", { key, error: err })
return Effect.succeed(undefined)
}),
)
}
async function fetchFromClient<T extends { name: string }>(
@ -191,24 +196,34 @@ export namespace MCP {
type Transport = StdioClientTransport | StreamableHTTPClientTransport | SSEClientTransport
/** Try to connect a client via the given transport; closes the transport on failure. */
async function tryConnect(transport: Transport, timeout: number): Promise<MCPClient> {
const client = new Client({ name: "opencode", version: Installation.VERSION })
try {
await withTimeout(client.connect(transport), timeout)
return client
} catch (error) {
await transport.close().catch(() => {})
throw error
}
}
/**
* Connect a client via the given transport with resource safety:
* on failure the transport is closed; on success the caller owns it.
*/
const connectTransport = (transport: Transport, timeout: number) =>
Effect.acquireUseRelease(
Effect.succeed(transport),
(t) =>
Effect.tryPromise({
try: () => {
const client = new Client({ name: "opencode", version: Installation.VERSION })
return withTimeout(client.connect(t), timeout).then(() => client)
},
catch: (e) => (e instanceof Error ? e : new Error(String(e))),
}),
(t, exit) =>
Exit.isFailure(exit)
? Effect.promise(() => t.close()).pipe(Effect.ignore)
: Effect.void,
)
async function create(key: string, mcp: Config.Mcp) {
const create = Effect.fn("MCP.create")(function* (key: string, mcp: Config.Mcp) {
if (mcp.enabled === false) {
log.info("mcp server disabled", { key })
return {
mcpClient: undefined,
status: { status: "disabled" as const },
mcpClient: undefined as MCPClient | undefined,
status: { status: "disabled" as const } as Status,
defs: undefined as MCPToolDef[] | undefined,
}
}
@ -257,70 +272,64 @@ export namespace MCP {
},
]
let lastError: Error | undefined
const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
for (const { name, transport } of transports) {
try {
mcpClient = await tryConnect(transport, connectTimeout)
log.info("connected", { key, transport: name })
const result = yield* connectTransport(transport, connectTimeout).pipe(
Effect.map((client) => ({ client, transportName: name })),
Effect.catch((error) => {
const lastError = error instanceof Error ? error : new Error(String(error))
// Handle OAuth-specific errors.
const isAuthError =
error instanceof UnauthorizedError || (authProvider && lastError.message.includes("OAuth"))
if (isAuthError) {
log.info("mcp server requires authentication", { key, transport: name })
if (lastError.message.includes("registration") || lastError.message.includes("client_id")) {
status = {
status: "needs_client_registration" as const,
error: "Server does not support dynamic client registration. Please provide clientId in config.",
}
Bus.publish(TuiEvent.ToastShow, {
title: "MCP Authentication Required",
message: `Server "${key}" requires a pre-registered client ID. Add clientId to your config.`,
variant: "warning",
duration: 8000,
}).catch((e) => log.debug("failed to show toast", { error: e }))
} else {
pendingOAuthTransports.set(key, transport)
status = { status: "needs_auth" as const }
Bus.publish(TuiEvent.ToastShow, {
title: "MCP Authentication Required",
message: `Server "${key}" requires authentication. Run: opencode mcp auth ${key}`,
variant: "warning",
duration: 8000,
}).catch((e) => log.debug("failed to show toast", { error: e }))
}
} else {
log.debug("transport connection failed", {
key,
transport: name,
url: mcp.url,
error: lastError.message,
})
status = {
status: "failed" as const,
error: lastError.message,
}
}
return Effect.succeed(undefined)
}),
)
if (result) {
mcpClient = result.client
log.info("connected", { key, transport: result.transportName })
status = { status: "connected" }
break
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error))
// Handle OAuth-specific errors.
// The SDK throws UnauthorizedError when auth() returns 'REDIRECT',
// but may also throw plain Errors when auth() fails internally
// (e.g. during discovery, registration, or state generation).
// When an authProvider is attached, treat both cases as auth-related.
const isAuthError =
error instanceof UnauthorizedError || (authProvider && lastError.message.includes("OAuth"))
if (isAuthError) {
log.info("mcp server requires authentication", { key, transport: name })
// Check if this is a "needs registration" error
if (lastError.message.includes("registration") || lastError.message.includes("client_id")) {
// tryConnect already closed the transport
status = {
status: "needs_client_registration" as const,
error: "Server does not support dynamic client registration. Please provide clientId in config.",
}
// Show toast for needs_client_registration
Bus.publish(TuiEvent.ToastShow, {
title: "MCP Authentication Required",
message: `Server "${key}" requires a pre-registered client ID. Add clientId to your config.`,
variant: "warning",
duration: 8000,
}).catch((e) => log.debug("failed to show toast", { error: e }))
} else {
// Store transport for later finishAuth call
// Note: tryConnect closed the transport, but the SDK's finishAuth
// only needs the authProvider reference which survives close()
pendingOAuthTransports.set(key, transport)
status = { status: "needs_auth" as const }
// Show toast for needs_auth
Bus.publish(TuiEvent.ToastShow, {
title: "MCP Authentication Required",
message: `Server "${key}" requires authentication. Run: opencode mcp auth ${key}`,
variant: "warning",
duration: 8000,
}).catch((e) => log.debug("failed to show toast", { error: e }))
}
break
}
// tryConnect already closed the failed transport
log.debug("transport connection failed", {
key,
transport: name,
url: mcp.url,
error: lastError.message,
})
status = {
status: "failed" as const,
error: lastError.message,
}
}
// If this was an auth error, stop trying other transports
if ((status as Status | undefined)?.status === "needs_auth" || (status as Status | undefined)?.status === "needs_client_registration") break
}
}
@ -343,23 +352,25 @@ export namespace MCP {
})
const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
try {
mcpClient = await tryConnect(transport, connectTimeout)
status = {
status: "connected",
}
} catch (error) {
// tryConnect already closed the transport (kills orphaned child process)
log.error("local mcp startup failed", {
key,
command: mcp.command,
cwd,
error: error instanceof Error ? error.message : String(error),
})
status = {
status: "failed" as const,
error: error instanceof Error ? error.message : String(error),
}
const result = yield* connectTransport(transport, connectTimeout).pipe(
Effect.catch((error) => {
const msg = error instanceof Error ? error.message : String(error)
log.error("local mcp startup failed", {
key,
command: mcp.command,
cwd,
error: msg,
})
status = {
status: "failed" as const,
error: msg,
}
return Effect.succeed(undefined)
}),
)
if (result) {
mcpClient = result
status = { status: "connected" }
}
}
@ -372,31 +383,29 @@ export namespace MCP {
if (!mcpClient) {
return {
mcpClient: undefined,
mcpClient: undefined as MCPClient | undefined,
status,
defs: undefined as MCPToolDef[] | undefined,
}
}
const listed = await defs(key, mcpClient, mcp.timeout)
const listed = yield* defs(key, mcpClient, mcp.timeout)
if (!listed) {
await mcpClient.close().catch((error) => {
log.error("Failed to close MCP client", {
error,
})
})
yield* Effect.promise(() => mcpClient!.close()).pipe(Effect.ignore)
return {
mcpClient: undefined,
status: { status: "failed" as const, error: "Failed to get tools" },
mcpClient: undefined as MCPClient | undefined,
status: { status: "failed" as const, error: "Failed to get tools" } as Status,
defs: undefined as MCPToolDef[] | undefined,
}
}
log.info("create() successfully created client", { key, toolCount: listed.length })
return {
mcpClient,
mcpClient: mcpClient as MCPClient | undefined,
status,
defs: listed,
defs: listed as MCPToolDef[] | undefined,
}
}
})
// --- Effect Service ---
@ -472,7 +481,7 @@ export namespace MCP {
log.info("tools list changed notification received", { server: name })
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
const listed = await defs(name, client, timeout)
const listed = await Effect.runPromise(defs(name, client, timeout))
if (!listed) return
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
@ -507,13 +516,15 @@ export namespace MCP {
return
}
const result = yield* Effect.promise(() => create(key, mcp).catch(() => undefined))
const result = yield* create(key, mcp).pipe(
Effect.catch(() => Effect.succeed(undefined)),
)
if (!result) return
s.status[key] = result.status
if (result.mcpClient) {
s.clients[key] = result.mcpClient
s.defs[key] = result.defs
s.defs[key] = result.defs!
watch(s, key, result.mcpClient, mcp.timeout)
}
}),
@ -577,14 +588,7 @@ export namespace MCP {
const createAndStore = Effect.fn("MCP.createAndStore")(function* (name: string, mcp: Config.Mcp) {
const s = yield* InstanceState.get(cache)
const result = yield* Effect.promise(() => create(name, mcp))
if (!result) {
yield* closeClient(s, name)
delete s.clients[name]
s.status[name] = { status: "failed" as const, error: "unknown error" }
return s.status[name]
}
const result = yield* create(name, mcp)
s.status[name] = result.status
if (!result.mcpClient) {
@ -595,7 +599,7 @@ export namespace MCP {
yield* closeClient(s, name)
s.clients[name] = result.mcpClient
s.defs[name] = result.defs
s.defs[name] = result.defs!
watch(s, name, result.mcpClient, mcp.timeout)
return result.status
})