Source: workers/runWorker.js

/**
 * @module workers/runWorker
 * @description BullMQ Worker for durable run execution (INF-003).
 *
 * Processes jobs from the `sentri:runs` queue.  Each job contains the
 * serialised run parameters (project, tests, run record, options).  The
 * worker calls `crawlAndGenerateTests` or `runTests` depending on the
 * job type, mirroring the logic previously inlined in route handlers.
 *
 * ### Concurrency
 * Controlled by the `WORKER_CONCURRENCY` env var, falling back to
 * `MAX_WORKERS` when it is unset (default 2).  Each concurrent slot
 * processes one run at a time — Playwright browser instances are not shared
 * across jobs.
 *
 * ### Lifecycle
 * - {@link startWorker} — Create and start the BullMQ Worker.
 * - {@link stopWorker}  — Gracefully close the worker (drain + disconnect).
 *
 * When Redis is not available, both functions are no-ops.
 */

import { createRequire } from "module";
import { formatLogLine, structuredLog } from "../utils/logFormatter.js";
import { logActivity } from "../utils/activityLogger.js";
import { classifyError } from "../utils/errorClassifier.js";
import { emitRunEvent } from "../routes/sse.js";
import * as runRepo from "../database/repositories/runRepo.js";
import * as runLogRepo from "../database/repositories/runLogRepo.js";
import * as projectRepo from "../database/repositories/projectRepo.js";
import * as environmentRepo from "../database/repositories/environmentRepo.js";
import * as testRepo from "../database/repositories/testRepo.js";
import { runTests } from "../testRunner.js";
import { crawlAndGenerateTests } from "../crawler.js";
import { fireNotifications } from "../utils/notifications.js";
import { captureProvider } from "../utils/activeRuns.js";
import { isNonExecutedSkip } from "../utils/skipReasons.js";
import { filterShardRetrySurvivors } from "../utils/shardRetryFilter.js"; // CAP-002 Phase 2 — shared survivor filter, mirrors runRepo.purgeShardResults.
import { envScopedProject } from "../utils/envScope.js"; // DIF-012 — shared helper, see module doc.
import { subscribeToRunAborts, publishRunAbort } from "../utils/runAbortChannel.js"; // CAP-002 Phase 2 — cross-process abort pub/sub.
import { runFeedbackLoop } from "../runner/feedbackIntegration.js"; // CAP-002 Phase 2 — last-shard finalizer runs the AI feedback loop exactly once.
import { finalizeRunIfNotAborted } from "../utils/abortHelper.js";
import { __evaluateQualityGatesForTest, __evaluateWebVitalsBudgetsForTest } from "../testRunner.js";
import { clusterFailures } from "../pipeline/failureClusterer.js"; // AUTO-010 — sharded-run parity with single-process tail in testRunner.js
import { trackTelemetry } from "../utils/telemetry.js";
import { safeFetch } from "../utils/ssrfGuard.js"; // CAP-002 Phase 2 — trigger-path callbackUrl POST from sharded finalizer.

// CommonJS-style `require` bound to this module's URL, used below to load
// `bullmq` synchronously and optionally from ESM code.
const _require = createRequire(import.meta.url);

// BullMQ `Worker` constructor. Stays `null` when `REDIS_URL` is not set or
// when requiring `bullmq` throws.
let Worker = null;
if (process.env.REDIS_URL) {
  try {
    ({ Worker } = _require("bullmq"));
  } catch {
    // Queue module already warned about missing bullmq
  }
}

/** @type {Object|null} BullMQ Worker instance. */
let _worker = null;

/**
 * Registry of active BullMQ-processed runs. The chat endpoint reads `.provider`
 * to skip runs that aren't using Ollama when checking for concurrent LLM
 * activity (prevents false-positive 503s when a cloud run is active and the
 * user switches to Ollama).
 *
 * ### Key scheme (CAP-002 Phase 2 — Prerequisite #4)
 *
 * The map is keyed by the BullMQ jobId, NOT the bare runId, so multiple
 * shards of the same run executing on the same replica don't collide on a
 * single map slot. The jobId scheme is:
 *
 *   - **Legacy / single-shard runs**: `jobId === runId`. Key is just the
 *     runId — bit-for-bit identical to the pre-Phase-2 behaviour.
 *   - **Shard-mode runs** (coordinator PR): `jobId === ${runId}:s${shardIndex}`.
 *     Each shard gets its own map entry; entries store `runId` + `shardIndex`
 *     so reverse lookups (e.g. the cross-process abort handler) can fan out
 *     to every shard of a given parent run.
 *
 * ### Entry shape
 *
 * `processJob` registers one entry per job at job start:
 *
 *   - `controller` — the job's `AbortController`; its `signal` is passed to
 *     the crawl / test-run pipeline.
 *   - `provider`   — the value returned by `captureProvider()` at job start.
 *   - `runId`      — the parent run id (used for reverse lookup by run).
 *   - `shardIndex` — the job's shard index, or `null` for non-shard jobs.
 *
 * `processJob` removes the entry again in its `finally` block.
 *
 * Use {@link workerAbortKey} when registering and deleting; use
 * {@link abortAllShardsForRun} to fan out an abort to every shard of a parent
 * run (the abort route's local-fast-path + the run-abort pub/sub subscriber
 * both go through this helper).
 *
 * @type {Map<string, {controller: AbortController, provider: string|null, runId: string, shardIndex: number|null}>}
 */
export const workerAbortControllers = new Map();

/**
 * Build the registry key for a (runId, shardIndex) pair.
 *
 * The key has the same shape as the BullMQ `jobId`: the bare `runId` for
 * legacy single-shard runs, and `${runId}:s${shardIndex}` for a shard job.
 *
 * @param {string}      runId
 * @param {number|null} shardIndex - 0-based shard index; `null` or
 *   `undefined` selects the legacy bare-runId key.
 * @returns {string} The map key for this job.
 */
export function workerAbortKey(runId, shardIndex) {
  // Loose `== null` on purpose: matches both `null` and `undefined`, while
  // shard index `0` still produces a shard-scoped key.
  return shardIndex == null ? runId : `${runId}:s${shardIndex}`;
}

