Skip to content

Commit c6d6e48

Browse files
CodingAnarchyclaude
andcommitted
feat: Comprehensive streaming feature flag implementation and serialization testing
- Feature-gate entire streaming module behind streaming/kafka/pubsub/kinesis features - Wrap all streaming exports, configuration structs, and integration tests with feature flags - Add comprehensive test coverage for all serialization formats: - JSON serialization with round-trip validation - MessagePack serialization with size comparison and feature enforcement - Avro serialization with binary format validation and feature enforcement - Protobuf serialization with efficiency testing and feature enforcement - Cross-format size comparison testing - Ensure graceful degradation when streaming features are disabled - Maintain backward compatibility while enabling selective feature usage - All tests pass with and without streaming features enabled 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
1 parent a8d3a31 commit c6d6e48

File tree

5 files changed

+603
-161
lines changed

5 files changed

+603
-161
lines changed

src/config.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ use crate::alerting::AlertingConfig;
1818
#[cfg(feature = "metrics")]
1919
use crate::metrics::MetricsConfig;
2020

21+
#[cfg(any(
22+
feature = "streaming",
23+
feature = "kafka",
24+
feature = "google-pubsub",
25+
feature = "kinesis"
26+
))]
2127
use crate::streaming::StreamConfig;
2228
use chrono::Duration;
2329
use serde::{Deserialize, Serialize};
@@ -207,6 +213,12 @@ pub struct HammerworkConfig {
207213
pub webhooks: WebhookConfigs,
208214

209215
/// Streaming configurations
216+
#[cfg(any(
217+
feature = "streaming",
218+
feature = "kafka",
219+
feature = "google-pubsub",
220+
feature = "kinesis"
221+
))]
210222
pub streaming: StreamingConfigs,
211223

212224
/// Alerting configuration
@@ -434,12 +446,18 @@ impl Default for WebhookGlobalSettings {
434446
}
435447

436448
/// Streaming configurations container
449+
#[cfg(any(
450+
feature = "streaming",
451+
feature = "kafka",
452+
feature = "google-pubsub",
453+
feature = "kinesis"
454+
))]
437455
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
438456
pub struct StreamingConfigs {
439457
/// List of configured streams
440458
pub streams: Vec<StreamConfig>,
441459

442-
/// Global streaming settings
460+
/// Global streaming settings
443461
pub global_settings: StreamingGlobalSettings,
444462
}
445463

@@ -469,6 +487,12 @@ impl Default for SimpleEventFilter {
469487
}
470488

471489
/// Global streaming settings
490+
#[cfg(any(
491+
feature = "streaming",
492+
feature = "kafka",
493+
feature = "google-pubsub",
494+
feature = "kinesis"
495+
))]
472496
#[derive(Debug, Clone, Serialize, Deserialize)]
473497
pub struct StreamingGlobalSettings {
474498
/// Maximum concurrent stream processors
@@ -481,6 +505,12 @@ pub struct StreamingGlobalSettings {
481505
pub global_flush_interval_secs: u64,
482506
}
483507

508+
#[cfg(any(
509+
feature = "streaming",
510+
feature = "kafka",
511+
feature = "google-pubsub",
512+
feature = "kinesis"
513+
))]
484514
impl Default for StreamingGlobalSettings {
485515
fn default() -> Self {
486516
Self {
@@ -658,6 +688,12 @@ impl HammerworkConfig {
658688
}
659689

660690
/// Add a stream configuration
691+
#[cfg(any(
692+
feature = "streaming",
693+
feature = "kafka",
694+
feature = "google-pubsub",
695+
feature = "kinesis"
696+
))]
661697
pub fn add_stream(mut self, stream: StreamConfig) -> Self {
662698
self.streaming.streams.push(stream);
663699
self
@@ -667,6 +703,12 @@ impl HammerworkConfig {
667703
#[cfg(test)]
668704
mod tests {
669705
use super::*;
706+
#[cfg(any(
707+
feature = "streaming",
708+
feature = "kafka",
709+
feature = "google-pubsub",
710+
feature = "kinesis"
711+
))]
670712
use crate::streaming::StreamBackend;
671713
use tempfile::tempdir;
672714

@@ -915,6 +957,12 @@ service_name = "hammerwork"
915957
}
916958

