Source: database/repositories/runLogRepo.js

/**
 * @module database/repositories/runLogRepo
 * @description Data-access layer for the `run_logs` table (ENH-008).
 *
 * Replaces the O(n²) JSON mutation pattern on `runs.logs`:
 *   - Every log line is stored as an independent row.
 *   - Writers call {@link appendLog} — a single `INSERT`.
 *   - Readers call {@link getByRunId} to retrieve all lines in order.
 *
 * ### Schema
 * ```
 * run_logs(id AUTOINCREMENT, runId TEXT, seq INT, level TEXT, message TEXT, createdAt TEXT)
 * ```
 *
 * ### Typical flow
 * ```js
 * // In runLogger.js (called for every log() invocation):
 * appendLog(run.id, 'info', '[12:34:56] Starting crawl…');
 *
 * // In SSE route (initial snapshot):
 * const logs = getByRunId(runId).map(r => r.message);
 * ```
 *
 * ### Exports
 * - {@link appendLog}   — insert one log row
 * - {@link getByRunId}  — fetch all rows for a run, ordered by seq
 * - {@link deleteByRunId} — hard-delete all logs for a run (purge path)
 * - {@link countByRunId}  — row count for a run (used in tests)
 */

import { getDatabase } from "../sqlite.js";

// ─── Sequence counter cache ───────────────────────────────────────────────────
// Each run has a monotonic seq counter so readers always get a stable order
// even if two writers race (unlikely — runs are single-threaded per pipeline
// but this makes the contract explicit).
//
// The cache is populated lazily from the DB on the first write for a run and
// then incremented in-process.  On server restart the DB is the source of truth.

/** @type {Map<string, number>} runId → next seq value */
const _seqCache = new Map();

/**
 * Return the next sequence number for `runId` and advance the cache.
 * Falls back to a DB MAX query on first access so restarts are safe.
 *
 * @param {Object} db    — better-sqlite3 Database instance
 * @param {string} runId
 * @returns {number}
 */
function nextSeq(db, runId) {
  if (!_seqCache.has(runId)) {
    const row = db.prepare(
      "SELECT COALESCE(MAX(seq), 0) AS maxSeq FROM run_logs WHERE runId = ?"
    ).get(runId);
    _seqCache.set(runId, (row?.maxSeq ?? 0) + 1);
  }
  const seq = _seqCache.get(runId);
  _seqCache.set(runId, seq + 1);
  return seq;
}

/**
 * @typedef {Object} RunLogRow
 * @property {number} id        - Auto-increment primary key
 * @property {string} runId     - Foreign key → runs.id
 * @property {number} seq       - 1-based sequence number within the run
 * @property {string} level     - 'info' | 'warn' | 'error'
 * @property {string} message   - Formatted log line (e.g. "[12:34:56] msg")
 * @property {string} createdAt - ISO 8601 timestamp
 */

// ─── Write ────────────────────────────────────────────────────────────────────

/**
 * Append a single log line to the `run_logs` table.
 *
 * This is the hot path — called on every `log()` invocation.  It executes
 * a single `INSERT` and returns immediately.
 *
 * @param {string} runId   - The run this log line belongs to
 * @param {string} level   - 'info' | 'warn' | 'error'
 * @param {string} message - Pre-formatted log string (includes timestamp prefix)
 * @returns {void}
 */
export function appendLog(runId, level, message) {
  const db = getDatabase();
  const seq = nextSeq(db, runId);
  const createdAt = new Date().toISOString();
  db.prepare(
    "INSERT INTO run_logs (runId, seq, level, message, createdAt) VALUES (?, ?, ?, ?, ?)"
  ).run(runId, seq, level, message, createdAt);
}

// ─── Read ─────────────────────────────────────────────────────────────────────

/**
 * Fetch all log rows for a run, ordered by seq ascending.
 *
 * Returns the full row objects so callers can access `level` for filtering
 * if needed.  For plain string arrays (legacy API compat) use
 * `getByRunId(id).map(r => r.message)`.
 *
 * @param {string} runId
 * @returns {RunLogRow[]}
 */
export function getByRunId(runId) {
  const db = getDatabase();
  return db.prepare(
    "SELECT id, runId, seq, level, message, createdAt FROM run_logs WHERE runId = ? ORDER BY seq ASC"
  ).all(runId);
}

/**
 * Fetch log messages (strings only) for a run, ordered by seq ascending.
 * Convenience wrapper over {@link getByRunId} for callers that only need
 * the message strings (e.g. the SSE snapshot and the run-detail REST endpoint).
 *
 * @param {string} runId
 * @returns {string[]}
 */
export function getMessagesByRunId(runId) {
  return getByRunId(runId).map((r) => r.message);
}

// ─── Delete / maintenance ─────────────────────────────────────────────────────

/**
 * Hard-delete all log rows for a run.
 * Called when a run is permanently purged (recycle-bin purge path).
 *
 * @param {string} runId
 * @returns {number} Number of rows deleted.
 */
export function deleteByRunId(runId) {
  const db = getDatabase();
  const info = db.prepare("DELETE FROM run_logs WHERE runId = ?").run(runId);
  _seqCache.delete(runId);
  return info.changes;
}

/**
 * Hard-delete all log rows for multiple runs (batch purge).
 * Used when a project is purged and all its runs are hard-deleted.
 *
 * @param {string[]} runIds
 * @returns {number} Total rows deleted.
 */
export function deleteByRunIds(runIds) {
  if (!runIds.length) return 0;
  const db = getDatabase();
  const placeholders = runIds.map(() => "?").join(", ");
  const info = db.prepare(
    `DELETE FROM run_logs WHERE runId IN (${placeholders})`
  ).run(...runIds);
  for (const id of runIds) _seqCache.delete(id);
  return info.changes;
}

/**
 * Evict a single run from the in-process seq cache.
 * Called when a run reaches a terminal state (completed, failed, aborted)
 * so the cache doesn't grow unboundedly on long-running servers.
 *
 * @param {string} runId
 */
export function evictCache(runId) {
  _seqCache.delete(runId);
}

/**
 * Count log rows for a run.
 * Primarily used in tests to verify write behaviour.
 *
 * @param {string} runId
 * @returns {number}
 */
export function countByRunId(runId) {
  const db = getDatabase();
  return db.prepare(
    "SELECT COUNT(*) AS cnt FROM run_logs WHERE runId = ?"
  ).get(runId).cnt;
}