Source: queue.js

/**
 * @module queue
 * @description Shared BullMQ queue for durable run execution (INF-003).
 *
 * When Redis is available (`REDIS_URL` is set and `ioredis` + `bullmq` are
 * installed), run execution is routed through a BullMQ queue instead of the
 * fire-and-forget `runWithAbort` pattern.  This provides:
 *
 * - **Durability** — jobs survive process crashes; stuck runs are retried.
 * - **Global concurrency** — `MAX_WORKERS` limits total parallel runs.
 * - **Visibility** — queue depth and active job count are queryable.
 *
 * When Redis is NOT available, the queue is `null` and callers fall back to
 * the existing `runWithAbort` in-process execution.  This keeps SQLite-only
 * deployments working unchanged.
 *
 * ### Exports
 * - {@link runQueue}        — BullMQ `Queue` instance (or `null`).
 * - {@link isQueueAvailable} — `true` when the queue is usable.
 * - {@link closeQueue}      — Gracefully close the queue connection.
 * - {@link getQueueStats}   — Return queue depth and active job count.
 */

import { createRequire } from "module";
import { formatLogLine } from "./utils/logFormatter.js";

const _require = createRequire(import.meta.url);

// ─── Lazy-load BullMQ ─────────────────────────────────────────────────────────
// BullMQ is an optional dependency — only loaded when REDIS_URL is set.

let Queue = null;
if (process.env.REDIS_URL) {
  try {
    const bullmq = _require("bullmq");
    Queue = bullmq.Queue;
  } catch {
    console.warn(formatLogLine("warn", null,
      "[queue] REDIS_URL is set but `bullmq` is not installed. " +
      "Run `npm install bullmq` to enable durable job queues. Falling back to in-process execution."
    ));
  }
}

// ─── Queue instance ───────────────────────────────────────────────────────────

/** @type {Object|null} BullMQ Queue instance for run jobs. */
export let runQueue = null;

if (Queue && process.env.REDIS_URL) {
  try {
    runQueue = new Queue("sentri:runs", {
      connection: {
        url: process.env.REDIS_URL,
        maxRetriesPerRequest: null,
      },
      defaultJobOptions: {
        attempts: 2,
        backoff: { type: "exponential", delay: 5000 },
        removeOnComplete: { count: 200 },
        removeOnFail: { count: 500 },
      },
    });

    console.log(formatLogLine("info", null, "[queue] BullMQ run queue initialised"));
  } catch (err) {
    console.warn(formatLogLine("warn", null,
      `[queue] Failed to create BullMQ queue: ${err.message}. Falling back to in-process execution.`));
    runQueue = null;
  }
}

// ─── Public helpers ───────────────────────────────────────────────────────────

/**
 * Check whether the BullMQ queue is available.
 *
 * @returns {boolean}
 */
export function isQueueAvailable() {
  return runQueue !== null;
}

/**
 * Get queue statistics (waiting + active counts).
 *
 * @returns {Promise<Object>} `{ waiting, active, delayed, failed }`
 */
export async function getQueueStats() {
  if (!runQueue) return { waiting: 0, active: 0, delayed: 0, failed: 0 };
  const [waiting, active, delayed, failed] = await Promise.all([
    runQueue.getWaitingCount(),
    runQueue.getActiveCount(),
    runQueue.getDelayedCount(),
    runQueue.getFailedCount(),
  ]);
  return { waiting, active, delayed, failed };
}

/**
 * Gracefully close the queue connection.
 * Called from the shutdown hook in `index.js`.
 *
 * @returns {Promise<void>}
 */
export async function closeQueue() {
  if (runQueue) {
    try {
      await runQueue.close();
      console.log(formatLogLine("info", null, "[queue] BullMQ queue closed"));
    } catch (err) {
      console.warn(formatLogLine("warn", null, `[queue] Queue close error: ${err.message}`));
    }
    runQueue = null;
  }
}