/**
* @module scheduler
* @description Cron-based test run scheduler (ENH-006).
*
* Manages one `node-cron` task per project schedule. Tasks are stored in a
* process-local Map keyed by projectId. On startup the server calls
* {@link initScheduler} which loads every enabled schedule from the DB and
* arms each cron task. When a project's schedule is created, updated, or
* toggled, the caller invokes {@link reloadSchedule} to apply the change
* without a process restart.
*
* ### Firing logic
* When a cron task fires it behaves identically to `POST /api/projects/:id/run`:
* - Loads the project and its approved tests from the DB.
* - Skips if an active run is already in progress (prevents double-runs).
* - Creates a `test_run` run record and hands off to `runWithAbort`.
* - Records `lastRunAt` / `nextRunAt` via `scheduleRepo.updateRunTimes()`.
* - Logs a `scheduled_run.start` activity entry.
*
* ### Exports
* - {@link initScheduler} — Load all enabled schedules at startup.
* - {@link reloadSchedule} — Upsert a single project's task (create/update/toggle).
* - {@link stopSchedule} — Cancel and remove a task (project deleted).
* - {@link getNextRunAt} — Compute the ISO next-fire time for a cron expression.
*/
import cron from "node-cron";
import * as scheduleRepo from "./database/repositories/scheduleRepo.js";
import * as projectRepo from "./database/repositories/projectRepo.js";
import * as testRepo from "./database/repositories/testRepo.js";
import * as runRepo from "./database/repositories/runRepo.js";
import { generateRunId } from "./utils/idGenerator.js";
import { runWithAbort } from "./utils/runWithAbort.js";
import { runTests } from "./testRunner.js";
import { logActivity } from "./utils/activityLogger.js";
import { classifyError } from "./utils/errorClassifier.js";
import { formatLogLine } from "./utils/logFormatter.js";
import { fireNotifications } from "./utils/notifications.js";
import { detectStaleTests } from "./utils/staleDetector.js";
// ─── Task registry ─────────────────────────────────────────────────────────────
// Maps projectId → node-cron ScheduledTask
/** @type {Map<string, Object>} projectId → node-cron ScheduledTask */
const tasks = new Map();
/** @type {Object|null} Weekly stale test detection task (AUTO-013). */
let _staleDetectionTask = null;
// ─── Helpers ──────────────────────────────────────────────────────────────────
/**
* Extract date/time components from a UTC timestamp in a given IANA timezone.
*
* Uses `Intl.DateTimeFormat.formatToParts()` — the spec-guaranteed approach
* for timezone-aware field extraction. Unlike the `toLocaleString` round-trip
* (`new Date(d.toLocaleString("en-US", { timeZone }))`), this does not
* depend on locale-specific date string formatting or parsing, and handles
* DST transitions correctly (spring-forward gaps, fall-back overlaps).
*
* @param {Date} date - UTC Date object.
* @param {string} timezone - IANA timezone name (e.g. "America/New_York").
* @returns {{ minute: number, hour: number, day: number, month: number, weekday: number }}
*/
const _tzFormatters = new Map();
function getDatePartsInTz(date, timezone) {
// Cache the DateTimeFormat instance per timezone — construction is expensive,
// but formatToParts() on a cached instance is fast.
let fmt = _tzFormatters.get(timezone);
if (!fmt) {
fmt = new Intl.DateTimeFormat("en-US", {
timeZone: timezone,
hour12: false,
year: "numeric",
month: "numeric",
day: "numeric",
hour: "numeric",
minute: "numeric",
weekday: "short",
});
_tzFormatters.set(timezone, fmt);
}
const partsArr = fmt.formatToParts(date);
const parts = {};
for (const p of partsArr) parts[p.type] = p.value;
// Map weekday short name → 0-6 (Sun=0)
const dayMap = { Sun: 0, Mon: 1, Tue: 2, Wed: 3, Thu: 4, Fri: 5, Sat: 6 };
return {
minute: parseInt(parts.minute, 10),
hour: parseInt(parts.hour, 10) % 24, // en-US hour12:false returns 24 for midnight in some engines
day: parseInt(parts.day, 10),
month: parseInt(parts.month, 10),
weekday: dayMap[parts.weekday] ?? 0,
};
}
/**
* Compute the next fire time for a cron expression in a given timezone.
* Returns an ISO 8601 string or null if the expression is invalid.
*
* We use a lightweight approach: advance minute-by-minute from now (max
* 1 year) until the cron fields match. For common schedules this resolves
* in at most 525,960 steps, typically far fewer. A real cron-parser
* library would be cleaner but avoids a new dependency.
*
* Timezone conversion uses `Intl.DateTimeFormat.formatToParts()` — the
* spec-guaranteed approach that correctly handles DST transitions.
*
* @param {string} cronExpr - 5-field cron expression.
* @param {string} [timezone] - IANA timezone (defaults to "UTC").
* @returns {string|null}
*/
export function getNextRunAt(cronExpr, timezone = "UTC") {
if (!cron.validate(cronExpr)) return null;
// Parse cron fields: minute hour dom month dow
const fields = cronExpr.trim().split(/\s+/);
if (fields.length !== 5) return null;
const [minuteField, hourField, domField, monthField, dowField] = fields;
/**
* Check if a single atomic cron token matches the given value.
* An atom is one of: "*", a plain number, a range "a-b", or any of
* those followed by "/step".
*/
function matchesAtom(atom, value, min, max) {
// Step: */n, a-b/n, or plain/n
if (atom.includes("/")) {
const [range, step] = atom.split("/");
const stepN = parseInt(step, 10);
if (range === "*") return (value - min) % stepN === 0;
if (range.includes("-")) {
const [lo, hi] = range.split("-").map(Number);
return value >= lo && value <= hi && (value - lo) % stepN === 0;
}
// plain start/step (e.g. "5/2") — start at lo, step through max
const lo = parseInt(range, 10);
return value >= lo && value <= max && (value - lo) % stepN === 0;
}
// Range: a-b
if (atom.includes("-")) {
const [lo, hi] = atom.split("-").map(Number);
return value >= lo && value <= hi;
}
return parseInt(atom, 10) === value;
}
function matches(field, value, min, max) {
if (field === "*") return true;
// List: split on commas and delegate each element to matchesAtom
// This correctly handles combined list+range like "1-5,10-15"
if (field.includes(",")) {
return field.split(",").some(atom => matchesAtom(atom, value, min, max));
}
return matchesAtom(field, value, min, max);
}
// ── Day-of-week Sunday alias: POSIX cron allows 7 as well as 0 ──────
// getDatePartsInTz returns 0 for Sunday (JS convention: 0=Sun … 6=Sat).
// When the cron field contains 7, we need to match it against value 0.
//
// Simple text replacement ("7"→"0") breaks ranges like "5-7" → "5-0"
// (which would never match). Instead, we match the dow field twice:
// 1. Match with the real weekday value (0–6) — handles 0-based fields.
// 2. If Sunday (value=0), also try matching as value 7 — handles
// fields written with the 7-alias (plain "7", range "5-7", list "1,7").
function matchesDow(field, value) {
if (matches(field, value, 0, 7)) return true;
// If today is Sunday (0), also check if the field matches 7
if (value === 0 && matches(field, 7, 0, 7)) return true;
return false;
}
// Start from the next full minute
const start = new Date();
start.setSeconds(0, 0);
start.setTime(start.getTime() + 60_000); // +1 minute
// Iterate up to 1 year ahead
const limit = start.getTime() + 365 * 24 * 60 * 60 * 1000;
const candidate = new Date(start);
while (candidate.getTime() < limit) {
// Evaluate the candidate in the target timezone using Intl.DateTimeFormat
const tp = getDatePartsInTz(candidate, timezone);
if (
matches(minuteField, tp.minute, 0, 59) &&
matches(hourField, tp.hour, 0, 23) &&
matches(domField, tp.day, 1, 31) &&
matches(monthField, tp.month, 1, 12) &&
matchesDow(dowField, tp.weekday)
) {
return candidate.toISOString();
}
candidate.setTime(candidate.getTime() + 60_000);
}
return null;
}
// ─── Fire a scheduled run ──────────────────────────────────────────────────────
/**
* Execute a test run for a project as triggered by a cron schedule.
* Mirrors the logic in `POST /api/projects/:id/run`.
*
* @param {string} projectId
*/
async function fireScheduledRun(projectId) {
const project = projectRepo.getById(projectId);
if (!project) {
console.warn(formatLogLine("warn", null, `[scheduler] Project ${projectId} not found — skipping scheduled run`));
return;
}
// Skip if an active run is already in progress
const activeRun = runRepo.findActiveByProjectId(projectId);
if (activeRun) {
console.log(formatLogLine("info", null, `[scheduler] Skipping scheduled run for ${project.name} — ${activeRun.id} already running`));
return;
}
const allTests = testRepo.getByProjectId(projectId);
const tests = allTests.filter(t => t.reviewStatus === "approved");
if (!tests.length) {
console.log(formatLogLine("info", null, `[scheduler] Skipping scheduled run for ${project.name} — no approved tests`));
return;
}
// Use the project's configured parallelWorkers (from last dials config),
// falling back to the PARALLEL_WORKERS env var or 1 if not set.
const defaultWorkers = parseInt(process.env.PARALLEL_WORKERS, 10) || 1;
const parallelWorkers = Math.max(1, Math.min(10, defaultWorkers));
const runId = generateRunId();
const run = {
id: runId,
projectId: project.id,
type: "test_run",
status: "running",
startedAt: new Date().toISOString(),
logs: [],
results: [],
passed: 0,
failed: 0,
total: tests.length,
parallelWorkers,
testQueue: tests.map(t => ({ id: t.id, name: t.name, steps: t.steps || [] })),
workspaceId: project.workspaceId || null,
};
runRepo.create(run);
logActivity({
type: "scheduled_run.start",
projectId: project.id,
projectName: project.name,
workspaceId: project.workspaceId || null,
detail: `Scheduled test run started — ${tests.length} test${tests.length !== 1 ? "s" : ""}${parallelWorkers > 1 ? ` (${parallelWorkers}x parallel)` : ""}`,
status: "running",
});
console.log(formatLogLine("info", null, `[scheduler] Firing scheduled run ${runId} for project ${project.name}`));
runWithAbort(runId, run,
signal => runTests(project, tests, run, { parallelWorkers, signal }),
{
onSuccess: () => {
logActivity({
type: "scheduled_run.complete",
projectId: project.id,
projectName: project.name,
workspaceId: project.workspaceId || null,
detail: `Scheduled run completed — ${run.passed || 0} passed, ${run.failed || 0} failed`,
});
},
onFailActivity: (err) => ({
type: "scheduled_run.fail",
projectId: project.id,
projectName: project.name,
workspaceId: project.workspaceId || null,
detail: `Scheduled run failed: ${classifyError(err, "run").message}`,
}),
onComplete: async (finishedRun) => {
// Record lastRunAt and update nextRunAt
const schedule = scheduleRepo.getByProjectId(projectId);
if (schedule) {
const nextRunAt = getNextRunAt(schedule.cronExpr, schedule.timezone);
scheduleRepo.updateRunTimes(projectId, new Date().toISOString(), nextRunAt);
}
// FEA-001: Fire failure notifications — best-effort
try { await fireNotifications(finishedRun, project); } catch { /* best-effort */ }
},
},
);
}
// ─── Task management ──────────────────────────────────────────────────────────
/**
* Cancel and remove an existing task for a project (if any).
* @param {string} projectId
*/
function cancelTask(projectId) {
const existing = tasks.get(projectId);
if (existing) {
existing.stop();
tasks.delete(projectId);
}
}
/**
* Arm (or re-arm) a cron task for a project schedule.
* If the schedule is disabled or has an invalid cron expression, the task
* is cancelled and removed.
*
* @param {Object} schedule - Schedule row from scheduleRepo
*/
function armTask(schedule) {
cancelTask(schedule.projectId);
if (!schedule.enabled) return;
if (!cron.validate(schedule.cronExpr)) {
console.warn(formatLogLine("warn", null,
`[scheduler] Invalid cron expression "${schedule.cronExpr}" for project ${schedule.projectId} — task not armed`));
return;
}
const task = cron.schedule(schedule.cronExpr, () => {
fireScheduledRun(schedule.projectId).catch(err => {
console.error(formatLogLine("error", null,
`[scheduler] Unhandled error in scheduled run for ${schedule.projectId}: ${err.message}`));
});
}, {
timezone: schedule.timezone || "UTC",
scheduled: true,
});
tasks.set(schedule.projectId, task);
console.log(formatLogLine("info", null,
`[scheduler] Armed task for project ${schedule.projectId} (${schedule.cronExpr}, tz=${schedule.timezone})`));
}
// ─── Public API ───────────────────────────────────────────────────────────────
/**
* Load all enabled schedules from the database and arm cron tasks.
* Also starts the weekly stale test detection job (AUTO-013).
* Called once from `index.js` after DB init.
*/
export function initScheduler() {
const schedules = scheduleRepo.getAllEnabled();
for (const s of schedules) {
try {
armTask(s);
} catch (err) {
console.error(formatLogLine("error", null,
`[scheduler] Failed to arm task for project ${s.projectId}: ${err.message} — skipping`));
}
}
// AUTO-013: Weekly stale test detection — runs every Sunday at 03:00 UTC.
// Flags approved tests that haven't been run in STALE_TEST_DAYS (default 90).
_staleDetectionTask = cron.schedule("0 3 * * 0", () => {
try {
detectStaleTests();
} catch (err) {
console.error(formatLogLine("error", null,
`[scheduler] Stale test detection failed: ${err.message}`));
}
}, { timezone: "UTC", scheduled: true });
console.log(formatLogLine("info", null, `[scheduler] Initialised — ${tasks.size} active schedule(s) (${schedules.length} loaded), stale detection armed`));
}
/**
* Reload a single project's cron task after a schedule create/update/toggle.
* Fetches the latest schedule from the DB and re-arms the task.
*
* @param {string} projectId
*/
export function reloadSchedule(projectId) {
const schedule = scheduleRepo.getByProjectId(projectId);
if (!schedule) {
cancelTask(projectId);
return;
}
armTask(schedule);
}
/**
* Stop and remove the task for a project.
* Called when a project is deleted so the cron task doesn't fire against
* a non-existent project.
*
* @param {string} projectId
*/
export function stopSchedule(projectId) {
cancelTask(projectId);
}
/**
* Stop and remove all active cron tasks.
* Called during graceful shutdown so no new scheduled runs fire while
* in-flight work is draining.
*/
export function stopAllTasks() {
for (const [projectId, task] of tasks) {
task.stop();
}
tasks.clear();
if (_staleDetectionTask) {
_staleDetectionTask.stop();
_staleDetectionTask = null;
}
}
/**
* Return the number of currently active (armed) cron tasks.
* Exposed for the /api/system health endpoint.
*
* @returns {number}
*/
export function activeTaskCount() {
return tasks.size;
}