# Server Functions
The message store provides an interface of Postgres server functions that can be used with any programming language or through the psql
command line tool.
View the source code: https://github.com/message-db/message-db/blob/master/database (opens new window)
# Interface
- write_message
- get_stream_messages
- get_category_messages
- get_last_stream_message
- stream_version
- id
- cardinal_id
- category
- is_category
- acquire_lock
- hash_64
- message_store_version
# Write a Message
Write a JSON-formatted message to a named stream, optionally specifying JSON-formatted metadata and an expected version number.
write_message(
id varchar,
stream_name varchar,
type varchar,
data jsonb,
metadata jsonb DEFAULT NULL,
expected_version bigint DEFAULT NULL
)
# Returns
Position of the message written.
# Arguments
Name | Description | Type | Default | Example |
---|---|---|---|---|
id | UUID of the message being written | varchar | a5eb2a97-84d9-4ccf-8a56-7160338b11e2 | |
stream_name | Name of stream to which the message is written | varchar | someStream-123 | |
type | The type of the message | varchar | Withdrawn | |
data | JSON representation of the message body | jsonb | {"someAttribute": "some value"} | |
metadata (optional) | JSON representation of the message metadata | jsonb | NULL | {"metadataAttribute": "some meta data value"} |
expected_version (optional) | Version that the stream is expected to be when the message is written | bigint | NULL | 11 |
# Usage
SELECT write_message('a11e9022-e741-4450-bf9c-c4cc5ddb6ea3', 'someStream-123', 'SomeMessageType', '{"someAttribute": "some value"}', '{"metadataAttribute": "some meta data value"}');
-[ RECORD 1 ]-+--
write_message | 0
# Specifying the Expected Version of the Stream
The expected_version argument is as an optimistic concurrency protection. It can also be used to assure that a message written to a stream is the first message in the stream.
A more detailed explanation of expected_version
and optimistic concurrency control can be found in Eventide's message writer user guide.
SELECT write_message('a11e9022-e741-4450-bf9c-c4cc5ddb6ea3', 'someStream-123', 'SomeMessageType', '{"someAttribute": "some value"}', '{"metadataAttribute": "some meta data value"}', 11);
If the expected version does not match the stream version at the time of the write, an error is raised:
'Wrong expected version: {specified_stream_version} (Stream: {stream_name}, Stream Version: {current_stream_version})'
Example (no expected version error): https://github.com/message-db/message-db/blob/master/test/write-message/expected-version.sh (opens new window)
Example (with expected version error): https://github.com/message-db/message-db/blob/master/test/write-message/expected-version-error.sh (opens new window)
# Writing Batches of Messages
Writing batches of messages to a stream isn't directly supported by a single call to the write_message
function. However, because Postgres supports atomic writes using database transactions, batch writes is ultimately supported using Postgres transactions.
To write multiple messages to a stream in a batch, start a Postgres transaction and issue multiple calls to the write_messages
function.
WARNING
Transactions should only be used for writes to the same stream. While it is technically possible to write to multiple streams using a Postgres transaction, doing so is ultimately a violation of event sourcing patterns and is strongly discouraged.
For more on Postgres transactions, see: https://www.postgresql.org/docs/current/tutorial-transactions.html (opens new window)
The Message DB server functions do not set or otherwise change the default transaction isolation level configured for the Postgres server. Fore more on Postgres isolation levels, see: https://www.postgresql.org/docs/current/transaction-iso.html (opens new window)
# Get Messages from a Stream
Retrieve messages from a single stream, optionally specifying the starting position, the number of messages to retrieve, and an additional condition that will be appended to the SQL command's WHERE clause.
get_stream_messages(
stream_name varchar,
position bigint DEFAULT 0,
batch_size bigint DEFAULT 1000,
condition varchar DEFAULT NULL
)
# Arguments
Name | Description | Type | Default | Example |
---|---|---|---|---|
stream_name | Name of stream to retrieve messages from | varchar | someStream-123 | |
position (optional) | Starting position of the messages to retrieve | bigint | 0 | 11 |
batch_size (optional) | Number of messages to retrieve | bigint | 1000 | 111 |
condition (optional) | SQL condition to filter the batch by | varchar | NULL | 'messages.time::time >= current_time' |
# Usage
SELECT * FROM get_stream_messages('someStream-123', 0, 1000, condition => 'messages.time::time >= current_time');
-[ RECORD 1 ]---+---------------------------------------------------------
id | 4b96f09e-104a-4b1f-b198-5b3b46cf1d06
stream_name | someStream-123
type | SomeType
position | 0
global_position | 1
data | {"attribute": "some value"}
metadata | {"metaAttribute": "some meta value"}
time | 2019-11-24 17:56:09.71594
-[ RECORD 2 ]---+---------------------------------------------------------
id | d94e79e3-cdda-49a3-9aad-ce5d70a5edd7
stream_name | someStream-123
type | SomeType
position | 1
global_position | 2
data | {"attribute": "some value"}
metadata | {"metaAttribute": "some meta value"}
time | 2019-11-24 17:56:09.75969
# Get Messages from a Category
Retrieve messages from a category of streams, optionally specifying the starting position, the number of messages to retrieve, the correlation category for Pub/Sub, consumer group parameters, and an additional condition that will be appended to the SQL command's WHERE clause.
get_category_messages(
category_name varchar,
position bigint DEFAULT 0,
batch_size bigint DEFAULT 1000,
correlation varchar DEFAULT NULL,
consumer_group_member bigint DEFAULT NULL,
consumer_group_size bigint DEFAULT NULL,
condition varchar DEFAULT NULL
)
# Arguments
Name | Description | Type | Default | Example |
---|---|---|---|---|
category_name | Name of the category to retrieve messages from | varchar | someCategory | |
position (optional) | Global position to start retrieving messages from | bigint | 0 | 11 |
batch_size (optional) | Number of messages to retrieve | bigint | 1000 | 111 |
correlation (optional) | Category or stream name recorded in message metadata's correlationStreamName attribute to filter the batch by | varchar | NULL | someCorrelationCategory |
consumer_group_member (optional) | The zero-based member number of an individual consumer that is participating in a consumer group | bigint | NULL | 1 |
consumer_group_size (optional) | The size of a group of consumers that are cooperatively processing a single category | bigint | NULL | 2 |
condition (optional) | SQL condition to filter the batch by | varchar | NULL | 'messages.time::time >= current_time' |
# Usage
SELECT * FROM get_category_messages('someCategory', 1, 1000, correlation => 'someCorrelationCategory', consumer_group_member => 1, consumer_group_size => 2, condition => 'messages.time >= current_time');
-[ RECORD 1 ]---+---------------------------------------------------------
id | 28d8347f-677e-4738-b6b9-954f1b15463b
stream_name | someCategory-123
type | SomeType
position | 0
global_position | 111
data | {"attribute": "some value"}
metadata | {"correlationStreamName": "someCorrelationCategory-123"}
time | 2019-11-24 17:51:49.836341
-[ RECORD 2 ]---+---------------------------------------------------------
id | 57894da7-680b-4483-825c-732dcf873e93
stream_name | someCategory-456
type | SomeType
position | 1
global_position | 1111
data | {"attribute": "some value"}
metadata | {"correlationStreamName": "someCorrelationCategory-123"}
time | 2019-11-24 17:51:49.879011
TIP
Where someStream-123
is a stream name, someStream
is a category. Reading the someStream
category retrieves messages from all streams whose names start with someStream
and are followed by an ID, or where someStream
is the whole stream name.
# Pub/Sub and Retrieving Correlated Messages
The principle use of the correlation
parameter is to implement Pub/Sub.
The correlation
parameter filters the retrieved batch based on the content of message metadata's correlationStreamName
attribute. The correlation stream name is like a return address. It's a way to give the message some information about the component that the message originated from. This information is carried from message to message in a workflow until it ultimately returns to the originating component.
WARNING
Pub/Sub and correlation works only with the retrieval of messages from a category. An error will occur if the correlation
argument is sent to a retrieval of a stream rather than a category.
The correlationStreamName
attribute allows a component to tag an outbound message with its origin. And then later, the originating component can subscribe to other components' events that carry the origin metadata.
Before the source component sends the message to the receiving component, the source component assigns it's own stream name to the message metadata's correlationStreamName
attribute. That attribute is carried from message to message through messaging workflows.
SELECT write_message('a11e9022-e741-4450-bf9c-c4cc5ddb6ea3', 'otherComponent-123', 'SomeMessageType', '{"someAttribute": "some value"}', '{"correlationStreamName": "thisComponent-789"}');
SELECT * FROM get_category_messages('otherComponent', correlation => 'thisComponent');
For more details on pub/sub using the correlation stream, see the pub/sub topic in the consumers user guide.
# Consumer Groups
Consumers processing a single category can be operated in parallel in a consumer group. Consumer groups provide a means of scaling horizontally to distribute the processing load of a single category amongst a number of consumers.
Consumers operating in consumer groups process a single category, with each consumer in the group processing messages that are not processed by any other consumer in the group.
WARNING
Consumer groups work only with the retrieval of messages from a category. An error will occur if consumer group arguments are sent to a retrieval of a stream rather than a category.
Specify both the consumer_group_member
argument and the consumer_group_size
argument to retrieve a batch of messages for a specific member of a user group. The consumer_group_size
argument specifies the total number of consumers participating in the group. The consumer_group_member
argument specifies the unique ordinal ID of a consumer. A consumer group with three members will have a group_size
of 3, and will have members with group_member
numbers 0
, 1
, and 2
.
SELECT * FROM get_category_messages('otherComponent', consumer_group_member => 0, consumer_group_size => 3);
Consumer groups ensure that any given stream is processed by a single consumer, and that the consumer processing the stream is always the same consumer. This is achieved by the consistent hashing of a message's stream name.
A stream name's cardinal ID is hashed to a 64-bit integer, and the modulo of that number by the consumer group size yields a consumer group member number that will consistently process that stream name.
Specifying values for the consumer_group_size
and consumer_group_member
consumer causes the query for messages to include a condition that is based on the hash of the stream name, the modulo of the group size, and the consumer member number.
WHERE @hash_64(cardinal_id(stream_name)) % {group_size} = {group_member}
# Filtering Messages with a SQL Condition
The condition
parameter receives an arbitrary SQL condition which further filters the messages retrieved.
The condition
parameter is supported for both stream retrieval and category retrieval.
SELECT * FROM get_stream_messages('someStream-123', condition => 'extract(month from messages.time) = extract(month from now())');
SELECT * FROM get_category_messages('someStream', condition => 'extract(month from messages.time) = extract(month from now())');
Any valid Postgres SQL condition can be used, even JSON functions and operators.
SELECT * FROM get_stream_messages('someStream-123', condition => 'data->>''amount'' > 0');
SELECT * FROM get_category_messages('someStream', condition => 'data->>''amount'' > 0');
For more details on using Postgres JSON functions and operators, see:
https://www.postgresql.org/docs/current/functions-json.html (opens new window)
WARNING
The SQL condition feature is deactivated by default. The feature is activated using the message_store.sql_condition
Postgres configuration option: message_store.sql_condition=on
. Using the feature without activating the configuration option will result in an error. See the PostgreSQL documentation for more on configuration options: https://www.postgresql.org/docs/current/config-setting.html (opens new window)
DANGER
Activating the SQL condition feature may expose the message store to unforeseen security risks. Before activating this condition, be certain that access to the message store is appropriately protected.
# Retrieving All Messages in a Single Batch
Typically, the number of messages to retrieve is specified using the batch_size
parameter. If omitted, the default value of the batch size is 1000.
There is a special value that deactivates the limit on the number of messages returned, and results in all messages in a stream or category being returned.
To retrieve all messages in a stream or category, use the special value of -1
as the batch size.
This works for both the get_stream_messages
function and the get_category_messages
function.
SELECT * FROM get_stream_messages('someStream-123', batch_size => -1);
SELECT * FROM get_category_messages('someCategory', batch_size => -1);
# Get Last Message from a Stream
get_last_stream_message(
stream_name varchar,
type varchar DEFAULT NULL
)
# Returns
Row from the messages table that corresponds to the highest position number in the stream, and (optionally) corresponds to the message type specified by the type
parameter.
# Arguments
Name | Description | Type | Default | Example |
---|---|---|---|---|
stream_name | Name of the stream to retrieve messages from | varchar | someStream-123 | |
type (optional) | Message type to filter by | varchar | NULL | SomeType |
Note: The type
argument is supported in Message DB from v1.3.0.
# Usage
SELECT * FROM get_last_stream_message('someStream-123', type => 'SomeType');
-[ RECORD 1 ]---+---------------------------------------------------------
id | 03e38825-b106-44f9-8b40-a2b8037b98d8
stream_name | someStream-123
type | SomeType
position | 11
global_position | 111
data | {"attribute": "some value"}
metadata | {"metaAttribute": "some meta value"}
time | 2019-11-24 17:46:43.608025
Note: This function works only with entity streams, and does not work with category streams.
# Get Stream Version from a Stream
stream_version(
stream_name varchar
)
# Returns
Highest position number in the stream.
# Arguments
Name | Description | Type | Default | Example |
---|---|---|---|---|
stream_name | Name of the stream to retrieve the stream version from | varchar | someStream-123 |
# Usage
SELECT * FROM stream_version('someStream-123');
-[ RECORD 1 ]--+---
stream_version | 11
Note: This function works only with entity streams, and does not work with category streams.
# Get the ID from a Stream Name
id(
stream_name varchar
)
# Returns
The ID part of the stream name.
# Arguments
Name | Description | Type | Default | Example |
---|---|---|---|---|
stream_name | Name of the stream to parse the ID from | varchar | someStream-123 |
# Usage
SELECT * FROM id('someStream-123');
-[ RECORD 1 ]
id | 123
Example: https://github.com/message-db/message-db/blob/master/test/id/stream-name.sh (opens new window)
# Get the Cardinal ID from a Stream Name
cardinal_id(
stream_name varchar
)
# Returns
The cardinal ID part of the stream name.
# Arguments
Name | Description | Type | Default | Example |
---|---|---|---|---|
stream_name | Name of the stream to parse the cardinal ID from | varchar | someStream-123 |
# Usage
SELECT * FROM cardinal_id('someStream-123+abc');
-[ RECORD 1 ]----
cardinal_id | 123
# Get the Category from a Stream Name
category(
stream_name varchar
)
# Returns
The category part of the stream name.
# Arguments
Name | Description | Type | Default | Example |
---|---|---|---|---|
stream_name | Name of the stream to parse the category from | varchar | someStream-123 |
# Usage
SELECT * FROM category('someStream-123');
-[ RECORD 1 ]--------
category | someStream
Example: https://github.com/message-db/message-db/blob/master/test/category/stream-name.sh (opens new window)
# Determine Whether a Stream Name is a Category
is_category(
stream_name varchar
)
# Returns
Boolean affirmative if the stream name is a category.
# Arguments
Name | Description | Type | Default | Example |
---|---|---|---|---|
stream_name | Name of the stream to determine whether it's a category | varchar | someStream-123 |
# Usage
SELECT * FROM is_category('someCategory');
-[ RECORD 1 ]--
is_category | t
# Acquire a Lock for a Stream Name
An exclusive, transaction-level advisory lock (opens new window) is acquired when a message is written to the stream. The advisory lock ensures that writes are processed sequentially.
The lock ID is derived from the category name of the stream being written to. The result of which is that all writes to streams in a given category are queued and processed in sequence. This ensures that write of a message to a stream does not complete after a consumer has already proceeded past its position.
acquire_lock(
stream_name varchar
)
# Returns
Integer representing the lock ID.
# Arguments
Name | Description | Type | Default | Example |
---|---|---|---|---|
stream_name | Name of the stream to acquire the lock for | varchar | someStream-123 |
# Usage
SELECT acquire_lock('someStream-123');
-[ RECORD 1 ]+--------------------
acquire_lock | 2053039834977696644
Example: https://github.com/message-db/message-db/blob/master/test/acquire-lock.sh (opens new window)
# Calculate a 64-Bit Hash for a Stream Name
The lock ID generated to acquire an exclusive advisory lock is a hash calculated based on the stream name.
hash_64(
value varchar
)
# Returns
Integer representing the lock ID.
# Arguments
Name | Description | Type | Default | Example |
---|---|---|---|---|
value | Text value to generate integer hash from | varchar | someStream-123 |
# Usage
SELECT hash_64('someStream');
-[ RECORD 1 ]----------------
hash_64 | 2053039834977696644
Example: https://github.com/message-db/message-db/blob/master/test/hash-64.sh (opens new window)
# Get Message Store Database Schema Version
message_store_version()
# Returns
The version number of the message store database.
# Usage
SELECT message_store_version();
message_store_version
-----------------------
1.2.3
The version number will change when the database schema changes. A database schema change could be a change to the messages
table structure, changes to Postgres server functions, types, indexes, users, or permissions. The version number follows the SemVer (opens new window) scheme for the last three numbers in the version (the first number is the product generation, and implies a major version change).
# Debugging Output
The message store's server functions will print parameter values, and any generated SQL code, to the standard I/O of the client process.
Debugging output can be enabled for all server functions, or for the get functions and the write function separately.
# message_store.debug_get
The debug_get
setting controls debug output for the retrieval functions, including get_stream_messages
, get_category_messages
, and get_last_message
.
Assign the value on
to the setting to enable debug output.
message_store.debug_get=on
# message_store.debug_write
The debug_write
setting controls debug output for the write function, write_message
.
Assign the value on
to the setting to enable debug output.
message_store.debug_write=on
# message_store.debug
The debug
setting controls debug output for the get functions and the write function.
Assign the value on
to the setting to enable debug output.
message_store.debug=on
# Enabling Debug Output Using a Postgres Environment Variable
The debugging output configuration settings can be enabled in a terminal session using the PGOPTIONS
environment variable.
PGOPTIONS="-c message_store.debug=on"
# Enabling Debug Output Using the Postgres Configuration File
The debugging output configuration settings can be set using PostgresSQL's configuration file.
The file system location of the configuration file can be displayed at the command line using the psql
tool.
psql -c 'show config_file'
# More on Postgres Configuration
See the PostgreSQL documentation for more configuration options:
https://www.postgresql.org/docs/current/config-setting.html (opens new window)