Skip to content

Commit 020757e

Browse files
decebalclaude
andcommitted
release: v0.12.0 — network sync, conflict strategies, MCP tracker, WS backpressure
Closes the 4 remaining embedded Core gaps from #73: 1. Configurable per-entity-type conflict resolution (#76) - MergeStrategy enum: AppendOnly, LastWriteWins, FirstWriteWins - Config::merge_strategy("prefix", strategy) with longest-prefix matching - CrdtResolver::with_strategies() wired through EmbeddedCore::open() 2. Network sync transport (#75) - SyncClient for HTTP pull/push against remote Core - POST /api/v1/sync/pull and /api/v1/sync/push server endpoints - EmbeddedCore helpers: version_vector(), events_for_sync(), receive_sync_push() - embedded-sync feature now includes reqwest for HTTP transport 3. MCP tool event emission helper - McpToolTracker: emit_tool_result(), emit_tool_error(), track() with auto-timing - Events feed directly into ToolCallAuditProjection - Feature-gated on embedded-projections 4. WebSocket backpressure for token streams - WebSocketConfig: capacity, batch_interval_ms, max_batch_size - Batched send loop with tokio::time::interval + early flush on max_batch_size - RecvError::Lagged → {"type":"lagged","missed":N} client notification - Fully backward compatible: WebSocketManager::new() unchanged 20 new tests, 1489 lib tests pass, 0 regressions. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6e906c8 commit 020757e

File tree

25 files changed

+1631
-56
lines changed

25 files changed

+1631
-56
lines changed

