API Reference
DeviceRuntime
Class to manage the Device Runtime.
It is responsible for offloading functions to the Cognit Frontend.
DeviceRuntime(config_path: str = DEFAULT_CONFIG_PATH)
Parameters:
- config_path (str, optional) – Path to the configuration file used to access the Cognit Frontend.
Example:
runtime = DeviceRuntime("/path/to/config.json")

Attributes:
- cognit_config (CognitConfig) – Stores Cognit configuration.
- sync_result_queue (SyncResultQueue) – Handles synchronous function results.
- cognit_logger (CognitLogger) – Logger for runtime messages.
- call_queue (CallQueue) – Queue to store Call objects.
- current_reqs (Scheduling | None) – Current scheduling requirements.
- sm_handler (StateMachineHandler | None) – State machine handler instance.
- sm_thread (Thread | None) – Thread running the state machine.
init
Launches the state machine (SM) thread with the provided requirements.
Parameters:
- init_reqs (Scheduling) – Requirements to be considered when offloading functions.
Returns: bool – True if initialized successfully, False otherwise.
Example:
runtime.init({"FLAVOUR": "flavour1", "GEOLOCATION": {"latitude": 0.0, "longitude": 0.0}})

stop
Stops the state machine (SM) thread.
Returns: bool – True if stopped successfully, False otherwise.
Example:
runtime.stop()

update_requirements
Relaunches the SM thread with new requirements.
Parameters:
- new_reqs (Scheduling) – New scheduling requirements.
Returns: bool – True if requirements were updated successfully, False otherwise.
Example:
runtime.update_requirements({"FLAVOUR": "flavour1", "GEOLOCATION": {"latitude": 0.0, "longitude": 0.0}})

call_async
Offloads a function asynchronously.
Parameters:
- function (Callable) – The target function to be offloaded.
- callback (Callable) – Callback executed when the offloaded function finishes.
- params (tuple) – Arguments for the function.
Returns: bool – True if the function was queued successfully, False otherwise.
Example:
def my_callback(result):
    print("Async result:", result)

runtime.call_async(my_function, my_callback, 1, 2, 3)

call
Offloads a function synchronously.
Parameters:
- function (Callable) – The target function to be offloaded.
- params (tuple) – Arguments for the function.
Returns: ExecResponse – Execution result (or None if the call failed).
Example:
result = runtime.call(my_function, "a", "b")
print(result)

runtime = DeviceRuntime("/path/to/config.json")
# Initialize with requirements
runtime.init({"FLAVOUR": "flavour0", "GEOLOCATION": {"latitude": 100.0, "longitude": 100.0}})
# Synchronous call
result = runtime.call(my_function, 10, 20)
print(f"Result: {result}")
# Asynchronous call
runtime.call_async(my_function, lambda r: print("Async:", r), 5)
# Update requirements
runtime.update_requirements({"FLAVOUR": "flavour1", "GEOLOCATION": {"latitude": 0.0, "longitude": 0.0}})
# Stop runtime
runtime.stop()

DeviceRuntimeStateMachine
State machine that manages the Device Runtime in the Cognit framework, handling authentication, requirement uploads, Edge Cluster Frontend address retrieval, and function offloading.
Attributes:
- cfc (CognitFrontendClient) – Cognit Frontend Client instance.
- ecf (EdgeClusterFrontendClient) – Edge Cluster Frontend Client instance.
- token (str) – JWT token for authentication.
- requirements (Scheduling) – Current application requirements.
- new_requirements (Scheduling) – New requirements pending update.
- new_ecf_address (str) – New Edge Cluster Frontend address if updated.
- call_queue (CallQueue) – Queue for offloaded function calls.
- sync_results_queue (SyncResultQueue) – Queue for synchronous execution results.
- logger (CognitLogger) – Logger instance.
- timer (CallbackTimer) – Timer for periodic ECF address updates.
- Counters and state flags for retry limits and requirement changes.
States:
- init – Initial authentication state.
- send_init_request – Uploads application requirements.
- get_ecf_address – Retrieves the Edge Cluster Frontend address.
- ready – Ready to offload functions.
Transitions:
- success_auth – init → send_init_request if the token is valid.
- repeat_auth – init → init if the token is empty.
- requirements_up – send_init_request → get_ecf_address if the requirements were uploaded.
- token_not_valid_requirements – send_init_request → init if the CFC is not connected.
- retry_requirements_upload – Retry uploading requirements.
- limit_requirements_upload – Restart the CFC if the upload attempt limit is exceeded.
- send_init_update_requirements – Re-upload changed requirements.
- address_obtained – get_ecf_address → ready if the ECF and CFC are connected.
- token_not_valid_address – get_ecf_address → init if the CFC is not connected.
- retry_get_address – Retry getting the address.
- limit_get_address – Restart the CFC if the address attempt limit is exceeded.
- get_address_update_requirements – Update requirements during address retrieval.
- result_given – Stay in the ready state to offload functions.
- token_not_valid_ready / token_not_valid_ready_2 – Re-authenticate if connections are lost.
- ready_update_requirements – Upload new requirements.
- ready_update_ecf_address – Update the Edge Cluster address.
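The transitions that name both a source and a destination state can be written as a lookup table. The following is only a sketch for reading the list, not the library's code: `TRANSITIONS`, `next_state` and `walk` are hypothetical names, and transitions whose endpoints are not spelled out in the list are left out rather than guessed.

```python
# Hypothetical transition table built only from the transitions whose
# source and destination states are stated explicitly in the list.
TRANSITIONS = {
    ("init", "success_auth"): "send_init_request",
    ("init", "repeat_auth"): "init",
    ("send_init_request", "requirements_up"): "get_ecf_address",
    ("send_init_request", "token_not_valid_requirements"): "init",
    ("get_ecf_address", "address_obtained"): "ready",
    ("get_ecf_address", "token_not_valid_address"): "init",
    ("ready", "result_given"): "ready",
}


def next_state(state: str, trigger: str) -> str:
    """Return the destination state, or raise if the trigger is not defined here."""
    try:
        return TRANSITIONS[(state, trigger)]
    except KeyError:
        raise ValueError(f"trigger {trigger!r} is not defined for state {state!r}")


def walk(triggers, start: str = "init") -> str:
    """Apply a sequence of triggers and return the final state."""
    state = start
    for trigger in triggers:
        state = next_state(state, trigger)
    return state


if __name__ == "__main__":
    # Happy path: authenticate, upload requirements, obtain the ECF address.
    print(walk(["success_auth", "requirements_up", "address_obtained"]))
```

Keeping the table as data makes it easy to check that every trigger is only fired from the state it belongs to.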
DeviceRuntimeStateMachine(
    config: CognitConfig,
    requirements: Scheduling,
    call_queue: CallQueue,
    sync_result_queue: SyncResultQueue
)
Parameters:
- config (CognitConfig) – Cognit configuration object.
- requirements (Scheduling) – Initial application requirements.
- call_queue (CallQueue) – Queue for function calls.
- sync_result_queue (SyncResultQueue) – Queue for synchronous results.
Authenticates against Cognit Frontend and retrieves a JWT token. Resets counters and timers.
Uploads application requirements to the Cognit Frontend Client. Manages retries and flags for changed requirements.
Obtains the Edge Cluster Frontend address and initializes the ECF client. Manages retry counters.
Processes function offloading. Retrieves calls from the call queue, uploads functions to the DaaS gateway, and executes them synchronously or asynchronously.
Retrieves a new Edge Cluster Frontend address from the Cognit Frontend Client and updates state if changed.
- is_cfc_connected() – Returns True if the Cognit Frontend Client is connected.
- is_ecf_connected() – Returns True if the Edge Cluster Frontend Client is connected.
- is_token_empty() – Checks if the token is empty.
- is_requirement_upload_limit_reached() – Checks if the upload attempt limit has been reached (3 attempts).
- are_requirements_uploaded() – Returns True if the requirements were successfully uploaded.
- is_get_address_limit_reached() – Checks if the address request limit has been reached (3 attempts).
- have_requirements_changed() – Checks if the requirements have changed.
- is_new_ecf_address_set() – Checks if a new Edge Cluster Frontend address is set.
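The two limit checks above both refer to a cap of 3 attempts. A minimal sketch of how such a counter could gate retries is shown below. `RetryCounter` and `upload_with_retries` are hypothetical names, and treating "limit reached" as `attempts >= 3` is an assumption, not something the reference states.

```python
from typing import Callable

MAX_ATTEMPTS = 3  # the limit quoted in the condition list


class RetryCounter:
    """Hypothetical attempt counter; not the library's implementation."""

    def __init__(self, limit: int = MAX_ATTEMPTS) -> None:
        self.limit = limit
        self.attempts = 0

    def record_failure(self) -> None:
        self.attempts += 1

    def reset(self) -> None:
        self.attempts = 0

    def is_limit_reached(self) -> bool:
        # Assumption: the limit counts failed attempts.
        return self.attempts >= self.limit


def upload_with_retries(upload: Callable[[], bool], counter: RetryCounter) -> str:
    """Call upload() until it succeeds or the attempt limit is reached."""
    while not counter.is_limit_reached():
        if upload():
            counter.reset()
            return "uploaded"
        counter.record_failure()
    # In the state machine this is where the client would be restarted.
    return "restart"
```

With a callable that fails twice and then succeeds, the helper returns "uploaded" on the third call; with one that always fails, it gives up after three calls.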
change_requirements
Sets new requirements to be uploaded.
Parameters:
- new_requirements (Scheduling) – Updated application requirements.
Returns: bool – True if requirements were successfully changed, False otherwise.
# Instantiate state machine
sm = DeviceRuntimeStateMachine(config, requirements, call_queue, sync_result_queue)
# Run state machine loop (from handler)
handler.run(interval=0.05)
# Change requirements dynamically
sm.change_requirements(new_requirements)
# Stop execution
handler.stop()

StateMachineHandler
Handles the Device Runtime state machine by evaluating current states and performing actions according to Cognit Frontend and Edge Cluster Frontend conditions.
Attributes:
- logger (CognitLogger) – Logger instance.
- running (bool) – Flag indicating if the state machine is running.
- sm (DeviceRuntimeStateMachine) – The underlying state machine instance.
Methods:
- __init__(config, requirements, call_queue, sync_result_queue)
- change_requirements(new_requirements: Scheduling) -> bool
- stop()
- run(interval=0.05)
- evaluate_conditions()
- handle_ready_state()
- handle_init_state()
- handle_send_init_request_state()
- handle_get_ecf_address_state()
StateMachineHandler(config: CognitConfig, requirements: Scheduling, call_queue: CallQueue, sync_result_queue: SyncResultQueue)
Parameters:
- config (CognitConfig) – Cognit configuration object.
- requirements (Scheduling) – Initial application requirements.
- call_queue (CallQueue) – Queue for offloaded function calls.
- sync_result_queue (SyncResultQueue) – Queue for synchronous execution results.
Example:
handler = StateMachineHandler(config, reqs, call_queue, sync_result_queue)

change_requirements
Updates the requirements in the Device Runtime.
Parameters:
- new_requirements (Scheduling) – New application requirements.
Returns: bool – True if requirements were successfully updated.

stop
Stops the state machine loop.
Example:
handler.stop()

run
Runs the state machine in a loop, periodically evaluating conditions.
Parameters:
- interval (float, optional) – Sleep interval between state evaluations in seconds (default 0.05).
Example:
handler.run(interval=0.1)

evaluate_conditions
Evaluates the current state and dispatches to the appropriate handler method.

handle_ready_state
Handles the ready state logic, including token validation and requirement updates.

handle_init_state
Handles the init state logic, managing the authentication flow.

handle_send_init_request_state
Handles the send_init_request state logic, including retrying requirement uploads and managing limits.

handle_get_ecf_address_state
Handles the get_ecf_address state logic, obtaining the Edge Cluster Frontend address and managing retries.
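The run/evaluate/handle split described above follows a common pattern: a loop sleeps for a fixed interval and, on each tick, dispatches to the handler for the current state. The sketch below illustrates that pattern with plain callables. `run_loop` and `DEMO_HANDLERS` are hypothetical, and the real handlers do far more than return the next state.

```python
import time
from typing import Callable, Dict, List


def run_loop(handlers: Dict[str, Callable[[], str]], start: str,
             interval: float = 0.05, max_ticks: int = 10) -> List[str]:
    """Dispatch to the handler of the current state once per tick.

    Each handler returns the name of the next state. The loop stops when
    the "ready" state is reached or after max_ticks iterations.
    """
    state = start
    visited = [state]
    for _ in range(max_ticks):
        state = handlers[state]()      # evaluate: pick handler by state name
        visited.append(state)
        if state == "ready":
            break
        time.sleep(interval)           # wait before the next evaluation
    return visited


# Demo handlers that always take the success path (an assumption for the demo).
DEMO_HANDLERS: Dict[str, Callable[[], str]] = {
    "init": lambda: "send_init_request",
    "send_init_request": lambda: "get_ecf_address",
    "get_ecf_address": lambda: "ready",
    "ready": lambda: "ready",
}

if __name__ == "__main__":
    print(run_loop(DEMO_HANDLERS, "init", interval=0.0))
```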
handler = StateMachineHandler(config, reqs, call_queue, sync_result_queue)
# Change requirements if needed
handler.change_requirements(new_reqs)
# Start the state machine loop
handler.run(interval=0.05)
# Stop the state machine
handler.stop()

EdgeClusterFrontendClient
Class to interact with the Edge Cluster Frontend.
Handles function execution (sync/async), sending device metrics, and connection status management.
Attributes:
- logger (CognitLogger) – Logger instance for client operations.
- token (str) – Token used for communication with the Edge Cluster Frontend.
- address (str) – URL of the Edge Cluster Frontend.
- parser (FaasParser) – Used for serializing and deserializing function parameters.
- _has_connection (bool) – Connection status flag.
Methods:
- __init__(token: str, address: str)
- execute_function(func_id: str, app_req_id: int, exec_mode: ExecutionMode, callback: callable, params_tuple: tuple, timeout: int = 120) -> None | ExecResponse
- evaluate_response(response: ExecResponse)
- get_header(token: str) -> dict
- get_qparams(app_req_id: int, exec_mode: ExecutionMode) -> dict
- get_serialized_params(params_tuple: tuple) -> list
- get_has_connection() -> bool
- set_has_connection(is_connected: bool)
EdgeClusterFrontendClient(token: str, address: str)
Parameters:
- token (str) – Token for authentication with the Edge Cluster Frontend.
- address (str) – Address/URL of the Edge Cluster Frontend.
Example:
client = EdgeClusterFrontendClient(token="mytoken", address="https://edge-cluster.local")

execute_function(func_id: str, app_req_id: int, exec_mode: ExecutionMode, callback: callable, params_tuple: tuple, timeout: int = 120) -> None | ExecResponse
Triggers the execution of a function on the Edge Cluster Frontend.
Parameters:
- func_id (str) – Identifier of the function to execute.
- app_req_id (int) – Identifier of the associated requirements.
- exec_mode (ExecutionMode) – Execution mode (SYNC or ASYNC).
- callback (callable) – Function to call after execution (for async mode).
- params_tuple (tuple) – Arguments for the function.
- timeout (int, optional) – HTTP request timeout in seconds (default: 120).
Returns: ExecResponse | None – Returns a result for sync execution, None for async.
Example:
result = client.execute_function("func123", 1, ExecutionMode.SYNC, None, (10, 20))

evaluate_response
Evaluates an execution response and updates connection status.
Parameters:
- response (ExecResponse) – Response object to evaluate.
Example:
client.evaluate_response(exec_response)

get_header
Generates HTTP headers for requests.
Parameters:
- token (str) – Authentication token.
Returns: dict – Header dictionary.
Example:
header = client.get_header("mytoken")

get_qparams
Generates query parameters for requests.
Parameters:
- app_req_id (int) – Requirements identifier.
- exec_mode (ExecutionMode) – Execution mode (SYNC or ASYNC).
Returns: dict – Query parameter dictionary.
Example:
params = client.get_qparams(1, ExecutionMode.SYNC)

get_serialized_params
Serializes function arguments for HTTP requests.
Parameters:
- params_tuple (tuple) – Function arguments.
Returns: list – Serialized parameters.
Example:
serialized = client.get_serialized_params((1, "test"))

get_has_connection
Gets the connection status of the client.
Returns: bool – True if connected, False otherwise.
Example:
if client.get_has_connection():
    print("Connected")

set_has_connection
Sets the connection status of the client.
Parameters:
- is_connected (bool) – New connection status.
Example:
client.set_has_connection(True)

client = EdgeClusterFrontendClient(token="mytoken", address="https://edge-cluster.local")
# Synchronous function execution
result = client.execute_function("func123", 1, ExecutionMode.SYNC, None, (10, 20))
print(result)
# Asynchronous function execution
client.execute_function("func123", 1, ExecutionMode.ASYNC, lambda r: print("Async result:", r), (10, 20))
# Check connection
if client.get_has_connection():
    print("Client connected")

CognitFrontendClient
Class to interact with the Cognit Frontend Engine.
Used to manage application requirements, upload functions to the DaaS Gateway, and retrieve Edge Cluster Frontend addresses.
Attributes:
- config (CognitConfig) – Cognit configuration with valid credentials.
- endpoint (str) – URL of the Cognit Frontend Engine.
- latency_calculator (LatencyCalculator) – Calculates latencies for Edge Cluster Frontends.
- is_max_latency_activated (bool) – Flag to indicate if max latency enforcement is enabled.
- logger (CognitLogger) – Logger for client operations.
- _has_connection (bool) – Connection status flag.
- parser (FaasParser) – Used to serialize/deserialize functions.
- app_req_id (int | None) – Application requirements ID.
- token (str | None) – JWT token for authentication.
- offloaded_funs_hash_map (dict) – Stores uploaded function hashes and their DaaS IDs.
- available_ecfs (list[str]) – List of available Edge Cluster Frontend addresses.
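The offloaded_funs_hash_map attribute suggests a simple cache: a function is uploaded once and its DaaS ID is reused afterwards. The sketch below shows that idea under stated assumptions. `UploadCache` and `get_or_upload` are hypothetical, SHA-256 over the serialized function is an assumed hashing scheme, and the `upload` callable stands in for the real HTTP request.

```python
import hashlib
from typing import Callable, Dict


class UploadCache:
    """Hypothetical cache from function hash to DaaS function ID."""

    def __init__(self) -> None:
        self.offloaded_funs_hash_map: Dict[str, int] = {}

    def is_function_uploaded(self, func_hash: str) -> bool:
        return func_hash in self.offloaded_funs_hash_map

    def get_or_upload(self, serialized: str, upload: Callable[[str], int]) -> int:
        """Return the cached ID, uploading only the first time a hash is seen."""
        # Assumption: the hash is computed over the serialized function text.
        func_hash = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
        if not self.is_function_uploaded(func_hash):
            self.offloaded_funs_hash_map[func_hash] = upload(serialized)
        return self.offloaded_funs_hash_map[func_hash]
```

Calling `get_or_upload` twice with the same serialized text triggers a single upload, which is the behaviour the hash map is meant to provide.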
Methods:
- __init__(config: CognitConfig)
- init(reqs: Scheduling) -> bool
- get_edge_cluster_frontends_available() -> list[str]
- _get_edge_cluster_address() -> str
- _authenticate() -> str
- _app_req_read() -> Scheduling | None
- _app_req_delete() -> bool
- upload_function_to_daas(function: Callable) -> int
- send_funtion_to_daas(data: UploadFunctionDaaS) -> int
- _send_latency_measurements(latencies: str) -> bool
- _inspect_response(response: req.Response, requestFun: str = "")
- is_function_uploaded(func_hash: str) -> bool
- get_header(token: str) -> dict
- get_has_connection() -> bool
- set_has_connection(new_value: bool)
- set_token(token)
- get_app_requirements_id() -> int
CognitFrontendClient(config: CognitConfig)
Parameters:
- config (CognitConfig) – Cognit configuration containing valid user credentials.
Example:
client = CognitFrontendClient(config=my_config)

init
Authenticates and creates/updates application requirements in the Cognit Frontend Engine.
Parameters:
- reqs (Scheduling) – Requirements of the application.
Returns: bool – True if initialization was successful (HTTP 200), False otherwise.
Example:
success = client.init(app_requirements)

get_edge_cluster_frontends_available
Retrieves the addresses of available Edge Cluster Frontend Engines for the current app requirements.
Returns: list[str] – List of available Edge Cluster Frontend addresses.
Example:
ecfs = client.get_edge_cluster_frontends_available()

_get_edge_cluster_address
Gets the address of the Edge Cluster Frontend Engine with the lowest latency, or the first one if max latency is not activated.
Returns: str – Address of the selected Edge Cluster Frontend Engine.
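The selection rule can be sketched as a small pure function. This is an illustration, not the client's code: `select_ecf` is a hypothetical helper, and ignoring negative latencies (the -1.0 failure value documented for LatencyCalculator.calculate) is an assumption.

```python
from typing import Dict, List


def select_ecf(available: List[str], latencies: Dict[str, float],
               use_latency: bool) -> str:
    """Pick the lowest-latency address, or the first one if latency is not used."""
    if not available:
        raise ValueError("no Edge Cluster Frontend addresses available")
    if not use_latency:
        return available[0]
    # Assumption: negative values mark failed measurements and are skipped.
    measured = {addr: latencies[addr] for addr in available
                if latencies.get(addr, -1.0) >= 0}
    if not measured:
        return available[0]
    return min(measured, key=measured.get)
```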
Example:
ecfe = client._get_edge_cluster_address()

_authenticate
Authenticates against the Cognit Frontend Engine and retrieves a JWT token.
Returns: str – JWT token if authentication succeeds, None otherwise.
Example:
token = client._authenticate()

_app_req_read
Reads the application requirements using the current app_req_id.
Returns: Scheduling | None – App requirements or None if an error occurred.
Example:
reqs = client._app_req_read()

_app_req_delete
Deletes the application requirements from the Cognit Frontend Engine.
Returns: bool – True if deletion succeeded (HTTP 204), False otherwise.
Example:
success = client._app_req_delete()

upload_function_to_daas
Serializes a Python function and uploads it to the DaaS Gateway.
Parameters:
- function (Callable) – Function to upload.
Returns: int – DaaS function ID if successful, None otherwise.
Example:
func_id = client.upload_function_to_daas(my_function)

send_funtion_to_daas
Uploads an UploadFunctionDaaS object to the DaaS Gateway.
Parameters:
- data (UploadFunctionDaaS) – Function data to upload.
Returns: int – Function ID if successful, None otherwise.
Example:
func_id = client.send_funtion_to_daas(function_data)

_send_latency_measurements
Sends latency measurements to the Cognit Frontend Engine.
Parameters:
- latencies (str) – JSON string of latencies.
Returns: bool – True if the request succeeded, False otherwise.
Example:
success = client._send_latency_measurements(latency_json)

_inspect_response
Logs the response of an HTTP request for debugging purposes.
Parameters:
- response (req.Response) – Response object.
- requestFun (str, optional) – Name of the request.
Example:
client._inspect_response(response, "init_request")

is_function_uploaded
Checks whether a function has already been uploaded to DaaS.
Parameters:
- func_hash (str) – Hash of the function code.
Returns: bool – True if uploaded, False otherwise.
Example:
uploaded = client.is_function_uploaded(my_func_hash)

get_header
Returns the HTTP header for requests.
Parameters:
- token (str) – Authentication token.
Returns: dict – HTTP header.
Example:
header = client.get_header(token)

get_has_connection
Gets the current connection status.
Returns: bool – True if connected, False otherwise.
Example:
status = client.get_has_connection()

set_has_connection
Sets the connection status.
Parameters:
- new_value (bool) – New connection status.
Example:
client.set_has_connection(True)

set_token
Sets the JWT token for the client.
Parameters:
- token – JWT token.
Example:
client.set_token(my_token)

get_app_requirements_id
Returns the current application requirements ID.
Returns: int – app_req_id.
Example:
app_id = client.get_app_requirements_id()

client = CognitFrontendClient(config=my_config)
# Initialize app requirements
success = client.init(app_reqs)
# Get available Edge Cluster Frontends
ecfs = client.get_edge_cluster_frontends_available()
# Upload a Python function to DaaS
func_id = client.upload_function_to_daas(my_function)
# Get connection status
connected = client.get_has_connection()
# Delete app requirements
deleted = client._app_req_delete()

CallQueue
Class to manage the FIFO queue of functions to be executed.
It ensures thread-safe access to the queue with a mutex and supports size limits.
Attributes:
- queue (list[Call]) – Internal FIFO list of calls.
- mutex (Lock) – Mutex to ensure thread-safe access.
- size_limit (int) – Maximum number of calls allowed in the queue.
- cognit_logger (CognitLogger) – Logger instance for queue operations.
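A bounded, mutex-protected FIFO of this kind can be sketched in a few lines. The class below is a stand-alone illustration, not the library's CallQueue: `BoundedFifo` and its method names are hypothetical, and it stores arbitrary items instead of Call objects.

```python
import threading
from collections import deque
from typing import Any, Deque, Optional


class BoundedFifo:
    """Hypothetical thread-safe FIFO with a size limit."""

    def __init__(self, size_limit: int = 50) -> None:
        self._items: Deque[Any] = deque()
        self._mutex = threading.Lock()
        self.size_limit = size_limit

    def add(self, item: Any) -> bool:
        """Append an item; return False instead of blocking when full."""
        with self._mutex:
            if len(self._items) >= self.size_limit:
                return False
            self._items.append(item)
            return True

    def get(self) -> Optional[Any]:
        """Remove and return the oldest item, or None when empty."""
        with self._mutex:
            return self._items.popleft() if self._items else None

    def __len__(self) -> int:
        with self._mutex:
            return len(self._items)
```

Returning False on a full queue, rather than blocking, matches the documented return value of add_call and keeps the caller responsive.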
Methods:
- __init__(size_limit: int = 50)
- add_call(call: Call) -> bool
- get_call() -> Call | None
- __len__() -> int
CallQueue(size_limit: int = 50)
Parameters:
- size_limit (int, optional) – Maximum number of calls allowed in the queue (default: 50).
Example:
queue = CallQueue(size_limit=100)

add_call
Adds a call to the end of the queue.
Parameters:
- call (Call) – The function call to enqueue.
Returns: bool – True if the call was successfully added, False if the queue is full.
Example:
call = Call(function=my_function, fc_lang="PY", mode="ASYNC", callback=None, params=(1, 2))
queue.add_call(call)

get_call
Retrieves and removes the first call in the queue.
Returns: Call | None – The next call in the queue, or None if the queue is empty.
Example:
next_call = queue.get_call()
if next_call:
    print("Got a call:", next_call)

__len__
Returns the number of calls currently in the queue.
Returns: int – Current queue length.
Example:
print("Queue length:", len(queue))

queue = CallQueue(size_limit=3)
# Add calls
queue.add_call(Call(function=my_function, fc_lang="PY", mode="SYNC", callback=None, params=(10,)))
queue.add_call(Call(function=my_function, fc_lang="PY", mode="ASYNC", callback=None, params=(20,)))
# Get calls
first = queue.get_call()
print("Dequeued:", first)
# Check length
print("Remaining in queue:", len(queue))

SyncResultQueue
Class to manage a thread-safe queue for synchronous execution results.
Uses a mutex and condition variable to safely handle a single result at a time.
Attributes:
- result (ExecResponse | None) – Stores the current result in the queue.
- mutex (Lock) – Mutex to ensure thread-safe access.
- condition (Condition) – Condition variable used to wait for and notify availability of results.
- cognit_logger (CognitLogger) – Logger instance for queue operations.
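The mutex-plus-condition design for a single result can be illustrated with `threading.Condition`. The sketch below is not the library class: `SingleSlot` is a hypothetical name, the `timeout` argument is added for the demo, and rejecting a second result while the slot is occupied is one possible reading of the documented behaviour.

```python
import threading
from typing import Any, Optional


class SingleSlot:
    """Hypothetical one-item hand-off between a producer and a consumer."""

    def __init__(self) -> None:
        self._result: Any = None
        self._has_result = False
        self._cond = threading.Condition()  # wraps its own lock

    def put(self, result: Any) -> bool:
        with self._cond:
            if self._has_result:
                return False  # assumption: an occupied slot rejects new results
            self._result = result
            self._has_result = True
            self._cond.notify()  # wake a consumer blocked in get()
            return True

    def get(self, timeout: Optional[float] = None) -> Any:
        with self._cond:
            # wait_for re-checks the predicate, so spurious wake-ups are harmless.
            if not self._cond.wait_for(lambda: self._has_result, timeout):
                raise TimeoutError("no result arrived in time")
            result, self._result = self._result, None
            self._has_result = False
            return result
```

A consumer thread can call `get()` before any result exists; it blocks until the producer calls `put()`.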
SyncResultQueue()
Example:
sync_queue = SyncResultQueue()

add_sync_result
Adds a result to the queue. If the queue already contains a result, it will be discarded.
Parameters:
- result (ExecResponse) – Result to be added to the queue.
Returns: bool – True if the result was added successfully, False if the queue was full.
Example:
sync_queue.add_sync_result(exec_response)

get_sync_result
Retrieves and removes the result from the queue.
If no result is available, the method will wait until one is added.
Returns: ExecResponse – The result from the queue.
Example:
result = sync_queue.get_sync_result()
print("Received result:", result)

sync_queue = SyncResultQueue()
# Producer: add a result
sync_queue.add_sync_result(exec_response)
# Consumer: get the result (will wait if no result is available)
result = sync_queue.get_sync_result()
print("Result received:", result)

LatencyCalculator
Class to calculate network latency to edge clusters using simple ping commands.
Attributes:
- logger (CognitLogger) – Logger instance for latency calculation operations.
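Ping-based measurement usually comes down to running the command and parsing the round-trip time from its output. The sketch below covers only the parsing step, so it runs without network access. `parse_ping_latency` is a hypothetical helper, the regular expression targets the common `time=12.3 ms` output format, and -1.0 mirrors the failure value documented for calculate.

```python
import re

# Matches "time=12.3 ms" as well as "time<1 ms"; other ping formats are not handled.
_TIME_RE = re.compile(r"time[=<](\d+(?:\.\d+)?)\s*ms")


def parse_ping_latency(output: str) -> float:
    """Return the first round-trip time in milliseconds, or -1.0 if none is found."""
    match = _TIME_RE.search(output)
    return float(match.group(1)) if match else -1.0


if __name__ == "__main__":
    sample = "64 bytes from 8.8.8.8: icmp_seq=1 ttl=117 time=12.3 ms"
    print(parse_ping_latency(sample))
```

Separating parsing from command execution makes the logic testable with canned output.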
Methods:
- __init__()
- get_simple_cmd_output(cmd, stderr=STDOUT) -> str
- get_latency_for_clusters(edge_clusters: list) -> dict
- calculate(ip: str) -> float
LatencyCalculator()
Example:
lat_calc = LatencyCalculator()

get_simple_cmd_output
Executes a simple external command and returns its output.
Parameters:
- cmd (str) – Command to execute.
- stderr (optional, default: STDOUT) – Where to redirect standard error.
Returns: str – Command output.
Raises: OSError if the command fails to execute.
Example:
output = lat_calc.get_simple_cmd_output("ping -c 1 8.8.8.8")

get_latency_for_clusters
Calculates the latency for a list of edge clusters.
Parameters:
- edge_clusters (list[str]) – List of edge cluster IPs or URLs.
Returns: dict – Mapping of cluster address to latency in milliseconds. Returns an error dictionary if no valid clusters are found.
Example:
latencies = lat_calc.get_latency_for_clusters(["https://cluster1.local", "http://cluster2.local"])

calculate
Calculates the latency to a given IP address.
Parameters:
- ip (str) – IP address to calculate latency for.
Returns: float – Latency in milliseconds, or -1.0 if parsing fails.
Example:
latency = lat_calc.calculate("8.8.8.8")

lat_calc = LatencyCalculator()
# Calculate latency for a single IP
latency = lat_calc.calculate("8.8.8.8")
print(f"Latency: {latency} ms")
# Calculate latency for multiple edge clusters
edge_clusters = ["https://cluster1.local", "http://cluster2.local"]
latencies = lat_calc.get_latency_for_clusters(edge_clusters)
print("Latencies:", latencies)

FaasParser
Class responsible for serializing Python functions for offloading and deserializing results returned from a serverless runtime.
FaasParser()
Example:
parser = FaasParser()

serialize
Serializes a Python function for offloading. Clears the __globals__ attribute temporarily to avoid sending the global namespace. The function is then cloudpickled and base64 encoded.
Parameters:
- fc (Callable) – Python function to serialize.
Returns: str – Base64-encoded serialized function.
Example:
serialized_func = parser.serialize(my_function)

deserialize
Deserializes a function or result returned from a serverless runtime. The input should be a base64-encoded cloudpickled string.
Parameters:
- input (str) – Base64-encoded serialized function/result.
Returns: Any – Deserialized Python object (function, data, etc.).
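The pickle-then-base64 encoding can be shown with the standard library alone. This sketch uses `pickle` as a stand-in for cloudpickle, which is an assumption made to keep it dependency-free: standard pickle stores functions by reference, so only plain data is round-tripped here. `encode` and `decode` are hypothetical helper names.

```python
import base64
import pickle
from typing import Any


def encode(obj: Any) -> str:
    """Pickle an object and wrap the bytes in base64 so they travel as text."""
    return base64.b64encode(pickle.dumps(obj)).decode("ascii")


def decode(text: str) -> Any:
    """Reverse of encode(): base64-decode, then unpickle."""
    # Unpickling can execute arbitrary code; only decode trusted input.
    return pickle.loads(base64.b64decode(text))


if __name__ == "__main__":
    payload = encode({"sum": 5})
    print(decode(payload))
```

Base64 is what makes the binary pickle safe to embed in a JSON body or HTTP parameter.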
Example:
result = parser.deserialize(serialized_result)

parser = FaasParser()
# Serialize a function
def add(a, b):
    return a + b
serialized = parser.serialize(add)
# Deserialize it back
deserialized_add = parser.deserialize(serialized)
print(deserialized_add(2, 3))  # Output: 5

CognitConfig
Class to manage and load Cognit configuration from a YAML file.
Handles credentials, API endpoints, and other runtime configuration parameters.
Attributes:
- cf (dict) – Parsed YAML configuration.
- _cognit_frontend_engine_endpoint (str | None) – Cached Cognit Frontend Engine endpoint.
- _cognit_frontend_engine_port (int | None) – Cached Cognit Frontend Engine port.
- _cognit_frontend_engine_usr (str | None) – Cached username for Cognit Frontend Engine.
- _cognit_frontend_engine_pwd (str | None) – Cached password for Cognit Frontend Engine.
- _servl_runt_port (int | None) – Cached serverless runtime port (optional).
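The cached attributes above pair naturally with lazily evaluated properties: the value is read from the parsed configuration on first access and reused afterwards. The sketch below illustrates the pattern only. `LazyConfig`, the `lookups` counter and the `"api_endpoint"` key are hypothetical and do not reflect the actual YAML schema.

```python
from typing import Any, Dict, Optional


class LazyConfig:
    """Hypothetical config wrapper with one lazily cached property."""

    def __init__(self, parsed: Dict[str, Any]) -> None:
        self.cf = parsed                       # parsed configuration mapping
        self._endpoint: Optional[str] = None   # cache, filled on first access
        self.lookups = 0                       # demo counter, not part of any real API

    @property
    def endpoint(self) -> str:
        if self._endpoint is None:
            self.lookups += 1
            # "api_endpoint" is an illustrative key name only.
            self._endpoint = self.cf["api_endpoint"]
        return self._endpoint


if __name__ == "__main__":
    cfg = LazyConfig({"api_endpoint": "https://example.invalid"})
    print(cfg.endpoint, cfg.endpoint, cfg.lookups)
```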
Methods and properties:
- __init__(config_path=DEFAULT_CONFIG_PATH)
- get_prov_context() -> tuple
- cognit_frontend_engine_endpoint
- cognit_frontend_engine_port
- cognit_frontend_engine_cfe_usr
- cognit_frontend_engine_cfe_pwd
- servl_runt_port
CognitConfig(config_path=DEFAULT_CONFIG_PATH)
Parameters:
- config_path (str, optional) – Path to the YAML configuration file. Defaults to "./examples/cognit-template.yml".
Example:
config = CognitConfig(config_path="./my_config.yml")

get_prov_context
Returns a tuple with provision context from the configuration.
Returns: tuple – (host, port, pe_usr) from the default configuration.
Example:
host, port, user = config.get_prov_context()

cognit_frontend_engine_endpoint
Property – Returns the Cognit Frontend Engine endpoint.
Lazy-loaded from the YAML configuration.
Example:
endpoint = config.cognit_frontend_engine_endpoint

cognit_frontend_engine_port
Property – Returns the Cognit Frontend Engine port.
Lazy-loaded from the YAML configuration.
Example:
port = config.cognit_frontend_engine_port

cognit_frontend_engine_cfe_usr
Property – Returns the Cognit Frontend Engine username.
Example:
user = config.cognit_frontend_engine_cfe_usr

cognit_frontend_engine_cfe_pwd
Property – Returns the Cognit Frontend Engine password.
Example:
password = config.cognit_frontend_engine_cfe_pwd

servl_runt_port
Property – Returns the serverless runtime port (optional, may be removed in the future).
Example:
port = config.servl_runt_port

config = CognitConfig(config_path="./examples/cognit-template.yml")
# Access endpoint and credentials
endpoint = config.cognit_frontend_engine_endpoint
port = config.cognit_frontend_engine_port
user = config.cognit_frontend_engine_cfe_usr
pwd = config.cognit_frontend_engine_cfe_pwd
# Get provision context
host, port, pe_usr = config.get_prov_context()

CallbackTimer
Utility class that repeatedly executes a callback function at fixed time intervals using a background thread.
Initializes a timer that calls a callback function at regular intervals.
Parameters:
- interval (float) – Interval in seconds between each callback execution.
- callback (Callable) – The function to be called at each interval.
Starts the timer.
A new thread is launched that periodically executes the callback function.
Stops the timer.
The background thread is terminated gracefully.
Executes the callback function at regular intervals.
This method runs inside the background thread and should not be called directly.
import time
def hello():
    print("Hello, world!")
# Create a timer that calls `hello` every 2 seconds
timer = CallbackTimer(2.0, hello)
timer.start()
time.sleep(6) # Let it run a few times
timer.stop()
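
For readers curious how such a timer can stop promptly, one common approach is a background thread that waits on a `threading.Event`. The sketch below illustrates that approach and is not the CallbackTimer source: `IntervalTimer` and `count_ticks` are hypothetical names, and the daemon-thread choice is an assumption.

```python
import threading
import time
from typing import Callable, Optional


class IntervalTimer:
    """Hypothetical periodic timer built on threading.Event."""

    def __init__(self, interval: float, callback: Callable[[], None]) -> None:
        self.interval = interval
        self.callback = callback
        self._stop = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()                 # wakes the waiting thread immediately
        if self._thread is not None:
            self._thread.join()

    def _run(self) -> None:
        # Event.wait returns False on timeout and True once stop() sets the event.
        while not self._stop.wait(self.interval):
            self.callback()


def count_ticks(interval: float, duration: float) -> int:
    """Run a timer for `duration` seconds and return how often it fired."""
    ticks = []
    timer = IntervalTimer(interval, lambda: ticks.append(time.monotonic()))
    timer.start()
    time.sleep(duration)
    timer.stop()
    return len(ticks)
```

Waiting on the event instead of calling `time.sleep` lets `stop()` interrupt the wait, so shutdown does not have to sit out a full interval.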