Workflows are long-running processes that handle complex, multi-step operations or scheduled tasks. Unlike conversations, workflows can run independently on a schedule or be triggered by events.
While workflows in the ADK are similar in concept to Workflows in Botpress Studio, they behave differently and shouldn’t be treated as equivalent.
Creating a workflow
Create a workflow in src/workflows/:
import { Workflow } from "@botpress/runtime";
export default new Workflow({
  name: "my-workflow",
  description: "A workflow that processes data",
  handler: async ({ step }) => {
    // Workflow logic
  },
});
Scheduled workflows
Workflows can run on a schedule using cron syntax:
import { Workflow } from "@botpress/runtime";
import { WebsiteKB } from "../knowledge/docs";
export default new Workflow({
  name: "periodic-indexing",
  description: "Indexes knowledge base every 6 hours",
  schedule: "0 */6 * * *", // Every 6 hours
  handler: async () => {
    await WebsiteKB.refresh();
  },
});
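To make the five cron fields (minute, hour, day of month, month, day of week) concrete, here is a minimal sketch of how an expression such as 0 */6 * * * can be read. It is not how Botpress evaluates schedules: cronMatches and fieldMatches are hypothetical helpers, only *, */n, and plain numbers are supported, and evaluating in UTC is an assumption.

```typescript
// Hypothetical helper: does one cron field match a value?
// `min` is the smallest legal value of the field (0 for minutes, 1 for months, ...).
function fieldMatches(field: string, value: number, min: number): boolean {
  if (field === "*") return true;
  if (field.startsWith("*/")) {
    const interval = Number(field.slice(2));
    return Number.isInteger(interval) && interval > 0 && (value - min) % interval === 0;
  }
  return Number(field) === value;
}

// Hypothetical helper: true when the UTC time of `date` matches the expression.
// Ranges, lists, names, and the day-of-month/day-of-week OR rule are not handled.
function cronMatches(expression: string, date: Date): boolean {
  const fields = expression.trim().split(/\s+/);
  if (fields.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${fields.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields as [
    string, string, string, string, string
  ];
  return (
    fieldMatches(minute, date.getUTCMinutes(), 0) &&
    fieldMatches(hour, date.getUTCHours(), 0) &&
    fieldMatches(dayOfMonth, date.getUTCDate(), 1) &&
    fieldMatches(month, date.getUTCMonth() + 1, 1) &&
    fieldMatches(dayOfWeek, date.getUTCDay(), 0)
  );
}
```

Under these assumptions, 0 */6 * * * matches minute 0 of hours 0, 6, 12, and 18, which is where the "every 6 hours" reading comes from.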
Steps
By default, workflows time out after 5 minutes. For longer workflows, use the step function to break the workflow into a series of persisted steps:
export default new Workflow({
  name: "data-processing",
  handler: async ({ step }) => {
    // Step 1: Fetch data
    const data = await step("fetch-data", async () => {
      return await fetchDataFromAPI();
    });
    // Step 2: Process data
    const processed = await step("process-data", async () => {
      return processData(data);
    });
    // Step 3: Store results
    await step("store-results", async () => {
      await saveResults(processed);
    });
  },
});
Steps are persisted: if a workflow is interrupted, it can resume from the last completed step. This makes error recovery easier in complex, long-running workflows.
You can nest steps inside other steps. A parent step completes only after all of its sub-steps have completed.
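The resume behavior described above can be pictured as memoization keyed by step name. The following is a minimal in-memory sketch, not the ADK's implementation: createStep, StepStore, and demo are hypothetical names, and a real runtime would persist results durably rather than in a Map.

```typescript
// Stand-in for persisted step results, keyed by step name (assumption: in-memory only).
type StepStore = Map<string, unknown>;

// Builds a step function that skips work whose result is already stored.
function createStep(store: StepStore) {
  return async function step<T>(name: string, run: () => Promise<T>): Promise<T> {
    if (store.has(name)) {
      // Completed earlier: reuse the stored output instead of running again.
      return store.get(name) as T;
    }
    const result = await run();
    store.set(name, result);
    return result;
  };
}

// Simulates a workflow that is interrupted after its first step, then re-run.
async function demo(): Promise<{ fetchCalls: number; total: number }> {
  const store: StepStore = new Map();
  let fetchCalls = 0;

  const handler = async (interruptAfterFetch: boolean): Promise<number> => {
    const step = createStep(store);
    const data = await step("fetch-data", async () => {
      fetchCalls += 1;
      return [1, 2, 3];
    });
    if (interruptAfterFetch) throw new Error("simulated interruption");
    return step("sum-data", async () => data.reduce((a, b) => a + b, 0));
  };

  await handler(true).catch(() => undefined); // first run is interrupted
  const total = await handler(false); // second run reuses "fetch-data"
  return { fetchCalls, total };
}
```

In this sketch the second run never calls the fetch function again, because the result for "fetch-data" is already stored. The same lookup also shows why two steps sharing one name would return the first step's output.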
Step parameters
const data = await step("fetch-data", async ({ attempt }) => {
  return await fetchDataFromAPI();
}, { maxAttempts: 5 });
name
string
required
Unique identifier for this step within the workflow. Give each step a unique identifier; otherwise, the second step won’t execute and will reuse the output of the first step.
run
(opts: { attempt: number }) => T | Promise<T>
required
Function to execute. Receives the current attempt number.
options
Optional configuration object.
options.maxAttempts
number
Maximum number of retry attempts if the step fails. Defaults to 5.
Handling failing steps
If a step fails, its promise rejects. Left unhandled, the rejection fails not only the current step but the entire workflow.
To avoid this, make sure you catch errors gracefully:
try {
  await step.request("orderId", "Please provide the order ID.");
} catch (err) {
  console.log(err);
}
If the method you’re using has an options.maxAttempts field, it throws an error after the maximum number of retries has been exceeded:
try {
  await step(
    "data-processing",
    async ({ attempt }) => {
      console.log(`Trying step: attempt #${attempt}`);
      // Your code here
    },
    { maxAttempts: 10 }
  );
} catch (err) {
  console.log(err);
}
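The retry behavior implied by maxAttempts can be sketched as a plain loop. This illustrates the general pattern only, not the ADK's code: runWithAttempts is a hypothetical helper, and numbering attempts from 1 with no delay between them is an assumption.

```typescript
// Hypothetical helper: calls `run` until it succeeds or maxAttempts is reached.
// The default of 5 mirrors the documented default for options.maxAttempts.
async function runWithAttempts<T>(
  run: (opts: { attempt: number }) => Promise<T>,
  maxAttempts = 5
): Promise<T> {
  if (!Number.isInteger(maxAttempts) || maxAttempts < 1) {
    throw new RangeError("maxAttempts must be a positive integer");
  }
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await run({ attempt });
    } catch (err) {
      lastError = err; // remember the failure and try again
    }
  }
  // Every attempt failed: surface the last error to the caller.
  throw lastError;
}
```

The caller only sees an error once every attempt has failed, which is why a single try/catch around the step is enough.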
Step methods
The step object provides methods for workflow control:
step.listen
Put the workflow into listening mode, waiting for external events to resume:
await step.listen("wait-for-approval");
// Workflow pauses here until triggered
step.sleep
Pause workflow execution for a specified duration:
await step.sleep("wait-5-min", 5 * 60 * 1000);
step.sleepUntil
Sleep until a specific date:
await step.sleepUntil("wait-until-noon", new Date("2025-01-15T12:00:00Z"));
step.fail
Mark the workflow as failed and stop execution:
if (!user.isVerified) {
  await step.fail("User verification required");
}
step.abort
Immediately abort the workflow execution without marking it as failed:
if (shouldPause) {
  step.abort();
}
step.progress
Record a progress checkpoint without performing any action:
await step.progress("Started processing");
// ... do work ...
await step.progress("Finished processing");
step.waitForWorkflow
Wait for another workflow to complete before continuing:
const childWorkflow = await childWorkflowInstance.start({});
const result = await step.waitForWorkflow("wait-for-child", childWorkflow.id);
step.executeWorkflow
Start another workflow and wait for it to complete:
import ProcessingWorkflow from "../workflows/processing";
const result = await step.executeWorkflow(
  "process-data",
  ProcessingWorkflow,
  { data: inputData }
);
step.map
Process an array of items in parallel with controlled concurrency:
const results = await step.map(
  "process-users",
  users,
  async (user, { i }) => await processUser(user),
  { concurrency: 5, maxAttempts: 3 }
);
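"Controlled concurrency" means that at most a fixed number of items are in flight at once. The following is a minimal sketch of that idea using a shared index and a pool of workers. mapWithConcurrency is a hypothetical helper, not the implementation behind step.map, and it omits persistence and retries.

```typescript
// Hypothetical helper: maps items in parallel, never running more than
// `concurrency` callbacks at the same time. Results keep the input order.
async function mapWithConcurrency<T, R>(
  items: readonly T[],
  fn: (item: T, ctx: { i: number }) => Promise<R>,
  concurrency: number
): Promise<R[]> {
  const results: unknown[] = new Array(items.length);
  let next = 0; // index of the next unclaimed item, shared by all workers

  async function worker(): Promise<void> {
    while (next < items.length) {
      const i = next++; // claim an index before awaiting, so no item runs twice
      results[i] = await fn(items[i] as T, { i });
    }
  }

  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results as R[];
}
```

Each worker picks up the next item as soon as it finishes its current one, so a slow item does not hold up the rest of the pool.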
step.forEach
Process an array of items without collecting results:
await step.forEach(
  "notify-users",
  users,
  async (user) => await sendNotification(user),
  { concurrency: 10 }
);
step.batch
Process items in sequential batches:
await step.batch(
  "bulk-insert",
  records,
  async (batch) => await database.bulkInsert(batch),
  { batchSize: 100 }
);
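Sequential batching amounts to slicing the input into chunks of batchSize and awaiting each chunk before starting the next. Here is a minimal sketch under that reading. toBatches and processInBatches are hypothetical helpers, not part of the ADK.

```typescript
// Hypothetical helper: splits items into consecutive chunks of at most batchSize.
function toBatches<T>(items: readonly T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const batches: T[][] = [];
  for (let start = 0; start < items.length; start += batchSize) {
    batches.push(items.slice(start, start + batchSize));
  }
  return batches;
}

// Hypothetical helper: handles one batch at a time, in order.
async function processInBatches<T>(
  items: readonly T[],
  batchSize: number,
  handle: (batch: T[]) => Promise<void>
): Promise<void> {
  for (const batch of toBatches(items, batchSize)) {
    await handle(batch); // the next batch starts only after this one resolves
  }
}
```

With 250 records and a batchSize of 100, this produces batches of 100, 100, and 50.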
step.request
Request data from a conversation and wait for a response. Requires defining requests in the workflow:
import { Workflow, z } from "@botpress/runtime"; // Assumes z (Zod) is exported by the runtime package
export default new Workflow({
  name: "order-workflow",
  requests: {
    orderId: z.object({
      orderId: z.string(),
    }),
  },
  handler: async ({ step }) => {
    const data = await step.request("orderId", "Please provide the order ID");
    // data is typed based on the request schema
  },
});
Output
Workflows can return an output that matches the output schema:
import { Workflow, z } from "@botpress/runtime"; // Assumes z (Zod) is exported by the runtime package
export default new Workflow({
  name: "calculate-totals",
  input: z.object({ orderId: z.string() }),
  output: z.object({ total: z.number() }),
  handler: async ({ input, step }) => {
    const order = await step("fetch-order", async () => {
      return await fetchOrder(input.orderId);
    });
    return { total: order.total };
  },
});
Workflow methods
Workflows can be started and managed programmatically.
workflow.start()
Start a new workflow instance:
import ProcessingWorkflow from "../workflows/processing";
const instance = await ProcessingWorkflow.start({ orderId: "12345" });
console.log("Started workflow:", instance.id);
workflow.getOrCreate()
Get an existing workflow or create a new one with deduplication:
const instance = await ProcessingWorkflow.getOrCreate({
  key: "order-12345", // Unique key for deduplication
  input: { orderId: "12345" },
  statuses: ["pending", "in_progress"], // Only match workflows with these statuses
});
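The deduplication rule can be read as: reuse an instance only if one exists with the same key and one of the listed statuses, otherwise create a new one. Here is a minimal in-memory sketch of that reading. WorkflowRegistry and the "completed" and "failed" status names are assumptions for illustration, not the ADK's data model.

```typescript
// Status names beyond "pending" and "in_progress" are assumptions.
type Status = "pending" | "in_progress" | "completed" | "failed";

interface Instance {
  id: string;
  key: string;
  status: Status;
  input: unknown;
}

// Hypothetical in-memory registry illustrating key-based deduplication.
class WorkflowRegistry {
  private instances: Instance[] = [];
  private nextId = 1;

  getOrCreate(opts: { key: string; input: unknown; statuses: Status[] }): Instance {
    const existing = this.instances.find(
      (inst) => inst.key === opts.key && opts.statuses.includes(inst.status)
    );
    if (existing) return existing; // same key and an accepted status: reuse

    const created: Instance = {
      id: `wf-${this.nextId++}`,
      key: opts.key,
      status: "pending",
      input: opts.input,
    };
    this.instances.push(created);
    return created;
  }
}
```

In this sketch, once an instance's status falls outside the statuses list, the next call with the same key creates a fresh instance.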
Workflow.get()
Load any workflow instance by its ID:
import { Workflow } from "@botpress/runtime";
const instance = await Workflow.get("workflow-id");
workflow.asTool()
Convert a workflow into a tool for use with execute():
import { Conversation } from "@botpress/runtime";
import ProcessingWorkflow from "../workflows/processing";
export default new Conversation({
  channel: "*",
  handler: async ({ execute }) => {
    await execute({
      instructions: "You are a helpful assistant.",
      tools: [ProcessingWorkflow.asTool()],
    });
  },
});
workflow.provide()
Provide data in response to a workflow data request (used with step.request()):
import { Conversation } from "@botpress/runtime";
import OrderWorkflow from "../workflows/order";
export default new Conversation({
  channel: "*",
  handler: async ({ type, request }) => {
    if (type === "workflow_request") {
      await OrderWorkflow.provide(request, { orderId: "12345" });
    }
  },
});
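The relationship between step.request() and provide() is a handshake: one side waits on a named request, and the other side resolves it with data. Here is a minimal in-memory sketch of that handshake. RequestBroker is a hypothetical class, not the ADK's mechanism, which has to survive across separate executions rather than live in a single process.

```typescript
// Hypothetical broker: pairs a pending request with the data that answers it.
class RequestBroker {
  private pending = new Map<string, (data: unknown) => void>();

  // Workflow side: returns a promise that settles once data is provided.
  request<T>(name: string): Promise<T> {
    return new Promise<T>((resolve) => {
      this.pending.set(name, (data) => resolve(data as T));
    });
  }

  // Conversation side: answers a pending request. Returns false if none is waiting.
  provide(name: string, data: unknown): boolean {
    const resolve = this.pending.get(name);
    if (!resolve) return false;
    this.pending.delete(name);
    resolve(data);
    return true;
  }
}
```

The workflow stays suspended on the request until the conversation handler supplies matching data, at which point execution continues with the provided value.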
Workflow instance
When you start a workflow, you get a workflow instance with additional methods:
instance.cancel()
Cancel a running workflow:
const instance = await ProcessingWorkflow.start({ orderId: "12345" });
await instance.cancel();
instance.setTimeout()
Extend the timeout of a running workflow:
// Relative duration
await instance.setTimeout({ in: "30m" });
// Absolute timestamp
await instance.setTimeout({ at: "2025-01-15T12:00:00Z" });
instance.complete()
Complete a workflow early with output (only available inside the handler):
export default new Workflow({
  name: "my-workflow",
  output: z.object({ result: z.string() }),
  handler: async ({ workflow }) => {
    workflow.complete({ result: "done early" });
  },
});
Helper functions
isWorkflowDataRequest()
Check if an event is a workflow data request:
import { isWorkflowDataRequest } from "@botpress/runtime";
if (isWorkflowDataRequest(event)) {
  // Handle the workflow data request
}
isWorkflowCallback()
Check if an event is a workflow callback:
import { isWorkflowCallback } from "@botpress/runtime";
if (isWorkflowCallback(event)) {
  // Handle the workflow callback
}
Reference
Workflow props
name
string
required
Unique name for the workflow. Used to identify and reference the workflow.
description
string
Optional description of what the workflow does.
input
Optional Zod schema defining the input parameters for the workflow. Defaults to an empty object if not provided.
output
Optional Zod schema defining the output/return type of the workflow. Defaults to an empty object if not provided.
state
Optional Zod schema defining the workflow state structure. Defaults to an empty object if not provided.
requests
Optional object mapping request names to their Zod schemas. Each key is a request name and each value is a Zod schema for the request data.
handler
(props: object) => Promise<any> | any
required
Handler function that implements the workflow logic. See handler parameters below for details.
schedule
string
Optional cron expression for scheduled workflows (e.g., “0 */6 * * *” for every 6 hours).
timeout
'{number}s' | '{number}m' | '{number}h'
Optional timeout for workflow execution (e.g., “5m”, “1h”). Defaults to “5m”.
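The timeout format can be expressed as a TypeScript template literal type, and converting it to milliseconds is simple arithmetic. This is a minimal sketch of that conversion. Timeout and timeoutToMs are hypothetical names, and the ADK may parse these strings differently.

```typescript
// Mirrors the documented format: a number followed by s, m, or h.
type Timeout = `${number}s` | `${number}m` | `${number}h`;

const UNIT_MS = { s: 1000, m: 60 * 1000, h: 60 * 60 * 1000 } as const;

// Hypothetical helper: converts a timeout string such as "5m" to milliseconds.
function timeoutToMs(timeout: Timeout): number {
  const match = /^(\d+(?:\.\d+)?)([smh])$/.exec(timeout);
  if (!match) {
    throw new Error(`Invalid timeout: ${timeout}`);
  }
  const amount = Number(match[1]);
  const unit = match[2] as keyof typeof UNIT_MS;
  return amount * UNIT_MS[unit];
}
```

Under this reading, the default of “5m” corresponds to 5 × 60 × 1000 = 300000 milliseconds, matching the 5-minute default timeout.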
Handler parameters
input
The validated input object matching the workflow’s input schema.
state
Workflow state object that persists across workflow executions and steps.
step
Step function for creating checkpoints. Also provides methods like listen, fail, progress, abort, sleep, sleepUntil, waitForWorkflow, executeWorkflow, map, forEach, batch, and request.
client
Botpress client for making API calls.
execute
(props: object) => Promise<any>
required
Function to execute autonomous AI logic. See Execute props for the full reference.
signal
An abort signal that indicates when the workflow should stop execution.
workflow
The current workflow instance. Provides access to id, name, tags, and methods like cancel(), complete(), and setTimeout().