Consumers

A consumer continuously reads messages from a single stream and dispatches the messages to the handlers that have been registered to the consumer.

Many consumers can be hosted together in a single service, allowing a component to be fed messages from many streams.

A consumer keeps track of the progress that it has made through the stream that it's reading, allowing a consumer to pick up where it left off after it has been restarted.

It controls polling rates, the pre-fetching of message batches, the dispatching of messages to handlers, and the recording of the positions of messages that have already been read.

When a consumer has read through its stream's messages, it continues reading, waiting for new messages to arrive.

In messaging parlance, a consumer acts as a subscriber.

Example

class Consumer
  include Consumer::Postgres

  identifier 'someConsumer' # Note: This is optional

  handler SomeHandler
  handler SomeOtherHandler
end

Consumer Facts

  • A consumer reads from a single stream, usually a category stream
  • A consumer has one or more handlers
  • Messages are dispatched to a consumer's handlers in the order that they are declared
  • The consumer periodically records its reader's position to the message store
  • Messages are retrieved in batches whose size can be configured
  • When no messages are retrieved, the consumer polls the message store
  • The polling interval is configurable
  • A consumer can be configured with a condition that filters the messages retrieved
  • A consumer can be configured with a correlation value that filters messages based on their correlation stream name
  • A consumer can be configured with consumer group parameters for distributing messages amongst multiple consumers

Consumer::Postgres Module

A class becomes a consumer by including the Consumer::Postgres module from the Consumer::Postgres library and namespace.

The Consumer::Postgres module provides the receiver with:

  • The start method for starting a consumer and supplying it with arguments that control its behavior
  • The handler class macro used for adding handlers to a consumer
  • The identifier class macro used to declare an arbitrary string qualifier that is used to compose the stream name used by the position store

Registering a Handler

A handler is registered with the handler macro.

handler SomeHandler

The argument is a class name of a handler that the consumer will dispatch messages to.

Each handler is added to the consumer class's handler_registry.

If a handler is registered more than once, a Consumer::HandlerRegistry::Error is raised when the consumer class is loaded.

Starting a Consumer

Start a consumer by invoking a consumer class's start method, passing the stream name that the consumer will read messages from.

SomeConsumer.start('someStream')

Note that for typical uses, the stream that a consumer reads is almost always a category stream.

self.start(stream_name, poll_interval_milliseconds: 100, batch_size: 1000, position_update_interval: 100, identifier: nil, correlation: nil, condition: nil, settings: nil)

Parameters

  • stream_name (String): The name of the stream (typically a category) that the consumer will read
  • poll_interval_milliseconds (Integer): The frequency, in milliseconds, with which the consumer polls the message store for new messages
  • batch_size (Integer): The number of messages to retrieve in each batch fetched from the message store
  • position_update_interval (Integer): The frequency with which progress that the consumer has made through the input stream is recorded by the position store
  • identifier (String): Qualifier appended to the consumer's position stream name
  • correlation (String): A category name used to restrict the messages consumed to those whose correlation stream is in the specified correlation category (this feature is used to effect pub/sub)
  • group_size (Integer): The size of a group of consumers that are cooperatively processing a single input stream
  • group_member (Integer): The member number of an individual consumer that is participating in a consumer group
  • condition (String): SQL condition fragment that constrains the messages of the stream that are read
  • settings (Settings): Settings that can configure a session object for the consumer to use, rather than the default settings read from settings/message_store_postgres.json

Consumers Must Be Started Within an Active Actor Supervisor

The threading model of actors requires that a consumer be started within an active actor supervisor. If a consumer is started without being under supervision, its reader will not start, and the consumer will not dispatch messages to its handler.

In the vast majority of operational cases, a consumer is started by the component host. The component host starts an actor supervisor and manages the individual actors used by the consumers operating in the host.

It can be useful under the right conditions to exercise a consumer directly.

Actor::Supervisor.start do
  Controls::Consumer::Example.start(
    category,
    condition: condition,
    correlation: correlation_category,
    position_update_interval: position_update_interval
  )
end

Eventide uses the ntl-actor implementation of the actor model.

Note: As an alternative to starting a consumer within an actor supervisor in order to exercise it directly, a `sleep` can be issued immediately after starting a consumer.

Correlation and Pub/Sub

It's common when using pub/sub that a service will use a consumer to subscribe to events from an external service. However, if that external service is leveraged by a number of different services, then only some of the events it publishes will pertain to messaging workflows started by the originating, or coordinating, service.

If a service is only concerned with some of the events published by an external service, the consumer can use the correlation parameter to filter the messages received from the external service's stream.

category = <some external service's category stream>
correlation_category = <this service's category stream>

SomeConsumer.start(
  category,
  correlation: correlation_category)

In order for an event written to an external service's stream to carry the correlation information from the originating service, the outbound message being written to the external service must have its correlation_stream_name attribute set to the current service's stream name.

stream_name = <some external service's stream name>
correlation_stream_name = <this service's stream name>

command = SomeCommandToExternalService.new

command.metadata.correlation_stream_name = correlation_stream_name
write.(command, stream_name)

In the external service's command handler, the resulting event written must preserve the correlation data from message to message.

A message's follow constructor is the mechanism that preserves message metadata, including the correlation_stream_name attribute.

handle SomeCommandToExternalService do |some_command_to_external_service|
  some_id = some_command_to_external_service.some_id

  # The follow constructor copies the correlation metadata from
  # the input command to the output event
  some_event = SomeEvent.follow(some_command_to_external_service)
  stream_name = stream_name(some_id)

  write.(some_event, stream_name)
end
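The metadata-carrying behavior of the follow constructor can be illustrated with a minimal plain-Ruby sketch. The Message and Metadata structs below are simplified stand-ins for the Messaging library's classes, reduced to the single attribute relevant here:

```ruby
# Simplified stand-ins for a message and its metadata
Metadata = Struct.new(:correlation_stream_name, keyword_init: true)
Message = Struct.new(:metadata, keyword_init: true)

# Illustrative version of the follow constructor's metadata handling:
# the output message's metadata carries forward the input message's
# correlation_stream_name
def follow(preceding_message)
  Message.new(
    metadata: Metadata.new(
      correlation_stream_name: preceding_message.metadata.correlation_stream_name
    )
  )
end

command = Message.new(metadata: Metadata.new(correlation_stream_name: 'someService-123'))
event = follow(command)
# event.metadata.correlation_stream_name is 'someService-123'
```

Because each follow call carries the attribute forward, the correlation data survives arbitrarily long message workflows.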

The originating service can now select the events written to this external service's stream based on the correlation data preserved in the events.

Postgres' ability to select events based on the content of specific attributes of the message metadata is the underlying mechanism by which this is implemented.

Specifying a value for the correlation parameter when starting a consumer causes the consumer's stream reader to filter the consumed stream using Postgres' support for JSON document querying.

metadata->>'correlationStreamName' like '<some correlation category>-%'
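As an illustration of what that filter selects, here is a plain-Ruby stand-in for the SQL query, operating on hashes in place of message rows. The message shapes here are illustrative only:

```ruby
# Illustrative stand-in for the SQL correlation filter:
#   metadata->>'correlationStreamName' like '<some correlation category>-%'
def correlated?(message, correlation_category)
  correlation_stream_name = message.dig(:metadata, :correlationStreamName)
  return false if correlation_stream_name.nil?

  correlation_stream_name.start_with?("#{correlation_category}-")
end

messages = [
  { metadata: { correlationStreamName: 'someService-123' } },
  { metadata: { correlationStreamName: 'otherService-456' } },
  { metadata: {} }
]

selected = messages.select { |message| correlated?(message, 'someService') }
# selected contains only the first message
```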

Consumer Groups

Consumers can be operated in parallel in a consumer group. Consumer groups provide a means of scaling horizontally to distribute the processing load of a single stream amongst a number of consumers.

Consumers operating in consumer groups process a single input stream, with each consumer in the group processing messages that are not processed by any other consumer in the group.

Specify both the group_size argument and the group_member argument to enlist a consumer in a consumer group. The group_size argument specifies the total number of consumers participating in the group. The group_member argument specifies the unique ordinal ID of a consumer. A consumer group with three members will have a group_size of 3, and will have members with group_member numbers 0, 1, and 2.

group_size = 3
group_member = 0

SomeConsumer.start(
  category,
  group_size: group_size,
  group_member: group_member)

Consumer groups ensure that any given stream is processed by a single consumer. The consumer that processes a stream is always the same consumer. This is achieved by the consistent hashing of a message's stream name.

A stream name is hashed to a 64-bit integer, and the modulo of that number by the consumer group size yields a consumer group member number that will consistently process that stream name.

Specifying values for the group_size and group_member parameters when starting a consumer causes the consumer's stream reader to filter the consumed stream using a query condition that is based on the hash of the stream name, the modulo of the group size, and the consumer member number.

The resulting SQL where clause reflects the following condition:

WHERE @hash_64(stream_name) % <group_size> = <group_member>
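The assignment of streams to group members can be sketched in plain Ruby. The hash function below, a truncated MD5 digest, is only an illustrative stand-in for the message store's hash_64 function; the point is that the same stream name always hashes to the same member number:

```ruby
require 'digest'

# Illustrative 64-bit hash (a stand-in for the message store's hash_64)
def hash_64(stream_name)
  Digest::MD5.hexdigest(stream_name)[0, 16].to_i(16)
end

# A given stream name is always assigned to the same group member
def group_member(stream_name, group_size)
  hash_64(stream_name) % group_size
end

member = group_member('account-123', 3)
# member is the same value (0, 1, or 2) on every invocation
```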

WARNING

Consumer groups should always be used in conjunction with the concurrency protection offered by the message writer. Handler logic should always write messages using the writer's expected_version feature, irrespective of the use of consumer groups. However, the use of concurrency protection is even more imperative when using consumer groups. For more on concurrent writes, see the writers user guide.

Conditions

Since the consumer reads the given stream using a SQL query, that query can be extended by the condition keyword argument. This further constrains the messages read by the consumer beyond selecting only the messages of the stream being consumed.

For example, the consumer can read messages from someCategory whose position is 0.

SomeConsumer.start('someCategory', condition: 'position = 0')

The above example isn't a realistic use of this feature. It's a contrived example merely intended to demonstrate the mechanics of the feature.

WARNING

Usage of this feature should be treated with caution. While this feature can be used to great effect, under certain circumstances, it can also result in messages not being processed, or even processed out of order. Ensure that you fully understand the implications before proceeding.

Polling

A consumer starts polling the message store for new messages if a fetch of a batch returns no messages.

A consumer's poll_interval_milliseconds controls the delay between each fetch issued by the consumer. The default value of the interval is 100 milliseconds.

The fetch is executed once per polling interval rather than executing immediately at the conclusion of the previous cycle.

If the polling interval is nil, there is no delay between fetches. The lower the value of the polling interval, the greater the number of attempts to fetch batches from the message store. This value should be carefully tuned to balance the need for low-latency and the need to not flood the message store server with requests.

The polling interval relieves pressure on the message store server when a stream is not continually saturated with messages waiting to be processed and the consumer has begun polling.

If the fetch execution time is greater than the polling interval time, the fetch is re-executed immediately at the conclusion of the previous fetch.
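The cycle described above can be sketched as follows. This is a simplification of what the Poll library implements, and fetch here stands in for the consumer's batch retrieval:

```ruby
# Simplified sketch of the polling cycle. `fetch` stands in for the
# consumer's batch retrieval; `cycles` bounds the loop for illustration.
def poll(poll_interval_milliseconds, cycles:, &fetch)
  interval = poll_interval_milliseconds && poll_interval_milliseconds / 1000.0

  cycles.times do
    start_time = Time.now
    batch = fetch.call

    # Polling only happens when a fetch returns no messages
    next unless batch.empty?

    # A nil interval means no delay between fetches
    next if interval.nil?

    # Sleep only for the remainder of the interval; if the fetch took
    # longer than the interval, re-execute immediately
    elapsed = Time.now - start_time
    sleep(interval - elapsed) if elapsed < interval
  end
end

# Example: every fetch returns an empty batch, so each cycle waits out
# the polling interval before the next fetch
poll(10, cycles: 3) { [] }
```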

For more details on polling, see the Poll library

Error Handling

Under normal circumstances, errors that are raised in the course of a consumer doing its work should not be caught. The purpose of an error is to signal a fatal, unrecoverable condition.

Such a condition isn't something that can be remediated at runtime, and indicates that something has happened that is so extraordinary and exceptional that it should cause the consumer to crash to the surrounding service, and to stop processing messages until operators and developers have a chance to investigate the condition.

However, there are certain special circumstances where an error should be intercepted before it is allowed to crash the consumer. For example, when errors are published to an error reporting service, or under certain consumer-wide retry configurations.

To intercept an error, override the error_raised method.

class Consumer
  include Consumer::Postgres

  handler SomeHandler
  handler SomeOtherHandler

  def error_raised(error, message_data)
    # Do something with the error
    raise error
  end
end

WARNING

The error must be explicitly re-raised in order for the error to be able to crash the service that the consumer is hosted by. Only in the case of a retry should an error not be re-raised. In all other cases, the error must be re-raised or else the consumer will continue to process messages even though an exceptional and unrecoverable condition is in-effect.

The default implementation of the error_raised method simply re-raises the error.

Error handling can be specialized by overriding the error_raised method.

error_raised(error, message_data)

Parameters

  • error (RuntimeError): The instance of the error that has been raised
  • message_data (MessageStore::MessageData): The message_data that was being processed when the error occurred

Dispatching Messages

A consumer's reader retrieves raw MessageStore::MessageData instances and dispatches each one to the consumer's handlers by passing the message data to the consumer's actuator, which then passes the message data object to each handler's actuator.

A consumer's handlers are actuated in the same order in which they're declared.

Note: A new instance of a handler class is created for each message dispatched to it.

A consumer can be exercised directly by building it and then passing an instance of MessageStore::MessageData to its actuator.

consumer = SomeConsumer.build('someStream')
consumer.(message_data)

In practice, it's not necessary for the user to interact with the consumer on this level.
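Even so, the dispatch order and the per-message handler instantiation can be illustrated with a self-contained sketch. The handler classes and registry below are hypothetical simplifications of the consumer's handler registry and the handlers' actuators:

```ruby
# Hypothetical handlers that record their actuations
$log = []

class SomeHandler
  def call(message_data)
    $log << [self.class.name, message_data]
  end
end

class SomeOtherHandler
  def call(message_data)
    $log << [self.class.name, message_data]
  end
end

# Handlers are actuated in declaration order, and a new handler
# instance is created for each message dispatched
HANDLER_REGISTRY = [SomeHandler, SomeOtherHandler]

def dispatch(message_data)
  HANDLER_REGISTRY.each do |handler_class|
    handler_class.new.(message_data)
  end
end

dispatch('someMessageData')
# $log is [["SomeHandler", "someMessageData"], ["SomeOtherHandler", "someMessageData"]]
```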

Position Store

At an interval specified by the position_update_interval, the global position of the message being read by the consumer's reader is written to the message store. This allows a consumer to avoid reprocessing all of a stream's messages each time the consumer is started.

By default, the position is written every 100 messages. The value is controlled using the consumer's position_update_interval.

Position Stream and the Consumer Identifier

WARNING

If two consumers read the same stream, they must use distinct consumer identifiers. If not, these consumers will write their position records to the same stream, which will cause these consumers to skip messages that have not been processed.

The consumer writes the position to a stream derived from the name of the stream that a consumer is started with. For example, if a consumer is started with a stream named account:command, then the position is recorded in a stream named account:command+position.

The name of the position stream can be specialized by specifying a stream name qualifier with the identifier macro, or with the identifier parameter of the start method.

class Consumer
  include Consumer::Postgres

  identifier 'someConsumer'

  handler SomeHandler
end

Consumer.start('account:command')

In the above example, the consumer's position stream would be account:command+position-someConsumer.

Consumers can also be assigned an identifier when they are started. If an identifier macro is also declared on the consumer class, the one given when starting the consumer has precedence over the one declared on the consumer class.

In the following example, the consumer's position stream would be account:command+position-otherIdentifier.

Consumer.start('account:command', identifier: 'otherIdentifier')
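The naming rule for the position stream can be expressed as a simple string function. This is an illustration of the rule described above, not the library's actual implementation:

```ruby
# Illustration of the position stream naming rule: the input stream name
# is suffixed with "+position", and an optional identifier is appended
def position_stream_name(stream_name, identifier: nil)
  name = "#{stream_name}+position"
  name = "#{name}-#{identifier}" unless identifier.nil?
  name
end

position_stream_name('account:command')
# => "account:command+position"

position_stream_name('account:command', identifier: 'someConsumer')
# => "account:command+position-someConsumer"
```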

Constructing Consumers

In general, it's not necessary to construct a consumer. The general use case of a consumer is to invoke its start method.

A consumer can be constructed with its build method.

self.build(stream_name, poll_interval_milliseconds: 100, batch_size: 1000, position_update_interval: 100, identifier: nil, condition: nil, settings: nil)

Parameters

  • stream_name (String): The name of the stream that the consumer will read
  • poll_interval_milliseconds (Integer): The frequency, in milliseconds, with which the consumer polls the message store for new messages
  • batch_size (Integer): The number of messages to retrieve in each batch fetched from the message store
  • position_update_interval (Integer): The frequency with which progress that the consumer has made through the input stream is recorded by the position store
  • identifier (String): Qualifier appended to the consumer's position stream name
  • condition (String): SQL condition fragment that constrains the messages of the stream that are read
  • settings (Settings): Settings that can configure a session object for the consumer to use, rather than the default settings read from settings/message_store_postgres.json

Log Tags

The following tags are applied to log messages recorded by a consumer:

  • consumer: Applied to all log messages recorded by a consumer

The following tags may be applied to log messages recorded by a consumer:

  • position_store: Applied to log messages recorded by the consumer's position store
  • get: Applied to log messages recorded while getting a position record from the position store
  • put: Applied to log messages recorded while putting a position record to the position store

See the logging user guide for more on log tags.