@@ -75,7 +75,7 @@ static void on_database_write_ready(h2o_socket_t *sock, const char *err);
7575static void on_process_queries (void * arg );
7676static void poll_database_connection (h2o_socket_t * sock , const char * err );
7777static void prepare_statements (db_conn_t * conn );
78- static void process_queries (db_conn_t * conn , bool removed );
78+ static void process_queries (db_conn_pool_t * pool );
7979static void remove_connection (db_conn_t * conn );
8080static void start_database_connect (db_conn_pool_t * pool , db_conn_t * conn );
8181
@@ -237,7 +237,10 @@ static void on_database_connect_read_ready(h2o_socket_t *sock, const char *err)
237237 h2o_timer_unlink (& conn -> timer );
238238 h2o_socket_read_stop (conn -> sock );
239239 h2o_socket_read_start (conn -> sock , on_database_read_ready );
240- process_queries (conn , true);
240+ * conn -> pool -> conn .tail = & conn -> l ;
241+ conn -> pool -> conn .tail = & conn -> l .next ;
242+ conn -> l .next = NULL ;
243+ process_queries (conn -> pool );
241244 return ;
242245 default :
243246 LIBRARY_ERROR ("PQresultStatus" , PQresultErrorMessage (result ));
@@ -370,7 +373,13 @@ static void on_database_read_ready(h2o_socket_t *sock, const char *err)
370373 for (PGnotify * notify = PQnotifies (conn -> conn ); notify ; notify = PQnotifies (conn -> conn ))
371374 PQfreemem (notify );
372375
373- process_queries (conn , removed );
376+ if (removed && conn -> query_num ) {
377+ * conn -> pool -> conn .tail = & conn -> l ;
378+ conn -> pool -> conn .tail = & conn -> l .next ;
379+ conn -> l .next = NULL ;
380+ }
381+
382+ process_queries (conn -> pool );
374383}
375384
376385static void on_database_timeout (h2o_timer_t * timer )
@@ -405,20 +414,83 @@ static void on_database_write_ready(h2o_socket_t *sock, const char *err)
405414
406415static void on_process_queries (void * arg )
407416{
417+ list_t * iter = NULL ;
408418 db_conn_pool_t * const pool = arg ;
419+ size_t query_num = 0 ;
409420
410- while (pool -> queries .head && pool -> conn ) {
411- db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER (db_conn_t , l , pool -> conn );
421+ while (pool -> queries .head && pool -> conn .head ) {
422+ db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER (db_conn_t , l , pool -> conn .head );
423+ db_query_param_t * const param = H2O_STRUCT_FROM_MEMBER (db_query_param_t ,
424+ l ,
425+ conn -> pool -> queries .head );
412426
413- pool -> conn = conn -> l .next ;
414427 assert (conn -> query_num );
415- process_queries (conn , true);
428+ assert (pool -> query_num < pool -> config -> max_query_num );
429+ pool -> conn .head = conn -> l .next ;
430+ pool -> queries .head = param -> l .next ;
431+
432+ if (!pool -> conn .head ) {
433+ assert (pool -> conn .tail == & conn -> l .next );
434+ pool -> conn .tail = & pool -> conn .head ;
435+ }
436+
437+ if (++ pool -> query_num == pool -> config -> max_query_num ) {
438+ assert (!pool -> queries .head );
439+ assert (pool -> queries .tail == & param -> l .next );
440+ pool -> queries .tail = & pool -> queries .head ;
441+ }
442+
443+ if (do_execute_query (conn , param )) {
444+ param -> on_error (param , DB_ERROR );
445+ on_database_error (conn , DB_ERROR );
446+ }
447+ else {
448+ query_num ++ ;
449+
450+ if (conn -> query_num ) {
451+ * pool -> conn .tail = & conn -> l ;
452+ pool -> conn .tail = & conn -> l .next ;
453+ conn -> l .next = NULL ;
454+ }
455+ else {
456+ conn -> l .next = iter ;
457+ iter = & conn -> l ;
458+ }
459+ }
416460 }
417461
418- if (pool -> queries .head && pool -> conn_num )
419- start_database_connect (pool , NULL );
462+ if (iter )
463+ do {
464+ db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER (db_conn_t , l , iter );
465+
466+ iter = conn -> l .next ;
467+
468+ if (flush_connection (on_database_write_ready , conn ))
469+ on_database_error (conn , DB_ERROR );
470+ } while (iter );
420471
472+ pool -> conn .tail = & pool -> conn .head ;
421473 pool -> process_queries = false;
474+ query_num += pool -> config -> max_query_num - pool -> query_num ;
475+
476+ for (iter = pool -> conn .head ; iter ;) {
477+ db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER (db_conn_t , l , iter );
478+
479+ iter = conn -> l .next ;
480+
481+ if (flush_connection (on_database_write_ready , conn )) {
482+ * pool -> conn .tail = iter ;
483+ on_database_error (conn , DB_ERROR );
484+ }
485+ else
486+ pool -> conn .tail = & conn -> l .next ;
487+ }
488+
489+ const size_t conn_num = pool -> config -> max_db_conn_num - pool -> conn_num ;
490+
491+ if (query_num > conn_num )
492+ for (query_num -= conn_num ; pool -> conn_num && query_num ; query_num -- )
493+ start_database_connect (pool , NULL );
422494}
423495
424496static void poll_database_connection (h2o_socket_t * sock , const char * err )
@@ -536,54 +608,44 @@ static void prepare_statements(db_conn_t *conn)
536608 }
537609 else {
538610 h2o_socket_read_start (conn -> sock , on_database_read_ready );
539- process_queries (conn , true);
611+ * conn -> pool -> conn .tail = & conn -> l ;
612+ conn -> pool -> conn .tail = & conn -> l .next ;
613+ conn -> l .next = NULL ;
614+ process_queries (conn -> pool );
540615 }
541616}
542617
543- static void process_queries (db_conn_t * conn , bool removed )
618+ static void process_queries (db_conn_pool_t * pool )
544619{
545- const bool flush = conn -> query_num && conn -> pool -> queries .head ;
546-
547- while (conn -> query_num && conn -> pool -> queries .head ) {
548- db_query_param_t * const param = H2O_STRUCT_FROM_MEMBER (db_query_param_t ,
549- l ,
550- conn -> pool -> queries .head );
551-
552- if (++ conn -> pool -> query_num == conn -> pool -> config -> max_query_num ) {
553- assert (conn -> pool -> queries .tail == & param -> l .next );
554- conn -> pool -> queries .tail = & conn -> pool -> queries .head ;
555- }
556-
557- conn -> pool -> queries .head = param -> l .next ;
558-
559- if (do_execute_query (conn , param )) {
560- param -> on_error (param , DB_ERROR );
561- on_database_error (conn , DB_ERROR );
562- return ;
563- }
564- }
565-
566- if (flush && flush_connection (on_database_write_ready , conn ))
567- on_database_error (conn , DB_ERROR );
568- else if (conn -> query_num && removed ) {
569- conn -> l .next = conn -> pool -> conn ;
570- conn -> pool -> conn = & conn -> l ;
620+ if (!pool -> process_queries && pool -> queries .head ) {
621+ task_message_t * const msg = h2o_mem_alloc (sizeof (* msg ));
622+
623+ assert (pool -> query_num < pool -> config -> max_query_num );
624+ memset (msg , 0 , sizeof (* msg ));
625+ msg -> arg = pool ;
626+ msg -> super .type = TASK ;
627+ msg -> task = on_process_queries ;
628+ pool -> process_queries = true;
629+ send_local_message (& msg -> super , pool -> local_messages );
571630 }
572- else if (!conn -> query_num && !removed )
573- // This call should not be problematic, assuming a relatively low number of connections.
574- remove_connection (conn );
575631}
576632
577633static void remove_connection (db_conn_t * conn )
578634{
579- list_t * iter = conn -> pool -> conn ;
580- list_t * * prev = & conn -> pool -> conn ;
635+ list_t * iter = conn -> pool -> conn . head ;
636+ list_t * * prev = & conn -> pool -> conn . head ;
581637
582638 for (; iter && iter != & conn -> l ; iter = iter -> next )
583639 prev = & iter -> next ;
584640
585- if (iter )
641+ if (iter ) {
586642 * prev = iter -> next ;
643+
644+ if (!conn -> pool -> conn .head ) {
645+ assert (conn -> pool -> conn .tail == & iter -> next );
646+ conn -> pool -> conn .tail = & conn -> pool -> conn .head ;
647+ }
648+ }
587649}
588650
589651static void start_database_connect (db_conn_pool_t * pool , db_conn_t * conn )
@@ -661,37 +723,15 @@ int execute_database_query(db_conn_pool_t *pool, db_query_param_t *param)
661723 int ret = 1 ;
662724
663725 if (pool -> query_num ) {
664- if (pool -> conn ) {
665- // Delay sending the database queries to the server, so that if there is a rapid
666- // succession of calls to this function, all resultant queries would be inserted
667- // into a command pipeline with a smaller number of system calls.
668- if (!pool -> process_queries ) {
669- task_message_t * const msg = h2o_mem_alloc (sizeof (* msg ));
670-
671- memset (msg , 0 , sizeof (* msg ));
672- msg -> arg = pool ;
673- msg -> super .type = TASK ;
674- msg -> task = on_process_queries ;
675- send_local_message (& msg -> super , pool -> local_messages );
676- pool -> process_queries = true;
677- }
678-
679- ret = 0 ;
680- }
681- else {
682- if (pool -> conn_num )
683- start_database_connect (pool , NULL );
684-
685- if (pool -> conn_num < pool -> config -> max_db_conn_num && pool -> query_num )
686- ret = 0 ;
687- }
688-
689- if (!ret ) {
690- param -> l .next = NULL ;
691- * pool -> queries .tail = & param -> l ;
692- pool -> queries .tail = & param -> l .next ;
693- pool -> query_num -- ;
694- }
726+ // Delay sending the database queries to the server, so that if there is a rapid
727+ // succession of calls to this function, all resultant queries would be inserted
728+ // into a command pipeline with a smaller number of system calls.
729+ param -> l .next = NULL ;
730+ * pool -> queries .tail = & param -> l ;
731+ pool -> queries .tail = & param -> l .next ;
732+ pool -> query_num -- ;
733+ process_queries (pool );
734+ ret = 0 ;
695735 }
696736
697737 return ret ;
@@ -704,9 +744,9 @@ void free_database_connection_pool(db_conn_pool_t *pool)
704744
705745 size_t num = 0 ;
706746
707- if (pool -> conn )
747+ if (pool -> conn . head )
708748 do {
709- db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER (db_conn_t , l , pool -> conn );
749+ db_conn_t * const conn = H2O_STRUCT_FROM_MEMBER (db_conn_t , l , pool -> conn . head );
710750
711751 assert (!conn -> queries .head );
712752 assert (conn -> query_num == pool -> config -> max_pipeline_query_num );
@@ -715,10 +755,10 @@ void free_database_connection_pool(db_conn_pool_t *pool)
715755 h2o_socket_read_stop (conn -> sock );
716756 h2o_socket_close (conn -> sock );
717757 PQfinish (conn -> conn );
718- pool -> conn = pool -> conn -> next ;
719- free (conn );
758+ pool -> conn .head = conn -> l .next ;
720759 num ++ ;
721- } while (pool -> conn );
760+ free (conn );
761+ } while (pool -> conn .head );
722762
723763 assert (num + pool -> conn_num == pool -> config -> max_db_conn_num );
724764}
@@ -732,6 +772,7 @@ void initialize_database_connection_pool(const char *conninfo,
732772{
733773 memset (pool , 0 , sizeof (* pool ));
734774 pool -> config = config ;
775+ pool -> conn .tail = & pool -> conn .head ;
735776 pool -> conninfo = conninfo ? conninfo : "" ;
736777 pool -> local_messages = local_messages ;
737778 pool -> loop = loop ;
0 commit comments