Skip to content

Commit 7abf37c

Browse files
committed
github, server: stream outputs from S3 if possible, with range requests
1 parent 40edabf commit 7abf37c

File tree

9 files changed

+174
-55
lines changed

9 files changed

+174
-55
lines changed

bin/src/main.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -990,6 +990,8 @@ async fn do_job_copy(mut l: Level<Stuff>) -> Result<()> {
990990
async fn do_job_sign(mut l: Level<Stuff>) -> Result<()> {
991991
l.usage_args(Some("JOB SRC"));
992992

993+
l.optflag("x", "", "display extended information about signed URL");
994+
993995
let a = args!(l);
994996

995997
if a.args().len() != 2 {
@@ -998,6 +1000,7 @@ async fn do_job_sign(mut l: Level<Stuff>) -> Result<()> {
9981000

9991001
let job = a.args()[0].as_str();
10001002
let src = a.args()[1].as_str();
1003+
let ext = a.opts().opt_present("x");
10011004

10021005
let c = l.context().user();
10031006
for o in c.job_outputs_get().job(job).send().await?.into_inner() {
@@ -1016,7 +1019,15 @@ async fn do_job_sign(mut l: Level<Stuff>) -> Result<()> {
10161019
.await?
10171020
.into_inner();
10181021

1019-
println!("{}", su.url);
1022+
if ext {
1023+
let JobOutputSignedUrlResult { available, size, url } = su;
1024+
1025+
println!("url: {url}");
1026+
println!("size: {size}");
1027+
println!("available: {available}");
1028+
} else {
1029+
println!("{}", su.url);
1030+
}
10201031
return Ok(());
10211032
}
10221033

client/openapi.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3232,6 +3232,18 @@
32323232
"JobOutputSignedUrlResult": {
32333233
"type": "object",
32343234
"properties": {
3235+
"available": {
3236+
"description": "Has this file been uploaded to the object store yet?",
3237+
"default": false,
3238+
"type": "boolean"
3239+
},
3240+
"size": {
3241+
"description": "Size of the file in bytes.",
3242+
"default": 0,
3243+
"type": "integer",
3244+
"format": "uint64",
3245+
"minimum": 0
3246+
},
32353247
"url": {
32363248
"type": "string"
32373249
}

download/src/kinds/url.rs

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -35,35 +35,40 @@ pub async fn stream_from_url(
3535
range: Option<PotentialRange>,
3636
head_only: bool,
3737
content_type: String,
38+
assume_file_size: Option<u64>,
3839
) -> Result<Option<Response<Body>>> {
3940
let log = log.new(o!("download" => "url"));
4041
let file_size: u64;
4142
let (mut body, want_bytes, crange) = if let Some(range) = range {
42-
/*
43-
* If this is a range request, we first need to determine the total file
44-
* size.
45-
*/
46-
let head = client.head(&url).send().await?;
47-
48-
/*
49-
* Beware the content_length() method on the response: on a HEAD request
50-
* it is (unhelpfully!) zero, because the body of the request should
51-
* ultimately be empty. Parse the actual Content-Length value
52-
* ourselves:
53-
*/
54-
file_size = head
55-
.headers()
56-
.get(CONTENT_LENGTH)
57-
.and_then(|hv| hv.to_str().ok())
58-
.and_then(|cl| cl.parse::<u64>().ok())
59-
.ok_or_else(|| anyhow!("no content-length in response"))?;
43+
if let Some(assumed) = assume_file_size {
44+
file_size = assumed;
45+
} else {
46+
/*
47+
* If this is a range request, we first need to determine the total
48+
* file size.
49+
*/
50+
let head = client.head(&url).send().await?;
6051

61-
if head.status() == StatusCode::NOT_FOUND {
6252
/*
63-
* If the backend returns 404, we want to be able to pass that on to
64-
* the client.
53+
* Beware the content_length() method on the response: on a HEAD
54+
* request it is (unhelpfully!) zero, because the body of the
55+
* request should ultimately be empty. Parse the actual
56+
* Content-Length value ourselves:
6557
*/
66-
return Ok(None);
58+
file_size = head
59+
.headers()
60+
.get(CONTENT_LENGTH)
61+
.and_then(|hv| hv.to_str().ok())
62+
.and_then(|cl| cl.parse::<u64>().ok())
63+
.ok_or_else(|| anyhow!("no content-length in response"))?;
64+
65+
if head.status() == StatusCode::NOT_FOUND {
66+
/*
67+
* If the backend returns 404, we want to be able to pass that
68+
* on to the client.
69+
*/
70+
return Ok(None);
71+
}
6772
}
6873

6974
match range.single_range(file_size) {

github/server/src/http.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 Oxide Computer Company
2+
* Copyright 2025 Oxide Computer Company
33
*/
44

55
use anyhow::{anyhow, bail, Result};
@@ -203,6 +203,7 @@ async fn artefact(
203203
let app = rc.context();
204204
let path = path.into_inner();
205205
let query = query.into_inner();
206+
let pr = rc.range();
206207

207208
let load = match path.load(&rc) {
208209
Ok(Some(load)) => load,
@@ -222,6 +223,7 @@ async fn artefact(
222223
&path.output,
223224
&path.name,
224225
query.format.as_deref(),
226+
pr,
225227
)
226228
.await
227229
}
@@ -910,6 +912,7 @@ async fn published_file_common(
910912
pr,
911913
head_only,
912914
guess_mime_type(&path.name),
915+
None,
913916
)
914917
.await
915918
{

github/server/src/variety/basic.rs

Lines changed: 71 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
/*
2-
* Copyright 2024 Oxide Computer Company
2+
* Copyright 2025 Oxide Computer Company
33
*/
44

55
use crate::{App, FlushOut, FlushState};
66
use anyhow::{bail, Result};
77
use buildomat_client::types::{DependSubmit, JobOutput};
88
use buildomat_common::*;
9+
use buildomat_download::PotentialRange;
910
use buildomat_github_database::{types::*, Database};
1011
use buildomat_jobsh::variety::basic::{output_sse, output_table, BasicConfig};
1112
use chrono::SecondsFormat;
@@ -19,6 +20,7 @@ use serde::{Deserialize, Serialize};
1920
use slog::{debug, error, info, o, trace, warn, Logger};
2021
use std::collections::{HashMap, VecDeque};
2122
use std::sync::Arc;
23+
use std::time::Duration;
2224
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
2325

2426
const KILOBYTE: f64 = 1024.0;
@@ -939,6 +941,7 @@ pub(crate) async fn artefact(
939941
output: &str,
940942
name: &str,
941943
format: Option<&str>,
944+
pr: Option<PotentialRange>,
942945
) -> Result<Option<hyper::Response<Body>>> {
943946
let p: BasicPrivate = cr.get_private()?;
944947

@@ -951,10 +954,6 @@ pub(crate) async fn artefact(
951954
if let Some(id) = &p.buildomat_id {
952955
let bm = app.buildomat(&app.db.load_repository(cs.repo)?);
953956

954-
let backend =
955-
bm.job_output_download().job(id).output(output).send().await?;
956-
let cl = backend.content_length().unwrap();
957-
958957
/*
959958
* To try and help out the browser in deciding whether to display or
960959
* immediately download a particular file, we'll try to guess the
@@ -974,6 +973,10 @@ pub(crate) async fn artefact(
974973
bail!("cannot reformat a file that is not plain text");
975974
}
976975

976+
let backend =
977+
bm.job_output_download().job(id).output(output).send().await?;
978+
let cl = backend.content_length().unwrap();
979+
977980
if cl > MAX_RENDERED_LOG {
978981
bail!("file too large for reformat");
979982
}
@@ -1048,15 +1051,69 @@ pub(crate) async fn artefact(
10481051
));
10491052
}
10501053

1051-
return Ok(Some(
1052-
hyper::Response::builder()
1053-
.status(hyper::StatusCode::OK)
1054-
.header(hyper::header::CONTENT_TYPE, ct)
1055-
.header(hyper::header::CONTENT_LENGTH, cl)
1056-
.body(Body::wrap(StreamBody::new(
1057-
backend.into_inner_stream().map_ok(|b| Frame::data(b)),
1058-
)))?,
1059-
));
1054+
/*
1055+
* To improve efficiency, we can try to fetch the file directly from the
1056+
* object store instead of forwarding the request through the API
1057+
* server.
1058+
*/
1059+
let url = bm
1060+
.job_output_signed_url()
1061+
.job(id)
1062+
.output(output)
1063+
.body_map(|body| body.content_type(ct.clone()).expiry_seconds(120))
1064+
.send()
1065+
.await?;
1066+
if url.available {
1067+
let client = reqwest::ClientBuilder::new()
1068+
.timeout(Duration::from_secs(3600))
1069+
.tcp_keepalive(Duration::from_secs(60))
1070+
.connect_timeout(Duration::from_secs(15))
1071+
.build()?;
1072+
1073+
/*
1074+
* The file is available for streaming directly from the object
1075+
* store.
1076+
*/
1077+
match buildomat_download::stream_from_url(
1078+
&app.log,
1079+
format!("pre-signed job output: job {id}, output {output}"),
1080+
&client,
1081+
url.url.clone(),
1082+
pr,
1083+
false,
1084+
ct,
1085+
Some(url.size),
1086+
)
1087+
.await
1088+
{
1089+
Ok(res) => return Ok(res),
1090+
Err(e) => {
1091+
bail!(
1092+
"pre-signed job output: \
1093+
job {id} output {output}: {e:?}"
1094+
);
1095+
}
1096+
}
1097+
} else {
1098+
/*
1099+
* Fall back to a regular request through the core API server.
1100+
* XXX There is currently no good way to pass the range request to
1101+
* the backend here.
1102+
*/
1103+
let backend =
1104+
bm.job_output_download().job(id).output(output).send().await?;
1105+
let cl = backend.content_length().unwrap();
1106+
1107+
return Ok(Some(
1108+
hyper::Response::builder()
1109+
.status(hyper::StatusCode::OK)
1110+
.header(hyper::header::CONTENT_TYPE, ct)
1111+
.header(hyper::header::CONTENT_LENGTH, cl)
1112+
.body(Body::wrap(StreamBody::new(
1113+
backend.into_inner_stream().map_ok(|b| Frame::data(b)),
1114+
)))?,
1115+
));
1116+
}
10601117
}
10611118

10621119
Ok(None)

server/src/api/user.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ pub(crate) async fn job_output_download(
361361
let owner = c.require_user(log, &rqctx.request).await?;
362362
let t = c.load_job_for_user(log, &owner, p.job()?).await?;
363363

364-
let o = c.load_job_output(log, &t, p.output()?).await.or_500()?;
364+
let (o, _) = c.load_job_output(log, &t, p.output()?).await.or_500()?;
365365

366366
let info = format!("job {} output {} path {:?}", t.id, o.id, o.path);
367367
c.file_response(log, info, t.id, o.id, pr, false).await
@@ -385,7 +385,7 @@ pub(crate) async fn job_output_head(
385385
let owner = c.require_user(log, &rqctx.request).await?;
386386
let t = c.load_job_for_user(log, &owner, p.job()?).await?;
387387

388-
let o = c.load_job_output(log, &t, p.output()?).await.or_500()?;
388+
let (o, _) = c.load_job_output(log, &t, p.output()?).await.or_500()?;
389389

390390
let info = format!("job {} output {} path {:?}", t.id, o.id, o.path);
391391
c.file_response(log, info, t.id, o.id, pr, true).await
@@ -401,6 +401,16 @@ pub(crate) struct JobOutputSignedUrl {
401401
#[derive(Serialize, JsonSchema)]
402402
pub(crate) struct JobOutputSignedUrlResult {
403403
url: String,
404+
/**
405+
* Has this file been uploaded to the object store yet?
406+
*/
407+
#[serde(default)]
408+
available: bool,
409+
/**
410+
* Size of the file in bytes.
411+
*/
412+
#[serde(default)]
413+
size: u64,
404414
}
405415

406416
#[endpoint {
@@ -429,7 +439,7 @@ pub(crate) async fn job_output_signed_url(
429439
let owner = c.require_user(log, &rqctx.request).await?;
430440
let t = c.load_job_for_user(log, &owner, p.job()?).await?;
431441

432-
let o = c.load_job_output(log, &t, p.output()?).await.or_500()?;
442+
let (o, f) = c.load_job_output(log, &t, p.output()?).await.or_500()?;
433443
let psu = c
434444
.file_presigned_url(
435445
t.id,
@@ -447,7 +457,11 @@ pub(crate) async fn job_output_signed_url(
447457
t.id, o.id, o.path, psu.info; "params" => ?b,
448458
);
449459

450-
Ok(HttpResponseOk(JobOutputSignedUrlResult { url: psu.url }))
460+
Ok(HttpResponseOk(JobOutputSignedUrlResult {
461+
url: psu.url,
462+
size: f.size.0,
463+
available: f.time_archived.is_some(),
464+
}))
451465
}
452466

453467
#[derive(Deserialize, JsonSchema)]
@@ -507,7 +521,7 @@ pub(crate) async fn job_output_publish(
507521
let owner = c.require_user(log, &rqctx.request).await?;
508522
let t = c.load_job_for_user(log, &owner, p.job()?).await?;
509523

510-
let o = c.load_job_output(log, &t, p.output()?).await.or_500()?;
524+
let (o, _) = c.load_job_output(log, &t, p.output()?).await.or_500()?;
511525

512526
info!(
513527
log,

server/src/archive/jobs.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -622,19 +622,30 @@ impl LoadedArchivedJob {
622622
.collect::<Result<Vec<_>>>()
623623
}
624624

625-
pub fn job_output(&self, id: db::JobFileId) -> Result<db::JobOutput> {
625+
pub fn job_output(
626+
&self,
627+
id: db::JobFileId,
628+
) -> Result<(db::JobOutput, db::JobFile)> {
626629
let job = self.id;
627630

628631
self.job
629632
.outputs
630633
.iter()
631634
.find(|f| f.file.id().ok() == Some(id))
632635
.map(|f| {
633-
Ok(db::JobOutput {
634-
job,
635-
path: f.path.clone(),
636-
id: f.file.id()?,
637-
})
636+
Ok((
637+
db::JobOutput {
638+
job,
639+
id: f.file.id().unwrap(),
640+
path: f.path.clone(),
641+
},
642+
db::JobFile {
643+
job,
644+
id: f.file.id().unwrap(),
645+
size: db::DataSize(f.file.size),
646+
time_archived: Some(f.file.time_archived()?),
647+
},
648+
))
638649
})
639650
.ok_or_else(|| anyhow!("file {id} for job {job} not in archive"))?
640651
}

server/src/db/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1440,6 +1440,10 @@ impl Database {
14401440
self.sql.tx(|h| self.i_job_outputs(h, job))
14411441
}
14421442

1443+
pub fn job_file(&self, job: JobId, file: JobFileId) -> DBResult<JobFile> {
1444+
self.sql.tx(|h| h.get_row(JobFile::find(job, file)))
1445+
}
1446+
14431447
pub fn job_file_opt(
14441448
&self,
14451449
job: JobId,

0 commit comments

Comments
 (0)