Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions ecommerce/processes/.mutant.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@ matcher:
- Processes::Configuration*
- Processes::OrderConfirmation#stream_name
- Processes::Test*
- Processes::ShipmentProcess*
- Processes::ReleasePaymentProcess*
- Processes::OrderItemInvoicingProcess*
- Processes::SyncShipmentFromPricing*
- Processes::SyncInventoryFromOrdering*
- Processes::NotifyPaymentsAboutOrderValue*
- Processes::ThreePlusOneFree*
- Processes::ReservationProcess*
- Processes::ReservationProcess::ProcessState#call
- Processes::DetermineVatRatesOnOrderPlaced*
5 changes: 4 additions & 1 deletion ecommerce/processes/lib/processes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ def call(event_store, command_bus)
private

def enable_shipment_process(event_store, command_bus)
ShipmentProcess.new(event_store, command_bus)
event_store.subscribe(
ShipmentProcess.new(event_store, command_bus),
to: ShipmentProcess.subscribed_events
)
end

def enable_shipment_sync(event_store, command_bus)
Expand Down
122 changes: 51 additions & 71 deletions ecommerce/processes/lib/processes/reservation_process.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
module Processes
class ReservationProcess
include Infra::Retry

class SomeInventoryNotAvailable < StandardError
attr_reader :unavailable_products

Expand All @@ -10,112 +8,94 @@ def initialize(unavailable_products)
end
end

def initialize(event_store, command_bus)
@event_store = event_store
@command_bus = command_bus
class ProcessState < Data.define(:order, :order_lines)
def initialize(order: nil, order_lines: nil) = super

def reserved_product_ids = order_lines.keys
end
attr_accessor :event_store, :command_bus

def call(event)
state = build_state(event)
include Infra::ProcessManager.with_state(ProcessState)

subscribes_to(
Pricing::OfferAccepted,
Fulfillment::OrderCancelled,
Fulfillment::OrderConfirmed
)

def apply(event)
case event
when Pricing::OfferAccepted
update_order_state(state) { reserve_stock(state) }
state.with(
order: :accepted,
order_lines: event.data.fetch(:order_lines).map { |ol| [ol.fetch(:product_id), ol.fetch(:quantity)] }.to_h
)
when Fulfillment::OrderCancelled
release_stock(state)
state.with(order: :cancelled)
when Fulfillment::OrderConfirmed
dispatch_stock(state)
state.with(order: :confirmed)
end
end

def act
case state
in order: :accepted
begin
reserve_stock
rescue SomeInventoryNotAvailable => exc
reject_order(exc.unavailable_products)
else
accept_order
end
in order: :cancelled
release_stock(state.reserved_product_ids)
in order: :confirmed
dispatch_stock
else
end
end

private

def reserve_stock(state)
def reserve_stock
unavailable_products = []
reserved_products = []
state.order_lines.each do |product_id, quantity|
command_bus.(Inventory::Reserve.new(product_id: product_id, quantity: quantity))
state.product_reserved(product_id)
reserved_products << product_id
rescue Inventory::InventoryEntry::InventoryNotAvailable
unavailable_products << product_id
end

unless unavailable_products.empty?
release_stock(state)
if unavailable_products.any?
release_stock(reserved_products)
raise SomeInventoryNotAvailable.new(unavailable_products)
end
end

def update_order_state(state)
yield
accept_order(state)
rescue SomeInventoryNotAvailable => exc
reject_order(state, exc.unavailable_products)
end

def release_stock(state)
state.order_lines.slice(*state.reserved_product_ids).each do |product_id, quantity|
def release_stock(product_ids)
state.order_lines.slice(*product_ids).each do |product_id, quantity|
command_bus.(Inventory::Release.new(product_id: product_id, quantity: quantity))
end
end

def dispatch_stock(state)
def dispatch_stock
state.order_lines.each do |product_id, quantity|
command_bus.(Inventory::Dispatch.new(product_id: product_id, quantity: quantity))
end
end

def accept_order(state)
command_bus.(Fulfillment::RegisterOrder.new(order_id: state.order_id))
def accept_order
command_bus.(Fulfillment::RegisterOrder.new(order_id: id))
end

def reject_order(state, unavailable_product_ids)
def reject_order(unavailable_product_ids)
command_bus.(Pricing::RejectOffer.new(
order_id: state.order_id, reason: "Some products were unavailable", unavailable_product_ids: )
order_id: id, reason: "Some products were unavailable", unavailable_product_ids:)
)
end

