Streaming
Baleybots support real-time, event-based streaming. As the LLM generates output, you receive typed StreamEvent objects for text, structured JSON, tool-call arguments as they stream in, and tool execution results.
StreamEvent types
type StreamEvent =
  // LLM STREAMING EVENTS - emitted as the LLM generates tool calls
  | { type: 'tool_call_stream_start'; id: string; toolName: string }
  | { type: 'tool_call_arguments_delta'; id: string; toolName: string; argumentsDelta: string }
  | { type: 'tool_call_stream_complete'; id: string; toolName: string; arguments: unknown }
  // TOOL EXECUTION EVENTS - emitted when actually executing the tool function
  | { type: 'tool_execution_start'; toolName: string; arguments: unknown }
  | { type: 'tool_execution_output'; toolName: string; result: unknown; error?: string }
  | { type: 'tool_validation_error'; toolName: string; validationErrors: unknown; receivedArguments: unknown }
  // TEXT AND STRUCTURED OUTPUT EVENTS
  | { type: 'text_delta'; content: string }
  | { type: 'structured_output_delta'; content: string }
  | { type: 'reasoning'; content: string }
  // ERROR EVENTS
  | { type: 'error'; error: Error }
  // NESTED STREAMING EVENTS
  | { type: 'tool_execution_stream'; toolName: string; nestedEvent: StreamEvent; childBotName?: string };
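Because tool_execution_stream wraps another StreamEvent, a handler may need to dig through several layers before it reaches the event it cares about. The following is a minimal sketch of that unwrapping, not part of the library: unwrapNested and LeafEvent are hypothetical names, and the union is a trimmed local copy so the snippet compiles on its own.

```typescript
// Trimmed local copy of the StreamEvent union (assumption: only the members
// needed for this sketch are reproduced here).
type StreamEvent =
  | { type: 'text_delta'; content: string }
  | { type: 'error'; error: Error }
  | { type: 'tool_execution_stream'; toolName: string; nestedEvent: StreamEvent; childBotName?: string };

// Every event that is not itself a wrapper.
type LeafEvent = Exclude<StreamEvent, { type: 'tool_execution_stream' }>;

// Hypothetical helper: peel off tool_execution_stream wrappers and record
// which child bot (or tool) each layer came from.
function unwrapNested(event: StreamEvent): { path: string[]; leaf: LeafEvent } {
  const path: string[] = [];
  let current: StreamEvent = event;
  while (current.type === 'tool_execution_stream') {
    path.push(current.childBotName ?? current.toolName);
    current = current.nestedEvent;
  }
  return { path, leaf: current };
}

// Example: a text delta emitted two levels down.
const wrapped: StreamEvent = {
  type: 'tool_execution_stream',
  toolName: 'research',
  childBotName: 'research-bot',
  nestedEvent: {
    type: 'tool_execution_stream',
    toolName: 'summarize',
    nestedEvent: { type: 'text_delta', content: 'hi' },
  },
};

const unwrapped = unwrapNested(wrapped);
console.log(unwrapped.path.join(' > '), unwrapped.leaf.type);
```

The path array gives a UI enough context to label nested output, for example by indenting text that comes from a child bot.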
Basic usage
Pass an onToken callback in ProcessOptions. You receive streaming events as they arrive, and process() still resolves with the final result:
import { Baleybot } from '@baleybots/core';
import { z } from 'zod';

const chatBot = Baleybot.create({
  name: 'chat-bot',
  goal: 'Generate a friendly response',
  outputSchema: z.object({
    response: z.string()
  })
});

const result = await chatBot.process('Hello!', {
  onToken: (botName, event) => {
    if (event.type === 'text_delta') {
      process.stdout.write(event.content);
    }
  }
});

console.log('Final response:', result.response);
The onToken callback accepts either a function or a TokenHandlers object:
// Function form
onToken: (botName: string, event: StreamEvent) => void

// Object form
onToken: {
  onTextDelta(botName: string, event: StreamEvent): void;
  // ... other event handlers
}
All ProcessOptions callbacks
interface ProcessOptions {
  onStart?: (botName: string) => void;
  onToken?: ((botName: string, event: StreamEvent) => void) | TokenHandlers;
  onComplete?: (botName: string, result: unknown) => void;
  onError?: (botName: string, error: Error) => void;
}
Full example using all callbacks:
const result = await bot.process(input, {
  onStart: (botName) => {
    console.log(`${botName} started...`);
  },
  onToken: (botName, event) => {
    if (event.type === 'text_delta') {
      process.stdout.write(event.content);
    }
  },
  onComplete: (botName, result) => {
    console.log(`\n${botName} complete`);
  },
  onError: (botName, error) => {
    console.error(`${botName} error:`, error);
  }
});

console.log('Final response:', result);
Streaming tool calls
await agent.process('Compute 5 + 3', {
  onToken: (_name, e) => {
    switch (e.type) {
      case 'tool_call_stream_start':
        console.log('Tool:', e.toolName);
        break;
      case 'tool_call_arguments_delta':
        process.stdout.write(e.argumentsDelta);
        break;
      case 'tool_call_stream_complete':
        console.log('Args:', e.arguments);
        break;
      case 'tool_execution_start':
        console.log('Executing:', e.toolName);
        break;
      case 'tool_execution_output':
        console.log('Output:', e.result ?? e.error);
        break;
    }
  }
});
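The three tool_call_* events all carry an id, which suggests that argument deltas can be grouped per call. As a rough sketch under that assumption, the hypothetical ToolCallTracker below accumulates argumentsDelta text per id so a UI can show each call's arguments as they build up. The event type is a local copy of the three members from the StreamEvent union; nothing here is part of the library.

```typescript
// Local copy of the three tool-call streaming events from the StreamEvent union.
type ToolCallEvent =
  | { type: 'tool_call_stream_start'; id: string; toolName: string }
  | { type: 'tool_call_arguments_delta'; id: string; toolName: string; argumentsDelta: string }
  | { type: 'tool_call_stream_complete'; id: string; toolName: string; arguments: unknown };

// Hypothetical helper: keeps the partial argument text for each in-flight call.
class ToolCallTracker {
  private partial = new Map<string, string>();

  // Returns the argument text accumulated so far for the event's call id.
  handle(event: ToolCallEvent): string {
    switch (event.type) {
      case 'tool_call_stream_start':
        this.partial.set(event.id, '');
        return '';
      case 'tool_call_arguments_delta': {
        const next = (this.partial.get(event.id) ?? '') + event.argumentsDelta;
        this.partial.set(event.id, next);
        return next;
      }
      case 'tool_call_stream_complete': {
        // The call is finished; hand back the final text and forget it.
        const text = this.partial.get(event.id) ?? '';
        this.partial.delete(event.id);
        return text;
      }
    }
  }

  // Number of calls that have started but not yet completed.
  get pendingCount(): number {
    return this.partial.size;
  }
}

// Example: two calls whose deltas arrive interleaved.
const tracker = new ToolCallTracker();
const toolEvents: ToolCallEvent[] = [
  { type: 'tool_call_stream_start', id: 'call_1', toolName: 'add' },
  { type: 'tool_call_stream_start', id: 'call_2', toolName: 'multiply' },
  { type: 'tool_call_arguments_delta', id: 'call_1', toolName: 'add', argumentsDelta: '{"a":5,' },
  { type: 'tool_call_arguments_delta', id: 'call_2', toolName: 'multiply', argumentsDelta: '{"x":2}' },
  { type: 'tool_call_arguments_delta', id: 'call_1', toolName: 'add', argumentsDelta: '"b":3}' },
  { type: 'tool_call_stream_complete', id: 'call_1', toolName: 'add', arguments: { a: 5, b: 3 } },
];
const snapshots = toolEvents.map((e) => tracker.handle(e));
console.log(snapshots[4], tracker.pendingCount);
```

In a real handler you would call tracker.handle(e) from onToken for the three tool_call_* cases and render the returned text.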
Streaming through pipelines
Pipelines automatically pass ProcessOptions to each bot in the chain:
import { pipeline } from '@baleybots/core';

const chain = pipeline()
  .step(bot1)
  .step(bot2)
  .step(bot3)
  .build();

const result = await chain.process(input, {
  onStart: (botName) => {
    console.log(`\n${botName} starting...`);
  },
  onToken: (botName, event) => {
    if (event.type === 'text_delta') {
      process.stdout.write(event.content);
    }
  },
  onComplete: (botName, result) => {
    console.log(`\n${botName} done`);
  }
});

console.log('Chain result:', result);
Events arrive from each bot in sequence. When using ParallelMerge, events from parallel bots may interleave; use the botName argument to tell them apart.
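When events interleave, writing every text_delta straight to one output stream mixes the bots' text together. One way to keep them apart is to buffer by botName. This is a minimal sketch, assuming only the (botName, event) callback signature shown above; createPerBotCollector is a hypothetical helper and the event type is a trimmed local copy.

```typescript
// Trimmed local copy of the StreamEvent union for this sketch.
type StreamEvent =
  | { type: 'text_delta'; content: string }
  | { type: 'reasoning'; content: string };

// Hypothetical helper: collects text deltas into one buffer per bot.
function createPerBotCollector() {
  const buffers = new Map<string, string>();
  return {
    // Has the same shape as the function form of onToken.
    onToken(botName: string, event: StreamEvent): void {
      if (event.type !== 'text_delta') return;
      buffers.set(botName, (buffers.get(botName) ?? '') + event.content);
    },
    // Text received so far from one bot.
    textFor(botName: string): string {
      return buffers.get(botName) ?? '';
    },
  };
}

// Example: deltas from two parallel bots arriving interleaved.
const collector = createPerBotCollector();
collector.onToken('bot-a', { type: 'text_delta', content: 'Hel' });
collector.onToken('bot-b', { type: 'text_delta', content: 'Wor' });
collector.onToken('bot-a', { type: 'text_delta', content: 'lo' });
collector.onToken('bot-b', { type: 'text_delta', content: 'ld' });

console.log(collector.textFor('bot-a'), collector.textFor('bot-b'));
```

You would pass collector.onToken as the onToken option and render each buffer in its own pane or section.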
Tips
Buffer management
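If you display partial JSON, remember that the buffer is incomplete until the last chunk arrives. Attempt a parse only when the buffer looks closed, and treat a parse failure as "not ready yet":
let buffer = '';

onToken: (_, event) => {
  if (event.type === 'text_delta' || event.type === 'structured_output_delta') {
    buffer += event.content;
    // Cheap pre-check; trimEnd() tolerates a trailing newline or space
    if (buffer.trimEnd().endsWith('}')) {
      try {
        const parsed = JSON.parse(buffer);
        // Display parsed data
      } catch {
        // Not valid yet (for example, only a nested object has closed); keep buffering
      }
    }
  }
}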
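The endsWith('}') check above can fire early, for instance when a nested object closes or when a string value happens to end in a brace, and each early attempt re-parses the whole buffer. As an illustrative alternative, the sketch below tracks brace and bracket depth across chunks and only calls JSON.parse once the top-level value has closed. JsonChunkBuffer is a hypothetical helper, not a library API, and it assumes the streamed output is a single JSON object or array.

```typescript
// Hypothetical helper: accumulates chunks and parses only when the
// top-level object or array has closed.
class JsonChunkBuffer {
  private text = '';
  private depth = 0;
  private inString = false;
  private escaped = false;

  // Appends a chunk. Returns the parsed value once complete, otherwise undefined.
  push(chunk: string): unknown {
    let closed = false;
    for (const ch of chunk) {
      if (this.inString) {
        // Inside a string, braces are data; only track escapes and the closing quote.
        if (this.escaped) this.escaped = false;
        else if (ch === '\\') this.escaped = true;
        else if (ch === '"') this.inString = false;
        continue;
      }
      if (ch === '"') this.inString = true;
      else if (ch === '{' || ch === '[') this.depth++;
      else if (ch === '}' || ch === ']') {
        this.depth--;
        if (this.depth === 0) closed = true;
      }
    }
    this.text += chunk;
    if (!closed || this.depth !== 0) return undefined;
    try {
      return JSON.parse(this.text);
    } catch {
      return undefined; // malformed input; the caller can decide how to report it
    }
  }
}

// Example: the first chunk ends with '}' inside a string value.
const jsonBuffer = new JsonChunkBuffer();
const parsedSteps = ['{"a":{"b":"x}', 'y"},"c":[1,', '2]}'].map((chunk) => jsonBuffer.push(chunk));
console.log(parsedSteps[2]);
```

In an onToken handler you would call jsonBuffer.push(event.content) and render the result whenever it is not undefined.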
Terminal output
For CLI apps, use process.stdout.write() instead of console.log() to avoid extra newlines:
onToken: (_, event) => {
  if (event.type === 'text_delta') {
    process.stdout.write(event.content);
  }
}
Async callbacks
Callbacks can be async when they need to await something, such as a database write:
onToken: async (botName, event) => {
  if (event.type === 'text_delta') {
    await saveTokenToDatabase(botName, event.content);
  }
}
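This page does not say whether the library waits for one async callback to finish before invoking the next. If ordering matters and you would rather not depend on that, one defensive option is to funnel the async work through a small serial queue. The sketch below is an assumption-laden illustration: createSerialQueue and fakeSave are hypothetical, and fakeSave merely stands in for something like saveTokenToDatabase.

```typescript
// Hypothetical helper: runs async tasks one at a time, in the order they were queued.
function createSerialQueue() {
  let tail: Promise<void> = Promise.resolve();
  return {
    enqueue(task: () => Promise<void>): void {
      // Chain onto the previous task; log failures so one error does not block later tasks.
      tail = tail.then(task).catch((err) => {
        console.error('queued task failed:', err);
      });
    },
    // Resolves once everything queued so far has finished.
    drain(): Promise<void> {
      return tail;
    },
  };
}

// Stand-in for a real database write. Earlier tokens take longer, so
// unserialized writes would land out of order.
const saved: string[] = [];
const delays: Record<string, number> = { a: 15, b: 5, c: 0 };
async function fakeSave(token: string): Promise<void> {
  await new Promise<void>((resolve) => setTimeout(resolve, delays[token] ?? 0));
  saved.push(token);
}

const queue = createSerialQueue();

// Synchronous onToken that only enqueues work, so it never blocks the stream.
const onToken = (_botName: string, event: { type: 'text_delta'; content: string }): void => {
  queue.enqueue(() => fakeSave(event.content));
};

for (const token of ['a', 'b', 'c']) {
  onToken('chat-bot', { type: 'text_delta', content: token });
}

const drained = queue.drain().then(() => saved.join(''));
```

After process() resolves, awaiting queue.drain() ensures every queued write has completed before the program moves on.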
Key benefits
- Real-time feedback -- users see progress immediately instead of waiting for the full response.
- Same validation -- whether streaming or not, output is validated against the schema.
- Same interface -- just add an onToken callback; everything else stays the same.
- Works everywhere -- single bots, chains via pipeline(), ParallelMerge, and all composition patterns.