Skip to content

Conversation

@agam-99
Copy link
Contributor

@agam-99 agam-99 commented Dec 15, 2025

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

This PR introduces a separate memory configuration and dedicated task queue for the work execution phase in Temporal workflows to support better scalability and resource utilization.

Key Changes:

  • Created dedicated execution task queue (EXECUTION_TASK_QUEUE) to isolate work execution activities from work discovery and commit phases
  • Added ExecutionWorker to handle execution-stage activities independently
  • Introduced WorkflowStage enum for stage-specific task queue routing
  • When dynamic scaling is enabled, NestingExecWorkflow (work execution) routes to the execution queue while other stages use the default queue

Configuration:

  • GobblinTemporalConfigurationKeys.EXECUTION_TASK_QUEUE - Execution task queue name (default: "GobblinTemporalExecutionTaskQueue")
  • GobblinTemporalConfigurationKeys.EXECUTION_WORKER_MEMORY_MBS - Memory allocation for execution workers

Benefits:

  • Independent scaling for work execution vs. discovery/commit phases
  • More efficient compute utilization

Tests

  • My PR adds relevant unit tests for the changes made.
    All tests use reflection and mocking to test actual method behavior without requiring full Temporal infrastructure.

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@agam-99 agam-99 force-pushed the feat/dynamic-stage-scaling branch from 1196a45 to 82b4dc0 Compare December 16, 2025 06:15
@codecov-commenter
Copy link

codecov-commenter commented Dec 16, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 51.71%. Comparing base (6619cb1) to head (e833981).
⚠️ Report is 1 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #4159      +/-   ##
============================================
+ Coverage     49.12%   51.71%   +2.59%     
+ Complexity    10253     7751    -2502     
============================================
  Files          1924     1411     -513     
  Lines         75350    53504   -21846     
  Branches       8361     5882    -2479     
============================================
- Hits          37012    27670    -9342     
+ Misses        35039    23492   -11547     
+ Partials       3299     2342     -957     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces independent dynamic scaling for different workflow stages in Gobblin's Temporal integration by enabling separate memory configuration and dedicated task queues for work execution activities.

Key Changes:

  • Created dedicated execution task queue and ExecutionWorker class to isolate work execution from discovery/commit phases
  • Added WorkflowStage enum for stage-specific task queue routing with configurable memory allocation
  • Modified ProcessWorkUnitsWorkflowImpl to route NestingExecWorkflow to execution queue when dynamic scaling is enabled

Reviewed changes

Copilot reviewed 13 out of 14 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
GobblinTemporalConfigurationKeys.java Added execution worker class constant, execution task queue configuration keys, and stage-specific memory configuration
ExecutionWorker.java New specialized worker for work execution stage with dedicated task queue and concurrency settings
WorkflowStage.java New enum defining workflow stages (WORK_DISCOVERY, WORK_EXECUTION, COMMIT) with stage-specific task queue routing
AbstractRecommendScalingForWorkUnitsImpl.java Enhanced to create profile overlays with ExecutionWorker class and optional memory configuration for scaled containers
ProcessWorkUnitsWorkflowImpl.java Modified to route child workflows to appropriate task queues based on dynamic scaling configuration
GobblinTemporalTaskRunner.java Added initialization logic to start ExecutionWorker in initial container when dynamic scaling is enabled
AbstractTemporalWorker.java Refactored to make config field protected and extracted getTaskQueue() method for subclass customization
ActivityType.java Added overloaded buildActivityOptions() method accepting taskQueue parameter
Test files Comprehensive unit tests for workflow routing, worker configuration, and scaling behavior
application.conf Minor whitespace cleanup

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS
));

return overlayPairs.isEmpty() ? ProfileOverlay.unchanged() : new ProfileOverlay.Adding(overlayPairs);
Copy link