917959
#[test]
960+
#[cfg(any(
961+
feature = "streaming",
962+
feature = "kafka",
963+
feature = "google-pubsub",
964+
feature = "kinesis"
965+
))]
918966
fn test_stream_config() {
919967
let stream = StreamConfig {
920968
name: "Test Stream".to_string(),

src/encryption/key_manager.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1228,7 +1228,10 @@ where
12281228

12291229
// Store the master key securely in the database
12301230
// Note: Database persistence is optional - master key works in-memory only
1231-
info!("Master key generated and stored in memory with ID: {}", master_key_id);
1231+
info!(
1232+
"Master key generated and stored in memory with ID: {}",
1233+
master_key_id
1234+
);
12321235

12331236
// Keep a copy in memory for performance (encrypted with a derived key)
12341237
*self.master_key.lock().map_err(|_| {
@@ -1939,7 +1942,6 @@ where
19391942
}
19401943
}
19411944

1942-
19431945
/// Get or create a master key ID based on key material, with database persistence
19441946
async fn get_or_create_master_key_id(
19451947
&self,
@@ -2622,7 +2624,6 @@ impl KeyManager<sqlx::Postgres> {
26222624
pub async fn get_keys_due_for_rotation(&self) -> Result<Vec<String>, EncryptionError> {
26232625
self.get_keys_due_for_rotation_postgres().await
26242626
}
2625-
26262627
}
26272628

26282629
#[cfg(feature = "mysql")]
@@ -3144,7 +3145,6 @@ impl KeyManager<sqlx::MySql> {
31443145
pub async fn get_keys_due_for_rotation(&self) -> Result<Vec<String>, EncryptionError> {
31453146
self.get_keys_due_for_rotation_mysql().await
31463147
}
3147-
31483148
}
31493149

31503150
impl Default for KeyManagerStats {
@@ -3545,7 +3545,7 @@ mod tests {
35453545
"rotation-test-key",
35463546
EncryptionAlgorithm::AES256GCM,
35473547
KeyPurpose::Encryption,
3548-
None, // expires_at
3548+
None, // expires_at
35493549
Some(Duration::days(30)), // 30-day rotation interval
35503550
)
35513551
.await
@@ -3579,7 +3579,7 @@ mod tests {
35793579
"rotation-test-key",
35803580
EncryptionAlgorithm::AES256GCM,
35813581
KeyPurpose::Encryption,
3582-
None, // expires_at
3582+
None, // expires_at
35833583
Some(Duration::days(30)), // 30-day rotation interval
35843584
)
35853585
.await
@@ -4166,7 +4166,7 @@ mod tests {
41664166
#[cfg(feature = "encryption")]
41674167
impl TestKeyManager {
41684168
#[allow(dead_code)]
4169-
fn derive_system_encryption_key(&self, salt: &[u8]) -> Result<Vec<u8>, EncryptionError> {
4169+
fn derive_system_encryption_key(&self, salt: &[u8]) -> Result<Vec<u8>, EncryptionError> {
41704170
use argon2::{
41714171
Argon2,
41724172
password_hash::{PasswordHasher, SaltString},
@@ -4211,7 +4211,7 @@ mod tests {
42114211
}
42124212

42134213
#[allow(dead_code)]
4214-
fn encrypt_with_system_key(
4214+
fn encrypt_with_system_key(
42154215
&self,
42164216
system_key: &[u8],
42174217
plaintext: &[u8],

src/integration_tests.rs

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,22 @@ mod tests {
88
use crate::{
99
events::{EventFilter, EventManager, JobError, JobLifecycleEvent, JobLifecycleEventType},
1010
priority::JobPriority,
11-
streaming::{
12-
PartitioningStrategy, SerializationFormat, StreamBackend, StreamConfig, StreamManager,
13-
StreamManagerConfig,
14-
},
1511
webhooks::{
1612
HttpMethod, RetryPolicy, WebhookAuth, WebhookConfig, WebhookManager,
1713
WebhookManagerConfig,
1814
},
1915
};
16+
17+
#[cfg(any(
18+
feature = "streaming",
19+
feature = "kafka",
20+
feature = "google-pubsub",
21+
feature = "kinesis"
22+
))]
23+
use crate::streaming::{
24+
PartitioningStrategy, SerializationFormat, StreamBackend, StreamConfig, StreamManager,
25+
StreamManagerConfig,
26+
};
2027
use chrono::Utc;
2128
use std::{collections::HashMap, sync::Arc, time::Duration};
2229
use tokio::time::sleep;
@@ -110,6 +117,12 @@ mod tests {
110117
}
111118

112119
#[tokio::test]
120+
#[cfg(any(
121+
feature = "streaming",
122+
feature = "kafka",
123+
feature = "google-pubsub",
124+
feature = "kinesis"
125+
))]
113126
async fn test_event_manager_streaming_integration() {
114127
// Set up event manager
115128
let event_manager = Arc::new(EventManager::new_default());
@@ -158,6 +171,12 @@ mod tests {
158171
}
159172

160173
#[tokio::test]
174+
#[cfg(any(
175+
feature = "streaming",
176+
feature = "kafka",
177+
feature = "google-pubsub",
178+
feature = "kinesis"
179+
))]
161180
async fn test_full_system_integration() {
162181
// Set up all components
163182
let event_manager = Arc::new(EventManager::new_default());
@@ -317,6 +336,12 @@ mod tests {
317336
}
318337

319338
#[tokio::test]
339+
#[cfg(any(
340+
feature = "streaming",
341+
feature = "kafka",
342+
feature = "google-pubsub",
343+
feature = "kinesis"
344+
))]
320345
async fn test_event_filtering_across_systems() {
321346
let event_manager = Arc::new(EventManager::new_default());
322347
let webhook_manager =
@@ -435,6 +460,12 @@ mod tests {
435460
}
436461

437462
#[tokio::test]
463+
#[cfg(any(
464+
feature = "streaming",
465+
feature = "kafka",
466+
feature = "google-pubsub",
467+
feature = "kinesis"
468+
))]
438469
async fn test_concurrent_event_processing() {
439470
let event_manager = Arc::new(EventManager::new_default());
440471
let webhook_manager = Arc::new(WebhookManager::new(
@@ -528,6 +559,12 @@ mod tests {
528559
}
529560

530561
#[tokio::test]
562+
#[cfg(any(
563+
feature = "streaming",
564+
feature = "kafka",
565+
feature = "google-pubsub",
566+
feature = "kinesis"
567+
))]
531568
async fn test_system_resilience_with_failures() {
532569
let event_manager = Arc::new(EventManager::new_default());
533570
let webhook_manager =

src/lib.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,12 @@ pub mod events;
258258
#[cfg(feature = "webhooks")]
259259
pub mod webhooks;
260260

261+
#[cfg(any(
262+
feature = "streaming",
263+
feature = "kafka",
264+
feature = "google-pubsub",
265+
feature = "kinesis"
266+
))]
261267
pub mod streaming;
262268

