|
3 | 3 | // Distributed under the GPL v3 License, see LICENSE.notice.txt |
4 | 4 |
|
5 | 5 | #include "runtime-light/component/component.h" |
| 6 | + |
| 7 | +#include <chrono> |
| 8 | +#include <cstdint> |
| 9 | +#include <utility> |
| 10 | + |
| 11 | +#include "runtime-core/utils/kphp-assert-core.h" |
6 | 12 | #include "runtime-light/core/globals/php-init-scripts.h" |
| 13 | +#include "runtime-light/header.h" |
| 14 | +#include "runtime-light/scheduler/scheduler.h" |
| 15 | +#include "runtime-light/utils/context.h" |
7 | 16 |
|
8 | | -void ComponentState::resume_if_was_rescheduled() { |
9 | | - if (poll_status == PollStatus::PollReschedule) { |
10 | | - // If component was suspended by please yield and there is no awaitable streams |
11 | | - main_thread(); |
12 | | - } |
| 17 | +void ComponentState::init_script_execution() noexcept { |
| 18 | + kphp_core_context.init(); |
| 19 | + init_php_scripts_in_each_worker(php_script_mutable_globals_singleton, main_task); |
| 20 | + scheduler.suspend(std::make_pair(main_task.get_handle(), WaitEvent::Rechedule{})); |
13 | 21 | } |
14 | 22 |
|
15 | | -bool ComponentState::is_stream_already_being_processed(uint64_t stream_d) { |
16 | | - return opened_streams.contains(stream_d); |
17 | | -} |
| 23 | +void ComponentState::process_platform_updates() noexcept { |
| 24 | + const auto &platform_ctx{*get_platform_context()}; |
| 25 | + |
| 26 | + for (;;) { |
| 27 | + // check if platform asked for yield |
| 28 | + if (static_cast<bool>(platform_ctx.please_yield.load())) { // tell the scheduler that we are about to yield |
| 29 | + php_debug("platform asked for yield"); |
| 30 | + const auto schedule_status{scheduler.schedule(ScheduleEvent::Yield{})}; |
| 31 | + poll_status = schedule_status == ScheduleStatus::Error ? PollStatus::PollFinishedError : PollStatus::PollReschedule; |
| 32 | + return; |
| 33 | + } |
18 | 34 |
|
19 | | -void ComponentState::resume_if_wait_stream(uint64_t stream_d, StreamStatus status) { |
20 | | - if (is_stream_timer(stream_d)) { |
21 | | - process_timer(stream_d); |
22 | | - } else { |
23 | | - process_stream(stream_d, status); |
| 35 | + // try taking update from the platform |
| 36 | + if (uint64_t stream_d{}; static_cast<bool>(platform_ctx.take_update(std::addressof(stream_d)))) { |
| 37 | + if (opened_streams_.contains(stream_d)) { // update on opened stream |
| 38 | + php_debug("took update on stream %" PRIu64, stream_d); |
| 39 | + switch (scheduler.schedule(ScheduleEvent::UpdateOnStream{.stream_d = stream_d})) { |
| 40 | + case ScheduleStatus::Resumed: { // scheduler's resumed a coroutine waiting for update |
| 41 | + break; |
| 42 | + } |
| 43 | + case ScheduleStatus::Skipped: { // no one is waiting for the event yet, so just save it |
| 44 | + pending_updates_.insert(stream_d); |
| 45 | + break; |
| 46 | + } |
| 47 | + case ScheduleStatus::Error: { // something bad's happened, stop execution |
| 48 | + poll_status = PollStatus::PollFinishedError; |
| 49 | + return; |
| 50 | + } |
| 51 | + } |
| 52 | + } else { // update on incoming stream |
| 53 | + php_debug("got new incoming stream %" PRIu64, stream_d); |
| 54 | + if (standard_stream_ != INVALID_PLATFORM_DESCRIPTOR) { |
| 55 | + php_warning("skip new incoming stream since previous one is not closed"); |
| 56 | + release_stream(stream_d); |
| 57 | + continue; |
| 58 | + } // TODO: multiple incoming streams (except for http queries) |
| 59 | + standard_stream_ = stream_d; |
| 60 | + incoming_streams_.push_back(stream_d); |
| 61 | + opened_streams_.insert(stream_d); |
| 62 | + if (const auto schedule_status{scheduler.schedule(ScheduleEvent::IncomingStream{.stream_d = stream_d})}; schedule_status == ScheduleStatus::Error) { |
| 63 | + poll_status = PollStatus::PollFinishedError; |
| 64 | + return; |
| 65 | + } |
| 66 | + } |
| 67 | + } else { // we'are out of updates so let the scheduler do whatever it wants |
| 68 | + switch (scheduler.schedule(ScheduleEvent::NoEvent{})) { |
| 69 | + case ScheduleStatus::Resumed: { // scheduler's resumed some coroutine, so let's continue scheduling |
| 70 | + break; |
| 71 | + } |
| 72 | + case ScheduleStatus::Skipped: { // scheduler's done nothing, so it's either scheduled all coroutines or is waiting for events |
| 73 | + poll_status = scheduler.done() ? PollStatus::PollFinishedOk : PollStatus::PollBlocked; |
| 74 | + return; |
| 75 | + } |
| 76 | + case ScheduleStatus::Error: { // something bad's happened, stop execution |
| 77 | + poll_status = PollStatus::PollFinishedError; |
| 78 | + return; |
| 79 | + } |
| 80 | + } |
| 81 | + } |
24 | 82 | } |
| 83 | + // unreachable code |
| 84 | + poll_status = PollStatus::PollFinishedError; |
25 | 85 | } |
26 | 86 |
|
27 | | -void ComponentState::process_new_input_stream(uint64_t stream_d) { |
28 | | - bool already_pending = std::find(incoming_pending_queries.begin(), incoming_pending_queries.end(), stream_d) != incoming_pending_queries.end(); |
29 | | - if (!already_pending) { |
30 | | - php_debug("got new pending query %lu", stream_d); |
31 | | - incoming_pending_queries.push_back(stream_d); |
32 | | - } |
33 | | - if (wait_incoming_stream) { |
34 | | - php_debug("start process pending query %lu", stream_d); |
35 | | - main_thread(); |
| 87 | +uint64_t ComponentState::take_incoming_stream() noexcept { |
| 88 | + if (incoming_streams_.empty()) { |
| 89 | + php_warning("can't take incoming stream cause we don't have them"); |
| 90 | + return INVALID_PLATFORM_DESCRIPTOR; |
36 | 91 | } |
| 92 | + const auto stream_d{incoming_streams_.front()}; |
| 93 | + incoming_streams_.pop_front(); |
| 94 | + php_debug("take incoming stream %" PRIu64, stream_d); |
| 95 | + return stream_d; |
37 | 96 | } |
38 | 97 |
|
39 | | -void ComponentState::init_script_execution() { |
40 | | - kphp_core_context.init(); |
41 | | - init_php_scripts_in_each_worker(php_script_mutable_globals_singleton, k_main); |
42 | | - main_thread = k_main.get_handle(); |
| 98 | +uint64_t ComponentState::open_stream(const string &component_name) noexcept { |
| 99 | + uint64_t stream_d{}; |
| 100 | + if (const auto open_stream_res{get_platform_context()->open(component_name.size(), component_name.c_str(), std::addressof(stream_d))}; |
| 101 | + open_stream_res != OpenStreamResult::OpenStreamOk) { |
| 102 | + php_warning("can't open stream to %s", component_name.c_str()); |
| 103 | + return INVALID_PLATFORM_DESCRIPTOR; |
| 104 | + } |
| 105 | + opened_streams_.insert(stream_d); |
| 106 | + php_debug("opened a stream %" PRIu64 " to %s", stream_d, component_name.c_str()); |
| 107 | + return stream_d; |
43 | 108 | } |
44 | 109 |
|
45 | | -bool ComponentState::is_stream_timer(uint64_t stream_d) { |
46 | | - return timer_callbacks.contains(stream_d); |
| 110 | +uint64_t ComponentState::set_timer(std::chrono::nanoseconds duration) noexcept { |
| 111 | + uint64_t timer_d{}; |
| 112 | + if (const auto set_timer_res{get_platform_context()->set_timer(std::addressof(timer_d), static_cast<uint64_t>(duration.count()))}; |
| 113 | + set_timer_res != SetTimerResult::SetTimerOk) { |
| 114 | + php_warning("can't set timer for %.9f sec", std::chrono::duration<double>(duration).count()); |
| 115 | + return INVALID_PLATFORM_DESCRIPTOR; |
| 116 | + } |
| 117 | + opened_streams_.insert(timer_d); |
| 118 | + php_debug("set timer %" PRIu64 " for %.9f sec", timer_d, std::chrono::duration<double>(duration).count()); |
| 119 | + return timer_d; |
47 | 120 | } |
48 | 121 |
|
49 | | -void ComponentState::process_timer(uint64_t stream_d) { |
| 122 | +void ComponentState::release_stream(uint64_t stream_d) noexcept { |
| 123 | + if (stream_d == standard_stream_) { |
| 124 | + standard_stream_ = INVALID_PLATFORM_DESCRIPTOR; |
| 125 | + } |
| 126 | + opened_streams_.erase(stream_d); |
| 127 | + pending_updates_.erase(stream_d); // also erase pending updates if exists |
50 | 128 | get_platform_context()->free_descriptor(stream_d); |
51 | | - timer_callbacks[stream_d](); |
52 | | - timer_callbacks.erase(stream_d); |
53 | | - opened_streams.erase(stream_d); |
| 129 | + php_debug("released a stream %" PRIu64, stream_d); |
54 | 130 | } |
55 | 131 |
|
56 | | -void ComponentState::process_stream(uint64_t stream_d, StreamStatus status) { |
57 | | - auto expected_status = opened_streams[stream_d]; |
58 | | - if ((expected_status == StreamRuntimeStatus::WBlocked && status.write_status != IOBlocked) |
59 | | - || (expected_status == StreamRuntimeStatus::RBlocked && status.read_status != IOBlocked)) { |
60 | | - php_debug("resume on waited query %lu", stream_d); |
61 | | - auto suspend_point = awaiting_coroutines[stream_d]; |
62 | | - awaiting_coroutines.erase(stream_d); |
63 | | - php_assert(awaiting_coroutines.empty()); |
64 | | - suspend_point(); |
| 132 | +void ComponentState::release_all_streams() noexcept { |
| 133 | + const auto &platform_ctx{*get_platform_context()}; |
| 134 | + standard_stream_ = INVALID_PLATFORM_DESCRIPTOR; |
| 135 | + for (const auto stream_d : opened_streams_) { |
| 136 | + platform_ctx.free_descriptor(stream_d); |
| 137 | + php_debug("released a stream %" PRIu64, stream_d); |
65 | 138 | } |
| 139 | + opened_streams_.clear(); |
| 140 | + pending_updates_.clear(); |
| 141 | + incoming_streams_.clear(); |
66 | 142 | } |
0 commit comments