|
1 | 1 | package net.hawkengine.core.pipelinescheduler; |
2 | 2 |
|
3 | | -import net.hawkengine.model.Agent; |
4 | | -import net.hawkengine.model.Job; |
| 3 | +import net.hawkengine.model.*; |
5 | 4 | import net.hawkengine.model.enums.JobStatus; |
| 5 | +import net.hawkengine.model.enums.NotificationType; |
| 6 | +import net.hawkengine.model.enums.StageStatus; |
| 7 | +import net.hawkengine.model.enums.Status; |
| 8 | +import net.hawkengine.services.AgentService; |
| 9 | +import net.hawkengine.services.PipelineService; |
| 10 | +import net.hawkengine.services.interfaces.IAgentService; |
| 11 | +import net.hawkengine.services.interfaces.IPipelineService; |
| 12 | +import net.hawkengine.ws.EndpointConnector; |
6 | 13 | import org.apache.log4j.Logger; |
7 | 14 |
|
8 | | -import java.util.ArrayList; |
9 | 15 | import java.util.List; |
| 16 | +import java.util.stream.Collectors; |
10 | 17 |
|
11 | 18 | public class JobAssignerService { |
12 | 19 | private static final Logger LOGGER = Logger.getLogger(JobAssignerService.class.getName()); |
13 | 20 |
|
14 | | - public Agent assignAgentToJob(Job job, List<Agent> agents) { |
15 | | - Agent result = null; |
16 | | - if (job.getStatus() == JobStatus.SCHEDULED) { |
17 | | - Agent assignedAgent = agents.stream().filter(a -> a.getId().equals(job.getAssignedAgentId())).findFirst().orElse(null); |
18 | | - result = assignedAgent; |
19 | | - boolean isEligible = this.isAgentEligibleForJob(job, assignedAgent); |
20 | | - if (!isEligible) { |
21 | | - job.setStatus(JobStatus.AWAITING); |
22 | | - assignedAgent.setAssigned(false); |
23 | | - result = assignedAgent; |
24 | | - LOGGER.info(String.format("Job %s unassigned from Agent %s", job.getJobDefinitionName(), assignedAgent.getName())); |
25 | | - } |
26 | | - } |
27 | | - |
28 | | - if (job.getStatus() == JobStatus.AWAITING) { |
29 | | - List<Agent> eligibleAgents = this.getEligibleAgentsForJob(job, agents); |
30 | | - Agent agentForJob = this.pickMostSuitableAgent(eligibleAgents); |
31 | | - if (agentForJob != null) { |
32 | | - job.setAssignedAgentId(agentForJob.getId()); |
33 | | - job.setStatus(JobStatus.SCHEDULED); |
34 | | - agentForJob.setAssigned(true); |
35 | | - result = agentForJob; |
36 | | - LOGGER.info(String.format("Job %s assigned to Agent %s", job.getJobDefinitionName(), agentForJob.getName())); |
37 | | - } |
38 | | - } |
| 21 | + private IAgentService agentService; |
| 22 | + private IPipelineService pipelineService; |
| 23 | + private JobAssignerUtilities jobAssignerUtilities; |
39 | 24 |
|
40 | | - return result; |
| 25 | + public JobAssignerService() { |
| 26 | + this.agentService = new AgentService(); |
| 27 | + this.pipelineService = new PipelineService(); |
| 28 | + this.jobAssignerUtilities = new JobAssignerUtilities(); |
41 | 29 | } |
42 | 30 |
|
43 | | - public List<Agent> getEligibleAgentsForJob(Job job, List<Agent> agents) { |
44 | | - List<Agent> eligibleAgents = new ArrayList<>(); |
45 | | - for (Agent agent : agents) { |
46 | | - boolean isEligible = this.isAgentEligibleForJob(job, agent); |
| 31 | + public void checkUnassignedJobs(List<Agent> agents) { |
| 32 | + List<Agent> filteredAgents = agents.stream().filter(a -> a.isConnected() && a.isEnabled()).collect(Collectors.toList()); |
| 33 | + List<Pipeline> pipelinesInProgress = (List<Pipeline>) this.pipelineService.getAllPreparedPipelinesInProgress().getObject(); |
| 34 | + for (Pipeline pipeline : pipelinesInProgress) { |
| 35 | + boolean isSetToAwaiting = false; |
| 36 | + Stage stageInProgress = pipeline.getStages().stream().filter(s -> s.getStatus() == StageStatus.IN_PROGRESS).findFirst().orElse(null); |
| 37 | + if (stageInProgress == null) { |
| 38 | + continue; |
| 39 | + } |
47 | 40 |
|
48 | | - if (isEligible) { |
49 | | - eligibleAgents.add(agent); |
| 41 | + for (Job job : stageInProgress.getJobs()) { |
| 42 | + if (job.getStatus() == JobStatus.UNASSIGNED) { |
| 43 | + boolean hasAssignableAgent = this.jobAssignerUtilities.hasAssignableAgent(job, filteredAgents); |
| 44 | + if (!hasAssignableAgent) { |
| 45 | + job.setStatus(JobStatus.AWAITING); |
| 46 | + isSetToAwaiting = true; |
| 47 | + LOGGER.info(String.format("Job %s has no assignable Agents.", job.getJobDefinitionName())); |
| 48 | + } |
| 49 | + } |
50 | 50 | } |
51 | | - } |
52 | 51 |
|
53 | | - return eligibleAgents; |
| 52 | + if (isSetToAwaiting) { |
| 53 | + stageInProgress.setStatus(StageStatus.AWAITING); |
| 54 | + pipeline.setStatus(Status.AWAITING); |
| 55 | + this.pipelineService.update(pipeline); |
| 56 | + String message = String.format("Pipeline %s set to AWAITING.", pipeline.getPipelineDefinitionName()); |
| 57 | + LOGGER.info(message); |
| 58 | + ServiceResult notification = new ServiceResult(null, NotificationType.WARNING, message); |
| 59 | + EndpointConnector.passResultToEndpoint("NotificationService", "sendMessage", notification); |
| 60 | + } |
| 61 | + } |
54 | 62 | } |
55 | 63 |
|
56 | | - public Agent pickMostSuitableAgent(List<Agent> agents) { |
57 | | - Agent agentForJob = null; |
58 | | - if (agents.size() == 1) { |
59 | | - agentForJob = agents.get(0); |
60 | | - } else if (agents.size() > 1) { |
61 | | - int numberOfResources = Integer.MAX_VALUE; |
62 | | - for (Agent agent : agents) { |
63 | | - if (agent.getResources().size() < numberOfResources) { |
64 | | - numberOfResources = agent.getResources().size(); |
65 | | - agentForJob = agent; |
| 64 | + public void checkAwaitingJobs(List<Agent> agents) { |
| 65 | + List<Agent> filteredAgents = agents.stream().filter(a -> a.isConnected() && a.isEnabled()).collect(Collectors.toList()); |
| 66 | + List<Pipeline> awaitingPipelines = (List<Pipeline>) this.pipelineService.getAllPreparedAwaitingPipelines().getObject(); |
| 67 | + for (Pipeline pipeline : awaitingPipelines) { |
| 68 | + Stage awaitingStage = pipeline.getStages().stream().filter(s -> s.getStatus() == StageStatus.AWAITING).findFirst().orElse(null); |
| 69 | + if (awaitingStage == null) { |
| 70 | + continue; |
| 71 | + } |
| 72 | + |
| 73 | + for (Job job : awaitingStage.getJobs()) { |
| 74 | + if (job.getStatus() == JobStatus.AWAITING) { |
| 75 | + boolean hasAssignableAgent = this.jobAssignerUtilities.hasAssignableAgent(job, filteredAgents); |
| 76 | + if (hasAssignableAgent) { |
| 77 | + job.setStatus(JobStatus.UNASSIGNED); |
| 78 | + LOGGER.info(String.format("Job %s set back to IN_PROGRESS.", job.getJobDefinitionName())); |
| 79 | + } |
66 | 80 | } |
67 | 81 | } |
68 | | - } |
69 | 82 |
|
70 | | - return agentForJob; |
| 83 | + boolean hasAwaitingJobs = awaitingStage.getJobs().stream().anyMatch(j -> j.getStatus() == JobStatus.AWAITING); |
| 84 | + if (!hasAwaitingJobs) { |
| 85 | + awaitingStage.setStatus(StageStatus.IN_PROGRESS); |
| 86 | + pipeline.setStatus(Status.IN_PROGRESS); |
| 87 | + this.pipelineService.update(pipeline); |
| 88 | + LOGGER.info(String.format("Pipeline %s set back to IN_PROGRESS.", pipeline.getPipelineDefinitionName())); |
| 89 | + } |
| 90 | + } |
71 | 91 | } |
72 | 92 |
|
73 | | - public boolean isAgentEligibleForJob(Job job, Agent agent) { |
74 | | - boolean isEligible = true; |
75 | | - if ((agent == null) || !agent.isConnected() || !agent.isEnabled() || agent.isRunning() || agent.isAssigned()) { |
76 | | - isEligible = false; |
77 | | - } else { |
78 | | - for (String resource : job.getResources()) { |
79 | | - if (!(agent.getResources().contains(resource))) { |
80 | | - isEligible = false; |
81 | | - break; |
| 93 | + public void assignJobs(List<Agent> agents) { |
| 94 | + List<Agent> filteredAgents = agents.stream().filter(a -> a.isConnected() && a.isEnabled() && !a.isRunning() && !a.isAssigned()).collect(Collectors.toList()); |
| 95 | + List<Pipeline> pipelines = (List<Pipeline>) this.pipelineService.getAllPreparedPipelinesInProgress().getObject(); |
| 96 | + for (Pipeline pipeline : pipelines) { |
| 97 | + for (Stage stage : pipeline.getStages()) { |
| 98 | + if (stage.getStatus() == StageStatus.IN_PROGRESS) { |
| 99 | + for (Job job : stage.getJobs()) { |
| 100 | + if (filteredAgents.size() != 0) { |
| 101 | + Agent agent = this.jobAssignerUtilities.assignAgentToJob(job, filteredAgents); |
| 102 | + if (agent != null) { |
| 103 | + ServiceResult result = this.agentService.update(agent); |
| 104 | + EndpointConnector.passResultToEndpoint(AgentService.class.getSimpleName(), "update", result); |
| 105 | + } |
| 106 | + } |
| 107 | + } |
82 | 108 | } |
83 | 109 | } |
84 | | - } |
85 | 110 |
|
86 | | - return isEligible; |
| 111 | + this.pipelineService.update(pipeline); |
| 112 | +// EndpointConnector.passResultToEndpoint(PipelineService.class.getSimpleName(), "update", result); |
| 113 | + } |
87 | 114 | } |
88 | 115 | } |
0 commit comments