26
26
#include " logging/logConfiguration.hpp"
27
27
#include " logging/logFileOutput.hpp"
28
28
#include " logging/logFileStreamOutput.hpp"
29
- #include " logging/logHandle .hpp"
29
+ #include " memory/allocation .hpp"
30
30
#include " memory/resourceArea.hpp"
31
31
#include " runtime/atomic.hpp"
32
- #include " runtime/os.inline.hpp"
33
- #include " runtime/globals.hpp"
34
32
33
+ class AsyncLogWriter ::Locker : public StackObj {
34
+ Thread*& _holder;
35
+ PlatformMonitor& _lock;
35
36
36
- class AsyncLogWriter ::AsyncLogLocker : public StackObj {
37
- static Thread* _holder;
38
37
public:
39
- static Thread* current_holder () { return _holder; }
40
- AsyncLogLocker () {
41
- assert (_instance != nullptr , " AsyncLogWriter:: _lock is unavailable " );
42
- _instance-> _lock .lock ();
38
+ Locker ( Thread*& holder, PlatformMonitor& lock)
39
+ : _holder(holder),
40
+ _lock (lock) {
41
+ _lock.lock ();
43
42
_holder = Thread::current_or_null ();
44
43
}
45
44
46
- ~AsyncLogLocker () {
45
+ ~Locker () {
47
46
assert (_holder == Thread::current_or_null (), " must be" );
48
47
_holder = nullptr ;
49
- _instance->_lock .unlock ();
48
+ _lock.unlock ();
49
+ }
50
+
51
+ void notify () {
52
+ _lock.notify ();
50
53
}
51
54
52
55
void wait () {
53
56
Thread* saved_holder = _holder;
54
57
_holder = nullptr ;
55
- _instance-> _lock .wait (0 /* no timeout */ );
58
+ _lock.wait (0 /* no timeout */ );
56
59
_holder = saved_holder;
57
60
}
58
61
};
59
62
60
- Thread* AsyncLogWriter::AsyncLogLocker::_holder = nullptr ;
63
+ class AsyncLogWriter ::ProducerLocker : public Locker {
64
+ static Thread* _holder;
65
+ public:
66
+ static Thread* current_holder () { return _holder; }
67
+ ProducerLocker () : Locker(_holder, _instance->_producer_lock) {}
68
+ };
69
+
70
+ class AsyncLogWriter ::ConsumerLocker : public Locker {
71
+ static Thread* _holder;
72
+ public:
73
+ static Thread* current_holder () { return _holder; }
74
+ ConsumerLocker () : Locker(_holder, _instance->_consumer_lock) {}
75
+ };
76
+
77
+ Thread* AsyncLogWriter::ProducerLocker::_holder = nullptr ;
78
+ Thread* AsyncLogWriter::ConsumerLocker::_holder = nullptr ;
61
79
62
80
// LogDecorator::None applies to 'constant initialization' because of its constexpr constructor.
63
81
const LogDecorations& AsyncLogWriter::None = LogDecorations(LogLevel::Warning, LogTagSetMapping<LogTag::__NO_TAG>::tagset(),
64
82
LogDecorators::None);
65
83
66
- bool AsyncLogWriter::Buffer::push_back (LogFileStreamOutput* output, const LogDecorations& decorations, const char * msg) {
67
- const size_t len = strlen (msg) ;
84
+ bool AsyncLogWriter::Buffer::push_back (LogFileStreamOutput* output, const LogDecorations& decorations, const char * msg, const size_t msg_len ) {
85
+ const size_t len = msg_len ;
68
86
const size_t sz = Message::calc_size (len);
69
87
const bool is_token = output == nullptr ;
70
88
// Always leave headroom for the flush token. Pushing a token must succeed.
@@ -80,7 +98,7 @@ bool AsyncLogWriter::Buffer::push_back(LogFileStreamOutput* output, const LogDec
80
98
}
81
99
82
100
void AsyncLogWriter::Buffer::push_flush_token () {
83
- bool result = push_back (nullptr , AsyncLogWriter::None, " " );
101
+ bool result = push_back (nullptr , AsyncLogWriter::None, " " , 0 );
84
102
assert (result, " fail to enqueue the flush token." );
85
103
}
86
104
@@ -89,22 +107,45 @@ void AsyncLogWriter::enqueue_locked(LogFileStreamOutput* output, const LogDecora
89
107
// client should use "" instead.
90
108
assert (msg != nullptr , " enqueuing a null message!" );
91
109
92
- if (!_buffer->push_back (output, decorations, msg)) {
93
- bool p_created;
94
- uint32_t * counter = _stats.put_if_absent (output, 0 , &p_created);
95
- *counter = *counter + 1 ;
96
- return ;
97
- }
110
+ size_t msg_len = strlen (msg);
111
+ void * stalled_message = nullptr ;
112
+ {
113
+ ConsumerLocker clocker;
114
+ if (_buffer->push_back (output, decorations, msg, msg_len)) {
115
+ _data_available = true ;
116
+ clocker.notify ();
117
+ return ;
118
+ }
98
119
99
- _data_available = true ;
100
- _lock.notify ();
120
+ if (LogConfiguration::async_mode () == LogConfiguration::AsyncMode::Stall) {
121
+ size_t size = Message::calc_size (msg_len);
122
+ stalled_message = os::malloc (size, mtLogging);
123
+ if (stalled_message == nullptr ) {
124
+ // Out of memory. We bail without any notice.
125
+ // Some other part of the system will probably fail later.
126
+ return ;
127
+ }
128
+ _stalled_message = new (stalled_message) Message (output, decorations, msg, msg_len);
129
+ _data_available = true ;
130
+ clocker.notify ();
131
+ // Note: we still hold the producer lock so cannot race against other threads trying to log a message
132
+ while (_stalled_message != nullptr ) {
133
+ clocker.wait ();
134
+ }
135
+ } else {
136
+ bool p_created;
137
+ uint32_t * counter = _stats.put_if_absent (output, 0 , &p_created);
138
+ *counter = *counter + 1 ;
139
+ }
140
+ } // ConsumerLocker out of scope
141
+ os::free (stalled_message);
101
142
}
102
143
103
144
// This function checks for cases where continuing with asynchronous logging may lead to stability issues, such as a deadlock.
104
145
// If this returns false then we give up on logging asynchronously and do so synchronously instead.
105
146
bool AsyncLogWriter::is_enqueue_allowed () {
106
147
AsyncLogWriter* alw = AsyncLogWriter::instance ();
107
- Thread* holding_thread = AsyncLogWriter::AsyncLogLocker ::current_holder ();
148
+ Thread* holding_thread = AsyncLogWriter::ProducerLocker ::current_holder ();
108
149
Thread* this_thread = Thread::current_or_null ();
109
150
if (this_thread == nullptr ) {
110
151
// The current thread is unattached.
@@ -142,7 +183,7 @@ bool AsyncLogWriter::enqueue(LogFileStreamOutput& output, const LogDecorations&
142
183
return false ;
143
184
}
144
185
145
- AsyncLogLocker locker ;
186
+ ProducerLocker plocker ;
146
187
147
188
#ifdef ASSERT
148
189
if (TestingAsyncLoggingDeathTest || TestingAsyncLoggingDeathTestNoCrash) {
@@ -162,17 +203,21 @@ bool AsyncLogWriter::enqueue(LogFileStreamOutput& output, LogMessageBuffer::Iter
162
203
}
163
204
164
205
// If we get here we know the AsyncLogWriter is initialized.
165
- AsyncLogLocker locker ;
206
+ ProducerLocker plocker ;
166
207
for (; !msg_iterator.is_at_end (); msg_iterator++) {
167
208
AsyncLogWriter::instance ()->enqueue_locked (&output, msg_iterator.decorations (), msg_iterator.message ());
168
209
}
169
210
return true ;
170
211
}
171
212
172
213
AsyncLogWriter::AsyncLogWriter ()
173
- : _flush_sem(0 ), _lock(), _data_available(false ),
174
- _initialized(false ),
175
- _stats() {
214
+ : _flush_sem(0 ),
215
+ _producer_lock(),
216
+ _consumer_lock(),
217
+ _data_available(false ),
218
+ _initialized(false ),
219
+ _stats(),
220
+ _stalled_message(nullptr ) {
176
221
177
222
size_t size = AsyncLogBufferSize / 2 ;
178
223
_buffer = new Buffer (size);
@@ -185,7 +230,7 @@ AsyncLogWriter::AsyncLogWriter()
185
230
}
186
231
}
187
232
188
- void AsyncLogWriter::write (AsyncLogMap<AnyObj::RESOURCE_AREA>& snapshot) {
233
+ bool AsyncLogWriter::write (AsyncLogMap<AnyObj::RESOURCE_AREA>& snapshot) {
189
234
int req = 0 ;
190
235
auto it = _buffer_staging->iterator ();
191
236
while (it.hasNext ()) {
@@ -213,20 +258,21 @@ void AsyncLogWriter::write(AsyncLogMap<AnyObj::RESOURCE_AREA>& snapshot) {
213
258
214
259
if (req > 0 ) {
215
260
assert (req == 1 , " Only one token is allowed in queue. AsyncLogWriter::flush() is NOT MT-safe!" );
216
- _flush_sem. signal (req) ;
261
+ return true ;
217
262
}
263
+ return false ;
218
264
}
219
265
220
266
void AsyncLogWriter::run () {
221
267
while (true ) {
222
268
ResourceMark rm;
223
269
AsyncLogMap<AnyObj::RESOURCE_AREA> snapshot;
224
270
{
225
- AsyncLogLocker locker;
226
-
271
+ ConsumerLocker clocker;
227
272
while (!_data_available) {
228
- locker .wait ();
273
+ clocker .wait ();
229
274
}
275
+
230
276
// Only doing a swap and statistics under the lock to
231
277
// guarantee that I/O jobs don't block logsites.
232
278
_buffer_staging->reset ();
@@ -243,7 +289,23 @@ void AsyncLogWriter::run() {
243
289
});
244
290
_data_available = false ;
245
291
}
246
- write (snapshot);
292
+
293
+ bool saw_flush_token = write (snapshot);
294
+
295
+ // Any stalled message must be written *after* the buffer has been written.
296
+ // This is because we try hard to output messages in program-order.
297
+ if (_stalled_message != nullptr ) {
298
+ assert (LogConfiguration::async_mode () == LogConfiguration::AsyncMode::Stall, " must be" );
299
+ ConsumerLocker clocker;
300
+ Message* m = (Message*)_stalled_message;
301
+ m->output ()->write_blocking (m->decorations (), m->message ());
302
+ _stalled_message = nullptr ;
303
+ clocker.notify ();
304
+ }
305
+
306
+ if (saw_flush_token) {
307
+ _flush_sem.signal (1 );
308
+ }
247
309
}
248
310
}
249
311
@@ -281,19 +343,20 @@ AsyncLogWriter* AsyncLogWriter::instance() {
281
343
void AsyncLogWriter::flush () {
282
344
if (_instance != nullptr ) {
283
345
{
284
- AsyncLogLocker locker;
346
+ ProducerLocker plocker;
347
+ ConsumerLocker clocker;
285
348
// Push directly in-case we are at logical max capacity, as this must not get dropped.
286
349
_instance->_buffer ->push_flush_token ();
287
350
_instance->_data_available = true ;
288
- _instance-> _lock .notify ();
351
+ clocker .notify ();
289
352
}
290
353
291
354
_instance->_flush_sem .wait ();
292
355
}
293
356
}
294
357
295
358
AsyncLogWriter::BufferUpdater::BufferUpdater (size_t newsize) {
296
- AsyncLogLocker locker ;
359
+ ConsumerLocker clocker ;
297
360
auto p = AsyncLogWriter::_instance;
298
361
299
362
_buf1 = p->_buffer ;
@@ -307,7 +370,7 @@ AsyncLogWriter::BufferUpdater::~BufferUpdater() {
307
370
auto p = AsyncLogWriter::_instance;
308
371
309
372
{
310
- AsyncLogLocker locker ;
373
+ ConsumerLocker clocker ;
311
374
312
375
delete p->_buffer ;
313
376
delete p->_buffer_staging ;
0 commit comments