Messaging Writer

The messaging writer writes instances of message objects to the message store. It converts them to the more raw, low level message data form which can then be converted to JSON and stored in the database.

Example

deposited = Deposited.build(some_deposit_data)

stream_name = "account-123"

Messaging::Postgres::Write.(deposited, stream_name)

Writer Facts

  • The messaging writer can write one message or a batch of messages
  • A write is always made to a single stream
  • The writer can operate in a diagnostic mode, which records the messages written to it in-memory for later inspection
  • A writer can protect writes against concurrency using its expected_version argument
  • Coordination of workflows between streams can be effected using the writer's facility for replies and replying
  • Actuating a writer can be done either from its class interface or its instance interface

Messaging::Write Class

The Write class is a concrete class from the Messaging::Postgres library and namespace.

The Messaging::Write class provides the following:

  • Actuator methods for both the class and instance interface that write messages to the specified stream
  • The initial method that writes a message and assures that the message written is the first message in a stream
  • The reply method that is a shortcut for writing a reply message to the reply stream name registered in the message's metadata
  • A substitute implementation of a write than can be used in diagnostic contexts, such as testing.

Writing a Message

call(message, stream_name, expected_version: nil, reply_stream_name: nil)

Returns

Position of the message written.

Alias

write

Parameters

NameDescriptionType
messageThe message to be writtenMessaging::Message or Array
stream_nameThe stream name to write the message toString
expected_versionExpected version of the stream at the time of the writeInteger
reply_stream_nameName of stream that the receiver uses to reply to the messageString

The writer is a callable object. It's actuated using the .() convention.

write.(some_message, some_stream)

Conversely, the writer can be actuated by directly invoking the call method. It can also be actuated via the write alias, although this option is rarely exercised in practice.

Note: Streams only come into existence when messages are written to them. There's no need to create a stream before using it. A stream is created implicitly by an event having been written to it.

Writing Atomic Batches

Either a single message or an array of messages can be passed to the writer's actuator.

batch = [some_message, some_other_message]
write.(batch, some_stream)

When an array is passed to the writer, it is written as an atomic batch. If the write fails for any reason while the batch is being written, none of the messages will be written.

Note: As with individual messages, a batch is written to a single stream. Atomic writes are only possible to one stream.

Expected Version and Concurrency

The expected_version argument is typically used as an optimistic concurrency protection. It can also be used to assure that a message written to a stream is the first message in the stream.

In the typical handler workflow, a stream's version is retrieved along with the stream's projected entity at the start of a handler. That retrieved version is then included as the value of the expected_version argument to the writer.

handle Something do |something|
  account, version = store.fetch(something.id, include: :version)

  # ...

  write.(some_message, some_stream, expected_version: version)
end

If the expected version and the stream version no longer match at the time of the write, the MessageStore::ExpectedVersion::Error is raised.

Concurrency

WARNING

Except when running multiple instances of a component for hot fail-over, concurrent writing to an event stream is considered an anomaly. It's not expected that two separate writers would be writing to the same event stream concurrently as this would violate the authority of a component over its streams. Only one instance of a hosted component should be empowered to write to a stream. If two instances of the same component are writing to the same event stream, then appropriate measures must be taken to retry the writes.

Assuring an Initial Write

initial(message, stream_name)

Returns

Position of the message written.

Alias

write_initial

Parameters

NameDescriptionType
messageThe initial message to be writtenMessaging::Message
stream_nameThe stream name to write the initial message toString

Writing an event with the initial method is a shortcut for writing a message with the value of the expected_version argument set to -1.

write.initial(some_message, some_stream)

# Is equivalent to

write.(some_message, some_stream, expected_version: -1)

The version of a stream that has no events written to it - and thus a stream that doesn't exist yet - it -1.

To assure that a message is written in the first position of a stream - position 0 - write the message with an expected version of -1.

If there is already any messages in the stream, the stream's version will be a value greater than -1. Attempting an initial write of a message into a stream that has messages in it will result in the MessageStore::ExpectedVersion::Error being raised.

