Skip to content

Commit 1819d31

Browse files
authored
Concurrent Core
1 parent 5a2ed2f commit 1819d31

File tree

136 files changed

+24837
-6205
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

136 files changed

+24837
-6205
lines changed

BUILD.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -134,14 +134,6 @@ Path to buildkitd config file for docker buildx. Used to configure buildx contex
134134
registry mirrors. See docker (https://docs.docker.com/engine/reference/commandline/buildx_create/#config) and buildkit
135135
(https://github.com/moby/buildkit/blob/master/docs/buildkitd.toml.md) docs for more options and examples.
136136

137-
`DEFAULT_AUTOSUPPORT_IMAGE`
138-
139-
Override the default asup image in tridentctl and operator
140-
141-
`DEFAULT_ACP_IMAGE`
142-
143-
Override the default acp image in tridentctl and operator
144-
145137
Example file:
146138
```shell
147139
# insecure, local/private registry, set $PRIVATE_REGISTRY to appropriate value
@@ -154,6 +146,14 @@ Example file:
154146
mirrors = ["$MIRROR"]
155147
```
156148

149+
`DEFAULT_AUTOSUPPORT_IMAGE`
150+
151+
Override the default asup image in tridentctl and operator
152+
153+
`DEFAULT_ACP_IMAGE`
154+
155+
Override the default acp image in tridentctl and operator
156+
157157
`TRIDENT_IMAGE`
158158

159159
Default: `trident`

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@ k8s_codegen_operator:
454454
@mv $(OPERATOR_KUBERNETES_PKG)/apis/netapp/v1/zz_generated.deepcopy.go ./operator/crd/apis/netapp/v1/
455455

456456
mocks:
457-
@go install go.uber.org/mock/mockgen@v0.4.0
457+
@go install go.uber.org/mock/mockgen@v0.5.2
458458
@go generate ./...
459459

460460
.git/hooks:

cli/cmd/install.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ var (
129129
iscsiSelfHealingWaitTime time.Duration
130130
k8sAPIQPS int
131131
fsGroupPolicy string
132+
enableConcurrency bool
132133

133134
// CLI-based K8S client
134135
client k8sclient.KubernetesClient
@@ -256,6 +257,7 @@ func init() {
256257
"with the Kubernetes API server. The Burst value is automatically set as a function of the QPS value.")
257258
installCmd.Flags().StringVar(&fsGroupPolicy, "fs-group-policy", "", "The FSGroupPolicy "+
258259
"to set on Trident's CSIDriver resource.")
260+
installCmd.Flags().BoolVar(&enableConcurrency, "enable-concurrency", false, "Enable concurrency for Trident's controller **TECH PREVIEW**")
259261

260262
if err := installCmd.Flags().MarkHidden("skip-k8s-version-check"); err != nil {
261263
_, _ = fmt.Fprintln(os.Stderr, err)
@@ -692,6 +694,7 @@ func prepareYAMLFiles() error {
692694
CloudProvider: cloudProvider,
693695
IdentityLabel: identityLabel,
694696
K8sAPIQPS: k8sAPIQPS,
697+
EnableConcurrency: enableConcurrency,
695698
}
696699
deploymentYAML := k8sclient.GetCSIDeploymentYAML(deploymentArgs)
697700
if err = writeFile(deploymentPath, deploymentYAML); err != nil {
@@ -1054,6 +1057,7 @@ func installTrident() (returnError error) {
10541057
CloudProvider: cloudProvider,
10551058
IdentityLabel: identityLabel,
10561059
K8sAPIQPS: k8sAPIQPS,
1060+
EnableConcurrency: enableConcurrency,
10571061
}
10581062
returnError = client.CreateObjectByYAML(
10591063
k8sclient.GetCSIDeploymentYAML(deploymentArgs))

cli/k8s_client/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ type DeploymentYAMLArguments struct {
172172
CloudProvider string `json:"cloudProvider"`
173173
IdentityLabel bool `json:"identityLabel"`
174174
K8sAPIQPS int `json:"k8sAPIQPS"`
175+
EnableConcurrency bool `json:"enableConcurrency"`
175176
}
176177

177178
type DaemonsetYAMLArguments struct {

cli/k8s_client/yaml_factory.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,21 @@ func getCSIDeploymentAutosupportYAML(args *DeploymentYAMLArguments) string {
423423
return ""
424424
}
425425

426+
sidecarImages := []struct {
427+
arg *string
428+
tag string
429+
}{
430+
{&args.CSISidecarProvisionerImage, commonconfig.CSISidecarProvisionerImageTag},
431+
{&args.CSISidecarAttacherImage, commonconfig.CSISidecarAttacherImageTag},
432+
{&args.CSISidecarResizerImage, commonconfig.CSISidecarResizerImageTag},
433+
{&args.CSISidecarSnapshotterImage, commonconfig.CSISidecarSnapshotterImageTag},
434+
}
435+
for _, image := range sidecarImages {
436+
if *image.arg == "" {
437+
*image.arg = args.ImageRegistry + "/" + image.tag
438+
}
439+
}
440+
426441
if args.AutosupportImage == "" {
427442
args.AutosupportImage = commonconfig.DefaultAutosupportImage
428443
}
@@ -603,6 +618,7 @@ func GetCSIDeploymentYAML(args *DeploymentYAMLArguments) string {
603618
deploymentYAML = strings.ReplaceAll(deploymentYAML, "{ENABLE_ACP}", enableACP)
604619
deploymentYAML = strings.ReplaceAll(deploymentYAML, "{K8S_API_CLIENT_TRIDENT_THROTTLE}", K8sAPITridentThrottle)
605620
deploymentYAML = strings.ReplaceAll(deploymentYAML, "{K8S_API_CLIENT_SIDECAR_THROTTLE}", K8sAPISidecarThrottle)
621+
deploymentYAML = strings.ReplaceAll(deploymentYAML, "{ENABLE_CONCURRENCY}", strconv.FormatBool(args.EnableConcurrency))
606622

607623
// Log before secrets are inserted into YAML.
608624
Log().WithField("yaml", deploymentYAML).Trace("CSI Deployment YAML.")
@@ -665,6 +681,7 @@ spec:
665681
- "--address={IP_LOCALHOST}"
666682
- "--http_request_timeout={HTTP_REQUEST_TIMEOUT}"
667683
- "--enable_force_detach={ENABLE_FORCE_DETACH}"
684+
- "--enable_concurrency={ENABLE_CONCURRENCY}"
668685
- "--metrics"
669686
{ENABLE_ACP}
670687
{DEBUG}
@@ -716,6 +733,7 @@ spec:
716733
- "--csi-address=$(ADDRESS)"
717734
- "--retry-interval-start=8s"
718735
- "--retry-interval-max=30s"
736+
- "--worker-threads=5"
719737
{K8S_API_CLIENT_SIDECAR_THROTTLE}
720738
env:
721739
- name: ADDRESS
@@ -734,6 +752,7 @@ spec:
734752
- "--v={SIDECAR_LOG_LEVEL}"
735753
- "--timeout=60s"
736754
- "--retry-interval-start=10s"
755+
- "--worker-threads=10"
737756
- "--csi-address=$(ADDRESS)"
738757
{K8S_API_CLIENT_SIDECAR_THROTTLE}
739758
env:
@@ -752,6 +771,7 @@ spec:
752771
args:
753772
- "--v={SIDECAR_LOG_LEVEL}"
754773
- "--timeout=300s"
774+
- "--workers=10"
755775
- "--csi-address=$(ADDRESS)"
756776
{K8S_API_CLIENT_SIDECAR_THROTTLE}
757777
env:
@@ -770,6 +790,7 @@ spec:
770790
args:
771791
- "--v={SIDECAR_LOG_LEVEL}"
772792
- "--timeout=300s"
793+
- "--worker-threads=10"
773794
- "--csi-address=$(ADDRESS)"
774795
- "--feature-gates=CSIVolumeGroupSnapshot=true"
775796
{K8S_API_CLIENT_SIDECAR_THROTTLE}

core/common.go

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
// Copyright 2025 NetApp, Inc. All Rights Reserved.
2+
3+
package core
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"os"
9+
"time"
10+
11+
"github.com/netapp/trident/config"
12+
. "github.com/netapp/trident/logging"
13+
"github.com/netapp/trident/storage"
14+
"github.com/netapp/trident/utils/models"
15+
)
16+
17+
// recordTiming is used to record in Prometheus the total time taken for an operation as follows:
18+
//
19+
// defer recordTiming("backend_add")()
20+
//
21+
// see also: https://play.golang.org/p/6xRXlhFdqBd
22+
func recordTiming(operation string, err *error) func() {
23+
startTime := time.Now()
24+
return func() {
25+
endTime := time.Since(startTime)
26+
endTimeMS := float64(endTime.Milliseconds())
27+
success := "true"
28+
if *err != nil {
29+
success = "false"
30+
}
31+
operationDurationInMsSummary.WithLabelValues(operation, success).Observe(endTimeMS)
32+
}
33+
}
34+
35+
func recordTransactionTiming(txn *storage.VolumeTransaction, err *error) {
36+
if txn == nil || txn.VolumeCreatingConfig == nil {
37+
// for unit tests, there will be no txn to record
38+
return
39+
}
40+
41+
operation := "transaction_volume_finish"
42+
43+
startTime := txn.VolumeCreatingConfig.StartTime
44+
endTime := time.Since(startTime)
45+
endTimeMS := float64(endTime.Milliseconds())
46+
success := "true"
47+
if *err != nil {
48+
success = "false"
49+
}
50+
operationDurationInMsSummary.WithLabelValues(operation, success).Observe(endTimeMS)
51+
}
52+
53+
// getProtocol returns the appropriate protocol based on a specified volume mode, access mode and protocol, or
54+
// an error if the two settings are incompatible.
55+
// NOTE: We do not support raw block volumes with NFS protocol.
56+
func getProtocol(
57+
ctx context.Context, volumeMode config.VolumeMode, accessMode config.AccessMode, protocol config.Protocol,
58+
) (config.Protocol, error) {
59+
Logc(ctx).WithFields(LogFields{
60+
"volumeMode": volumeMode,
61+
"accessMode": accessMode,
62+
"protocol": protocol,
63+
}).Debug("Orchestrator#getProtocol")
64+
65+
resultProtocol := protocol
66+
var err error = nil
67+
68+
defer Logc(ctx).WithFields(LogFields{
69+
"resultProtocol": resultProtocol,
70+
"err": err,
71+
}).Debug("Orchestrator#getProtocol")
72+
73+
type accessVariables struct {
74+
volumeMode config.VolumeMode
75+
accessMode config.AccessMode
76+
protocol config.Protocol
77+
}
78+
type protocolResult struct {
79+
protocol config.Protocol
80+
err error
81+
}
82+
83+
err = fmt.Errorf("incompatible volume mode (%s), access mode (%s) and protocol (%s)", volumeMode, accessMode,
84+
protocol)
85+
86+
protocolTable := map[accessVariables]protocolResult{
87+
{config.Filesystem, config.ModeAny, config.ProtocolAny}: {config.ProtocolAny, nil},
88+
{config.Filesystem, config.ModeAny, config.File}: {config.File, nil},
89+
{config.Filesystem, config.ModeAny, config.Block}: {config.Block, nil},
90+
91+
{config.Filesystem, config.ReadWriteOnce, config.ProtocolAny}: {config.ProtocolAny, nil},
92+
{config.Filesystem, config.ReadWriteOnce, config.File}: {config.File, nil},
93+
{config.Filesystem, config.ReadWriteOnce, config.Block}: {config.Block, nil},
94+
95+
{config.Filesystem, config.ReadWriteOncePod, config.ProtocolAny}: {config.ProtocolAny, nil},
96+
{config.Filesystem, config.ReadWriteOncePod, config.File}: {config.File, nil},
97+
{config.Filesystem, config.ReadWriteOncePod, config.Block}: {config.Block, nil},
98+
99+
{config.Filesystem, config.ReadOnlyMany, config.ProtocolAny}: {config.ProtocolAny, nil},
100+
{config.Filesystem, config.ReadOnlyMany, config.File}: {config.File, nil},
101+
{config.Filesystem, config.ReadOnlyMany, config.Block}: {config.Block, nil},
102+
103+
{config.Filesystem, config.ReadWriteMany, config.ProtocolAny}: {config.File, nil},
104+
{config.Filesystem, config.ReadWriteMany, config.File}: {config.File, nil},
105+
{config.Filesystem, config.ReadWriteMany, config.Block}: {config.ProtocolAny, err},
106+
107+
{config.RawBlock, config.ModeAny, config.ProtocolAny}: {config.Block, nil},
108+
{config.RawBlock, config.ModeAny, config.File}: {config.ProtocolAny, err},
109+
{config.RawBlock, config.ModeAny, config.Block}: {config.Block, nil},
110+
111+
{config.RawBlock, config.ReadWriteOnce, config.ProtocolAny}: {config.Block, nil},
112+
{config.RawBlock, config.ReadWriteOnce, config.File}: {config.ProtocolAny, err},
113+
{config.RawBlock, config.ReadWriteOnce, config.Block}: {config.Block, nil},
114+
115+
{config.RawBlock, config.ReadWriteOncePod, config.ProtocolAny}: {config.Block, nil},
116+
{config.RawBlock, config.ReadWriteOncePod, config.File}: {config.ProtocolAny, err},
117+
{config.RawBlock, config.ReadWriteOncePod, config.Block}: {config.Block, nil},
118+
119+
{config.RawBlock, config.ReadOnlyMany, config.ProtocolAny}: {config.Block, nil},
120+
{config.RawBlock, config.ReadOnlyMany, config.File}: {config.ProtocolAny, err},
121+
{config.RawBlock, config.ReadOnlyMany, config.Block}: {config.Block, nil},
122+
123+
{config.RawBlock, config.ReadWriteMany, config.ProtocolAny}: {config.Block, nil},
124+
{config.RawBlock, config.ReadWriteMany, config.File}: {config.ProtocolAny, err},
125+
{config.RawBlock, config.ReadWriteMany, config.Block}: {config.Block, nil},
126+
}
127+
128+
res, isValid := protocolTable[accessVariables{volumeMode, accessMode, protocol}]
129+
130+
if !isValid {
131+
return config.ProtocolAny, fmt.Errorf("invalid volume mode (%s), access mode (%s) or protocol (%s)", volumeMode,
132+
accessMode, protocol)
133+
}
134+
135+
return res.protocol, res.err
136+
}
137+
138+
func generateVolumePublication(volName string, publishInfo *models.VolumePublishInfo) *models.VolumePublication {
139+
vp := &models.VolumePublication{
140+
Name: models.GenerateVolumePublishName(volName, publishInfo.HostName),
141+
VolumeName: volName,
142+
NodeName: publishInfo.HostName,
143+
ReadOnly: publishInfo.ReadOnly,
144+
AccessMode: publishInfo.AccessMode,
145+
}
146+
147+
return vp
148+
}
149+
150+
// isDockerPluginMode returns true if the ENV variable config.DockerPluginModeEnvVariable is set
151+
func isDockerPluginMode() bool {
152+
return os.Getenv(config.DockerPluginModeEnvVariable) != ""
153+
}
154+
155+
func isCRDContext(ctx context.Context) bool {
156+
ctxSource := ctx.Value(ContextKeyRequestSource)
157+
return ctxSource != nil && ctxSource == ContextSourceCRD
158+
}
159+
160+
func isPeriodicContext(ctx context.Context) bool {
161+
ctxSource := ctx.Value(ContextKeyRequestSource)
162+
return ctxSource != nil && ctxSource == ContextSourcePeriodic
163+
}

core/common_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright 2025 NetApp, Inc. All Rights Reserved.
2+
3+
package core
4+
5+
import (
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
10+
"github.com/netapp/trident/config"
11+
)
12+
13+
func TestGetProtocol(t *testing.T) {
14+
type accessVariables struct {
15+
volumeMode config.VolumeMode
16+
accessMode config.AccessMode
17+
protocol config.Protocol
18+
expected config.Protocol
19+
}
20+
21+
accessModesPositiveTests := []accessVariables{
22+
// This is the complete set of permutations. Negative rows are commented out.
23+
{config.Filesystem, config.ModeAny, config.ProtocolAny, config.ProtocolAny},
24+
{config.Filesystem, config.ModeAny, config.File, config.File},
25+
{config.Filesystem, config.ModeAny, config.Block, config.Block},
26+
{config.Filesystem, config.ReadWriteOnce, config.ProtocolAny, config.ProtocolAny},
27+
{config.Filesystem, config.ReadWriteOnce, config.File, config.File},
28+
{config.Filesystem, config.ReadWriteOnce, config.Block, config.Block},
29+
{config.Filesystem, config.ReadWriteOncePod, config.ProtocolAny, config.ProtocolAny},
30+
{config.Filesystem, config.ReadWriteOncePod, config.File, config.File},
31+
{config.Filesystem, config.ReadWriteOncePod, config.Block, config.Block},
32+
{config.Filesystem, config.ReadOnlyMany, config.Block, config.Block},
33+
{config.Filesystem, config.ReadOnlyMany, config.ProtocolAny, config.ProtocolAny},
34+
{config.Filesystem, config.ReadOnlyMany, config.File, config.File},
35+
{config.Filesystem, config.ReadWriteMany, config.ProtocolAny, config.File},
36+
{config.Filesystem, config.ReadWriteMany, config.File, config.File},
37+
// {config.Filesystem, config.ReadWriteMany, config.Block, config.ProtocolAny},
38+
{config.RawBlock, config.ModeAny, config.ProtocolAny, config.Block},
39+
// {config.RawBlock, config.ModeAny, config.File, config.ProtocolAny},
40+
{config.RawBlock, config.ModeAny, config.Block, config.Block},
41+
{config.RawBlock, config.ReadWriteOnce, config.ProtocolAny, config.Block},
42+
// {config.RawBlock, config.ReadWriteOnce, config.File, config.ProtocolAny},
43+
// {config.RawBlock, config.ReadWriteOncePod, config.File, config.ProtocolAny},
44+
{config.RawBlock, config.ReadWriteOnce, config.Block, config.Block},
45+
{config.RawBlock, config.ReadWriteOncePod, config.Block, config.Block},
46+
{config.RawBlock, config.ReadOnlyMany, config.ProtocolAny, config.Block},
47+
// {config.RawBlock, config.ReadOnlyMany, config.File, config.ProtocolAny},
48+
{config.RawBlock, config.ReadOnlyMany, config.Block, config.Block},
49+
{config.RawBlock, config.ReadWriteMany, config.ProtocolAny, config.Block},
50+
// {config.RawBlock, config.ReadWriteMany, config.File, config.ProtocolAny},
51+
{config.RawBlock, config.ReadWriteMany, config.Block, config.Block},
52+
}
53+
54+
accessModesNegativeTests := []accessVariables{
55+
{config.Filesystem, config.ReadWriteMany, config.Block, config.ProtocolAny},
56+
{config.RawBlock, config.ModeAny, config.File, config.ProtocolAny},
57+
{config.RawBlock, config.ReadWriteOnce, config.File, config.ProtocolAny},
58+
{config.RawBlock, config.ReadWriteOncePod, config.File, config.ProtocolAny},
59+
60+
{config.RawBlock, config.ReadOnlyMany, config.File, config.ProtocolAny},
61+
{config.RawBlock, config.ReadWriteMany, config.File, config.ProtocolAny},
62+
}
63+
64+
for _, tc := range accessModesPositiveTests {
65+
protocolLocal, err := getProtocol(coreCtx, tc.volumeMode, tc.accessMode, tc.protocol)
66+
assert.Nil(t, err, nil)
67+
assert.Equal(t, tc.expected, protocolLocal, "expected both the protocols to be equal!")
68+
}
69+
70+
for _, tc := range accessModesNegativeTests {
71+
protocolLocal, err := getProtocol(coreCtx, tc.volumeMode, tc.accessMode, tc.protocol)
72+
assert.NotNil(t, err)
73+
assert.Equal(t, tc.expected, protocolLocal, "expected both the protocols to be equal!")
74+
}
75+
}

0 commit comments

Comments
 (0)