/**
 * Invoke `fn` for every registry entry whose `runId` matches the given
 * parent run, regardless of shard. Covers both shard-keyed entries
 * (`${runId}:s${i}`) and the legacy bare-runId entry, because matching is
 * done on the stored `entry.runId` rather than on the key.
 *
 * Callers:
 *
 *   - The abort route's local-fast-path, so a user-driven abort cancels
 *     every shard's controller on this replica before publishing to the
 *     cross-replica pub/sub channel.
 *   - The run-abort pub/sub subscriber, so a cross-replica abort signal
 *     reaches every matching local shard.
 *
 * The matching entries are copied into an array before any callback runs,
 * so `fn` may delete from `workerAbortControllers` without disturbing the
 * loop. An exception thrown by `fn` is swallowed (fail-open per entry) and
 * that entry is not counted.
 *
 * @param {string}   runId
 * @param {Function} fn - Callback `(entry, key) => void`.
 * @returns {number} Number of entries for which `fn` returned without throwing.
 */
export function forEachShardEntry(runId, fn) {
  // Take the snapshot up front: the typical callback aborts the controller
  // and then deletes its own key from the live map.
  const matching = [...workerAbortControllers].filter(
    ([, entry]) => entry?.runId === runId,
  );

  let visited = 0;
  for (const [key, entry] of matching) {
    try {
      fn(entry, key);
      visited += 1;
    } catch {
      /* fail-open per-entry */
    }
  }
  return visited;
}

/**
 * Abort every registered shard controller for a parent run and drop each
 * one from the registry. Thin wrapper over {@link forEachShardEntry} that
 * packages the "cancel + delete" sequence shared by the user-driven abort
 * route and the cross-replica pub/sub subscriber.
 *
 * @param {string} runId
 * @returns {number} The count reported by `forEachShardEntry` for this run.
 */
export function abortAllShardsForRun(runId) {
  const cancelAndForget = (entry, key) => {
    try {
      entry.controller.abort();
    } catch {
      /* already aborted */
    }
    // Runs even when `abort()` threw, so the entry is always removed.
    workerAbortControllers.delete(key);
  };
  return forEachShardEntry(runId, cancelAndForget);
}

/**
 * Number of jobs this worker processes concurrently.
 *
 * Read from `WORKER_CONCURRENCY`, falling back to `MAX_WORKERS` when that is
 * unset or empty. Anything that does not parse to a positive integer —
 * missing, non-numeric, zero, or negative — yields the default of 2, so an
 * invalid setting such as `-1` is never used as the concurrency.
 *
 * @type {number}
 */
const WORKER_CONCURRENCY = (() => {
  const configured = Number.parseInt(
    process.env.WORKER_CONCURRENCY || process.env.MAX_WORKERS,
    10,
  );
  // `NaN > 0` is false, so unparsable input takes the default as well.
  return configured > 0 ? configured : 2;
})();

/**
 * Process a single run job from the queue.
 *
 * Dispatches on `job.data.type`:
 *   - `"crawl"`          — runs `crawlAndGenerateTests`, then saves the run.
 *   - `"test_run"`       — runs `runTests` over the snapshotted test ids (or
 *                          all approved tests when none were snapshotted),
 *                          then saves the run.
 *   - `"test_run_shard"` — runs `runTests` over this shard's slice, composes
 *                          its stats onto the parent row, and invokes
 *                          `finalizeShardedRun` when this shard's counter
 *                          increment reaches the shard count. This branch
 *                          returns before the tail `runRepo.save(run)`.
 *
 * Returns early without executing anything when the project is missing
 * (the run is marked failed), the run row is missing, or the run is already
 * in a terminal status.
 *
 * Error handling: an abort (`AbortError`, aborted signal, or
 * `run.status === "aborted"`) returns silently and writes nothing. Any
 * other error is rethrown after either persisting the failure (final
 * attempt) or resetting run state for the retry (non-final attempt). The
 * registry entry and the run-log cache are always released in `finally`.
 *
 * @param {Object} job — BullMQ Job instance. Reads `job.data`, `job.id`,
 *   `job.opts?.attempts` and `job.attemptsMade`.
 * @returns {Promise<void>}
 * @throws Rethrows the pipeline error for non-abort failures so BullMQ can
 *   apply its retry logic.
 */
