
🛠️ feat: Escrow Reconciliation Service - Keep trustless_work_escrows in Sync with Stellar Blockchain #309

@sotoJ24


🧭 Issue Type

Architecture / Backend Feature / Maintainer Issue

Relates to: Contributing Guide · Git Guidelines


📋 Issue Summary

SafeTrust relies on TrustlessWork webhooks to receive real-time escrow state changes from the Stellar blockchain. However, webhook delivery is not guaranteed: network timeouts, service restarts, or delivery failures can leave public.trustless_work_escrows out of sync with the actual on-chain state. This story defines the architecture for a periodic reconciliation service that acts as a safety net: a Hasura cron trigger that batch-fetches the current escrow state from the TrustlessWork Indexer API and reconciles it against our local database.


🏛️ Architecture Context

The Problem

Stellar Blockchain (source of truth)
        │
        │ indexed by TrustlessWork
        ▼
TrustlessWork API
        │
        │ webhook (can fail / be missed) ← THIS LINK CAN BREAK
        ▼
public.trustless_work_escrows   ← may be stale
        │
        ▼
Frontend (GraphQL subscriptions) ← shows wrong state to users

When a webhook is missed, the escrow status, balance, or milestone state a user sees in the SafeTrust UI becomes incorrect, and nothing corrects it automatically.

The Solution

A Hasura Cron Trigger running every 15 minutes calls a Node.js handler endpoint (/reconciliation/sync-escrows) that:

  1. Fetches all contract_id values for tenant 'safetrust' from public.trustless_work_escrows
  2. Batches them in chunks of 50
  3. Calls GET /helper/get-escrows-by-contract-ids on the TrustlessWork API for each batch
  4. Compares the API response against local DB state
  5. Updates existing records whose status, balance, or escrow_metadata differ (no new rows are inserted)
  6. Logs a run summary to trustless_work_webhook_events with event_type = 'reconciliation.sync'
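The steps above can be sketched independently of Express, pg, and axios. In this minimal sketch, `chunk`, `runReconciliation`, `fetchBatch`, and `applyUpdate` are hypothetical names: `fetchBatch` stands in for the TrustlessWork API call and `applyUpdate` for the conditional UPDATE, resolving to `true` only when a row actually changed. It isolates the two properties the handler relies on: IDs are split into batches of 50, and a failing batch is counted and skipped instead of aborting the run.

```javascript
// Split an array into consecutive chunks of at most `size` elements.
function chunk(items, size) {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const chunks = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Hypothetical orchestration of steps 2-5 with injected I/O:
//   fetchBatch(ids)     -> Promise<escrow[]>  (stand-in for the TrustlessWork call)
//   applyUpdate(escrow) -> Promise<boolean>   (true if a row was actually changed)
async function runReconciliation(contractIds, { fetchBatch, applyUpdate, batchSize = 50 }) {
  const summary = { total_contracts: contractIds.length, total_updated: 0, total_failed: 0 };
  for (const batch of chunk(contractIds, batchSize)) {
    let escrows;
    try {
      escrows = await fetchBatch(batch);
    } catch {
      // A failed API call fails the whole batch, but the run continues.
      summary.total_failed += batch.length;
      continue;
    }
    for (const escrow of escrows) {
      try {
        if (await applyUpdate(escrow)) summary.total_updated++;
      } catch {
        // A failed row update counts once and does not stop the batch.
        summary.total_failed++;
      }
    }
  }
  return summary;
}
```

With 120 contract IDs the batches hold 50, 50, and 20 IDs; if only the second API call fails, the run reports total_failed: 50 and still processes the third batch.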

🗄️ Database Impact

No new tables required.

The existing schema fully supports this feature:

Table                                   Role in Reconciliation
public.trustless_work_escrows           Target of the update; the table being reconciled
public.trustless_work_webhook_events    Receives one reconciliation audit log entry per run
public.escrow_milestones                Updated if a milestone's status differs from the API response
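The handler in the Implementation Specification only updates public.trustless_work_escrows, so the escrow_milestones row in this table implies a second comparison. A minimal sketch, assuming (hypothetically) that each API escrow carries a `milestones` array whose position matches a local `milestone_index` column and that both sides expose a `status` string; neither shape is confirmed by the TrustlessWork API or the SafeTrust schema. `diffMilestoneStatuses` is a hypothetical helper that returns only the milestones whose status differs, mirroring the IS DISTINCT FROM rule used for escrows.

```javascript
// Hypothetical shapes (assumptions, not confirmed):
//   localRows:     [{ milestone_index: 0, status: 'pending' }, ...]  from public.escrow_milestones
//   apiMilestones: [{ status: 'approved' }, ...]                     array position = milestone index
function diffMilestoneStatuses(localRows, apiMilestones) {
  const localByIndex = new Map(localRows.map((row) => [row.milestone_index, row.status]));
  const changes = [];
  apiMilestones.forEach((milestone, index) => {
    // A milestone missing locally reads as null, so it is reported as a change.
    const from = localByIndex.get(index) ?? null;
    const to = milestone.status ?? null;
    if (from !== to) {
      changes.push({ milestone_index: index, from, to });
    }
  });
  return changes;
}
```

An entry with `from: null` means no local row exists for that index, which an UPDATE alone would not fix; whether such milestones should be inserted is a separate design decision.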

Fields reconciled on trustless_work_escrows

-- Fields compared and potentially updated per run:
status          -- Most critical: 'pending' → 'active' → 'completed' → 'disputed'
balance         -- Current on-chain balance (DECIMAL)
escrow_metadata -- Full JSONB blob from TrustlessWork API response
updated_at      -- Set to NOW() on any update
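The handler lets Postgres compare these fields with IS DISTINCT FROM, which avoids two traps that appear if the comparison is ever done in JavaScript (for example, to log which fields changed): node-postgres returns NUMERIC/DECIMAL columns as strings by default, so '100.50' must match 100.5, and JSONB equality ignores object key order while a plain JSON.stringify comparison does not. `normalizeDecimal`, `stableStringify`, and `changedFields` are hypothetical helpers in this sketch, which assumes balances use plain decimal notation and that escrow_metadata holds the full API object, as the handler stores it.

```javascript
// Normalize a decimal given as a number or a plain-notation string so that
// "100.50", "100.5" and 100.5 compare equal. Exponent forms are not handled.
function normalizeDecimal(value) {
  if (value === null || value === undefined) return null;
  let s = String(value).trim();
  if (s.includes('.')) {
    s = s.replace(/0+$/, '').replace(/\.$/, '');
  }
  return s === '-0' ? '0' : s;
}

// Serialize JSON with sorted object keys, approximating JSONB's
// key-order-insensitive equality.
function stableStringify(value) {
  if (Array.isArray(value)) {
    return `[${value.map(stableStringify).join(',')}]`;
  }
  if (value !== null && typeof value === 'object') {
    const keys = Object.keys(value).sort();
    return `{${keys.map((k) => `${JSON.stringify(k)}:${stableStringify(value[k])}`).join(',')}}`;
  }
  return JSON.stringify(value);
}

// Names of reconciled columns whose local value differs from the API escrow.
function changedFields(localRow, apiEscrow) {
  const changed = [];
  if ((localRow.status ?? null) !== (apiEscrow.status ?? null)) changed.push('status');
  if (normalizeDecimal(localRow.balance) !== normalizeDecimal(apiEscrow.balance)) changed.push('balance');
  if (stableStringify(localRow.escrow_metadata ?? null) !== stableStringify(apiEscrow)) {
    changed.push('escrow_metadata');
  }
  return changed;
}
```

Keeping the comparison in SQL, as the handler does, sidesteps the exponent-notation limitation of `normalizeDecimal`.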

🛠️ Implementation Specification

1. Hasura Cron Trigger

File: metadata/base/cron_triggers.yaml

- name: escrow_reconciliation_sync
  webhook: "{{WEBHOOK_BASE_URL}}/reconciliation/sync-escrows"
  schedule: "*/15 * * * *"   # every 15 minutes
  include_in_metadata: true
  payload:
    tenant_id: "safetrust"
  retry_conf:
    num_retries: 3
    timeout_seconds: 60
    tolerance_seconds: 21600
  comment: "Periodic reconciliation of trustless_work_escrows against TrustlessWork Indexer API"
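Hasura delivers a cron trigger as a POST whose JSON body wraps the configured `payload` alongside fields such as `name` and `scheduled_time`. The handler hard-codes 'safetrust' rather than reading the `tenant_id` sent in that payload. A minimal sketch of reading it from the body with a 'safetrust' fallback, assuming `express.json()` has already parsed the request into `req.body`; `resolveTenantId` and `DEFAULT_TENANT_ID` are hypothetical names.

```javascript
const DEFAULT_TENANT_ID = 'safetrust';

// Abridged Hasura cron trigger request body:
//   { "name": "escrow_reconciliation_sync", "scheduled_time": "...",
//     "payload": { "tenant_id": "safetrust" }, ... }
// Returns payload.tenant_id, or the default when it is absent or empty.
function resolveTenantId(body) {
  const tenantId = body?.payload?.tenant_id;
  if (tenantId === undefined || tenantId === null || tenantId === '') {
    return DEFAULT_TENANT_ID;
  }
  if (typeof tenantId !== 'string') {
    throw new TypeError('payload.tenant_id must be a string');
  }
  return tenantId;
}
```

In the route this would become `const tenantId = resolveTenantId(req.body);`, with `tenantId` passed wherever the queries currently use the 'safetrust' literal.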

2. Handler Endpoint

New file: webhook/reconciliation/sync-escrows.js

const express = require('express');
const router = express.Router();
const { Pool } = require('pg');
const axios = require('axios');

const pool = new Pool({
  connectionString: process.env.PG_DATABASE_URL,
});

const TRUSTLESSWORK_API_URL = process.env.TRUSTLESSWORK_API_URL;
const TRUSTLESSWORK_API_KEY = process.env.TRUSTLESSWORK_API_KEY;
const BATCH_SIZE = 50;
const TENANT_ID = 'safetrust';

router.post('/sync-escrows', async (req, res) => {
  const startedAt = new Date();
  let totalUpdated = 0;
  let totalFailed = 0;

  try {
    // Step 1: Fetch all contract_ids for the tenant from the local DB
    const { rows } = await pool.query(
      `SELECT contract_id FROM public.trustless_work_escrows
       WHERE tenant_id = $1 AND contract_id IS NOT NULL`,
      [TENANT_ID]
    );
    const contractIds = rows.map(r => r.contract_id);

    // Step 2: Batch into chunks of BATCH_SIZE. An empty table yields no
    // batches, and the run still falls through to the audit log in Step 5.
    const batches = [];
    for (let i = 0; i < contractIds.length; i += BATCH_SIZE) {
      batches.push(contractIds.slice(i, i + BATCH_SIZE));
    }

    // Step 3: Fetch the current on-chain state for each batch
    for (const batch of batches) {
      try {
        const response = await axios.get(
          `${TRUSTLESSWORK_API_URL}/helper/get-escrows-by-contract-ids`,
          {
            params: { contractIds: batch.join(',') },
            headers: { 'x-api-key': TRUSTLESSWORK_API_KEY },
            timeout: 30000,
          }
        );

        const apiEscrows = response.data?.escrows || [];

        // Step 4: Update each escrow only if its state differs.
        // IS DISTINCT FROM is NULL-safe, so NULL columns are also corrected.
        for (const apiEscrow of apiEscrows) {
          try {
            const result = await pool.query(
              `UPDATE public.trustless_work_escrows
               SET
                 status          = $1,
                 balance         = $2,
                 escrow_metadata = $3,
                 updated_at      = NOW()
               WHERE contract_id = $4
                 AND tenant_id = $5
                 AND (
                   status          IS DISTINCT FROM $1 OR
                   balance         IS DISTINCT FROM $2 OR
                   escrow_metadata IS DISTINCT FROM $3
                 )
               RETURNING id`,
              [
                apiEscrow.status,
                apiEscrow.balance,
                JSON.stringify(apiEscrow),
                apiEscrow.contractId,
                TENANT_ID,
              ]
            );
            if (result.rowCount > 0) totalUpdated++;
          } catch (rowErr) {
            // A failed row is counted and skipped; the rest of the batch continues
            console.error(`Failed to update ${apiEscrow.contractId}:`, rowErr.message);
            totalFailed++;
          }
        }
      } catch (batchErr) {
        // A failed API call counts the whole batch as failed; the run continues
        console.error('Batch fetch failed:', batchErr.message);
        totalFailed += batch.length;
      }
    }

    // Step 5: Write the reconciliation audit log on every run, including empty ones
    await pool.query(
      `INSERT INTO public.trustless_work_webhook_events
         (contract_id, event_type, payload, processed, processed_at, tenant_id)
       VALUES
         (NULL, 'reconciliation.sync', $1, TRUE, NOW(), $2)`,
      [
        JSON.stringify({
          started_at: startedAt,
          completed_at: new Date(),
          total_contracts: contractIds.length,
          total_updated: totalUpdated,
          total_failed: totalFailed,
        }),
        TENANT_ID,
      ]
    );

    res.json({
      message: 'Reconciliation complete',
      total_contracts: contractIds.length,
      total_updated: totalUpdated,
      total_failed: totalFailed,
    });
  } catch (err) {
    console.error('Reconciliation sync failed:', err);
    res.status(500).json({ error: 'Reconciliation failed', details: err.message });
  }
});

module.exports = router;
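Each batch call may wait up to 30 s (`timeout: 30000`), so three slow batches (90 s) already exceed the cron's `timeout_seconds: 60`. Hasura treats such a request as failed, and with `num_retries: 3` a retry can start while the first run is still updating rows. One way to make overlapping runs harmless, sketched here as an assumption rather than part of the spec, is a Postgres session-level advisory lock: `pg_try_advisory_lock` returns false immediately when another session holds it. Because the lock belongs to a session, it must be taken and released on one dedicated client (e.g. from `pool.connect()`), not through `pool.query`, which may use a different connection per call. `withReconciliationLock` and `RECONCILIATION_LOCK_KEY` are hypothetical names; `client` is any object with a node-postgres-style `query(text, params)` method.

```javascript
// Hypothetical application-chosen key; any bigint unique to this job works.
const RECONCILIATION_LOCK_KEY = 830915;

// Run `work` only if this session obtains the advisory lock; otherwise skip.
// `client` must be a single dedicated connection, because advisory locks
// are held per session.
async function withReconciliationLock(client, work) {
  const { rows } = await client.query(
    'SELECT pg_try_advisory_lock($1) AS locked',
    [RECONCILIATION_LOCK_KEY]
  );
  if (!rows[0].locked) {
    return { skipped: true };
  }
  try {
    return { skipped: false, result: await work() };
  } finally {
    // Always release the lock, even if the reconciliation throws.
    await client.query('SELECT pg_advisory_unlock($1)', [RECONCILIATION_LOCK_KEY]);
  }
}
```

A skipped run can still respond with 200 so that Hasura does not schedule further retries for it; the client obtained from `pool.connect()` should be released in a `finally` block after the call.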


3. Register the Route

File: webhook/index.js (add these two lines):

const reconciliationRoutes = require('./reconciliation/sync-escrows');
app.use('/reconciliation', reconciliationRoutes);

4. Required Environment Variables

Add to .env.example:

# TrustlessWork API
# Dev URL shown; point this at the mainnet API URL in production
TRUSTLESSWORK_API_URL=https://dev.api.trustlesswork.com
TRUSTLESSWORK_API_KEY=your_api_key_here
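The handler reads both variables at module load, so a missing value only surfaces later, as a request to `undefined/helper/...` or, likely, an authentication error from the API. A minimal fail-fast check, sketched with a hypothetical `loadTrustlessWorkConfig` helper, validates them once at startup and strips trailing slashes from the URL so the handler's path concatenation stays well-formed.

```javascript
// Validate the TrustlessWork settings once at startup instead of on the first cron run.
function loadTrustlessWorkConfig(env = process.env) {
  const required = ['TRUSTLESSWORK_API_URL', 'TRUSTLESSWORK_API_KEY'];
  const missing = required.filter((name) => !env[name] || !env[name].trim());
  if (missing.length > 0) {
    throw new Error(`Missing required environment variables: ${missing.join(', ')}`);
  }
  let apiUrl;
  try {
    apiUrl = new URL(env.TRUSTLESSWORK_API_URL.trim());
  } catch {
    throw new Error('TRUSTLESSWORK_API_URL is not a valid URL');
  }
  return {
    // Drop trailing slashes so `${apiUrl}/helper/...` never produces "//helper".
    apiUrl: apiUrl.toString().replace(/\/+$/, ''),
    apiKey: env.TRUSTLESSWORK_API_KEY.trim(),
  };
}
```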

✅ Acceptance Criteria

  • Hasura cron trigger escrow_reconciliation_sync is defined in metadata/base/cron_triggers.yaml and fires every 15 minutes
  • POST /reconciliation/sync-escrows endpoint exists and responds with 200 on success
  • Endpoint fetches all contract_id values from public.trustless_work_escrows where tenant_id = 'safetrust'
  • Contract IDs are batched in chunks of 50 before calling the TrustlessWork API
  • Only records where status, balance, or escrow_metadata differ from the API response are updated (IS DISTINCT FROM)
  • updated_at is set to NOW() on every updated row
  • A reconciliation.sync event is inserted into public.trustless_work_webhook_events after every run, with payload containing total_contracts, total_updated, total_failed, started_at, completed_at
  • If the TrustlessWork API returns an error for a batch, that batch is counted as total_failed and the run continues (no full abort)
  • TRUSTLESSWORK_API_URL and TRUSTLESSWORK_API_KEY are documented in .env.example
  • The route is registered in webhook/index.js under /reconciliation
  • Karate test covers: successful sync with changes, successful sync with no changes, batch API failure resilience

🔗 Relationships

  • Depends on: public.trustless_work_escrows (canonical escrow table)
  • Writes to: public.trustless_work_webhook_events (audit log)
  • Triggered by: Hasura cron (metadata/base/cron_triggers.yaml)
  • Complements: TrustlessWork webhook handler (inbound real-time events)
  • TrustlessWork endpoint used: GET /helper/get-escrows-by-contract-ids

