# Consumers

A consumer continuously reads messages from a single category and dispatches the messages to the handlers that have been added to the consumer.

Many consumers can be hosted together in a single service, allowing a component to be fed messages from many categories.

A consumer keeps track of the progress that it has made through the category stream that it's reading, allowing a consumer to pick up where it left off after it has been restarted.

It controls polling rates, the pre-fetching of batches of messages, the dispatching of messages to handlers, and the recording of the positions of messages that have already been read.

When a consumer has read through its category's messages, it continues reading, waiting for new messages to arrive.

In messaging parlance, a consumer acts as a subscriber.

# Example

class Consumer
  include Consumer::Postgres

  identifier 'someUniqueIdentifier' # Note: This is optional

  handler SomeHandler
  handler SomeOtherHandler
end

# Consumer Facts

  • A consumer reads from a single category stream
  • A consumer has one or more handlers
  • Messages are dispatched to a consumer's handlers in the order that they are declared
  • The consumer periodically records its reader's position to the message store
  • Messages are retrieved in batches whose size can be configured
  • When no messages are retrieved, the consumer polls the message store
  • The polling interval is configurable
  • A consumer can be configured with a correlation value that filters messages based on their correlation stream name
  • A consumer can be configured with consumer group parameters for partitioning category streams for parallel processing by multiple consumers based on a consistent hash of the category name
  • A consumer can be configured with a condition that filters the messages retrieved

# Consumer::Postgres Module

A class becomes a consumer by including the Consumer::Postgres module from the Consumer::Postgres library and namespace.

The Consumer::Postgres module affords the receiver with:

  • The start method for starting a consumer and supplying it with arguments that control its behavior
  • The handler class macro used for adding handlers to a consumer
  • The identifier class macro used to declare an arbitrary string suffix that is appended to the position stream name, giving the position store a distinct stream in which to record consumer positions

# Registering a Handler

A handler is registered with the handler macro.

handler SomeHandler

The argument is a class name of a handler that the consumer will dispatch messages to.

Each handler is added to the consumer class's handler_registry.

If a handler is registered more than once, a Consumer::HandlerRegistry::Error will be raised when the consumer class is loaded.
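For example, registering the same handler twice will cause the error to be raised:

class Consumer
  include Consumer::Postgres

  handler SomeHandler
  handler SomeHandler # Raises Consumer::HandlerRegistry::Error at load time
end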

# Starting a Consumer

Start a consumer by invoking a consumer class's start method, passing the category name that the consumer will read messages from.

SomeConsumer.start('someCategory')

WARNING

An error will be raised and the consumer will terminate if the consumer is started with a stream name rather than a category.

self.start(category, poll_interval_milliseconds: 100, batch_size: 1000, position_update_interval: 100, identifier: nil, correlation: nil, group_member: nil, group_size: nil, condition: nil, settings: nil)

Parameters

| Name | Description | Type |
| --- | --- | --- |
| category | The name of the category that the consumer will read | String |
| poll_interval_milliseconds | The frequency, in milliseconds, with which the consumer polls the message store for new messages | Integer |
| batch_size | The number of messages to retrieve in each batch fetched from the message store | Integer |
| position_update_interval | The frequency, in messages, with which the consumer's progress through the input category is recorded by the position store | Integer |
| identifier | Qualifier appended to the consumer's position stream name | String |
| correlation | A category name used to restrict the messages consumed to those whose correlation stream is in the specified correlation category (this feature is used to effect pub/sub) | String |
| group_member | The member number of an individual consumer that is participating in a consumer group | Integer |
| group_size | The size of a group of consumers that are cooperatively processing a single category | Integer |
| condition | SQL condition that filters the messages of the category that are read | String |
| settings | Settings that can configure a session object for the consumer to use, rather than the default settings read from settings/message_store_postgres.json | Settings |
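
For example, a consumer can be started with a larger batch size and a longer polling interval than the defaults (the values shown are illustrative):

SomeConsumer.start(
  'someCategory',
  batch_size: 10000,
  poll_interval_milliseconds: 1000
)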

# Consumers Must Be Started Within an Active Actor Supervisor

The threading model of actors requires that a consumer be started within an active actor supervisor. If a consumer is started without being under supervision, its reader will not start, and the consumer will not dispatch messages to its handlers.

In the vast majority of operational cases, a consumer is started by the component host. The component host starts an actor supervisor and manages the individual actors used by the consumers operating in the host.

It can be useful under the right conditions to exercise a consumer directly.

Actor::Supervisor.start do
  Controls::Consumer::Example.start(
    category,
    condition: condition,
    correlation: correlation_category,
    position_update_interval: position_update_interval
  )
end

Eventide uses the ntl-actor implementation of the actor model.

Note: As an alternative to starting a consumer within an actor supervisor in order to exercise it directly, a `sleep` can be issued immediately after starting a consumer.
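
A minimal sketch of the sleep alternative:

SomeConsumer.start('someCategory')

# Block the main thread indefinitely so that the consumer's actors
# continue to run
sleep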

# Pub/Sub and Correlation

When using Pub/Sub, a service will use a consumer to subscribe to events from an external service. However, the consumer may not want to process all events published by that external service. It will likely only want to process messages that are returning to the originating service from the external service.

For example, an account component processes all withdrawal and deposit transactions for an entire company. A funds transfer component will send withdrawal and deposit transactions to the account component, and then will want to be notified when these operations have been processed. The funds transfer component will want to process only those accounting transactions that originated from operations sent to the account component from the funds transfer component.

Before the funds transfer component sends messages to the account component, it sets the messages' correlation_stream_name metadata attribute to the funds transfer's stream name.

The correlation stream name is like a return address. It's a way to give the message some information about the component that the message originated from. This information is carried from message to message in a workflow until it ultimately returns to the originating component.

category = 'account'
correlation_category = 'fundsTransfer'

SomeConsumer.start(
  category,
  correlation: correlation_category
)

WARNING

The value of the correlation argument must be a category and not a full stream name. An error will be raised and the consumer will terminate if the value is set to a stream name.

In order for an event written to an external service's stream to carry the correlation information from the originating service, the outbound message being written to the external service must have its correlation_stream_name attribute set to the current service's stream name.

stream_name = 'account-123'
correlation_stream_name = 'fundsTransfer-456'

command = Withdraw.new
command.metadata.correlation_stream_name = correlation_stream_name

write.(command, stream_name)

In the external service's command handler, the resulting event written must preserve the correlation data from message to message.

The follow constructor of messages is the mechanism that preserves message metadata, including the correlation_stream_name attribute.

handle Withdraw do |withdraw|
  some_id = withdraw.some_id

  # The follow constructor copies the correlation metadata from
  # the input command to the output event
  withdrawn = Withdrawn.follow(withdraw)

  stream_name = stream_name(some_id)

  write.(withdrawn, stream_name)
end

The originating service can now select the events written to this external service's stream based on the correlation data preserved in the events.

Postgres' ability to select events based on the content of specific attributes of the message metadata is the underlying mechanism by which this is implemented.

Specifying a value for the correlation parameter when starting a consumer causes the consumer's stream reader to filter the consumed stream using Postgres' support for JSON document querying.

category(metadata->>'correlationStreamName') = 'fundsTransfer'

# Consumer Groups

Consumers processing a single category can be operated in parallel in a consumer group. Consumer groups provide a means of scaling horizontally to distribute the processing load of a single category amongst a number of consumers.

Consumers operating in consumer groups process a single category, with each consumer in the group processing messages that are not processed by any other consumer in the group.

DANGER

Consumers operated in consumer groups must be used in conjunction with the identifier attribute, or else the individual consumers in a consumer group will overwrite each other's position records.

Specify both the group_size argument and the group_member argument to enlist a consumer in a consumer group. The group_size argument specifies the total number of consumers participating in the group. The group_member argument specifies the unique ordinal ID of a consumer. A consumer group with three members will have a group_size of 3, and will have members with group_member numbers 0, 1, and 2.

group_size = 3
group_member = 0

SomeConsumer.start(
  category,
  group_size: group_size,
  group_member: group_member
)

WARNING

A consumer that is a member of a group must also have a unique identifier so that each consumer in a group will write the consumer's position to and read from distinct position streams. See the Position Store topic for more details.
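
For example, the first member of a three-member consumer group can be started with a distinct identifier (the identifier value shown is illustrative):

SomeConsumer.start(
  category,
  group_size: 3,
  group_member: 0,
  identifier: 'groupMember0'
)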

Consumer groups ensure that any given stream is processed by a single consumer, and that the consumer processing the stream is always the same consumer. This is achieved by the consistent hashing of a message's stream name.

A stream name's cardinal ID is hashed to a 64-bit integer, and the modulo of that number by the consumer group size yields a consumer group member number that will consistently process that stream name.

Specifying values for the group_size and group_member parameters causes the query for messages to include a condition based on the hash of the stream's cardinal ID, modulo the group size, compared to the consumer's member number.

WHERE @hash_64(cardinal_id(stream_name)) % {group_size} = {group_member}

WARNING

Consumer groups should always be used in conjunction with the concurrency protection offered by the message writer. Handler logic should always write messages using the writer's expected_version feature, irrespective of the use of consumer groups. However, the use of concurrency protection is even more imperative when using consumer groups. For more on concurrent writes, see the writers user guide.

# Filtering Messages with a SQL Condition

Since the consumer reads the given stream using a SQL query, that query can be extended by the condition keyword argument. This further filters the messages read by the consumer.

SomeConsumer.start('someCategory', condition: 'extract(month from messages.time) = extract(month from now())')

WARNING

The SQL condition feature is deactivated by default. The feature is activated using the message_store.sql_condition Postgres configuration option: message_store.sql_condition=on. Using the feature without activating the configuration option will result in an error. See the PostgreSQL documentation for more on configuration options: https://www.postgresql.org/docs/current/config-setting.html
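
As a sketch, the option can be set at the database level with ALTER DATABASE, assuming the message store database uses the default name, message_store (the option can also be set in postgresql.conf):

ALTER DATABASE message_store SET message_store.sql_condition TO on;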

DANGER

Activating the SQL condition feature may expose the message store to unforeseen security risks. Before activating this feature, be certain that access to the message store is appropriately protected.

DANGER

Usage of this feature should be treated with caution. While this feature can be used to great effect, under certain circumstances, it can also result in messages not being processed, or even processed out of order. Ensure that you fully understand the implications before proceeding.

# Polling

A consumer starts polling the message store for new messages if a fetch of a batch returns no messages. As long as each query for messages retrieves messages, the consumer won't insert a delay between message retrieval queries.

In the event that the consumer's query for messages doesn't retrieve messages, the consumer will delay before the next query. The amount of time that the consumer delays is specified by the poll_interval_milliseconds parameter. The default value of the interval is 100 milliseconds.

By inserting a delay into the retrieval cycle when a query returns no messages, the consumer is prevented from flooding the message store database with requests for messages. The polling interval is used to relieve pressure on the message store when a category is not continually saturated with messages waiting to be processed.

If the polling interval is nil, there is no delay between fetches. The lower the value of the polling interval, the greater the number of attempts to fetch batches from the message store. This value should be carefully tuned to balance the need for low latency against the need to not flood the message store with requests.
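
For example, the delay can be eliminated entirely by setting the interval to nil:

SomeConsumer.start('someCategory', poll_interval_milliseconds: nil)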

For more details on polling, see the Poll library.

# Error Handling

Under normal circumstances, errors that are raised in the course of a consumer doing its work should not be caught. The purpose of an error is to signal a fatal, unrecoverable condition.

Such a condition isn't something that can be remediated at runtime. It indicates that something so extraordinary and exceptional has happened that the consumer should crash the surrounding service and stop processing messages until operators and developers have a chance to investigate the condition.

However, there are certain special circumstances where an error should be intercepted before it is allowed to crash the consumer, for example, when errors are published to an error reporting service, or under certain consumer-wide retry configurations.

To intercept an error, override the error_raised method.

class Consumer
  include Consumer::Postgres

  handler SomeHandler
  handler SomeOtherHandler

  def error_raised(error, message_data)
    # Do something with the error
    raise error
  end
end

DANGER

The error must be explicitly re-raised in order for it to crash the service that hosts the consumer. Only in the case of a retry should an error not be re-raised. In all other cases, the error must be re-raised, or else the consumer will continue to process messages even though an exceptional and unrecoverable condition is in effect.

The default implementation of the error_raised method simply re-raises the error.
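
The default implementation is equivalent to:

def error_raised(error, message_data)
  raise error
end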

Error handling can be specialized by overriding the error_raised method.

error_raised(error, message_data)

Parameters

| Name | Description | Type |
| --- | --- | --- |
| error | The instance of the error that has been raised | RuntimeError |
| message_data | The message_data that was being processed when the error occurred | MessageStore::MessageData |

# Dispatching Messages

A consumer's reader retrieves raw MessageStore::MessageData instances and dispatches each one to the consumer's handlers by passing the message data to the consumer's actuator, which in turn passes the message data object to each handler's actuator.

A consumer's handlers are actuated in the same order in which they're declared.

Note: A new instance of a handler class is created for each message dispatched to it.

A consumer can be exercised directly by building it and then passing an instance of MessageStore::MessageData to its actuator.

consumer = SomeConsumer.build('someCategory')
consumer.(message_data)

In practice, it's not necessary for the user to interact with the consumer on this level.

# Position Store

At an interval specified by the position_update_interval, the global position of the message being read by the consumer's reader is written to the message store. This allows a consumer to avoid having to process all of a stream's messages each time the consumer is started.

By default, the position is written every 100 messages. The value is controlled using the consumer's position_update_interval.
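
For example, the position can be recorded every 1,000 messages rather than every 100 (the value shown is illustrative):

SomeConsumer.start('someCategory', position_update_interval: 1000)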

# Position Stream and the Consumer Identifier

DANGER

If two consumers read the same stream, they must use distinct consumer identifiers. If not, these consumers will write their position records to the same stream, which will cause these consumers to skip messages that have not been processed.

The consumer writes the position to a stream derived from the name of the stream that a consumer is started with. For example, if a consumer is started with a stream named account:command, then the position is recorded in a stream named account:command+position.

The name of the position stream can be specialized by specifying a stream name qualifier with the identifier macro, or with the identifier parameter of the start method.

class Consumer
  include Consumer::Postgres

  identifier 'someUniqueIdentifier'

  handler SomeHandler
end

Consumer.start('account:command')

In the above example, the consumer's position stream would be account:command+position-someUniqueIdentifier.

Consumers can also be assigned an identifier when they are started. If an identifier macro is also declared on the consumer class, the one given when starting the consumer has precedence over the one declared on the consumer class.

In the following example, the consumer's position stream would be account:command+position-someOtherIdentifier.

Consumer.start('account:command', identifier: 'someOtherIdentifier')

# Constructing Consumers

In general, it's not necessary to construct a consumer. The general use case of a consumer is to invoke its start method.

A consumer can be constructed with its build method.

self.build(category, poll_interval_milliseconds: 100, batch_size: 1000, position_update_interval: 100, identifier: nil, condition: nil, settings: nil)

Parameters

| Name | Description | Type |
| --- | --- | --- |
| category | The name of the category that the consumer will read | String |
| poll_interval_milliseconds | The frequency, in milliseconds, with which the consumer polls the message store for new messages | Integer |
| batch_size | The number of messages to retrieve in each batch fetched from the message store | Integer |
| position_update_interval | The frequency, in messages, with which the consumer's progress through the input category is recorded by the position store | Integer |
| identifier | Qualifier appended to the consumer's position stream name | String |
| condition | SQL condition that filters the messages of the category that are read | String |
| settings | Settings that can configure a session object for the consumer to use, rather than the default settings read from settings/message_store_postgres.json | Settings |
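
For example (the argument values shown are illustrative):

consumer = SomeConsumer.build(
  'someCategory',
  batch_size: 10,
  identifier: 'someIdentifier'
)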

# Overriding the Poll Interval

A consumer's poll_interval_milliseconds can be overridden using the POLL_INTERVAL_MILLISECONDS environment variable.

POLL_INTERVAL_MILLISECONDS=10 start_service.sh

# Log Tags

The following tags are applied to log messages recorded by a consumer:

| Tag | Description |
| --- | --- |
| consumer | Applied to all log messages recorded by a consumer |

The following tags may be applied to log messages recorded by a consumer:

| Tag | Description |
| --- | --- |
| start | Applied to log messages recorded when a consumer is starting |
| dispatch | Applied to log messages recorded when a message is being dispatched |
| message | Applied to log messages recorded when a message is being dispatched |
| position | Applied to log messages recorded when a position is being recorded |
| position_store | Applied to log messages recorded by the consumer's position store |
| get | Applied to log messages recorded while getting a position record from the position store |
| put | Applied to log messages recorded while putting a position record to the position store |

See the logging user guide for more on log tags.