async function processJob(job) {
  // CAP-002 Phase 2 — `shardIndex` is `null` for legacy single-process runs
  // and for every job that doesn't go through the BullMQ shard fan-out. The
  // retry-reset below uses it to scope the results-array wipe to *this*
  // shard so a sibling shard's already-completed results aren't erased on
  // retry. When `shardIndex` is null the old wipe-all behaviour is preserved
  // verbatim — zero regression for `shardCount === 1`.
  //
  // `shardCount` is the parent run's total shard count, used at the
  // finalization handoff: the boundary-crossing shard (whose
  // `incrementShardsCompleted` UPDATE returns 1 AND lands the counter at
  // `shardCount`) owns the post-run feedback loop + `done` event.
  const { runId, projectId, type, options, shardIndex = null, shardCount: jobShardCount = null } = job.data;

  structuredLog("worker.job_start", { runId, projectId, type, jobId: job.id });

  const project = projectRepo.getById(projectId);
  if (!project) {
    // No project means nothing can run: mark the run failed and tell SSE
    // listeners it is done. Returning normally lets the job finish without
    // a BullMQ retry.
    console.warn(formatLogLine("warn", null,
      `[worker] Project ${projectId} not found for job ${job.id} — marking run failed`));
    runRepo.update(runId, {
      status: "failed",
      error: "Project not found",
      finishedAt: new Date().toISOString(),
    });
    emitRunEvent(runId, "done", { status: "failed" });
    return;
  }

  // Reconstruct the run object from the database (it was created by the
  // route handler before the job was enqueued).
  const run = runRepo.getById(runId);
  if (!run) {
    // There is no row to update, so just log and finish the job.
    console.warn(formatLogLine("warn", null,
      `[worker] Run ${runId} not found for job ${job.id}`));
    return;
  }

  // CAP-002 Phase 2 — defense-in-depth terminal-status guard. If
  // `finalizeShardedRun` throws after `markRunCompletedFirstWriterWins`
  // succeeds (e.g. a DB error during `logActivity` / `fireNotifications` /
  // `safeFetch`), BullMQ would retry the job. Without this guard the retry
  // would re-execute the shard against an already-terminal parent run,
  // double-counting stats via `incrementRunStats`. The window is extremely
  // narrow (all post-persist code is best-effort try/catch) but the guard
  // is cheap and correct — same predicate the first-writer-wins primitives
  // use, just enforced at job-start instead of UPDATE-time.
  if (run.status === "completed" || run.status === "failed" || run.status === "aborted") {
    structuredLog("worker.job_skipped_terminal", {
      runId, projectId, type, jobId: job.id, status: run.status,
    });
    return;
  }

  // DIF-012: resolve the per-run environment override (if any) from the
  // persisted run record. The route handler validated the envId at enqueue
  // time, so we only need to look it up here. A row that was deleted
  // between enqueue and worker pickup yields `environment === undefined`,
  // which `envScopedProject` treats as "no override" — the run falls back
  // to project.url, matching the behaviour the caller would have got
  // before DIF-012.
  const environment = run.environmentId ? environmentRepo.getById(run.environmentId) : null;
  const scopedProject = envScopedProject(project, environment);

  // Create an AbortController so the abort endpoint can cancel this job.
  // Capture the active provider at job-start time so the chat busy guard
  // can accurately filter to runs using Ollama (prevents false-positive
  // "AI is busy" 503s when a cloud run is active and the user switches
  // to Ollama).
  const abortController = new AbortController();
  // CAP-002 Phase 2 (Prerequisite #4) — key the registry by jobId so
  // multiple shards of the same run on the same replica don't collide.
  // Legacy single-shard runs (`shardIndex == null`) key by bare runId,
  // preserving the prior shape bit-for-bit. The entry stores `runId` so
  // the parent-keyed fan-out helpers (`forEachShardEntry`,
  // `abortAllShardsForRun`) can reverse-look-up by parent run.
  const abortKey = workerAbortKey(runId, shardIndex);
  workerAbortControllers.set(abortKey, {
    controller: abortController,
    provider: captureProvider(),
    runId,
    shardIndex,
  });
  const signal = abortController.signal;

  try {
    if (type === "crawl") {
      await crawlAndGenerateTests(scopedProject, run, {
        dialsPrompt: options.dialsPrompt,
        testCount: options.testCount,
        explorerMode: options.explorerMode,
        explorerTuning: options.explorerTuning,
        signal,
      });

      // Skip the "complete" activity entry when the run was aborted.
      if (run.status !== "aborted") {
        logActivity({
          ...options.actorInfo,
          type: "crawl.complete",
          projectId: project.id,
          projectName: project.name,
          detail: `Crawl completed — ${run.pagesFound || 0} pages found`,
        });
      }

      // FEA-001: Fire failure notifications — best-effort (consistent with
      // the in-process fallback in runs.js which calls fireNotifications for
      // crawls via the onComplete callback).
      try { await fireNotifications(run, project); } catch { /* best-effort */ }
    } else if (type === "test_run") {
      // Use the snapshotted test IDs from enqueue time (options.testIds) so
      // retries execute the same set of tests as the original attempt.
      // Falls back to a fresh DB query for jobs enqueued before this fix.
      //
      // AUTO-001: the order of `options.testIds` is the risk-ranked + budget-
      // capped dispatch sequence assembled by the route layer (see
      // routes/runs.js:202). The previous `allTests.filter(idSet.has)`
      // implementation silently re-sorted tests into DB order — defeating
      // risk-based ordering for every BullMQ-processed run. Re-build the
      // array in the explicit testIds order; drop any ids that no longer
      // resolve (test deleted between enqueue and worker pickup).
      let tests;
      if (Array.isArray(options.testIds) && options.testIds.length > 0) {
        const allTests = testRepo.getByProjectId(project.id);
        const byId = new Map(allTests.map(t => [t.id, t]));
        tests = options.testIds.map(id => byId.get(id)).filter(Boolean);
      } else {
        tests = testRepo.getByProjectId(project.id)
          .filter(t => t.reviewStatus === "approved");
      }

      await runTests(scopedProject, tests, run, {
        parallelWorkers: options.parallelWorkers || 1,
        browser: options.browser || null,         // DIF-002: resolved to chromium inside runTests when null
        device: options.device || null,
        locale: options.locale || null,
        timezoneId: options.timezoneId || null,
        geolocation: options.geolocation || null,
        networkCondition: options.networkCondition || "fast", // AUTO-006
        signal,
      });

      // Skip the "complete" activity entry when the run was aborted.
      if (run.status !== "aborted") {
        logActivity({
          ...options.actorInfo,
          type: "test_run.complete",
          projectId: project.id,
          projectName: project.name,
          detail: `Test run completed — ${run.passed || 0} passed, ${run.failed || 0} failed`,
        });
      }

      // Fire failure notifications (FEA-001) — best-effort
      try { await fireNotifications(run, project); } catch { /* best-effort */ }
    } else if (type === "test_run_shard") {
      // CAP-002 Phase 2 — execute one shard of a sharded run. The coordinator
      // (routes/runs.js) pre-partitioned testIds at enqueue time; we never
      // re-derive the split. `runTests` returns this shard's stats delta;
      // we compose them onto the parent `runs` row atomically and hand off
      // to the boundary-crossing shard for finalization.
      const allTests = testRepo.getByProjectId(project.id);
      const byId = new Map(allTests.map(t => [t.id, t]));
      // Preserve the enqueue-time order; ids that no longer resolve are dropped.
      const sliceTests = (options.testIds || []).map((id) => byId.get(id)).filter(Boolean);

      const shardStats = await runTests(scopedProject, sliceTests, run, {
        parallelWorkers: options.parallelWorkers || 1,
        browser: options.browser || null,
        device: options.device || null,
        locale: options.locale || null,
        timezoneId: options.timezoneId || null,
        geolocation: options.geolocation || null,
        networkCondition: options.networkCondition || "fast",
        signal,
        shardIndex,
      });

      if (signal.aborted || run.status === "aborted") {
        // Abort owns terminal-state writes; don't touch stats or counter.
        workerAbortControllers.delete(abortKey);
        return;
      }

      // Compose this shard's deltas onto the parent run atomically. Sibling
      // shards may be running this same UPDATE concurrently — the SQL row
      // lock is the linearization point. The call is skipped entirely when
      // every delta is zero or missing.
      if (shardStats && (shardStats.passed || shardStats.failed || shardStats.totalDelta)) {
        runRepo.incrementRunStats(runId, {
          passedDelta: shardStats.passed || 0,
          failedDelta: shardStats.failed || 0,
          totalDelta: shardStats.totalDelta || 0,
        });
      }

      // CAP-002 Phase 2 — finalization handoff. `incrementShardsCompleted`
      // now returns `{ advanced, newValue }` from a single transaction
      // (UPDATE + SELECT under the same row lock) so the boundary check
      // is atomic — no interleaving window where a sibling shard on
      // another Postgres process could read the same post-increment value
      // and fire a duplicate finalizer. The boundary test below is
      // `advanced === 1 && newValue >= totalShards`; the `getById` that
      // follows is not part of that check — it only fetches the fresh row
      // that is handed to the finalizer. A job without `shardCount` is
      // treated as a single-shard run (`totalShards` defaults to 1).
      const { advanced, newValue } = runRepo.incrementShardsCompleted(runId);
      const totalShards = jobShardCount || 1;
      if (advanced === 1 && newValue >= totalShards) {
        const fresh = runRepo.getById(runId);
        if (fresh) {
          // Pass the full `options` payload so the finalizer can access the
          // CI/CD callback URL (and any future shard-shared knobs) without
          // a second job-data round-trip. Every shard carries the same
          // `callbackUrl`; the finalizer is the single firer thanks to the
          // exclusivity guarantee on `incrementShardsCompleted`.
          await finalizeShardedRun(scopedProject, fresh, options || {});
        }
      }

      workerAbortControllers.delete(abortKey);
      return;  // bypass the tail `runRepo.save(run)` — shard mode owns its writes
    }

    // Check abort signal one final time before persisting.  If the abort
    // endpoint fired between the pipeline completing and this point, the DB
    // already has status="aborted" + "skipped" entries.  Writing the worker's
    // stale in-memory run back would overwrite that state.
    if (signal.aborted || run.status === "aborted") return;

    // Persist final state
    runRepo.save(run);

    structuredLog("worker.job_complete", {
      runId, projectId, type, jobId: job.id,
      status: run.status, passed: run.passed, failed: run.failed,
    });
  } catch (err) {
    // Use the shard-aware key so we delete *this* shard's entry, not the
    // bare-runId entry that might belong to a sibling shard's controller.
    workerAbortControllers.delete(abortKey);

    if (err.name === "AbortError" || signal.aborted || run.status === "aborted") {
      // The abort endpoint (runs.js) is the single owner of abort state:
      // it writes status="aborted", adds "skipped" entries for unexecuted
      // tests, and persists everything to the DB.  The worker must NOT
      // write its in-memory `run` back — doing so would race with the
      // abort endpoint and could overwrite the "skipped" entries with the
      // worker's stale results snapshot.  Simply bail out.
      return;
    }

    // Decide whether BullMQ will retry after this failure. A job with no
    // `attempts` option is assumed to allow 2 attempts.
    // NOTE(review): assumes `job.attemptsMade` counts only *previous* failed
    // attempts while this one is still processing — confirm against the
    // BullMQ version in use.
    const maxAttempts = job.opts?.attempts || 2;
    const isFinalAttempt = job.attemptsMade >= maxAttempts - 1;

    const runType = type === "crawl" ? "crawl" : "run";
    const classified = classifyError(err, runType);
    console.error(formatLogLine("error", runId, `[worker] ${err.message}`));

    if (isFinalAttempt) {
      // Only persist terminal state on the final attempt to prevent
      // retries from re-executing an already-failed run (the DB row
      // would have status="failed" and finishedAt set, causing duplicate
      // activity logs, duplicate SSE events, and status overwrites).
      //
      // CAP-002 Phase 2 (Prerequisite #6): for shard-mode runs, use the
      // atomic first-writer-wins UPDATE so the first crashing shard owns
      // the failure reason. A full `runRepo.save(run)` here would
      // overwrite a sibling shard's earlier classified message with this
      // shard's (potentially less-informative) one — confusing the audit
      // trail and the SSE `done` event. The first-writer-wins predicate
      // (`WHERE id = ? AND status = 'running'`) makes the second-place
      // shard a clean no-op. After persisting the terminal state we
      // publish to `sentri:run-abort` so sibling shards in *other*
      // replicas receive the cancel signal and drain — same channel the
      // user-driven abort route uses (Prerequisite #5). For legacy
      // single-shard runs (`shardIndex == null`) the prior `save(run)`
      // path is preserved bit-for-bit so the failure-status semantics
      // don't shift for non-shard callers.
      const isShardMode = shardIndex != null;
      let firstWriter = true;
      if (isShardMode) {
        firstWriter = runRepo.markRunFailedFirstWriterWins(runId, {
          error: classified.message,
          errorCategory: classified.category,
        });
        // Mirror the terminal fields onto the in-memory `run`, matching
        // what the legacy branch below sets. These are *this* shard's
        // classified values, not a re-read of the DB row — when
        // `firstWriter` is falsy the persisted error belongs to the sibling
        // shard that won the UPDATE. Nothing after this point saves `run`
        // in shard mode, so the first writer's persisted values are not
        // overwritten.
        run.status = "failed";
        run.error = classified.message;
        run.errorCategory = classified.category;
        run.finishedAt = new Date().toISOString();
      } else {
        run.status = "failed";
        run.error = classified.message;
        run.errorCategory = classified.category;
        run.finishedAt = new Date().toISOString();
        runRepo.save(run);
      }

      // Activity log + SSE `done` event fire only when this caller was
      // the first writer (shard mode) or the sole writer (legacy mode).
      // Second-place shards skip emitting so we don't surface duplicate
      // notifications for the same logical failure.
      if (firstWriter) {
        logActivity({
          ...options.actorInfo,
          type: `${runType === "crawl" ? "crawl" : "test_run"}.fail`,
          projectId: project.id,
          projectName: project.name,
          detail: `${runType === "crawl" ? "Crawl" : "Test run"} failed: ${classified.message}`,
          status: "failed",
        });
        emitRunEvent(runId, "done", { status: "failed" });
      }

      // Whether or not this shard was the first writer, publish the abort
      // signal so sibling shards in other replicas stop wasting compute.
      // Same channel as the user-driven abort. `publishRunAbort` is
      // best-effort and a no-op when Redis is unavailable.
      if (isShardMode) {
        publishRunAbort(runId).catch(() => { /* best-effort */ });
      }
    } else {
      // Non-final attempt: reset ALL accumulated run state so the retry
      // starts completely clean.  Without this, the retry would reload the
      // partially-populated run from the DB (via the `runRepo.getById` call
      // at the top of this function) and runTests/crawlAndGenerateTests
      // would append MORE results to the already-populated arrays, causing
      // duplicate entries in run.results, inflated pass/fail counts, and
      // incorrect totals.
      run.status = "running";
      run.error = null;
      run.errorCategory = null;
      run.finishedAt = null;
      // AUTO-001 / AUTO-004: preserve the route-layer's pre-seeded skip
      // markers across retries — `over_budget` AND `skipped_no_impact` both
      // represent a final dispatch decision, not a transient execution
      // state. Dropping either would silently re-classify those tests on
      // the retry (or worse, make them vanish from `run.results` entirely,
      // breaking the pass-rate denominator and the "every approved test
      // has a resolution" invariant). Routed through `isNonExecutedSkip`
      // (`backend/src/utils/skipReasons.js`) so adding a future skip kind
      // doesn't require a third site edit.
      //
      // CAP-002 Phase 2 (Prerequisite #3): when this job is one shard of
      // a multi-shard run, only wipe results belonging to *this* shard.
      // Sibling shards that already finished have their results stamped
      // with `_shardIndex !== thisShardIndex` (see `testRunner.js`
      // `processResult` — every persisted result row carries its shard
      // index) and must survive the retry — otherwise a single shard's
      // BullMQ retry would erase three other shards' worth of completed
      // work. For legacy single-process runs (`shardIndex == null`) the
      // filter degrades to the prior wipe-all behaviour: keep only the
      // non-executed skips, drop everything else — bit-for-bit identical
      // to the pre-CAP-002 code path. `passed`/`failed` are handled per
      // branch: the legacy branch resets both to 0, while the shard branch
      // leaves the DB columns untouched (see the note after
      // `purgeShardResults` below for why).
      //
      // Critical lost-write fix: in shard mode we use the atomic
      // `purgeShardResults` primitive instead of an in-memory filter +
      // `runRepo.save(run)`. The previous approach read `run.results`
      // from the job-start DB snapshot (the `runRepo.getById` call at the
      // top of this function), filtered in JS,
      // then `save(run)` overwrote the live column — silently
      // truncating any sibling-shard rows that landed via
      // `appendRunResults` between snapshot and save. The primitive
      // performs a transactional read-modify-write on the live column
      // under a `FOR UPDATE` lock (Postgres) / BEGIN IMMEDIATE serial-
      // isation (SQLite), so concurrent sibling appends are honoured.
      const isShardMode = shardIndex != null;
      if (isShardMode) {
        // CAP-002 Phase 2 — Gate the entire retry-prep sequence on the run
        // still being in `running` state. A sibling shard may have already
        // transitioned the row to a terminal state (`failed` via
        // `markRunFailedFirstWriterWins`, `aborted` via the abort route,
        // or — extremely unlikely but defended against — `completed` via
        // a late finalizer). If we proceeded blindly, the chain below
        // would:
        //   - `purgeShardResults` would wipe this shard's already-recorded
        //     results from a terminal run (harmless data loss, but pointless).
        //   - `runLogRepo.deleteByRunId` would erase the audit trail for
        //     a user-initiated abort (the abort route may have logged the
        //     abort reason before the worker reached this catch block).
        //   - The old `runRepo.update(... status: "running", error: null,
        //     finishedAt: null)` would un-terminalise the row, silently
        //     overwriting the sibling shard's classified failure reason
        //     OR the user's abort with a fresh `running` status — the
        //     run would then "resurrect" and execute against a parent
        //     record that the rest of the system already considers
        //     terminal (notifications fired, SSE `done` emitted, GitHub
        //     Check concluded).
        // The first-writer-wins primitive uses `WHERE status = 'running'`
        // so it's atomic — no TOCTOU race between this check and the write.
        // When it returns a falsy value we rethrow so BullMQ records the
        // failed attempt. If BullMQ then retries the job, the next attempt
        // re-reads the run via `runRepo.getById` at job start, hits the
        // terminal-status guard near the top of this function, logs
        // `worker.job_skipped_terminal`, and returns without executing.
        const gated = runRepo.resetRunForRetryIfStillRunning(runId);
        if (!gated) {
          structuredLog("run.retry_skipped_terminal", {
            runId, shardIndex,
            reason: "sibling shard transitioned run to terminal state",
          });
          throw err; // Surface to BullMQ — it will retry, next attempt no-ops at job-start.
        }
        runRepo.purgeShardResults(runId, shardIndex, isNonExecutedSkip);
        // DO NOT overwrite `passed` / `failed` columns here. Those columns
        // are composed atomically by each shard's `incrementRunStats` call
        // after `runTests` returns. If a sibling shard is still executing,
        // its partial results are already in the `results` JSON column (via
        // `appendRunResults`) but its `incrementRunStats` hasn't fired yet.
        // Re-deriving passed/failed from the JSON and writing them as
        // absolute values would inflate the columns — then the sibling's
        // `incrementRunStats(passedDelta)` would add its delta on top,
        // double-counting. Leaving the columns untouched means the retrying
        // shard's own `incrementRunStats` call (after re-execution) adds
        // only its fresh delta, which is correct by construction.
        //
        // run.results in-memory is now stale (the live DB column is
        // canonical). Don't write it back via save().
        // NOTE(review): the two assignments below (and the status/error
        // resets above) touch only the in-memory `run`; nothing in this
        // branch saves it. Presumably `resetRunForRetryIfStillRunning`
        // already persisted the run-level reset — confirm against runRepo.
        run.pagesFound = 0;
        run.logs = [];
        // CAP-002 Phase 2 — DO NOT call runLogRepo.deleteByRunId in shard
        // mode. The `run_logs` table is parent-keyed (`WHERE runId = ?`
        // — see `backend/src/database/repositories/runLogRepo.js`) with
        // no `shardIndex` column, so a parent-scoped DELETE would wipe
        // every sibling shard's logs alongside this shard's pre-failure
        // entries — including sibling shards that have already completed
        // successfully OR are still actively writing log lines via
        // `appendLog`. Trading a duplicated log-prefix from the retried
        // shard (minor UI oddity) for preserving sibling-shard audit
        // trails (real data integrity) is the correct call. Legacy
        // single-shard runs (`else` branch below) keep the delete because
        // there are no siblings to protect — bit-for-bit unchanged.
      } else {
        // Legacy single-shard / non-shard path — bit-for-bit unchanged.
        // `shardIndex == null` → filter degrades to "keep only non-executed
        // skips", matching the pre-CAP-002 wipe-all behaviour. Shared with
        // `runRepo.purgeShardResults` so the survivor contract is in one place.
        run.results = filterShardRetrySurvivors(run.results, null, isNonExecutedSkip);
        run.passed = 0;
        run.failed = 0;
        run.pagesFound = 0;
        run.logs = [];
        // Delete run_logs table rows from the failed attempt so the retry
        // doesn't start with stale log entries.  runRepo.getById() hydrates
        // run.logs from run_logs, so without this the retry would see the
        // old logs concatenated with new ones.
        runLogRepo.deleteByRunId(runId);
        runRepo.save(run);
      }
    }

    throw err; // Let BullMQ handle retry logic
  } finally {
    // Runs on every exit path (normal return, early return, rethrow): drop
    // this job's registry entry and evict the run's log cache. Deleting an
    // already-removed key is harmless.
    workerAbortControllers.delete(abortKey);
    runLogRepo.evictCache(runId);
  }
}

