AWS Storage & Scheduler
The @omega-flow/store-aws package provides production-ready AWS implementations of the engine's three pluggable interfaces:
| Class | Implements | Backed by |
|---|---|---|
| DynamoDBWorkflowStore | WorkflowStore | DynamoDB |
| DynamoDBWorkflowMemory | WorkflowMemory | DynamoDB |
| EventBusWorkflowScheduler | WorkflowScheduler | EventBridge Scheduler |
These are drop-in replacements for the InMemory* implementations used in development.
Installation
pnpm add @omega-flow/store-aws @omega-flow/engine @omega-flow/types
The package depends on the AWS SDK v3 clients, which are included as regular dependencies.
Quick Start
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { SchedulerClient } from "@aws-sdk/client-scheduler";
import { WorkflowManager, defaultNodeModels } from "@omega-flow/engine";
import {
  DynamoDBWorkflowStore,
  DynamoDBWorkflowMemory,
  EventBusWorkflowScheduler,
} from "@omega-flow/store-aws";

const ddb = new DynamoDBClient({ region: "eu-west-1" });

const manager = new WorkflowManager({
  workflowStore: new DynamoDBWorkflowStore({
    client: ddb,
    tableName: "omega-workflows",
  }),
  workflowMemory: new DynamoDBWorkflowMemory({
    client: ddb,
    tableName: "omega-contexts",
  }),
  workflowScheduler: new EventBusWorkflowScheduler({
    client: new SchedulerClient({ region: "eu-west-1" }),
    eventBusArn: "arn:aws:events:eu-west-1:123456789012:event-bus/omega",
    roleArn: "arn:aws:iam::123456789012:role/OmegaSchedulerRole",
  }),
  nodeModels: defaultNodeModels,
  eventExtractor: (event) => ["default", event.data.userId],
});

Required DynamoDB Tables
Create two DynamoDB tables in your AWS account: one for workflow definitions and one for execution contexts. Both use on-demand billing and a composite primary key (partition key + sort key). The table names are up to you; pass them via the tableName config option.
Workflows Table
Stores workflow definitions. Each domain (tenant) can have many workflows.
| Attribute | Type | Key |
|---|---|---|
| domain | String (S) | Partition key |
| workflowId | String (S) | Sort key |
Contexts Table
Stores workflow execution state. Each context key (domain + workflow + subject combination) can have multiple instances.
| Attribute | Type | Key |
|---|---|---|
| contextKey | String (S) | Partition key |
| instanceId | String (S) | Sort key |
| domain | String (S) | GSI partition key |
| subjectId | String (S) | GSI sort key |
The contexts table requires one Global Secondary Index:
| Index name | Partition key | Sort key | Projection |
|---|---|---|---|
| domain-subjectId-index | domain (S) | subjectId (S) | ALL |
This GSI enables listing all contexts for a domain (getAllContexts) and for a specific subject (getAllContextsForSubject).
TIP
Both tables use PAY_PER_REQUEST billing mode — you only pay for what you use. The GSI index name is configurable via the gsiName config option (defaults to domain-subjectId-index).
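As a rough illustration, the two table definitions above can be written as CreateTable request parameters. The sketch below builds them as plain objects in the shape that the AWS SDK v3 CreateTableCommand accepts. The table names are the ones from the Quick Start, and the helpers keySchema and stringAttr are illustrative names, not part of any package.

```typescript
// Sketch: CreateTable parameters for the two tables described above.
// Pass each object to `new CreateTableCommand(...)` from @aws-sdk/client-dynamodb,
// or translate it to your infrastructure-as-code tool of choice.

type KeyType = "HASH" | "RANGE";

// Illustrative helper: composite primary key (partition key + sort key).
const keySchema = (hash: string, range: string) => [
  { AttributeName: hash, KeyType: "HASH" as KeyType },
  { AttributeName: range, KeyType: "RANGE" as KeyType },
];

// Illustrative helper: every key attribute in these tables is a String (S).
const stringAttr = (name: string) => ({
  AttributeName: name,
  AttributeType: "S" as const,
});

const workflowsTable = {
  TableName: "omega-workflows", // example name
  BillingMode: "PAY_PER_REQUEST" as const,
  AttributeDefinitions: ["domain", "workflowId"].map(stringAttr),
  KeySchema: keySchema("domain", "workflowId"),
};

const contextsTable = {
  TableName: "omega-contexts", // example name
  BillingMode: "PAY_PER_REQUEST" as const,
  // Only key attributes (table keys and GSI keys) are declared here.
  AttributeDefinitions: ["contextKey", "instanceId", "domain", "subjectId"].map(stringAttr),
  KeySchema: keySchema("contextKey", "instanceId"),
  GlobalSecondaryIndexes: [
    {
      IndexName: "domain-subjectId-index",
      KeySchema: keySchema("domain", "subjectId"),
      Projection: { ProjectionType: "ALL" as const },
    },
  ],
};
```

If you choose a different index name, pass the same value as gsiName when constructing DynamoDBWorkflowMemory.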
DynamoDBWorkflowStore
Stores workflow definitions in DynamoDB.
Config
interface DynamoDBWorkflowStoreConfig {
  client: DynamoDBClient;
  tableName: string;
}
Table Schema
| Attribute | Key | Description |
|---|---|---|
| domain | Partition key (S) | Tenant identifier, e.g. an organization ID, company ID, or a fixed string like "default" for single-tenant setups. Maps to the domain parameter used throughout the engine. |
| workflowId | Sort key (S) | Workflow ID |
| data | - | Full Workflow JSON |
| createdAt | - | Epoch ms, set on first write |
| updatedAt | - | Epoch ms, updated on every write |
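The timestamp semantics in the table (createdAt set once, updatedAt refreshed on every write) can be obtained in DynamoDB with a single UpdateItem call that uses if_not_exists. The sketch below shows one possible way to build such a request as a plain object. It is an assumption for illustration, not the package's actual implementation; the function name buildUpsertInput, the workflow.id field, and storing data as a JSON string are all hypothetical.

```typescript
// Sketch only: one possible way to get "createdAt on first write,
// updatedAt on every write". Not taken from the package source.
function buildUpsertInput(
  tableName: string,
  domain: string,
  workflow: { id: string }, // hypothetical minimal workflow shape
  now: number = Date.now(),
) {
  return {
    TableName: tableName,
    Key: {
      domain: { S: domain },
      workflowId: { S: workflow.id },
    },
    // if_not_exists keeps the stored createdAt when the item already exists.
    UpdateExpression:
      "SET #data = :data, createdAt = if_not_exists(createdAt, :now), updatedAt = :now",
    // "data" is a DynamoDB reserved word, so it needs a name placeholder.
    ExpressionAttributeNames: { "#data": "data" },
    ExpressionAttributeValues: {
      ":data": { S: JSON.stringify(workflow) }, // assumed serialization
      ":now": { N: String(now) }, // numbers are sent as strings on the wire
    },
  };
}
```

The resulting object has the shape expected by UpdateItemCommand in @aws-sdk/client-dynamodb.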
Methods
In addition to the WorkflowStore interface methods (getWorkflow, getAllWorkflows), the DynamoDB store also supports:
| Method | Description |
|---|---|
| setWorkflow(domain, workflow) | Create or update a workflow definition |
| createWorkflow(domain, workflowData) | Create a new workflow with an auto-generated ID. Throws WorkflowAlreadyExistsError on collision. |
| deleteWorkflow(domain, workflowId) | Delete a workflow. Returns true if it existed. |
DynamoDBWorkflowMemory
Stores workflow execution contexts (per-subject state) in DynamoDB.
Config
interface DynamoDBWorkflowMemoryConfig {
  client: DynamoDBClient;
  tableName: string;
  gsiName?: string; // default: "domain-subjectId-index"
}
| Property | Required | Description |
|---|---|---|
| client | Yes | DynamoDB SDK client |
| tableName | Yes | Name of the contexts table |
| gsiName | No | Name of the domain/subjectId GSI (default: "domain-subjectId-index") |
Table Schema
| Attribute | Key | Description |
|---|---|---|
| contextKey | Partition key (S) | Composite key: {domain}#{workflowId}#{subjectId} |
| instanceId | Sort key (S) | Workflow instance ID |
| domain | GSI partition key (S) | Tenant identifier (denormalised from contextKey for GSI queries) |
| subjectId | GSI sort key (S) | Subject identifier (denormalised from contextKey for GSI queries) |
| data | - | Full Context JSON |
| isCompleted | - | Mirrored from Context for filtering |
| startedAt | - | Mirrored from Context for sorting |
| updatedAt | - | Epoch ms, updated on every save |
The composite partition key groups all instances of a workflow for a given subject together, making getContexts a single partition query. The domain and subjectId attributes are written alongside every item to power the GSI.
WARNING
Domain, workflow ID, and subject ID must not contain the # character, as it is used as the delimiter in the partition key.
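To make the delimiter rule concrete, here is a minimal sketch of how a {domain}#{workflowId}#{subjectId} key could be built and validated before it reaches the store. The helper name buildContextKey is hypothetical and the package may construct its keys differently; the point is only to show why a stray # is a problem.

```typescript
const DELIMITER = "#";

// Hypothetical helper: builds the composite partition key and rejects
// parts that would make the key ambiguous.
function buildContextKey(domain: string, workflowId: string, subjectId: string): string {
  const parts: Array<[string, string]> = [
    ["domain", domain],
    ["workflowId", workflowId],
    ["subjectId", subjectId],
  ];
  for (const [name, value] of parts) {
    if (value.indexOf(DELIMITER) !== -1) {
      throw new Error(`${name} must not contain "${DELIMITER}": ${value}`);
    }
  }
  return [domain, workflowId, subjectId].join(DELIMITER);
}
```

Without the check, ("a#b", "c", "d") and ("a", "b#c", "d") would both produce the key a#b#c#d, so two unrelated contexts would share a partition.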
Methods
The store implements the full WorkflowMemory interface (getContexts, saveContext, deleteContext) plus:
| Method | Description |
|---|---|
| getContext(domain, workflowId, subjectId, instanceId) | Fetch a single context by instance ID. |
| getAllContextsForSubject(domain, subjectId) | List all contexts for a subject across all workflows. Uses the GSI. |
| getAllContexts(domain) | List all contexts in a domain across all subjects. Returns contexts annotated with subjectId. Uses the GSI. |
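The two GSI-backed lookups correspond to two DynamoDB Query shapes: partition key only, and partition key plus sort key. The sketch below builds both as plain Query parameters. The function names are hypothetical and the package's real queries may differ (for example in pagination or projection).

```typescript
const DEFAULT_GSI = "domain-subjectId-index";

// Query shape for "all contexts in a domain": GSI partition key only.
function buildDomainQuery(tableName: string, domain: string, gsiName: string = DEFAULT_GSI) {
  return {
    TableName: tableName,
    IndexName: gsiName,
    KeyConditionExpression: "#domain = :domain",
    // Name placeholders keep the expression valid even if an attribute
    // name collides with a DynamoDB reserved word.
    ExpressionAttributeNames: { "#domain": "domain" },
    ExpressionAttributeValues: { ":domain": { S: domain } },
  };
}

// Query shape for "all contexts for one subject": GSI partition key + sort key.
function buildSubjectQuery(
  tableName: string,
  domain: string,
  subjectId: string,
  gsiName: string = DEFAULT_GSI,
) {
  return {
    TableName: tableName,
    IndexName: gsiName,
    KeyConditionExpression: "#domain = :domain AND #subjectId = :subjectId",
    ExpressionAttributeNames: { "#domain": "domain", "#subjectId": "subjectId" },
    ExpressionAttributeValues: {
      ":domain": { S: domain },
      ":subjectId": { S: subjectId },
    },
  };
}
```

Keep in mind that reads from a global secondary index are eventually consistent, so a context saved a moment ago may not appear in these listings immediately.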
EventBusWorkflowScheduler
Schedules delayed workflow events using EventBridge Scheduler. When a schedule fires, it publishes the event to an EventBridge bus. A downstream consumer (e.g. a Lambda) picks it up and calls WorkflowManager.processEvent.
Schedules are created with ActionAfterCompletion: DELETE, so AWS cleans them up automatically after they fire.
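For orientation, a one-time schedule that targets an event bus and deletes itself might look roughly like the CreateSchedule parameters below. This is a sketch under stated assumptions: the function names, the schedule name argument, and placing the serialized event in the target Input are illustrative, and the package's actual request may differ.

```typescript
interface SchedulerSketchConfig {
  eventBusArn: string;
  roleArn: string;
  scheduleGroupName?: string;
  source?: string;
  detailType?: string;
}

// One-time schedule expressions have the form at(yyyy-mm-ddThh:mm:ss),
// interpreted as UTC unless a timezone is set on the schedule.
function toAtExpression(fireAt: Date): string {
  return `at(${fireAt.toISOString().slice(0, 19)})`;
}

// Hypothetical builder: returns parameters in the shape accepted by
// CreateScheduleCommand from @aws-sdk/client-scheduler.
function buildCreateScheduleInput(
  name: string,
  event: unknown,
  delayMs: number,
  config: SchedulerSketchConfig,
  now: number = Date.now(),
) {
  return {
    Name: name,
    GroupName: config.scheduleGroupName ?? "default",
    ScheduleExpression: toAtExpression(new Date(now + delayMs)),
    FlexibleTimeWindow: { Mode: "OFF" as const },
    // Remove the schedule once it has fired.
    ActionAfterCompletion: "DELETE" as const,
    Target: {
      Arn: config.eventBusArn, // the event bus receives a PutEvents call
      RoleArn: config.roleArn, // role assumed by EventBridge Scheduler
      EventBridgeParameters: {
        Source: config.source ?? "omega-flow",
        DetailType: config.detailType ?? "workflow.event",
      },
      Input: JSON.stringify(event), // assumed: serialized event becomes the detail
    },
  };
}
```

Note that the at() expression has one-second resolution, so sub-second parts of delayMs are truncated in this sketch.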
Config
interface EventBusWorkflowSchedulerConfig {
  client: SchedulerClient;
  eventBusArn: string;
  roleArn: string;
  scheduleGroupName?: string; // default: "default"
  source?: string; // default: "omega-flow"
  detailType?: string; // default: "workflow.event"
}
| Property | Required | Description |
|---|---|---|
| client | Yes | AWS Scheduler SDK client |
| eventBusArn | Yes | ARN of the EventBridge bus that receives the scheduled event |
| roleArn | Yes | IAM role assumed by Scheduler to put events on the bus |
| scheduleGroupName | No | Schedule group name (default: "default") |
| source | No | Source field on the published EventBridge event (default: "omega-flow") |
| detailType | No | DetailType field on the published EventBridge event (default: "workflow.event") |
How It Works
- A node (e.g. Wait or TriggerOrTimeout) calls scheduler.schedule(event, delayMs)
- The scheduler creates a one-time EventBridge Scheduler schedule that fires at now + delayMs
- When the schedule fires, it publishes the serialized Event to the EventBridge bus
- A Lambda (or other consumer) subscribed to the bus deserializes the event and calls manager.processEvent(event)
- The schedule auto-deletes after firing
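The consumer side of the flow above can be quite small. The following sketch shows a Lambda-style handler that forwards the scheduled event to the engine. It assumes the serialized Event arrives in the detail field of the EventBridge envelope, and it types the manager through a minimal hypothetical interface (EventProcessor) instead of importing WorkflowManager, so the names here are illustrative.

```typescript
// Minimal view of what the handler needs from the engine (assumed shape).
interface EventProcessor<E> {
  processEvent(event: E): Promise<unknown>;
}

// Subset of the EventBridge envelope delivered to a Lambda target.
interface EventBridgeEnvelope<E> {
  source: string;
  "detail-type": string;
  detail: E; // already parsed from JSON by the time the handler runs
}

// Hypothetical factory: wraps a manager in a handler for scheduled events.
function makeScheduledEventHandler<E>(manager: EventProcessor<E>) {
  return async (envelope: EventBridgeEnvelope<E>): Promise<void> => {
    // Hand the original workflow event back to the engine.
    await manager.processEvent(envelope.detail);
  };
}
```

In a real deployment you would pass your WorkflowManager instance to the factory and export the returned function as the Lambda handler, with an EventBridge rule matching the configured source and detailType.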
IAM Permissions
The scheduler role (roleArn) needs permission to put events on the bus:
{
  "Effect": "Allow",
  "Action": "events:PutEvents",
  "Resource": "<eventBusArn>"
}
The caller creating schedules needs:
{
  "Effect": "Allow",
  "Action": [
    "scheduler:CreateSchedule",
    "scheduler:DeleteSchedule"
  ],
  "Resource": "arn:aws:scheduler:*:*:schedule/<groupName>/*"
}
The caller also needs iam:PassRole for the scheduler role.
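Putting the caller's permissions together, including iam:PassRole, a complete policy document could be generated along these lines. The function name buildCallerPolicy is hypothetical, and the iam:PassedToService condition is an optional hardening step added here as an assumption, not a requirement stated by the package.

```typescript
// Hypothetical helper: policy for the principal that creates schedules.
function buildCallerPolicy(groupName: string, schedulerRoleArn: string) {
  return {
    Version: "2012-10-17",
    Statement: [
      {
        Effect: "Allow",
        Action: ["scheduler:CreateSchedule", "scheduler:DeleteSchedule"],
        Resource: `arn:aws:scheduler:*:*:schedule/${groupName}/*`,
      },
      {
        Effect: "Allow",
        Action: "iam:PassRole",
        Resource: schedulerRoleArn,
        // Optional: only allow passing the role to EventBridge Scheduler.
        Condition: {
          StringEquals: { "iam:PassedToService": "scheduler.amazonaws.com" },
        },
      },
    ],
  };
}
```

Serializing the result with JSON.stringify gives a document you can attach to the Lambda or service role that runs the WorkflowManager.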
Mixing Implementations
You can mix AWS and in-memory implementations. For example, use DynamoDB for storage but keep the in-memory scheduler during local development:
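import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import {
  InMemoryWorkflowScheduler,
  WorkflowManager,
  defaultNodeModels,
} from "@omega-flow/engine";
import {
  DynamoDBWorkflowStore,
  DynamoDBWorkflowMemory,
} from "@omega-flow/store-aws";

const ddb = new DynamoDBClient({ region: "eu-west-1" });

const manager = new WorkflowManager({
  // Persist workflow definitions and contexts in DynamoDB
  workflowStore: new DynamoDBWorkflowStore({ client: ddb, tableName: "workflows" }),
  workflowMemory: new DynamoDBWorkflowMemory({ client: ddb, tableName: "contexts" }),
  // Keep scheduling in-process during local development
  workflowScheduler: new InMemoryWorkflowScheduler(),
  nodeModels: defaultNodeModels,
  eventExtractor: (event) => ["default", event.data.userId],
});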