263269
#[cfg(test)]
@@ -270,8 +276,22 @@ pub use archive::{
270276
pub use batch::{BatchId, BatchResult, BatchStatus, JobBatch, PartialFailureMode};
271277
pub use config::{
272278
ArchiveConfig, DatabaseConfig, HammerworkConfig, LoggingConfig, RateLimitingConfig,
273-
StreamingConfigs, StreamingGlobalSettings, WorkerConfig,
279+
WorkerConfig,
274280
};
281+
282+
#[cfg(any(
283+
feature = "streaming",
284+
feature = "kafka",
285+
feature = "google-pubsub",
286+
feature = "kinesis"
287+
))]
288+
pub use config::{StreamingConfigs, StreamingGlobalSettings};
289+
#[cfg(any(
290+
feature = "streaming",
291+
feature = "kafka",
292+
feature = "google-pubsub",
293+
feature = "kinesis"
294+
))]
275295
pub use streaming::StreamConfig;
276296

277297
#[cfg(feature = "webhooks")]
@@ -323,6 +343,12 @@ pub use webhooks::{
323343
WebhookManager, WebhookManagerConfig, WebhookManagerStats, WebhookStats,
324344
};
325345

346+
#[cfg(any(
347+
feature = "streaming",
348+
feature = "kafka",
349+
feature = "google-pubsub",
350+
feature = "kinesis"
351+
))]
326352
pub use streaming::{
327353
BufferConfig, PartitionField, PartitioningStrategy, SerializationFormat, StreamBackend,
328354
StreamConfig as StreamSettings, StreamDelivery, StreamManager, StreamManagerConfig,

0 commit comments

Comments
 (0)