Skip to content

Commit 8f76be9

Browse files
committed
std.parallelism support added
bench test output: ```bash Write benchmark: Normal: 42 μs and 4 hnsecs Parallel: 26 μs and 8 hnsecs Read benchmark: Normal: 36 μs and 2 hnsecs Parallel: 11 μs List benchmark: Normal: 31 μs and 1 hnsec Parallel: 17 μs and 8 hnsecs 1 modules passed unittests ```
1 parent 08f9766 commit 8f76be9

File tree

2 files changed

+110
-14
lines changed

2 files changed

+110
-14
lines changed

bindings/d/source/opendal/operator.d

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,27 @@ module opendal.operator;
2222
import std.string: toStringz;
2323
import std.exception: enforce;
2424
import std.conv: to;
25+
import std.parallelism: task, TaskPool;
2526

2627
/// OpenDAL-C binding for D. (unsafe/@system)
2728
private import opendal.opendal_c;
2829

2930
struct Operator
3031
{
3132
private opendal_operator* op;
33+
private TaskPool taskPool;
34+
private bool enabledParallelism;
3235

33-
this(string scheme, OperatorOptions options) @trusted
36+
this(string scheme, OperatorOptions options, bool useParallel = false) @trusted
3437
{
3538
auto result = opendal_operator_new(scheme.toStringz, options.options);
3639
enforce(result.op !is null, "Failed to create Operator");
3740
enforce(result.error is null, "Error in Operator");
3841
op = result.op;
42+
enabledParallelism = useParallel;
43+
44+
if (enabledParallelism)
45+
taskPool = new TaskPool();
3946
}
4047

4148
void write(string path, ubyte[] data) @trusted
@@ -45,6 +52,27 @@ struct Operator
4552
enforce(error is null, "Error writing data");
4653
}
4754

55+
void writeParallel(string path, ubyte[] data) @safe
56+
{
57+
auto t = task!((Operator* op, string p, ubyte[] d) { op.write(p, d); })(&this, path, data);
58+
taskPool.put(t);
59+
t.yieldForce();
60+
}
61+
62+
ubyte[] readParallel(string path) @trusted
63+
{
64+
auto t = task!((Operator* op, string p) { return op.read(p); })(&this, path);
65+
taskPool.put(t);
66+
return t.yieldForce();
67+
}
68+
69+
Entry[] listParallel(string path) @trusted
70+
{
71+
auto t = task!((Operator* op, string p) { return op.list(p); })(&this, path);
72+
taskPool.put(t);
73+
return t.yieldForce();
74+
}
75+
4876
ubyte[] read(string path) @trusted
4977
{
5078
auto result = opendal_operator_read(op, path.toStringz);
@@ -112,6 +140,8 @@ struct Operator
112140
{
113141
if (op !is null)
114142
opendal_operator_free(op);
143+
if (enabledParallelism)
144+
taskPool.stop();
115145
}
116146
}
117147

bindings/d/source/opendal/package.d

Lines changed: 79 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,15 @@
1919

2020
module opendal;
2121

22-
version (D_BetterC)
23-
{
24-
version (LDC)
25-
{
26-
pragma(LDC_no_moduleinfo);
27-
pragma(LDC_no_typeinfo);
28-
}
29-
}
30-
3122
public import opendal.operator;
3223

3324
version (unittest)
3425
{
3526
@("Test basic Operator creation")
36-
unittest
27+
@safe unittest
3728
{
3829
/* Initialize a operator for "memory" backend, with no options */
39-
auto options = new OperatorOptions();
30+
OperatorOptions options = new OperatorOptions();
4031
Operator op = Operator("memory", options);
4132

4233
/* Prepare some data to be written */
@@ -48,10 +39,85 @@ version (unittest)
4839
/* We can read it out, make sure the data is the same */
4940
auto read_bytes = op.read("/testpath");
5041
assert(read_bytes.length == 24);
42+
assert(cast(string)read_bytes.idup == data);
43+
}
5144

52-
/* Lets print it out */
45+
@("Benchmark parallel and normal functions")
46+
@safe unittest
47+
{
48+
import std.exception: assertNotThrown;
49+
import std.file: tempDir;
50+
import std.path: buildPath;
51+
import std.datetime.stopwatch: StopWatch;
5352
import std.stdio: writeln;
5453

55-
writeln(cast(string)read_bytes.idup);
54+
auto options = new OperatorOptions();
55+
options.set("root", tempDir);
56+
auto op = Operator("fs", options, true);
57+
58+
auto testPath = buildPath(tempDir, "benchmark_test.txt");
59+
auto testData = cast(ubyte[])"Benchmarking OpenDAL async and normal functions".dup;
60+
61+
// Benchmark write operations
62+
StopWatch sw;
63+
64+
sw.start();
65+
assertNotThrown(op.write(testPath, testData));
66+
sw.stop();
67+
auto normalWriteTime = sw.peek();
68+
69+
sw.reset();
70+
sw.start();
71+
assertNotThrown(op.writeParallel(testPath, testData));
72+
sw.stop();
73+
auto parallelWriteTime = sw.peek();
74+
75+
// Benchmark read operations
76+
sw.reset();
77+
sw.start();
78+
auto normalReadData = op.read(testPath);
79+
sw.stop();
80+
auto normalReadTime = sw.peek();
81+
82+
sw.reset();
83+
sw.start();
84+
auto parallelReadData = op.readParallel(testPath);
85+
sw.stop();
86+
auto parallelReadTime = sw.peek();
87+
88+
// Benchmark list operations
89+
sw.reset();
90+
sw.start();
91+
op.list(tempDir);
92+
sw.stop();
93+
auto normalListTime = sw.peek();
94+
95+
sw.reset();
96+
sw.start();
97+
op.listParallel(tempDir);
98+
sw.stop();
99+
auto parallelListTime = sw.peek();
100+
101+
// Print benchmark results
102+
writeln("Write benchmark:");
103+
writeln(" Normal: ", normalWriteTime);
104+
writeln(" Parallel: ", parallelWriteTime);
105+
106+
writeln("Read benchmark:");
107+
writeln(" Normal: ", normalReadTime);
108+
writeln(" Parallel: ", parallelReadTime);
109+
110+
writeln("List benchmark:");
111+
writeln(" Normal: ", normalListTime);
112+
writeln(" Parallel: ", parallelListTime);
113+
114+
// Verify data integrity
115+
assert(normalReadData == testData);
116+
assert(parallelReadData == testData);
117+
118+
// Clean up
119+
op.removeObject(testPath);
120+
assert(!op.exists(testPath));
56121
}
122+
57123
}

0 commit comments

Comments
 (0)