# Entity Store
An entity store is the primary means of retrieving entities from the message store database.
Entity data is stored as events in an event stream. Each entity has it's own stream. When an entity is "retrieved", its events are applied to the entity by a projection.
An entity store combines a projection, a reader, an temporary in-memory cache, and an durable on-disk cache to provide a simple and efficient interface for retrieving entities.
An entity's event stream is read by the reader specified by the reader
macro, and projected by the projection specified by the projection
macro. The entity returned from a retrieval operation is an instance of the class specified by the entity
macro. The category
macro declares the category of the stream name that will be read when a retrieval operation is invoked.
To avoid the cost of projecting every event in an entity's stream when an entity is retrieved, the entity is cached when it is retrieved. The next time an entity is retrieved, only events that have been recorded since the last time the entity was retrieved are read and projected.
To avoid the cost of projecting all of an entity's events when the entity is not in the cache (as when a service has just been started), an entity is periodically persisted to disk in a snapshot stream. If an entity is retrieved and there's no cache entry for it, the latest snapshot will be retrieved and cached before the latest events are read and projected.
# Example
class Store
include EntityStore
entity Account
category :account
projection Projection
reader MessageStore::Postgres::Read, batch_size: 1000
# Snapshotting is optional and can be omitted
snapshot EntitySnapshot::Postgres, interval: 100
end
store = Store.build
deposited = Deposited.new()
deposited.account_id = '123'
deposited.amount = 11
Messaging::Postgres::Write.(deposited, 'account-123')
account, version = store.fetch('123', include: :version)
account.balance
# => 11
version
# => 0
withdrawn = Withdrawn.new()
withdrawn.account_id = '123'
withdrawn.amount = 1
Messaging::Postgres::Write.(withdrawn, 'account-123')
deposited = Deposited.new()
deposited.account_id = '123'
deposited.amount = 111
Messaging::Postgres::Write.(deposited, 'account-123')
account, version = store.fetch('123', include: :version)
account.balance
# => 121
version
# => 2
# Entity Store Facts
- An entity store projects an entity's events when it is retrieved
- The entity store caches the retrieved entity
- Only events recorded since the last cached retrieval are projected
- An entity is retrieved by its ID
- An entity store can only be used to retrieve the entity class and category it's defined for
- An entity store's cache is only cleared when its service is restarted
- A store will optionally write snapshots of an entity on a specified interval
# EntityStore Module
A class becomes a store by including the EntityStore
module from the EntityStore
library and namespace.
The EntityStore
module affords the receiver with:
- The
fetch
method for retrieving an entity by its ID (does not return nil) - The
get
method for retrieving an entity by its ID (may return nil) - The
get_version
method for retrieving just the stream's current version - The
entity
macro for declaring the class of the entity that the store manages - The
category
macro for declaring the stream category used to compose the entity's stream name - The
projection
macro for declaring the projection class that the store uses - The
reader
macro for declaring the platform-specific reader implementation to be used by the store - The
snapshot
macro for declaring the snapshot implementation to be used by the store
# Retrieving an Entity
Entities can be retrieved in one of two ways:
- Via the
fetch
method - Via the
get
method
The significant difference between the fetch
and get
methods is the return value when a non-existent entity is attempted to be retrieved. The fetch
method will return a newly-constructed instance of the store's declared entity class. The get
method will return a nil
.
It's more common in handler business logic to use the fetch
method so that the returned entity does not have to be checked for a nil
value before being used.
# Fetch
fetch(id, include: nil)
Alias
project
Returns
Instance of the store's entity class with the identified stream's data projected onto it.
If the optional include
argument is specified, data from the entity's cache record can be returned as well.
Note: The fetch
method never returns a nil
when the entity retrieved does not exist (ie: There's no entity stream with the entity's ID). When the entity does not exist, rather than returning a nil
, the fetch
method will return a newly-constructed instance of the store's declared entity class.
Parameters
Name | Description | Type |
---|---|---|
id | The ID of the entity to retrieve | String |
include | List of cache record attributes to retrieve | Symbol or Array |
# Entities Are Always Cached Before They Are Returned
Once the retrieval operation has completed reading and projecting the entity's events, the entity is inserted into the store's cache.
If there isn't a cache record already in the cache for the entity, a new cache record will be created. If the cache does have a cache record for the entity, that record is updated.
For more details on caching, see the entity cache user guide
# Including Cache Record Data in the Returned Values
The include
named parameter returns selected data from the entity's cache record along with the entity.
It's important to note that the cache record is updated before any data is returned.
The most common use of the include
parameter is to retrieve the entity's version along with the entity.
entity, version = store.fetch(some_id, include: :version)
Any number of cache record attributes can be returned by specifying a list of cache record attribute names.
entity, version, time = store.fetch(some_id, include: :version, :time)
# Cache Record Attributes That Can Be Included
Name | Description | Type |
---|---|---|
id | The ID of the cached entity | String |
entity | The cached entity itself | Object |
version | The stream version of the cached entity at the time it was cached | Integer |
time | The UTC time that the entity was cached | Time |
persisted_version | The version of the entity that may have been persisted as a snapshot | Integer |
persisted_time | The UTC time that the entity may have been persisted as a snapshot | Time |
# The no_stream Stream Version
When an attempt is made to retrieve an entity that does not exist, the symbol :no_stream
is returned as the value of version
.
entity, version = store.fetch(some_non_existant_id, include: :version)
version
# => :no_stream
The :no_stream
symbol is equivalent to a stream version of -1
.
# Retrieval Stream Name
The category
macro declares the category of the stream name that will be read when a retrieval operation is invoked.
When an entity whose ID is 123
is retrieved from the store, the store's category is concatenated with the ID to form the event stream name to read.
class SomeStore
include EntityStore
category :some_entity
# ...
end
store = SomeStore.build
# The store's reader reads from the stream "someEntity-123"
some_entity = store.fetch('123')
store.stream_name
# => "someEntity-123"
# Get
get(id, include: nil)
The get
method is almost identical to the fetch
method.
The significant difference between the get
and fetch
methods is the return value when a non-existent entity is attempted to be retrieved. The get
method will return a nil
. Whereas the fetch
method will return a newly-constructed instance of the store's declared entity class.
The get
method isn't typically a good choice in handler business logic, as it will require a nil
check for each entity returned from the store due to the possibility that an entity returned from get
may be nil
# Retrieving an Entity's Version
get_version(id)
Returns
Stream version of the stream identified by the id
argument and the store's declared category.
Parameters
Name | Description | Type |
---|---|---|
id | The ID of the entity whose version to retrieve | String |
Note: Retrieving the version will cause a normal reading and projecting of recent, unprojected events and an update of the cache with the results.
# Retrieval Workflow
The following is done by the store and its caches when an entity is retrieved:
- Retrieval by ID is actuated by using either the store's
fetch
,get
, orget_version
method. - The in-memory, internal cache is checked for a cache record for the entity ID.
- If the cache record is found, the entity and the version number are retrieved from the cache.
- A stream reader retrieves and enumerates any events that may have been written since the entity was last cached, based on the version number that is cached with the entity.
- Any new events are projected on to the entity.
- The cache is updated with the updated entity and the updated entity version.
- The entity is returned to the caller.
# When There Is No Cache Record
When there is no cache record found in step #2 (above), the following alternative flow happens:
- Retrieval by ID is actuated by using either the store's
fetch
,get
, orget_version
method. - The in-memory, internal cache is checked for a cache record for the entity ID.
- No cache record is found.
- If the store's snapshotting is configured, the entity's snapshot will be retrieved from the external snapshot storage.
- The retrieved snapshot and the entity version of the snapshot is inserted into the internal, in-memory cache.
- A stream reader retrieves and enumerates any events that may have been written since the entity was last cached, based on the version number that is cached with the entity.
- Any new events are projected on to the entity.
- The cache is updated with the updated entity and the updated entity version.
- The entity is returned to the caller.
# When There Is No Cache Record and No Snapshot
When neither a cache record is in the internal, in-memory cache and there is no snapshot stored (or if the store's optional snapshotting is not configured), the following flow happens:
- Retrieval by ID is actuated by using either the store's
fetch
,get
, orget_version
method. - The in-memory, internal cache is checked for a cache record for the entity ID.
- No cache record is found.
- If the store's snapshotting is configured, an attempt to retrieve the entity's snapshot is made.
- There is no snapshot found.
- A stream reader retrieves and enumerates any events that may have been written since the entity was last cached, based on the version number that is cached with the entity.
- Any new events are projected on to the entity.
- The cache is updated with the updated entity and the updated entity version.
- The entity is returned to the caller.
# Defining an Entity Store
An entity store requires the declaration of:
- The store's entity class
- The stream category of the store's entity streams
- The projection class that the store uses to give an entity its data
- The reader used by the store to retrieve events that will be dispatched to the store's projection
class SomeStore
include EntityStore
entity SomeEntity
category :some_entity
projection SomeProjection
reader MessageStore::Postgres::Read
end
# The entity Macro
self.entity(entity_class)
Parameters
Name | Description | Type |
---|---|---|
entity_class | Class of the entity that the store retrieves and caches instances of | Class |
The macro defines the entity_class
instance method on the entity store that returns the class passed to the macro.
some_store.entity_class
# => SomeEntity
The macro defines the entity_class
class method on the entity store class that returns the class passed to the macro.
SomeStore.entity_class
# => SomeEntity
# The category Macro
self.category(category)
Parameters
Name | Description | Type |
---|---|---|
category | Name of the category that the store's entities are retrieved from | Symbol or String |
The macro defines the category
instance method on the entity store that returns the normalized category name.
some_store.category
# => "someEntity"
Note: The category name that is specified via the category macro will be normalized to the canonical _camel-cased_ form. If the input is the symbol :some_entity
, it will be normalized to the string someEntity
. If the input is a camel-cased string, it is not normalized.
# The projection Macro
self.projection(projection_class)
Parameters
Name | Description | Type |
---|---|---|
projection_class | Class of the projection that the store uses to project state onto an entity | Class |
The macro defines the projection_class
instance method on the entity store that returns the class passed to the macro.
some_store.projection_class
# => SomeProjection
# The reader Macro
self.reader(reader_class, batch_size: 1000)
Parameters
Name | Description | Type |
---|---|---|
reader_class | Class of the reader that the store uses to read the event stream of the entity being retrieved | Class |
batch_size | Number of events at-a-time to be read from the message store | Integer |
The macro defines the reader_class
instance method on the entity store that returns the class passed to the macro.
some_store.reader_class
# => MessageStore::Postgres::Read
The macro also defines the reader_batch_size
instance method on the entity store that returns the batch size passed to the macro.
some_store.reader_batch_size
# => 1000
# Incomplete Store Definition
The definition of a store must specify all of the mandatory properties required for the operation of the store.
A store's definition must include declarations for:
- The entity class
- The category
- The projection class
- The reader class
If any of these are missing, the EntityStore::Error
class is raised while the store class is evaluated.
# Optional Snapshotting
# The snapshot Macro
The snapshot macro is optional and can be omitted.
self.snapshot(snapshot_class, interval: nil)
Parameters
Name | Description | Type |
---|---|---|
snapshot_class | Class of the snapshot writer that the store uses to periodically snapshot the state of an entity | Class |
interval | Interval, in messages read and projected, in which the entity state is written to its snapshot stream | Integer |
The macro defines the snapshot_class
instance method and the snapshot_interval
instance method on the entity store that returns the values passed to the macros.
class SomeStore
include EntityStore
# ...
snapshot EntitySnapshot::Postgres, interval: 100
end
TIP
For more information about snapshotting, see the snapshotting user guide.
# Deleting Cache Records from the Store's Internal Cache
delete_cache_record(id)
Returns
Returns the cache record that corresponds to the ID, or nil
if there is no cache record for the ID.
Parameters
Name | Description | Type |
---|---|---|
id | The entity ID of the cache record being deleted | String |
Note: Deleting the cache record from the store's internal cache has no effect on any entity snapshots that have been recorded. If snapshots have been recorded, deleting the cache record from the internal cache does not delete the snapshot from the external snapshot store.
# Constructing Entity Stores
Entity stores can be constructed in one of two ways:
- Via the constructor
- Via the initializer
# Via the Constructor
self.build(snapshot_interval: nil, session: nil)
Returns
Instance of the class that includes the EntityStore
module.
Parameters
Name | Description | Type |
---|---|---|
snapshot_interval | Optional interval in number of messages after which a snapshot is persisted if a snapshot writer is configured | Integer |
session | Optionally, an existing session object to use, rather than allowing the store to create a new session | MessageStore::Postgres::Session |
# Via the Initializer
self.initialize()
Returns
Instance of the class that includes the EntityProjection
module.
By constructing a store using the initializer, the store's dependencies are not set to operational dependencies. They remain inert substitutes.
TIP
See the useful objects user guide for background on inert substitutes.
# Log Tags
The following tags are applied to log messages recorded by an entity store:
Tag | Description |
---|---|
entity_store | Applied to all log messages recorded by an entity store |
The following tags may be applied to log messages recorded by an entity store:
Tag | Description |
---|---|
fetch | Applied to log messages recorded while fetching an entity |
get | Applied to log messages recorded while getting an entity |
refresh | Applied to log messages recorded while refreshing an entity |
entity | Applied to log messages that record the data content of an entity |
data | Applied to log messages that record the data content of an entity |
See the logging user guide for more on log tags.