Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/catalog/src/durable/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@
//!
//! When in doubt, reach out to the Surfaces team, and we'll be more than happy to help :)

pub mod json_compatible;
#[cfg(test)]
mod tests;
pub mod wire_compatible;

use mz_ore::{soft_assert_eq_or_log, soft_assert_ne_or_log};
use mz_repr::Diff;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,52 +7,54 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use bytes::Bytes;
use serde::Serialize;
use serde::de::DeserializeOwned;

/// Denotes that `Self` is wire compatible with type `T`.
/// Denotes that `Self` is JSON compatible with type `T`.
///
/// You should not implement this yourself, instead use the `wire_compatible!` macro.
pub unsafe trait WireCompatible<T: prost::Message>: prost::Message + Default {
/// You should not implement this yourself, instead use the `json_compatible!` macro.
pub unsafe trait JsonCompatible<T>: Serialize + DeserializeOwned
where
T: Serialize + DeserializeOwned,
{
/// Converts the type `T` into `Self` by serializing `T` and deserializing as `Self`.
fn convert(old: &T) -> Self {
let bytes = old.encode_to_vec();
// Note: use Bytes to enable possible re-use of the underlying buffer.
let bytes = Bytes::from(bytes);
Self::decode(bytes).expect("wire compatible")
let bytes = serde_json::to_vec(old).expect("JSON serializable");
serde_json::from_slice(&bytes).expect("JSON compatible")
}
}

// SAFETY: A message type is trivially wire compatible with itself.
unsafe impl<T: prost::Message + Default + Clone> WireCompatible<T> for T {
// SAFETY: A type is trivially JSON compatible with itself.
unsafe impl<T: Serialize + DeserializeOwned + Clone> JsonCompatible<T> for T {
fn convert(old: &Self) -> Self {
old.clone()
}
}

/// Defines one protobuf type as wire compatible with another.
/// Defines one type as JSON compatible with another.
///
/// ```text
/// wire_compatible!(objects_v28::DatabaseKey with objects_v27::DatabaseKey);
/// json_compatible!(objects_v28::DatabaseKey with objects_v27::DatabaseKey);
/// ```
///
/// Internally this will implement the `WireCompatible<B> for <A>`, e.g.
/// `WireCompatible<objects_v27::DatabaseKey> for objects_v28::DatabaseKey` and generate `proptest`
/// Internally this will implement `JsonCompatible<B> for <A>`, e.g.
/// `JsonCompatible<objects_v27::DatabaseKey> for objects_v28::DatabaseKey` and generate `proptest`
/// cases that will create arbitrary objects of type `B` and assert they can be deserialized with
/// type `A`, and vice versa.
#[macro_export]
macro_rules! wire_compatible {
macro_rules! json_compatible {
($a:ident $(:: $a_sub:ident)* with $b:ident $(:: $b_sub:ident)*) => {
::static_assertions::assert_impl_all!(
$a $(::$a_sub)* : ::proptest::arbitrary::Arbitrary, ::prost::Message, Default,
$a $(::$a_sub)* : ::proptest::arbitrary::Arbitrary, ::serde::Serialize, ::serde::de::DeserializeOwned,
);
::static_assertions::assert_impl_all!(
$b $(::$b_sub)* : ::proptest::arbitrary::Arbitrary, ::prost::Message, Default,
$b $(::$b_sub)* : ::proptest::arbitrary::Arbitrary, ::serde::Serialize, ::serde::de::DeserializeOwned,
);

// SAFETY: Below we assert that these types are wire compatible by generating arbitrary
// SAFETY: Below we assert that these types are JSON compatible by generating arbitrary
// structs, encoding in one, and then decoding in the other.
unsafe impl $crate::durable::upgrade::wire_compatible::WireCompatible< $b $(::$b_sub)* > for $a $(::$a_sub)* {}
unsafe impl $crate::durable::upgrade::wire_compatible::WireCompatible< $a $(::$a_sub)* > for $b $(::$b_sub)* {}
unsafe impl $crate::durable::upgrade::json_compatible::JsonCompatible< $b $(::$b_sub)* > for $a $(::$a_sub)* {}
unsafe impl $crate::durable::upgrade::json_compatible::JsonCompatible< $a $(::$a_sub)* > for $b $(::$b_sub)* {}

::paste::paste! {
::proptest::proptest! {
Expand All @@ -63,39 +65,37 @@ macro_rules! wire_compatible {

#[mz_ore::test]
#[cfg_attr(miri, ignore)] // slow
fn [<proptest_wire_compat_ $a:snake $(_$a_sub:snake)* _to_ $b:snake $(_$b_sub:snake)* >](a: $a $(::$a_sub)* ) {
use ::prost::Message;
let a_bytes = a.encode_to_vec();
let b_decoded = $b $(::$b_sub)*::decode(&a_bytes[..]);
fn [<proptest_json_compat_ $a:snake $(_$a_sub:snake)* _to_ $b:snake $(_$b_sub:snake)* >](a: $a $(::$a_sub)* ) {
let a_bytes = ::serde_json::to_vec(&a).expect("JSON serializable");
let b_decoded = ::serde_json::from_slice::<$b $(::$b_sub)*>(&a_bytes);
::proptest::prelude::prop_assert!(b_decoded.is_ok());

// Maybe superfluous, but this is a method called in production.
let b_decoded = b_decoded.expect("asserted Ok");
let b_converted: $b $(::$b_sub)* = $crate::durable::upgrade::wire_compatible::WireCompatible::convert(&a);
let b_converted: $b $(::$b_sub)* = $crate::durable::upgrade::json_compatible::JsonCompatible::convert(&a);
assert_eq!(b_decoded, b_converted);

let b_bytes = b_decoded.encode_to_vec();
let b_bytes = ::serde_json::to_vec(&b_decoded).expect("JSON serializable");
::proptest::prelude::prop_assert_eq!(a_bytes, b_bytes, "a and b serialize differently");
}

#[mz_ore::test]
#[cfg_attr(miri, ignore)] // slow
fn [<proptest_wire_compat_ $b:snake $(_$b_sub:snake)* _to_ $a:snake $(_$a_sub:snake)* >](b: $b $(::$b_sub)* ) {
use ::prost::Message;
let b_bytes = b.encode_to_vec();
let a_decoded = $a $(::$a_sub)*::decode(&b_bytes[..]);
fn [<proptest_json_compat_ $b:snake $(_$b_sub:snake)* _to_ $a:snake $(_$a_sub:snake)* >](b: $b $(::$b_sub)* ) {
let b_bytes = ::serde_json::to_vec(&b).expect("JSON serializable");
let a_decoded = ::serde_json::from_slice::<$a $(::$a_sub)*>(&b_bytes);
::proptest::prelude::prop_assert!(a_decoded.is_ok());

// Maybe superfluous, but this is a method called in production.
let a_decoded = a_decoded.expect("asserted Ok");
let a_converted: $a $(::$a_sub)* = $crate::durable::upgrade::wire_compatible::WireCompatible::convert(&b);
let a_converted: $a $(::$a_sub)* = $crate::durable::upgrade::json_compatible::JsonCompatible::convert(&b);
assert_eq!(a_decoded, a_converted);

let a_bytes = a_decoded.encode_to_vec();
let a_bytes = ::serde_json::to_vec(&a_decoded).expect("JSON serializable");
::proptest::prelude::prop_assert_eq!(a_bytes, b_bytes, "a and b serialize differently");
}
}
}
};
}
pub use wire_compatible;
pub use json_compatible;
30 changes: 15 additions & 15 deletions src/catalog/src/durable/upgrade/v67_to_v68.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@

use crate::durable::traits::{UpgradeFrom, UpgradeInto};
use crate::durable::upgrade::MigrationAction;
use crate::durable::upgrade::wire_compatible::{WireCompatible, wire_compatible};
use crate::durable::upgrade::json_compatible::{JsonCompatible, json_compatible};
use crate::durable::upgrade::{objects_v67 as v67, objects_v68 as v68};

wire_compatible!(v67::GlobalId with v68::GlobalId);
wire_compatible!(v67::CatalogItem with v68::CatalogItem);
wire_compatible!(v67::SchemaId with v68::SchemaId);
wire_compatible!(v67::CommentValue with v68::CommentValue);
wire_compatible!(v67::RoleId with v68::RoleId);
wire_compatible!(v67::MzAclItem with v68::MzAclItem);
wire_compatible!(v67::DatabaseId with v68::DatabaseId);
wire_compatible!(v67::ResolvedSchema with v68::ResolvedSchema);
wire_compatible!(v67::ClusterId with v68::ClusterId);
wire_compatible!(v67::ClusterReplicaId with v68::ClusterReplicaId);
wire_compatible!(v67::SourceReferencesValue with v68::SourceReferencesValue);
wire_compatible!(v67::GidMappingKey with v68::GidMappingKey);
wire_compatible!(v67::ClusterIntrospectionSourceIndexKey with v68::ClusterIntrospectionSourceIndexKey);
json_compatible!(v67::GlobalId with v68::GlobalId);
json_compatible!(v67::CatalogItem with v68::CatalogItem);
json_compatible!(v67::SchemaId with v68::SchemaId);
json_compatible!(v67::CommentValue with v68::CommentValue);
json_compatible!(v67::RoleId with v68::RoleId);
json_compatible!(v67::MzAclItem with v68::MzAclItem);
json_compatible!(v67::DatabaseId with v68::DatabaseId);
json_compatible!(v67::ResolvedSchema with v68::ResolvedSchema);
json_compatible!(v67::ClusterId with v68::ClusterId);
json_compatible!(v67::ClusterReplicaId with v68::ClusterReplicaId);
json_compatible!(v67::SourceReferencesValue with v68::SourceReferencesValue);
json_compatible!(v67::GidMappingKey with v68::GidMappingKey);
json_compatible!(v67::ClusterIntrospectionSourceIndexKey with v68::ClusterIntrospectionSourceIndexKey);

/// In v68 we switched catalog items to be keyed on a `CatalogItemId`, this required a few changes:
///
Expand Down Expand Up @@ -278,7 +278,7 @@ impl UpgradeFrom<v67::state_update_kind::SourceReferences>
fn upgrade_from(old: v67::state_update_kind::SourceReferences) -> Self {
v68::state_update_kind::SourceReferences {
key: old.key.map(|old| old.upgrade_into()),
value: old.value.map(|old| WireCompatible::convert(&old)),
value: old.value.map(|old| JsonCompatible::convert(&old)),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/catalog/src/durable/upgrade/v73_to_v74.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
use crate::durable::traits::UpgradeFrom;
use crate::durable::upgrade::MigrationAction;
use crate::durable::upgrade::{objects_v73 as v73, objects_v74 as v74};
use crate::wire_compatible;
use crate::json_compatible;

wire_compatible!(v73::RoleKey with v74::RoleKey);
json_compatible!(v73::RoleKey with v74::RoleKey);

/// in v74, we add attributes to RoleAttribute.
pub fn upgrade(
Expand Down
22 changes: 11 additions & 11 deletions src/catalog/src/durable/upgrade/v74_to_v75.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@

use crate::durable::traits::{UpgradeFrom, UpgradeInto};
use crate::durable::upgrade::MigrationAction;
use crate::durable::upgrade::wire_compatible::{WireCompatible, wire_compatible};
use crate::durable::upgrade::json_compatible::{JsonCompatible, json_compatible};
use crate::durable::upgrade::{objects_v74 as v74, objects_v75 as v75};

wire_compatible!(v74::ClusterReplicaKey with v75::ClusterReplicaKey);
wire_compatible!(v74::ClusterId with v75::ClusterId);
wire_compatible!(v74::RoleId with v75::RoleId);
wire_compatible!(v74::ReplicaLogging with v75::ReplicaLogging);
wire_compatible!(v74::replica_config::ManagedLocation with v75::replica_config::ManagedLocation);
json_compatible!(v74::ClusterReplicaKey with v75::ClusterReplicaKey);
json_compatible!(v74::ClusterId with v75::ClusterId);
json_compatible!(v74::RoleId with v75::RoleId);
json_compatible!(v74::ReplicaLogging with v75::ReplicaLogging);
json_compatible!(v74::replica_config::ManagedLocation with v75::replica_config::ManagedLocation);

/// Removes some options from unmanaged (unorchestrated?) replica configs.
pub fn upgrade(
Expand Down Expand Up @@ -49,7 +49,7 @@ impl UpgradeFrom<v74::state_update_kind::ClusterReplica>
{
fn upgrade_from(old: v74::state_update_kind::ClusterReplica) -> Self {
v75::state_update_kind::ClusterReplica {
key: old.key.as_ref().map(WireCompatible::convert),
key: old.key.as_ref().map(JsonCompatible::convert),
value: old.value.map(UpgradeFrom::upgrade_from),
}
}
Expand All @@ -58,18 +58,18 @@ impl UpgradeFrom<v74::state_update_kind::ClusterReplica>
impl UpgradeFrom<v74::ClusterReplicaValue> for v75::ClusterReplicaValue {
fn upgrade_from(old: v74::ClusterReplicaValue) -> Self {
v75::ClusterReplicaValue {
cluster_id: old.cluster_id.as_ref().map(WireCompatible::convert),
cluster_id: old.cluster_id.as_ref().map(JsonCompatible::convert),
name: old.name,
config: old.config.map(UpgradeFrom::upgrade_from),
owner_id: old.owner_id.as_ref().map(WireCompatible::convert),
owner_id: old.owner_id.as_ref().map(JsonCompatible::convert),
}
}
}

impl UpgradeFrom<v74::ReplicaConfig> for v75::ReplicaConfig {
fn upgrade_from(old: v74::ReplicaConfig) -> Self {
v75::ReplicaConfig {
logging: old.logging.as_ref().map(WireCompatible::convert),
logging: old.logging.as_ref().map(JsonCompatible::convert),
location: old.location.map(UpgradeFrom::upgrade_from),
}
}
Expand All @@ -82,7 +82,7 @@ impl UpgradeFrom<v74::replica_config::Location> for v75::replica_config::Locatio
v75::replica_config::Location::Unmanaged(loc.upgrade_into())
}
v74::replica_config::Location::Managed(loc) => {
v75::replica_config::Location::Managed(WireCompatible::convert(&loc))
v75::replica_config::Location::Managed(JsonCompatible::convert(&loc))
}
}
}
Expand Down
42 changes: 21 additions & 21 deletions src/catalog/src/durable/upgrade/v75_to_v76.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@

use crate::durable::traits::{UpgradeFrom, UpgradeInto};
use crate::durable::upgrade::MigrationAction;
use crate::durable::upgrade::wire_compatible::{WireCompatible, wire_compatible};
use crate::durable::upgrade::json_compatible::{JsonCompatible, json_compatible};
use crate::durable::upgrade::{objects_v75 as v75, objects_v76 as v76};

wire_compatible!(v75::ClusterKey with v76::ClusterKey);
wire_compatible!(v75::ClusterReplicaKey with v76::ClusterReplicaKey);
wire_compatible!(v75::ClusterId with v76::ClusterId);
wire_compatible!(v75::RoleId with v76::RoleId);
wire_compatible!(v75::MzAclItem with v76::MzAclItem);
wire_compatible!(v75::ReplicaLogging with v76::ReplicaLogging);
wire_compatible!(v75::replica_config::UnmanagedLocation with v76::replica_config::UnmanagedLocation);
wire_compatible!(v75::OptimizerFeatureOverride with v76::OptimizerFeatureOverride);
wire_compatible!(v75::ClusterSchedule with v76::ClusterSchedule);
json_compatible!(v75::ClusterKey with v76::ClusterKey);
json_compatible!(v75::ClusterReplicaKey with v76::ClusterReplicaKey);
json_compatible!(v75::ClusterId with v76::ClusterId);
json_compatible!(v75::RoleId with v76::RoleId);
json_compatible!(v75::MzAclItem with v76::MzAclItem);
json_compatible!(v75::ReplicaLogging with v76::ReplicaLogging);
json_compatible!(v75::replica_config::UnmanagedLocation with v76::replica_config::UnmanagedLocation);
json_compatible!(v75::OptimizerFeatureOverride with v76::OptimizerFeatureOverride);
json_compatible!(v75::ClusterSchedule with v76::ClusterSchedule);

/// Removes the `disk` flag from managed cluster and replica configs.
pub fn upgrade(
Expand Down Expand Up @@ -64,7 +64,7 @@ pub fn upgrade(
impl UpgradeFrom<v75::state_update_kind::Cluster> for v76::state_update_kind::Cluster {
fn upgrade_from(old: v75::state_update_kind::Cluster) -> Self {
v76::state_update_kind::Cluster {
key: old.key.as_ref().map(WireCompatible::convert),
key: old.key.as_ref().map(JsonCompatible::convert),
value: old.value.map(UpgradeFrom::upgrade_from),
}
}
Expand All @@ -74,8 +74,8 @@ impl UpgradeFrom<v75::ClusterValue> for v76::ClusterValue {
fn upgrade_from(old: v75::ClusterValue) -> Self {
v76::ClusterValue {
name: old.name,
owner_id: old.owner_id.as_ref().map(WireCompatible::convert),
privileges: old.privileges.iter().map(WireCompatible::convert).collect(),
owner_id: old.owner_id.as_ref().map(JsonCompatible::convert),
privileges: old.privileges.iter().map(JsonCompatible::convert).collect(),
config: old.config.map(UpgradeFrom::upgrade_from),
}
}
Expand Down Expand Up @@ -109,13 +109,13 @@ impl UpgradeFrom<v75::cluster_config::ManagedCluster> for v76::cluster_config::M
size: loc.size,
replication_factor: loc.replication_factor,
availability_zones: loc.availability_zones,
logging: loc.logging.as_ref().map(WireCompatible::convert),
logging: loc.logging.as_ref().map(JsonCompatible::convert),
optimizer_feature_overrides: loc
.optimizer_feature_overrides
.iter()
.map(WireCompatible::convert)
.map(JsonCompatible::convert)
.collect(),
schedule: loc.schedule.as_ref().map(WireCompatible::convert),
schedule: loc.schedule.as_ref().map(JsonCompatible::convert),
}
}
}
Expand All @@ -125,7 +125,7 @@ impl UpgradeFrom<v75::state_update_kind::ClusterReplica>
{
fn upgrade_from(old: v75::state_update_kind::ClusterReplica) -> Self {
v76::state_update_kind::ClusterReplica {
key: old.key.as_ref().map(WireCompatible::convert),
key: old.key.as_ref().map(JsonCompatible::convert),
value: old.value.map(UpgradeFrom::upgrade_from),
}
}
Expand All @@ -134,18 +134,18 @@ impl UpgradeFrom<v75::state_update_kind::ClusterReplica>
impl UpgradeFrom<v75::ClusterReplicaValue> for v76::ClusterReplicaValue {
fn upgrade_from(old: v75::ClusterReplicaValue) -> Self {
v76::ClusterReplicaValue {
cluster_id: old.cluster_id.as_ref().map(WireCompatible::convert),
cluster_id: old.cluster_id.as_ref().map(JsonCompatible::convert),
name: old.name,
config: old.config.map(UpgradeFrom::upgrade_from),
owner_id: old.owner_id.as_ref().map(WireCompatible::convert),
owner_id: old.owner_id.as_ref().map(JsonCompatible::convert),
}
}
}

impl UpgradeFrom<v75::ReplicaConfig> for v76::ReplicaConfig {
fn upgrade_from(old: v75::ReplicaConfig) -> Self {
v76::ReplicaConfig {
logging: old.logging.as_ref().map(WireCompatible::convert),
logging: old.logging.as_ref().map(JsonCompatible::convert),
location: old.location.map(UpgradeFrom::upgrade_from),
}
}
Expand All @@ -155,7 +155,7 @@ impl UpgradeFrom<v75::replica_config::Location> for v76::replica_config::Locatio
fn upgrade_from(old: v75::replica_config::Location) -> Self {
match old {
v75::replica_config::Location::Unmanaged(loc) => {
v76::replica_config::Location::Unmanaged(WireCompatible::convert(&loc))
v76::replica_config::Location::Unmanaged(JsonCompatible::convert(&loc))
}
v75::replica_config::Location::Managed(loc) => {
v76::replica_config::Location::Managed(loc.upgrade_into())
Expand Down