Module: workers/runWorker

BullMQ Worker for durable run execution (INF-003).

Processes jobs from the sentri:runs queue. Each job contains the serialised run parameters (project, tests, run record, options). The worker calls crawlAndGenerateTests or runTests depending on the job type, mirroring the logic previously inlined in route handlers.

Concurrency

Controlled by MAX_WORKERS env var (default 2). Each concurrent slot processes one run at a time — Playwright browser instances are not shared across jobs.
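Below is a minimal sketch of how MAX_WORKERS could be turned into BullMQ's concurrency worker option. The helper name resolveConcurrency is hypothetical. Falling back to 2 for non-numeric or non-positive values is also an assumption. Only the default of 2 comes from this page.

```javascript
// Hypothetical helper: derive the worker concurrency from MAX_WORKERS.
// Assumption: anything that is not a positive integer falls back to the default of 2.
function resolveConcurrency(env = process.env) {
  const parsed = Number.parseInt(env.MAX_WORKERS ?? "", 10);
  return Number.isInteger(parsed) && parsed >= 1 ? parsed : 2;
}

// The result would be passed as the `concurrency` option when constructing the Worker.
console.log(resolveConcurrency({}));                   // 2
console.log(resolveConcurrency({ MAX_WORKERS: "4" })); // 4
console.log(resolveConcurrency({ MAX_WORKERS: "0" })); // 2
```

Under this reading, MAX_WORKERS=4 would let one replica run up to four jobs at once. Each job gets its own Playwright browser.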

Lifecycle

  • startWorker — Create and start the BullMQ Worker.
  • stopWorker — Gracefully close the worker (drain + disconnect).

When Redis is not available, both functions are no-ops.
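The no-op behaviour can be sketched as a small guard around the worker handle. Everything injected here is hypothetical: isRedisAvailable, WorkerCtor, queueName, processor and options. FakeWorker only stands in for BullMQ's Worker so the sketch runs without Redis. The one real-library detail relied on is that BullMQ's Worker.close() returns a Promise that settles after active jobs have finished.

```javascript
// Sketch of the no-op guard around a BullMQ-style worker.
// Assumption: all dependencies are injected; in production WorkerCtor would be BullMQ's Worker.
function createWorkerLifecycle({ isRedisAvailable, WorkerCtor, queueName, processor, options }) {
  let worker = null; // plays the role of the module-level _worker (Object|null)

  function startWorker() {
    if (!WorkerCtor || !isRedisAvailable()) return; // no Redis or BullMQ: no-op
    if (worker) return;                              // already running
    worker = new WorkerCtor(queueName, processor, options);
  }

  async function stopWorker() {
    if (!worker) return; // never started (e.g. no Redis): no-op
    const current = worker;
    worker = null;
    await current.close(); // BullMQ's close() waits for active jobs before disconnecting
  }

  return { startWorker, stopWorker, getWorker: () => worker };
}

// Stand-in worker so the sketch runs without Redis.
class FakeWorker {
  constructor(queueName, processor, options) {
    Object.assign(this, { queueName, processor, options, closed: false });
  }
  async close() {
    this.closed = true;
  }
}

const lifecycle = createWorkerLifecycle({
  isRedisAvailable: () => true,
  WorkerCtor: FakeWorker,
  queueName: "runs", // demo value only
  processor: async () => {},
  options: { concurrency: 2 },
});
lifecycle.startWorker();
lifecycle.stopWorker().then(() => console.log(lifecycle.getWorker())); // null
```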

Members

(static, constant) workerAbortControllers :Map.<string, {controller: AbortController, provider: (string|null), runId: string, shardIndex: (number|null)}>

Registry of active BullMQ-processed runs. The chat endpoint reads .provider to skip runs that aren't using Ollama when checking for concurrent LLM activity (prevents false-positive 503s when a cloud run is active and the user switches to Ollama).
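Below is a sketch of the provider check the chat endpoint might perform over the registry. The function name hasActiveOllamaRun is an assumption, as are the provider strings "ollama" and "openai". The real comparison may differ.

```javascript
// Sketch: is any run registered on this replica using Ollama?
// Assumption: provider ids are lowercase strings such as "ollama".
function hasActiveOllamaRun(registry) {
  for (const { provider } of registry.values()) {
    if (provider === "ollama") return true;
  }
  return false;
}

const registry = new Map([
  ["run-1", { controller: new AbortController(), provider: "openai", runId: "run-1", shardIndex: null }],
]);
console.log(hasActiveOllamaRun(registry)); // false: a cloud run alone does not block Ollama chat
```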

Key scheme (CAP-002 Phase 2 — Prerequisite #4)

The map is keyed by the BullMQ jobId, NOT the bare runId, so multiple shards of the same run executing on the same replica don't collide on a single map slot. The jobId scheme is:

  • Legacy / single-shard runs: jobId === runId. Key is just the runId — bit-for-bit identical to the pre-Phase-2 behaviour.
  • Shard-mode runs (coordinator PR): jobId === ${runId}:s${shardIndex}. Each shard gets its own map entry; entries store runId + shardIndex so reverse lookups (e.g. the cross-process abort handler) can fan out to every shard of a given parent run.

Use workerAbortKey when registering and deleting; use abortAllShardsForRun to fan out an abort to every shard of a parent run (the abort route's local-fast-path + the run-abort pub/sub subscriber both go through this helper).

Type:
  • Map.<string, {controller: AbortController, provider: (string|null), runId: string, shardIndex: (number|null)}>
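The key scheme can be reconstructed from the two jobId shapes described above. This workerAbortKey is an illustrative reconstruction, not the module's source. registerRun and unregisterRun are hypothetical helpers. They show registration and deletion going through the same key function.

```javascript
// Reconstruction of the documented key scheme (not the module's actual source).
function workerAbortKey(runId, shardIndex) {
  // Legacy single-shard runs key on the bare runId; shards get `${runId}:s${i}`.
  return shardIndex == null ? runId : `${runId}:s${shardIndex}`;
}

const workerAbortControllers = new Map();

// Hypothetical helpers: register and delete through the same key function.
function registerRun(runId, shardIndex, provider) {
  const key = workerAbortKey(runId, shardIndex);
  const controller = new AbortController();
  workerAbortControllers.set(key, { controller, provider, runId, shardIndex: shardIndex ?? null });
  return controller;
}

function unregisterRun(runId, shardIndex) {
  return workerAbortControllers.delete(workerAbortKey(runId, shardIndex));
}

registerRun("run-7", 0, null);
registerRun("run-7", 1, null);    // same run, same replica: no collision
registerRun("run-8", null, null); // legacy run keeps the bare runId
console.log([...workerAbortControllers.keys()]); // [ 'run-7:s0', 'run-7:s1', 'run-8' ]
```

Both shards of run-7 get distinct keys. Deleting one shard's entry therefore never clobbers the other's controller.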

(inner) _worker :Object|null

BullMQ Worker instance, or null when no worker is running.

Type:
  • Object | null

Methods

(static) abortAllShardsForRun(runId) → {number}

Abort every shard controller for a parent run. Convenience wrapper around forEachShardEntry that captures the canonical "cancel + delete" sequence used by both the user-driven abort route and the cross-replica pub/sub subscriber.

Parameters:
  • runId (string)

Returns:
  • number: Number of shard controllers aborted.

(static) forEachShardEntry(runId, fn) → {number}

Iterate every registry entry belonging to a parent run, regardless of shard. Used by:

  • The abort route's local-fast-path so a user-driven abort cancels every shard's controller on this replica before publishing to the cross-replica pub/sub channel.
  • The run-abort pub/sub subscriber so a cross-replica abort signal reaches every matching local shard.

Visits both shard-keyed entries (${runId}:s${i}) and the legacy bare-runId entry. Safe to call concurrently with processJob: iteration uses a snapshot, so handlers can mutate the map (delete) without affecting the loop.

Parameters:
  • runId (string)
  • fn (function): Callback (entry, key) => void.

Returns:
  • number: Number of entries visited.
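The iterate-then-abort pattern shared by forEachShardEntry and abortAllShardsForRun can be sketched as follows. This sketch makes two assumptions. First, unlike the documented signatures, the registry is passed in explicitly. Second, entries are matched on their stored runId rather than by parsing keys. The registry entries carry runId for exactly this kind of reverse lookup.

```javascript
// Sketch assuming entries shaped like { controller, provider, runId, shardIndex }.
function forEachShardEntry(registry, runId, fn) {
  let visited = 0;
  // Snapshot before iterating so fn can delete from the live map safely.
  for (const [key, entry] of [...registry.entries()]) {
    // Matching on the stored runId covers `${runId}:s${i}` keys and the bare legacy key,
    // without prefix collisions such as "run-1" vs "run-10:s0".
    if (entry.runId !== runId) continue;
    fn(entry, key);
    visited += 1;
  }
  return visited;
}

// "Cancel + delete" for every shard of a parent run.
function abortAllShardsForRun(registry, runId) {
  return forEachShardEntry(registry, runId, (entry, key) => {
    entry.controller.abort();
    registry.delete(key);
  });
}

const registry = new Map();
const track = (key, runId, shardIndex) =>
  registry.set(key, { controller: new AbortController(), provider: null, runId, shardIndex });
track("run-1:s0", "run-1", 0);
track("run-1:s1", "run-1", 1);
track("run-10:s0", "run-10", 0);

const s0 = registry.get("run-1:s0").controller.signal;
console.log(abortAllShardsForRun(registry, "run-1")); // 2
console.log(s0.aborted, [...registry.keys()]);        // true [ 'run-10:s0' ]
```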

(static) startWorker()

Create and start the BullMQ Worker. No-op if Redis or BullMQ is not available.

(static) stopWorker() → {Promise.<void>}

Gracefully close the worker. Called from the shutdown hook in index.js.

Returns:
  • Promise.<void>

(static) workerAbortKey(runId, shardIndex) → {string}

Compute the registry key for a (runId, shardIndex) pair. Mirrors the BullMQ jobId shape so the map key, the queue jobId, and the trace artifact path (Prerequisite #2) all derive from the same identifier.

Parameters:
  • runId (string)
  • shardIndex (number | null): 0-based, or null for legacy single-shard runs.

Returns:
  • string

(async, inner) finalizeShardedRun(project, run, jobOptions) → {Promise.<void>}

CAP-002 Phase 2 — Finalize a sharded run after every shard's results have landed. Called exactly once per run: the shard whose incrementShardsCompleted UPDATE crossed the boundary at shardCount is the finalizer; all other shards return without invoking this.

The "exactly once" guarantee comes from the SQL predicate on incrementShardsCompleted (Prerequisite #1) — it returns 1 only for the single UPDATE that landed the counter at the cap. No JS-side mutex needed.
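The exactly-once property can be illustrated with an in-memory stand-in for the guarded UPDATE. The SQL in the comment is a hypothetical shape, not the project's query. The JavaScript function only simulates behaviour that the database makes atomic. The real guarantee rests on each UPDATE executing atomically against the row.

```javascript
// In-memory simulation of the "exactly once" counter. Hypothetical SQL shape:
//   UPDATE runs SET shardsCompleted = shardsCompleted + 1
//   WHERE id = ? AND shardsCompleted < shardCount
//   RETURNING shardsCompleted
function incrementShardsCompleted(row) {
  if (row.shardsCompleted >= row.shardCount) return 0; // predicate fails: no row updated
  row.shardsCompleted += 1;
  return row.shardsCompleted === row.shardCount ? 1 : 0; // 1 only for the crossing UPDATE
}

const row = { shardsCompleted: 0, shardCount: 3 };
// Three shards finish, plus one late duplicate call.
const results = Array.from({ length: 4 }, () => incrementShardsCompleted(row));
console.log(results); // [ 0, 0, 1, 0 ]
```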

Owns (mirrors the single-shard tail of runTests):

  • gateResult / webVitalsResult evaluation against the full results set (every shard has already flushed via appendRunResults, so the DB is the source of truth; the local run here is the fresh row this worker re-read)
  • retryCount / failedAfterRetry aggregation
  • one runFeedbackLoop invocation (single AI-call pass per run)
  • finalizeRunIfNotAborted → status = "completed"
  • duration computation (now - startedAt)
  • one done SSE event
  • one test_run.complete activity log
  • one run.complete telemetry event
  • best-effort notifications + GitHub-check completion
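Below is a hypothetical outline of the finalizer tail. It makes the sequencing and the best-effort boundary concrete. Every function on deps is an injected stand-in, including sendNotifications and completeGithubCheck. The step order simply follows the list above. One further assumption: a falsy finalizeRunIfNotAborted result is treated as "aborted, stop here".

```javascript
// Hypothetical orchestration sketch; all deps are injected stand-ins.
async function finalizeShardedRunSketch(run, deps, now = Date.now()) {
  const gates = await deps.evaluateGates(run);  // gateResult / webVitalsResult
  const retries = deps.aggregateRetries(run);   // retryCount / failedAfterRetry
  await deps.runFeedbackLoop(run);              // one AI pass per run
  // Assumption: a falsy result means the run was aborted and nothing else should fire.
  if (!(await deps.finalizeRunIfNotAborted(run))) return null;
  const duration = now - Date.parse(run.startedAt);
  deps.emitDone({ runId: run.id, duration, ...gates, ...retries });
  deps.logActivity("test_run.complete", run.id);
  deps.trackTelemetry("run.complete", run.id);
  // Best-effort: the async wrapper turns sync throws into rejections that allSettled absorbs.
  await Promise.allSettled(
    [deps.sendNotifications, deps.completeGithubCheck].map(async (step) => step(run)),
  );
  return duration;
}

const calls = [];
const deps = {
  evaluateGates: async () => ({ gateResult: "pass", webVitalsResult: null }),
  aggregateRetries: () => ({ retryCount: 0, failedAfterRetry: 0 }),
  runFeedbackLoop: async () => calls.push("feedback"),
  finalizeRunIfNotAborted: async () => true,
  emitDone: (event) => calls.push(`done:${event.duration}`),
  logActivity: (type) => calls.push(type),
  trackTelemetry: (name) => calls.push(name),
  sendNotifications: () => { throw new Error("SMTP down"); }, // must not fail the run
  completeGithubCheck: async () => calls.push("github"),
};
const run = { id: "run-9", startedAt: "2024-01-01T00:00:00.000Z" };
finalizeShardedRunSketch(run, deps, Date.parse("2024-01-01T00:00:05.000Z"))
  .then((duration) => console.log(duration, calls));
```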
Parameters:
  • project (Object): DIF-012 env-scoped project (carries qualityGates, webVitalsBudgets).
  • run (Object): Fresh runs row, hydrated via runRepo.getById.
  • jobOptions (Object): The shard job's options payload. Carries:
      • actorInfo: { userId, userName } from the triggering request.
      • callbackUrl: CI/CD POST target (trigger path only), SSRF-validated at the route layer. The finalizer fires it exactly once, gated by incrementShardsCompleted exclusivity.

Returns:
  • Promise.<void>

(async, inner) processJob(job) → {Promise.<void>}

Process a single run job from the queue.

Parameters:
  • job (Object): BullMQ Job instance.

Returns:
  • Promise.<void>
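Below is a hypothetical sketch of processJob. It ties the dispatch described in the module overview to the registry lifecycle. The following are assumptions: the job.data field names (type, runId, shardIndex, provider) and the "crawl" / "run" type values. The two executors are stand-ins for crawlAndGenerateTests and runTests, whose real signatures are not shown on this page.

```javascript
// Hypothetical sketch: job.data field names and the "crawl"/"run" type values are assumed.
function createProcessJob(registry, executors) {
  // Map-based dispatch avoids matching inherited keys such as "constructor".
  const dispatch = new Map([
    ["crawl", executors.crawlAndGenerateTests],
    ["run", executors.runTests],
  ]);

  return async function processJob(job) {
    const { type, runId, shardIndex = null, provider = null } = job.data;
    const execute = dispatch.get(type);
    if (!execute) throw new Error(`Unknown job type: ${type}`); // a rejection fails the job in BullMQ

    // Same shape as the BullMQ jobId: bare runId, or `${runId}:s${shardIndex}` for shards.
    const key = shardIndex == null ? runId : `${runId}:s${shardIndex}`;
    const controller = new AbortController();
    registry.set(key, { controller, provider, runId, shardIndex });
    try {
      await execute(job.data, controller.signal);
    } finally {
      registry.delete(key); // release the slot on success, failure, or abort
    }
  };
}

// Usage with stand-in executors (no Redis needed).
const registry = new Map();
const processJob = createProcessJob(registry, {
  crawlAndGenerateTests: async () => {},
  runTests: async (data, signal) => console.log(registry.has("run-3:s1"), signal.aborted),
});
processJob({ data: { type: "run", runId: "run-3", shardIndex: 1 } })
  .then(() => console.log(registry.size)); // "true false" is logged first, then 0
```

The sketch registers the controller before the executor runs and deletes it in finally. An abort arriving mid-run therefore finds the controller, and a failed job leaves no stale entry behind.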