# Reading Messages
The reader is the low-level, fundamental data retrieval mechanism. It reads messages in order from a single message stream or category.
A reader reads raw message data. It doesn't convert message data into typed messages.
Note: The reader is rarely used directly in practice, except in certain circumstances such as a utility script. In most cases, a reader is used indirectly, either when an entity is retrieved from a store or when a consumer retrieves and dispatches messages.
# Example
account_id = '123'
deposit = Deposit.new()
deposit.deposit_id = '456'
deposit.account_id = account_id
deposit.amount = 11
deposit.time = '2000-01-01T11:11:11.000Z'
command_stream_name = "account:command-#{account_id}"
Messaging::Postgres::Write.(deposit, command_stream_name)
MessageStore::Postgres::Read.(command_stream_name) do |message_data|
pp message_data.data
end
# => {:account_id=>"123", :deposit_id=>"456", :amount=>11, :time=>"2000-01-01T11:11:11.000Z"}
# Reader Facts
- Each message data is passed to the reader's block
- Readers don't return anything
- Readers retrieve messages in batches
- A reader reads messages in order, and continues to read until the end of the stream
- A reader's batch size is configurable
- A reader's starting position is configurable
- A reader can be configured with a `condition` that filters the messages retrieved based on a SQL condition
- A reader can be configured with an existing session, or it can create a new session
# MessageStore::Postgres::Read Class
The `Read` class is a concrete class from the `MessageStore::Postgres` library and namespace.
The `Read` class provides:
- The principal instance actuator `.()` (or the `call` instance method) for starting a reader
- The class actuator `.()` (or the class `call` method) that provides a convenient invocation shortcut that does not require instantiating the reader class first
# Reading a Stream
A reader can be actuated either via its class interface, as a matter of convenience, or via its instance interface, which allows for greater control of the configuration of the reader.
Readers are implemented as callable objects. Actuating them is simply a matter of invoking their `call` method.
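In Ruby, `.()` is shorthand for `call`, at both the class and the instance level. The following is a minimal, library-free sketch of that idiom. `ToyRead` and its fixed list of messages are hypothetical stand-ins and are not part of `MessageStore::Postgres`.

```ruby
# Hypothetical stand-in for a reader, used only to show the callable idiom
class ToyRead
  MESSAGES = ['first', 'second', 'third'].freeze

  attr_reader :stream_name

  def initialize(stream_name)
    @stream_name = stream_name
  end

  # Class actuator: instantiates a reader and actuates it in one step
  def self.call(stream_name, &action)
    new(stream_name).call(&action)
  end

  # Instance actuator: passes each message to the block
  def call(&action)
    MESSAGES.each(&action)
    nil
  end
end

seen = []

# `.()` is equivalent to `.call`
ToyRead.('someStream-123') { |message| seen << message }
ToyRead.new('someStream-123').call { |message| seen << message }
```

Both invocations pass the same three messages to the block. The class form is only a shortcut for instantiating and then calling.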
# Class Actuator
self.call(stream_name, position: 0, batch_size: 1000, condition: nil, session: nil, &action)
Parameters
Name | Description | Type |
---|---|---|
stream_name | Name of the stream that the reader will read | String |
position | Position of the message to start reading from | Integer |
batch_size | Number of messages to retrieve with each query to the message store | Integer |
condition | SQL condition to filter the batch by | String |
session | An existing session object to use, rather than allowing the reader to create a new session | MessageStore::Postgres::Session |
action | Block to be evaluated for each message read | Callable |
# Instance Actuator
call(&action)
Parameters
Name | Description | Type |
---|---|---|
action | Block to be evaluated for each message read | Callable |
# Read Loop
# One Message per Loop
The `action` passed to the reader's actuator is processed once per message read, irrespective of batch size.
MessageStore::Postgres::Read.(stream_name) do |message_data|
# Block is evaluated once per message data read
end
# Reading in Batches
The reader retrieves messages in batches. The number of messages retrieved in each batch is specified using the reader actuator's `batch_size` parameter. The default batch size is 1000 messages.
MessageStore::Postgres::Read.(stream_name, batch_size: 10) do |message_data|
# ...
end
The `action` passed to the reader's actuator is evaluated once per message read. However, the reader doesn't query messages from the message store every time the block is evaluated.
A reader will retrieve a batch of messages, and then process each of those messages in sequence by passing each message to the `action` block.
When the reader has finished processing a batch of messages, it retrieves another batch. When there are no more messages to be processed, i.e., when retrieving a batch yields no new results, the reader terminates.
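The loop described above can be sketched with an in-memory array standing in for the message store. `ToyBatchReader`, its `queries` counter, and the hash-shaped messages are hypothetical and exist only for illustration. This is not the library's implementation.

```ruby
# Hypothetical in-memory reader that mimics the batch loop
class ToyBatchReader
  attr_reader :queries

  def initialize(messages, position: 0, batch_size: 1000)
    @messages = messages
    @position = position
    @batch_size = batch_size
    @queries = 0
  end

  def call(&action)
    position = @position

    loop do
      batch = get_batch(position)

      # An empty batch means there is nothing left to read
      break if batch.empty?

      # The action is evaluated once per message, not once per batch
      batch.each(&action)

      position += batch.length
    end

    nil
  end

  private

  # Stands in for a single query to the message store
  def get_batch(position)
    @queries += 1
    @messages[position, @batch_size] || []
  end
end

messages = (0...25).map { |i| { position: i } }
reader = ToyBatchReader.new(messages, batch_size: 10)

positions = []
reader.() { |message_data| positions << message_data[:position] }
```

With 25 messages and a batch size of 10, the sketch issues four queries (batches of 10, 10, and 5, followed by a final empty batch), while the block is evaluated 25 times.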
# Terminating a Reader
A reader will terminate under two conditions:
- There are no more messages to be retrieved from the message store
- A `break` statement is issued from within the `action` block passed to the reader
MessageStore::Postgres::Read.(stream_name) do |message_data|
break # The reading will stop here
end
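`break` is ordinary Ruby block control flow: it ends the method call that the block was given to, so no further messages are passed to the block. A minimal sketch, using a hypothetical `read_each` method in place of the reader's actuator:

```ruby
# Hypothetical stand-in for a reader's actuator
def read_each(messages, &action)
  messages.each(&action)
  nil
end

processed = []

read_each(['a', 'b', 'c', 'd']) do |message_data|
  processed << message_data

  # Stop reading once the message of interest has been seen
  break if message_data == 'b'
end
```

Only the first two messages reach the block. The remaining messages are never processed.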
# Filtering Messages with a SQL Condition
The reader can be given a SQL condition which further filters the messages read beyond selecting only the messages of the stream being read.
For example, the reader can read messages from `someStream-123` whose position is 0.
Read.('someStream-123', condition: 'position = 0')
The above example isn't a realistic use of this feature. It's a contrived example intended only to demonstrate the mechanics of using the SQL condition.
# Constructing a Reader
Readers can be constructed in one of two ways:
- Via the constructor
- Via the initializer
# Via the Constructor
The constructor not only instantiates the reader, but also invokes the reader's `configure` instance method, which constructs the reader's operational dependencies.
self.build(stream_name, position: 0, batch_size: 1000, condition: nil, session: nil)
Returns
Instance of the `MessageStore::Postgres::Read` class.
Parameters
Name | Description | Type |
---|---|---|
stream_name | Name of stream that the reader will read | String |
position | Position of the message to start reading from | Integer |
batch_size | Number of messages to retrieve with each query to the message store | Integer |
condition | SQL condition to filter the batch by | String |
session | An existing session object to use, rather than allowing the reader to create a new session | MessageStore::Postgres::Session |
# Via the Initializer
self.initialize(stream_name, position, batch_size)
Returns
Instance of the `MessageStore::Postgres::Read` class.
Parameters
Name | Description | Type |
---|---|---|
stream_name | Name of stream that the reader will read | String |
position | Position of the message to start reading from | Integer |
batch_size | Number of messages to retrieve with each query to the message store | Integer |
session | An existing session object to use, rather than allowing the reader to create a new session | MessageStore::Postgres::Session |
By constructing a reader using the initializer, the reader's dependencies are not set to operational dependencies. They remain inert substitutes.
TIP
See the useful objects user guide for background on inert substitutes.
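The difference between the two construction paths can be sketched in plain Ruby. The classes below (`ToyBuiltReader`, `OperationalSession`, `InertSession`) are hypothetical and do not reproduce the library's internals. They only illustrate that the constructor configures operational dependencies, while the initializer leaves inert substitutes in place.

```ruby
# Hypothetical dependency implementations
class OperationalSession
  def operational?
    true
  end
end

class InertSession
  def operational?
    false
  end
end

# Hypothetical reader showing the constructor/initializer distinction
class ToyBuiltReader
  attr_accessor :session
  attr_reader :stream_name

  def initialize(stream_name)
    @stream_name = stream_name

    # Until configured, the dependency is an inert substitute
    @session = InertSession.new
  end

  # Constructor: instantiates, then configures operational dependencies
  def self.build(stream_name, session: nil)
    instance = new(stream_name)
    instance.configure(session: session)
    instance
  end

  # Uses the given session if there is one, otherwise creates a new one
  def configure(session: nil)
    self.session = session || OperationalSession.new
  end
end

built = ToyBuiltReader.build('someStream-123')
raw = ToyBuiltReader.new('someStream-123')
```

Here `built.session` is operational, while `raw.session` remains the inert substitute until a dependency is assigned explicitly.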
# Assigning a Reader as a Dependency
self.configure(receiver, stream_name, attr_name: :read, position: 0, batch_size: 1000, condition: nil, session: nil)
Constructs an instance of the reader and assigns it to the receiver's `read` attribute. By default, the receiving attribute's name is expected to be `read`, but it can be altered with the use of the `attr_name` parameter.
something = Something.new
MessageStore::Postgres::Read.configure(something, stream_name)
something.read
# => #<MessageStore::Postgres::Read:0x...>
Parameters
Name | Description | Type |
---|---|---|
receiver | The object that will receive the constructed reader | Object |
attr_name | The receiver's attribute that will be assigned the constructed reader | Symbol |
stream_name | Name of stream that the reader will read | String |
position | Position of the message to start reading from | Integer |
batch_size | Number of messages to retrieve with each query to the message store | Integer |
condition | SQL condition to filter the batch by | String |
session | An existing session object to use, rather than allowing the reader to create a new session | MessageStore::Postgres::Session |
TIP
See the useful objects user guide for background on configuring dependencies.
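The assignment mechanics, including the effect of `attr_name`, can be sketched without the library. `ToyConfiguredReader` and `Something` are hypothetical. The sketch assumes only that the receiver exposes a writable attribute with the given name.

```ruby
# Hypothetical reader; only the assignment mechanics are shown
class ToyConfiguredReader
  attr_reader :stream_name

  def initialize(stream_name)
    @stream_name = stream_name
  end

  def self.build(stream_name)
    new(stream_name)
  end

  # Builds a reader and assigns it to the receiver's attribute
  def self.configure(receiver, stream_name, attr_name: :read)
    instance = build(stream_name)
    receiver.public_send("#{attr_name}=", instance)
    instance
  end
end

# Hypothetical receiver with two candidate attributes
class Something
  attr_accessor :read
  attr_accessor :reader
end

# Default attribute name
something = Something.new
ToyConfiguredReader.configure(something, 'someStream-123')

# Alternate attribute name
other = Something.new
ToyConfiguredReader.configure(other, 'someStream-123', attr_name: :reader)
```

After these calls, `something.read` holds a reader, and `other.reader` holds one while `other.read` stays unassigned.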
# Log Tags
The following tags are applied to log messages recorded by a reader:
Tag | Description |
---|---|
read | Applied to all log messages recorded by a reader |
message_store | Applied to all log messages recorded inside the MessageStore namespace |
The following tags may be applied to log messages recorded by a reader:
Tag | Description |
---|---|
message_data | Applied to log messages that record the data content of read message data |
data | Applied to log messages that record the data content of read message data |
See the logging user guide for more on log tags.