diff --git a/apps/webapp/app/routes/admin.api.v1.runs-replication.start-monitor.ts b/apps/webapp/app/routes/admin.api.v1.runs-replication.start-monitor.ts deleted file mode 100644 index 510c363526..0000000000 --- a/apps/webapp/app/routes/admin.api.v1.runs-replication.start-monitor.ts +++ /dev/null @@ -1,59 +0,0 @@ -import { ActionFunctionArgs, json } from "@remix-run/server-runtime"; -import { z } from "zod"; -import { prisma } from "~/db.server"; -import { startTcpBufferMonitor } from "~/services/monitorTcpBuffers.server"; -import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server"; -import { getTcpMonitorGlobal, setTcpMonitorGlobal } from "~/services/runsReplicationGlobal.server"; - -const schema = z.object({ - intervalMs: z.number().min(1000).max(60_000).default(5_000), -}); - -export async function action({ request }: ActionFunctionArgs) { - // Next authenticate the request - const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request); - - if (!authenticationResult) { - return json({ error: "Invalid or Missing API key" }, { status: 401 }); - } - - const user = await prisma.user.findUnique({ - where: { - id: authenticationResult.userId, - }, - }); - - if (!user) { - return json({ error: "Invalid or Missing API key" }, { status: 401 }); - } - - if (!user.admin) { - return json({ error: "You must be an admin to perform this action" }, { status: 403 }); - } - - try { - const body = await request.json(); - const { intervalMs } = schema.parse(body); - - const globalMonitor = getTcpMonitorGlobal(); - - if (globalMonitor) { - return json( - { - error: "Tcp buffer monitor already running, you must stop it before starting a new one", - }, - { - status: 400, - } - ); - } - - setTcpMonitorGlobal(startTcpBufferMonitor(intervalMs)); - - return json({ - success: true, - }); - } catch (error) { - return json({ error: error instanceof Error ? error.message : error }, { status: 400 }); - } -} diff --git a/apps/webapp/app/routes/admin.api.v1.runs-replication.stop-monitor.ts b/apps/webapp/app/routes/admin.api.v1.runs-replication.stop-monitor.ts deleted file mode 100644 index fb2a3daecd..0000000000 --- a/apps/webapp/app/routes/admin.api.v1.runs-replication.stop-monitor.ts +++ /dev/null @@ -1,47 +0,0 @@ -import { ActionFunctionArgs, json } from "@remix-run/server-runtime"; -import { prisma } from "~/db.server"; -import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server"; -import { - getTcpMonitorGlobal, - unregisterTcpMonitorGlobal, -} from "~/services/runsReplicationGlobal.server"; - -export async function action({ request }: ActionFunctionArgs) { - // Next authenticate the request - const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request); - - if (!authenticationResult) { - return json({ error: "Invalid or Missing API key" }, { status: 401 }); - } - - const user = await prisma.user.findUnique({ - where: { - id: authenticationResult.userId, - }, - }); - - if (!user) { - return json({ error: "Invalid or Missing API key" }, { status: 401 }); - } - - if (!user.admin) { - return json({ error: "You must be an admin to perform this action" }, { status: 403 }); - } - - try { - const globalMonitor = getTcpMonitorGlobal(); - - if (!globalMonitor) { - return json({ error: "Tcp buffer monitor not running" }, { status: 400 }); - } - - clearInterval(globalMonitor); - unregisterTcpMonitorGlobal(); - - return json({ - success: true, - }); - } catch (error) { - return json({ error: error instanceof Error ? error.message : error }, { status: 400 }); - } -} diff --git a/apps/webapp/app/services/monitorTcpBuffers.server.ts b/apps/webapp/app/services/monitorTcpBuffers.server.ts deleted file mode 100644 index 418ff9e4c8..0000000000 --- a/apps/webapp/app/services/monitorTcpBuffers.server.ts +++ /dev/null @@ -1,57 +0,0 @@ -// monitorTcpBuffers.ts -import fs from "fs/promises"; -import os from "os"; -import { logger } from "./logger.server"; - -/** - * Parse /proc/net/sockstat and /proc/sys/net/* every `intervalMs` - * and log the numbers. You can pivot these logs into CloudWatch - * metrics with a filter pattern if you like. - */ -export function startTcpBufferMonitor(intervalMs = 5_000) { - async function sampleOnce() { - try { - const [sockstat, wmemMax, tcpMem] = await Promise.all([ - fs.readFile("/proc/net/sockstat", "utf8"), - fs.readFile("/proc/sys/net/core/wmem_max", "utf8"), - fs.readFile("/proc/sys/net/ipv4/tcp_mem", "utf8"), - ]); - - logger.debug("tcp-buffer-monitor", { - sockstat, - wmemMax, - tcpMem, - }); - - // /proc/net/sockstat has lines like: - // TCP: inuse 5 orphan 0 tw 0 alloc 6 mem 409 - const tcpLine = sockstat.split("\n").find((l) => l.startsWith("TCP:")) ?? ""; - const fields = tcpLine.trim().split(/\s+/); - const inUse = Number(fields[2]); // open sockets - const alloc = Number(fields[8]); // total sockets with buffers - const memPages = Number(fields[10]); // pages (4 kB each) - const memBytes = memPages * 4096; - - const wmemMaxBytes = Number(wmemMax.trim()); - const [low, pressure, high] = tcpMem - .trim() - .split(/\s+/) - .map((n) => Number(n) * 4096); // pages → bytes - - logger.debug("tcp-buffer-monitor", { - t: Date.now(), - host: os.hostname(), - sockets_in_use: inUse, - sockets_alloc: alloc, - tcp_mem_bytes: memBytes, - tcp_mem_high: high, - wmem_max: wmemMaxBytes, - }); - } catch (err) { - // Log and keep going; most errors are “file disappeared for a moment” - console.error("tcp-buffer-monitor error", err); - } - } - - return setInterval(sampleOnce, intervalMs); -} diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index 90af5768fd..aaad687717 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -141,7 +141,7 @@ export class RunsReplicationService { }); this._replicationClient.events.on("start", () => { - this.logger.debug("Replication client started"); + this.logger.info("Replication client started"); }); this._replicationClient.events.on("acknowledge", ({ lsn }) => { @@ -149,7 +149,7 @@ export class RunsReplicationService { }); this._replicationClient.events.on("leaderElection", (isLeader) => { - this.logger.debug("Leader election", { isLeader }); + this.logger.info("Leader election", { isLeader }); }); } @@ -360,7 +360,7 @@ export class RunsReplicationService { }) .end(); - this.logger.debug("handle_transaction", { + this.logger.info("handle_transaction", { transaction: { xid: transaction.xid, commitLsn: transaction.commitLsn, @@ -395,7 +395,7 @@ export class RunsReplicationService { this._lastAcknowledgedAt = now; this._lastAcknowledgedLsn = this._latestCommitEndLsn; - this.logger.debug("acknowledge_latest_transaction", { + this.logger.info("acknowledge_latest_transaction", { commitEndLsn: this._latestCommitEndLsn, lastAcknowledgedAt: this._lastAcknowledgedAt, }); @@ -764,7 +764,7 @@ export class ConcurrentFlushScheduler { span.setAttribute("concurrency_pending_count", this.concurrencyLimiter.pendingCount); span.setAttribute("concurrency_concurrency", this.concurrencyLimiter.concurrency); - this.logger.debug("flush_next_batch", { + this.logger.info("flush_next_batch", { batchId, batchSize: batch.length, concurrencyActiveCount: this.concurrencyLimiter.activeCount, @@ -796,7 +796,7 @@ export class ConcurrentFlushScheduler { this.failedBatchCount++; } else { - this.logger.debug("flush_batch_complete", { + this.logger.info("flush_batch_complete", { totalBatches: 1, successfulBatches: 1, failedBatches: 0,