/**
* @module database/adapters/postgres-adapter
* @description PostgreSQL adapter implementing the db-adapter interface.
*
* Uses the `pg` package with `pg-native` for synchronous query execution.
* The existing repository layer calls `db.prepare().run()` synchronously
* (better-sqlite3 API), so this adapter provides the same blocking semantics.
*
* ### Prerequisites
* ```bash
* npm install pg pg-native
* ```
* `pg-native` provides libpq C bindings with a `querySync` method.
* If `pg-native` is not installed, the adapter falls back to `deasync`
* (`npm install deasync`) to block the event loop on async pool queries.
*
* ### SQL compatibility
* Automatically translates common SQLite-isms to PostgreSQL:
* - `@param` named bindings → `$N` positional parameters
* - `INTEGER PRIMARY KEY AUTOINCREMENT` → `SERIAL PRIMARY KEY`
* - `datetime('now')` → `NOW()`
* - `INSERT OR IGNORE` → `INSERT ... ON CONFLICT DO NOTHING`
* - `INSERT OR REPLACE` → upsert via `ON CONFLICT DO UPDATE SET`
* - `LIKE` → `ILIKE` (case-insensitive matching)
* - `PRAGMA table_info(t)` → `information_schema.columns` query
*
* ### Column name case mapping
* PostgreSQL folds unquoted identifiers to lowercase (`passwordHash` → `passwordhash`).
* The adapter automatically remaps lowercase column names back to camelCase on
* every returned row so the application code works identically on both backends.
*
* @exports createPostgresAdapter
*/
import pg from "pg";
import { AsyncLocalStorage } from "node:async_hooks";
import { createRequire } from "module";
import { formatLogLine } from "../../utils/logFormatter.js";
const { Pool } = pg;
// createRequire bridges this ES module to CommonJS so the two OPTIONAL
// native/CJS packages below can be probed with try/require instead of a
// static `import` (which would hard-fail when they are not installed).
const _require = createRequire(import.meta.url);
// Try to load pg-native for synchronous query support (preferred path).
let PgNative = null;
try {
PgNative = _require("pg-native");
} catch {
// pg-native not installed — will try deasync fallback
}
// Try to load deasync for the async→sync bridge. Only probed when
// pg-native is absent; with pg-native present deasync is never needed.
let deasyncLib = null;
if (!PgNative) {
try {
deasyncLib = _require("deasync");
} catch {
// Neither available — will throw on createPostgresAdapter()
}
}
// ─── SQL dialect translation ──────────────────────────────────────────────────
/**
 * Temporarily replace single-quoted SQL string literals with placeholder
 * tokens (`__STRLIT_N__`) so that later regex-based dialect rewrites can
 * never corrupt text inside a literal. Doubled quotes (`''`) are treated
 * as part of a single literal.
 *
 * @param {string} sql
 * @returns {{ masked: string, restore: Function }} the placeholder-bearing
 *   SQL plus a function that swaps the original literals back in.
 */
function maskStringLiterals(sql) {
  const saved = [];
  const masked = sql.replace(/'(?:[^']|'')*'/g, (literal) => {
    saved.push(literal);
    return `__STRLIT_${saved.length - 1}__`;
  });
  const restore = (text) =>
    text.replace(/__STRLIT_(\d+)__/g, (_m, n) => saved[Number(n)]);
  return { masked, restore };
}
/**
 * Translate one SQL statement from SQLite dialect to PostgreSQL.
 *
 * Rewrites applied, in order:
 *   1. datetime('now')                     → NOW()  (before masking)
 *   2. INTEGER PRIMARY KEY AUTOINCREMENT   → SERIAL PRIMARY KEY
 *   3. INSERT OR IGNORE INTO               → INSERT INTO … ON CONFLICT DO NOTHING
 *   4. INSERT OR REPLACE INTO              → INSERT INTO … ON CONFLICT DO UPDATE
 *      (the first listed column is assumed to be the conflict target)
 *   5. LIKE                                → ILIKE (SQLite LIKE is case-insensitive)
 *
 * String literals are masked during steps 2–5 and restored at the end so
 * values like 'I LIKE cats' are never corrupted. Step 1 must run BEFORE
 * masking because the masker would otherwise swallow 'now' as a literal
 * and hide it from the regex.
 *
 * @param {string} stmt — a single SQL statement (no trailing semicolons expected)
 * @returns {string} PostgreSQL-compatible SQL statement
 */
