Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
773f582
Add Timestamp variant to Parameter enum in metadata (#1326)
Jan 23, 2026
9d48561
Enhance metadata handling by adding datetime conversion in pydict_to_…
Jan 23, 2026
2ae3a08
Add chrono dependency for enhanced datetime handling in metadata
Jan 23, 2026
65168b7
Add get_timestamp and set_timestamp methods to Metadata for handling …
Jan 23, 2026
8284cc2
Add chrono dependency with serde feature for enhanced datetime serial…
Jan 23, 2026
f83af86
Implement serialization for Timestamp variant in log_to_terminal func…
Jan 23, 2026
8d3ba13
Refactor datetime handling in pydict_to_metadata and metadata_to_pydi…
Jan 23, 2026
4b9cf39
Refactor Timestamp serialization in log_to_terminal function for impr…
Jan 23, 2026
b56b308
Add chrono dependency to Cargo.lock for enhanced datetime handling
Jan 23, 2026
bc1bbe1
Refactor timestamp handling in pydict_to_metadata for improved clarit…
Jan 26, 2026
dc97fbb
Refactor timestamp handling in pydict_to_metadata for improved effici…
Jan 26, 2026
a26314f
Enhance C++ build configuration by adding C++20 standard and suppress…
Jan 29, 2026
39860d7
Refactor timestamp handling in Metadata to use nanoseconds for improv…
Jan 29, 2026
33b2c44
Update CMake template to use C++20 standard for improved compatibility
Jan 29, 2026
c864b9e
Update C++ node build configuration to use C++20 standard for enhance…
Jan 29, 2026
629967a
Update C++ node build configuration to use C++20 standard for consist…
Jan 29, 2026
c2c9cbf
Update CMake configuration to set C++ standard to 20 for improved com…
Jan 29, 2026
f4a4612
Update C++ node build configuration to use C++20 standard for improve…
Jan 29, 2026
94ac2dd
Enhance C++ node README with C++20 timestamp support details
Jan 29, 2026
4b00bb6
Refactor timestamp conversion in Metadata to improve code clarity
Jan 29, 2026
d27f834
Refactor timestamp serialization in echo command for improved clarity
Jan 30, 2026
8a71991
Refactor timestamp serialization in Metadata for improved error handling
Jan 30, 2026
67af07a
Refactor timestamp serialization in Metadata for improved readability
Jan 30, 2026
c269db6
Refactor timestamp serialization in echo command for improved consist…
Jan 30, 2026
47eba11
Simplify C++ timestamp API docs and handle Timestamp in example
haixuanTao Feb 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions apis/c++/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ serde = { version = "1.0.164", features = ["derive"] }
serde_json = { version = "1.0" }
serde-big-array = { version = "0.5.1", optional = true }
arrow = { workspace = true, features = ["ffi"] }
chrono = { version = "0.4", features = ["serde"] }

[build-dependencies]
cxx-build = "1.0.73"
Expand Down
25 changes: 25 additions & 0 deletions apis/c++/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{any::Any, collections::BTreeMap, vec};

use crate::ffi::MetadataValueType;

use chrono::{DateTime, Utc};
use dora_node_api::{
self, Event, EventStream, Metadata as DoraMetadata,
MetadataParameters as DoraMetadataParameters, Parameter as DoraParameter,
Expand Down Expand Up @@ -60,6 +61,7 @@ mod ffi {
ListInt,
ListFloat,
ListString,
Timestamp,
}

pub struct CombinedEvents {
Expand Down Expand Up @@ -140,6 +142,7 @@ mod ffi {
fn get_list_int(self: &Metadata, key: &str) -> Result<Vec<i64>>;
fn get_list_float(self: &Metadata, key: &str) -> Result<Vec<f64>>;
fn get_list_string(self: &Metadata, key: &str) -> Result<Vec<String>>;
fn get_timestamp(self: &Metadata, key: &str) -> Result<String>;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we can use C++20, how about using std::chrono::time_point<sd::chrono::utc_clock>?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion! I noticed -std=c++17 in our example build files, but I haven’t confirmed which C++ standard the API bindings actually use. If C++20 is acceptable for the project, I’d be happy to use std::chrono::time_pointstd::chrono::utc_clock.
@phil-opp any thoughts on this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@haixuanTao @heyong4725 Any thoughts on this? Is requiring c++20 ok or do we want to stick to c++17?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's go ahead and switch to C++20 and the time_point type, and see if there are any issues. Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

fn get_json(self: &Metadata, key: &str) -> Result<String>;
fn to_json(self: &Metadata) -> String;
fn list_keys(self: &Metadata) -> Vec<String>;
Expand All @@ -150,6 +153,7 @@ mod ffi {
fn set_list_int(self: &mut Metadata, key: &str, value: Vec<i64>) -> Result<()>;
fn set_list_float(self: &mut Metadata, key: &str, value: Vec<f64>) -> Result<()>;
fn set_list_string(self: &mut Metadata, key: &str, value: Vec<String>) -> Result<()>;
fn set_timestamp(self: &mut Metadata, key: &str, value: String) -> Result<()>;
#[cxx_name = "type"]
fn value_type(self: &Metadata, key: &str) -> Result<MetadataValueType>;
}
Expand Down Expand Up @@ -315,6 +319,7 @@ impl Metadata {
DoraParameter::ListInt(_) => "list<int>",
DoraParameter::ListFloat(_) => "list<float>",
DoraParameter::ListString(_) => "list<string>",
DoraParameter::Timestamp(_) => "timestamp",
}
}

Expand Down Expand Up @@ -342,6 +347,7 @@ impl Metadata {
.map(|value| JsonValue::String(value.clone()))
.collect(),
)),
DoraParameter::Timestamp(dt) => Ok(JsonValue::String(dt.to_rfc3339())),
}
}

