# Reading Messages
The reader is the low-level, fundamental data retrieval mechanism. It reads messages in order from a single message stream or category.
A reader reads raw message data. It doesn't convert message data into typed messages.
Note: The reader is rarely directly used in-practice except for certain circumstances where it might be leveraged in a utility script. In most cases, a reader is used indirectly by retrieving an entity from a store, or when a consumer retrieves and dispatches messages.
# Example
account_id = '123'
deposit = Deposit.new()
deposit.deposit_id = '456'
deposit.account_id = account_id
deposit.amount = 11
deposit.time = '2000-01-01T11:11:11.000Z'
command_stream_name = "account:command-#{account_id}"
Messaging::Postgres::Write.(deposit, command_stream_name)
MessageStore::Postgres::Read.(command_stream_name) do |message_data|
pp message_data.data
end
# => {:account_id=>"123", :deposit_id=>"456", :amount=>11, :time=>"2000-01-01T11:11:11.000Z"}
# Reader Facts
- Each message data is passed to the reader's block
- Readers don't return anything
- Readers retrieve messages in batches
- A reader reads events in order, and continues to read until the end of the stream
- A reader's batch size is configurable
- A reader's starting position is configurable
- A reader can be configured with a
conditionthat filters the messages retrieved based on a SQL condition - A reader can be configured with an existing session, or it can create a new session
# MessageStore::Postgres::Read Class
The Read class is a concrete class from the MessageStore::Postgres library and namespace.
The Read class provides:
- The principle instance actuator
.()(or thecallinstance method) for starting a reader - The class actuator
.()(or the classcallmethod) that provides a convenient invocation shortcut that does not require instantiating the reader class first
# Reading a Stream
A reader can be actuated either via its class interface, as a matter of convenience, or via its instance interface, which allows for greater control of the configuration of the reader.
Readers are implemented as callable objects. Actuating them is simply a matter of invoking their call method.
# Class Actuator
self.call(stream_name, position: 0, batch_size: 1000, condition: nil, session: nil, &action)
Parameters
| Name | Description | Type |
|---|---|---|
| stream_name | Name of the stream that the reader will read | String |
| position | Position of the message to start reading from | Integer |
| batch_size | Number of messages to retrieve with each query to the message store | Integer |
| condition | SQL condition to filter the batch by | String |
| session | An existing session object to use, rather than allowing the reader to create a new session | MessageStore::Postgres::Session |
| action | Block to be evaluated for each message read | Callable |
# Instance Actuator
call(&action)
Parameters
| Name | Description | Type |
|---|---|---|
| action | Block to be evaluated for each message read | Callable |
# Read Loop
# One Message per Loop
The action passed to the reader's actuator is processed once per message read, irrespective of batch size.
MessageStore::Postgres::Read.(stream_name) do |message_data|
# Block is evaluated once per message data read
end
# Reading in Batches
The reader retrieves messages in batches. The number of messages retrieved in each batch are specified using the reader actuator's batch_size parameter. The default batch size is 1000 messages.
MessageStore::Postgres::Read.(stream_name, batch_size: 10) do |message_data|
# ...
end
The action passed to the reader's actuator is evaluated once per message read. However, the reader doesn't query messages from the message store every time the block is evaluated.
A reader will retrieve a batch of messages, and then process each of those messages in-sequence by passing each message to the action block.
When the reader has completed the processing of a batch of messages, the reader retrieves another batch of messages. When there are no more messages to be processed, ie: when a retrieval of a batch yields no new results, the reader terminates.
# Terminating a Reader
A reader will terminate under two conditions:
- There are no more messages to be retrieved from the message store
- A
breakstatement is issued from within theactionblock passed to the reader
MessageStore::Postgres::Read.(stream_name) do |message_data|
break # The reading will stop here
end
# Filtering Messages with a SQL Condition
The reader can be given a SQL condition which further filters the messages read beyond selecting only the messages of the stream being read.
For example, the reader can read messages from someStream-123 whose position is 0.
Read.('someStream-123', condition: 'position = 0')
The above example isn't a realistic use of this feature. It's a contrived example merely intended to demonstrate the mechanics of use the SQL condition.
# Constructing a Reader
Readers can be constructed in one of two ways:
- Via the constructor
- Via the initializer
# Via the Constructor
The constructor not only instantiates the reader, but also invokes the reader's configure instance method, which constructs the reader's operational dependencies.
self.build(stream_name, position: 0, batch_size: 1000, condition: nil, session: nil)
Returns
Instance of the MessageStore::Postgres::Read class.
Parameters
| Name | Description | Type |
|---|---|---|
| stream_name | Name of stream that the reader will read | String |
| position | Position of the message to start reading from | Integer |
| batch_size | Number of messages to retrieve with each query to the message store | Integer |
| condition | SQL condition to filter the batch by | String |
| session | An existing session object to use, rather than allowing the reader to create a new session | MessageStore::Postgres::Session |
# Via the Initializer
self.initialize(stream_name, position, batch_size)
Returns
Instance of the MessageStore::Postgres::Read class.
Parameters
| Name | Description | Type |
|---|---|---|
| stream_name | Name of stream that the reader will read | String |
| position | Position of the message to start reading from | Integer |
| batch_size | Number of messages to retrieve with each query to the message store | Integer |
| session | An existing session object to use, rather than allowing the reader to create a new session | MessageStore::Postgres::Session |
By constructing a reader using the initializer, the reader's dependencies are not set to operational dependencies. They remain inert substitutes.
TIP
See the useful objects user guide for background on inert substitutes.
# Assigning a Reader as a Dependency
self.configure(receiver, stream_name, attr_name: :read, position: 0, batch_size: 1000, condition: nil, session: nil)
Constructs an instance of the reader and assigns it to the receiver's read attribute. By default, the receiving attribute's name is expected to be read, but it can be altered with the use of the attr_name parameter.
something = Something.new
Messaging::Postgres::Read.configure(something)
something.write
# => #<Messaging::Postgres::Read:0x...>
Parameters
| Name | Description | Type |
|---|---|---|
| receiver | The object that will receive the constructed reader | Object |
| attr_name | The receiver's attribute that will be assigned the constructed reader | Symbol |
| stream_name | Name of stream that the reader will read | String |
| position | Position of the message to start reading from | Integer |
| batch_size | Number of messages to retrieve with each query to the message store | Integer |
| condition | SQL condition to filter the batch by | String |
| session | An existing session object to use, rather than allowing the reader to create a new session | MessageStore::Postgres::Session |
TIP
See the useful objects user guide for background on configuring dependencies.
# Log Tags
The following tags are applied to log messages recorded by a reader:
| Tag | Description |
|---|---|
| read | Applied to all log messages recorded by a reader |
| message_store | Applied to all log messages recorded inside the MessageStore namespace |
The following tags may be applied to log messages recorded by a reader:
| Tag | Description |
|---|---|
| message_data | Applied to log messages that record the data content of read message data |
| data | Applied to log messages that record the data content of read message data |
See the logging user guide for more on log tags.