2424
2525namespace kphp ::component::inter_component_session {
2626
27- // Inter component client for interactions over a stream in a client-server manner
27+ // The client for inter-component communication over a stream in a client-server manner
2828class client final {
2929 using query_id_type = uint64_t ;
3030 using query2notifier_type = kphp::stl::map<query_id_type, kphp::coro::event, kphp::memory::script_allocator>;
@@ -51,10 +51,10 @@ class client final {
5151 kphp::stl::map<query_id_type, int32_t , kphp::memory::script_allocator> req_status{};
5252
5353 auto process_write (shared_transport_type t, query_id_type qid, std::span<const std::byte> payload) noexcept -> kphp::coro::task<void> {
54- // Ensure that readiness waiter is presented
54+ // Ensure that the notifier is present
5555 kphp::log::assertion (req_finish_notifier.contains (qid));
5656
57- // Wait until transport will be available
57+ // Wait until transport is available
5858 if (is_occupied) [[unlikely]] {
5959 transport_readiness_notifier.emplace (qid, kphp::coro::event{});
6060 queue.push (qid);
@@ -76,7 +76,7 @@ class client final {
7676 co_return ;
7777 }
7878
79- // Ensure the transport we still held by us
79+ // Ensure the transport is still held by this write
8080 kphp::log::assertion (is_occupied && occupied_by == qid);
8181
8282 // Write payload
@@ -86,13 +86,13 @@ class client final {
8686 co_return ;
8787 }
8888
89- // Double check
89+ // Double- check transport ownership
9090 kphp::log::assertion (is_occupied && occupied_by == qid);
9191
9292 // Release transport
9393 is_occupied = false ;
9494
95- // Notify next one that transport is available
95+ // Notify the next waiting query that the transport is available
9696 if (!queue.empty ()) {
9797 auto q{queue.front ()};
9898 queue.pop ();
@@ -107,7 +107,7 @@ class client final {
107107 auto write (shared_transport_type t, query_id_type qid, std::span<const std::byte> payload) noexcept -> kphp::coro::task<std::expected<void, int32_t>> {
108108 req_finish_notifier.emplace (qid, kphp::coro::event{});
109109
110- // The protocol design assumes that interrupting the transfer in the middle of frame headers leads to critical error.
110+ // The protocol design assumes that interrupting the transfer in the middle of a frame leads to critical error.
111111 // Therefore, we need to write the request in a separate coroutine.
112112 // This technique prevents integrity violations when this coroutine is cancelled.
113113 kphp::coro::io_scheduler::get ().start (process_write (t, qid, payload));
@@ -133,7 +133,7 @@ class client final {
133133 using shared_ctx_type = class_instance<refcountable_ctx_type>;
134134
135135 // The following reader state is intended to be initialized once for a new client.
136- // It is assumed that "copying" (ref count increasing ) will be the common case, rather than moving.
136+ // It is assumed that "copying" (ref count increase ) will be the common case, rather than moving.
137137 shared_ctx_type ctx;
138138 kphp::coro::shared_task<void > interrupter;
139139 kphp::coro::shared_task<void > runner;
@@ -151,7 +151,7 @@ class client final {
151151 while (!ctx.get ()->interrupted .is_set ()) {
152152 // Read response header or interrupt
153153 auto read_header_res{co_await kphp::coro::when_any (t.get ()->stream .read (resp_header_buf), interrupter)};
154- // Interrupt is happened
154+ // An interrupt has occurred
155155 if (std::holds_alternative<kphp::coro::void_value>(read_header_res)) [[unlikely]] {
156156 kphp::log::debug (" reader has been interrupted" );
157157 break ;
@@ -171,15 +171,15 @@ class client final {
171171 const auto size{resp_header.size .value };
172172 kphp::log::debug (" read {} bytes for query #{} " , size, qid);
173173
174- // Ensure that buffer for response can be provided
174+ // Ensure that buffer for response is provided
175175 auto & buffer_providers{ctx.get ()->query2resp_buffer_provider };
176176 auto it_buffer_provider{buffer_providers.find (qid)};
177177
178- // Response provider is not presented => read response into dummy buffer, just for keeping of consistency
178+ // Response provider is not present => read response into dummy buffer, just to maintain consistency
179179 if (it_buffer_provider == buffer_providers.end ()) {
180180 kphp::stl::vector<std::byte, kphp::memory::script_allocator> sink_buffer{size};
181181 std::span<std::byte> sink_resp{sink_buffer.data (), sink_buffer.size ()};
182- kphp::log::debug (" response buffer provider hasn't been presented for query #{}, read response into dummy buffer" , qid);
182+ kphp::log::debug (" response buffer provider is not present for query #{}, read response into dummy buffer" , qid);
183183 // Read dummy payload
184184 if (auto res{co_await t.get ()->stream .read (sink_resp)}; !res) [[unlikely]] {
185185 kphp::log::warning (" an error occurred while reading the payload from a stream: {}" , res.error ());
@@ -189,7 +189,7 @@ class client final {
189189 continue ;
190190 }
191191
192- // Response provider is presented => make an appropriate buffer's slice for a response
192+ // Response provider is present => make an appropriate buffer slice for the response
193193 auto resp{it_buffer_provider->second (size)};
194194 ctx.get ()->query2resp [qid] = resp;
195195
@@ -202,31 +202,31 @@ class client final {
202202 kphp::log::debug (" resp buffer first byte: {} Resp buffer last byte: {} " , static_cast <uint8_t >(*std::next (resp.begin (), 0 )),
203203 static_cast <uint8_t >(*std::next (resp.begin (), resp.size () - 1 )));
204204
205- // Ensure that notifier is presented and notify
205+ // Ensure that notifier is present and notify
206206 kphp::log::assertion (ctx.get ()->resp_finish_notifier .contains (qid));
207207 ctx.get ()->resp_finish_notifier [qid].set ();
208208 }
209209
210- // Error occurred, notify all waiting queries
210+ // An error occurred, notify all waiting queries
211211 for (auto & [_, notifier] : ctx.get ()->resp_finish_notifier ) {
212212 if (!notifier.is_set ()) [[unlikely]] {
213213 notifier.set ();
214214 }
215215 }
216216 }
217217
218- // Dummy routine for waiting until an interrupting (stopping ) event will happen
218+ // Dummy routine for waiting until an interrupt (stop ) event occurs
219219 static auto wait_until_interrupt (shared_ctx_type ctx) noexcept -> kphp::coro::shared_task<void> {
220220 co_await ctx.get ()->interrupted ;
221221 co_return ;
222222 }
223223
224- // Semantics of this method is considering tha state will be changed. That's why it is not marked as `const`
224+ // Semantics of this method considers that the state will be changed. That is why it is not marked as `const`
225225 auto register_query (query_id_type qid, details::function_wrapper<std::span<std::byte>, size_t >&& buffer_provider) noexcept -> void { // NOLINT
226- // We wouldn't read a response twice
226+ // We do not read a response twice
227227 kphp::log::assertion (ctx.get ()->resp_finish_notifier .contains (qid) == false ); // NOLINT
228228
229- // Register provider of storage for a response
229+ // Register storage provider for a response
230230 ctx.get ()->query2resp_buffer_provider .emplace (qid, std::move (buffer_provider));
231231
232232 // Register notifier
@@ -238,24 +238,25 @@ class client final {
238238 : transport(make_instance<refcountable_transport_type>(refcountable_transport_type{.stream = std::move (s)})),
239239 reader({make_instance<reader::refcountable_ctx_type>(), transport}) {
240240 auto & scheduler{kphp::coro::io_scheduler::get ()};
241- // Interrupter needs for immediately stopping the reader in the of client's life
241+ // Interrupter is needed to immediately stop the reader at the end of the client's lifetime
242242 scheduler.start (reader.interrupter );
243- // Run reader as "service"
243+ // Run reader as a "service"
244244 scheduler.start (reader.runner );
245245 }
246246
247247public:
248248 ~client () {
249- // If client has been moved, skip disabling the reader.
249+ // If client has been moved, skip shutting down the reader.
250250 // Otherwise, shut down the reader.
251251 if (query_count != std::numeric_limits<query_id_type>::max ()) {
252252 reader.ctx .get ()->interrupted .set ();
253253 }
254254 }
255255
256- // Designed that `transport` and `reader` will be allocated once as refcountable class instance.
257- // Moving looks like copying but is simply reference count increasing for 'transport' and 'reader' fields.
258- // Such approach motivated by the fact that the "reader-service" cannot be easily moved due to depends on transport and cannot be trivial stopped.
256+ // 'transport' and 'reader' are designed to be allocated once as refcountable class instances.
257+ // Moving is similar to copying, but simply increases the reference count for the 'transport' and 'reader' fields.
258+ // This approach is motivated by the fact that the 'reader-service' cannot be easily moved because the 'reader-service' depends on the transport and cannot be
259+ // easily stopped.
259260 client (client&& other) noexcept
260261 : transport(other.transport), // NOLINT // Intentionally call of copy constructor for shared transport
261262 query_count (std::exchange(other.query_count, std::numeric_limits<query_id_type>::max())),
@@ -292,7 +293,7 @@ inline auto client::create(std::string_view component_name) noexcept -> std::exp
292293 if (!expected) [[unlikely]] {
293294 return std::unexpected{expected.error ()};
294295 }
295- // Create and move stream into a session
296+ // Create and move the stream into the session
296297 return std::expected<client, int32_t >{client{std::move (*expected)}};
297298}
298299
@@ -301,20 +302,20 @@ requires std::is_convertible_v<std::invoke_result_t<B, size_t>, std::span<std::b
301302 std::same_as<std::invoke_result_t <R, std::span<std::byte>>, client::response_readiness>
302303auto client::query (std::span<const std::byte> request, B response_buffer_provider,
303304 R response_handler) noexcept -> kphp::coro::task<std::expected<void, int32_t>> {
304- // If previously any readers' error has been occurred
305+ // If any reader error has previously occurred
305306 if (reader.ctx .get () == nullptr ) [[unlikely]] {
306307 co_return std::unexpected (k2::errno_eshutdown);
307308 }
308309
309310 kphp::log::assertion (query_count < std::numeric_limits<query_id_type>::max ());
310311 const auto query_id{query_count++};
311312
312- // Ensure that query will be invalidated after occasionally cancellation
313+ // Ensure that query will be invalidated after accidental cancellation
313314 const vk::final_action finalizer{[reader_ctx = reader.ctx , &query_id] noexcept { reader_ctx.get ()->query2resp_buffer_provider .erase (query_id); }};
314315
315- // Register a new query and send request
316+ // Register a new query and send the request
316317 reader.register_query (query_id, details::function_wrapper<std::span<std::byte>, size_t >{std::move (response_buffer_provider)});
317- kphp::log::debug (" client create query #{}" , query_id);
318+ kphp::log::debug (" client creates query #{}" , query_id);
318319 if (auto res{co_await writer.write (transport, query_id, request)}; !res) [[unlikely]] {
319320 co_return std::move (res);
320321 }
@@ -323,25 +324,25 @@ auto client::query(std::span<const std::byte> request, B response_buffer_provide
323324 auto response_readiness_status{response_readiness::pending};
324325
325326 kphp::log::debug (" client now is reading responses for query #{}" , query_id);
326- // Wait a new response until handler returns false
327+ // Wait for a new response until the handler returns ready
327328 while (response_readiness_status != response_readiness::ready) {
328- // Suspend on response notifier
329+ // Suspend on the response notifier
329330 co_await reader.ctx .get ()->resp_finish_notifier [query_id];
330331
331- // First of all, turn off notifier
332+ // First, unset the notifier
332333 reader.ctx .get ()->resp_finish_notifier [query_id].unset ();
333334
334- // If reader has been interrupted do not invoke handler and finish normally
335+ // If reader has been interrupted do not invoke the handler and finish normally
335336 if (reader.ctx .get ()->interrupted .is_set ()) {
336337 co_return std::expected<void , int32_t >{};
337338 }
338339
339- // If any readers' error has been occurred
340+ // If any reader error has occurred
340341 if (reader.ctx .get ()->error ) [[unlikely]] {
341342 co_return std::unexpected (*(reader.ctx .get ()->error ));
342343 }
343344
344- // Invoke handler and pass response slice
345+ // Invoke handler and pass the response slice
345346 response_readiness_status = std::invoke (std::move (response_handler), reader.ctx .get ()->query2resp [query_id]);
346347 }
347348 co_return std::expected<void , int32_t >{};
0 commit comments