Skip to content

Commit 56b0994

Browse files
committed
workshop: add "stdin" column
Allows passing arbitrary (binary) data to a job's stdin. For simplicity, this is implemented using a memfd instead of a pair of pipes.
1 parent 0204cf3 commit 56b0994

File tree

8 files changed

+23
-1
lines changed

8 files changed

+23
-1
lines changed

debian/changelog

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ cm4all-workshop (7.1) unstable; urgency=low
22

33
* workshop: disable progress pipe if "translate" is enabled
44
* workshop: read remaining output after job process exits
5+
* workshop: add "stdin" column
56

67
--
78

doc/index.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,8 @@ The `jobs` table
536536
* ``env``: Additional environment variables. Some dangerous
537537
environment variables cannot be set, though, for example
538538
``LD_PRELOAD``.
539+
* ``stdin``: If not ``NULL``, then the process is started with a file
540+
handle on standard input that reads this data.
539541
* ``node_name``: Name of the node which is currently executing
540542
this job, or :samp:`NULL`.
541543
* ``node_timeout``: When this time stamp has passed, then the

sql/jobs.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ CREATE TABLE jobs (
2929
args varchar(4096)[] NULL,
3030
-- environment variables in the form NAME=VALUE
3131
env varchar(4096)[] NULL,
32+
-- optional data fed into stdin
33+
stdin bytea NULL,
3234

3335
-- which cm4all-workshop node is executing this job?
3436
node_name varchar(256) NULL,

src/Migrate.cxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ MigrateWorkshopDatabase(Pg::Connection &c, const char *schema)
4747

4848
/* since Workshop 6.0.1 */
4949
c.Execute("ALTER TABLE jobs ADD COLUMN IF NOT EXISTS cpu_usage interval NULL");
50+
51+
/* since Workshop 7.1 */
52+
c.Execute("ALTER TABLE jobs ADD COLUMN IF NOT EXISTS stdin bytea NULL");
5053
}
5154

5255
static void

src/workshop/Job.hxx

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44

55
#pragma once
66

7+
#include "util/AllocatedArray.hxx"
8+
79
#include <chrono>
10+
#include <cstddef>
811
#include <string>
912
#include <forward_list>
1013

@@ -19,6 +22,8 @@ struct WorkshopJob {
1922

2023
std::forward_list<std::string> env;
2124

25+
AllocatedArray<std::byte> stdin;
26+
2227
explicit WorkshopJob(WorkshopQueue &_queue):queue(_queue) {}
2328

2429
WorkshopJob(WorkshopQueue &_queue, const char *_id, const char *_plan_name)

src/workshop/Operator.cxx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "net/EasyMessage.hxx"
2626
#include "net/SocketPair.hxx"
2727
#include "net/UniqueSocketDescriptor.hxx"
28+
#include "io/linux/MemFD.hxx"
2829
#include "io/FdHolder.hxx"
2930
#include "io/FileAt.hxx"
3031
#include "io/Open.hxx"
@@ -259,6 +260,9 @@ WorkshopOperator::Start2(std::size_t max_log_buffer,
259260
stderr_w, control_child,
260261
close_fds);
261262

263+
if (job.stdin != nullptr)
264+
p.stdin_fd = close_fds.Insert(CreateMemFD("stdin", job.stdin));
265+
262266
/* use a per-plan cgroup */
263267

264268
CgroupOptions cgroup;

src/workshop/PGQueue.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ pg_select_new_jobs(Pg::Connection &db,
8686
assert(plans_exclude != nullptr && *plans_exclude == '{');
8787
assert(plans_lowprio != nullptr && *plans_lowprio == '{');
8888

89-
const char *sql = "SELECT id,plan_name,args,env "
89+
const char *sql = "SELECT id,plan_name,args,env,stdin "
9090
"FROM jobs "
9191
"WHERE node_name IS NULL "
9292
"AND time_done IS NULL AND exit_status IS NULL "

src/workshop/Queue.cxx

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "Plan.hxx"
99
#include "lib/fmt/RuntimeError.hxx"
1010
#include "pg/Array.hxx"
11+
#include "pg/Hex.hxx"
1112
#include "pg/Reflection.hxx"
1213
#include "event/Loop.hxx"
1314
#include "util/StringAPI.hxx"
@@ -88,6 +89,9 @@ MakeJob(WorkshopQueue &queue,
8889
job.args = Pg::DecodeArray(result.GetValue(row, 2));
8990
job.env = Pg::DecodeArray(result.GetValue(row, 3));
9091

92+
if (!result.IsValueNull(row, 4))
93+
job.stdin = Pg::DecodeHex(result.GetValueView(row, 4));
94+
9195
if (job.id.empty())
9296
throw std::runtime_error("Job has no id");
9397

@@ -459,6 +463,7 @@ WorkshopQueue::OnConnect()
459463
static constexpr const char *const required_jobs_columns[] = {
460464
"enabled",
461465
"log",
466+
"stdin",
462467
};
463468

464469
const char *schema = db.GetSchemaName().empty()

0 commit comments

Comments
 (0)