Skip to content

Commit 42bfc03

Browse files
Double-buffering for JSON async stream
Decreases time needed to process single JSON, and reduces memory footprint size. Relates-To: DATASDK-57 Signed-off-by: Andrey Kashcheev <[email protected]>
1 parent 94c4b50 commit 42bfc03

File tree

2 files changed

+35
-28
lines changed

2 files changed

+35
-28
lines changed

olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.cpp

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2023 HERE Europe B.V.
2+
* Copyright (C) 2023-2024 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,47 +19,51 @@
1919

2020
#include "AsyncJsonStream.h"
2121

22-
#include <cstring>
23-
2422
namespace olp {
2523
namespace dataservice {
2624
namespace read {
2725
namespace repository {
2826

29-
RapidJsonByteStream::Ch RapidJsonByteStream::Peek() const {
30-
std::unique_lock<std::mutex> lock(mutex_);
31-
cv_.wait(lock, [=]() { return !Empty(); });
32-
return buffer_[count_];
27+
RapidJsonByteStream::Ch RapidJsonByteStream::Peek() {
28+
if (ReadEmpty()) {
29+
std::unique_lock<std::mutex> lock(mutex_);
30+
cv_.wait(lock, [&]() { return !WriteEmpty(); });
31+
std::swap(read_buffer_, write_buffer_);
32+
write_buffer_.clear();
33+
count_ = 0;
34+
}
35+
return read_buffer_[count_];
3336
}
3437

3538
RapidJsonByteStream::Ch RapidJsonByteStream::Take() {
36-
std::unique_lock<std::mutex> lock(mutex_);
37-
cv_.wait(lock, [=]() { return !Empty(); });
38-
return buffer_[count_++];
39+
if (ReadEmpty()) {
40+
std::unique_lock<std::mutex> lock(mutex_);
41+
cv_.wait(lock, [&]() { return !WriteEmpty(); });
42+
std::swap(read_buffer_, write_buffer_);
43+
}
44+
full_count_++;
45+
return read_buffer_[count_++];
3946
}
4047

41-
size_t RapidJsonByteStream::Tell() const { return count_; }
48+
size_t RapidJsonByteStream::Tell() const { return full_count_; }
4249

4350
// Not implemented
4451
char* RapidJsonByteStream::PutBegin() { return 0; }
4552
void RapidJsonByteStream::Put(char) {}
4653
void RapidJsonByteStream::Flush() {}
4754
size_t RapidJsonByteStream::PutEnd(char*) { return 0; }
4855

49-
bool RapidJsonByteStream::Empty() const { return count_ == buffer_.size(); }
56+
bool RapidJsonByteStream::ReadEmpty() const {
57+
return count_ == read_buffer_.size();
58+
}
59+
bool RapidJsonByteStream::WriteEmpty() const { return write_buffer_.empty(); }
5060

5161
void RapidJsonByteStream::AppendContent(const char* content, size_t length) {
5262
std::unique_lock<std::mutex> lock(mutex_);
5363

54-
if (Empty()) {
55-
buffer_.resize(length);
56-
std::memcpy(buffer_.data(), content, length);
57-
count_ = 0;
58-
} else {
59-
const auto buffer_size = buffer_.size();
60-
buffer_.resize(buffer_size + length);
61-
std::memcpy(buffer_.data() + buffer_size, content, length);
62-
}
64+
const auto buffer_size = write_buffer_.size();
65+
write_buffer_.reserve(buffer_size + length);
66+
write_buffer_.insert(write_buffer_.end(), content, content + length);
6367

6468
cv_.notify_one();
6569
}
@@ -97,7 +101,7 @@ void AsyncJsonStream::CloseStream(boost::optional<client::ApiError> error) {
97101
return;
98102
}
99103
current_stream_->AppendContent("\0", 1);
100-
error_ = error;
104+
error_ = std::move(error);
101105
closed_ = true;
102106
}
103107

olp-cpp-sdk-dataservice-read/src/repositories/AsyncJsonStream.h

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (C) 2023 HERE Europe B.V.
2+
* Copyright (C) 2023-2024 HERE Europe B.V.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,13 +32,13 @@ namespace dataservice {
3232
namespace read {
3333
namespace repository {
3434

35-
// Json byte stream class. Implements rapidjson input stream concept.
35+
/// Json byte stream class. Implements rapidjson input stream concept.
3636
class RapidJsonByteStream {
3737
public:
3838
typedef char Ch;
3939

4040
/// Read the current character from stream without moving the read cursor.
41-
Ch Peek() const;
41+
Ch Peek();
4242

4343
/// Read the current character from stream and moving the read cursor to next
4444
/// character.
@@ -47,19 +47,22 @@ class RapidJsonByteStream {
4747
/// Get the current read cursor.
4848
size_t Tell() const;
4949

50-
// Not needed for reading.
50+
/// Not needed for reading.
5151
char* PutBegin();
5252
void Put(char);
5353
void Flush();
5454
size_t PutEnd(char*);
5555

56-
bool Empty() const;
56+
bool ReadEmpty() const;
57+
bool WriteEmpty() const;
5758

5859
void AppendContent(const char* content, size_t length);
5960

6061
private:
6162
mutable std::mutex mutex_;
62-
std::vector<char> buffer_; // Current buffer
63+
std::vector<char> read_buffer_; // Current buffer
64+
std::vector<char> write_buffer_; // Current buffer
65+
size_t full_count_{0}; // Bytes read from the buffer
6366
size_t count_{0}; // Bytes read from the buffer
6467
mutable std::condition_variable cv_; // Condition for next portion of content
6568
};

0 commit comments

Comments
 (0)