Compare commits

...

5 Commits

Author SHA1 Message Date
Kit Langton e4fbf5c657 fix(mcp): use Effect.timeoutOrElse for transport connect timeout
Replace withTimeout (Promise.race) with Effect.timeoutOrElse so that
fiber interruption properly triggers acquireUseRelease cleanup. The
Promise.race approach left the underlying connect promise dangling,
which prevented the release function from running on some platforms.
2026-03-26 11:19:10 -04:00
Kit Langton 3439521ef0 effectify MCP service: convert remaining Promise code to idiomatic Effect
- fetchFromClient: async function -> Effect.tryPromise + Effect.map
- collectFromConnected: compose directly with Effect fetchFromClient
- closeClient: Effect.promise(.catch) -> Effect.tryPromise + Effect.ignore
- Bus.publish calls: .catch() -> busPublish helper with Effect.tryPromise + Effect.ignore
- startAuth: Effect.promise(async try/catch) -> Effect.tryPromise + Effect.catch
- finishAuth: simplify inner async/await
- create(): eliminate mutable let vars, extract connectRemote/connectLocal
- Extract sanitize helper, CreateResult interface, busPublish helper
- watch callback: use busPublish via Effect.runPromise
- getConfig helper function for repeated Config.get() pattern
2026-03-26 10:39:33 -04:00
Kit Langton 7965300ce5 refactor(mcp): convert create/defs/tryConnect to Effect-native patterns
Replace the async tryConnect helper with connectTransport using
Effect.acquireUseRelease for guaranteed transport cleanup on failure.
Convert create() from async function to Effect.gen and defs() to
return an Effect. Callers now yield* directly instead of wrapping
in Effect.promise.
2026-03-26 10:39:33 -04:00
Kit Langton ebdecf2ec7 fix(mcp): close transport on failed/timed-out connections
When withTimeout rejects during MCP connect, the transport (and its
child process for stdio servers) was never closed. Extract a tryConnect
helper that ensures transport.close() is always called on failure,
eliminating process/connection leaks in all connect paths.

Fixes #19168
2026-03-26 10:39:33 -04:00
Kit Langton 4342ef6c97 review test 2026-03-26 10:39:32 -04:00
9 changed files with 1253 additions and 229 deletions

View File

@ -0,0 +1,65 @@
---
description: Review pull requests for correctness bugs and regressions
mode: primary
model: opencode/gpt-5.4
reasoningEffort: high
textVerbosity: low
temperature: 0.1
tools:
write: false
edit: false
bash: false
webfetch: false
task: false
todowrite: false
---
You are a pull request reviewer focused on correctness.
Start by reading `.opencode-review/pr.json`, `.opencode-review/files.json`, and
`.opencode-review/diff.patch`.
You have read access to the full repository. Use that access only for targeted
follow-up on changed files: direct callees, direct callers, touched tests,
related types, or helpers needed to confirm a concrete bug.
Review strategy:
1. Start with changed hunks.
2. Read the full changed file only when a hunk needs more context.
3. Expand to other files only when they are directly relevant to a suspected
bug.
4. Stop once you have enough evidence to either report the issue or discard it.
Avoid broad repo exploration. Do not read unrelated files just to learn the
architecture. Prefer depth on a few relevant files over breadth across many
files.
Report only concrete issues with a plausible failure mode. Ignore formatting,
micro-optimizations, and weak style opinions.
Do not report more than 5 findings.
Return only JSON. The response must be an array of objects with this exact
shape:
```json
[
{
"category": "correctness",
"severity": "must-fix",
"confidence": "high",
"file": "path/to/file.ts",
"line": 12,
"summary": "Short one-line bug summary",
"evidence": "Why this is a real issue in the current code",
"suggestion": "Optional fix direction",
"introduced": true
}
]
```
Severity must be one of `must-fix`, `should-fix`, or `suggestion`.
Confidence must be one of `high`, `medium`, or `low`.
If there are no issues, return `[]`.

View File

@ -0,0 +1,64 @@
---
description: Review pull requests for high-signal maintainability issues
mode: primary
model: opencode/gpt-5.4
reasoningEffort: high
textVerbosity: low
temperature: 0.1
tools:
write: false
edit: false
bash: false
webfetch: false
task: false
todowrite: false
---
You are a pull request reviewer focused on maintainability.
Start by reading `.opencode-review/pr.json`, `.opencode-review/files.json`, and
`.opencode-review/diff.patch`.
Use repository guidance from `AGENTS.md` and `REVIEW.md` when present. Be
strict about real repo conventions, but do not nitpick personal taste.
Review strategy:
1. Start with changed hunks.
2. Read the full changed file when needed.
3. Expand to nearby helpers, tests, or conventions only when the diff suggests
a real maintainability problem.
4. Stop when you have enough evidence.
Avoid repo-wide convention hunts. Do not search broadly for every possible
style rule.
Only report issues that create meaningful maintenance cost, hide bugs, or break
clear project conventions. Ignore harmless formatting or one-off stylistic
differences.
Do not report more than 5 findings.
Return only JSON. The response must be an array of objects with this exact
shape:
```json
[
{
"category": "maintainability",
"severity": "should-fix",
"confidence": "high",
"file": "path/to/file.ts",
"line": 12,
"summary": "Short one-line maintainability issue summary",
"evidence": "Why this matters in this codebase",
"suggestion": "Optional fix direction",
"introduced": true
}
]
```
Severity must be one of `must-fix`, `should-fix`, or `suggestion`.
Confidence must be one of `high`, `medium`, or `low`.
If there are no issues, return `[]`.

View File

@ -0,0 +1,63 @@
---
description: Review pull requests for security issues and unsafe changes
mode: primary
model: opencode/gpt-5.4
reasoningEffort: high
textVerbosity: low
temperature: 0.1
tools:
write: false
edit: false
bash: false
webfetch: false
task: false
todowrite: false
---
You are a pull request reviewer focused on security.
Start by reading `.opencode-review/pr.json`, `.opencode-review/files.json`, and
`.opencode-review/diff.patch`.
You have read access to the full repository. Inspect related code only when it
is directly connected to changed code, especially auth, validation,
persistence, secrets handling, logging, and data exposure paths.
Review strategy:
1. Start with changed hunks.
2. Read the full changed file only when needed.
3. Expand only to directly connected validation, auth, storage, or transport
code.
4. Stop once you can prove or reject the issue.
Avoid broad repo sweeps or generic checklist-driven exploration.
Only report concrete issues introduced or exposed by this pull request. Ignore
generic OWASP checklists unless the code actually shows the problem.
Do not report more than 5 findings.
Return only JSON. The response must be an array of objects with this exact
shape:
```json
[
{
"category": "security",
"severity": "must-fix",
"confidence": "high",
"file": "path/to/file.ts",
"line": 12,
"summary": "Short one-line security issue summary",
"evidence": "Why this is a real issue in the current code",
"suggestion": "Optional fix direction",
"introduced": true
}
]
```
Severity must be one of `must-fix`, `should-fix`, or `suggestion`.
Confidence must be one of `high`, `medium`, or `low`.
If there are no issues, return `[]`.

View File

@ -0,0 +1,56 @@
---
description: Verify pull request review findings and remove weak claims
mode: primary
model: opencode/gpt-5.4
reasoningEffort: high
textVerbosity: low
temperature: 0.1
tools:
write: false
edit: false
bash: false
webfetch: false
task: false
todowrite: false
---
You are a verification pass for pull request review findings.
Start by reading `.opencode-review/pr.json`, `.opencode-review/files.json`,
`.opencode-review/diff.patch`, and `.opencode-review/candidates.json`.
For each candidate, inspect the cited code and reject anything that is:
- vague or speculative
- duplicated by a stronger finding
- unsupported by the current code
- not meaningfully attributable to this pull request
- a harmless style preference
Keep only findings with concrete evidence and an actionable explanation.
Prefer reading the cited file and directly related context only. Do not do a
broad repo search unless a candidate specifically depends on another file.
Return no more than 8 findings.
Return only JSON. The response must be an array of objects with this exact
shape:
```json
[
{
"category": "correctness",
"severity": "must-fix",
"confidence": "high",
"file": "path/to/file.ts",
"line": 12,
"summary": "Short one-line issue summary",
"evidence": "Why this survived verification",
"suggestion": "Optional fix direction",
"introduced": true
}
]
```
If there are no verified issues, return `[]`.

21
REVIEW.md 100644
View File

@ -0,0 +1,21 @@
# Review Guidelines
## Prioritize
- correctness bugs, regressions, and unsafe edge cases
- security issues with concrete impact
- maintainability problems that clearly violate repo conventions
## Flag
- unnecessary `any` in new code when a precise type is practical
- deep nesting when early returns would make the flow clearer
- duplicated logic that should obviously reuse existing helpers
- new routes, migrations, or persistence changes that look untested or unsafe
## Skip
- harmless formatting differences
- stylistic nits without clear repo guidance
- optional micro-optimizations without user impact
- pre-existing issues unrelated to the pull request

View File

