diff --git a/.mockery.yaml b/.mockery.yaml new file mode 100644 index 00000000..4d2bbbad --- /dev/null +++ b/.mockery.yaml @@ -0,0 +1,16 @@ +dir: "{{ .InterfaceDir }}/mocks" +mockname: "{{ .InterfaceName }}" +outpkg: mocks +filename: "{{ .InterfaceName | snakecase }}.go" +packages: + github.com/smartcontractkit/chainlink-aptos/relayer/write_target: + interfaces: + chainService: + config: + mockname: ChainService + contractReader: + config: + mockname: ContractReader + contractWriter: + config: + mockname: ContractWriter \ No newline at end of file diff --git a/contracts/data-feeds/Move.toml b/contracts/data-feeds/Move.toml index 343cafbf..303ec8aa 100644 --- a/contracts/data-feeds/Move.toml +++ b/contracts/data-feeds/Move.toml @@ -7,14 +7,19 @@ authors = [] data_feeds = "_" owner = "_" platform = "_" +platform_secondary = "_" +owner_secondary = "_" [dev-addresses] data_feeds = "0x100" owner = "0xcafe" platform = "0xbaba" +platform_secondary = "0x200" +owner_secondary = "0x22" [dependencies] AptosFramework = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "34025227eb5367c449bf9dae3cd226fe5c187904", subdir = "aptos-move/framework/aptos-framework" } -ChainlinkPlatform = { local = "../platform" } +ChainlinkPlatform = { local = "../platform"} +ChainlinkPlatformB = { local = "../platform_secondary" } -[dev-dependencies] +[dev-dependencies] \ No newline at end of file diff --git a/contracts/data-feeds/sources/registry.move b/contracts/data-feeds/sources/registry.move index 42a10003..3f11d837 100644 --- a/contracts/data-feeds/sources/registry.move +++ b/contracts/data-feeds/sources/registry.move @@ -114,8 +114,9 @@ module data_feeds::registry { const ECANNOT_TRANSFER_TO_SELF: u64 = 10; const ENOT_PROPOSED_OWNER: u64 = 11; const EEMPTY_WORKFLOW_OWNERS: u64 = 12; + const EINVALID_RAW_REPORT: u64 = 13; - // Schema types + // Schema types (For Backwards Compatibility) only const SCHEMA_V3: u16 = 3; const SCHEMA_V4: u16 = 4; @@ -149,15 +150,26 @@ module 
data_feeds::registry { let transfer_ref = object::generate_transfer_ref(&constructor_ref); let object_signer = object::generate_signer(&constructor_ref); - // register to receive platform::forwarder reports + // callback for on_report function let cb = aptos_framework::function_info::new_function_info( publisher, string::utf8(b"registry"), string::utf8(b"on_report") ); + // register to receive platform::forwarder reports platform::storage::register(publisher, cb, new_proof()); + // callback for on_report_secondary function + let cb_secondary = + aptos_framework::function_info::new_function_info( + publisher, + string::utf8(b"registry"), + string::utf8(b"on_report_secondary") + ); + // register to receive platform_secondary::forwarder reports + platform_secondary::storage::register(publisher, cb_secondary, new_proof_secondary()); + move_to( &object_signer, Registry { @@ -306,14 +318,19 @@ module data_feeds::registry { /// This identifier links callback registration with the `on_report` event and enables secure retrieval of callback data. /// Only has the `drop` ability to prevent copying and persisting in global storage. struct OnReceive has drop {} + struct OnReceiveSecondary has drop {} /// Creates a new OnReceive object. inline fun new_proof(): OnReceive { OnReceive {} } - // Platform receiver function interface - public fun on_report(_metadata: Object): option::Option acquires Registry { + /// Creates a new OnReceive object. 
+ inline fun new_proof_secondary(): OnReceiveSecondary { + OnReceiveSecondary {} + } + + public fun on_report(_meta: object::Object): option::Option acquires Registry { let registry = borrow_global_mut(get_state_addr()); let (metadata, data) = platform::storage::retrieve(new_proof()); @@ -347,6 +364,40 @@ module data_feeds::registry { option::none() } + public fun on_report_secondary(_meta: object::Object): option::Option acquires Registry { + let registry = borrow_global_mut(get_state_addr()); + + let (metadata, data) = platform_secondary::storage::retrieve(new_proof_secondary()); + + let parsed_metadata = platform_secondary::storage::parse_report_metadata(metadata); + + let workflow_owner = + platform_secondary::storage::get_report_metadata_workflow_owner(&parsed_metadata); + assert!( + vector::contains(®istry.allowed_workflow_owners, &workflow_owner), + EUNAUTHORIZED_WORKFLOW_OWNER + ); + + let workflow_name = + platform_secondary::storage::get_report_metadata_workflow_name(&parsed_metadata); + assert!( + vector::is_empty(®istry.allowed_workflow_names) + || vector::contains(®istry.allowed_workflow_names, &workflow_name), + EUNAUTHORIZED_WORKFLOW_NAME + ); + + let (feed_ids, reports) = parse_raw_report(data); + vector::zip_ref( + &feed_ids, + &reports, + |feed_id, report| { + perform_update(registry, *feed_id, *report); + } + ); + + option::none() + } + public entry fun set_workflow_config( authority: &signer, allowed_workflow_owners: vector>, @@ -393,41 +444,60 @@ module data_feeds::registry { // Parse ETH ABI encoded raw data into multiple reports fun parse_raw_report(data: vector): (vector>, vector>) { - let offset = 0; + let data_len: u64 = vector::length(&data); + let offset: u64 = 0; assert!( to_u256be(vector::slice(&data, offset, offset + 32)) == 32, 32 ); offset = offset + 32; - let count = to_u256be(vector::slice(&data, offset, offset + 32)); + let count_u256 = to_u256be(vector::slice(&data, offset, offset + 32)); + // Should be safe, we should never 
have enough reports to overflow this + let count: u64 = count_u256 as u64; offset = offset + 32; - for (i in 0..count) { + let feed_ids: vector> = vector::empty>(); + let reports: vector> = vector::empty>(); + + let is_v03: bool = data_len == count * 13 * 32 + 2 * 32; + let is_benchmark: bool = data_len == count * 3 * 32 + 64; + + if (is_v03) { // skip len * offsets table - offset = offset + 32; + offset = offset + 32 * count; }; - let feed_ids = vector[]; - let reports = vector[]; + if (is_benchmark) { + for (i in 0..count) { + let feed_id = vector::slice(&data, offset, offset + 32); + vector::push_back(&mut feed_ids, feed_id); + let len = 3 * 32; + let report = vector::slice(&data, offset, offset + len); + vector::push_back(&mut reports, report); + offset = offset + len; + }; + } else if (is_v03) { + for (i in 0..count) { + let feed_id = vector::slice(&data, offset, offset + 32); + vector::push_back(&mut feed_ids, feed_id); + offset = offset + 32; - for (i in 0..count) { - let feed_id = vector::slice(&data, offset, offset + 32); - vector::push_back(&mut feed_ids, feed_id); - offset = offset + 32; - - assert!( - to_u256be(vector::slice(&data, offset, offset + 32)) == 64, - 64 - ); - offset = offset + 32; + assert!( + to_u256be(vector::slice(&data, offset, offset + 32)) == 64, + 64 + ); + offset = offset + 32; - let len = (to_u256be(vector::slice(&data, offset, offset + 32)) as u64); - offset = offset + 32; + let len = (to_u256be(vector::slice(&data, offset, offset + 32)) as u64); + offset = offset + 32; - let report = vector::slice(&data, offset, offset + len); - vector::push_back(&mut reports, report); - offset = offset + len; + let report = vector::slice(&data, offset, offset + len); + vector::push_back(&mut reports, report); + offset = offset + len; + }; + } else { + abort error::invalid_argument(EINVALID_RAW_REPORT); }; (feed_ids, reports) @@ -442,21 +512,27 @@ module data_feeds::registry { ); let feed = simple_map::borrow_mut(&mut registry.feeds, &feed_id); - 
let report_feed_id = vector::slice(&report_data, 0, 32); - // schema is based on first two bytes of the feed id - let schema = to_u16be(vector::slice(&report_feed_id, 0, 2)); - - let observation_timestamp: u256; - let benchmark_price: u256; - if (schema == SCHEMA_V3 || schema == SCHEMA_V4) { - // offsets are the same for timestamp and benchmark in v3 and v4. - observation_timestamp = - (to_u32be(vector::slice(&report_data, 3 * 32 - 4, 3 * 32)) as u256); - // NOTE: aptos has no signed integer types, so can't parse as i196, this is a raw representation - benchmark_price = to_u256be(vector::slice(&report_data, 6 * 32, 7 * 32)); - } else { - abort error::invalid_argument(EINVALID_REPORT) - }; + let is_v03 = vector::length(&report_data) == 9 * 32; // 288 bytes + + let observation_timestamp_ptr: u64 = if (is_v03) { 3 * 32 } + else { 64 }; + let benchmark_price_ptr: u64 = if (is_v03) { 6 * 32 } + else { 64 }; + + let observation_timestamp: u256 = + to_u32be( + vector::slice( + &report_data, + observation_timestamp_ptr - 4, + observation_timestamp_ptr + ) + ) as u256; + + let benchmark_price: u256 = + to_u256be( + vector::slice(&report_data, benchmark_price_ptr, benchmark_price_ptr + + 32) + ); if (feed.observation_timestamp >= observation_timestamp) { event::emit( @@ -639,13 +715,18 @@ module data_feeds::registry { } #[test_only] - fun set_up_test(publisher: &signer, platform: &signer) { + fun set_up_test( + publisher: &signer, platform: &signer, platform_secondary: &signer + ) { use aptos_framework::account::{Self}; account::create_account_for_test(signer::address_of(publisher)); platform::forwarder::init_module_for_testing(platform); platform::storage::init_module_for_testing(platform); + platform_secondary::forwarder::init_module_for_testing(platform_secondary); + platform_secondary::storage::init_module_for_testing(platform_secondary); + init_module(publisher); } @@ -692,6 +773,101 @@ module data_feeds::registry { #[test] fun test_parse_raw_report() { + // 
request_context = 00018463f564e082c55b7237add2a03bd6b3c35789d38be0f6964d9aba82f1a8000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000000000 + // metadata = 1019256d85b84c7ba85cd9b7bb94fe15b73d7ec99e3cc0f470ee5dd2a1eaac88c000000000000000000000000bc3a8582cc08d3df797ab13a6c567eadb2517b3f0f931b7145b218016bf9dde43030303045544842544300000000000000000000000000000000000000aa00010 + // 0000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000001c + // raw report = + // 0000000000000000000000000000000000000000000000000000000000000020 32 + // 0000000000000000000000000000000000000000000000000000000000000005 len=5 + // 0001111111111111111100000000000000000000000000000000000000000000 feed_id 1 + // 0000000000000000000000000000000000000000000000000000000066c2e36c obervation_timestamp 1 + // 00000000000000000000000000000000000000000000000000000000000494a8 answer 1 + // 0002111111111111111100000000000000000000000000000000000000000000 feed_id 2 + // 0000000000000000000000000000000000000000000000000000000066c2e36c obervation_timestamp 2 + // 00000000000000000000000000000000000000000000000000000000000594a8 answer 2 + // 0003111111111111111100000000000000000000000000000000000000000000 feed_id 3 + // 0000000000000000000000000000000000000000000000000000000066c2e36c obervation_timestamp 3 + // 00000000000000000000000000000000000000000000000000000000000694a8 answer 3 + // 0004111111111111111100000000000000000000000000000000000000000000 feed_id 4 + // 0000000000000000000000000000000000000000000000000000000066c2e36c obervation_timestamp 4 + // 00000000000000000000000000000000000000000000000000000000000794a8 answer 4 + // 0005111111111111111100000000000000000000000000000000000000000000 feed_id 5 + // 
0000000000000000000000000000000000000000000000000000000066c2e36c obervation_timestamp 5 + // 00000000000000000000000000000000000000000000000000000000000894a8 answer 5 + + let data = + x"0000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000500011111111111111111000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000066c2e36c00000000000000000000000000000000000000000000000000000000000494a800021111111111111111000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000066c2e36c00000000000000000000000000000000000000000000000000000000000594a800031111111111111111000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000066c2e36c00000000000000000000000000000000000000000000000000000000000694a800041111111111111111000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000066c2e36c00000000000000000000000000000000000000000000000000000000000794a800051111111111111111000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000066c2e36c00000000000000000000000000000000000000000000000000000000000894a8"; + + let (feed_ids, reports) = parse_raw_report(data); + std::debug::print(&feed_ids); + std::debug::print(&reports); + + assert!( + feed_ids + == vector[ + x"0001111111111111111100000000000000000000000000000000000000000000", + x"0002111111111111111100000000000000000000000000000000000000000000", + x"0003111111111111111100000000000000000000000000000000000000000000", + x"0004111111111111111100000000000000000000000000000000000000000000", + x"0005111111111111111100000000000000000000000000000000000000000000" + ], + 1 + ); + + let expected_reports = vector[ + 
x"00011111111111111111000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000066c2e36c00000000000000000000000000000000000000000000000000000000000494a8", + x"00021111111111111111000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000066c2e36c00000000000000000000000000000000000000000000000000000000000594a8", + x"00031111111111111111000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000066c2e36c00000000000000000000000000000000000000000000000000000000000694a8", + x"00041111111111111111000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000066c2e36c00000000000000000000000000000000000000000000000000000000000794a8", + x"00051111111111111111000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000066c2e36c00000000000000000000000000000000000000000000000000000000000894a8" + ]; + assert!(reports == expected_reports, 1); + } + + #[ + test( + owner = @owner, + publisher = @data_feeds, + platform = @platform, + platform_secondary = @platform_secondary + ) + ] + fun test_perform_update( + owner: &signer, + publisher: &signer, + platform: &signer, + platform_secondary: &signer + ) acquires Registry { + set_up_test(publisher, platform, platform_secondary); + + let report_data = + x"00011111111111111111000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000066c2e36c00000000000000000000000000000000000000000000000000000000000494a8"; + let feed_id = vector::slice(&report_data, 0, 32); + let expected_timestamp = 0x000066c2e36c; + let expected_benchmark = 0x0000000494a8; + + let config_id = vector[1]; + + set_feeds( + owner, + vector[feed_id], + vector[string::utf8(b"description")], + config_id + ); + + let registry = borrow_global_mut(get_state_addr()); + perform_update(registry, feed_id, report_data); + + let benchmarks = 
get_benchmarks(owner, vector[feed_id]); + assert!(vector::length(&benchmarks) == 1, 1); + + let benchmark = vector::borrow(&benchmarks, 0); + assert!(benchmark.benchmark == expected_benchmark, 1); + assert!(benchmark.observation_timestamp == expected_timestamp, 1); + } + + #[test] + fun test_parse_raw_report_v3() { // request_context = 00018463f564e082c55b7237add2a03bd6b3c35789d38be0f6964d9aba82f1a8000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000000000 // metadata = 1019256d85b84c7ba85cd9b7bb94fe15b73d7ec99e3cc0f470ee5dd2a1eaac88c000000000000000000000000bc3a8582cc08d3df797ab13a6c567eadb2517b3f0f931b7145b218016bf9dde43030303045544842544300000000000000000000000000000000000000aa00010 // 0000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000001c @@ -748,11 +924,21 @@ module data_feeds::registry { assert!(reports == expected_reports, 1); } - #[test(owner = @owner, publisher = @data_feeds, platform = @platform)] + #[ + test( + owner = @owner, + publisher = @data_feeds, + platform = @platform, + platform_secondary = @platform_secondary + ) + ] fun test_perform_update_v3( - owner: &signer, publisher: &signer, platform: &signer + owner: &signer, + publisher: &signer, + platform: &signer, + platform_secondary: &signer ) acquires Registry { - set_up_test(publisher, platform); + set_up_test(publisher, platform, platform_secondary); let report_data = 
x"0003fbba4fce42f65d6032b18aee53efdf526cc734ad296cb57565979d883bdd0000000000000000000000000000000000000000000000000000000066ed173e0000000000000000000000000000000000000000000000000000000066ed174200000000000000007fffffffffffffffffffffffffffffffffffffffffffffff00000000000000007fffffffffffffffffffffffffffffffffffffffffffffff0000000000000000000000000000000000000000000000000000000066ee68c2000000000000000000000000000000000000000000000d808cc35e6ed670bd00000000000000000000000000000000000000000000000d808590c35425347980000000000000000000000000000000000000000000000d8093f5f989878e7c00"; @@ -776,6 +962,7 @@ module data_feeds::registry { assert!(vector::length(&benchmarks) == 1, 1); let benchmark = vector::borrow(&benchmarks, 0); + assert!(benchmark.benchmark == expected_benchmark, 1); assert!(benchmark.observation_timestamp == expected_timestamp, 1); } @@ -785,6 +972,7 @@ module data_feeds::registry { owner = @owner, publisher = @data_feeds, platform = @platform, + platform_secondary = @platform_secondary, new_owner = @0xbeef ) ] @@ -792,9 +980,10 @@ module data_feeds::registry { owner: &signer, publisher: &signer, platform: &signer, + platform_secondary: &signer, new_owner: &signer ) acquires Registry { - set_up_test(publisher, platform); + set_up_test(publisher, platform, platform_secondary); assert!(get_owner() == @owner, 1); @@ -804,24 +993,44 @@ module data_feeds::registry { assert!(get_owner() == signer::address_of(new_owner), 2); } - #[test(publisher = @data_feeds, platform = @platform, unknown_user = @0xbeef)] + #[ + test( + publisher = @data_feeds, + platform = @platform, + platform_secondary = @platform_secondary, + unknown_user = @0xbeef + ) + ] #[expected_failure(abort_code = 327681, location = data_feeds::registry)] fun test_transfer_ownership_failure_not_owner( - publisher: &signer, platform: &signer, unknown_user: &signer + publisher: &signer, + platform: &signer, + platform_secondary: &signer, + unknown_user: &signer ) acquires Registry { - set_up_test(publisher, 
platform); + set_up_test(publisher, platform, platform_secondary); assert!(get_owner() == @owner, 1); transfer_ownership(unknown_user, signer::address_of(unknown_user)); } - #[test(owner = @owner, publisher = @data_feeds, platform = @platform)] + #[ + test( + owner = @owner, + publisher = @data_feeds, + platform = @platform, + platform_secondary = @platform_secondary + ) + ] #[expected_failure(abort_code = 65546, location = data_feeds::registry)] fun test_transfer_ownership_failure_transfer_to_self( - owner: &signer, publisher: &signer, platform: &signer + owner: &signer, + publisher: &signer, + platform: &signer, + platform_secondary: &signer ) acquires Registry { - set_up_test(publisher, platform); + set_up_test(publisher, platform, platform_secondary); assert!(get_owner() == @owner, 1); @@ -833,6 +1042,7 @@ module data_feeds::registry { owner = @owner, publisher = @data_feeds, platform = @platform, + platform_secondary = @platform_secondary, new_owner = @0xbeef ) ] @@ -841,9 +1051,10 @@ module data_feeds::registry { owner: &signer, publisher: &signer, platform: &signer, + platform_secondary: &signer, new_owner: &signer ) acquires Registry { - set_up_test(publisher, platform); + set_up_test(publisher, platform, platform_secondary); assert!(get_owner() == @owner, 1); @@ -851,9 +1062,11 @@ module data_feeds::registry { accept_ownership(new_owner); } - #[test(publisher = @data_feeds, platform = @platform)] - fun test_retrieve_benchmark(publisher: &signer, platform: &signer) acquires Registry { - set_up_test(publisher, platform); + #[test(publisher = @data_feeds, platform = @platform, platform_secondary = @platform_secondary)] + fun test_retrieve_benchmark( + publisher: &signer, platform: &signer, platform_secondary: &signer + ) acquires Registry { + set_up_test(publisher, platform, platform_secondary); let feed_id = vector[1, 2, 3, 4, 5]; set_feed_for_test( diff --git a/contracts/platform_secondary/Move.toml b/contracts/platform_secondary/Move.toml new file mode 
100644 index 00000000..99c82216 --- /dev/null +++ b/contracts/platform_secondary/Move.toml @@ -0,0 +1,17 @@ +[package] +name = "ChainlinkPlatformB" +version = "1.0.0" +authors = [] + +[addresses] +platform_secondary = "_" +owner_secondary = "_" + +[dev-addresses] +platform_secondary = "0x200" +owner_secondary = "0x22" + +[dependencies] +AptosFramework = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "34025227eb5367c449bf9dae3cd226fe5c187904", subdir = "aptos-move/framework/aptos-framework" } + +[dev-dependencies] diff --git a/contracts/platform_secondary/sources/forwarder.move b/contracts/platform_secondary/sources/forwarder.move new file mode 100644 index 00000000..1b82c445 --- /dev/null +++ b/contracts/platform_secondary/sources/forwarder.move @@ -0,0 +1,571 @@ +module platform_secondary::forwarder { + use aptos_framework::object::{Self, ExtendRef, TransferRef}; + use aptos_std::smart_table::{SmartTable, Self}; + + use std::error; + use std::event; + use std::vector; + use std::bit_vector; + use std::option::{Self, Option}; + use std::signer; + use std::bcs; + + const E_INVALID_DATA_LENGTH: u64 = 1; + const E_INVALID_SIGNER: u64 = 2; + const E_DUPLICATE_SIGNER: u64 = 3; + const E_INVALID_SIGNATURE_COUNT: u64 = 4; + const E_INVALID_SIGNATURE: u64 = 5; + const E_ALREADY_PROCESSED: u64 = 6; + const E_NOT_OWNER: u64 = 7; + const E_MALFORMED_SIGNATURE: u64 = 8; + const E_FAULT_TOLERANCE_MUST_BE_POSITIVE: u64 = 9; + const E_EXCESS_SIGNERS: u64 = 10; + const E_INSUFFICIENT_SIGNERS: u64 = 11; + const E_CALLBACK_DATA_NOT_CONSUMED: u64 = 12; + const E_CANNOT_TRANSFER_TO_SELF: u64 = 13; + const E_NOT_PROPOSED_OWNER: u64 = 14; + const E_CONFIG_ID_NOT_FOUND: u64 = 15; + const E_INVALID_REPORT_VERSION: u64 = 16; + + const MAX_ORACLES: u64 = 31; + + const APP_OBJECT_SEED: vector = b"FORWARDER"; + + struct ConfigId has key, store, drop, copy { + don_id: u32, + config_version: u32 + } + + struct State has key { + owner_address: address, + pending_owner_address: 
address, + extend_ref: ExtendRef, + transfer_ref: TransferRef, + + // (don_id, config_version) => config + configs: SmartTable, + reports: SmartTable, address> + } + + struct Config has key, store, drop, copy { + f: u8, + // oracles: SimpleMap, + oracles: vector + } + + #[event] + struct ConfigSet has drop, store { + don_id: u32, + config_version: u32, + f: u8, + signers: vector> + } + + #[event] + struct ReportProcessed has drop, store { + receiver: address, + workflow_execution_id: vector, + report_id: u16 + } + + #[event] + struct OwnershipTransferRequested has drop, store { + from: address, + to: address + } + + #[event] + struct OwnershipTransferred has drop, store { + from: address, + to: address + } + + inline fun assert_is_owner(state: &State, target_address: address) { + assert!( + state.owner_address == target_address, + error::permission_denied(E_NOT_OWNER) + ); + } + + fun init_module(publisher: &signer) { + assert!(signer::address_of(publisher) == @platform_secondary, 1); + + let constructor_ref = object::create_named_object(publisher, APP_OBJECT_SEED); + + let extend_ref = object::generate_extend_ref(&constructor_ref); + let transfer_ref = object::generate_transfer_ref(&constructor_ref); + let app_signer = &object::generate_signer(&constructor_ref); + + move_to( + app_signer, + State { + owner_address: @owner_secondary, + pending_owner_address: @0x0, + configs: smart_table::new(), + reports: smart_table::new(), + extend_ref, + transfer_ref + } + ); + } + + inline fun get_state_addr(): address { + object::create_object_address(&@platform_secondary, APP_OBJECT_SEED) + } + + public entry fun set_config( + authority: &signer, + don_id: u32, + config_version: u32, + f: u8, + oracles: vector> + ) acquires State { + let state = borrow_global_mut(get_state_addr()); + + assert_is_owner(state, signer::address_of(authority)); + + assert!(f != 0, error::invalid_argument(E_FAULT_TOLERANCE_MUST_BE_POSITIVE)); + assert!( + vector::length(&oracles) <= MAX_ORACLES, + 
error::invalid_argument(E_EXCESS_SIGNERS) + ); + assert!( + vector::length(&oracles) >= 3 * (f as u64) + 1, + error::invalid_argument(E_INSUFFICIENT_SIGNERS) + ); + + smart_table::upsert( + &mut state.configs, + ConfigId { don_id, config_version }, + Config { + f, + oracles: vector::map( + oracles, + |oracle| { ed25519::new_unvalidated_public_key_from_bytes(oracle) } + ) + } + ); + + event::emit( + ConfigSet { don_id, config_version, f, signers: oracles } + ); + } + + public entry fun clear_config( + authority: &signer, don_id: u32, config_version: u32 + ) acquires State { + let state = borrow_global_mut(get_state_addr()); + + assert_is_owner(state, signer::address_of(authority)); + + smart_table::remove(&mut state.configs, ConfigId { don_id, config_version }); + + event::emit( + ConfigSet { don_id, config_version, f: 0, signers: vector::empty() } + ); + } + + use aptos_std::aptos_hash::blake2b_256; + use aptos_std::ed25519; + + struct Signature has drop { + public_key: ed25519::UnvalidatedPublicKey, // TODO: pass signer index rather than key to save on space and gas? + sig: ed25519::Signature + } + + public fun signature_from_bytes(bytes: vector): Signature { + assert!( + vector::length(&bytes) == 96, + error::invalid_argument(E_MALFORMED_SIGNATURE) + ); + let public_key = + ed25519::new_unvalidated_public_key_from_bytes(vector::slice(&bytes, 0, 32)); + let sig = ed25519::new_signature_from_bytes(vector::slice(&bytes, 32, 96)); + Signature { sig, public_key } + } + + inline fun transmission_id( + receiver: address, workflow_execution_id: vector, report_id: u16 + ): vector { + let id = bcs::to_bytes(&receiver); + vector::append(&mut id, workflow_execution_id); + vector::append(&mut id, bcs::to_bytes(&report_id)); + id + } + + /// The dispatch call knows both storage and indirectly the callback, thus the separate module. 
+ fun dispatch( + receiver: address, metadata: vector, data: vector + ) { + let meta = platform_secondary::storage::insert(receiver, metadata, data); + aptos_framework::dispatchable_fungible_asset::derived_supply(meta); + let obj_address = + object::object_address(&meta); + assert!( + !platform_secondary::storage::storage_exists(obj_address), + E_CALLBACK_DATA_NOT_CONSUMED + ); + } + + entry fun report( + transmitter: &signer, + receiver: address, + raw_report: vector, + signatures: vector> + ) acquires State { + let signatures = vector::map( + signatures, |signature| signature_from_bytes(signature) + ); + + let (metadata, data) = + validate_and_process_report(transmitter, receiver, raw_report, signatures); + // NOTE: unable to catch failure here + dispatch(receiver, metadata, data); + } + + inline fun to_u16be(data: vector): u16 { + // reverse big endian to little endian + vector::reverse(&mut data); + aptos_std::from_bcs::to_u16(data) + } + + inline fun to_u32be(data: vector): u32 { + // reverse big endian to little endian + vector::reverse(&mut data); + aptos_std::from_bcs::to_u32(data) + } + + fun validate_and_process_report( + transmitter: &signer, + receiver: address, + raw_report: vector, + signatures: vector + ): (vector, vector) acquires State { + let state = borrow_global_mut(get_state_addr()); + + // report_context = vector::slice(&raw_report, 0, 96); + let report = vector::slice(&raw_report, 96, vector::length(&raw_report)); + + // parse out report metadata + // version | workflow_execution_id | timestamp | don_id | config_version | ... 
+ let report_version = *vector::borrow(&report, 0); + assert!(report_version == 1, E_INVALID_REPORT_VERSION); + + let workflow_execution_id = vector::slice(&report, 1, 33); + // _timestamp + let don_id = vector::slice(&report, 37, 41); + let don_id = to_u32be(don_id); + let config_version = vector::slice(&report, 41, 45); + let config_version = to_u32be(config_version); + let report_id = vector::slice(&report, 107, 109); + let report_id = to_u16be(report_id); + let metadata = vector::slice(&report, 45, 109); + let data = vector::slice(&report, 109, vector::length(&report)); + + let config_id = ConfigId { don_id, config_version }; + assert!(smart_table::contains(&state.configs, config_id), E_CONFIG_ID_NOT_FOUND); + let config = smart_table::borrow(&state.configs, config_id); + + // check if report was already delivered + let transmission_id = transmission_id(receiver, workflow_execution_id, report_id); + let processed = smart_table::contains(&state.reports, transmission_id); + assert!(!processed, E_ALREADY_PROCESSED); + + let required_signatures = (config.f as u64) + 1; + assert!( + vector::length(&signatures) == required_signatures, + error::invalid_argument(E_INVALID_SIGNATURE_COUNT) + ); + + // blake2b(report_context | report) + let msg = blake2b_256(raw_report); + + let signed = bit_vector::new(vector::length(&config.oracles)); + + vector::for_each_ref( + &signatures, + |signature| { + let signature: &Signature = signature; // some compiler versions can't infer the type here + + let (valid, index) = vector::index_of( + &config.oracles, &signature.public_key + ); + assert!(valid, error::invalid_argument(E_INVALID_SIGNER)); + + // check for duplicate signers + let duplicate = bit_vector::is_index_set(&signed, index); + assert!(!duplicate, error::invalid_argument(E_DUPLICATE_SIGNER)); + bit_vector::set(&mut signed, index); + + let result = + ed25519::signature_verify_strict( + &signature.sig, &signature.public_key, msg + ); + assert!(result, 
error::invalid_argument(E_INVALID_SIGNATURE)); + } + ); + + // mark as delivered + smart_table::add( + &mut state.reports, transmission_id, signer::address_of(transmitter) + ); + + event::emit(ReportProcessed { receiver, workflow_execution_id, report_id }); + + (metadata, data) + } + + #[view] + public fun get_transmission_state( + receiver: address, workflow_execution_id: vector, report_id: u16 + ): bool acquires State { + let state = borrow_global(get_state_addr()); + let transmission_id = transmission_id(receiver, workflow_execution_id, report_id); + + return smart_table::contains(&state.reports, transmission_id) + } + + #[view] + public fun get_transmitter( + receiver: address, workflow_execution_id: vector, report_id: u16 + ): Option
acquires State { + let state = borrow_global(get_state_addr()); + let transmission_id = transmission_id(receiver, workflow_execution_id, report_id); + + if (!smart_table::contains(&state.reports, transmission_id)) { + return option::none() + }; + option::some(*smart_table::borrow(&state.reports, transmission_id)) + } + + // Ownership functions + + #[view] + public fun get_owner(): address acquires State { + let state = borrow_global(get_state_addr()); + state.owner_address + } + + #[view] + public fun get_config(don_id: u32, config_version: u32): Config acquires State { + let state = borrow_global(get_state_addr()); + let config_id = ConfigId { don_id, config_version }; + *smart_table::borrow(&state.configs, config_id) + } + + public entry fun transfer_ownership(authority: &signer, to: address) acquires State { + let state = borrow_global_mut(get_state_addr()); + assert_is_owner(state, signer::address_of(authority)); + assert!( + state.owner_address != to, + error::invalid_argument(E_CANNOT_TRANSFER_TO_SELF) + ); + + state.pending_owner_address = to; + + event::emit(OwnershipTransferRequested { from: state.owner_address, to }); + } + + public entry fun accept_ownership(authority: &signer) acquires State { + let state = borrow_global_mut(get_state_addr()); + assert!( + state.pending_owner_address == signer::address_of(authority), + error::permission_denied(E_NOT_PROPOSED_OWNER) + ); + + let old_owner_address = state.owner_address; + state.owner_address = state.pending_owner_address; + state.pending_owner_address = @0x0; + + event::emit( + OwnershipTransferred { from: old_owner_address, to: state.owner_address } + ); + } + + #[test_only] + public fun init_module_for_testing(publisher: &signer) { + init_module(publisher); + } + + #[test_only] + public entry fun set_up_test(owner_secondary: &signer, publisher: &signer) { + use aptos_framework::account::{Self}; + account::create_account_for_test(signer::address_of(owner_secondary)); + 
account::create_account_for_test(signer::address_of(publisher)); + + init_module(publisher); + } + + #[test_only] + struct OracleSet has drop { + don_id: u32, + config_version: u32, + f: u8, + oracles: vector>, + signers: vector + } + + #[test_only] + fun generate_oracle_set(): OracleSet { + let don_id = 0; + let f = 1; + + let signers = vector[]; + let oracles = vector[]; + for (i in 0..31) { + let (sk, pk) = ed25519::generate_keys(); + vector::push_back(&mut signers, sk); + vector::push_back(&mut oracles, ed25519::validated_public_key_to_bytes(&pk)); + }; + OracleSet { don_id, config_version: 1, f, oracles, signers } + } + + #[test_only] + fun sign_report( + config: &OracleSet, report: vector, report_context: vector + ): vector { + // blake2b(report_context, report) + let msg = report_context; + vector::append(&mut msg, report); + let msg = blake2b_256(msg); + + let signatures = vector[]; + let required_signatures = config.f + 1; + for (i in 0..required_signatures) { + let config_signer = vector::borrow(&config.signers, (i as u64)); + let public_key = + ed25519::new_unvalidated_public_key_from_bytes( + *vector::borrow(&config.oracles, (i as u64)) + ); + let sig = ed25519::sign_arbitrary_bytes(config_signer, msg); + vector::push_back(&mut signatures, Signature { sig, public_key }); + }; + signatures + } + + #[test(owner_secondary = @owner_secondary, publisher = @platform_secondary)] + public entry fun test_happy_path(owner_secondary: &signer, publisher: &signer) acquires State { + set_up_test(owner_secondary, publisher); + + let config = generate_oracle_set(); + + // configure DON + set_config( + owner_secondary, + config.don_id, + config.config_version, + config.f, + config.oracles + ); + + // generate report + let version = 1; + let timestamp: u32 = 1; + let workflow_id = + x"6d795f6964000000000000000000000000000000000000000000000000000000"; + let workflow_name = x"000000000000DEADBEEF"; + let workflow_owner = x"0000000000000000000000000000000000000051"; + let 
report_id = x"0001"; + let execution_id = + x"6d795f657865637574696f6e5f69640000000000000000000000000000000000"; + let mercury_reports = vector[x"010203", x"aabbcc"]; + + let report = vector[]; + // header + vector::push_back(&mut report, version); + vector::append(&mut report, execution_id); + + let bytes = bcs::to_bytes(×tamp); + // convert little-endian to big-endian + vector::reverse(&mut bytes); + vector::append(&mut report, bytes); + + let bytes = bcs::to_bytes(&config.don_id); + // convert little-endian to big-endian + vector::reverse(&mut bytes); + vector::append(&mut report, bytes); + + let bytes = bcs::to_bytes(&config.config_version); + // convert little-endian to big-endian + vector::reverse(&mut bytes); + vector::append(&mut report, bytes); + + // metadata + vector::append(&mut report, workflow_id); + vector::append(&mut report, workflow_name); + vector::append(&mut report, workflow_owner); + vector::append(&mut report, report_id); + // report + vector::append(&mut report, bcs::to_bytes(&mercury_reports)); + + let report_context = + x"a0b000000000000000000000000000000000000000000000000000000000000a0b000000000000000000000000000000000000000000000000000000000000a0b000000000000000000000000000000000000000000000000000000000000000"; + assert!(vector::length(&report_context) == 96, 1); + + let raw_report = vector[]; + vector::append(&mut raw_report, report_context); + vector::append(&mut raw_report, report); + + // sign report + let signatures = sign_report(&config, report, report_context); + + // call entrypoint + validate_and_process_report( + owner_secondary, + signer::address_of(publisher), + raw_report, + signatures + ); + } + + #[test(owner_secondary = @owner_secondary, publisher = @platform_secondary, new_owner = @0xbeef)] + fun test_transfer_ownership_success( + owner_secondary: &signer, publisher: &signer, new_owner: &signer + ) acquires State { + set_up_test(owner_secondary, publisher); + + assert!(get_owner() == @owner_secondary, 1); + + 
transfer_ownership(owner_secondary, signer::address_of(new_owner)); + accept_ownership(new_owner); + + assert!(get_owner() == signer::address_of(new_owner), 2); + } + + #[test(owner_secondary = @owner_secondary, publisher = @platform_secondary, unknown_user = @0xbeef)] + #[expected_failure(abort_code = 327687, location = platform_secondary::forwarder)] + fun test_transfer_ownership_failure_not_owner( + owner_secondary: &signer, publisher: &signer, unknown_user: &signer + ) acquires State { + set_up_test(owner_secondary, publisher); + + assert!(get_owner() == @owner_secondary, 1); + + transfer_ownership(unknown_user, signer::address_of(unknown_user)); + } + + #[test(owner_secondary = @owner_secondary, publisher = @platform_secondary)] + #[expected_failure(abort_code = 65549, location = platform_secondary::forwarder)] + fun test_transfer_ownership_failure_transfer_to_self( + owner_secondary: &signer, publisher: &signer + ) acquires State { + set_up_test(owner_secondary, publisher); + + assert!(get_owner() == @owner_secondary, 1); + + transfer_ownership(owner_secondary, signer::address_of(owner_secondary)); + } + + #[test(owner_secondary = @owner_secondary, publisher = @platform_secondary, new_owner = @0xbeef)] + #[expected_failure(abort_code = 327694, location = platform_secondary::forwarder)] + fun test_transfer_ownership_failure_not_proposed_owner( + owner_secondary: &signer, publisher: &signer, new_owner: &signer + ) acquires State { + set_up_test(owner_secondary, publisher); + + assert!(get_owner() == @owner_secondary, 1); + + transfer_ownership(owner_secondary, @0xfeeb); + accept_ownership(new_owner); + } +} diff --git a/contracts/platform_secondary/sources/storage.move b/contracts/platform_secondary/sources/storage.move new file mode 100644 index 00000000..d53c83c2 --- /dev/null +++ b/contracts/platform_secondary/sources/storage.move @@ -0,0 +1,521 @@ +/// The storage module stores all the state associated with the dispatch service. 
+module platform_secondary::storage { + use std::option; + use std::string; + use std::signer; + use std::vector; + + use aptos_std::table::{Self, Table}; + use aptos_std::smart_table::{SmartTable, Self}; + use aptos_std::type_info::{Self, TypeInfo}; + + use aptos_framework::dispatchable_fungible_asset; + use aptos_framework::function_info::FunctionInfo; + use aptos_framework::fungible_asset::{Self, Metadata}; + use aptos_framework::object::{Self, ExtendRef, TransferRef, Object}; + + const APP_OBJECT_SEED: vector = b"STORAGE"; + + friend platform_secondary::forwarder; + + const E_UNKNOWN_RECEIVER: u64 = 1; + const E_INVALID_METADATA_LENGTH: u64 = 2; + + struct Entry has key, store, drop { + metadata: Object, + extend_ref: ExtendRef + } + + struct Dispatcher has key { + /// Tracks the input type to the dispatch handler. + dispatcher: Table, + address_to_typeinfo: Table, + /// Used to store temporary data for dispatching. + extend_ref: ExtendRef, + transfer_ref: TransferRef + } + + struct DispatcherV2 has key { + dispatcher: SmartTable, + address_to_typeinfo: SmartTable + } + + /// Store the data to dispatch here. + struct Storage has drop, key { + metadata: vector, + data: vector + } + + struct ReportMetadata has key, store, drop { + workflow_cid: vector, + workflow_name: vector, + workflow_owner: vector, + report_id: vector + } + + /// Registers an account and callback for future dispatching, and a proof type `T` + /// for the callback function to retrieve arguments. Note that the function will + /// abort if the account has already been registered. + /// + /// The address of `account` is used to represent the callback by the dispatcher. + /// See the `dispatch` function in `forwarder.move`. + /// + /// Providing an instance of `T` guarantees that only a privileged module can call `register` for that type. + /// The type `T` should ideally only have the `drop` ability and no other abilities to prevent + /// copying and persisting in global storage. 
+ public fun register( + account: &signer, callback: FunctionInfo, _proof: T + ) acquires Dispatcher, DispatcherV2 { + let typename = type_info::type_name(); + let constructor_ref = + object::create_named_object(&storage_signer(), *string::bytes(&typename)); + let extend_ref = object::generate_extend_ref(&constructor_ref); + let metadata = + fungible_asset::add_fungibility( + &constructor_ref, + option::none(), + // this was `typename` but it fails due to ENAME_TOO_LONG + string::utf8(b"storage"), + string::utf8(b"dis"), + 0, + string::utf8(b""), + string::utf8(b"") + ); + dispatchable_fungible_asset::register_derive_supply_dispatch_function( + &constructor_ref, option::some(callback) + ); + + let dispatcher = borrow_global_mut(storage_address()); + smart_table::add( + &mut dispatcher.dispatcher, + type_info::type_of(), + Entry { metadata, extend_ref } + ); + smart_table::add( + &mut dispatcher.address_to_typeinfo, + signer::address_of(account), + type_info::type_of() + ); + } + + public entry fun migrate_to_v2( + callback_addresses: vector
+ ) acquires Dispatcher, DispatcherV2 { + let addr = storage_address(); + + if (!exists(addr)) { + move_to( + &storage_signer(), + DispatcherV2 { + dispatcher: smart_table::new(), + address_to_typeinfo: smart_table::new() + } + ); + }; + + let dispatcher = borrow_global_mut(addr); + let dispatcher_v2 = borrow_global_mut(addr); + + vector::for_each_ref( + &callback_addresses, + |callback_address| { + // Aborts if the callback address does not exist. + let type_info = + table::remove( + &mut dispatcher.address_to_typeinfo, *callback_address + ); + let entry = table::remove(&mut dispatcher.dispatcher, type_info); + + smart_table::add( + &mut dispatcher_v2.address_to_typeinfo, + *callback_address, + type_info + ); + smart_table::add(&mut dispatcher_v2.dispatcher, type_info, entry); + } + ); + } + + /// Insert into this module as the callback needs to retrieve and avoid a cyclical dependency: + /// engine -> storage and then engine -> callback -> storage + public(friend) fun insert( + receiver: address, callback_metadata: vector, callback_data: vector + ): Object acquires Dispatcher, DispatcherV2 { + // TODO: delete this clause after migration completes + if (!exists(storage_address())) { + let dispatcher = borrow_global(storage_address()); + let typeinfo = *table::borrow(&dispatcher.address_to_typeinfo, receiver); + assert!( + table::contains(&dispatcher.dispatcher, typeinfo), + E_UNKNOWN_RECEIVER + ); + let Entry { metadata: asset_metadata, extend_ref } = + table::borrow(&dispatcher.dispatcher, typeinfo); + let obj_signer = object::generate_signer_for_extending(extend_ref); + move_to( + &obj_signer, Storage { data: callback_data, metadata: callback_metadata } + ); + return *asset_metadata + }; + + let dispatcher = borrow_global(storage_address()); + let typeinfo = *smart_table::borrow(&dispatcher.address_to_typeinfo, receiver); + assert!( + smart_table::contains(&dispatcher.dispatcher, typeinfo), + E_UNKNOWN_RECEIVER + ); + let Entry { metadata: asset_metadata, 
extend_ref } = + smart_table::borrow(&dispatcher.dispatcher, typeinfo); + let obj_signer = object::generate_signer_for_extending(extend_ref); + move_to(&obj_signer, Storage { data: callback_data, metadata: callback_metadata }); + *asset_metadata + } + + public(friend) fun storage_exists(obj_address: address): bool { + object::object_exists(obj_address) + } + + /// Second half of the process for retrieving. This happens outside engine to prevent the + /// cyclical dependency. + public fun retrieve( + _proof: T + ): (vector, vector) acquires Dispatcher, DispatcherV2, Storage { + // TODO: delete this clause after migration completes + if (!exists(storage_address())) { + let dispatcher = borrow_global(storage_address()); + let typeinfo = type_info::type_of(); + let Entry { metadata: _, extend_ref } = + table::borrow(&dispatcher.dispatcher, typeinfo); + let obj_address = object::address_from_extend_ref(extend_ref); + let data = move_from(obj_address); + return (data.metadata, data.data) + }; + let dispatcher = borrow_global(storage_address()); + let typeinfo = type_info::type_of(); + let Entry { metadata: _, extend_ref } = + smart_table::borrow(&dispatcher.dispatcher, typeinfo); + let obj_address = object::address_from_extend_ref(extend_ref); + let data = move_from(obj_address); + (data.metadata, data.data) + } + + #[view] + public fun parse_report_metadata(metadata: vector): ReportMetadata { + // workflow_cid // offset 0, size 32 + // workflow_name // offset 32, size 10 + // workflow_owner // offset 42, size 20 + // report_id // offset 62, size 2 + assert!(vector::length(&metadata) == 64, E_INVALID_METADATA_LENGTH); + + let workflow_cid = vector::slice(&metadata, 0, 32); + let workflow_name = vector::slice(&metadata, 32, 42); + let workflow_owner = vector::slice(&metadata, 42, 62); + let report_id = vector::slice(&metadata, 62, 64); + + ReportMetadata { workflow_cid, workflow_name, workflow_owner, report_id } + } + + /// Prepares the dispatch table. 
+ fun init_module(publisher: &signer) { + assert!(signer::address_of(publisher) == @platform_secondary, 1); + + let constructor_ref = object::create_named_object(publisher, APP_OBJECT_SEED); + + let extend_ref = object::generate_extend_ref(&constructor_ref); + let transfer_ref = object::generate_transfer_ref(&constructor_ref); + let object_signer = object::generate_signer(&constructor_ref); + + move_to( + &object_signer, + Dispatcher { + dispatcher: table::new(), + address_to_typeinfo: table::new(), + extend_ref, + transfer_ref + } + ); + + move_to( + &object_signer, + DispatcherV2 { + dispatcher: smart_table::new(), + address_to_typeinfo: smart_table::new() + } + ); + } + + inline fun storage_address(): address { + object::create_object_address(&@platform_secondary, APP_OBJECT_SEED) + } + + inline fun storage_signer(): signer acquires Dispatcher { + object::generate_signer_for_extending( + &borrow_global(storage_address()).extend_ref + ) + } + + // Struct accessors + + public fun get_report_metadata_workflow_cid( + report_metadata: &ReportMetadata + ): vector { + report_metadata.workflow_cid + } + + public fun get_report_metadata_workflow_name( + report_metadata: &ReportMetadata + ): vector { + report_metadata.workflow_name + } + + public fun get_report_metadata_workflow_owner( + report_metadata: &ReportMetadata + ): vector { + report_metadata.workflow_owner + } + + public fun get_report_metadata_report_id( + report_metadata: &ReportMetadata + ): vector { + report_metadata.report_id + } + + #[test_only] + public fun init_module_for_testing(publisher: &signer) { + init_module(publisher); + } + + #[test] + fun test_parse_report_metadata() { + let metadata = + x"6d795f6964000000000000000000000000000000000000000000000000000000000000000000deadbeef00000000000000000000000000000000000000510001"; + let expected_workflow_cid = + x"6d795f6964000000000000000000000000000000000000000000000000000000"; + let expected_workflow_name = x"000000000000DEADBEEF"; + let 
expected_workflow_owner = x"0000000000000000000000000000000000000051"; + let expected_report_id = x"0001"; + + let parsed_metadata = parse_report_metadata(metadata); + assert!(parsed_metadata.workflow_cid == expected_workflow_cid, 1); + assert!(parsed_metadata.workflow_name == expected_workflow_name, 1); + assert!(parsed_metadata.workflow_owner == expected_workflow_owner, 1); + assert!(parsed_metadata.report_id == expected_report_id, 1); + } + + #[test_only] + fun init_module_deprecated(publisher: &signer) { + assert!(signer::address_of(publisher) == @platform_secondary, 1); + + let constructor_ref = object::create_named_object(publisher, APP_OBJECT_SEED); + + let extend_ref = object::generate_extend_ref(&constructor_ref); + let transfer_ref = object::generate_transfer_ref(&constructor_ref); + let object_signer = object::generate_signer(&constructor_ref); + + move_to( + &object_signer, + Dispatcher { + dispatcher: table::new(), + address_to_typeinfo: table::new(), + extend_ref, + transfer_ref + } + ); + } + + #[test_only] + fun register_deprecated( + account: &signer, callback: FunctionInfo, _proof: T + ) acquires Dispatcher { + let typename = type_info::type_name(); + let constructor_ref = + object::create_named_object(&storage_signer(), *string::bytes(&typename)); + let extend_ref = object::generate_extend_ref(&constructor_ref); + let metadata = + fungible_asset::add_fungibility( + &constructor_ref, + option::none(), + // this was `typename` but it fails due to ENAME_TOO_LONG + string::utf8(b"storage"), + string::utf8(b"dis"), + 0, + string::utf8(b""), + string::utf8(b"") + ); + dispatchable_fungible_asset::register_derive_supply_dispatch_function( + &constructor_ref, option::some(callback) + ); + + let dispatcher = borrow_global_mut(storage_address()); + table::add( + &mut dispatcher.dispatcher, + type_info::type_of(), + Entry { metadata, extend_ref } + ); + table::add( + &mut dispatcher.address_to_typeinfo, + signer::address_of(account), + type_info::type_of() 
+ ); + } + + #[test_only] + struct TestProof has drop {} + + #[test_only] + struct TestProof2 has drop {} + + #[test_only] + struct TestProof3 has drop {} + + #[test_only] + struct TestProof4 has drop {} + + #[test_only] + public fun test_callback(_metadata: Object): option::Option { + option::none() + } + + #[test(publisher = @platform_secondary)] + fun test_v2_migration(publisher: &signer) acquires Dispatcher, DispatcherV2, Storage { + init_module_deprecated(publisher); + + let test_callback = + aptos_framework::function_info::new_function_info( + publisher, + string::utf8(b"storage"), + string::utf8(b"test_callback") + ); + + register_deprecated(publisher, test_callback, TestProof {}); + + let (derived_publisher, _) = + aptos_framework::account::create_resource_account( + publisher, b"TEST_V2_MIGRATION" + ); + let (derived_publisher2, _) = + aptos_framework::account::create_resource_account( + publisher, b"TEST_V2_MIGRATION_2" + ); + let (derived_publisher3, _) = + aptos_framework::account::create_resource_account( + publisher, b"TEST_V2_MIGRATION_3" + ); + + register_deprecated(&derived_publisher, test_callback, TestProof2 {}); + register_deprecated(&derived_publisher2, test_callback, TestProof3 {}); + + let callback_metadata = vector[1, 2, 3, 4]; + let callback_data = vector[5, 6, 7, 8, 9]; + + // test initial migration + { + // test that insert and retrieve work before migration + insert(signer::address_of(publisher), callback_metadata, callback_data); + let (received_metadata, received_data) = retrieve(TestProof {}); + assert!(callback_metadata == received_metadata, 1); + assert!(callback_data == received_data, 1); + + let derived_addr = signer::address_of(&derived_publisher); + migrate_to_v2(vector[@platform_secondary, derived_addr]); + + // test that insert and retrieve still work after migration + insert(signer::address_of(publisher), callback_metadata, callback_data); + let (received_metadata, received_data) = retrieve(TestProof {}); + 
assert!(callback_metadata == received_metadata, 1); + assert!(callback_data == received_data, 1); + + let dispatcher = borrow_global(storage_address()); + assert!( + !table::contains( + &dispatcher.dispatcher, type_info::type_of() + ), + 1 + ); + assert!(!table::contains(&dispatcher.address_to_typeinfo, @platform_secondary), 1); + assert!( + !table::contains( + &dispatcher.dispatcher, type_info::type_of() + ), + 1 + ); + assert!(!table::contains(&dispatcher.address_to_typeinfo, derived_addr), 1); + + let dispatcher_v2 = borrow_global(storage_address()); + assert!( + smart_table::contains( + &dispatcher_v2.dispatcher, type_info::type_of() + ), + 1 + ); + assert!( + smart_table::contains(&dispatcher_v2.address_to_typeinfo, @platform_secondary), + 1 + ); + assert!( + smart_table::contains( + &dispatcher_v2.dispatcher, type_info::type_of() + ), + 1 + ); + assert!( + smart_table::contains(&dispatcher_v2.address_to_typeinfo, derived_addr), + 1 + ); + }; + + // migrate a second time, when DispatcherV2 already exists. 
+ { + let derived_addr = signer::address_of(&derived_publisher2); + migrate_to_v2(vector[derived_addr]); + + let dispatcher = borrow_global(storage_address()); + assert!( + !table::contains( + &dispatcher.dispatcher, type_info::type_of() + ), + 1 + ); + assert!(!table::contains(&dispatcher.address_to_typeinfo, derived_addr), 1); + + let dispatcher_v2 = borrow_global(storage_address()); + assert!( + smart_table::contains( + &dispatcher_v2.dispatcher, type_info::type_of() + ), + 1 + ); + assert!( + smart_table::contains(&dispatcher_v2.address_to_typeinfo, derived_addr), + 1 + ); + }; + + // test the upgraded register function + { + let derived_addr = signer::address_of(&derived_publisher3); + register(&derived_publisher3, test_callback, TestProof4 {}); + + let dispatcher = borrow_global(storage_address()); + assert!( + !table::contains( + &dispatcher.dispatcher, type_info::type_of() + ), + 1 + ); + assert!(!table::contains(&dispatcher.address_to_typeinfo, derived_addr), 1); + + let dispatcher_v2 = borrow_global(storage_address()); + assert!( + smart_table::contains( + &dispatcher_v2.dispatcher, type_info::type_of() + ), + 1 + ); + assert!( + smart_table::contains(&dispatcher_v2.address_to_typeinfo, derived_addr), + 1 + ); + } + } +} diff --git a/flake.lock b/flake.lock index 10141932..4d82fbd4 100644 --- a/flake.lock +++ b/flake.lock @@ -20,11 +20,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1724479785, - "narHash": "sha256-pP3Azj5d6M5nmG68Fu4JqZmdGt4S4vqI5f8te+E/FTw=", + "lastModified": 1746328495, + "narHash": "sha256-uKCfuDs7ZM3QpCE/jnfubTg459CnKnJG/LwqEVEdEiw=", "owner": "nixos", "repo": "nixpkgs", - "rev": "d0e1602ddde669d5beb01aec49d71a51937ed7be", + "rev": "979daf34c8cacebcd917d540070b52a3c2b9b16e", "type": "github" }, "original": { diff --git a/go.mod b/go.mod index 1819271f..cf7b9568 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( go.opentelemetry.io/otel v1.35.0 go.opentelemetry.io/otel/metric v1.35.0 go.opentelemetry.io/otel/trace 
v1.35.0 + go.uber.org/zap v1.27.0 golang.org/x/crypto v0.36.0 golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 golang.org/x/sync v0.12.0 @@ -102,6 +103,7 @@ require ( github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 // indirect github.com/smartcontractkit/libocr v0.0.0-20250220133800-f3b940c4f298 // indirect github.com/spf13/pflag v1.0.6 // indirect + github.com/stretchr/objx v0.5.2 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect @@ -123,7 +125,6 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.35.0 // indirect go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.27.0 // indirect golang.org/x/mod v0.24.0 // indirect golang.org/x/net v0.38.0 // indirect golang.org/x/sys v0.31.0 // indirect diff --git a/relayer/monitor/beholder.go b/relayer/monitor/beholder.go index bf230ff7..e86d3a62 100644 --- a/relayer/monitor/beholder.go +++ b/relayer/monitor/beholder.go @@ -97,3 +97,15 @@ func (e *protoEmitter) appendAttrsRequired(ctx context.Context, m proto.Message, attrKVs = appendRequiredAttrDomain(m, attrKVs) return attrKVs } + +type NoopProtoEmitter struct{} + +func (n NoopProtoEmitter) Emit(ctx context.Context, m proto.Message, attrKVs ...any) error { + return nil +} + +func (n NoopProtoEmitter) EmitWithLog(ctx context.Context, m proto.Message, attrKVs ...any) error { + return nil +} + +var _ ProtoEmitter = (*NoopProtoEmitter)(nil) diff --git a/relayer/monitoring/pb/data-feeds/on-chain/registry/decode.go b/relayer/monitoring/pb/data-feeds/on-chain/registry/decode.go index dd30e260..0e25db3e 100644 --- a/relayer/monitoring/pb/data-feeds/on-chain/registry/decode.go +++ b/relayer/monitoring/pb/data-feeds/on-chain/registry/decode.go @@ -8,6 +8,7 @@ import ( wt_msg "github.com/smartcontractkit/chainlink-aptos/relayer/monitoring/pb/platform/write-target" 
"github.com/smartcontractkit/chainlink-aptos/relayer/report/data_feeds" + "github.com/smartcontractkit/chainlink-aptos/relayer/report/llo" "github.com/smartcontractkit/chainlink-aptos/relayer/report/platform" mercury_vX "github.com/smartcontractkit/chainlink-aptos/relayer/report/mercury/common" @@ -25,139 +26,122 @@ func DecodeAsFeedUpdated(m *wt_msg.WriteConfirmed) ([]*FeedUpdated, error) { } // Decode the underlying Data Feeds reports - reports, err := data_feeds.Decode(r.Data) - if err != nil { - return nil, fmt.Errorf("failed to decode Data Feeds report: %w", err) - } - - // Allocate space for the messages (event per updated feed) - msgs := make([]*FeedUpdated, 0, len(*reports)) - - // Iterate over the underlying Mercury reports - for _, rf := range *reports { - // Notice: we assume that Mercury will be the only source of reports used for Data Feeds, - // at least for the foreseeable future. If this assumption changes, we should check the - // the report type here (potentially encoded in the feed ID) and decode accordingly. + mercuryReports, dfErr := data_feeds.Decode(r.Data) - // Decode the common Mercury report - rm, err := mercury_vX.Decode(rf.Data) - if err != nil { - return nil, fmt.Errorf("failed to decode Mercury report: %w", err) - } + // HACK: to check if the report is a Mercury report or an LLO report, this will be removed + // when the generalized Write Target is completed, as it will allow report schemas to be defined + // in the workflow and passed to the Write Target. 
+ if dfErr == nil { + msgs := make([]*FeedUpdated, 0, len(*mercuryReports)) - // Parse the report type - t := mercury_vX.GetReportType(rm.FeedId) + // Allocate space for the messages (event per updated feed) - // Notice: we publish the DataFeed FeedID, not the unrelying DataStream FeedID - feedID := data_feeds.FeedID(rf.FeedId) + // Iterate over the underlying Mercury reports + for _, rf := range *mercuryReports { + // Notice: we assume that Mercury will be the only source of reports used for Data Feeds, + // at least for the foreseeable future. If this assumption changes, we should check the + // the report type here (potentially encoded in the feed ID) and decode accordingly. - switch t { - case uint16(3): - rm, err := mercury_v3.Decode(rf.Data) + // Decode the common Mercury report + rm, err := mercury_vX.Decode(rf.Data) if err != nil { - return nil, fmt.Errorf("failed to decode Mercury v%d report: %w", t, err) + return nil, fmt.Errorf("failed to decode Mercury report: %w", err) } - msgs = append(msgs, &FeedUpdated{ - // Event data - FeedId: feedID.String(), - ObservationsTimestamp: rm.ObservationsTimestamp, - Benchmark: rm.BenchmarkPrice.Bytes(), // Map big.Int as []byte - Report: rf.Data, - - // Notice: i192 will not fit if scaled number bigger than f64 - BenchmarkVal: toBenchmarkVal(feedID, rm.BenchmarkPrice), - - // Head data - when was the event produced on-chain - BlockHash: m.BlockHash, - BlockHeight: m.BlockHeight, - BlockTimestamp: m.BlockTimestamp, - - // Transaction data - info about the tx that mained the event (optional) - // Notice: we skip SOME head/tx data here (unknown), as we map from 'platform.write-target.WriteConfirmed' - // and not from tx/event data (e.g., 'platform.write-target.WriteTxConfirmed') - TxSender: m.Transmitter, - TxReceiver: m.Forwarder, - - // Execution Context - Source - MetaSourceId: m.MetaSourceId, - - // Execution Context - Chain - MetaChainFamilyName: m.MetaChainFamilyName, - MetaChainId: m.MetaChainId, - 
MetaNetworkName: m.MetaNetworkName, - MetaNetworkNameFull: m.MetaNetworkNameFull, - - // Execution Context - Workflow (capabilities.RequestMetadata) - MetaWorkflowId: m.MetaWorkflowId, - MetaWorkflowOwner: m.MetaWorkflowOwner, - MetaWorkflowExecutionId: m.MetaWorkflowExecutionId, - MetaWorkflowName: m.MetaWorkflowName, - MetaWorkflowDonId: m.MetaWorkflowDonId, - MetaWorkflowDonConfigVersion: m.MetaWorkflowDonConfigVersion, - MetaReferenceId: m.MetaReferenceId, - - // Execution Context - Capability - MetaCapabilityType: m.MetaCapabilityType, - MetaCapabilityId: m.MetaCapabilityId, - MetaCapabilityTimestampStart: m.MetaCapabilityTimestampStart, - MetaCapabilityTimestampEmit: m.MetaCapabilityTimestampEmit, - }) - case uint16(4): - rm, err := mercury_v4.Decode(rf.Data) - if err != nil { - return nil, fmt.Errorf("failed to decode Mercury v%d report: %w", t, err) + // Parse the report type + t := mercury_vX.GetReportType(rm.FeedId) + + // Notice: we publish the DataFeed FeedID, not the unrelying DataStream FeedID + feedID := data_feeds.FeedID(rf.FeedId) + + switch t { + case uint16(3): + rm, err := mercury_v3.Decode(rf.Data) + if err != nil { + return nil, fmt.Errorf("failed to decode Mercury v%d report: %w", t, err) + } + + msgs = append(msgs, newFeedUpdated(m, feedID, rm.ObservationsTimestamp, rm.BenchmarkPrice, rf.Data, true)) + case uint16(4): + rm, err := mercury_v4.Decode(rf.Data) + if err != nil { + return nil, fmt.Errorf("failed to decode Mercury v%d report: %w", t, err) + } + + msgs = append(msgs, newFeedUpdated(m, feedID, rm.ObservationsTimestamp, rm.BenchmarkPrice, rf.Data, false)) + default: + return nil, fmt.Errorf("unsupported Mercury report type: %d", t) } - - msgs = append(msgs, &FeedUpdated{ - // Event data - FeedId: feedID.String(), - ObservationsTimestamp: rm.ObservationsTimestamp, - Benchmark: rm.BenchmarkPrice.Bytes(), // Map big.Int as []byte - Report: rf.Data, - - // Notice: i192 will not fit if scaled number bigger than f64 - BenchmarkVal: 
toBenchmarkVal(feedID, rm.BenchmarkPrice), - - // Notice: we skip head/tx data here (unknown), as we map from 'platform.write-target.WriteConfirmed' - // and not from tx/event data (e.g., 'platform.write-target.WriteTxConfirmed') - - BlockHash: m.BlockHash, - BlockHeight: m.BlockHeight, - BlockTimestamp: m.BlockTimestamp, - - // Execution Context - Source - MetaSourceId: m.MetaSourceId, - - // Execution Context - Chain - MetaChainFamilyName: m.MetaChainFamilyName, - MetaChainId: m.MetaChainId, - MetaNetworkName: m.MetaNetworkName, - MetaNetworkNameFull: m.MetaNetworkNameFull, - - // Execution Context - Workflow (capabilities.RequestMetadata) - MetaWorkflowId: m.MetaWorkflowId, - MetaWorkflowOwner: m.MetaWorkflowOwner, - MetaWorkflowExecutionId: m.MetaWorkflowExecutionId, - MetaWorkflowName: m.MetaWorkflowName, - MetaWorkflowDonId: m.MetaWorkflowDonId, - MetaWorkflowDonConfigVersion: m.MetaWorkflowDonConfigVersion, - MetaReferenceId: m.MetaReferenceId, - - // Execution Context - Capability - MetaCapabilityType: m.MetaCapabilityType, - MetaCapabilityId: m.MetaCapabilityId, - MetaCapabilityTimestampStart: m.MetaCapabilityTimestampStart, - MetaCapabilityTimestampEmit: m.MetaCapabilityTimestampEmit, - }) - default: - return nil, fmt.Errorf("unsupported Mercury report type: %d", t) } + return msgs, nil + } + lloReports, lloErr := llo.Decode(r.Data) + if lloErr != nil { + return nil, fmt.Errorf("failed to decode DF Report and LLO Report | DF Err: %w, LLO Err: %w", dfErr, lloErr) } + msgs := make([]*FeedUpdated, 0, len(*lloReports)) + for _, rf := range *lloReports { + msgs = append(msgs, newFeedUpdated(m, rf.RemappedID, rf.Timestamp, rf.Price, []byte{}, false)) + } return msgs, nil } +// newFeedUpdated creates a FeedUpdated from the given common parameters. +// If includeTxInfo is true, TxSender and TxReceiver are set. 
+func newFeedUpdated( + m *wt_msg.WriteConfirmed, + feedID data_feeds.FeedID, + observationsTimestamp uint32, + benchmarkPrice *big.Int, + report []byte, + includeTxInfo bool, +) *FeedUpdated { + fu := &FeedUpdated{ + FeedId: feedID.String(), + ObservationsTimestamp: observationsTimestamp, + Benchmark: benchmarkPrice.Bytes(), + Report: report, + BenchmarkVal: toBenchmarkVal(feedID, benchmarkPrice), + + // Head data - when was the event produced on-chain + BlockHash: m.BlockHash, + BlockHeight: m.BlockHeight, + BlockTimestamp: m.BlockTimestamp, + + // Execution Context - Source + MetaSourceId: m.MetaSourceId, + + // Execution Context - Chain + MetaChainFamilyName: m.MetaChainFamilyName, + MetaChainId: m.MetaChainId, + MetaNetworkName: m.MetaNetworkName, + MetaNetworkNameFull: m.MetaNetworkNameFull, + + // Execution Context - Workflow (capabilities.RequestMetadata) + MetaWorkflowId: m.MetaWorkflowId, + MetaWorkflowOwner: m.MetaWorkflowOwner, + MetaWorkflowExecutionId: m.MetaWorkflowExecutionId, + MetaWorkflowName: m.MetaWorkflowName, + MetaWorkflowDonId: m.MetaWorkflowDonId, + MetaWorkflowDonConfigVersion: m.MetaWorkflowDonConfigVersion, + MetaReferenceId: m.MetaReferenceId, + + // Execution Context - Capability + MetaCapabilityType: m.MetaCapabilityType, + MetaCapabilityId: m.MetaCapabilityId, + MetaCapabilityTimestampStart: m.MetaCapabilityTimestampStart, + MetaCapabilityTimestampEmit: m.MetaCapabilityTimestampEmit, + } + + if includeTxInfo { + fu.TxSender = m.Transmitter + fu.TxReceiver = m.Forwarder + } + + return fu +} + // toBenchmarkVal returns the benchmark i192 on-chain value decoded as an double (float64), scaled by number of decimals (e.g., 1e-18) // Where the number of decimals is extracted from the feed ID. 
// diff --git a/relayer/report/llo/types.go b/relayer/report/llo/types.go new file mode 100644 index 00000000..740d5f21 --- /dev/null +++ b/relayer/report/llo/types.go @@ -0,0 +1,57 @@ +package llo + +import ( + "fmt" + "math/big" + + "github.com/ethereum/go-ethereum/accounts/abi" +) + +// Encoded with: https://github.com/smartcontractkit/chainlink/blob/develop/core/services/relay/evm/cap_encoder.go +type Report struct { + RemappedID [32]byte + Timestamp uint32 + Price *big.Int // *big.Int is used because go-ethereum converts large uints to *big.Int. +} + +type Reports = []Report + +// Define the ABI schema +var schema = GetSchema() + +func GetSchema() abi.Arguments { + // Helper function to simplify error handling when creating new ABI types. + mustNewType := func(typ string, internalType string, components []abi.ArgumentMarshaling) abi.Type { + result, err := abi.NewType(typ, internalType, components) + if err != nil { + panic(fmt.Sprintf("Unexpected error during abi.NewType: %s", err)) + } + return result + } + + return abi.Arguments([]abi.Argument{ + { + // This defines the array of tuple records. + Type: mustNewType("tuple(bytes32,uint32,uint224)[]", "", []abi.ArgumentMarshaling{ + {Name: "remappedID", Type: "bytes32"}, + {Name: "timestamp", Type: "uint32"}, + {Name: "price", Type: "uint224"}, + }), + }, + }) +} + +// Decode decodes the provided ABI-encoded data into a Reports slice. 
+func Decode(data []byte) (*Reports, error) { + values, err := schema.Unpack(data) + if err != nil { + return nil, fmt.Errorf("failed to decode report: %w", err) + } + + var decoded []Report + if err = schema.Copy(&decoded, values); err != nil { + return nil, fmt.Errorf("failed to copy report values to struct: %w", err) + } + + return &decoded, nil +} diff --git a/relayer/report/llo/types_test.go b/relayer/report/llo/types_test.go new file mode 100644 index 00000000..467a1598 --- /dev/null +++ b/relayer/report/llo/types_test.go @@ -0,0 +1,66 @@ +package llo_test + +import ( + "math/big" + "testing" + + "github.com/smartcontractkit/chainlink-aptos/relayer/report/llo" +) + +func TestDecodeFeedReport(t *testing.T) { + // Create some sample records. + original := []llo.Report{ + { + // Example feedID: the first byte is 0x01 and the remainder are zeros. + RemappedID: [32]byte{0x01}, + Price: big.NewInt(1234567890123456789), + Timestamp: 1620000000, + }, + { + RemappedID: [32]byte{0xAA, 0xBB, 0xCC}, + Price: big.NewInt(123), + Timestamp: 1630000000, + }, + } + + // Get the ABI schema from our constructor. + schema := llo.GetSchema() + + // Pack the original data using the ABI schema. + encoded, err := schema.Pack(original) + if err != nil { + t.Fatalf("failed to pack data: %v", err) + } + + // Decode the data using our Decode function. + decoded, err := llo.Decode(encoded) + if err != nil { + t.Fatalf("failed to decode data: %v", err) + } + + // Check that the lengths match. + if len(*decoded) != len(original) { + t.Fatalf("expected %d records, got %d", len(original), len(*decoded)) + } + + // Compare each record field by field. + for i := range original { + origRecord := original[i] + decRecord := (*decoded)[i] + + // Compare FeedID. + if origRecord.RemappedID != decRecord.RemappedID { + t.Errorf("record %d: mismatched FeedID: expected %x, got %x", i, origRecord.RemappedID, decRecord.RemappedID) + } + + // Compare Price using big.Int.Cmp. 
+ if origRecord.Price.Cmp(decRecord.Price) != 0 { + t.Errorf("record %d: mismatched Price: expected %v, got %v", i, origRecord.Price, decRecord.Price) + } + + // Compare Timestamp. + if origRecord.Timestamp != decRecord.Timestamp { + t.Errorf("record %d: mismatched Timestamp: expected %d, got %d", i, origRecord.Timestamp, decRecord.Timestamp) + } + } +} diff --git a/relayer/write_target/aptos/write_target_monitor.go b/relayer/write_target/aptos/write_target_monitor.go index 52af8916..1fe255b0 100644 --- a/relayer/write_target/aptos/write_target_monitor.go +++ b/relayer/write_target/aptos/write_target_monitor.go @@ -30,7 +30,6 @@ const ( func NewAptosWriteTargetMonitor(ctx context.Context, lggr logger.Logger) (*monitor.BeholderClient, error) { // Initialize the Beholder client with a local logger a custom Emitter client := beholder.GetClient().ForPackage("write_target") - registryMetrics, err := registry.NewMetrics() if err != nil { return nil, fmt.Errorf("failed to create new registry metrics: %w", err) diff --git a/relayer/write_target/mocks/chain_service.go b/relayer/write_target/mocks/chain_service.go new file mode 100644 index 00000000..38865dd4 --- /dev/null +++ b/relayer/write_target/mocks/chain_service.go @@ -0,0 +1,93 @@ +// Code generated by mockery v2.53.3. DO NOT EDIT. 
+ +package mocks + +import ( + context "context" + + types "github.com/smartcontractkit/chainlink-common/pkg/types" + mock "github.com/stretchr/testify/mock" +) + +// ChainService is an autogenerated mock type for the chainService type +type ChainService struct { + mock.Mock +} + +type ChainService_Expecter struct { + mock *mock.Mock +} + +func (_m *ChainService) EXPECT() *ChainService_Expecter { + return &ChainService_Expecter{mock: &_m.Mock} +} + +// LatestHead provides a mock function with given fields: ctx +func (_m *ChainService) LatestHead(ctx context.Context) (types.Head, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for LatestHead") + } + + var r0 types.Head + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (types.Head, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) types.Head); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(types.Head) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ChainService_LatestHead_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LatestHead' +type ChainService_LatestHead_Call struct { + *mock.Call +} + +// LatestHead is a helper method to define mock.On call +// - ctx context.Context +func (_e *ChainService_Expecter) LatestHead(ctx interface{}) *ChainService_LatestHead_Call { + return &ChainService_LatestHead_Call{Call: _e.mock.On("LatestHead", ctx)} +} + +func (_c *ChainService_LatestHead_Call) Run(run func(ctx context.Context)) *ChainService_LatestHead_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *ChainService_LatestHead_Call) Return(_a0 types.Head, _a1 error) *ChainService_LatestHead_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ChainService_LatestHead_Call) RunAndReturn(run func(context.Context) (types.Head, 
error)) *ChainService_LatestHead_Call { + _c.Call.Return(run) + return _c +} + +// NewChainService creates a new instance of ChainService. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewChainService(t interface { + mock.TestingT + Cleanup(func()) +}) *ChainService { + mock := &ChainService{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/relayer/write_target/mocks/contract_reader.go b/relayer/write_target/mocks/contract_reader.go new file mode 100644 index 00000000..f9c49580 --- /dev/null +++ b/relayer/write_target/mocks/contract_reader.go @@ -0,0 +1,87 @@ +// Code generated by mockery v2.53.3. DO NOT EDIT. + +package mocks + +import ( + context "context" + + primitives "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + mock "github.com/stretchr/testify/mock" +) + +// ContractReader is an autogenerated mock type for the contractReader type +type ContractReader struct { + mock.Mock +} + +type ContractReader_Expecter struct { + mock *mock.Mock +} + +func (_m *ContractReader) EXPECT() *ContractReader_Expecter { + return &ContractReader_Expecter{mock: &_m.Mock} +} + +// GetLatestValue provides a mock function with given fields: ctx, readIdentifier, confidenceLevel, params, returnVal +func (_m *ContractReader) GetLatestValue(ctx context.Context, readIdentifier string, confidenceLevel primitives.ConfidenceLevel, params interface{}, returnVal interface{}) error { + ret := _m.Called(ctx, readIdentifier, confidenceLevel, params, returnVal) + + if len(ret) == 0 { + panic("no return value specified for GetLatestValue") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, primitives.ConfidenceLevel, interface{}, interface{}) error); ok { + r0 = rf(ctx, readIdentifier, confidenceLevel, params, returnVal) + } else { + r0 = ret.Error(0) + } + + return 
r0 +} + +// ContractReader_GetLatestValue_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestValue' +type ContractReader_GetLatestValue_Call struct { + *mock.Call +} + +// GetLatestValue is a helper method to define mock.On call +// - ctx context.Context +// - readIdentifier string +// - confidenceLevel primitives.ConfidenceLevel +// - params interface{} +// - returnVal interface{} +func (_e *ContractReader_Expecter) GetLatestValue(ctx interface{}, readIdentifier interface{}, confidenceLevel interface{}, params interface{}, returnVal interface{}) *ContractReader_GetLatestValue_Call { + return &ContractReader_GetLatestValue_Call{Call: _e.mock.On("GetLatestValue", ctx, readIdentifier, confidenceLevel, params, returnVal)} +} + +func (_c *ContractReader_GetLatestValue_Call) Run(run func(ctx context.Context, readIdentifier string, confidenceLevel primitives.ConfidenceLevel, params interface{}, returnVal interface{})) *ContractReader_GetLatestValue_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(primitives.ConfidenceLevel), args[3].(interface{}), args[4].(interface{})) + }) + return _c +} + +func (_c *ContractReader_GetLatestValue_Call) Return(_a0 error) *ContractReader_GetLatestValue_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *ContractReader_GetLatestValue_Call) RunAndReturn(run func(context.Context, string, primitives.ConfidenceLevel, interface{}, interface{}) error) *ContractReader_GetLatestValue_Call { + _c.Call.Return(run) + return _c +} + +// NewContractReader creates a new instance of ContractReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewContractReader(t interface { + mock.TestingT + Cleanup(func()) +}) *ContractReader { + mock := &ContractReader{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/relayer/write_target/mocks/contract_writer.go b/relayer/write_target/mocks/contract_writer.go new file mode 100644 index 00000000..7019d2fc --- /dev/null +++ b/relayer/write_target/mocks/contract_writer.go @@ -0,0 +1,149 @@ +// Code generated by mockery v2.53.3. DO NOT EDIT. + +package mocks + +import ( + context "context" + big "math/big" + + mock "github.com/stretchr/testify/mock" + + types "github.com/smartcontractkit/chainlink-common/pkg/types" +) + +// ContractWriter is an autogenerated mock type for the contractWriter type +type ContractWriter struct { + mock.Mock +} + +type ContractWriter_Expecter struct { + mock *mock.Mock +} + +func (_m *ContractWriter) EXPECT() *ContractWriter_Expecter { + return &ContractWriter_Expecter{mock: &_m.Mock} +} + +// GetTransactionStatus provides a mock function with given fields: ctx, transactionID +func (_m *ContractWriter) GetTransactionStatus(ctx context.Context, transactionID string) (types.TransactionStatus, error) { + ret := _m.Called(ctx, transactionID) + + if len(ret) == 0 { + panic("no return value specified for GetTransactionStatus") + } + + var r0 types.TransactionStatus + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (types.TransactionStatus, error)); ok { + return rf(ctx, transactionID) + } + if rf, ok := ret.Get(0).(func(context.Context, string) types.TransactionStatus); ok { + r0 = rf(ctx, transactionID) + } else { + r0 = ret.Get(0).(types.TransactionStatus) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, transactionID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ContractWriter_GetTransactionStatus_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 
'GetTransactionStatus' +type ContractWriter_GetTransactionStatus_Call struct { + *mock.Call +} + +// GetTransactionStatus is a helper method to define mock.On call +// - ctx context.Context +// - transactionID string +func (_e *ContractWriter_Expecter) GetTransactionStatus(ctx interface{}, transactionID interface{}) *ContractWriter_GetTransactionStatus_Call { + return &ContractWriter_GetTransactionStatus_Call{Call: _e.mock.On("GetTransactionStatus", ctx, transactionID)} +} + +func (_c *ContractWriter_GetTransactionStatus_Call) Run(run func(ctx context.Context, transactionID string)) *ContractWriter_GetTransactionStatus_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *ContractWriter_GetTransactionStatus_Call) Return(_a0 types.TransactionStatus, _a1 error) *ContractWriter_GetTransactionStatus_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ContractWriter_GetTransactionStatus_Call) RunAndReturn(run func(context.Context, string) (types.TransactionStatus, error)) *ContractWriter_GetTransactionStatus_Call { + _c.Call.Return(run) + return _c +} + +// SubmitTransaction provides a mock function with given fields: ctx, contractName, method, args, transactionID, toAddress, meta, value +func (_m *ContractWriter) SubmitTransaction(ctx context.Context, contractName string, method string, args interface{}, transactionID string, toAddress string, meta *types.TxMeta, value *big.Int) error { + ret := _m.Called(ctx, contractName, method, args, transactionID, toAddress, meta, value) + + if len(ret) == 0 { + panic("no return value specified for SubmitTransaction") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, interface{}, string, string, *types.TxMeta, *big.Int) error); ok { + r0 = rf(ctx, contractName, method, args, transactionID, toAddress, meta, value) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ContractWriter_SubmitTransaction_Call 
is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SubmitTransaction' +type ContractWriter_SubmitTransaction_Call struct { + *mock.Call +} + +// SubmitTransaction is a helper method to define mock.On call +// - ctx context.Context +// - contractName string +// - method string +// - args interface{} +// - transactionID string +// - toAddress string +// - meta *types.TxMeta +// - value *big.Int +func (_e *ContractWriter_Expecter) SubmitTransaction(ctx interface{}, contractName interface{}, method interface{}, args interface{}, transactionID interface{}, toAddress interface{}, meta interface{}, value interface{}) *ContractWriter_SubmitTransaction_Call { + return &ContractWriter_SubmitTransaction_Call{Call: _e.mock.On("SubmitTransaction", ctx, contractName, method, args, transactionID, toAddress, meta, value)} +} + +func (_c *ContractWriter_SubmitTransaction_Call) Run(run func(ctx context.Context, contractName string, method string, args interface{}, transactionID string, toAddress string, meta *types.TxMeta, value *big.Int)) *ContractWriter_SubmitTransaction_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(interface{}), args[4].(string), args[5].(string), args[6].(*types.TxMeta), args[7].(*big.Int)) + }) + return _c +} + +func (_c *ContractWriter_SubmitTransaction_Call) Return(_a0 error) *ContractWriter_SubmitTransaction_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *ContractWriter_SubmitTransaction_Call) RunAndReturn(run func(context.Context, string, string, interface{}, string, string, *types.TxMeta, *big.Int) error) *ContractWriter_SubmitTransaction_Call { + _c.Call.Return(run) + return _c +} + +// NewContractWriter creates a new instance of ContractWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. 
+func NewContractWriter(t interface { + mock.TestingT + Cleanup(func()) +}) *ContractWriter { + mock := &ContractWriter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/relayer/write_target/write_target.go b/relayer/write_target/write_target.go index b89a018b..a640fd29 100644 --- a/relayer/write_target/write_target.go +++ b/relayer/write_target/write_target.go @@ -6,6 +6,7 @@ import ( "context" "encoding/binary" "encoding/hex" + "errors" "fmt" "math/big" "time" @@ -45,6 +46,19 @@ const ( ContractMethodName_getTransmitter = "getTransmitter" ) +type chainService interface { + LatestHead(ctx context.Context) (commontypes.Head, error) +} + +type contractReader interface { + GetLatestValue(ctx context.Context, readIdentifier string, confidenceLevel primitives.ConfidenceLevel, params, returnVal any) error +} + +type contractWriter interface { + SubmitTransaction(ctx context.Context, contractName, method string, args any, transactionID string, toAddress string, meta *commontypes.TxMeta, value *big.Int) error + GetTransactionStatus(ctx context.Context, transactionID string) (commontypes.TransactionStatus, error) +} + type writeTarget struct { capabilities.CapabilityInfo @@ -55,10 +69,11 @@ type writeTarget struct { // Local beholder client, also hosting the protobuf emitter beholder *monitor.BeholderClient - cs commontypes.ChainService - cr commontypes.ContractReader - cw commontypes.ContractWriter + cs chainService + cr contractReader + cw contractWriter configValidateFn func(config ReqConfig) error + decodeReport func(report []byte, metadata capabilities.RequestMetadata) (*platform.Report, error) nodeAddress string forwarderAddress string @@ -76,9 +91,9 @@ type WriteTargetOpts struct { Logger logger.Logger Beholder *monitor.BeholderClient - ChainService commontypes.ChainService - ContractReader commontypes.ContractReader - ChainWriter commontypes.ContractWriter + ChainService chainService + ContractReader 
contractReader + ChainWriter contractWriter ConfigValidateFn func(config ReqConfig) error NodeAddress string @@ -116,6 +131,10 @@ func NewWriteTargetID(chainFamilyName, networkName, chainID, version string) (st // TODO: opts.Config input is not validated for sanity func NewWriteTarget(opts WriteTargetOpts) capabilities.TargetCapability { + return newWriteTarget(opts) +} + +func newWriteTarget(opts WriteTargetOpts) *writeTarget { capInfo := capabilities.MustNewCapabilityInfo(opts.ID, capabilities.CapabilityTypeTarget, CapabilityName) return &writeTarget{ @@ -128,6 +147,7 @@ func NewWriteTarget(opts WriteTargetOpts) capabilities.TargetCapability { opts.ContractReader, opts.ChainWriter, opts.ConfigValidateFn, + decodeReport, opts.NodeAddress, opts.ForwarderAddress, } @@ -230,18 +250,9 @@ func (c *writeTarget) Execute(ctx context.Context, request capabilities.Capabili } // Decode the report - reportDecoded, err := platform.Decode(inputs.Report) + reportDecoded, err := c.decodeReport(inputs.Report, request.Metadata) if err != nil { - msg := builder.buildWriteError(info, 0, "failed to decode the report", err.Error()) - return capabilities.CapabilityResponse{}, c.asEmittedError(ctx, msg) - } - - // Validate encoded report is prefixed with workflowID and executionID that match the request meta - if reportDecoded.ExecutionID != request.Metadata.WorkflowExecutionID { - msg := builder.buildWriteError(info, 0, "decoded report execution ID does not match the request", "") - return capabilities.CapabilityResponse{}, c.asEmittedError(ctx, msg) - } else if reportDecoded.WorkflowID != request.Metadata.WorkflowID { - msg := builder.buildWriteError(info, 0, "decoded report workflow ID does not match the request", "") + msg := builder.buildWriteError(info, 0, "report is invalid", err.Error()) return capabilities.CapabilityResponse{}, c.asEmittedError(ctx, msg) } @@ -395,11 +406,30 @@ func (c *writeTarget) Execute(ctx context.Context, request capabilities.Capabili // TODO: implement a 
background WriteTxConfirmer to periodically source new events/transactions, // relevant to this forwarder), and emit write-tx-accepted/confirmed events. - - go c.acceptAndConfirmWrite(ctx, *info, txID, query) + err = c.acceptAndConfirmWrite(ctx, *info, txID, query) + if err != nil { + return capabilities.CapabilityResponse{}, err + } return success(), nil } +func decodeReport(report []byte, metadata capabilities.RequestMetadata) (*platform.Report, error) { + // Decode the report + reportDecoded, err := platform.Decode(report) + if err != nil { + return nil, fmt.Errorf("failed to decode report [%s]: %w", string(report), err) + } + + // Validate encoded report is prefixed with workflowID and executionID that match the request meta + if reportDecoded.ExecutionID != metadata.WorkflowExecutionID { + return nil, errors.New("decoded report execution ID does not match the request") + } else if reportDecoded.WorkflowID != metadata.WorkflowID { + return nil, errors.New("decoded report workflow ID does not match the request") + } + + return reportDecoded, nil +} + func (c *writeTarget) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { // TODO: notify the background WriteTxConfirmer (workflow registered) return nil @@ -416,7 +446,7 @@ func (c *writeTarget) UnregisterFromWorkflow(ctx context.Context, request capabi // - 'platform.write-target.WriteAccepted' if accepted (with or without an error) // - 'platform.write-target.WriteError' if accepted (with an error) // - 'platform.write-target.WriteConfirmed' if confirmed (until timeout) -func (c *writeTarget) acceptAndConfirmWrite(ctx context.Context, info requestInfo, txID uuid.UUID, query func(context.Context) (*TransmissionState, error)) { +func (c *writeTarget) acceptAndConfirmWrite(ctx context.Context, info requestInfo, txID uuid.UUID, query func(context.Context) (*TransmissionState, error)) error { attrs := c.traceAttributes(info.request.Metadata.WorkflowExecutionID) _, span := 
c.beholder.Tracer.Start(ctx, "Execute.acceptAndConfirmWrite", trace.WithAttributes(attrs...)) defer span.End() @@ -425,7 +455,7 @@ func (c *writeTarget) acceptAndConfirmWrite(ctx context.Context, info requestInf // Timeout for the confirmation process timeout := c.config.ConfirmerTimeout.Duration() - ctx, cancel := context.WithTimeout(context.Background(), timeout) + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() // Retry interval for the confirmation process @@ -438,37 +468,28 @@ func (c *writeTarget) acceptAndConfirmWrite(ctx context.Context, info requestInf capInfo, _ := c.Info(ctx) builder := NewMessageBuilder(c.chainInfo, capInfo) - // Fn helpers - checkAcceptedStatus := func(ctx context.Context) (commontypes.TransactionStatus, bool, error) { - // Check TXM for status - status, err := c.cw.GetTransactionStatus(ctx, txID.String()) - if err != nil { - return commontypes.Unknown, false, fmt.Errorf("failed to get tx status: %w", err) - } - - lggr.Debugw("txm - tx status", "txID", txID, "status", status) - - // Check if the transaction was accepted (included in a chain block, not required to be finalized) - // Notice: 'Unconfirmed' is used by TXM to indicate the transaction is not yet included in a block, - // while 'Included' (N/A yet) could be used to indicate the transaction is included in a block but not yet finalized. 
- if /* status == commontypes.Included || */ status == commontypes.Finalized { - return status, true, nil - } - - // false if [Unknown, Pending, Failed, Fatal] - return status, false, nil + txFinalized, err := c.waitTxReachesTerminalStatus(ctx, lggr, txID) + if err != nil { + // We (eventually) failed to confirm the report was transmitted + msg := builder.buildWriteError(&info, 0, "failed to wait until tx gets finalized", err.Error()) + lggr.Errorw("failed to wait until tx gets finalized", "txID", txID, "error", err) + _ = c.beholder.ProtoEmitter.Emit(ctx, msg) + return msg.AsError() } - checkConfirmedStatus := query - // Store the acceptance status - accepted := false + checkConfirmedStatus := query for { select { case <-ctx.Done(): // We (eventually) failed to confirm the report was transmitted - _ = c.beholder.ProtoEmitter.EmitWithLog(ctx, builder.buildWriteError(&info, 0, "write confirmation - failed", "timed out")) - return + cause := "transaction was finalized, but report was not observed on chain before timeout" + if !txFinalized { + cause = "transaction failed and no other node managed to get report on chain before timeout" + } + msg := builder.buildWriteError(&info, 0, "write confirmation - failed", cause) + _ = c.beholder.ProtoEmitter.EmitWithLog(ctx, msg) + return msg.AsError() case <-ticker.C: // Fetch the latest head from the chain (timestamp) head, err := c.cs.LatestHead(ctx) @@ -477,32 +498,6 @@ func (c *writeTarget) acceptAndConfirmWrite(ctx context.Context, info requestInf continue } - if !accepted { - // Check acceptance status - status, accepted, err := checkAcceptedStatus(ctx) - if err != nil { - lggr.Errorw("failed to check accepted status", "txID", txID, "err", err) - continue - } - - if !accepted { - lggr.Infow("not accepted yet", "txID", txID, "status", status) - continue - } - - lggr.Infow("accepted", "txID", txID, "status", status) - // Notice: report write confirmation is only possible after a tx is accepted without an error - // TODO: 
[Beholder] Emit 'platform.write-target.WriteAccepted' (useful to source tx hash, block number, and tx status/error) - - // TODO: check if accepted with an error (e.g., on-chain revert) - // Notice: this functionality is not available in the current CW/TXM API - acceptedWithErr := false - if acceptedWithErr { //nolint:staticcheck - // TODO: [Beholder] Emit 'platform.write-target.WriteError' if accepted with an error (surface specific on-chain error) - // Notice: no return, we continue to check for confirmation (tx could be accepted by another node) - } - } - // Check confirmation status (transmission state) state, err := checkConfirmedStatus(ctx) if err != nil { @@ -517,13 +512,57 @@ func (c *writeTarget) acceptAndConfirmWrite(ctx context.Context, info requestInf // We (eventually) confirmed the report was transmitted // Emit the confirmation message and return - lggr.Infow("confirmed - transmission state visible", "txID", txID) + if !txFinalized { + lggr.Infow("confirmed - transmission state visible but submitted by another node. 
This node's tx failed", "txID", txID) + } else { + lggr.Infow("confirmed - transmission state visible", "txID", txID) + } // Source the transmitter address from the on-chain state info.reportTransmissionState = state _ = c.beholder.ProtoEmitter.EmitWithLog(ctx, builder.buildWriteConfirmed(&info, head)) - return + + return nil + } + } +} + +// Polls transaction status until it reaches one of terminal states [Finalized, Failed, Fatal] +func (c *writeTarget) waitTxReachesTerminalStatus(ctx context.Context, lggr logger.Logger, txID uuid.UUID) (finalized bool, err error) { + // Retry interval for the confirmation process + interval := c.config.ConfirmerPollPeriod.Duration() + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return false, ctx.Err() + case <-ticker.C: + // Check TXM for status + status, err := c.cw.GetTransactionStatus(ctx, txID.String()) + if err != nil { + lggr.Errorw("failed to fetch the transaction status", "txID", txID, "err", err) + continue + } + + lggr.Debugw("txm - tx status", "txID", txID, "status", status) + + switch status { + case commontypes.Finalized: + // Notice: report write confirmation is only possible after a tx is accepted without an error + // TODO: [Beholder] Emit 'platform.write-target.WriteAccepted' (useful to source tx hash, block number, and tx status/error) + lggr.Infow("accepted", "txID", txID, "status", status) + return true, nil + case commontypes.Failed, commontypes.Fatal: + // TODO: [Beholder] Emit 'platform.write-target.WriteError' if accepted with an error (surface specific on-chain error) + lggr.Infow("transaction failed", "txID", txID, "status", status) + return false, nil + default: + lggr.Infow("not accepted yet", "txID", txID, "status", status) + continue + } } } } diff --git a/relayer/write_target/write_target_test.go b/relayer/write_target/write_target_test.go index 1f5c3193..8180e531 100644 --- a/relayer/write_target/write_target_test.go +++ 
b/relayer/write_target/write_target_test.go @@ -1,9 +1,28 @@ package write_target import ( + "context" + "encoding/binary" + "encoding/hex" "testing" + "time" + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/consensus/ocr3/types" + "github.com/smartcontractkit/chainlink-common/pkg/config" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + commontypes "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + + "github.com/smartcontractkit/chainlink-aptos/relayer/monitor" + "github.com/smartcontractkit/chainlink-aptos/relayer/report/platform" + "github.com/smartcontractkit/chainlink-aptos/relayer/write_target/mocks" ) func TestNewWriteTargetID(t *testing.T) { @@ -95,3 +114,169 @@ func TestNewWriteTargetID(t *testing.T) { }) } } + +type mockedWriteTarget struct { + cs *mocks.ChainService + cr *mocks.ContractReader + cw *mocks.ContractWriter + wt *writeTarget +} + +func newMockedWriteTarget(t *testing.T, lggr logger.Logger) mockedWriteTarget { + cs := mocks.NewChainService(t) + cr := mocks.NewContractReader(t) + cw := mocks.NewContractWriter(t) + beholderClient, err := beholder.NewStdoutClient() + require.NoError(t, err) + bh := &monitor.BeholderClient{Client: beholderClient, ProtoEmitter: monitor.NoopProtoEmitter{}} + require.NoError(t, err) + + wt := newWriteTarget(WriteTargetOpts{ + ID: "write_aptos-1@1.0.0", + Config: Config{ + ConfirmerPollPeriod: *config.MustNewDuration(100 * time.Millisecond), + ConfirmerTimeout: *config.MustNewDuration(300 * time.Millisecond), + }, + ChainInfo: ChainInfo{}, + Logger: lggr, + 
Beholder: bh, + ChainService: cs, + ContractReader: cr, + ChainWriter: cw, + ConfigValidateFn: func(config ReqConfig) error { return nil }, + NodeAddress: "", + ForwarderAddress: "", + }) + wt.decodeReport = func(report []byte, metadata capabilities.RequestMetadata) (*platform.Report, error) { + return &platform.Report{}, nil + } + return mockedWriteTarget{ + cs: cs, + cr: cr, + cw: cw, + wt: wt, + } +} + +func createValidRequest(t *testing.T) capabilities.CapabilityRequest { + signedReport, err := values.Wrap(types.SignedReport{ + ID: binary.BigEndian.AppendUint16(nil, 8), + Report: []byte("Report payload"), // no need to include valid metadata, since report validation is mocked + }) + require.NoError(t, err) + inputs, err := values.NewMap(map[string]any{ + KeySignedReport: signedReport, + }) + require.NoError(t, err) + return capabilities.CapabilityRequest{ + Metadata: capabilities.RequestMetadata{ + WorkflowExecutionID: hex.EncodeToString([]byte("WorkflowExecutionID")), + }, + Config: values.EmptyMap(), + Inputs: inputs, + } +} + +func TestWriteTarget_Execute(t *testing.T) { + t.Parallel() + t.Run("Returns error if tx is not finalized before timeout", func(t *testing.T) { + mockedWT := newMockedWriteTarget(t, logger.Test(t)) + mockedWT.cs.EXPECT().LatestHead(mock.Anything).Return(commontypes.Head{}, nil).Once() + // Mocks getTransmissionState. Signal that report was not transmitted to trigger creation of a new transaction. 
+ mockedWT.cr.EXPECT().GetLatestValue(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + // ContractWriter accepts transaction + mockedWT.cw.EXPECT().SubmitTransaction(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + // Transaction never reaches terminal state + mockedWT.cw.EXPECT().GetTransactionStatus(mock.Anything, mock.Anything).Return(commontypes.Pending, nil) + + request := createValidRequest(t) + _, err := mockedWT.wt.Execute(t.Context(), request) + require.EqualError(t, err, "platform.write_target.WriteError [ERR-0] - failed to wait until tx gets finalized: context deadline exceeded") + }) + t.Run("Returns error if tx reaches terminal status, but report is not on chain", func(t *testing.T) { + testCases := []struct { + TransactionStatus commontypes.TransactionStatus + ExpectedError string + }{ + { + TransactionStatus: commontypes.Finalized, + ExpectedError: "platform.write_target.WriteError [ERR-0] - write confirmation - failed: transaction was finalized, but report was not observed on chain before timeout", + }, + { + TransactionStatus: commontypes.Fatal, + ExpectedError: "platform.write_target.WriteError [ERR-0] - write confirmation - failed: transaction failed and no other node managed to get report on chain before timeout", + }, + { + TransactionStatus: commontypes.Failed, + ExpectedError: "platform.write_target.WriteError [ERR-0] - write confirmation - failed: transaction failed and no other node managed to get report on chain before timeout", + }, + } + for _, tc := range testCases { + mockedWT := newMockedWriteTarget(t, logger.Test(t)) + mockedWT.cs.EXPECT().LatestHead(mock.Anything).Return(commontypes.Head{}, nil) + // Mocks getTransmissionState. Since return value is not modified - signals that report was not accepted. 
+ // First call is required to trigger transaction submission; subsequent calls cause the timeout error + mockedWT.cr.EXPECT().GetLatestValue(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + // ContractWriter accepts transaction + mockedWT.cw.EXPECT().SubmitTransaction(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + // Returns terminal transaction status + mockedWT.cw.EXPECT().GetTransactionStatus(mock.Anything, mock.Anything).Return(tc.TransactionStatus, nil).Once() + + request := createValidRequest(t) + _, err := mockedWT.wt.Execute(t.Context(), request) + require.EqualError(t, err, tc.ExpectedError) + } + }) + t.Run("Returns success if report is on chains", func(t *testing.T) { + testCases := []struct { + TransactionStatus commontypes.TransactionStatus + ExpectedLogMsg string + }{ + { + TransactionStatus: commontypes.Finalized, + ExpectedLogMsg: "confirmed - transmission state visible", + }, + { + TransactionStatus: commontypes.Fatal, + ExpectedLogMsg: "confirmed - transmission state visible but submitted by another node. This node's tx failed", + }, + { + TransactionStatus: commontypes.Failed, + ExpectedLogMsg: "confirmed - transmission state visible but submitted by another node. This node's tx failed", + }, + } + for _, tc := range testCases { + lggr, observed := logger.TestObserved(t, zapcore.InfoLevel) + mockedWT := newMockedWriteTarget(t, lggr) + mockedWT.cs.EXPECT().LatestHead(mock.Anything).Return(commontypes.Head{Height: "12"}, nil) + secondCall := false + // On the first call, trigger transaction submission by setting transmitted to `false`; on the second call return + // true to signal that report is on chain.
+ mockedWT.cr.EXPECT().GetLatestValue(mock.Anything, "-forwarder-getTransmissionState", mock.Anything, mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, s string, level primitives.ConfidenceLevel, inputs interface{}, rawTransmitted interface{}) error { + transmitted := rawTransmitted.(*bool) + *transmitted = secondCall // return false on the first call to trigger transaction + secondCall = true + return nil + }).Twice() + // Returns address of the report transmitter + mockedWT.cr.EXPECT().GetLatestValue(mock.Anything, "-forwarder-getTransmitter", mock.Anything, mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, s string, level primitives.ConfidenceLevel, inputs interface{}, rawTransmitterAddr interface{}) error { + transmitterAddr := rawTransmitterAddr.(*struct { + Vec []string + }) + transmitterAddr.Vec = []string{"0x0abc"} + return nil + }).Once() + // signal that transaction was accepted by CW + mockedWT.cw.EXPECT().SubmitTransaction(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + // signal that transaction is in terminal state and it's time to poll for transmission status + mockedWT.cw.EXPECT().GetTransactionStatus(mock.Anything, mock.Anything).Return(tc.TransactionStatus, nil) + request := createValidRequest(t) + result, err := mockedWT.wt.Execute(t.Context(), request) + require.NoError(t, err) + require.Equal(t, success(), result) + tests.RequireLogMessage(t, observed, tc.ExpectedLogMsg) + } + }) +} diff --git a/shell.nix b/shell.nix index 4444d1da..55e0342a 100644 --- a/shell.nix +++ b/shell.nix @@ -11,10 +11,28 @@ go_1_23 gopls delve - (golangci-lint.override {buildGoModule = buildGo122Module;}) + # override to lock 1.64 version that is currently used by CI + (golangci-lint.overrideAttrs (old: rec { + version = "1.64.8"; + src = fetchFromGitHub { + owner = "golangci"; + repo = "golangci-lint"; + rev = "v${version}"; + hash 
= "sha256-H7IdXAleyzJeDFviISitAVDNJmiwrMysYcGm6vAoWso="; + }; + vendorHash = "sha256-i7ec4U4xXmRvHbsDiuBjbQ0xP7xRuilky3gi+dT1H10="; + + ldflags = [ + "-s" + "-X main.version=${version}" + "-X main.commit=v${version}" + "-X main.date=19700101-00:00:00" + ]; + })) gotools # Official golang implementation of the Ethereum protocol (e.g., geth, abigen, rlpdump, etc.) go-ethereum + go-mockery # Protobuf + plugins/tools protobuf