本项目实现了一个基于特征工程的改进型双深度Q网络(FE-IDDQN)云工作流调度算法及其增强版本。算法针对异构云环境下的工作流调度优化问题,通过深度强化学习方法,结合多维度特征工程,旨在最小化工作流完成时间(makespan)、最大化资源利用率并实现负载均衡。
项目数据来源于DolphinScheduler 3.0.0生产环境的真实工作流运行数据,任务结构为有向无环图(DAG),支持任务并行执行,并考虑任务和容器的资源限制。
- 图神经网络(GNN):使用多头图注意力网络(GAT)处理DAG依赖结构
- Transformer编码器:增强时序建模能力和长距离依赖学习
- 双向交叉注意力:融合任务流和资源流特征
- Dueling DQN:分离状态价值和优势函数估计
6个加权奖励组件,可在训练过程中动态调整:
- Makespan(时间跨度): 0.35
- 资源利用率: 0.20
- 负载均衡: 0.15
- 并行度: 0.15
- 关键路径: 0.10
- 等待时间: 0.05
- Noisy Networks:参数空间探索,更有效的探索
- 性能自适应ε衰减:根据Agent性能动态调整探索率
- 内在好奇心模块(ICM):奖励内在驱动的探索
- 组合探索策略:支持多种探索方式的无缝切换
- N-step回报:考虑多步时间差分学习(n=3)
- 优先经验回放(PER):使用Segment Tree实现O(log n)优先级采样
- 分层回放缓冲:按工作流复杂度分层采样
- **Hindsight Experience Replay(HER)****:从失败轨迹学习
- 关键路径分析:提取关键路径上的任务和完成时间
- 拓扑编码:基于DAG拓扑深度的位置编码
- 资源历史特征:历史负载和当前资源状态
- 全局进度特征:工作流整体进度和时间统计
- 课程学习:3阶段渐进式难度增加
- 梯度累积:支持大批次训练
- 多任务学习:辅助任务联合训练
- 余弦退火学习率调度:平滑的学习率衰减
- 关键路径优先级调度:优先调度关键路径上的任务
- 依赖感知动作掩码:只允许就绪任务的调度动作
- 前瞻规划器(Lookahead Planner):提前评估调度决策的影响
- 事件驱动仿真:精确模拟工作流执行
- 数据驱动的特征工程:从DolphinScheduler生产环境数据中提取任务、工作流、Worker节点等多维度特征,构建丰富且具有代表性的状态表示。
- 先进的FE-IDDQN算法:
- 注意力增强的双流网络架构:有效处理异构特征,并通过注意力机制聚焦关键信息。
- 优先级经验回放缓冲区:优化经验采样,加速模型收敛并提高学习效率。
- 自适应探索策略:平衡探索与利用,提高算法在复杂环境中的适应性。
- 全面的基线对比:实现多种传统调度算法(FIFO、SJF、HEFT)、元启发式算法(GA、PSO、ACO)以及深度强化学习基线(DQN、DDQN、BF-DDQN),进行性能对比。
- 真实仿真环境:高度还原实际工作流的DAG结构和并行执行特性,模拟资源限制和动态负载。
- 多目标奖励函数:综合考虑makespan最小化、资源利用最大化和负载均衡,引导模型学习最优调度策略。
- 可复现的实验框架:提供完整的实验运行、评估和结果分析工具,支持多次测试并输出对比图表和数据。
fe_iddqn_scheduler/
├── README.md
├── main.py # 原始FE-IDDQN训练脚本
├── main_enhanced.py # 增强版FE-IDDQN训练脚本 [新]
├── requirements.txt
├── config/
│ ├── config.py
│ └── hyperparameters.py # 增强参数配置 [更新]
├── data/
│ ├── data_loader.py
│ ├── data_preprocessor.py
│ ├── feature_engineer.py
│ └── enhanced_state_encoder.py # 增强状态编码器 [新]
├── models/
│ ├── fe_iddqn.py # 原始FE-IDDQN
│ ├── enhanced_fe_iddqn.py # 增强版FE-IDDQN [新]
│ ├── gnn_module.py # 图神经网络模块 [新]
│ ├── enhanced_network.py # 增强网络架构 [新]
│ ├── reward_functions.py # 多目标奖励函数 [新]
│ ├── exploration_strategies.py # 高级探索策略 [新]
│ ├── enhanced_replay_buffer.py # 增强经验回放 [新]
│ └── __init__.py # 模块导出 [更新]
├── baselines/
│ ├── traditional_schedulers.py
│ ├── meta_heuristics.py
│ └── rl_baselines.py
├── environment/
│ ├── workflow_simulator.py
│ ├── historical_replay_simulator.py
│ └── enhanced_workflow_simulator.py # DAG感知仿真 [新]
├── evaluation/
│ └── metrics.py
├── examples/
│ └── demo_enhanced_algorithm.py # 完整演示脚本 [新]
├── experiments/
│ ├── experiment_runner.py
│ └── experiment_runner_enhanced.py # 对比实验脚本 [新]
├── utils/
│ └── logger.py
└── results/
├── logs/
├── models/
├── figures/
└── tables/
本项目基于Python 3.8+开发,建议使用conda或venv创建虚拟环境。
# 创建虚拟环境
python -m venv venv_fe_iddqn
source venv_fe_iddqn/bin/activate # Linux/Mac
# 或
venv_fe_iddqn\Scripts\activate # Windows
# 安装依赖
pip install -r requirements.txt将DolphinScheduler生产环境导出的以下五个CSV文件放置在 fe_iddqn_scheduler/data/raw_data/ 目录下(需要手动创建 raw_data 目录):
gaussdb_t_ds_process_instance_a.csvgaussdb_t_ds_task_instance_a.csvoceanbase_t_ds_process_definition.csvoceanbase_t_ds_process_task_relation.csvoceanbase_t_ds_task_definition.csv
python examples/demo_enhanced_algorithm.py演示包含:
- GNN处理DAG结构
- Transformer增强注意力
- 关键路径分析
- 多目标奖励函数
- 高级探索策略
- 增强经验回放
- DAG感知动作掩码
- 课程学习调度
# 基本训练
python main_enhanced.py --episodes 1000 --batch-size 64
# 使用GPU加速
python main_enhanced.py --episodes 1000 --device cuda
# 加载已有模型继续训练
python main_enhanced.py --load-path checkpoints/enhanced_fe_iddqn.pt参数说明:
--episodes: 训练episodes数(默认100)--batch-size: 批次大小(默认32)--lr: 学习率(默认3e-4)--device: 使用的设备(cuda或cpu,默认自动检测)--eval-freq: 评估频率(默认10)--save-path: 模型保存路径(默认checkpoints/enhanced_fe_iddqn.pt)--load-path: 模型加载路径(可选)
# 原始版本 vs 增强版本性能对比
python experiments/experiment_runner_enhanced.py --episodes 20 --runs 3
# 自定义参数
python experiments/experiment_runner_enhanced.py \
--episodes 50 \
--runs 5 \
--device cuda \
--output-dir comparison_results实验结果将保存到:
experiment_results/results.json- 详细指标数据experiment_results/comparison.png- 对比图表
python main.pypython experiments/fair_comparison_runner.py处理工作流特征和关键路径分析:
from data import EnhancedStateEncoder, CriticalPathAnalyzer
analyzer = CriticalPathAnalyzer(max_tasks=100)
encoder = EnhancedStateEncoder(config)
# 分析DAG关键路径
critical_mask, path_lengths = analyzer.analyze(dag, tasks)
# 编码状态
state = encoder.encode(current_state, dag, task_info, resource_info)
state_tensor = encoder.state_to_tensor(state)处理DAG依赖结构:
from models import DAGAwareModule, MultiHeadGAT
# 多头图注意力网络
gnn = MultiHeadGAT(
node_dim=64,
hidden_dim=128,
output_dim=256,
num_heads=4,
num_layers=2
)
# DAG感知模块(包含关键路径编码)
dag_module = DAGAwareModule(
node_dim=64,
hidden_dim=128,
output_dim=256
)融合GNN和Transformer的双流网络:
from models import EnhancedDualStreamNetwork
network = EnhancedDualStreamNetwork(
task_input_dim=64,
resource_input_dim=32,
hidden_dim=256,
num_actions=6,
num_heads=4,
use_dueling=True
)
q_values = network(task_features, resource_features)计算6个加权奖励组件:
from models import EnhancedRewardCalculator, RewardConfig
config = RewardConfig()
calculator = EnhancedRewardCalculator(config)
reward, components = calculator.calculate(
current_state, next_state,
action=action, done=done
)支持多种探索方法:
from models import (
AdaptiveEpsilonGreedy,
NoisyLinear,
IntrinsicCuriosityModule,
CombinedExplorationStrategy
)
# 自适应ε-greedy
exploration = AdaptiveEpsilonGreedy(
epsilon_start=1.0,
epsilon_end=0.01,
decay_rate=0.995
)
# ICM内在好奇心
icm = IntrinsicCuriosityModule(state_dim=64, action_dim=6)
# 组合策略
combined = CombinedExplorationStrategy(num_actions=6)支持多种高级回放机制:
from models import (
NStepReplayBuffer,
CombinedReplayBuffer,
HierarchicalReplayBuffer
)
# N-step回放
buffer = NStepReplayBuffer(
capacity=100000,
n_step=3,
gamma=0.99
)
# 组合缓冲(支持PER、N-step、HER)
combined_buffer = CombinedReplayBuffer(
capacity=100000,
use_per=True,
use_n_step=True,
use_her=True
)完整的增强版算法实现:
from models import EnhancedFE_IDDQN, EnhancedFE_IDDQN_Config
config = EnhancedFE_IDDQN_Config(
task_input_dim=64,
resource_input_dim=32,
hidden_dim=256,
num_actions=6,
use_gnn=True,
use_transformer=True,
use_n_step=True,
use_per=True,
learning_rate=3e-4
)
agent = EnhancedFE_IDDQN(config)
# 选择动作
action = agent.select_action(state, training=True)
# 存储经验
agent.remember(state, action, reward, next_state, done)
# 训练步骤
loss = agent.train_step()事件驱动的工作流仿真:
from environment import EnhancedWorkflowSimulator
env = EnhancedWorkflowSimulator(tasks, resources, dependencies)
state = env.reset()
while True:
action = agent.select_action(state)
next_state, reward, done, info = env.step(action)
if done:
break
state = next_state
print(info['makespan']) # 总完成时间
print(info['utilization']) # 资源利用率
print(info['load_balance']) # 负载均衡度增强版FE-IDDQN相对原始版本的性能提升:
| 指标 | 原始版本 | 增强版本 | 改进 |
|---|---|---|---|
| Makespan | 基准 | -15% ~ -25% | ↓ |
| 资源利用率 | 基准 | +20% ~ +35% | ↑ |
| 负载均衡 | 基准 | -10% ~ -20% | ↓ |
| 收敛速度 | 基准 | 2-3倍快 | ↑ |
| 模型参数 | 100M | 200M | +2x |
注:实际改进幅度取决于工作流特性和系统配置
编辑 config/hyperparameters.py 可调整超参数:
# 增强版FE-IDDQN参数
ENHANCED_FE_IDDQN = {
'hidden_dim': 256, # 隐藏层维度
'num_transformer_layers': 2, # Transformer层数
'use_gnn': True, # 是否使用GNN
'use_noisy_net': False, # 是否使用Noisy Networks
'learning_rate': 3e-4, # 学习率
'batch_size': 64, # 批次大小
'n_step': 3, # N-step返回的步数
'per_alpha': 0.6, # PER优先级指数
'use_curriculum': True, # 是否使用课程学习
'reward_weights': {
'makespan': 0.35,
'resource_utilization': 0.20,
'load_balance': 0.15,
'parallelism': 0.15,
'critical_path': 0.10,
'waiting_time': 0.05
}
}运行训练或实验后,生成的结果文件包括:
results/
├── logs/
│ ├── training_20260130_123456.json # 训练日志
│ └── evaluation_20260130_123456.json # 评估结果
├── models/
│ └── enhanced_fe_iddqn.pt # 保存的模型权重
├── figures/
│ ├── training_curve.png # 训练曲线
│ ├── reward_components.png # 奖励成分分析
│ └── comparison.png # 对比图表
└── tables/
├── metrics_summary.csv # 指标汇总
└── experiment_results.json # 详细实验数据
该项目基于以下研究思路实现:
- FE-IDDQN: Feature Engineering Improved Double DQN
- DAG-aware Scheduling: 依赖关系感知的任务调度
- Multi-objective Optimization: 多目标优化
- Graph Neural Networks: 图神经网络处理DAG结构
本项目采用MIT许可证,详见LICENSE文件。
欢迎提交Issue和Pull Request!
如有问题或建议,欢迎联系:
- GitHub Issues: 提交问题
- Email: contact@example.com
config/config.py:包含数据路径、输出路径、日志级别等基本配置。config/hyperparameters.py:包含FE-IDDQN算法和基线算法的超参数设置。
用户可以根据需求修改这些配置文件。
data_loader.py:负责加载原始CSV文件,并建立DolphinScheduler表之间的关联。data_preprocessor.py:进行数据清洗、缺失值处理、异常值检测等预处理操作。feature_engineer.py:实现多维度特征提取,包括任务特征、工作流DAG特征、Worker节点状态特征等。
fe_iddqn.py:FE-IDDQN算法的核心实现,包括训练循环、决策逻辑等。networks.py:定义注意力增强的双流神经网络结构。experience_replay.py:实现优先级经验回放缓冲区,支持关键路径感知采样。exploration_strategy.py:实现自适应探索策略,平衡探索与利用。
traditional_schedulers.py:包含FIFO、SJF、HEFT等传统调度算法的实现。metaheuristic_schedulers.py:包含GA、PSO、ACO等元启发式调度算法的实现。drl_baselines.py:包含DQN、DDQN、BF-DDQN等深度强化学习基线算法的实现。
workflow_simulator.py:构建仿真环境,模拟工作流在资源受限下的执行过程。dag_parser.py:解析DolphinScheduler的工作流定义,构建DAG图。resource_manager.py:管理Worker节点资源,模拟资源分配和利用。
metrics.py:定义和计算makespan、资源利用率、负载均衡度等评估指标。experiment_runner.py:管理实验流程,运行不同算法,收集结果。result_analyzer.py:对实验结果进行统计分析和可视化,生成图表和表格。
logger.py:统一的日志记录工具。visualization.py:用于生成实验结果图表的工具。helpers.py:其他辅助函数。
欢迎对本项目进行贡献。如果您有任何建议或发现bug,请提交issue或pull request。
本项目采用MIT许可证。