Skip to main content

Function: pipeline()

pipeline<TInput, TSteps>(...steps): Processable<TInput, InferPipelineOutput<TSteps, TInput>>

Defined in: packages/core/src/pipeline/pipeline.ts:148

Create a functional pipeline that composes multiple steps into a single processable. Steps are executed sequentially, with the output of each step flowing to the next.

Features:

  • Automatic type inference - TypeScript infers the final output type from the step chain
  • Auto-wrapping - Plain functions are automatically wrapped as processables
  • Full error tracing - Errors include complete pipeline trace showing execution path
  • Event forwarding - Streaming events from nested bots are forwarded to subscribers
  • Bot name collection - Collects all bot names for subscription filtering

Type Parameters

TInput

TInput = unknown

Input type for the pipeline

TSteps

TSteps extends readonly PipelineStep[] = readonly PipelineStep[]

Readonly tuple of pipeline steps (for type inference)

Parameters

steps

...TSteps

Variable number of pipeline steps (processables, functions, or step objects)

Returns

Processable<TInput, InferPipelineOutput<TSteps, TInput>>

Processable that executes all steps sequentially

Examples

// Simple function composition
const myPipeline = pipeline(
validateInput,
calculateStats,
saveToDatabase
);

const result = await myPipeline.process(data);
// With conditional branching
const processBlogPost = pipeline<BlogPost>(
validatePost,
calculateReadingStats,
when({
condition: 'this post is high quality',
onPass: publishImmediately,
onFail: scheduleForReview
}),
notifyAuthor
);
// With loops and parallel execution
const enrichContent = pipeline<Content>(
loadDraft,
loop({
condition: (result, state) => result.qualityScore < 8 && state.iteration < 5,
body: improveContentBot,
maxIterations: 5
}),
parallel({
processors: {
sentiment: analyzeSentiment,
tags: generateTags,
category: categorize
},
merge: (mergeInput) => ({
...mergeInput.original,
...mergeInput.results
})
}),
publish
);
// Type inference through complex chains
const transform = pipeline(
(input: string) => input.length, // string -> number
(num: number) => num > 10, // number -> boolean
(bool: boolean) => bool ? 'long' : 'short' // boolean -> string
);
// transform has type: Processable<string, string>
// Error handling with full trace
try {
await pipeline(
step1,
when({ condition: check, onPass: step2 }),
step3
).process(input);
} catch (error) {
if (error instanceof PipelineError) {
console.log(error.formatTrace());
// Output: Pipeline error at step 1 (conditional:check) → onPass → step 2 (processable:step2): Validation failed
}
}
// Recursive processing
interface Comment {
text: string;
replies?: Comment[];
}

const processComments = pipeline<Comment>(
recursiveLoop({
condition: (result, depth) => result.replies && depth < 10,
body: async (comment, recurse) => {
const processed = await analyzeComment(comment);
if (comment.replies) {
processed.replies = await Promise.all(
comment.replies.map(recurse)
);
}
return processed;
},
maxIterations: 100
})
);