66
77#define _GNU_SOURCE /* for pthread_setname_np */
88
9+ #include <assert.h>
910#include <ctype.h>
1011#include <errno.h>
1112#include <poll.h>
@@ -43,17 +44,26 @@ struct jverifier {
4344 JSON_Object * objects [JSONSTACKSZ ];
4445};
4546
47+ struct msg {
48+ char * data ;
49+ struct msg * next ;
50+ };
51+
4652static struct {
4753 pthread_t thread_id ;
4854 int running ;
4955 bool connected ;
5056 bool offline_hub ;
51- int pipe [2 ];
5257 struct evp_lock lock ;
5358 evp_agent_loop_fn_t agent_loop_fn ;
59+ struct msg * messages ;
60+ pthread_cond_t cond ;
5461 const char
5562 * payloads [EVP_HUB_TYPE_UNKNOWN ][AGENT_TEST_MAX_PAYLOAD_ID + 1 ];
56- } g_agent_test = {.lock = EVP_LOCK_INITIALIZER , .pipe = {-1 , -1 }};
63+ } g_agent_test = {
64+ .lock = EVP_LOCK_INITIALIZER ,
65+ .cond = PTHREAD_COND_INITIALIZER ,
66+ };
5767
5868void
5969message_log (enum message_log_level level , const char * func , const char * file ,
@@ -1024,14 +1034,6 @@ agent_test_start(void)
10241034{
10251035 int ret ;
10261036
1027- // create pipe for test data
1028- #ifdef O_DIRECT
1029- ret = pipe2 (g_agent_test .pipe , O_DIRECT );
1030- #else
1031- ret = pipe (g_agent_test .pipe );
1032- #endif
1033- assert_int_equal (0 , ret );
1034-
10351037 // instantiate EVP Agent context
10361038 struct evp_agent_context * ctxt ;
10371039 ctxt = evp_agent_setup ("evp_agent_main" );
@@ -1082,6 +1084,16 @@ agent_test_exit(void)
10821084 }
10831085 path_free ();
10841086 set_connected (false);
1087+
1088+ struct msg * m = g_agent_test .messages ;
1089+
1090+ while (m ) {
1091+ struct msg * next = m -> next ;
1092+
1093+ free (m -> data );
1094+ free (m );
1095+ m = next ;
1096+ }
10851097}
10861098
10871099/**
@@ -1092,20 +1104,58 @@ agent_test_exit(void)
10921104void
10931105agent_write_to_pipe (const char * data )
10941106{
1095- size_t remain = strlen (data ) + 1 ;
1096- int fd = g_agent_test .pipe [1 ];
1097- if (fd == -1 ) {
1098- return ;
1107+ char * datadup = strdup (data );
1108+ struct msg * msg = malloc (sizeof (* msg ));
1109+
1110+ assert (msg );
1111+ assert (datadup );
1112+
1113+ * msg = (struct msg ){.data = datadup };
1114+
1115+ xpthread_mutex_lock (& g_agent_test .lock );
1116+
1117+ if (!g_agent_test .messages )
1118+ g_agent_test .messages = msg ;
1119+ else {
1120+ for (struct msg * m = g_agent_test .messages ; m ; m = m -> next ) {
1121+ if (!m -> next ) {
1122+ m -> next = msg ;
1123+ break ;
1124+ }
1125+ }
10991126 }
1100- while (remain ) {
1101- xlog_debug ("write pipe %s" , data );
1102- ssize_t ret ;
1103- ret = write (fd , data , remain );
1104- assert_true (ret > 0 );
1105- assert_true (remain >= (size_t )ret );
1106- remain -= ret ;
1107- data += ret ;
1127+
1128+ xlog_debug ("write pipe %s" , data );
1129+ assert (!pthread_cond_broadcast (& g_agent_test .cond ));
1130+ xpthread_mutex_unlock (& g_agent_test .lock );
1131+ }
1132+
1133+ static int
1134+ pop (agent_test_verify_t verify_callback , const void * user_data , va_list va )
1135+ {
1136+ for (struct msg * m = g_agent_test .messages , * prev = NULL ; m ;
1137+ prev = m , m = m -> next ) {
1138+ va_list vac ;
1139+ const char * payload = m -> data ;
1140+
1141+ va_copy (vac , va );
1142+
1143+ if (verify_callback (payload , user_data , vac )) {
1144+ if (prev )
1145+ prev -> next = m -> next ;
1146+ else
1147+ g_agent_test .messages = m -> next ;
1148+
1149+ free (m -> data );
1150+ free (m );
1151+ va_end (vac );
1152+ return 0 ;
1153+ }
1154+
1155+ va_end (vac );
11081156 }
1157+
1158+ return -1 ;
11091159}
11101160
11111161/**
@@ -1117,29 +1167,24 @@ agent_write_to_pipe(const char *data)
11171167void
11181168agent_poll (agent_test_verify_t verify_callback , const void * user_data , ...)
11191169{
1120- int r , fds = g_agent_test .pipe [0 ];
1121- char * payload , c ;
1122- size_t cnt ;
1123- va_list va ;
1170+ int r ;
1171+
1172+ xpthread_mutex_lock (& g_agent_test .lock );
1173+
1174+ do {
1175+ va_list va ;
11241176
1125- payload = NULL ;
1126- for (cnt = 1 ;; cnt ++ ) {
11271177 va_start (va , user_data );
1128- r = read (fds , & c , 1 );
1129- assert_int_equal (r , 1 );
1130-
1131- payload = xrealloc (payload , cnt );
1132- payload [cnt - 1 ] = c ;
1133- if (c == '\0' ) {
1134- if (verify_callback (payload , user_data , va )) {
1135- free (payload );
1136- va_end (va );
1137- return ;
1138- }
1139- cnt = 0 ;
1178+
1179+ if ((r = pop (verify_callback , user_data , va ))) {
1180+ assert (!pthread_cond_wait (& g_agent_test .cond ,
1181+ & g_agent_test .lock .lock ));
11401182 }
1183+
11411184 va_end (va );
1142- }
1185+ } while (r );
1186+
1187+ xpthread_mutex_unlock (& g_agent_test .lock );
11431188}
11441189
11451190void
0 commit comments