Skip to content

Commit 63d1199

Browse files
committed
progress: Emit a Start message with API version, drop from fields
And use a semantic version for the API version as this allows a clearer evolution. Signed-off-by: Colin Walters <[email protected]>
1 parent c2a7d60 commit 63d1199

File tree

6 files changed

+103
-61
lines changed

6 files changed

+103
-61
lines changed

docs/src/experimental-progress-fd.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@ The format of data output over `--progress-fd` is [JSON Lines](https://jsonlines
1212
which is a series of JSON objects separated by newlines (the intermediate
1313
JSON content is guaranteed not to contain a literal newline).
1414

15-
The current API version is `org.containers.bootc/progress/v1`. You can find
16-
the JSON schema describing this version here:
17-
[progress-v1.schema.json](progress-v1.schema.json).
15+
You can find the JSON schema describing this version here:
16+
[progress-v0.schema.json](progress-v0.schema.json).
1817

1918
Deploying a new image with either switch or upgrade consists
2019
of three stages: `pulling`, `importing`, and `staging`. The `pulling` step

docs/src/progress-v1.schema.json renamed to docs/src/progress-v0.schema.json

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,29 @@
33
"title": "Event",
44
"description": "An event emitted as JSON.",
55
"oneOf": [
6+
{
7+
"type": "object",
8+
"required": [
9+
"type",
10+
"version"
11+
],
12+
"properties": {
13+
"type": {
14+
"type": "string",
15+
"enum": [
16+
"Start"
17+
]
18+
},
19+
"version": {
20+
"description": "The semantic version of the progress protocol.",
21+
"type": "string"
22+
}
23+
}
24+
},
625
{
726
"description": "An incremental update to a container image layer download",
827
"type": "object",
928
"required": [
10-
"api_version",
1129
"bytes",
1230
"bytes_cached",
1331
"bytes_total",
@@ -21,10 +39,6 @@
2139
"type"
2240
],
2341
"properties": {
24-
"api_version": {
25-
"description": "The version of the progress event format.",
26-
"type": "string"
27-
},
2842
"bytes": {
2943
"description": "The number of bytes already fetched.",
3044
"type": "integer",
@@ -92,7 +106,6 @@
92106
"description": "An incremental update with discrete steps",
93107
"type": "object",
94108
"required": [
95-
"api_version",
96109
"description",
97110
"id",
98111
"steps",
@@ -103,10 +116,6 @@
103116
"type"
104117
],
105118
"properties": {
106-
"api_version": {
107-
"description": "The version of the progress event format.",
108-
"type": "string"
109-
},
110119
"description": {
111120
"description": "A human readable description of the task if i18n is not available.",
112121
"type": "string"

lib/src/deploy.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use ostree_ext::ostree::{self, Sysroot};
2121
use ostree_ext::sysroot::SysrootLock;
2222
use ostree_ext::tokio_util::spawn_blocking_cancellable_flatten;
2323

24-
use crate::progress_jsonl::{Event, ProgressWriter, SubTaskBytes, SubTaskStep, API_VERSION};
24+
use crate::progress_jsonl::{Event, ProgressWriter, SubTaskBytes, SubTaskStep};
2525
use crate::spec::ImageReference;
2626
use crate::spec::{BootOrder, HostSpec};
2727
use crate::status::labels_of_config;
@@ -214,7 +214,6 @@ async fn handle_layer_progress_print(
214214
subtask.bytes = layer_size;
215215
subtasks.push(subtask.clone());
216216
prog.send(Event::ProgressBytes {
217-
api_version: API_VERSION.into(),
218217
task: "pulling".into(),
219218
description: format!("Pulling Image: {digest}").into(),
220219
id: (*digest).into(),
@@ -245,7 +244,6 @@ async fn handle_layer_progress_print(
245244
byte_bar.set_position(bytes.fetched);
246245
subtask.bytes = byte_bar.position();
247246
prog.send_lossy(Event::ProgressBytes {
248-
api_version: API_VERSION.into(),
249247
task: "pulling".into(),
250248
description: format!("Pulling Image: {digest}").into(),
251249
id: (*digest).into(),
@@ -283,7 +281,6 @@ async fn handle_layer_progress_print(
283281
// use as a heuristic to begin import progress
284282
// Cannot be lossy or it is dropped
285283
prog.send(Event::ProgressSteps {
286-
api_version: API_VERSION.into(),
287284
task: "importing".into(),
288285
description: "Importing Image".into(),
289286
id: (*digest).into(),
@@ -358,7 +355,6 @@ pub(crate) async fn pull(
358355
let prog = printer.await?;
359356
// Both the progress and the import are done, so import is done as well
360357
prog.send(Event::ProgressSteps {
361-
api_version: API_VERSION.into(),
362358
task: "importing".into(),
363359
description: "Importing Image".into(),
364360
id: digest_imp.clone().as_ref().into(),
@@ -573,7 +569,6 @@ pub(crate) async fn stage(
573569
};
574570
let mut subtasks = vec![];
575571
prog.send(Event::ProgressSteps {
576-
api_version: API_VERSION.into(),
577572
task: "staging".into(),
578573
description: "Deploying Image".into(),
579574
id: image.manifest_digest.clone().as_ref().into(),
@@ -596,7 +591,6 @@ pub(crate) async fn stage(
596591
subtask.description = "Deploying Image".into();
597592
subtask.completed = false;
598593
prog.send(Event::ProgressSteps {
599-
api_version: API_VERSION.into(),
600594
task: "staging".into(),
601595
description: "Deploying Image".into(),
602596
id: image.manifest_digest.clone().as_ref().into(),
@@ -627,7 +621,6 @@ pub(crate) async fn stage(
627621
subtask.description = "Pulling Bound Images".into();
628622
subtask.completed = false;
629623
prog.send(Event::ProgressSteps {
630-
api_version: API_VERSION.into(),
631624
task: "staging".into(),
632625
description: "Deploying Image".into(),
633626
id: image.manifest_digest.clone().as_ref().into(),
@@ -650,7 +643,6 @@ pub(crate) async fn stage(
650643
subtask.description = "Removing old images".into();
651644
subtask.completed = false;
652645
prog.send(Event::ProgressSteps {
653-
api_version: API_VERSION.into(),
654646
task: "staging".into(),
655647
description: "Deploying Image".into(),
656648
id: image.manifest_digest.clone().as_ref().into(),
@@ -674,7 +666,6 @@ pub(crate) async fn stage(
674666
subtask.completed = true;
675667
subtasks.push(subtask.clone());
676668
prog.send(Event::ProgressSteps {
677-
api_version: API_VERSION.into(),
678669
task: "staging".into(),
679670
description: "Deploying Image".into(),
680671
id: image.manifest_digest.clone().as_ref().into(),

lib/src/progress_jsonl.rs

Lines changed: 75 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@ use tokio::sync::Mutex;
1616
// Maximum number of times per second that an event will be written.
1717
const REFRESH_HZ: u16 = 5;
1818

19-
pub const API_VERSION: &str = "org.containers.bootc.progress/v1";
19+
/// Semantic version of the protocol.
20+
const API_VERSION: &str = "0.1.0";
2021

2122
/// An incremental update to e.g. a container image layer download.
2223
/// The first time a given "subtask" name is seen, a new progress bar should be created.
2324
/// If bytes == bytes_total, then the subtask is considered complete.
24-
#[derive(Debug, serde::Serialize, serde::Deserialize, Default, Clone, JsonSchema)]
25+
#[derive(
26+
Debug, serde::Serialize, serde::Deserialize, Default, Clone, JsonSchema, PartialEq, Eq,
27+
)]
2528
#[serde(rename_all = "camelCase")]
2629
pub struct SubTaskBytes<'t> {
2730
/// A machine readable type for the task (used for i18n).
@@ -45,7 +48,9 @@ pub struct SubTaskBytes<'t> {
4548
}
4649

4750
/// Marks the beginning and end of a dictrete step
48-
#[derive(Debug, serde::Serialize, serde::Deserialize, Default, Clone, JsonSchema)]
51+
#[derive(
52+
Debug, serde::Serialize, serde::Deserialize, Default, Clone, JsonSchema, PartialEq, Eq,
53+
)]
4954
#[serde(rename_all = "camelCase")]
5055
pub struct SubTaskStep<'t> {
5156
/// A machine readable type for the task (used for i18n).
@@ -65,18 +70,20 @@ pub struct SubTaskStep<'t> {
6570
}
6671

6772
/// An event emitted as JSON.
68-
#[derive(Debug, serde::Serialize, serde::Deserialize, JsonSchema)]
73+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, JsonSchema, PartialEq, Eq)]
6974
#[serde(
7075
tag = "type",
7176
rename_all = "PascalCase",
7277
rename_all_fields = "camelCase"
7378
)]
7479
pub enum Event<'t> {
80+
Start {
81+
/// The semantic version of the progress protocol.
82+
#[serde(borrow)]
83+
version: Cow<'t, str>,
84+
},
7585
/// An incremental update to a container image layer download
7686
ProgressBytes {
77-
/// The version of the progress event format.
78-
#[serde(borrow)]
79-
api_version: Cow<'t, str>,
8087
/// A machine readable type (e.g., pulling) for the task (used for i18n
8188
/// and UI customization).
8289
#[serde(borrow)]
@@ -106,9 +113,6 @@ pub enum Event<'t> {
106113
},
107114
/// An incremental update with discrete steps
108115
ProgressSteps {
109-
/// The version of the progress event format.
110-
#[serde(borrow)]
111-
api_version: Cow<'t, str>,
112116
/// A machine readable type (e.g., pulling) for the task (used for i18n
113117
/// and UI customization).
114118
#[serde(borrow)]
@@ -150,6 +154,8 @@ impl FromStr for RawProgressFd {
150154

151155
#[derive(Debug)]
152156
struct ProgressWriterInner {
157+
/// true if we sent the initial Start message
158+
sent_start: bool,
153159
last_write: Option<std::time::Instant>,
154160
fd: BufWriter<Sender>,
155161
}
@@ -171,6 +177,7 @@ impl TryFrom<OwnedFd> for ProgressWriter {
171177
impl From<Sender> for ProgressWriter {
172178
fn from(value: Sender) -> Self {
173179
let inner = ProgressWriterInner {
180+
sent_start: false,
174181
last_write: None,
175182
fd: BufWriter::new(value),
176183
};
@@ -190,6 +197,18 @@ impl TryFrom<RawProgressFd> for ProgressWriter {
190197
}
191198

192199
impl ProgressWriter {
200+
/// Serialize the target value as a single line of JSON and write it.
201+
async fn send_impl_inner<T: Serialize>(inner: &mut ProgressWriterInner, v: T) -> Result<()> {
202+
// serde is guaranteed not to output newlines here
203+
let buf = serde_json::to_vec(&v)?;
204+
inner.fd.write_all(&buf).await?;
205+
// We always end in a newline
206+
inner.fd.write_all(b"\n").await?;
207+
// And flush to ensure the remote side sees updates immediately
208+
inner.fd.flush().await?;
209+
Ok(())
210+
}
211+
193212
/// Serialize the target object to JSON as a single line
194213
pub(crate) async fn send_impl<T: Serialize>(&self, v: T, required: bool) -> Result<()> {
195214
let mut guard = self.inner.lock().await;
@@ -198,8 +217,17 @@ impl ProgressWriter {
198217
return Ok(());
199218
};
200219

220+
// If this is our first message, emit the Start message
221+
if !inner.sent_start {
222+
inner.sent_start = true;
223+
let start = Event::Start {
224+
version: API_VERSION.into(),
225+
};
226+
Self::send_impl_inner(inner, &start).await?;
227+
}
228+
201229
// For messages that can be dropped, if we already sent an update within this cycle, discard this one.
202-
// TODO: Also consider querying the pipe buffer and also dropping if we can't do this write.
230+
// TODO: Also consider querying the pipe buffer and also dropping if wqe can't do this write.
203231
let now = Instant::now();
204232
if !required {
205233
const REFRESH_MS: u32 = 1000 / REFRESH_HZ as u32;
@@ -210,22 +238,15 @@ impl ProgressWriter {
210238
}
211239
}
212240

213-
// SAFETY: Propagating panics from the mutex here is intentional
214-
// serde is guaranteed not to output newlines here
215-
let buf = serde_json::to_vec(&v)?;
216-
inner.fd.write_all(&buf).await?;
217-
// We always end in a newline
218-
inner.fd.write_all(b"\n").await?;
219-
// And flush to ensure the remote side sees updates immediately
220-
inner.fd.flush().await?;
241+
Self::send_impl_inner(inner, &v).await?;
221242
// Update the last write time
222243
inner.last_write = Some(now);
223244
Ok(())
224245
}
225246

226247
/// Send an event.
227-
pub(crate) async fn send<T: Serialize>(&self, v: T) {
228-
if let Err(e) = self.send_impl(v, true).await {
248+
pub(crate) async fn send(&self, event: Event<'_>) {
249+
if let Err(e) = self.send_impl(event, true).await {
229250
eprintln!("Failed to write to jsonl: {}", e);
230251
// Stop writing to fd but let process continue
231252
// SAFETY: Propagating panics from the mutex here is intentional
@@ -234,8 +255,8 @@ impl ProgressWriter {
234255
}
235256

236257
/// Send an event that can be dropped.
237-
pub(crate) async fn send_lossy<T: Serialize>(&self, v: T) {
238-
if let Err(e) = self.send_impl(v, false).await {
258+
pub(crate) async fn send_lossy(&self, event: Event<'_>) {
259+
if let Err(e) = self.send_impl(event, false).await {
239260
eprintln!("Failed to write to jsonl: {}", e);
240261
// Stop writing to fd but let process continue
241262
// SAFETY: Propagating panics from the mutex here is intentional
@@ -272,18 +293,30 @@ mod test {
272293
#[tokio::test]
273294
async fn test_jsonl() -> Result<()> {
274295
let testvalues = [
275-
S {
276-
s: "foo".into(),
277-
v: 42,
296+
Event::ProgressSteps {
297+
task: "sometask".into(),
298+
description: "somedesc".into(),
299+
id: "someid".into(),
300+
steps_cached: 0,
301+
steps: 0,
302+
steps_total: 3,
303+
subtasks: Vec::new(),
278304
},
279-
S {
280-
// Test with an embedded newline to sanity check that serde doesn't write it literally
281-
s: "foo\nbar".into(),
282-
v: 0,
305+
Event::ProgressBytes {
306+
task: "sometask".into(),
307+
description: "somedesc".into(),
308+
id: "someid".into(),
309+
bytes_cached: 0,
310+
bytes: 11,
311+
bytes_total: 42,
312+
steps_cached: 0,
313+
steps: 0,
314+
steps_total: 3,
315+
subtasks: Vec::new(),
283316
},
284317
];
285318
let (send, recv) = tokio::net::unix::pipe::pipe()?;
286-
let testvalues_sender = &testvalues;
319+
let testvalues_sender = testvalues.iter().cloned();
287320
let sender = async move {
288321
let w = ProgressWriter::try_from(send)?;
289322
for value in testvalues_sender {
@@ -296,10 +329,18 @@ mod test {
296329
let tf = BufReader::new(recv);
297330
let mut expected = testvalues.iter();
298331
let mut lines = tf.lines();
332+
let mut got_first = false;
299333
while let Some(line) = lines.next_line().await? {
300-
let found: S = serde_json::from_str(&line)?;
301-
let expected = expected.next().unwrap();
302-
assert_eq!(&found, expected);
334+
let found: Event = serde_json::from_str(&line)?;
335+
let expected_value = if !got_first {
336+
got_first = true;
337+
&Event::Start {
338+
version: API_VERSION.into(),
339+
}
340+
} else {
341+
expected.next().unwrap()
342+
};
343+
assert_eq!(&found, expected_value);
303344
}
304345
anyhow::Ok(())
305346
};

tests/booted/test-image-pushpull-upgrade.nu

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,12 @@ RUN echo test content > /usr/share/blah.txt
7575

7676
# This just does some basic verification of the progress JSON
7777
def sanity_check_switch_progress_json [data] {
78-
let first = $data.0;
7978
let event_count = $data | length
80-
assert equal $first.type ProgressBytes
81-
let steps = $first.stepsTotal
79+
# The first one should always be a start event
80+
let first = $data.0;
81+
assert equal $first.type Start
82+
let second = $data.1
83+
let steps = $second.stepsTotal
8284
mut i = 0
8385
for elt in $data {
8486
if $elt.type != "ProgressBytes" {

xtask/src/xtask.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ fn update_generated(sh: &Shell) -> Result<()> {
149149
}
150150
for (of, target) in [
151151
("host", "docs/src/host-v1.schema.json"),
152-
("progress", "docs/src/progress-v1.schema.json"),
152+
("progress", "docs/src/progress-v0.schema.json"),
153153
] {
154154
let schema = cmd!(sh, "cargo run -q -- internals print-json-schema --of={of}").read()?;
155155
std::fs::write(target, &schema)?;

0 commit comments

Comments
 (0)