/**
 * CAP-002 Phase 2 — Finalize a sharded run after every shard's results
 * have landed. Called exactly once per run: the shard whose
 * `incrementShardsCompleted` UPDATE crossed the boundary at `shardCount`
 * is the finalizer; all other shards return without invoking this.
 *
 * The "exactly once" guarantee comes from the SQL predicate on
 * `incrementShardsCompleted` (Prerequisite #1) — it returns `1` only for
 * the single UPDATE that landed the counter at the cap. No JS-side mutex
 * needed.
 *
 * Owns (mirrors the single-shard tail of `runTests`):
 *   - gateResult / webVitalsResult evaluation against the full results set
 *     (every shard has flushed via `appendRunResults` — DB is the source
 *     of truth, the local `run` here is the fresh row this worker re-read)
 *   - retryCount / failedAfterRetry aggregation
 *   - AUTO-010 root-cause clustering (`clusterFailures`) over the full results set
 *   - one `runFeedbackLoop` invocation (single AI-call pass per run)
 *   - `finalizeRunIfNotAborted` → status = "completed"
 *   - duration computation (now - startedAt)
 *   - one `done` SSE event
 *   - one `test_run.complete` activity log
 *   - one `run.complete` telemetry event
 *   - best-effort notifications + GitHub-check completion + `callbackUrl` POST
 *
 * Returns early (no side effects beyond what has already been written) when:
 *   - the passed-in `run` is already `"aborted"` or `"failed"`;
 *   - the re-read live status is `"aborted"` or `"failed"` (aggregates have
 *     been persisted by then, the feedback loop has not run);
 *   - `markRunCompletedFirstWriterWins` reports the row was already terminal.
 *
 * Failures of the feedback loop and of the best-effort side effects are
 * logged through `structuredLog` and never re-thrown.
 *
 * @param {Object} project       - DIF-012 env-scoped project (carries `qualityGates`, `webVitalsBudgets`).
 * @param {Object} run           - Fresh `runs` row, hydrated via `runRepo.getById`.
 * @param {Object} jobOptions    - The shard job's `options` payload. Carries:
 *   - `actorInfo`   — `{ userId, userName }` from the triggering request.
 *   - `callbackUrl` — CI/CD POST target (trigger path only). SSRF-validated
 *                     at the route layer; finalizer fires exactly once,
 *                     gated by `incrementShardsCompleted` exclusivity.
 * @returns {Promise<void>}
 */
