55 "context"
66 "encoding/json"
77 "fmt"
8- "github.com/openshift/origin/pkg/dataloader"
98 "io"
109 "io/ioutil"
1110 "math/rand"
@@ -22,12 +21,16 @@ import (
2221 "github.com/openshift-eng/openshift-tests-extension/pkg/extension"
2322 "github.com/openshift-eng/openshift-tests-extension/pkg/extension/extensiontests"
2423 configv1 "github.com/openshift/api/config/v1"
24+ "github.com/openshift/origin/pkg/dataloader"
2525 "github.com/pkg/errors"
2626 "github.com/sirupsen/logrus"
2727 "github.com/spf13/pflag"
2828 "golang.org/x/mod/semver"
29+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2930 "k8s.io/apimachinery/pkg/util/sets"
3031 "k8s.io/cli-runtime/pkg/genericclioptions"
32+ "k8s.io/client-go/kubernetes"
33+ "k8s.io/client-go/rest"
3134 e2e "k8s.io/kubernetes/test/e2e/framework"
3235
3336 "github.com/openshift/origin/pkg/clioptions/clusterdiscovery"
@@ -349,14 +352,37 @@ func (o *GinkgoRunSuiteOptions) Run(suite *TestSuite, clusterConfig *clusterdisc
349352 }
350353 }
351354
352- parallelism := o .Parallelism
353- if parallelism == 0 {
354- parallelism = suite .Parallelism
355+ // start with suite value
356+ parallelism := suite .Parallelism
357+ logrus .Infof ("Suite defined parallelism %d" , parallelism )
358+
359+ // adjust based on the number of workers
360+ totalNodes , workerNodes , err := getClusterNodeCounts (ctx , restConfig )
361+ if err != nil {
362+ logrus .Errorf ("Failed to get cluster node counts: %v" , err )
363+ } else {
364+ // default to 10 concurrent tests per worker but use the min of that
365+ // and the current parallelism value
366+ if workerNodes > 0 {
367+ workerParallelism := 10 * workerNodes
368+ logrus .Infof ("Parallelism based on worker node count: %d" , workerParallelism )
369+ parallelism = min (parallelism , workerParallelism )
370+ }
355371 }
372+
373+ // if 0 set our min value
356374 if parallelism == 0 {
357375 parallelism = 10
358376 }
359377
378+ // if explicitly set then use the specified value
379+ if o .Parallelism > 0 {
380+ parallelism = o .Parallelism
381+ logrus .Infof ("Using specified parallelism value: %d" , parallelism )
382+ }
383+
384+ logrus .Infof ("Total nodes: %d, Worker nodes: %d, Parallelism: %d" , totalNodes , workerNodes , parallelism )
385+
360386 ctx , cancelFn := context .WithCancel (context .Background ())
361387 defer cancelFn ()
362388 abortCh := make (chan os.Signal , 2 )
@@ -751,7 +777,7 @@ func (o *GinkgoRunSuiteOptions) Run(suite *TestSuite, clusterConfig *clusterdisc
751777 fmt .Fprintf (o .Out , "error: Unable to write e2e job run failures summary: %v" , err )
752778 }
753779
754- writeRunSuiteOptions (randSeed , monitorTestInfo , o .JUnitDir , timeSuffix )
780+ writeRunSuiteOptions (randSeed , totalNodes , workerNodes , parallelism , monitorTestInfo , o .JUnitDir , timeSuffix )
755781 }
756782
757783 switch {
@@ -783,15 +809,17 @@ func isBlockingFailure(test *testCase) bool {
783809 }
784810}
785811
786- func writeRunSuiteOptions (seed int64 , info monitortestframework.MonitorTestInitializationInfo , artifactDir , timeSuffix string ) {
812+ func writeRunSuiteOptions (seed int64 , totalNodes , workerNodes , parallelism int , info monitortestframework.MonitorTestInitializationInfo , artifactDir , timeSuffix string ) {
787813 var rows []map [string ]string
788814
789815 rows = make ([]map [string ]string , 0 )
790- rows = append (rows , map [string ]string {"RandomSeed" : fmt .Sprintf ("%d" , seed ), "ClusterStability" : string (info .ClusterStabilityDuringTest )})
816+ rows = append (rows , map [string ]string {"RandomSeed" : fmt .Sprintf ("%d" , seed ), "ClusterStability" : string (info .ClusterStabilityDuringTest ),
817+ "WorkerNodes" : fmt .Sprintf ("%d" , workerNodes ), "TotalNodes" : fmt .Sprintf ("%d" , totalNodes ), "Parallelism" : fmt .Sprintf ("%d" , parallelism )})
791818 dataFile := dataloader.DataFile {
792819 TableName : "run_suite_options" ,
793- Schema : map [string ]dataloader.DataType {"ClusterStability" : dataloader .DataTypeString , "Seed" : dataloader .DataTypeInteger },
794- Rows : rows ,
820+ Schema : map [string ]dataloader.DataType {"ClusterStability" : dataloader .DataTypeString , "RandomSeed" : dataloader .DataTypeInteger , "WorkerNodes" : dataloader .DataTypeInteger ,
821+ "TotalNodes" : dataloader .DataTypeInteger , "Parallelism" : dataloader .DataTypeInteger },
822+ Rows : rows ,
795823 }
796824 fileName := filepath .Join (artifactDir , fmt .Sprintf ("run-suite-options%s-%s" , timeSuffix , dataloader .AutoDataLoaderSuffix ))
797825 err := dataloader .WriteDataFile (fileName , dataFile )
@@ -941,3 +969,31 @@ func determineExternalConnectivity(clusterConfig *clusterdiscovery.ClusterConfig
941969 }
942970 return "Direct"
943971}
972+
973+ func getClusterNodeCounts (ctx context.Context , config * rest.Config ) (int , int , error ) {
974+ kubeClient , err := kubernetes .NewForConfig (config )
975+ if err != nil {
976+ return 0 , 0 , err
977+ }
978+
979+ totalNodes := 0
980+ workerNodes := 0
981+
982+ nodes , err := kubeClient .CoreV1 ().Nodes ().List (ctx , metav1.ListOptions {LabelSelector : "node-role.kubernetes.io/worker" })
983+ if err != nil {
984+ return 0 , 0 , err
985+ }
986+
987+ workerNodes = len (nodes .Items )
988+ logrus .Infof ("Found %d worker nodes" , workerNodes )
989+
990+ nodes , err = kubeClient .CoreV1 ().Nodes ().List (ctx , metav1.ListOptions {})
991+ if err != nil {
992+ return 0 , 0 , err
993+ }
994+
995+ totalNodes = len (nodes .Items )
996+ logrus .Infof ("Found %d nodes" , totalNodes )
997+
998+ return totalNodes , workerNodes , nil
999+ }
0 commit comments