Message Store Database Interface

The message store provides an interface of Postgres server functions that you can access with any programming language, or the psql command line tool.

There are working examples of uses of the server functions included with the source code:

Example: https://github.com/eventide-project/message-store-postgres-database/blob/master/database

Write a Message

Write a JSON-formatted message to a named stream, optionally specifying JSON-formatted metadata and an expected version number.

write_message(
  _id varchar,
  _stream_name varchar,
  _type varchar,
  _data jsonb,
  _metadata jsonb DEFAULT NULL,
  _expected_version bigint DEFAULT NULL
)

Arguments

NameTypeDescriptionDefaultExample
_idvarcharUUID of the message being writtena5eb2a97-84d9-4ccf-8a56-7160338b11e2
_stream_namevarcharName of stream to which the message is writtensomeStream-123
_typevarcharThe type of the messageWithdrawn
_datajsonbJSON representation of the message body{"messageAttribute": "some value"}
_metadata (optional)jsonbJSON representation of the message metadataNULL{"metaDataAttribute": "some meta data value"}
_expected_version (optional)bigintVersion that the stream is expected to be when the message is writtenNULL11

Usage

SELECT write_message('uuid'::varchar, 'stream_name'::varchar, 'message_type'::varchar, '{"messageAttribute": "some value"}'::jsonb, '{"metaDataAttribute": "some meta data value"}'::jsonb);"

Example: https://github.com/eventide-project/message-store-postgres-database/blob/master/database/write-test-message.sh

Specifying the Expected Version of the Stream

SELECT write_message('uuid'::varchar, 'stream_name'::varchar, 'message_type'::varchar, '{"messageAttribute": "some value"}'::jsonb, '{"metaDataAttribute": "some meta data value"}'::jsonb, expected_version::bigint);"

NOTE: If the expected version does not match the stream version at the time of the write, an error is raised of the form:

'Wrong expected version: % (Stream: %, Stream Version: %)'

Example (no expected version error): https://github.com/eventide-project/message-store-postgres-database/blob/master/test/write-message-expected-version.sh

Example (with expected version error): https://github.com/eventide-project/message-store-postgres-database/blob/master/test/write-message-expected-version-error.sh

Get Messages from a Stream

Retrieve messages from a single stream, optionally specifying the starting position, the number of messages to retrieve, and an additional condition that will be appended to the SQL command's WHERE clause.

get_stream_messages(
  _stream_name varchar,
  _position bigint DEFAULT 0,
  _batch_size bigint DEFAULT 1000,
  _condition varchar DEFAULT NULL
)

Arguments

NameTypeDescriptionDefaultExample
_stream_namevarcharName of stream to retrieve messages fromsomeStream-123
_position (optional)bigintStarting position of the messages to retrieve011
_batch_size (optional)bigintNumber of messages to retrieve1000111
_condition (optional)varcharWHERE clause fragmentNULLmessages.time >= current_timestamp

Usage

SELECT * FROM get_stream_messages('stream_name'::varchar, starting_position::bigint, batch_size::bigint, _condition => 'messages.time >= current_timestamp'::varchar);"

Example: https://github.com/eventide-project/message-store-postgres-database/blob/master/test/get-stream-messages.sh

Get Messages from a Stream Category

Retrieve messages from a category or streams, optionally specifying the starting position, the number of messages to retrieve, and an additional condition that will be appended to the SQL command's WHERE clause.

CREATE OR REPLACE FUNCTION get_category_messages(
  _category_name varchar,
  _position bigint DEFAULT 0,
  _batch_size bigint DEFAULT 1000,
  _condition varchar DEFAULT NULL
)

Arguments

NameTypeDescriptionDefaultExample
_category_namevarcharName of the category to retrieve messages fromsomeStream
_position (optional)bigintStarting position of the messages to retrieve011
_batch_size (optional)bigintNumber of messages to retrieve1000111
_condition (optional)varcharWHERE clause fragmentNULLmessages.time >= current_timestamp

Usage

SELECT * FROM get_category_messages('cateogry_name'::varchar, starting_position::bigint, batch_size::bigint, _condition => 'messages.time >= current_timestamp'::varchar);"

TIP

Where someThing-123 is a stream name, someThing is a category. Reading the someThing category retrieves messages from all streams whose names start with someThing-.

Example: https://github.com/eventide-project/message-store-postgres-database/blob/master/test/get-category-messages.sh

Get Last Message from a Stream

Retrieve the last message in a stream.

get_last_message(
  _stream_name varchar
)

Arguments

NameTypeDescriptionDefaultExample
_stream_namevarcharName of the stream to retrieve messages fromsomeStream-123

Usage

SELECT * FROM get_last_message('stream_name'::varchar)

Note: This is only for entity streams, and does not work for categories.

Example: https://github.com/eventide-project/message-store-postgres-database/blob/master/test/get-last-message.sh