Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ruby_event_store-active_record/.mutant.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ matcher:
- RubyEventStore::ActiveRecord::Event.hook_attribute_type
- RubyEventStore::ActiveRecord::DoubleSerializationDetector
- RubyEventStore::ActiveRecord::EventRepositoryReader#initialize
- RubyEventStore::ActiveRecord::EventRepository#initialize
- RubyEventStore::ActiveRecord::EventRepository#model_klasses
- RubyEventStore::ActiveRecord::EventRepository#json_data_type?
- RubyEventStore::ActiveRecord::EventRepository#rescue_from_double_json_serialization!
- RubyEventStore::ActiveRecord::SkipJsonSerialization
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
require_relative "active_record/index_violation_detector"
require_relative "active_record/pg_linearized_event_repository"
require_relative "active_record/version"
require_relative "active_record/skip_json_serialization"
require_relative "active_record/event"
require_relative "active_record/railtie" if defined?(Rails::Engine)
if defined?(Rails::Engine)
require_relative "active_record/railtie"
else
require_relative "active_record/skip_json_serialization"
require_relative "active_record/event"
end
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,14 @@ class EventRepository

def initialize(model_factory: WithDefaultModels.new, serializer:)
@serializer = serializer
@event_klass, @stream_klass = model_factory.call
if serializer == NULL && json_data_type?
warn <<~MSG
The data or metadata column is of a JSON/B type and expects a JSON string.

Yet the repository serializer is configured as #{serializer} and it would not
produce the expected JSON string.

In ActiveRecord there's an implicit serialization to JSON for JSON/B column types
that made it work so far. This behaviour is unfortunately also a source of undesired
double serialization — first in the EventRepository, second in the ActiveRecord.

In the past we've advised workarounds that introduced configuration incosistency
with other data types and serialization formats, i.e. explicitly passing NULL serializer
just for the JSON/B data types.

As of now this special ActiveRecord behaviour is disabled. You should be using JSON
serializer back again:

RubyEventStore::ActiveRecord::EventRepository.new(serializer: JSON)
MSG
else
@event_klass.include(SkipJsonSerialization)
end
@repo_reader = EventRepositoryReader.new(@event_klass, @stream_klass, serializer)
@index_violation_detector = IndexViolationDetector.new(@event_klass.table_name, @stream_klass.table_name)
@model_factory = model_factory
end

def rescue_from_double_json_serialization!
if @serializer == JSON && json_data_type?
@repo_reader.instance_eval { alias __record__ record }
if @serializer == JSON && json_data_type?(event_klass)
repo_reader.instance_eval { alias __record__ record }

@repo_reader.define_singleton_method :unwrap do |column_name, payload|
repo_reader.define_singleton_method :unwrap do |column_name, payload|
if String === payload && payload.start_with?("\{")
warn "Double serialization of #{column_name} column detected"
@serializer.load(payload)
Expand All @@ -48,7 +23,7 @@ def rescue_from_double_json_serialization!
end
end

@repo_reader.define_singleton_method :record do |record|
repo_reader.define_singleton_method :record do |record|
r = __record__(record)

