Readers

The reader is the low-level, fundamental data retrieval mechanism. It reads messages in order from a single message stream or category.

A reader reads raw message data. It doesn't convert message data into typed messages.

Note: The reader is rarely used directly in practice, except in certain circumstances such as utility scripts. In most cases, a reader is used indirectly, either when an entity is retrieved from a store or when a consumer retrieves and dispatches messages.

Reader Example

account_id = '123'

deposit = Deposit.new()
deposit.deposit_id = '456'
deposit.account_id = account_id
deposit.amount = 11
deposit.time = '2000-01-01T11:11:11.000Z'

command_stream_name = "account:command-#{account_id}"

Messaging::Postgres::Write.(deposit, command_stream_name)

MessageStore::Postgres::Read.(command_stream_name) do |message_data|
  pp message_data.data
end
# => {:account_id=>"123", :deposit_id=>"456", :amount=>11, :time=>"2000-01-01T11:11:11.000Z"}

Reader Facts

  • Each message data is passed to the reader's block
  • Readers don't return anything
  • Readers retrieve messages in batches
  • A reader's batch size is configurable
  • A reader's starting position is configurable
  • A reader reads messages in order, and continues to read until the end of the stream
  • Readers can be configured with a condition that filters the messages retrieved
  • A reader can be configured with an existing session, or it can create a new session

MessageStore::Postgres::Read Class

The Read class is a concrete class from the MessageStore::Postgres library and namespace.

The Read class provides:

  • The principal instance actuator .() (or the call instance method) for starting a reader
  • The class actuator .() (or the class call method) that provides a convenient invocation shortcut that does not require instantiating the reader class first

Reading a Stream

A reader can be actuated either via its class interface, as a matter of convenience, or via its instance interface, which allows for greater control of the configuration of the reader.

Readers are implemented as callable objects. Actuating them is simply a matter of invoking their call method.
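
As a rough illustration of the callable-object pattern described above, the following plain-Ruby sketch shows how a class actuator can delegate to an instance actuator. SketchCallableReader and its in-memory messages array are hypothetical stand-ins chosen for this example. They are not the library's implementation.

```ruby
# Hypothetical stand-in for a reader; not the MessageStore::Postgres implementation
class SketchCallableReader
  attr_reader :messages

  def initialize(messages)
    @messages = messages
  end

  # Class actuator: instantiates the object and actuates it in one step
  def self.call(messages, &action)
    new(messages).call(&action)
  end

  # Instance actuator: passes each message to the block
  def call(&action)
    messages.each(&action)
    nil # Mirrors the fact that readers don't return anything
  end
end

# Ruby's .() syntax is shorthand for invoking the call method
SketchCallableReader.(['a', 'b']) { |message| puts message }
SketchCallableReader.new(['a', 'b']).() { |message| puts message }
```

Both invocations evaluate the block once per message. The class form trades configurability for brevity.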

Class Actuator

self.call(stream_name, position: 0, batch_size: 1000, session: nil, &action)

Parameters

Name | Description | Type
stream_name | Name of stream that the reader will read | String
position | Position of the message to start reading from | Integer
batch_size | Number of messages to retrieve with each query to the message store | Integer
session | An existing session object to use, rather than allowing the reader to create a new session | MessageStore::Postgres::Session
action | Block to be evaluated for each message read | Callable

Instance Actuator

call(stream_name, &action)

Parameters

Name | Description | Type
stream_name | Name of stream that the reader will read | String
action | Block to be evaluated for each message read | Callable

Read Loop

One Message per Loop

The action passed to the reader's actuator is processed once per message read, irrespective of batch size.

MessageStore::Postgres::Read.(stream_name) do |message_data|
  # Block is evaluated once per message data read
end

Reading in Batches

The reader retrieves messages in batches. The number of messages retrieved in each batch is specified using the reader actuator's batch_size parameter.

The action passed to the reader's actuator is evaluated once per message read. However, the reader doesn't query messages from the message store every time the block is evaluated.

A reader will retrieve a batch of messages, and then process each of those messages in-sequence by passing each message to the action block.

When the reader has completed the processing of a batch of messages, the reader retrieves another batch of messages. When there are no more messages to be processed, i.e., when a retrieval of a batch yields no new results, the reader terminates.
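
The batch loop described above can be sketched in plain Ruby. This is a simplified model under stated assumptions, not the library's code: SketchStore is a hypothetical in-memory stand-in for the message store, and SketchBatchRead is a hypothetical loop that mimics the position and batch_size parameters.

```ruby
# Hypothetical in-memory stand-in for the message store. It counts queries so
# the difference between queries and block evaluations is visible.
class SketchStore
  attr_reader :query_count

  def initialize(messages)
    @messages = messages
    @query_count = 0
  end

  # Returns up to batch_size messages, starting at position
  def get(position, batch_size)
    @query_count += 1
    @messages[position, batch_size] || []
  end
end

# Hypothetical read loop; not the library's implementation
module SketchBatchRead
  def self.call(store, position: 0, batch_size: 1000, &action)
    loop do
      batch = store.get(position, batch_size)
      break if batch.empty? # No new results: the reader terminates

      batch.each(&action) # One block evaluation per message
      position += batch.length
    end
    nil
  end
end

store = SketchStore.new(%w[m0 m1 m2 m3 m4])
evaluations = 0
SketchBatchRead.(store, batch_size: 2) { |_message| evaluations += 1 }

puts evaluations       # 5 block evaluations, one per message
puts store.query_count # 4 queries: three return messages, the last returns none
```

With five messages and a batch size of two, the block runs five times while the store is queried only four times. The final query is the one that returns no results and ends the loop.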

Terminating a Reader

A reader will terminate under two conditions:

  1. There are no more messages to be retrieved from the message store
  2. A break statement is issued from within the action block passed to the reader

MessageStore::Postgres::Read.(stream_name) do |message_data|
  break # The reading will stop here
end

Constructing a Reader

Readers can be constructed in one of two ways:

  • Via the constructor
  • Via the initializer

Via the Constructor

The constructor not only instantiates the reader, but also invokes the reader's configure instance method, which constructs the reader's operational dependencies.

self.build(stream_name, position: 0, batch_size: 1000, session: nil)

Returns

Instance of the MessageStore::Postgres::Read class.

Parameters

Name | Description | Type
stream_name | Name of stream that the reader will read | String
position | Position of the message to start reading from | Integer
batch_size | Number of messages to retrieve with each query to the message store | Integer
session | An existing session object to use, rather than allowing the reader to create a new session | MessageStore::Postgres::Session

Via the Initializer

self.initialize(stream_name, position, batch_size)

Returns

Instance of the MessageStore::Postgres::Read class.

Parameters

Name | Description | Type
stream_name | Name of stream that the reader will read | String
position | Position of the message to start reading from | Integer
batch_size | Number of messages to retrieve with each query to the message store | Integer
session | An existing session object to use, rather than allowing the reader to create a new session | MessageStore::Postgres::Session

By constructing a reader using the initializer, the reader's dependencies are not set to operational dependencies. They remain inert substitutes.

TIP

See the useful objects user guide for background on inert substitutes.
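
To make the difference between the constructor and the initializer concrete, here is a minimal plain-Ruby sketch of the pattern. All names in it (SketchInertSession, SketchOperationalSession, SketchConfiguredReader, and the query method) are hypothetical and exist only for illustration. The library's actual dependencies and substitutes differ.

```ruby
# Hypothetical inert substitute: it has the same interface as the
# operational dependency but does nothing
class SketchInertSession
  def query(_position, _batch_size)
    []
  end
end

# Hypothetical operational dependency backed by an in-memory array
class SketchOperationalSession
  def initialize(messages)
    @messages = messages
  end

  def query(position, batch_size)
    @messages[position, batch_size] || []
  end
end

class SketchConfiguredReader
  attr_accessor :session

  # Initializer: leaves the inert substitute in place
  def initialize
    @session = SketchInertSession.new
  end

  # Constructor: instantiates, then configures operational dependencies
  def self.build(messages)
    instance = new
    instance.configure(messages)
    instance
  end

  def configure(messages)
    self.session = SketchOperationalSession.new(messages)
  end
end

pp SketchConfiguredReader.new.session.class
pp SketchConfiguredReader.build(%w[a b]).session.class
```

An object created with new is safe to exercise in a test because its dependency does nothing, while an object created with build is ready for operational use.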

Assigning a Reader as a Dependency

self.configure(receiver, stream_name, attr_name: :read, position: 0, batch_size: 1000, session: nil)

Constructs an instance of the reader and assigns it to the receiver's read attribute. By default, the receiving attribute's name is expected to be read, but it can be altered with the use of the attr_name parameter.

something = Something.new
MessageStore::Postgres::Read.configure(something, stream_name)

something.read
# => #<MessageStore::Postgres::Read:0x...>

Parameters

Name | Description | Type
receiver | The object that will receive the constructed reader | Object
attr_name | The receiver's attribute that will be assigned the constructed reader | Symbol
stream_name | Name of stream that the reader will read | String
position | Position of the message to start reading from | Integer
batch_size | Number of messages to retrieve with each query to the message store | Integer
session | An existing session object to use, rather than allowing the reader to create a new session | MessageStore::Postgres::Session

TIP

See the useful objects user guide for background on configuring dependencies.
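
The configure pattern described in this section can be approximated in plain Ruby as follows. This is a sketch of the general idea only. SketchDependencyReader and SketchReceiver are hypothetical classes, and the sketch omits the position, batch_size, and session parameters.

```ruby
# Hypothetical reader used only to illustrate dependency assignment
class SketchDependencyReader
  attr_reader :stream_name

  def initialize(stream_name)
    @stream_name = stream_name
  end

  def self.build(stream_name)
    new(stream_name)
  end

  # Builds an instance and assigns it to the receiver's attribute,
  # which is named read unless attr_name says otherwise
  def self.configure(receiver, stream_name, attr_name: :read)
    instance = build(stream_name)
    receiver.public_send("#{attr_name}=", instance)
    instance
  end
end

# Hypothetical receiver with two candidate attributes
class SketchReceiver
  attr_accessor :read
  attr_accessor :reader
end

receiver = SketchReceiver.new
SketchDependencyReader.configure(receiver, 'account:command-123')
puts receiver.read.stream_name

other = SketchReceiver.new
SketchDependencyReader.configure(other, 'account:command-123', attr_name: :reader)
puts other.reader.stream_name
```

The receiver only needs a writable attribute with the expected name, so the class that depends on a reader never has to know how the reader is constructed.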