Skip to content

Commit 5aab19a

Browse files
committed
Read Replica support
1 parent 3beaa75 commit 5aab19a

File tree

4 files changed

+458
-28
lines changed

4 files changed

+458
-28
lines changed

README.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ $posts = $driver->queryAll('SELECT * FROM posts WHERE author_id = ?', [12345]);
8888
- **Schema introspection** via `describeTable()` for table column metadata
8989
- **Query profiling** via `onQuery()` callback for logging and performance monitoring
9090
- **Connection tagging** via `setApplicationName()` and `setClientInfo()` for debugging
91+
- **Read replica support** with automatic query routing and round-robin load balancing
9192
- **IDE support** for VS Code and PHPStorm (syntax highlighting, live templates)
9293

9394
---
@@ -795,6 +796,52 @@ $driver->setClientInfo('order-service', [
795796
- MySQL: Stored in session variable `@sqlx_application_name` (queryable via `SELECT @sqlx_application_name`)
796797
- MSSQL: Stored in session context (queryable via `SELECT SESSION_CONTEXT(N'application_name')`)
797798

799+
#### Read Replicas
800+
801+
Configure read replicas for automatic read/write splitting:
802+
803+
```php
804+
$driver = Sqlx\DriverFactory::make([
805+
Sqlx\DriverOptions::OPT_URL => 'postgres://user:pass@primary/db',
806+
Sqlx\DriverOptions::OPT_READ_REPLICAS => [
807+
'postgres://user:pass@replica1/db',
808+
'postgres://user:pass@replica2/db',
809+
],
810+
]);
811+
812+
// SELECT queries automatically route to replicas (round-robin)
813+
$users = $driver->queryAll('SELECT * FROM users');
814+
815+
// Write operations always go to primary
816+
$driver->execute('INSERT INTO users (name) VALUES (?s)', ['John']);
817+
```
818+
819+
**Weighted load balancing:** Assign weights to control traffic distribution:
820+
821+
```php
822+
$driver = Sqlx\DriverFactory::make([
823+
Sqlx\DriverOptions::OPT_URL => 'postgres://user:pass@primary/db',
824+
Sqlx\DriverOptions::OPT_READ_REPLICAS => [
825+
['url' => 'postgres://user:pass@replica1/db', 'weight' => 3], // 75% traffic
826+
['url' => 'postgres://user:pass@replica2/db', 'weight' => 1], // 25% traffic
827+
],
828+
]);
829+
```
830+
831+
**Routing rules:**
832+
- `queryAll`, `queryRow`, `queryMaybeRow`, `queryValue`, `queryColumn` → replicas
833+
- `execute` → primary
834+
- All queries inside transactions → primary (consistency guarantee)
835+
836+
**Load balancing:** Weighted round-robin (or simple round-robin if all weights equal).
837+
838+
```php
839+
// Check if replicas are configured
840+
if ($driver->hasReadReplicas()) {
841+
echo "Read queries are load balanced across replicas";
842+
}
843+
```
844+
798845
#### Schema Introspection
799846

800847
- `describeTable(string $table, ?string $schema = null): array` – returns column metadata for the specified table.

src/driver.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1110,6 +1110,22 @@ macro_rules! php_sqlx_impl_driver {
11101110
self.driver_inner.set_client_info(application_name, &info)
11111111
}
11121112

1113+
/// Returns true if read replicas are configured for this driver.
1114+
///
1115+
/// When read replicas are configured, SELECT queries are automatically
1116+
/// routed to replicas using round-robin selection, while write operations
1117+
/// (INSERT/UPDATE/DELETE) always go to the primary.
1118+
///
1119+
/// # Example
1120+
/// ```php
1121+
/// if ($driver->hasReadReplicas()) {
1122+
/// echo "Read queries will be load balanced across replicas";
1123+
/// }
1124+
/// ```
1125+
pub fn has_read_replicas(&self) -> bool {
1126+
self.driver_inner.has_read_replicas()
1127+
}
1128+
11131129
/// Begins a SQL transaction, optionally executing a callable within it.
11141130
///
11151131
/// This method supports two modes of operation:

src/inner_driver.rs

Lines changed: 92 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ macro_rules! php_sqlx_impl_driver_inner {
9090
};
9191
use std::collections::BTreeMap;
9292
use std::sync::RwLock;
93+
use std::sync::atomic::AtomicUsize;
9394
use threadsafe_lru::LruCache;
9495
use $crate::{
9596
RUNTIME,
@@ -111,8 +112,16 @@ macro_rules! php_sqlx_impl_driver_inner {
111112
/// This struct is typically wrapped in `Arc` and shared across the outer driver,
112113
/// prepared queries, and query builders.
113114
pub struct $struct {
114-
/// `SQLx` connection pool for efficient connection reuse.
115+
/// `SQLx` connection pool for efficient connection reuse (primary).
115116
pub pool: Pool<$database>,
117+
/// Read replica connection pools for automatic read/write splitting.
118+
pub replica_pools: Vec<Pool<$database>>,
119+
/// Weights for each replica pool (for weighted load balancing).
120+
replica_weights: Vec<u32>,
121+
/// Total weight of all replicas (sum of weights).
122+
replica_total_weight: u32,
123+
/// Counter for weighted round-robin replica selection.
124+
replica_counter: AtomicUsize,
116125
/// LRU cache for parsed SQL AST, reducing parse overhead for repeated queries.
117126
pub ast_cache: LruCache<String, Ast>,
118127
/// Driver configuration options.
@@ -146,6 +155,31 @@ macro_rules! php_sqlx_impl_driver_inner {
146155
let pool = RUNTIME
147156
.block_on(pool_options.connect(url.as_str()))
148157
.map_err(|e| SqlxError::connection_with_source("Failed to connect", e))?;
158+
159+
// Create replica pools with weights
160+
let mut replica_pools = Vec::with_capacity(options.read_replicas.len());
161+
let mut replica_weights = Vec::with_capacity(options.read_replicas.len());
162+
for replica_config in &options.read_replicas {
163+
let mut replica_pool_options = PoolOptions::<$database>::new()
164+
.max_connections(options.max_connections.into())
165+
.min_connections(options.min_connections)
166+
.max_lifetime(options.max_lifetime)
167+
.idle_timeout(options.idle_timeout)
168+
.test_before_acquire(options.test_before_acquire);
169+
if let Some(acquire_timeout) = options.acquire_timeout {
170+
replica_pool_options = replica_pool_options.acquire_timeout(acquire_timeout);
171+
}
172+
let replica_pool = RUNTIME
173+
.block_on(replica_pool_options.connect(replica_config.url.as_str()))
174+
.map_err(|e| SqlxError::connection_with_source(
175+
format!("Failed to connect to replica: {}", replica_config.url),
176+
e,
177+
))?;
178+
replica_pools.push(replica_pool);
179+
replica_weights.push(replica_config.weight);
180+
}
181+
let replica_total_weight: u32 = replica_weights.iter().sum();
182+
149183
let mut settings = SETTINGS.clone();
150184
settings.collapsible_in_enabled = options.collapsible_in_enabled;
151185
let retry_policy = RetryPolicy {
@@ -157,6 +191,10 @@ macro_rules! php_sqlx_impl_driver_inner {
157191
Ok(Self {
158192
tx_stack: RwLock::new(Vec::new()),
159193
pool,
194+
replica_pools,
195+
replica_weights,
196+
replica_total_weight,
197+
replica_counter: AtomicUsize::new(0),
160198
ast_cache: LruCache::new(
161199
options.ast_cache_shard_count,
162200
options.ast_cache_shard_size,
@@ -185,6 +223,49 @@ macro_rules! php_sqlx_impl_driver_inner {
185223
!self.tx_stack.read().expect("Poisoned tx_stack").is_empty()
186224
}
187225

226+
/// Returns true if read replicas are configured.
227+
#[inline]
228+
pub fn has_read_replicas(&self) -> bool {
229+
!self.replica_pools.is_empty()
230+
}
231+
232+
/// Returns a reference to a read replica pool using weighted selection.
233+
///
234+
/// When weights are configured, replicas receive traffic proportional to their weight.
235+
/// For example, with weights [3, 1], the first replica gets ~75% of traffic.
236+
///
237+
/// Returns the primary pool if:
238+
/// - No replicas are configured
239+
/// - There's an active transaction (all queries go to primary)
240+
#[inline]
241+
pub fn get_read_pool(&self) -> &Pool<$database> {
242+
if self.replica_pools.is_empty() || self.has_active_transaction() {
243+
return &self.pool;
244+
}
245+
246+
// Simple round-robin if all weights are equal (optimization)
247+
if self.replica_total_weight as usize == self.replica_pools.len() {
248+
let index = self.replica_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
249+
return &self.replica_pools[index % self.replica_pools.len()];
250+
}
251+
252+
// Weighted selection (truncation is intentional for wrapping)
253+
let counter = self.replica_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
254+
#[allow(clippy::cast_possible_truncation)]
255+
let slot = (counter as u32) % self.replica_total_weight;
256+
257+
let mut cumulative = 0u32;
258+
for (i, &weight) in self.replica_weights.iter().enumerate() {
259+
cumulative += weight;
260+
if slot < cumulative {
261+
return &self.replica_pools[i];
262+
}
263+
}
264+
265+
// Fallback (should never happen if weights are valid)
266+
&self.replica_pools[0]
267+
}
268+
188269
/// Executes an operation with retry logic for transient failures.
189270
///
190271
/// Retries are skipped if:
@@ -356,7 +437,7 @@ macro_rules! php_sqlx_impl_driver_inner {
356437
let row = self.with_retry(|| {
357438
RUNTIME
358439
.block_on(
359-
bind_values(sqlx_oldapi::query(&query), &values)?.fetch_one(&self.pool),
440+
bind_values(sqlx_oldapi::query(&query), &values)?.fetch_one(self.get_read_pool()),
360441
)
361442
.map_err(|err| SqlxError::query_with_source(&query, err))
362443
})?;
@@ -410,7 +491,7 @@ macro_rules! php_sqlx_impl_driver_inner {
410491
RUNTIME
411492
.block_on(
412493
bind_values(sqlx_oldapi::query(&query), &values)?
413-
.fetch_all(&self.pool),
494+
.fetch_all(self.get_read_pool()),
414495
)
415496
.map_err(|err| SqlxError::query_with_source(&query, err))
416497
})?
@@ -479,7 +560,7 @@ macro_rules! php_sqlx_impl_driver_inner {
479560
RUNTIME
480561
.block_on(
481562
bind_values(sqlx_oldapi::query(&query), &values)?
482-
.fetch_one(&self.pool),
563+
.fetch_one(self.get_read_pool()),
483564
)
484565
.map(Some)
485566
.or_else(|err: sqlx_oldapi::Error| match err {
@@ -548,7 +629,7 @@ macro_rules! php_sqlx_impl_driver_inner {
548629
RUNTIME
549630
.block_on(
550631
bind_values(sqlx_oldapi::query(&query), &values)?
551-
.fetch_one(&self.pool),
632+
.fetch_one(self.get_read_pool()),
552633
)
553634
.map_err(|err| SqlxError::query_with_source(&query, err))
554635
})?
@@ -597,7 +678,7 @@ macro_rules! php_sqlx_impl_driver_inner {
597678
RUNTIME
598679
.block_on(
599680
bind_values(sqlx_oldapi::query(&query), &values)?
600-
.fetch_one(&self.pool),
681+
.fetch_one(self.get_read_pool()),
601682
)
602683
.map(Some)
603684
.or_else(|err: sqlx_oldapi::Error| match err {
@@ -653,7 +734,7 @@ macro_rules! php_sqlx_impl_driver_inner {
653734
RUNTIME
654735
.block_on(
655736
bind_values(sqlx_oldapi::query(&query), &values)?
656-
.fetch_all(&self.pool),
737+
.fetch_all(self.get_read_pool()),
657738
)
658739
.map_err(|err| SqlxError::query_with_source(&query, err))
659740
})?
@@ -752,7 +833,7 @@ macro_rules! php_sqlx_impl_driver_inner {
752833
RUNTIME
753834
.block_on(
754835
bind_values(sqlx_oldapi::query(&query), &values)?
755-
.fetch_all(&self.pool),
836+
.fetch_all(self.get_read_pool()),
756837
)
757838
.map_err(|err| SqlxError::query_with_source(&query, err))
758839
})?
@@ -826,7 +907,7 @@ macro_rules! php_sqlx_impl_driver_inner {
826907
} else {
827908
RUNTIME.block_on(
828909
bind_values(sqlx_oldapi::query(&query), &values)?
829-
.fetch_all(&self.pool),
910+
.fetch_all(self.get_read_pool()),
830911
)
831912
}
832913
.map_err(|err| SqlxError::query_with_source(&query, err))
@@ -892,7 +973,7 @@ macro_rules! php_sqlx_impl_driver_inner {
892973
RUNTIME
893974
.block_on(
894975
bind_values(sqlx_oldapi::query(&query), &values)?
895-
.fetch_all(&self.pool),
976+
.fetch_all(self.get_read_pool()),
896977
)
897978
.map_err(|err| SqlxError::query_with_source(&query, err))
898979
})?
@@ -949,7 +1030,7 @@ macro_rules! php_sqlx_impl_driver_inner {
9491030
RUNTIME
9501031
.block_on(
9511032
bind_values(sqlx_oldapi::query(&query), &values)?
952-
.fetch_all(&self.pool),
1033+
.fetch_all(self.get_read_pool()),
9531034
)
9541035
.map_err(|err| SqlxError::query_with_source(&query, err))
9551036
})?

0 commit comments

Comments
 (0)