refactor(session): effectify Session service (#19449)
parent
c33d9996f0
commit
e5f0e813b6
|
|
@ -33,6 +33,8 @@ import { Permission } from "@/permission"
|
|||
import { Global } from "@/global"
|
||||
import type { LanguageModelV2Usage } from "@ai-sdk/provider"
|
||||
import { iife } from "@/util/iife"
|
||||
import { Effect, Layer, Scope, ServiceMap } from "effect"
|
||||
import { makeRuntime } from "@/effect/run-service"
|
||||
|
||||
export namespace Session {
|
||||
const log = Log.create({ service: "session" })
|
||||
|
|
@ -233,75 +235,159 @@ export namespace Session {
|
|||
),
|
||||
}
|
||||
|
||||
export const create = fn(
|
||||
z
|
||||
.object({
|
||||
parentID: SessionID.zod.optional(),
|
||||
title: z.string().optional(),
|
||||
permission: Info.shape.permission,
|
||||
workspaceID: WorkspaceID.zod.optional(),
|
||||
})
|
||||
.optional(),
|
||||
async (input) => {
|
||||
return createNext({
|
||||
parentID: input?.parentID,
|
||||
directory: Instance.directory,
|
||||
title: input?.title,
|
||||
permission: input?.permission,
|
||||
workspaceID: input?.workspaceID,
|
||||
})
|
||||
},
|
||||
export function plan(input: { slug: string; time: { created: number } }) {
|
||||
const base = Instance.project.vcs
|
||||
? path.join(Instance.worktree, ".opencode", "plans")
|
||||
: path.join(Global.Path.data, "plans")
|
||||
return path.join(base, [input.time.created, input.slug].join("-") + ".md")
|
||||
}
|
||||
|
||||
export const getUsage = (input: {
|
||||
model: Provider.Model
|
||||
usage: LanguageModelV2Usage
|
||||
metadata?: ProviderMetadata
|
||||
}) => {
|
||||
const safe = (value: number) => {
|
||||
if (!Number.isFinite(value)) return 0
|
||||
return value
|
||||
}
|
||||
const inputTokens = safe(input.usage.inputTokens ?? 0)
|
||||
const outputTokens = safe(input.usage.outputTokens ?? 0)
|
||||
const reasoningTokens = safe(input.usage.reasoningTokens ?? 0)
|
||||
|
||||
const cacheReadInputTokens = safe(input.usage.cachedInputTokens ?? 0)
|
||||
const cacheWriteInputTokens = safe(
|
||||
(input.metadata?.["anthropic"]?.["cacheCreationInputTokens"] ??
|
||||
// @ts-expect-error
|
||||
input.metadata?.["bedrock"]?.["usage"]?.["cacheWriteInputTokens"] ??
|
||||
// @ts-expect-error
|
||||
input.metadata?.["venice"]?.["usage"]?.["cacheCreationInputTokens"] ??
|
||||
0) as number,
|
||||
)
|
||||
|
||||
export const fork = fn(
|
||||
z.object({
|
||||
sessionID: SessionID.zod,
|
||||
messageID: MessageID.zod.optional(),
|
||||
}),
|
||||
async (input) => {
|
||||
const original = await get(input.sessionID)
|
||||
if (!original) throw new Error("session not found")
|
||||
const title = getForkedTitle(original.title)
|
||||
const session = await createNext({
|
||||
directory: Instance.directory,
|
||||
workspaceID: original.workspaceID,
|
||||
title,
|
||||
})
|
||||
const msgs = await messages({ sessionID: input.sessionID })
|
||||
const idMap = new Map<string, MessageID>()
|
||||
|
||||
for (const msg of msgs) {
|
||||
if (input.messageID && msg.info.id >= input.messageID) break
|
||||
const newID = MessageID.ascending()
|
||||
idMap.set(msg.info.id, newID)
|
||||
|
||||
const parentID = msg.info.role === "assistant" && msg.info.parentID ? idMap.get(msg.info.parentID) : undefined
|
||||
const cloned = await updateMessage({
|
||||
...msg.info,
|
||||
sessionID: session.id,
|
||||
id: newID,
|
||||
...(parentID && { parentID }),
|
||||
})
|
||||
|
||||
for (const part of msg.parts) {
|
||||
await updatePart({
|
||||
...part,
|
||||
id: PartID.ascending(),
|
||||
messageID: cloned.id,
|
||||
sessionID: session.id,
|
||||
})
|
||||
}
|
||||
}
|
||||
return session
|
||||
},
|
||||
// OpenRouter provides inputTokens as the total count of input tokens (including cached).
|
||||
// AFAIK other providers (OpenRouter/OpenAI/Gemini etc.) do it the same way e.g. vercel/ai#8794 (comment)
|
||||
// Anthropic does it differently though - inputTokens doesn't include cached tokens.
|
||||
// It looks like OpenCode's cost calculation assumes all providers return inputTokens the same way Anthropic does (I'm guessing getUsage logic was originally implemented with anthropic), so it's causing incorrect cost calculation for OpenRouter and others.
|
||||
const excludesCachedTokens = !!(input.metadata?.["anthropic"] || input.metadata?.["bedrock"])
|
||||
const adjustedInputTokens = safe(
|
||||
excludesCachedTokens ? inputTokens : inputTokens - cacheReadInputTokens - cacheWriteInputTokens,
|
||||
)
|
||||
|
||||
export const touch = fn(SessionID.zod, async (sessionID) => {
|
||||
const time = Date.now()
|
||||
SyncEvent.run(Event.Updated, { sessionID, info: { time: { updated: time } } })
|
||||
const total = iife(() => {
|
||||
// Anthropic doesn't provide total_tokens, also ai sdk will vastly undercount if we
|
||||
// don't compute from components
|
||||
if (
|
||||
input.model.api.npm === "@ai-sdk/anthropic" ||
|
||||
input.model.api.npm === "@ai-sdk/amazon-bedrock" ||
|
||||
input.model.api.npm === "@ai-sdk/google-vertex/anthropic"
|
||||
) {
|
||||
return adjustedInputTokens + outputTokens + cacheReadInputTokens + cacheWriteInputTokens
|
||||
}
|
||||
return input.usage.totalTokens
|
||||
})
|
||||
|
||||
export async function createNext(input: {
|
||||
const tokens = {
|
||||
total,
|
||||
input: adjustedInputTokens,
|
||||
output: outputTokens,
|
||||
reasoning: reasoningTokens,
|
||||
cache: {
|
||||
write: cacheWriteInputTokens,
|
||||
read: cacheReadInputTokens,
|
||||
},
|
||||
}
|
||||
|
||||
const costInfo =
|
||||
input.model.cost?.experimentalOver200K && tokens.input + tokens.cache.read > 200_000
|
||||
? input.model.cost.experimentalOver200K
|
||||
: input.model.cost
|
||||
return {
|
||||
cost: safe(
|
||||
new Decimal(0)
|
||||
.add(new Decimal(tokens.input).mul(costInfo?.input ?? 0).div(1_000_000))
|
||||
.add(new Decimal(tokens.output).mul(costInfo?.output ?? 0).div(1_000_000))
|
||||
.add(new Decimal(tokens.cache.read).mul(costInfo?.cache?.read ?? 0).div(1_000_000))
|
||||
.add(new Decimal(tokens.cache.write).mul(costInfo?.cache?.write ?? 0).div(1_000_000))
|
||||
// TODO: update models.dev to have better pricing model, for now:
|
||||
// charge reasoning tokens at the same rate as output tokens
|
||||
.add(new Decimal(tokens.reasoning).mul(costInfo?.output ?? 0).div(1_000_000))
|
||||
.toNumber(),
|
||||
),
|
||||
tokens,
|
||||
}
|
||||
}
|
||||
|
||||
export class BusyError extends Error {
|
||||
constructor(public readonly sessionID: string) {
|
||||
super(`Session ${sessionID} is busy`)
|
||||
}
|
||||
}
|
||||
|
||||
export interface Interface {
|
||||
readonly create: (input?: {
|
||||
parentID?: SessionID
|
||||
title?: string
|
||||
permission?: Permission.Ruleset
|
||||
workspaceID?: WorkspaceID
|
||||
}) => Effect.Effect<Info>
|
||||
readonly fork: (input: { sessionID: SessionID; messageID?: MessageID }) => Effect.Effect<Info>
|
||||
readonly touch: (sessionID: SessionID) => Effect.Effect<void>
|
||||
readonly get: (id: SessionID) => Effect.Effect<Info>
|
||||
readonly share: (id: SessionID) => Effect.Effect<{ url: string }>
|
||||
readonly unshare: (id: SessionID) => Effect.Effect<void>
|
||||
readonly setTitle: (input: { sessionID: SessionID; title: string }) => Effect.Effect<void>
|
||||
readonly setArchived: (input: { sessionID: SessionID; time?: number }) => Effect.Effect<void>
|
||||
readonly setPermission: (input: { sessionID: SessionID; permission: Permission.Ruleset }) => Effect.Effect<void>
|
||||
readonly setRevert: (input: {
|
||||
sessionID: SessionID
|
||||
revert: Info["revert"]
|
||||
summary: Info["summary"]
|
||||
}) => Effect.Effect<void>
|
||||
readonly clearRevert: (sessionID: SessionID) => Effect.Effect<void>
|
||||
readonly setSummary: (input: { sessionID: SessionID; summary: Info["summary"] }) => Effect.Effect<void>
|
||||
readonly diff: (sessionID: SessionID) => Effect.Effect<Snapshot.FileDiff[]>
|
||||
readonly messages: (input: { sessionID: SessionID; limit?: number }) => Effect.Effect<MessageV2.WithParts[]>
|
||||
readonly children: (parentID: SessionID) => Effect.Effect<Info[]>
|
||||
readonly remove: (sessionID: SessionID) => Effect.Effect<void>
|
||||
readonly updateMessage: (msg: MessageV2.Info) => Effect.Effect<MessageV2.Info>
|
||||
readonly removeMessage: (input: { sessionID: SessionID; messageID: MessageID }) => Effect.Effect<MessageID>
|
||||
readonly removePart: (input: {
|
||||
sessionID: SessionID
|
||||
messageID: MessageID
|
||||
partID: PartID
|
||||
}) => Effect.Effect<PartID>
|
||||
readonly updatePart: (part: MessageV2.Part) => Effect.Effect<MessageV2.Part>
|
||||
readonly updatePartDelta: (input: {
|
||||
sessionID: SessionID
|
||||
messageID: MessageID
|
||||
partID: PartID
|
||||
field: string
|
||||
delta: string
|
||||
}) => Effect.Effect<void>
|
||||
readonly initialize: (input: {
|
||||
sessionID: SessionID
|
||||
modelID: ModelID
|
||||
providerID: ProviderID
|
||||
messageID: MessageID
|
||||
}) => Effect.Effect<void>
|
||||
}
|
||||
|
||||
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Session") {}
|
||||
|
||||
type Patch = z.infer<typeof Event.Updated.schema>["info"]
|
||||
|
||||
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
|
||||
Effect.sync(() => Database.use(fn))
|
||||
|
||||
export const layer: Layer.Layer<Service, never, Bus.Service | Config.Service> = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const bus = yield* Bus.Service
|
||||
const config = yield* Config.Service
|
||||
const scope = yield* Scope.Scope
|
||||
|
||||
const createNext = Effect.fn("Session.createNext")(function* (input: {
|
||||
id?: SessionID
|
||||
title?: string
|
||||
parentID?: SessionID
|
||||
|
|
@ -326,152 +412,200 @@ export namespace Session {
|
|||
}
|
||||
log.info("created", result)
|
||||
|
||||
SyncEvent.run(Event.Created, { sessionID: result.id, info: result })
|
||||
yield* Effect.sync(() => SyncEvent.run(Event.Created, { sessionID: result.id, info: result }))
|
||||
|
||||
const cfg = await Config.get()
|
||||
const cfg = yield* config.get()
|
||||
if (!result.parentID && (Flag.OPENCODE_AUTO_SHARE || cfg.share === "auto")) {
|
||||
share(result.id).catch(() => {
|
||||
// Silently ignore sharing errors during session creation
|
||||
})
|
||||
yield* share(result.id).pipe(Effect.ignore, Effect.forkIn(scope))
|
||||
}
|
||||
|
||||
if (!Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
|
||||
// This only exist for backwards compatibility. We should not be
|
||||
// manually publishing this event; it is a sync event now
|
||||
Bus.publish(Event.Updated, {
|
||||
yield* bus.publish(Event.Updated, {
|
||||
sessionID: result.id,
|
||||
info: result,
|
||||
})
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
})
|
||||
|
||||
export function plan(input: { slug: string; time: { created: number } }) {
|
||||
const base = Instance.project.vcs
|
||||
? path.join(Instance.worktree, ".opencode", "plans")
|
||||
: path.join(Global.Path.data, "plans")
|
||||
return path.join(base, [input.time.created, input.slug].join("-") + ".md")
|
||||
}
|
||||
|
||||
export const get = fn(SessionID.zod, async (id) => {
|
||||
const row = Database.use((db) => db.select().from(SessionTable).where(eq(SessionTable.id, id)).get())
|
||||
const get = Effect.fn("Session.get")(function* (id: SessionID) {
|
||||
const row = yield* db((d) => d.select().from(SessionTable).where(eq(SessionTable.id, id)).get())
|
||||
if (!row) throw new NotFoundError({ message: `Session not found: ${id}` })
|
||||
return fromRow(row)
|
||||
})
|
||||
|
||||
export const share = fn(SessionID.zod, async (id) => {
|
||||
const cfg = await Config.get()
|
||||
if (cfg.share === "disabled") {
|
||||
throw new Error("Sharing is disabled in configuration")
|
||||
}
|
||||
const share = Effect.fn("Session.share")(function* (id: SessionID) {
|
||||
const cfg = yield* config.get()
|
||||
if (cfg.share === "disabled") throw new Error("Sharing is disabled in configuration")
|
||||
const result = yield* Effect.promise(async () => {
|
||||
const { ShareNext } = await import("@/share/share-next")
|
||||
const share = await ShareNext.create(id)
|
||||
|
||||
SyncEvent.run(Event.Updated, { sessionID: id, info: { share: { url: share.url } } })
|
||||
|
||||
return share
|
||||
return ShareNext.create(id)
|
||||
})
|
||||
yield* Effect.sync(() => SyncEvent.run(Event.Updated, { sessionID: id, info: { share: { url: result.url } } }))
|
||||
return result
|
||||
})
|
||||
|
||||
export const unshare = fn(SessionID.zod, async (id) => {
|
||||
// Use ShareNext to remove the share (same as share function uses ShareNext to create)
|
||||
const unshare = Effect.fn("Session.unshare")(function* (id: SessionID) {
|
||||
yield* Effect.promise(async () => {
|
||||
const { ShareNext } = await import("@/share/share-next")
|
||||
await ShareNext.remove(id)
|
||||
|
||||
SyncEvent.run(Event.Updated, { sessionID: id, info: { share: { url: null } } })
|
||||
})
|
||||
yield* Effect.sync(() => SyncEvent.run(Event.Updated, { sessionID: id, info: { share: { url: null } } }))
|
||||
})
|
||||
|
||||
export const setTitle = fn(
|
||||
z.object({
|
||||
sessionID: SessionID.zod,
|
||||
title: z.string(),
|
||||
}),
|
||||
async (input) => {
|
||||
SyncEvent.run(Event.Updated, { sessionID: input.sessionID, info: { title: input.title } })
|
||||
},
|
||||
const children = Effect.fn("Session.children")(function* (parentID: SessionID) {
|
||||
const project = Instance.project
|
||||
const rows = yield* db((d) =>
|
||||
d
|
||||
.select()
|
||||
.from(SessionTable)
|
||||
.where(and(eq(SessionTable.project_id, project.id), eq(SessionTable.parent_id, parentID)))
|
||||
.all(),
|
||||
)
|
||||
|
||||
export const setArchived = fn(
|
||||
z.object({
|
||||
sessionID: SessionID.zod,
|
||||
time: z.number().optional(),
|
||||
}),
|
||||
async (input) => {
|
||||
SyncEvent.run(Event.Updated, { sessionID: input.sessionID, info: { time: { archived: input.time } } })
|
||||
},
|
||||
)
|
||||
|
||||
export const setPermission = fn(
|
||||
z.object({
|
||||
sessionID: SessionID.zod,
|
||||
permission: Permission.Ruleset,
|
||||
}),
|
||||
async (input) => {
|
||||
SyncEvent.run(Event.Updated, {
|
||||
sessionID: input.sessionID,
|
||||
info: { permission: input.permission, time: { updated: Date.now() } },
|
||||
})
|
||||
},
|
||||
)
|
||||
|
||||
export const setRevert = fn(
|
||||
z.object({
|
||||
sessionID: SessionID.zod,
|
||||
revert: Info.shape.revert,
|
||||
summary: Info.shape.summary,
|
||||
}),
|
||||
async (input) => {
|
||||
SyncEvent.run(Event.Updated, {
|
||||
sessionID: input.sessionID,
|
||||
info: {
|
||||
summary: input.summary,
|
||||
time: { updated: Date.now() },
|
||||
revert: input.revert,
|
||||
},
|
||||
})
|
||||
},
|
||||
)
|
||||
|
||||
export const clearRevert = fn(SessionID.zod, async (sessionID) => {
|
||||
SyncEvent.run(Event.Updated, {
|
||||
sessionID,
|
||||
info: {
|
||||
time: { updated: Date.now() },
|
||||
revert: null,
|
||||
},
|
||||
})
|
||||
return rows.map(fromRow)
|
||||
})
|
||||
|
||||
export const setSummary = fn(
|
||||
z.object({
|
||||
sessionID: SessionID.zod,
|
||||
summary: Info.shape.summary,
|
||||
}),
|
||||
async (input) => {
|
||||
SyncEvent.run(Event.Updated, {
|
||||
sessionID: input.sessionID,
|
||||
info: {
|
||||
time: { updated: Date.now() },
|
||||
summary: input.summary,
|
||||
},
|
||||
})
|
||||
},
|
||||
)
|
||||
|
||||
export const diff = fn(SessionID.zod, async (sessionID) => {
|
||||
const remove: (sessionID: SessionID) => Effect.Effect<void> = Effect.fnUntraced(function* (sessionID: SessionID) {
|
||||
try {
|
||||
return await Storage.read<Snapshot.FileDiff[]>(["session_diff", sessionID])
|
||||
} catch {
|
||||
return []
|
||||
const session = yield* get(sessionID)
|
||||
const kids = yield* children(sessionID)
|
||||
for (const child of kids) {
|
||||
yield* remove(child.id)
|
||||
}
|
||||
yield* unshare(sessionID).pipe(Effect.ignore)
|
||||
yield* Effect.sync(() => {
|
||||
SyncEvent.run(Event.Deleted, { sessionID, info: session })
|
||||
SyncEvent.remove(sessionID)
|
||||
})
|
||||
} catch (e) {
|
||||
log.error(e)
|
||||
}
|
||||
})
|
||||
|
||||
export const messages = fn(
|
||||
z.object({
|
||||
sessionID: SessionID.zod,
|
||||
limit: z.number().optional(),
|
||||
const updateMessage = Effect.fn("Session.updateMessage")(function* (msg: MessageV2.Info) {
|
||||
yield* Effect.sync(() =>
|
||||
SyncEvent.run(MessageV2.Event.Updated, {
|
||||
sessionID: msg.sessionID,
|
||||
info: msg,
|
||||
}),
|
||||
async (input) => {
|
||||
)
|
||||
return msg
|
||||
})
|
||||
|
||||
const updatePart = Effect.fn("Session.updatePart")(function* (part: MessageV2.Part) {
|
||||
yield* Effect.sync(() =>
|
||||
SyncEvent.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID: part.sessionID,
|
||||
part: structuredClone(part),
|
||||
time: Date.now(),
|
||||
}),
|
||||
)
|
||||
return part
|
||||
})
|
||||
|
||||
const create = Effect.fn("Session.create")(function* (input?: {
|
||||
parentID?: SessionID
|
||||
title?: string
|
||||
permission?: Permission.Ruleset
|
||||
workspaceID?: WorkspaceID
|
||||
}) {
|
||||
return yield* createNext({
|
||||
parentID: input?.parentID,
|
||||
directory: Instance.directory,
|
||||
title: input?.title,
|
||||
permission: input?.permission,
|
||||
workspaceID: input?.workspaceID,
|
||||
})
|
||||
})
|
||||
|
||||
const fork = Effect.fn("Session.fork")(function* (input: { sessionID: SessionID; messageID?: MessageID }) {
|
||||
const original = yield* get(input.sessionID)
|
||||
const title = getForkedTitle(original.title)
|
||||
const session = yield* createNext({
|
||||
directory: Instance.directory,
|
||||
workspaceID: original.workspaceID,
|
||||
title,
|
||||
})
|
||||
const msgs = yield* messages({ sessionID: input.sessionID })
|
||||
const idMap = new Map<string, MessageID>()
|
||||
|
||||
for (const msg of msgs) {
|
||||
if (input.messageID && msg.info.id >= input.messageID) break
|
||||
const newID = MessageID.ascending()
|
||||
idMap.set(msg.info.id, newID)
|
||||
|
||||
const parentID = msg.info.role === "assistant" && msg.info.parentID ? idMap.get(msg.info.parentID) : undefined
|
||||
const cloned = yield* updateMessage({
|
||||
...msg.info,
|
||||
sessionID: session.id,
|
||||
id: newID,
|
||||
...(parentID && { parentID }),
|
||||
})
|
||||
|
||||
for (const part of msg.parts) {
|
||||
yield* updatePart({
|
||||
...part,
|
||||
id: PartID.ascending(),
|
||||
messageID: cloned.id,
|
||||
sessionID: session.id,
|
||||
})
|
||||
}
|
||||
}
|
||||
return session
|
||||
})
|
||||
|
||||
const patch = (sessionID: SessionID, info: Patch) =>
|
||||
Effect.sync(() => SyncEvent.run(Event.Updated, { sessionID, info }))
|
||||
|
||||
const touch = Effect.fn("Session.touch")(function* (sessionID: SessionID) {
|
||||
yield* patch(sessionID, { time: { updated: Date.now() } })
|
||||
})
|
||||
|
||||
const setTitle = Effect.fn("Session.setTitle")(function* (input: { sessionID: SessionID; title: string }) {
|
||||
yield* patch(input.sessionID, { title: input.title })
|
||||
})
|
||||
|
||||
const setArchived = Effect.fn("Session.setArchived")(function* (input: { sessionID: SessionID; time?: number }) {
|
||||
yield* patch(input.sessionID, { time: { archived: input.time } })
|
||||
})
|
||||
|
||||
const setPermission = Effect.fn("Session.setPermission")(function* (input: {
|
||||
sessionID: SessionID
|
||||
permission: Permission.Ruleset
|
||||
}) {
|
||||
yield* patch(input.sessionID, { permission: input.permission, time: { updated: Date.now() } })
|
||||
})
|
||||
|
||||
const setRevert = Effect.fn("Session.setRevert")(function* (input: {
|
||||
sessionID: SessionID
|
||||
revert: Info["revert"]
|
||||
summary: Info["summary"]
|
||||
}) {
|
||||
yield* patch(input.sessionID, { summary: input.summary, time: { updated: Date.now() }, revert: input.revert })
|
||||
})
|
||||
|
||||
const clearRevert = Effect.fn("Session.clearRevert")(function* (sessionID: SessionID) {
|
||||
yield* patch(sessionID, { time: { updated: Date.now() }, revert: null })
|
||||
})
|
||||
|
||||
const setSummary = Effect.fn("Session.setSummary")(function* (input: {
|
||||
sessionID: SessionID
|
||||
summary: Info["summary"]
|
||||
}) {
|
||||
yield* patch(input.sessionID, { time: { updated: Date.now() }, summary: input.summary })
|
||||
})
|
||||
|
||||
const diff = Effect.fn("Session.diff")(function* (sessionID: SessionID) {
|
||||
return yield* Effect.tryPromise(() => Storage.read<Snapshot.FileDiff[]>(["session_diff", sessionID])).pipe(
|
||||
Effect.orElseSucceed(() => [] as Snapshot.FileDiff[]),
|
||||
)
|
||||
})
|
||||
|
||||
const messages = Effect.fn("Session.messages")(function* (input: { sessionID: SessionID; limit?: number }) {
|
||||
return yield* Effect.promise(async () => {
|
||||
const result = [] as MessageV2.WithParts[]
|
||||
for await (const msg of MessageV2.stream(input.sessionID)) {
|
||||
if (input.limit && result.length >= input.limit) break
|
||||
|
|
@ -479,7 +613,144 @@ export namespace Session {
|
|||
}
|
||||
result.reverse()
|
||||
return result
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
const removeMessage = Effect.fn("Session.removeMessage")(function* (input: {
|
||||
sessionID: SessionID
|
||||
messageID: MessageID
|
||||
}) {
|
||||
yield* Effect.sync(() =>
|
||||
SyncEvent.run(MessageV2.Event.Removed, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
}),
|
||||
)
|
||||
return input.messageID
|
||||
})
|
||||
|
||||
const removePart = Effect.fn("Session.removePart")(function* (input: {
|
||||
sessionID: SessionID
|
||||
messageID: MessageID
|
||||
partID: PartID
|
||||
}) {
|
||||
yield* Effect.sync(() =>
|
||||
SyncEvent.run(MessageV2.Event.PartRemoved, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
partID: input.partID,
|
||||
}),
|
||||
)
|
||||
return input.partID
|
||||
})
|
||||
|
||||
const updatePartDelta = Effect.fn("Session.updatePartDelta")(function* (input: {
|
||||
sessionID: SessionID
|
||||
messageID: MessageID
|
||||
partID: PartID
|
||||
field: string
|
||||
delta: string
|
||||
}) {
|
||||
yield* bus.publish(MessageV2.Event.PartDelta, input)
|
||||
})
|
||||
|
||||
const initialize = Effect.fn("Session.initialize")(function* (input: {
|
||||
sessionID: SessionID
|
||||
modelID: ModelID
|
||||
providerID: ProviderID
|
||||
messageID: MessageID
|
||||
}) {
|
||||
yield* Effect.promise(() =>
|
||||
SessionPrompt.command({
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
model: input.providerID + "/" + input.modelID,
|
||||
command: Command.Default.INIT,
|
||||
arguments: "",
|
||||
}),
|
||||
)
|
||||
})
|
||||
|
||||
return Service.of({
|
||||
create,
|
||||
fork,
|
||||
touch,
|
||||
get,
|
||||
share,
|
||||
unshare,
|
||||
setTitle,
|
||||
setArchived,
|
||||
setPermission,
|
||||
setRevert,
|
||||
clearRevert,
|
||||
setSummary,
|
||||
diff,
|
||||
messages,
|
||||
children,
|
||||
remove,
|
||||
updateMessage,
|
||||
removeMessage,
|
||||
removePart,
|
||||
updatePart,
|
||||
updatePartDelta,
|
||||
initialize,
|
||||
})
|
||||
}),
|
||||
)
|
||||
|
||||
export const defaultLayer = layer.pipe(Layer.provide(Bus.layer), Layer.provide(Config.defaultLayer))
|
||||
|
||||
const { runPromise } = makeRuntime(Service, defaultLayer)
|
||||
|
||||
export const create = fn(
|
||||
z
|
||||
.object({
|
||||
parentID: SessionID.zod.optional(),
|
||||
title: z.string().optional(),
|
||||
permission: Info.shape.permission,
|
||||
workspaceID: WorkspaceID.zod.optional(),
|
||||
})
|
||||
.optional(),
|
||||
(input) => runPromise((svc) => svc.create(input)),
|
||||
)
|
||||
|
||||
export const fork = fn(z.object({ sessionID: SessionID.zod, messageID: MessageID.zod.optional() }), (input) =>
|
||||
runPromise((svc) => svc.fork(input)),
|
||||
)
|
||||
|
||||
export const touch = fn(SessionID.zod, (id) => runPromise((svc) => svc.touch(id)))
|
||||
export const get = fn(SessionID.zod, (id) => runPromise((svc) => svc.get(id)))
|
||||
export const share = fn(SessionID.zod, (id) => runPromise((svc) => svc.share(id)))
|
||||
export const unshare = fn(SessionID.zod, (id) => runPromise((svc) => svc.unshare(id)))
|
||||
|
||||
export const setTitle = fn(z.object({ sessionID: SessionID.zod, title: z.string() }), (input) =>
|
||||
runPromise((svc) => svc.setTitle(input)),
|
||||
)
|
||||
|
||||
export const setArchived = fn(z.object({ sessionID: SessionID.zod, time: z.number().optional() }), (input) =>
|
||||
runPromise((svc) => svc.setArchived(input)),
|
||||
)
|
||||
|
||||
export const setPermission = fn(z.object({ sessionID: SessionID.zod, permission: Permission.Ruleset }), (input) =>
|
||||
runPromise((svc) => svc.setPermission(input)),
|
||||
)
|
||||
|
||||
export const setRevert = fn(
|
||||
z.object({ sessionID: SessionID.zod, revert: Info.shape.revert, summary: Info.shape.summary }),
|
||||
(input) =>
|
||||
runPromise((svc) => svc.setRevert({ sessionID: input.sessionID, revert: input.revert, summary: input.summary })),
|
||||
)
|
||||
|
||||
export const clearRevert = fn(SessionID.zod, (id) => runPromise((svc) => svc.clearRevert(id)))
|
||||
|
||||
export const setSummary = fn(z.object({ sessionID: SessionID.zod, summary: Info.shape.summary }), (input) =>
|
||||
runPromise((svc) => svc.setSummary({ sessionID: input.sessionID, summary: input.summary })),
|
||||
)
|
||||
|
||||
export const diff = fn(SessionID.zod, (id) => runPromise((svc) => svc.diff(id)))
|
||||
|
||||
export const messages = fn(z.object({ sessionID: SessionID.zod, limit: z.number().optional() }), (input) =>
|
||||
runPromise((svc) => svc.messages(input)),
|
||||
)
|
||||
|
||||
export function* list(input?: {
|
||||
|
|
@ -594,84 +865,20 @@ export namespace Session {
|
|||
}
|
||||
}
|
||||
|
||||
export const children = fn(SessionID.zod, async (parentID) => {
|
||||
const project = Instance.project
|
||||
const rows = Database.use((db) =>
|
||||
db
|
||||
.select()
|
||||
.from(SessionTable)
|
||||
.where(and(eq(SessionTable.project_id, project.id), eq(SessionTable.parent_id, parentID)))
|
||||
.all(),
|
||||
)
|
||||
return rows.map(fromRow)
|
||||
})
|
||||
export const children = fn(SessionID.zod, (id) => runPromise((svc) => svc.children(id)))
|
||||
export const remove = fn(SessionID.zod, (id) => runPromise((svc) => svc.remove(id)))
|
||||
export const updateMessage = fn(MessageV2.Info, (msg) => runPromise((svc) => svc.updateMessage(msg)))
|
||||
|
||||
export const remove = fn(SessionID.zod, async (sessionID) => {
|
||||
try {
|
||||
const session = await get(sessionID)
|
||||
for (const child of await children(sessionID)) {
|
||||
await remove(child.id)
|
||||
}
|
||||
await unshare(sessionID).catch(() => {})
|
||||
|
||||
SyncEvent.run(Event.Deleted, { sessionID, info: session })
|
||||
|
||||
// Eagerly remove event sourcing data to free up space
|
||||
SyncEvent.remove(sessionID)
|
||||
} catch (e) {
|
||||
log.error(e)
|
||||
}
|
||||
})
|
||||
|
||||
export const updateMessage = fn(MessageV2.Info, async (msg) => {
|
||||
SyncEvent.run(MessageV2.Event.Updated, {
|
||||
sessionID: msg.sessionID,
|
||||
info: msg,
|
||||
})
|
||||
|
||||
return msg
|
||||
})
|
||||
|
||||
export const removeMessage = fn(
|
||||
z.object({
|
||||
sessionID: SessionID.zod,
|
||||
messageID: MessageID.zod,
|
||||
}),
|
||||
async (input) => {
|
||||
SyncEvent.run(MessageV2.Event.Removed, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
})
|
||||
return input.messageID
|
||||
},
|
||||
export const removeMessage = fn(z.object({ sessionID: SessionID.zod, messageID: MessageID.zod }), (input) =>
|
||||
runPromise((svc) => svc.removeMessage(input)),
|
||||
)
|
||||
|
||||
export const removePart = fn(
|
||||
z.object({
|
||||
sessionID: SessionID.zod,
|
||||
messageID: MessageID.zod,
|
||||
partID: PartID.zod,
|
||||
}),
|
||||
async (input) => {
|
||||
SyncEvent.run(MessageV2.Event.PartRemoved, {
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
partID: input.partID,
|
||||
})
|
||||
return input.partID
|
||||
},
|
||||
z.object({ sessionID: SessionID.zod, messageID: MessageID.zod, partID: PartID.zod }),
|
||||
(input) => runPromise((svc) => svc.removePart(input)),
|
||||
)
|
||||
|
||||
const UpdatePartInput = MessageV2.Part
|
||||
|
||||
export const updatePart = fn(UpdatePartInput, async (part) => {
|
||||
SyncEvent.run(MessageV2.Event.PartUpdated, {
|
||||
sessionID: part.sessionID,
|
||||
part: structuredClone(part),
|
||||
time: Date.now(),
|
||||
})
|
||||
return part
|
||||
})
|
||||
export const updatePart = fn(MessageV2.Part, (part) => runPromise((svc) => svc.updatePart(part)))
|
||||
|
||||
export const updatePartDelta = fn(
|
||||
z.object({
|
||||
|
|
@ -681,111 +888,11 @@ export namespace Session {
|
|||
field: z.string(),
|
||||
delta: z.string(),
|
||||
}),
|
||||
async (input) => {
|
||||
Bus.publish(MessageV2.Event.PartDelta, input)
|
||||
},
|
||||
(input) => runPromise((svc) => svc.updatePartDelta(input)),
|
||||
)
|
||||
|
||||
export const getUsage = fn(
|
||||
z.object({
|
||||
model: z.custom<Provider.Model>(),
|
||||
usage: z.custom<LanguageModelV2Usage>(),
|
||||
metadata: z.custom<ProviderMetadata>().optional(),
|
||||
}),
|
||||
(input) => {
|
||||
const safe = (value: number) => {
|
||||
if (!Number.isFinite(value)) return 0
|
||||
return value
|
||||
}
|
||||
const inputTokens = safe(input.usage.inputTokens ?? 0)
|
||||
const outputTokens = safe(input.usage.outputTokens ?? 0)
|
||||
const reasoningTokens = safe(input.usage.reasoningTokens ?? 0)
|
||||
|
||||
const cacheReadInputTokens = safe(input.usage.cachedInputTokens ?? 0)
|
||||
const cacheWriteInputTokens = safe(
|
||||
(input.metadata?.["anthropic"]?.["cacheCreationInputTokens"] ??
|
||||
// @ts-expect-error
|
||||
input.metadata?.["bedrock"]?.["usage"]?.["cacheWriteInputTokens"] ??
|
||||
// @ts-expect-error
|
||||
input.metadata?.["venice"]?.["usage"]?.["cacheCreationInputTokens"] ??
|
||||
0) as number,
|
||||
)
|
||||
|
||||
// OpenRouter provides inputTokens as the total count of input tokens (including cached).
|
||||
// AFAIK other providers (OpenRouter/OpenAI/Gemini etc.) do it the same way e.g. vercel/ai#8794 (comment)
|
||||
// Anthropic does it differently though - inputTokens doesn't include cached tokens.
|
||||
// It looks like OpenCode's cost calculation assumes all providers return inputTokens the same way Anthropic does (I'm guessing getUsage logic was originally implemented with anthropic), so it's causing incorrect cost calculation for OpenRouter and others.
|
||||
const excludesCachedTokens = !!(input.metadata?.["anthropic"] || input.metadata?.["bedrock"])
|
||||
const adjustedInputTokens = safe(
|
||||
excludesCachedTokens ? inputTokens : inputTokens - cacheReadInputTokens - cacheWriteInputTokens,
|
||||
)
|
||||
|
||||
const total = iife(() => {
|
||||
// Anthropic doesn't provide total_tokens, also ai sdk will vastly undercount if we
|
||||
// don't compute from components
|
||||
if (
|
||||
input.model.api.npm === "@ai-sdk/anthropic" ||
|
||||
input.model.api.npm === "@ai-sdk/amazon-bedrock" ||
|
||||
input.model.api.npm === "@ai-sdk/google-vertex/anthropic"
|
||||
) {
|
||||
return adjustedInputTokens + outputTokens + cacheReadInputTokens + cacheWriteInputTokens
|
||||
}
|
||||
return input.usage.totalTokens
|
||||
})
|
||||
|
||||
const tokens = {
|
||||
total,
|
||||
input: adjustedInputTokens,
|
||||
output: outputTokens,
|
||||
reasoning: reasoningTokens,
|
||||
cache: {
|
||||
write: cacheWriteInputTokens,
|
||||
read: cacheReadInputTokens,
|
||||
},
|
||||
}
|
||||
|
||||
const costInfo =
|
||||
input.model.cost?.experimentalOver200K && tokens.input + tokens.cache.read > 200_000
|
||||
? input.model.cost.experimentalOver200K
|
||||
: input.model.cost
|
||||
return {
|
||||
cost: safe(
|
||||
new Decimal(0)
|
||||
.add(new Decimal(tokens.input).mul(costInfo?.input ?? 0).div(1_000_000))
|
||||
.add(new Decimal(tokens.output).mul(costInfo?.output ?? 0).div(1_000_000))
|
||||
.add(new Decimal(tokens.cache.read).mul(costInfo?.cache?.read ?? 0).div(1_000_000))
|
||||
.add(new Decimal(tokens.cache.write).mul(costInfo?.cache?.write ?? 0).div(1_000_000))
|
||||
// TODO: update models.dev to have better pricing model, for now:
|
||||
// charge reasoning tokens at the same rate as output tokens
|
||||
.add(new Decimal(tokens.reasoning).mul(costInfo?.output ?? 0).div(1_000_000))
|
||||
.toNumber(),
|
||||
),
|
||||
tokens,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
export class BusyError extends Error {
|
||||
constructor(public readonly sessionID: string) {
|
||||
super(`Session ${sessionID} is busy`)
|
||||
}
|
||||
}
|
||||
|
||||
export const initialize = fn(
|
||||
z.object({
|
||||
sessionID: SessionID.zod,
|
||||
modelID: ModelID.zod,
|
||||
providerID: ProviderID.zod,
|
||||
messageID: MessageID.zod,
|
||||
}),
|
||||
async (input) => {
|
||||
await SessionPrompt.command({
|
||||
sessionID: input.sessionID,
|
||||
messageID: input.messageID,
|
||||
model: input.providerID + "/" + input.modelID,
|
||||
command: Command.Default.INIT,
|
||||
arguments: "",
|
||||
})
|
||||
},
|
||||
z.object({ sessionID: SessionID.zod, modelID: ModelID.zod, providerID: ProviderID.zod, messageID: MessageID.zod }),
|
||||
(input) => runPromise((svc) => svc.initialize(input)),
|
||||
)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue