/**
* @module workers/runWorker
* @description BullMQ Worker for durable run execution (INF-003).
*
* Processes jobs from the `sentri:runs` queue. Each job contains the
* serialised run parameters (project, tests, run record, options). The
* worker calls `crawlAndGenerateTests` or `runTests` depending on the
* job type, mirroring the logic previously inlined in route handlers.
*
* ### Concurrency
* Controlled by `MAX_WORKERS` env var (default 2). Each concurrent slot
* processes one run at a time — Playwright browser instances are not shared
* across jobs.
*
* ### Lifecycle
* - {@link startWorker} — Create and start the BullMQ Worker.
* - {@link stopWorker} — Gracefully close the worker (drain + disconnect).
*
* When Redis is not available, both functions are no-ops.
*/
import { createRequire } from "module";
import { formatLogLine, structuredLog } from "../utils/logFormatter.js";
import { logActivity } from "../utils/activityLogger.js";
import { classifyError } from "../utils/errorClassifier.js";
import { emitRunEvent } from "../routes/sse.js";
import * as runRepo from "../database/repositories/runRepo.js";
import * as runLogRepo from "../database/repositories/runLogRepo.js";
import * as projectRepo from "../database/repositories/projectRepo.js";
import * as testRepo from "../database/repositories/testRepo.js";
import { runTests } from "../testRunner.js";
import { crawlAndGenerateTests } from "../crawler.js";
import { fireNotifications } from "../utils/notifications.js";
import { captureProvider } from "../utils/activeRuns.js";
// Lazily resolve BullMQ's Worker class. The dependency is optional: when
// REDIS_URL is unset (or bullmq is not installed) `Worker` stays null and
// the worker layer is disabled — startWorker/stopWorker become no-ops.
const _require = createRequire(import.meta.url);
let Worker = null;
if (process.env.REDIS_URL) {
  try {
    ({ Worker } = _require("bullmq"));
  } catch {
    // Queue module already warned about missing bullmq
  }
}
/** @type {Object|null} Singleton BullMQ Worker, created by {@link startWorker}. */
let _worker = null;

/**
 * Active BullMQ-processed runs, keyed by run ID. Each entry holds the run's
 * AbortController plus the LLM provider captured at job start. The chat
 * endpoint reads `.provider` so a run on a cloud provider does not trigger a
 * false-positive "AI is busy" 503 when the user switches to Ollama.
 *
 * @type {Map<string, {controller: AbortController, provider: string|null}>}
 */
export const workerAbortControllers = new Map();

/** Worker concurrency — number of runs processed in parallel (env `MAX_WORKERS`, default 2). */
const MAX_WORKERS = Number.parseInt(process.env.MAX_WORKERS, 10) || 2;
/**
* Process a single run job from the queue.
*
* @param {Object} job — BullMQ Job instance.
* @returns {Promise<void>}
*/
/**
 * Process a single run job from the queue.
 *
 * Loads the project and run records created by the enqueuing route handler,
 * registers an AbortController (so the abort endpoint can cancel the job),
 * then dispatches to the crawl or test-run pipeline based on `job.data.type`.
 *
 * Failure handling: terminal state (`status: "failed"` + `finishedAt`) is
 * persisted only on the FINAL attempt; earlier attempts reset the run's
 * accumulated state so the BullMQ retry starts clean. Aborts are owned by
 * the abort endpoint — once an abort is detected the worker never writes its
 * in-memory run back to the DB.
 *
 * @param {Object} job — BullMQ Job instance.
 * @returns {Promise<void>}
 * @throws Re-throws non-abort errors so BullMQ applies its retry policy.
 */
async function processJob(job) {
  const { runId, projectId, type, options } = job.data;
  structuredLog("worker.job_start", { runId, projectId, type, jobId: job.id });
  const project = projectRepo.getById(projectId);
  if (!project) {
    console.warn(formatLogLine("warn", null,
      `[worker] Project ${projectId} not found for job ${job.id} — marking run failed`));
    runRepo.update(runId, {
      status: "failed",
      error: "Project not found",
      finishedAt: new Date().toISOString(),
    });
    emitRunEvent(runId, "done", { status: "failed" });
    return;
  }
  // Reconstruct the run object from the database (it was created by the
  // route handler before the job was enqueued).
  const run = runRepo.getById(runId);
  if (!run) {
    console.warn(formatLogLine("warn", null,
      `[worker] Run ${runId} not found for job ${job.id}`));
    return;
  }
  // Create an AbortController so the abort endpoint can cancel this job.
  // Capture the active provider at job-start time so the chat busy guard
  // can accurately filter to runs using Ollama (prevents false-positive
  // "AI is busy" 503s when a cloud run is active and the user switches
  // to Ollama).
  const abortController = new AbortController();
  workerAbortControllers.set(runId, { controller: abortController, provider: captureProvider() });
  const signal = abortController.signal;
  try {
    if (type === "crawl") {
      await crawlAndGenerateTests(project, run, {
        dialsPrompt: options.dialsPrompt,
        testCount: options.testCount,
        explorerMode: options.explorerMode,
        explorerTuning: options.explorerTuning,
        signal,
      });
      if (run.status !== "aborted") {
        logActivity({
          ...options.actorInfo,
          type: "crawl.complete",
          projectId: project.id,
          projectName: project.name,
          detail: `Crawl completed — ${run.pagesFound || 0} pages found`,
        });
      }
      // FEA-001: Fire failure notifications — best-effort (consistent with
      // the in-process fallback in runs.js which calls fireNotifications for
      // crawls via the onComplete callback).
      try { await fireNotifications(run, project); } catch { /* best-effort */ }
    } else if (type === "test_run") {
      // Use the snapshotted test IDs from enqueue time (options.testIds) so
      // retries execute the same set of tests as the original attempt.
      // Falls back to a fresh DB query for jobs enqueued before this fix.
      let tests;
      if (Array.isArray(options.testIds) && options.testIds.length > 0) {
        const allTests = testRepo.getByProjectId(project.id);
        const idSet = new Set(options.testIds);
        tests = allTests.filter(t => idSet.has(t.id));
      } else {
        tests = testRepo.getByProjectId(project.id)
          .filter(t => t.reviewStatus === "approved");
      }
      await runTests(project, tests, run, {
        parallelWorkers: options.parallelWorkers || 1,
        browser: options.browser || null, // DIF-002: resolved to chromium inside runTests when null
        device: options.device || null,
        locale: options.locale || null,
        timezoneId: options.timezoneId || null,
        geolocation: options.geolocation || null,
        signal,
      });
      if (run.status !== "aborted") {
        logActivity({
          ...options.actorInfo,
          type: "test_run.complete",
          projectId: project.id,
          projectName: project.name,
          detail: `Test run completed — ${run.passed || 0} passed, ${run.failed || 0} failed`,
        });
      }
      // Fire failure notifications (FEA-001) — best-effort
      try { await fireNotifications(run, project); } catch { /* best-effort */ }
    }
    // Check abort signal one final time before persisting. If the abort
    // endpoint fired between the pipeline completing and this point, the DB
    // already has status="aborted" + "skipped" entries. Writing the worker's
    // stale in-memory run back would overwrite that state.
    if (signal.aborted || run.status === "aborted") return;
    // Persist final state
    runRepo.save(run);
    structuredLog("worker.job_complete", {
      runId, projectId, type, jobId: job.id,
      status: run.status, passed: run.passed, failed: run.failed,
    });
  } catch (err) {
    // Registry cleanup is handled by the `finally` block below — no need to
    // delete the abort-controller entry here as well.
    if (err.name === "AbortError" || signal.aborted || run.status === "aborted") {
      // The abort endpoint (runs.js) is the single owner of abort state:
      // it writes status="aborted", adds "skipped" entries for unexecuted
      // tests, and persists everything to the DB. The worker must NOT
      // write its in-memory `run` back — doing so would race with the
      // abort endpoint and could overwrite the "skipped" entries with the
      // worker's stale results snapshot. Simply bail out.
      return;
    }
    const maxAttempts = job.opts?.attempts || 2;
    const isFinalAttempt = job.attemptsMade >= maxAttempts - 1;
    const runType = type === "crawl" ? "crawl" : "run";
    const classified = classifyError(err, runType);
    console.error(formatLogLine("error", runId, `[worker] ${err.message}`));
    if (isFinalAttempt) {
      // Only persist terminal state on the final attempt to prevent
      // retries from re-executing an already-failed run (the DB row
      // would have status="failed" and finishedAt set, causing duplicate
      // activity logs, duplicate SSE events, and status overwrites).
      run.status = "failed";
      run.error = classified.message;
      run.errorCategory = classified.category;
      run.finishedAt = new Date().toISOString();
      logActivity({
        ...options.actorInfo,
        type: `${runType === "crawl" ? "crawl" : "test_run"}.fail`,
        projectId: project.id,
        projectName: project.name,
        detail: `${runType === "crawl" ? "Crawl" : "Test run"} failed: ${classified.message}`,
        status: "failed",
      });
      emitRunEvent(runId, "done", { status: "failed" });
      runRepo.save(run);
    } else {
      // Non-final attempt: reset ALL accumulated run state so the retry
      // starts completely clean. Without this, the retry would reload the
      // partially-populated run from the DB (via the runRepo.getById call
      // at the top of this function) and runTests/crawlAndGenerateTests
      // would append MORE results to the already-populated arrays, causing
      // duplicate entries in run.results, inflated pass/fail counts, and
      // incorrect totals.
      run.status = "running";
      run.error = null;
      run.errorCategory = null;
      run.finishedAt = null;
      run.results = [];
      run.passed = 0;
      run.failed = 0;
      run.pagesFound = 0;
      run.logs = [];
      // Delete run_logs table rows from the failed attempt so the retry
      // doesn't start with stale log entries. runRepo.getById() hydrates
      // run.logs from run_logs, so without this the retry would see the
      // old logs concatenated with new ones.
      runLogRepo.deleteByRunId(runId);
      runRepo.save(run);
    }
    throw err; // Let BullMQ handle retry logic
  } finally {
    workerAbortControllers.delete(runId);
    runLogRepo.evictCache(runId);
  }
}
// ─── Public API ───────────────────────────────────────────────────────────────
/**
* Create and start the BullMQ Worker.
* No-op if Redis or BullMQ is not available.
*/
/**
 * Create and start the BullMQ Worker on the `sentri:runs` queue.
 * No-op when Redis is not configured or BullMQ could not be loaded.
 */
export function startWorker() {
  if (!Worker) return;
  if (!process.env.REDIS_URL) return;
  const workerOptions = {
    connection: {
      url: process.env.REDIS_URL,
      maxRetriesPerRequest: null,
    },
    concurrency: MAX_WORKERS,
  };
  try {
    _worker = new Worker("sentri:runs", processJob, workerOptions);
    // Surface job failures and connection-level errors in the server log.
    _worker.on("failed", (job, err) => {
      console.error(formatLogLine("error", null,
        `[worker] Job ${job?.id} failed: ${err.message}`));
    });
    _worker.on("error", (err) => {
      console.error(formatLogLine("error", null,
        `[worker] Worker error: ${err.message}`));
    });
    console.log(formatLogLine("info", null,
      `[worker] BullMQ worker started (concurrency: ${MAX_WORKERS})`));
  } catch (err) {
    console.warn(formatLogLine("warn", null,
      `[worker] Failed to start BullMQ worker: ${err.message}`));
  }
}
/**
* Gracefully close the worker.
* Called from the shutdown hook in `index.js`.
*
* @returns {Promise<void>}
*/
/**
 * Gracefully close the worker.
 * Called from the shutdown hook in `index.js`.
 *
 * @returns {Promise<void>}
 */
export async function stopWorker() {
  // Cancel every in-flight job, then empty the registry.
  for (const entry of workerAbortControllers.values()) {
    entry.controller.abort();
  }
  workerAbortControllers.clear();
  if (!_worker) return;
  try {
    await _worker.close();
    console.log(formatLogLine("info", null, "[worker] BullMQ worker stopped"));
  } catch (err) {
    console.warn(formatLogLine("warn", null,
      `[worker] Worker close error: ${err.message}`));
  }
  _worker = null;
}