diff --git a/packages/opencode/package.json b/packages/opencode/package.json index c462b1761d..0463cc6d25 100644 --- a/packages/opencode/package.json +++ b/packages/opencode/package.json @@ -95,6 +95,7 @@ "@openrouter/ai-sdk-provider": "1.5.4", "@opentui/core": "0.1.87", "@opentui/solid": "0.1.87", + "@effect/platform-node": "4.0.0-beta.31", "@parcel/watcher": "2.5.1", "@pierre/diffs": "catalog:", "@solid-primitives/event-bus": "1.1.2", diff --git a/packages/opencode/src/skill/discovery.ts b/packages/opencode/src/skill/discovery.ts index c39f4b45a2..fe03dccefa 100644 --- a/packages/opencode/src/skill/discovery.ts +++ b/packages/opencode/src/skill/discovery.ts @@ -1,14 +1,12 @@ -import path from "path" -import { Effect, Layer, Schema, ServiceMap } from "effect" +import { NodeFileSystem, NodePath } from "@effect/platform-node" +import { Effect, FileSystem, Layer, Path, Schema, ServiceMap } from "effect" import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http" import { Global } from "../global" import { Log } from "../util/log" -import { Filesystem } from "../util/filesystem" import { withTransientReadRetry } from "@/util/effect-http-client" class IndexSkill extends Schema.Class("IndexSkill")({ name: Schema.String, - description: Schema.String, files: Schema.Array(Schema.String), }) {} @@ -16,11 +14,8 @@ class Index extends Schema.Class("Index")({ skills: Schema.Array(IndexSkill), }) {} -export namespace Discovery { - export function dir() { - return path.join(Global.Path.cache, "skills") - } -} +const skillConcurrency = 4 +const fileConcurrency = 8 export namespace DiscoveryService { export interface Service { @@ -35,113 +30,89 @@ export class DiscoveryService extends ServiceMap.Service - Effect.gen(function* () { - if (yield* Effect.promise(() => Filesystem.exists(dest))) return true + const download = Effect.fn("DiscoveryService.download")(function* (url: string, dest: string) { + if (yield* fs.exists(dest).pipe(Effect.orDie)) return true - const req = HttpClientRequest.get(url) - const response = yield* http.execute(req).pipe( - Effect.catch((err) => { + return yield* HttpClientRequest.get(url).pipe( + http.execute, + Effect.flatMap((res) => res.arrayBuffer), + Effect.flatMap((body) => + fs + .makeDirectory(path.dirname(dest), { recursive: true }) + .pipe(Effect.flatMap(() => fs.writeFile(dest, new Uint8Array(body)))), + ), + Effect.as(true), + Effect.catch((err) => + Effect.sync(() => { log.error("failed to download", { url, err }) - return Effect.succeed(null) + return false }), - ) - if (!response) return false - - const ok = yield* HttpClientResponse.filterStatusOk(response).pipe( - Effect.catch(() => { - log.error("failed to download", { url, status: response.status }) - return Effect.succeed(null) - }), - ) - if (!ok) return false - - const body = yield* ok.arrayBuffer.pipe( - Effect.catch((err) => { - log.error("failed to read download body", { url, err }) - return Effect.succeed(null) - }), - ) - if (!body) return false - - yield* Effect.promise(() => Filesystem.write(dest, Buffer.from(body))) - return true - }), - ) + ), + ) + }) const pull: DiscoveryService.Service["pull"] = Effect.fn("DiscoveryService.pull")(function* (url: string) { const base = url.endsWith("/") ? url : `${url}/` const index = new URL("index.json", base).href - const cache = Discovery.dir() const host = base.slice(0, -1) log.info("fetching index", { url: index }) - const req = HttpClientRequest.get(index).pipe(HttpClientRequest.acceptJson) - const response = yield* http.execute(req).pipe( - Effect.catch((err) => { - log.error("failed to fetch index", { url: index, err }) - return Effect.succeed(null) - }), + const data = yield* HttpClientRequest.get(index).pipe( + HttpClientRequest.acceptJson, + http.execute, + Effect.flatMap(HttpClientResponse.schemaBodyJson(Index)), + Effect.catch((err) => + Effect.sync(() => { + log.error("failed to fetch index", { url: index, err }) + return null + }), + ), ) - if (!response) return Array() - const ok = yield* HttpClientResponse.filterStatusOk(response).pipe( - Effect.catch(() => { - log.error("failed to fetch index", { url: index, status: response.status }) - return Effect.succeed(null) - }), - ) - if (!ok) return Array() - - const data = yield* HttpClientResponse.schemaBodyJson(Index)(ok).pipe( - Effect.catch((err) => { - log.error("failed to parse index", { url: index, err }) - return Effect.succeed(null) - }), - ) - if (!data) { - log.warn("invalid index format", { url: index }) - return Array() - } + if (!data) return [] const list = data.skills.filter((skill) => { - if (!skill.name || !Array.isArray(skill.files)) { - log.warn("invalid skill entry", { url: index, skill }) + if (!skill.files.includes("SKILL.md")) { + log.warn("skill entry missing SKILL.md", { url: index, skill: skill.name }) return false } return true }) - const dirs = yield* Effect.all( - list.map((skill) => + const dirs = yield* Effect.forEach( + list, + (skill) => Effect.gen(function* () { const root = path.join(cache, skill.name) - yield* Effect.all( - skill.files.map((file) => { - const link = new URL(file, `${host}/${skill.name}/`).href - const dest = path.join(root, file) - return get(link, dest) - }), - { concurrency: "unbounded" }, + yield* Effect.forEach( + skill.files, + (file) => download(new URL(file, `${host}/${skill.name}/`).href, path.join(root, file)), + { concurrency: fileConcurrency }, ) const md = path.join(root, "SKILL.md") - return (yield* Effect.promise(() => Filesystem.exists(md))) ? root : null + return (yield* fs.exists(md).pipe(Effect.orDie)) ? root : null }), - ), - { concurrency: "unbounded" }, + { concurrency: skillConcurrency }, ) - return dirs.filter((dir): dir is string => Boolean(dir)) + return dirs.filter((dir): dir is string => dir !== null) }) return DiscoveryService.of({ pull }) }), ) - static readonly defaultLayer = DiscoveryService.layer.pipe(Layer.provide(FetchHttpClient.layer)) + static readonly defaultLayer = DiscoveryService.layer.pipe( + Layer.provide(FetchHttpClient.layer), + Layer.provide(NodeFileSystem.layer), + Layer.provide(NodePath.layer), + ) } diff --git a/packages/opencode/test/skill/discovery.test.ts b/packages/opencode/test/skill/discovery.test.ts index cc2937e3a1..5cbb3ada09 100644 --- a/packages/opencode/test/skill/discovery.test.ts +++ b/packages/opencode/test/skill/discovery.test.ts @@ -1,6 +1,7 @@ import { describe, test, expect, beforeAll, afterAll } from "bun:test" import { Effect } from "effect" -import { Discovery, DiscoveryService } from "../../src/skill/discovery" +import { DiscoveryService } from "../../src/skill/discovery" +import { Global } from "../../src/global" import { Filesystem } from "../../src/util/filesystem" import { rm } from "fs/promises" import path from "path" @@ -10,9 +11,10 @@ let server: ReturnType let downloadCount = 0 const fixturePath = path.join(import.meta.dir, "../fixture/skills") +const cacheDir = path.join(Global.Path.cache, "skills") beforeAll(async () => { - await rm(Discovery.dir(), { recursive: true, force: true }) + await rm(cacheDir, { recursive: true, force: true }) server = Bun.serve({ port: 0, @@ -41,7 +43,7 @@ beforeAll(async () => { afterAll(async () => { server?.stop() - await rm(Discovery.dir(), { recursive: true, force: true }) + await rm(cacheDir, { recursive: true, force: true }) }) describe("Discovery.pull", () => { @@ -52,7 +54,7 @@ describe("Discovery.pull", () => { const dirs = await pull(CLOUDFLARE_SKILLS_URL) expect(dirs.length).toBeGreaterThan(0) for (const dir of dirs) { - expect(dir).toStartWith(Discovery.dir()) + expect(dir).toStartWith(cacheDir) const md = path.join(dir, "SKILL.md") expect(await Filesystem.exists(md)).toBe(true) } @@ -94,7 +96,7 @@ describe("Discovery.pull", () => { test("caches downloaded files on second pull", async () => { // clear dir and downloadCount - await rm(Discovery.dir(), { recursive: true, force: true }) + await rm(cacheDir, { recursive: true, force: true }) downloadCount = 0 // first pull to populate cache