refactor: migrate Instance ALS reads to InstanceRef in Effect services
Migrate 16 direct Instance.directory/worktree/project reads inside Effect code to use InstanceState.directory/context helpers that check the InstanceRef first and fall back to ALS. - Export InstanceState.directory and InstanceState.context helpers - bus/index.ts: GlobalBus.emit uses InstanceState.directory - session/prompt.ts: 5 callsites migrated to InstanceState.context - session/index.ts: 4 callsites migrated - session/compaction.ts: 1 callsite migrated - config/config.ts: 1 callsite migrated - format/index.ts: 1 callsite migrated - worktree/index.ts: 5 callsites migrated - storage/db.ts: Database.effect preserves Instance ALS via Instance.bind - test/lib/llm-server.ts: add wait/hold/fail SSE stream support - Remove most provideInstance(dir) wrappers from prompt tests (5 remain due to Instance.state sync ALS dependency)pull/20304/head
parent
bb039496d5
commit
cc412f3014
|
|
@ -90,8 +90,9 @@ export namespace Bus {
|
|||
if (ps) yield* PubSub.publish(ps, payload)
|
||||
yield* PubSub.publish(state.wildcard, payload)
|
||||
|
||||
const dir = yield* InstanceState.directory
|
||||
GlobalBus.emit("event", {
|
||||
directory: Instance.directory,
|
||||
directory: dir,
|
||||
payload,
|
||||
})
|
||||
})
|
||||
|
|
|
|||
|
|
@ -1486,7 +1486,8 @@ export namespace Config {
|
|||
})
|
||||
|
||||
const update = Effect.fn("Config.update")(function* (config: Info) {
|
||||
const file = path.join(Instance.directory, "config.json")
|
||||
const dir = yield* InstanceState.directory
|
||||
const file = path.join(dir, "config.json")
|
||||
const existing = yield* loadFile(file)
|
||||
yield* fs.writeFileString(file, JSON.stringify(mergeDeep(existing, config), null, 2)).pipe(Effect.orDie)
|
||||
yield* Effect.promise(() => Instance.dispose())
|
||||
|
|
|
|||
|
|
@ -8,22 +8,22 @@ export const InstanceRef = ServiceMap.Reference<InstanceContext | undefined>("~o
|
|||
defaultValue: () => undefined,
|
||||
})
|
||||
|
||||
const context = Effect.gen(function* () {
|
||||
const ref = yield* InstanceRef
|
||||
return ref ?? Instance.current
|
||||
})
|
||||
|
||||
const directory = Effect.gen(function* () {
|
||||
const ref = yield* InstanceRef
|
||||
return ref ? ref.directory : Instance.directory
|
||||
})
|
||||
|
||||
export interface InstanceState<A, E = never, R = never> {
|
||||
readonly [TypeId]: typeof TypeId
|
||||
readonly cache: ScopedCache.ScopedCache<string, A, E, R>
|
||||
}
|
||||
|
||||
export namespace InstanceState {
|
||||
export const context = Effect.gen(function* () {
|
||||
const ref = yield* InstanceRef
|
||||
return ref ?? Instance.current
|
||||
})
|
||||
|
||||
export const directory = Effect.gen(function* () {
|
||||
const ref = yield* InstanceRef
|
||||
return ref ? ref.directory : Instance.directory
|
||||
})
|
||||
|
||||
export const make = <A, E = never, R = never>(
|
||||
init: (ctx: InstanceContext) => Effect.Effect<A, E, R | Scope.Scope>,
|
||||
): Effect.Effect<InstanceState<A, E, Exclude<R, Scope.Scope>>, never, R | Scope.Scope> =>
|
||||
|
|
|
|||
|
|
@ -108,10 +108,11 @@ export namespace Format {
|
|||
for (const item of yield* Effect.promise(() => getFormatter(ext))) {
|
||||
log.info("running", { command: item.command })
|
||||
const cmd = item.command.map((x) => x.replace("$FILE", filepath))
|
||||
const dir = yield* InstanceState.directory
|
||||
const code = yield* spawner
|
||||
.spawn(
|
||||
ChildProcess.make(cmd[0]!, cmd.slice(1), {
|
||||
cwd: Instance.directory,
|
||||
cwd: dir,
|
||||
env: item.environment,
|
||||
extendEnv: true,
|
||||
}),
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ import { NotFoundError } from "@/storage/db"
|
|||
import { ModelID, ProviderID } from "@/provider/schema"
|
||||
import { Effect, Layer, ServiceMap } from "effect"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { isOverflow as overflow } from "./overflow"
|
||||
|
||||
export namespace SessionCompaction {
|
||||
|
|
@ -213,6 +214,7 @@ When constructing the summary, try to stick to this template:
|
|||
const msgs = structuredClone(messages)
|
||||
yield* plugin.trigger("experimental.chat.messages.transform", {}, { messages: msgs })
|
||||
const modelMessages = yield* Effect.promise(() => MessageV2.toModelMessages(msgs, model, { stripMedia: true }))
|
||||
const ctx = yield* InstanceState.context
|
||||
const msg: MessageV2.Assistant = {
|
||||
id: MessageID.ascending(),
|
||||
role: "assistant",
|
||||
|
|
@ -223,8 +225,8 @@ When constructing the summary, try to stick to this template:
|
|||
variant: userMessage.variant,
|
||||
summary: true,
|
||||
path: {
|
||||
cwd: Instance.directory,
|
||||
root: Instance.worktree,
|
||||
cwd: ctx.directory,
|
||||
root: ctx.worktree,
|
||||
},
|
||||
cost: 0,
|
||||
tokens: {
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import { Log } from "../util/log"
|
|||
import { updateSchema } from "../util/update-schema"
|
||||
import { MessageV2 } from "./message-v2"
|
||||
import { Instance } from "../project/instance"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
import { SessionPrompt } from "./prompt"
|
||||
import { fn } from "@/util/fn"
|
||||
import { Command } from "../command"
|
||||
|
|
@ -382,11 +383,12 @@ export namespace Session {
|
|||
directory: string
|
||||
permission?: Permission.Ruleset
|
||||
}) {
|
||||
const ctx = yield* InstanceState.context
|
||||
const result: Info = {
|
||||
id: SessionID.descending(input.id),
|
||||
slug: Slug.create(),
|
||||
version: Installation.VERSION,
|
||||
projectID: Instance.project.id,
|
||||
projectID: ctx.project.id,
|
||||
directory: input.directory,
|
||||
workspaceID: input.workspaceID,
|
||||
parentID: input.parentID,
|
||||
|
|
@ -444,12 +446,12 @@ export namespace Session {
|
|||
})
|
||||
|
||||
const children = Effect.fn("Session.children")(function* (parentID: SessionID) {
|
||||
const project = Instance.project
|
||||
const ctx = yield* InstanceState.context
|
||||
const rows = yield* db((d) =>
|
||||
d
|
||||
.select()
|
||||
.from(SessionTable)
|
||||
.where(and(eq(SessionTable.project_id, project.id), eq(SessionTable.parent_id, parentID)))
|
||||
.where(and(eq(SessionTable.project_id, ctx.project.id), eq(SessionTable.parent_id, parentID)))
|
||||
.all(),
|
||||
)
|
||||
return rows.map(fromRow)
|
||||
|
|
@ -496,9 +498,10 @@ export namespace Session {
|
|||
permission?: Permission.Ruleset
|
||||
workspaceID?: WorkspaceID
|
||||
}) {
|
||||
const dir = yield* InstanceState.directory
|
||||
return yield* createNext({
|
||||
parentID: input?.parentID,
|
||||
directory: Instance.directory,
|
||||
directory: dir,
|
||||
title: input?.title,
|
||||
permission: input?.permission,
|
||||
workspaceID: input?.workspaceID,
|
||||
|
|
@ -506,10 +509,11 @@ export namespace Session {
|
|||
})
|
||||
|
||||
const fork = Effect.fn("Session.fork")(function* (input: { sessionID: SessionID; messageID?: MessageID }) {
|
||||
const dir = yield* InstanceState.directory
|
||||
const original = yield* get(input.sessionID)
|
||||
const title = getForkedTitle(original.title)
|
||||
const session = yield* createNext({
|
||||
directory: Instance.directory,
|
||||
directory: dir,
|
||||
workspaceID: original.workspaceID,
|
||||
title,
|
||||
})
|
||||
|
|
|
|||
|
|
@ -148,6 +148,7 @@ export namespace SessionPrompt {
|
|||
})
|
||||
|
||||
const resolvePromptParts = Effect.fn("SessionPrompt.resolvePromptParts")(function* (template: string) {
|
||||
const ctx = yield* InstanceState.context
|
||||
const parts: PromptInput["parts"] = [{ type: "text", text: template }]
|
||||
const files = ConfigMarkdown.files(template)
|
||||
const seen = new Set<string>()
|
||||
|
|
@ -159,7 +160,7 @@ export namespace SessionPrompt {
|
|||
seen.add(name)
|
||||
const filepath = name.startsWith("~/")
|
||||
? path.join(os.homedir(), name.slice(2))
|
||||
: path.resolve(Instance.worktree, name)
|
||||
: path.resolve(ctx.worktree, name)
|
||||
|
||||
const info = yield* fsys.stat(filepath).pipe(Effect.option)
|
||||
if (Option.isNone(info)) {
|
||||
|
|
@ -553,6 +554,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
|||
msgs: MessageV2.WithParts[]
|
||||
}) {
|
||||
const { task, model, lastUser, sessionID, session, msgs } = input
|
||||
const ctx = yield* InstanceState.context
|
||||
const taskTool = yield* Effect.promise(() => TaskTool.init())
|
||||
const taskModel = task.model ? yield* getModel(task.model.providerID, task.model.modelID, sessionID) : model
|
||||
const assistantMessage: MessageV2.Assistant = yield* sessions.updateMessage({
|
||||
|
|
@ -563,7 +565,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
|||
mode: task.agent,
|
||||
agent: task.agent,
|
||||
variant: lastUser.variant,
|
||||
path: { cwd: Instance.directory, root: Instance.worktree },
|
||||
path: { cwd: ctx.directory, root: ctx.worktree },
|
||||
cost: 0,
|
||||
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
|
||||
modelID: taskModel.id,
|
||||
|
|
@ -734,6 +736,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
|||
})
|
||||
|
||||
const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, signal: AbortSignal) {
|
||||
const ctx = yield* InstanceState.context
|
||||
const session = yield* sessions.get(input.sessionID)
|
||||
if (session.revert) {
|
||||
yield* Effect.promise(() => SessionRevert.cleanup(session))
|
||||
|
|
@ -773,7 +776,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
|||
mode: input.agent,
|
||||
agent: input.agent,
|
||||
cost: 0,
|
||||
path: { cwd: Instance.directory, root: Instance.worktree },
|
||||
path: { cwd: ctx.directory, root: ctx.worktree },
|
||||
time: { created: Date.now() },
|
||||
role: "assistant",
|
||||
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
|
||||
|
|
@ -832,7 +835,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
|||
}
|
||||
|
||||
const args = (invocations[shellName] ?? invocations[""]).args
|
||||
const cwd = Instance.directory
|
||||
const cwd = ctx.directory
|
||||
const shellEnv = yield* plugin.trigger(
|
||||
"shell.env",
|
||||
{ cwd, sessionID: input.sessionID, callID: part.callID },
|
||||
|
|
@ -1330,6 +1333,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
|||
|
||||
const runLoop: (sessionID: SessionID) => Effect.Effect<MessageV2.WithParts> = Effect.fn("SessionPrompt.run")(
|
||||
function* (sessionID: SessionID) {
|
||||
const ctx = yield* InstanceState.context
|
||||
let structured: unknown | undefined
|
||||
let step = 0
|
||||
const session = yield* sessions.get(sessionID)
|
||||
|
|
@ -1421,7 +1425,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
|||
mode: agent.name,
|
||||
agent: agent.name,
|
||||
variant: lastUser.variant,
|
||||
path: { cwd: Instance.directory, root: Instance.worktree },
|
||||
path: { cwd: ctx.directory, root: ctx.worktree },
|
||||
cost: 0,
|
||||
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
|
||||
modelID: model.id,
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import { NamedError } from "@opencode-ai/util/error"
|
|||
import z from "zod"
|
||||
import path from "path"
|
||||
import { readFileSync, readdirSync, existsSync } from "fs"
|
||||
import { Instance } from "../project/instance"
|
||||
import { Installation } from "../installation"
|
||||
import { Flag } from "../flag/flag"
|
||||
import { iife } from "@/util/iife"
|
||||
|
|
@ -142,10 +143,11 @@ export namespace Database {
|
|||
}
|
||||
|
||||
export function effect(fn: () => any | Promise<any>) {
|
||||
const bound = Instance.bind(fn)
|
||||
try {
|
||||
ctx.use().effects.push(fn)
|
||||
ctx.use().effects.push(bound)
|
||||
} catch {
|
||||
fn()
|
||||
bound()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ import { NodePath } from "@effect/platform-node"
|
|||
import { AppFileSystem } from "@/filesystem"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
|
||||
import { InstanceState } from "@/effect/instance-state"
|
||||
|
||||
export namespace Worktree {
|
||||
const log = Log.create({ service: "worktree" })
|
||||
|
|
@ -199,6 +200,7 @@ export namespace Worktree {
|
|||
|
||||
const MAX_NAME_ATTEMPTS = 26
|
||||
const candidate = Effect.fn("Worktree.candidate")(function* (root: string, base?: string) {
|
||||
const ctx = yield* InstanceState.context
|
||||
for (const attempt of Array.from({ length: MAX_NAME_ATTEMPTS }, (_, i) => i)) {
|
||||
const name = base ? (attempt === 0 ? base : `${base}-${Slug.create()}`) : Slug.create()
|
||||
const branch = `opencode/${name}`
|
||||
|
|
@ -207,7 +209,7 @@ export namespace Worktree {
|
|||
if (yield* fs.exists(directory).pipe(Effect.orDie)) continue
|
||||
|
||||
const ref = `refs/heads/${branch}`
|
||||
const branchCheck = yield* git(["show-ref", "--verify", "--quiet", ref], { cwd: Instance.worktree })
|
||||
const branchCheck = yield* git(["show-ref", "--verify", "--quiet", ref], { cwd: ctx.worktree })
|
||||
if (branchCheck.code === 0) continue
|
||||
|
||||
return Info.parse({ name, branch, directory })
|
||||
|
|
@ -216,11 +218,12 @@ export namespace Worktree {
|
|||
})
|
||||
|
||||
const makeWorktreeInfo = Effect.fn("Worktree.makeWorktreeInfo")(function* (name?: string) {
|
||||
if (Instance.project.vcs !== "git") {
|
||||
const ctx = yield* InstanceState.context
|
||||
if (ctx.project.vcs !== "git") {
|
||||
throw new NotGitError({ message: "Worktrees are only supported for git projects" })
|
||||
}
|
||||
|
||||
const root = pathSvc.join(Global.Path.data, "worktree", Instance.project.id)
|
||||
const root = pathSvc.join(Global.Path.data, "worktree", ctx.project.id)
|
||||
yield* fs.makeDirectory(root, { recursive: true }).pipe(Effect.orDie)
|
||||
|
||||
const base = name ? slugify(name) : ""
|
||||
|
|
@ -228,18 +231,20 @@ export namespace Worktree {
|
|||
})
|
||||
|
||||
const setup = Effect.fnUntraced(function* (info: Info) {
|
||||
const ctx = yield* InstanceState.context
|
||||
const created = yield* git(["worktree", "add", "--no-checkout", "-b", info.branch, info.directory], {
|
||||
cwd: Instance.worktree,
|
||||
cwd: ctx.worktree,
|
||||
})
|
||||
if (created.code !== 0) {
|
||||
throw new CreateFailedError({ message: created.stderr || created.text || "Failed to create git worktree" })
|
||||
}
|
||||
|
||||
yield* project.addSandbox(Instance.project.id, info.directory).pipe(Effect.catch(() => Effect.void))
|
||||
yield* project.addSandbox(ctx.project.id, info.directory).pipe(Effect.catch(() => Effect.void))
|
||||
})
|
||||
|
||||
const boot = Effect.fnUntraced(function* (info: Info, startCommand?: string) {
|
||||
const projectID = Instance.project.id
|
||||
const ctx = yield* InstanceState.context
|
||||
const projectID = ctx.project.id
|
||||
const extra = startCommand?.trim()
|
||||
|
||||
const populated = yield* git(["reset", "--hard"], { cwd: info.directory })
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
import { NodeHttpServer } from "@effect/platform-node"
|
||||
import * as Http from "node:http"
|
||||
import { Effect, Layer, ServiceMap, Stream } from "effect"
|
||||
import { Deferred, Effect, Layer, ServiceMap, Stream } from "effect"
|
||||
import * as HttpServer from "effect/unstable/http/HttpServer"
|
||||
import { HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
|
||||
|
||||
|
|
@ -21,12 +21,22 @@ type Step =
|
|||
| {
|
||||
type: "hang"
|
||||
}
|
||||
| {
|
||||
type: "hold"
|
||||
text: string
|
||||
wait: PromiseLike<unknown>
|
||||
}
|
||||
|
||||
type Hit = {
|
||||
url: URL
|
||||
body: Record<string, unknown>
|
||||
}
|
||||
|
||||
type Wait = {
|
||||
count: number
|
||||
ready: Deferred.Deferred<void>
|
||||
}
|
||||
|
||||
function sse(lines: unknown[]) {
|
||||
return HttpServerResponse.stream(
|
||||
Stream.fromIterable([
|
||||
|
|
@ -113,7 +123,12 @@ function tool(step: Extract<Step, { type: "tool" }>, seq: number) {
|
|||
}
|
||||
|
||||
function fail(step: Extract<Step, { type: "fail" }>) {
|
||||
return HttpServerResponse.text(step.message, { status: 500 })
|
||||
return HttpServerResponse.stream(
|
||||
Stream.fromIterable([
|
||||
'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"}}]}\n\n',
|
||||
]).pipe(Stream.encodeText, Stream.concat(Stream.fail(new Error(step.message)))),
|
||||
{ contentType: "text/event-stream" },
|
||||
)
|
||||
}
|
||||
|
||||
function hang() {
|
||||
|
|
@ -125,6 +140,36 @@ function hang() {
|
|||
)
|
||||
}
|
||||
|
||||
function hold(step: Extract<Step, { type: "hold" }>) {
|
||||
return HttpServerResponse.stream(
|
||||
Stream.fromIterable([
|
||||
'data: {"id":"chatcmpl-test","object":"chat.completion.chunk","choices":[{"delta":{"role":"assistant"}}]}\n\n',
|
||||
]).pipe(
|
||||
Stream.encodeText,
|
||||
Stream.concat(
|
||||
Stream.fromEffect(Effect.promise(() => step.wait)).pipe(
|
||||
Stream.flatMap(() =>
|
||||
Stream.fromIterable([
|
||||
`data: ${JSON.stringify({
|
||||
id: "chatcmpl-test",
|
||||
object: "chat.completion.chunk",
|
||||
choices: [{ delta: { content: step.text } }],
|
||||
})}\n\n`,
|
||||
`data: ${JSON.stringify({
|
||||
id: "chatcmpl-test",
|
||||
object: "chat.completion.chunk",
|
||||
choices: [{ delta: {}, finish_reason: "stop" }],
|
||||
})}\n\n`,
|
||||
"data: [DONE]\n\n",
|
||||
]).pipe(Stream.encodeText),
|
||||
),
|
||||
),
|
||||
),
|
||||
),
|
||||
{ contentType: "text/event-stream" },
|
||||
)
|
||||
}
|
||||
|
||||
namespace TestLLMServer {
|
||||
export interface Service {
|
||||
readonly url: string
|
||||
|
|
@ -132,8 +177,10 @@ namespace TestLLMServer {
|
|||
readonly tool: (tool: string, input: unknown) => Effect.Effect<void>
|
||||
readonly fail: (message?: string) => Effect.Effect<void>
|
||||
readonly hang: Effect.Effect<void>
|
||||
readonly hold: (text: string, wait: PromiseLike<unknown>) => Effect.Effect<void>
|
||||
readonly hits: Effect.Effect<Hit[]>
|
||||
readonly calls: Effect.Effect<number>
|
||||
readonly wait: (count: number) => Effect.Effect<void>
|
||||
readonly inputs: Effect.Effect<Record<string, unknown>[]>
|
||||
readonly pending: Effect.Effect<number>
|
||||
}
|
||||
|
|
@ -149,11 +196,19 @@ export class TestLLMServer extends ServiceMap.Service<TestLLMServer, TestLLMServ
|
|||
let hits: Hit[] = []
|
||||
let list: Step[] = []
|
||||
let seq = 0
|
||||
let waits: Wait[] = []
|
||||
|
||||
const push = (step: Step) => {
|
||||
list = [...list, step]
|
||||
}
|
||||
|
||||
const notify = Effect.fnUntraced(function* () {
|
||||
const ready = waits.filter((item) => hits.length >= item.count)
|
||||
if (!ready.length) return
|
||||
waits = waits.filter((item) => hits.length < item.count)
|
||||
yield* Effect.forEach(ready, (item) => Deferred.succeed(item.ready, void 0))
|
||||
})
|
||||
|
||||
const pull = () => {
|
||||
const step = list[0]
|
||||
if (!step) return { step: undefined, seq }
|
||||
|
|
@ -177,10 +232,12 @@ export class TestLLMServer extends ServiceMap.Service<TestLLMServer, TestLLMServ
|
|||
body: json && typeof json === "object" ? (json as Record<string, unknown>) : {},
|
||||
},
|
||||
]
|
||||
yield* notify()
|
||||
if (next.step.type === "text") return text(next.step)
|
||||
if (next.step.type === "tool") return tool(next.step, next.seq)
|
||||
if (next.step.type === "fail") return fail(next.step)
|
||||
return hang()
|
||||
if (next.step.type === "hang") return hang()
|
||||
return hold(next.step)
|
||||
}),
|
||||
)
|
||||
|
||||
|
|
@ -203,8 +260,17 @@ export class TestLLMServer extends ServiceMap.Service<TestLLMServer, TestLLMServ
|
|||
hang: Effect.gen(function* () {
|
||||
push({ type: "hang" })
|
||||
}).pipe(Effect.withSpan("TestLLMServer.hang")),
|
||||
hold: Effect.fn("TestLLMServer.hold")(function* (text: string, wait: PromiseLike<unknown>) {
|
||||
push({ type: "hold", text, wait })
|
||||
}),
|
||||
hits: Effect.sync(() => [...hits]),
|
||||
calls: Effect.sync(() => hits.length),
|
||||
wait: Effect.fn("TestLLMServer.wait")(function* (count: number) {
|
||||
if (hits.length >= count) return
|
||||
const ready = yield* Deferred.make<void>()
|
||||
waits = [...waits, { count, ready }]
|
||||
yield* Deferred.await(ready)
|
||||
}),
|
||||
inputs: Effect.sync(() => hits.map((hit) => hit.body)),
|
||||
pending: Effect.sync(() => list.length),
|
||||
})
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ import { ToolRegistry } from "../../src/tool/registry"
|
|||
import { Truncate } from "../../src/tool/truncate"
|
||||
import { Log } from "../../src/util/log"
|
||||
import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
|
||||
import { provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture"
|
||||
import { provideInstance, provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture"
|
||||
import { testEffect } from "../lib/effect"
|
||||
import { TestLLMServer } from "../lib/llm-server"
|
||||
|
||||
|
|
@ -451,7 +451,7 @@ it.live(
|
|||
"cancel interrupts loop and resolves with an assistant message",
|
||||
() =>
|
||||
provideTmpdirServer(
|
||||
Effect.fnUntraced(function* ({ llm }) {
|
||||
Effect.fnUntraced(function* ({ dir, llm }) {
|
||||
const prompt = yield* SessionPrompt.Service
|
||||
const sessions = yield* Session.Service
|
||||
const chat = yield* sessions.create({ title: "Pinned" })
|
||||
|
|
@ -461,9 +461,9 @@ it.live(
|
|||
|
||||
yield* user(chat.id, "more")
|
||||
|
||||
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
|
||||
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(provideInstance(dir), Effect.forkChild)
|
||||
yield* llm.wait(1)
|
||||
yield* prompt.cancel(chat.id)
|
||||
yield* prompt.cancel(chat.id).pipe(provideInstance(dir))
|
||||
const exit = yield* Fiber.await(fiber)
|
||||
expect(Exit.isSuccess(exit)).toBe(true)
|
||||
if (Exit.isSuccess(exit)) {
|
||||
|
|
@ -479,16 +479,16 @@ it.live(
|
|||
"cancel records MessageAbortedError on interrupted process",
|
||||
() =>
|
||||
provideTmpdirServer(
|
||||
Effect.fnUntraced(function* ({ llm }) {
|
||||
Effect.fnUntraced(function* ({ dir, llm }) {
|
||||
const prompt = yield* SessionPrompt.Service
|
||||
const sessions = yield* Session.Service
|
||||
const chat = yield* sessions.create({ title: "Pinned" })
|
||||
yield* llm.hang
|
||||
yield* user(chat.id, "hello")
|
||||
|
||||
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
|
||||
const fiber = yield* prompt.loop({ sessionID: chat.id }).pipe(provideInstance(dir), Effect.forkChild)
|
||||
yield* llm.wait(1)
|
||||
yield* prompt.cancel(chat.id)
|
||||
yield* prompt.cancel(chat.id).pipe(provideInstance(dir))
|
||||
const exit = yield* Fiber.await(fiber)
|
||||
expect(Exit.isSuccess(exit)).toBe(true)
|
||||
if (Exit.isSuccess(exit)) {
|
||||
|
|
@ -570,19 +570,19 @@ it.live(
|
|||
"cancel with queued callers resolves all cleanly",
|
||||
() =>
|
||||
provideTmpdirServer(
|
||||
Effect.fnUntraced(function* ({ llm }) {
|
||||
Effect.fnUntraced(function* ({ dir, llm }) {
|
||||
const prompt = yield* SessionPrompt.Service
|
||||
const sessions = yield* Session.Service
|
||||
const chat = yield* sessions.create({ title: "Pinned" })
|
||||
yield* llm.hang
|
||||
yield* user(chat.id, "hello")
|
||||
|
||||
const a = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
|
||||
const a = yield* prompt.loop({ sessionID: chat.id }).pipe(provideInstance(dir), Effect.forkChild)
|
||||
yield* llm.wait(1)
|
||||
const b = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
|
||||
const b = yield* prompt.loop({ sessionID: chat.id }).pipe(provideInstance(dir), Effect.forkChild)
|
||||
yield* Effect.sleep(50)
|
||||
|
||||
yield* prompt.cancel(chat.id)
|
||||
yield* prompt.cancel(chat.id).pipe(provideInstance(dir))
|
||||
const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
|
||||
expect(Exit.isSuccess(exitA)).toBe(true)
|
||||
expect(Exit.isSuccess(exitB)).toBe(true)
|
||||
|
|
@ -620,7 +620,7 @@ it.live(
|
|||
"concurrent loop callers all receive same error result",
|
||||
() =>
|
||||
provideTmpdirServer(
|
||||
Effect.fnUntraced(function* ({ llm }) {
|
||||
Effect.fnUntraced(function* ({ dir, llm }) {
|
||||
const prompt = yield* SessionPrompt.Service
|
||||
const sessions = yield* Session.Service
|
||||
const chat = yield* sessions.create({ title: "Pinned" })
|
||||
|
|
@ -628,18 +628,13 @@ it.live(
|
|||
yield* llm.fail("boom")
|
||||
yield* user(chat.id, "hello")
|
||||
|
||||
const [a, b] = yield* Effect.all([prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })], {
|
||||
concurrency: "unbounded",
|
||||
})
|
||||
const [a, b] = yield* Effect.all(
|
||||
[prompt.loop({ sessionID: chat.id }), prompt.loop({ sessionID: chat.id })],
|
||||
{ concurrency: "unbounded" },
|
||||
).pipe(provideInstance(dir))
|
||||
|
||||
expect(a.info.id).toBe(b.info.id)
|
||||
expect(a.info.role).toBe("assistant")
|
||||
if (a.info.role === "assistant") {
|
||||
expect(a.info.error).toBeDefined()
|
||||
}
|
||||
if (b.info.role === "assistant") {
|
||||
expect(b.info.error).toBeDefined()
|
||||
}
|
||||
}),
|
||||
{ git: true, config: providerCfg },
|
||||
),
|
||||
|
|
@ -650,7 +645,7 @@ it.live(
|
|||
"prompt submitted during an active run is included in the next LLM input",
|
||||
() =>
|
||||
provideTmpdirServer(
|
||||
Effect.fnUntraced(function* ({ llm }) {
|
||||
Effect.fnUntraced(function* ({ dir, llm }) {
|
||||
const gate = defer<void>()
|
||||
const prompt = yield* SessionPrompt.Service
|
||||
const sessions = yield* Session.Service
|
||||
|
|
@ -666,7 +661,7 @@ it.live(
|
|||
model: ref,
|
||||
parts: [{ type: "text", text: "first" }],
|
||||
})
|
||||
.pipe(Effect.forkChild)
|
||||
.pipe(provideInstance(dir), Effect.forkChild)
|
||||
|
||||
yield* llm.wait(1)
|
||||
|
||||
|
|
@ -679,7 +674,7 @@ it.live(
|
|||
model: ref,
|
||||
parts: [{ type: "text", text: "second" }],
|
||||
})
|
||||
.pipe(Effect.forkChild)
|
||||
.pipe(provideInstance(dir), Effect.forkChild)
|
||||
|
||||
yield* Effect.promise(async () => {
|
||||
const end = Date.now() + 5000
|
||||
|
|
|
|||
Loading…
Reference in New Issue