Engine API
API reference for the @omega-flow/engine package.
WorkflowManager
Top-level orchestrator that manages multiple workflows across domains and subjects.
Constructor
new WorkflowManager(config: WorkflowManagerConfig)WorkflowManagerConfig
| Property | Type | Description |
|---|---|---|
workflowStore | WorkflowStore | Storage backend for workflow definitions |
workflowMemory | WorkflowMemory | Storage backend for workflow execution contexts |
workflowScheduler | WorkflowScheduler | Scheduler for time-based events |
nodeModels | NodeModelRegistry | Map of node type names to their NodeModel classes (Record<string, NodeModelClass>) |
eventExtractor | (event: Event) => [string, string] | Function to extract [domain, subjectId] from events |
Methods
processEvent
processEvent(event: Event): Promise<void>Process an event by routing it to appropriate workflow instances. This method:
- Extracts domain and subject ID from the event
- Loads all workflows for the domain
- Resumes active workflow instances with the event
- Starts new instances if allowed by frequency rules
getScheduler
getScheduler(): WorkflowSchedulerReturns the workflow scheduler instance.
Example
import {
WorkflowManager,
InMemoryWorkflowStore,
InMemoryWorkflowMemory,
InMemoryWorkflowScheduler,
defaultNodeModels,
} from "@omega-flow/engine";
const manager = new WorkflowManager({
workflowStore: new InMemoryWorkflowStore("default", workflows),
workflowMemory: new InMemoryWorkflowMemory(),
workflowScheduler: new InMemoryWorkflowScheduler(),
nodeModels: defaultNodeModels,
eventExtractor: (event) => ["default", event.data.userId],
});
await manager.processEvent({
id: "evt-1",
type: "user.signup",
time: Date.now(),
data: { userId: "user-123" }
});WorkflowModel
Executes a single workflow instance. Manages the lifecycle of workflow execution including state transitions and event processing.
Constructor
new WorkflowModel(workflow: Workflow, nodeModels: NodeModelRegistry, services?: NodeServices)| Parameter | Type | Description |
|---|---|---|
workflow | Workflow | The workflow definition to execute |
nodeModels | NodeModelRegistry | Map of node type names to their classes |
services | NodeServices | Optional services bag injected into all nodes |
Properties
| Property | Type | Description |
|---|---|---|
workflow | Workflow | The workflow definition |
nodes | NodeModel[] | Instantiated node models |
edges | EdgeModel[] | Edge models connecting nodes |
currentNode | NodeModel | null | Node currently waiting for events |
history | WorkflowHistoryItem[] | Execution history |
status | WorkflowStatus | Current execution status |
instanceId | string | Unique instance identifier |
startedAt | number | Start timestamp (ms) |
Methods
start
start(): voidStarts or resumes workflow execution. If no context was set, starts from the beginning. If a context was set via setContext(), resumes from the saved position.
Throws: Error if workflow is already running or completed.
acceptEvent
acceptEvent(event: Event): Promise<void>Processes an incoming event through the workflow. Passes the event to the current node and handles transitions. Recursively processes through subsequent nodes until a node doesn't accept the event or the workflow completes.
Throws: Error if workflow is not in waiting status.
getContext
getContext(): ContextExports the current workflow state as a Context object for persistence.
setContext
setContext(context: Context): voidRestores workflow state from a previously saved context. Call start() after this to begin processing events.
Throws: Error if workflow is already running or context is invalid.
getStatus
getStatus(): WorkflowStatusReturns the current execution status.
getCurrentNode
getCurrentNode(): NodeModel | nullReturns the current node waiting for events.
Throws: Error if workflow is not running.
getStartNode
getStartNode(): NodeModel | nullReturns the start node (node with no incoming edges).
getNode
getNode(nodeId: string | null): NodeModel | nullFinds a node by its ID.
Example
import { WorkflowModel, defaultNodeModels } from "@omega-flow/engine";
// Create and start
const workflow = new WorkflowModel(workflowDef, defaultNodeModels);
workflow.start();
// Process event
await workflow.acceptEvent({
id: "evt-1",
type: "trigger-event",
time: Date.now(),
data: {}
});
// Check status
console.log(workflow.getStatus()); // "waiting" | "completed"
// Save state
const context = workflow.getContext();
// ... persist context ...
// Later, restore and continue
const workflow2 = new WorkflowModel(workflowDef, nodeTypes);
workflow2.setContext(savedContext);
workflow2.start();
await workflow2.acceptEvent(nextEvent);NodeModel
Base class for all workflow node types. Subclasses implement specific node behaviors.
Constructor
new NodeModel(node: Node)Properties
| Property | Type | Description |
|---|---|---|
node | Node | The underlying node definition |
connections | Connection[] | Outgoing connections to other nodes |
state | any | Internal state for cross-method data sharing |
services | NodeServices | Services available to the node (scheduler, etc.) |
Methods
Static: create
static create(node: Node): NodeModelFactory method to create a node instance. Subclasses should override for type validation.
getId
getId(): stringReturns the node's unique identifier.
getData
getData(): anyReturns the node's data payload (params, configuration).
getState
getState(): anyGets the node's internal state.
setState
setState(state: any): voidReplaces the node's internal state.
updateState
updateState(changes: any): voidMerges changes into existing state (shallow merge).
connect
connect(targetNode: NodeModel, edge: EdgeModel): voidConnects this node to a target node via an edge.
getConnections
getConnections(): Connection[]Returns all outgoing connections.
getSourceHandles
getSourceHandles(): string[]Returns all source handle (output) identifiers.
getTargetNodeFromSourceHandle
getTargetNodeFromSourceHandle(sourceHandle: string): NodeModel | nullGets the node connected to a specific output handle.
getDefaultNext
getDefaultNext(): NodeModel | nullShortcut for the single-output case — returns the node connected to the first source handle, or null if this node has no outgoing connections. Use it instead of getTargetNodeFromSourceHandle(getSourceHandles()[0]) in pass-through nodes.
acceptEvent (abstract)
acceptEvent(event: Event): Promise<boolean>Processes an incoming event. Must be overridden by subclasses.
Returns: true if event is accepted and processing is complete, false if still waiting.
nextNode (abstract)
nextNode(event: Event): Promise<NodeModel | null>Determines the next node to execute. Must be overridden by subclasses.
Returns: The next NodeModel, or null to end the workflow.
Example
import { NodeModel } from "@omega-flow/engine";
import type { Node, Event } from "@omega-flow/types";
class MyCustomNode extends NodeModel {
static create(node: Node): MyCustomNode {
if (node.type !== "MyCustom") {
throw new Error("Node type must be MyCustom");
}
return new this(node);
}
async acceptEvent(event: Event): Promise<boolean> {
const data = this.getData();
if (event.type === data.params.triggerEvent) {
this.setState({ processed: true });
return true;
}
return false;
}
async nextNode(event: Event): Promise<NodeModel | null> {
return this.getDefaultNext();
}
}NodeModelClass / NodeModelRegistry
Helper type aliases for typing custom node classes and registries.
import type { NodeModelClass, NodeModelRegistry } from "@omega-flow/engine";
// NodeModelClass = typeof NodeModel
// NodeModelRegistry = Record<string, NodeModelClass>NodeModelRegistry is the type accepted by WorkflowManagerConfig.nodeModels and WorkflowModel's constructor.
NodeServices
Interface for services injected into nodes by the workflow engine. This provides an extensible way to give nodes access to infrastructure without polluting the base class with individual properties.
interface NodeServices {
scheduler?: WorkflowScheduler;
}| Property | Type | Description |
|---|---|---|
scheduler | WorkflowScheduler | Optional scheduler for nodes that need to schedule future events |
Nodes access services via this.services:
if (this.services.scheduler) {
await this.services.scheduler.schedule(event, delayMs);
}When using WorkflowManager, services are automatically constructed and injected. When using WorkflowModel directly, pass services as the third constructor argument.
Storage Interfaces
WorkflowStore
Interface for workflow definition storage.
interface WorkflowStore {
getWorkflow(domain: string, workflowId: string): Promise<Workflow | null>;
getAllWorkflows(domain: string): Promise<Workflow[]>;
}| Method | Description |
|---|---|
getWorkflow | Retrieve a workflow definition by ID |
getAllWorkflows | Get all workflow definitions for a domain |
WorkflowMemory
Interface for workflow execution state storage.
interface WorkflowMemory {
getContexts(domain: string, workflowId: string, subjectId: string): Promise<Context[]>;
saveContext(domain: string, workflowId: string, subjectId: string, context: Context): Promise<void>;
deleteContext(domain: string, workflowId: string, subjectId: string, instanceId: string): Promise<void>;
}| Method | Description |
|---|---|
getContexts | Get all contexts for a workflow and subject |
saveContext | Save a workflow context |
deleteContext | Delete a specific workflow instance context |
WorkflowScheduler
Interface for scheduling future events.
interface WorkflowScheduler {
schedule(event: Event, delayMs: number): Promise<string>;
cancel(scheduleId: string): Promise<boolean>;
}| Method | Description |
|---|---|
schedule | Schedule an event to be delivered after a delay |
cancel | Cancel a scheduled event |
Built-in Implementations
InMemoryWorkflowStore
In-memory implementation of WorkflowStore for development and testing.
new InMemoryWorkflowStore(domain: string, workflows: Workflow[])InMemoryWorkflowMemory
In-memory implementation of WorkflowMemory for development and testing.
new InMemoryWorkflowMemory()InMemoryWorkflowScheduler
In-memory implementation of WorkflowScheduler using setTimeout.
new InMemoryWorkflowScheduler()AWS Implementations
The @omega-flow/store-aws package provides production-ready implementations backed by DynamoDB and EventBridge Scheduler:
| Class | Implements | Backed by |
|---|---|---|
DynamoDBWorkflowStore | WorkflowStore | DynamoDB |
DynamoDBWorkflowMemory | WorkflowMemory | DynamoDB |
EventBusWorkflowScheduler | WorkflowScheduler | EventBridge Scheduler |
See the AWS Storage & Scheduler guide for configuration, table schemas, and IAM setup.
Built-in Node Types
Available from @omega-flow/engine:
import { defaultNodeModels } from "@omega-flow/engine";
// { Trigger, Action, Condition, Wait, TriggerOrTimeout, Exit }Trigger
Waits for a specific event type.
| Config | Type | Description |
|---|---|---|
data.params.event | string | Event type to listen for |
acceptEvent: Returns true if event.type === params.event
nextNode: Returns first connected node
Action
Pass-through node that accepts all events.
| Config | Type | Description |
|---|---|---|
data.action | string | Action identifier |
data.params | object | Action parameters |
acceptEvent: Always returns true
nextNode: Returns first connected node
Condition
Evaluates conditions using the built-in evaluator.
| Config | Type | Description |
|---|---|---|
data.conditions | Conditions | Rule groups in the shared Conditions format from @omega-flow/types |
acceptEvent: Always returns true, stores result in state
nextNode: Returns node from "true" or "false" handle based on evaluation
Wait
Pauses workflow for a duration.
| Config | Type | Description |
|---|---|---|
data.params.duration | number | Wait duration in milliseconds |
acceptEvent: Returns false while waiting, true when duration elapsed
nextNode: Returns first connected node
TriggerOrTimeout
Waits for event or timeout, whichever comes first.
| Config | Type | Description |
|---|---|---|
data.params.event | string | Event type to listen for |
data.params.duration | number | Timeout duration in milliseconds |
acceptEvent: Returns true on matching event or timeout, recording which one resolved in state
nextNode: Returns node from "trigger" or "timeout" handle based on which path resolved
Exit
Terminates the workflow.
acceptEvent: Always returns true
nextNode: Always returns null
Enums
WorkflowStatus
enum WorkflowStatus {
Idle = "idle",
Waiting = "waiting",
Processing = "processing",
Transforming = "transforming",
Completed = "completed"
}| Value | Description |
|---|---|
idle | Workflow created but not started |
waiting | Running, waiting for events |
processing | Currently handling an event |
transforming | Moving between nodes |
completed | Workflow finished |
Types
See Types Reference for complete type definitions including:
WorkflowWorkflowOptionsWorkflowFrequencyContextNodeStateEventNodeEdgeWorkflowHistoryItem