How to build a read model with Rails Event Store Projection

— Published originally on blog.arkency.com

Recently I faced an interesting challenge in one of our customer's applications. Imagine that you take a test, after which you get a personalised report about your skill levels. The existing mechanism for that was time and resource consuming. People had to wait several hours for an e-mail with a PDF-generated report, due to several constraints which I would prefer not to dive into.

The solution was obvious: let's progressively build a read model every time someone answers a question. After the test is done, the report will be available instantly in a web UI.

Let's start with a domain event

module TestExecution
  class AnswerRegistered < ::Event
    attribute :participant_id, Types::Integer
    attribute :test_id, Types::Integer
    attribute :question_id, Types::Integer
    attribute :answer_id, Types::Integer
    attribute :skill_id, Types::Integer
    attribute :score, Types::Float
    attribute :time_elapsed, Types::Integer
  end
end

Nothing fancy, a typical domain event powered by Rails Event Store, with a schema defined, keeping identifiers of involved entities and score calculated by the domain service which publishes the event above when its job is done.

Build the read model

The next building block is the asynchronous handler. Why asynchronous? So that we don't waste time in the participant's request-response cycle and lower their satisfaction with our application:

module Reporting
  class CalculateParticipantReport < ApplicationJob
    prepend RailsEventStore::AsyncHandler

    def perform(event)
      participant_id = event.data.fetch(:participant_id)
      test_id        = event.data.fetch(:test_id)

      link_to_stream(event, test_id, participant_id)

      scores = calculate_scores(test_id, participant_id)

      ParticipantReport.write(
        *prepare_data_for_read_model(scores, test_id, participant_id)
      )
    end

    private

    def prepare_data_for_read_model(scores, test_id, participant_id)
      # magic happens: participant and test are loaded, additional info is queried, data is formatted
      [scores, participant, test]
    end

    def calculate_scores(test_id, participant_id)
      RailsEventStore::Projection
        .from_stream(stream_name(test_id, participant_id))
        .init(-> { Hash.new { |scores, skill_id| scores[skill_id] = { score: 0, number_of_scores: 0 } } })
        .when(
          TestExecution::AnswerRegistered,
          ->(state, event) do
            # accumulate the total score and the number of answers per skill
            skill_id = event.data.fetch(:skill_id)
            state[skill_id][:score] += event.data.fetch(:score)
            state[skill_id][:number_of_scores] += 1
          end
        )
        .run(Rails.configuration.event_store)
        .reduce({}) do |scores, (skill_id, values)|
          # turn the accumulated totals into an average per skill
          scores[skill_id] = values[:score] / values[:number_of_scores]
          scores
        end
    end

    def link_to_stream(event, test_id, participant_id)
      Rails.configuration.event_store.link(
        event.event_id,
        stream_name: stream_name(test_id, participant_id)
      )
    rescue RubyEventStore::EventDuplicatedInStream
      # the event is already linked (the job was delivered again), nothing to do
    end

    def stream_name(test_id, participant_id)
      "ParticipantReport$#{test_id}-#{participant_id}"
    end
  end
end

What happens here:

  1. The AnswerRegistered event is linked to a dedicated report stream, ParticipantReport$123-456. That way we can scope events however we want; in our case the stream name contains the ids of the test and the participant. RubyEventStore::EventDuplicatedInStream is rescued to support at-least-once delivery: if the job runs again for the same event, linking it a second time is simply skipped.
  2. Then a Projection reads from our dedicated stream ParticipantReport$123-456. All the scores are grouped by skill_id and accumulated together with additional info (the number of scores, specifically). After the projection is done, reduce does the math, resulting in an average score for each skill.
  3. When the scores are ready, further calculations follow, and additional info for the read model (like skill names, etc.) is gathered and formatted. There's no need to use any other query to present it to the participant.
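
The first two steps can be tried out without Rails or Rails Event Store. What follows is only a minimal sketch: InMemoryStream, DuplicateLink, handle and average_scores are hypothetical stand-ins invented for illustration, and events are plain hashes rather than real event objects. It shows why ignoring a duplicate link makes redelivery harmless, and how the accumulate-then-average fold works.

```ruby
# Hypothetical in-memory stand-in for a linked stream (not the Rails Event Store API).
class DuplicateLink < StandardError; end

class InMemoryStream
  def initialize
    @events = {}
  end

  # Raises when the same event id is linked twice.
  def link(event)
    id = event.fetch(:event_id)
    raise DuplicateLink if @events.key?(id)
    @events[id] = event
  end

  def each(&block)
    @events.each_value(&block)
  end
end

# Accumulates score totals and counts per skill, then averages them.
def average_scores(stream)
  state = Hash.new { |scores, skill_id| scores[skill_id] = { score: 0.0, number_of_scores: 0 } }
  stream.each do |event|
    entry = state[event.fetch(:skill_id)]
    entry[:score] += event.fetch(:score)
    entry[:number_of_scores] += 1
  end
  state.transform_values { |values| values[:score] / values[:number_of_scores] }
end

# Handler reduced to its essentials: link idempotently, then recalculate.
def handle(stream, event)
  begin
    stream.link(event)
  rescue DuplicateLink
    # Redelivered event: it is already in the stream, so it is not counted twice.
  end
  average_scores(stream)
end

stream = InMemoryStream.new
answers = [
  { event_id: "a", skill_id: 1, score: 2.0 },
  { event_id: "b", skill_id: 1, score: 3.0 },
  { event_id: "c", skill_id: 2, score: 4.0 }
]
answers.each { |answer| handle(stream, answer) }

# Delivering the first event again does not change the averages.
p handle(stream, answers.first)
```

Because the averages are always recomputed from the whole stream, handling the same event twice leaves the result unchanged.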

What does the read model look like?

id  | report_slug  | participant_name | test_name                     | skills
997 | cf827527c552 | Jane Doe         | Important skillz test         | [{name: 'Sleeping', average: '2.5', global: '2.2'}, #...]
998 | 6adb1fc1d201 | Ugly Joe         | Programming skills assessment | [{name: 'Ruby', average: '4.0', global: '2.0'}, #...]
999 | 4cece2d44ae0 | Mr Kobayashi     | Smartness test                | [{name: 'Whatever', average: '5.0', global: '1.0'}, #...]

Vaughn Vernon, in his book "Implementing Domain-Driven Design", describes the read model this way:

The query model is a denormalized data model. It is not meant to deliver domain behavior, only data for display (and possibly reporting). If this data model is a SQL database, each table would hold the data for a single kind of client view (display). The table can have many columns, even a superset of those needed by any given user interface display view. Table views can be created from tables, each of which is used as a logical subset of the whole.

Denormalization is not a popular technique in the Rails world. What does it give us? Complex, often numerous queries are replaced with a simple lookup of a single record, which contains all the data to be displayed in a pre-formatted manner.

How to deal with concurrency issues

Please have a look at the read model implementation:


module Reporting
  class ParticipantReport < ApplicationRecord
    def self.write(scores, participant, test)
      ApplicationRecord.transaction do
        advisory_lock(participant.id, test.id)

        report = find_or_initialize_by(participant_id: participant.id, test_id: test.id)

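        # column names follow the read model table (report_slug, skills);
        # ||= keeps the slug stable when the report is written again
        report.report_slug ||= SecureRandom.hex(6)
        report.participant_name = participant.name
        report.skills = scores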
        report.save!
      end
    end

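    private_class_method def self.advisory_lock(participant_id, test_id)
      # The key must be identical in every process, so String#hash (seeded per
      # Ruby process) cannot be used. Packing both ids into one 64-bit integer is
      # deterministic; it assumes each id is non-negative and fits in 31 bits.
      bigint = (Integer(participant_id) << 32) | Integer(test_id)
      ApplicationRecord.connection.execute(
        "SELECT pg_advisory_xact_lock(#{bigint})"
      )
    end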
  end
end

It's mostly obvious. One might think that with_lock, or simply lock!, from ActiveRecord would do. They exist, but they won't work for records that don't exist yet: they rely on a lock for update, and on the first write there is no row to update, only one to create. So in many cases ActiveRecord::RecordNotUnique errors would appear when two or more concurrent threads try to insert the row for the first time. Thanks to pg_advisory_xact_lock(key bigint) we can obtain an exclusive transaction-level advisory lock, waiting if necessary; the lock is released automatically when the transaction ends. Yet another reason to use PostgreSQL.
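
An advisory lock only serialises writers if every process computes the same key for the same participant and test. Ruby's String#hash is seeded per process, so it is not suitable for that. Below is a minimal sketch of one way to derive a stable key, assuming both ids are non-negative and fit in 31 bits; advisory_lock_key and advisory_lock_sql are hypothetical helper names, not part of ActiveRecord or PostgreSQL.

```ruby
# Largest id accepted by this sketch (assumption: ids fit in 31 bits).
MAX_ID = 2**31 - 1

# Packs both ids into a single integer that fits a signed 64-bit bigint.
# Distinct (participant_id, test_id) pairs always produce distinct keys.
def advisory_lock_key(participant_id, test_id)
  ids = [Integer(participant_id), Integer(test_id)]
  unless ids.all? { |id| id.between?(0, MAX_ID) }
    raise ArgumentError, "ids must be between 0 and #{MAX_ID}"
  end
  (ids[0] << 32) | ids[1]
end

# Builds the statement to run inside the transaction.
# The key is an Integer, so interpolating it is safe.
def advisory_lock_sql(participant_id, test_id)
  "SELECT pg_advisory_xact_lock(#{advisory_lock_key(participant_id, test_id)})"
end

puts advisory_lock_sql(1, 23)
puts advisory_lock_sql(12, 3)
```

Unlike joining the ids into one string, packing keeps pairs such as (1, 23) and (12, 3) apart.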

How to use the read model

# app/controllers/test_results_controller.rb
class TestResultsController < ApplicationController
  def show
    render locals: { report: Reporting::ParticipantReport.find_by!(report_slug: params[:slug]) }
  end
end

# app/views/test_results/show.html.haml
%h1
  Personalised report for #{report.participant_name}
%h2= report.test_name
- report.skills.each do |skill|
  %div
    Your performance in #{skill[:name]} is
    #{skill[:average]} compared to
    #{skill[:global]} earned by others

But...

What if another field is required, or there was a bug in the calculations? Not a problem. Read models can be thrown away and rebuilt with ease, because all the history behind them is known, thanks to domain events.
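
As a rough illustration of such a rebuild, here is a sketch that recomputes every report from the full history and adds a new field along the way. It assumes the history is available as an array of plain hashes; in the real application it would be read from the event store. The rebuild_reports helper and the best field are hypothetical.

```ruby
# Recomputes all reports from scratch.
# Returns { [test_id, participant_id] => { skill_id => { average:, best: } } }.
def rebuild_reports(history)
  history
    .group_by { |event| [event.fetch(:test_id), event.fetch(:participant_id)] }
    .transform_values do |events|
      events
        .group_by { |event| event.fetch(:skill_id) }
        .transform_values do |answers|
          scores = answers.map { |answer| answer.fetch(:score) }
          # :best is the newly required field; old events already carry the data for it
          { average: scores.sum / scores.size, best: scores.max }
        end
    end
end

history = [
  { test_id: 123, participant_id: 456, skill_id: 1, score: 2.0 },
  { test_id: 123, participant_id: 456, skill_id: 1, score: 3.0 },
  { test_id: 123, participant_id: 789, skill_id: 2, score: 4.0 }
]

p rebuild_reports(history)
```

Nothing has to be migrated by hand: the new field is derived from events that were recorded before anyone asked for it.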

By the way, you might also be interested in other posts on read models on our blog.

Szymon Fiedler

I solve problems. This is the place where I share my thoughts on Software Engineering.