@@ -310,3 +310,170 @@ where
310
310
Ok ( Self { header, payload } )
311
311
}
312
312
}
313
+
314
+ #[ cfg( test) ]
315
+ mod tests {
316
+ use std:: convert:: { TryFrom , TryInto } ;
317
+
318
+ use super :: * ;
319
+
320
+ static MESSAGE_HEADER : [ u8 ; MESSAGE_HEADER_LENGTH ] = [
321
+ 0x10 , 0x0 , 0x0 , 0x0 , // length
322
+ 0x0 , 0x0 , 0x0 , 0x03 , // stream_id
323
+ 0x2 , // type_
324
+ 0xef , // flags
325
+ ] ;
326
+
327
+ #[ test]
328
+ fn message_header ( ) {
329
+ let mh = MessageHeader :: from ( & MESSAGE_HEADER ) ;
330
+ assert_eq ! ( mh. length, 0x1000_0000 ) ;
331
+ assert_eq ! ( mh. stream_id, 0x3 ) ;
332
+ assert_eq ! ( mh. type_, MESSAGE_TYPE_RESPONSE ) ;
333
+ assert_eq ! ( mh. flags, 0xef ) ;
334
+
335
+ let mut buf2 = vec ! [ 0 ; MESSAGE_HEADER_LENGTH ] ;
336
+ mh. into_buf ( & mut buf2) ;
337
+ assert_eq ! ( & MESSAGE_HEADER , & buf2[ ..] ) ;
338
+
339
+ let mh = MessageHeader :: from ( & PROTOBUF_MESSAGE_HEADER ) ;
340
+ assert_eq ! ( mh. length as usize , TEST_PAYLOAD_LEN ) ;
341
+ }
342
+
343
+ #[ rustfmt:: skip]
344
+ static PROTOBUF_MESSAGE_HEADER : [ u8 ; MESSAGE_HEADER_LENGTH ] = [
345
+ 0x00 , 0x0 , 0x0 , TEST_PAYLOAD_LEN as u8 , // length
346
+ 0x0 , 0x12 , 0x34 , 0x56 , // stream_id
347
+ 0x1 , // type_
348
+ 0xef , // flags
349
+ ] ;
350
+
351
+ const TEST_PAYLOAD_LEN : usize = 67 ;
352
+ static PROTOBUF_REQUEST : [ u8 ; TEST_PAYLOAD_LEN ] = [
353
+ 10 , 17 , 103 , 114 , 112 , 99 , 46 , 84 , 101 , 115 , 116 , 83 , 101 , 114 , 118 , 105 , 99 , 101 , 115 , 18 ,
354
+ 4 , 84 , 101 , 115 , 116 , 26 , 9 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 32 , 128 , 218 , 196 , 9 , 42 , 24 , 10 ,
355
+ 9 , 116 , 101 , 115 , 116 , 95 , 107 , 101 , 121 , 49 , 18 , 11 , 116 , 101 , 115 , 116 , 95 , 118 , 97 , 108 ,
356
+ 117 , 101 , 49 ,
357
+ ] ;
358
+
359
+ fn new_protobuf_request ( ) -> Request {
360
+ let mut creq = Request :: new ( ) ;
361
+ creq. set_service ( "grpc.TestServices" . to_string ( ) ) ;
362
+ creq. set_method ( "Test" . to_string ( ) ) ;
363
+ creq. set_timeout_nano ( 20 * 1000 * 1000 ) ;
364
+ let mut meta: protobuf:: RepeatedField < KeyValue > = protobuf:: RepeatedField :: default ( ) ;
365
+ meta. push ( KeyValue {
366
+ key : "test_key1" . to_string ( ) ,
367
+ value : "test_value1" . to_string ( ) ,
368
+ ..Default :: default ( )
369
+ } ) ;
370
+ creq. set_metadata ( meta) ;
371
+ creq. payload = vec ! [ 0x1 , 0x2 , 0x3 , 0x4 , 0x5 , 0x6 , 0x7 , 0x8 , 0x9 ] ;
372
+ creq
373
+ }
374
+
375
+ #[ test]
376
+ fn protobuf_codec ( ) {
377
+ let creq = new_protobuf_request ( ) ;
378
+ let buf = creq. encode ( ) . unwrap ( ) ;
379
+ assert_eq ! ( & buf, & PROTOBUF_REQUEST ) ;
380
+ let dreq = Request :: decode ( & buf) . unwrap ( ) ;
381
+ assert_eq ! ( creq, dreq) ;
382
+ let dreq2 = Request :: decode ( & PROTOBUF_REQUEST ) . unwrap ( ) ;
383
+ assert_eq ! ( creq, dreq2) ;
384
+ }
385
+
386
+ #[ test]
387
+ fn gen_message_to_message ( ) {
388
+ let req = new_protobuf_request ( ) ;
389
+ let msg = Message :: new_request ( 3 , req) ;
390
+ let msg_clone = msg. clone ( ) ;
391
+ let gen: GenMessage = msg. try_into ( ) . unwrap ( ) ;
392
+ let dmsg = Message :: < Request > :: try_from ( gen) . unwrap ( ) ;
393
+ assert_eq ! ( msg_clone, dmsg) ;
394
+ }
395
+
396
+ #[ cfg( feature = "async" ) ]
397
+ #[ tokio:: test]
398
+ async fn async_message_header ( ) {
399
+ use std:: io:: Cursor ;
400
+ let mut buf = vec ! [ ] ;
401
+ let mut io = Cursor :: new ( & mut buf) ;
402
+ let mh = MessageHeader :: from ( & MESSAGE_HEADER ) ;
403
+ mh. write_to ( & mut io) . await . unwrap ( ) ;
404
+ assert_eq ! ( buf, & MESSAGE_HEADER ) ;
405
+
406
+ let dmh = MessageHeader :: read_from ( & buf[ ..] ) . await . unwrap ( ) ;
407
+ assert_eq ! ( mh, dmh) ;
408
+ }
409
+
410
+ #[ cfg( feature = "async" ) ]
411
+ #[ tokio:: test]
412
+ async fn async_gen_message ( ) {
413
+ let mut buf = Vec :: from ( MESSAGE_HEADER ) ;
414
+ buf. extend_from_slice ( & PROTOBUF_REQUEST ) ;
415
+ let res = GenMessage :: read_from ( & * buf) . await ;
416
+ // exceed maximum message size
417
+ assert ! ( matches!( res, Err ( Error :: RpcStatus ( _) ) ) ) ;
418
+
419
+ let mut buf = Vec :: from ( PROTOBUF_MESSAGE_HEADER ) ;
420
+ buf. extend_from_slice ( & PROTOBUF_REQUEST ) ;
421
+ buf. extend_from_slice ( & [ 0x0 , 0x0 ] ) ;
422
+ let gen = GenMessage :: read_from ( & * buf) . await . unwrap ( ) ;
423
+ assert_eq ! ( gen . header. length as usize , TEST_PAYLOAD_LEN ) ;
424
+ assert_eq ! ( gen . header. length, gen . payload. len( ) as u32 ) ;
425
+ assert_eq ! ( gen . header. stream_id, 0x123456 ) ;
426
+ assert_eq ! ( gen . header. type_, MESSAGE_TYPE_REQUEST ) ;
427
+ assert_eq ! ( gen . header. flags, 0xef ) ;
428
+ assert_eq ! ( & gen . payload, & PROTOBUF_REQUEST ) ;
429
+ assert_eq ! (
430
+ & buf[ MESSAGE_HEADER_LENGTH + TEST_PAYLOAD_LEN ..] ,
431
+ & [ 0x0 , 0x0 ]
432
+ ) ;
433
+
434
+ let mut dbuf = vec ! [ ] ;
435
+ let mut io = std:: io:: Cursor :: new ( & mut dbuf) ;
436
+ gen. write_to ( & mut io) . await . unwrap ( ) ;
437
+ assert_eq ! ( & * dbuf, & buf[ ..MESSAGE_HEADER_LENGTH + TEST_PAYLOAD_LEN ] ) ;
438
+ }
439
+
440
+ #[ cfg( feature = "async" ) ]
441
+ #[ tokio:: test]
442
+ async fn async_message ( ) {
443
+ let mut buf = Vec :: from ( MESSAGE_HEADER ) ;
444
+ buf. extend_from_slice ( & PROTOBUF_REQUEST ) ;
445
+ let res = Message :: < Request > :: read_from ( & * buf) . await ;
446
+ // exceed maximum message size
447
+ assert ! ( matches!( res, Err ( Error :: RpcStatus ( _) ) ) ) ;
448
+
449
+ let mut buf = Vec :: from ( PROTOBUF_MESSAGE_HEADER ) ;
450
+ buf. extend_from_slice ( & PROTOBUF_REQUEST ) ;
451
+ buf. extend_from_slice ( & [ 0x0 , 0x0 ] ) ;
452
+ let msg = Message :: < Request > :: read_from ( & * buf) . await . unwrap ( ) ;
453
+ assert_eq ! ( msg. header. length, 67 ) ;
454
+ assert_eq ! ( msg. header. length, msg. payload. size( ) as u32 ) ;
455
+ assert_eq ! ( msg. header. stream_id, 0x123456 ) ;
456
+ assert_eq ! ( msg. header. type_, MESSAGE_TYPE_REQUEST ) ;
457
+ assert_eq ! ( msg. header. flags, 0xef ) ;
458
+ assert_eq ! ( & msg. payload. service, "grpc.TestServices" ) ;
459
+ assert_eq ! ( & msg. payload. method, "Test" ) ;
460
+ assert_eq ! (
461
+ msg. payload. payload,
462
+ vec![ 0x1 , 0x2 , 0x3 , 0x4 , 0x5 , 0x6 , 0x7 , 0x8 , 0x9 ]
463
+ ) ;
464
+ assert_eq ! ( msg. payload. timeout_nano, 20 * 1000 * 1000 ) ;
465
+ assert_eq ! ( msg. payload. metadata. len( ) , 1 ) ;
466
+ assert_eq ! ( & msg. payload. metadata[ 0 ] . key, "test_key1" ) ;
467
+ assert_eq ! ( & msg. payload. metadata[ 0 ] . value, "test_value1" ) ;
468
+
469
+ let req = new_protobuf_request ( ) ;
470
+ let mut dmsg = Message :: new_request ( u32:: MAX , req) ;
471
+ dmsg. header . set_stream_id ( 0x123456 ) ;
472
+ dmsg. header . set_flags ( 0xe0 ) ;
473
+ dmsg. header . add_flags ( 0x0f ) ;
474
+ let mut dbuf = vec ! [ ] ;
475
+ let mut io = std:: io:: Cursor :: new ( & mut dbuf) ;
476
+ dmsg. write_to ( & mut io) . await . unwrap ( ) ;
477
+ assert_eq ! ( & dbuf, & buf[ ..MESSAGE_HEADER_LENGTH + TEST_PAYLOAD_LEN ] ) ;
478
+ }
479
+ }
0 commit comments