async function finalizeShardedRun(project, run, jobOptions = {}) {
  const actorInfo = jobOptions.actorInfo || {};
  const callbackUrl = jobOptions.callbackUrl || null;
  // Normalise any thrown value into a loggable string. Reading `.message`
  // directly would itself throw on a nullish rejection and escape the
  // best-effort handlers below.
  const describeError = (err) => err?.message ?? String(err);
  // Defensive: if the run already reached a terminal state between this
  // shard's increment and this call, skip finalization entirely. Two cases:
  //   - `"aborted"` — the abort route owns terminal-state writes.
  //   - `"failed"` — a sibling shard crashed on its final attempt and
  //     `markRunFailedFirstWriterWins` already transitioned the row.
  //     Without this guard the finalizer would wastefully run the AI
  //     feedback loop (regenerating tests for a browser-crash failure,
  //     not a real test-logic issue), persist partial gate results onto
  //     a terminal row, and modify test `reviewStatus` via the feedback
  //     loop — all side effects on a run the user already sees as failed.
  //     `markRunCompletedFirstWriterWins` at the end would correctly
  //     reject the status flip (DB is `"failed"`, not `"running"`), but
  //     the intermediate AI calls + DB writes are real waste.
  if (run.status === "aborted" || run.status === "failed") return;

  const runId = run.id;
  const results = Array.isArray(run.results) ? run.results : [];
  run.retryCount = results.reduce((sum, r) => sum + (r.retryCount || 0), 0);
  run.failedAfterRetry = results.filter((r) => r.failedAfterRetry).length;

  // Quality gates + web vitals — same evaluators as the single-shard path.
  // The exported `__evaluateQualityGatesForTest` / `__evaluateWebVitalsBudgetsForTest`
  // aliases are the public test-friendly names; we deliberately reuse them
  // here rather than duplicate the logic (it's a moderately large pure
  // function with subtle data-driven-skip handling — see testRunner.js).
  run.gateResult = __evaluateQualityGatesForTest(project.qualityGates, run);
  run.webVitalsResult = __evaluateWebVitalsBudgetsForTest(project.webVitalsBudgets, run);

  // AUTO-010 — Deterministic root-cause clustering on the full DB results
  // set. Mirrors the single-process tail in `testRunner.js`; the
  // sharded-finalizer path bypasses that call site, so without this every
  // multi-shard run would persist `rootCauses: null` and the RunDetail
  // Root Cause Summary panel would never render for sharded runs.
  run.rootCauses = clusterFailures({ results });

  // Persist aggregates BEFORE the feedback loop so a feedback-loop crash
  // doesn't lose the gate verdict (CI consumers polling `/trigger/runs/:id`
  // would otherwise see a `null` gateResult on a failed-feedback run).
  runRepo.update(runId, {
    retryCount: run.retryCount,
    failedAfterRetry: run.failedAfterRetry,
    gateResult: run.gateResult,
    webVitalsResult: run.webVitalsResult,
    rootCauses: run.rootCauses, // AUTO-010
  });

  // Re-read live status from the DB before the (expensive) feedback loop.
  // The `run` snapshot above came from `runRepo.getById` at the boundary-
  // crossing call site; the user could have clicked Abort between that
  // read and now. The DB-level `markRunCompletedFirstWriterWins` below
  // catches the race correctly (returns false → skips side effects), but
  // running the AI feedback loop against an already-aborted run is wasted
  // ACUs. A cheap re-read closes most of the race window without
  // changing the correctness guarantee. (The window between *this* read
  // and the markRunCompletedFirstWriterWins UPDATE further down is still
  // open, but it's far shorter than the feedback-loop duration.)
  const liveStatus = runRepo.getById(runId)?.status;
  if (liveStatus === "aborted" || liveStatus === "failed") {
    structuredLog("run.finalize_skipped_terminal", {
      runId,
      reason: `live status ${liveStatus} — skipping feedback loop and finalization`,
    });
    return;
  }

  // Feedback loop — runs exactly once per run. Load the FULL approved test
  // set (every shard's slice combined) so the AI regeneration sees the
  // same tests the original dispatch did. Aborts mid-feedback are swallowed
  // by the loop itself; we additionally swallow any unexpected throw so a
  // feedback-loop crash can never block run finalization (CI must still
  // see a terminal status — `completed` or `failed`).
  const tests = testRepo.getByProjectId(project.id).filter((t) => t.reviewStatus === "approved");
  try {
    await runFeedbackLoop(run, tests, null);
  } catch (err) {
    structuredLog("run.feedback_loop_failed", { runId, error: describeError(err) });
  }

  // Status transition + duration + telemetry. `startedAt` is the parent
  // run's column (set when the route created the row); duration is the
  // wall-clock from run start to finalizer arrival — captures the actual
  // parallel speedup vs. a single-shard run on the same suite.
  finalizeRunIfNotAborted(run, () => {
    run.finishedAt = new Date().toISOString();
    run.duration = run.startedAt
      ? Date.now() - new Date(run.startedAt).getTime()
      : null;
    structuredLog("run.complete", {
      runId, projectId: project.id,
      passed: run.passed, failed: run.failed, total: run.total,
      durationMs: run.duration, shardCount: run.shardCount,
    });
    trackTelemetry("run.complete", {
      projectId: project.id,
      browser: run.browser,
      total: run.total, passed: run.passed, failed: run.failed,
      retryCount: run.retryCount || 0,
      failedAfterRetry: run.failedAfterRetry || 0,
      shardCount: run.shardCount || 1,
      parallelWorkers: run.parallelWorkers || 1,
      durationMs: run.duration,
      url: project.url,
    });
  });

  // Persist terminal state atomically with first-writer-wins semantics.
  // Catches the late-abort race where the user clicked Abort between
  // `getById` and here — the predicate `WHERE status = 'running'` makes
  // the UPDATE a no-op when the row is already terminal, so the user's
  // abort isn't silently overwritten with `completed`. When the primitive
  // returns false, treat the run as aborted (the abort route owns the
  // SSE `done` event and activity log for that case) and skip the
  // post-finalize side effects below.
  const persistedTerminal = runRepo.markRunCompletedFirstWriterWins(runId, {
    finishedAt: run.finishedAt,
    duration: run.duration,
    qualityAnalytics: run.qualityAnalytics || null,
  });
  if (!persistedTerminal) {
    structuredLog("run.finalize_skipped_terminal", {
      runId,
      reason: "row already terminal (abort beat finalizer)",
    });
    return;
  }

  // Activity log + `done` SSE — fire exactly once per logical run.
  logActivity({
    ...actorInfo,
    type: "test_run.complete",
    projectId: project.id,
    projectName: project.name,
    detail: `Test run completed — ${run.passed || 0} passed, ${run.failed || 0} failed (${run.shardCount}× shards)`,
  });
  emitRunEvent(runId, "done", {
    status: run.status,
    passed: run.passed, failed: run.failed, total: run.total,
  });

  // Best-effort side effects — never block finalization on these. Failures
  // are logged (not re-thrown) so a missed notification can be diagnosed.
  try {
    await fireNotifications(run, project);
  } catch (err) {
    structuredLog("run.notifications_failed", { runId, error: describeError(err) });
  }

  // CAP-002 Phase 2 — GitHub Check Run completion. The trigger-path
  // `runWithAbort.onComplete` callback handles this for single-shard runs
  // (`backend/src/routes/trigger.js:685`); the sharded BullMQ path
  // bypasses `runWithAbort` entirely, so without this call a sharded run
  // triggered from a GitHub PR would leave the Check Run stuck
  // "in_progress" forever. Dynamic import avoids a circular dependency at
  // module load (trigger.js → testRunner.js → workers/runWorker.js).
  // Best-effort — a GitHub outage must never block run finalization, but
  // the failure is logged because the Check Run stays "in_progress" when
  // this call does not succeed.
  if (run.githubCheck?.checkRunId) {
    try {
      const { concludeGithubCheck } = await import("../routes/trigger.js");
      await concludeGithubCheck(run, project);
    } catch (err) {
      structuredLog("run.github_check_failed", { runId, error: describeError(err) });
    }
  }

  // CAP-002 Phase 2 — Fire the CI/CD `callbackUrl` POST. Same payload
  // shape as the single-shard trigger path (`routes/trigger.js:780-789`)
  // so CI consumers can use one handler for both sharded and non-sharded
  // runs. SSRF-safe: `safeFetch` re-resolves DNS to mitigate rebinding
  // and blocks redirects to prevent open-redirect bypasses. The URL was
  // validated at the route layer before enqueue. Fires exactly once per
  // run because only the boundary-crossing shard reaches this function
  // (gated by `incrementShardsCompleted`'s exclusivity predicate).
  // Best-effort — a callback failure must never re-throw out of the
  // finalizer, mirroring the `safeFetchCallback(...).catch(() => {})`
  // contract on the single-shard path. The failure is logged without the
  // URL itself, which may carry caller-supplied tokens.
  if (callbackUrl && typeof callbackUrl === "string") {
    try {
      const payload = JSON.stringify({
        runId,
        status: run.status,
        passed: run.passed,
        failed: run.failed,
        total: run.total,
        error: run.error || null,
        gateResult: run.gateResult || null,
        webVitalsResult: run.webVitalsResult || null,
      });
      await safeFetch(callbackUrl, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: payload,
        signal: AbortSignal.timeout(10_000),
      });
    } catch (err) {
      structuredLog("run.callback_failed", { runId, error: describeError(err) });
    }
  }
}

