AWS Storage & Scheduler

The @omega-flow/store-aws package provides production-ready AWS implementations of the engine's three pluggable interfaces:

| Class | Implements | Backed by |
| --- | --- | --- |
| DynamoDBWorkflowStore | WorkflowStore | DynamoDB |
| DynamoDBWorkflowMemory | WorkflowMemory | DynamoDB |
| EventBusWorkflowScheduler | WorkflowScheduler | EventBridge Scheduler |

These are drop-in replacements for the InMemory* implementations used in development.

Installation

bash
pnpm add @omega-flow/store-aws @omega-flow/engine @omega-flow/types

The AWS SDK v3 clients are regular dependencies of the package, so they are installed along with it.

Quick Start

typescript
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { SchedulerClient } from "@aws-sdk/client-scheduler";
import { WorkflowManager, defaultNodeModels } from "@omega-flow/engine";
import {
  DynamoDBWorkflowStore,
  DynamoDBWorkflowMemory,
  EventBusWorkflowScheduler,
} from "@omega-flow/store-aws";

const ddb = new DynamoDBClient({ region: "eu-west-1" });

const manager = new WorkflowManager({
  workflowStore: new DynamoDBWorkflowStore({
    client: ddb,
    tableName: "omega-workflows",
  }),
  workflowMemory: new DynamoDBWorkflowMemory({
    client: ddb,
    tableName: "omega-contexts",
  }),
  workflowScheduler: new EventBusWorkflowScheduler({
    client: new SchedulerClient({ region: "eu-west-1" }),
    eventBusArn: "arn:aws:events:eu-west-1:123456789012:event-bus/omega",
    roleArn: "arn:aws:iam::123456789012:role/OmegaSchedulerRole",
  }),
  nodeModels: defaultNodeModels,
  eventExtractor: (event) => ["default", event.data.userId],
});

Required DynamoDB Tables

Create two DynamoDB tables in your AWS account: one for workflow definitions and one for execution contexts. Both use on-demand billing and a composite primary key (partition key + sort key). The table names are up to you; pass them via the tableName config option.

Workflows Table

Stores workflow definitions. Each domain (tenant) can have many workflows.

| Attribute | Type | Key |
| --- | --- | --- |
| domain | String (S) | Partition key |
| workflowId | String (S) | Sort key |

Contexts Table

Stores workflow execution state. Each context key (domain + workflow + subject combination) can have multiple instances.

| Attribute | Type | Key |
| --- | --- | --- |
| contextKey | String (S) | Partition key |
| instanceId | String (S) | Sort key |
| domain | String (S) | GSI partition key |
| subjectId | String (S) | GSI sort key |

The contexts table requires one Global Secondary Index:

| Index name | Partition key | Sort key | Projection |
| --- | --- | --- | --- |
| domain-subjectId-index | domain (S) | subjectId (S) | ALL |

This GSI enables listing all contexts for a domain (getAllContexts) and for a specific subject (getAllContextsForSubject).

TIP

Both tables use PAY_PER_REQUEST billing mode, so you pay only for the requests you make. The GSI name is configurable via the gsiName config option (defaults to domain-subjectId-index).
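
The two table definitions above can be sketched as plain CreateTable inputs. This is a minimal sketch, not the package's own provisioning code: the helper names (workflowsTableInput, contextsTableInput) and the local CreateTableInput type are assumptions made for illustration, and the type mirrors only the fields used here so the snippet compiles without the AWS SDK.

```typescript
// Local mirror of the CreateTable input fields used in this sketch (an assumption,
// kept small so the example type-checks without @aws-sdk/client-dynamodb).
interface KeySchemaElement {
  AttributeName: string;
  KeyType: "HASH" | "RANGE";
}

interface CreateTableInput {
  TableName: string;
  BillingMode: "PAY_PER_REQUEST";
  AttributeDefinitions: { AttributeName: string; AttributeType: "S" }[];
  KeySchema: KeySchemaElement[];
  GlobalSecondaryIndexes?: {
    IndexName: string;
    KeySchema: KeySchemaElement[];
    Projection: { ProjectionType: "ALL" };
  }[];
}

// All key attributes in both tables are strings (S).
const stringAttr = (AttributeName: string) => ({ AttributeName, AttributeType: "S" as const });

// Partition key (HASH) followed by sort key (RANGE).
const compositeKey = (hash: string, range: string): KeySchemaElement[] => [
  { AttributeName: hash, KeyType: "HASH" },
  { AttributeName: range, KeyType: "RANGE" },
];

// Hypothetical helper: input for the workflows table.
function workflowsTableInput(tableName: string): CreateTableInput {
  return {
    TableName: tableName,
    BillingMode: "PAY_PER_REQUEST",
    AttributeDefinitions: [stringAttr("domain"), stringAttr("workflowId")],
    KeySchema: compositeKey("domain", "workflowId"),
  };
}

// Hypothetical helper: input for the contexts table, including the GSI.
function contextsTableInput(tableName: string, gsiName = "domain-subjectId-index"): CreateTableInput {
  return {
    TableName: tableName,
    BillingMode: "PAY_PER_REQUEST",
    // Every attribute used in a table key or index key must be declared here.
    AttributeDefinitions: ["contextKey", "instanceId", "domain", "subjectId"].map(stringAttr),
    KeySchema: compositeKey("contextKey", "instanceId"),
    GlobalSecondaryIndexes: [
      {
        IndexName: gsiName,
        KeySchema: compositeKey("domain", "subjectId"),
        Projection: { ProjectionType: "ALL" },
      },
    ],
  };
}
```

With the SDK installed, either object could be passed to new CreateTableCommand(input) from @aws-sdk/client-dynamodb and sent with the same DynamoDBClient used in the Quick Start. Infrastructure-as-code tools accept the same key and index layout.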

DynamoDBWorkflowStore

Stores workflow definitions in DynamoDB.

Config

typescript
interface DynamoDBWorkflowStoreConfig {
  client: DynamoDBClient;
  tableName: string;
}

Table Schema

| Attribute | Key | Description |
| --- | --- | --- |
| domain | Partition key (S) | Tenant identifier, e.g. an organization ID, company ID, or a fixed string like "default" for single-tenant setups. Maps to the domain parameter used throughout the engine. |
| workflowId | Sort key (S) | Workflow ID |
| data | | Full Workflow JSON |
| createdAt | | Epoch ms, set on first write |
| updatedAt | | Epoch ms, updated on every write |

Methods

In addition to the WorkflowStore interface methods (getWorkflow, getAllWorkflows), the DynamoDB store also supports:

| Method | Description |
| --- | --- |
| setWorkflow(domain, workflow) | Create or update a workflow definition |
| createWorkflow(domain, workflowData) | Create a new workflow with an auto-generated ID. Throws WorkflowAlreadyExistsError on collision. |
| deleteWorkflow(domain, workflowId) | Delete a workflow. Returns true if it existed. |

DynamoDBWorkflowMemory

Stores workflow execution contexts (per-subject state) in DynamoDB.

Config

typescript
interface DynamoDBWorkflowMemoryConfig {
  client: DynamoDBClient;
  tableName: string;
  gsiName?: string;  // default: "domain-subjectId-index"
}

| Property | Required | Description |
| --- | --- | --- |
| client | Yes | DynamoDB SDK client |
| tableName | Yes | Name of the contexts table |
| gsiName | No | Name of the domain/subjectId GSI (default: "domain-subjectId-index") |

Table Schema

| Attribute | Key | Description |
| --- | --- | --- |
| contextKey | Partition key (S) | Composite key: {domain}#{workflowId}#{subjectId} |
| instanceId | Sort key (S) | Workflow instance ID |
| domain | GSI partition key (S) | Tenant identifier (denormalised from contextKey for GSI queries) |
| subjectId | GSI sort key (S) | Subject identifier (denormalised from contextKey for GSI queries) |
| data | | Full Context JSON |
| isCompleted | | Mirrored from Context for filtering |
| startedAt | | Mirrored from Context for sorting |
| updatedAt | | Epoch ms, updated on every save |

The composite partition key groups all instances of a workflow for a given subject together, making getContexts a single partition query. The domain and subjectId attributes are written alongside every item to power the GSI.

WARNING

Domain, workflow ID, and subject ID must not contain the # character, as it is used as the delimiter in the partition key.
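
To illustrate why the delimiter restriction matters, here is a minimal sketch of how such a composite key could be built and validated. The function name buildContextKey is hypothetical and not part of the package's documented API; only the {domain}#{workflowId}#{subjectId} layout comes from the schema above.

```typescript
const DELIMITER = "#";

// Hypothetical helper: joins the three identifiers into the partition key
// and rejects any part that contains the delimiter.
function buildContextKey(domain: string, workflowId: string, subjectId: string): string {
  const parts: [string, string][] = [
    ["domain", domain],
    ["workflowId", workflowId],
    ["subjectId", subjectId],
  ];
  for (const [label, value] of parts) {
    if (value.includes(DELIMITER)) {
      // Without this check, ("a#b", "c", "d") and ("a", "b#c", "d") would both
      // map to "a#b#c#d" and silently share a partition.
      throw new Error(`${label} must not contain "${DELIMITER}": ${value}`);
    }
  }
  return parts.map(([, value]) => value).join(DELIMITER);
}
```

Validating identifiers at the point where they enter your system (for example in eventExtractor) surfaces a bad ID early instead of at write time.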

Methods

The store implements the full WorkflowMemory interface (getContexts, saveContext, deleteContext) plus:

| Method | Description |
| --- | --- |
| getContext(domain, workflowId, subjectId, instanceId) | Fetch a single context by instance ID. |
| getAllContextsForSubject(domain, subjectId) | List all contexts for a subject across all workflows. Uses the GSI. |
| getAllContexts(domain) | List all contexts in a domain across all subjects. Returns contexts annotated with subjectId. Uses the GSI. |
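
The two GSI-backed lookups correspond to DynamoDB Query requests against the index. The sketch below shows what those Query inputs could look like. It is an illustration of the access pattern, not the package's internal code: contextsQueryInput and the local GsiQueryInput type are assumptions.

```typescript
// Local mirror of the Query input fields used here (an assumption, so the
// sketch compiles without the AWS SDK).
interface GsiQueryInput {
  TableName: string;
  IndexName: string;
  KeyConditionExpression: string;
  ExpressionAttributeNames: Record<string, string>;
  ExpressionAttributeValues: Record<string, { S: string }>;
}

// Hypothetical helper: with only a domain it matches the whole GSI partition
// (the getAllContexts pattern); with a subjectId it narrows to one subject
// (the getAllContextsForSubject pattern).
function contextsQueryInput(
  tableName: string,
  domain: string,
  subjectId?: string,
  gsiName = "domain-subjectId-index",
): GsiQueryInput {
  const input: GsiQueryInput = {
    TableName: tableName,
    IndexName: gsiName,
    // Name placeholders sidestep any clash with DynamoDB reserved words.
    KeyConditionExpression: "#domain = :domain",
    ExpressionAttributeNames: { "#domain": "domain" },
    ExpressionAttributeValues: { ":domain": { S: domain } },
  };
  if (subjectId !== undefined) {
    input.KeyConditionExpression += " AND #subjectId = :subjectId";
    input.ExpressionAttributeNames["#subjectId"] = "subjectId";
    input.ExpressionAttributeValues[":subjectId"] = { S: subjectId };
  }
  return input;
}
```

Query results are paginated, so a caller issuing these requests directly would need to follow LastEvaluatedKey until it is absent. A domain with many contexts lives in a single GSI partition, which is worth keeping in mind before calling getAllContexts on a hot path.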

EventBusWorkflowScheduler

Schedules delayed workflow events using EventBridge Scheduler. When a schedule fires, it publishes the event to an EventBridge bus. A downstream consumer (e.g. a Lambda) picks it up and calls WorkflowManager.processEvent.

Schedules are created with ActionAfterCompletion: DELETE, so AWS cleans them up automatically after they fire.
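
For orientation, a one-time schedule of this kind could be described by a CreateSchedule input like the one sketched below. The field names follow the EventBridge Scheduler API, but everything else is an assumption: the helper names, the schedule name passed in by the caller, rounding the fire time up to a whole second, and sending the serialized event as the target Input. The package's actual request may differ.

```typescript
// Subset of the scheduler config used by this sketch.
interface SchedulerSketchConfig {
  eventBusArn: string;
  roleArn: string;
  scheduleGroupName?: string;
  source?: string;
  detailType?: string;
}

// One-time schedules use at(yyyy-mm-ddThh:mm:ss). Rounding up to the next whole
// second (an assumption) keeps the schedule from firing before now + delayMs.
function toAtExpression(nowMs: number, delayMs: number): string {
  const fireAt = new Date(Math.ceil((nowMs + delayMs) / 1000) * 1000);
  return `at(${fireAt.toISOString().slice(0, 19)})`;
}

// Hypothetical helper: builds a plain object shaped like a CreateSchedule input.
function createScheduleInput(
  cfg: SchedulerSketchConfig,
  scheduleName: string, // naming scheme is an assumption; the caller decides
  workflowEvent: unknown,
  delayMs: number,
  nowMs: number = Date.now(),
) {
  return {
    Name: scheduleName,
    GroupName: cfg.scheduleGroupName ?? "default",
    ScheduleExpression: toAtExpression(nowMs, delayMs),
    ScheduleExpressionTimezone: "UTC",
    FlexibleTimeWindow: { Mode: "OFF" as const },
    // AWS removes the schedule once it has fired.
    ActionAfterCompletion: "DELETE" as const,
    Target: {
      Arn: cfg.eventBusArn, // the bus that receives the event
      RoleArn: cfg.roleArn, // role Scheduler assumes to put the event
      EventBridgeParameters: {
        Source: cfg.source ?? "omega-flow",
        DetailType: cfg.detailType ?? "workflow.event",
      },
      Input: JSON.stringify(workflowEvent),
    },
  };
}
```

With the SDK installed, an object of this shape could be passed to new CreateScheduleCommand(input) from @aws-sdk/client-scheduler.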

Config

typescript
interface EventBusWorkflowSchedulerConfig {
  client: SchedulerClient;
  eventBusArn: string;
  roleArn: string;
  scheduleGroupName?: string;  // default: "default"
  source?: string;             // default: "omega-flow"
  detailType?: string;         // default: "workflow.event"
}

| Property | Required | Description |
| --- | --- | --- |
| client | Yes | AWS Scheduler SDK client |
| eventBusArn | Yes | ARN of the EventBridge bus that receives the scheduled event |
| roleArn | Yes | IAM role assumed by Scheduler to put events on the bus |
| scheduleGroupName | No | Schedule group name (default: "default") |
| source | No | Source field on the published EventBridge event (default: "omega-flow") |
| detailType | No | DetailType field on the published EventBridge event (default: "workflow.event") |

How It Works

  1. A node (e.g. Wait or TriggerOrTimeout) calls scheduler.schedule(event, delayMs)
  2. The scheduler creates a one-time EventBridge Scheduler schedule that fires at now + delayMs
  3. When the schedule fires, it publishes the serialized Event to the EventBridge bus
  4. A Lambda (or other consumer) subscribed to the bus deserializes the event and calls manager.processEvent(event)
  5. The schedule auto-deletes after firing
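
Step 4 is the part you implement yourself. A minimal sketch of such a consumer follows, assuming the serialized Event arrives as the detail field of the EventBridge envelope and that the default source and detailType are in use. The names extractWorkflowEvent and makeHandler are hypothetical, and processEvent stands in for manager.processEvent.

```typescript
// The parts of the EventBridge envelope this sketch looks at.
interface EventBridgeEnvelope<T> {
  source: string;
  "detail-type": string;
  detail: T;
}

// Returns the workflow event carried in the envelope, or undefined when the
// envelope was not published by the scheduler (different source or detail type).
function extractWorkflowEvent<T>(
  envelope: EventBridgeEnvelope<T>,
  expectedSource = "omega-flow",
  expectedDetailType = "workflow.event",
): T | undefined {
  if (envelope.source !== expectedSource || envelope["detail-type"] !== expectedDetailType) {
    return undefined;
  }
  return envelope.detail;
}

// Wraps a processEvent function (e.g. (e) => manager.processEvent(e)) in a
// Lambda-style handler that ignores unrelated events on the bus.
function makeHandler<T>(processEvent: (workflowEvent: T) => Promise<unknown>) {
  return async (envelope: EventBridgeEnvelope<T>): Promise<void> => {
    const workflowEvent = extractWorkflowEvent(envelope);
    if (workflowEvent !== undefined) {
      await processEvent(workflowEvent);
    }
  };
}
```

An EventBridge rule that matches on the same source and detail-type would normally do this filtering before the Lambda is invoked; the check in code is a second line of defence.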

IAM Permissions

The scheduler role (roleArn) needs permission to put events on the bus:

json
{
  "Effect": "Allow",
  "Action": "events:PutEvents",
  "Resource": "<eventBusArn>"
}

The caller creating schedules needs:

json
{
  "Effect": "Allow",
  "Action": [
    "scheduler:CreateSchedule",
    "scheduler:DeleteSchedule"
  ],
  "Resource": "arn:aws:scheduler:*:*:schedule/<groupName>/*"
}

The caller also needs iam:PassRole on the scheduler role so that it can pass that role to EventBridge Scheduler when creating a schedule.
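
Put together, the two policies could be generated as follows. This is a sketch under assumptions: the helper names are hypothetical, the wildcard region and account in the schedule ARN mirror the statement above, and the iam:PassedToService condition is an optional tightening rather than something the package requires.

```typescript
interface PolicyStatement {
  Effect: "Allow";
  Action: string | string[];
  Resource: string;
  Condition?: Record<string, Record<string, string>>;
}

interface PolicyDocument {
  Version: "2012-10-17";
  Statement: PolicyStatement[];
}

// Policy attached to the role referenced by roleArn.
function schedulerRolePolicy(eventBusArn: string): PolicyDocument {
  return {
    Version: "2012-10-17",
    Statement: [{ Effect: "Allow", Action: "events:PutEvents", Resource: eventBusArn }],
  };
}

// Policy for the principal that runs the engine and creates schedules.
function callerPolicy(groupName: string, roleArn: string): PolicyDocument {
  return {
    Version: "2012-10-17",
    Statement: [
      {
        Effect: "Allow",
        Action: ["scheduler:CreateSchedule", "scheduler:DeleteSchedule"],
        Resource: `arn:aws:scheduler:*:*:schedule/${groupName}/*`,
      },
      {
        Effect: "Allow",
        Action: "iam:PassRole",
        Resource: roleArn,
        // Optional: only allow passing the role to EventBridge Scheduler.
        Condition: { StringEquals: { "iam:PassedToService": "scheduler.amazonaws.com" } },
      },
    ],
  };
}
```

JSON.stringify(callerPolicy("default", roleArn), null, 2) yields a document that can be pasted into IAM. The scheduler role additionally needs a trust policy that lets scheduler.amazonaws.com assume it.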

Mixing Implementations

You can mix AWS and in-memory implementations. For example, use DynamoDB for storage but keep the in-memory scheduler during local development:

typescript
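import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import {
  InMemoryWorkflowScheduler,
  WorkflowManager,
  defaultNodeModels,
} from "@omega-flow/engine";
import {
  DynamoDBWorkflowStore,
  DynamoDBWorkflowMemory,
} from "@omega-flow/store-aws";

// Shared DynamoDB client for both stores, as in the Quick Start.
const ddb = new DynamoDBClient({ region: "eu-west-1" });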

const manager = new WorkflowManager({
  workflowStore: new DynamoDBWorkflowStore({ client: ddb, tableName: "workflows" }),
  workflowMemory: new DynamoDBWorkflowMemory({ client: ddb, tableName: "contexts" }),
  workflowScheduler: new InMemoryWorkflowScheduler(),
  nodeModels: defaultNodeModels,
  eventExtractor: (event) => ["default", event.data.userId],
});