BullMQ Worker for durable run execution (INF-003).
Processes jobs from the sentri:runs queue. Each job contains the
serialised run parameters (project, tests, run record, options). The
worker calls crawlAndGenerateTests or runTests depending on the
job type, mirroring the logic previously inlined in route handlers.
Concurrency
Controlled by the MAX_WORKERS env var (default 2). Each concurrent slot
processes one run at a time — Playwright browser instances are not shared
across jobs.
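Reading the concurrency setting can be sketched as below. This is a minimal sketch: resolveConcurrency is a hypothetical helper, and falling back to the default for missing, non-numeric, or non-positive values is an assumption rather than documented behaviour. The result would be passed as the `concurrency` option of the BullMQ Worker.

```javascript
// Hypothetical helper: resolve the worker concurrency from MAX_WORKERS.
// Assumption: invalid or non-positive values fall back to the default.
const DEFAULT_MAX_WORKERS = 2;

function resolveConcurrency(env = process.env) {
  const parsed = Number.parseInt(env.MAX_WORKERS ?? "", 10);
  // NaN (missing or non-numeric) and values below 1 both use the default.
  return Number.isInteger(parsed) && parsed >= 1 ? parsed : DEFAULT_MAX_WORKERS;
}
```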
Lifecycle
- startWorker: Create and start the BullMQ Worker.
- stopWorker: Gracefully close the worker (drain + disconnect).
When Redis is not available, both functions are no-ops.
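The lifecycle can be sketched with the Worker class injected, so the example runs without Redis. In production WorkerClass would be bullmq's Worker; treating a null `connection` as "Redis not available" and the parameter object itself are assumptions, since the documented startWorker() takes no arguments.

```javascript
// Module-level handle, mirroring the documented `_worker :Object|null`.
let _worker = null;

// Hypothetical signature: dependencies are injected for illustration only.
function startWorker({ WorkerClass, connection, queueName, processJob, concurrency }) {
  if (!WorkerClass || !connection) return null; // No-op without BullMQ/Redis.
  if (_worker) return _worker; // Already started: keep a single instance.
  _worker = new WorkerClass(queueName, processJob, { connection, concurrency });
  return _worker;
}

async function stopWorker() {
  if (!_worker) return; // No-op if the worker was never started.
  const worker = _worker;
  _worker = null;
  // BullMQ's Worker#close() waits for active jobs before disconnecting.
  await worker.close();
}
```

Clearing `_worker` before awaiting close() means a second stopWorker() call during shutdown is a harmless no-op.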
Members
(static, constant) workerAbortControllers :Map.<string, {controller: AbortController, provider: (string|null), runId: string, shardIndex: (number|null)}>
Registry of active BullMQ-processed runs. The chat endpoint reads .provider
to skip runs that aren't using Ollama when checking for concurrent LLM
activity (prevents false-positive 503s when a cloud run is active and the
user switches to Ollama).
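The provider check described above amounts to a scan over the registry values. hasActiveOllamaRun is a hypothetical name, and the exact provider string ("ollama") is an assumption.

```javascript
// Hypothetical chat-endpoint guard: only registry entries whose provider is
// Ollama count as concurrent local-LLM activity; cloud or null providers are skipped.
function hasActiveOllamaRun(registry) {
  for (const entry of registry.values()) {
    if (entry.provider === "ollama") return true;
  }
  return false;
}
```

The endpoint would answer 503 only when this returns true, so an active cloud run no longer blocks Ollama chat.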
Key scheme (CAP-002 Phase 2 — Prerequisite #4)
The map is keyed by the BullMQ jobId, NOT the bare runId, so multiple shards of the same run executing on the same replica don't collide on a single map slot. The jobId scheme is:
- Legacy / single-shard runs: jobId === runId. The key is just the runId, bit-for-bit identical to the pre-Phase-2 behaviour.
- Shard-mode runs (coordinator PR): jobId === ${runId}:s${shardIndex}. Each shard gets its own map entry; entries store runId + shardIndex so reverse lookups (e.g. the cross-process abort handler) can fan out to every shard of a given parent run.
Use workerAbortKey when registering and deleting; use
abortAllShardsForRun to fan out an abort to every shard of a parent
run (the abort route's local-fast-path + the run-abort pub/sub subscriber
both go through this helper).
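The key scheme and the register/delete discipline can be sketched as follows. workerAbortKey follows the two jobId shapes listed above; runWithAbortRegistration is a hypothetical wrapper, not part of the module, showing that an entry is always removed once its shard finishes or throws.

```javascript
// Registry keyed by BullMQ jobId (legacy: runId; shard mode: `${runId}:s${i}`).
const workerAbortControllers = new Map();

function workerAbortKey(runId, shardIndex) {
  // A null shardIndex yields the legacy key, identical to the bare runId.
  return shardIndex == null ? runId : `${runId}:s${shardIndex}`;
}

// Hypothetical wrapper illustrating register -> run -> delete.
async function runWithAbortRegistration({ runId, shardIndex = null, provider = null }, work) {
  const key = workerAbortKey(runId, shardIndex);
  const controller = new AbortController();
  workerAbortControllers.set(key, { controller, provider, runId, shardIndex });
  try {
    return await work(controller.signal);
  } finally {
    // Remove only this shard's entry, even if the run throws or is aborted.
    workerAbortControllers.delete(key);
  }
}
```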
Type:
- Map.<string, {controller: AbortController, provider: (string|null), runId: string, shardIndex: (number|null)}>
(inner) _worker :Object|null
The BullMQ Worker instance, or null when no worker is running.
Type:
- Object | null
Methods
(static) abortAllShardsForRun(runId) → {number}
Abort every shard controller for a parent run. Convenience wrapper around
forEachShardEntry that captures the canonical "cancel + delete"
sequence used by both the user-driven abort route and the cross-replica
pub/sub subscriber.
Parameters:
| Name | Type | Description |
|---|---|---|
| runId | string | Parent run ID. |
Returns:
Number of shard controllers aborted.
- Type
- number
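The two callers of abortAllShardsForRun can be sketched with an EventEmitter-like bus standing in for the Redis pub/sub channel. handleAbortRequest, subscribeToRunAborts, and the channel name are hypothetical; the helper itself is injected.

```javascript
// Hypothetical channel name; a stand-in for the real run-abort pub/sub channel.
const RUN_ABORT_CHANNEL = "run-abort";

// Abort route: cancel this replica's shards first (local fast path),
// then broadcast so other replicas can cancel theirs.
function handleAbortRequest(runId, { abortAllShardsForRun, bus }) {
  const abortedLocally = abortAllShardsForRun(runId);
  bus.emit(RUN_ABORT_CHANNEL, JSON.stringify({ runId }));
  return abortedLocally;
}

// Subscriber on every replica: fan the abort out to matching local shards.
function subscribeToRunAborts({ abortAllShardsForRun, bus }) {
  bus.on(RUN_ABORT_CHANNEL, (message) => {
    const { runId } = JSON.parse(message);
    abortAllShardsForRun(runId);
  });
}
```

The originating replica also receives its own broadcast. Because the real helper deletes each entry as it aborts it, that second pass finds nothing and is harmless.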
(static) forEachShardEntry(runId, fn) → {number}
Iterate every registry entry belonging to a parent run, regardless of shard. Used by:
- The abort route's local-fast-path so a user-driven abort cancels every shard's controller on this replica before publishing to the cross-replica pub/sub channel.
- The run-abort pub/sub subscriber so a cross-replica abort signal reaches every matching local shard.
Visits both shard-keyed entries (${runId}:s${i}) and the legacy bare-
runId entry. Safe to call concurrently with processJob — iteration
uses a snapshot so handlers can mutate the map (delete) without
affecting the loop.
Parameters:
| Name | Type | Description |
|---|---|---|
| runId | string | Parent run ID. |
| fn | function | Callback invoked once per matching entry. |
Returns:
Number of entries visited.
- Type
- number
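The snapshot iteration and the "cancel + delete" wrapper can be sketched as follows. The callback signature (key, entry) is an assumption. Matching on the stored entry.runId rather than a key prefix covers both shard-keyed and legacy entries, assuming legacy entries also store their runId, and keeps "run-1" from matching "run-10:s0".

```javascript
const workerAbortControllers = new Map();

// Visit every entry belonging to runId, whatever its shard index.
function forEachShardEntry(runId, fn) {
  let visited = 0;
  // Snapshot the entries so fn may delete from the live map mid-loop.
  for (const [key, entry] of [...workerAbortControllers.entries()]) {
    if (entry.runId !== runId) continue;
    fn(key, entry);
    visited += 1;
  }
  return visited;
}

// The canonical "cancel + delete" sequence, built on the iterator above.
function abortAllShardsForRun(runId) {
  return forEachShardEntry(runId, (key, entry) => {
    entry.controller.abort();
    workerAbortControllers.delete(key);
  });
}
```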
(static) startWorker()
Create and start the BullMQ Worker. No-op if Redis or BullMQ is not available.
(static) stopWorker() → {Promise.<void>}
Gracefully close the worker.
Called from the shutdown hook in index.js.
Returns:
- Type
- Promise.<void>
(static) workerAbortKey(runId, shardIndex) → {string}
Compute the registry key for a (runId, shardIndex) pair. Mirrors the
BullMQ jobId shape so the map key, the queue jobId, and the trace
artifact path (Prerequisite #2) all derive from the same identifier.
Parameters:
| Name | Type | Description |
|---|---|---|
| runId | string | Parent run ID. |
| shardIndex | number \| null | 0-based shard index, or null for legacy / single-shard runs. |
Returns:
- Type
- string
(async, inner) finalizeShardedRun(project, run, jobOptions) → {Promise.<void>}
CAP-002 Phase 2 — Finalize a sharded run after every shard's results
have landed. Called exactly once per run: the shard whose
incrementShardsCompleted UPDATE crossed the boundary at shardCount
is the finalizer; all other shards return without invoking this.
The "exactly once" guarantee comes from the SQL predicate on
incrementShardsCompleted (Prerequisite #1) — it returns 1 only for
the single UPDATE that landed the counter at the cap. No JS-side mutex
needed.
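From the worker's side, the finalizer election reduces to one check on the increment's return value. In this sketch onShardFinished and makeFakeCounter are hypothetical. incrementShardsCompleted is injected and assumed to resolve to 1 only for the shard that lands the counter at shardCount. The fake counter is an in-memory stand-in for illustration; the real guarantee comes from the SQL predicate, not from JavaScript.

```javascript
// Hypothetical shard-completion tail: only the shard whose increment
// reports 1 goes on to finalize the run.
async function onShardFinished({ project, run, jobOptions }, deps) {
  const landedOnCap = await deps.incrementShardsCompleted(run.id);
  if (landedOnCap !== 1) return false; // Another shard will finalize.
  await deps.finalizeShardedRun(project, run, jobOptions);
  return true;
}

// In-memory stand-in for the DB counter (illustration only).
function makeFakeCounter(shardCount) {
  let completed = 0;
  return async () => {
    completed += 1;
    return completed === shardCount ? 1 : 0;
  };
}
```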
Owns (mirrors the single-shard tail of runTests):
- gateResult / webVitalsResult evaluation against the full results set (every shard has flushed via appendRunResults, so the DB is the source of truth; the local run here is the fresh row this worker re-read)
- retryCount / failedAfterRetry aggregation
- one runFeedbackLoop invocation (single AI-call pass per run)
- finalizeRunIfNotAborted → status = "completed"
- duration computation (now - startedAt)
- one done SSE event
- one test_run.complete activity log
- one run.complete telemetry event
- best-effort notifications + GitHub-check completion
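A subset of those responsibilities can be sketched as a short sequence of injected steps. finalizeSketch and every step name and signature are assumptions, as is the ordering and the use of an ISO-string startedAt. The sketch shows two documented points: duration is now minus startedAt, and notifications are best-effort, so a failure is reported and never rethrown.

```javascript
// Hypothetical outline of part of the finalizer's tail; signatures are assumptions.
async function finalizeSketch(project, run, steps, now = Date.now()) {
  await steps.evaluateGates(project, run);    // gateResult / webVitalsResult
  await steps.runFeedbackLoop(project, run);  // one AI pass per run, not per shard
  await steps.finalizeRunIfNotAborted(run);   // status = "completed"
  const duration = now - Date.parse(run.startedAt);
  steps.emitDone(run.id, duration);           // one "done" SSE event
  try {
    await steps.notify(run);                  // notifications + GitHub check
  } catch (err) {
    // Best-effort: report the failure but do not fail the finalized run.
    steps.warn(`notification failed for ${run.id}: ${err.message}`);
  }
  return duration;
}
```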
Parameters:
| Name | Type | Description |
|---|---|---|
| project | Object | DIF-012 env-scoped project (carries |
| run | Object | Fresh |
| jobOptions | Object | The shard job's |
Returns:
- Type
- Promise.<void>
(async, inner) processJob(job) → {Promise.<void>}
Process a single run job from the queue.
Parameters:
| Name | Type | Description |
|---|---|---|
| job | Object | BullMQ Job instance. |
Returns:
- Type
- Promise.<void>
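The dispatch described in the module overview can be sketched as follows. The handlers parameter, the job.data.type field and its values ("crawl", "run"), and the handler argument lists are all assumptions; the documented processJob takes only the job. Throwing from a BullMQ processor marks the job as failed, so an unknown type fails visibly instead of completing silently.

```javascript
// Hypothetical dispatcher. Assumes job.data = { type, project, tests, run, options }.
async function processJob(job, handlers) {
  const { type, project, tests, run, options } = job.data;
  switch (type) {
    case "crawl":
      return handlers.crawlAndGenerateTests(project, run, options);
    case "run":
      return handlers.runTests(project, tests, run, options);
    default:
      // Rejecting lets BullMQ record the job as failed.
      throw new Error(`Unknown run job type: ${type}`);
  }
}
```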