Record.new(
Expand Down Expand Up @@ -76,31 +51,31 @@ def link_to_stream(event_ids, stream, expected_version)
end

def delete_stream(stream)
@stream_klass.where(stream: stream.name).delete_all
stream_klass.where(stream: stream.name).delete_all
end

def has_event?(event_id)
@repo_reader.has_event?(event_id)
repo_reader.has_event?(event_id)
end

def last_stream_event(stream)
@repo_reader.last_stream_event(stream)
repo_reader.last_stream_event(stream)
end

def read(specification)
@repo_reader.read(specification)
repo_reader.read(specification)
end

def count(specification)
@repo_reader.count(specification)
repo_reader.count(specification)
end

def update_messages(records)
hashes = records.map { |record| upsert_hash(record, record.serialize(@serializer)) }
for_update = records.map(&:event_id)
start_transaction do
existing =
@event_klass
event_klass
.where(event_id: for_update)
.pluck(:event_id, :id, :created_at)
.reduce({}) { |acc, (event_id, id, created_at)| acc.merge(event_id => [id, created_at]) }
Expand All @@ -109,31 +84,31 @@ def update_messages(records)
h[:id] = existing.fetch(h.fetch(:event_id)).at(0)
h[:created_at] = existing.fetch(h.fetch(:event_id)).at(1)
end
@event_klass.upsert_all(hashes)
event_klass.upsert_all(hashes)
end
end

def streams_of(event_id)
@repo_reader.streams_of(event_id)
repo_reader.streams_of(event_id)
end

def position_in_stream(event_id, stream)
@repo_reader.position_in_stream(event_id, stream)
repo_reader.position_in_stream(event_id, stream)
end

def global_position(event_id)
@repo_reader.global_position(event_id)
repo_reader.global_position(event_id)
end

def event_in_stream?(event_id, stream)
@repo_reader.event_in_stream?(event_id, stream)
repo_reader.event_in_stream?(event_id, stream)
end

private

def add_to_stream(event_ids, stream, expected_version)
last_stream_version = ->(stream_) do
@stream_klass.where(stream: stream_.name).order("position DESC").first.try(:position)
stream_klass.where(stream: stream_.name).order("position DESC").first.try(:position)
end
resolved_version = expected_version.resolve_for(stream, last_stream_version)

Expand All @@ -148,7 +123,7 @@ def add_to_stream(event_ids, stream, expected_version)
created_at: Time.now.utc,
}
end
@stream_klass.insert_all!(in_stream) unless stream.global?
stream_klass.insert_all!(in_stream) unless stream.global?
end
self
rescue ::ActiveRecord::RecordNotUnique => e
Expand All @@ -165,7 +140,7 @@ def compute_position(resolved_version, index)
end

def detect_index_violated(message)
@index_violation_detector.detect(message)
index_violation_detector.detect(message)
end

def insert_hash(record, serialized_record)
Expand Down Expand Up @@ -194,11 +169,11 @@ def optimize_timestamp(valid_at, created_at)
end

def start_transaction(&block)
@event_klass.transaction(requires_new: true, &block)
event_klass.transaction(requires_new: true, &block)
end

def link_to_stream_(event_ids, stream, expected_version)
(event_ids - @event_klass.where(event_id: event_ids).pluck(:event_id)).each { |id| raise EventNotFound.new(id) }
(event_ids - event_klass.where(event_id: event_ids).pluck(:event_id)).each { |id| raise EventNotFound.new(id) }
add_to_stream(event_ids, stream, expected_version)
end

Expand All @@ -209,11 +184,50 @@ def append_to_stream_(records, stream, expected_version)
hashes << insert_hash(record, record.serialize(@serializer))
event_ids << record.event_id
end
add_to_stream(event_ids, stream, expected_version) { @event_klass.insert_all!(hashes) }
add_to_stream(event_ids, stream, expected_version) { event_klass.insert_all!(hashes) }
end

def model_klasses
@model_klasses ||= @model_factory.call.tap do |event_model, stream_model|
if @serializer == NULL && json_data_type?(event_model)
warn <<~MSG
The data or metadata column is of a JSON/B type and expects a JSON string.

Yet the repository serializer is configured as #{@serializer} and it would not
produce the expected JSON string.

In ActiveRecord there's an implicit serialization to JSON for JSON/B column types
that made it work so far. This behaviour is unfortunately also a source of undesired
double serialization — first in the EventRepository, second in the ActiveRecord.

In the past we've advised workarounds that introduced configuration incosistency
with other data types and serialization formats, i.e. explicitly passing NULL serializer
just for the JSON/B data types.

As of now this special ActiveRecord behaviour is disabled. You should be using JSON
serializer back again:

