Objective: From design to implementation: Building a Safety-First Balance Core Service.
In the previous chapter (0x08-a), we designed the full HFT pipeline architecture. Now, it's time to implement the core components. This chapter covers:
- Ring Buffer - Lock-free inter-service communication.
- Write-Ahead Log (WAL) - Order persistence.
- UBSCore Service - The core balance service.
In financial systems, maturity and stability outweigh extreme performance.
| Crate | Maturity | Security | Performance |
|---|---|---|---|
| crossbeam-queue | 🌟🌟🌟🌟🌟 (3.3M+ DLs) | Heavily Audited | Very Low Latency |
| ringbuf | 🌟🌟🌟🌟 (600K+ DLs) | Community Verified | Lower Latency |
| rtrb | 🌟🌟🌟 (Newer) | Less Vetted | Lowest Latency |
Our Choice: crossbeam-queue
Reasons:
- Maintained by Rust core team members.
- Part of the crossbeam project, whose crates are dependencies of widely used libraries such as rayon.
- A bug in it would be felt across a large part of the Rust ecosystem.
Financial System Selection Principle: Use what lets you sleep at night.
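use crossbeam_queue::ArrayQueue;

// Create a fixed-capacity ring buffer
let queue: ArrayQueue<OrderMessage> = ArrayQueue::new(1024);

// Producer: non-blocking push; returns Err(msg) when the queue is full
if let Err(_rejected) = queue.push(order_msg) {
    // Queue full: apply backpressure (retry, drop, or report) instead of panicking
}

// Consumer: non-blocking pop; returns None when the queue is empty
if let Some(msg) = queue.pop() {
    process(msg);
}

WAL is the system's Single Source of Truth.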
/// Write-Ahead Log for Orders
///
/// Principles:
/// 1. Append-Only: Sequential I/O, max performance.
/// 2. Group Commit: Batch fsyncs.
/// 3. Monotonic sequence_id: Deterministic replay.
pub struct WalWriter {
    writer: BufWriter<File>,
    next_seq: SeqNum,
    pending_count: usize,
    config: WalConfig,
}

| Flush Strategy | Latency | Throughput | Safety |
|---|---|---|---|
| Every Entry | ~50µs | ~20K/s | Highest |
| Every 100 Entries | ~5µs (amortized) | ~200K/s | High |
| Every 1ms | ~1µs (amortized) | ~1M/s | Medium |
We choose Every 100 Entries to balance performance and safety:
pub struct WalConfig {
    pub path: String,
    pub flush_interval_entries: usize, // Flush every N entries
    pub sync_on_flush: bool,           // Whether to call fsync
}

The current format is CSV (readable during development):
seq_id,timestamp_ns,order_id,user_id,price,qty,side,order_type
1,1702742400000000000,1001,100,85000000000,100000000,Buy,Limit
In production, switch to Binary (54 bytes/entry) for better performance.
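The group-commit idea above can be sketched with the standard library alone. This is a minimal illustration, not the chapter's actual `src/wal.rs`: the `open`, `append`, and `flush` signatures, the use of a plain `u64` for the sequence number, and restarting the sequence at 1 on every open are all assumptions made to keep the example short.

```rust
use std::fs::{File, OpenOptions};
use std::io::{self, BufWriter, Write};

/// Trimmed-down config with the same fields as the struct shown above.
pub struct WalConfig {
    pub path: String,
    pub flush_interval_entries: usize,
    pub sync_on_flush: bool,
}

/// Group-commit writer: buffers entries and flushes once every N appends.
pub struct WalWriter {
    writer: BufWriter<File>,
    next_seq: u64,
    pending_count: usize,
    config: WalConfig,
}

impl WalWriter {
    /// Opens the log in append mode (assumption: the sequence restarts at 1;
    /// a real writer would recover the last seq_id from the existing file).
    pub fn open(config: WalConfig) -> io::Result<Self> {
        let file = OpenOptions::new().create(true).append(true).open(&config.path)?;
        Ok(Self { writer: BufWriter::new(file), next_seq: 1, pending_count: 0, config })
    }

    /// Appends one CSV payload (the columns after seq_id) and returns its seq_id.
    pub fn append(&mut self, payload: &str) -> io::Result<u64> {
        let seq = self.next_seq;
        writeln!(self.writer, "{},{}", seq, payload)?;
        self.next_seq += 1;
        self.pending_count += 1;
        if self.pending_count >= self.config.flush_interval_entries {
            self.flush()?;
        }
        Ok(seq)
    }

    /// Hands buffered bytes to the OS and, if configured, syncs them to disk.
    pub fn flush(&mut self) -> io::Result<()> {
        self.writer.flush()?;
        if self.config.sync_on_flush {
            self.writer.get_ref().sync_data()?;
        }
        self.pending_count = 0;
        Ok(())
    }
}
```

With `flush_interval_entries` set to 100, up to 99 acknowledged entries may still sit in the buffer when the process crashes, which is the safety cost of the middle row in the table above.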
UBSCore is the Single Entry Point for all balance operations.
- Balance State Management: In-memory balance state.
- Order WAL Writing: Persist orders.
- Balance Operations: lock/unlock/spend_frozen/deposit.
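The four balance operations named above can be sketched as methods on a small balance record. This is a hypothetical shape, not the chapter's actual type: the `avail`/`frozen` field names and the `&'static str` error type are assumptions chosen to match the `settle_trade` signature shown later.

```rust
/// Hypothetical balance record: `avail` is free to use, `frozen` is reserved
/// for open orders.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Balance {
    avail: u64,
    frozen: u64,
}

impl Balance {
    /// Credits funds to the available balance.
    pub fn deposit(&mut self, amount: u64) -> Result<(), &'static str> {
        self.avail = self.avail.checked_add(amount).ok_or("deposit overflow")?;
        Ok(())
    }

    /// Moves funds from available to frozen when an order is accepted.
    pub fn lock(&mut self, amount: u64) -> Result<(), &'static str> {
        if self.avail < amount {
            return Err("insufficient available balance");
        }
        let frozen = self.frozen.checked_add(amount).ok_or("frozen overflow")?;
        self.avail -= amount;
        self.frozen = frozen;
        Ok(())
    }

    /// Moves funds from frozen back to available (for example on cancel).
    pub fn unlock(&mut self, amount: u64) -> Result<(), &'static str> {
        if self.frozen < amount {
            return Err("insufficient frozen balance");
        }
        let avail = self.avail.checked_add(amount).ok_or("avail overflow")?;
        self.frozen -= amount;
        self.avail = avail;
        Ok(())
    }

    /// Consumes frozen funds when a trade settles.
    pub fn spend_frozen(&mut self, amount: u64) -> Result<(), &'static str> {
        if self.frozen < amount {
            return Err("insufficient frozen balance");
        }
        self.frozen -= amount;
        Ok(())
    }

    pub fn avail(&self) -> u64 { self.avail }
    pub fn frozen(&self) -> u64 { self.frozen }
}
```

Every operation checks its precondition before mutating, so a failed call leaves the balance unchanged.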
pub struct UBSCore {
    /// User Accounts - Authoritative Balance State
    accounts: FxHashMap<UserId, UserAccount>,
    /// Write-Ahead Log
    wal: WalWriter,
    /// Configuration
    config: TradingConfig,
    /// Pending Orders (Locked but not filled)
    pending_orders: FxHashMap<OrderId, PendingOrder>,
    /// Statistics
    stats: UBSCoreStats,
}

process_order(order):
│
├─ 1. Write to WAL ──────────► Get seq_id
│
├─ 2. Validate order ────────► Check price/qty
│
├─ 3. Get user account ──────► Lookup user
│
├─ 4. Calculate lock amount ─► Buy: price * qty / qty_unit
│ Sell: qty
│
└─ 5. Lock balance ──────────► Success → Ok(ValidOrder)
Fail → Err(Rejected)
Implementation:
pub fn process_order(&mut self, order: Order) -> Result<ValidOrder, OrderEvent> {
    // Step 1: Write to WAL FIRST (persist before any state change)
    let seq_id = self.wal.append(&order)?;

    // Steps 2-4: Validate and calculate
    // ...

    // Step 5: Lock balance
    let lock_result = account
        .get_balance_mut(locked_asset_id)
        .and_then(|balance| balance.lock(locked_amount));

    match lock_result {
        Ok(()) => {
            // Track pending order
            self.pending_orders.insert(order.id, PendingOrder { ... });
            Ok(ValidOrder::new(seq_id, order, locked_amount, locked_asset_id))
        }
        Err(_) => Err(OrderEvent::Rejected { ... }),
    }
}

pub fn settle_trade(&mut self, event: &TradeEvent) -> Result<(), &'static str> {
    let trade = &event.trade;
    // Use a u128 intermediate: price * qty can exceed u64::MAX
    let quote_amount = u64::try_from(
        trade.price as u128 * trade.qty as u128 / self.config.qty_unit() as u128,
    )
    .map_err(|_| "quote amount overflows u64")?;

    // Buyer: spend USDT, receive BTC
    buyer.get_balance_mut(quote_id)?.spend_frozen(quote_amount)?;
    buyer.get_balance_mut(base_id)?.deposit(trade.qty)?;

    // Seller: spend BTC, receive USDT
    seller.get_balance_mut(base_id)?.spend_frozen(trade.qty)?;
    seller.get_balance_mut(quote_id)?.deposit(quote_amount)?;
    Ok(())
}

Services communicate via defined message types:
// Gateway → UBSCore
pub struct OrderMessage {
    pub seq_id: SeqNum,
    pub order: Order,
    // ...
}

// UBSCore → ME
pub struct ValidOrder {
    pub seq_id: SeqNum,
    pub order: Order,
    pub locked_amount: u64,
    // ...
}

// ME → UBSCore + Settlement
pub struct TradeEvent {
    pub trade: Trade,
    pub taker_order_id: OrderId,
    pub maker_order_id: OrderId,
    // ...
}

# Original Pipeline
cargo run --release
# UBSCore Pipeline (Enable WAL)
cargo run --release -- --ubscore

| Metric | Original | UBSCore | Change |
|---|---|---|---|
| Throughput | 15,070 ops/s | 14,314 ops/s | -5% |
| WAL Entries | N/A | 100,000 (6.67 MB) | - |
| Balance Check | 0.3% | 1.3% | +1% |
| Matching | 45.5% | 45.5% | - |
| Settlement | 0.1% | 0.2% | - |
| Ledger I/O | 54.0% | 53.0% | -1% |
Analysis:
- WAL introduces ~5% overhead.
- Acceptable cost for safety.
- Main bottleneck remains Ledger I/O.
cargo test
# 31 tests passing

sh scripts/test_e2e.sh
# ✅ All tests passed!

| File | Lines | Description |
|---|---|---|
| src/messages.rs | 265 | Inter-service messages |
| src/wal.rs | 340 | Write-Ahead Log |
| src/ubscore.rs | 490 | User Balance Core |
- Maturity > Performance
- Auditable > Rapid Dev
All state = f(WAL). Foundation for Disaster Recovery and Audit.
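To make "state = f(WAL)" concrete, here is a small sketch of deterministic replay. It is an illustration under assumptions: the `WalEntry` enum, its fields, and the `(avail, frozen)` tuple are hypothetical, and the chapter's real WAL stores orders rather than these simplified balance entries.

```rust
use std::collections::HashMap;

/// Hypothetical WAL record, tagged with its monotonic sequence id.
#[derive(Debug, Clone, Copy)]
pub enum WalEntry {
    Deposit { seq_id: u64, user_id: u64, amount: u64 },
    Lock { seq_id: u64, user_id: u64, amount: u64 },
}

/// Rebuilds user_id -> (avail, frozen) purely from the log.
/// Fails if the sequence has a gap, since replay would then be unreliable.
pub fn replay(entries: &[WalEntry]) -> Result<HashMap<u64, (u64, u64)>, String> {
    let mut state: HashMap<u64, (u64, u64)> = HashMap::new();
    let mut expected_seq = 1u64;
    for entry in entries {
        let (seq_id, user_id) = match *entry {
            WalEntry::Deposit { seq_id, user_id, .. }
            | WalEntry::Lock { seq_id, user_id, .. } => (seq_id, user_id),
        };
        if seq_id != expected_seq {
            return Err(format!("gap: expected seq {}, found {}", expected_seq, seq_id));
        }
        expected_seq += 1;

        let balance = state.entry(user_id).or_insert((0, 0));
        match *entry {
            WalEntry::Deposit { amount, .. } => {
                balance.0 = balance
                    .0
                    .checked_add(amount)
                    .ok_or_else(|| format!("avail overflow at seq {}", seq_id))?;
            }
            WalEntry::Lock { amount, .. } => {
                // An unaffordable lock is a rejected order: state stays unchanged.
                if balance.0 >= amount {
                    balance.1 = balance
                        .1
                        .checked_add(amount)
                        .ok_or_else(|| format!("frozen overflow at seq {}", seq_id))?;
                    balance.0 -= amount;
                }
            }
        }
    }
    Ok(state)
}
```

Because `replay` depends only on its input slice, running it twice over the same log yields the same state, which is the property disaster recovery and audit rely on.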
UBSCore runs on a single thread, which gives natural atomicity (no locking needed for balance operations) and predictable latency.
Testing with --ubscore revealed 1032 rejected orders that were accepted in the legacy mode.
Overflow in price * qty (u64).
Example Order #21:
- Price: 84,956.01 USDT (6 decimals) -> 84,956,010,000
- Qty: 2.562844 BTC (8 decimals) -> 256,284,400
- Product: 2.177 × 10^19 > u64::MAX (about 1.845 × 10^19)
Wrapping arithmetic in release builds:
The legacy code computed cost = price * qty, which silently wrapped around in release mode and produced a much smaller, incorrect value. The user had only about 33k USDT locked but bought about 217k USDT worth of BTC!
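The arithmetic for Order #21 can be reproduced in a few lines. The sketch below is illustrative only: the function names are hypothetical, and `qty_unit = 100_000_000` is an assumption that follows from the 8-decimal BTC quantity. `wrapping_mul` is used to mimic what a plain `*` does in a release build.

```rust
/// Mimics the legacy behaviour in release builds: u64 multiplication wraps.
pub fn cost_wrapping(price: u64, qty: u64, qty_unit: u64) -> u64 {
    price.wrapping_mul(qty) / qty_unit
}

/// Overflow-aware variant: `None` when price * qty does not fit in u64.
pub fn cost_checked(price: u64, qty: u64, qty_unit: u64) -> Option<u64> {
    price.checked_mul(qty).map(|product| product / qty_unit)
}

/// Order #21 from the text, in raw integer units.
pub const PRICE: u64 = 84_956_010_000; // 84,956.01 USDT at 6 decimals
pub const QTY: u64 = 256_284_400; // 2.562844 BTC at 8 decimals
pub const QTY_UNIT: u64 = 100_000_000; // assumption: 10^8 units per BTC
```

For these inputs `cost_checked` returns `None`, while `cost_wrapping` returns a value in the 33-billion range (about 33k USDT at 6 decimals) instead of the roughly 217k USDT the order is actually worth.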
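// Use u128 for the intermediate calculation
let cost_128 = (self.price as u128) * (self.qty as u128) / (qty_unit as u128);
if cost_128 > u64::MAX as u128 {
    return Err(CostError::Overflow);
}
// Safe: the check above guarantees the value fits in u64
let cost = cost_128 as u64;

Storing USDT prices with 6 decimals makes this overflow easy to hit. Recommended: 2 decimals, which is what Binance uses for USDT prices.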
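A quick way to see why price precision matters is to ask how large a quantity can get before `price * qty` leaves the u64 range. The helper below is a hypothetical illustration, not part of the chapter's code.

```rust
/// Largest raw quantity whose product with `price_scaled` still fits in u64.
/// Returns `None` for a zero price, where the question is meaningless.
pub fn max_qty_before_overflow(price_scaled: u64) -> Option<u64> {
    u64::MAX.checked_div(price_scaled)
}

/// 84,956.01 USDT stored with 6 decimals and with 2 decimals.
pub const PRICE_6_DECIMALS: u64 = 84_956_010_000;
pub const PRICE_2_DECIMALS: u64 = 8_495_601;
```

With 8-decimal BTC quantities, the 6-decimal price overflows a plain u64 product at a little over 2.17 BTC, which is why the 2.562844 BTC order failed. The 2-decimal price pushes that limit to roughly 21,700 BTC. The u128 intermediate is still worth keeping, since a large enough order can overflow at any precision.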
The current Ledger records only Settlement; Deposit, Lock, Unlock, and SpendFrozen operations are missing.
Pipeline concurrency means Lock and Settlement events interleave non-deterministically, so comparing balance snapshots between runs cannot verify consistency.
Instead, keep separate version counters for Lock events and Settle events.
| Version Space | Increment On | Sort By | Determinism |
|---|---|---|---|
| lock_version | Lock/Unlock | order_seq_id | ✅ Deterministic |
| settle_version | Settle | trade_id | ✅ Deterministic |
Validation Strategy: Verify the Final Set of events, sorted by their respective versions/source IDs, rather than checking snapshot consistency at arbitrary times.
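The two version spaces can be sketched as a pair of independent counters. This is a hypothetical shape: `VersionSpaces`, `EventKind`, and `bump` are illustrative names, not identifiers from the chapter's code.

```rust
/// Which balance event is being recorded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EventKind {
    Lock,
    Unlock,
    Settle,
}

/// Independent counters: Lock/Unlock share one space, Settle has its own.
#[derive(Debug, Default)]
pub struct VersionSpaces {
    lock_version: u64,
    settle_version: u64,
}

impl VersionSpaces {
    /// Increments and returns the version in the space `kind` belongs to.
    pub fn bump(&mut self, kind: EventKind) -> u64 {
        let counter = match kind {
            EventKind::Lock | EventKind::Unlock => &mut self.lock_version,
            EventKind::Settle => &mut self.settle_version,
        };
        *counter += 1;
        *counter
    }
}
```

Whether a Settle arrives before or after the second Lock, the Lock events still receive versions 1 and 2, so each space can be compared across runs even though the global interleaving differs.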
UBSCore takes input from two queues, OrderQueue and TradeQueue, and the way they interleave is non-deterministic.
Solution:
- OrderQueue strictly follows order_seq_id.
- TradeQueue strictly follows trade_id.
- Link every Balance Event to its source (order_seq_id or trade_id).
- This forms a Causal Chain for audit.
struct BalanceEvent {
    // ...
    source_type: SourceType, // Order | Trade
    source_id: u64,          // order_seq_id | trade_id
}

This allows offline verification:
Lock(source=Order N) must exist if Order N exists.
Settle(source=Trade M) must exist if Trade M exists.
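The two rules above translate directly into a set-membership check. The sketch below is a simplified assumption of how such an audit could look: `missing_links` is a hypothetical helper, `BalanceEvent` is reduced to its two source fields, and the order list is assumed to contain only accepted orders.

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SourceType {
    Order,
    Trade,
}

/// Reduced to the causal-link fields for this illustration.
#[derive(Debug, Clone, Copy)]
pub struct BalanceEvent {
    pub source_type: SourceType,
    pub source_id: u64,
}

/// Returns every (source_type, source_id) that has no matching balance event.
pub fn missing_links(
    order_seq_ids: &[u64],
    trade_ids: &[u64],
    events: &[BalanceEvent],
) -> Vec<(SourceType, u64)> {
    let seen: HashSet<(SourceType, u64)> =
        events.iter().map(|e| (e.source_type, e.source_id)).collect();
    let orders = order_seq_ids.iter().map(|&id| (SourceType::Order, id));
    let trades = trade_ids.iter().map(|&id| (SourceType::Trade, id));
    orders.chain(trades).filter(|key| !seen.contains(key)).collect()
}
```

An empty result means every order and trade in the input has at least one balance event pointing back to it; any returned pair is a break in the causal chain worth investigating.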
- Implement Version Space Separation.
- Expand BalanceEvent with causal links.
- Integrate Ring Buffer.
- Develop Causal Chain Audit Tools.
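From design to implementation: building a safety-first balance core service.
In the previous chapter (0x08-a), we designed the complete HFT trading pipeline architecture. Now it is time to implement the core components. In this chapter we build:
- Ring Buffer - lock-free communication between services
- Write-Ahead Log (WAL) - order persistence
- UBSCore Service - the balance core service
In financial systems, maturity and stability matter more than extreme performance.
| Crate | Maturity | Security | Performance |
|---|---|---|---|
| crossbeam-queue | 🌟🌟🌟🌟🌟 (3.3M+ downloads) | Most rigorously audited | Very low latency |
| ringbuf | 🌟🌟🌟🌟 (600K+ downloads) | Community verified | Lower latency |
| rtrb | 🌟🌟🌟 (newer) | Less reviewed | Lowest latency |
Our choice: crossbeam-queue
Reasons:
- Rust core team members take part in its maintenance
- Part of the crossbeam project, whose crates are dependencies of widely used libraries such as rayon
- A bug in it would be felt across a large part of the Rust ecosystem
Selection principle for financial systems: use what lets you sleep at night.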
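use crossbeam_queue::ArrayQueue;

// Create a fixed-capacity ring buffer
let queue: ArrayQueue<OrderMessage> = ArrayQueue::new(1024);

// Producer: non-blocking push; returns Err(msg) when the queue is full
if let Err(_rejected) = queue.push(order_msg) {
    // Queue full: apply backpressure (retry, drop, or report) instead of panicking
}

// Consumer: non-blocking pop; returns None when the queue is empty
if let Some(msg) = queue.pop() {
    process(msg);
}

The WAL is the system's Single Source of Truth.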
/// Write-Ahead Log for Orders
///
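/// Design principles:
/// 1. Append-Only - sequential I/O for maximum performance
/// 2. Group Commit - batched flushes to reduce the number of fsync calls
/// 3. Monotonically increasing sequence_id - guarantees deterministic replay
pub struct WalWriter {
    writer: BufWriter<File>,
    next_seq: SeqNum,
    pending_count: usize,
    config: WalConfig,
}

| Flush Strategy | Latency | Throughput | Data Safety |
|---|---|---|---|
| fsync every entry | ~50µs | ~20K/s | Highest |
| Every 100 entries | ~5µs (amortized) | ~200K/s | High |
| Every 1ms | ~1µs (amortized) | ~1M/s | Medium |
We choose to flush every 100 entries, balancing performance and safety: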
pub struct WalConfig {
    pub path: String,
    pub flush_interval_entries: usize, // Flush every N entries
    pub sync_on_flush: bool,           // Whether to call fsync
}

The current format is CSV (easy to read during development):
seq_id,timestamp_ns,order_id,user_id,price,qty,side,order_type
1,1702742400000000000,1001,100,85000000000,100000000,Buy,Limit
In production this can be switched to a binary format (54 bytes/entry) for better performance.
UBSCore is the single entry point for all balance operations.
- Balance State Management - in-memory balance state
- Order WAL Writing - persist orders
- Balance Operations - lock/unlock/spend_frozen/deposit
pub struct UBSCore {
    /// User accounts - authoritative balance state
    accounts: FxHashMap<UserId, UserAccount>,
    /// Write-Ahead Log
    wal: WalWriter,
    /// Trading configuration
    config: TradingConfig,
    /// Pending orders (locked but not yet filled)
    pending_orders: FxHashMap<OrderId, PendingOrder>,
    /// Statistics
    stats: UBSCoreStats,
}

process_order(order):
│
├─ 1. Write to WAL ──────────► Get seq_id
│
├─ 2. Validate order ────────► Check price/qty
│
├─ 3. Get user account ──────► Lookup user
│
├─ 4. Calculate lock amount ─► Buy: price * qty / qty_unit
│ Sell: qty
│
└─ 5. Lock balance ──────────► Success → Ok(ValidOrder)
Fail → Err(Rejected)
Implementation:
pub fn process_order(&mut self, order: Order) -> Result<ValidOrder, OrderEvent> {
// Step 1: Write to WAL FIRST (persist before any state change)
let seq_id = self.wal.append(&order)?;
// Step 2-4: Validate and calculate
// ...
// Step 5: Lock balance
let lock_result = account
.get_balance_mut(locked_asset_id)
.and_then(|balance| balance.lock(locked_amount));
match lock_result {
Ok(()) => {
// Track pending order
self.pending_orders.insert(order.id, PendingOrder { ... });
Ok(ValidOrder::new(seq_id, order, locked_amount, locked_asset_id))
}
Err(_) => Err(OrderEvent::Rejected { ... })
}
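}

pub fn settle_trade(&mut self, event: &TradeEvent) -> Result<(), &'static str> {
let trade = &event.trade;
// Use a u128 intermediate: price * qty can exceed u64::MAX
let quote_amount = u64::try_from(
    trade.price as u128 * trade.qty as u128 / self.config.qty_unit() as u128,
)
.map_err(|_| "quote amount overflows u64")?;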
// Buyer: spend USDT, receive BTC
buyer.get_balance_mut(quote_id)?.spend_frozen(quote_amount)?;
buyer.get_balance_mut(base_id)?.deposit(trade.qty)?;
// Seller: spend BTC, receive USDT
seller.get_balance_mut(base_id)?.spend_frozen(trade.qty)?;
seller.get_balance_mut(quote_id)?.deposit(quote_amount)?;
Ok(())
}

Services communicate through clearly defined message types:
// Gateway → UBSCore
pub struct OrderMessage {
pub seq_id: SeqNum,
pub order: Order,
// ...
}
// UBSCore → ME
pub struct ValidOrder {
pub seq_id: SeqNum,
pub order: Order,
pub locked_amount: u64,
// ...
}
// ME → UBSCore + Settlement
pub struct TradeEvent {
pub trade: Trade,
pub taker_order_id: OrderId,
pub maker_order_id: OrderId,
// ...
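}

# Original pipeline
cargo run --release
# UBSCore pipeline (WAL enabled)
cargo run --release -- --ubscore

| Metric | Original | UBSCore | Change |
|---|---|---|---|
| Throughput | 15,070 ops/s | 14,314 ops/s | -5% |
| WAL Entries | N/A | 100,000 (6.67 MB) | - |
| Balance Check | 0.3% | 1.3% | +1% |
| Matching Engine | 45.5% | 45.5% | - |
| Settlement | 0.1% | 0.2% | - |
| Ledger I/O | 54.0% | 53.0% | -1% |
Analysis:
- WAL writes introduce about 5% overhead
- This is an acceptable price for data safety
- The main bottleneck is still Ledger I/O (the optimization target of the next chapter)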
cargo test
# 31 tests passing

sh scripts/test_e2e.sh
# ✅ All tests passed!

| File | Lines | Description |
|---|---|---|
| src/messages.rs | 265 | Inter-service message types |
| src/wal.rs | 340 | Write-Ahead Log |
| src/ubscore.rs | 490 | User Balance Core |
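- Maturity and stability > extreme performance
- Auditable > rapid development
- "Lets you sleep at night" is the highest selection standard
All state = f(WAL). At any moment, the system state can be rebuilt 100% from the WAL. This is also the foundation for disaster recovery and audit compliance.
UBSCore is single-threaded not because that is simple, but because it provides:
- Natural atomicity (no locks)
- No possibility of double spending
- Predictable latency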
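While implementing UBSCore and testing in --ubscore mode, we found that 1,032 orders were rejected, whereas legacy mode accepted all of them.
price * qty overflows u64 during the cost calculation.
Order #21:
- price = 84,956,010,000 (84,956.01 USDT, 6 decimals)
- qty = 256,284,400 (2.562844 BTC, 8 decimals)
- price * qty = 2.177 × 10^19 > u64::MAX
Wrapping arithmetic in release mode! In legacy mode the value shrank after overflowing, so it passed the check, but the locked amount was far too small. This is a serious financial vulnerability.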
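// Use u128 for the intermediate calculation
let cost_128 = (self.price as u128) * (self.qty as u128) / (qty_unit as u128);
if cost_128 > u64::MAX as u128 {
    return Err(CostError::Overflow);
}
// Safe: the check above guarantees the value fits in u64
let cost = cost_128 as u64;

Using 6 decimals for USDT creates the overflow risk. We recommend 2 decimals (the Binance standard).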
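The current Ledger is missing operations such as Deposit, Lock, Unlock, and SpendFrozen.
Because the Ring Buffer stages run in parallel, the interleaving of Lock and Settle events is not fixed, so consistency cannot be verified by comparing snapshots.
Maintain an independent version for each event type:
| Version Space | Increment On | Sort By | Determinism |
|---|---|---|---|
| lock_version | Lock/Unlock events | order_seq_id | ✅ Deterministic |
| settle_version | Settle events | trade_id | ✅ Deterministic |
Validation strategy: instead of verifying snapshots at arbitrary times, verify the final set of events after processing completes (sorted by their respective versions).
UBSCore has two input sources: OrderQueue and TradeQueue. For auditing, we establish a causal chain: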
struct BalanceEvent {
// ...
source_type: SourceType, // Order | Trade
source_id: u64, // order_seq_id | trade_id
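}

This not only solves the audit problem but also lets us quickly locate the source of a problem: every Lock must correspond to an Order, and every Settle must correspond to a Trade.
- Implement separate version spaces - lock_version / settle_version
- Extend BalanceEvent - add event_type, version, source_id
- Ring Buffer integration
- Causal chain audit tools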