From 080c9b8afecf171f84e187660fda3347937efcaa Mon Sep 17 00:00:00 2001 From: Ricardo Azpeitia Pimentel Date: Fri, 20 Jun 2025 09:27:32 -0500 Subject: [PATCH 01/11] Add check for buffer bound --- c_src/zstd_nif.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/c_src/zstd_nif.c b/c_src/zstd_nif.c index 894395e..a5540c5 100644 --- a/c_src/zstd_nif.c +++ b/c_src/zstd_nif.c @@ -250,7 +250,12 @@ static ERL_NIF_TERM zstd_nif_compress_stream(ErlNifEnv* env, int argc, const ERL return enif_make_tuple2(env, zstd_atom_error, zstd_atom_invalid); /* all output binary buffer */ - if (!(enif_alloc_binary(ZSTD_compressBound(in.size), &out))) { + size_t buffer_size = ZSTD_compressBound(in.size); + if(ZSTD_isError(buffer_size)) { + enif_release_binary(&in); + return enif_make_tuple2(env, zstd_atom_error, enif_make_string(env, ZSTD_getErrorName(buffer_size), ERL_NIF_LATIN1)); + } + if (!(enif_alloc_binary(buffer_size, &out))) { enif_release_binary(&in); return enif_make_tuple2(env, zstd_atom_error, zstd_atom_enomem); } From 99ee4135a394831b204fc95136afa4ac98b42c45 Mon Sep 17 00:00:00 2001 From: Ricardo Azpeitia Pimentel Date: Fri, 20 Jun 2025 11:58:12 -0500 Subject: [PATCH 02/11] Compress random bytes --- test/zstd_tests.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/zstd_tests.erl b/test/zstd_tests.erl index 51b78b8..724440b 100644 --- a/test/zstd_tests.erl +++ b/test/zstd_tests.erl @@ -13,7 +13,7 @@ zstd_test() -> zstd:compress(Data))). zstd_stream_test() -> - Bin = << <<"A">> || _ <- lists:seq(1, 1024 * 1024) >>, + Bin = rand:bytes(1000000), CStream = zstd:new_compression_stream(), ok = zstd:compression_stream_init(CStream, From e7069e7cedf0c177c215fb6d37ca1b02e6e3c4c1 Mon Sep 17 00:00:00 2001 From: Ricardo Azpeitia Pimentel Date: Fri, 20 Jun 2025 15:31:09 -0500 Subject: [PATCH 03/11] Make flush work when we have multithreading enabled --- c_src/zstd_nif.c | 48 ++++++++++++++++++++++++++++++--------------- src/zstd.erl | 16 ++++++++++++--- test/zstd_tests.erl | 12 ++++++++---- 3 files changed, 53 insertions(+), 23 deletions(-) diff --git a/c_src/zstd_nif.c b/c_src/zstd_nif.c index a5540c5..d8b1ff6 100644 --- a/c_src/zstd_nif.c +++ b/c_src/zstd_nif.c @@ -113,6 +113,7 @@ static ERL_NIF_TERM zstd_nif_init_compression_stream(ErlNifEnv* env, int argc, c int level = ZSTD_CLEVEL_DEFAULT; int window_log = 0; int enable_long_distance_matching = 0; + int num_workers = 0; size_t ret; ZSTD_CStream **pzcs; @@ -129,7 +130,11 @@ static ERL_NIF_TERM zstd_nif_init_compression_stream(ErlNifEnv* env, int argc, c return enif_make_badarg(env); /* extract the enable long distance matching if any */ - if ((argc == 4) && !(enif_get_int(env, argv[3], &enable_long_distance_matching))) + if ((argc >= 4) && !(enif_get_int(env, argv[3], &enable_long_distance_matching))) + return enif_make_badarg(env); + + /* extract the number of workers */ + if ((argc == 5) && !(enif_get_int(env, argv[4], &num_workers))) return enif_make_badarg(env); /* initialize the stream */ @@ -141,6 +146,8 @@ static ERL_NIF_TERM zstd_nif_init_compression_stream(ErlNifEnv* env, int argc, c return enif_make_tuple2(env, zstd_atom_error, enif_make_string(env, ZSTD_getErrorName(ret), ERL_NIF_LATIN1)); if (ZSTD_isError(ret = ZSTD_CCtx_setParameter(*pzcs, ZSTD_c_checksumFlag, 1))) return enif_make_tuple2(env, zstd_atom_error, enif_make_string(env, ZSTD_getErrorName(ret), ERL_NIF_LATIN1)); + if (ZSTD_isError(ret = ZSTD_CCtx_setParameter(*pzcs, ZSTD_c_nbWorkers, num_workers))) + return enif_make_tuple2(env, zstd_atom_error, enif_make_string(env, ZSTD_getErrorName(ret), ERL_NIF_LATIN1)); /* stream initialization successful */ return zstd_atom_ok; @@ -200,7 +207,6 @@ static ERL_NIF_TERM zstd_nif_reset_decompression_stream(ErlNifEnv* env, int argc } static ERL_NIF_TERM zstd_nif_flush_compression_stream(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { - size_t ret; ErlNifBinary bin; ZSTD_CStream **pzcs; @@ -212,19 +218,28 @@ static ERL_NIF_TERM zstd_nif_flush_compression_stream(ErlNifEnv* env, int argc, if (!(enif_alloc_binary(ZSTD_CStreamOutSize(), &bin))) return enif_make_tuple2(env, zstd_atom_error, zstd_atom_enomem); - /* output buffer */ - ZSTD_outBuffer outbuf = { - .pos = 0, - .dst = bin.data, - .size = bin.size, - }; - - /* reset the stream */ - if (ZSTD_isError(ret = ZSTD_endStream(*pzcs, &outbuf))) - { - enif_release_binary(&bin); - return enif_make_tuple2(env, zstd_atom_error, enif_make_string(env, ZSTD_getErrorName(ret), ERL_NIF_LATIN1)); - } + int finished; + size_t offset = 0; + ZSTD_outBuffer outbuf; + do { + /* output buffer */ + outbuf.pos = 0; + outbuf.dst = bin.data + offset; + outbuf.size = bin.size - offset; + + /* ends the stream */ + size_t const remaining = ZSTD_endStream(*pzcs, &outbuf); + if (ZSTD_isError(remaining)) + { + enif_release_binary(&bin); + return enif_make_tuple2(env, zstd_atom_error, enif_make_string(env, ZSTD_getErrorName(remaining), ERL_NIF_LATIN1)); + } + finished = remaining == 0; + if(!finished) { + offset += ZSTD_CStreamOutSize(); + enif_realloc_binary(&bin, bin.size + ZSTD_CStreamOutSize()); + } + } while (!finished); /* transfer to binary object */ ERL_NIF_TERM binary = enif_make_binary(env, &bin); @@ -232,7 +247,7 @@ static ERL_NIF_TERM zstd_nif_flush_compression_stream(ErlNifEnv* env, int argc, /* remove unused spaces */ if (outbuf.pos < outbuf.size) - result = enif_make_sub_binary(env, binary, 0, outbuf.pos); + result = enif_make_sub_binary(env, binary, 0, bin.size - (outbuf.size - outbuf.pos)); /* construct the result tuple */ return enif_make_tuple2(env, zstd_atom_ok, result); @@ -436,6 +451,7 @@ static ErlNifFunc nif_funcs[] = { { "compression_stream_init" , 2, zstd_nif_init_compression_stream , ERL_DIRTY_JOB_CPU_BOUND }, { "compression_stream_init" , 3, zstd_nif_init_compression_stream , ERL_DIRTY_JOB_CPU_BOUND }, { "compression_stream_init" , 4, zstd_nif_init_compression_stream , ERL_DIRTY_JOB_CPU_BOUND }, + { "compression_stream_init" , 5, zstd_nif_init_compression_stream , ERL_DIRTY_JOB_CPU_BOUND }, { "decompression_stream_init" , 1, zstd_nif_init_decompression_stream , ERL_DIRTY_JOB_CPU_BOUND }, { "compression_stream_reset" , 2, zstd_nif_reset_compression_stream }, diff --git a/src/zstd.erl b/src/zstd.erl index 44e968f..ee31ba9 100644 --- a/src/zstd.erl +++ b/src/zstd.erl @@ -3,9 +3,10 @@ -export([compress/1, compress/2]). -export([decompress/1]). -export([new_compression_stream/0, compression_stream_init/1, compression_stream_init/2, - compression_stream_init/3, compression_stream_init/4, compression_stream_reset/1, - compression_stream_reset/2, stream_compress/2, stream_flush/1, new_decompression_stream/0, - decompression_stream_init/1, decompression_stream_reset/1, stream_decompress/2]). + compression_stream_init/3, compression_stream_init/4, compression_stream_init/5, + compression_stream_reset/1, compression_stream_reset/2, stream_compress/2, stream_flush/1, + new_decompression_stream/0, decompression_stream_init/1, decompression_stream_reset/1, + stream_decompress/2]). -on_load init/0. @@ -51,6 +52,15 @@ compression_stream_init(_Ref, _Level, _WindowLog) -> compression_stream_init(_Ref, _Level, _WindowLog, _EnableLongDistanceMatching) -> erlang:nif_error(?LINE). +-spec compression_stream_init(reference(), 0..22, integer(), integer(), integer()) -> + ok | {error, invalid | string()}. +compression_stream_init(_Ref, + _Level, + _WindowLog, + _EnableLongDistanceMatching, + _NumWorkers) -> + erlang:nif_error(?LINE). + -spec decompression_stream_init(reference()) -> ok | {error, invalid | string()}. decompression_stream_init(_Ref) -> erlang:nif_error(?LINE). diff --git a/test/zstd_tests.erl b/test/zstd_tests.erl index 724440b..b568d7e 100644 --- a/test/zstd_tests.erl +++ b/test/zstd_tests.erl @@ -3,6 +3,7 @@ -define(COMPRESSION_LEVEL, 5). -define(WINDOW_LOG, 23). -define(ENABLE_LONG_DISTANCE_MATCHING, 1). +-define(NUM_WORKERS, 8). -include_lib("eunit/include/eunit.hrl"). @@ -19,12 +20,15 @@ zstd_stream_test() -> zstd:compression_stream_init(CStream, ?COMPRESSION_LEVEL, ?WINDOW_LOG, - ?ENABLE_LONG_DISTANCE_MATCHING), + ?ENABLE_LONG_DISTANCE_MATCHING, + ?NUM_WORKERS), {ok, CompressionBin} = zstd:stream_compress(CStream, Bin), - {ok, FlushBin} = zstd:stream_flush(CStream), + {ok, LastBin} = zstd:stream_flush(CStream), DStream = zstd:new_decompression_stream(), ok = zstd:decompression_stream_init(DStream), {ok, DBin1} = zstd:stream_decompress(DStream, CompressionBin), - {ok, DBin2} = zstd:stream_decompress(DStream, FlushBin), - ?assertEqual(Bin, <>). + {ok, DBin2} = zstd:stream_decompress(DStream, LastBin), + DecompressBin = <>, + ?assertEqual(size(Bin), size(DecompressBin)), + ?assertEqual(Bin, DecompressBin). From 3a7627632c366a2eaae42681d39faaffd8688f4a Mon Sep 17 00:00:00 2001 From: Ricardo Azpeitia Pimentel Date: Fri, 20 Jun 2025 15:57:52 -0500 Subject: [PATCH 04/11] The CI doesn't support multhread zstd --- c_src/zstd_nif.c | 34 +++++++++++++++++----------------- test/zstd_tests.erl | 2 +- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/c_src/zstd_nif.c b/c_src/zstd_nif.c index d8b1ff6..978f8d0 100644 --- a/c_src/zstd_nif.c +++ b/c_src/zstd_nif.c @@ -222,23 +222,23 @@ static ERL_NIF_TERM zstd_nif_flush_compression_stream(ErlNifEnv* env, int argc, size_t offset = 0; ZSTD_outBuffer outbuf; do { - /* output buffer */ - outbuf.pos = 0; - outbuf.dst = bin.data + offset; - outbuf.size = bin.size - offset; - - /* ends the stream */ - size_t const remaining = ZSTD_endStream(*pzcs, &outbuf); - if (ZSTD_isError(remaining)) - { - enif_release_binary(&bin); - return enif_make_tuple2(env, zstd_atom_error, enif_make_string(env, ZSTD_getErrorName(remaining), ERL_NIF_LATIN1)); - } - finished = remaining == 0; - if(!finished) { - offset += ZSTD_CStreamOutSize(); - enif_realloc_binary(&bin, bin.size + ZSTD_CStreamOutSize()); - } + /* output buffer */ + outbuf.pos = 0; + outbuf.dst = bin.data + offset; + outbuf.size = bin.size - offset; + + /* ends the stream */ + size_t const remaining = ZSTD_endStream(*pzcs, &outbuf); + if (ZSTD_isError(remaining)) + { + enif_release_binary(&bin); + return enif_make_tuple2(env, zstd_atom_error, enif_make_string(env, ZSTD_getErrorName(remaining), ERL_NIF_LATIN1)); + } + finished = remaining == 0; + if(!finished) { + offset += ZSTD_CStreamOutSize(); + enif_realloc_binary(&bin, bin.size + ZSTD_CStreamOutSize()); + } } while (!finished); /* transfer to binary object */ diff --git a/test/zstd_tests.erl b/test/zstd_tests.erl index b568d7e..3c54195 100644 --- a/test/zstd_tests.erl +++ b/test/zstd_tests.erl @@ -3,7 +3,7 @@ -define(COMPRESSION_LEVEL, 5). -define(WINDOW_LOG, 23). -define(ENABLE_LONG_DISTANCE_MATCHING, 1). --define(NUM_WORKERS, 8). +-define(NUM_WORKERS, 0). -include_lib("eunit/include/eunit.hrl"). From a6c11faec40a482db41a64ee35b0d616e7b3b286 Mon Sep 17 00:00:00 2001 From: Ricardo Azpeitia Pimentel Date: Fri, 20 Jun 2025 16:02:33 -0500 Subject: [PATCH 05/11] Check if we run out of memory --- c_src/zstd_nif.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/c_src/zstd_nif.c b/c_src/zstd_nif.c index 978f8d0..362b081 100644 --- a/c_src/zstd_nif.c +++ b/c_src/zstd_nif.c @@ -237,7 +237,11 @@ static ERL_NIF_TERM zstd_nif_flush_compression_stream(ErlNifEnv* env, int argc, finished = remaining == 0; if(!finished) { offset += ZSTD_CStreamOutSize(); - enif_realloc_binary(&bin, bin.size + ZSTD_CStreamOutSize()); + if(!enif_realloc_binary(&bin, bin.size + ZSTD_CStreamOutSize())) { + enif_release_binary(&bin); + return enif_make_tuple2(env, zstd_atom_error, zstd_atom_enomem); + } + } } while (!finished); From 145ecc96fcaee4533c8be309279e8ba70141f347 Mon Sep 17 00:00:00 2001 From: Ricardo Azpeitia Pimentel Date: Fri, 20 Jun 2025 16:04:10 -0500 Subject: [PATCH 06/11] Remove space --- c_src/zstd_nif.c | 1 - 1 file changed, 1 deletion(-) diff --git a/c_src/zstd_nif.c b/c_src/zstd_nif.c index 362b081..e49ded6 100644 --- a/c_src/zstd_nif.c +++ b/c_src/zstd_nif.c @@ -241,7 +241,6 @@ static ERL_NIF_TERM zstd_nif_flush_compression_stream(ErlNifEnv* env, int argc, enif_release_binary(&bin); return enif_make_tuple2(env, zstd_atom_error, zstd_atom_enomem); } - } } while (!finished); From f555b97534d141458c7281a822c141e9fda9de04 Mon Sep 17 00:00:00 2001 From: Ricardo Azpeitia Pimentel Date: Fri, 20 Jun 2025 16:15:20 -0500 Subject: [PATCH 07/11] Add spaces --- c_src/zstd_nif.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/c_src/zstd_nif.c b/c_src/zstd_nif.c index e49ded6..e2f2933 100644 --- a/c_src/zstd_nif.c +++ b/c_src/zstd_nif.c @@ -270,8 +270,8 @@ static ERL_NIF_TERM zstd_nif_compress_stream(ErlNifEnv* env, int argc, const ERL /* all output binary buffer */ size_t buffer_size = ZSTD_compressBound(in.size); if(ZSTD_isError(buffer_size)) { - enif_release_binary(&in); - return enif_make_tuple2(env, zstd_atom_error, enif_make_string(env, ZSTD_getErrorName(buffer_size), ERL_NIF_LATIN1)); + enif_release_binary(&in); + return enif_make_tuple2(env, zstd_atom_error, enif_make_string(env, ZSTD_getErrorName(buffer_size), ERL_NIF_LATIN1)); } if (!(enif_alloc_binary(buffer_size, &out))) { enif_release_binary(&in); From 03953b2400d7280573ef575da8fb6a9b951dff9c Mon Sep 17 00:00:00 2001 From: Ricardo Azpeitia Pimentel Date: Fri, 20 Jun 2025 17:02:07 -0500 Subject: [PATCH 08/11] See if it works in the CI --- rebar.config | 6 ++++-- test/zstd_tests.erl | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/rebar.config b/rebar.config index 4f3cef4..1a0ed8f 100644 --- a/rebar.config +++ b/rebar.config @@ -6,9 +6,11 @@ {deps, []}. {pre_hooks, - [{"(linux|darwin|solaris)", compile, "make MOREFLAGS=-fPIC -C priv/zstd/lib libzstd.a"}, + [{"(linux|darwin|solaris)", + compile, + "make MOREFLAGS=-fPIC -C priv/zstd/lib libzstd.a-mt"}, {"(linux|darwin|solaris)", compile, "make -C c_src"}, - {"(freebsd)", compile, "gmake MOREFLAGS=-fPIC -C priv/zstd/lib libzstd.a"}, + {"(freebsd)", compile, "gmake MOREFLAGS=-fPIC -C priv/zstd/lib libzstd.a-mt"}, {"(freebsd)", compile, "gmake -C c_src"}]}. {post_hooks, diff --git a/test/zstd_tests.erl b/test/zstd_tests.erl index 3c54195..b568d7e 100644 --- a/test/zstd_tests.erl +++ b/test/zstd_tests.erl @@ -3,7 +3,7 @@ -define(COMPRESSION_LEVEL, 5). -define(WINDOW_LOG, 23). -define(ENABLE_LONG_DISTANCE_MATCHING, 1). --define(NUM_WORKERS, 0). +-define(NUM_WORKERS, 8). -include_lib("eunit/include/eunit.hrl"). From 105960598fdf5ae0b25ea6cc02c04ded2b896959 Mon Sep 17 00:00:00 2001 From: Ricardo Azpeitia Pimentel Date: Mon, 23 Jun 2025 11:00:38 -0600 Subject: [PATCH 09/11] Add safe guards --- c_src/zstd_nif.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/c_src/zstd_nif.c b/c_src/zstd_nif.c index e2f2933..c6ce698 100644 --- a/c_src/zstd_nif.c +++ b/c_src/zstd_nif.c @@ -118,7 +118,7 @@ static ERL_NIF_TERM zstd_nif_init_compression_stream(ErlNifEnv* env, int argc, c ZSTD_CStream **pzcs; /* extract the stream */ - if (!(enif_get_resource(env, argv[0], zstd_compression_stream_type, (void **)&pzcs))) + if (!(enif_get_resource(env, argv[0], zstd_compression_stream_type, (void **)&pzcs)) && *pzcs != NULL) return enif_make_tuple2(env, zstd_atom_error, zstd_atom_invalid); /* extract the compression level if any */ @@ -211,7 +211,7 @@ static ERL_NIF_TERM zstd_nif_flush_compression_stream(ErlNifEnv* env, int argc, ZSTD_CStream **pzcs; /* extract the stream */ - if (!(enif_get_resource(env, argv[0], zstd_compression_stream_type, (void **)&pzcs))) + if (!(enif_get_resource(env, argv[0], zstd_compression_stream_type, (void **)&pzcs)) && *pzcs != NULL) return enif_make_tuple2(env, zstd_atom_error, zstd_atom_invalid); /* allocate binary buffer */ @@ -263,7 +263,7 @@ static ERL_NIF_TERM zstd_nif_compress_stream(ErlNifEnv* env, int argc, const ERL ZSTD_CStream **pzcs; /* extract the stream */ - if (!(enif_get_resource(env, argv[0], zstd_compression_stream_type, (void **)&pzcs)) || + if ((!(enif_get_resource(env, argv[0], zstd_compression_stream_type, (void **)&pzcs)) && (*pzcs) != NULL) || !(enif_inspect_iolist_as_binary(env, argv[1], &in))) return enif_make_tuple2(env, zstd_atom_error, zstd_atom_invalid); @@ -386,11 +386,13 @@ static ERL_NIF_TERM zstd_nif_decompress_stream(ErlNifEnv* env, int argc, const E static void zstd_compression_stream_destructor(ErlNifEnv *env, void *stream) { ZSTD_CStream **handle = stream; ZSTD_freeCStream(*handle); + *handle = NULL; } static void zstd_decompression_stream_destructor(ErlNifEnv *env, void *stream) { ZSTD_DStream **handle = stream; ZSTD_freeDStream(*handle); + *handle = NULL; } static int zstd_init(ErlNifEnv *env) { From 86d36eb04dea9987e539f883a184dfe5661600c3 Mon Sep 17 00:00:00 2001 From: Ricardo Azpeitia Pimentel Date: Tue, 24 Jun 2025 08:31:23 -0600 Subject: [PATCH 10/11] Test for parallel streaming crashes --- test/zstd_tests.erl | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/test/zstd_tests.erl b/test/zstd_tests.erl index b568d7e..917ed52 100644 --- a/test/zstd_tests.erl +++ b/test/zstd_tests.erl @@ -32,3 +32,19 @@ zstd_stream_test() -> DecompressBin = <>, ?assertEqual(size(Bin), size(DecompressBin)), ?assertEqual(Bin, DecompressBin). + +zstd_parallel_test() -> + Bin = rand:bytes(1_000_000), + CStream = zstd:new_compression_stream(), + ok = + zstd:compression_stream_init(CStream, + ?COMPRESSION_LEVEL, + ?WINDOW_LOG, + ?ENABLE_LONG_DISTANCE_MATCHING, + ?NUM_WORKERS), + Parent = self(), + F = fun() -> zstd:stream_compress(CStream, Bin), Parent ! done end, + lists:foreach(fun(_) -> spawn(F) end, lists:seq(1, 100)), + lists:foreach(fun(_) -> receive done -> ok end end, lists:seq(1, 100)), + {ok, _} = zstd:stream_flush(CStream), + ok. \ No newline at end of file From bdef1524ed222e66c2d8c2857aae424f8fdde8ac Mon Sep 17 00:00:00 2001 From: Ricardo Azpeitia Pimentel Date: Tue, 24 Jun 2025 08:31:36 -0600 Subject: [PATCH 11/11] Formatter --- test/zstd_tests.erl | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/test/zstd_tests.erl b/test/zstd_tests.erl index 917ed52..4620b7e 100644 --- a/test/zstd_tests.erl +++ b/test/zstd_tests.erl @@ -43,8 +43,17 @@ zstd_parallel_test() -> ?ENABLE_LONG_DISTANCE_MATCHING, ?NUM_WORKERS), Parent = self(), - F = fun() -> zstd:stream_compress(CStream, Bin), Parent ! done end, + F = fun() -> + zstd:stream_compress(CStream, Bin), + Parent ! done + end, lists:foreach(fun(_) -> spawn(F) end, lists:seq(1, 100)), - lists:foreach(fun(_) -> receive done -> ok end end, lists:seq(1, 100)), + lists:foreach(fun(_) -> + receive + done -> + ok + end + end, + lists:seq(1, 100)), {ok, _} = zstd:stream_flush(CStream), - ok. \ No newline at end of file + ok.