Message Writer

The message writer writes instances of message objects to the message store. It converts each message to the lower-level message data form, which is then converted to JSON and stored in the database.

Example

deposited = Deposited.build(some_deposit_data)

stream_name = "account-123"

Messaging::Postgres::Write.(deposited, stream_name)

Message Writer Facts

  • The message writer can write one message or a batch of messages
  • A write is always made to a single stream
  • The writer can operate in a diagnostic mode, which records the messages written to it in-memory for later inspection
  • A writer can protect writes against concurrency using its expected_version argument
  • Coordination of workflows between streams can be effected using the writer's facility for replies and replying
  • Actuating a writer can be done either from its class interface or its instance interface
  • The writer provides a diagnostic substitute that records data about the write operations actuated

Messaging::Postgres::Write Class

The Write class is a concrete class from the Messaging::Postgres library and namespace.

The Messaging::Postgres::Write class provides:

  • Actuator methods for both the class and instance interface that write messages to the specified stream
  • The initial method that writes a message and assures that the message written is the first message in a stream
  • The reply method that is a shortcut for writing a reply message to the reply stream name registered in the message's metadata
  • A substitute implementation of the writer that can be used in diagnostic contexts, such as testing
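The idea of a diagnostic writer that records what is written to it can be illustrated with a minimal, self-contained sketch. The RecordingWrite class, its written? method, and the Deposited struct below are hypothetical stand-ins invented for this illustration. They are not the library's substitute and do not reflect its actual interface.

```ruby
# Hypothetical stand-in for illustration only. This is NOT the library's
# substitute writer. It records writes in memory instead of using a database.
class RecordingWrite
  Record = Struct.new(:message, :stream_name, :expected_version)

  attr_reader :records

  def initialize
    @records = []
  end

  # Accepts either one message or an array of messages (a batch)
  def call(message_or_batch, stream_name, expected_version: nil)
    batch = message_or_batch.is_a?(Array) ? message_or_batch : [message_or_batch]

    batch.each do |message|
      records << Record.new(message, stream_name, expected_version)
    end

    nil
  end

  # True if the message was recorded, optionally for a specific stream
  def written?(message, stream_name = nil)
    records.any? do |record|
      record.message == message &&
        (stream_name.nil? || record.stream_name == stream_name)
    end
  end
end

# Hypothetical message type used only by this sketch
Deposited = Struct.new(:amount)

write = RecordingWrite.new
deposited = Deposited.new(11)

write.(deposited, "account-123")

puts write.written?(deposited)                 # true
puts write.written?(deposited, "account-456")  # false
```

In a test, a recorder like this would be handed to the object under test in place of an operational writer, and the assertions would be made against what was recorded.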

Writing a Message

call(message, stream_name, expected_version: nil, reply_stream_name: nil)

Returns

Position of the message written.

Alias

write

Parameters

Name | Description | Type
message | The message to be written | Messaging::Message or Array
stream_name | The stream name to write the message to | String
expected_version | Expected version of the stream at the time of the write | Integer
reply_stream_name | Name of stream that the receiver uses to reply to the message | String

The writer is a callable object. It's actuated using the .() convention.

write.(some_message, some_stream)

Alternatively, the writer can be actuated by directly invoking the call method. It can also be actuated via the write alias, although this option is rarely exercised in practice.
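The three ways of actuating a callable object are plain Ruby and can be seen with any class that defines call. The Echo class below is a hypothetical example written for this illustration and is not part of the library.

```ruby
# Hypothetical class for illustration. Not part of the Messaging library.
class Echo
  def call(message, stream_name)
    "#{message} -> #{stream_name}"
  end

  # An alias, analogous to the writer's write alias
  alias_method :write, :call
end

echo = Echo.new

via_convention = echo.("some_message", "some_stream")     # .() is shorthand for call
via_call = echo.call("some_message", "some_stream")       # direct invocation
via_alias = echo.write("some_message", "some_stream")     # alias of call

puts via_convention == via_call && via_call == via_alias  # true
```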

Note: Streams only come into existence when messages are written to them. There's no need to create a stream before using it. A stream is created implicitly when the first message is written to it.

Assuring an Initial Write

initial(message, stream_name)

Returns

Position of the message written.

Alias

write_initial

Parameters

Name | Description | Type
message | The initial message to be written | Messaging::Message
stream_name | The stream name to write the initial message to | String

Writing a message with the initial method is a shortcut for writing a message with the value of the expected_version argument set to -1.

write.initial(some_message, some_stream)
# Is equivalent to
write.(some_message, some_stream, expected_version: -1)

The version of a stream that has no messages written to it, and thus a stream that doesn't exist yet, is -1.

To assure that a message is written in the first position of a stream - position 0 - write the message with an expected version of -1.

If there are already messages in the stream, the stream's version will be greater than -1. Attempting an initial write of a message into a stream that already has messages in it will result in MessageStore::ExpectedVersion::Error being raised.

write = Write.build
write.(some_message, some_stream)
write.initial(some_other_message, some_stream)
# => MessageStore::ExpectedVersion::Error (Wrong expected version: -1 (Stream: some_stream, Stream Version: 0))

This pattern is useful for proving uniqueness or for reserving something, for example: a unique username, a seat on a flight, a purchase, or a concert ticket. It's also useful in certain idempotence protection patterns.
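The version rules and the reservation use case can be sketched with an in-memory model. Everything in the sketch is hypothetical and exists only for illustration: the InMemoryStreams class, the SketchExpectedVersionError class, and the "usernameReservation-jdoe" stream name are assumptions, and none of it is the library's implementation. It models only the rule that an empty stream has version -1 and that a write is rejected when the expected version doesn't match.

```ruby
# Hypothetical in-memory model for illustration. NOT the library's implementation.
class SketchExpectedVersionError < StandardError; end

class InMemoryStreams
  def initialize
    @streams = {}
  end

  # Position of the last message in the stream, or -1 if the stream
  # has no messages (and therefore doesn't exist yet)
  def version(stream_name)
    @streams.fetch(stream_name, []).length - 1
  end

  # Returns the position of the message written
  def write(message, stream_name, expected_version: nil)
    current_version = version(stream_name)

    unless expected_version.nil? || expected_version == current_version
      raise SketchExpectedVersionError,
        "Wrong expected version: #{expected_version} (Stream: #{stream_name}, Stream Version: #{current_version})"
    end

    (@streams[stream_name] ||= []) << message
    version(stream_name)
  end

  # Shortcut for a write that must land in position 0
  def write_initial(message, stream_name)
    write(message, stream_name, expected_version: -1)
  end
end

streams = InMemoryStreams.new
stream_name = "usernameReservation-jdoe"  # hypothetical stream name

puts streams.version(stream_name)                    # -1
puts streams.write_initial("Reserved", stream_name)  # 0

begin
  # A second reservation of the same username is rejected
  streams.write_initial("Reserved", stream_name)
rescue SketchExpectedVersionError => error
  puts "Rejected: #{error.message}"
end
```

Because only one initial write to a given stream can succeed, the stream name itself acts as the uniqueness key for the thing being reserved.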

Replying

The reply_stream_name Parameter

The reply_stream_name argument passed to the writer is similar in principle to a callback. In this case, it's a callback address.

When two (or more) components are coordinating with each other, it's not uncommon for a message to be sent to a component with information about how that component should report back to the first component.

write.(some_message, 'otherServiceStream', reply_stream_name: 'thisServiceStream')

In the above example, "thisServiceStream" is the stream name of the service that the handler is running in, and "otherServiceStream" is the stream name of the service that the message is being "sent" to.

When a reply_stream_name is specified, its value is recorded in the metadata of the some_message message. The service that receives the message can then use that value to send a reply as part of processing the message.

Note: The reply pattern is typically only useful in point-to-point messaging scenarios where Pub/Sub isn't (or can't be) used, and explicit reply commands are used instead of event subscriptions.

The reply Method

reply(message)

Returns

Position of the message written.

Parameters

Name | Description | Type
message | The message written to the reply stream | Messaging::Message

A message whose metadata contains a reply stream name can be replied to using the writer's reply method.

The message will be written to the stream name contained in the metadata's reply_stream_name attribute.

If the reply_stream_name attribute of the reply message's metadata is nil when the message is passed to the reply method, Messaging::Write::Error is raised.

The reply message metadata's reply_stream_name attribute is cleared (set to nil) once the reply has been executed.
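The reply rules described above (the reply address travels in the metadata, a missing address is an error, and the address is cleared after replying) can be sketched with in-memory stand-ins. The ReplySketchWrite, SketchMessage, SketchMetadata, and SketchReplyError names are hypothetical and exist only for this illustration. They are not the library's classes.

```ruby
# Hypothetical in-memory stand-ins for illustration. NOT the library's classes.
SketchMetadata = Struct.new(:reply_stream_name)
SketchMessage = Struct.new(:name, :metadata)

class SketchReplyError < StandardError; end

class ReplySketchWrite
  # Each entry is [stream_name, message_name]
  attr_reader :written

  def initialize
    @written = []
  end

  # Records the reply address in the message's metadata when one is given
  def call(message, stream_name, reply_stream_name: nil)
    message.metadata.reply_stream_name = reply_stream_name unless reply_stream_name.nil?
    written << [stream_name, message.name]
    nil
  end

  # Writes the message to the stream named in its metadata, then clears it
  def reply(message)
    reply_stream_name = message.metadata.reply_stream_name
    raise SketchReplyError, "Message has no reply stream name" if reply_stream_name.nil?

    call(message, reply_stream_name)
    message.metadata.reply_stream_name = nil
  end
end

write = ReplySketchWrite.new

# The first service sends a message along with its reply address
request = SketchMessage.new("SomeMessage", SketchMetadata.new)
write.(request, "otherServiceStream", reply_stream_name: "thisServiceStream")

# The other service builds a reply that carries the address forward
response = SketchMessage.new("SomeReply", SketchMetadata.new(request.metadata.reply_stream_name))
write.reply(response)

p write.written.last                   # ["thisServiceStream", "SomeReply"]
p response.metadata.reply_stream_name  # nil
```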

The reply method depends on having maintained the contents of the reply_stream_name attribute through all of the steps of a messaging workflow. A messaging workflow is commonly exemplified by the use of Message.follow(previous_message) in a handler.

handle Something do |something|
  account, version = store.fetch(something.id, include: :version)

  # Without using follow, the reply_stream_name contained
  # in the something message's metadata will not be carried
  # forward into the some_message message, and the next
  # handler in the workflow will not be able to reply
  some_message = SomeMessage.follow(something)

  # ...

  write.(some_message, some_stream, expected_version: version)
end

Constructing a Writer

Writers can be constructed in one of two ways:

  • Via the constructor
  • Via the initializer

Via the Constructor

self.build(session: nil)

The constructor not only instantiates the writer, but also invokes the writer's configure instance method, which constructs the writer's operational dependencies.

writer = Write.build

Returns

Instance of the Messaging::Postgres::Write class.

Parameters

Name | Description | Type
session | An existing session object to use, rather than allowing the writer to create a new session | MessageStore::Postgres::Session

Note: If the session argument is nil, a new session will be constructed and assigned to the writer.

Via the Initializer

self.initialize()

Returns

Instance of the Messaging::Postgres::Write class.

By constructing a writer using the initializer, the writer's dependencies are not set to operational dependencies. They remain inert substitutes.
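The difference between the constructor and the initializer can be sketched generically. The SketchWriter, OperationalSession, and InertSession classes below, and the operational? method, are hypothetical and exist only for illustration. They are not the library's classes, and the library's inert substitutes are not implemented this way.

```ruby
# Hypothetical classes for illustration. NOT the library's implementation.
class OperationalSession
  def operational?
    true
  end
end

class InertSession
  def operational?
    false
  end
end

class SketchWriter
  attr_writer :session

  # Until the writer is configured, its dependency is an inert substitute
  def session
    @session ||= InertSession.new
  end

  # Constructor: instantiates the writer, then configures its dependencies
  def self.build(session: nil)
    instance = new
    instance.configure(session: session)
    instance
  end

  # Uses the session given, or constructs a new operational one
  def configure(session: nil)
    self.session = session || OperationalSession.new
  end
end

puts SketchWriter.build.session.operational?  # true
puts SketchWriter.new.session.operational?    # false
```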

TIP

See the useful objects user guide for background on inert substitutes.

Assigning a Writer as a Dependency

self.configure(receiver, attr_name: :write, session: nil)

Constructs an instance of the writer and assigns it to the receiver's write attribute. By default, the receiving attribute's name is expected to be write, but it can be altered with the use of the attr_name parameter.

something = Something.new
Messaging::Postgres::Write.configure(something)

something.write
# => #<Messaging::Postgres::Write:0x...>
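The effect of the attr_name parameter can be sketched with a generic class-level configure method. The ConfigurableWrite and Receiver classes and the other_write attribute are hypothetical names used only for this illustration. The sketch is not the library's implementation.

```ruby
# Hypothetical classes for illustration. NOT the library's implementation.
class ConfigurableWrite
  def self.build(session: nil)
    new
  end

  # Constructs a writer and assigns it to the receiver's attribute
  def self.configure(receiver, attr_name: :write, session: nil)
    instance = build(session: session)
    receiver.public_send("#{attr_name}=", instance)
    instance
  end
end

class Receiver
  attr_accessor :write
  attr_accessor :other_write  # hypothetical alternative attribute name
end

receiver = Receiver.new

ConfigurableWrite.configure(receiver)
ConfigurableWrite.configure(receiver, attr_name: :other_write)

puts receiver.write.class        # ConfigurableWrite
puts receiver.other_write.class  # ConfigurableWrite
```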

Parameters

Name | Description | Type
receiver | The object that will receive the constructed writer | Object
attr_name | The receiver's attribute that will be assigned the constructed writer | Symbol
session | An existing session object to use, rather than allowing the writer to create a new session | MessageStore::Postgres::Session

TIP

See the useful objects user guide for background on configuring dependencies.

Log Tags

The following tags are applied to log messages recorded by a message writer:

Tag | Description
write | Applied to all log messages recorded by a message writer
messaging | Applied to all log messages recorded inside the Messaging namespace

The following tags may be applied to log messages recorded by a message writer:

Tag | Description
reply | Applied to log messages written by the message writer when replying to a message
message | Applied to log messages that record the writing of a typed message
data | Applied to log messages that record the data content of a typed message

See the logging user guide for more on log tags.

