Skip to content

Commit b2f1ecd

Browse files
sshaderConvex, Inc.
authored andcommitted
Make the backend send structured log lines (#24132)
This is to be landed once the dashboard and CLI have been updated to handle structured log lines. GitOrigin-RevId: d4e5e0a268ed593712f4dee9ce5ca6f6cf588d1f
1 parent f28236c commit b2f1ecd

File tree

5 files changed

+86
-35
lines changed

5 files changed

+86
-35
lines changed

crates/common/src/log_lines.rs

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,41 @@ impl LogLine {
189189
}
190190
}
191191

192+
pub fn to_json(
193+
self,
194+
allow_structured: bool,
195+
include_system_metadata: bool,
196+
) -> anyhow::Result<JsonValue> {
197+
if !allow_structured {
198+
Ok(JsonValue::String(self.to_pretty_string()))
199+
} else {
200+
match self {
201+
LogLine::Unstructured(m) => Ok(JsonValue::String(m)),
202+
LogLine::Structured {
203+
messages,
204+
level,
205+
is_truncated,
206+
timestamp,
207+
system_metadata,
208+
} => {
209+
let system_metadata = if include_system_metadata {
210+
system_metadata
211+
} else {
212+
None
213+
};
214+
let log_line_json = LogLineJson {
215+
messages: messages.into(),
216+
is_truncated,
217+
timestamp: timestamp.as_ms_since_epoch()?,
218+
level: level.to_string(),
219+
system_metadata: system_metadata.map(SystemLogMetadataJson::from),
220+
};
221+
Ok(serde_json::to_value(log_line_json)?)
222+
},
223+
}
224+
}
225+
}
226+
192227
pub fn new_developer_log_line(
193228
level: LogLevel,
194229
messages: Vec<String>,
@@ -208,7 +243,7 @@ impl LogLine {
208243
let mut total_length = 0;
209244
let mut truncated_messages: Vec<String> = vec![];
210245
for message in messages {
211-
let remaining_space = MAX_LOG_LINE_LENGTH - TRUNCATED_LINE_SUFFIX.len() - total_length;
246+
let remaining_space = MAX_LOG_LINE_LENGTH - total_length;
212247
if message.len() <= remaining_space {
213248
total_length += message.len() + 1;
214249
truncated_messages.push(message);
@@ -396,25 +431,7 @@ impl TryFrom<LogLine> for JsonValue {
396431
type Error = anyhow::Error;
397432

398433
fn try_from(value: LogLine) -> Result<Self, Self::Error> {
399-
match value {
400-
LogLine::Unstructured(m) => Ok(JsonValue::String(m)),
401-
LogLine::Structured {
402-
messages,
403-
level,
404-
is_truncated,
405-
timestamp,
406-
system_metadata,
407-
} => {
408-
let log_line_json = LogLineJson {
409-
messages: messages.into(),
410-
is_truncated,
411-
timestamp: timestamp.as_ms_since_epoch()?,
412-
level: level.to_string(),
413-
system_metadata: system_metadata.map(SystemLogMetadataJson::from),
414-
};
415-
Ok(serde_json::to_value(log_line_json)?)
416-
},
417-
}
434+
value.to_json(true, true)
418435
}
419436
}
420437

crates/common/src/version.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ pub enum ClientType {
8888
AirbyteExport,
8989
FivetranImport,
9090
FivetranExport,
91+
// For HTTP requests from the dashboard. Requests from the dashboard via a
92+
// Convex client will have an `NPM` version
93+
Dashboard,
9194
Unrecognized(String),
9295
}
9396

@@ -104,6 +107,7 @@ impl FromStr for ClientType {
104107
"streaming-import" => Self::StreamingImport,
105108
"airbyte-export" => Self::AirbyteExport,
106109
"fivetran-export" => Self::FivetranExport,
110+
"dashboard" => Self::Dashboard,
107111
unrecognized => Self::Unrecognized(unrecognized.to_string()),
108112
};
109113
Ok(client_type)
@@ -122,6 +126,7 @@ impl Display for ClientType {
122126
Self::AirbyteExport => write!(f, "airbyte-export"),
123127
Self::FivetranImport => write!(f, "fivetran-import"),
124128
Self::FivetranExport => write!(f, "fivetran-export"),
129+
Self::Dashboard => write!(f, "dashboard"),
125130
Self::Unrecognized(other_client) => write!(f, "{other_client}"),
126131
}
127132
}
@@ -138,6 +143,7 @@ impl ClientType {
138143
| Self::AirbyteExport
139144
| Self::FivetranImport
140145
| Self::FivetranExport
146+
| Self::Dashboard
141147
| Self::Unrecognized(_) => None,
142148
}
143149
}
@@ -152,6 +158,7 @@ impl ClientType {
152158
| Self::AirbyteExport
153159
| Self::FivetranImport
154160
| Self::FivetranExport
161+
| Self::Dashboard
155162
| Self::Unrecognized(_) => None,
156163
}
157164
}
@@ -172,6 +179,7 @@ impl ClientType {
172179
| Self::AirbyteExport
173180
| Self::FivetranImport
174181
| Self::FivetranExport
182+
| Self::Dashboard
175183
| Self::Unrecognized(_) => "",
176184
}
177185
}
@@ -274,6 +282,7 @@ impl ClientVersion {
274282
| ClientType::AirbyteExport
275283
| ClientType::FivetranImport
276284
| ClientType::FivetranExport
285+
| ClientType::Dashboard
277286
| ClientType::Unrecognized(_) => true,
278287
}
279288
}

