Skip to content

Commit 42b3a9d

Browse files
author
Evsyukov Denis
committed
feat: add batching integration for module operations
Signed-off-by: Evsyukov Denis <denis.evsyukov@flant.com>
1 parent 7fa47ea commit 42b3a9d

File tree

4 files changed

+40
-0
lines changed

4 files changed

+40
-0
lines changed

pkg/addon-operator/bootstrap.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/flant/addon-operator/pkg/kube_config_manager"
1111
"github.com/flant/addon-operator/pkg/kube_config_manager/backend"
1212
"github.com/flant/addon-operator/pkg/module_manager"
13+
"github.com/flant/addon-operator/pkg/task/batch"
1314
taskservice "github.com/flant/addon-operator/pkg/task/service"
1415
shapp "github.com/flant/shell-operator/pkg/app"
1516
"github.com/flant/shell-operator/pkg/debug"
@@ -46,6 +47,17 @@ func (op *AddonOperator) bootstrap() error {
4647
return fmt.Errorf("assemble Debug server: %w", err)
4748
}
4849

50+
// Initialize batching integration for module operations if enabled
51+
var batchingIntegration *batch.BatchingIntegration
52+
if app.BatchingEnabled {
53+
batchingIntegration = batch.NewBatchingIntegration(
54+
op.ModuleManager,
55+
nil, // Task queue service will be set later if needed
56+
op.parallelTaskChannels,
57+
op.Logger,
58+
)
59+
}
60+
4961
cfg := &taskservice.TaskHandlerServiceConfig{
5062
Engine: op.engine,
5163
ParallelTaskChannels: op.parallelTaskChannels,
@@ -56,9 +68,13 @@ func (op *AddonOperator) bootstrap() error {
5668
KubeConfigManager: op.KubeConfigManager,
5769
ConvergeState: op.ConvergeState,
5870
CRDExtraLabels: op.CRDExtraLabels,
71+
BatchingIntegration: batchingIntegration,
5972
}
6073

6174
op.TaskService = taskservice.NewTaskHandlerService(op.ctx, cfg, op.Logger)
75+
76+
// Store reference to batching integration
77+
op.BatchingIntegration = batchingIntegration
6278

6379
return nil
6480
}

pkg/addon-operator/operator.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/flant/addon-operator/pkg/module_manager/models/hooks/kind"
2626
"github.com/flant/addon-operator/pkg/module_manager/models/modules/events"
2727
"github.com/flant/addon-operator/pkg/task"
28+
"github.com/flant/addon-operator/pkg/task/batch"
2829
paralleltask "github.com/flant/addon-operator/pkg/task/parallel"
2930
taskservice "github.com/flant/addon-operator/pkg/task/service"
3031
"github.com/flant/addon-operator/pkg/utils"
@@ -45,6 +46,9 @@ const (
4546
LabelHeritage string = "heritage"
4647
)
4748

49+
// BatchingIntegration is a type alias for the batch integration
50+
type BatchingIntegration = batch.BatchingIntegration
51+
4852
// AddonOperator extends ShellOperator with modules and global hooks
4953
// and with a value storage.
5054
type AddonOperator struct {
@@ -94,6 +98,9 @@ type AddonOperator struct {
9498
ConvergeState *converge.ConvergeState
9599

96100
TaskService *taskservice.TaskHandlerService
101+
102+
// BatchingIntegration provides batching capabilities for module operations
103+
BatchingIntegration *BatchingIntegration
97104
}
98105

99106
type Option func(operator *AddonOperator)
@@ -286,6 +293,11 @@ func (op *AddonOperator) Start(ctx context.Context) error {
286293
op.ModuleManager.Start()
287294
op.StartModuleManagerEventHandler()
288295

296+
// Start batching integration for module operations
297+
if op.BatchingIntegration != nil {
298+
op.BatchingIntegration.Start(ctx)
299+
}
300+
289301
return nil
290302
}
291303

pkg/app/app.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ var (
5656
NumberOfParallelQueues = 20
5757
ParallelQueuePrefix = "parallel_queue"
5858
ParallelQueueNamePattern = ParallelQueuePrefix + "_%d"
59+
60+
// BatchingEnabled enables batching of module operations for improved performance
61+
BatchingEnabled = false
5962
)
6063

6164
const (
@@ -172,6 +175,11 @@ func DefineStartCommandFlags(kpApp *kingpin.Application, cmd *kingpin.CmdClause)
172175
Default("").
173176
StringVar(&ShellChrootDir)
174177

178+
cmd.Flag("batching-enabled", "Enable batching of module operations for improved performance.").
179+
Envar("ADDON_OPERATOR_BATCHING_ENABLED").
180+
Default("false").
181+
BoolVar(&BatchingEnabled)
182+
175183
shapp.DefineKubeClientFlags(cmd)
176184
shapp.DefineLoggingFlags(cmd)
177185

pkg/task/service/service.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ type TaskHandlerServiceConfig struct {
4848
KubeConfigManager *kube_config_manager.KubeConfigManager
4949
ConvergeState *converge.ConvergeState
5050
CRDExtraLabels map[string]string
51+
BatchingIntegration interface{} // Interface to avoid circular import
5152
}
5253

5354
type TaskHandlerService struct {
@@ -84,6 +85,8 @@ type TaskHandlerService struct {
8485

8586
taskFactory map[sh_task.TaskType]func(t sh_task.Task, logger *log.Logger) task.Task
8687

88+
batchingIntegration interface{} // BatchingIntegration for module operations
89+
8790
logger *log.Logger
8891
}
8992

@@ -101,6 +104,7 @@ func NewTaskHandlerService(ctx context.Context, config *TaskHandlerServiceConfig
101104
convergeState: config.ConvergeState,
102105
crdExtraLabels: config.CRDExtraLabels,
103106
discoveredCRDs: discovercrds.NewDiscoveredGVKs(),
107+
batchingIntegration: config.BatchingIntegration,
104108
logger: logger,
105109
}
106110

0 commit comments

Comments
 (0)