@@ -91,6 +91,12 @@ static int n_tcpsrv = 0;
9191
9292static permittedPeers_t * pPermPeersRoot = NULL ;
9393
94+ /* default number of workes to configure. We choose 2, as this is probably good for
95+ * many installations. High-Volume ones may need much higher number!
96+ */
97+ #define DEFAULT_NUMWRKR 2
98+ #define DEFAULT_STARVATIONMAXREADS 500
99+
94100#define FRAMING_UNSET -1
95101
96102/* config settings */
@@ -123,6 +129,7 @@ static struct configSettings_s {
123129struct instanceConf_s {
124130 int iTCPSessMax ;
125131 int iTCPLstnMax ;
132+ unsigned numWrkr ;
126133 tcpLstnParams_t * cnf_params ; /**< listener config parameters */
127134 uchar * pszBindRuleset ; /* name of ruleset to bind to */
128135 ruleset_t * pBindRuleset ; /* ruleset to bind listener to (use system default if unspecified) */
@@ -157,6 +164,7 @@ struct instanceConf_s {
157164 int iKeepAliveIntvl ;
158165 int iKeepAliveProbes ;
159166 int iKeepAliveTime ;
167+ unsigned starvationMaxReads ;
160168 struct instanceConf_s * next ;
161169};
162170
@@ -166,6 +174,7 @@ struct modConfData_s {
166174 instanceConf_t * root , * tail ;
167175 int iTCPSessMax ; /* max number of sessions */
168176 int iTCPLstnMax ; /* max number of sessions */
177+ unsigned numWrkr ;
169178 int iStrmDrvrMode ; /* mode for stream driver, driver-dependent (0 mostly means plain tcp) */
170179 int iStrmDrvrExtendedCertCheck ; /* verify also purpose OID in certificate extended field */
171180 int iStrmDrvrSANPreference ; /* ignore CN when any SAN set */
@@ -193,6 +202,7 @@ struct modConfData_s {
193202 permittedPeers_t * pPermPeersRoot ;
194203 sbool configSetViaV2Method ;
195204 sbool bPreserveCase ; /* preserve case of fromhost; true by default */
205+ unsigned starvationMaxReads ;
196206};
197207
198208static modConfData_t * loadModConf = NULL ;/* modConf ptr to use for the current load process */
@@ -211,6 +221,8 @@ static struct cnfparamdescr modpdescr[] = {
211221 { "maxsessions" , eCmdHdlrPositiveInt , 0 },
212222 { "maxlistners" , eCmdHdlrPositiveInt , 0 },
213223 { "maxlisteners" , eCmdHdlrPositiveInt , 0 },
224+ { "workerthreads" , eCmdHdlrPositiveInt , 0 },
225+ { "starvationprotection.maxreads" , eCmdHdlrNonNegInt , 0 },
214226 { "streamdriver.mode" , eCmdHdlrNonNegInt , 0 },
215227 { "streamdriver.authmode" , eCmdHdlrString , 0 },
216228 { "streamdriver.permitexpiredcerts" , eCmdHdlrString , 0 },
@@ -241,6 +253,7 @@ static struct cnfparamdescr inppdescr[] = {
241253 { "port" , eCmdHdlrString , CNFPARAM_REQUIRED }, /* legacy: InputTCPServerRun */
242254 { "maxsessions" , eCmdHdlrPositiveInt , 0 },
243255 { "maxlisteners" , eCmdHdlrPositiveInt , 0 },
256+ { "workerthreads" , eCmdHdlrPositiveInt , 0 },
244257 { "flowcontrol" , eCmdHdlrBinary , 0 },
245258 { "disablelfdelimiter" , eCmdHdlrBinary , 0 },
246259 { "discardtruncatedmsg" , eCmdHdlrBinary , 0 },
@@ -254,6 +267,7 @@ static struct cnfparamdescr inppdescr[] = {
254267 { "name" , eCmdHdlrString , 0 },
255268 { "defaulttz" , eCmdHdlrString , 0 },
256269 { "ruleset" , eCmdHdlrString , 0 },
270+ { "starvationprotection.maxreads" , eCmdHdlrNonNegInt , 0 },
257271 { "streamdriver.mode" , eCmdHdlrNonNegInt , 0 },
258272 { "streamdriver.authmode" , eCmdHdlrString , 0 },
259273 { "streamdriver.permitexpiredcerts" , eCmdHdlrString , 0 },
@@ -403,6 +417,8 @@ createInstance(instanceConf_t **pinst)
403417 inst -> iSynBacklog = 0 ; /* default: unset */
404418 inst -> iTCPLstnMax = loadModConf -> iTCPLstnMax ;
405419 inst -> iTCPSessMax = loadModConf -> iTCPSessMax ;
420+ inst -> numWrkr = loadModConf -> numWrkr ;
421+ inst -> starvationMaxReads = loadModConf -> starvationMaxReads ;
406422
407423 inst -> cnf_params -> pszLstnPortFileName = NULL ;
408424
@@ -473,6 +489,8 @@ static rsRetVal addInstance(void __attribute__((unused)) *pVal, uchar *pNewVal)
473489 inst -> iAddtlFrameDelim = cs .iAddtlFrameDelim ;
474490 inst -> iTCPLstnMax = cs .iTCPLstnMax ;
475491 inst -> iTCPSessMax = cs .iTCPSessMax ;
492+ inst -> numWrkr = DEFAULT_NUMWRKR ;
493+ inst -> starvationMaxReads = DEFAULT_STARVATIONMAXREADS ;
476494 inst -> iStrmDrvrMode = cs .iStrmDrvrMode ;
477495
478496finalize_it :
@@ -497,6 +515,8 @@ addListner(modConfData_t *modConf, instanceConf_t *inst)
497515 CHKiRet (tcpsrv .SetCBOnRegularClose (pOurTcpsrv , onRegularClose ));
498516 CHKiRet (tcpsrv .SetCBOnErrClose (pOurTcpsrv , onErrClose ));
499517 /* params */
518+ CHKiRet (tcpsrv .SetNumWrkr (pOurTcpsrv , inst -> numWrkr ));
519+ CHKiRet (tcpsrv .SetStarvationMaxReads (pOurTcpsrv , inst -> starvationMaxReads ));
500520 CHKiRet (tcpsrv .SetKeepAlive (pOurTcpsrv , inst -> bKeepAlive ));
501521 CHKiRet (tcpsrv .SetKeepAliveIntvl (pOurTcpsrv , inst -> iKeepAliveIntvl ));
502522 CHKiRet (tcpsrv .SetKeepAliveProbes (pOurTcpsrv , inst -> iKeepAliveProbes ));
@@ -652,6 +672,8 @@ CODESTARTnewInpInst
652672 inst -> pszStrmDrvrCertFile = (uchar * )es_str2cstr (pvals [i ].val .d .estr , NULL );
653673 } else if (!strcmp (inppblk .descr [i ].name , "streamdriver.name" )) {
654674 inst -> pszStrmDrvrName = (uchar * )es_str2cstr (pvals [i ].val .d .estr , NULL );
675+ } else if (!strcmp (inppblk .descr [i ].name , "starvationprotection.maxreads" )) {
676+ inst -> starvationMaxReads = (unsigned ) pvals [i ].val .d .n ;
655677 } else if (!strcmp (inppblk .descr [i ].name , "gnutlsprioritystring" )) {
656678 inst -> gnutlsPriorityString = (uchar * )es_str2cstr (pvals [i ].val .d .estr , NULL );
657679 } else if (!strcmp (inppblk .descr [i ].name , "permittedpeer" )) {
@@ -685,6 +707,8 @@ CODESTARTnewInpInst
685707 inst -> iTCPSessMax = (int ) pvals [i ].val .d .n ;
686708 } else if (!strcmp (inppblk .descr [i ].name , "maxlisteners" )) {
687709 inst -> iTCPLstnMax = (int ) pvals [i ].val .d .n ;
710+ } else if (!strcmp (inppblk .descr [i ].name , "workerthreads" )) {
711+ inst -> numWrkr = (int ) pvals [i ].val .d .n ;
688712 } else if (!strcmp (inppblk .descr [i ].name , "supportoctetcountedframing" )) {
689713 inst -> cnf_params -> bSuppOctetFram = (int ) pvals [i ].val .d .n ;
690714 } else if (!strcmp (inppblk .descr [i ].name , "keepalive" )) {
@@ -723,6 +747,8 @@ CODESTARTbeginCnfLoad
723747 /* init our settings */
724748 loadModConf -> iTCPSessMax = 200 ;
725749 loadModConf -> iTCPLstnMax = 20 ;
750+ loadModConf -> numWrkr = DEFAULT_NUMWRKR ;
751+ loadModConf -> starvationMaxReads = DEFAULT_STARVATIONMAXREADS ;
726752 loadModConf -> bSuppOctetFram = 1 ;
727753 loadModConf -> iStrmDrvrMode = 0 ;
728754 loadModConf -> iStrmDrvrExtendedCertCheck = 0 ;
@@ -800,9 +826,13 @@ CODESTARTsetModCnf
800826 }
801827 } else if (!strcmp (modpblk .descr [i ].name , "maxsessions" )) {
802828 loadModConf -> iTCPSessMax = (int ) pvals [i ].val .d .n ;
829+ } else if (!strcmp (modpblk .descr [i ].name , "starvationprotection.maxreads" )) {
830+ loadModConf -> starvationMaxReads = (unsigned ) pvals [i ].val .d .n ;
803831 } else if (!strcmp (modpblk .descr [i ].name , "maxlisteners" ) ||
804832 !strcmp (modpblk .descr [i ].name , "maxlistners" )) { /* keep old name for a while */
805833 loadModConf -> iTCPLstnMax = (int ) pvals [i ].val .d .n ;
834+ } else if (!strcmp (modpblk .descr [i ].name , "workerthreads" )) {
835+ loadModConf -> numWrkr = (int ) pvals [i ].val .d .n ;
806836 } else if (!strcmp (modpblk .descr [i ].name , "keepalive" )) {
807837 loadModConf -> bKeepAlive = (int ) pvals [i ].val .d .n ;
808838 } else if (!strcmp (modpblk .descr [i ].name , "keepalive.probes" )) {
@@ -1078,7 +1108,6 @@ CODESTARTafterRun
10781108 tcpsrv_etry_t * del ;
10791109 while (etry != NULL ) {
10801110 iRet = tcpsrv .Destruct (& etry -> tcpsrv );
1081- // TODO: check iRet, reprot error
10821111 del = etry ;
10831112 etry = etry -> next ;
10841113 free (del );
0 commit comments