@@ -505,8 +505,11 @@ static int ublk_thread_init(struct ublk_thread *t)
 	}
 
 	if (dev->dev_info.flags & (UBLK_F_SUPPORT_ZERO_COPY | UBLK_F_AUTO_BUF_REG)) {
+		unsigned nr_ios = dev->dev_info.queue_depth * dev->dev_info.nr_hw_queues;
+		unsigned max_nr_ios_per_thread = nr_ios / dev->nthreads;
+		max_nr_ios_per_thread += !!(nr_ios % dev->nthreads);
 		ret = io_uring_register_buffers_sparse(
-			&t->ring, dev->dev_info.queue_depth);
+			&t->ring, max_nr_ios_per_thread);
 		if (ret) {
 			ublk_err("ublk dev %d thread %d register spare buffers failed %d",
 					dev->dev_info.dev_id, t->idx, ret);
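The sizing above is a ceiling division: with per-io tasks, a thread can be assigned at most ceil(nr_ios / nthreads) I/Os, so a sparse buffer table of that many slots is always sufficient. A standalone sketch of the same arithmetic, where the helper name and the example sizes are hypothetical and not part of the patch:

```c
#include <stdio.h>

/* Ceiling division written with the same "+ !!(remainder)" trick as the patch. */
static unsigned max_nr_ios_per_thread(unsigned queue_depth, unsigned nr_hw_queues,
				       unsigned nthreads)
{
	unsigned nr_ios = queue_depth * nr_hw_queues;
	unsigned per_thread = nr_ios / nthreads;

	per_thread += !!(nr_ios % nthreads);
	return per_thread;
}

int main(void)
{
	/* e.g. 2 queues of depth 128 shared by 3 threads: ceil(256 / 3) = 86 */
	printf("%u\n", max_nr_ios_per_thread(128, 2, 3));
	return 0;
}
```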
@@ -578,7 +581,7 @@ static void ublk_set_auto_buf_reg(const struct ublk_queue *q,
 	if (q->tgt_ops->buf_index)
 		buf.index = q->tgt_ops->buf_index(q, tag);
 	else
-		buf.index = tag;
+		buf.index = q->ios[tag].buf_index;
 
 	if (q->state & UBLKSRV_AUTO_BUF_REG_FALLBACK)
 		buf.flags = UBLK_AUTO_BUF_REG_FALLBACK;
@@ -660,18 +663,44 @@ int ublk_queue_io_cmd(struct ublk_io *io)
 
 static void ublk_submit_fetch_commands(struct ublk_thread *t)
 {
-	/*
-	 * Service exclusively the queue whose q_id matches our thread
-	 * index. This may change in the future.
-	 */
-	struct ublk_queue *q = &t->dev->q[t->idx];
+	struct ublk_queue *q;
 	struct ublk_io *io;
-	int i = 0;
+	int i = 0, j = 0;
 
-	for (i = 0; i < q->q_depth; i++) {
-		io = &q->ios[i];
-		io->t = t;
-		ublk_queue_io_cmd(io);
+	if (t->dev->per_io_tasks) {
+		/*
+		 * Lexicographically order all the (qid,tag) pairs, with
+		 * qid taking priority (so (1,0) > (0,1)). Then make
+		 * this thread the daemon for every Nth entry in this
+		 * list (N is the number of threads), starting at this
+		 * thread's index. This ensures that each queue is
+		 * handled by as many ublk server threads as possible,
+		 * so that load that is concentrated on one or a few
+		 * queues can make use of all ublk server threads.
+		 */
+		const struct ublksrv_ctrl_dev_info *dinfo = &t->dev->dev_info;
+		int nr_ios = dinfo->nr_hw_queues * dinfo->queue_depth;
+		for (i = t->idx; i < nr_ios; i += t->dev->nthreads) {
+			int q_id = i / dinfo->queue_depth;
+			int tag = i % dinfo->queue_depth;
+			q = &t->dev->q[q_id];
+			io = &q->ios[tag];
+			io->t = t;
+			io->buf_index = j++;
+			ublk_queue_io_cmd(io);
+		}
+	} else {
+		/*
+		 * Service exclusively the queue whose q_id matches our
+		 * thread index.
+		 */
+		struct ublk_queue *q = &t->dev->q[t->idx];
+		for (i = 0; i < q->q_depth; i++) {
+			io = &q->ios[i];
+			io->t = t;
+			io->buf_index = i;
+			ublk_queue_io_cmd(io);
+		}
 	}
 }
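To make the mapping above concrete, here is a standalone sketch (the device sizes are made-up example values, not from the patch) that prints which (qid, tag) pairs each thread becomes the daemon for, along with the dense per-thread buf_index each I/O receives, which is the same value ublk_set_auto_buf_reg() now reads from q->ios[tag].buf_index:

```c
#include <stdio.h>

/*
 * Flatten (q_id, tag) into i = q_id * queue_depth + tag, then hand every
 * Nth entry (N = nthreads) to the same thread, starting at the thread's
 * index. buf_index is simply the order in which a thread picks up its
 * I/Os, so each thread's buffer indices stay dense starting from 0.
 */
int main(void)
{
	/* hypothetical sizes for illustration only */
	const int nr_hw_queues = 2, queue_depth = 4, nthreads = 3;
	int nr_ios = nr_hw_queues * queue_depth;

	for (int t = 0; t < nthreads; t++) {
		int j = 0;

		for (int i = t; i < nr_ios; i += nthreads) {
			int q_id = i / queue_depth;
			int tag = i % queue_depth;

			printf("thread %d: qid=%d tag=%d buf_index=%d\n",
			       t, q_id, tag, j++);
		}
	}
	return 0;
}
```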
@@ -826,7 +855,8 @@ static void *ublk_io_handler_fn(void *data)
 		return NULL;
 	}
 	/* IO perf is sensitive with queue pthread affinity on NUMA machine*/
-	ublk_thread_set_sched_affinity(t, info->affinity);
+	if (info->affinity)
+		ublk_thread_set_sched_affinity(t, info->affinity);
 	sem_post(info->ready);
 
 	ublk_dbg(UBLK_DBG_THREAD, "tid %d: ublk dev %d thread %u started\n",
@@ -893,7 +923,7 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
 
 	ublk_dbg(UBLK_DBG_DEV, "%s enter\n", __func__);
 
-	tinfo = calloc(sizeof(struct ublk_thread_info), dinfo->nr_hw_queues);
+	tinfo = calloc(sizeof(struct ublk_thread_info), dev->nthreads);
 	if (!tinfo)
 		return -ENOMEM;
 
@@ -919,17 +949,29 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
 				dinfo->dev_id, i);
 			goto fail;
 		}
+	}
 
+	for (i = 0; i < dev->nthreads; i++) {
 		tinfo[i].dev = dev;
 		tinfo[i].idx = i;
 		tinfo[i].ready = &ready;
-		tinfo[i].affinity = &affinity_buf[i];
+
+		/*
+		 * If threads are not tied 1:1 to queues, setting thread
+		 * affinity based on queue affinity makes little sense.
+		 * However, thread CPU affinity has significant impact
+		 * on performance, so to compare fairly, we'll still set
+		 * thread CPU affinity based on queue affinity where
+		 * possible.
+		 */
+		if (dev->nthreads == dinfo->nr_hw_queues)
+			tinfo[i].affinity = &affinity_buf[i];
 		pthread_create(&dev->threads[i].thread, NULL,
 				ublk_io_handler_fn,
 				&tinfo[i]);
 	}
 
-	for (i = 0; i < dinfo->nr_hw_queues; i++)
+	for (i = 0; i < dev->nthreads; i++)
 		sem_wait(&ready);
 	free(tinfo);
 	free(affinity_buf);
@@ -953,7 +995,7 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
 	ublk_send_dev_event(ctx, dev, dev->dev_info.dev_id);
 
 	/* wait until we are terminated */
-	for (i = 0; i < dinfo->nr_hw_queues; i++)
+	for (i = 0; i < dev->nthreads; i++)
 		pthread_join(dev->threads[i].thread, &thread_ret);
 fail:
 	for (i = 0; i < dinfo->nr_hw_queues; i++)
@@ -1063,6 +1105,7 @@ static int ublk_stop_io_daemon(const struct ublk_dev *dev)
 
 static int __cmd_dev_add(const struct dev_ctx *ctx)
 {
+	unsigned nthreads = ctx->nthreads;
 	unsigned nr_queues = ctx->nr_hw_queues;
 	const char *tgt_type = ctx->tgt_type;
 	unsigned depth = ctx->queue_depth;
@@ -1086,6 +1129,23 @@ static int __cmd_dev_add(const struct dev_ctx *ctx)
 		return -EINVAL;
 	}
 
+	/* default to 1:1 threads:queues if nthreads is unspecified */
+	if (!nthreads)
+		nthreads = nr_queues;
+
+	if (nthreads > UBLK_MAX_THREADS) {
+		ublk_err("%s: %u is too many threads (max %u)\n",
+				__func__, nthreads, UBLK_MAX_THREADS);
+		return -EINVAL;
+	}
+
+	if (nthreads != nr_queues && !ctx->per_io_tasks) {
+		ublk_err("%s: threads %u must be same as queues %u if "
+				"not using per_io_tasks\n",
+				__func__, nthreads, nr_queues);
+		return -EINVAL;
+	}
+
 	dev = ublk_ctrl_init();
 	if (!dev) {
 		ublk_err("%s: can't alloc dev id %d, type %s\n",
@@ -1109,6 +1169,8 @@ static int __cmd_dev_add(const struct dev_ctx *ctx)
 	if ((features & UBLK_F_QUIESCE) &&
 	    (info->flags & UBLK_F_USER_RECOVERY))
 		info->flags |= UBLK_F_QUIESCE;
+	dev->nthreads = nthreads;
+	dev->per_io_tasks = ctx->per_io_tasks;
 	dev->tgt.ops = ops;
 	dev->tgt.sq_depth = depth;
 	dev->tgt.cq_depth = depth;
@@ -1307,6 +1369,7 @@ static int cmd_dev_get_features(void)
 		[const_ilog2(UBLK_F_UPDATE_SIZE)] = "UPDATE_SIZE",
 		[const_ilog2(UBLK_F_AUTO_BUF_REG)] = "AUTO_BUF_REG",
 		[const_ilog2(UBLK_F_QUIESCE)] = "QUIESCE",
+		[const_ilog2(UBLK_F_PER_IO_DAEMON)] = "PER_IO_DAEMON",
 	};
 	struct ublk_dev *dev;
 	__u64 features = 0;
@@ -1401,8 +1464,10 @@ static void __cmd_create_help(char *exe, bool recovery)
 		exe, recovery ? "recover" : "add");
 	printf("\t[--foreground] [--quiet] [-z] [--auto_zc] [--auto_zc_fallback] [--debug_mask mask] [-r 0|1 ] [-g]\n");
 	printf("\t[-e 0|1 ] [-i 0|1]\n");
+	printf("\t[--nthreads threads] [--per_io_tasks]\n");
 	printf("\t[target options] [backfile1] [backfile2] ...\n");
 	printf("\tdefault: nr_queues=2(max 32), depth=128(max 1024), dev_id=-1(auto allocation)\n");
+	printf("\tdefault: nthreads=nr_queues");
 
 	for (i = 0; i < sizeof(tgt_ops_list) / sizeof(tgt_ops_list[0]); i++) {
 		const struct ublk_tgt_ops *ops = tgt_ops_list[i];
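With the new options wired up, a device served by more threads than queues can only be created together with --per_io_tasks; for example, a hypothetical invocation of the selftest's kublk binary (the remaining flags follow the existing help text) would be `./kublk add -t null -q 2 -d 128 --nthreads 4 --per_io_tasks`. Without --per_io_tasks, __cmd_dev_add() rejects any --nthreads value that differs from the queue count, and when --nthreads is omitted it still defaults to one thread per queue.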
@@ -1459,6 +1524,8 @@ int main(int argc, char *argv[])
 		{ "auto_zc",		0,	NULL,  0 },
 		{ "auto_zc_fallback",	0,	NULL,  0 },
 		{ "size",		1,	NULL, 's'},
+		{ "nthreads",		1,	NULL,  0 },
+		{ "per_io_tasks",	0,	NULL,  0 },
 		{ 0, 0, 0, 0 }
 	};
 	const struct ublk_tgt_ops *ops = NULL;
@@ -1534,6 +1601,10 @@ int main(int argc, char *argv[])
 			ctx.flags |= UBLK_F_AUTO_BUF_REG;
 			if (!strcmp(longopts[option_idx].name, "auto_zc_fallback"))
 				ctx.auto_zc_fallback = 1;
+			if (!strcmp(longopts[option_idx].name, "nthreads"))
+				ctx.nthreads = strtol(optarg, NULL, 10);
+			if (!strcmp(longopts[option_idx].name, "per_io_tasks"))
+				ctx.per_io_tasks = 1;
 			break;
 		case '?':
 			/*
0 commit comments