|
| 1 | +use v5.40; |
| 2 | +use warnings; |
| 3 | +use Feersum::Runner; |
| 4 | +use EV; use AnyEvent; |
| 5 | +use DBI 'SQL_INTEGER'; |
| 6 | +use DBD::Pg ':async'; |
| 7 | +use Scalar::Util 'weaken'; |
| 8 | +use List::Util qw'min max pairmap'; |
| 9 | +use JSON::XS; |
| 10 | +use Text::Xslate; |
| 11 | +use LMDB_File qw':flags :error'; |
| 12 | + |
| 13 | +use constant { |
| 14 | + host_port => $ENV{host_port} || '0.0.0.0:8080', |
| 15 | + debug => $ENV{debug} // 0, |
| 16 | + db => lc($ENV{db} || 'postgres'), # postgres / mysql / maria (will use for constant folding) |
| 17 | + db_name => $ENV{db_name} || 'hello_world', |
| 18 | + db_host => $ENV{db_host} || 'tfb-database', |
| 19 | + db_port => $ENV{db_port}, |
| 20 | + db_user => $ENV{db_user} || 'benchmarkdbuser', |
| 21 | + db_pass => $ENV{db_pass} || 'benchmarkdbpass', |
| 22 | + empty => [], o => +{}, |
| 23 | + reconnect_interval => 60, |
| 24 | + max_db_connections => 512, |
| 25 | + max_update_tries => 3 |
| 26 | +}; |
| 27 | +use constant max_batch_update_size => 1; # db eq 'postgres' ? 5 : 10; # rule of thumb |
| 28 | +use constant server => qw'Server Feersum'; |
| 29 | +use constant { |
| 30 | + text => [server, qw'Content-Type text/plain'], |
| 31 | + json => [server, qw'Content-Type application/json'], |
| 32 | + html => [server, 'Content-Type', 'text/html; charset=utf-8'], |
| 33 | + nocontent => [server], |
| 34 | +}; |
| 35 | + |
| 36 | +my @dsn = ( |
| 37 | + (sprintf 'dbi:%s:port=%d;host=%s;database=%s;', |
| 38 | + (db eq 'mysql' ? ('mysql', db_port // 3306) : |
| 39 | + db eq 'maria' ? ('MariaDB', db_port // 3306) : |
| 40 | + db eq 'postgres' ? ('Pg', db_port // 5432) |
| 41 | + : die 'unknown db'), db_host, db_name), |
| 42 | + db_user, db_pass, |
| 43 | + +{qw'AutoCommit 1 RaiseError 0 PrintError 1', |
| 44 | + (db eq 'maria' ? (qw'mariadb_server_prepare 1 mariadb_ssl 0') : |
| 45 | + db eq 'mysql' ? (qw'mysql_server_prepare 1 mysql_ssl 0 mysql_get_server_pubkey 1') : |
| 46 | + db eq 'postgres' ? (qw'pg_server_prepare 1 sslmode 0') : ())} |
| 47 | +); |
| 48 | + |
| 49 | +chomp(my $cpus = `nproc`); |
| 50 | +say "$cpus cpus available" if debug; |
| 51 | +my $pool_size = int max_db_connections / $cpus; # number of db connections in each worker |
| 52 | +my $js = JSON::XS->new; |
| 53 | + |
| 54 | +my $html = render(); |
| 55 | +cache('init'); |
| 56 | + |
| 57 | +my %prepare = ( |
| 58 | + world => ['select randomNumber, id from World where id = ?', SQL_INTEGER], |
| 59 | + fortune => ['select id, message from Fortune'], |
| 60 | + update1 => ['update World set randomNumber = ? where id = ?', (SQL_INTEGER) x 2], |
| 61 | + (map { |
| 62 | + 'update'.$_ => |
| 63 | + [sprintf( |
| 64 | + (db eq 'mysql' || db eq 'maria') ? 'with t(v,i) as (values %s) update World w join t on t.i = w.id set w.randomNumber = t.v' : |
| 65 | + db eq 'postgres' ? 'with t(v,i) as (values %s) update World w set randomNumber = t.v from t where t.i = w.id' : undef, |
| 66 | + (join ',', ((db eq 'mysql' || db eq 'maria') ? 'row(?,?)' : '(?,?)') x $_) |
| 67 | + ), (SQL_INTEGER) x ($_ * 2)] |
| 68 | + } 2..max_batch_update_size) |
| 69 | +); |
| 70 | + |
| 71 | +my ($pool, $cache); |
| 72 | +my $w = EV::fork sub { # child init |
| 73 | + $pool = db_pool($pool_size, \@dsn, \%prepare); # db connection pool in each worker |
| 74 | + $cache = cache('use'); # cache |
| 75 | +}; |
| 76 | + |
| 77 | +my %route = controllers(); |
| 78 | + |
| 79 | +my $runner = Feersum::Runner->new( |
| 80 | + pre_fork => $cpus, |
| 81 | + quiet => !debug, keepalive => 1, |
| 82 | + max_connection_reqs => 1000, |
| 83 | + read_timeout => 60, |
| 84 | + listen => [host_port] |
| 85 | +)->run(sub ($h) { ($route{$h->path} // $route{404})->($h) }); |
| 86 | + |
| 87 | +sub controllers {( |
| 88 | + '/plaintext', sub ($h) { $h->send_response(200, text, \'Hello, World!') }, |
| 89 | + '/json', sub ($h) { $h->send_response(200, json, \$js->encode(+{ message => 'Hello, World!' })) }, |
| 90 | + (map +('/db', $_, '/queries', $_, '/updates', $_ ), sub ($h) { |
| 91 | + my ($n) = (my $q = $h->query // '') =~ m/queries=(\d+)/a; |
| 92 | + $n = max(1, min($n//1, 500)); |
| 93 | + my ($cv, @rs) = (AE::cv); |
| 94 | + my $on_done = sub { $h->send_response(200, json, \$js->encode($q ? \@rs : ($rs[0] // o))) }; |
| 95 | + $cv->begin( |
| 96 | + $h->path ne '/updates' |
| 97 | + ? $on_done # select |
| 98 | + : sub { # update |
| 99 | + if (@rs) { |
| 100 | + my ($i, $j) = (0, 0); |
| 101 | + my $cv = AE::cv; |
| 102 | + $cv->begin($on_done); |
| 103 | + while () { |
| 104 | + $j = min($i + max_batch_update_size - 1, $#rs); |
| 105 | + say "$i $j" if debug; |
| 106 | + $cv->begin; |
| 107 | + $_->{randomNumber} = int(rand 10000) + 1 for @rs[$i..$j]; |
| 108 | + my $tries = max_update_tries; |
| 109 | + my $st = 'update'.($j - $i + 1); |
| 110 | + my $args = [map @$_{qw/randomNumber id/}, @rs[$i..$j]]; |
| 111 | + my $update = sub ($rv = undef, $sth_or_e = undef) { |
| 112 | + $cv->end, return if $rv; |
| 113 | + say 'retryin update on '.$sth_or_e if $tries < max_update_tries; |
| 114 | + say 'fail to update on '.max_update_tries.' tries ' and $cv->end unless $tries--; |
| 115 | + db_execute($pool, $st, $args, __SUB__); |
| 116 | + }; |
| 117 | + $update->(); |
| 118 | + $i += max_batch_update_size; |
| 119 | + last if $i >= @rs; |
| 120 | + } |
| 121 | + $cv->end; |
| 122 | + } else { $on_done->() } |
| 123 | + } |
| 124 | + ); |
| 125 | + for (1..$n) { |
| 126 | + my $id = int(rand 10000) + 1; |
| 127 | + $cv->begin; |
| 128 | + db_execute($pool, world => [$id], sub ($rows, $sth) { |
| 129 | + push @rs, @{$sth->fetchall_arrayref(+{ randomNumber => 1, id => 1 })} if $rows > 0; |
| 130 | + $cv->end |
| 131 | + }); |
| 132 | + } |
| 133 | + $cv->end |
| 134 | + }), |
| 135 | + '/fortunes' => sub ($h) { |
| 136 | + db_execute($pool, fortune => empty, sub ($rows, $sth) { |
| 137 | + $h->send_response(200, html, \$html->render('fortune.tx', +{ rows => [ |
| 138 | + sort { $a->[1] cmp $b->[1] } |
| 139 | + @{$sth->fetchall_arrayref}, |
| 140 | + [0, 'Additional fortune added at request time.'] |
| 141 | + ]})) |
| 142 | + }); |
| 143 | + }, |
| 144 | + '/cached-queries' => sub ($h) { |
| 145 | + my ($n) = (my $q = $h->query // '') =~ m/count=(\d+)/a; |
| 146 | + $n = max(1, min($n//1, 500)); |
| 147 | + my @rs = map +{ id => $_ , randomNumber => $cache->($_) }, map int(rand 10000) + 1, 1..$n; |
| 148 | + $h->send_response(200, json, \$js->encode(\@rs)); |
| 149 | + }, |
| 150 | + '/' => sub ($h) { $h->send_response(204, nocontent, empty) }, |
| 151 | + 404 => sub ($h) { $h->send_response(404, nocontent, empty) } |
| 152 | +)} |
| 153 | + |
| 154 | +sub render { |
| 155 | + my $t = Text::Xslate->new(path => +{ |
| 156 | + (my $file = 'fortune.tx') => <<~\html =~ s/(?<=[\r\n])\s+//sgr |
| 157 | + <!DOCTYPE html> |
| 158 | + <html> |
| 159 | + <head><title>Fortunes</title></head> |
| 160 | + <body> |
| 161 | + <table> |
| 162 | + <tr><th>id</th><th>message</th></tr> |
| 163 | + : for $rows -> $i { |
| 164 | + <tr><td><: $i.0 :></td><td><: $i.1 :></td></tr> |
| 165 | + : } |
| 166 | + </table> |
| 167 | + </body> |
| 168 | + </html> |
| 169 | + html |
| 170 | + }); |
| 171 | + $t->load_file($file); |
| 172 | + $t |
| 173 | +} |
| 174 | + |
| 175 | +sub cache ($type = 'init') { |
| 176 | + my $path = '/dev/shm/feersum'; |
| 177 | + say "clearing $path" and unlink glob "$path*" if $type eq 'init' && -e $path; |
| 178 | + my $env = LMDB::Env->new($path, +{ |
| 179 | + mapsize => 1024*512, |
| 180 | + flags => MDB_WRITEMAP|MDB_NOSYNC|MDB_NOMETASYNC|MDB_NOTLS|MDB_NOSUBDIR|MDB_NORDAHEAD |
| 181 | + }) or die $LMDB_File::last_err; |
| 182 | + if ($type eq 'init') { |
| 183 | + die unless defined(my $tx = $env->BeginTxn); |
| 184 | + my $handle = $tx->open(undef, MDB_CREATE|MDB_INTEGERKEY); |
| 185 | + my $dbh = DBI->connect(@dsn); |
| 186 | + $tx->put($handle, $_->[0], pack S => $_->[1]) for @{$dbh->selectall_arrayref('select id, randomNumber from World')}; |
| 187 | + $tx->commit; |
| 188 | + $dbh->disconnect; |
| 189 | + say 'cache populated' if debug; |
| 190 | + return; |
| 191 | + } |
| 192 | + my $tx = $env->BeginTxn(MDB_RDONLY); |
| 193 | + my $handle = $tx->open(undef, MDB_INTEGERKEY); |
| 194 | + sub ($k) { $tx->renew; $tx->get($handle, $k, my $v); unpack S => $v } |
| 195 | +} |
| 196 | + |
| 197 | +sub db_pool ($size, $dsn, $prepare = undef) { |
| 198 | + my %pool = (slot => [], active => +{}, free => [], pending => [], prepare => $prepare); |
| 199 | + db_connect(\%pool, $_, $dsn) for 0 .. $size - 1; |
| 200 | + \%pool |
| 201 | +} |
| 202 | + |
| 203 | +sub db_connect ($pool, $id, $dsn) { |
| 204 | + say "db[$id] connection.." if debug; |
| 205 | + my $dbh = DBI->connect(@$dsn); |
| 206 | + unless ($dbh) { |
| 207 | + warn sprintf 'err: %s. will try reconnect %d sec', $DBI::errstr, reconnect_interval; |
| 208 | + $pool->{slot}[$id] = AE::timer +reconnect_interval, 0, sub { db_connect($pool, $id, $dsn) }; # try later |
| 209 | + return |
| 210 | + } |
| 211 | + my $fd = db eq 'maria' ? $dbh->mariadb_sockfd : db eq 'mysql' ? $dbh->mysql_fd : db eq 'postgres' ? $dbh->{pg_socket} : undef; |
| 212 | + open my $fh, "<&=", $fd or die $!; # dup handle |
| 213 | + state $st_opt = +{ |
| 214 | + db eq 'maria' ? (mariadb_async => 1) : |
| 215 | + db eq 'mysql' ? (async => 1) : |
| 216 | + db eq 'postgres' ? (pg_async => PG_ASYNC + PG_OLDQUERY_CANCEL) : () |
| 217 | + }; |
| 218 | + my %conn = ( |
| 219 | + id => $id, db => $dbh, fd => $fd, fh => $fh, dsn => $dsn, |
| 220 | + st => +{ $pool->{prepare} ? (pairmap { |
| 221 | + my $sth = $dbh->prepare($b->[0], $st_opt); |
| 222 | + $sth->bind_param($_, undef, $b->[$_]) for 1..$#$b; |
| 223 | + ($a, $sth) |
| 224 | + } %{$pool->{prepare}}) : () }, |
| 225 | + connected => 1, |
| 226 | + ); |
| 227 | + $conn{w} = EV::io $fh, EV::READ, sub { |
| 228 | + my $e; |
| 229 | + { ; |
| 230 | + $e = 'inactive', last unless defined(my $st = $conn{active}); |
| 231 | + if ($st) { # executed st |
| 232 | + $e = 'nost', last unless my $sth = $conn{st}{$st}; |
| 233 | + $e = 'unready', last unless |
| 234 | + db eq 'maria' ? $sth->mariadb_async_ready : |
| 235 | + db eq 'mysql' ? $sth->mysql_async_ready : |
| 236 | + db eq 'postgres' ? $sth->pg_ready : undef; |
| 237 | + $e = 'noresult', $sth->finish unless defined( |
| 238 | + my $rows = |
| 239 | + db eq 'maria' ? $sth->mariadb_async_result : |
| 240 | + db eq 'mysql' ? $sth->mysql_async_result : |
| 241 | + db eq 'postgres' ? $sth->pg_result : undef |
| 242 | + ); |
| 243 | + say "db[$id $fd] calling cb: ".$st if debug; |
| 244 | + if (my $cb = $conn{cb}) { $cb->($rows, $e // $sth) } |
| 245 | + else { say "db[$id $fd] no handler for response with $rows rows" } |
| 246 | + $sth->finish unless $e; |
| 247 | + } else { # db do |
| 248 | + $e = 'nodb', last unless my $dbh = $conn{db}; |
| 249 | + $e = 'unready', last unless |
| 250 | + db eq 'maria' ? $dbh->mariadb_async_ready : |
| 251 | + db eq 'mysql' ? $dbh->mysql_async_ready : |
| 252 | + db eq 'postgres' ? $dbh->pg_ready : undef; |
| 253 | + $e = 'noresult' unless defined( |
| 254 | + my $rv = |
| 255 | + db eq 'maria' ? $dbh->mariadb_async_result : |
| 256 | + db eq 'mysql' ? $dbh->mysql_async_result : |
| 257 | + db eq 'postgres' ? $dbh->pg_result : undef |
| 258 | + ); |
| 259 | + say "db[$id $fd] calling cb: db do query" if debug; |
| 260 | + if (my $cb = $conn{cb}) { $cb->($rv, $e) } |
| 261 | + else { say "db[$id $fd] no handler response with $rv return" } |
| 262 | + } |
| 263 | + say "db[$id $fd] error: $e " if debug && $e; |
| 264 | + say "db[$id $fd] finish" if debug; |
| 265 | + delete $conn{active}; |
| 266 | + delete $pool->{active}{$id}; |
| 267 | + push @{$pool->{free}}, \%conn; |
| 268 | + if (defined(my $pending = shift @{$pool->{pending}})) { |
| 269 | + my $code = shift @$pending; |
| 270 | + $code->($pool, splice @$pending) |
| 271 | + } |
| 272 | + return |
| 273 | + } |
| 274 | + say "db[$id $fd] $e" if debug; |
| 275 | + if (eof($fh) || (my $inactive = $e eq 'inactive')) { |
| 276 | + say "db[$id $fd] disconnected" if debug; |
| 277 | + delete @conn{qw/w connected/}; |
| 278 | + $conn{db}->disconnect if $inactive; |
| 279 | + $conn{cb}->(-1, undef) if $conn{st} && $conn{active} && $conn{cb}; |
| 280 | + db_connect($pool, $id, $dsn); # reconnect |
| 281 | + } else { |
| 282 | + say "db[$id $fd] stalled?"; |
| 283 | + } |
| 284 | + }; |
| 285 | + say "db[$id $fd] connected" if debug; |
| 286 | + $pool->{slot}[$id] = \%conn; |
| 287 | + weaken(my $weak = $pool->{slot}[$id]); |
| 288 | + push @{$pool->{free}}, $weak; |
| 289 | + if (defined(my $pending = shift @{$pool->{pending}})) { |
| 290 | + my $code = shift @$pending; |
| 291 | + $code->($pool, splice @$pending) |
| 292 | + } |
| 293 | +} |
| 294 | + |
| 295 | +sub db_execute ($pool, $st, $args, $cb) { |
| 296 | + say 'db executing..' if debug; |
| 297 | + while (my $conn = shift @{$pool->{free}}) { |
| 298 | + (debug and say 'skip unconnected'), next unless defined($conn) && $conn->{connected}; |
| 299 | + say 'on connection..'.$conn->{id} if debug; |
| 300 | + if ($conn->{st}{$st}->execute(@$args)) { |
| 301 | + (@$conn{qw/cb active/}, $pool->{active}{$conn->{id}}) = ($cb, $st, 1); |
| 302 | + return; |
| 303 | + } else { |
| 304 | + say 'error: ', $conn->{st}{$st}->errstr; |
| 305 | + db_connect($pool, @$conn{qw/id dsn/}); # reconnect |
| 306 | + next; |
| 307 | + } |
| 308 | + } |
| 309 | + say '..put to pending..' if debug; |
| 310 | + push @{$pool->{pending}}, [__SUB__, $st, $args, $cb]; |
| 311 | +} |
| 312 | + |
| 313 | +sub db_do ($pool, $query, $args, $cb) { |
| 314 | + say 'db doing..' if debug; |
| 315 | + state $db_opt = +{ |
| 316 | + db eq 'maria' ? (mariadb_async => 1) : |
| 317 | + db eq 'mysql' ? (async => 1) : |
| 318 | + db eq 'postgres' ? (pg_async => PG_ASYNC + PG_OLDQUERY_CANCEL) : () |
| 319 | + }; |
| 320 | + while (my $conn = shift @{$pool->{free}}) { |
| 321 | + (debug and say 'skip unconnected'), next unless defined($conn) && $conn->{connected}; |
| 322 | + say 'on connection..'.$conn->{id} if debug; |
| 323 | + if ($conn->{db}->do($query, $db_opt, defined($args) ? @$args : ())) { |
| 324 | + (@$conn{qw/cb active/}, $pool->{active}{$conn->{id}}) = ($cb, 0, 1); |
| 325 | + return; |
| 326 | + } else { |
| 327 | + say 'error: ', $conn->{db}->errstr; |
| 328 | + db_connect($pool, @$conn{qw/id dsn/}); # reconnect |
| 329 | + next; |
| 330 | + } |
| 331 | + } |
| 332 | + say '..put to pending..' if debug; |
| 333 | + push @{$pool->{pending}}, [__SUB__, $query, $args, $cb]; |
| 334 | +} |
0 commit comments