# Metadata
A message's metadata object contains information about the stream where the message resides, the previous message in a series of messages that make up a messaging workflow, the originating process to which the message belongs, as well as other data that are pertinent to understanding the provenance and disposition of the message.
Where as a message's data represents information pertinent to the business process that the message is involved with, a message's metadata contains information that is mechanical and infrastructural. Message metadata is data about messaging machinery, like message schema version, source stream, positions, provenance, reply address, and the like.
# Messaging::Message::Metadata Class
The Message::Metadata
class is a concrete class from the Messaging
library and namespace.
The Metadata
class provides:
- Metadata attributes
- The
follow
method that copies workflow attributes from one message's metadata instance to another message's metadata instance - The
follows?
predicate that determines whether metadata instance follows a previous metadata instance - The
reply?
predicate for determining whether a message's metadata contains reply stream data - The
correlated?
method (aliased ascorrelates?
) that determines whether a metadata instance's correlation stream name corresponds to the argument - The
identifier
method (aliased assource_message_identifier
) that returns the message identifier URI fragment - The
causation_message_identifier
method that returns the causation message identifier URI fragment
# Metadata Attributes
Name | Description | Type | Aliases |
---|---|---|---|
stream_name | The name of the stream where the message resides | String | source_message_stream_name |
position | The sequential position of the message in its stream | Integer | source_message_position |
global_position | The sequential position of the message in the entire message store | Integer | source_message_global_position, sequence |
causation_message_stream_name | The stream name of the message that precedes the message in a sequential message flow | ||
causation_message_position | The sequential position of the causation message in its stream | Integer | |
causation_message_global_position | The sequential position of the message in the entire message store | Integer | causation_sequence |
correlation_stream_name | Name of the stream that represents an encompassing business process that coordinates the sub-process that the message is a part of | String | origin_stream_name, origin |
reply_stream_name | Name of a stream where a reply should be sent as a result of processing the message | String | |
time | Timestamp that the message was written to the message store | Time | |
schema_version | Version identifier of the message schema itself | String |
# Message Workflows
When messages represent subsequent steps in a workflow, a subsequent message's metadata records elements of the preceding message's metadata. Each message in a workflow carries provenance data of the message that precedes it.
The message's implementation of follow
specifically manages the transfer of message data from the preceding message to the subsequent method, and then delegates to the metadata object to manage the transfer of message flow and provenance data between the two metadata objects.
follow(preceding_metadata)
Parameters
Name | Description | Type |
---|---|---|
preceding_metadata | Metadata instance from which to copy the message flow and provenance data from | Metadata |
There are three metadata attributes that comprise the identifying information of a message's preceding message. They are collectively referred to as causation data.
causation_message_stream_name
causation_message_position
causation_message_global_position
Each message's metadata in a workflow may also carry identifying information about the overall or coordinating workflow that the messages participates in. That identifying information is referred to as correlation data.
correlation_stream_name
Additionally, a message's metadata may carry a reply address:
reply_stream_name
See the Messaging::Writer user guide for more on replying to messages.
# Metadata Data Transfer
Provenance metadata transfer logic from preceding message to subsequent message as caused by the follow
method is:
subsequent_metadata.causation_stream_name = preceding_metadata.stream_name
subsequent_metadata.causation_position = preceding_metadata.position
subsequent_metadata.causation_global_position = preceding_metadata.global_position
subsequent_metadata.correlation_stream_name = preceding_metadata.correlation_stream_name
subsequent_metadata.reply_stream_name = preceding_metadata.reply_stream_name
# Determining Message Precedence
Metadata objects can be determined to follow each other using the metadata's follows?
predicate method.
follows?(metadata)
Returns
Boolean.
Parameters
Name | Description | Type |
---|---|---|
metadata | A metadata instance that may precede the metadata being inspected | Metadata |
The follows?
predicate method returns true
when the metadata's causation and provenance attributes match the metadata argument's message source attributes.
preceding_metadata = Metadata.new()
preceding_metadata.stream_name = 'someStream'
preceding_metadata.position = 11
preceding_metadata.global_position = 111
preceding_message.metadata.correlation_stream_name = 'someReplyStream'
preceding_metadata.reply_stream_name = 'someReplyStream'
metadata = Metadata.new()
metadata.follow(preceding_metadata)
metadata.follows?(preceding_metadata)
# => true
preceding_metadata.stream_name = `someOtherStream`
metadata.follows?(preceding_metadata)
# => false
Metadata precedence is determined as:
metadata.causation_message_stream_name == preceding_metadata.stream_name &&
metadata.causation_message_position == preceding_metadata.position &&
metadata.causation_message_global_position == preceding_metadata.global_position &&
metadata.correlation_stream_name == preceding_metadata.correlation_stream_name &&
metadata.reply_stream_name == preceding_metadata.reply_stream_name
However, the correlation_stream_name
attribute and the reply_stream_name
attribute is only a factor in determining precedence if their values are assigned on the preceding message metadata. If the preceding message metadata's correlation_stream_name
attribute or reply_stream_name
attribute is nil
, then it is not taken into consideration for message precedence. Therefore, the preceding message metadata's correlation_stream_name
attribute or reply_stream_name
attribute can be nil
while the following message's correlation_stream_name
attribute or reply_stream_name
attribute are set to a value, and the messages are considered precedent.
In addition, if both the preceding message metadata's stream_name
attribute and the following message metadata's causation_stream_name
attribute are both nil
the messages are not considered precedent. The same is true for the position
and causation_message_position
pair and the global_position
and causation_message_global_position
pair.
The implementation of the metadata's follows?
predicate method is the best resource for understanding the specifics of message precedence. The source code can be read at:
# Determining Whether a Reply is Required
It can be useful in handler logic to determine whether a message requires a reply message to be sent to its reply address.
reply?()
Returns
Boolean.
The reply?
predicate method returns true if the reply_stream_name
attribute has been assigned a stream name.
metadata = Metadata.new()
metadata.reply_stream_name = 'someReplyStream'
metadata.reply?
# => true
metadata.reply_stream_name = nil
metadata.reply?
# => false
For convenience, the metadata provides the clear_reply_stream_name
method to remove the reply_stream_name
from a metadata instance.
metadata = Metadata.new()
metadata.reply_stream_name = 'someReplyStream'
metadata.clear_reply_stream_name()
metadata.reply?
# => false
# Message Correlation
When coordinating workflows between services using Pub/Sub, it may be necessary to determine if an event published by another service pertains to an on going process in the originating service.
For example, a component that is responsible for accounting might be used by numerous other coordinating components. A funds transfer component will interact with the account component, but many other components will, as well, including payroll, bill payment, etc. Each of these coordinating services needs to know when an accounting transaction that was started by the coordinating service has completed.
A coordinating service will subscribe to the accounting service's events in order to detect transactions that pertain to it. But the coordinating service will receive events that pertain to transactions from all other coordinating services as well.
The metadata's correlation_stream_name
is the mechanism by which a subscriber can determine if the event being processed has originated from the subscriber's service.
When a message is sent to the afferent service from the coordinating service, the metadata's correlation_stream_name
is set to a value that indicates its origin.
metadata = Metadata.new()
metadata.correlation_stream_name = 'someStream-123'
Because the follow
method keeps the correlation_stream_name
with the metadata of all subsequent messages - even those from other services - the correlation_stream_name
will be present in the event metadata that the originating, coordinating service subscribes to.
# Determining Message Correlation
When a coordinating, originating service receives an event from an afferent service the coordinating service must determine whether the event should be processed or disregarded.
correlated?(stream_name)
Returns
Boolean.
Alias
correlates?
Parameters
Name | Description | Type |
---|---|---|
stream_name | The stream name to compare to the message metadata's correlation_stream_name | String |
metadata = Metadata.new()
metadata.correlation_stream_name = 'someStream-123'
metadata.correlated?('someStream-123')
# => true
metadata.correlated?('someStream-456')
# => false
If the value of the stream_name
argument is a category stream name and the correlation stream name is an entity stream name, only the categories will be compared.
metadata.correlated?('someStream')
# => true
# Message Identifier
The de facto unique identifier for a message is a combination of the message's stream name and the message's position number within that stream.
identifier()
Alias
source_message_identifier
Returns
String
The identifier is formatted as a URI fragment of the form stream_name/position
.
metadata = Messaging::Message::Metadata.new()
metadata.stream_name = 'someStream'
metadata.position = 11
metadata.identifier
# => "someStream/11"
# Causation Message Identifier
The unique identifier for a message's causation message is a combination of the causation message's stream name and the causation message's position number within that stream.
causation_identifier()
Alias
causation_message_identifier
Returns
String
The identifier is formatted as a URI fragment of the form causation_message_stream_name/causation_message_position
.
metadata = Messaging::Message::Metadata.new()
metadata.causation_message_stream_name = 'someCausationStream'
metadata.causation_message_position = 111
metadata.causation_message_identifier
# => "someCausationStream/111"
← Messages MessageData →