RubyEventStore::ActiveRecord::EventRepository.new(serializer: JSON)
MSG
else
event_model.include(SkipJsonSerialization)
end
end
end

def event_klass = model_klasses.first
def stream_klass = model_klasses.last

def repo_reader
@repo_reader ||= EventRepositoryReader.new(event_klass, stream_klass, @serializer)
end

def index_violation_detector
@index_violation_detector ||= IndexViolationDetector.new(event_klass.table_name, stream_klass.table_name)
end

def json_data_type?
%i[data metadata].any? { |attr| @event_klass.column_for_attribute(attr).type.start_with?("json") }
def json_data_type?(klass)
%i[data metadata].any? { |attr| klass.column_for_attribute(attr).type.start_with?("json") }
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
module RubyEventStore
module ActiveRecord
class Railtie < ::Rails::Railtie
initializer "ruby_event_store-active_record" do
ActiveSupport.on_load(:active_record) do
require_relative "../active_record/skip_json_serialization"
require_relative "../active_record/event"
end
end
end
end
end
30 changes: 16 additions & 14 deletions ruby_event_store-active_record/spec/event_repository_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ module ActiveRecord

specify "use default models" do
repository = EventRepository.new(serializer: mk_serializer.call)
expect(repository.instance_variable_get(:@event_klass)).to be(Event)
expect(repository.instance_variable_get(:@stream_klass)).to be(EventInStream)
expect(repository.send(:event_klass)).to be(Event)
expect(repository.send(:stream_klass)).to be(EventInStream)
end

specify "allows custom base class" do
Expand All @@ -86,8 +86,8 @@ module ActiveRecord
model_factory: WithAbstractBaseClass.new(CustomApplicationRecord),
serializer: mk_serializer.call,
)
expect(repository.instance_variable_get(:@event_klass).ancestors).to include(CustomApplicationRecord)
expect(repository.instance_variable_get(:@stream_klass).ancestors).to include(CustomApplicationRecord)
expect(repository.send(:event_klass).ancestors).to include(CustomApplicationRecord)
expect(repository.send(:stream_klass).ancestors).to include(CustomApplicationRecord)
end

specify "reading/writting works with custom base class" do
Expand Down Expand Up @@ -286,21 +286,23 @@ module ActiveRecord
specify "JSON/B backwards compatibility — explicit NULL serializer as advised before introduction of JSONClient" do
skip unless %w[json jsonb].include?(ENV["DATA_TYPE"])

expect { repository = EventRepository.new(serializer: NULL) }.to output(<<~MSG).to_stderr
The data or metadata column is of a JSON/B type and expects a JSON string.
expect { EventRepository.new(serializer: NULL) }.not_to output.to_stderr

Yet the repository serializer is configured as RubyEventStore::NULL and it would not
produce the expected JSON string.
expect { repository = EventRepository.new(serializer: NULL).has_event?(SecureRandom.uuid) }.to output(<<~MSG).to_stderr
The data or metadata column is of a JSON/B type and expects a JSON string.

In ActiveRecord there's an implicit serialization to JSON for JSON/B column types
that made it work so far. This behaviour is unfortunately also a source of undesired
Yet the repository serializer is configured as RubyEventStore::NULL and it would not
produce the expected JSON string.

In ActiveRecord there's an implicit serialization to JSON for JSON/B column types
that made it work so far. This behaviour is unfortunately also a source of undesired
double serialization — first in the EventRepository, second in the ActiveRecord.
In the past we've advised workarounds that introduced configuration incosistency
with other data types and serialization formats, i.e. explicitly passing NULL serializer

In the past we've advised workarounds that introduced configuration incosistency
with other data types and serialization formats, i.e. explicitly passing NULL serializer
just for the JSON/B data types.

As of now this special ActiveRecord behaviour is disabled. You should be using JSON
As of now this special ActiveRecord behaviour is disabled. You should be using JSON
serializer back again:

RubyEventStore::ActiveRecord::EventRepository.new(serializer: JSON)
Expand Down