@@ -1034,6 +1034,140 @@ void start_threads(struct mpsc_pbuf_buffer *buffer)
10341034 }
10351035}
10361036
1037+ static uint32_t buf32_1 [128 ];
1038+ struct test_msg_data {
1039+ union mpsc_pbuf_generic hdr ;
1040+ uint32_t buf_len ; /*contain buf_len and buf*/
1041+ uint8_t buf [sizeof (buf32_1 )];
1042+ };
1043+ struct test_msg_hnd {
1044+ struct mpsc_pbuf_buffer mpsc_buffer ;
1045+ uint32_t product_max_cnt ;
1046+ uint32_t product_cnt ;
1047+ uint32_t consumer_cnt ;
1048+ };
1049+
1050+ K_THREAD_STACK_DEFINE (t3_stack , 1024 );
1051+ static struct k_thread threads_t3 ;
1052+ static k_tid_t tid3 ;
1053+
1054+ static uint32_t test_mpsc_get_used_len (const union mpsc_pbuf_generic * packet )
1055+ {
1056+ uint32_t size = 0 ;
1057+ struct test_msg_data * msg_data = NULL ;
1058+
1059+ msg_data = CONTAINER_OF (packet , struct test_msg_data , hdr );
1060+ size = msg_data -> buf_len + sizeof (union mpsc_pbuf_generic );
1061+ size = ROUND_UP (size , sizeof (uint32_t )); /*return number of uint32_t */
1062+ size = size / sizeof (uint32_t );
1063+ return size ;
1064+ }
1065+
1066+ static void t_data_consumer_entry (void * p0 , void * p1 , void * p2 )
1067+ {
1068+ uint32_t read_cnt = 0 ;
1069+ bool wait = true;
1070+ const union mpsc_pbuf_generic * msg = NULL ;
1071+ const struct test_msg_data * test_data = NULL ;
1072+ struct test_msg_hnd * test_hnd = (struct test_msg_hnd * )p0 ;
1073+
1074+ while (1 ) {
1075+ if (msg == NULL ) {
1076+ msg = mpsc_pbuf_claim (& test_hnd -> mpsc_buffer );
1077+ }
1078+ test_data = (const struct test_msg_data * )msg ;
1079+ if (test_data == NULL ) {
1080+ continue ;
1081+ }
1082+ if (test_data && test_hnd -> mpsc_buffer .wr_idx == 0 ) {
1083+ wait = false;
1084+ }
1085+ if (wait == true) {
1086+ continue ;
1087+ }
1088+ read_cnt ++ ;
1089+ mpsc_pbuf_free (& test_hnd -> mpsc_buffer , msg );
1090+ msg = NULL ;
1091+ if (read_cnt == test_hnd -> product_max_cnt ) {
1092+ break ;
1093+ }
1094+
1095+ }
1096+ test_hnd -> consumer_cnt = read_cnt ;
1097+ }
1098+
1099+ /* test mpsc_pbuf_alloc can get sem_take
1100+ * one thread product data, one thread consumer data
1101+ * requirement:
1102+ * consumer slow process data
1103+ * step:
1104+ * 1:product data len is 0.75 of cfg.size
1105+ * 2:run product times
1106+ *
1107+ */
1108+ ZTEST (log_buffer , test_sema_lock )
1109+ {
1110+ struct test_msg_hnd test_hnd = {};
1111+ struct test_msg_data test_data ;
1112+ struct test_msg_data * item = NULL ;
1113+ struct mpsc_pbuf_buffer_config cfg ;
1114+ uint32_t loop = 0 ;
1115+ size_t wlen = 0 ;
1116+ bool fist_wait = true;
1117+
1118+ if (CONFIG_SYS_CLOCK_TICKS_PER_SEC < 10000 ) {
1119+ ztest_test_skip ();
1120+ }
1121+
1122+ cfg .buf = buf32_1 ;
1123+ cfg .size = ARRAY_SIZE (buf32_1 );
1124+ cfg .get_wlen = test_mpsc_get_used_len ;
1125+
1126+ mpsc_pbuf_init (& test_hnd .mpsc_buffer , & cfg );
1127+ test_hnd .product_max_cnt = 2 ;
1128+
1129+ tid3 = k_thread_create (& threads_t3 , t3_stack , 1024 ,
1130+ t_data_consumer_entry ,
1131+ & test_hnd , NULL , NULL ,
1132+ 10 , 0 , K_NO_WAIT );
1133+ k_thread_name_set (& threads_t3 , "test_mpsc_consumer" );
1134+ for (loop = 0 ; loop < test_hnd .product_max_cnt ; loop ++ ) {
1135+ if (loop == 0 ) {
1136+ test_data .buf_len = sizeof (buf32_1 ) / 2 ;
1137+ } else {
1138+ test_data .buf_len = sizeof (buf32_1 ) / 2 + sizeof (buf32_1 ) / 4 ;
1139+ }
1140+ test_data .buf_len = ROUND_UP (test_data .buf_len , sizeof (uint32_t ));
1141+ memset (test_data .buf , loop + 1 , test_data .buf_len );
1142+
1143+ wlen = test_data .buf_len + sizeof (test_data .buf_len ) +
1144+ sizeof (test_data .hdr );
1145+ wlen = ROUND_UP (wlen , sizeof (uint32_t ));
1146+ wlen = wlen / sizeof (uint32_t );
1147+
1148+ if (fist_wait &&
1149+ test_data .buf_len == sizeof (buf32_1 ) / 2 + sizeof (buf32_1 ) / 4 ) {
1150+ PRINT (" mpsc sema wait\n" );
1151+ }
1152+ item = (struct test_msg_data * )mpsc_pbuf_alloc (& test_hnd .mpsc_buffer ,
1153+ wlen , K_FOREVER );
1154+ item -> hdr .raw = 0 ;
1155+ memcpy (item -> buf , test_data .buf , test_data .buf_len );
1156+ item -> buf_len = test_data .buf_len + sizeof (test_data .buf_len );
1157+ mpsc_pbuf_commit (& test_hnd .mpsc_buffer , & item -> hdr );
1158+ if (fist_wait &&
1159+ test_data .buf_len == sizeof (buf32_1 ) / 2 + sizeof (buf32_1 ) / 4 ) {
1160+ PRINT (" mpsc sema wake\n" );
1161+ fist_wait = false;
1162+ }
1163+ test_hnd .product_cnt ++ ;
1164+ }
1165+ k_thread_join (tid3 , K_FOREVER );
1166+ zassert_equal (test_hnd .product_cnt ,
1167+ test_hnd .consumer_cnt , "product %d consume %d" ,
1168+ test_hnd .product_cnt , test_hnd .consumer_cnt );
1169+
1170+ }
10371171/* Test creates two threads which pends on the buffer until there is a space
10381172 * available. When enough buffers is released threads are woken up and they
10391173 * allocate packets.
0 commit comments