Skip to content

Commit d3bcdbf

Browse files
Fix the data race in TaskSink class (#1091)
The data race could happen when the instance is destroyed while the a new task is being added and already passed the check condition. Switch StreamLayerClientImpl, CatalogClientImpl to TaskSink. Remove ExecuteOrSchedule helpers. Resolves: OLPSUP-12083 Signed-off-by: Mykhailo Kuchma <[email protected]>
1 parent f76f40a commit d3bcdbf

File tree

10 files changed

+126
-184
lines changed

10 files changed

+126
-184
lines changed

olp-cpp-sdk-dataservice-read/src/CatalogClientImpl.cpp

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828

2929
#include "Common.h"
3030
#include "repositories/CatalogRepository.h"
31-
#include "repositories/ExecuteOrSchedule.inl"
3231

3332
namespace olp {
3433
namespace dataservice {
@@ -42,25 +41,17 @@ CatalogClientImpl::CatalogClientImpl(client::HRN catalog,
4241
client::OlpClientSettings settings)
4342
: catalog_(std::move(catalog)),
4443
settings_(std::move(settings)),
45-
lookup_client_(catalog_, settings_) {
44+
lookup_client_(catalog_, settings_),
45+
task_sink_(settings_.task_scheduler) {
4646
if (!settings_.cache) {
4747
settings_.cache = client::OlpClientSettingsFactory::CreateDefaultCache({});
4848
}
49-
50-
// to avoid capturing task scheduler inside a task, we need a copy of settings
51-
// without the scheduler
52-
task_scheduler_ = std::move(settings_.task_scheduler);
53-
54-
pending_requests_ = std::make_shared<client::PendingRequests>();
55-
}
56-
57-
CatalogClientImpl::~CatalogClientImpl() {
58-
pending_requests_->CancelAllAndWait();
5949
}
6050

6151
bool CatalogClientImpl::CancelPendingRequests() {
6252
OLP_SDK_LOG_TRACE(kLogTag, "CancelPendingRequests");
63-
return pending_requests_->CancelAll();
53+
task_sink_.CancelTasks();
54+
return true;
6455
}
6556

6657
client::CancellationToken CatalogClientImpl::GetCatalog(
@@ -77,8 +68,8 @@ client::CancellationToken CatalogClientImpl::GetCatalog(
7768
return repository.GetCatalog(request, std::move(context));
7869
};
7970

80-
return AddTask(task_scheduler_, pending_requests_,
81-
std::move(get_catalog_task), std::move(callback));
71+
return task_sink_.AddTask(std::move(get_catalog_task), std::move(callback),
72+
thread::NORMAL);
8273
};
8374

8475
return ScheduleFetch(std::move(schedule_get_catalog), std::move(request),
@@ -111,8 +102,8 @@ client::CancellationToken CatalogClientImpl::GetLatestVersion(
111102
return repository.GetLatestVersion(request, std::move(context));
112103
};
113104

114-
return AddTask(task_scheduler_, pending_requests_,
115-
std::move(get_latest_version_task), std::move(callback));
105+
return task_sink_.AddTask(std::move(get_latest_version_task),
106+
std::move(callback), thread::NORMAL);
116107
};
117108

118109
return ScheduleFetch(std::move(schedule_get_latest_version),
@@ -142,8 +133,8 @@ client::CancellationToken CatalogClientImpl::ListVersions(
142133
return repository.GetVersionsList(request, std::move(context));
143134
};
144135

145-
return AddTask(task_scheduler_, pending_requests_,
146-
std::move(versions_list_task), std::move(callback));
136+
return task_sink_.AddTask(std::move(versions_list_task), std::move(callback),
137+
thread::NORMAL);
147138
}
148139

149140
client::CancellableFuture<VersionsResponse> CatalogClientImpl::ListVersions(

olp-cpp-sdk-dataservice-read/src/CatalogClientImpl.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
#include <olp/dataservice/read/Types.h>
3232
#include <olp/dataservice/read/VersionsRequest.h>
3333

34+
#include "TaskSink.h"
35+
3436
namespace olp {
3537
namespace client {
3638
class OlpClient;
@@ -48,8 +50,6 @@ class CatalogClientImpl final {
4850
public:
4951
CatalogClientImpl(client::HRN catalog, client::OlpClientSettings settings);
5052

51-
~CatalogClientImpl();
52-
5353
bool CancelPendingRequests();
5454

5555
client::CancellationToken GetCatalog(CatalogRequest request,
@@ -72,9 +72,8 @@ class CatalogClientImpl final {
7272
private:
7373
client::HRN catalog_;
7474
client::OlpClientSettings settings_;
75-
std::shared_ptr<thread::TaskScheduler> task_scheduler_;
76-
std::shared_ptr<client::PendingRequests> pending_requests_;
7775
client::ApiLookupClient lookup_client_;
76+
TaskSink task_sink_;
7877
};
7978

8079
} // namespace read

olp-cpp-sdk-dataservice-read/src/Common.h

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
#include <olp/core/client/PendingRequests.h>
2424
#include <olp/core/thread/TaskScheduler.h>
2525
#include <olp/dataservice/read/FetchOptions.h>
26-
#include "repositories/ExecuteOrSchedule.inl"
2726

2827
namespace olp {
2928
namespace dataservice {
@@ -62,59 +61,6 @@ inline client::CancellationToken ScheduleFetch(TaskScheduler&& schedule_task,
6261
std::forward<Callback>(callback));
6362
}
6463

65-
/*
66-
* @brief Common function used to wrap a lambda function and a callback that
67-
* consumes the function result with a TaskContext class and schedule this to a
68-
* task scheduler.
69-
* @param task_scheduler Task scheduler instance.
70-
* @param pending_requests PendingRequests instance that tracks current
71-
* requests.
72-
* @param task Function that will be executed.
73-
* @param callback Function that will consume task output.
74-
* @param priority Priority of the task.
75-
* @param args Additional agrs to pass to TaskContext.
76-
* @return CancellationToken used to cancel the operation.
77-
*/
78-
template <typename Function, typename Callback, typename... Args>
79-
inline client::CancellationToken AddTaskWithPriority(
80-
const std::shared_ptr<thread::TaskScheduler>& task_scheduler,
81-
const std::shared_ptr<client::PendingRequests>& pending_requests,
82-
Function task, Callback callback, uint32_t priority, Args&&... args) {
83-
auto context = client::TaskContext::Create(
84-
std::move(task), std::move(callback), std::forward<Args>(args)...);
85-
pending_requests->Insert(context);
86-
87-
repository::ExecuteOrSchedule(task_scheduler,
88-
[=] {
89-
context.Execute();
90-
pending_requests->Remove(context);
91-
},
92-
priority);
93-
94-
return context.CancelToken();
95-
}
96-
97-
/*
98-
* @brief Common function used to wrap a lambda function and a callback that
99-
* consumes the function result with a TaskContext class and schedule this to a
100-
* task scheduler with NORMAL priority.
101-
* @param task_scheduler Task scheduler instance.
102-
* @param pending_requests PendingRequests instance that tracks current
103-
* requests.
104-
* @param task Function that will be executed.
105-
* @param callback Function that will consume task output.
106-
* @param args Additional agrs to pass to TaskContext.
107-
* @return CancellationToken used to cancel the operation.
108-
*/
109-
template <typename Function, typename Callback, typename... Args>
110-
inline client::CancellationToken AddTask(
111-
const std::shared_ptr<thread::TaskScheduler>& task_scheduler,
112-
const std::shared_ptr<client::PendingRequests>& pending_requests,
113-
Function task, Callback callback, Args&&... args) {
114-
return AddTaskWithPriority(task_scheduler, pending_requests, task, callback,
115-
thread::NORMAL, std::forward<Args>(args)...);
116-
}
117-
11864
} // namespace read
11965
} // namespace dataservice
12066
} // namespace olp

olp-cpp-sdk-dataservice-read/src/StreamLayerClientImpl.cpp

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
#include "Common.h"
3232
#include "generated/api/BlobApi.h"
3333
#include "generated/api/StreamApi.h"
34-
#include "repositories/ExecuteOrSchedule.inl"
3534

3635
namespace olp {
3736
namespace dataservice {
@@ -60,20 +59,19 @@ StreamLayerClientImpl::StreamLayerClientImpl(client::HRN catalog,
6059
: catalog_(std::move(catalog)),
6160
layer_id_(std::move(layer_id)),
6261
settings_(std::move(settings)),
63-
pending_requests_(std::make_shared<client::PendingRequests>()),
64-
lookup_client_(catalog_, settings_) {
62+
lookup_client_(catalog_, settings_),
63+
task_sink_(settings_.task_scheduler) {
6564
if (!settings_.cache) {
6665
settings_.cache = client::OlpClientSettingsFactory::CreateDefaultCache({});
6766
}
6867
}
6968

70-
StreamLayerClientImpl::~StreamLayerClientImpl() {
71-
pending_requests_->CancelAllAndWait();
72-
}
69+
StreamLayerClientImpl::~StreamLayerClientImpl() = default;
7370

7471
bool StreamLayerClientImpl::CancelPendingRequests() {
7572
OLP_SDK_LOG_TRACE(kLogTag, "CancelPendingRequests");
76-
return pending_requests_->CancelAll();
73+
task_sink_.CancelTasks();
74+
return true;
7775
}
7876

7977
client::CancellationToken StreamLayerClientImpl::Subscribe(
@@ -148,8 +146,8 @@ client::CancellationToken StreamLayerClientImpl::Subscribe(
148146
return subscripton_id;
149147
};
150148

151-
return AddTask(settings_.task_scheduler, pending_requests_,
152-
std::move(subscribe_task), std::move(callback));
149+
return task_sink_.AddTask(std::move(subscribe_task), std::move(callback),
150+
thread::NORMAL);
153151
}
154152

155153
client::CancellableFuture<SubscribeResponse> StreamLayerClientImpl::Subscribe(
@@ -216,8 +214,8 @@ client::CancellationToken StreamLayerClientImpl::Unsubscribe(
216214
return subscription_id;
217215
};
218216

219-
return AddTask(settings_.task_scheduler, pending_requests_,
220-
std::move(unsubscribe_task), std::move(callback));
217+
return task_sink_.AddTask(std::move(unsubscribe_task), std::move(callback),
218+
thread::NORMAL);
221219
}
222220

223221
client::CancellableFuture<UnsubscribeResponse>
@@ -265,8 +263,8 @@ client::CancellationToken StreamLayerClientImpl::GetData(
265263
return blob_response;
266264
};
267265

268-
return AddTask(settings_.task_scheduler, pending_requests_,
269-
std::move(get_data_task), std::move(callback));
266+
return task_sink_.AddTask(std::move(get_data_task), std::move(callback),
267+
thread::NORMAL);
270268
}
271269

272270
client::CancellableFuture<DataResponse> StreamLayerClientImpl::GetData(
@@ -362,8 +360,8 @@ client::CancellationToken StreamLayerClientImpl::Poll(
362360
return res;
363361
};
364362

365-
return AddTask(settings_.task_scheduler, pending_requests_,
366-
std::move(poll_task), std::move(callback));
363+
return task_sink_.AddTask(std::move(poll_task), std::move(callback),
364+
thread::NORMAL);
367365
}
368366

369367
client::CancellableFuture<PollResponse> StreamLayerClientImpl::Poll() {
@@ -422,8 +420,8 @@ client::CancellationToken StreamLayerClientImpl::Seek(
422420
return res;
423421
};
424422

425-
return AddTask(settings_.task_scheduler, pending_requests_,
426-
std::move(seek_task), std::move(callback));
423+
return task_sink_.AddTask(std::move(seek_task), std::move(callback),
424+
thread::NORMAL);
427425
}
428426
client::CancellableFuture<SeekResponse> StreamLayerClientImpl::Seek(
429427
SeekRequest request) {

olp-cpp-sdk-dataservice-read/src/StreamLayerClientImpl.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
#include <olp/dataservice/read/Types.h>
3232
#include <olp/dataservice/read/model/Messages.h>
3333

34+
#include "TaskSink.h"
35+
3436
namespace olp {
3537
namespace client {
3638
class OlpClient;
@@ -93,10 +95,10 @@ class StreamLayerClientImpl {
9395
client::HRN catalog_;
9496
std::string layer_id_;
9597
client::OlpClientSettings settings_;
96-
std::shared_ptr<client::PendingRequests> pending_requests_;
9798
std::mutex mutex_;
9899
std::unique_ptr<StreamLayerClientContext> client_context_;
99100
client::ApiLookupClient lookup_client_;
101+
TaskSink task_sink_;
100102
};
101103

102104
} // namespace read
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright (C) 2020 HERE Europe B.V.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
* SPDX-License-Identifier: Apache-2.0
17+
* License-Filename: LICENSE
18+
*/
19+
20+
#include "TaskSink.h"
21+
22+
#include <olp/core/logging/Log.h>
23+
24+
namespace olp {
25+
namespace dataservice {
26+
namespace read {
27+
namespace {
28+
constexpr auto kLogTag = "TaskSink";
29+
}
30+
31+
TaskSink::TaskSink(std::shared_ptr<thread::TaskScheduler> task_scheduler)
32+
: task_scheduler_(std::move(task_scheduler)),
33+
pending_requests_(std::make_shared<client::PendingRequests>()),
34+
closed_(false) {}
35+
36+
TaskSink::~TaskSink() {
37+
OLP_SDK_LOG_INFO(kLogTag, "Finishing, canceling all current tasks.");
38+
{
39+
std::lock_guard<std::mutex> lock(mutex_);
40+
closed_ = true;
41+
}
42+
// CancelAllAndWait method should be called without mutex, since potentially
43+
// there might be new added tasks, it may result in deadlock.
44+
pending_requests_->CancelAllAndWait();
45+
}
46+
47+
void TaskSink::CancelTasks() { pending_requests_->CancelAll(); }
48+
49+
bool TaskSink::AddTaskImpl(client::TaskContext task, uint32_t priority) {
50+
if (task_scheduler_) {
51+
return ScheduleTask(std::move(task), priority);
52+
} else {
53+
ExecuteTask(std::move(task));
54+
return true;
55+
}
56+
}
57+
58+
bool TaskSink::ScheduleTask(client::TaskContext task, uint32_t priority) {
59+
std::lock_guard<std::mutex> lock(mutex_);
60+
if (closed_) {
61+
OLP_SDK_LOG_WARNING(
62+
kLogTag, "Attempt to add a task when the sink is already closed");
63+
return false;
64+
}
65+
66+
pending_requests_->Insert(task);
67+
auto pending_requests = pending_requests_;
68+
task_scheduler_->ScheduleTask(
69+
[=] {
70+
task.Execute();
71+
pending_requests->Remove(task);
72+
},
73+
priority);
74+
75+
return true;
76+
}
77+
78+
void TaskSink::ExecuteTask(client::TaskContext task) { task.Execute(); }
79+
80+
} // namespace read
81+
} // namespace dataservice
82+
} // namespace olp

0 commit comments

Comments
 (0)