Skip to content

Commit bea4aaf

Browse files
Wrote my AIO
POSIX asynchronous I/O is very sub-optimal. They create threads, dynamically allocate memory, and invoke a lot of system calls (which set the signal mask, priority, and more). Additional locks are also possible (if the AIO is using someone else). Moreover, they are not portable.
1 parent 137992f commit bea4aaf

File tree

4 files changed

+205
-99
lines changed

4 files changed

+205
-99
lines changed

runtime/DoubleBuffer.h

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
2+
#pragma once
3+
#include <condition_variable>
4+
#include <cstdio>
5+
#include <cstdlib>
6+
#include <memory>
7+
#include <mutex>
8+
9+
#include <unistd.h>
10+
11+
#include "Config.h"
12+
#include "Portability.h"
13+
14+
class DoubleBuffer
15+
{
16+
struct Deleter {
17+
void operator()(char *__ptr) const { free(__ptr); }
18+
};
19+
20+
using MemPtrT = std::unique_ptr<char, Deleter>;
21+
22+
static MemPtrT allocateMemory()
23+
{
24+
char *tmp;
25+
int err = posix_memalign(reinterpret_cast<void **>(&tmp), 512,
26+
NanoLogConfig::OUTPUT_BUFFER_SIZE);
27+
if (err) {
28+
perror(
29+
"The NanoLog system was not able to allocate enough memory "
30+
"to support its operations. Quitting...\r\n");
31+
std::exit(-1);
32+
}
33+
return MemPtrT{tmp};
34+
};
35+
36+
static char *accessOnce(const MemPtrT &ptr)
37+
{
38+
NANOLOG_READ_WRITE_BARRIER;
39+
return ptr.get();
40+
}
41+
42+
MemPtrT freeBuffer;
43+
MemPtrT compressingBuffer;
44+
MemPtrT writeBuffer;
45+
unsigned size;
46+
int errorCode;
47+
48+
std::condition_variable condition;
49+
std::mutex mutex;
50+
51+
public:
52+
DoubleBuffer()
53+
: freeBuffer(allocateMemory()),
54+
compressingBuffer(allocateMemory()),
55+
writeBuffer(nullptr),
56+
size(0),
57+
errorCode(0),
58+
condition(),
59+
mutex(){};
60+
61+
char *getCompressingBuffer() noexcept { return compressingBuffer.get(); }
62+
63+
bool writeInProgress() const { return accessOnce(freeBuffer) == nullptr; }
64+
65+
int swapBuffer(unsigned count) noexcept
66+
{
67+
while (accessOnce(writeBuffer) != nullptr) {}
68+
69+
{
70+
std::unique_lock<std::mutex> lock(mutex);
71+
size = count;
72+
std::swap(writeBuffer, compressingBuffer);
73+
if (freeBuffer == nullptr) {
74+
condition.wait(lock, [this]() { return freeBuffer != nullptr; });
75+
} else {
76+
condition.notify_one();
77+
}
78+
std::swap(freeBuffer, compressingBuffer);
79+
return errorCode;
80+
}
81+
}
82+
83+
void writeToFile(int file) noexcept
84+
{
85+
unsigned tmp_size = 0;
86+
MemPtrT tmp_ptr = nullptr;
87+
88+
{
89+
std::unique_lock<std::mutex> lock(mutex);
90+
condition.wait(lock, [this]() { return writeBuffer != nullptr; });
91+
tmp_size = size;
92+
std::swap(writeBuffer, tmp_ptr);
93+
}
94+
95+
int res = 0;
96+
if (tmp_size != 0) {
97+
res = write(file, tmp_ptr.get(), tmp_size);
98+
res = (res < 0) ? errno : 0;
99+
}
100+
101+
while (accessOnce(freeBuffer) != nullptr) {}
102+
103+
{
104+
std::unique_lock<std::mutex> lock(mutex);
105+
errorCode = res;
106+
std::swap(freeBuffer, tmp_ptr);
107+
if (compressingBuffer == nullptr) {
108+
condition.notify_one();
109+
}
110+
}
111+
}
112+
};

runtime/Portability.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,12 @@
4949
#define NANOLOG_PRINTF_FORMAT_ATTR(string_index, first_to_check)
5050
#endif
5151

52+
#if defined(_MSC_VER)
53+
extern "C" void _ReadWriteBarrier(void);
54+
#pragma intrinsic(_ReadWriteBarrier)
55+
#define NANOLOG_READ_WRITE_BARRIER _ReadWriteBarrier()
56+
#elif defined(__GNUC__)
57+
#define NANOLOG_READ_WRITE_BARRIER __sync_synchronize()
58+
#endif
59+
5260
#endif /* NANOLOG_PORTABILITY_H */

runtime/RuntimeLogger.cc

Lines changed: 71 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,13 @@ RuntimeLogger::RuntimeLogger()
4343
, compressionThread()
4444
, hasOutstandingOperation(false)
4545
, compressionThreadShouldExit(false)
46+
, writerThreadShouldExit(false)
4647
, syncStatus(SYNC_COMPLETED)
4748
, condMutex()
4849
, workAdded()
4950
, hintSyncCompleted()
5051
, outputFd(-1)
51-
, aioCb()
52-
, compressingBuffer(nullptr)
53-
, outputDoubleBuffer(nullptr)
52+
, buffer()
5453
, currentLogLevel(NOTICE)
5554
, cycleAtThreadStart(0)
5655
, cyclesAtLastAIOStart(0)
@@ -82,26 +81,9 @@ RuntimeLogger::RuntimeLogger()
8281
std::exit(-1);
8382
}
8483

85-
memset(&aioCb, 0, sizeof(aioCb));
86-
87-
int err = posix_memalign(reinterpret_cast<void **>(&compressingBuffer),
88-
512, NanoLogConfig::OUTPUT_BUFFER_SIZE);
89-
if (err) {
90-
perror("The NanoLog system was not able to allocate enough memory "
91-
"to support its operations. Quitting...\r\n");
92-
std::exit(-1);
93-
}
94-
95-
err = posix_memalign(reinterpret_cast<void **>(&outputDoubleBuffer),
96-
512, NanoLogConfig::OUTPUT_BUFFER_SIZE);
97-
if (err) {
98-
perror("The NanoLog system was not able to allocate enough memory "
99-
"to support its operations. Quitting...\r\n");
100-
std::exit(-1);
101-
}
102-
10384
#ifndef BENCHMARK_DISCARD_ENTRIES_AT_STAGINGBUFFER
10485
compressionThread = std::thread(&RuntimeLogger::compressionThreadMain, this);
86+
writerThread = std::thread(&RuntimeLogger::writerThreadMain, this);
10587
#endif
10688
}
10789

@@ -116,19 +98,16 @@ RuntimeLogger::~RuntimeLogger() {
11698
nanoLogSingleton.workAdded.notify_all();
11799
}
118100

101+
// Stop the writer thread completely
102+
writerThreadShouldExit = true;
103+
buffer.swapBuffer(0);
104+
writerThread.join();
105+
119106
if (nanoLogSingleton.compressionThread.joinable())
120107
nanoLogSingleton.compressionThread.join();
121108

122-
// Free all the data structures
123-
if (compressingBuffer) {
124-
free(compressingBuffer);
125-
compressingBuffer = nullptr;
126-
}
127-
128-
if (outputDoubleBuffer) {
129-
free(outputDoubleBuffer);
130-
outputDoubleBuffer = nullptr;
131-
}
109+
if (nanoLogSingleton.writerThread.joinable())
110+
nanoLogSingleton.writerThread.join();
132111

133112
if (outputFd > 0)
134113
close(outputFd);
@@ -314,8 +293,8 @@ RuntimeLogger::preallocate() {
314293
}
315294