def stream_name(order_id)
"ReservationProcess$#{order_id}"
end

def build_state(event)
stream_name = stream_name(event.data.fetch(:order_id))
past_events = nil
begin
with_retry do
past_events = event_store.read.stream(stream_name).to_a
last_stored = past_events.size - 1
event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored)
end
rescue RubyEventStore::EventDuplicatedInStream
return
end
ProcessState.new.tap do |state|
past_events.each { |ev| state.call(ev) }
state.call(event)
end
end

class ProcessState
def initialize()
@reserved_product_ids = []
end

attr_reader :order_id, :order_lines, :reserved_product_ids

def call(event)
case event
when Pricing::OfferAccepted
@order_lines = event.data.fetch(:order_lines).map { |ol| [ol.fetch(:product_id), ol.fetch(:quantity)] }.to_h
@order_id = event.data.fetch(:order_id)
when Fulfillment::OrderCancelled, Fulfillment::OrderConfirmed
@reserved_product_ids = order_lines.keys
end
end

def product_reserved(product_id)
reserved_product_ids << product_id
end
def fetch_id(event)
event.data.fetch(:order_id)
end
end
end
100 changes: 34 additions & 66 deletions ecommerce/processes/lib/processes/shipment_process.rb
Original file line number Diff line number Diff line change
@@ -1,83 +1,51 @@
module Processes
class ShipmentProcess
include Infra::Retry

def initialize(event_store, command_bus)
@event_store = event_store
@command_bus = command_bus
@event_store.subscribe(
self,
to: [
Shipping::ShippingAddressAddedToShipment,
Shipping::ShipmentSubmitted,
Fulfillment::OrderRegistered,
Fulfillment::OrderConfirmed
]
)
end

def call(event)
state = build_state(event)
submit_shipment(state) if state.submit?
authorize_shipment(state) if state.authorize?
class ProcessState < Data.define(:order, :shipment)
def initialize(order: nil, shipment: nil) = super
end

private

def submit_shipment(state)
command_bus.call(Shipping::SubmitShipment.new(order_id: state.order_id))
end
include Infra::ProcessManager.with_state(ProcessState)

def authorize_shipment(state)
command_bus.call(Shipping::AuthorizeShipment.new(order_id: state.order_id))
end
subscribes_to(
Shipping::ShippingAddressAddedToShipment,
Fulfillment::OrderRegistered,
Fulfillment::OrderConfirmed
)

attr_reader :command_bus, :event_store
private

def build_state(event)
with_retry do
stream_name = "ShipmentProcess$#{event.data.fetch(:order_id)}"
past_events = event_store.read.stream(stream_name).to_a
last_stored = past_events.size - 1
event_store.link(event.event_id, stream_name: stream_name, expected_version: last_stored)
ProcessState.new.tap do |state|
past_events.each { |ev| state.call(ev) }
state.call(event)
end
def apply(event)
case event
when Shipping::ShippingAddressAddedToShipment
state.with(shipment: :address_set)
when Fulfillment::OrderRegistered
state.with(order: :placed)
when Fulfillment::OrderConfirmed
state.with(order: :confirmed)
end
end

class ProcessState
def initialize
@order = :draft
@shipment = :draft
end

attr_reader :order_id

def call(event)
case event
when Shipping::ShippingAddressAddedToShipment
@shipment = :address_set
when Shipping::ShipmentSubmitted
@shipment = :submitted
when Fulfillment::OrderRegistered
@order = :placed
@order_id = event.data.fetch(:order_id)
when Fulfillment::OrderConfirmed
@order = :confirmed
end
def act
case state
in { shipment: :address_set, order: :placed }
submit_shipment
in { shipment: :address_set, order: :confirmed }
submit_shipment
authorize_shipment
else
end
end

def submit?
return false if @shipment == :submitted
def submit_shipment
command_bus.call(Shipping::SubmitShipment.new(order_id: id))
end

@shipment == :address_set && @order != :draft
end
def authorize_shipment
command_bus.call(Shipping::AuthorizeShipment.new(order_id: id))
end

def authorize?
@shipment == :address_set && @order == :confirmed
end
def fetch_id(event)
event.data.fetch(:order_id)
end
end
end
Loading