# Message Writer
The message writer writes instances of message objects to the message store. It converts them to the lower-level message data form, which is then converted to JSON and stored in the database.
# Example
deposited = Deposited.build(some_deposit_data)
stream_name = "account-123"
Messaging::Postgres::Write.(deposited, stream_name)
# Message Writer Facts
- The message writer can write one message or a batch of messages
- A write is always made to a single stream
- The writer can operate in a diagnostic mode, which records the messages written to it in-memory for later inspection
- A writer can protect writes against concurrency using its expected_version argument
- Coordination of workflows between streams can be effected using the writer's facility for replies and replying
- Actuating a writer can be done either from its class interface or its instance interface
- The writer provides a diagnostic substitute that records data about the write operations actuated
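The idea of recording writes in memory for later inspection can be sketched in plain Ruby. The RecordingWriter class below, along with its records and written? members, is a hypothetical stand-in written for this illustration; it is not the library's substitute and its interface is an assumption.

```ruby
# Hypothetical recording writer; it stands in for the idea of a diagnostic
# substitute and is not the library's implementation.
class RecordingWriter
  Record = Struct.new(:message, :stream_name, :expected_version)

  attr_reader :records

  def initialize
    @records = []
  end

  # Accepts one message or a batch; every write targets a single stream
  def call(message_or_batch, stream_name, expected_version: nil)
    batch = message_or_batch.is_a?(Array) ? message_or_batch : [message_or_batch]

    batch.each do |message|
      records << Record.new(message, stream_name, expected_version)
    end

    nil
  end

  # True if the given message object was recorded, optionally for one stream
  def written?(message, stream_name = nil)
    records.any? do |record|
      record.message.equal?(message) &&
        (stream_name.nil? || record.stream_name == stream_name)
    end
  end
end

write = RecordingWriter.new

deposited = "deposited message"
withdrawn = "withdrawn message"

write.(deposited, "account-123")
write.([withdrawn], "account-456", expected_version: 3)

write.written?(deposited)                 # checks across all streams
write.written?(withdrawn, "account-123")  # checks a single stream
```

In a test, a recorder of this kind would take the place of the operational writer so that assertions can be made about what was written without touching the database.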
# Messaging::Postgres::Write Class
The Write class is a concrete class from the Messaging::Postgres library and namespace.
The Messaging::Postgres::Write class provides:
- Actuator methods for both the class and instance interface that write messages to the specified stream
- The initial method that writes a message and assures that the message written is the first message in a stream
- The reply method that is a shortcut for writing a reply message to the reply stream name registered in the message's metadata
- A substitute implementation of a writer that can be used in diagnostic contexts, such as testing
# Writing a Message
call(message, stream_name, expected_version: nil, reply_stream_name: nil)
Returns
Position of the message written.
Alias
write
Parameters
Name | Description | Type |
---|---|---|
message | The message to be written | Messaging::Message or Array |
stream_name | The stream name to write the message to | String |
expected_version | Expected version of the stream at the time of the write | Integer |
reply_stream_name | Name of stream that the receiver uses to reply to the message | String |
The writer is a callable object. It's actuated using the .() convention.
write.(some_message, some_stream)
Alternatively, the writer can be actuated by directly invoking the call method. It can also be actuated via the write alias, although this option is rarely exercised in practice.
Note: Streams only come into existence when messages are written to them. There's no need to create a stream before using it. A stream is created implicitly when the first message is written to it.
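The three ways of actuating a callable object, and the implicit creation of a stream on first write, can be illustrated with a small in-memory model. The InMemoryWriter class and its stream? method are hypothetical names used only for this sketch; the real writer stores messages in the database and takes the additional keyword arguments listed above.

```ruby
# Hypothetical in-memory writer used only to show the calling conventions.
class InMemoryWriter
  def initialize
    # A stream's array is created the first time its name is written to
    @streams = Hash.new { |streams, name| streams[name] = [] }
  end

  def call(message, stream_name)
    stream = @streams[stream_name]
    stream << message
    stream.length - 1 # position of the message just written
  end
  alias_method :write, :call

  def stream?(stream_name)
    @streams.key?(stream_name)
  end
end

write = InMemoryWriter.new

existed_before = write.stream?("account-123")

first = write.("deposited", "account-123")      # .() convention
second = write.call("withdrawn", "account-123") # direct invocation of call
third = write.write("closed", "account-123")    # write alias

existed_after = write.stream?("account-123")
```

All three invocations run the same method. The .() form is ordinary Ruby syntax for invoking call on any object that defines it.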
# Assuring an Initial Write
initial(message, stream_name)
Returns
Position of the message written.
Alias
write_initial
Parameters
Name | Description | Type |
---|---|---|
message | The initial message to be written | Messaging::Message |
stream_name | The stream name to write the initial message to | String |
Writing a message with the initial method is a shortcut for writing a message with the value of the expected_version argument set to -1.
write.initial(some_message, some_stream)
# Is equivalent to
write.(some_message, some_stream, expected_version: -1)
The version of a stream that has no messages written to it, and thus a stream that doesn't exist yet, is -1.
To assure that a message is written in the first position of a stream (position 0), write the message with an expected version of -1.
If there are already messages in the stream, the stream's version will be a value greater than -1. Attempting an initial write of a message into a stream that has messages in it will result in the MessageStore::ExpectedVersion::Error being raised.
write = Write.build
write.(some_message, some_stream)
write.initial(some_other_message, some_stream)
# => MessageStore::ExpectedVersion::Error (Wrong expected version: -1 (Stream: some_stream, Stream Version: 0))
This pattern is useful for proving uniqueness or for reserving something, for example: a unique username, a seat on a flight, a purchase, or a concert ticket. It's also useful in certain idempotence protection patterns.
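The reservation pattern can be modeled with a minimal in-memory stream that checks the expected version before appending. VersionedStream, ExpectedVersionError, and write_initial here are hypothetical names for this sketch, which assumes only the rule described above: an empty stream has version -1, and a write whose expected version doesn't match the stream's version is rejected.

```ruby
# Hypothetical model of expected-version checking; not the library's code.
class ExpectedVersionError < StandardError; end

class VersionedStream
  attr_reader :messages

  def initialize
    @messages = []
  end

  # A stream with no messages has version -1
  def version
    messages.length - 1
  end

  def write(message, expected_version: nil)
    unless expected_version.nil? || expected_version == version
      raise ExpectedVersionError,
        "Wrong expected version: #{expected_version} (Stream Version: #{version})"
    end

    messages << message
    version # position of the message just written
  end

  # Shortcut for a write that must land in position 0
  def write_initial(message)
    write(message, expected_version: -1)
  end
end

# One stream per username: the first claim succeeds, any later claim fails
username_stream = VersionedStream.new

position = username_stream.write_initial("claimed by user 1")

begin
  username_stream.write_initial("claimed by user 2")
rescue ExpectedVersionError => error
  rejected = error.message
end
```

Because only the first initial write can succeed, the existence of the stream itself acts as the reservation.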
# Replying
# The reply_stream_name Parameter
The reply_stream_name argument passed to the writer is similar in principle to a callback. In this case, it's a callback address.
When two (or more) components are coordinating with each other, it's not uncommon for a message to be sent to a component with information about how that component should report back to the first component.
write.(some_message, 'otherServiceStream', reply_stream_name: 'thisServiceStream')
In the above example, "thisServiceStream" is the stream name of the service that the handler is running in, and "otherServiceStream" is the stream name of the service that the message is being "sent" to.
When a reply_stream_name is specified, the metadata of the some_message message will contain the reply_stream_name value. The service that receives the message can use that value to send a reply as part of processing the message.
Note: The reply pattern is typically only useful in point-to-point messaging scenarios where Pub/Sub isn't (or can't be) used, and explicit reply commands are used instead of event subscriptions.
# The reply Method
reply(message)
Returns
Position of the message written.
Parameters
Name | Description | Type |
---|---|---|
message | The message written to the reply stream | Messaging::Message |
A message whose metadata contains a reply stream name can be replied to using the writer's reply method.
The message will be written to the stream name contained in the metadata's reply_stream_name attribute.
If the reply message metadata's reply_stream_name attribute is nil when it is passed to the reply method, the Messaging::Write::Error is raised.
The reply message metadata's reply_stream_name attribute is cleared (set to nil) once the reply has been executed.
The reply method depends on having maintained the contents of the reply_stream_name attribute through all of the steps of a messaging workflow. A messaging workflow is commonly exemplified by the use of Message.follow(previous_message) in a handler.
handle Something do |something|
  account, version = store.fetch(something.id, include: :version)
  # Without using follow, the reply_stream_name contained
  # in the something message's metadata will not be carried
  # forward into the some_message message, and the next
  # handler in the workflow will not be able to reply
  some_message = SomeMessage.follow(something)
  # ...
  write.(some_message, some_stream, expected_version: version)
end
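The reply mechanics described in this section can be modeled without the library. In the sketch below, SketchMetadata, SketchMessage, ReplyingWriter, and ReplyError are hypothetical stand-ins, and the follow method is a simplified analogue that carries forward only the reply stream name. The sketch assumes nothing beyond the behavior stated above: reply writes to the stream named in the metadata, raises if that name is nil, and clears the name afterward.

```ruby
# Hypothetical stand-ins; the real message and metadata classes carry more data.
SketchMetadata = Struct.new(:reply_stream_name)

SketchMessage = Struct.new(:type, :metadata) do
  # Simplified analogue of follow: the new message carries forward the
  # reply stream name of the message that precedes it in the workflow
  def self.follow(previous, type)
    new(type, SketchMetadata.new(previous.metadata.reply_stream_name))
  end
end

class ReplyError < StandardError; end

class ReplyingWriter
  attr_reader :written

  def initialize
    @written = []
  end

  def call(message, stream_name)
    written << [stream_name, message]
  end

  def reply(message)
    reply_stream_name = message.metadata.reply_stream_name
    raise ReplyError, "Message has no reply stream name" if reply_stream_name.nil?

    call(message, reply_stream_name)

    # Cleared once the reply has been written
    message.metadata.reply_stream_name = nil
  end
end

command = SketchMessage.new("DoSomething", SketchMetadata.new("thisServiceStream"))
outcome = SketchMessage.follow(command, "SomethingDone")

write = ReplyingWriter.new
write.reply(outcome)

# A second reply with the same message fails: the address has been cleared
begin
  write.reply(outcome)
rescue ReplyError => error
  second_reply_error = error
end
```

Had outcome been built without following command, its metadata would have had no reply stream name, and the first call to reply would have raised.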
# Constructing a Writer
Writers can be constructed in one of two ways:
- Via the constructor
- Via the initializer
# Via the Constructor
self.build(session: nil)
The constructor not only instantiates the writer, but also invokes the writer's configure instance method, which constructs the writer's operational dependencies.
writer = Write.build
Returns
Instance of the Messaging::Postgres::Write class.
Parameters
Name | Description | Type |
---|---|---|
session | An existing session object to use, rather than allowing the writer to create a new session | MessageStore::Postgres::Session |
Note: If the session argument is nil, a new session will be constructed and assigned to the writer.
# Via the Initializer
self.initialize()
Returns
Instance of the Messaging::Postgres::Write class.
When a writer is constructed using the initializer, its dependencies are not set to operational dependencies. They remain inert substitutes.
TIP
See the useful objects user guide for background on inert substitutes.
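The difference between the two construction paths can be sketched as follows. SketchWriter, InertSession, and OperationalSession are hypothetical classes for this illustration, and the sketch assumes a single session dependency; it models only the split between an initializer that leaves inert defaults in place and a constructor that configures operational dependencies.

```ruby
# Hypothetical sketch of the constructor/initializer split.
class InertSession
  def execute(_statement)
    nil # does nothing, so it is safe in any context
  end
end

class OperationalSession
  attr_reader :executed

  def initialize
    @executed = []
  end

  def execute(statement)
    executed << statement # stand-in for talking to the database
  end
end

class SketchWriter
  attr_accessor :session

  # Initializer: dependencies stay inert
  def initialize
    @session = InertSession.new
  end

  # Constructor: instantiates, then configures operational dependencies
  def self.build(session: nil)
    instance = new
    instance.configure(session: session)
    instance
  end

  # Uses the given session, or constructs a new one when none is given
  def configure(session: nil)
    self.session = session || OperationalSession.new
  end
end

inert_writer = SketchWriter.new
operational_writer = SketchWriter.build

shared_session = OperationalSession.new
sharing_writer = SketchWriter.build(session: shared_session)
```

Passing an existing session to the constructor lets several objects share one session rather than each creating its own.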
# Assigning a Writer as a Dependency
self.configure(receiver, attr_name: :write, session: nil)
Constructs an instance of the writer and assigns it to the receiver's write attribute. By default, the receiving attribute's name is expected to be write, but it can be altered with the use of the attr_name parameter.
something = Something.new
Messaging::Postgres::Write.configure(something)
something.write
# => #<Messaging::Postgres::Write:0x...>
Parameters
Name | Description | Type |
---|---|---|
receiver | The object that will receive the constructed writer | Object |
attr_name | The receiver's attribute that will be assigned the constructed writer | Symbol |
session | An existing session object to use, rather than allowing the writer to create a new session | MessageStore::Postgres::Session |
TIP
See the useful objects user guide for background on configuring dependencies.
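The shape of a class-level configure method that assigns a constructed instance to a receiver's attribute can be sketched in plain Ruby. SketchWrite and SketchHandler are hypothetical classes for this illustration, and the session parameter is omitted for brevity; the sketch assumes only that the receiver exposes a setter matching attr_name.

```ruby
# Hypothetical sketch of assigning a dependency to a receiver.
class SketchWrite
  def self.build
    new
  end

  def self.configure(receiver, attr_name: :write)
    instance = build
    # Invokes the receiver's setter, for example write= or writer=
    receiver.public_send("#{attr_name}=", instance)
    instance
  end
end

class SketchHandler
  attr_accessor :write
  attr_accessor :writer
end

handler = SketchHandler.new

SketchWrite.configure(handler)                     # assigns handler.write
SketchWrite.configure(handler, attr_name: :writer) # assigns handler.writer
```

The receiver never constructs the writer itself, which keeps construction details in one place and makes it possible to swap in a different writer in tests.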
# Log Tags
The following tags are applied to log messages recorded by a message writer:
Tag | Description |
---|---|
write | Applied to all log messages recorded by a message writer |
messaging | Applied to all log messages recorded inside the Messaging namespace |
The following tags may be applied to log messages recorded by a message writer:
Tag | Description |
---|---|
reply | Applied to log messages written by the message writer when replying to a message |
message | Applied to log messages that record the writing of a typed message |
data | Applied to log messages that record the data content of a typed message |
See the logging user guide for more on log tags.