Function: pipeline()
pipeline<TInput, TSteps>(...steps): Processable<TInput, InferPipelineOutput<TSteps, TInput>>
Defined in: packages/core/src/pipeline/pipeline.ts:148
Creates a functional pipeline that composes multiple steps into a single processable. Steps run sequentially, and the output of each step becomes the input of the next.
Features:
- Automatic type inference - TypeScript infers the final output type from the step chain
- Auto-wrapping - Plain functions are automatically wrapped as processables
- Full error tracing - Errors include a complete pipeline trace showing the execution path
- Event forwarding - Streaming events from nested bots are forwarded to subscribers
- Bot name collection - Collects all bot names for subscription filtering
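To make sequential composition and auto-wrapping concrete, here is a minimal, self-contained sketch. It is not the library's implementation: `MiniProcessable`, `MiniStep`, `toProcessable`, and `miniPipeline` are hypothetical names invented for illustration, and the real `pipeline()` additionally handles type inference, tracing, and event forwarding.

```typescript
// Hypothetical minimal model; these names are not the library's real types.
interface MiniProcessable<I, O> {
  process(input: I): Promise<O>;
}

type MiniStep = MiniProcessable<any, any> | ((input: any) => unknown);

// Auto-wrapping: a plain function becomes an object with a process() method.
function toProcessable(step: MiniStep): MiniProcessable<any, any> {
  if (typeof step === "function") {
    const fn = step;
    return { process: async (input) => fn(input) };
  }
  return step;
}

// Sequential composition: each step receives the previous step's output.
function miniPipeline(...steps: MiniStep[]): MiniProcessable<unknown, unknown> {
  const wrapped = steps.map(toProcessable);
  return {
    async process(input) {
      let current: unknown = input;
      for (const step of wrapped) {
        current = await step.process(current);
      }
      return current;
    },
  };
}

const double: MiniProcessable<number, number> = {
  process: async (n) => n * 2,
};

const lengthThenDouble = miniPipeline(
  (text: string) => text.length, // plain function, wrapped automatically
  double // already a processable, used as-is
);
```

In this sketch, `lengthThenDouble.process("hello")` resolves to `10`: the wrapped function produces the length `5`, which `double` then receives as its input.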
Type Parameters
TInput
TInput = unknown
Input type for the pipeline
TSteps
TSteps extends readonly PipelineStep[] = readonly PipelineStep[]
Readonly tuple of pipeline steps (for type inference)
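As a rough illustration of why `TSteps` is a readonly tuple, the sketch below shows one way an output type could be derived from a tuple of steps using recursive conditional types. `SketchStep`, `StepOutput`, and `SketchOutput` are hypothetical stand-ins; the library's actual `PipelineStep` and `InferPipelineOutput` definitions may differ.

```typescript
// Hypothetical stand-ins; the library's real PipelineStep and
// InferPipelineOutput definitions may differ.
type SketchStep = (input: any) => unknown;

// Result type of a step, with any Promise wrapper removed.
type StepOutput<S> = S extends (input: any) => infer O ? Awaited<O> : never;

// Walk the tuple left to right; the output of the last step wins.
// An empty tuple leaves the input type unchanged.
type SketchOutput<Steps extends readonly SketchStep[], Input> =
  Steps extends readonly [infer Head, ...infer Tail]
    ? Tail extends readonly SketchStep[]
      ? SketchOutput<Tail, StepOutput<Head>>
      : never
    : Input;

// `as const` keeps the array as a tuple so each position keeps its own type.
const steps = [
  (input: string): number => input.length,
  (num: number): boolean => num > 10,
  (flag: boolean): string => (flag ? "long" : "short"),
] as const;

// Final resolves to string, the return type of the last step.
type Final = SketchOutput<typeof steps, string>;
const label: Final = "long";
```

Without the tuple, the compiler would see only an array of a union of step types and could not tell which step comes last, so the final output type could not be inferred.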
Parameters
steps
...TSteps
Variable number of pipeline steps (processables, functions, or step objects)
Returns
Processable<TInput, InferPipelineOutput<TSteps, TInput>>
Processable that executes all steps sequentially
Examples
// Simple function composition
const myPipeline = pipeline(
  validateInput,
  calculateStats,
  saveToDatabase
);
const result = await myPipeline.process(data);
// With conditional branching
const processBlogPost = pipeline<BlogPost>(
  validatePost,
  calculateReadingStats,
  when({
    condition: 'this post is high quality',
    onPass: publishImmediately,
    onFail: scheduleForReview
  }),
  notifyAuthor
);
// With loops and parallel execution
const enrichContent = pipeline<Content>(
  loadDraft,
  loop({
    condition: (result, state) => result.qualityScore < 8 && state.iteration < 5,
    body: improveContentBot,
    maxIterations: 5
  }),
  parallel({
    processors: {
      sentiment: analyzeSentiment,
      tags: generateTags,
      category: categorize
    },
    merge: (mergeInput) => ({
      ...mergeInput.original,
      ...mergeInput.results
    })
  }),
  publish
);
// Type inference through complex chains
const transform = pipeline(
  (input: string) => input.length,            // string -> number
  (num: number) => num > 10,                  // number -> boolean
  (bool: boolean) => bool ? 'long' : 'short'  // boolean -> string
);
// transform has type: Processable<string, string>
// Error handling with full trace
try {
  await pipeline(
    step1,
    when({ condition: check, onPass: step2 }),
    step3
  ).process(input);
} catch (error) {
  if (error instanceof PipelineError) {
    console.log(error.formatTrace());
    // Output: Pipeline error at step 1 (conditional:check) → onPass → step 2 (processable:step2): Validation failed
  }
}
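One way a nested trace like the one above could be assembled is for each level to prepend its own step label as the error propagates outward. The sketch below is an assumption-laden illustration, not the library's `PipelineError`: `TracedError`, `NamedStep`, and `runTraced` are hypothetical names, and the label format is simplified.

```typescript
// Hypothetical error type; the library's PipelineError may look different.
class TracedError extends Error {
  constructor(message: string, readonly trace: string[]) {
    super(message);
    this.name = "TracedError";
  }

  // Join the recorded path into one line, outermost step first.
  formatTrace(): string {
    return `Pipeline error at ${this.trace.join(" → ")}: ${this.message}`;
  }
}

type NamedStep = { name: string; run: (input: unknown) => unknown };

// Run steps in order; on failure, prepend this step's label to the trace.
async function runTraced(steps: NamedStep[], input: unknown): Promise<unknown> {
  let current = input;
  for (let index = 0; index < steps.length; index++) {
    const step = steps[index];
    try {
      current = await step.run(current);
    } catch (cause) {
      const label = `step ${index} (${step.name})`;
      if (cause instanceof TracedError) {
        // A nested pipeline already recorded its path; extend it outward.
        throw new TracedError(cause.message, [label, ...cause.trace]);
      }
      const message = cause instanceof Error ? cause.message : String(cause);
      throw new TracedError(message, [label]);
    }
  }
  return current;
}

// A nested pipeline whose only step fails.
const innerSteps: NamedStep[] = [
  { name: "validate", run: () => { throw new Error("Validation failed"); } },
];

// The outer pipeline runs the nested one as its second step.
const outerSteps: NamedStep[] = [
  { name: "load", run: (value) => value },
  { name: "branch", run: (value) => runTraced(innerSteps, value) },
];
```

Running `runTraced(outerSteps, 1)` rejects with a `TracedError` whose `formatTrace()` returns `Pipeline error at step 1 (branch) → step 0 (validate): Validation failed`.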
// Recursive processing
interface Comment {
  text: string;
  replies?: Comment[];
}
const processComments = pipeline<Comment>(
  recursiveLoop({
    // Return a strict boolean: recurse only while replies exist and depth is under 10
    condition: (result, depth) => result.replies !== undefined && depth < 10,
    body: async (comment, recurse) => {
      const processed = await analyzeComment(comment);
      if (comment.replies) {
        // Wrap recurse so map's index and array arguments are not passed to it
        processed.replies = await Promise.all(
          comment.replies.map((reply) => recurse(reply))
        );
      }
      return processed;
    },
    maxIterations: 100
  })
);