Skip to main content

Function: parallel()

parallel<TInput, TOutput>(config): ParallelStep<TInput, TOutput>

Defined in: packages/core/src/pipeline/helpers.ts:354

Create a parallel execution step that fans out to multiple processors. Supports both object mode (named results) and array mode (ordered results).

Type Parameters

TInput

TInput

Input type for all processors

TOutput

TOutput

Output type from merge function

Parameters

config

ParallelConfig<TInput, TOutput>

Configuration object with processors and optional merge function

Returns

ParallelStep<TInput, TOutput>

ParallelStep object for use in pipelines

Example

// Object mode with named results
const enrichPost = pipeline(
loadPost,
parallel({
processors: {
sentiment: analyzeSentiment,
category: categorizePost,
tags: generateTags,
seoScore: async (post) => {
const result = await checkSEO(post);
return result.score;
}
},
merge: (mergeInput) => ({
...mergeInput.original,
sentiment: mergeInput.results.sentiment,
category: mergeInput.results.category,
tags: mergeInput.results.tags,
seoScore: mergeInput.results.seoScore
})
}),
savePost
);

// Array mode with ordered results
const runValidations = pipeline(
loadContent,
parallel({
processors: [
checkSpelling,
checkGrammar,
checkReadability
],
merge: (mergeInput) => ({
allPassed: mergeInput.results.every(r => r.passed),
avgScore: mergeInput.results.reduce((sum, r) => sum + r.score, 0) / mergeInput.results.length,
details: mergeInput.results
})
})
);

// With input transformation
const analyzeMetrics = pipeline(
loadData,
parallel({
processors: [calculateAverage, calculateMedian, calculateStdDev],
input: (data) => data.values, // Extract just the values array
merge: (mergeInput) => ({
...mergeInput.original,
stats: {
avg: mergeInput.results[0],
median: mergeInput.results[1],
stdDev: mergeInput.results[2]
}
})
})
);