Message Writer

The message writer writes instances of message objects to the message store. It converts them to the more raw, low level message data form which can then be converted to JSON and stored in the database.

Example

deposited = Deposited.build(some_deposit_data)

stream_name = "account-123"

Messaging::Postgres::Write.(deposited, stream_name)

Message Writer Facts

  • The message writer can write one message or a batch of messages
  • A write is always made to a single stream
  • The writer can operate in a diagnostic mode, which records the messages written to it in-memory for later inspection
  • A writer can protect writes against concurrency using its expected_version argument
  • Coordination of workflows between streams can be effected using the writer's facility for replies and replying
  • Actuating a writer can be done either from its class interface or its instance interface
  • The writer provides a diagnostic substitute that records data about the write operations actuated

Messaging::Postgres::Write Class

The Write class is a concrete class from the Messaging::Postgres library and namespace.

The Messaging::Postgres::Write class provides:

  • Actuator methods for both the class and instance interface that write messages to the specified stream
  • The initial method that writes a message and assures that the message written is the first message in a stream
  • The reply method that is a shortcut for writing a reply message to the reply stream name registered in the message's metadata
  • A substitute implementation of a write than can be used in diagnostic contexts, such as testing.

Writing a Message

call(message, stream_name, expected_version: nil, reply_stream_name: nil)

Returns

Position of the message written.

Alias

write

Parameters

NameDescriptionType
messageThe message to be writtenMessaging::Message or Array
stream_nameThe stream name to write the message toString
expected_versionExpected version of the stream at the time of the writeInteger
reply_stream_nameName of stream that the receiver uses to reply to the messageString

The writer is a callable object. It's actuated using the .() convention.

write.(some_message, some_stream)

Conversely, the writer can be actuated by directly invoking the call method. It can also be actuated via the write alias, although this option is rarely exercised in practice.

Note: Streams only come into existence when messages are written to them. There's no need to create a stream before using it. A stream is created implicitly by an event having been written to it.

Assuring an Initial Write

initial(message, stream_name)

Returns

Position of the message written.

Alias

write_initial

Parameters

NameDescriptionType
messageThe initial message to be writtenMessaging::Message
stream_nameThe stream name to write the initial message toString

Writing an event with the initial method is a shortcut for writing a message with the value of the expected_version argument set to -1.

write.initial(some_message, some_stream)
# Is equivalent to
write.(some_message, some_stream, expected_version: -1)

The version of a stream that has no events written to it - and thus a stream that doesn't exist yet - is -1.

To assure that a message is written in the first position of a stream - position 0 - write the message with an expected version of -1.

If there is already any messages in the stream, the stream's version will be a value greater than -1. Attempting an initial write of a message into a stream that has messages in it will result in the MessageStore::ExpectedVersion::Error being raised.

write = Write.build
write.(some_message, some_stream)
write.initial(some_other_message, some_stream)
# => MessageStore::ExpectedVersion::Error (Wrong expected version: -1 (Stream: some_stream, Stream Version: 0)

This pattern is useful for proving uniqueness or for reserving something, for example: a unique username, a seat on a flight, a purchase, or a concert ticket. It's also useful in certain idempotence protection patterns.

Replying

The reply_stream_name Parameter

The reply_stream_name argument passed to the writer is similar in principle to a callback. In this case, it's a callback address.

When two (or more) components are coordinating with each other, it's not uncommon for a message to be sent to a component with information about how that component should report back to the first component.

write.(some_message, 'otherServiceStream', reply_stream_name: 'thisServiceStream')

In the above example, the "thisServiceStream" is the stream name of the service that the handler is running in. The "otherServiceStream" is the stream name of a service that the message is being "sent" to.

By specifying a reply_stream_name, the some_message message's metadata will contain the reply_stream_name value. That value is used by the other service that receives this message, and may send a reply as part of processing that message.

Note: The reply pattern is typically only useful in point-to-point messaging scenarios where Pub/Sub isn't (or can't) be used, and explicit reply commands are used instead of event subscriptions.

The reply Method

reply(message)

Returns

Position of the message written.

Parameters

NameDescriptionType
messageThe message written to the reply streamMessaging::Message

A message whose metadata contains a reply stream name can be replied to using the writer's reply method.

The message will be written to the stream name contained in the metadata's reply_stream_name attribute.

If the reply message metadata's reply_stream_name attribute is nil when it is passed to the reply method, the Messaging::Write::Error is raised.

The reply message metadata's reply_stream_name attribute is cleared (set to nil) once the reply has been executed.

The reply method depends on having maintained the contents of the reply_stream_name attribute through all of the steps of a messaging workflow. A messaging workflow is commonly exemplified by the use of Message.follow(previous_message) in a handler.

handle Something do |something|
  account, version = store.fetch(something.id, include: :version)

  # Without using follow, the reply_stream_name contained
  # in the something message's metadata will not be carried
  # forward into the some_message message, and the next
  # handler in the workflow will not be able to reply
  some_message = SomeMessage.follow(something)

  # ...

  write.(some_message, some_stream, expected_version: version)
end

Constructing a Writer

Writers can be constructed in one of two ways

  • Via the constructor
  • Via the initializer

Via the Constructor

self.build(session: nil)

The constructor not only instantiates the writer, but also invokes the writer's configure instance method, which constructs the writer's operational dependencies.

writer = Write.build

Returns

Instance of the Messaging::Postgres::Write class.

Parameters

NameDescriptionType
sessionAn existing session object to use, rather than allowing the writer to create a new sessionMessageStore::Postgres::Session

Note: If the session argument is nil, a new session will be constructed and assigned to the writer.

Via the Initializer

self.initialize()

Returns

Instance of the Messaging::Postgres::Write class.

By constructing a writer using the initializer, the writer's dependencies are not set to operational dependencies. They remain inert substitutes.

TIP

See the useful objects user guide for background on inert substitutes.

Assigning a Writer as a Dependency

self.configure(receiver, attr_name: :write, session: nil)

Constructs an instance of the writer and assigns it to the receiver's write attribute. By default, the receiving attribute's name is expected to be write, but it can be altered with the use of the attr_name parameter.

something = Something.new
Messaging::Postgres::Write.configure(something)

something.write
# => #<Messaging::Postgres::Write:0x...>

Parameters

NameDescriptionType
receiverThe object that will receive the constructed writerObject
attr_nameThe receiver's attribute that will be assigned the constructed writerSymbol
sessionAn existing session object to use, rather than allowing the writer to create a new sessionMessageStore::Postgres::Session

TIP

See the useful objects user guide for background on configuring dependencies.

Log Tags

The following tags are applied to log messages recorded by a message writer:

TagDescription
writeApplied to all log messages recorded by a message writer
messagingApplied to all log messages recorded inside the Messaging namespace

The following tags may be applied to log messages recorded by a message writer:

TagDescription
replyApplied to log messages written by the message writer when replying to a message
messageApplied to log messages that record the writing of a typed message
dataApplied to log messages that record the data content of a typed message

See the logging user guide for more on log tags.


Related