/**
* @module utils/runWithAbort
* @description Abortable run helper and abort controller registry.
*
* Encapsulates the `AbortController` lifecycle, success/failure logging, and
* error handling that every async pipeline route (crawl, run, generate) repeats.
*
* ### Exports
* - {@link runWithAbort} — Execute an async function with abort support.
* - {@link runAbortControllers} — `Map<runId, { controller: AbortController, run: Object, provider: string|null }>` registry.
*/
import { emitRunEvent } from "../routes/sse.js";
import { logActivity } from "./activityLogger.js";
import * as runRepo from "../database/repositories/runRepo.js";
import * as runLogRepo from "../database/repositories/runLogRepo.js";
import { classifyError } from "./errorClassifier.js";
import { formatLogLine } from "./logFormatter.js";
import { captureProvider } from "./activeRuns.js";
// ─── Abort registry: runId → { controller, run, provider } ────────────────────
// Allows in-progress crawl / generate / test_run operations to be cancelled.
// Each entry has the shape
//   { controller: AbortController, run: Object, provider: string|null }
// and is added by runWithAbort() when a run starts, then deleted again as soon
// as the run's async function settles (resolve or reject).
//
// - controller: the AbortController whose signal was handed to the run.
// - run:        the same in-memory run object the pipeline holds. Storing the
//               reference lets the abort endpoint mutate that object directly,
//               preventing status overwrites.
// - provider:   the provider captured when the run started. Storing it lets
//               the chat endpoint skip runs that aren't using Ollama when
//               checking for concurrent LLM activity.
export const runAbortControllers = new Map();
/**
 * Run an async pipeline function with abort support and shared bookkeeping.
 *
 * Registers a fresh `AbortController` for `runId` in
 * {@link runAbortControllers}, invokes `asyncFn` with the controller's signal,
 * and handles the three terminal outcomes:
 *
 * - **resolved** — calls `onSuccess(result)` (skipped when `run.status` is
 *   already `"aborted"`) and saves the run.
 * - **aborted** — the rejection is named `AbortError` or `run.status` is
 *   `"aborted"`: the run is saved as-is and nothing is marked as failed.
 * - **failed** — the error is classified, `run.status` / `run.error` /
 *   `run.errorCategory` / `run.finishedAt` are set on the run object, the
 *   failure is logged, a `"done"` run event is emitted and the run is saved.
 *
 * An error thrown by `onSuccess` (or by the save that follows it) is routed
 * through the same abort/failure handling as a rejection of `asyncFn`.
 *
 * The registry entry is removed as soon as `asyncFn` settles. The function does
 * not wait for the run: it returns immediately with a promise that callers may
 * ignore (fire-and-forget) or await.
 *
 * @param {string} runId - Run identifier used as the registry key and in log /
 *   event calls (presumably a string id — confirm against callers).
 * @param {Object} run - In-memory run object; mutated in place on failure.
 * @param {function(AbortSignal): (Promise<*>|*)} asyncFn - The work to run.
 *   Receives the abort signal. A synchronous throw is treated like a rejection.
 * @param {Object} [opts] - Optional callbacks and metadata.
 * @param {Function} [opts.onSuccess] - Called with the resolved value of
 *   `asyncFn` unless the run was aborted in the meantime.
 * @param {Function} [opts.onFailActivity] - Called with the error on failure;
 *   its return value supplies the base fields of the failure activity entry.
 * @param {Object} [opts.actorInfo] - { userId, userName } from the triggering
 *   request. Spread into the failure logActivity call so the audit trail
 *   records who started the run.
 * @param {Function} [opts.onComplete] - Called after the run reaches any
 *   terminal state (completed, failed, or aborted). Receives the run object so
 *   callers can inspect final status. Errors thrown by onComplete are silently
 *   caught to avoid masking the original run outcome.
 * @returns {Promise<void>} Settles after all bookkeeping has finished. Errors
 *   raised by the bookkeeping itself are logged rather than propagated.
 */
export function runWithAbort(runId, run, asyncFn, { onSuccess, onFailActivity, actorInfo, onComplete } = {}) {
  const abortController = new AbortController();
  // Capture the provider at start time so the chat busy guard can accurately
  // filter to runs actually using Ollama. If the user switches providers
  // mid-run, the captured value stays the same — matching the provider the
  // pipeline's LLM calls are pinned to via the sticky fallback mechanism.
  runAbortControllers.set(runId, { controller: abortController, run, provider: captureProvider() });

  // Invoke through an async wrapper: asyncFn still starts synchronously, but a
  // synchronous throw becomes a rejection, so the registry entry is always
  // cleaned up and the failure is recorded like any other.
  const start = async () => asyncFn(abortController.signal);

  return start()
    .then((result) => {
      runAbortControllers.delete(runId);
      if (run.status !== "aborted") {
        onSuccess?.(result);
      }
      // Persist completed run to SQLite (results, pass/fail counts, duration,
      // feedback loop improvements).
      runRepo.save(run);
    })
    .catch((err) => {
      runAbortControllers.delete(runId);
      // `err` may be any rejected value (including null), so guard the lookup.
      if (err?.name === "AbortError" || run.status === "aborted") {
        // Flush any results accumulated before the abort so they aren't lost:
        // the in-memory run object may hold results/logs not yet persisted,
        // even if the aborted status itself was already stored elsewhere.
        runRepo.save(run);
        return;
      }
      const runType = run.type === "crawl" ? "crawl" : "run";
      const classified = classifyError(err, runType);
      console.error(formatLogLine("error", runId, `[${runType}] ${err?.message ?? String(err)}`));
      run.status = "failed";
      run.error = classified.message;
      run.errorCategory = classified.category;
      run.finishedAt = new Date().toISOString();
      // onFailActivity is optional like the other callbacks; without it the
      // activity entry still carries the actor and the failed status.
      logActivity({ ...onFailActivity?.(err), ...(actorInfo || {}), status: "failed" });
      emitRunEvent(runId, "done", { status: "failed" });
      runRepo.save(run); // persist failed status to SQLite
    })
    .finally(async () => {
      // Fire onComplete for any terminal state (completed, failed, aborted).
      // Errors (sync or async) are silently caught so a failing callback
      // never masks the original run outcome or breaks the pipeline cleanup.
      try { await onComplete?.(run); } catch { /* best-effort */ }
      // Evict the run's seq counter from the runLogRepo cache — the run is
      // finished and will never append more log lines, so keeping the entry
      // would be an unbounded memory leak on long-running servers.
      runLogRepo.evictCache(runId);
    })
    .catch((err) => {
      // Last line of defence: most callers ignore the returned promise, so an
      // error thrown by the bookkeeping above (persistence, logging, SSE) must
      // be reported here instead of becoming an unhandled rejection.
      console.error(formatLogLine("error", runId, `[runWithAbort] bookkeeping failed: ${err?.message ?? String(err)}`));
    });
}