Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
93 changes: 93 additions & 0 deletions operating_system/process_synchronization/semaphore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from threading import Event, Thread, current_thread
from time import sleep


class CustomSemaphore:
"""
Semaphore class to control the access to shared resources.

>>> semaphore = CustomSemaphore(2)
>>> semaphore.permits
2
>>> event = Event()
>>> semaphore.acquire(event, "Thread-1")
Thread-1 has acquired a resource.
>>> semaphore.acquire(event, "Thread-2")
Thread-2 has acquired a resource.
>>> semaphore.release()
A resource has been released.
>>> semaphore.release()
A resource has been released.
"""

def __init__(self, permits: int) -> None:
self.permits: int = permits
self.waiting_queue: list[str] = []
self.event_queue: list[Event] = []

def acquire(self, event: Event, thread_name: str) -> None:
self.permits -= 1
if self.permits < 0:
self.event_queue.append(event)
self.waiting_queue.append(thread_name)
print(f"{thread_name} is waiting for a resource.")
event.wait() # Thread waits until a resource is released
else:
print(f"{thread_name} has acquired a resource.")

def release(self) -> None:
self.permits += 1
if self.permits <= 0 and self.waiting_queue:
event = self.event_queue.pop(0)
self.waiting_queue.pop(0)
event.set() # Release the waiting thread
print("A waiting thread has been released.")
else:
print("A resource has been released.")


class ThreadManager:
"""
A class to manage the creation and execution of threads using the custom semaphore.
>>> semaphore = CustomSemaphore(2)
>>> manager = ThreadManager(semaphore, 2)
>>> manager.start_threads()
Thread-1 (thread_task) has acquired a resource.
Thread 1 is working with the resource.
Thread-2 (thread_task) has acquired a resource.
Thread 2 is working with the resource.
>>> sleep(4)
Thread 1 has finished and is releasing the resource.
A resource has been released.
Thread 2 has finished and is releasing the resource.
A resource has been released.
"""

def __init__(self, semaphore: CustomSemaphore, thread_count: int) -> None:
self.semaphore: CustomSemaphore = semaphore
self.thread_count: int = thread_count

def start_threads(self) -> None:
for i in range(self.thread_count):
thread = Thread(target=self.thread_task, args=(i + 1,))
thread.start()

def thread_task(self, thread_id: int) -> None:
# Task executed by each thread
event = Event()
thread_name = current_thread().name
self.semaphore.acquire(event, thread_name)
print(f"Thread {thread_id} is working with the resource.")
sleep(2) # Simulate some work
print(f"Thread {thread_id} has finished and is releasing the resource.")
self.semaphore.release()


if __name__ == "__main__":
resource_count = int(input("Enter the number of resources: "))
thread_count = int(input("Enter the number of threads: "))

semaphore = CustomSemaphore(resource_count)
thread_manager = ThreadManager(semaphore, thread_count)

thread_manager.start_threads()
Empty file.
Original file line number Diff line number Diff line change
@@ -1,67 +1,67 @@
"""
Round Robin is a scheduling algorithm.
In Round Robin each process is assigned a fixed time slot in a cyclic way.
https://en.wikipedia.org/wiki/Round-robin_scheduling
"""
from __future__ import annotations
from statistics import mean
def calculate_waiting_times(burst_times: list[int]) -> list[int]:
"""
Calculate the waiting times of a list of processes that have a specified duration.
Return: The waiting time for each process.
>>> calculate_waiting_times([10, 5, 8])
[13, 10, 13]
>>> calculate_waiting_times([4, 6, 3, 1])
[5, 8, 9, 6]
>>> calculate_waiting_times([12, 2, 10])
[12, 2, 12]
"""
quantum = 2
rem_burst_times = list(burst_times)
waiting_times = [0] * len(burst_times)
t = 0
while True:
done = True
for i, burst_time in enumerate(burst_times):
if rem_burst_times[i] > 0:
done = False
if rem_burst_times[i] > quantum:
t += quantum
rem_burst_times[i] -= quantum
else:
t += rem_burst_times[i]
waiting_times[i] = t - burst_time
rem_burst_times[i] = 0
if done is True:
return waiting_times
def calculate_turn_around_times(
burst_times: list[int], waiting_times: list[int]
) -> list[int]:
"""
>>> calculate_turn_around_times([1, 2, 3, 4], [0, 1, 3])
[1, 3, 6]
>>> calculate_turn_around_times([10, 3, 7], [10, 6, 11])
[20, 9, 18]
"""
return [burst + waiting for burst, waiting in zip(burst_times, waiting_times)]
if __name__ == "__main__":
burst_times = [3, 5, 7]
waiting_times = calculate_waiting_times(burst_times)
turn_around_times = calculate_turn_around_times(burst_times, waiting_times)
print("Process ID \tBurst Time \tWaiting Time \tTurnaround Time")
for i, burst_time in enumerate(burst_times):
print(
f" {i + 1}\t\t {burst_time}\t\t {waiting_times[i]}\t\t "
f"{turn_around_times[i]}"
)
print(f"\nAverage waiting time = {mean(waiting_times):.5f}")
print(f"Average turn around time = {mean(turn_around_times):.5f}")
"""
Round Robin is a scheduling algorithm.
In Round Robin each process is assigned a fixed time slot in a cyclic way.
https://en.wikipedia.org/wiki/Round-robin_scheduling
"""

from __future__ import annotations

from statistics import mean


def calculate_waiting_times(burst_times: list[int]) -> list[int]:
"""
Calculate the waiting times of a list of processes that have a specified duration.

Return: The waiting time for each process.
>>> calculate_waiting_times([10, 5, 8])
[13, 10, 13]
>>> calculate_waiting_times([4, 6, 3, 1])
[5, 8, 9, 6]
>>> calculate_waiting_times([12, 2, 10])
[12, 2, 12]
"""
quantum = 2
rem_burst_times = list(burst_times)
waiting_times = [0] * len(burst_times)
t = 0
while True:
done = True
for i, burst_time in enumerate(burst_times):
if rem_burst_times[i] > 0:
done = False
if rem_burst_times[i] > quantum:
t += quantum
rem_burst_times[i] -= quantum
else:
t += rem_burst_times[i]
waiting_times[i] = t - burst_time
rem_burst_times[i] = 0
if done is True:
return waiting_times


def calculate_turn_around_times(
burst_times: list[int], waiting_times: list[int]
) -> list[int]:
"""
>>> calculate_turn_around_times([1, 2, 3, 4], [0, 1, 3])
[1, 3, 6]
>>> calculate_turn_around_times([10, 3, 7], [10, 6, 11])
[20, 9, 18]
"""
return [burst + waiting for burst, waiting in zip(burst_times, waiting_times)]


if __name__ == "__main__":
burst_times = [3, 5, 7]
waiting_times = calculate_waiting_times(burst_times)
turn_around_times = calculate_turn_around_times(burst_times, waiting_times)
print("Process ID \tBurst Time \tWaiting Time \tTurnaround Time")
for i, burst_time in enumerate(burst_times):
print(
f" {i + 1}\t\t {burst_time}\t\t {waiting_times[i]}\t\t "
f"{turn_around_times[i]}"
)
print(f"\nAverage waiting time = {mean(waiting_times):.5f}")
print(f"Average turn around time = {mean(turn_around_times):.5f}")
Loading