Expand Down Expand Up @@ -426,6 +432,17 @@ impl Metadata {
}
}

pub fn get_timestamp(&self, key: &str) -> EyreResult<String> {
let parameter = self.expect_parameter(key)?;
match parameter {
DoraParameter::Timestamp(dt) => Ok(dt.to_rfc3339()),
other => Err(eyre!(
"metadata key '{key}' has type '{}', expected 'timestamp'",
Metadata::parameter_type_name(other)
)),
}
}

pub fn get_json(&self, key: &str) -> EyreResult<String> {
let parameter = self.expect_parameter(key)?;
let json_value = Metadata::parameter_to_json(parameter, key)?;
Expand Down Expand Up @@ -479,6 +496,13 @@ impl Metadata {
self.insert_parameter(key, DoraParameter::ListString(value))
}

pub fn set_timestamp(&mut self, key: &str, value: String) -> EyreResult<()> {
let dt = DateTime::parse_from_rfc3339(&value)
.map_err(|e| eyre!("Failed to parse timestamp '{value}': {e}"))?
.with_timezone(&Utc);
self.insert_parameter(key, DoraParameter::Timestamp(dt))
}

pub fn value_type(&self, key: &str) -> EyreResult<MetadataValueType> {
let parameter = self.expect_parameter(key)?;
let value_type = match parameter {
Expand All @@ -489,6 +513,7 @@ impl Metadata {
DoraParameter::ListInt(_) => MetadataValueType::ListInt,
DoraParameter::ListFloat(_) => MetadataValueType::ListFloat,
DoraParameter::ListString(_) => MetadataValueType::ListString,
DoraParameter::Timestamp(_) => MetadataValueType::Timestamp,
};
Ok(value_type)
}
Expand Down
1 change: 1 addition & 0 deletions apis/python/operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ arrow-schema = { workspace = true }
aligned-vec = "0.5.0"
futures = "0.3.28"
futures-concurrency = "7.3.0"
chrono = { version = "0.4", features = ["serde"] }
56 changes: 54 additions & 2 deletions apis/python/operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
};

use arrow::pyarrow::ToPyArrow;
use chrono::{DateTime, Utc};
use dora_node_api::{
DoraNode, Event, EventStream, Metadata, MetadataParameters, Parameter, StopCause,
merged::{MergeExternalSend, MergedEvent},
Expand Down Expand Up @@ -234,8 +235,39 @@ pub fn pydict_to_metadata(dict: Option<Bound<'_, PyDict>>) -> Result<MetadataPar
let list: Vec<String> = value.extract()?;
parameters.insert(key, Parameter::ListString(list))
} else {
println!("could not convert type {value}");
parameters.insert(key, Parameter::String(value.str()?.to_string()))
// Check if it's a datetime.datetime object
let datetime_module = PyModule::import(value.py(), "datetime")
.context("Failed to import datetime module")?;
let datetime_class = datetime_module.getattr("datetime")?;

if value.is_instance(datetime_class.as_ref())? {
// Extract timestamp using timestamp() method
let timestamp_float: f64 = value
.call_method0("timestamp")?
.extract()
.context("Failed to extract timestamp from datetime")?;

// Convert to chrono::DateTime<Utc>
// timestamp() returns seconds since epoch as float
// Convert to SystemTime first, then to DateTime<Utc>
let seconds = timestamp_float as i64;
let nanos = ((timestamp_float - seconds as f64) * 1_000_000_000.0) as u64;

let system_time = if seconds >= 0 {
UNIX_EPOCH + std::time::Duration::new(seconds as u64, nanos as u32)
} else {
UNIX_EPOCH
.checked_sub(std::time::Duration::new((-seconds) as u64, nanos as u32))
.unwrap_or(UNIX_EPOCH)
};

let dt = DateTime::<Utc>::from(system_time);

parameters.insert(key, Parameter::Timestamp(dt))
} else {
println!("could not convert type {value}");
parameters.insert(key, Parameter::String(value.str()?.to_string()))
}
};
}
}
Expand Down Expand Up @@ -306,6 +338,26 @@ pub fn metadata_to_pydict<'a>(
Parameter::ListString(l) => dict
.set_item(k, l)
.context("Could not insert metadata into python dictionary")?,
Parameter::Timestamp(dt) => {
// Convert chrono::DateTime<Utc> to Python datetime.datetime
let timestamp = dt.timestamp();
let microseconds = dt.timestamp_subsec_micros();

// Get UTC timezone from Python's datetime module
let datetime_module =
PyModule::import(py, "datetime").context("Failed to import datetime module")?;
let datetime_class = datetime_module.getattr("datetime")?;
let utc_timezone = datetime_module.getattr("timezone")?.getattr("utc")?;

// Create timezone-aware datetime using fromtimestamp
let total_seconds = timestamp as f64 + microseconds as f64 / 1_000_000.0;
let py_datetime = datetime_class
.call_method1("fromtimestamp", (total_seconds, utc_timezone))
.context("Failed to create Python datetime from timestamp")?;

dict.set_item(k, py_datetime)
.context("Could not insert timestamp into python dictionary")?
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions binaries/cli/src/command/topic/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ async fn log_to_terminal(
Parameter::Float(value) => serde_json::to_string(value).unwrap(),
Parameter::ListFloat(value) => serde_json::to_string(value).unwrap(),
Parameter::ListString(value) => serde_json::to_string(value).unwrap(),
Parameter::Timestamp(dt) => {
serde_json::to_string(&dt.to_rfc3339()).unwrap()
}
};
write!(output, "{}:{value}", serde_json::Value::String(k.clone()),)
.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions libraries/message/src/metadata.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::BTreeMap;

use arrow_schema::DataType;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

/// Additional data that is sent as part of output messages.
Expand Down Expand Up @@ -69,6 +70,7 @@ pub enum Parameter {
Float(f64),
ListFloat(Vec<f64>),
ListString(Vec<String>),
Timestamp(DateTime<Utc>),
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand Down
Loading