Skip to content

Commit 75496fe

Browse files
Adding services as interval-driven functions
1 parent 70a8ec3 commit 75496fe

File tree

3 files changed

+136
-23
lines changed

3 files changed

+136
-23
lines changed

include/taskr/common.hpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,4 @@ typedef HiCR::tasking::uniqueId_t taskId_t;
5858
*/
5959
typedef ssize_t workerId_t;
6060

61-
/**
62-
* The type of a service
63-
*/
64-
typedef std::function<void()> service_t;
65-
6661
} // namespace taskr

include/taskr/runtime.hpp

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "task.hpp"
4242
#include "taskImpl.hpp"
4343
#include "worker.hpp"
44+
#include "service.hpp"
4445

4546
namespace taskr
4647
{
@@ -81,7 +82,6 @@ struct ThreadIndices
8182
size_t finished;
8283
};
8384

84-
8585
/**
8686
* Enumeration of states in which the TaskR runtime can be in
8787
*/
@@ -149,7 +149,7 @@ class Runtime
149149

150150
// Creating internal tasks
151151
_commonReadyTaskQueue = std::make_unique<HiCR::concurrent::Queue<taskr::Task>>(__TASKR_DEFAULT_MAX_COMMON_ACTIVE_TASKS);
152-
_serviceQueue = std::make_unique<HiCR::concurrent::Queue<taskr::service_t>>(__TASKR_DEFAULT_MAX_SERVICES);
152+
_serviceQueue = std::make_unique<HiCR::concurrent::Queue<taskr::Service>>(__TASKR_DEFAULT_MAX_SERVICES);
153153

154154
// Setting task callback functions
155155
_hicrTaskCallbackMap.setCallback(HiCR::tasking::Task::callback_t::onTaskExecute, [this](HiCR::tasking::Task *task) { this->onTaskExecuteCallback(task); });
@@ -440,7 +440,7 @@ class Runtime
440440
*
441441
* @param[in] service The service (function) to add
442442
*/
443-
__INLINE__ void addService(taskr::service_t *service) { _serviceQueue->push(service); }
443+
__INLINE__ void addService(taskr::Service *service) { _serviceQueue->push(service); }
444444

445445
/**
446446
* Funtion to force termination in case the application does not have its own termination logic
@@ -449,6 +449,21 @@ class Runtime
449449

450450
private:
451451

452+
__INLINE__ void tryRunService(Service* service)
453+
{
454+
// Checking if service is active (or inactive -- waiting for its wait interval to pass)
455+
if (service->isActive())
456+
{
457+
// TraCR set trace of thread executing a service
458+
#ifdef ENABLE_INSTRUMENTATION
459+
INSTRUMENTATION_THREAD_MARK_SET(thread_idx.exec_serv);
460+
#endif
461+
462+
// Now run service
463+
service->run();
464+
}
465+
}
466+
452467
__INLINE__ taskr::Task *serviceWorkerLoop(const workerId_t serviceWorkerId)
453468
{
454469
#ifdef ENABLE_INSTRUMENTATION
@@ -471,13 +486,8 @@ class Runtime
471486
// If found run it, and put it back into the queue
472487
if (service != nullptr)
473488
{
474-
#ifdef ENABLE_INSTRUMENTATION
475-
// TraCR set trace of thread executing a service
476-
INSTRUMENTATION_THREAD_MARK_SET(thread_idx.exec_serv);
477-
#endif
478-
479-
// Running service
480-
(*service)();
489+
// Try to run service
490+
tryRunService(service);
481491

482492
// Putting it back into the queue
483493
_serviceQueue->push(service);
@@ -525,13 +535,8 @@ class Runtime
525535
// If found run it, and put it back into the queue
526536
if (service != nullptr)
527537
{
528-
#ifdef ENABLE_INSTRUMENTATION
529-
// TraCR set trace of thread executing a service
530-
INSTRUMENTATION_THREAD_MARK_SET(thread_idx.exec_serv);
531-
#endif
532-
533-
// Running service
534-
(*service)();
538+
// Try to run service
539+
tryRunService(service);
535540

536541
// Putting it back into the queue
537542
_serviceQueue->push(service);
@@ -832,7 +837,7 @@ class Runtime
832837
/**
833838
* Common lock-free queue for services.
834839
*/
835-
std::unique_ptr<HiCR::concurrent::Queue<taskr::service_t>> _serviceQueue;
840+
std::unique_ptr<HiCR::concurrent::Queue<taskr::Service>> _serviceQueue;
836841

837842
//////// Configuration Elements
838843

include/taskr/service.hpp

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright 2025 Huawei Technologies Co., Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/**
18+
* @file task.hpp
19+
* @brief This file implements the TaskR task class
20+
* @author Sergio Martin
21+
* @date 29/7/2024
22+
*/
23+
24+
#pragma once
25+
26+
#include <chrono>
27+
#include <hicr/frontends/tasking/common.hpp>
28+
#include "common.hpp"
29+
#include "task.hpp"
30+
31+
namespace taskr
32+
{
33+
34+
/**
35+
* This class defines a TaskR
36+
*
37+
* This is a function that is executed with a given frequency and is useful to detect asynchronous events, such as incoming messages
38+
*
39+
* Services are picked up by any free task / service worker and executed. While a service is executed, no other worker can execute it in parallel.
40+
*
41+
* Services do not represent the critical path of an application. Their presence not preclude taskR from finished when there are no tasks left to execute.
42+
*/
43+
class Service
44+
{
45+
public:
46+
47+
/**
48+
* The type of a service function
49+
*/
50+
typedef std::function<void()> serviceFc_t;
51+
52+
Service() = delete;
53+
virtual ~Service() = default;
54+
55+
/**
56+
* Constructor for the TaskR task class. It requires a user-defined function to execute
57+
* The task is considered finished when the function runs to completion.
58+
*
59+
* @param[in] fc Specifies the TaskR-formatted function to use
60+
* @param[in] interval The minimum interval in ms between two executions of the service. Specify 0 for no minimum interval.
61+
*/
62+
Service(serviceFc_t fc, const size_t interval) : _fc(fc), _interval(interval) { }
63+
64+
/**
65+
* This function indicates whether the minimum interval has passed between the last execution.
66+
*
67+
* @return If the interval has passed, then it is considered active (true) and should be executed. Otherwise, it should return false.
68+
*
69+
*/
70+
__INLINE__ bool isActive() const
71+
{
72+
const auto timeElapsedSinceLastExecution = (double)std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - _lastExecutionTime).count();
73+
if (timeElapsedSinceLastExecution >= _interval) return true;
74+
return false;
75+
}
76+
77+
/**
78+
* This function runs the underlying service function
79+
*
80+
* It updates the last time the service ran, after executing it (so that actual run time does not count towards the waiting interval)
81+
*
82+
*/
83+
__INLINE__ void run()
84+
{
85+
_fc();
86+
_lastExecutionTime = std::chrono::high_resolution_clock::now();
87+
}
88+
89+
90+
private:
91+
92+
/**
93+
* Abstract definition of a time point
94+
*/
95+
typedef std::chrono::high_resolution_clock::time_point timePoint_t;
96+
97+
/**
98+
* Function for the service to execute
99+
*/
100+
const serviceFc_t _fc;
101+
102+
/**
103+
* The interval between consecutive executions
104+
*/
105+
const size_t _interval;
106+
107+
/**
108+
* Time point storing the last time this service was executed
109+
*/
110+
timePoint_t _lastExecutionTime{};
111+
}; // class Service
112+
113+
} // namespace taskr

0 commit comments

Comments
 (0)