-
-
Notifications
You must be signed in to change notification settings - Fork 12
distributed_computing_analysis.md
Mohamed edited this page Jun 10, 2025
·
1 revision
-
Location:
example/core/example10_distributed_computing.cpp - Objective: This example constructs a simulation of a distributed computing environment. It effectively demonstrates how multiple QB actors can collaborate to manage a workflow involving task generation, intelligent scheduling, parallel execution by worker nodes, result aggregation, and overall system monitoring. It's a valuable case study for understanding more complex, multi-stage actor interactions and asynchronous process simulation.
Through this analysis, you will see how QB facilitates:
- Complex, multi-actor workflows.
- Dynamic task scheduling and basic load balancing concepts.
- Simulation of work and delays using asynchronous callbacks.
- Centralized coordination and state management by dedicated actors.
- System-wide monitoring and statistics gathering.
The simulation is orchestrated by several types of actors, each with distinct responsibilities, potentially running across multiple VirtualCores:
-
Role: Periodically creates new computational
Taskobjects (defined in the example with properties like type, priority, complexity, and data). -
QB Integration: Standard
qb::Actor. -
Functionality:
- Uses
qb::io::async::callbackto triggergenerateTask()at intervals determined by aTASKS_PER_SECONDconstant. -
generateTask(): Constructs aTaskobject with randomized attributes. - Encapsulates the
Taskwithin aTaskMessageevent andpushes it to theTaskSchedulerActor. - Ceases generation upon receiving a
ShutdownMessage.
- Uses
-
Role: Receives tasks, queues them, and dispatches them to available
WorkerNodeActors based on a scheduling strategy (prioritization and basic load awareness). -
QB Integration: Standard
qb::Actor. -
State Management:
-
_worker_ids: A list of availableWorkerNodeActorIDs. -
_worker_metrics: A map (qb::ActorIdtoWorkerMetrics) storing the last known status (heartbeat, utilization) of each worker. -
_task_queue: Astd::dequeofstd::shared_ptr<Task>, acting as the pending task buffer. -
_active_tasks: A map (task_idtostd::shared_ptr<Task>) tracking tasks currently assigned to workers.
-
-
Event Handling & Logic:
-
on(TaskMessage&): Adds the new task to_task_queueand attempts to schedule it immediately viascheduleTasks(). -
on(WorkerHeartbeatMessage&): Updates thelast_heartbeattimestamp for the reporting worker in_worker_metrics. -
on(WorkerStatusMessage&): Updates the fullWorkerMetrics(including utilization) for the reporting worker. -
on(ResultMessage&): ReceivesTaskResultfrom aResultCollectorActor(indirectly signaling task completion by a worker), removes the task from_active_tasks, and attempts toscheduleTasks()to dispatch new work if workers are free. -
on(UpdateWorkersMessage&): Receives the initial list ofWorkerNodeActorIDs from theSystemMonitorActor.
-
- **Scheduling (
scheduleTasks()):- Sorts
_task_queuebyTask::priority(higher priority first). - Iterates through known
_worker_ids. - For each worker, calls
isWorkerAvailable()to check its status. - If a worker is available and tasks are pending, dequeues the highest priority task, moves it to
_active_tasks, andpushes aTaskAssignmentMessage(containing theTask) to that worker.
- Sorts
-
Worker Availability (
isWorkerAvailable()): Checks if metrics for the worker exist, if itslast_heartbeatis recent (withinHEARTBEAT_TIMEOUT), and if its reportedutilizationis below a defined threshold (e.g., 80%), indicating it has capacity. -
Load Assessment (
assessLoadBalance()): Periodically scheduled viaqb::io::async::callback. Calculates and logs average worker utilization and current task queue sizes for monitoring purposes.
-
Role: Receives
TaskAssignmentMessages and simulates the execution of the assignedTask. -
QB Integration: Standard
qb::Actor. -
State Management:
_current_task,_metrics(its ownWorkerMetrics),_is_busyflag,_simulation_start_time,_busy_start_time. - **Initialization & Reporting (
onInit(),scheduleHeartbeat(),scheduleMetricsUpdate()):- In
onInit(), after receivingInitializeMessage, it starts two recurringqb::io::async::callbackchains:-
scheduleHeartbeat(): Periodically sends aWorkerHeartbeatMessage(containing its ID, current time, and busy status) to theTaskSchedulerActor. -
scheduleMetricsUpdate(): Periodically calculates its own utilization and other metrics, then sends aWorkerStatusMessageto theTaskSchedulerActor.
-
- In
- **Task Execution (
on(TaskAssignmentMessage&)):- If not already
_is_busy, accepts the task. - Sets
_is_busy = true, stores_current_task, updates the task's status toIN_PROGRESS. -
pushes aTaskStatusUpdateMessageback to theTaskSchedulerActor. -
Simulates Work: Schedules a call to its own
completeCurrentTask()method usingqb::io::async::callback. The delay for this callback is determined by thetask->complexity(usinggenerateProcessingTime()).
- If not already
- **Task Completion (
completeCurrentTask()- invoked by the scheduled callback):- Calculates simulated processing time.
- Randomly determines if the task succeeded or failed.
- Updates its internal
_metrics(total tasks processed, processing time, success/failure count). - Creates a
TaskResultobject. -
pushes aResultMessage(containing theTaskResult) to theResultCollectorActor. -
pushes a finalTaskStatusUpdateMessage(COMPLETED or FAILED) to theTaskSchedulerActor. - Sets
_is_busy = false, making itself available for new tasks.
-
Role: Receives
ResultMessages fromWorkerNodeActors and stores/logs the outcomes of completed tasks. -
QB Integration: Standard
qb::Actor. -
State Management:
_results(a map fromtask_idtoTaskResult). -
Event Handling:
-
on(InitializeMessage&): Activates the actor. -
on(ResultMessage&): Stores the receivedresultin its_resultsmap and logs the result to the console. -
on(ShutdownMessage&): Calculates and prints final statistics (overall success rate, average processing time across all tasks) based on the collected_results.
-
- Role: Sets up the entire simulation, starts the actors, monitors overall system performance, and initiates a graceful shutdown.
-
QB Integration: Standard
qb::Actor. - **Initialization (
onInit()andon(InitializeMessage&)):- Creates instances of all other actor types (
TaskGeneratorActor,TaskSchedulerActor,ResultCollectorActor, and a pool ofWorkerNodeActors), assigning them to different cores (qb::Main::addActor). - Stores their
ActorIds. -
pushes anInitializeMessageto all created actors to kickstart their operations. - Sends an initial
UpdateWorkersMessage(containing all worker IDs) to theTaskSchedulerActor.
- Creates instances of all other actor types (
- **Performance Monitoring (
schedulePerformanceReport()):- Uses
qb::io::async::callbackto periodically call itself (by pushing aSystemStatsMessageto its own ID after calculation). - Calculates system-wide statistics (e.g., total tasks generated, completed, failed; overall throughput) using global
std::atomiccounters (which are incremented by other actors likeTaskGeneratorActorandWorkerNodeActor). - Logs these system-wide stats.
- Uses
- **Simulation Shutdown (
shutdownSystem()):- Scheduled via
qb::io::async::callbackto run afterSIMULATION_DURATION_SECONDS. - Performs a final performance report.
-
broadcast<ShutdownMessage>()to all actors to signal them to stop their activities and terminate gracefully. - Finally, calls
qb::Main::stop()to stop the entire engine.
- Scheduled via
- Complex Multi-Actor Workflow: Illustrates a sophisticated pipeline with clear separation of concerns: generation, scheduling, execution, collection, and system-level monitoring.
-
Asynchronous Simulation of Work: Extensive use of
qb::io::async::callbackto simulate task processing times and to schedule periodic actions (heartbeats, metrics updates, report generation) without blocking anyVirtualCore. -
Centralized Coordination & State:
-
TaskSchedulerActor: Manages the central task queue and worker availability. -
ResultCollectorActor: Aggregates all final task outcomes. -
SystemMonitorActor: Orchestrates the setup and shutdown of the entire simulation.
-
-
Dynamic Task Scheduling & Basic Load Balancing: The
TaskSchedulerActoruses worker heartbeats and reported metrics (utilization) to make decisions about which worker should receive the next task, also prioritizing tasks from its queue. -
Worker Self-Monitoring and Reporting:
WorkerNodeActors proactively send heartbeats and status updates to the scheduler, enabling the scheduler to make informed decisions. -
Inter-Actor Event Design: Shows various custom event types (
TaskMessage,WorkerStatusMessage,ResultMessage,ShutdownMessage, etc.) designed to carry specific information between collaborating actors. -
Graceful Shutdown: A coordinated shutdown process initiated by the
SystemMonitorActorvia aShutdownMessage, allowing other actors to finalize their work and report statistics before the engine stops. -
Global Atomics for High-Level Stats: Use of
std::atomicglobal counters demonstrates a simple way to aggregate high-level system statistics from multiple concurrent actors with thread safety (though for more complex stats, a dedicated statistics actor would be preferable).
This distributed_computing example serves as a rich template for understanding how to structure larger, more intricate applications using the QB Actor Framework, particularly those involving distributed workflows and resource management.
(Next Example Analysis: file_monitor Example Analysis**)