id: database
title: Database Schema
sidebar_position: 2

Database Schema

The Mass Payout backend uses PostgreSQL for persistent storage.

Database Entities

Asset

Stores imported asset tokens and their lifecycle contracts.

Table: asset

CREATE TABLE asset (
  id UUID PRIMARY KEY,
  token_id VARCHAR NOT NULL UNIQUE,        -- Asset token ID (0.0.xxxxx)
  name VARCHAR NOT NULL,                   -- Asset name
  symbol VARCHAR NOT NULL,                 -- Asset symbol
  total_supply BIGINT NOT NULL,            -- Total token supply
  decimals INT NOT NULL,                   -- Token decimals
  lifecycle_contract_id VARCHAR,           -- LifeCycle contract ID
  payment_token_id VARCHAR,                -- Payment token ID (USDC, etc.)
  last_synced_at TIMESTAMP,                -- Last blockchain sync
  sync_status VARCHAR,                     -- SYNCED, SYNCING, FAILED
  created_at TIMESTAMP DEFAULT NOW(),
  updated_at TIMESTAMP DEFAULT NOW()
);

Fields:

  • token_id: Hedera token ID (e.g., 0.0.789012)
  • lifecycle_contract_id: Associated LifeCycle Cash Flow contract
  • payment_token_id: Token used for payments (e.g., USDC 0.0.429274)
  • sync_status: Blockchain sync state
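
The ID columns above are plain VARCHARs, so the database does not enforce their format. A minimal sketch of an application-side check follows, assuming IDs take the shard.realm.num shape used in the examples here (0.0.789012). The helper names are hypothetical and not part of the backend, and IDs carrying a checksum suffix are rejected by this sketch.

```typescript
// Hypothetical helpers, not part of the backend: they check the
// shard.realm.num shape used by the example IDs in this document.
const HEDERA_ENTITY_ID_PATTERN = /^\d+\.\d+\.\d+$/;

function isHederaEntityId(value: string): boolean {
  return HEDERA_ENTITY_ID_PATTERN.test(value);
}

// Throws early so a malformed ID is rejected before it is written to a table.
function assertHederaEntityId(value: string, field: string): string {
  if (!isHederaEntityId(value)) {
    throw new Error(`${field} must look like 0.0.12345, got "${value}"`);
  }
  return value;
}
```

A caller could run assertHederaEntityId(input, "token_id") before persisting an asset.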

Indexes:

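-- token_id is declared UNIQUE in the table definition, so PostgreSQL already
-- maintains a unique index for it; this explicit index duplicates that one.
CREATE UNIQUE INDEX idx_asset_token_id ON asset(token_id);
CREATE INDEX idx_asset_sync_status ON asset(sync_status);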

Distribution

Stores dividend/coupon distributions.

Table: distribution

CREATE TABLE distribution (
  id UUID PRIMARY KEY,
  asset_id UUID NOT NULL REFERENCES asset(id),
  type VARCHAR NOT NULL,                   -- DIVIDEND, COUPON, SNAPSHOT
  execution_type VARCHAR NOT NULL,         -- MANUAL, SCHEDULED, RECURRING, AUTOMATIC
  payout_type VARCHAR,                     -- FIXED, PERCENTAGE (for equities)
  amount DECIMAL,                          -- Total distribution amount
  concept VARCHAR,                         -- Description/label
  scheduled_time TIMESTAMP,                -- For scheduled distributions
  frequency VARCHAR,                       -- HOURLY, DAILY, WEEKLY, MONTHLY
  start_time TIMESTAMP,                    -- For recurring distributions
  trigger_condition VARCHAR,               -- For automatic distributions
  status VARCHAR NOT NULL,                 -- PENDING, PROCESSING, COMPLETED, FAILED
  executed_at TIMESTAMP,
  created_at TIMESTAMP DEFAULT NOW(),
  updated_at TIMESTAMP DEFAULT NOW()
);

Fields:

  • type: Distribution type (DIVIDEND for equities, COUPON for bonds)
  • execution_type: How distribution is triggered
    • MANUAL: Execute immediately
    • SCHEDULED: Execute at specific time
    • RECURRING: Execute on recurring schedule
    • AUTOMATIC: Execute based on trigger
  • payout_type: FIXED (same amount per holder) or PERCENTAGE (proportional to holdings)
  • status: Current processing state
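
Because type, execution_type and status are stored as VARCHAR, nothing in the schema rejects an unexpected value. The sketch below shows one way to narrow raw column strings to union types in application code. The value lists are copied from the column comments above; the function names are hypothetical.

```typescript
// Allowed values, copied from the column comments in the distribution table.
const DISTRIBUTION_TYPES = ["DIVIDEND", "COUPON", "SNAPSHOT"] as const;
const EXECUTION_TYPES = ["MANUAL", "SCHEDULED", "RECURRING", "AUTOMATIC"] as const;
const DISTRIBUTION_STATUSES = ["PENDING", "PROCESSING", "COMPLETED", "FAILED"] as const;

type DistributionType = (typeof DISTRIBUTION_TYPES)[number];
type ExecutionType = (typeof EXECUTION_TYPES)[number];
type DistributionStatus = (typeof DISTRIBUTION_STATUSES)[number];

// Hypothetical type guard: true when value is one of the allowed strings.
function isOneOf<T extends string>(allowed: readonly T[], value: string): value is T {
  return (allowed as readonly string[]).indexOf(value) !== -1;
}

// Hypothetical parser: returns the narrowed value or throws, naming the column.
function parseOneOf<T extends string>(allowed: readonly T[], raw: string, column: string): T {
  if (!isOneOf(allowed, raw)) {
    throw new Error(`Unexpected value "${raw}" in column ${column}`);
  }
  return raw;
}

// Narrows the three enumerated columns of a raw distribution row.
function parseDistributionEnums(row: {
  type: string;
  execution_type: string;
  status: string;
}): { type: DistributionType; executionType: ExecutionType; status: DistributionStatus } {
  return {
    type: parseOneOf(DISTRIBUTION_TYPES, row.type, "type"),
    executionType: parseOneOf(EXECUTION_TYPES, row.execution_type, "execution_type"),
    status: parseOneOf(DISTRIBUTION_STATUSES, row.status, "status"),
  };
}
```

A database-level alternative would be a CHECK constraint or enum type, which this document's schema does not use.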

Indexes:

CREATE INDEX idx_distribution_asset ON distribution(asset_id);
CREATE INDEX idx_distribution_status ON distribution(status);
CREATE INDEX idx_distribution_scheduled ON distribution(scheduled_time) WHERE status = 'PENDING';

Holder

Stores asset holders and their payment amounts.

Table: holder

CREATE TABLE holder (
  id UUID PRIMARY KEY,
  asset_id UUID NOT NULL REFERENCES asset(id),
  distribution_id UUID REFERENCES distribution(id),
  account_id VARCHAR NOT NULL,             -- Holder account ID
  balance BIGINT NOT NULL,                 -- Token balance
  payment_amount DECIMAL,                  -- Payment amount for distribution
  paid BOOLEAN DEFAULT FALSE,              -- Payment status
  created_at TIMESTAMP DEFAULT NOW(),
  updated_at TIMESTAMP DEFAULT NOW(),
  UNIQUE(asset_id, distribution_id, account_id)
);

Fields:

  • account_id: Hedera account ID (e.g., 0.0.123456)
  • balance: Token holdings (in smallest unit)
  • payment_amount: Calculated payment for this distribution
  • paid: Whether payment has been executed
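
How payment_amount is derived is not specified in this document. The sketch below is one possible calculation, under loudly stated assumptions: amounts are integers in the payment token's smallest unit, the distribution amount is a total to be split, FIXED divides it equally among holders, and PERCENTAGE divides it in proportion to balance. Integer division rounds down, so any undistributed remainder is returned explicitly. All names are hypothetical.

```typescript
interface HolderBalance {
  accountId: string;
  balance: bigint; // token holdings in the smallest unit
}

interface HolderPayment {
  accountId: string;
  paymentAmount: bigint; // payment in the payment token's smallest unit
}

// ASSUMPTION: totalAmount is the whole distribution; FIXED splits it equally,
// PERCENTAGE splits it pro rata by balance. The real backend may differ.
function computePayments(
  payoutType: "FIXED" | "PERCENTAGE",
  totalAmount: bigint,
  holders: HolderBalance[],
): { payments: HolderPayment[]; remainder: bigint } {
  const zero = BigInt(0);
  if (holders.length === 0) {
    return { payments: [], remainder: totalAmount };
  }
  const totalBalance = holders.reduce((sum, h) => sum + h.balance, zero);
  const payments = holders.map((h) => {
    let paymentAmount: bigint;
    if (payoutType === "FIXED") {
      paymentAmount = totalAmount / BigInt(holders.length);
    } else {
      // Multiply before dividing to keep as much precision as integers allow.
      paymentAmount = totalBalance === zero ? zero : (totalAmount * h.balance) / totalBalance;
    }
    return { accountId: h.accountId, paymentAmount };
  });
  const paid = payments.reduce((sum, p) => sum + p.paymentAmount, zero);
  return { payments, remainder: totalAmount - paid };
}
```

Using bigint rather than number avoids floating-point rounding on balances that are stored as BIGINT.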

Indexes:

CREATE INDEX idx_holder_asset ON holder(asset_id);
CREATE INDEX idx_holder_distribution ON holder(distribution_id);
CREATE INDEX idx_holder_account ON holder(account_id);
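-- Duplicates the UNIQUE(asset_id, distribution_id, account_id) table constraint,
-- which already creates a unique index. Because distribution_id is nullable and
-- PostgreSQL treats NULLs as distinct by default, rows without a distribution
-- are not deduplicated by either one.
CREATE UNIQUE INDEX idx_holder_unique ON holder(asset_id, distribution_id, account_id);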

BatchPayout

Tracks payout batch execution.

Table: batch_payout

CREATE TABLE batch_payout (
  id UUID PRIMARY KEY,
  distribution_id UUID NOT NULL REFERENCES distribution(id),
  batch_number INT NOT NULL,               -- Batch sequence number
  total_holders INT NOT NULL,              -- Holders in this batch
  successful_payments INT DEFAULT 0,       -- Successful payments
  failed_payments INT DEFAULT 0,           -- Failed payments
  transaction_id VARCHAR,                  -- Blockchain transaction ID
  status VARCHAR NOT NULL,                 -- PENDING, PROCESSING, COMPLETED, FAILED
  error_message TEXT,                      -- Error details if failed
  executed_at TIMESTAMP,
  created_at TIMESTAMP DEFAULT NOW(),
  updated_at TIMESTAMP DEFAULT NOW()
);

Fields:

  • batch_number: Sequence number for this batch in the distribution
  • transaction_id: Hedera transaction ID (e.g., 0.0.123456@1234567890.123456789)
  • status: Batch processing state
  • error_message: Failure reason if status is FAILED
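
The batch_number and total_holders columns imply that a distribution's holders are split into numbered groups. A minimal sketch of that split follows. It assumes numbering starts at 1 and that the batch size is chosen by the caller, neither of which is confirmed by this document; planBatches is a hypothetical name.

```typescript
interface PlannedBatch<T> {
  batchNumber: number; // sequence number within the distribution (assumed 1-based)
  items: T[];          // holders assigned to this batch
}

// Splits items into consecutive batches of at most batchSize elements.
function planBatches<T>(items: T[], batchSize: number): PlannedBatch<T>[] {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new Error("batchSize must be a positive integer");
  }
  const batches: PlannedBatch<T>[] = [];
  for (let start = 0; start < items.length; start += batchSize) {
    batches.push({
      batchNumber: batches.length + 1,
      items: items.slice(start, start + batchSize),
    });
  }
  return batches;
}
```

Each planned batch would map to one batch_payout row, with total_holders set to items.length.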

Indexes:

CREATE INDEX idx_batch_distribution ON batch_payout(distribution_id);
CREATE INDEX idx_batch_status ON batch_payout(status);

BlockchainEventListenerConfig

Configuration for blockchain event listeners.

Table: blockchain_event_listener_config

CREATE TABLE blockchain_event_listener_config (
  id UUID PRIMARY KEY,
  contract_id VARCHAR NOT NULL,            -- Contract to listen to
  last_processed_timestamp BIGINT,         -- Last processed event timestamp
  enabled BOOLEAN DEFAULT TRUE,
  created_at TIMESTAMP DEFAULT NOW(),
  updated_at TIMESTAMP DEFAULT NOW()
);

Fields:

  • contract_id: LifeCycle Cash Flow contract being monitored
  • last_processed_timestamp: Timestamp of last processed event (prevents reprocessing)
  • enabled: Whether listener is active
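
The reprocessing guard provided by last_processed_timestamp can be sketched as a cursor: only events strictly newer than the stored value are handled, and the cursor then advances to the newest handled event. The event shape and function name below are hypothetical, and the timestamp unit is not stated in this document.

```typescript
interface ContractEvent {
  timestamp: bigint; // same unit as last_processed_timestamp (unit assumed)
  payload: string;
}

// Returns the events newer than the cursor, oldest first, plus the new cursor.
// A null cursor means nothing has been processed yet.
function selectUnprocessed(
  events: ContractEvent[],
  lastProcessed: bigint | null,
): { toProcess: ContractEvent[]; nextCursor: bigint | null } {
  const fresh = events
    .filter((e) => lastProcessed === null || e.timestamp > lastProcessed)
    .sort((a, b) => (a.timestamp < b.timestamp ? -1 : a.timestamp > b.timestamp ? 1 : 0));
  const nextCursor = fresh.length > 0 ? fresh[fresh.length - 1].timestamp : lastProcessed;
  return { toProcess: fresh, nextCursor };
}
```

With a strict comparison, an event that shares its timestamp with the stored cursor is skipped, so this approach relies on timestamps being unique per contract.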

Indexes:

CREATE UNIQUE INDEX idx_listener_contract ON blockchain_event_listener_config(contract_id);

Entity Relationships

┌──────────┐
│  Asset   │
└────┬─────┘
     │
     │ 1:N
     │
     ▼
┌─────────────────┐
│  Distribution   │
└────┬────────────┘
     │
     │ 1:N
     ├──────────────────┐
     │                  │
     ▼                  ▼
┌──────────┐      ┌──────────────┐
│  Holder  │      │ BatchPayout  │
└──────────┘      └──────────────┘

Holder rows also reference Asset directly through holder.asset_id; the diagram omits that link.

TypeORM Entities

Asset Entity

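import {
  Column,
  CreateDateColumn,
  Entity,
  OneToMany,
  PrimaryGeneratedColumn,
  UpdateDateColumn,
} from "typeorm";
// DistributionEntity must also be imported; its path is not shown in this document.
// Properties are camelCase while the SQL above is snake_case, so this mapping
// presumably relies on a naming strategy or per-column name options not shown here.

@Entity("asset")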
export class AssetEntity {
  @PrimaryGeneratedColumn("uuid")
  id: string;

  @Column({ unique: true })
  tokenId: string;

  @Column()
  name: string;

  @Column()
  symbol: string;

  @Column({ type: "bigint" })
  totalSupply: string;

  @Column()
  decimals: number;

  @Column({ nullable: true })
  lifecycleContractId?: string;

  @Column({ nullable: true })
  paymentTokenId?: string;

  @Column({ nullable: true })
  lastSyncedAt?: Date;

  @Column({ nullable: true })
  syncStatus?: string;

  @CreateDateColumn()
  createdAt: Date;

  @UpdateDateColumn()
  updatedAt: Date;

  @OneToMany(() => DistributionEntity, (distribution) => distribution.asset)
  distributions: DistributionEntity[];
}
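
The entity types totalSupply as a string because a BIGINT can exceed the range JavaScript numbers represent exactly. A small sketch of turning that raw string into a human-readable amount with the decimals column follows; formatTokenAmount is a hypothetical helper, not part of the backend.

```typescript
// Formats a non-negative integer string (smallest units) using the token's decimals.
// Hypothetical helper for illustration only.
function formatTokenAmount(rawAmount: string, decimals: number): string {
  if (!/^\d+$/.test(rawAmount)) {
    throw new Error(`rawAmount must be a non-negative integer string, got "${rawAmount}"`);
  }
  if (!Number.isInteger(decimals) || decimals < 0) {
    throw new Error("decimals must be a non-negative integer");
  }
  // BigInt round-trip strips leading zeros without losing precision.
  const digits = BigInt(rawAmount).toString();
  if (decimals === 0) {
    return digits;
  }
  // Left-pad so there is at least one digit before the decimal point.
  let padded = digits;
  while (padded.length <= decimals) {
    padded = "0" + padded;
  }
  const whole = padded.slice(0, padded.length - decimals);
  const fraction = padded.slice(padded.length - decimals);
  return `${whole}.${fraction}`;
}
```

For example, a totalSupply of "123456789" with 6 decimals formats as 123.456789.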

Distribution Entity

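import {
  Column,
  CreateDateColumn,
  Entity,
  JoinColumn,
  ManyToOne,
  OneToMany,
  PrimaryGeneratedColumn,
  UpdateDateColumn,
} from "typeorm";
// AssetEntity, HolderEntity and BatchPayoutEntity must also be imported;
// their paths are not shown in this document.

@Entity("distribution")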
export class DistributionEntity {
  @PrimaryGeneratedColumn("uuid")
  id: string;

  @Column()
  assetId: string;

  @ManyToOne(() => AssetEntity, (asset) => asset.distributions)
  @JoinColumn({ name: "assetId" })
  asset: AssetEntity;

  @Column()
  type: string;

  @Column()
  executionType: string;

  @Column({ nullable: true })
  payoutType?: string;

  @Column({ type: "decimal", nullable: true })
  amount?: string;

  @Column({ nullable: true })
  concept?: string;

  @Column({ nullable: true })
  scheduledTime?: Date;

  @Column({ nullable: true })
  frequency?: string;

  @Column({ nullable: true })
  startTime?: Date;

  @Column({ nullable: true })
  triggerCondition?: string;

  @Column()
  status: string;

  @Column({ nullable: true })
  executedAt?: Date;

  @CreateDateColumn()
  createdAt: Date;

  @UpdateDateColumn()
  updatedAt: Date;

  @OneToMany(() => HolderEntity, (holder) => holder.distribution)
  holders: HolderEntity[];

  @OneToMany(() => BatchPayoutEntity, (batch) => batch.distribution)
  batches: BatchPayoutEntity[];
}

Migrations

The backend uses TypeORM migrations for schema management.

Creating Migrations

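# Generate migration from entity changes
npm run typeorm:migration:generate -- -n MigrationName

# Create empty migration
npm run typeorm:migration:create -- -n MigrationName

# Note: -n is the TypeORM 0.2.x flag. From TypeORM 0.3.0 the CLI takes the
# migration path as a positional argument instead, so check which version the
# npm scripts wrap.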

Running Migrations

# Run pending migrations
npm run typeorm:migration:run

# Revert last migration
npm run typeorm:migration:revert

Example Migration

import { MigrationInterface, QueryRunner, Table } from "typeorm";

export class CreateAssetTable1234567890 implements MigrationInterface {
  public async up(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.createTable(
      new Table({
        name: "asset",
        columns: [
          {
            name: "id",
            type: "uuid",
            isPrimary: true,
            generationStrategy: "uuid",
            // uuid_generate_v4() requires the uuid-ossp extension in the database
            default: "uuid_generate_v4()",
          },
          {
            name: "token_id",
            type: "varchar",
            isUnique: true,
          },
          // ... more columns
        ],
      }),
    );
  }

  public async down(queryRunner: QueryRunner): Promise<void> {
    await queryRunner.dropTable("asset");
  }
}

Repository Pattern

Repositories abstract database access:

Example Repository

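import { Injectable } from "@nestjs/common";
import { InjectRepository } from "@nestjs/typeorm";
import { LessThanOrEqual, Repository } from "typeorm";
// The Distribution domain model and DistributionEntity must also be imported;
// their paths are not shown in this document.

@Injectable()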
export class DistributionRepository {
  constructor(
    @InjectRepository(DistributionEntity)
    private readonly repo: Repository<DistributionEntity>,
  ) {}

  async findById(id: string): Promise<Distribution | null> {
    const entity = await this.repo.findOne({
      where: { id },
      relations: ["asset", "holders", "batches"],
    });
    return entity ? this.toDomain(entity) : null;
  }

  async findPendingScheduled(now: Date): Promise<Distribution[]> {
    const entities = await this.repo.find({
      where: {
        status: "PENDING",
        scheduledTime: LessThanOrEqual(now),
      },
    });
    return entities.map((e) => this.toDomain(e));
  }

  async save(distribution: Distribution): Promise<void> {
    const entity = this.toEntity(distribution);
    await this.repo.save(entity);
  }

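  private toDomain(entity: DistributionEntity): Distribution {
    // Map entity to domain model (mapping omitted in this example)
    throw new Error("toDomain is not implemented in this example");
  }

  private toEntity(domain: Distribution): DistributionEntity {
    // Map domain model to entity (mapping omitted in this example)
    throw new Error("toEntity is not implemented in this example");
  }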
}

Database Configuration

Location: apps/mass-payout/backend/.env

# PostgreSQL connection
DATABASE_HOST=localhost
DATABASE_PORT=5432
DATABASE_USER=postgres
DATABASE_PASSWORD=postgres
DATABASE_NAME=mass_payout

TypeORM Configuration: apps/mass-payout/backend/ormconfig.ts

export default {
  type: "postgres",
  host: process.env.DATABASE_HOST,
  // Fall back to the PostgreSQL default port when DATABASE_PORT is unset
  port: parseInt(process.env.DATABASE_PORT ?? "5432", 10),
  username: process.env.DATABASE_USER,
  password: process.env.DATABASE_PASSWORD,
  database: process.env.DATABASE_NAME,
  entities: ["src/**/*.entity.ts"],
  migrations: ["src/infrastructure/persistence/migrations/*.ts"],
  synchronize: false, // Use migrations in production
  logging: process.env.NODE_ENV === "development",
};
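
The configuration above reads each variable directly from process.env, so a missing value only surfaces when the connection fails. A minimal sketch of validating the same five variables at startup follows. loadDatabaseConfig and DatabaseConfig are hypothetical names; the variable names are the ones listed in the .env file above.

```typescript
interface DatabaseConfig {
  host: string;
  port: number;
  username: string;
  password: string;
  database: string;
}

// Reads the DATABASE_* variables and fails fast with a clear message.
function loadDatabaseConfig(env: Record<string, string | undefined>): DatabaseConfig {
  const required = (key: string): string => {
    const value = env[key];
    if (value === undefined || value === "") {
      throw new Error(`Missing required environment variable ${key}`);
    }
    return value;
  };

  const port = Number(required("DATABASE_PORT"));
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error("DATABASE_PORT must be an integer between 1 and 65535");
  }

  return {
    host: required("DATABASE_HOST"),
    port,
    username: required("DATABASE_USER"),
    password: required("DATABASE_PASSWORD"),
    database: required("DATABASE_NAME"),
  };
}
```

In the application this would be called as loadDatabaseConfig(process.env) before the TypeORM options are built.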

Best Practices

Use Migrations

Always use migrations for schema changes:

# 1. Modify entity
# 2. Generate migration
npm run typeorm:migration:generate -- -n AddPaymentTokenToAsset
# 3. Review generated migration
# 4. Run migration
npm run typeorm:migration:run

Transaction Management

Use transactions for multi-step operations:

async createDistributionWithHolders(
  distribution: Distribution,
  holders: Holder[],
): Promise<void> {
  await this.dataSource.transaction(async manager => {
    await manager.save(DistributionEntity, this.toEntity(distribution));
    await manager.save(HolderEntity, holders.map(h => this.toHolderEntity(h)));
  });
}

Query Optimization

Use indexes and efficient queries:

// Good: Use indexed columns
await this.repo.find({
  where: { status: "PENDING" }, // Indexed
});

// Bad: loads every row into memory, then filters in application code
await this.repo.find().then((all) => all.filter((d) => d.status === "PENDING"));

Pagination

Paginate large result sets:

async findAll(page: number, limit: number): Promise<[Distribution[], number]> {
  const [entities, total] = await this.repo.findAndCount({
    skip: (page - 1) * limit,
    take: limit,
    order: { createdAt: 'DESC' },
  });
  return [entities.map(e => this.toDomain(e)), total];
}
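
The findAll method above trusts page and limit as given, so a page of 0 yields a negative skip and a very large limit defeats the purpose of pagination. A sketch of clamping both before they reach the query follows. The helper names and the default cap of 100 are assumptions, not values from the backend.

```typescript
// Converts 1-based page/limit input into safe skip/take values.
// maxLimit defaults to 100, an arbitrary cap chosen for this sketch.
function toSkipTake(page: number, limit: number, maxLimit = 100): { skip: number; take: number } {
  // Non-numeric or zero input falls back to 1; negatives are raised to 1.
  const safePage = Math.max(Math.trunc(page) || 1, 1);
  const safeLimit = Math.min(Math.max(Math.trunc(limit) || 1, 1), maxLimit);
  return { skip: (safePage - 1) * safeLimit, take: safeLimit };
}

// Number of pages needed to show total rows at the given page size.
function countPages(total: number, take: number): number {
  return Math.ceil(total / take);
}
```

The returned skip and take can be passed straight to findAndCount, and countPages can turn the returned total into a page count for the response.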

Troubleshooting

Database Connection Failed

Problem: Cannot connect to PostgreSQL

Solutions:

  • Verify PostgreSQL is running: docker-compose ps
  • Check credentials in .env
  • Ensure database exists: CREATE DATABASE mass_payout;

Migration Failed

Problem: Migration execution fails

Solutions:

  • Check migration SQL for errors
  • Verify database state matches expected
  • Revert last migration: npm run typeorm:migration:revert
  • Fix and regenerate migration

Slow Queries

Problem: Database queries are slow

Solutions:

  • Add indexes to frequently queried columns
  • Use pagination for large result sets
  • Enable query logging (the logging option in ormconfig.ts) to identify slow queries
  • Run EXPLAIN ANALYZE in PostgreSQL to inspect the query plan and confirm that indexes are used

Next Steps