Skip to content

Commit b7bf1e9

Browse files
Merge pull request #9297 from vividsnow/master
[new] Perl/Feersum benchmark
2 parents ddd0952 + 0454c57 commit b7bf1e9

File tree

6 files changed

+439
-0
lines changed

6 files changed

+439
-0
lines changed

frameworks/Perl/feersum/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# About
2+
3+
[Feersum](https://metacpan.org/dist/Feersum) - HTTP 1.0/1.1 handler for Perl based on
4+
[EV](https://metacpan.org/dist/EV) and [picohttpparser](https://github.com/h2o/picohttpparser)
5+
6+
# Requirements
7+
8+
* Perl 5.40)
9+
* [JSON::XS](https://metacpan.org/dist/JSON-XS)
10+
* [DBI](https://metacpan.org/dist/DBI)
11+
* [Text::Xslate](https://metacpan.org/dist/Text-Xslate)
12+
* [LMDB](https://metacpan.org/dist/LMDB_File)

frameworks/Perl/feersum/app.pl

Lines changed: 334 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,334 @@
1+
use v5.40;
2+
use warnings;
3+
use Feersum::Runner;
4+
use EV; use AnyEvent;
5+
use DBI 'SQL_INTEGER';
6+
use DBD::Pg ':async';
7+
use Scalar::Util 'weaken';
8+
use List::Util qw'min max pairmap';
9+
use JSON::XS;
10+
use Text::Xslate;
11+
use LMDB_File qw':flags :error';
12+
13+
use constant {
14+
host_port => $ENV{host_port} || '0.0.0.0:8080',
15+
debug => $ENV{debug} // 0,
16+
db => lc($ENV{db} || 'postgres'), # postgres / mysql / maria (will use for constant folding)
17+
db_name => $ENV{db_name} || 'hello_world',
18+
db_host => $ENV{db_host} || 'tfb-database',
19+
db_port => $ENV{db_port},
20+
db_user => $ENV{db_user} || 'benchmarkdbuser',
21+
db_pass => $ENV{db_pass} || 'benchmarkdbpass',
22+
empty => [], o => +{},
23+
reconnect_interval => 60,
24+
max_db_connections => 512,
25+
max_update_tries => 3
26+
};
27+
use constant max_batch_update_size => 1; # db eq 'postgres' ? 5 : 10; # rule of thumb
28+
use constant server => qw'Server Feersum';
29+
use constant {
30+
text => [server, qw'Content-Type text/plain'],
31+
json => [server, qw'Content-Type application/json'],
32+
html => [server, 'Content-Type', 'text/html; charset=utf-8'],
33+
nocontent => [server],
34+
};
35+
36+
my @dsn = (
37+
(sprintf 'dbi:%s:port=%d;host=%s;database=%s;',
38+
(db eq 'mysql' ? ('mysql', db_port // 3306) :
39+
db eq 'maria' ? ('MariaDB', db_port // 3306) :
40+
db eq 'postgres' ? ('Pg', db_port // 5432)
41+
: die 'unknown db'), db_host, db_name),
42+
db_user, db_pass,
43+
+{qw'AutoCommit 1 RaiseError 0 PrintError 1',
44+
(db eq 'maria' ? (qw'mariadb_server_prepare 1 mariadb_ssl 0') :
45+
db eq 'mysql' ? (qw'mysql_server_prepare 1 mysql_ssl 0 mysql_get_server_pubkey 1') :
46+
db eq 'postgres' ? (qw'pg_server_prepare 1 sslmode 0') : ())}
47+
);
48+
49+
chomp(my $cpus = `nproc`);
50+
say "$cpus cpus available" if debug;
51+
my $pool_size = int max_db_connections / $cpus; # number of db connections in each worker
52+
my $js = JSON::XS->new;
53+
54+
my $html = render();
55+
cache('init');
56+
57+
my %prepare = (
58+
world => ['select randomNumber, id from World where id = ?', SQL_INTEGER],
59+
fortune => ['select id, message from Fortune'],
60+
update1 => ['update World set randomNumber = ? where id = ?', (SQL_INTEGER) x 2],
61+
(map {
62+
'update'.$_ =>
63+
[sprintf(
64+
(db eq 'mysql' || db eq 'maria') ? 'with t(v,i) as (values %s) update World w join t on t.i = w.id set w.randomNumber = t.v' :
65+
db eq 'postgres' ? 'with t(v,i) as (values %s) update World w set randomNumber = t.v from t where t.i = w.id' : undef,
66+
(join ',', ((db eq 'mysql' || db eq 'maria') ? 'row(?,?)' : '(?,?)') x $_)
67+
), (SQL_INTEGER) x ($_ * 2)]
68+
} 2..max_batch_update_size)
69+
);
70+
71+
my ($pool, $cache);
72+
my $w = EV::fork sub { # child init
73+
$pool = db_pool($pool_size, \@dsn, \%prepare); # db connection pool in each worker
74+
$cache = cache('use'); # cache
75+
};
76+
77+
my %route = controllers();
78+
79+
my $runner = Feersum::Runner->new(
80+
pre_fork => $cpus,
81+
quiet => !debug, keepalive => 1,
82+
max_connection_reqs => 1000,
83+
read_timeout => 60,
84+
listen => [host_port]
85+
)->run(sub ($h) { ($route{$h->path} // $route{404})->($h) });
86+
87+
sub controllers {(
88+
'/plaintext', sub ($h) { $h->send_response(200, text, \'Hello, World!') },
89+
'/json', sub ($h) { $h->send_response(200, json, \$js->encode(+{ message => 'Hello, World!' })) },
90+
(map +('/db', $_, '/queries', $_, '/updates', $_ ), sub ($h) {
91+
my ($n) = (my $q = $h->query // '') =~ m/queries=(\d+)/a;
92+
$n = max(1, min($n//1, 500));
93+
my ($cv, @rs) = (AE::cv);
94+
my $on_done = sub { $h->send_response(200, json, \$js->encode($q ? \@rs : ($rs[0] // o))) };
95+
$cv->begin(
96+
$h->path ne '/updates'
97+
? $on_done # select
98+
: sub { # update
99+
if (@rs) {
100+
my ($i, $j) = (0, 0);
101+
my $cv = AE::cv;
102+
$cv->begin($on_done);
103+
while () {
104+
$j = min($i + max_batch_update_size - 1, $#rs);
105+
say "$i $j" if debug;
106+
$cv->begin;
107+
$_->{randomNumber} = int(rand 10000) + 1 for @rs[$i..$j];
108+
my $tries = max_update_tries;
109+
my $st = 'update'.($j - $i + 1);
110+
my $args = [map @$_{qw/randomNumber id/}, @rs[$i..$j]];
111+
my $update = sub ($rv = undef, $sth_or_e = undef) {
112+
$cv->end, return if $rv;
113+
say 'retryin update on '.$sth_or_e if $tries < max_update_tries;
114+
say 'fail to update on '.max_update_tries.' tries ' and $cv->end unless $tries--;
115+
db_execute($pool, $st, $args, __SUB__);
116+
};
117+
$update->();
118+
$i += max_batch_update_size;
119+
last if $i >= @rs;
120+
}
121+
$cv->end;
122+
} else { $on_done->() }
123+
}
124+
);
125+
for (1..$n) {
126+
my $id = int(rand 10000) + 1;
127+
$cv->begin;
128+
db_execute($pool, world => [$id], sub ($rows, $sth) {
129+
push @rs, @{$sth->fetchall_arrayref(+{ randomNumber => 1, id => 1 })} if $rows > 0;
130+
$cv->end
131+
});
132+
}
133+
$cv->end
134+
}),
135+
'/fortunes' => sub ($h) {
136+
db_execute($pool, fortune => empty, sub ($rows, $sth) {
137+
$h->send_response(200, html, \$html->render('fortune.tx', +{ rows => [
138+
sort { $a->[1] cmp $b->[1] }
139+
@{$sth->fetchall_arrayref},
140+
[0, 'Additional fortune added at request time.']
141+
]}))
142+
});
143+
},
144+
'/cached-queries' => sub ($h) {
145+
my ($n) = (my $q = $h->query // '') =~ m/count=(\d+)/a;
146+
$n = max(1, min($n//1, 500));
147+
my @rs = map +{ id => $_ , randomNumber => $cache->($_) }, map int(rand 10000) + 1, 1..$n;
148+
$h->send_response(200, json, \$js->encode(\@rs));
149+
},
150+
'/' => sub ($h) { $h->send_response(204, nocontent, empty) },
151+
404 => sub ($h) { $h->send_response(404, nocontent, empty) }
152+
)}
153+
154+
sub render {
155+
my $t = Text::Xslate->new(path => +{
156+
(my $file = 'fortune.tx') => <<~\html =~ s/(?<=[\r\n])\s+//sgr
157+
<!DOCTYPE html>
158+
<html>
159+
<head><title>Fortunes</title></head>
160+
<body>
161+
<table>
162+
<tr><th>id</th><th>message</th></tr>
163+
: for $rows -> $i {
164+
<tr><td><: $i.0 :></td><td><: $i.1 :></td></tr>
165+
: }
166+
</table>
167+
</body>
168+
</html>
169+
html
170+
});
171+
$t->load_file($file);
172+
$t
173+
}
174+
175+
sub cache ($type = 'init') {
176+
my $path = '/dev/shm/feersum';
177+
say "clearing $path" and unlink glob "$path*" if $type eq 'init' && -e $path;
178+
my $env = LMDB::Env->new($path, +{
179+
mapsize => 1024*512,
180+
flags => MDB_WRITEMAP|MDB_NOSYNC|MDB_NOMETASYNC|MDB_NOTLS|MDB_NOSUBDIR|MDB_NORDAHEAD
181+
}) or die $LMDB_File::last_err;
182+
if ($type eq 'init') {
183+
die unless defined(my $tx = $env->BeginTxn);
184+
my $handle = $tx->open(undef, MDB_CREATE|MDB_INTEGERKEY);
185+
my $dbh = DBI->connect(@dsn);
186+
$tx->put($handle, $_->[0], pack S => $_->[1]) for @{$dbh->selectall_arrayref('select id, randomNumber from World')};
187+
$tx->commit;
188+
$dbh->disconnect;
189+
say 'cache populated' if debug;
190+
return;
191+
}
192+
my $tx = $env->BeginTxn(MDB_RDONLY);
193+
my $handle = $tx->open(undef, MDB_INTEGERKEY);
194+
sub ($k) { $tx->renew; $tx->get($handle, $k, my $v); unpack S => $v }
195+
}
196+
197+
sub db_pool ($size, $dsn, $prepare = undef) {
198+
my %pool = (slot => [], active => +{}, free => [], pending => [], prepare => $prepare);
199+
db_connect(\%pool, $_, $dsn) for 0 .. $size - 1;
200+
\%pool
201+
}
202+
203+
sub db_connect ($pool, $id, $dsn) {
204+
say "db[$id] connection.." if debug;
205+
my $dbh = DBI->connect(@$dsn);
206+
unless ($dbh) {
207+
warn sprintf 'err: %s. will try reconnect %d sec', $DBI::errstr, reconnect_interval;
208+
$pool->{slot}[$id] = AE::timer +reconnect_interval, 0, sub { db_connect($pool, $id, $dsn) }; # try later
209+
return
210+
}
211+
my $fd = db eq 'maria' ? $dbh->mariadb_sockfd : db eq 'mysql' ? $dbh->mysql_fd : db eq 'postgres' ? $dbh->{pg_socket} : undef;
212+
open my $fh, "<&=", $fd or die $!; # dup handle
213+
state $st_opt = +{
214+
db eq 'maria' ? (mariadb_async => 1) :
215+
db eq 'mysql' ? (async => 1) :
216+
db eq 'postgres' ? (pg_async => PG_ASYNC + PG_OLDQUERY_CANCEL) : ()
217+
};
218+
my %conn = (
219+
id => $id, db => $dbh, fd => $fd, fh => $fh, dsn => $dsn,
220+
st => +{ $pool->{prepare} ? (pairmap {
221+
my $sth = $dbh->prepare($b->[0], $st_opt);
222+
$sth->bind_param($_, undef, $b->[$_]) for 1..$#$b;
223+
($a, $sth)
224+
} %{$pool->{prepare}}) : () },
225+
connected => 1,
226+
);
227+
$conn{w} = EV::io $fh, EV::READ, sub {
228+
my $e;
229+
{ ;
230+
$e = 'inactive', last unless defined(my $st = $conn{active});
231+
if ($st) { # executed st
232+
$e = 'nost', last unless my $sth = $conn{st}{$st};
233+
$e = 'unready', last unless
234+
db eq 'maria' ? $sth->mariadb_async_ready :
235+
db eq 'mysql' ? $sth->mysql_async_ready :
236+
db eq 'postgres' ? $sth->pg_ready : undef;
237+
$e = 'noresult', $sth->finish unless defined(
238+
my $rows =
239+
db eq 'maria' ? $sth->mariadb_async_result :
240+
db eq 'mysql' ? $sth->mysql_async_result :
241+
db eq 'postgres' ? $sth->pg_result : undef
242+
);
243+
say "db[$id $fd] calling cb: ".$st if debug;
244+
if (my $cb = $conn{cb}) { $cb->($rows, $e // $sth) }
245+
else { say "db[$id $fd] no handler for response with $rows rows" }
246+
$sth->finish unless $e;
247+
} else { # db do
248+
$e = 'nodb', last unless my $dbh = $conn{db};
249+
$e = 'unready', last unless
250+
db eq 'maria' ? $dbh->mariadb_async_ready :
251+
db eq 'mysql' ? $dbh->mysql_async_ready :
252+
db eq 'postgres' ? $dbh->pg_ready : undef;
253+
$e = 'noresult' unless defined(
254+
my $rv =
255+
db eq 'maria' ? $dbh->mariadb_async_result :
256+
db eq 'mysql' ? $dbh->mysql_async_result :
257+
db eq 'postgres' ? $dbh->pg_result : undef
258+
);
259+
say "db[$id $fd] calling cb: db do query" if debug;
260+
if (my $cb = $conn{cb}) { $cb->($rv, $e) }
261+
else { say "db[$id $fd] no handler response with $rv return" }
262+
}
263+
say "db[$id $fd] error: $e " if debug && $e;
264+
say "db[$id $fd] finish" if debug;
265+
delete $conn{active};
266+
delete $pool->{active}{$id};
267+
push @{$pool->{free}}, \%conn;
268+
if (defined(my $pending = shift @{$pool->{pending}})) {
269+
my $code = shift @$pending;
270+
$code->($pool, splice @$pending)
271+
}
272+
return
273+
}
274+
say "db[$id $fd] $e" if debug;
275+
if (eof($fh) || (my $inactive = $e eq 'inactive')) {
276+
say "db[$id $fd] disconnected" if debug;
277+
delete @conn{qw/w connected/};
278+
$conn{db}->disconnect if $inactive;
279+
$conn{cb}->(-1, undef) if $conn{st} && $conn{active} && $conn{cb};
280+
db_connect($pool, $id, $dsn); # reconnect
281+
} else {
282+
say "db[$id $fd] stalled?";
283+
}
284+
};
285+
say "db[$id $fd] connected" if debug;
286+
$pool->{slot}[$id] = \%conn;
287+
weaken(my $weak = $pool->{slot}[$id]);
288+
push @{$pool->{free}}, $weak;
289+
if (defined(my $pending = shift @{$pool->{pending}})) {
290+
my $code = shift @$pending;
291+
$code->($pool, splice @$pending)
292+
}
293+
}
294+
295+
sub db_execute ($pool, $st, $args, $cb) {
296+
say 'db executing..' if debug;
297+
while (my $conn = shift @{$pool->{free}}) {
298+
(debug and say 'skip unconnected'), next unless defined($conn) && $conn->{connected};
299+
say 'on connection..'.$conn->{id} if debug;
300+
if ($conn->{st}{$st}->execute(@$args)) {
301+
(@$conn{qw/cb active/}, $pool->{active}{$conn->{id}}) = ($cb, $st, 1);
302+
return;
303+
} else {
304+
say 'error: ', $conn->{st}{$st}->errstr;
305+
db_connect($pool, @$conn{qw/id dsn/}); # reconnect
306+
next;
307+
}
308+
}
309+
say '..put to pending..' if debug;
310+
push @{$pool->{pending}}, [__SUB__, $st, $args, $cb];
311+
}
312+
313+
sub db_do ($pool, $query, $args, $cb) {
314+
say 'db doing..' if debug;
315+
state $db_opt = +{
316+
db eq 'maria' ? (mariadb_async => 1) :
317+
db eq 'mysql' ? (async => 1) :
318+
db eq 'postgres' ? (pg_async => PG_ASYNC + PG_OLDQUERY_CANCEL) : ()
319+
};
320+
while (my $conn = shift @{$pool->{free}}) {
321+
(debug and say 'skip unconnected'), next unless defined($conn) && $conn->{connected};
322+
say 'on connection..'.$conn->{id} if debug;
323+
if ($conn->{db}->do($query, $db_opt, defined($args) ? @$args : ())) {
324+
(@$conn{qw/cb active/}, $pool->{active}{$conn->{id}}) = ($cb, 0, 1);
325+
return;
326+
} else {
327+
say 'error: ', $conn->{db}->errstr;
328+
db_connect($pool, @$conn{qw/id dsn/}); # reconnect
329+
next;
330+
}
331+
}
332+
say '..put to pending..' if debug;
333+
push @{$pool->{pending}}, [__SUB__, $query, $args, $cb];
334+
}

0 commit comments

Comments
 (0)