@@ -35,7 +35,7 @@ R_API RThreadChannel *r_th_channel_new(RThreadFunction consumer, void *user) {
3535 free (tc );
3636 return NULL ;
3737 }
38- // Create consumer thread
38+ // Create consumer thread (caller must call r_th_start to launch)
3939 tc -> consumer = r_th_new (consumer , user , 0 );
4040 if (!tc -> consumer ) {
4141 r_list_free (tc -> stack );
@@ -72,7 +72,8 @@ R_API RThreadChannelMessage *r_th_channel_message_new(RThreadChannel *tc, const
7272 cm -> id = tc -> nextid ;
7373 cm -> msg = r_mem_dup (msg , len );
7474 cm -> len = len ;
75- cm -> sem = r_th_sem_new (1 );
75+ // Initialize message semaphore to 0 so readers block until posted
76+ cm -> sem = r_th_sem_new (0 );
7677 // r_th_sem_wait (cm->sem); // busy because stack is empty
7778 cm -> lock = r_th_lock_new (false); // locked here
7879 }
@@ -82,46 +83,36 @@ R_API RThreadChannelMessage *r_th_channel_message_new(RThreadChannel *tc, const
8283R_API RThreadChannelMessage * r_th_channel_message_read (RThreadChannel * tc , RThreadChannelMessage * cm ) {
8384 R_LOG_DEBUG ("r_th_channel_message_read" );
8485 if (cm ) {
85- eprintf ("wait\n" );
8686 r_th_sem_wait (cm -> sem );
87- eprintf ("waited\n" );
8887 } else {
89- eprintf ("not waited\n" );
9088 // Don't create a dangling reference
9189 }
9290 return cm ;
9391}
9492
9593R_API RThreadChannelMessage * r_th_channel_promise_wait (RThreadChannelPromise * promise ) {
96- // wait for a message to be delivered, find one with the same promise id
97- // RThreadChannelMessage *message = r_th_channel_message_new (promise->tc, "x", 0);
98- // append message into the queue
99- if (!promise || !promise -> tc ) {
100- R_LOG_ERROR ("Invalid promise or thread channel in r_th_channel_promise_wait" );
94+ if (!promise || !promise -> tc || !promise -> message ) {
95+ R_LOG_ERROR ("Invalid promise or thread channel in r_th_channel_promise_wait" );
10196 return NULL ;
10297 }
103-
104- while (true) {
98+ RThreadChannel * tc = promise -> tc ;
99+ RThreadChannelMessage * cm = promise -> message ;
100+ // Wait for the consumer to signal the response
101+ r_th_sem_wait (cm -> sem );
102+ // Remove the message from the responses list to avoid double-free
103+ if (tc -> responses ) {
104+ r_th_lock_enter (tc -> lock );
105105 RListIter * iter ;
106106 RThreadChannelMessage * res ;
107- if (!r_th_lock_enter (promise -> tc -> lock )) {
108- R_LOG_ERROR ("Failed to acquire lock in r_th_channel_promise_wait" );
109- break ;
110- }
111- if (promise -> tc -> responses ) {
112- r_list_foreach (promise -> tc -> responses , iter , res ) {
113- if (res && res -> id == promise -> id ) {
114- r_list_split_iter (promise -> tc -> responses , iter );
115- r_th_lock_leave (promise -> tc -> lock );
116- return res ;
117- }
107+ r_list_foreach (tc -> responses , iter , res ) {
108+ if (res == cm ) {
109+ r_list_split_iter (tc -> responses , iter );
110+ break ;
118111 }
119112 }
120- r_th_lock_leave (promise -> tc -> lock );
121- // Sleep briefly to avoid CPU spinning
122- r_sys_usleep (1000 ); // 1ms sleep between checks
113+ r_th_lock_leave (tc -> lock );
123114 }
124- return NULL ;
115+ return cm ;
125116}
126117
127118R_API RThreadChannelPromise * r_th_channel_promise_new (RThreadChannel * tc ) {
@@ -139,11 +130,12 @@ R_API RThreadChannelPromise *r_th_channel_promise_new(RThreadChannel *tc) {
139130
140131// to be called only from the consumer thread
141132R_API void r_th_channel_post (RThreadChannel * tc , RThreadChannelMessage * cm ) {
133+ // Post a response from the consumer thread
142134 r_th_lock_enter (tc -> lock );
143- // TODO: lock struct
144135 r_list_append (tc -> responses , cm );
145- r_th_sem_post (tc -> sem );
146136 r_th_lock_leave (tc -> lock );
137+ // Signal any reader waiting on this message
138+ r_th_sem_post (cm -> sem );
147139}
148140
149141R_API RThreadChannelPromise * r_th_channel_query (RThreadChannel * tc , RThreadChannelMessage * cm ) {
@@ -152,6 +144,8 @@ R_API RThreadChannelPromise *r_th_channel_query(RThreadChannel *tc, RThreadChann
152144 return NULL ;
153145 }
154146 promise -> id = cm -> id ;
147+ promise -> message = cm ;
148+ // Enqueue the message for processing by the consumer thread
155149 r_th_channel_write (tc , cm );
156150 return promise ;
157151}
0 commit comments