crates/isolate/src/tests/adversarial.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,18 +179,18 @@ async fn test_console_line_too_long(rt: TestRuntime) -> anyhow::Result<()> {
179179
TRUNCATED_LINE_SUFFIX,
180180
);
181181
// The limit is 32768 MAX_LOG_LINE_LENGTH, but we don't count the [INFO]
182-
// prefix, so just check that we're close
182+
// prefix or the truncated line suffix, so just check that we're close
183183
assert!(log_lines[0].clone().to_pretty_string().len() > 32700);
184-
assert!(log_lines[0].clone().to_pretty_string().len() < 32800);
184+
assert!(log_lines[0].clone().to_pretty_string().len() < 32900);
185185

186186
assert_contains(
187187
&log_lines[1].clone().to_pretty_string(),
188188
TRUNCATED_LINE_SUFFIX,
189189
);
190190
// The limit is 32768 MAX_LOG_LINE_LENGTH, but we don't count the [INFO]
191-
// prefix, so just check that we're close
191+
// prefix or the truncated line suffix, so just check that we're close
192192
assert!(log_lines[1].clone().to_pretty_string().len() > 32700);
193-
assert!(log_lines[1].clone().to_pretty_string().len() < 32800);
193+
assert!(log_lines[1].clone().to_pretty_string().len() < 32900);
194194
Ok(())
195195
}
196196

