
Class: ParallelMerge<TFanOut, TParallel, TMerge>

Defined in: packages/core/src/patterns/parallel-merge.ts:84

ParallelMerge Pattern

Fan-out → Parallel Processing → Fan-in

Flow:

  1. Fan-out: Split/prepare input for parallel bots
  2. Parallel: Run multiple bots concurrently
  3. Merge: Combine results into final output

Example

const parallel = new ParallelMerge(
  splitterBot,          // Fan-out: prepares inputs
  [bot1, bot2, bot3],   // Parallel: run concurrently
  mergerBot             // Fan-in: combines results
);

const result = await parallel.process(input);
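
The three-step flow above can be pictured without the library at all. The following is a minimal sketch built on plain promises, not the implementation in parallel-merge.ts. The names Step, parallelMergeSketch and the toy steps are hypothetical, and it assumes every parallel step receives the same prepared value from the fan-out step.

```typescript
// Hypothetical stand-in for a bot: any async function from input to output.
type Step<I, O> = (input: I) => Promise<O>;

// Fan-out -> parallel -> fan-in, written with plain promises.
async function parallelMergeSketch<I, P, R, O>(
  input: I,
  fanOut: Step<I, P>,
  parallel: Array<Step<P, R>>,
  merge: Step<R[], O>
): Promise<O> {
  // 1. Fan-out: prepare the shared input once.
  const prepared = await fanOut(input);
  // 2. Parallel: start every step before awaiting any of them.
  const results = await Promise.all(parallel.map((step) => step(prepared)));
  // 3. Merge: combine the results, which keep the order of `parallel`.
  return merge(results);
}

// Toy steps used only for illustration.
const trimText: Step<string, string> = async (text) => text.trim();
const countWords: Step<string, number> = async (text) => text.split(/\s+/).length;
const countChars: Step<string, number> = async (text) => text.length;
const sum: Step<number[], number> = async (nums) => nums.reduce((a, b) => a + b, 0);

// Resolves to 13 (2 words + 11 characters in "hello world").
const total = parallelMergeSketch("  hello world  ", trimText, [countWords, countChars], sum);
```

Because Promise.all preserves input order, the merge step can rely on results[i] belonging to parallel[i], whichever step finishes first.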

Type Parameters

TFanOut

TFanOut extends Processable = Processable

Type of the fan-out bot

TParallel

TParallel extends Processable = Processable

Type of the parallel bots array

TMerge

TMerge extends Processable = Processable

Type of the merge bot

Constructors

Constructor

new ParallelMerge<TFanOut, TParallel, TMerge>(fanOut, parallel, merge, processorNames?, failureMode?): ParallelMerge<TFanOut, TParallel, TMerge>

Defined in: packages/core/src/patterns/parallel-merge.ts:96

Parameters

fanOut

TFanOut

parallel

TParallel[]

merge

TMerge

processorNames?

string[]

failureMode?

"fail-fast" | "settle"

Returns

ParallelMerge<TFanOut, TParallel, TMerge>
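
This page does not describe what the two failureMode values do. Assuming the names follow the usual convention ("fail-fast" rejects on the first error, "settle" waits for every processor and reports each outcome), the difference can be sketched with plain promises. Task, Settled, runFailFast and runSettle are hypothetical names for illustration only, not part of ParallelMerge.

```typescript
// Hypothetical task type: a function that starts some async work.
type Task<T> = () => Promise<T>;

// Outcome record for the assumed "settle" behaviour.
type Settled<T> =
  | { status: "fulfilled"; value: T }
  | { status: "rejected"; reason: unknown };

// Assumed "fail-fast": the returned promise rejects as soon as any task rejects.
function runFailFast<T>(tasks: Task<T>[]): Promise<T[]> {
  return Promise.all(tasks.map((task) => task()));
}

// Assumed "settle": every task runs to completion and each outcome is recorded,
// so the returned promise itself never rejects.
function runSettle<T>(tasks: Task<T>[]): Promise<Settled<T>[]> {
  return Promise.all(
    tasks.map((task) =>
      task().then(
        (value): Settled<T> => ({ status: "fulfilled", value }),
        (reason: unknown): Settled<T> => ({ status: "rejected", reason })
      )
    )
  );
}
```

Under this reading, "settle" suits cases where partial results are still useful to the merge step, while "fail-fast" avoids waiting on work whose result would be discarded anyway.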

Methods

getBotNames()

getBotNames(): string[]

Defined in: packages/core/src/patterns/parallel-merge.ts:387

Returns

string[]


getId()

getId(): string

Defined in: packages/core/src/patterns/parallel-merge.ts:347

Returns

string


getName()

getName(): string

Defined in: packages/core/src/patterns/parallel-merge.ts:328

Returns

string


getProcessorNames()

getProcessorNames(): string[] | undefined

Defined in: packages/core/src/patterns/parallel-merge.ts:336

Returns

string[] | undefined


getProcessors()

getProcessors(): TParallel[]

Defined in: packages/core/src/patterns/parallel-merge.ts:332

Returns

TParallel[]


getSchema()

getSchema(): unknown

Defined in: packages/core/src/patterns/parallel-merge.ts:340

Returns

unknown


process()

process(input, options?): Promise<InferSchemaOutput<TMerge extends Processable<unknown, TOutput> ? TOutput : unknown>>

Defined in: packages/core/src/patterns/parallel-merge.ts:253

Process input and return structured output

Streaming events and the final result can be used together:

  • Streaming events: pass options.onToken to receive events in real time during execution
  • Final result: returned when the Promise resolves (the complete parsed response)

Streaming events fire while processing runs; the final result is returned once it completes.

Parameters

input

unknown

Input data (typed for safety)

options?

ProcessOptions

Optional callbacks for streaming, completion, errors

Returns

Promise<InferSchemaOutput<TMerge extends Processable<unknown, TOutput> ? TOutput : unknown>>

Promise resolving to structured output (inferred from TOutput if it's a Zod schema)
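
The way process() combines streaming callbacks with a final resolved value can be illustrated with a small stand-alone sketch. This is not the library's code: SketchOptions and processSketch are hypothetical, and the onToken signature used here (a single string token) is an assumption, since the actual ProcessOptions event shape is not shown on this page.

```typescript
// Assumed, simplified options shape; the real ProcessOptions may differ.
interface SketchOptions {
  onToken?: (token: string) => void;
}

interface SketchResult {
  text: string;
  tokenCount: number;
}

// Emits each token through the callback as it is "produced",
// then resolves with the complete result.
async function processSketch(input: string, options?: SketchOptions): Promise<SketchResult> {
  const tokens = input.split(" ");
  for (const token of tokens) {
    // Yield to the microtask queue so callbacks fire one at a time, as a stream would.
    await Promise.resolve();
    options?.onToken?.(token);
  }
  return { text: tokens.join(" "), tokenCount: tokens.length };
}
```

A caller can pass onToken to update a UI incrementally and still await the returned promise for the full result; omitting the callback leaves only the final value.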


subscribeToAll()

subscribeToAll(options?): Subscription

Defined in: packages/core/src/patterns/parallel-merge.ts:351

Parameters

options?

omit?

string[]

onComplete?

(botId, botName, output) => void

onError?

(botId, botName, event) => void

onProgressUpdate?

(botId, botName, event) => void

onStreamEvent?

(botId, botName, event) => void

Returns

Subscription


run()

static run<TInput, TOutput, TProcessors>(processors, options?): Processable<TInput, TOutput> & (input, options?) => Promise<InferSchemaOutput<TOutput>> & ParallelMerge<Processable<unknown, unknown>, Processable<unknown, unknown>, Processable<ParallelMergeInput<Record<string, unknown> | unknown[], unknown, unknown>, TOutput>>

Defined in: packages/core/src/patterns/parallel-merge.ts:138

Simple parallel execution with optional input/merge transformations

Type Parameters

TInput

TInput = unknown

TOutput

TOutput = unknown

TProcessors

TProcessors extends Processable<unknown, unknown>[] = Processable<unknown, unknown>[]

Parameters

processors

TProcessors

options?

failureMode?

"fail-fast" | "settle"

input?

(data) => unknown

merge?

((results) => TOutput) | ((input) => TOutput)

Returns

Processable<TInput, TOutput> & (input, options?) => Promise<InferSchemaOutput<TOutput>> & ParallelMerge<Processable<unknown, unknown>, Processable<unknown, unknown>, Processable<ParallelMergeInput<Record<string, unknown> | unknown[], unknown, unknown>, TOutput>>

Example

// Run processors in parallel, get array of results
const runner = ParallelMerge.run([bot1, bot2, bot3]);
const results = await runner.process(input);

// With custom merge function
const validator = ParallelMerge.run([v1, v2, v3], {
  merge: (results) => ({
    isValid: results.every(r => r.valid),
    errors: results.flatMap(r => r.errors)
  })
});

// With input transformation
const messageRunner = ParallelMerge.run([bot1, bot2], {
  input: (data) => data.message
});

runNamed()

static runNamed<TProcessors, TOutput>(processors, options?): Processable<unknown, TOutput> & (input, options?) => Promise<InferSchemaOutput<TOutput>> & ParallelMerge<Processable<unknown, unknown>, Processable<unknown, unknown>, Processable<ParallelMergeInput<Record<string, unknown> | unknown[], unknown, unknown>, TOutput>>

Defined in: packages/core/src/patterns/parallel-merge.ts:211

Parallel execution with named results object

Type Parameters

TProcessors

TProcessors extends Record<string, Processable<unknown, unknown>>

TOutput

TOutput = { [K in string | number | symbol]: Awaited<ReturnType<TProcessors[K]["process"]>> }

Parameters

processors

TProcessors

options?

failureMode?

"fail-fast" | "settle"

input?

(data) => unknown

Returns

Processable<unknown, TOutput> & (input, options?) => Promise<InferSchemaOutput<TOutput>> & ParallelMerge<Processable<unknown, unknown>, Processable<unknown, unknown>, Processable<ParallelMergeInput<Record<string, unknown> | unknown[], unknown, unknown>, TOutput>>

Example

// Returns { spam: {...}, moderation: {...}, tone: {...} }
const runner = ParallelMerge.runNamed({
  spam: detectSpam,
  moderation: moderateContent,
  tone: analyzeTone
});

// With input transformation
const messageRunner = ParallelMerge.runNamed({
  spam: detectSpam,
  tone: analyzeTone
}, {
  input: (data) => data.message
});
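
The idea behind a named results object, where each key of the input record maps to that processor's awaited output, can be sketched independently of the library. The helper below is a hypothetical illustration using plain async functions in place of Processable instances. AsyncFn, NamedResults, runNamedSketch and the two toy processors are assumptions made for this example.

```typescript
// Hypothetical stand-in for a processor: a plain async function.
type AsyncFn<I, O> = (input: I) => Promise<O>;

// Maps each key of the processors record to that processor's awaited output.
type NamedResults<T extends Record<string, AsyncFn<any, unknown>>> = {
  [K in keyof T]: Awaited<ReturnType<T[K]>>;
};

async function runNamedSketch<I, T extends Record<string, AsyncFn<I, unknown>>>(
  processors: T,
  input: I
): Promise<NamedResults<T>> {
  const names = Object.keys(processors);
  // Start every processor on the same input, then wait for all of them.
  const outputs = await Promise.all(names.map((name) => processors[name](input)));
  const result: Record<string, unknown> = {};
  names.forEach((name, index) => {
    result[name] = outputs[index];
  });
  // The compiler cannot verify the per-key types of a dynamically built object,
  // so the sketch asserts them here.
  return result as unknown as NamedResults<T>;
}

// Toy processors used only for illustration.
const detectSpamSketch = async (text: string) => text.indexOf("buy now") !== -1;
const analyzeToneSketch = async (text: string) =>
  text.charAt(text.length - 1) === "!" ? "excited" : "neutral";
```

With these toy processors, runNamedSketch({ spam: detectSpamSketch, tone: analyzeToneSketch }, "buy now!") resolves to { spam: true, tone: "excited" }, and the result type carries spam as boolean and tone as string.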