Class: ParallelMerge<TFanOut, TParallel, TMerge>
Defined in: packages/core/src/patterns/parallel-merge.ts:84
ParallelMerge Pattern
Fan-out → Parallel Processing → Fan-in
Flow:
- Fan-out: Split/prepare input for parallel bots
- Parallel: Run multiple bots concurrently
- Merge: Combine results into final output
Example
const parallel = new ParallelMerge(
splitterBot, // Fan-out: prepares inputs
[bot1, bot2, bot3], // Parallel: run concurrently
mergerBot // Fan-in: combines results
);
const result = await parallel.process(input);
Type Parameters
TFanOut
TFanOut extends Processable = Processable
Type of the fan-out bot
TParallel
TParallel extends Processable = Processable
Type of the parallel bots array
TMerge
TMerge extends Processable = Processable
Type of the merge bot
Constructors
Constructor
new ParallelMerge<
TFanOut,TParallel,TMerge>(fanOut,parallel,merge,processorNames?,failureMode?):ParallelMerge<TFanOut,TParallel,TMerge>
Defined in: packages/core/src/patterns/parallel-merge.ts:96
Parameters
fanOut
TFanOut
parallel
TParallel[]
merge
TMerge
processorNames?
string[]
failureMode?
"fail-fast" | "settle"
Returns
ParallelMerge<TFanOut, TParallel, TMerge>
Methods
getBotNames()
getBotNames():
string[]
Defined in: packages/core/src/patterns/parallel-merge.ts:387
Returns
string[]
getId()
getId():
string
Defined in: packages/core/src/patterns/parallel-merge.ts:347
Returns
string
getName()
getName():
string
Defined in: packages/core/src/patterns/parallel-merge.ts:328
Returns
string
getProcessorNames()
getProcessorNames():
string[] |undefined
Defined in: packages/core/src/patterns/parallel-merge.ts:336
Returns
string[] | undefined
getProcessors()
getProcessors():
TParallel[]
Defined in: packages/core/src/patterns/parallel-merge.ts:332
Returns
TParallel[]
getSchema()
getSchema():
unknown
Defined in: packages/core/src/patterns/parallel-merge.ts:340
Returns
unknown
process()
process(
input,options?):Promise<InferSchemaOutput<TMergeextendsProcessable<unknown,TOutput> ?TOutput:unknown>>
Defined in: packages/core/src/patterns/parallel-merge.ts:253
Process input and return structured output
You can use BOTH streaming events AND final result together:
- Streaming events: Provide
options.onTokento receive real-time events during execution - Final result: Returned when Promise resolves (complete parsed response)
Both work simultaneously - streaming events fire in real-time, then final result is returned.
Parameters
input
unknown
Input data (typed for safety)
options?
Optional callbacks for streaming, completion, errors
Returns
Promise<InferSchemaOutput<TMerge extends Processable<unknown, TOutput> ? TOutput : unknown>>
Promise resolving to structured output (inferred from TOutput if it's a Zod schema)
subscribeToAll()
subscribeToAll(
options?):Subscription
Defined in: packages/core/src/patterns/parallel-merge.ts:351
Parameters
options?
omit?
string[]
onComplete?
(botId, botName, output) => void
onError?
(botId, botName, event) => void
onProgressUpdate?
(botId, botName, event) => void
onStreamEvent?
(botId, botName, event) => void
Returns
run()
staticrun<TInput,TOutput,TProcessors>(processors,options?):Processable<TInput,TOutput> & (input,options?) =>Promise<InferSchemaOutput<TOutput>> &ParallelMerge<Processable<unknown,unknown>,Processable<unknown,unknown>,Processable<ParallelMergeInput<Record<string,unknown> |unknown[],unknown,unknown>,TOutput>>
Defined in: packages/core/src/patterns/parallel-merge.ts:138
Simple parallel execution with optional input/merge transformations
Type Parameters
TInput
TInput = unknown
TOutput
TOutput = unknown
TProcessors
TProcessors extends Processable<unknown, unknown>[] = Processable<unknown, unknown>[]
Parameters
processors
TProcessors
options?
failureMode?
"fail-fast" | "settle"
input?
(data) => unknown
merge?
((results) => TOutput) | ((input) => TOutput)
Returns
Processable<TInput, TOutput> & (input, options?) => Promise<InferSchemaOutput<TOutput>> & ParallelMerge<Processable<unknown, unknown>, Processable<unknown, unknown>, Processable<ParallelMergeInput<Record<string, unknown> | unknown[], unknown, unknown>, TOutput>>
Example
// Run processors in parallel, get array of results
const runner = ParallelMerge.run([bot1, bot2, bot3]);
const results = await runner.process(input);
// With custom merge function
const runner = ParallelMerge.run([v1, v2, v3], {
merge: (results) => ({
isValid: results.every(r => r.valid),
errors: results.flatMap(r => r.errors)
})
});
// With input transformation
const runner = ParallelMerge.run([bot1, bot2], {
input: (data) => data.message
});
runNamed()
staticrunNamed<TProcessors,TOutput>(processors,options?):Processable<unknown,TOutput> & (input,options?) =>Promise<InferSchemaOutput<TOutput>> &ParallelMerge<Processable<unknown,unknown>,Processable<unknown,unknown>,Processable<ParallelMergeInput<Record<string,unknown> |unknown[],unknown,unknown>,TOutput>>
Defined in: packages/core/src/patterns/parallel-merge.ts:211
Parallel execution with named results object
Type Parameters
TProcessors
TProcessors extends Record<string, Processable<unknown, unknown>>
TOutput
TOutput = { [K in string | number | symbol]: Awaited<ReturnType<TProcessors[K]["process"]>> }
Parameters
processors
TProcessors
options?
failureMode?
"fail-fast" | "settle"
input?
(data) => unknown
Returns
Processable<unknown, TOutput> & (input, options?) => Promise<InferSchemaOutput<TOutput>> & ParallelMerge<Processable<unknown, unknown>, Processable<unknown, unknown>, Processable<ParallelMergeInput<Record<string, unknown> | unknown[], unknown, unknown>, TOutput>>
Example
// Returns { spam: {...}, moderation: {...}, tone: {...} }
const runner = ParallelMerge.runNamed({
spam: detectSpam,
moderation: moderateContent,
tone: analyzeTone
});
// With input transformation
const runner = ParallelMerge.runNamed({
spam: detectSpam,
tone: analyzeTone
}, {
input: (data) => data.message
});