-
Notifications
You must be signed in to change notification settings - Fork 15
feat: chunks certification #24
base: main
Are you sure you want to change the base?
Changes from 3 commits
5d63c89
cf04a01
d158dff
439f555
cbe66b9
346fbdd
01f1c6a
e320560
e55fca2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,6 +35,8 @@ use std::{ | |
| Arc, Mutex, | ||
| }, | ||
| }; | ||
| use std::io::prelude::{Read}; | ||
| use flate2::read::{GzDecoder, DeflateDecoder}; | ||
|
|
||
| mod config; | ||
| mod logging; | ||
|
|
@@ -282,13 +284,20 @@ async fn forward_request( | |
|
|
||
| let mut certificate: Option<Result<Vec<u8>, ()>> = None; | ||
| let mut tree: Option<Result<Vec<u8>, ()>> = None; | ||
| let mut chunk_tree: Option<Vec<u8>> = None; | ||
| let mut chunk_index: String = String::from("0"); | ||
| let mut encoding: Option<String> = None; | ||
|
|
||
| let mut builder = Response::builder().status(StatusCode::from_u16(http_response.status_code)?); | ||
| for HeaderField(name, value) in http_response.headers { | ||
| if name.eq_ignore_ascii_case("IC-CERTIFICATE") { | ||
| for field in value.split(',') { | ||
| if let Some((_, name, b64_value)) = regex_captures!("^(.*)=:(.*):$", field.trim()) { | ||
| slog::trace!(logger, ">> certificate {}: {}", name, b64_value); | ||
| if name == "chunk_index" { | ||
| chunk_index = b64_value.to_string(); | ||
| continue; | ||
| } | ||
| let bytes = base64::decode(b64_value).map_err(|e| { | ||
| slog::warn!( | ||
| logger, | ||
|
|
@@ -335,9 +344,24 @@ async fn forward_request( | |
| bytes | ||
| } | ||
| }); | ||
| } else if name == "chunk_tree" { | ||
| chunk_tree = match (chunk_tree, bytes) { | ||
| (None, bytes) => bytes.ok(), | ||
| (Some(chunk_tree), Ok(bytes)) => { | ||
| slog::warn!(logger, "duplicate chunk_tree field: {:?}", bytes); | ||
| Some(chunk_tree) | ||
| }, | ||
| (Some(chunk_tree), Err(_)) => { | ||
| slog::warn!(logger, "duplicate chunk_tree field (failed to decode)"); | ||
| Some(chunk_tree) | ||
| }, | ||
| }; | ||
| } | ||
| } | ||
| } | ||
| } else if name.eq_ignore_ascii_case("CONTENT-ENCODING") { | ||
| let enc = value.trim().to_string(); | ||
| encoding = Some(enc); | ||
| } | ||
|
|
||
| builder = builder.header(&name, value); | ||
|
|
@@ -348,6 +372,40 @@ async fn forward_request( | |
| } else { | ||
| None | ||
| }; | ||
|
|
||
| let decoded_body = decode_body(&http_response.body, encoding.clone()); | ||
| let body_valid = match (certificate.clone(), tree.clone()) { | ||
|
||
| (Some(Ok(certificate)), Some(Ok(tree))) => match validate_body( | ||
| &certificate, | ||
| &tree, | ||
| chunk_tree, | ||
| chunk_index, | ||
| &canister_id, | ||
| &agent, | ||
| &uri, | ||
| &decoded_body, | ||
| logger.clone(), | ||
| ) { | ||
| Ok(valid) => valid, | ||
| Err(e) => { | ||
| return Ok(Response::builder() | ||
| .status(StatusCode::INTERNAL_SERVER_ERROR) | ||
| .body(format!("Certificate validation failed: {}", e).into()) | ||
| .unwrap()); | ||
| } | ||
| }, | ||
| (Some(_), _) | (_, Some(_)) => false, | ||
| // Canisters don't have to provide certified variables | ||
| (None, None) => true, | ||
| }; | ||
|
|
||
| if !body_valid && !cfg!(feature = "skip_body_verification") { | ||
| return Ok(Response::builder() | ||
| .status(StatusCode::INTERNAL_SERVER_ERROR) | ||
| .body("Body does not pass verification".into()) | ||
| .unwrap()); | ||
| } | ||
|
|
||
| let is_streaming = http_response.streaming_strategy.is_some(); | ||
| let response = if let Some(streaming_strategy) = http_response.streaming_strategy { | ||
| let (mut sender, body) = body::Body::channel(); | ||
|
|
@@ -359,6 +417,7 @@ async fn forward_request( | |
| let streaming_canister_id_id = callback.callback.principal; | ||
| let method_name = callback.callback.method; | ||
| let mut callback_token = callback.token; | ||
| let chunk_index = callback_token.index.0.to_str_radix(10); | ||
| let logger = logger.clone(); | ||
| tokio::spawn(async move { | ||
| let canister = HttpRequestCanister::create(&agent, streaming_canister_id_id); | ||
|
|
@@ -376,7 +435,41 @@ async fn forward_request( | |
| .call() | ||
| .await | ||
| { | ||
| Ok((StreamingCallbackHttpResponse { body, token },)) => { | ||
| Ok((StreamingCallbackHttpResponse { body, token, chunk_tree },)) => { | ||
| let body_valid = match (certificate.clone(), tree.clone(), chunk_tree) { | ||
| (Some(Ok(certificate)), Some(Ok(tree)), Some(chunk_tree)) => { | ||
| let decoded_chunk_tree = base64::decode(chunk_tree).map_err(|e| { | ||
| slog::warn!( | ||
| logger, | ||
| "Unable to decode chunk_tree from base64: {}", | ||
| e | ||
| ); | ||
| }).ok(); | ||
|
|
||
|
|
||
| let decoded_body = decode_body(&body, encoding.clone()); | ||
| validate_body( | ||
| &certificate, | ||
|
||
| &tree, | ||
| decoded_chunk_tree, | ||
| chunk_index.clone(), | ||
| &canister_id, | ||
| &agent, | ||
| &uri, | ||
| &decoded_body, | ||
| logger.clone(), | ||
| ) | ||
| .ok() | ||
| .unwrap_or(false) | ||
| }, | ||
| _ => false, | ||
| }; | ||
|
|
||
| if !body_valid && !cfg!(feature = "skip_body_verification") { | ||
| sender.abort(); | ||
| break; | ||
| } | ||
|
|
||
| if sender.send_data(Bytes::from(body)).await.is_err() { | ||
| sender.abort(); | ||
| break; | ||
|
|
@@ -400,35 +493,6 @@ async fn forward_request( | |
|
|
||
| builder.body(body)? | ||
| } else { | ||
| let body_valid = match (certificate, tree) { | ||
| (Some(Ok(certificate)), Some(Ok(tree))) => match validate_body( | ||
| &certificate, | ||
| &tree, | ||
| &canister_id, | ||
| &agent, | ||
| &uri, | ||
| &http_response.body, | ||
| logger.clone(), | ||
| ) { | ||
| Ok(valid) => valid, | ||
| Err(e) => { | ||
| return Ok(Response::builder() | ||
| .status(StatusCode::INTERNAL_SERVER_ERROR) | ||
| .body(format!("Certificate validation failed: {}", e).into()) | ||
| .unwrap()); | ||
| } | ||
| }, | ||
| (Some(_), _) | (_, Some(_)) => false, | ||
| // Canisters don't have to provide certified variables | ||
| (None, None) => true, | ||
| }; | ||
|
|
||
| if !body_valid && !cfg!(feature = "skip_body_verification") { | ||
| return Ok(Response::builder() | ||
| .status(StatusCode::INTERNAL_SERVER_ERROR) | ||
| .body("Body does not pass verification".into()) | ||
| .unwrap()); | ||
| } | ||
| builder.body(http_response.body.into())? | ||
| }; | ||
|
|
||
|
|
@@ -467,9 +531,30 @@ async fn forward_request( | |
| Ok(response) | ||
| } | ||
|
|
||
| fn decode_body(body: &[u8], encoding: Option<String>) -> Vec<u8> { | ||
Daniel-Bloom-dfinity marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| match encoding { | ||
| Some(enc) => match enc.as_str() { | ||
| "gzip" => { | ||
| let decoded: &mut Vec<u8> = &mut vec![]; | ||
| GzDecoder::new(body).read_to_end(decoded).unwrap(); | ||
| decoded.to_vec() | ||
| }, | ||
| "deflate" => { | ||
| let decoded: &mut Vec<u8> = &mut vec![]; | ||
| DeflateDecoder::new(body).read_to_end(decoded).unwrap(); | ||
| decoded.to_vec() | ||
| }, | ||
| _ => body.to_vec(), | ||
| }, | ||
| _ => body.to_vec(), | ||
| } | ||
| } | ||
|
|
||
| fn validate_body( | ||
| certificate: &[u8], | ||
| tree: &[u8], | ||
| chunk_tree: Option<Vec<u8>>, | ||
| chunk_index: String, | ||
| canister_id: &Principal, | ||
| agent: &Agent, | ||
| uri: &Uri, | ||
|
|
@@ -532,9 +617,38 @@ fn validate_body( | |
|
|
||
| let mut sha256 = Sha256::new(); | ||
| sha256.update(response_body); | ||
| let body_sha = sha256.finalize(); | ||
| let body_sha: [u8; 32] = sha256.finalize().into(); | ||
|
|
||
| if let Some(tree) = chunk_tree { | ||
| let chunk_tree: HashTree = serde_cbor::from_slice(&tree).map_err(AgentError::InvalidCborData)?; | ||
|
|
||
| Ok(&body_sha[..] == tree_sha) | ||
| let chunk_tree_digest = chunk_tree.digest(); | ||
|
|
||
| if chunk_tree_digest != tree_sha { | ||
| slog::trace!( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe this is a warning not just trace? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same in 639 below. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A similar design is already used nearby. If it's really necessary, then I'll change it :) |
||
| logger, | ||
| ">> Invalid chunk_tree in the header. Digest does not equal tree_sha", | ||
| ); | ||
| return Ok(false); | ||
| } | ||
|
|
||
| let index_path = [chunk_index.into()]; | ||
| let chunk_sha = match chunk_tree.lookup_path(&index_path) { | ||
| LookupResult::Found(v) => v, | ||
| _ => { | ||
| slog::trace!( | ||
| logger, | ||
| ">> Invalid Tree in the header. Does not contain path {:?}", | ||
| path | ||
| ); | ||
| return Ok(false); | ||
| } | ||
| }; | ||
|
|
||
| Ok(body_sha == chunk_sha) | ||
| } else { | ||
| Ok(body_sha == tree_sha) | ||
| } | ||
| } | ||
|
|
||
| fn is_hop_header(name: &str) -> bool { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.