@@ -6,10 +6,7 @@ use chrono::{TimeZone, Utc};
6
6
use opentelemetry_proto:: tonic:: common:: v1:: any_value:: Value ;
7
7
use opentelemetry_proto:: tonic:: logs:: v1:: LogRecord ;
8
8
use std:: borrow:: Cow ;
9
- use std:: collections:: HashMap ;
10
- use std:: sync:: { Arc , RwLock } ;
11
-
12
- type SchemaCache = Arc < RwLock < HashMap < u64 , ( Arc < BondEncodedSchema > , [ u8 ; 16 ] ) > > > ;
9
+ use std:: sync:: Arc ;
13
10
14
11
const FIELD_ENV_NAME : & str = "env_name" ;
15
12
const FIELD_ENV_VER : & str = "env_ver" ;
@@ -25,22 +22,11 @@ const FIELD_BODY: &str = "body";
25
22
26
23
/// Encoder to write OTLP payload in bond form.
27
24
#[ derive( Clone ) ]
28
- pub ( crate ) struct OtlpEncoder {
29
- // TODO - limit cache size or use LRU eviction, and/or add feature flag for caching
30
- schema_cache : SchemaCache ,
31
- }
25
+ pub ( crate ) struct OtlpEncoder ;
32
26
33
27
impl OtlpEncoder {
34
28
pub ( crate ) fn new ( ) -> Self {
35
- OtlpEncoder {
36
- schema_cache : Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ,
37
- }
38
- }
39
-
40
- /// Get the number of cached schemas (for testing/debugging purposes)
41
- #[ allow( dead_code) ]
42
- pub ( crate ) fn schema_cache_size ( & self ) -> usize {
43
- self . schema_cache . read ( ) . unwrap ( ) . len ( )
29
+ OtlpEncoder { }
44
30
}
45
31
46
32
/// Encode a batch of logs into a vector of (event_name, bytes, schema_ids, start_time_nanos, end_time_nanos)
@@ -105,7 +91,7 @@ impl OtlpEncoder {
105
91
let ( field_info, schema_id) =
106
92
Self :: determine_fields_and_schema_id ( log_record, event_name_str) ;
107
93
108
- let schema_entry = self . get_or_create_schema ( schema_id, field_info. as_slice ( ) ) ;
94
+ let schema_entry = Self :: create_schema ( schema_id, field_info. as_slice ( ) ) ;
109
95
// 2. Encode row
110
96
let row_buffer = self . write_row_data ( log_record, & field_info) ;
111
97
let level = log_record. severity_number as u8 ;
@@ -251,35 +237,14 @@ impl OtlpEncoder {
251
237
( field_defs, schema_id)
252
238
}
253
239
254
- /// Get or create schema - fields are accessible via returned schema entry
255
- fn get_or_create_schema ( & self , schema_id : u64 , field_info : & [ FieldDef ] ) -> CentralSchemaEntry {
256
- {
257
- if let Some ( ( schema_arc, schema_md5) ) =
258
- self . schema_cache . read ( ) . unwrap ( ) . get ( & schema_id)
259
- {
260
- return CentralSchemaEntry {
261
- id : schema_id,
262
- md5 : * schema_md5,
263
- schema : ( * * schema_arc) . clone ( ) , // Dereference Arc and clone BondEncodedSchema
264
- } ;
265
- }
266
- }
267
-
268
- // Only clone field_info when we actually need to create a new schema
269
- // Investigate if we can avoid cloning by using Cow using Arc to fields_info
240
+ /// Create schema - always creates a new CentralSchemaEntry
241
+ fn create_schema ( schema_id : u64 , field_info : & [ FieldDef ] ) -> CentralSchemaEntry {
270
242
let schema =
271
243
BondEncodedSchema :: from_fields ( "OtlpLogRecord" , "telemetry" , field_info. to_vec ( ) ) ; //TODO - use actual struct name and namespace
272
244
273
245
let schema_bytes = schema. as_bytes ( ) ;
274
246
let schema_md5 = md5:: compute ( schema_bytes) . 0 ;
275
247
276
- // Cache Arc<BondEncodedSchema> to avoid cloning large structures
277
- let schema_arc = Arc :: new ( schema. clone ( ) ) ;
278
- {
279
- let mut cache = self . schema_cache . write ( ) . unwrap ( ) ;
280
- cache. insert ( schema_id, ( schema_arc, schema_md5) ) ;
281
- }
282
-
283
248
CentralSchemaEntry {
284
249
id : schema_id,
285
250
md5 : schema_md5,
@@ -431,35 +396,91 @@ mod tests {
431
396
}
432
397
433
398
#[ test]
434
- fn test_schema_caching ( ) {
399
+ fn test_multiple_schemas_per_batch ( ) {
435
400
let encoder = OtlpEncoder :: new ( ) ;
436
401
402
+ // Create multiple log records with different schema structures
403
+ // to test that multiple schemas can exist within the same batch
437
404
let log1 = LogRecord {
438
405
observed_time_unix_nano : 1_700_000_000_000_000_000 ,
406
+ event_name : "user_action" . to_string ( ) ,
439
407
severity_number : 9 ,
408
+ severity_text : "INFO" . to_string ( ) ,
440
409
..Default :: default ( )
441
410
} ;
442
411
412
+ // Schema 2: Same event_name but with trace_id (different schema)
443
413
let mut log2 = LogRecord {
414
+ event_name : "user_action" . to_string ( ) ,
444
415
observed_time_unix_nano : 1_700_000_001_000_000_000 ,
445
416
severity_number : 10 ,
417
+ severity_text : "WARN" . to_string ( ) ,
446
418
..Default :: default ( )
447
419
} ;
420
+ log2. trace_id = vec ! [ 1 ; 16 ] ;
448
421
449
- let metadata = "namespace=test" ;
422
+ // Schema 3: Same event_name but with attributes (different schema)
423
+ let mut log3 = LogRecord {
424
+ event_name : "user_action" . to_string ( ) ,
425
+ observed_time_unix_nano : 1_700_000_002_000_000_000 ,
426
+ severity_number : 11 ,
427
+ severity_text : "ERROR" . to_string ( ) ,
428
+ ..Default :: default ( )
429
+ } ;
430
+ log3. attributes . push ( KeyValue {
431
+ key : "user_id" . to_string ( ) ,
432
+ value : Some ( AnyValue {
433
+ value : Some ( Value :: StringValue ( "user123" . to_string ( ) ) ) ,
434
+ } ) ,
435
+ } ) ;
450
436
451
- // First encoding creates schema
452
- let _result1 = encoder. encode_log_batch ( [ log1] . iter ( ) , metadata) ;
453
- assert_eq ! ( encoder. schema_cache. read( ) . unwrap( ) . len( ) , 1 ) ;
437
+ let metadata = "namespace=test" ;
454
438
455
- // Second encoding with same schema structure reuses schema
456
- let _result2 = encoder. encode_log_batch ( [ log2. clone ( ) ] . iter ( ) , metadata) ;
457
- assert_eq ! ( encoder. schema_cache. read( ) . unwrap( ) . len( ) , 1 ) ;
439
+ // Encode multiple log records with different schema structures but same event_name
440
+ let result = encoder. encode_log_batch ( [ log1, log2, log3] . iter ( ) , metadata) ;
458
441
459
- // Add trace_id to create different schema
460
- log2. trace_id = vec ! [ 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 , 16 ] ;
461
- let _result3 = encoder. encode_log_batch ( [ log2] . iter ( ) , metadata) ;
462
- assert_eq ! ( encoder. schema_cache. read( ) . unwrap( ) . len( ) , 2 ) ;
442
+ // Should create one batch (same event_name = "user_action")
443
+ assert_eq ! ( result. len( ) , 1 ) ;
444
+ let batch = & result[ 0 ] ;
445
+ assert_eq ! ( batch. event_name, "user_action" ) ;
446
+
447
+ // Verify that multiple schemas were created within the same batch
448
+ // schema_ids should contain multiple semicolon-separated MD5 hashes
449
+ let schema_ids = & batch. metadata . schema_ids ;
450
+ assert ! ( !schema_ids. is_empty( ) ) ;
451
+
452
+ // Split by semicolon to get individual schema IDs
453
+ let schema_id_list: Vec < & str > = schema_ids. split ( ';' ) . collect ( ) ;
454
+
455
+ // Should have 3 different schema IDs (one per unique schema structure)
456
+ assert_eq ! (
457
+ schema_id_list. len( ) ,
458
+ 3 ,
459
+ "Expected 3 schema IDs but found {}: {}" ,
460
+ schema_id_list. len( ) ,
461
+ schema_ids
462
+ ) ;
463
+
464
+ // Verify all schema IDs are different (each log record has different schema structure)
465
+ let unique_schemas: std:: collections:: HashSet < & str > = schema_id_list. into_iter ( ) . collect ( ) ;
466
+ assert_eq ! (
467
+ unique_schemas. len( ) ,
468
+ 3 ,
469
+ "Expected 3 unique schema IDs but found duplicates in: {schema_ids}"
470
+ ) ;
471
+
472
+ // Verify each schema ID is a valid MD5 hash (32 hex characters)
473
+ for schema_id in unique_schemas {
474
+ assert_eq ! (
475
+ schema_id. len( ) ,
476
+ 32 ,
477
+ "Schema ID should be 32 hex characters: {schema_id}"
478
+ ) ;
479
+ assert ! (
480
+ schema_id. chars( ) . all( |c| c. is_ascii_hexdigit( ) ) ,
481
+ "Schema ID should contain only hex characters: {schema_id}"
482
+ ) ;
483
+ }
463
484
}
464
485
465
486
#[ test]
@@ -521,9 +542,6 @@ mod tests {
521
542
assert ! ( !result[ 0 ] . data. is_empty( ) ) ; // Should have encoded data
522
543
// Should have 3 different schema IDs (semicolon-separated)
523
544
assert_eq ! ( result[ 0 ] . metadata. schema_ids. matches( ';' ) . count( ) , 2 ) ; // 3 schemas = 2 semicolons
524
-
525
- // Should have 3 different schemas cached
526
- assert_eq ! ( encoder. schema_cache. read( ) . unwrap( ) . len( ) , 3 ) ;
527
545
}
528
546
529
547
#[ test]
@@ -637,8 +655,5 @@ mod tests {
637
655
assert_eq ! ( system_alert. metadata. schema_ids. matches( ';' ) . count( ) , 0 ) ; // 1 schema = 0 semicolons
638
656
assert ! ( !log_batch. data. is_empty( ) ) ; // Should have encoded data
639
657
assert_eq ! ( log_batch. metadata. schema_ids. matches( ';' ) . count( ) , 0 ) ; // 1 schema = 0 semicolons
640
-
641
- // Should have 4 different schemas cached
642
- assert_eq ! ( encoder. schema_cache. read( ) . unwrap( ) . len( ) , 4 ) ;
643
658
}
644
659
}
0 commit comments