Skip to content

Commit 2f94aad

Browse files
riptlripatel-fd
authored andcommitted
solfuzz: tile parallelism
1 parent 1854c9c commit 2f94aad

File tree

2 files changed

+207
-18
lines changed

2 files changed

+207
-18
lines changed

src/flamenco/runtime/tests/Local.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ $(call add-hdrs,generated/context.pb.h,generated/elf.pb.h,generated/invoke.pb.h,
1212
$(call add-objs,generated/context.pb generated/elf.pb generated/invoke.pb generated/txn.pb generated/block.pb generated/vm.pb generated/type.pb generated/shred.pb generated/metadata.pb,fd_flamenco)
1313

1414
SOL_COMPAT_FLAGS:=-Wl,--undefined=fd_types_vt_by_name
15-
$(call make-unit-test,test_sol_compat,test_sol_compat,fd_flamenco_test fd_flamenco fd_funk fd_ballet fd_util fd_disco,$(SECP256K1_LIBS))
15+
$(call make-unit-test,test_sol_compat,test_sol_compat,fd_flamenco_test fd_flamenco fd_tango fd_funk fd_ballet fd_util fd_disco,$(SECP256K1_LIBS))
1616
$(call make-shared,libfd_exec_sol_compat.so,fd_sol_compat,fd_flamenco_test fd_flamenco fd_funk fd_ballet fd_util fd_disco,$(SECP256K1_LIBS) $(SOL_COMPAT_FLAGS))
1717

1818
run-runtime-backtest: $(OBJDIR)/bin/fd_ledger

src/flamenco/runtime/tests/test_sol_compat.c

Lines changed: 206 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,26 @@
11
#define _DEFAULT_SOURCE
22
#include "fd_solfuzz.h"
33
#include <errno.h>
4-
#include <dirent.h>
4+
#include <dirent.h> /* opendir */
55
#include <fcntl.h>
6+
#include <sched.h> /* sched_yield */
67
#include <sys/types.h>
7-
#include <sys/stat.h>
8-
#include <unistd.h>
8+
#include <sys/stat.h> /* fstat */
9+
#include <unistd.h> /* close */
910
#include "../fd_runtime.h"
1011
#include "../../../ballet/nanopb/pb_firedancer.h"
12+
#include "../../../tango/fd_tango.h"
13+
14+
#define MCACHE_DEPTH (256UL)
15+
#define MCACHE_FOOTPRINT FD_MCACHE_FOOTPRINT( MCACHE_DEPTH, 0UL )
16+
#define DCACHE_DATA_SZ FD_DCACHE_REQ_DATA_SZ( PATH_MAX, MCACHE_DEPTH, 1UL, 1 )
17+
#define DCACHE_FOOTPRINT FD_DCACHE_FOOTPRINT( DCACHE_DATA_SZ, 0UL )
1118

1219
static int fail_fast;
1320
static int error_occurred;
1421

22+
static uint shutdown_signal __attribute__((aligned(64)));
23+
1524
/* run_test runs a test.
1625
Return 1 on success, 0 on failure. */
1726
static int
@@ -82,7 +91,10 @@ run_test( fd_solfuzz_runner_t * runner,
8291

8392
/* Recursive dir walk function, follows symlinks */
8493

85-
typedef int (* visit_path)( void * ctx, char const * path );
94+
typedef int
95+
(* visit_path)( void * ctx,
96+
char const * path,
97+
ulong path_len );
8698

8799
static int
88100
recursive_walk1( DIR * dir,
@@ -123,7 +135,7 @@ recursive_walk1( DIR * dir,
123135
as_file:
124136
suffix = strstr( entry->d_name, ".fix" );
125137
if( !suffix || suffix[4]!='\0' ) continue;
126-
if( !visit( visit_ctx, path ) ) break;
138+
if( !visit( visit_ctx, path, sub_path_len ) ) break;
127139
} else if( entry->d_type==DT_LNK ) {
128140
subdir = opendir( path );
129141
if( subdir ) {
@@ -165,7 +177,9 @@ recursive_walk( char const * path,
165177

166178
static int
167179
visit_sync( void * ctx,
168-
char const * path ) {
180+
char const * path,
181+
ulong path_len ) {
182+
(void)path_len;
169183
fd_solfuzz_runner_t * runner = ctx;
170184
int ok = run_test( runner, path );
171185
if( !ok ) {
@@ -189,13 +203,171 @@ run_single_threaded( fd_solfuzz_runner_t * runner,
189203

190204
/* Multi-threaded mode: fan out tasks to bank of tiles */
191205

206+
struct walkdir_state {
207+
fd_frag_meta_t * mcache;
208+
uchar * dcache;
209+
210+
ulong depth;
211+
ulong chunk0;
212+
ulong wmark;
213+
214+
ulong seq;
215+
ulong chunk;
216+
ulong cr_avail;
217+
218+
ulong worker_cnt;
219+
ulong ** fseqs;
220+
};
221+
typedef struct walkdir_state walkdir_state_t;
222+
223+
static void
224+
walkdir_backpressure( walkdir_state_t * state ) {
225+
ulong const worker_cnt = state->worker_cnt;
226+
ulong const seq_pub = state->seq;
227+
ulong cr_avail = state->cr_avail;
228+
do {
229+
sched_yield();
230+
cr_avail = ULONG_MAX;
231+
for( ulong i=0UL; i<worker_cnt; i++ ) {
232+
long lag = fd_seq_diff( seq_pub, fd_fseq_query( state->fseqs[ i ] ) );
233+
/**/ lag = fd_long_max( lag, 0L );
234+
cr_avail = fd_ulong_min( cr_avail, MCACHE_DEPTH-(ulong)lag );
235+
}
236+
} while( !cr_avail );
237+
state->cr_avail = cr_avail;
238+
}
239+
240+
static int
241+
walkdir_publish( void * ctx,
242+
char const * path,
243+
ulong path_len ) {
244+
walkdir_state_t * state = ctx;
245+
if( FD_UNLIKELY( !state->cr_avail ) ) {
246+
/* Blocked on flow-control credits ... spin until they're replenished */
247+
walkdir_backpressure( state );
248+
/* Guaranteed to have more flow-control credits */
249+
}
250+
251+
/* Write data record */
252+
ulong chunk = state->chunk;
253+
char * msg = fd_chunk_to_laddr( state->dcache, chunk );
254+
fd_cstr_fini( fd_cstr_append_text( fd_cstr_init( msg ), path, path_len ) );
255+
state->chunk = fd_dcache_compact_next( chunk, path_len+1UL, state->chunk0, state->wmark );
256+
257+
/* Write frag descriptor */
258+
ulong seq = state->seq;
259+
fd_mcache_publish( state->mcache, state->depth, seq, 0UL, chunk, 0UL, 0UL, 0UL, 0UL );
260+
state->seq = fd_seq_inc( seq, 1UL );
261+
state->cr_avail--;
262+
return 1;
263+
}
264+
265+
static void
266+
walkdir_tile( fd_frag_meta_t * mcache,
267+
uchar * dcache,
268+
ulong ** fseqs,
269+
ulong worker_cnt,
270+
int argc,
271+
char ** argv ) {
272+
walkdir_state_t state = {
273+
.mcache = mcache,
274+
.dcache = dcache,
275+
.depth = fd_mcache_depth( mcache ),
276+
.chunk0 = fd_dcache_compact_chunk0( dcache, dcache ),
277+
.wmark = fd_dcache_compact_wmark ( dcache, dcache, PATH_MAX ),
278+
.seq = fd_mcache_seq0( mcache ),
279+
.worker_cnt = worker_cnt,
280+
.fseqs = fseqs
281+
};
282+
state.chunk = state.chunk0;
283+
state.cr_avail = state.depth;
284+
285+
for( int j=1; j<argc; j++ ) {
286+
int ok = recursive_walk( argv[ j ], walkdir_publish, &state );
287+
if( !ok ) {
288+
FD_LOG_WARNING(( "Stopping early" ));
289+
}
290+
}
291+
292+
fd_mcache_seq_update( fd_mcache_seq_laddr( state.mcache ), state.seq );
293+
}
294+
295+
struct mt_state {
296+
fd_solfuzz_runner_t ** runners;
297+
ulong worker_cnt;
298+
fd_frag_meta_t * mcache;
299+
uchar * dcache;
300+
ulong ** fseqs;
301+
};
302+
typedef struct mt_state mt_state_t;
303+
304+
static void
305+
exec_tile( fd_solfuzz_runner_t * runner,
306+
fd_frag_meta_t const * mcache,
307+
uchar * dcache,
308+
ulong * fseq,
309+
ulong idx,
310+
ulong cnt ) {
311+
ulong const depth = fd_mcache_depth( mcache );
312+
ulong seq = 0UL;
313+
for(;;) {
314+
fd_frag_meta_t const * mline = mcache + fd_mcache_line_idx( seq, depth );
315+
ulong seq_found = fd_frag_meta_seq_query( mline );
316+
if( FD_UNLIKELY( seq!=seq_found ) ) {
317+
if( FD_VOLATILE_CONST( shutdown_signal ) ) break;
318+
FD_SPIN_PAUSE();
319+
continue;
320+
}
321+
322+
if( seq%cnt==idx ) {
323+
char const * path = fd_chunk_to_laddr_const( dcache, mline->chunk );
324+
run_test( runner, path );
325+
}
326+
327+
seq = fd_seq_inc( seq, 1UL );
328+
FD_VOLATILE( fseq[0] ) = seq;
329+
}
330+
FD_VOLATILE( fseq[0] ) = seq;
331+
}
332+
333+
static int
334+
exec_task( int argc,
335+
char ** argv ) {
336+
ulong worker_idx = (ulong)argc;
337+
mt_state_t const * state = fd_type_pun_const( argv );
338+
exec_tile( state->runners[ worker_idx ], state->mcache, state->dcache, state->fseqs[ worker_idx ], worker_idx, state->worker_cnt );
339+
return 0;
340+
}
341+
192342
FD_FN_UNUSED static void
193343
run_multi_threaded( fd_solfuzz_runner_t ** runners,
194344
ulong worker_cnt,
195-
int argc,
196-
char ** argv ) {
197-
(void)runners; (void)worker_cnt; (void)argc; (void)argv;
198-
FD_LOG_WARNING(( "Multi-threaded mode not implemented yet" ));
345+
int argc,
346+
char ** argv,
347+
fd_frag_meta_t * mcache,
348+
uchar * dcache,
349+
ulong ** fseqs ) {
350+
mt_state_t state = {
351+
.runners = runners,
352+
.worker_cnt = worker_cnt,
353+
.mcache = mcache,
354+
.dcache = dcache,
355+
.fseqs = fseqs
356+
};
357+
358+
for( ulong i=0UL; i<worker_cnt; i++ ) {
359+
fd_tile_exec_new( 1UL+i, exec_task, (int)i, fd_type_pun( &state ) );
360+
}
361+
362+
walkdir_tile( mcache, dcache, fseqs, worker_cnt, argc, argv );
363+
FD_VOLATILE( shutdown_signal ) = 1;
364+
365+
for( ulong i=0UL; i<worker_cnt; i++ ) {
366+
fd_tile_exec_delete( fd_tile_exec_by_id( 1UL+i ), NULL );
367+
}
368+
369+
ulong cnt = fd_mcache_seq_query( fd_mcache_seq_laddr_const( mcache ) );
370+
FD_LOG_NOTICE(( "Processed %lu files", cnt ));
199371
}
200372

201373
int
@@ -228,20 +400,33 @@ main( int argc,
228400

229401
/* Allocate runners */
230402
int exit_code = 255;
231-
fd_solfuzz_runner_t ** runners = fd_wksp_alloc_laddr( wksp, alignof(void *), worker_cnt*sizeof(void *), 1UL );
232-
if( FD_UNLIKELY( !runners ) ) { FD_LOG_WARNING(( "init failed" )); goto exit; }
403+
fd_solfuzz_runner_t ** runners = fd_wksp_alloc_laddr( wksp, alignof(void *), worker_cnt*sizeof(void *), wksp_tag );
404+
void * mcache_mem = fd_wksp_alloc_laddr( wksp, fd_mcache_align(), MCACHE_FOOTPRINT, wksp_tag );
405+
void * dcache_mem = fd_wksp_alloc_laddr( wksp, fd_dcache_align(), DCACHE_FOOTPRINT, wksp_tag );
406+
uchar * fseqs_mem = fd_wksp_alloc_laddr( wksp, fd_fseq_align(), worker_cnt*FD_FSEQ_FOOTPRINT, wksp_tag );
407+
ulong ** fseqs = fd_wksp_alloc_laddr( wksp, alignof(void *), worker_cnt*sizeof(void *), wksp_tag );
408+
if( FD_UNLIKELY( !runners | !mcache_mem | !dcache_mem | !fseqs_mem | !fseqs ) ) {
409+
FD_LOG_WARNING(( "init failed" )); goto exit;
410+
}
233411
fd_memset( runners, 0, worker_cnt*sizeof(void *) );
234412
for( ulong i=0UL; i<worker_cnt; i++ ) {
235413
runners[i] = fd_solfuzz_runner_new( wksp, wksp_tag );
236414
if( FD_UNLIKELY( !runners[i] ) ) { FD_LOG_WARNING(( "init failed (creating worker %lu)", i )); goto exit; }
237415
}
238416

417+
/* Create objects */
418+
fd_frag_meta_t * mcache = fd_mcache_join( fd_mcache_new( mcache_mem, MCACHE_DEPTH, 0UL, 0UL ) ); FD_TEST( mcache );
419+
uchar * dcache = fd_dcache_join( fd_dcache_new( dcache_mem, DCACHE_DATA_SZ, 0UL ) ); FD_TEST( dcache );
420+
for( ulong i=0UL; i<worker_cnt; i++ ) {
421+
fseqs[i] = fd_fseq_join( fd_fseq_new( fseqs_mem + i*FD_FSEQ_FOOTPRINT, 0UL ) ); FD_TEST( fseqs[i] );
422+
}
423+
239424
/* Run strategy */
240-
//if( fd_tile_cnt()==1 ) {
425+
if( fd_tile_cnt()==1 ) {
241426
run_single_threaded( runners[0], argc, argv );
242-
//} else {
243-
// run_multi_threaded( runners, worker_cnt, argc, argv );
244-
//}
427+
} else {
428+
run_multi_threaded( runners, worker_cnt, argc, argv, mcache, dcache, fseqs );
429+
}
245430
if( error_occurred ) {
246431
if( fail_fast ) exit_code = 255;
247432
else exit_code = 1;
@@ -254,7 +439,11 @@ main( int argc,
254439
for( ulong i=0UL; runners && i<worker_cnt; i++ ) {
255440
if( runners[i] ) fd_solfuzz_runner_delete( runners[i] );
256441
}
257-
fd_wksp_free_laddr( runners );
442+
fd_wksp_free_laddr( runners );
443+
fd_wksp_free_laddr( mcache_mem );
444+
fd_wksp_free_laddr( dcache_mem );
445+
fd_wksp_free_laddr( fseqs_mem );
446+
fd_wksp_free_laddr( fseqs );
258447
if( wksp_name ) fd_wksp_detach( wksp );
259448
else fd_wksp_demand_paged_delete( wksp );
260449

0 commit comments

Comments
 (0)