apps/control-plane/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ var openAPIYAML []byte
3030
// Control plane configuration constants.
3131
const (
3232
// Version is the current version of the control plane.
33-
Version = "0.11.0"
33+
Version = "0.12.0"
3434
// DefaultPort is the default port the control plane listens on.
3535
DefaultPort = "3901"
3636
// CoreServiceURL is the URL of the core event store service.

apps/control-plane/tracing.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919

2020
const (
2121
serviceName = "allsource-control-plane"
22-
serviceVersion = "0.11.0"
22+
serviceVersion = "0.12.0"
2323
)
2424

2525
// TracingConfig holds OpenTelemetry configuration

apps/core/Cargo.toml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "allsource-core"
3-
version = "0.11.0"
3+
version = "0.12.0"
44
edition = "2024"
55
rust-version = "1.92"
66
authors = ["AllSource Team"]
@@ -58,7 +58,7 @@ vector-search = ["fastembed", "instant-distance"]
5858
keyword-search = ["tantivy"]
5959
embedded = []
6060
embedded-phase2 = ["embedded"]
61-
embedded-sync = ["embedded"]
61+
embedded-sync = ["embedded", "dep:reqwest"]
6262
embedded-replicant = ["embedded"]
6363
embedded-streaming = ["embedded"]
6464
embedded-projections = ["embedded"]
@@ -156,6 +156,11 @@ name = "bidirectional_sync"
156156
path = "tests/bidirectional_sync.rs"
157157
required-features = ["embedded-sync"]
158158

159+
[[test]]
160+
name = "http_sync_transport"
161+
path = "tests/http_sync_transport.rs"
162+
required-features = ["embedded-sync"]
163+
159164
[[test]]
160165
name = "replicant_protocol"
161166
path = "tests/replicant_protocol.rs"
@@ -171,6 +176,11 @@ name = "ai_projection_templates"
171176
path = "tests/ai_projection_templates.rs"
172177
required-features = ["embedded-projections"]
173178

179+
[[test]]
180+
name = "mcp_tool_emission"
181+
path = "tests/mcp_tool_emission.rs"
182+
required-features = ["embedded-projections"]
183+
174184
[[test]]
175185
name = "toon_format"
176186
path = "tests/toon_format.rs"

apps/core/src/embedded/config.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::infrastructure::cluster::crdt::MergeStrategy;
12
use std::path::{Path, PathBuf};
23

34
/// Configuration for [`EmbeddedCore`](super::EmbeddedCore).
@@ -11,6 +12,7 @@ pub struct EmbeddedConfig {
1112
parquet_flush_interval_secs: u64,
1213
single_tenant: bool,
1314
node_id: Option<u32>,
15+
merge_strategies: Vec<(String, MergeStrategy)>,
1416
}
1517

1618
impl EmbeddedConfig {
@@ -47,6 +49,11 @@ impl EmbeddedConfig {
4749
self.node_id
4850
}
4951

52+
/// Per-event-type merge strategies for conflict resolution.
53+
pub fn merge_strategies(&self) -> &[(String, MergeStrategy)] {
54+
&self.merge_strategies
55+
}
56+
5057
pub(crate) fn parquet_flush_interval_secs(&self) -> u64 {
5158
self.parquet_flush_interval_secs
5259
}
@@ -60,6 +67,7 @@ pub struct ConfigBuilder {
6067
parquet_flush_interval_secs: u64,
6168
single_tenant: bool,
6269
node_id: Option<u32>,
70+
merge_strategies: Vec<(String, MergeStrategy)>,
6371
}
6472

6573
impl Default for ConfigBuilder {
@@ -71,6 +79,7 @@ impl Default for ConfigBuilder {
7179
parquet_flush_interval_secs: 300,
7280
single_tenant: true,
7381
node_id: None,
82+
merge_strategies: Vec::new(),
7483
}
7584
}
7685
}
@@ -125,6 +134,16 @@ impl ConfigBuilder {
125134
self
126135
}
127136

137+
/// Register a per-event-type merge strategy for conflict resolution.
138+
///
139+
/// The `prefix` is matched against event types: `"config."` matches
140+
/// `"config.updated"`, `"config.deleted"`, etc. The longest matching
141+
/// prefix wins. Unmatched types default to `AppendOnly`.
142+
pub fn merge_strategy(mut self, prefix: impl Into<String>, strategy: MergeStrategy) -> Self {
143+
self.merge_strategies.push((prefix.into(), strategy));
144+
self
145+
}
146+
128147
/// Build the configuration. Returns `Result` for forward compatibility.
129148
pub fn build(self) -> crate::error::Result<EmbeddedConfig> {
130149
Ok(EmbeddedConfig {
@@ -134,6 +153,7 @@ impl ConfigBuilder {
134153
parquet_flush_interval_secs: self.parquet_flush_interval_secs,
135154
single_tenant: self.single_tenant,
136155
node_id: self.node_id,
156+
merge_strategies: self.merge_strategies,
137157
})
138158
}
139159
}

apps/core/src/embedded/core.rs

Lines changed: 176 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,13 @@ impl EmbeddedCore {
104104
let (hlc, resolver) = match config.node_id() {
105105
Some(node_id) => {
106106
let hlc = Arc::new(HybridLogicalClock::new(node_id));
107-
let resolver = Arc::new(CrdtResolver::new());
107+
let resolver = if config.merge_strategies().is_empty() {
108+
Arc::new(CrdtResolver::new())
109+
} else {
110+
Arc::new(CrdtResolver::with_strategies(
111+
config.merge_strategies().to_vec(),
112+
))
113+
};
108114
(Some(hlc), Some(resolver))
109115
}
110116
None => (None, None),
@@ -472,6 +478,175 @@ impl EmbeddedCore {
472478
self.store.stats()
473479
}
474480

481+
/// Get the merged version vector across all known regions.
482+
///
483+
/// Returns a map of `region_id → latest HLC timestamp`. Used by the
484+
/// sync transport to compute deltas.
485+
pub fn version_vector(
486+
&self,
487+
) -> std::collections::BTreeMap<String, crate::infrastructure::cluster::hlc::HlcTimestamp> {
488+
match &self.resolver {
489+
Some(resolver) => {
490+
let all_vv = resolver.all_version_vectors();
491+
let mut merged = crate::infrastructure::cluster::crdt::VersionVector::new();
492+
for (_region, vv) in &all_vv {
493+
merged.merge(vv);
494+
}
495+
merged.entries().clone()
496+
}
497+
None => std::collections::BTreeMap::new(),
498+
}
499+
}
500+
501+
/// Get the node's region identifier (e.g., "node-1").
502+
///
503+
/// Returns `None` if sync is not configured (no `node_id`).
504+
pub fn region_id(&self) -> Option<String> {
505+
self.hlc.as_ref().map(|hlc| format!("node-{}", hlc.node_id()))
506+
}
507+
508+
/// Receive events from a remote sync push.
509+
///
510+
/// Applies CRDT conflict resolution to each event and ingests
511+
/// accepted events. Returns `(accepted, skipped)` counts.
512+
pub async fn receive_sync_push(
513+
&self,
514+
events: Vec<ReplicatedEvent>,
515+
) -> Result<(usize, usize)> {
516+
let (Some(_hlc), Some(resolver)) = (&self.hlc, &self.resolver) else {
517+
return Err(crate::error::AllSourceError::InvalidInput(
518+
"sync requires node_id to be configured".to_string(),
519+
));
520+
};
521+
522+
let mut accepted = Vec::new();
523+
let mut skipped = 0usize;
524+
525+
for event in &events {
526+
let resolution = resolver.resolve(event);
527+
if resolution == ConflictResolution::Accept {
528+
resolver.accept(event);
529+
accepted.push(event.clone());
530+
} else {
531+
skipped += 1;
532+
}
533+
}
534+
535+
let accepted_count = accepted.len();
536+
537+
if !accepted.is_empty() {
538+
let store = Arc::clone(&self.store);
539+
tokio::task::spawn_blocking(move || {
540+
for rep_event in accepted {
541+
// Convert ReplicatedEvent back to domain Event
542+
let event_data = &rep_event.event_data;
543+
let event_type = event_data
544+
.get("event_type")
545+
.and_then(|v| v.as_str())
546+
.unwrap_or("unknown")
547+
.to_string();
548+
let entity_id = event_data
549+
.get("entity_id")
550+
.and_then(|v| v.as_str())
551+
.unwrap_or("unknown")
552+
.to_string();
553+
let tenant_id = event_data
554+
.get("tenant_id")
555+
.and_then(|v| v.as_str())
556+
.unwrap_or("default")
557+
.to_string();
558+
let payload = event_data
559+
.get("payload")
560+
.cloned()
561+
.unwrap_or(serde_json::json!({}));
562+
let metadata = event_data.get("metadata").cloned();
563+
564+
let domain_event = Event::from_strings(
565+
event_type,
566+
entity_id,
567+
tenant_id,
568+
payload,
569+
metadata,
570+
)?;
571+
store.ingest(domain_event)?;
572+
}
573+
Ok::<(), crate::error::AllSourceError>(())
574+
})
575+
.await
576+
.map_err(|e| {
577+
crate::error::AllSourceError::InvalidInput(format!("spawn_blocking failed: {e}"))
578+
})??;
579+
}
580+
581+
Ok((accepted_count, skipped))
582+
}
583+
584+
/// Get all events as `ReplicatedEvent`s for sync push.
585+
///
586+
/// Optionally filtered by a version vector threshold (only events
587+
/// newer than the threshold are returned).
588+
pub async fn events_for_sync(
589+
&self,
590+
since_vv: &std::collections::BTreeMap<String, crate::infrastructure::cluster::hlc::HlcTimestamp>,
591+
) -> Result<Vec<ReplicatedEvent>> {
592+
let Some(hlc) = &self.hlc else {
593+
return Err(crate::error::AllSourceError::InvalidInput(
594+
"sync requires node_id to be configured".to_string(),
595+
));
596+
};
597+
598+
let self_region = format!("node-{}", hlc.node_id());
599+
let since = since_vv
600+
.get(&self_region)
601+
.map(|ts| {
602+
chrono::DateTime::from_timestamp_millis(ts.physical_ms as i64).unwrap_or_default()
603+
});
604+
605+
let store = Arc::clone(&self.store);
606+
let all_events = tokio::task::spawn_blocking(move || {
607+
store.query(QueryEventsRequest {
608+
entity_id: None,
609+
event_type: None,
610+
tenant_id: None,
611+
as_of: None,
612+
since,
613+
until: None,
614+
limit: None,
615+
event_type_prefix: None,
616+
payload_filter: None,
617+
})
618+
})
619+
.await
620+
.map_err(|e| {
621+
crate::error::AllSourceError::InvalidInput(format!("spawn_blocking failed: {e}"))
622+
})??;
623+
624+
let node_id = hlc.node_id();
625+
let mut replicated = Vec::with_capacity(all_events.len());
626+
let mut last_ms = 0u64;
627+
let mut logical = 0u32;
628+
629+
for event in &all_events {
630+
let event_ms = event.timestamp().timestamp_millis() as u64;
631+
if event_ms == last_ms {
632+
logical += 1;
633+
} else {
634+
last_ms = event_ms;
635+
logical = 0;
636+
}
637+
let ts = HlcTimestamp::new(event_ms, logical, node_id);
638+
639+
replicated.push(ReplicatedEvent {
640+
event_id: event.id().to_string(),
641+
hlc_timestamp: ts,
642+
origin_region: self_region.clone(),
643+
event_data: serde_json::to_value(EventView::from(event)).unwrap_or_default(),
644+
});
645+
}
646+
647+
Ok(replicated)
648+
}
649+
475650
/// Get a reference to the underlying `EventStore`.
476651
///
477652
/// Escape hatch for advanced use cases that need direct access.

0 commit comments

Comments
 (0)