|
39 | 39 | #include "core/typedefs.h" |
40 | 40 |
|
41 | 41 | class CommandQueueMT { |
| 42 | + static const size_t MAX_COMMAND_SIZE = 1024; |
| 43 | + |
42 | 44 | struct CommandBase { |
43 | 45 | bool sync = false; |
44 | 46 | virtual void call() = 0; |
@@ -154,29 +156,38 @@ class CommandQueueMT { |
154 | 156 | } |
155 | 157 |
|
156 | 158 | void _flush() { |
| 159 | + MutexLock lock(mutex); |
| 160 | + |
157 | 161 | if (unlikely(flush_read_ptr)) { |
158 | 162 | // Re-entrant call. |
159 | 163 | return; |
160 | 164 | } |
161 | 165 |
|
162 | | - MutexLock lock(mutex); |
| 166 | + char cmd_backup[MAX_COMMAND_SIZE]; |
163 | 167 |
|
164 | 168 | while (flush_read_ptr < command_mem.size()) { |
165 | 169 | uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr]; |
166 | | - flush_read_ptr += 8; |
| 170 | + flush_read_ptr += sizeof(uint64_t); |
| 171 | + |
167 | 172 | CommandBase *cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]); |
| 173 | + |
| 174 | + // Protect against race condition between this thread |
| 175 | + // during the call to the command and other threads potentially |
| 176 | + // invalidating the pointer due to reallocs. |
| 177 | + memcpy(cmd_backup, (char *)cmd, size); |
| 178 | + |
168 | 179 | uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(lock); |
169 | | - cmd->call(); |
| 180 | + ((CommandBase *)cmd_backup)->call(); |
170 | 181 | WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id); |
171 | 182 |
|
172 | 183 | // Handle potential realloc due to the command and unlock allowance. |
173 | 184 | cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]); |
174 | 185 |
|
175 | 186 | if (unlikely(cmd->sync)) { |
176 | 187 | sync_head++; |
177 | | - lock.~MutexLock(); // Give an opportunity to awaiters right away. |
| 188 | + lock.temp_unlock(); // Give an opportunity to awaiters right away. |
178 | 189 | sync_cond_var.notify_all(); |
179 | | - new (&lock) MutexLock(mutex); |
| 190 | + lock.temp_relock(); |
180 | 191 | // Handle potential realloc happened during unlock. |
181 | 192 | cmd = reinterpret_cast<CommandBase *>(&command_mem[flush_read_ptr]); |
182 | 193 | } |
@@ -210,20 +221,23 @@ class CommandQueueMT { |
210 | 221 | void push(T *p_instance, M p_method, Args &&...p_args) { |
211 | 222 | // Standard command, no sync. |
212 | 223 | using CommandType = Command<T, M, false, Args...>; |
| 224 | + static_assert(sizeof(CommandType) <= MAX_COMMAND_SIZE); |
213 | 225 | _push_internal<CommandType, false>(p_instance, p_method, std::forward<Args>(p_args)...); |
214 | 226 | } |
215 | 227 |
|
216 | 228 | template <typename T, typename M, typename... Args> |
217 | 229 | void push_and_sync(T *p_instance, M p_method, Args... p_args) { |
218 | 230 | // Standard command, sync. |
219 | 231 | using CommandType = Command<T, M, true, Args...>; |
| 232 | + static_assert(sizeof(CommandType) <= MAX_COMMAND_SIZE); |
220 | 233 | _push_internal<CommandType, true>(p_instance, p_method, std::forward<Args>(p_args)...); |
221 | 234 | } |
222 | 235 |
|
223 | 236 | template <typename T, typename M, typename R, typename... Args> |
224 | 237 | void push_and_ret(T *p_instance, M p_method, R *r_ret, Args... p_args) { |
225 | 238 | // Command with return value, sync. |
226 | 239 | using CommandType = CommandRet<T, M, R, Args...>; |
| 240 | + static_assert(sizeof(CommandType) <= MAX_COMMAND_SIZE); |
227 | 241 | _push_internal<CommandType, true>(p_instance, p_method, r_ret, std::forward<Args>(p_args)...); |
228 | 242 | } |
229 | 243 |
|
|
0 commit comments