@@ -15,6 +15,8 @@ pub(crate) mod source {
1515 use crate :: Result ;
1616 use crate :: config:: get_vertex_name;
1717 use crate :: error:: Error ;
18+ use base64:: Engine ;
19+ use base64:: prelude:: BASE64_STANDARD ;
1820 use bytes:: Bytes ;
1921 use numaflow_jetstream:: { JetstreamSourceConfig , NatsAuth , TlsClientAuthCerts , TlsConfig } ;
2022 use numaflow_kafka:: source:: KafkaSourceConfig ;
@@ -52,12 +54,20 @@ pub(crate) mod source {
5254 Http ( numaflow_http:: HttpSourceConfig ) ,
5355 }
5456
55- impl From < Box < GeneratorSource > > for SourceType {
56- fn from ( generator : Box < GeneratorSource > ) -> Self {
57+ impl TryFrom < Box < GeneratorSource > > for SourceType {
58+ type Error = Error ;
59+
60+ fn try_from ( generator : Box < GeneratorSource > ) -> Result < Self > {
5761 let mut generator_config = GeneratorConfig :: default ( ) ;
5862
5963 if let Some ( value_blob) = & generator. value_blob {
60- generator_config. content = Bytes :: from ( value_blob. clone ( ) ) ;
64+ let value_blob = BASE64_STANDARD . decode ( value_blob. as_bytes ( ) ) . map_err ( |e| {
65+ Error :: Config ( format ! (
66+ "Failed to base64 decode generator value blob: {:?}" ,
67+ e
68+ ) )
69+ } ) ?;
70+ generator_config. content = Bytes :: from ( value_blob) ;
6171 }
6272
6373 if let Some ( msg_size) = generator. msg_size {
@@ -81,7 +91,7 @@ pub(crate) mod source {
8191 . jitter
8292 . map_or ( Duration :: from_secs ( 0 ) , std:: time:: Duration :: from) ;
8393
84- SourceType :: Generator ( generator_config)
94+ Ok ( SourceType :: Generator ( generator_config) )
8595 }
8696 }
8797
@@ -342,7 +352,7 @@ pub(crate) mod source {
342352
343353 fn try_from ( mut source : Box < Source > ) -> Result < Self > {
344354 if let Some ( generator) = source. generator . take ( ) {
345- return Ok ( generator. into ( ) ) ;
355+ return Ok ( generator. try_into ( ) ? ) ;
346356 }
347357
348358 if source. udsource . is_some ( ) {
@@ -1447,6 +1457,52 @@ mod source_tests {
14471457 assert_eq ! ( default_config. jitter, Duration :: from_secs( 0 ) ) ;
14481458 }
14491459
1460+ #[ test]
1461+ fn test_generator_config_from_value_blob ( ) {
1462+ let source: SourceType =
1463+ SourceType :: try_from ( Box :: new ( numaflow_models:: models:: GeneratorSource {
1464+ value_blob : Some ( "aGVsbG8gd29ybGQK" . to_string ( ) ) ,
1465+ duration : Some ( kube:: core:: Duration :: from ( Duration :: from_secs ( 1 ) ) ) ,
1466+ jitter : Some ( kube:: core:: Duration :: from ( Duration :: from_secs ( 0 ) ) ) ,
1467+ key_count : Some ( 0 ) ,
1468+ msg_size : Some ( 8 ) ,
1469+ rpu : Some ( 1 ) ,
1470+ value : None ,
1471+ } ) )
1472+ . unwrap ( ) ;
1473+ assert_eq ! (
1474+ source,
1475+ SourceType :: Generator ( GeneratorConfig {
1476+ content: Bytes :: from( "hello world\n " ) ,
1477+ duration: Duration :: from( Duration :: from_secs( 1 ) ) ,
1478+ jitter: Duration :: from( Duration :: from_secs( 0 ) ) ,
1479+ key_count: 0 ,
1480+ rpu: 1 ,
1481+ value: None ,
1482+ msg_size_bytes: 8 ,
1483+ } )
1484+ ) ;
1485+ }
1486+
1487+ #[ test]
1488+ fn test_generator_config_from_invalid_value_blob ( ) {
1489+ let source = SourceType :: try_from ( Box :: new ( numaflow_models:: models:: GeneratorSource {
1490+ value_blob : Some ( "abcdef" . to_string ( ) ) ,
1491+ duration : Some ( kube:: core:: Duration :: from ( Duration :: from_secs ( 1 ) ) ) ,
1492+ jitter : Some ( kube:: core:: Duration :: from ( Duration :: from_secs ( 0 ) ) ) ,
1493+ key_count : Some ( 0 ) ,
1494+ msg_size : Some ( 8 ) ,
1495+ rpu : Some ( 1 ) ,
1496+ value : None ,
1497+ } ) ) ;
1498+ assert ! (
1499+ source
1500+ . unwrap_err( )
1501+ . to_string( )
1502+ . contains( "Failed to base64 decode generator value blob" )
1503+ ) ;
1504+ }
1505+
14501506 #[ test]
14511507 fn test_default_user_defined_config ( ) {
14521508 let default_config = UserDefinedConfig :: default ( ) ;
0 commit comments