Skip to content

Commit a8eded6

Browse files
committed
feat(algorithms, intervals): task scheduler problem
1 parent 3bbb69d commit a8eded6

File tree

7 files changed

+252
-0
lines changed

7 files changed

+252
-0
lines changed

algorithms/intervals/__init__.py

Whitespace-only changes.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Task Scheduler
2+
3+
You are given an array of CPU tasks represented by uppercase letters (A to Z) and an integer n, which denotes the
4+
cooling period required between any two identical tasks. Each task takes exactly one CPU interval to execute. Therefore,
5+
each CPU interval can either perform a task or remain idle. Tasks can be executed in any order, but the same task must
6+
be separated by at least n intervals.
7+
8+
Determine the minimum number of CPU intervals required to complete all tasks.
9+
10+
**Constraints**
11+
12+
- 1 ≤ `tasks.length` ≤ 1000
13+
- 0 ≤ `n` ≤ 100
14+
- `tasks` consists of uppercase English letters
15+
16+
## Examples
17+
18+
![Example 1](./images/examples/task_scheduler_example_1.png)
19+
![Example 2](./images/examples/task_scheduler_example_2.png)
20+
![Example 3](./images/examples/task_scheduler_example_3.png)
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
from typing import List
2+
from collections import Counter
3+
from heapq import heapify, heappop, heappush
4+
5+
6+
def least_intervals_mathematical(tasks: List[str], n: int) -> int:
7+
"""
8+
The time complexity of is O(N), where N is the number of tasks:
9+
Counter(tasks) takes O(N) time.
10+
The rest of the calculation is constant time, O(1), because the number of unique tasks (A-Z) is fixed at 26, meaning
11+
task_count.values() is at most 26 elements long.
12+
13+
This greedy approach, which relies on the mathematical structure imposed by the most frequent task, is the most
14+
efficient way to solve the Task Scheduler problem.
15+
"""
16+
# get the total number of tasks
17+
total_tasks = len(tasks)
18+
19+
# Step 1: Use Counter to get all frequencies
20+
task_count = Counter(tasks)
21+
if not task_count:
22+
return 0
23+
24+
# most_common() returns a list of (task, frequency) tuples
25+
# most_common(1) gives the highest frequency task: [('A', 3)] -> max_freq = 3
26+
max_freq = task_count.most_common(1)[0][1]
27+
28+
# Step 2: Calculate how many tasks share the max frequency
29+
# This is slightly more efficient than looping through the full dictionary
30+
# because it stops checking once frequencies drop below max_freq.
31+
max_freq_count = sum(1 for freq in task_count.values() if freq == max_freq)
32+
33+
# Step 3: Calculate the least number of intervals
34+
result = (max_freq - 1) * (n + 1) + max_freq_count
35+
36+
# Step 4: Return the maximum of result and total tasks
37+
return max(result, total_tasks)
38+
39+
40+
def least_intervals_with_max_heap(tasks: List[str], n: int) -> int:
41+
"""
42+
Calculates the minimum CPU intervals required using an iterative Max Heap approach.
43+
44+
This Max Heap solution is slightly less performant than the mathematical one, but it explicitly models the idle time
45+
(slots where max_heap is empty mid-cycle).
46+
47+
Max Heap Complexity: O(T⋅KlogK), where T is the total number of intervals (can be up to N⋅n), and K is the number
48+
of unique tasks (at most 26).
49+
"""
50+
# 1. Count Frequencies
51+
task_count = Counter(tasks)
52+
if not task_count:
53+
return 0
54+
55+
time = 0
56+
# 2. Setup Max Heap (store negative frequencies for a Python Min Heap)
57+
# The task with the highest frequency will have the smallest negative value (e.g., -3).
58+
max_heap = [-freq for freq in task_count.values()]
59+
heapify(max_heap)
60+
61+
# Loop continues until all tasks are executed (heap is empty)
62+
while max_heap:
63+
temp_list = []
64+
cycle_len = 0
65+
66+
# Execute tasks for one cooling cycle (n + 1 slots)
67+
# We process up to n + 1 tasks, always picking the highest frequency one available.
68+
for _ in range(n + 1):
69+
if max_heap:
70+
# Pop the highest frequency task
71+
frequency = -heappop(max_heap)
72+
73+
if frequency > 1:
74+
# If the task needs to run again, store the remaining count
75+
temp_list.append(frequency - 1)
76+
77+
cycle_len += 1
78+
else:
79+
# Stop iterating slots if the heap is empty, though time will still advance.
80+
break
81+
82+
# 3. Re-insert Tasks
83+
# Push the remaining counts back onto the heap (as negative values)
84+
for freq in temp_list:
85+
heappush(max_heap, -freq)
86+
87+
# 4. Time Calculation
88+
if max_heap:
89+
# If the heap is NOT empty, it means there are still tasks remaining.
90+
# We must wait for the full cooling period, so we advance time by n + 1.
91+
time += n + 1
92+
else:
93+
# If the heap IS empty, it means all tasks were completed in this cycle.
94+
# We only count the intervals actually used (cycle_len).
95+
time += cycle_len
96+
97+
return time
71.3 KB
Loading
80.7 KB
Loading
102 KB
Loading
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import unittest
2+
from typing import List
3+
from parameterized import parameterized
4+
from algorithms.intervals.task_scheduler import (
5+
least_intervals_mathematical,
6+
least_intervals_with_max_heap,
7+
)
8+
9+
10+
class LeastIntervalsTestCase(unittest.TestCase):
11+
@parameterized.expand(
12+
[
13+
(["A", "A", "B", "B"], 2, 5),
14+
(["A", "B", "A", "A", "B", "C"], 3, 9),
15+
(["A", "C", "B", "A"], 0, 4),
16+
(["A", "A", "A", "B", "B", "C", "C"], 1, 7),
17+
(["S", "I", "V", "U", "W", "D", "U", "X"], 0, 8),
18+
(
19+
[
20+
"A",
21+
"K",
22+
"X",
23+
"M",
24+
"W",
25+
"D",
26+
"X",
27+
"B",
28+
"D",
29+
"C",
30+
"O",
31+
"Z",
32+
"D",
33+
"E",
34+
"Q",
35+
],
36+
3,
37+
15,
38+
),
39+
(
40+
[
41+
"A",
42+
"B",
43+
"C",
44+
"O",
45+
"Q",
46+
"C",
47+
"Z",
48+
"O",
49+
"X",
50+
"C",
51+
"W",
52+
"Q",
53+
"Z",
54+
"B",
55+
"M",
56+
"N",
57+
"R",
58+
"L",
59+
"C",
60+
"J",
61+
],
62+
10,
63+
34,
64+
),
65+
]
66+
)
67+
def test_least_intervals_mathematical(
68+
self, tasks: List[str], n: int, expected: int
69+
):
70+
actual = least_intervals_mathematical(tasks, n)
71+
self.assertEqual(expected, actual)
72+
73+
@parameterized.expand(
74+
[
75+
(["A", "A", "B", "B"], 2, 5),
76+
(["A", "B", "A", "A", "B", "C"], 3, 9),
77+
(["A", "C", "B", "A"], 0, 4),
78+
(["A", "A", "A", "B", "B", "C", "C"], 1, 7),
79+
(["S", "I", "V", "U", "W", "D", "U", "X"], 0, 8),
80+
(
81+
[
82+
"A",
83+
"K",
84+
"X",
85+
"M",
86+
"W",
87+
"D",
88+
"X",
89+
"B",
90+
"D",
91+
"C",
92+
"O",
93+
"Z",
94+
"D",
95+
"E",
96+
"Q",
97+
],
98+
3,
99+
15,
100+
),
101+
(
102+
[
103+
"A",
104+
"B",
105+
"C",
106+
"O",
107+
"Q",
108+
"C",
109+
"Z",
110+
"O",
111+
"X",
112+
"C",
113+
"W",
114+
"Q",
115+
"Z",
116+
"B",
117+
"M",
118+
"N",
119+
"R",
120+
"L",
121+
"C",
122+
"J",
123+
],
124+
10,
125+
34,
126+
),
127+
]
128+
)
129+
def test_least_intervals_max_heap(self, tasks: List[str], n: int, expected: int):
130+
actual = least_intervals_with_max_heap(tasks, n)
131+
self.assertEqual(expected, actual)
132+
133+
134+
if __name__ == "__main__":
135+
unittest.main()

0 commit comments

Comments
 (0)