Copilot AI Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check for overlayPairs.isEmpty() on line 100 will always be false because the ExecutionWorker class is unconditionally added to overlayPairs on line 95. This means ProfileOverlay.unchanged() will never be returned. Consider removing this check or restructuring the logic since overlayPairs will always contain at least one element.

Suggested change
return overlayPairs.isEmpty() ? ProfileOverlay.unchanged() : new ProfileOverlay.Adding(overlayPairs);
return new ProfileOverlay.Adding(overlayPairs);

Copilot uses AI. Check for mistakes.

// Verify
Assert.assertEquals(taskQueue, GobblinTemporalConfigurationKeys.DEFAULT_GOBBLIN_TEMPORAL_TASK_QUEUE,
"WORK_COMMIT should use default task queue");
Copy link

Copilot AI Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment mentions "WORK_COMMIT" but the actual enum value being tested is "COMMIT". Update the comment to match the enum value for consistency.

Copilot uses AI. Check for mistakes.
Comment on lines 75 to 87
public ActivityOptions buildActivityOptions(Properties props, boolean setHeartbeatTimeout, String taskQueue) {
ActivityOptions.Builder builder = ActivityOptions.newBuilder()
.setStartToCloseTimeout(getStartToCloseTimeout(props))
.setRetryOptions(buildRetryOptions(props))
.setTaskQueue(taskQueue);

if (setHeartbeatTimeout) {
builder.setHeartbeatTimeout(getHeartbeatTimeout(props));
}

return builder.build();
}

Copy link

Copilot AI Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The newly added buildActivityOptions method with taskQueue parameter is not used anywhere in the codebase. If this method is intended for future use, consider documenting this in a code comment. Otherwise, consider removing it to avoid maintaining unused code. Alternatively, if this should be used for routing activities to the execution queue when dynamic scaling is enabled, the implementation may be incomplete.

Suggested change
public ActivityOptions buildActivityOptions(Properties props, boolean setHeartbeatTimeout, String taskQueue) {
ActivityOptions.Builder builder = ActivityOptions.newBuilder()
.setStartToCloseTimeout(getStartToCloseTimeout(props))
.setRetryOptions(buildRetryOptions(props))
.setTaskQueue(taskQueue);
if (setHeartbeatTimeout) {
builder.setHeartbeatTimeout(getHeartbeatTimeout(props));
}
return builder.build();
}