write = Write.build
write.(some_message, some_stream)
write.initial(some_other_message, some_stream)
# => MessageStore::ExpectedVersion::Error (Wrong expected version: -1 (Stream: some_stream, Stream Version: 0)

This pattern is useful for proving uniqueness or for reserving something, for example: a unique username, a seat on a flight, a purchase, or a concert ticket. It's also useful in certain idempotence protection patterns.

The :no_stream Expected Version

The :no_stream symbol can be substituted for -1 when writing with an expected version of -1.

Write.(some_message, some_stream, expected_version: :no_stream)

Replying

The reply_stream_name Parameter

The reply_stream_name argument passed to the writer is similar in principle to a callback. In this case, it's a callback address.

When two (or more) components are coordinating with each other, it's not uncommon for a message to be sent to a component with information about how that component should report back to the first component.

write.(some_message, 'otherServiceStream', reply_stream_name: 'thisServiceStream')

In the above example, the "thisServiceStream" is the stream name of the service that the handler is running in. The "otherServiceStream" is the stream name of a service that the message is being "sent" to.

By specifying a reply_stream_name, the some_message message's metadata will contain the reply_stream_name value. That value is used by the other service that receives this message, and may send a reply as part of processing that message.

Note: The reply pattern is typically only useful in point-to-point messaging scenarios where Pub/Sub isn't (or can't) be used, and explicit reply commands are used instead of event subscriptions.

The reply Method

reply(message)

Returns

Position of the message written.

Parameters

NameDescriptionType
messageThe message written to the reply streamMessaging::Message

A message whose metadata contains a reply stream name can be replied to using the writer's reply method.

The message will be written to the stream name contained in the metadata's reply_stream_name attribute.

If the reply message metadata's reply_stream_name attribute is nil when it is passed to the reply method, the Messaging::Write::Error is raised.

The reply message metadata's reply_stream_name attribute is cleared (set to nil) once the reply has been executed.

The reply method depends on having maintained the contents of the reply_stream_name attribute through all of the steps of a messaging workflow. A messaging workflow is commonly exemplified by the use of Message.follow(previous_message) in a handler.

handle Something do |something|
  account, version = store.fetch(something.id, include: :version)

  # Without using follow, the reply_stream_name contained
  # in the something message's metadata will not be carried
  # forward into the some_message message, and the next
  # handler in the workflow will not be able to reply
  some_message = SomeMessage.follow(something)

  # ...

  write.(some_message, some_stream, expected_version: version)
end

Constructing a Writer

Writers can be constructed in one of two ways

  • Via the initializer
  • Via the constructor

Via the Initializer

self.new()

Returns

Instance of the Messaging::Postgres::Write class.

By constructing a writer using the initializer, the writer's dependencies are not set to operational dependencies. They remain inert substitutes.

TIP

See the useful objects user guide for background on inert substitutes.

Via the Constructor

self.build(session: nil)

The constructor not only instantiates the writer, but also invokes the writer's configure instance method, which constructs the writer's operational dependencies.

writer = Write.build

Returns

Instance of the Messaging::Postgres::Write class.

Parameters

NameDescriptionType
sessionAn existing session object to use, rather than allowing the writer to create a new sessionMessageStore::Postgres::Session

Note: If the session argument is nil, a new session will be constructed and assigned to the writer.

Assigning a Writer as a Dependency

self.configure(receiver, attr_name: :write, session: nil)

Constructs an instance of the writer and assigns it to the receiver's write attribute. By default, the receiving attribute's name is expected to be write, but it can be altered with the use of the attr_name parameter.

something = Something.new
Messaging::Postgres::Write.configure(receiver)

something.write
# => #<Messaging::Postgres::Write:0x...>

Parameters

NameDescriptionType
receiverThe object that will receive the constructed writerObject
attr_nameThe receiver's attribute that will be assigned the constructed writerSymbol
sessionAn existing session object to use, rather than allowing the writer to create a new sessionMessageStore::Postgres::Session

TIP

See the useful objects user guide for background on configuring dependencies.