crates/local_backend/src/logs.rs

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ use common::{
1616
Json,
1717
Query,
1818
},
19+
ExtractClientVersion,
1920
HttpResponseError,
2021
},
22+
version::ClientType,
2123
RequestId,
2224
};
2325
use errors::ErrorMetadata;
@@ -45,7 +47,7 @@ pub enum FunctionExecutionJson {
4547
Completion {
4648
udf_type: String,
4749
identifier: String,
48-
log_lines: Vec<String>,
50+
log_lines: Vec<JsonValue>,
4951
timestamp: f64,
5052
cached_result: bool,
5153
execution_time: f64,
@@ -59,7 +61,7 @@ pub enum FunctionExecutionJson {
5961
udf_type: String,
6062
identifier: String,
6163
timestamp: f64,
62-
log_lines: Vec<String>,
64+
log_lines: Vec<JsonValue>,
6365
request_id: String,
6466
execution_id: String,
6567
},
@@ -86,8 +88,8 @@ pub async fn stream_udf_execution(
8688
let (log_entries, new_cursor) = entries_future_r?;
8789
let entries = log_entries
8890
.into_iter()
89-
.map(execution_to_json)
90-
.collect::<anyhow::Result<_>>()?;
91+
.map(|e| execution_to_json(e, false))
92+
.try_collect()?;
9193
let response = StreamUdfExecutionResponse {
9294
entries,
9395
new_cursor,
@@ -131,6 +133,7 @@ pub struct StreamFunctionLogs {
131133
pub async fn stream_function_logs(
132134
State(st): State<LocalAppState>,
133135
ExtractIdentity(identity): ExtractIdentity,
136+
ExtractClientVersion(client_version): ExtractClientVersion,
134137
Query(query_args): Query<StreamFunctionLogs>,
135138
) -> Result<impl IntoResponse, HttpResponseError> {
136139
let entries_future = st
@@ -144,6 +147,22 @@ pub async fn stream_function_logs(
144147
)),
145148
_ => None,
146149
};
150+
// As of writing, this endpoint is only used by the CLI and dashboard, both of
151+
// which support either unstructured `string` log lines or structured log
152+
// lines.
153+
let supports_structured_log_lines = match client_version.client() {
154+
ClientType::CLI => true,
155+
ClientType::Dashboard => true,
156+
ClientType::NPM
157+
| ClientType::Actions
158+
| ClientType::Python
159+
| ClientType::Rust
160+
| ClientType::StreamingImport
161+
| ClientType::AirbyteExport
162+
| ClientType::FivetranImport
163+
| ClientType::FivetranExport
164+
| ClientType::Unrecognized(_) => false,
165+
};
147166
futures::select_biased! {
148167
entries_future_r = entries_future.fuse() => {
149168
let (log_entries, new_cursor) = entries_future_r?;
@@ -166,15 +185,17 @@ pub async fn stream_function_logs(
166185
.map(|e| {
167186
let json = match e {
168187
FunctionExecutionPart::Completion(c) => {
169-
execution_to_json(c)?
188+
execution_to_json(c, supports_structured_log_lines)?
170189
},
171190
FunctionExecutionPart::Progress(c) => {
172191
FunctionExecutionJson::Progress {
173192
udf_type: c.event_source.udf_type.to_string(),
174193
identifier: c.event_source.path,
175194
timestamp: c.function_start_timestamp.as_secs_f64(),
176195
log_lines: c.log_lines
177-
.into_iter().map(|l| l.to_pretty_string()).collect(),
196+
.into_iter()
197+
.map(|l| l.to_json(supports_structured_log_lines, false))
198+
.try_collect()?,
178199
request_id: c.event_source.context.request_id.to_string(),
179200
execution_id: c.event_source.context.execution_id.to_string()
180201
}
@@ -204,7 +225,10 @@ pub async fn stream_function_logs(
204225
}
205226
}
206227

207-
fn execution_to_json(execution: FunctionExecution) -> anyhow::Result<FunctionExecutionJson> {
228+
fn execution_to_json(
229+
execution: FunctionExecution,
230+
supports_structured_log_lines: bool,
231+
) -> anyhow::Result<FunctionExecutionJson> {
208232
let json = match execution.params {
209233
UdfParams::Function { error, identifier } => {
210234
let identifier: String = identifier.strip().into();
@@ -214,8 +238,8 @@ fn execution_to_json(execution: FunctionExecution) -> anyhow::Result<FunctionExe
214238
log_lines: execution
215239
.log_lines
216240
.into_iter()
217-
.map(|l| l.to_pretty_string())
218-
.collect(),
241+
.map(|l| l.to_json(supports_structured_log_lines, false))
242+
.try_collect()?,
219243
timestamp: execution.unix_timestamp.as_secs_f64(),
220244
cached_result: execution.cached_result,
221245
execution_time: execution.execution_time,
@@ -237,8 +261,8 @@ fn execution_to_json(execution: FunctionExecution) -> anyhow::Result<FunctionExe
237261
log_lines: execution
238262
.log_lines
239263
.into_iter()
240-
.map(|l| l.to_pretty_string())
241-
.collect(),
264+
.map(|l| l.to_json(supports_structured_log_lines, false))
265+
.try_collect()?,
242266
timestamp: execution.unix_timestamp.as_secs_f64(),
243267
cached_result: execution.cached_result,
244268
execution_time: execution.execution_time,

crates/local_backend/src/subs/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ fn new_sync_worker_config(client_version: ClientVersion) -> anyhow::Result<SyncW
321321
| ClientType::AirbyteExport
322322
| ClientType::FivetranImport
323323
| ClientType::FivetranExport
324+
| ClientType::Dashboard
324325
| ClientType::Actions => Err(anyhow::anyhow!(
325326
"No websocket support for client: {}",
326327
client_version.client()

0 commit comments

Comments
 (0)