Source: routes/sse.js

/**
 * @module routes/sse
 * @description SSE (Server-Sent Events) infrastructure for real-time run updates.
 *
 * ### Endpoints (INF-005: all under `/api/v1/`)
 * | Method | Path                            | Description                 |
 * |--------|---------------------------------|-----------------------------|
 * | `GET`  | `/api/v1/runs/:runId/events`    | SSE stream for a single run |
 *
 * ### Exports
 * - {@link emitRunEvent} — Broadcast an event to all listeners on a run.
 * - {@link runListeners} — `Map<runId, Set<res>>` — active SSE connections.
 */

import { Router } from "express";
import * as runRepo from "../database/repositories/runRepo.js";
import * as projectRepo from "../database/repositories/projectRepo.js";
import * as runLogRepo from "../database/repositories/runLogRepo.js";
import { signRunArtifacts, signArtifactUrl } from "../middleware/appSetup.js";
import { redis, redisSub, isRedisAvailable } from "../utils/redisClient.js";
import { formatLogLine } from "../utils/logFormatter.js";

/** Express router that carries the SSE route declared in this module. */
const router = Router();

// ─── SSE: Real-time run events (INF-002: Redis pub/sub for multi-instance) ────
/**
 * Process-local registry of open SSE connections: runId → Set of response
 * objects currently streaming that run.
 */
export const runListeners = new Map();

/** Prefix shared by every Redis channel that carries run events. */
const CHANNEL_PREFIX = "sentri:run:";

/**
 * Identifier for this server instance (timestamp plus a short random base-36
 * suffix). It is stamped on messages published to Redis so the subscriber on
 * this same instance can recognise and skip its own echo.
 */
const _instanceId = ["inst", Date.now(), Math.random().toString(36).slice(2, 8)].join("_");

/**
 * emitRunEvent(runId, type, payload)
 *
 * Broadcasts a Server-Sent Event to every client listening on this run.
 * Called from testRunner.js and crawler.js to push live updates.
 *
 * Local SSE clients are served first, so a failure in the Redis publish path
 * can never prevent clients connected to this instance from seeing the event.
 * When Redis is available, the event is then published to the run's Redis
 * channel so other server instances can relay it to their own SSE clients.
 * The published copy carries an `_origin` field so the subscriber on this
 * instance can skip messages it published itself (no duplicate delivery).
 *
 * @param {string} runId            Run whose listeners should receive the event.
 * @param {string} type             Event type; `"done"` also closes local streams.
 * @param {object} [payload={}]     Extra fields merged into the event body.
 * @returns {void}
 */
export function emitRunEvent(runId, type, payload = {}) {
  // ── Deliver to local SSE listeners (synchronous, always first) ─────────
  const data = JSON.stringify({ type, ...payload });
  _deliverToLocal(runId, type, data);

  // ── Publish to Redis so other instances can relay ──────────────────────
  if (!isRedisAvailable()) return;

  // Include _origin so the subscriber on this instance can skip self-echo.
  const redisData = JSON.stringify({ type, ...payload, _origin: _instanceId });
  // Cross-instance relay is best-effort: a rejected publish is ignored.
  redis.publish(`${CHANNEL_PREFIX}${runId}`, redisData).catch(() => {});
}

/**
 * Deliver an SSE event to locally connected clients for a given run.
 * Separated from emitRunEvent so the Redis subscriber can call it too.
 *
 * A `"done"` event is terminal: every local stream is ended, the run's entry
 * is removed from `runListeners`, and the Redis subscription for the run is
 * released. That cleanup runs even when no client is currently connected, so
 * a subscription that outlived its listeners is not left behind.
 *
 * @param {string} runId
 * @param {string} type  — event type (for "done" cleanup logic)
 * @param {string} data  — pre-serialised JSON string
 * @returns {void}
 */
function _deliverToLocal(runId, type, data) {
  const isDone = type === "done";
  const listeners = runListeners.get(runId);

  if (listeners && listeners.size > 0) {
    // Iterate over a copy: the connection's close handler removes entries
    // from this Set, and ending a response is what leads to that handler.
    for (const res of [...listeners]) {
      try {
        res.write(`data: ${data}\n\n`);
        if (isDone) res.end();
      } catch { /* client gone */ }
    }
  }

  if (isDone) {
    runListeners.delete(runId);
    // Safe to call unconditionally: it is a no-op when not subscribed.
    _unsubscribeFromRun(runId);
  }
}

// ─── Redis pub/sub subscriber (INF-002) ───────────────────────────────────────
// When a client connects to an SSE endpoint on this instance, we subscribe to
// the Redis channel for that run.  Events published by ANY instance are then
// relayed to the local SSE clients.  This is how instance A's run events reach
// instance B's connected browsers.

/** Run ids whose Redis channel this instance has already asked to subscribe to. */
const _subscribedRuns = new Set();

/**
 * Ensure this instance is subscribed to the Redis channel of a run.
 * Does nothing when Redis is unavailable or the run is already tracked.
 * The run is recorded before the subscribe call resolves; if the call
 * rejects, a warning is logged and the record is rolled back.
 *
 * @param {string} runId
 * @returns {void}
 */
function _subscribeToRun(runId) {
  const redisUsable = isRedisAvailable() && redisSub;
  if (!redisUsable || _subscribedRuns.has(runId)) return;

  _subscribedRuns.add(runId);

  const onSubscribeFailed = (err) => {
    const text = `[sse] Redis subscribe failed for ${runId}: ${err.message}`;
    console.warn(formatLogLine("warn", null, text));
    _subscribedRuns.delete(runId);
  };

  redisSub.subscribe(`${CHANNEL_PREFIX}${runId}`).catch(onSubscribeFailed);
}