function translateSingleStatement(stmt) {
  const withNow = stmt.replace(/datetime\('now'\)/gi, "NOW()");
  const { masked, restore } = maskStringLiterals(withNow);

  let sql = masked.replace(
    /INTEGER\s+PRIMARY\s+KEY\s+AUTOINCREMENT/gi,
    "SERIAL PRIMARY KEY"
  );

  if (/INSERT\s+OR\s+IGNORE/i.test(sql)) {
    sql = sql.replace(/INSERT\s+OR\s+IGNORE\s+INTO/gi, "INSERT INTO");
    if (!/ON\s+CONFLICT/i.test(sql)) {
      sql = `${sql.trimEnd()} ON CONFLICT DO NOTHING`;
    }
  }

  if (/INSERT\s+OR\s+REPLACE\s+INTO/i.test(sql)) {
    sql = sql.replace(/INSERT\s+OR\s+REPLACE\s+INTO/gi, "INSERT INTO");
    if (!/ON\s+CONFLICT/i.test(sql)) {
      const colList = sql.match(/INSERT\s+INTO\s+(\w+)\s*\(([^)]+)\)/i);
      if (colList) {
        // Heuristic: first column in the INSERT list is the conflict key.
        const [conflictCol, ...updateCols] = colList[2]
          .split(",")
          .map((c) => c.trim());
        const suffix = updateCols.length > 0
          ? `ON CONFLICT(${conflictCol}) DO UPDATE SET ${updateCols
              .map((c) => `${c} = EXCLUDED.${c}`)
              .join(", ")}`
          : `ON CONFLICT(${conflictCol}) DO NOTHING`;
        sql = `${sql.trimEnd()} ${suffix}`;
      }
    }
  }

  // SQLite LIKE matches case-insensitively by default; ILIKE mirrors that.
  sql = sql.replace(/\bLIKE\b/gi, "ILIKE");
  return restore(sql);
}
/**
 * Convert SQLite-flavoured SQL to PostgreSQL.
 *
 * The input is first split into individual statements — a `;` only acts
 * as a delimiter when it is neither inside a single-quoted literal nor
 * inside a `--` line comment (migration files contain comments such as
 * `-- ISO 8601; checked on every ...`). Each statement is then translated
 * on its own so that every INSERT OR IGNORE / INSERT OR REPLACE receives
 * its own ON CONFLICT suffix, and the results are rejoined with `;`.
 *
 * @param {string} sql
 * @returns {string} PostgreSQL-compatible SQL
 */
export function translateSql(sql) {
  const CODE = 0;
  const STRING = 1;
  const COMMENT = 2;
  const statements = [];
  let buf = "";
  let state = CODE;
  let i = 0;
  while (i < sql.length) {
    const ch = sql[i];
    if (state === COMMENT) {
      // Inside a -- comment: copy verbatim until end of line.
      buf += ch;
      if (ch === "\n") state = CODE;
      i += 1;
      continue;
    }
    if (state === STRING) {
      if (ch === "'") {
        if (sql[i + 1] === "'") {
          // Escaped quote ('') stays inside the literal.
          buf += "''";
          i += 2;
          continue;
        }
        state = CODE;
      }
      buf += ch;
      i += 1;
      continue;
    }
    // state === CODE
    if (ch === "-" && sql[i + 1] === "-") {
      state = COMMENT;
      buf += ch;
      i += 1;
      continue;
    }
    if (ch === "'") {
      state = STRING;
      buf += ch;
      i += 1;
      continue;
    }
    if (ch === ";") {
      const stmt = buf.trim();
      if (stmt) statements.push(stmt);
      buf = "";
      i += 1;
      continue;
    }
    buf += ch;
    i += 1;
  }
  const tail = buf.trim();
  if (tail) statements.push(tail);
  if (statements.length === 0) return sql;
  // Translate each statement individually and rejoin with semicolons.
  return statements.map((s) => translateSingleStatement(s)).join(";\n") + ";";
}
/**
 * Rewrite `@name` named parameters to PostgreSQL `$N` positional
 * parameters, collecting bound values in order of first appearance.
 *
 * String literals are masked first so an `@` inside a quoted value
 * (e.g. `'user@example.com'`) is never mistaken for a placeholder.
 * A repeated `@name` reuses its original `$N` index. Parameters missing
 * from `namedParams` (or explicitly `undefined`) bind as NULL, since the
 * pg driver does not accept `undefined` values.
 *
 * @param {string} sql — SQL with `@name` placeholders
 * @param {Object} namedParams — `{ name: value, ... }`
 * @returns {{ sql: string, values: any[] }}
 */
function namedToPositional(sql, namedParams) {
  const { masked, restore } = maskStringLiterals(sql);
  const slotByName = new Map();
  const values = [];
  const rewritten = masked.replace(/@(\w+)/g, (_whole, name) => {
    let slot = slotByName.get(name);
    if (slot === undefined) {
      slot = slotByName.size + 1;
      slotByName.set(name, slot);
      const bound = namedParams[name];
      values.push(bound === undefined ? null : bound);
    }
    return `$${slot}`;
  });
  return { sql: restore(rewritten), values };
}
/**
 * Rewrite `?` positional placeholders to `$1`, `$2`, … numbered
 * placeholders, left to right.
 *
 * String literals are masked first so a `?` inside a quoted value
 * (e.g. `'What?'`) is left untouched.
 *
 * @param {string} sql
 * @returns {string}
 */
function questionToNumbered(sql) {
  const { masked, restore } = maskStringLiterals(sql);
  let next = 0;
  const numbered = masked.replace(/\?/g, () => {
    next += 1;
    return `$${next}`;
  });
  return restore(numbered);
}
/**
 * Decide whether a prepare().run()/get()/all() argument list is a single
 * named-params object (better-sqlite3 style) rather than positional args.
 *
 * @param {any[]} args
 * @returns {boolean} true only for exactly one plain-object argument.
 */
function isNamedParams(args) {
  if (args.length !== 1) return false;
  const only = args[0];
  return typeof only === "object" && only !== null && !Array.isArray(only);
}
// ─── Column name case mapping ─────────────────────────────────────────────────
// PostgreSQL folds unquoted identifiers to lowercase. The migration SQL uses
// camelCase column names (e.g. `passwordHash`) which PostgreSQL stores as
// `passwordhash`. When rows are returned, the keys are all lowercase.
// The application code expects camelCase, so we remap on every row returned.
// This is cheaper than quoting every identifier in every SQL statement.
/**
 * Build a lowercase → camelCase lookup from a list of camelCase names.
 * Only entries where the lowercase form differs from the original are
 * included.
 *
 * The map is created with a null prototype so that keys colliding with
 * Object.prototype members (e.g. a column named "constructor" or
 * "toString") can never produce false positives in `in` checks or yield
 * truthy inherited values on property reads in the row-remapping code.
 *
 * @param {string[]} names
 * @returns {Object<string, string>} prototype-less lookup table
 */
function buildColumnMap(names) {
  const map = Object.create(null);
  for (const n of names) {
    const lower = n.toLowerCase();
    if (lower !== n) map[lower] = n;
  }
  return map;
}
/** All camelCase column names used across the schema. */
// NOTE: keep this list in sync with the migration SQL. Any new camelCase
// column added to a CREATE TABLE / ALTER TABLE needs an entry here,
// otherwise PostgreSQL rows will come back with an all-lowercase key
// while SQLite keeps the camelCase spelling — a silent behavior split.
const _COL_MAP = buildColumnMap([
// users
"passwordHash", "createdAt", "updatedAt", "emailVerified",
// oauth_ids
"userId",
// projects
"deletedAt", "workspaceId",
// tests
"projectId", "playwrightCode", "playwrightCodePrev", "sourceUrl", "pageTitle",
"lastResult", "lastRunAt", "qualityScore", "isJourneyTest", "journeyType",
"assertionEnhanced", "reviewStatus", "reviewedAt", "promptVersion", "modelUsed",
"linkedIssueKey", "generatedFrom", "isApiTest", "codeRegeneratedAt",
"aiFixAppliedAt", "codeVersion",
// runs
"startedAt", "finishedAt", "errorCategory", "pagesFound", "parallelWorkers",
"tracePath", "videoPath", "videoSegments", "testQueue", "generateInput",
"promptAudit", "pipelineStats", "feedbackLoop", "currentStep", "rateLimitError",
"qualityAnalytics",
// activities
"projectName", "testId", "testName", "userName",
// healing_history
"strategyIndex", "succeededAt", "failCount", "strategyVersion",
// password_reset_tokens & verification_tokens
"expiresAt", "usedAt",
// webhook_tokens (migration 002)
"tokenHash", "lastUsedAt",
// schedules (migration 002)
"cronExpr", "nextRunAt",
// run_logs (migration 002)
"runId",
// schema_migrations
"appliedAt", "durationMs",
// information_schema queries
"column_name", "data_type",
// notification_settings (FEA-001)
"teamsWebhookUrl", "emailRecipients", "webhookUrl",
// workspaces (ACL-001)
"ownerId",
// workspace_members (ACL-001)
"joinedAt",
]);
/**
 * Remap lowercase PostgreSQL column names to camelCase on a single row
 * object. Keys that are already camelCase or unknown to the map are
 * copied through unchanged.
 *
 * Uses Object.hasOwn so keys that collide with Object.prototype members
 * are never resolved through the prototype chain: with the previous
 * `_COL_MAP[key] || key` form, a column literally named "constructor"
 * would be renamed to the stringified inherited Function value.
 *
 * @param {Object} row — may be null/undefined (returned as-is)
 * @returns {Object}
 */
function remapRow(row) {
  if (!row || typeof row !== "object") return row;
  const out = {};
  for (const key of Object.keys(row)) {
    out[Object.hasOwn(_COL_MAP, key) ? _COL_MAP[key] : key] = row[key];
  }
  return out;
}
/**
 * Remap all rows in a result set.
 *
 * Fast path: if the first row contains no key present in the column map,
 * the array is returned untouched — all rows of a single query share the
 * same column set, so checking one row suffices.
 *
 * Uses Object.hasOwn instead of `in` so prototype-chain members (e.g. a
 * column named "constructor") cannot force a spurious remap pass.
 *
 * @param {Object[]} rows — may be null/undefined or empty (returned as-is)
 * @returns {Object[]}
 */
function remapRows(rows) {
  if (!rows || rows.length === 0) return rows;
  const firstKeys = Object.keys(rows[0]);
  const needsRemap = firstKeys.some((k) => Object.hasOwn(_COL_MAP, k));
  if (!needsRemap) return rows;
  return rows.map(remapRow);
}
// ─── Adapter factory ──────────────────────────────────────────────────────────
/**
* Create a PostgreSQL adapter instance.
*
* @param {Object} [opts]
* @param {string} [opts.connectionString] — PostgreSQL connection URL.
* Defaults to `process.env.DATABASE_URL`.
* @param {number} [opts.poolSize] — Max pool connections (default 10).
* @returns {Object} Adapter conforming to the db-adapter interface.
* @throws {Error} If `DATABASE_URL` is not set.
* @throws {Error} If neither `pg-native` nor `deasync` is installed.
*/
export function createPostgresAdapter(opts = {}) {
const connectionString = opts.connectionString || process.env.DATABASE_URL;
if (!connectionString) {
throw new Error("[postgres-adapter] DATABASE_URL is required");
}
if (!PgNative && !deasyncLib) {
throw new Error(
"[postgres-adapter] PostgreSQL requires either `pg-native` (recommended) " +
"or `deasync` for synchronous query execution. Install one:\n" +
" npm install pg-native # recommended — uses libpq C bindings\n" +
" npm install deasync # fallback — blocks event loop"
);
}
// ── pg-native synchronous path ────────────────────────────────────────
let nativeClient = null;
if (PgNative) {
nativeClient = new PgNative();
nativeClient.connectSync(connectionString);
console.log(formatLogLine("info", null, "[postgres-adapter] Connected via pg-native (synchronous)"));
}
/**
* Reconnect the pg-native client if the connection was lost.
* Called from query() when querySync throws a connection error.
* @returns {boolean} true if reconnection succeeded.
*/
function reconnectNativeClient() {
if (!nativeClient || !PgNative) return false;
try {
console.warn(formatLogLine("warn", null, "[postgres-adapter] Connection lost — attempting reconnect…"));
// Create a fresh client — pg-native does not support reconnecting
// an existing client after the underlying libpq connection is closed.
nativeClient = new PgNative();
nativeClient.connectSync(connectionString);
console.log(formatLogLine("info", null, "[postgres-adapter] Reconnected via pg-native"));
return true;
} catch (err) {
console.error(formatLogLine("error", null, `[postgres-adapter] Reconnect failed: ${err.message}`));
return false;
}
}
// ── deasync fallback path ─────────────────────────────────────────────
const maxPool = opts.poolSize || parseInt(process.env.PG_POOL_SIZE, 10) || 10;
const pool = !nativeClient ? new Pool({
connectionString,
max: maxPool,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 5000,
}) : null;
if (pool) {
console.log(formatLogLine("info", null, `[postgres-adapter] Connection pool created via deasync (max ${maxPool})`));
}
// When a deasync transaction is active, this Map holds the txQuery
// function keyed by a unique transaction token. Each call to
// transaction() creates a fresh token and stores it in AsyncLocalStorage
// so it is scoped to the current async execution context. query()
// reads the token from AsyncLocalStorage and looks up the corresponding
// override. This prevents concurrent requests (whose event-loop turns
// are interleaved by deasyncLib.loopWhile) from accidentally routing
// their queries through another transaction's dedicated client.
const _txQueryOverrides = new Map();
const _txStorage = new AsyncLocalStorage();
/**
* Detect whether a SQL statement is a DML command (INSERT/UPDATE/DELETE)
* that does not already have a RETURNING clause. pg-native's querySync
* only returns rows, so DML without RETURNING returns [] and we lose
* the affected row count. Appending RETURNING * makes pg-native return
* the affected rows so rows.length gives the correct count.
*
* @param {string} sql
* @returns {string} SQL with RETURNING * appended if needed.
*/
function ensureReturning(sql) {
if (!nativeClient) return sql;
const trimmed = sql.trimStart();
const isDml = /^(INSERT|UPDATE|DELETE)\b/i.test(trimmed);
if (!isDml) return sql;
if (/\bRETURNING\b/i.test(sql)) return sql;
return sql.trimEnd().replace(/;?\s*$/, "") + " RETURNING *";
}
/**
* Execute a query synchronously.
*
* @param {string} sql
* @param {any[]} [values]
* @returns {{ rows: Object[], rowCount: number }}
*/
function query(sql, values = []) {
if (nativeClient) {
const execSql = ensureReturning(sql);
try {
const rows = nativeClient.querySync(execSql, values);
return { rows: remapRows(rows), rowCount: rows.length };
} catch (err) {
// Attempt one reconnect on connection-level errors (e.g. PostgreSQL
// restarted, TCP timeout). If the reconnect succeeds, retry the query.
const isConnectionError = /connection|socket|EPIPE|ECONNRESET|terminating/i.test(err.message);
if (isConnectionError && reconnectNativeClient()) {
const rows = nativeClient.querySync(execSql, values);
return { rows: remapRows(rows), rowCount: rows.length };
}
throw err;
}
}
// If a deasync transaction is active on THIS async context, route the
// query through the dedicated transaction client. AsyncLocalStorage
// ensures each request's transaction token is isolated even when
// deasyncLib.loopWhile() interleaves event-loop turns from other requests.
const txToken = _txStorage.getStore();
if (txToken && _txQueryOverrides.has(txToken)) {
return _txQueryOverrides.get(txToken)(sql, values);
}
// deasync fallback: run async query and block until it resolves
let done = false;
let result = null;
let error = null;
pool.query(sql, values)
.then(r => { result = r; done = true; })
.catch(e => { error = e; done = true; });
deasyncLib.loopWhile(() => !done);
if (error) throw error;
return { rows: remapRows(result.rows), rowCount: result.rowCount || 0 };
}
/**
* Handle PRAGMA table_info() calls by querying information_schema.
*
* @param {string} sql
* @returns {Object} `{ isPragma: boolean, rows: Object[]|undefined }`
*/
function handlePragmaTableInfo(sql) {
const match = sql.match(/PRAGMA\s+table_info\((\w+)\)/i);
if (!match) return { isPragma: false };
const tableName = match[1];
const pgSql = `SELECT column_name AS name, data_type AS type
FROM information_schema.columns
WHERE table_name = $1 ORDER BY ordinal_position`;
const result = query(pgSql, [tableName]);
return { isPragma: true, rows: result.rows };
}
return {
/** @type {"postgres"} */
dialect: "postgres",
prepare(rawSql) {
// Intercept PRAGMA table_info() calls
const pragmaResult = handlePragmaTableInfo(rawSql);
if (pragmaResult.isPragma) {
return {
run() { return { changes: 0 }; },
get() { return pragmaResult.rows[0]; },
all() { return pragmaResult.rows; },
};
}
const pgSql = translateSql(rawSql);
return {
run(...args) {
let sql, values;
if (isNamedParams(args)) {
({ sql, values } = namedToPositional(pgSql, args[0]));
} else {
sql = questionToNumbered(pgSql);
values = args;
}
const result = query(sql, values);
if (result.rows && result.rows.length > 0) {
return { changes: result.rowCount || 0, ...result.rows[0] };
}
return { changes: result.rowCount || 0 };
},
get(...args) {
let sql, values;
if (isNamedParams(args)) {
({ sql, values } = namedToPositional(pgSql, args[0]));
} else {
sql = questionToNumbered(pgSql);
values = args;
}
const result = query(sql, values);
return result.rows[0] || undefined;
},
all(...args) {
let sql, values;
if (isNamedParams(args)) {
({ sql, values } = namedToPositional(pgSql, args[0]));
} else {
sql = questionToNumbered(pgSql);
values = args;
}
const result = query(sql, values);
return result.rows;
},
};
},
exec(sql) {
const pgSql = translateSql(sql);
// Execute each statement individually — PostgreSQL's simple query
// protocol handles multi-statement strings, but some DDL combinations
// (e.g. CREATE TABLE + CREATE INDEX) can fail when sent as one query.
// Split on the semicolons that translateSql() uses as delimiters.
const stmts = pgSql.split(/;\s*\n/).map(s => s.replace(/;\s*$/, "").trim()).filter(Boolean);
for (const stmt of stmts) {
query(stmt);
}
},
transaction(fn) {
return function (...args) {
if (nativeClient) {
// pg-native uses a single connection — BEGIN/COMMIT are on the same client.
query("BEGIN");
try {
const result = fn(...args);
query("COMMIT");
return result;
} catch (err) {
query("ROLLBACK");
throw err;
}
}
// Pool path: acquire a dedicated client so all statements within the
// transaction run on the same connection. pool.query() checks out and
// releases a connection per call, which would break transactional
// semantics (BEGIN on conn A, body on conn B, COMMIT on conn C).
let done = false;
let client = null;
let clientError = null;
pool.connect()
.then(c => { client = c; done = true; })
.catch(e => { clientError = e; done = true; });
deasyncLib.loopWhile(() => !done);
if (clientError) throw clientError;
/** Run a query on the dedicated transaction client. */
function txQuery(sql, values = []) {
let txDone = false;
let txResult = null;
let txError = null;
client.query(sql, values)
.then(r => { txResult = r; txDone = true; })
.catch(e => { txError = e; txDone = true; });
deasyncLib.loopWhile(() => !txDone);
if (txError) throw txError;
return { rows: remapRows(txResult.rows), rowCount: txResult.rowCount || 0 };
}
txQuery("BEGIN");
// Redirect all query() calls inside fn() to the dedicated
// transaction client so that db.prepare().run() etc. execute
// within the same BEGIN/COMMIT block.
// Use a unique Symbol token stored in AsyncLocalStorage so
// concurrent transactions (interleaved by deasyncLib.loopWhile
// pumping the event loop) each route to their own client.
const txToken = Symbol("tx");
_txQueryOverrides.set(txToken, txQuery);
try {
// _txStorage.run() scopes the token to this async context,
// so query() in other request handlers won't see it.
const result = _txStorage.run(txToken, () => fn(...args));
txQuery("COMMIT");
return result;
} catch (err) {
try { txQuery("ROLLBACK"); } catch { /* best-effort rollback */ }
throw err;
} finally {
_txQueryOverrides.delete(txToken);
client.release();
}
};
},
pragma(_str) {
// No-op for PostgreSQL
},
async close() {
if (nativeClient) {
try {
nativeClient.end();
console.log(formatLogLine("info", null, "[postgres-adapter] Native client closed"));
} catch (err) {
console.warn(formatLogLine("warn", null, `[postgres-adapter] Close error: ${err.message}`));
}
}
if (pool) {
try {
await pool.end();
console.log(formatLogLine("info", null, "[postgres-adapter] Connection pool closed"));
} catch (err) {
console.warn(formatLogLine("warn", null, `[postgres-adapter] Pool close error: ${err.message}`));
}
}
},
};
}