Copilot uses AI. Check for mistakes.
*/
public class ExecutionWorker extends AbstractTemporalWorker {
public static final long DEADLOCK_DETECTION_TIMEOUT_SECONDS = 120;
public int maxExecutionConcurrency;
Copy link

Copilot AI Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field maxExecutionConcurrency should be declared as private or have documentation explaining why it needs package-private visibility. Consider making it private if external access is not required, or document the reason for package-private visibility if it's intentional for testing purposes.

Suggested change
public int maxExecutionConcurrency;
private int maxExecutionConcurrency;

Copilot uses AI. Check for mistakes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

double DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_BACKOFF_COEFFICIENT = 2;
String TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = TEMPORAL_ACTIVITY_RETRY_OPTIONS + "maximum.attempts";
int DEFAULT_TEMPORAL_ACTIVITY_RETRY_OPTIONS_MAXIMUM_ATTEMPTS = 4;

Copy link

Copilot AI Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a documentation comment for the WORK_EXECUTION_MEMORY_MB configuration key to explain its purpose and usage, similar to other configuration keys in this interface. The comment should clarify that this is the memory allocation in megabytes for execution worker containers when dynamic scaling is enabled.

Suggested change
/**
* Memory allocation (in megabytes) for execution worker containers when dynamic scaling is enabled.
* This value determines the amount of memory assigned to each worker container during execution.
*/

Copilot uses AI. Check for mistakes.
# job history store ( WARN [GobblinYarnAppLauncher] NOT starting the admin UI because the job execution info server is NOT enabled )
job.execinfo.server.enabled=false
job.history.store.enabled=false
job.history.store.enabled=false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove the whitespace change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated


public ExecutionWorker(Config config, WorkflowClient workflowClient) {
super(config, workflowClient);
this.maxExecutionConcurrency = ConfigUtils.getInt(config, GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this config be process workunit specific instead of using a generic config?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah probably we can in another PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add this in same PR, its just adding one more config in GobblinTemporalConfigurationKeys, default value can be same

String TEMPORAL_NUM_THREADS_PER_EXECUTION_WORKER = PREFIX + "num.threads.per.execution.worker";

executionWorker.start();
workers.add(executionWorker);
logger.info("Worker started for class: {}", GobblinTemporalConfigurationKeys.EXECUTION_WORKER_CLASS);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we tested the scenario with 1 container?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE, config))
.build();

// CommitStepWorkflow inherits default queue from ProcessWorkUnitsWorkflow parent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where are we setting default queue for CommitActivity? We don't want commit activity in the process queue right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we saying that we have only set the task queu for the child worfklow/activities of ProcessWorkUnitsWorkflow parent. so the CommitStep would default to the default Task queue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes correct

protected WorkerOptions createWorkerOptions() {
return WorkerOptions.newBuilder()
.setDefaultDeadlockDetectionTimeout(TimeUnit.SECONDS.toMillis(DEADLOCK_DETECTION_TIMEOUT_SECONDS))
.setMaxConcurrentActivityExecutionSize(this.maxExecutionConcurrency)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it makes sense to have different configs for this? Currently this is being driven by TEMPORAL_NUM_THREADS_PER_WORKER, but suppose If I want to reduce the number of threads in the worker but don't want to reduce the execution local activity concurrenncy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah good point - I am not sure about this, maybe this can be discussed and handled in another PR.
This PR only handles independent memory configuration and other configs remain same.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add this in same PR too, so that we have configs to play around and tune if needed


public ExecutionWorker(Config config, WorkflowClient workflowClient) {
super(config, workflowClient);
this.maxExecutionConcurrency = ConfigUtils.getInt(config, GobblinTemporalConfigurationKeys.TEMPORAL_NUM_THREADS_PER_WORKER,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add this in same PR, its just adding one more config in GobblinTemporalConfigurationKeys, default value can be same

String TEMPORAL_NUM_THREADS_PER_EXECUTION_WORKER = PREFIX + "num.threads.per.execution.worker";

protected WorkerOptions createWorkerOptions() {
return WorkerOptions.newBuilder()
.setDefaultDeadlockDetectionTimeout(TimeUnit.SECONDS.toMillis(DEADLOCK_DETECTION_TIMEOUT_SECONDS))
.setMaxConcurrentActivityExecutionSize(this.maxExecutionConcurrency)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add this in same PR too, so that we have configs to play around and tune if needed

Comment on lines 87 to 92
/**
* Package-private for testing purposes.
*/
int getMaxExecutionConcurrency() {
return maxExecutionConcurrency;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use lombok @Getter(AccessLevel.PROTECTED) for this

Comment on lines 207 to 210
*/
private ExecutionWorker createMockWorker(Config config) throws Exception {
ExecutionWorker worker = Mockito.mock(ExecutionWorker.class, Mockito.CALLS_REAL_METHODS);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are writing unit test for ExecutionWorker, we shouldn't be mocking that class itself, you can mock Config and WorkflowClient passed into constructor

Comment on lines 225 to 228

/**
* Helper to invoke the protected getWorkflowImplClasses method using reflection.
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Protected members should be directly accessible if package for Main Class and Test Class are same, can you check this again as I don't think use of reflection is needed

@Blazer-007 Blazer-007 changed the title Independent Dynamic Scaling for different Activities in Temporal WorkFlow [GOBBLIN-2245] Independent Dynamic Scaling for different Activities in Temporal WorkFlow Jan 15, 2026
@Blazer-007 Blazer-007 merged commit 8e9146e into apache:master Jan 15, 2026
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants