Skip to content

Commit 094d09f

Browse files
Remove all remaining mutexes from core replayer code
The replayer now only has one mutex left, which is specific to our implementation of Vulkan replay. See doc/Multithreading.md for more information on it. Change lava-replay output file to JSON. Also add some extra info, like frames rendered.
1 parent 1e99b36 commit 094d09f

File tree

10 files changed

+77
-77
lines changed

10 files changed

+77
-77
lines changed

CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ add_executable(mprotect tests/mprotect.cpp src/util.cpp src/util.h)
223223
target_include_directories(mprotect ${COMMON_INCLUDE})
224224
target_link_libraries(mprotect ${MOST_COMMON_LIBRARIES} pthread)
225225
target_compile_options(mprotect PRIVATE ${COMMON_FLAGS})
226+
target_compile_definitions(mprotect PUBLIC COMPILE_LAYER PUBLIC NO_JSON)
226227

227228
#add_executable(userfaultfd tests/userfaultfd.cpp src/util.cpp src/util.h)
228229
#target_include_directories(userfaultfd ${COMMON_INCLUDE})
@@ -370,20 +371,23 @@ add_executable(write4 tests/write4.cpp src/filewriter.cpp src/util.cpp)
370371
target_include_directories(write4 ${COMMON_INCLUDE})
371372
target_link_libraries(write4 ${MOST_COMMON_LIBRARIES} density LZ4::LZ4 pthread)
372373
target_compile_options(write4 PRIVATE ${COMMON_FLAGS})
374+
target_compile_definitions(write4 PUBLIC COMPILE_LAYER NO_JSON)
373375
add_dependencies(write4 sync_generated)
374376
add_test(NAME write_test_4 COMMAND ${CMAKE_CURRENT_BINARY_DIR}/write4)
375377

376378
add_executable(read4 tests/read4.cpp src/filereader.cpp src/util.cpp)
377379
target_include_directories(read4 ${COMMON_INCLUDE})
378380
target_link_libraries(read4 ${MOST_COMMON_LIBRARIES} density LZ4::LZ4 pthread)
379381
target_compile_options(read4 PRIVATE ${COMMON_FLAGS})
382+
target_compile_definitions(read4 PUBLIC COMPILE_LAYER NO_JSON)
380383
add_dependencies(read4 sync_generated)
381384
add_test(NAME read_test_4 COMMAND ${CMAKE_CURRENT_BINARY_DIR}/read4)
382385
set_property(TEST read_test_4 APPEND PROPERTY DEPENDS write_test_4)
383386

384387
add_executable(write5 tests/write5.cpp src/filewriter.cpp src/filereader.cpp src/util.cpp)
385388
target_include_directories(write5 ${COMMON_INCLUDE})
386389
target_link_libraries(write5 ${MOST_COMMON_LIBRARIES} density LZ4::LZ4 pthread)
390+
target_compile_definitions(write5 PUBLIC COMPILE_LAYER NO_JSON)
387391
target_compile_options(write5 PRIVATE ${COMMON_FLAGS})
388392
add_dependencies(write5 sync_generated)
389393
add_test(NAME write_test_5 COMMAND ${CMAKE_CURRENT_BINARY_DIR}/write5)

src/filereader.cpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,6 @@ void file_reader::decompressor()
134134
decompress_chunk();
135135
}
136136

137-
chunk_mutex.lock();
138137
clockid_t id;
139138
int r = pthread_getcpuclockid(pthread_self(), &id);
140139
if (r != 0)
@@ -145,7 +144,6 @@ void file_reader::decompressor()
145144
{
146145
ELOG("Failed to get worker thread %u CPU usage: %s", tid, strerror(errno));
147146
}
148-
chunk_mutex.unlock();
149147
}
150148

151149
void file_reader::start_measurement()
@@ -184,7 +182,6 @@ void file_reader::start_measurement()
184182

185183
void file_reader::stop_measurement(uint64_t& worker, uint64_t& runner)
186184
{
187-
chunk_mutex.lock();
188185
struct timespec stop_runner_cpu_usage;
189186
clockid_t id;
190187
int r = pthread_getcpuclockid(pthread_self(), &id);
@@ -202,7 +199,6 @@ void file_reader::stop_measurement(uint64_t& worker, uint64_t& runner)
202199
if (!multithreaded_read)
203200
{
204201
worker = 0;
205-
chunk_mutex.unlock();
206202
return;
207203
}
208204
pthread_t t = decompressor_thread.native_handle();
@@ -218,5 +214,4 @@ void file_reader::stop_measurement(uint64_t& worker, uint64_t& runner)
218214
}
219215
assert(stop_worker_cpu_usage.tv_sec >= worker_cpu_usage.tv_sec);
220216
worker = diff_timespec(&stop_worker_cpu_usage, &worker_cpu_usage);
221-
chunk_mutex.unlock();
222217
}

src/filereader.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
#include <stdio.h>
1010
#include <stdarg.h>
1111

12-
#include "lavamutex.h"
1312
#include "packfile.h"
1413
#include "containers.h"
1514
#include "util.h"
@@ -163,11 +162,9 @@ class file_reader
163162

164163
void disable_multithreaded_read() // we can only disable on the fly, enable makes less sense
165164
{
166-
chunk_mutex.lock();
167165
done_decompressing = true;
168166
decompressor_thread.join();
169167
multithreaded_read = false;
170-
chunk_mutex.unlock();
171168
}
172169

173170
void self_test() const
@@ -186,7 +183,6 @@ class file_reader
186183

187184
bool multithreaded_read = true;
188185
unsigned tid = -1; // only used for logging
189-
lava::mutex chunk_mutex;
190186
/// Pointer to mapped memory of compressed file
191187
char* compressed_data = nullptr; // current position in compressed buffer
192188
char* fstart = nullptr; // start position

src/read.cpp

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -96,26 +96,25 @@ uint16_t lava_file_reader::read_apicall()
9696

9797
lava_reader::~lava_reader()
9898
{
99-
global_mutex.lock();
10099
delete thread_call_numbers;
101100
for (auto& t : thread_streams)
102101
{
103-
delete t.second;
102+
delete t;
104103
}
105104
thread_streams.clear();
106-
global_mutex.unlock();
107105
}
108106

109107
void lava_reader::finalize(bool terminate)
110108
{
111109
const double total_time_ms = ((gettime() - mStartTime.load()) / 1000000UL);
112110
const double fps = (double)mGlobalFrames / (total_time_ms / 1000.0);
113111
ILOG("==== %.2f ms, %u frames (%.2f fps) ====", total_time_ms, mGlobalFrames, fps);
114-
fprintf(out_fptr, "%.2f", fps);
115-
fclose(out_fptr);
112+
Json::Value out;
113+
out["fps"] = fps;
114+
out["frames"] = mGlobalFrames;
115+
out["time"] = total_time_ms;
116116
uint64_t runner = 0;
117117
uint64_t worker = 0;
118-
global_mutex.lock();
119118
for (unsigned i = 0; i < threads.size(); i++)
120119
{
121120
uint64_t runner_local = 0;
@@ -133,6 +132,9 @@ void lava_reader::finalize(bool terminate)
133132
assert(stop_process_cpu_usage.tv_sec >= process_cpu_usage.tv_sec);
134133
const uint64_t process_time = diff_timespec(&stop_process_cpu_usage, &process_cpu_usage);
135134
ILOG("CPU time spent in ms - readhead workers %lu, API runners %lu, full process %lu", (long unsigned)worker, (long unsigned)runner, (long unsigned)process_time);
135+
out["readahead_workers_time"] = (Json::Value::UInt64)worker;
136+
out["api_runners_time"] = (Json::Value::UInt64)runner;
137+
out["process_time"] = (Json::Value::UInt64)process_time;
136138
if (terminate)
137139
{
138140
for (auto& v : *thread_call_numbers) v = 0; // stop waiting threads from progressing
@@ -141,12 +143,12 @@ void lava_reader::finalize(bool terminate)
141143
if (!thread_streams[i]->terminated.load()) pthread_cancel(threads[i].native_handle());
142144
}
143145
}
144-
global_mutex.unlock();
146+
write_json(out_fptr, out);
147+
fclose(out_fptr);
145148
}
146149

147150
lava_file_reader& lava_reader::file_reader(uint16_t thread_id)
148151
{
149-
lava::lock_guard keep(global_mutex);
150152
return *thread_streams.at(thread_id);
151153
}
152154

@@ -201,10 +203,14 @@ void lava_reader::init(const std::string& path, int heap_size)
201203
}
202204

203205
Json::Value meta = packed_json("metadata.json", mPackedFile);
204-
mGlobalFrames = meta["global_frames"].asInt();
205206
const int num_threads = meta["threads"].asInt();
206-
global_mutex.lock();
207+
mGlobalFrames = meta["global_frames"].asInt();
208+
209+
// initialize threads -- note that this happens before threading begins, so thread safe
210+
threads.resize(num_threads);
211+
thread_streams.resize(num_threads);
207212
thread_call_numbers = new std::vector<std::atomic_uint_fast32_t>(num_threads);
213+
208214
for (int thread_id = 0; thread_id < num_threads; thread_id++)
209215
{
210216
Json::Value frameinfo = packed_json("frames_" + _to_string(thread_id) + ".json", path);
@@ -217,12 +223,10 @@ void lava_reader::init(const std::string& path, int heap_size)
217223
if (v["global_frame"] == mEnd + 1) { uncompressed_target = v["position"].asUInt(); break; }
218224
}
219225
}
220-
lava_file_reader* f = new lava_file_reader(this, mPackedFile, thread_id, mGlobalFrames, frameinfo, uncompressed_size, uncompressed_target, mStart, mEnd);
221-
thread_streams.emplace(thread_id, std::move(f));
226+
thread_streams[thread_id] = new lava_file_reader(this, mPackedFile, thread_id, mGlobalFrames, frameinfo, uncompressed_size, uncompressed_target, mStart, mEnd);
222227
}
223-
global_mutex.unlock();
224228

225-
out_fptr = fopen("lavaresults.txt", "w");
229+
out_fptr = fopen("lavaresults.json", "w");
226230
if (!out_fptr) ABORT("Failed to open results file: %s", strerror(errno));
227231
mStartTime.store(gettime());
228232
}

src/read.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,8 @@ class lava_reader
100100
std::atomic_uint64_t mStartTime{ 0 };
101101
/// Start CPU usage for whole process
102102
struct timespec process_cpu_usage;
103-
lava::mutex global_mutex;
104103
std::string mPackedFile;
105-
std::unordered_map<int, lava_file_reader*> thread_streams GUARDED_BY(global_mutex);
104+
std::vector<lava_file_reader*> thread_streams;
106105
int mStart = 0;
107106
int mEnd = -1;
108107
int mGlobalFrames = 0;
@@ -182,12 +181,10 @@ class lava_file_reader : public file_reader
182181
ELOG("Failed to get process CPU usage: %s", strerror(errno));
183182
}
184183
// Set start time in all threads
185-
parent->global_mutex.lock();
186184
for (unsigned i = 0; i < parent->threads.size(); i++)
187185
{
188186
parent->thread_streams[i]->start_measurement();
189187
}
190-
parent->global_mutex.unlock();
191188
}
192189
}
193190
current.frame++;

src/replay.cpp

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -115,17 +115,17 @@ static void replay_thread(int thread_id)
115115
}
116116
}
117117

118-
static void run_multithreaded(int n)
118+
static void run_multithreaded()
119119
{
120120
if (sandbox)
121121
{
122122
const char* err = sandbox_replay_start();
123123
if (err) WLOG("Warning: Failed to change sandbox to replay mode: %s", err);
124124
}
125125

126-
for (int i = 0; i < n; i++)
126+
for (unsigned i = 0; i < replayer.threads.size(); i++)
127127
{
128-
replayer.threads.emplace_back(replay_thread, i);
128+
replayer.threads[i] = std::thread(replay_thread, i);
129129
}
130130

131131
for (unsigned i = 0; i < replayer.threads.size(); i++)
@@ -318,10 +318,7 @@ int main(int argc, char **argv)
318318
exit(EXIT_SUCCESS);
319319
}
320320

321-
// Read all thread files
322-
std::vector<std::string> threadfiles = packed_files(filename, "thread_");
323-
if (threadfiles.size() == 0) DIE("Failed to find any threads in %s!", filename.c_str());
324-
run_multithreaded(threadfiles.size());
321+
run_multithreaded();
325322
if (p__custom_allocator) allocators_print(stdout);
326323
vkuDestroyWrapper(library);
327324
if (p__debug_destination) fclose(p__debug_destination);

src/tool.cpp

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -271,24 +271,20 @@ int main(int argc, char **argv)
271271
replayer.init(filename_input, heap_size);
272272
replayer.remap = validate_remap;
273273

