Engine API

API reference for the @omega-flow/engine package.

WorkflowManager

Top-level orchestrator that manages multiple workflows across domains and subjects.

Constructor

typescript
new WorkflowManager(config: WorkflowManagerConfig)

WorkflowManagerConfig

| Property | Type | Description |
| --- | --- | --- |
| workflowStore | WorkflowStore | Storage backend for workflow definitions |
| workflowMemory | WorkflowMemory | Storage backend for workflow execution contexts |
| workflowScheduler | WorkflowScheduler | Scheduler for time-based events |
| nodeModels | NodeModelRegistry | Map of node type names to their NodeModel classes (Record<string, NodeModelClass>) |
| eventExtractor | (event: Event) => [string, string] | Function to extract [domain, subjectId] from events |
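
The eventExtractor contract can be illustrated with a small sketch. WorkflowEvent below is a local stand-in for the package's Event type, shaped after the examples in this reference. The tenant field and the "default" fallback are assumptions made for illustration, and this page does not say how WorkflowManager reacts to an extractor that throws.

```typescript
// Local stand-in for the Event type; field names mirror the examples in this
// reference, and `tenant` is a hypothetical field used only for illustration.
interface WorkflowEvent {
  id: string;
  type: string;
  time: number;
  data: { userId?: string; tenant?: string };
}

// Returns [domain, subjectId]. Throws rather than returning a bogus subject
// for an event that cannot be attributed to anyone.
function extractDomainAndSubject(evt: WorkflowEvent): [string, string] {
  const subjectId = evt.data.userId;
  if (!subjectId) {
    throw new Error(`Event ${evt.id} has no userId; cannot route it`);
  }
  const domain = evt.data.tenant ?? "default";
  return [domain, subjectId];
}

const routed = extractDomainAndSubject({
  id: "evt-1",
  type: "user.signup",
  time: Date.now(),
  data: { userId: "user-123" },
});
console.log(routed); // ["default", "user-123"]
```

A function like this could be passed as eventExtractor, provided the real Event type carries the fields it reads.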

Methods

processEvent

typescript
processEvent(event: Event): Promise<void>

Process an event by routing it to appropriate workflow instances. This method:

  • Extracts domain and subject ID from the event
  • Loads all workflows for the domain
  • Resumes active workflow instances with the event
  • Starts new instances if allowed by frequency rules
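
The four steps above can be sketched as a plain function. This is not the engine's implementation: every type is a local stand-in, all collaborators are passed in as a hypothetical RoutingDeps object, and mayStartNew is only a placeholder for the frequency rules, which this page does not spell out. Whether the engine processes workflows sequentially, as here, is also an assumption.

```typescript
// Stand-ins for the engine's types; only the fields this sketch needs.
interface WorkflowEvent { id: string; type: string; time: number; data: { userId: string } }
interface WorkflowDef { id: string }
interface InstanceContext { instanceId: string }

// Hypothetical collaborators, injected so the sketch stays self-contained.
interface RoutingDeps {
  extract(evt: WorkflowEvent): [string, string];
  loadWorkflows(domain: string): Promise<WorkflowDef[]>;
  loadContexts(domain: string, workflowId: string, subjectId: string): Promise<InstanceContext[]>;
  resume(workflow: WorkflowDef, ctx: InstanceContext, evt: WorkflowEvent): Promise<void>;
  startNew(workflow: WorkflowDef, subjectId: string, evt: WorkflowEvent): Promise<void>;
  mayStartNew(workflow: WorkflowDef, active: InstanceContext[]): boolean;
}

async function routeEvent(evt: WorkflowEvent, deps: RoutingDeps): Promise<void> {
  const [domain, subjectId] = deps.extract(evt);              // 1. extract domain and subject
  for (const workflow of await deps.loadWorkflows(domain)) {  // 2. load workflows for the domain
    const active = await deps.loadContexts(domain, workflow.id, subjectId);
    for (const ctx of active) {
      await deps.resume(workflow, ctx, evt);                  // 3. resume active instances
    }
    if (deps.mayStartNew(workflow, active)) {
      await deps.startNew(workflow, subjectId, evt);          // 4. start a new instance if allowed
    }
  }
}

// Demo wiring: "onboarding" has one active instance for the subject, "survey" has none.
const routingLog: string[] = [];
const demoDeps: RoutingDeps = {
  extract: (evt) => ["default", evt.data.userId],
  loadWorkflows: async () => [{ id: "onboarding" }, { id: "survey" }],
  loadContexts: async (_domain, workflowId) =>
    workflowId === "onboarding" ? [{ instanceId: "inst-1" }] : [],
  resume: async (workflow, ctx) => { routingLog.push(`resume ${workflow.id}/${ctx.instanceId}`); },
  startNew: async (workflow, subjectId) => { routingLog.push(`start ${workflow.id} for ${subjectId}`); },
  mayStartNew: (_workflow, active) => active.length === 0, // illustrative rule only
};

const routingDone = routeEvent(
  { id: "evt-1", type: "user.signup", time: Date.now(), data: { userId: "user-123" } },
  demoDeps,
);
routingDone.then(() => console.log(routingLog));
// ["resume onboarding/inst-1", "start survey for user-123"]
```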

getScheduler

typescript
getScheduler(): WorkflowScheduler

Returns the workflow scheduler instance.

Example

typescript
import {
  WorkflowManager,
  InMemoryWorkflowStore,
  InMemoryWorkflowMemory,
  InMemoryWorkflowScheduler,
  defaultNodeModels,
} from "@omega-flow/engine";

// `workflows` is a Workflow[] defined elsewhere in your application.
const manager = new WorkflowManager({
  workflowStore: new InMemoryWorkflowStore("default", workflows),
  workflowMemory: new InMemoryWorkflowMemory(),
  workflowScheduler: new InMemoryWorkflowScheduler(),
  nodeModels: defaultNodeModels,
  eventExtractor: (event) => ["default", event.data.userId],
});

await manager.processEvent({
  id: "evt-1",
  type: "user.signup",
  time: Date.now(),
  data: { userId: "user-123" }
});

WorkflowModel

Executes a single workflow instance. Manages the lifecycle of workflow execution including state transitions and event processing.

Constructor

typescript
new WorkflowModel(workflow: Workflow, nodeModels: NodeModelRegistry, services?: NodeServices)

| Parameter | Type | Description |
| --- | --- | --- |
| workflow | Workflow | The workflow definition to execute |
| nodeModels | NodeModelRegistry | Map of node type names to their classes |
| services | NodeServices | Optional services bag injected into all nodes |

Properties

| Property | Type | Description |
| --- | --- | --- |
| workflow | Workflow | The workflow definition |
| nodes | NodeModel[] | Instantiated node models |
| edges | EdgeModel[] | Edge models connecting nodes |
| currentNode | NodeModel \| null | Node currently waiting for events |
| history | WorkflowHistoryItem[] | Execution history |
| status | WorkflowStatus | Current execution status |
| instanceId | string | Unique instance identifier |
| startedAt | number | Start timestamp (ms) |

Methods

start

typescript
start(): void

Starts or resumes workflow execution. If no context was set, starts from the beginning. If a context was set via setContext(), resumes from the saved position.

Throws: Error if workflow is already running or completed.

acceptEvent

typescript
acceptEvent(event: Event): Promise<void>

Processes an incoming event through the workflow. Passes the event to the current node and handles transitions. Recursively processes through subsequent nodes until a node doesn't accept the event or the workflow completes.

Throws: Error if workflow is not in waiting status.
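
The accept-and-advance behavior described above can be sketched as a loop. SketchNode and WorkflowEvent are local stand-ins for the documented NodeModel contract and Event type. The real method also manages status and history, which this sketch leaves out.

```typescript
// Stand-ins for the documented NodeModel contract; not the engine's classes.
interface WorkflowEvent { id: string; type: string; time: number; data: unknown }
interface SketchNode {
  id: string;
  acceptEvent(evt: WorkflowEvent): Promise<boolean>;
  nextNode(evt: WorkflowEvent): Promise<SketchNode | null>;
}

// Offers the event to `current`. Each time a node accepts, moves to its successor
// and offers the same event again. Returns the node left waiting, or null once
// the workflow has run past its last node.
async function advance(current: SketchNode, evt: WorkflowEvent): Promise<SketchNode | null> {
  let node: SketchNode | null = current;
  while (node !== null && (await node.acceptEvent(evt))) {
    node = await node.nextNode(evt);
  }
  return node;
}

// Tiny factory for nodes that accept exactly one event type.
function triggerNode(id: string, eventType: string, next: SketchNode | null): SketchNode {
  return {
    id,
    acceptEvent: async (evt) => evt.type === eventType,
    nextNode: async () => next,
  };
}

const paidNode = triggerNode("await-payment", "order.paid", null);
const signupNode = triggerNode("await-signup", "user.signup", paidNode);

const waitingAfterSignup = advance(signupNode, {
  id: "evt-1", type: "user.signup", time: Date.now(), data: {},
});
waitingAfterSignup.then((node) => console.log(node?.id)); // "await-payment"
```

The signup event is accepted by the first node and then offered to the second, which declines it, so the workflow stays parked there until a matching event arrives.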

getContext

typescript
getContext(): Context

Exports the current workflow state as a Context object for persistence.

setContext

typescript
setContext(context: Context): void

Restores workflow state from a previously saved context. Call start() after this to begin processing events.

Throws: Error if workflow is already running or context is invalid.

getStatus

typescript
getStatus(): WorkflowStatus

Returns the current execution status.

getCurrentNode

typescript
getCurrentNode(): NodeModel | null

Returns the current node waiting for events.

Throws: Error if workflow is not running.

getStartNode

typescript
getStartNode(): NodeModel | null

Returns the start node (node with no incoming edges).

getNode

typescript
getNode(nodeId: string | null): NodeModel | null

Finds a node by its ID.

Example

typescript
import { WorkflowModel, defaultNodeModels } from "@omega-flow/engine";

// Create and start
const workflow = new WorkflowModel(workflowDef, defaultNodeModels);
workflow.start();

// Process event
await workflow.acceptEvent({
  id: "evt-1",
  type: "trigger-event",
  time: Date.now(),
  data: {}
});

// Check status
console.log(workflow.getStatus()); // "waiting" | "completed"

// Save state
const context = workflow.getContext();
// ... persist context ...

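// Later, restore and continue. savedContext is the Context loaded back from
// storage; nextEvent is the next Event for this instance.
const workflow2 = new WorkflowModel(workflowDef, defaultNodeModels);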
workflow2.setContext(savedContext);
workflow2.start();
await workflow2.acceptEvent(nextEvent);

NodeModel

Base class for all workflow node types. Subclasses implement specific node behaviors.

Constructor

typescript
new NodeModel(node: Node)

Properties

| Property | Type | Description |
| --- | --- | --- |
| node | Node | The underlying node definition |
| connections | Connection[] | Outgoing connections to other nodes |
| state | any | Internal state for cross-method data sharing |
| services | NodeServices | Services available to the node (scheduler, etc.) |

Methods

Static: create

typescript
static create(node: Node): NodeModel

Factory method to create a node instance. Subclasses should override for type validation.

getId

typescript
getId(): string

Returns the node's unique identifier.

getData

typescript
getData(): any

Returns the node's data payload (params, configuration).

getState

typescript
getState(): any

Gets the node's internal state.

setState

typescript
setState(state: any): void

Replaces the node's internal state.

updateState

typescript
updateState(changes: any): void

Merges changes into existing state (shallow merge).
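
The difference between setState (replace) and updateState (shallow merge) is easiest to see with nested data. StateHolder below is a minimal stand-in that reproduces only the documented semantics; it is not the NodeModel class.

```typescript
// Minimal stand-in that reproduces only the documented state semantics.
class StateHolder {
  private state: Record<string, unknown> = {};

  getState(): Record<string, unknown> {
    return this.state;
  }

  // Replaces the whole state object.
  setState(next: Record<string, unknown>): void {
    this.state = next;
  }

  // Shallow merge: top-level keys in `changes` overwrite existing keys;
  // nested objects are replaced wholesale, not merged recursively.
  updateState(changes: Record<string, unknown>): void {
    this.state = { ...this.state, ...changes };
  }
}

const holder = new StateHolder();
holder.setState({ attempts: 1, timer: { id: "t-1", startedAt: 1000 } });
holder.updateState({ timer: { id: "t-2" } });
console.log(holder.getState());
// attempts is preserved; timer is now { id: "t-2" } and startedAt is gone
```

To change one field of a nested object, read the nested object first and pass a complete replacement to updateState.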

connect

typescript
connect(targetNode: NodeModel, edge: EdgeModel): void

Connects this node to a target node via an edge.

getConnections

typescript
getConnections(): Connection[]

Returns all outgoing connections.

getSourceHandles

typescript
getSourceHandles(): string[]

Returns all source handle (output) identifiers.

getTargetNodeFromSourceHandle

typescript
getTargetNodeFromSourceHandle(sourceHandle: string): NodeModel | null

Gets the node connected to a specific output handle.

getDefaultNext

typescript
getDefaultNext(): NodeModel | null

Shortcut for the single-output case: returns the node connected to the first source handle, or null if this node has no outgoing connections. Use it instead of getTargetNodeFromSourceHandle(getSourceHandles()[0]) in pass-through nodes.
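
A minimal sketch of how the three handle lookups relate. HandleNode is a stand-in, not NodeModel: its connection shape ({ sourceHandle, target }) is an assumption, and its connect takes a handle name directly where the real method takes an EdgeModel.

```typescript
// Stand-in for the handle-related part of NodeModel. The connection shape
// and the connect() signature are assumptions made for this sketch.
class HandleNode {
  readonly id: string;
  private connections: { sourceHandle: string; target: HandleNode }[] = [];

  constructor(id: string) {
    this.id = id;
  }

  connect(target: HandleNode, sourceHandle: string): void {
    this.connections.push({ sourceHandle, target });
  }

  getSourceHandles(): string[] {
    return this.connections.map((c) => c.sourceHandle);
  }

  getTargetNodeFromSourceHandle(sourceHandle: string): HandleNode | null {
    const found = this.connections.find((c) => c.sourceHandle === sourceHandle);
    return found ? found.target : null;
  }

  // Same as looking up the first source handle, plus a guard for "no outputs".
  getDefaultNext(): HandleNode | null {
    const [first] = this.getSourceHandles();
    return first === undefined ? null : this.getTargetNodeFromSourceHandle(first);
  }
}

const conditionNode = new HandleNode("check-plan");
const upgradeNode = new HandleNode("send-upgrade-offer");
const exitNode = new HandleNode("exit");
conditionNode.connect(upgradeNode, "true");
conditionNode.connect(exitNode, "false");

console.log(conditionNode.getDefaultNext()?.id);                       // "send-upgrade-offer"
console.log(conditionNode.getTargetNodeFromSourceHandle("false")?.id); // "exit"
console.log(exitNode.getDefaultNext());                                // null
```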

acceptEvent (abstract)

typescript
acceptEvent(event: Event): Promise<boolean>

Processes an incoming event. Must be overridden by subclasses.

Returns: true if event is accepted and processing is complete, false if still waiting.

nextNode (abstract)

typescript
nextNode(event: Event): Promise<NodeModel | null>

Determines the next node to execute. Must be overridden by subclasses.

Returns: The next NodeModel, or null to end the workflow.

Example

typescript
import { NodeModel } from "@omega-flow/engine";
import type { Node, Event } from "@omega-flow/types";

class MyCustomNode extends NodeModel {
  static create(node: Node): MyCustomNode {
    if (node.type !== "MyCustom") {
      throw new Error("Node type must be MyCustom");
    }
    return new this(node);
  }

  async acceptEvent(event: Event): Promise<boolean> {
    const data = this.getData();
    if (event.type === data.params.triggerEvent) {
      this.setState({ processed: true });
      return true;
    }
    return false;
  }

  async nextNode(event: Event): Promise<NodeModel | null> {
    return this.getDefaultNext();
  }
}

NodeModelClass / NodeModelRegistry

Helper type aliases for typing custom node classes and registries.

typescript
import type { NodeModelClass, NodeModelRegistry } from "@omega-flow/engine";

// NodeModelClass = typeof NodeModel
// NodeModelRegistry = Record<string, NodeModelClass>

NodeModelRegistry is the type accepted by WorkflowManagerConfig.nodeModels and WorkflowModel's constructor.
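
A registry is a plain object, so a custom node type can be added by spreading the defaults. The sketch below uses local stand-ins (NodeDef, BaseNode, builtIns) in place of Node, NodeModel, and defaultNodeModels. The instantiate helper is hypothetical and only illustrates how a class might be resolved from node.type, not how the engine actually does it.

```typescript
// Stand-ins for Node and NodeModel; only the parts needed to show the registry.
interface NodeDef { id: string; type: string; data: Record<string, unknown> }

class BaseNode {
  readonly node: NodeDef;
  constructor(node: NodeDef) {
    this.node = node;
  }
  static create(node: NodeDef): BaseNode {
    return new this(node);
  }
}

class SendEmailNode extends BaseNode {
  static create(node: NodeDef): SendEmailNode {
    if (node.type !== "SendEmail") {
      throw new Error("Node type must be SendEmail");
    }
    return new this(node);
  }
}

// Mirrors NodeModelClass = typeof NodeModel and NodeModelRegistry = Record<string, NodeModelClass>.
type NodeClass = typeof BaseNode;
type Registry = Record<string, NodeClass>;

// Stand-in for defaultNodeModels, extended with one custom type.
const builtIns: Registry = { Action: BaseNode };
const customRegistry: Registry = { ...builtIns, SendEmail: SendEmailNode };

// Hypothetical helper: resolve the class from node.type, then use its factory.
function instantiate(node: NodeDef, models: Registry): BaseNode {
  const NodeCtor = models[node.type];
  if (!NodeCtor) {
    throw new Error(`No node model registered for type "${node.type}"`);
  }
  return NodeCtor.create(node);
}

const emailNode = instantiate({ id: "n1", type: "SendEmail", data: {} }, customRegistry);
console.log(emailNode instanceof SendEmailNode); // true
```

With the real package, the equivalent registry would be { ...defaultNodeModels, SendEmail: SendEmailNode }, passed as nodeModels.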


NodeServices

Interface for services injected into nodes by the workflow engine. This provides an extensible way to give nodes access to infrastructure without polluting the base class with individual properties.

typescript
interface NodeServices {
  scheduler?: WorkflowScheduler;
}

| Property | Type | Description |
| --- | --- | --- |
| scheduler | WorkflowScheduler | Optional scheduler for nodes that need to schedule future events |

Nodes access services via this.services:

typescript
if (this.services.scheduler) {
  await this.services.scheduler.schedule(event, delayMs);
}

When using WorkflowManager, services are automatically constructed and injected. When using WorkflowModel directly, pass services as the third constructor argument.
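
Because scheduler is optional, node code has to work with and without it, and a recording test double makes both paths easy to check. The sketch below uses local stand-ins (WorkflowEvent, SchedulerLike, ServicesLike) for the documented interfaces. RecordingScheduler, scheduleTimeout, and the "timeout" event type are hypothetical names introduced for illustration.

```typescript
// Stand-ins for Event, WorkflowScheduler, and NodeServices as documented here.
interface WorkflowEvent { id: string; type: string; time: number; data: unknown }
interface SchedulerLike {
  schedule(evt: WorkflowEvent, delayMs: number): Promise<string>;
  cancel(scheduleId: string): Promise<boolean>;
}
interface ServicesLike { scheduler?: SchedulerLike }

// Test double that records schedule() calls instead of setting timers.
class RecordingScheduler implements SchedulerLike {
  readonly calls: { scheduleId: string; evt: WorkflowEvent; delayMs: number }[] = [];

  async schedule(evt: WorkflowEvent, delayMs: number): Promise<string> {
    const scheduleId = `sched-${this.calls.length + 1}`;
    this.calls.push({ scheduleId, evt, delayMs });
    return scheduleId;
  }

  async cancel(scheduleId: string): Promise<boolean> {
    const index = this.calls.findIndex((c) => c.scheduleId === scheduleId);
    if (index === -1) return false;
    this.calls.splice(index, 1);
    return true;
  }
}

// Same guard as in the snippet above: without a scheduler, nothing is scheduled.
async function scheduleTimeout(
  services: ServicesLike,
  evt: WorkflowEvent,
  delayMs: number,
): Promise<string | null> {
  if (!services.scheduler) return null;
  return services.scheduler.schedule(evt, delayMs);
}

const recorder = new RecordingScheduler();
const timeoutEvent: WorkflowEvent = { id: "evt-timeout", type: "timeout", time: Date.now(), data: {} };

const withScheduler = scheduleTimeout({ scheduler: recorder }, timeoutEvent, 5000);
const withoutScheduler = scheduleTimeout({}, timeoutEvent, 5000);
withScheduler.then((id) => console.log(id));      // "sched-1"
withoutScheduler.then((id) => console.log(id));   // null
```

An object shaped like { scheduler: recorder } is what would be passed as the third WorkflowModel constructor argument in a test, assuming the double satisfies the real WorkflowScheduler interface.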


Storage Interfaces

WorkflowStore

Interface for workflow definition storage.

typescript
interface WorkflowStore {
  getWorkflow(domain: string, workflowId: string): Promise<Workflow | null>;
  getAllWorkflows(domain: string): Promise<Workflow[]>;
}

| Method | Description |
| --- | --- |
| getWorkflow | Retrieve a workflow definition by ID |
| getAllWorkflows | Get all workflow definitions for a domain |

WorkflowMemory

Interface for workflow execution state storage.

typescript
interface WorkflowMemory {
  getContexts(domain: string, workflowId: string, subjectId: string): Promise<Context[]>;
  saveContext(domain: string, workflowId: string, subjectId: string, context: Context): Promise<void>;
  deleteContext(domain: string, workflowId: string, subjectId: string, instanceId: string): Promise<void>;
}

| Method | Description |
| --- | --- |
| getContexts | Get all contexts for a workflow and subject |
| saveContext | Save a workflow context |
| deleteContext | Delete a specific workflow instance context |
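
A custom backend only has to provide these three methods. The Map-backed sketch below shows one way to key the data. SketchContext is a stand-in for Context, and the sketch assumes each context carries its instanceId, which the deleteContext signature suggests but this page does not state. Treating saveContext as an upsert per instance is also an assumption.

```typescript
// Stand-in for Context; assumes each context carries its instanceId.
interface SketchContext { instanceId: string; [key: string]: unknown }

class MapWorkflowMemory {
  // Outer key: domain/workflow/subject. Inner key: instanceId.
  private byKey = new Map<string, Map<string, SketchContext>>();

  private key(domain: string, workflowId: string, subjectId: string): string {
    return JSON.stringify([domain, workflowId, subjectId]);
  }

  async getContexts(domain: string, workflowId: string, subjectId: string): Promise<SketchContext[]> {
    const bucket = this.byKey.get(this.key(domain, workflowId, subjectId));
    return bucket ? Array.from(bucket.values()) : [];
  }

  // Upsert: saving the same instanceId again overwrites the earlier context.
  async saveContext(domain: string, workflowId: string, subjectId: string, context: SketchContext): Promise<void> {
    const k = this.key(domain, workflowId, subjectId);
    const bucket = this.byKey.get(k) ?? new Map<string, SketchContext>();
    bucket.set(context.instanceId, context);
    this.byKey.set(k, bucket);
  }

  async deleteContext(domain: string, workflowId: string, subjectId: string, instanceId: string): Promise<void> {
    this.byKey.get(this.key(domain, workflowId, subjectId))?.delete(instanceId);
  }
}

const demoMemory = new MapWorkflowMemory();
const memoryDemo = (async () => {
  await demoMemory.saveContext("default", "onboarding", "user-123", { instanceId: "inst-1", step: 1 });
  await demoMemory.saveContext("default", "onboarding", "user-123", { instanceId: "inst-1", step: 2 });
  const saved = await demoMemory.getContexts("default", "onboarding", "user-123");
  await demoMemory.deleteContext("default", "onboarding", "user-123", "inst-1");
  const remaining = await demoMemory.getContexts("default", "onboarding", "user-123");
  return { saved, remaining };
})();
memoryDemo.then(({ saved, remaining }) => console.log(saved.length, remaining.length)); // 1 0
```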

WorkflowScheduler

Interface for scheduling future events.

typescript
interface WorkflowScheduler {
  schedule(event: Event, delayMs: number): Promise<string>;
  cancel(scheduleId: string): Promise<boolean>;
}

| Method | Description |
| --- | --- |
| schedule | Schedule an event to be delivered after a delay |
| cancel | Cancel a scheduled event |
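
A minimal setTimeout-based sketch of this interface. It is not the source of InMemoryWorkflowScheduler. The deliver callback is an assumption: this page does not describe how a scheduled event reaches the engine, so the sketch simply hands it to a function supplied by the caller. WorkflowEvent is a stand-in for Event.

```typescript
// Stand-in for Event.
interface WorkflowEvent { id: string; type: string; time: number; data: unknown }

class TimeoutScheduler {
  private timers = new Map<string, ReturnType<typeof setTimeout>>();
  private nextId = 1;
  private deliver: (evt: WorkflowEvent) => void;

  // `deliver` is a hypothetical hook that receives the event when it is due.
  constructor(deliver: (evt: WorkflowEvent) => void) {
    this.deliver = deliver;
  }

  async schedule(evt: WorkflowEvent, delayMs: number): Promise<string> {
    const scheduleId = `schedule-${this.nextId++}`;
    const timer = setTimeout(() => {
      this.timers.delete(scheduleId); // fired timers can no longer be cancelled
      this.deliver(evt);
    }, delayMs);
    this.timers.set(scheduleId, timer);
    return scheduleId;
  }

  // Resolves to true if a pending timer was cancelled, false if the id is unknown
  // or the event has already been delivered.
  async cancel(scheduleId: string): Promise<boolean> {
    const timer = this.timers.get(scheduleId);
    if (timer === undefined) return false;
    clearTimeout(timer);
    this.timers.delete(scheduleId);
    return true;
  }
}

const delivered: string[] = [];
const timeoutScheduler = new TimeoutScheduler((evt) => delivered.push(evt.id));

const cancelDemo = timeoutScheduler
  .schedule({ id: "evt-reminder", type: "reminder", time: Date.now(), data: {} }, 60000)
  .then(async (scheduleId) => ({
    first: await timeoutScheduler.cancel(scheduleId),  // true: timer cleared
    second: await timeoutScheduler.cancel(scheduleId), // false: already gone
  }));
cancelDemo.then((result) => console.log(result)); // { first: true, second: false }
```

Timers like these live in process memory and are lost on restart.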

Built-in Implementations

InMemoryWorkflowStore

In-memory implementation of WorkflowStore for development and testing.

typescript
new InMemoryWorkflowStore(domain: string, workflows: Workflow[])

InMemoryWorkflowMemory

In-memory implementation of WorkflowMemory for development and testing.

typescript
new InMemoryWorkflowMemory()

InMemoryWorkflowScheduler

In-memory implementation of WorkflowScheduler using setTimeout.

typescript
new InMemoryWorkflowScheduler()

AWS Implementations

The @omega-flow/store-aws package provides production-ready implementations backed by DynamoDB and EventBridge Scheduler:

| Class | Implements | Backed by |
| --- | --- | --- |
| DynamoDBWorkflowStore | WorkflowStore | DynamoDB |
| DynamoDBWorkflowMemory | WorkflowMemory | DynamoDB |
| EventBusWorkflowScheduler | WorkflowScheduler | EventBridge Scheduler |

See the AWS Storage & Scheduler guide for configuration, table schemas, and IAM setup.


Built-in Node Types

Available from @omega-flow/engine:

typescript
import { defaultNodeModels } from "@omega-flow/engine";
// { Trigger, Action, Condition, Wait, TriggerOrTimeout, Exit }

Trigger

Waits for a specific event type.

| Config | Type | Description |
| --- | --- | --- |
| data.params.event | string | Event type to listen for |

acceptEvent: Returns true if event.type === params.event

nextNode: Returns first connected node

Action

Pass-through node that accepts all events.

| Config | Type | Description |
| --- | --- | --- |
| data.action | string | Action identifier |
| data.params | object | Action parameters |

acceptEvent: Always returns true

nextNode: Returns first connected node

Condition

Evaluates conditions using the built-in evaluator.

| Config | Type | Description |
| --- | --- | --- |
| data.conditions | Conditions | Rule groups in the shared Conditions format from @omega-flow/types |

acceptEvent: Always returns true, stores result in state

nextNode: Returns node from "true" or "false" handle based on evaluation

Wait

Pauses workflow for a duration.

| Config | Type | Description |
| --- | --- | --- |
| data.params.duration | number | Wait duration in milliseconds |

acceptEvent: Returns false while waiting, true when duration elapsed

nextNode: Returns first connected node

TriggerOrTimeout

Waits for event or timeout, whichever comes first.

| Config | Type | Description |
| --- | --- | --- |
| data.params.event | string | Event type to listen for |
| data.params.duration | number | Timeout duration in milliseconds |

acceptEvent: Returns true on matching event or timeout, recording which one resolved in state

nextNode: Returns node from "trigger" or "timeout" handle based on which path resolved

Exit

Terminates the workflow.

acceptEvent: Always returns true

nextNode: Always returns null


Enums

WorkflowStatus

typescript
enum WorkflowStatus {
  Idle = "idle",
  Waiting = "waiting",
  Processing = "processing",
  Transforming = "transforming",
  Completed = "completed"
}

| Value | Description |
| --- | --- |
| idle | Workflow created but not started |
| waiting | Running, waiting for events |
| processing | Currently handling an event |
| transforming | Moving between nodes |
| completed | Workflow finished |
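
The status values combine with the Throws clauses on start() and acceptEvent() into a simple guard. The sketch below uses a string-literal union in place of the enum and assumes that "running" in those clauses covers waiting, processing, and transforming. allowedCalls is a hypothetical helper, not part of the package.

```typescript
// String-literal mirror of the WorkflowStatus values listed above.
type StatusValue = "idle" | "waiting" | "processing" | "transforming" | "completed";

// Derived from the documented Throws clauses: start() rejects a running or
// completed workflow, and acceptEvent() requires the "waiting" status.
function allowedCalls(current: StatusValue): { start: boolean; acceptEvent: boolean } {
  return {
    start: current === "idle",
    acceptEvent: current === "waiting",
  };
}

const allStatuses: StatusValue[] = ["idle", "waiting", "processing", "transforming", "completed"];
for (const value of allStatuses) {
  console.log(value, allowedCalls(value));
}
```

A caller holding a WorkflowModel could check getStatus() this way before calling acceptEvent, instead of relying on the thrown Error.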

Types

See Types Reference for complete type definitions including:

  • Workflow
  • WorkflowOptions
  • WorkflowFrequency
  • Context
  • NodeState
  • Event
  • Node
  • Edge
  • WorkflowHistoryItem