@ -0,0 +1,664 @@
#!/usr/bin/env bun
import { NodeFileSystem, NodePath } from "@effect/platform-node"
import * as CrossSpawnSpawner from "../src/effect/cross-spawn-spawner"
import { makeRuntime } from "../src/effect/run-service"
import path from "path"
import { Duration, Effect, Fiber, FileSystem, Layer, Schema, Schedule, ServiceMap, Stream } from "effect"
import type { PlatformError } from "effect/PlatformError"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
const Category = Schema.Union([
Schema.Literal("correctness"),
Schema.Literal("security"),
Schema.Literal("maintainability"),
])
const Severity = Schema.Union([Schema.Literal("must-fix"), Schema.Literal("should-fix"), Schema.Literal("suggestion")])
const Confidence = Schema.Union([Schema.Literal("high"), Schema.Literal("medium"), Schema.Literal("low")])
class Base extends Schema.Class<Base>("ReviewBase")({
ref: Schema.String,
}) {}
class Head extends Schema.Class<Head>("ReviewHead")({
sha: Schema.String,
ref: Schema.String,
}) {}
class Pull extends Schema.Class<Pull>("ReviewPull")({
number: Schema.Number,
title: Schema.String,
body: Schema.NullOr(Schema.String),
head: Head,
base: Base,
}) {}
class PullFile extends Schema.Class<PullFile>("ReviewPullFile")({
filename: Schema.String,
status: Schema.String,
patch: Schema.optional(Schema.String),
}) {}
class PullContext extends Schema.Class<PullContext>("ReviewPullContext")({
repo: Schema.String,
mergeBase: Schema.String,
pull: Pull,
}) {}
class Finding extends Schema.Class<Finding>("ReviewFinding")({
category: Category,
severity: Severity,
confidence: Confidence,
file: Schema.String,
line: Schema.Number,
summary: Schema.String,
evidence: Schema.String,
suggestion: Schema.String,
introduced: Schema.Boolean,
}) {}
class ReviewError extends Schema.TaggedErrorClass<ReviewError>()("ReviewError", {
message: Schema.String,
cause: Schema.optional(Schema.Defect),
}) {}
const PullFiles = Schema.Array(PullFile)
const Findings = Schema.Array(Finding)
const decodePullJson = Schema.decodeSync(Schema.fromJsonString(Pull))
const decodePullFilesJson = Schema.decodeSync(Schema.fromJsonString(PullFiles))
const decodeFindingsJson = Schema.decodeSync(Schema.fromJsonString(Findings))
const encodePullContext = Schema.encodeSync(Schema.fromJsonString(PullContext))
const encodePullFiles = Schema.encodeSync(Schema.fromJsonString(PullFiles))
const encodeFindings = Schema.encodeSync(Schema.fromJsonString(Findings))
const args = parse(process.argv.slice(2))
export namespace Review {
export interface Interface {
readonly run: (input: {
repo: string
pr: number
post: boolean
}) => Effect.Effect<void, ReviewError | PlatformError>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Review") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const fs = yield* FileSystem.FileSystem
const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
const root = process.cwd()
const bin = process.env.OPENCODE_BIN ?? "opencode"
const note = (text: string) => Effect.sync(() => console.error(`[review] ${text}`))
const fail = (message: string) => (cause: unknown) =>
new ReviewError({
message,
cause,
})
const cmd = Effect.fn("Review.cmd")(function* (file: string, argv: string[], cwd: string) {
const handle = yield* spawner.spawn(
ChildProcess.make(file, argv, {
cwd,
extendEnv: true,
stdin: "ignore",
stdout: "pipe",
stderr: "pipe",
}),
)
const [stdout, stderr, code] = yield* Effect.all(
[
Stream.mkString(Stream.decodeText(handle.stdout)),
Stream.mkString(Stream.decodeText(handle.stderr)),
handle.exitCode,
],
{ concurrency: 3 },
)
if (code !== ChildProcessSpawner.ExitCode(0)) {
return yield* new ReviewError({
message: `${file} ${argv.join(" ")} failed`,
cause: new Error(stderr.trim() || stdout.trim() || `exit=${code}`),
})
}
return stdout
}, Effect.scoped)
const pull = (text: string) =>
Effect.try({
try: () => decodePullJson(text),
catch: fail("pull decode failed"),
})
const files = (text: string) =>
Effect.try({
try: () => decodePullFilesJson(text),
catch: fail("pull files decode failed"),
})
const findings = (text: string) =>
Effect.try({
try: () => decodeFindingsJson(text),
catch: fail("findings decode failed"),
})
const gh = Effect.fn("Review.gh")(function* (argv: string[]) {
return yield* cmd("gh", argv, root)
})
const git = Effect.fn("Review.git")(function* (argv: string[], cwd: string) {
return yield* cmd("git", argv, cwd)
})
const sync = Effect.fn("Review.sync")(function* (dir: string, box: string) {
const src = path.join(root, ".opencode", "agents")
const dst = path.join(dir, ".opencode", "agents")
yield* fs.makeDirectory(dst, { recursive: true }).pipe(Effect.mapError(fail("create agents dir failed")))
for (const name of [
"review-correctness.md",
"review-security.md",
"review-maintainability.md",
"review-verify.md",
]) {
const text = yield* fs.readFileString(path.join(src, name)).pipe(Effect.mapError(fail(`read ${name} failed`)))
yield* fs.writeFileString(path.join(dst, name), text).pipe(Effect.mapError(fail(`write ${name} failed`)))
}
const review = yield* fs
.readFileString(path.join(root, "REVIEW.md"))
.pipe(Effect.mapError(fail("read REVIEW.md failed")))
yield* fs
.writeFileString(path.join(box, "REVIEW.md"), review)
.pipe(Effect.mapError(fail("write REVIEW.md failed")))
})
const parseText = Effect.fn("Review.parseText")(function* (text: string) {
const body = text.trim()
if (!body) return yield* new ReviewError({ message: "review agent returned no text" })
const clean = strip(body)
try {
return decodeFindingsJson(clean)
} catch {}
const start = clean.indexOf("[")
const end = clean.lastIndexOf("]")
if (start !== -1 && end > start) {
return yield* findings(clean.slice(start, end + 1))
}
return yield* new ReviewError({ message: `could not parse findings JSON\n\n${clean}` })
})
const talk = Effect.fn("Review.talk")(function* (agent: string, prompt: string, cwd: string) {
const out: string[] = []
const err: string[] = []
const handle = yield* spawner.spawn(
ChildProcess.make(bin, ["run", "--agent", agent, "--format", "json", prompt], {
cwd,
extendEnv: true,
stdin: "ignore",
stdout: "pipe",
stderr: "pipe",
}),
)
const [, , code] = yield* Effect.all(
[
handle.stdout.pipe(
Stream.decodeText(),
Stream.splitLines,
Stream.runForEach((line) =>
Effect.sync(() => {
out.push(line)
trace(agent, line)
}),
),
),
handle.stderr.pipe(
Stream.decodeText(),
Stream.splitLines,
Stream.runForEach((line) =>
Effect.sync(() => {
err.push(line)
if (line.trim()) console.error(`[review:${agent}] ${line}`)
}),
),
),
handle.exitCode,
],
{ concurrency: 3 },
)
if (code !== ChildProcessSpawner.ExitCode(0)) {
return yield* new ReviewError({
message: `${agent} failed`,
cause: new Error(err.join("\n").trim() || out.join("\n").trim() || `exit=${code}`),
})
}
return out.join("\n")
}, Effect.scoped)
const pass = Effect.fn("Review.pass")(function* (agent: string, prompt: string, cwd: string) {
yield* note(`${agent} tools: read/glob/grep/list allowed; write/edit/bash denied`)
const raw = yield* talk(agent, prompt, cwd)
return yield* parseText(collect(raw))
})
const job = Effect.fn("Review.job")(function* (
name: string,
fx: Effect.Effect<readonly Finding[], ReviewError | PlatformError>,
) {
yield* note(`${name} started`)
const beat = yield* note(`${name} still running`).pipe(
Effect.repeat(Schedule.spaced(Duration.seconds(15))),
Effect.delay(Duration.seconds(15)),
Effect.forkScoped,
)
const out = yield* fx.pipe(
Effect.timeout(Duration.minutes(10)),
Effect.catchTag("TimeoutError", () =>
Effect.fail(new ReviewError({ message: `${name} timed out after 600s` })),
),
Effect.ensuring(Fiber.interrupt(beat)),
)
yield* note(`${name} finished (${out.length} findings)`)
return out
}, Effect.scoped)
const safe = (name: string, fx: Effect.Effect<readonly Finding[], ReviewError | PlatformError>) =>
fx.pipe(Effect.catch((err) => note(`pass failed: ${name}: ${err.message}`).pipe(Effect.as([] as const))))
const inline = Effect.fn("Review.inline")(function* (repo: string, pr: number, sha: string, item: Finding) {
yield* gh([
"api",
"--method",
"POST",
"-H",
"Accept: application/vnd.github+json",
"-H",
"X-GitHub-Api-Version: 2022-11-28",
`/repos/${repo}/pulls/${pr}/comments`,
"-f",
`body=${body(item)}`,
"-f",
`commit_id=${sha}`,
"-f",
`path=${item.file}`,
"-F",
`line=${Math.trunc(item.line)}`,
"-f",
"side=RIGHT",
])
})
const top = Effect.fn("Review.top")(function* (repo: string, pr: number, text: string) {
yield* gh(["pr", "comment", String(pr), "--repo", repo, "--body", text])
})
const run = Effect.fn("Review.run")(function* (input: { repo: string; pr: number; post: boolean }) {
yield* note(`loading PR #${input.pr}`)
const data = yield* gh(["api", `/repos/${input.repo}/pulls/${input.pr}`]).pipe(Effect.flatMap(pull))
const list = yield* gh(["api", `/repos/${input.repo}/pulls/${input.pr}/files?per_page=100`]).pipe(
Effect.flatMap(files),
)
const tmp = yield* fs
.makeTempDirectoryScoped({ prefix: "opencode-review-" })
.pipe(Effect.mapError(fail("create temp dir failed")))
const dir = path.join(tmp, `pr-${input.pr}`)
yield* note("preparing worktree")
yield* git(
["fetch", "origin", data.base.ref, `refs/pull/${input.pr}/head:refs/remotes/origin/pr-${input.pr}`],
root,
)
yield* Effect.acquireRelease(
git(["worktree", "add", "--detach", dir, `refs/remotes/origin/pr-${input.pr}`], root),
() => git(["worktree", "remove", "--force", dir], root).pipe(Effect.catch(() => Effect.void)),
)
const base = (yield* git(["merge-base", `origin/${data.base.ref}`, "HEAD"], dir)).trim()
const diff = yield* git(["diff", "--unified=3", `${base}...HEAD`], dir)
const box = path.join(dir, ".opencode-review")
yield* fs.makeDirectory(box, { recursive: true }).pipe(Effect.mapError(fail("create review dir failed")))
yield* sync(dir, box)
yield* fs
.writeFileString(
path.join(box, "pr.json"),
encodePullContext(
new PullContext({
repo: input.repo,
mergeBase: base,
pull: data,
}),
),
)
.pipe(Effect.mapError(fail("write pr.json failed")))
yield* fs
.writeFileString(path.join(box, "files.json"), encodePullFiles(list))
.pipe(Effect.mapError(fail("write files.json failed")))
yield* fs
.writeFileString(path.join(box, "diff.patch"), diff)
.pipe(Effect.mapError(fail("write diff.patch failed")))
const out = yield* Effect.all(
[
safe("correctness", job("correctness", pass("review-correctness", correctness(data, list), dir))),
safe("security", job("security", pass("review-security", security(data, list), dir))),
safe(
"maintainability",
job("maintainability", pass("review-maintainability", maintainability(data, list), dir)),
),
],
{ concurrency: 3 },
)
const merged = dedupe(out.flatMap((item) => [...item]))
yield* fs
.writeFileString(path.join(box, "candidates.json"), encodeFindings(merged))
.pipe(Effect.mapError(fail("write candidates.json failed")))
const kept = merged.length
? dedupe(yield* job("verifier", pass("review-verify", verify(data, merged), dir)))
: []
const ranges = new Map(list.map((item) => [item.filename, hunks(item.patch)]))
const notes = kept.filter((item) => inDiff(ranges.get(item.file), item.line))
const rest = kept.filter((item) => !inDiff(ranges.get(item.file), item.line))
if (!input.post) {
yield* Effect.sync(() => print(kept, notes, rest))
return
}
if (!kept.length) {
yield* top(input.repo, input.pr, "lgtm")
return
}
yield* Effect.all(
notes.map((item) => inline(input.repo, input.pr, data.head.sha, item)),
{ concurrency: 1 },
)
if (rest.length) yield* top(input.repo, input.pr, summary(rest))
})
return Service.of({
run: (input) => run(input).pipe(Effect.scoped),
})
}),
)
export const defaultLayer = layer.pipe(
Layer.provide(CrossSpawnSpawner.layer),
Layer.provide(NodeFileSystem.layer),
Layer.provide(NodePath.layer),
)
const { runPromise } = makeRuntime(Service, defaultLayer)
export function run(input: { repo: string; pr: number; post: boolean }) {
return runPromise((svc) => svc.run(input))
}
}
await Review.run(args)
function parse(argv: string[]) {
let repo: string | undefined
let pr: number | undefined
let post = false
for (let i = 0; i < argv.length; i++) {
const arg = argv[i]
if (arg === "--repo") repo = argv[++i]
if (arg === "--pr") pr = Number(argv[++i])
if (arg === "--post") post = true
}
if (!repo) throw new Error("Missing --repo")
if (!pr) throw new Error("Missing --pr")
return { repo, pr, post }
}
function collect(raw: string) {
const seen = new Set<string>()
const out: string[] = []
for (const row of raw.split(/\r?\n/)) {
if (!row.trim()) continue
let item: unknown
try {
item = JSON.parse(row)
} catch {
continue
}
if (!item || typeof item !== "object" || !("type" in item) || item.type !== "text") continue
if (!("part" in item) || !item.part || typeof item.part !== "object") continue
const part = item.part as { id?: string; text?: string }
if (!part.id || seen.has(part.id)) continue
seen.add(part.id)
if (typeof part.text === "string") out.push(part.text)
}
return out.join("\n")
}
function trace(agent: string, row: string) {
if (!row.trim()) return
let item: unknown
try {
item = JSON.parse(row)
} catch {
console.error(`[review:${agent}] ${row}`)
return
}
if (!item || typeof item !== "object") return
const type = "type" in item && typeof item.type === "string" ? item.type : undefined
if (!type) return
if (type === "tool_use") {
const part = "part" in item && item.part && typeof item.part === "object" ? item.part : undefined
const tool = part && "tool" in part && typeof part.tool === "string" ? part.tool : "tool"
const state = part && "state" in part && part.state && typeof part.state === "object" ? part.state : undefined
const input = state && "input" in state ? brief(state.input) : ""
console.error(`[review:${agent}] ${tool}${input ? ` ${input}` : ""}`)
return
}
if (type === "step_start") {
console.error(`[review:${agent}] step started`)
return
}
if (type === "step_finish") {
const part = "part" in item && item.part && typeof item.part === "object" ? item.part : undefined
const reason = part && "reason" in part && typeof part.reason === "string" ? part.reason : "step"
console.error(`[review:${agent}] step finished (${reason})`)
}
}
function brief(input: unknown) {
if (!input || typeof input !== "object") return ""
if ("filePath" in input && typeof input.filePath === "string") return input.filePath
if ("path" in input && typeof input.path === "string") return input.path
if ("pattern" in input && typeof input.pattern === "string") return input.pattern
if ("command" in input && typeof input.command === "string") return input.command
if ("include" in input && typeof input.include === "string") return input.include
return ""
}
function strip(text: string) {
if (!text.startsWith("```") || !text.endsWith("```")) return text
return text
.replace(/^```[a-zA-Z]*\n?/, "")
.replace(/\n?```$/, "")
.trim()
}
function correctness(data: Pull, list: readonly PullFile[]) {
return [
`Review pull request #${data.number}: ${data.title}.`,
`Base ref: ${data.base.ref}. Head ref: ${data.head.ref}.`,
`Changed files: ${list.map((item) => item.filename).join(", ")}.`,
"Read `.opencode-review/REVIEW.md` before reviewing.",
"Start with the diff. Use the rest of the repo only for targeted confirmation.",
"Avoid broad exploration. Follow direct references only.",
"Find correctness bugs, regressions, missing edge-case handling, broken invariants, and unsafe assumptions.",
"Only report issues introduced or exposed by this pull request.",
].join("\n")
}
function security(data: Pull, list: readonly PullFile[]) {
return [
`Review pull request #${data.number}: ${data.title}.`,
`Base ref: ${data.base.ref}. Head ref: ${data.head.ref}.`,
`Changed files: ${list.map((item) => item.filename).join(", ")}.`,
"Read `.opencode-review/REVIEW.md` before reviewing.",
"Start with the diff. Use the rest of the repo only for targeted confirmation.",
"Avoid broad exploration. Follow direct auth, validation, storage, or transport links only.",
"Find concrete security issues such as missing validation, unsafe auth checks, secrets exposure, or data leaks.",
"Only report issues introduced or exposed by this pull request.",
].join("\n")
}
function maintainability(data: Pull, list: readonly PullFile[]) {
return [
`Review pull request #${data.number}: ${data.title}.`,
`Base ref: ${data.base.ref}. Head ref: ${data.head.ref}.`,
`Changed files: ${list.map((item) => item.filename).join(", ")}.`,
"Read `.opencode-review/REVIEW.md` before reviewing.",
"Start with the diff. Use the rest of the repo only for targeted confirmation.",
"Avoid broad exploration. Focus on maintainability issues made visible by the changed files.",
"Find high-signal maintainability issues that clearly violate repo conventions or make future bugs likely.",
"Do not nitpick harmless style differences.",
].join("\n")
}
function verify(data: Pull, list: readonly Finding[]) {
return [
`Verify review findings for pull request #${data.number}: ${data.title}.`,
`Candidates: ${list.length}.`,
"Inspect the cited file first and expand only if needed to confirm or reject the finding.",
"Reject anything vague, duplicated, unsupported, or not attributable to the pull request.",
].join("\n")
}
function dedupe(list: readonly Finding[]) {
const seen = new Set<string>()
return order(list).filter((item) => {
const key = [item.category, item.file, Math.trunc(item.line), item.summary.trim().toLowerCase()].join(":")
if (seen.has(key)) return false
seen.add(key)
return true
})
}
function order(list: readonly Finding[]) {
const rank = {
"must-fix": 0,
"should-fix": 1,
suggestion: 2,
}
return [...list].sort((a, b) => {
const left = rank[a.severity] - rank[b.severity]
if (left) return left
return a.file.localeCompare(b.file) || a.line - b.line
})
}
function hunks(patch?: string) {
if (!patch) return [] as [number, number][]
const out: [number, number][] = []
let line = 0
let start = -1
let end = -1
for (const row of patch.split("\n")) {
if (row.startsWith("@@")) {
push(out, start, end)
start = -1
end = -1
const hit = /\+([0-9]+)(?:,([0-9]+))?/.exec(row)
line = hit ? Number(hit[1]) : 0
continue
}
if (row.startsWith("+") && !row.startsWith("+++")) {
start = start === -1 ? line : start
end = line
line += 1
continue
}
if (row.startsWith("-") && !row.startsWith("---")) continue
push(out, start, end)
start = -1
end = -1
line += 1
}
push(out, start, end)
return out
}
function push(out: [number, number][], start: number, end: number) {
if (start === -1 || end === -1) return
const prev = out.at(-1)
if (prev && prev[1] + 1 >= start) {
prev[1] = Math.max(prev[1], end)
return
}
out.push([start, end])
}
function inDiff(list: [number, number][] | undefined, line: number) {
return !!list?.some((item) => line >= item[0] && line <= item[1])
}
function body(item: Finding) {
const out = [`[${item.severity}] ${item.summary}`, "", item.evidence]
if (item.suggestion.trim()) out.push("", `Suggestion: ${item.suggestion.trim()}`)
return out.join("\n")
}
function summary(list: readonly Finding[]) {
const head = "OpenCode review found additional PR-relevant issues that could not be placed on changed lines:"
const body = order(list).map(
(item) => `- [${item.severity}] \`${item.file}:${Math.trunc(item.line)}\` ${item.summary}`,
)
return [head, "", ...body].join("\n")
}
function print(all: readonly Finding[], notes: readonly Finding[], rest: readonly Finding[]) {
console.log("# OpenCode Review")
console.log()
console.log(`- total: ${all.length}`)
console.log(`- inline-ready: ${notes.length}`)
console.log(`- summary-only: ${rest.length}`)
console.log()
for (const item of order(all)) {
console.log(`- [${item.severity}] ${item.file}:${Math.trunc(item.line)} ${item.summary}`)
console.log(` ${item.evidence}`)
if (item.suggestion.trim()) console.log(` suggestion: ${item.suggestion.trim()}`)
}
}