// ─── Public API ───────────────────────────────────────────────────────────────

/**
 * Create and start the BullMQ Worker for the `sentri:runs` queue.
 *
 * No-op when BullMQ's `Worker` class is unavailable or `REDIS_URL` is unset.
 * Also a no-op when a worker is already running: a second call would
 * otherwise overwrite `_worker`, leaving the first Worker instance running
 * but unreachable from {@link stopWorker}, and would register a second
 * run-abort subscription.
 *
 * Any error thrown while constructing the worker or wiring its listeners is
 * caught and logged as a warning; this function never throws.
 *
 * @returns {void}
 */
export function startWorker() {
  if (!Worker || !process.env.REDIS_URL) return;
  // Idempotence guard — keep exactly one Worker per process so `stopWorker`
  // can always close the instance that is actually consuming jobs.
  if (_worker) return;

  try {
    _worker = new Worker("sentri:runs", processJob, {
      connection: {
        url: process.env.REDIS_URL,
        maxRetriesPerRequest: null,
      },
      concurrency: WORKER_CONCURRENCY,
    });

    _worker.on("failed", (job, err) => {
      console.error(formatLogLine("error", null,
        `[worker] Job ${job?.id} failed: ${err.message}`));
    });

    _worker.on("error", (err) => {
      console.error(formatLogLine("error", null,
        `[worker] Worker error: ${err.message}`));
    });

    console.log(formatLogLine("info", null,
      `[worker] BullMQ worker started (concurrency: ${WORKER_CONCURRENCY})`));

    // CAP-002 Phase 2 (Prerequisite #5) — subscribe to the cross-process
    // run-abort channel so an abort triggered on another replica reaches
    // this worker's in-flight controllers. Same-replica aborts still go
    // through `workerAbortControllers` directly via the abort route's
    // local-fast-path (origin stamp suppresses self-echo on this side).
    // No-op when Redis is unavailable — `subscribeToRunAborts` returns
    // false and the worker continues with local-only abort behaviour.
    subscribeToRunAborts({
      // CAP-002 Phase 2 (Prerequisite #4) — fan out the cross-replica
      // abort signal to *every* shard controller for the parent run.
      // `abortAllShardsForRun` handles both the legacy single-runId-keyed
      // entry and the new `${runId}:s${i}` shard-keyed entries.
      onAbort: (runId) => { abortAllShardsForRun(runId); },
    });
  } catch (err) {
    console.warn(formatLogLine("warn", null,
      `[worker] Failed to start BullMQ worker: ${err.message}`));
  }
}

/**
 * Shut the worker down gracefully.
 * Invoked from the shutdown hook in `index.js`.
 *
 * Steps, in order:
 *   1. Abort every registered in-flight job controller and drop its entry.
 *   2. Close the BullMQ worker if one exists; a close error is logged as a
 *      warning rather than propagated.
 *   3. Clear the module-level worker reference.
 *
 * @returns {Promise<void>}
 */
export async function stopWorker() {
  // Signal every in-flight job. The registry is walked entry by entry so
  // both shard-keyed (`${runId}:s${i}`) and legacy bare-`runId` entries
  // are aborted and removed.
  workerAbortControllers.forEach((registration, abortKey) => {
    registration.controller.abort();
    workerAbortControllers.delete(abortKey);
  });

  // Nothing to close when the worker was never started.
  if (!_worker) return;

  try {
    await _worker.close();
    console.log(formatLogLine("info", null, "[worker] BullMQ worker stopped"));
  } catch (closeErr) {
    console.warn(formatLogLine("warn", null,
      `[worker] Worker close error: ${closeErr.message}`));
  }
  // Drop the reference whether or not close() succeeded.
  _worker = null;
}