274-
// Read all thread files
275-
std::vector<std::string> threadfiles = packed_files(filename_input, "thread_");
276-
if (threadfiles.size() == 0) DIE("Failed to find any threads in %s!", filename_input.c_str());
277-
278274
// Add callbacks
279275
vkCreateShaderModule_callbacks.push_back(callback_vkCreateShaderModule);
280276
vkDestroyDevice_callbacks.push_back(callback_vkDestroyDevice);
281277

282-
for (int i = 0; i < (int)threadfiles.size(); i++)
278+
for (unsigned i = 0; i < replayer.threads.size(); i++)
283279
{
284280
if (verbose)
285281
{
286282
printf("Threads:\n");
287283
Json::Value frameinfo = packed_json("frames_" + _to_string(i) + ".json", filename_input);
288-
printf("\t%d : [%s] with %u local frames, %d highest global frame, %u uncompressed size\n", i, frameinfo.get("thread_name", "unknown").asString().c_str(),
284+
printf("\t%u : [%s] with %u local frames, %d highest global frame, %u uncompressed size\n", i, frameinfo.get("thread_name", "unknown").asString().c_str(),
289285
(unsigned)frameinfo["frames"].size(), frameinfo["highest_global_frame"].asInt(), frameinfo["uncompressed_size"].asUInt());
290286
}
291-
replayer.threads.emplace_back(&replay_thread, &replayer, i);
287+
replayer.threads[i] = std::thread(&replay_thread, &replayer, i);
292288
}
293289
for (unsigned i = 0; i < replayer.threads.size(); i++)
294290
{
@@ -313,13 +309,9 @@ int main(int argc, char **argv)
313309
// Add in the rewrite queue from the previous run
314310
replayer.rewrite_queue = rewrite_queue_copy;
315311

316-
// Read all thread files
317-
std::vector<std::string> threadfiles = packed_files(filename_input, "thread_");
318-
if (threadfiles.size() == 0) DIE("Failed to find any threads in %s!", filename_input.c_str());
319-
320-
for (int i = 0; i < (int)threadfiles.size(); i++)
312+
for (unsigned i = 0; i < replayer.threads.size(); i++)
321313
{
322-
replayer.threads.emplace_back(&replay_thread, &replayer, i);
314+
replayer.threads[i] = std::thread(&replay_thread, &replayer, i);
323315
}
324316
for (unsigned i = 0; i < replayer.threads.size(); i++)
325317
{

src/util.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "util.h"
2+
#include "jsoncpp/json/writer.h"
23

34
#include <string.h>
45
#include <spirv/unified1/spirv.h>
@@ -386,3 +387,33 @@ const char* pretty_print_VkObjectType(VkObjectType val)
386387
}
387388
return "Error";
388389
}
390+
391+
#ifndef NO_JSON
392+
void write_json(FILE* fp, const Json::Value& v)
393+
{
394+
Json::StyledWriter writer;
395+
std::string data = writer.write(v);
396+
size_t written;
397+
int err = 0;
398+
do {
399+
written = fwrite(data.c_str(), data.size(), 1, fp);
400+
err = ferror(fp);
401+
} while (!err && !written);
402+
if (err)
403+
{
404+
ELOG("Failed to write dictionary: %s", strerror(err));
405+
}
406+
}
407+
408+
void write_json(const std::string& path, const Json::Value& v)
409+
{
410+
FILE* fp = fopen(path.c_str(), "w");
411+
if (!fp)
412+
{
413+
ELOG("Failed to open \"%s\": %s", path.c_str(), strerror(errno));
414+
return;
415+
}
416+
write_json(fp, v);
417+
fclose(fp);
418+
}
419+
#endif

src/util.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
#include <assert.h>
1616
#include <vector>
1717
#include <string>
18+
#ifndef NO_JSON
19+
#include "jsoncpp/json/value.h"
20+
#endif
1821

1922
#define VK_NO_PROTOTYPES
2023
#include "vulkan_utility.h"
@@ -251,3 +254,8 @@ typedef uint64_t (VKAPI_PTR *PFN_vkGetDeviceTracingObjectPropertyTRACETOOLTEST)(
251254
int get_env_int(const char* name, int v);
252255
int get_env_bool(const char* name, int v);
253256
FILE* get_env_file(const char* name, FILE* fallback);
257+
258+
#ifndef NO_JSON
259+
void write_json(const std::string& path, const Json::Value& v);
260+
void write_json(FILE* fp, const Json::Value& v);
261+
#endif

0 commit comments

Comments
 (0)