/**
 * Release this instance's Redis subscription for a run, typically once no
 * local listeners remain. Does nothing when Redis is unavailable or when the
 * run is not currently tracked as subscribed. A rejected unsubscribe call is
 * ignored.
 *
 * @param {string} runId
 * @returns {void}
 */
function _unsubscribeFromRun(runId) {
  const redisUsable = isRedisAvailable() && redisSub;
  if (!redisUsable) return;

  // Set#delete reports whether the id was tracked, so it doubles as the check.
  const wasTracked = _subscribedRuns.delete(runId);
  if (!wasTracked) return;

  const channel = `${CHANNEL_PREFIX}${runId}`;
  redisSub.unsubscribe(channel).catch(() => {});
}

/**
 * Relay one message received from Redis to this instance's SSE clients.
 *
 * Channels outside the run-event prefix are ignored. Messages stamped with
 * this instance's own id are dropped, because emitRunEvent delivers locally
 * itself. The internal `_origin` field is removed before the event is
 * re-serialised for clients. Anything that throws while handling the message
 * (for example unparseable JSON) causes the message to be dropped.
 *
 * @param {string} channel  Redis channel the message arrived on.
 * @param {string} message  JSON-encoded event as published by emitRunEvent.
 * @returns {void}
 */
function _relayRedisMessage(channel, message) {
  if (!channel.startsWith(CHANNEL_PREFIX)) return;

  const runId = channel.slice(CHANNEL_PREFIX.length);
  try {
    const event = JSON.parse(message);
    // Self-echo: already delivered locally by the publisher on this instance.
    if (event._origin === _instanceId) return;

    // `_origin` is internal bookkeeping and must not reach clients.
    const { _origin, ...forClients } = event;
    _deliverToLocal(runId, event.type, JSON.stringify(forClients));
  } catch { /* malformed message — ignore */ }
}

// Only wire the relay when a Redis subscriber connection exists.
if (redisSub) {
  redisSub.on("message", (channel, message) => _relayRedisMessage(channel, message));
}

// GET /api/v1/runs/:runId/events — SSE stream for a single run
// Auth is handled by the requireAuth middleware (mounted in index.js) which
// accepts both Authorization header and ?token= query param. The query param
// fallback exists because EventSource cannot send custom headers.
//
// Flow: 404 unless the run exists and its project is in the caller's
// workspace → send a "snapshot" event → if the run is no longer running, send
// "done" and close → otherwise register the response as a live listener,
// subscribe to the run's Redis channel and start a heartbeat.
router.get("/runs/:runId/events", (req, res) => {
  const { runId } = req.params;
  const run = runRepo.getById(runId);
  if (!run) return res.status(404).json({ error: "not found" });

  // Verify the run belongs to the current workspace (ACL-001)
  const project = projectRepo.getByIdInWorkspace(run.projectId, req.workspaceId);
  if (!project) return res.status(404).json({ error: "project not found" });

  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("X-Accel-Buffering", "no"); // disable nginx buffering
  res.flushHeaders();

  // Send current snapshot immediately so the client has something to render.
  // Logs are hydrated from the run_logs table (ENH-008) rather than the
  // legacy runs.logs JSON column — getById() already does this, but we
  // re-fetch here to ensure we have the latest rows even if the run object
  // was cached before recent appends.
  const signedRun = signRunArtifacts({
    ...run,
    logs: runLogRepo.getMessagesByRunId(run.id),
  });
  res.write(`data: ${JSON.stringify({ type: "snapshot", run: signedRun })}\n\n`);

  // If already done (completed, failed, aborted, interrupted), send snapshot +
  // done event and close immediately. This handles SSE reconnections that
  // arrive after the run finished — including when the connection dropped
  // during the feedback loop (ECONNRESET) and the client reconnects post-completion.
  if (run.status !== "running") {
    // testsGenerated is not a DB column — derive from the persisted tests array
    const testsGenerated = run.testsGenerated ?? (Array.isArray(run.tests) ? run.tests.length : undefined);
    res.write(`data: ${JSON.stringify({
      type: "done",
      status: run.status,
      ...(run.passed != null && { passed: run.passed }),
      ...(run.failed != null && { failed: run.failed }),
      ...(run.total != null && { total: run.total }),
      ...(testsGenerated != null && { testsGenerated }),
    })}\n\n`);
    return res.end();
  }

  if (!runListeners.has(runId)) runListeners.set(runId, new Set());
  runListeners.get(runId).add(res);

  // Subscribe to the Redis channel so events from other instances are relayed.
  _subscribeToRun(runId);

  // Heartbeat — keeps the connection alive through proxies / load balancers.
  // 5 s interval: long AI feedback-loop calls (30–120 s) can cause aggressive
  // OS TCP stacks or proxies to reset the idle SSE connection. A shorter
  // heartbeat keeps the pipe warm without meaningful overhead.
  const heartbeat = setInterval(() => {
    try { res.write(": heartbeat\n\n"); } catch { clearInterval(heartbeat); }
  }, 5000);

  // Clean up on the RESPONSE's "close" event. It fires both when the stream
  // is ended by the server and when the client connection drops early. The
  // request's "close" event is tied to completion of the incoming request
  // instead, which is not a reliable signal for a long-lived stream.
  res.on("close", () => {
    clearInterval(heartbeat);
    runListeners.get(runId)?.delete(res);
    if (runListeners.get(runId)?.size === 0) {
      runListeners.delete(runId);
      // No more local listeners for this run — unsubscribe from Redis channel
      _unsubscribeFromRun(runId);
    }
  });
});

// Default export: the Express router carrying the SSE route registered above.
export default router;