View File

@ -3,6 +3,15 @@ import * as ServiceMap from "effect/ServiceMap"
export const memoMap = Layer.makeMemoMapUnsafe() export const memoMap = Layer.makeMemoMapUnsafe()
export function makeRunPromise<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined
return <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) => {
rt ??= ManagedRuntime.make(layer, { memoMap })
return rt.runPromise(service.use(fn), options)
}
}
export function makeRuntime<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) { export function makeRuntime<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined
const getRuntime = () => (rt ??= ManagedRuntime.make(layer, { memoMap })) const getRuntime = () => (rt ??= ManagedRuntime.make(layer, { memoMap }))

View File

@ -24,7 +24,7 @@ import { BusEvent } from "../bus/bus-event"
import { Bus } from "@/bus" import { Bus } from "@/bus"
import { TuiEvent } from "@/cli/cmd/tui/event" import { TuiEvent } from "@/cli/cmd/tui/event"
import open from "open" import open from "open"
import { Effect, Layer, Option, ServiceMap, Stream } from "effect" import { Effect, Exit, Layer, Option, ServiceMap, Stream } from "effect"
import { InstanceState } from "@/effect/instance-state" import { InstanceState } from "@/effect/instance-state"
import { makeRuntime } from "@/effect/run-service" import { makeRuntime } from "@/effect/run-service"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process" import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
@ -129,6 +129,8 @@ export namespace MCP {
return typeof entry === "object" && entry !== null && "type" in entry return typeof entry === "object" && entry !== null && "type" in entry
} }
const sanitize = (s: string) => s.replace(/[^a-zA-Z0-9_-]/g, "_")
// Convert MCP tool definition to AI SDK Tool type // Convert MCP tool definition to AI SDK Tool type
function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool { function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool {
const inputSchema = mcpTool.inputSchema const inputSchema = mcpTool.inputSchema
@ -160,141 +162,157 @@ export namespace MCP {
}) })
} }
async function defs(key: string, client: MCPClient, timeout?: number) { function defs(key: string, client: MCPClient, timeout?: number) {
const result = await withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT).catch((err) => { return Effect.tryPromise({
log.error("failed to get tools from client", { key, error: err }) try: () => withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT),
return undefined catch: (err) => err instanceof Error ? err : new Error(String(err)),
}) }).pipe(
return result?.tools Effect.map((result) => result.tools),
Effect.catch((err) => {
log.error("failed to get tools from client", { key, error: err })
return Effect.succeed(undefined)
}),
)
} }
async function fetchFromClient<T extends { name: string }>( function fetchFromClient<T extends { name: string }>(
clientName: string, clientName: string,
client: Client, client: Client,
listFn: (c: Client) => Promise<T[]>, listFn: (c: Client) => Promise<T[]>,
label: string, label: string,
): Promise<Record<string, T & { client: string }> | undefined> { ) {
const items = await listFn(client).catch((e: any) => { return Effect.tryPromise({
log.error(`failed to get ${label}`, { clientName, error: e.message }) try: () => listFn(client),
return undefined catch: (e: any) => {
}) log.error(`failed to get ${label}`, { clientName, error: e.message })
if (!items) return undefined return e
},
const out: Record<string, T & { client: string }> = {} }).pipe(
const sanitizedClient = clientName.replace(/[^a-zA-Z0-9_-]/g, "_") Effect.map((items) => {
for (const item of items) { const out: Record<string, T & { client: string }> = {}
const sanitizedName = item.name.replace(/[^a-zA-Z0-9_-]/g, "_") const sanitizedClient = sanitize(clientName)
out[sanitizedClient + ":" + sanitizedName] = { ...item, client: clientName } for (const item of items) {
} out[sanitizedClient + ":" + sanitize(item.name)] = { ...item, client: clientName }
return out }
return out
}),
Effect.orElseSucceed(() => undefined),
)
} }
async function create(key: string, mcp: Config.Mcp) { type Transport = StdioClientTransport | StreamableHTTPClientTransport | SSEClientTransport
if (mcp.enabled === false) {
log.info("mcp server disabled", { key }) /**
return { * Connect a client via the given transport with resource safety:
mcpClient: undefined, * on failure the transport is closed; on success the caller owns it.
status: { status: "disabled" as const }, */
} const connectTransport = (transport: Transport, timeout: number) =>
Effect.acquireUseRelease(
Effect.succeed(transport),
(t) => {
const client = new Client({ name: "opencode", version: Installation.VERSION })
return Effect.tryPromise({
try: () => client.connect(t).then(() => client),
catch: (e) => (e instanceof Error ? e : new Error(String(e))),
}).pipe(
Effect.timeoutOrElse({
duration: `${timeout} millis`,
onTimeout: () => Effect.fail(new Error(`Operation timed out after ${timeout}ms`)),
}),
)
},
(t, exit) =>
Exit.isFailure(exit)
? Effect.tryPromise(() => t.close()).pipe(Effect.ignore)
: Effect.void,
)
/** Fire-and-forget Bus.publish wrapped in Effect */
const busPublish = <D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) =>
Effect.tryPromise(() => Bus.publish(def, properties)).pipe(Effect.ignore)
interface CreateResult {
mcpClient?: MCPClient
status: Status
defs?: MCPToolDef[]
}
const DISABLED_RESULT: CreateResult = { status: { status: "disabled" } }
const connectRemote = Effect.fn("MCP.connectRemote")(function* (key: string, mcp: Config.Mcp & { type: "remote" }) {
const oauthDisabled = mcp.oauth === false
const oauthConfig = typeof mcp.oauth === "object" ? mcp.oauth : undefined
let authProvider: McpOAuthProvider | undefined
if (!oauthDisabled) {
authProvider = new McpOAuthProvider(
key,
mcp.url,
{
clientId: oauthConfig?.clientId,
clientSecret: oauthConfig?.clientSecret,
scope: oauthConfig?.scope,
},
{
onRedirect: async (url) => {
log.info("oauth redirect requested", { key, url: url.toString() })
},
},
)
} }
log.info("found", { key, type: mcp.type }) const transports: Array<{ name: string; transport: TransportWithAuth }> = [
let mcpClient: MCPClient | undefined {
let status: Status | undefined = undefined name: "StreamableHTTP",
transport: new StreamableHTTPClientTransport(new URL(mcp.url), {
authProvider,
requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
}),
},
{
name: "SSE",
transport: new SSEClientTransport(new URL(mcp.url), {
authProvider,
requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
}),
},
]
if (mcp.type === "remote") { const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
// OAuth is enabled by default for remote servers unless explicitly disabled with oauth: false let lastStatus: Status | undefined
const oauthDisabled = mcp.oauth === false
const oauthConfig = typeof mcp.oauth === "object" ? mcp.oauth : undefined
let authProvider: McpOAuthProvider | undefined
if (!oauthDisabled) { for (const { name, transport } of transports) {
authProvider = new McpOAuthProvider( const result = yield* connectTransport(transport, connectTimeout).pipe(
key, Effect.map((client) => ({ client, transportName: name })),
mcp.url, Effect.catch((error) => {
{ const lastError = error instanceof Error ? error : new Error(String(error))
clientId: oauthConfig?.clientId,
clientSecret: oauthConfig?.clientSecret,
scope: oauthConfig?.scope,
},
{
onRedirect: async (url) => {
log.info("oauth redirect requested", { key, url: url.toString() })
// Store the URL - actual browser opening is handled by startAuth
},
},
)
}
const transports: Array<{ name: string; transport: TransportWithAuth }> = [
{
name: "StreamableHTTP",
transport: new StreamableHTTPClientTransport(new URL(mcp.url), {
authProvider,
requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
}),
},
{
name: "SSE",
transport: new SSEClientTransport(new URL(mcp.url), {
authProvider,
requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
}),
},
]
let lastError: Error | undefined
const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
for (const { name, transport } of transports) {
try {
const client = new Client({
name: "opencode",
version: Installation.VERSION,
})
await withTimeout(client.connect(transport), connectTimeout)
mcpClient = client
log.info("connected", { key, transport: name })
status = { status: "connected" }
break
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error))
// Handle OAuth-specific errors.
// The SDK throws UnauthorizedError when auth() returns 'REDIRECT',
// but may also throw plain Errors when auth() fails internally
// (e.g. during discovery, registration, or state generation).
// When an authProvider is attached, treat both cases as auth-related.
const isAuthError = const isAuthError =
error instanceof UnauthorizedError || (authProvider && lastError.message.includes("OAuth")) error instanceof UnauthorizedError || (authProvider && lastError.message.includes("OAuth"))
if (isAuthError) { if (isAuthError) {
log.info("mcp server requires authentication", { key, transport: name }) log.info("mcp server requires authentication", { key, transport: name })
// Check if this is a "needs registration" error
if (lastError.message.includes("registration") || lastError.message.includes("client_id")) { if (lastError.message.includes("registration") || lastError.message.includes("client_id")) {
status = { lastStatus = {
status: "needs_client_registration" as const, status: "needs_client_registration" as const,
error: "Server does not support dynamic client registration. Please provide clientId in config.", error: "Server does not support dynamic client registration. Please provide clientId in config.",
} }
// Show toast for needs_client_registration return busPublish(TuiEvent.ToastShow, {
Bus.publish(TuiEvent.ToastShow, {
title: "MCP Authentication Required", title: "MCP Authentication Required",
message: `Server "${key}" requires a pre-registered client ID. Add clientId to your config.`, message: `Server "${key}" requires a pre-registered client ID. Add clientId to your config.`,
variant: "warning", variant: "warning",
duration: 8000, duration: 8000,
}).catch((e) => log.debug("failed to show toast", { error: e })) }).pipe(Effect.as(undefined))
} else { } else {
// Store transport for later finishAuth call
pendingOAuthTransports.set(key, transport) pendingOAuthTransports.set(key, transport)
status = { status: "needs_auth" as const } lastStatus = { status: "needs_auth" as const }
// Show toast for needs_auth return busPublish(TuiEvent.ToastShow, {
Bus.publish(TuiEvent.ToastShow, {
title: "MCP Authentication Required", title: "MCP Authentication Required",
message: `Server "${key}" requires authentication. Run: opencode mcp auth ${key}`, message: `Server "${key}" requires authentication. Run: opencode mcp auth ${key}`,
variant: "warning", variant: "warning",
duration: 8000, duration: 8000,
}).catch((e) => log.debug("failed to show toast", { error: e })) }).pipe(Effect.as(undefined))
} }
break
} }
log.debug("transport connection failed", { log.debug("transport connection failed", {
@ -303,91 +321,75 @@ export namespace MCP {
url: mcp.url, url: mcp.url,
error: lastError.message, error: lastError.message,
}) })
status = { lastStatus = { status: "failed" as const, error: lastError.message }
status: "failed" as const, return Effect.succeed(undefined)
error: lastError.message, }),
} )
} if (result) {
log.info("connected", { key, transport: result.transportName })
return { client: result.client as MCPClient | undefined, status: { status: "connected" } as Status }
} }
// If this was an auth error, stop trying other transports
if (lastStatus?.status === "needs_auth" || lastStatus?.status === "needs_client_registration") break
} }
if (mcp.type === "local") { return { client: undefined as MCPClient | undefined, status: (lastStatus ?? { status: "failed", error: "Unknown error" }) as Status }
const [cmd, ...args] = mcp.command })
const cwd = Instance.directory
const transport = new StdioClientTransport({
stderr: "pipe",
command: cmd,
args,
cwd,
env: {
...process.env,
...(cmd === "opencode" ? { BUN_BE_BUN: "1" } : {}),
...mcp.environment,
},
})
transport.stderr?.on("data", (chunk: Buffer) => {
log.info(`mcp stderr: ${chunk.toString()}`, { key })
})
const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT const connectLocal = Effect.fn("MCP.connectLocal")(function* (key: string, mcp: Config.Mcp & { type: "local" }) {
try { const [cmd, ...args] = mcp.command
const client = new Client({ const cwd = Instance.directory
name: "opencode", const transport = new StdioClientTransport({
version: Installation.VERSION, stderr: "pipe",
}) command: cmd,
await withTimeout(client.connect(transport), connectTimeout) args,
mcpClient = client cwd,
status = { env: {
status: "connected", ...process.env,
} ...(cmd === "opencode" ? { BUN_BE_BUN: "1" } : {}),
} catch (error) { ...mcp.environment,
log.error("local mcp startup failed", { },
key, })
command: mcp.command, transport.stderr?.on("data", (chunk: Buffer) => {
cwd, log.info(`mcp stderr: ${chunk.toString()}`, { key })
error: error instanceof Error ? error.message : String(error), })
})
status = { const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
status: "failed" as const, return yield* connectTransport(transport, connectTimeout).pipe(
error: error instanceof Error ? error.message : String(error), Effect.map((client): { client: MCPClient | undefined; status: Status } => ({ client, status: { status: "connected" } })),
} Effect.catch((error): Effect.Effect<{ client: MCPClient | undefined; status: Status }> => {
} const msg = error instanceof Error ? error.message : String(error)
log.error("local mcp startup failed", { key, command: mcp.command, cwd, error: msg })
return Effect.succeed({ client: undefined, status: { status: "failed", error: msg } })
}),
)
})
const create = Effect.fn("MCP.create")(function* (key: string, mcp: Config.Mcp) {
if (mcp.enabled === false) {
log.info("mcp server disabled", { key })
return DISABLED_RESULT
} }
if (!status) { log.info("found", { key, type: mcp.type })
status = {
status: "failed" as const, const { client: mcpClient, status } = mcp.type === "remote"
error: "Unknown error", ? yield* connectRemote(key, mcp as Config.Mcp & { type: "remote" })
} : yield* connectLocal(key, mcp as Config.Mcp & { type: "local" })
}
if (!mcpClient) { if (!mcpClient) {
return { return { status } satisfies CreateResult
mcpClient: undefined,
status,
}
} }
const listed = await defs(key, mcpClient, mcp.timeout) const listed = yield* defs(key, mcpClient, mcp.timeout)
if (!listed) { if (!listed) {
await mcpClient.close().catch((error) => { yield* Effect.tryPromise(() => mcpClient.close()).pipe(Effect.ignore)
log.error("Failed to close MCP client", { return { status: { status: "failed", error: "Failed to get tools" } } satisfies CreateResult
error,
})
})
return {
mcpClient: undefined,
status: { status: "failed" as const, error: "Failed to get tools" },
}
} }
log.info("create() successfully created client", { key, toolCount: listed.length }) log.info("create() successfully created client", { key, toolCount: listed.length })
return { return { mcpClient, status, defs: listed } satisfies CreateResult
mcpClient, })
status,
defs: listed,
}
}
// --- Effect Service --- // --- Effect Service ---
@ -463,20 +465,20 @@ export namespace MCP {
log.info("tools list changed notification received", { server: name }) log.info("tools list changed notification received", { server: name })
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
const listed = await defs(name, client, timeout) const listed = await Effect.runPromise(defs(name, client, timeout))
if (!listed) return if (!listed) return
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
s.defs[name] = listed s.defs[name] = listed
await Bus.publish(ToolsChanged, { server: name }).catch((error) => await Effect.runPromise(busPublish(ToolsChanged, { server: name }))
log.warn("failed to publish tools changed", { server: name, error }),
)
}) })
} }
const getConfig = () => Effect.promise(() => Config.get())
const cache = yield* InstanceState.make<State>( const cache = yield* InstanceState.make<State>(
Effect.fn("MCP.state")(function* () { Effect.fn("MCP.state")(function* () {
const cfg = yield* Effect.promise(() => Config.get()) const cfg = yield* getConfig()
const config = cfg.mcp ?? {} const config = cfg.mcp ?? {}
const s: State = { const s: State = {
status: {}, status: {},
@ -498,13 +500,15 @@ export namespace MCP {
return return
} }
const result = yield* Effect.promise(() => create(key, mcp).catch(() => undefined)) const result = yield* create(key, mcp).pipe(
Effect.catch(() => Effect.succeed(undefined)),
)
if (!result) return if (!result) return
s.status[key] = result.status s.status[key] = result.status
if (result.mcpClient) { if (result.mcpClient) {
s.clients[key] = result.mcpClient s.clients[key] = result.mcpClient
s.defs[key] = result.defs s.defs[key] = result.defs!
watch(s, key, result.mcpClient, mcp.timeout) watch(s, key, result.mcpClient, mcp.timeout)
} }
}), }),
@ -542,14 +546,12 @@ export namespace MCP {
const client = s.clients[name] const client = s.clients[name]
delete s.defs[name] delete s.defs[name]
if (!client) return Effect.void if (!client) return Effect.void
return Effect.promise(() => return Effect.tryPromise(() => client.close()).pipe(Effect.ignore)
client.close().catch((error: any) => log.error("failed to close MCP client", { name, error })),
)
} }
const status = Effect.fn("MCP.status")(function* () { const status = Effect.fn("MCP.status")(function* () {
const s = yield* InstanceState.get(cache) const s = yield* InstanceState.get(cache)
const cfg = yield* Effect.promise(() => Config.get()) const cfg = yield* getConfig()
const config = cfg.mcp ?? {} const config = cfg.mcp ?? {}
const result: Record<string, Status> = {} const result: Record<string, Status> = {}
@ -568,14 +570,7 @@ export namespace MCP {
const createAndStore = Effect.fn("MCP.createAndStore")(function* (name: string, mcp: Config.Mcp) { const createAndStore = Effect.fn("MCP.createAndStore")(function* (name: string, mcp: Config.Mcp) {
const s = yield* InstanceState.get(cache) const s = yield* InstanceState.get(cache)
const result = yield* Effect.promise(() => create(name, mcp)) const result = yield* create(name, mcp)
if (!result) {
yield* closeClient(s, name)
delete s.clients[name]
s.status[name] = { status: "failed" as const, error: "unknown error" }
return s.status[name]
}
s.status[name] = result.status s.status[name] = result.status
if (!result.mcpClient) { if (!result.mcpClient) {
@ -586,7 +581,7 @@ export namespace MCP {
yield* closeClient(s, name) yield* closeClient(s, name)
s.clients[name] = result.mcpClient s.clients[name] = result.mcpClient
s.defs[name] = result.defs s.defs[name] = result.defs!
watch(s, name, result.mcpClient, mcp.timeout) watch(s, name, result.mcpClient, mcp.timeout)
return result.status return result.status
}) })
@ -616,7 +611,7 @@ export namespace MCP {
const tools = Effect.fn("MCP.tools")(function* () { const tools = Effect.fn("MCP.tools")(function* () {
const result: Record<string, Tool> = {} const result: Record<string, Tool> = {}
const s = yield* InstanceState.get(cache) const s = yield* InstanceState.get(cache)
const cfg = yield* Effect.promise(() => Config.get()) const cfg = yield* getConfig()
const config = cfg.mcp ?? {} const config = cfg.mcp ?? {}
const defaultTimeout = cfg.experimental?.mcp_timeout const defaultTimeout = cfg.experimental?.mcp_timeout
@ -639,9 +634,7 @@ export namespace MCP {
const timeout = entry?.timeout ?? defaultTimeout const timeout = entry?.timeout ?? defaultTimeout
for (const mcpTool of listed) { for (const mcpTool of listed) {
const sanitizedClientName = clientName.replace(/[^a-zA-Z0-9_-]/g, "_") result[sanitize(clientName) + "_" + sanitize(mcpTool.name)] = convertMcpTool(mcpTool, client, timeout)
const sanitizedToolName = mcpTool.name.replace(/[^a-zA-Z0-9_-]/g, "_")
result[sanitizedClientName + "_" + sanitizedToolName] = convertMcpTool(mcpTool, client, timeout)
} }
}), }),
{ concurrency: "unbounded" }, { concurrency: "unbounded" },
@ -649,30 +642,29 @@ export namespace MCP {
return result return result
}) })
function collectFromConnected<T>( function collectFromConnected<T extends { name: string }>(
s: State, s: State,
fetchFn: (clientName: string, client: Client) => Promise<Record<string, T> | undefined>, listFn: (c: Client) => Promise<T[]>,
label: string,
) { ) {
return Effect.forEach( return Effect.forEach(
Object.entries(s.clients).filter(([name]) => s.status[name]?.status === "connected"), Object.entries(s.clients).filter(([name]) => s.status[name]?.status === "connected"),
([clientName, client]) => ([clientName, client]) =>
Effect.promise(async () => Object.entries((await fetchFn(clientName, client)) ?? {})), fetchFromClient(clientName, client, listFn, label).pipe(
Effect.map((items) => Object.entries(items ?? {})),
),
{ concurrency: "unbounded" }, { concurrency: "unbounded" },
).pipe(Effect.map((results) => Object.fromEntries<T>(results.flat()))) ).pipe(Effect.map((results) => Object.fromEntries<T & { client: string }>(results.flat())))
} }
const prompts = Effect.fn("MCP.prompts")(function* () { const prompts = Effect.fn("MCP.prompts")(function* () {
const s = yield* InstanceState.get(cache) const s = yield* InstanceState.get(cache)
return yield* collectFromConnected(s, (name, client) => return yield* collectFromConnected(s, (c) => c.listPrompts().then((r) => r.prompts), "prompts")
fetchFromClient(name, client, (c) => c.listPrompts().then((r) => r.prompts), "prompts"),
)
}) })
const resources = Effect.fn("MCP.resources")(function* () { const resources = Effect.fn("MCP.resources")(function* () {
const s = yield* InstanceState.get(cache) const s = yield* InstanceState.get(cache)
return yield* collectFromConnected(s, (name, client) => return yield* collectFromConnected(s, (c) => c.listResources().then((r) => r.resources), "resources")
fetchFromClient(name, client, (c) => c.listResources().then((r) => r.resources), "resources"),
)
}) })
const withClient = Effect.fnUntraced(function* <A>( const withClient = Effect.fnUntraced(function* <A>(
@ -713,7 +705,7 @@ export namespace MCP {
}) })
const getMcpConfig = Effect.fnUntraced(function* (mcpName: string) { const getMcpConfig = Effect.fnUntraced(function* (mcpName: string) {
const cfg = yield* Effect.promise(() => Config.get()) const cfg = yield* getConfig()
const mcpConfig = cfg.mcp?.[mcpName] const mcpConfig = cfg.mcp?.[mcpName]
if (!mcpConfig || !isMcpConfigured(mcpConfig)) return undefined if (!mcpConfig || !isMcpConfigured(mcpConfig)) return undefined
return mcpConfig return mcpConfig
@ -750,19 +742,21 @@ export namespace MCP {
const transport = new StreamableHTTPClientTransport(new URL(mcpConfig.url), { authProvider }) const transport = new StreamableHTTPClientTransport(new URL(mcpConfig.url), { authProvider })
return yield* Effect.promise(async () => { return yield* Effect.tryPromise({
try { try: () => {
const client = new Client({ name: "opencode", version: Installation.VERSION }) const client = new Client({ name: "opencode", version: Installation.VERSION })
await client.connect(transport) return client.connect(transport).then(() => ({ authorizationUrl: "", oauthState }))
return { authorizationUrl: "", oauthState } },
} catch (error) { catch: (error) => error,
}).pipe(
Effect.catch((error) => {
if (error instanceof UnauthorizedError && capturedUrl) { if (error instanceof UnauthorizedError && capturedUrl) {
pendingOAuthTransports.set(mcpName, transport) pendingOAuthTransports.set(mcpName, transport)
return { authorizationUrl: capturedUrl.toString(), oauthState } return Effect.succeed({ authorizationUrl: capturedUrl.toString(), oauthState })
} }
throw error return Effect.die(error)
} }),
}) )
}) })
const authenticate = Effect.fn("MCP.authenticate")(function* (mcpName: string) { const authenticate = Effect.fn("MCP.authenticate")(function* (mcpName: string) {
@ -791,7 +785,7 @@ export namespace MCP {
), ),
Effect.catch(() => { Effect.catch(() => {
log.warn("failed to open browser, user must open URL manually", { mcpName }) log.warn("failed to open browser, user must open URL manually", { mcpName })
return Effect.promise(() => Bus.publish(BrowserOpenFailed, { mcpName, url: authorizationUrl })) return busPublish(BrowserOpenFailed, { mcpName, url: authorizationUrl })
}), }),
) )
@ -811,10 +805,7 @@ export namespace MCP {
if (!transport) throw new Error(`No pending OAuth flow for MCP server: ${mcpName}`) if (!transport) throw new Error(`No pending OAuth flow for MCP server: ${mcpName}`)
const result = yield* Effect.tryPromise({ const result = yield* Effect.tryPromise({
try: async () => { try: () => transport.finishAuth(authorizationCode).then(() => true as const),
await transport.finishAuth(authorizationCode)
return true
},
catch: (error) => { catch: (error) => {
log.error("failed to finish oauth", { mcpName, error }) log.error("failed to finish oauth", { mcpName, error })
return error return error

View File

@ -19,9 +19,12 @@ interface MockClientState {
const clientStates = new Map<string, MockClientState>() const clientStates = new Map<string, MockClientState>()
let lastCreatedClientName: string | undefined let lastCreatedClientName: string | undefined
let connectShouldFail = false let connectShouldFail = false
let connectShouldHang = false
let connectError = "Mock transport cannot connect" let connectError = "Mock transport cannot connect"
// Tracks how many Client instances were created (detects leaks) // Tracks how many Client instances were created (detects leaks)
let clientCreateCount = 0 let clientCreateCount = 0
// Tracks how many times transport.close() is called across all mock transports
let transportCloseCount = 0
function getOrCreateClientState(name?: string): MockClientState { function getOrCreateClientState(name?: string): MockClientState {
const key = name ?? "default" const key = name ?? "default"
@ -44,32 +47,42 @@ function getOrCreateClientState(name?: string): MockClientState {
return state return state
} }
// Mock transport that succeeds or fails based on connectShouldFail // Mock transport that succeeds or fails based on connectShouldFail / connectShouldHang
class MockStdioTransport { class MockStdioTransport {
stderr: null = null stderr: null = null
pid = 12345 pid = 12345
constructor(_opts: any) {} constructor(_opts: any) {}
async start() { async start() {
if (connectShouldHang) return new Promise<void>(() => {}) // never resolves
if (connectShouldFail) throw new Error(connectError) if (connectShouldFail) throw new Error(connectError)
if (connectShouldHang) await new Promise(() => {}) // never resolves
}
async close() {
transportCloseCount++
} }
async close() {}
} }
class MockStreamableHTTP { class MockStreamableHTTP {
constructor(_url: URL, _opts?: any) {} constructor(_url: URL, _opts?: any) {}
async start() { async start() {
if (connectShouldHang) return new Promise<void>(() => {}) // never resolves
if (connectShouldFail) throw new Error(connectError) if (connectShouldFail) throw new Error(connectError)
} }
async close() {} async close() {
transportCloseCount++
}
async finishAuth() {} async finishAuth() {}
} }
class MockSSE { class MockSSE {
constructor(_url: URL, _opts?: any) {} constructor(_url: URL, _opts?: any) {}
async start() { async start() {
throw new Error("SSE fallback - not used in these tests") if (connectShouldHang) return new Promise<void>(() => {}) // never resolves
if (connectShouldFail) throw new Error(connectError)
}
async close() {
transportCloseCount++
} }
async close() {}
} }
mock.module("@modelcontextprotocol/sdk/client/stdio.js", () => ({ mock.module("@modelcontextprotocol/sdk/client/stdio.js", () => ({
@ -145,8 +158,10 @@ beforeEach(() => {
clientStates.clear() clientStates.clear()
lastCreatedClientName = undefined lastCreatedClientName = undefined
connectShouldFail = false connectShouldFail = false
connectShouldHang = false
connectError = "Mock transport cannot connect" connectError = "Mock transport cannot connect"
clientCreateCount = 0 clientCreateCount = 0
transportCloseCount = 0
}) })
// Import after mocks // Import after mocks
@ -658,3 +673,79 @@ test(
}, },
), ),
) )
// ========================================================================
// Test: transport leak — local stdio timeout (#19168)
// ========================================================================
test(
"local stdio transport is closed when connect times out (no process leak)",
withInstance({}, async () => {
lastCreatedClientName = "hanging-server"
getOrCreateClientState("hanging-server")
connectShouldHang = true
const addResult = await MCP.add("hanging-server", {
type: "local",
command: ["node", "fake.js"],
timeout: 100,
})
const serverStatus = (addResult.status as any)["hanging-server"] ?? addResult.status
expect(serverStatus.status).toBe("failed")
expect(serverStatus.error).toContain("timed out")
// Transport must be closed to avoid orphaned child process
expect(transportCloseCount).toBeGreaterThanOrEqual(1)
}),
)
// ========================================================================
// Test: transport leak — remote timeout (#19168)
// ========================================================================
test(
"remote transport is closed when connect times out",
withInstance({}, async () => {
lastCreatedClientName = "hanging-remote"
getOrCreateClientState("hanging-remote")
connectShouldHang = true
const addResult = await MCP.add("hanging-remote", {
type: "remote",
url: "http://localhost:9999/mcp",
timeout: 100,
oauth: false,
})
const serverStatus = (addResult.status as any)["hanging-remote"] ?? addResult.status
expect(serverStatus.status).toBe("failed")
// Transport must be closed to avoid leaked HTTP connections
expect(transportCloseCount).toBeGreaterThanOrEqual(1)
}),
)
// ========================================================================
// Test: transport leak — failed remote transports not closed (#19168)
// ========================================================================
test(
"failed remote transport is closed before trying next transport",
withInstance({}, async () => {
lastCreatedClientName = "fail-remote"
getOrCreateClientState("fail-remote")
connectShouldFail = true
connectError = "Connection refused"
const addResult = await MCP.add("fail-remote", {
type: "remote",
url: "http://localhost:9999/mcp",
timeout: 5000,
oauth: false,
})
const serverStatus = (addResult.status as any)["fail-remote"] ?? addResult.status
expect(serverStatus.status).toBe("failed")
// Both StreamableHTTP and SSE transports should be closed
expect(transportCloseCount).toBeGreaterThanOrEqual(2)
}),
)