316295
/**
317-
* Main compression thread that handles scanning through the StagingBuffers,
318-
* compressing log entries, and outputting a compressed log file.
296+
* Main compression thread that handles scanning through the StagingBuffers and
297+
* compressing log entries.
319298
*/
320299
void
321300
RuntimeLogger::compressionThreadMain() {
@@ -329,7 +308,7 @@ RuntimeLogger::compressionThreadMain() {
329308
cycleAtThreadStart = cyclesAwakeStart;
330309

331310
// Manages the state associated with compressing log messages
332-
Log::Encoder encoder(compressingBuffer, NanoLogConfig::OUTPUT_BUFFER_SIZE);
311+
Log::Encoder encoder(buffer.getCompressingBuffer(), NanoLogConfig::OUTPUT_BUFFER_SIZE);
333312

334313
// Indicates whether a compression operation failed or not due
335314
// to insufficient space in the outputBuffer
@@ -502,51 +481,35 @@ RuntimeLogger::compressionThreadMain() {
502481
}
503482

504483
if (hasOutstandingOperation) {
505-
if (aio_error(&aioCb) == EINPROGRESS) {
506-
const struct aiocb *const aiocb_list[] = {&aioCb};
507-
if (outputBufferFull) {
508-
// If the output buffer is full and we're not done,
509-
// wait for completion
510-
cyclesActive += PerfUtils::Cycles::rdtsc() - cyclesAwakeStart;
511-
int err = aio_suspend(aiocb_list, 1, NULL);
484+
if (buffer.writeInProgress() && !outputBufferFull) {
485+
// If there's no new data, go to sleep.
486+
if (bytesConsumedThisIteration == 0 &&
487+
NanoLogConfig::POLL_INTERVAL_DURING_IO_US > 0)
488+
{
489+
std::unique_lock<std::mutex> lock(condMutex);
490+
cyclesActive += PerfUtils::Cycles::rdtsc() -
491+
cyclesAwakeStart;
492+
workAdded.wait_for(lock, std::chrono::microseconds(
493+
NanoLogConfig::POLL_INTERVAL_DURING_IO_US));
512494
cyclesAwakeStart = PerfUtils::Cycles::rdtsc();
513-
if (err != 0)
514-
perror("LogCompressor's Posix AIO "
515-
"suspend operation failed");
516-
} else {
517-
// If there's no new data, go to sleep.
518-
if (bytesConsumedThisIteration == 0 &&
519-
NanoLogConfig::POLL_INTERVAL_DURING_IO_US > 0)
520-
{
521-
std::unique_lock<std::mutex> lock(condMutex);
522-
cyclesActive += PerfUtils::Cycles::rdtsc() -
523-
cyclesAwakeStart;
524-
workAdded.wait_for(lock, std::chrono::microseconds(
525-
NanoLogConfig::POLL_INTERVAL_DURING_IO_US));
526-
cyclesAwakeStart = PerfUtils::Cycles::rdtsc();
527-
}
528-
529-
if (aio_error(&aioCb) == EINPROGRESS)
530-
continue;
531495
}
532-
}
533-
534-
// Finishing up the IO
535-
int err = aio_error(&aioCb);
536-
ssize_t ret = aio_return(&aioCb);
537496

538-
if (err != 0) {
539-
fprintf(stderr, "LogCompressor's POSIX AIO failed"
540-
" with %d: %s\r\n", err, strerror(err));
541-
} else if (ret < 0) {
542-
perror("LogCompressor's Posix AIO Write failed");
497+
if (buffer.writeInProgress())
498+
continue;
543499
}
544-
++numAioWritesCompleted;
545-
hasOutstandingOperation = false;
546-
cyclesDiskIO_upperBound += (start - cyclesAtLastAIOStart);
547500

548501
// We've completed an AIO, check if we need to notify
549502
if (syncStatus == WAITING_ON_AIO) {
503+
cyclesActive += PerfUtils::Cycles::rdtsc() - cyclesAwakeStart;
504+
int err = buffer.swapBuffer(0);
505+
cyclesAwakeStart = PerfUtils::Cycles::rdtsc();
506+
if (err != 0)
507+
fprintf(stderr, "LogCompressor's write failed with %d: %s\n",
508+
err, strerror(err));
509+
510+
++numAioWritesCompleted;
511+
hasOutstandingOperation = false;
512+
cyclesDiskIO_upperBound += (start - cyclesAtLastAIOStart);
550513
std::unique_lock<std::mutex> lock(nanoLogSingleton.condMutex);
551514
if (syncStatus == WAITING_ON_AIO) {
552515
syncStatus = SYNC_COMPLETED;
@@ -567,34 +530,49 @@ RuntimeLogger::compressionThreadMain() {
567530
ssize_t bytesOver = bytesToWrite % 512;
568531

569532
if (bytesOver != 0) {
570-
memset(compressingBuffer, 0, 512 - bytesOver);
533+
memset(buffer.getCompressingBuffer(), 0, 512 - bytesOver);
571534
bytesToWrite = bytesToWrite + 512 - bytesOver;
572535
padBytesWritten += (512 - bytesOver);
573536
}
574537
}
575538

576-
aioCb.aio_fildes = outputFd;
577-
aioCb.aio_buf = compressingBuffer;
578-
aioCb.aio_nbytes = bytesToWrite;
579539
totalBytesWritten += bytesToWrite;
580540

581-
cyclesAtLastAIOStart = PerfUtils::Cycles::rdtsc();
582-
if (aio_write(&aioCb) == -1)
583-
fprintf(stderr, "Error at aio_write(): %s\n", strerror(errno));
541+
uint64_t tmp = PerfUtils::Cycles::rdtsc();
542+
int err = buffer.swapBuffer(bytesToWrite);
543+
544+
if (hasOutstandingOperation) {
545+
cyclesActive += tmp - cyclesAwakeStart;
546+
cyclesAwakeStart = PerfUtils::Cycles::rdtsc();
547+
if (err != 0)
548+
fprintf(stderr, "LogCompressor's write failed with %d: %s\n", err,
549+
strerror(err));
584550

585-
hasOutstandingOperation = true;
551+
++numAioWritesCompleted;
552+
cyclesDiskIO_upperBound += (start - cyclesAtLastAIOStart);
553+
}
586554

587-
// Swap buffers
588-
encoder.swapBuffer(outputDoubleBuffer,
555+
cyclesAtLastAIOStart = tmp;
556+
hasOutstandingOperation = true;
557+
encoder.swapBuffer(buffer.getCompressingBuffer(),
589558
NanoLogConfig::OUTPUT_BUFFER_SIZE);
590-
std::swap(outputDoubleBuffer, compressingBuffer);
591559
outputBufferFull = false;
592560
}
593561

594562
cycleAtThreadStart = 0;
595563
cyclesActive += PerfUtils::Cycles::rdtsc() - cyclesAwakeStart;
596564
}
597565

566+
/**
567+
* Main writer thread that outputting a compressed log file.
568+
*/
569+
void
570+
RuntimeLogger::writerThreadMain() {
571+
while (!writerThreadShouldExit) {
572+
buffer.writeToFile(outputFd);
573+
}
574+
}
575+
598576
// Documentation in NanoLog.h
599577
void
600578
RuntimeLogger::setLogFile_internal(const char *filename) {
@@ -625,18 +603,28 @@ RuntimeLogger::setLogFile_internal(const char *filename) {
625603
workAdded.notify_all();
626604
}
627605

606+
// Stop the writer thread completely
607+
writerThreadShouldExit = true;
608+
buffer.swapBuffer(0);
609+
writerThread.join();
610+
628611
if (compressionThread.joinable())
629612
compressionThread.join();
630613

614+
if (writerThread.joinable())
615+
writerThread.join();
616+
631617
if (outputFd > 0)
632618
close(outputFd);
633619
outputFd = newFd;
634620

635621
// Relaunch thread
636622
nextInvocationIndexToBePersisted = 0; // Reset the dictionary
637623
compressionThreadShouldExit = false;
624+
writerThreadShouldExit = false;
638625
#ifndef BENCHMARK_DISCARD_ENTRIES_AT_STAGINGBUFFER
639626
compressionThread = std::thread(&RuntimeLogger::compressionThreadMain, this);
627+
writerThread = std::thread(&RuntimeLogger::writerThreadMain, this);
640628
#endif
641629
}
642630

0 commit comments

Comments
 (0)