diff --git a/ecommerce/processes/.mutant.yml b/ecommerce/processes/.mutant.yml index bad506a6..c082e82b 100644 --- a/ecommerce/processes/.mutant.yml +++ b/ecommerce/processes/.mutant.yml @@ -11,13 +11,10 @@ matcher: - Processes::Configuration* - Processes::OrderConfirmation#stream_name - Processes::Test* - - Processes::ShipmentProcess* - Processes::ReleasePaymentProcess* - Processes::OrderItemInvoicingProcess* - Processes::SyncShipmentFromPricing* - Processes::SyncInventoryFromOrdering* - Processes::NotifyPaymentsAboutOrderValue* - Processes::ThreePlusOneFree* - - Processes::ReservationProcess* - - Processes::ReservationProcess::ProcessState#call - Processes::DetermineVatRatesOnOrderPlaced* diff --git a/ecommerce/processes/lib/processes.rb b/ecommerce/processes/lib/processes.rb index 6a4f86f2..fb7216b7 100644 --- a/ecommerce/processes/lib/processes.rb +++ b/ecommerce/processes/lib/processes.rb @@ -44,7 +44,10 @@ def call(event_store, command_bus) private def enable_shipment_process(event_store, command_bus) - ShipmentProcess.new(event_store, command_bus) + event_store.subscribe( + ShipmentProcess.new(event_store, command_bus), + to: ShipmentProcess.subscribed_events + ) end def enable_shipment_sync(event_store, command_bus) diff --git a/ecommerce/processes/lib/processes/reservation_process.rb b/ecommerce/processes/lib/processes/reservation_process.rb index 53ca6296..77368417 100644 --- a/ecommerce/processes/lib/processes/reservation_process.rb +++ b/ecommerce/processes/lib/processes/reservation_process.rb @@ -1,7 +1,5 @@ module Processes class ReservationProcess - include Infra::Retry - class SomeInventoryNotAvailable < StandardError attr_reader :unavailable_products @@ -10,112 +8,94 @@ def initialize(unavailable_products) end end - def initialize(event_store, command_bus) - @event_store = event_store - @command_bus = command_bus + class ProcessState < Data.define(:order, :order_lines) + def initialize(order: nil, order_lines: nil) = super + + def reserved_product_ids = order_lines.keys end - attr_accessor :event_store, :command_bus - def call(event) - state = build_state(event) + include Infra::ProcessManager.with_state(ProcessState) + + subscribes_to( + Pricing::OfferAccepted, + Fulfillment::OrderCancelled, + Fulfillment::OrderConfirmed + ) + + def apply(event) case event when Pricing::OfferAccepted - update_order_state(state) { reserve_stock(state) } + state.with( + order: :accepted, + order_lines: event.data.fetch(:order_lines).map { |ol| [ol.fetch(:product_id), ol.fetch(:quantity)] }.to_h + ) when Fulfillment::OrderCancelled - release_stock(state) + state.with(order: :cancelled) when Fulfillment::OrderConfirmed - dispatch_stock(state) + state.with(order: :confirmed) + end + end + + def act + case state + in order: :accepted + begin + reserve_stock + rescue SomeInventoryNotAvailable => exc + reject_order(exc.unavailable_products) + else + accept_order + end + in order: :cancelled + release_stock(state.reserved_product_ids) + in order: :confirmed + dispatch_stock + else end end private - def reserve_stock(state) + def reserve_stock unavailable_products = [] + reserved_products = [] state.order_lines.each do |product_id, quantity| command_bus.(Inventory::Reserve.new(product_id: product_id, quantity: quantity)) - state.product_reserved(product_id) + reserved_products << product_id rescue Inventory::InventoryEntry::InventoryNotAvailable unavailable_products << product_id end - unless unavailable_products.empty? - release_stock(state) + if unavailable_products.any? + release_stock(reserved_products) raise SomeInventoryNotAvailable.new(unavailable_products) end end - def update_order_state(state) - yield - accept_order(state) - rescue SomeInventoryNotAvailable => exc - reject_order(state, exc.unavailable_products) - end - - def release_stock(state) - state.order_lines.slice(*state.reserved_product_ids).each do |product_id, quantity| + def release_stock(product_ids) + state.order_lines.slice(*product_ids).each do |product_id, quantity| command_bus.(Inventory::Release.new(product_id: product_id, quantity: quantity)) end end - def dispatch_stock(state) + def dispatch_stock state.order_lines.each do |product_id, quantity| command_bus.(Inventory::Dispatch.new(product_id: product_id, quantity: quantity)) end end - def accept_order(state) - command_bus.(Fulfillment::RegisterOrder.new(order_id: state.order_id)) + def accept_order + command_bus.(Fulfillment::RegisterOrder.new(order_id: id)) end - def reject_order(state, unavailable_product_ids) + def reject_order(unavailable_product_ids) command_bus.(Pricing::RejectOffer.new( - order_id: state.order_id, reason: "Some products were unavailable", unavailable_product_ids: ) + order_id: id, reason: "Some products were unavailable", unavailable_product_ids:) ) end - def stream_name(order_id) - "ReservationProcess$#{order_id}" - end - - def build_state(event) - stream_name = stream_name(event.data.fetch(:order_id)) - past_events = nil - begin - with_retry do - past_events = event_store.read.stream(stream_name).to_a - last_stored = past_events.size - 1 - event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored) - end - rescue RubyEventStore::EventDuplicatedInStream - return - end - ProcessState.new.tap do |state| - past_events.each { |ev| state.call(ev) } - state.call(event) - end - end - - class ProcessState - def initialize() - @reserved_product_ids = [] - end - - attr_reader :order_id, :order_lines, :reserved_product_ids - - def call(event) - case event - when Pricing::OfferAccepted - @order_lines = event.data.fetch(:order_lines).map { |ol| [ol.fetch(:product_id), ol.fetch(:quantity)] }.to_h - @order_id = event.data.fetch(:order_id) - when Fulfillment::OrderCancelled, Fulfillment::OrderConfirmed - @reserved_product_ids = order_lines.keys - end - end - - def product_reserved(product_id) - reserved_product_ids << product_id - end + def fetch_id(event) + event.data.fetch(:order_id) end end end diff --git a/ecommerce/processes/lib/processes/shipment_process.rb b/ecommerce/processes/lib/processes/shipment_process.rb index ea645b93..d22c9d8f 100644 --- a/ecommerce/processes/lib/processes/shipment_process.rb +++ b/ecommerce/processes/lib/processes/shipment_process.rb @@ -1,83 +1,51 @@ module Processes class ShipmentProcess - include Infra::Retry - - def initialize(event_store, command_bus) - @event_store = event_store - @command_bus = command_bus - @event_store.subscribe( - self, - to: [ - Shipping::ShippingAddressAddedToShipment, - Shipping::ShipmentSubmitted, - Fulfillment::OrderRegistered, - Fulfillment::OrderConfirmed - ] - ) - end - - def call(event) - state = build_state(event) - submit_shipment(state) if state.submit? - authorize_shipment(state) if state.authorize? + class ProcessState < Data.define(:order, :shipment) + def initialize(order: nil, shipment: nil) = super end - private - - def submit_shipment(state) - command_bus.call(Shipping::SubmitShipment.new(order_id: state.order_id)) - end + include Infra::ProcessManager.with_state(ProcessState) - def authorize_shipment(state) - command_bus.call(Shipping::AuthorizeShipment.new(order_id: state.order_id)) - end + subscribes_to( + Shipping::ShippingAddressAddedToShipment, + Fulfillment::OrderRegistered, + Fulfillment::OrderConfirmed + ) - attr_reader :command_bus, :event_store + private - def build_state(event) - with_retry do - stream_name = "ShipmentProcess$#{event.data.fetch(:order_id)}" - past_events = event_store.read.stream(stream_name).to_a - last_stored = past_events.size - 1 - event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored) - ProcessState.new.tap do |state| - past_events.each { |ev| state.call(ev) } - state.call(event) - end + def apply(event) + case event + when Shipping::ShippingAddressAddedToShipment + state.with(shipment: :address_set) + when Fulfillment::OrderRegistered + state.with(order: :placed) + when Fulfillment::OrderConfirmed + state.with(order: :confirmed) end end - class ProcessState - def initialize - @order = :draft - @shipment = :draft - end - - attr_reader :order_id - - def call(event) - case event - when Shipping::ShippingAddressAddedToShipment - @shipment = :address_set - when Shipping::ShipmentSubmitted - @shipment = :submitted - when Fulfillment::OrderRegistered - @order = :placed - @order_id = event.data.fetch(:order_id) - when Fulfillment::OrderConfirmed - @order = :confirmed - end + def act + case state + in { shipment: :address_set, order: :placed } + submit_shipment + in { shipment: :address_set, order: :confirmed } + submit_shipment + authorize_shipment + else end + end - def submit? - return false if @shipment == :submitted + def submit_shipment + command_bus.call(Shipping::SubmitShipment.new(order_id: id)) + end - @shipment == :address_set && @order != :draft - end + def authorize_shipment + command_bus.call(Shipping::AuthorizeShipment.new(order_id: id)) + end - def authorize? - @shipment == :address_set && @order == :confirmed - end + def fetch_id(event) + event.data.fetch(:order_id) end end end diff --git a/ecommerce/processes/test/reservation_process_test.rb b/ecommerce/processes/test/reservation_process_test.rb index 791d256e..807b99b1 100644 --- a/ecommerce/processes/test/reservation_process_test.rb +++ b/ecommerce/processes/test/reservation_process_test.rb @@ -7,7 +7,7 @@ class ReservationProcessTest < Test def test_happy_path process = ReservationProcess.new(event_store, command_bus) - given([offer_accepted]).each { |event| process.call(event) } + given([offer_accepted], process:) assert_all_commands( Inventory::Reserve.new(product_id: product_id, quantity: 1), @@ -16,24 +16,14 @@ def test_happy_path ) end - class EnhancedFakeCommandBus < SimpleDelegator - def initialize(command_bus, command_error_hash = {}) - super(command_bus) - @command_error_hash = command_error_hash - end - - def call(command) - super(command) - raise @command_error_hash[command] if @command_error_hash[command] - end - end - def test_rejects_order_and_compensates_stock_when_sth_is_unavailable failing_command = Inventory::Reserve.new(product_id: product_id, quantity: 1) - enhanced_command_bus = EnhancedFakeCommandBus.new(command_bus, failing_command => Inventory::InventoryEntry::InventoryNotAvailable) - process = ReservationProcess.new(event_store, enhanced_command_bus) + process = ReservationProcess.new( + event_store, + EnhancedFakeCommandBus.new(command_bus, failing_command => Inventory::InventoryEntry::InventoryNotAvailable) + ) - given([offer_accepted]).each { |event| process.call(event) } + given([offer_accepted], process:) assert_all_commands( failing_command, @@ -45,10 +35,11 @@ def test_rejects_order_and_compensates_stock_when_sth_is_unavailable def test_release_stock_when_order_is_cancelled process = ReservationProcess.new(event_store, command_bus) - given([offer_accepted]).each { |event| process.call(event) } - + given([offer_accepted], process:) command_bus.clear_all_received - given([order_cancelled]).each { |event| process.call(event) } + + given([order_cancelled], process:) + assert_all_commands( Inventory::Release.new(product_id: product_id, quantity: 1), Inventory::Release.new(product_id: another_product_id, quantity: 2) @@ -57,10 +48,11 @@ def test_release_stock_when_order_is_cancelled def test_dispatch_stock_when_order_is_confirmed process = ReservationProcess.new(event_store, command_bus) - given([offer_accepted]).each { |event| process.call(event) } - + given([offer_accepted], process:) command_bus.clear_all_received - given([order_confirmed]).each { |event| process.call(event) } + + given([order_confirmed], process:) + assert_all_commands( Inventory::Dispatch.new(product_id: product_id, quantity: 1), Inventory::Dispatch.new(product_id: another_product_id, quantity: 2) @@ -96,5 +88,17 @@ def order_cancelled } ) end + + class EnhancedFakeCommandBus < SimpleDelegator + def initialize(command_bus, command_error_hash = {}) + super(command_bus) + @command_error_hash = command_error_hash + end + + def call(command) + super(command) + raise @command_error_hash[command] if @command_error_hash[command] + end + end end end diff --git a/ecommerce/processes/test/shipment_process_test.rb b/ecommerce/processes/test/shipment_process_test.rb new file mode 100644 index 00000000..0b14e639 --- /dev/null +++ b/ecommerce/processes/test/shipment_process_test.rb @@ -0,0 +1,59 @@ +require_relative "test_helper" + +module Processes + class ShipmentProcessTest < Test + cover "Processes::ShipmentProcess*" + + def test_submit_and_authorize_shipment_when_order_confirmed_and_address_set + given([order_placed, order_confirmed, shipping_address_added], process:) + assert_all_commands( + Shipping::SubmitShipment.new(order_id:), + Shipping::AuthorizeShipment.new(order_id:), + ) + end + + def test_submit_shipment_when_order_placed_then_address_set + given([order_placed, shipping_address_added], process:) + assert_all_commands( + Shipping::SubmitShipment.new(order_id:), + ) + end + + def test_submit_shipment_when_address_set_then_order_placed + given([shipping_address_added, order_placed], process:) + assert_all_commands( + Shipping::SubmitShipment.new(order_id:), + ) + end + + def test_dont_submit_shipment_for_draft_order + given([shipping_address_added], process:) + assert_no_command + end + + def test_need_address_to_submit_shipment + given([order_placed], process:) + assert_no_command + end + + def test_need_address_to_authorize_shipment + given([order_placed, order_confirmed], process:) + assert_no_command + end + + private + + def process + ShipmentProcess.new(event_store, command_bus) + end + + def shipping_address_added + Shipping::ShippingAddressAddedToShipment.new( + data: { + order_id: order_id, + postal_address: { line_1: "123 Some Street", line_2: "", line_3: "", line_4: "" }, + } + ) + end + end +end diff --git a/ecommerce/processes/test/test_helper.rb b/ecommerce/processes/test/test_helper.rb index 9186c84d..67516deb 100644 --- a/ecommerce/processes/test/test_helper.rb +++ b/ecommerce/processes/test/test_helper.rb @@ -58,9 +58,11 @@ def customer_id @customer_id ||= SecureRandom.uuid end - def given(events, store: event_store) - events.each { |ev| store.append(ev) } - events + def given(events, store: event_store, process: nil) + events.each do |ev| + store.append(ev) + process.call(ev) if process + end end def order_placed diff --git a/infra/.mutant.yml b/infra/.mutant.yml index dd398251..e0139f63 100644 --- a/infra/.mutant.yml +++ b/infra/.mutant.yml @@ -24,3 +24,4 @@ matcher: - Infra::Mapper* - Infra::Process* - Infra::EventHandler* + - Infra::ProcessManager* diff --git a/infra/lib/infra.rb b/infra/lib/infra.rb index e93a67e2..af823371 100644 --- a/infra/lib/infra.rb +++ b/infra/lib/infra.rb @@ -14,6 +14,7 @@ require_relative "infra/event" require_relative "infra/event_store" require_relative "infra/process" +require_relative "infra/process_manager" require_relative "infra/retry" require_relative "infra/types" require_relative "infra/testing" diff --git a/infra/lib/infra/process_manager.rb b/infra/lib/infra/process_manager.rb new file mode 100644 index 00000000..b0ab5eb6 --- /dev/null +++ b/infra/lib/infra/process_manager.rb @@ -0,0 +1,60 @@ +module Infra + module ProcessManager + module ProcessMethods + def initialize(event_store, command_bus) + @event_store = event_store + @command_bus = command_bus + end + + def call(event) + @id = fetch_id(event) + build_state(event) + act + end + + private + + attr_reader :event_store, :command_bus, :id + + def build_state(event) + with_retry do + past_events = event_store.read.stream(stream_name).to_a + last_stored = past_events.size - 1 + event_store.link(event.event_id, stream_name:, expected_version: last_stored) + (past_events + [event]).each { |ev| @state = apply(ev) } + end + end + + def stream_name + "#{self.class.name}$#{id}" + end + end + + module Subscriptions + def self.extended(host_class) + host_class.instance_variable_set(:@subscribed_events, []) + end + + def subscribes_to(*events) + @subscribed_events += events + end + + attr_reader :subscribed_events + end + + def self.with_state(state_class) + + Module.new do + define_method :state do + @state ||= state_class.new + end + + def self.included(host_class) + host_class.include(ProcessMethods) + host_class.include(Infra::Retry) + host_class.extend(Subscriptions) + end + end + end + end +end