# Batch Retrieval
The MessageStore::Postgres::Get class is a utility that retrieves a single batch of messages from a stream.
The Get class behaves similarly to a reader, except that it doesn't continually retrieve subsequent batches of messages. It retrieves a single batch and terminates.
The reader uses the Get class to retrieve messages. While it's mostly intended for internal use, it can be useful when building tools or any time that directly retrieving a batch of messages is necessary.
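The difference between retrieving a single batch and reading continually can be pictured with a small in-memory sketch. It does not touch the message store: an array stands in for a stream, and get_batch and read_all are hypothetical helpers introduced only for illustration.

```ruby
# Illustrative only: an array stands in for a stream, and each element's
# index stands in for its position. get_batch and read_all are hypothetical
# helpers, not part of the MessageStore library.

# Retrieves one batch starting at the given position, then returns.
def get_batch(stream, position: 0, batch_size: 1000)
  stream[position, batch_size] || []
end

# A reader-like loop: keeps requesting batches until one comes back empty.
def read_all(stream, batch_size: 1000)
  position = 0
  messages = []

  loop do
    batch = get_batch(stream, position: position, batch_size: batch_size)
    break if batch.empty?

    messages.concat(batch)
    position += batch.length
  end

  messages
end

stream = %w[deposited withdrawn deposited withdrawn deposited]

single_batch = get_batch(stream, position: 0, batch_size: 2)
everything = read_all(stream, batch_size: 2)
```

Here single_batch holds only the first two entries, while everything holds all five because the loop keeps asking for the next batch until none is left.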
There are two implementations of the Get class: MessageStore::Postgres::Get::Stream and MessageStore::Postgres::Get::Category. Each implementation is deferred to by MessageStore::Postgres::Get to effect the retrieval of batches of messages from streams and categories, respectively.
# Example
deposit_1 = Deposited.new()
deposit_1.deposit_id = '456'
deposit_2 = Deposited.new()
deposit_2.deposit_id = '789'
stream_name = 'account-123'
Messaging::Postgres::Write.(deposit_1, stream_name)
Messaging::Postgres::Write.(deposit_2, stream_name)
messages = MessageStore::Postgres::Get.(stream_name)
messages.length
# => 2
messages[0].deposit_id
# => 456
messages[1].deposit_id
# => 789
# MessageStore::Postgres::Get Facts
- The Get class returns a single batch of message data
- The Get class can retrieve either from streams or categories
- There are two separate implementations of the Get class that are specialized for retrieving from either streams or categories: MessageStore::Postgres::Get::Stream and MessageStore::Postgres::Get::Category
- A Get instance can be configured with an existing session, or it can create a new session
- A Get instance's batch size is configurable
- A Get instance's starting position is configurable
- Get can be configured with a correlation that filters the messages retrieved based on the value of a message metadata's correlation stream attribute
- Get can be configured with consumer group parameters for partitioning message streams for parallel processing based on a consistent hash of the stream name (category retrieval only)
- Get can be configured with a condition that filters the messages retrieved based on a SQL condition
# MessageStore::Postgres::Get Module
The Get module is an abstract module from the MessageStore::Postgres library and namespace.
The Get module provides:
- The class actuator .() (or the class call method) that provides a convenient invocation shortcut that does not require instantiating a concrete implementation of the Get module
- An outer namespace and factory for the implementation classes, MessageStore::Postgres::Get::Stream and MessageStore::Postgres::Get::Category
# Retrieving a Batch
The Get module is implemented as a callable object. Actuating it is simply a matter of invoking its call method.
The actual retrieval work is done by the implementation classes, MessageStore::Postgres::Get::Stream and MessageStore::Postgres::Get::Category.
# Class Actuator
self.call(stream_name, position: 0, batch_size: 1000, correlation: nil, consumer_group_member: nil, consumer_group_size: nil, condition: nil, session: nil)
Returns
Array of MessageStore::MessageData::Read instances.
Parameters
| Name | Description | Type |
|---|---|---|
| stream_name | Name of stream to retrieve message data from | String |
| position | Position of the message to start retrieving from | Integer |
| batch_size | Number of messages to retrieve from the message store | Integer |
| correlation | Category recorded in message metadata's correlation_stream_name attribute to filter the batch by | String |
| consumer_group_member | The zero-based member number of an individual consumer that is participating in a consumer group | Integer |
| consumer_group_size | The size of a group of consumers that are cooperatively processing a single category | Integer |
| condition | SQL condition to filter the batch by | String |
| session | An existing session object to use, rather than allowing the reader to create a new session | MessageStore::Postgres::Session |
The class actuator both constructs and actuates the implementation of Get appropriate to the stream being queried.
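The choice of implementation can be pictured as a check on the stream name. The sketch below assumes that a category is a stream name with no ID, and that the ID is separated from the category by a "-" character. The category? and implementation_for helpers, and the symbols they return, are illustrative and are not the library's code.

```ruby
# Illustrative only: assumes "-" separates the category from the ID, so a
# name without a separator is treated as a category. Helper names are
# hypothetical.
def category?(stream_name)
  !stream_name.include?('-')
end

# Returns a symbol naming the implementation that would handle the retrieval.
def implementation_for(stream_name)
  category?(stream_name) ? :category : :stream
end

implementation_for('account-123') # => :stream
implementation_for('account')     # => :category
```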
# Instance Actuator
The instance actuator of both the MessageStore::Postgres::Get::Stream and MessageStore::Postgres::Get::Category implementations accepts the position to start fetching messages from. The default value of the position varies based on whether the retrieval is from a stream or a category.
# MessageStore::Postgres::Get::Stream
call(position=0)
# MessageStore::Postgres::Get::Category
call(position=1)
Returns
Array of MessageStore::MessageData::Read instances.
Parameters
| Name | Description | Type |
|---|---|---|
| position | Position of the message to start retrieving from | Integer |
# Pub/Sub and Retrieving Correlated Messages
The principal use of the correlation parameter is to implement Pub/Sub.
WARNING
Correlation works only with the retrieval of messages from a category. A MessageStore::Postgres::Get::Category::Error error will be raised if a correlation is sent with a retrieval of a stream rather than a category.
The correlation parameter filters the retrieved batch based on the content of message metadata's correlation_stream_name attribute. The correlation stream name is like a return address. It's a way to give the message some information about the component that the message originated from. This information is carried from message to message in a workflow until it ultimately returns to the originating component.
The correlation_stream_name attribute allows a component to tag an outbound message with its origin. And then later, the originating component can subscribe to other components' events that carry the origin metadata.
Before the source component sends the message to the receiving component, the source component assigns its own stream name to the message metadata's correlation_stream_name attribute. That attribute is carried from message to message through messaging workflows.
destination_stream_name = 'otherComponent-123'
correlation_stream_name = 'thisComponent-789'
command = SomeCommand.new
command.metadata.correlation_stream_name = correlation_stream_name
write.(command, destination_stream_name)
To retrieve messages from the otherComponent category that are correlated to the thisComponent category, the correlation parameter is used.
Get.('otherComponent', correlation: 'thisComponent')
WARNING
Note that the value of the correlation argument must be a category and not a full stream name. A MessageStore::Postgres::Get::Category::Correlation::Error error will be raised if the value is set to a stream name.
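The effect of the correlation filter can be sketched in memory. The example below is not how the message store performs the filtering. It only illustrates the rule described above: a message is kept when the category of its correlation stream name matches the correlation argument. The hashes standing in for message data and the category_of and filter_by_correlation helpers are hypothetical, and the sketch assumes that "-" separates a category from an ID.

```ruby
# Illustrative only: hashes stand in for message data, and filtering is done
# in memory rather than by the message store. All names are hypothetical.

# The category is the part of a stream name before the first "-".
def category_of(stream_name)
  stream_name.split('-', 2).first
end

# Keeps only messages whose correlation stream name belongs to the category.
# Rejects a correlation that looks like a full stream name.
def filter_by_correlation(messages, correlation)
  if correlation.include?('-')
    raise ArgumentError, "Correlation must be a category (#{correlation})"
  end

  messages.select do |message|
    correlation_stream_name = message[:metadata][:correlation_stream_name]
    !correlation_stream_name.nil? && category_of(correlation_stream_name) == correlation
  end
end

messages = [
  { type: 'SomeEvent', metadata: { correlation_stream_name: 'thisComponent-789' } },
  { type: 'SomeEvent', metadata: { correlation_stream_name: 'anotherComponent-456' } },
  { type: 'SomeEvent', metadata: { correlation_stream_name: nil } }
]

correlated = filter_by_correlation(messages, 'thisComponent')
correlated.length # => 1
```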
For more details on pub/sub using the correlation stream, see the pub/sub topic in the consumers user guide.
# Consumer Groups
Consumers processing a single category can be operated in parallel in a consumer group. Consumer groups provide a means of scaling horizontally to distribute the processing load of a single category amongst a number of consumers.
WARNING
Consumer groups work only with the retrieval of messages from a category. A MessageStore::Postgres::Get::Category::ConsumerGroup::Error error will be raised if consumer group parameters are sent with a retrieval of a stream rather than a category.
Consumers operating in consumer groups process a single category, with each consumer in the group processing messages that are not processed by any other consumer in the group.
Specify both the consumer_group_member argument and the consumer_group_size argument to retrieve a batch of messages for a specific member of a consumer group. The consumer_group_size argument specifies the total number of consumers participating in the group. The consumer_group_member argument specifies the unique ordinal ID of a consumer. A consumer group with three members will have a consumer_group_size of 3, and will have members with consumer_group_member numbers 0, 1, and 2.
Get.('someCategory', consumer_group_member: 0, consumer_group_size: 3)
Consumer groups ensure that any given stream is processed by a single consumer, and that the consumer processing the stream is always the same consumer. This is achieved by the consistent hashing of a message's stream name.
A stream name's cardinal ID is hashed to a 64-bit integer, and the modulo of that number by the consumer group size yields a consumer group member number that will consistently process that stream name.
Specifying values for the consumer_group_size and consumer_group_member arguments causes the query for messages to include a condition that is based on the hash of the stream name, the modulo of the group size, and the consumer member number.
WHERE @hash_64(cardinal_id(stream_name)) % {group_size} = {group_member}
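The arithmetic of that condition can be tried out in plain Ruby. This is a sketch under stated assumptions, not the message store's implementation: it assumes the cardinal ID is the first ID segment after the category (before any "+"), and it uses the leading 64 bits of an MD5 digest as a stand-in hash. The cardinal_id, hash_64, and consumer_group_member_for helpers are hypothetical names chosen to mirror the SQL above.

```ruby
require 'digest'

# Illustrative only. The cardinal ID rule and the MD5-based hash are
# assumptions made for this sketch, not confirmed details of the message store.

# Takes the ID part of the stream name, then its first "+"-separated segment.
def cardinal_id(stream_name)
  id = stream_name.split('-', 2)[1]
  return nil if id.nil?

  id.split('+').first
end

# Hashes a string to a signed 64-bit integer using the first 16 hex digits
# of its MD5 digest.
def hash_64(value)
  unsigned = Digest::MD5.hexdigest(value)[0, 16].to_i(16)
  unsigned >= 2**63 ? unsigned - 2**64 : unsigned
end

# Absolute value of the hash, modulo the group size, yields the member number.
def consumer_group_member_for(stream_name, group_size)
  hash_64(cardinal_id(stream_name)).abs % group_size
end

stream_names = %w[account-123 account-456 account-789 account-123+456]
assignments = stream_names.map { |name| [name, consumer_group_member_for(name, 3)] }.to_h
```

Every value in assignments falls between 0 and 2, and a given stream name always maps to the same member. Under the cardinal ID assumption, account-123 and account-123+456 share a member because they share the cardinal ID 123.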
# Filtering Messages with a SQL Condition
The condition parameter receives an arbitrary SQL condition which further filters the messages retrieved.
Get.('someStream-123', condition: 'extract(month from messages.time) = extract(month from now())')
WARNING
The SQL condition feature is deactivated by default. The feature is activated using the message_store.sql_condition Postgres configuration option: message_store.sql_condition=on. A MessageStore::Postgres::Get::Condition::Error error will be raised if the feature is used without activating the configuration option. See the PostgreSQL documentation for more on configuration options: https://www.postgresql.org/docs/current/config-setting.html
DANGER
Activating the SQL condition feature may expose the message store to unforeseen security risks. Before activating this feature, be certain that access to the message store is appropriately protected.
# Constructing a Get Implementation Using the Abstract Constructor
The two implementations of the Get module, MessageStore::Postgres::Get::Stream and MessageStore::Postgres::Get::Category, can be constructed using the build constructor. Depending on whether the value of the stream_name argument is a stream or a category, the appropriate implementation will be constructed.
self.build(stream_name, batch_size: 1000, correlation: nil, consumer_group_member: nil, consumer_group_size: nil, condition: nil, session: nil)
Returns
Instance of the MessageStore::Postgres::Get::Stream class if the value of the stream_name argument is a stream, or an instance of the MessageStore::Postgres::Get::Category class if the value of the stream_name argument is a category.
Parameters
| Name | Description | Type |
|---|---|---|
| stream_name | Name of stream to retrieve message data from | String |
| batch_size | Number of messages to retrieve from the message store | Integer |
| correlation | Category recorded in message metadata's correlation_stream_name attribute to filter the batch by | String |
| consumer_group_member | The zero-based member number of an individual consumer that is participating in a consumer group | Integer |
| consumer_group_size | The size of a group of consumers that are cooperatively processing a single category | Integer |
| condition | SQL condition to filter the batch by | String |
| session | An existing session object to use, rather than allowing the reader to create a new session | MessageStore::Postgres::Session |
# Constructing a MessageStore::Postgres::Get::Stream
A MessageStore::Postgres::Get::Stream class can be constructed in one of two ways:
- Via the constructor
- Via the initializer
# Via the Constructor
The constructor not only instantiates the MessageStore::Postgres::Get::Stream, but also invokes its configure instance method, which constructs the instance's operational dependencies.
self.build(stream_name, batch_size: 1000, condition: nil, session: nil)
Returns
Instance of the MessageStore::Postgres::Get::Stream class.
Parameters
| Name | Description | Type |
|---|---|---|
| stream_name | Name of stream to retrieve message data from | String |
| batch_size | Number of messages to retrieve from the message store | Integer |
| condition | SQL condition to filter the batch by | String |
| session | An existing session object to use, rather than allowing the reader to create a new session | MessageStore::Postgres::Session |
# Via the Initializer
self.initialize(stream_name, batch_size, condition)
Returns
Instance of the MessageStore::Postgres::Get::Stream class.
Parameters
| Name | Description | Type |
|---|---|---|
| stream_name | Name of stream to retrieve message data from | String |
| batch_size | Number of messages to retrieve from the message store | Integer |
| condition | SQL condition to filter the batch by | String |
By constructing the MessageStore::Postgres::Get::Stream instance using the initializer, the instance's dependencies are not set to operational dependencies. They remain inert substitutes.
TIP
See the useful objects user guide for background on inert substitutes.
# Constructing a MessageStore::Postgres::Get::Category
A MessageStore::Postgres::Get::Category class can be constructed in one of two ways:
- Via the constructor
- Via the initializer
# Via the Constructor
The constructor not only instantiates the MessageStore::Postgres::Get::Category, but also invokes its configure instance method, which constructs the instance's operational dependencies.
self.build(stream_name, batch_size: 1000, correlation: nil, consumer_group_member: nil, consumer_group_size: nil, condition: nil, session: nil)
Returns
Instance of the MessageStore::Postgres::Get::Category class.
Parameters
| Name | Description | Type |
|---|---|---|
| stream_name | Name of stream to retrieve message data from | String |
| batch_size | Number of messages to retrieve from the message store | Integer |
| correlation | Category recorded in message metadata's correlation_stream_name attribute to filter the batch by | String |
| consumer_group_member | The zero-based member number of an individual consumer that is participating in a consumer group | Integer |
| consumer_group_size | The size of a group of consumers that are cooperatively processing a single category | Integer |
| condition | SQL condition to filter the batch by | String |
| session | An existing session object to use, rather than allowing the reader to create a new session | MessageStore::Postgres::Session |
# Via the Initializer
self.initialize(category, batch_size, correlation, consumer_group_member, consumer_group_size, condition)
Returns
Instance of the MessageStore::Postgres::Get::Category class.
Parameters
| Name | Description | Type |
|---|---|---|
| category | Name of the category to retrieve message data from | String |
| batch_size | Number of messages to retrieve from the message store | Integer |
| correlation | Category recorded in message metadata's correlation_stream_name attribute to filter the batch by | String |
| consumer_group_member | The zero-based member number of an individual consumer that is participating in a consumer group | Integer |
| consumer_group_size | The size of a group of consumers that are cooperatively processing a single category | Integer |
| condition | SQL condition to filter the batch by | String |
By constructing the MessageStore::Postgres::Get::Category instance using the initializer, the instance's dependencies are not set to operational dependencies. They remain inert substitutes.
TIP
See the useful objects user guide for background on inert substitutes.
# Assigning a Get Implementation as a Dependency
The two implementations of the Get module, MessageStore::Postgres::Get::Stream and MessageStore::Postgres::Get::Category, can be assigned as a dependency using the configure class method. Depending on whether the value of the stream_name argument is a stream name or a category, the appropriate implementation will be constructed and assigned to the receiver's get attribute. By default, the receiving attribute's name is expected to be get, but it can be altered with the use of the attr_name parameter.
self.configure(receiver, stream_name, attr_name: :get, batch_size: 1000, correlation: nil, consumer_group_member: nil, consumer_group_size: nil, condition: nil, session: nil)
# Stream
stream_name = 'someStream-123'
something = Something.new
MessageStore::Postgres::Get.configure(something, stream_name)
something.get
# => #<MessageStore::Postgres::Get::Stream:0x...>
# Category
stream_name = 'someStream'
something = Something.new
MessageStore::Postgres::Get.configure(something, stream_name)
something.get
# => #<MessageStore::Postgres::Get::Category:0x...>
Parameters
| Name | Description | Type |
|---|---|---|
| receiver | The object that will receive the constructed Get | Object |
| stream_name | Name of stream to retrieve message data from | String |
| attr_name | The receiver's attribute that will be assigned the constructed Get | Symbol |
| batch_size | Number of messages to retrieve from the message store | Integer |
| correlation | Category recorded in message metadata's correlation_stream_name attribute to filter the batch by | String |
| consumer_group_member | The zero-based member number of an individual consumer that is participating in a consumer group | Integer |
| consumer_group_size | The size of a group of consumers that are cooperatively processing a single category | Integer |
| condition | SQL condition to filter the batch by | String |
| session | An existing session object to use, rather than allowing the reader to create a new session | MessageStore::Postgres::Session |
TIP
See the useful objects user guide for background on configuring dependencies.
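The assignment pattern itself can be sketched without the library. In the example below, configure_get is a hypothetical helper, a hash stands in for the constructed Get instance, and a Struct stands in for the receiver. It assumes that "-" separates a category from an ID. It shows only how the attr_name parameter selects which attribute on the receiver is assigned.

```ruby
# Illustrative only: a sketch of the dependency assignment pattern, not the
# library's code. The helper and the Struct-based receiver are hypothetical.

# Chooses an implementation name from the stream name (assumes "-" separates
# the category from the ID), then assigns it to the receiver's attribute.
def configure_get(receiver, stream_name, attr_name: :get)
  implementation = stream_name.include?('-') ? :stream : :category
  instance = { implementation: implementation, stream_name: stream_name }

  receiver.public_send("#{attr_name}=", instance)
  instance
end

# A receiver with the default attribute name and an alternative one
receiver = Struct.new(:get, :batch_get).new

configure_get(receiver, 'someStream-123')
configure_get(receiver, 'someStream', attr_name: :batch_get)

receiver.get[:implementation]       # => :stream
receiver.batch_get[:implementation] # => :category
```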
# Assigning a MessageStore::Postgres::Get::Stream as a Dependency
self.configure(receiver, stream_name, attr_name: :get, batch_size: 1000, condition: nil, session: nil)
# Assigning a MessageStore::Postgres::Get::Category as a Dependency
self.configure(receiver, stream_name, attr_name: :get, batch_size: 1000, correlation: nil, consumer_group_member: nil, consumer_group_size: nil, condition: nil, session: nil)
# Log Tags
The following tags are applied to log messages recorded by a Get instance:
| Tag | Description |
|---|---|
| get | Applied to all log messages recorded by an instance of a Get implementation |
| message_store | Applied to all log messages recorded inside the MessageStore namespace |
The following tags may be applied to log messages recorded by a Get instance:
| Tag | Description |
|---|---|
| message_data | Applied to log messages that record the data content of retrieved message data |
| data | Applied to log messages that record the data content of retrieved message data |
See the logging user guide for more on log tags.