Pipelines
Pipelines provide a middleware system for extending Atlas and Prism without modifying core code. Add logging, authentication, metrics, and more through composable handlers.
Extending Prism
Atlas pipelines are designed to extend Prism's capabilities. Since Atlas wraps Prism, pipelines give you hooks into all Prism operations—text generation, embeddings, images, audio, and moderation—allowing you to add observability, validation, and custom logic around any AI operation.
How Pipelines Work
Pipelines intercept key operations and allow you to:
- Execute code before/after operations
- Modify data flowing through the system
- Short-circuit execution with custom responses
- Add cross-cutting concerns like logging
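The capabilities listed above can be sketched in plain PHP. This is a minimal illustration, not Atlas's implementation: `runPipeline()` and the three closures are hypothetical stand-ins, and only the handler shape `(mixed $data, Closure $next): mixed` comes from `PipelineContract`.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for a pipeline runner; not Atlas's internal code.
// Each handler has the same shape as PipelineContract::handle().
function runPipeline(array $handlers, mixed $data, Closure $core): mixed
{
    // Wrap the core operation in the handlers, last handler innermost,
    // so the first handler in the list runs first.
    $chain = array_reduce(
        array_reverse($handlers),
        fn (Closure $next, Closure $handler): Closure => fn (mixed $data): mixed => $handler($data, $next),
        $core,
    );

    return $chain($data);
}

$trace = [];

// Runs code before and after the rest of the chain.
$logging = function (mixed $data, Closure $next) use (&$trace): mixed {
    $trace[] = 'before';
    $result = $next($data);
    $trace[] = 'after';

    return $result;
};

// Modifies the data flowing through the chain.
$trimInput = function (mixed $data, Closure $next): mixed {
    $data['input'] = trim($data['input']);

    return $next($data);
};

// Short-circuits: returns a custom response without calling $next.
$rejectEmpty = function (mixed $data, Closure $next): mixed {
    if ($data['input'] === '') {
        $data['response'] = 'Input was empty.';

        return $data;
    }

    return $next($data);
};

// The wrapped operation itself.
$core = function (mixed $data) use (&$trace): mixed {
    $trace[] = 'core';
    $data['response'] = strtoupper($data['input']);

    return $data;
};

$handlers = [$logging, $trimInput, $rejectEmpty];

$ok = runPipeline($handlers, ['input' => '  hello '], $core);
$empty = runPipeline($handlers, ['input' => '   '], $core);
```

In the second call the core operation never runs, because `$rejectEmpty` returns before calling `$next`. The logging handler still sees both its "before" and "after" steps, since it wraps the handler that short-circuits.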
Available Pipelines
Agent Pipelines
| Pipeline | Trigger |
|---|---|
| agent.before_execute | Before agent execution starts |
| agent.context.validate | After before_execute, before building the request (validate/modify context) |
| agent.after_execute | After agent execution completes |
| agent.stream.after | After streaming completes (success or error) |
| agent.system_prompt.before_build | Before building system prompt |
| agent.system_prompt.after_build | After building system prompt |
| agent.on_error | When agent execution fails (supports recovery responses) |
Tool Pipelines
| Pipeline | Trigger |
|---|---|
| tool.before_resolve | Before tools are built for an agent (filter/modify tool list) |
| tool.after_resolve | After tools are built, before execution (audit/modify Prism tools) |
| tool.before_execute | Before tool execution |
| tool.after_execute | After tool execution |
| tool.on_error | When tool execution fails |
Text Pipelines
| Pipeline | Trigger |
|---|---|
| text.before_text | Before text generation |
| text.after_text | After text generation |
| text.before_stream | Before streaming starts |
| text.after_stream | After streaming completes |
Structured Pipelines
| Pipeline | Trigger |
|---|---|
| structured.before_structured | Before structured output generation |
| structured.after_structured | After structured output generation |
Embeddings Pipelines
| Pipeline | Trigger |
|---|---|
| embeddings.before_embeddings | Before generating embeddings |
| embeddings.after_embeddings | After generating embeddings |
Image Pipelines
| Pipeline | Trigger |
|---|---|
| image.before_generate | Before generating an image |
| image.after_generate | After generating an image |
Audio Pipelines
| Pipeline | Trigger |
|---|---|
| audio.before_audio | Before text-to-speech conversion |
| audio.after_audio | After text-to-speech conversion |
| audio.before_text | Before speech-to-text transcription |
| audio.after_text | After speech-to-text transcription |
Moderation Pipelines
| Pipeline | Trigger |
|---|---|
| moderation.before_moderation | Before content moderation |
| moderation.after_moderation | After content moderation |
Creating a Handler
Pipeline handlers must implement PipelineContract:
use Atlasphp\Atlas\Contracts\PipelineContract;
use Closure;
use Illuminate\Support\Facades\Log;
class LogAgentExecution implements PipelineContract
{
public function handle(mixed $data, Closure $next): mixed
{
// Before execution
Log::info('Agent execution started', [
'agent' => $data['agent']->key(),
'input_length' => strlen($data['input']),
]);
// Continue pipeline
$result = $next($data);
// After execution (for agent.after_execute, $result contains 'response')
Log::info('Agent execution completed', [
'agent' => $data['agent']->key(),
]);
return $result;
}
}
Registering Handlers
Register handlers in a service provider:
use Atlasphp\Atlas\Pipelines\PipelineRegistry;
public function boot(): void
{
$registry = app(PipelineRegistry::class);
$registry->register(
'agent.after_execute',
LogAgentExecution::class,
priority: 100,
);
}
Using Instances
You can also register handler instances directly:
$registry->register('agent.after_execute', new AuditLogHandler(), priority: 50);
Defining Pipelines
Optionally define a pipeline with a description and an initial active state:
$registry->define('agent.before_execute', 'Runs before agent execution', active: true);
Conditional Execution
Register handlers that only run when a condition is met:
$registry->registerWhen(
'agent.before_execute',
PremiumOnlyHandler::class,
fn(array $data) => $data['context']->getMeta('tier') === 'premium',
priority: 100,
);
The condition callback receives the pipeline data and should return true if the handler should run:
// Only run for specific agents
$registry->registerWhen(
'agent.after_execute',
SpecialAgentLogger::class,
fn(array $data) => $data['agent']->key() === 'special-agent',
);
// Only run for authenticated users
$registry->registerWhen(
'tool.before_execute',
AuditToolUsage::class,
fn(array $data) => $data['context']->getMeta('user_id') !== null,
);
// Only run for large inputs
$registry->registerWhen(
'text.before_text',
LogLargeRequests::class,
fn(array $data) => strlen($data['metadata']['prompt'] ?? '') > 1000,
);
Conditional handlers can be mixed with regular handlers and respect priority ordering.
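How priority ordering and conditions combine can be sketched in plain PHP. The `$entries` array and `handlersToRun()` below are hypothetical and only mirror the documented behaviour (highest priority first, conditional handlers skipped when their condition is false). They are not the registry's internal code, and the `tier` key is an illustrative assumption.

```php
<?php

declare(strict_types=1);

// Hypothetical registry entries: handler name, priority, optional condition.
$entries = [
    ['name' => 'LowPriorityHandler', 'priority' => 50, 'when' => null],
    [
        'name' => 'PremiumOnlyHandler',
        'priority' => 100,
        'when' => fn (array $data): bool => $data['tier'] === 'premium',
    ],
    ['name' => 'HighPriorityHandler', 'priority' => 200, 'when' => null],
];

function handlersToRun(array $entries, array $data): array
{
    // Highest priority first.
    usort($entries, fn (array $a, array $b): int => $b['priority'] <=> $a['priority']);

    // Drop conditional handlers whose condition is not met for this data.
    $active = array_filter(
        $entries,
        fn (array $entry): bool => $entry['when'] === null || ($entry['when'])($data),
    );

    // array_column() returns a re-indexed list of names.
    return array_column($active, 'name');
}

$premium = handlersToRun($entries, ['tier' => 'premium']);
$free = handlersToRun($entries, ['tier' => 'free']);
```

For premium data all three handlers run in priority order. For any other tier the conditional handler is skipped and the remaining two keep their relative order.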
Querying the Registry
// Check if a pipeline has handlers
$registry->has('agent.before_execute');
// Get all registered pipeline names
$registry->pipelines();
// Get all pipeline definitions
$registry->definitions();
// Check if a pipeline is active
$registry->active('agent.before_execute');
Priority
Handlers run in priority order (highest first):
$registry->register('agent.after_execute', HighPriorityHandler::class, priority: 200);
$registry->register('agent.after_execute', LowPriorityHandler::class, priority: 50);
// HighPriorityHandler runs before LowPriorityHandler
Pipeline Data
Each pipeline passes a specific data array to its handlers:
agent.before_execute
[
'agent' => AgentContract,
'input' => string,
'context' => ExecutionContext,
]
agent.context.validate
Runs after agent.before_execute but before building the system prompt or request. Useful for validating or modifying the execution context.
[
'agent' => AgentContract,
'input' => string,
'context' => ExecutionContext,
]
You can modify the context by replacing it in the data array:
class InjectMetadataHandler implements PipelineContract
{
public function handle(mixed $data, Closure $next): mixed
{
// Modify context with injected metadata
$data['context'] = new ExecutionContext(
messages: $data['context']->messages,
variables: $data['context']->variables,
metadata: array_merge($data['context']->metadata, [
'validated_at' => now()->toIso8601String(),
]),
);
return $next($data);
}
}
agent.after_execute
[
'agent' => AgentContract,
'input' => string,
'context' => ExecutionContext,
'response' => PrismResponse|StructuredResponse,
'system_prompt' => ?string,
]
agent.stream.after
Fires when streaming completes (whether successful or with an error). Useful for analytics, logging stream completion, and cleanup.
[
'agent' => AgentContract,
'input' => string,
'context' => ExecutionContext,
'system_prompt' => ?string,
'events' => array, // All stream events collected
'error' => ?Throwable, // Exception if streaming failed, null on success
]
The ExecutionContext provides access to:
- messages: Conversation history (may include attachments per message)
- variables: System prompt variables
- metadata: Execution metadata (user_id, session_id, etc.)
- prismMedia: Prism media objects for the current input (images, documents, audio, video)
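As an illustration of consuming the agent.stream.after payload, the sketch below condenses it into a small array suitable for logging or analytics. `streamSummary()` is a hypothetical helper, not part of Atlas. It relies only on the `events` and `error` keys documented above.

```php
<?php

declare(strict_types=1);

// Summarises an agent.stream.after payload (hypothetical helper).
function streamSummary(array $data): array
{
    $error = $data['error'] ?? null;

    return [
        'event_count' => count($data['events']),
        'succeeded' => $error === null,
        'error_class' => $error === null ? null : get_class($error),
        'error_message' => $error?->getMessage(),
    ];
}

$ok = streamSummary(['events' => ['a', 'b', 'c'], 'error' => null]);
$failed = streamSummary([
    'events' => ['a'],
    'error' => new RuntimeException('Connection reset'),
]);
```

Inside a handler registered on agent.stream.after, the summary could be passed straight to a logger, for example `Log::info('Stream finished', streamSummary($data))`, before returning `$next($data)`.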
agent.system_prompt.before_build
[
'agent' => AgentContract,
'context' => ExecutionContext,
'variables' => array, // Merged global and context variables
]
agent.system_prompt.after_build
[
'agent' => AgentContract,
'context' => ExecutionContext,
'prompt' => string, // The built prompt
]
tool.before_resolve
Fires before tools are built for an agent. Allows filtering or modifying which tools are available.
[
'agent' => AgentContract,
'tool_classes' => array, // Array of tool class names
'context' => ToolContext,
]
Modify tool_classes to filter which tools are available:
class FilterToolsForUser implements PipelineContract
{
public function handle(mixed $data, Closure $next): mixed
{
$allowedTools = $this->getUserAllowedTools($data['context']->getMeta('user_id'));
// array_filter() preserves keys, so re-index the list with array_values()
$data['tool_classes'] = array_values(array_filter(
$data['tool_classes'],
fn ($tool) => in_array($tool, $allowedTools, true)
));
return $next($data);
}
}
tool.after_resolve
Fires after tools are built into Prism tool objects. Allows auditing or modifying the final tool list.
[
'agent' => AgentContract,
'tool_classes' => array, // Original tool class names
'prism_tools' => array, // Built Prism Tool objects
'context' => ToolContext,
]
tool.before_execute / tool.after_execute
[
'tool' => ToolContract,
'args' => array,
'context' => ToolContext,
]
After execute also includes:
- result: The ToolResult object
agent.on_error
[
'agent' => AgentContract,
'input' => string,
'context' => ExecutionContext,
'system_prompt' => ?string,
'exception' => Throwable,
]
Recovery Support: You can return a recovery response instead of letting the exception propagate:
class ErrorRecoveryHandler implements PipelineContract
{
public function handle(mixed $data, Closure $next): mixed
{
// Optionally provide a recovery response
if ($this->shouldRecover($data['exception'])) {
$data['recovery'] = $this->createFallbackResponse();
}
return $next($data);
}
protected function createFallbackResponse(): PrismResponse
{
return new PrismResponse(
steps: collect([]),
text: 'I apologize, but I encountered an issue. Please try again.',
finishReason: FinishReason::Stop,
toolCalls: [],
toolResults: [],
usage: new Usage(0, 0),
meta: new Meta('fallback', 'fallback'),
messages: collect([]),
additionalContent: [],
);
}
}
When the recovery key is set to a valid PrismResponse or StructuredResponse, the exception is not thrown and the recovery response is returned instead.
tool.on_error
[
'tool' => ToolContract,
'args' => array,
'context' => ToolContext,
'exception' => Throwable,
]
Prism Proxy Pipelines
All Prism proxy pipelines (text, structured, embeddings, image, audio, moderation) receive:
[
'pipeline' => string, // The module name (e.g., 'text', 'image')
'metadata' => array, // Custom metadata passed via withMetadata()
'request' => object, // The Prism pending request object
]
After pipelines also include:
- response: The Prism response object
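Because every Prism proxy pipeline shares this payload shape, one helper can serve handlers on any module. The sketch below is a hypothetical `proxyLogContext()` function, not an Atlas API. It assumes a `user_id` metadata key and treats the presence of `response` as the sign of an "after" pipeline.

```php
<?php

declare(strict_types=1);

// Builds a log context from the shared Prism proxy payload (hypothetical helper).
function proxyLogContext(array $data): array
{
    return [
        'module' => $data['pipeline'],
        // 'user_id' is an assumed metadata key, set via withMetadata().
        'user_id' => $data['metadata']['user_id'] ?? null,
        'request_class' => get_class($data['request']),
        // 'response' is only present in "after" pipelines.
        'phase' => array_key_exists('response', $data) ? 'after' : 'before',
    ];
}

// stdClass stands in for the Prism pending request object.
$before = proxyLogContext([
    'pipeline' => 'image',
    'metadata' => ['user_id' => 7],
    'request' => new stdClass(),
]);

$after = proxyLogContext([
    'pipeline' => 'text',
    'metadata' => [],
    'request' => new stdClass(),
    'response' => new stdClass(),
]);
```

Note that a "before" handler that short-circuits by setting `response` itself would also be reported as "after" by this check, so the pipeline name used at registration is the more reliable signal when that matters.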
Example: Audit Logging
use Atlasphp\Atlas\Contracts\PipelineContract;
use Closure;
class AuditMiddleware implements PipelineContract
{
public function handle(mixed $data, Closure $next): mixed
{
$result = $next($data);
AuditLog::create([
'type' => 'agent_execution',
'agent' => $data['agent']->key(),
'user_id' => $data['context']?->getMeta('user_id'),
'created_at' => now(),
]);
return $result;
}
}
Example: Dynamic System Prompt
use Atlasphp\Atlas\Contracts\PipelineContract;
use Closure;
class AddTimestampToPrompt implements PipelineContract
{
public function handle(mixed $data, Closure $next): mixed
{
// Modify the built prompt
$timestamp = now()->toDateTimeString();
$data['prompt'] .= "\n\nCurrent time: {$timestamp}";
return $next($data);
}
}
$registry->register('agent.system_prompt.after_build', AddTimestampToPrompt::class);
Example: Tool Rate Limiting
use Atlasphp\Atlas\Contracts\PipelineContract;
use Atlasphp\Atlas\Tools\Support\ToolResult;
use Closure;
use Illuminate\Support\Facades\RateLimiter;
class RateLimitTools implements PipelineContract
{
public function handle(mixed $data, Closure $next): mixed
{
$userId = $data['context']->getMeta('user_id');
$toolName = $data['tool']->name();
$key = "tool:{$userId}:{$toolName}";
if (RateLimiter::tooManyAttempts($key, maxAttempts: 10)) {
$data['result'] = ToolResult::error('Rate limit exceeded. Try again later.');
return $data;
}
RateLimiter::hit($key, decaySeconds: 60);
return $next($data);
}
}
$registry->register('tool.before_execute', RateLimitTools::class);
Example: Authentication Check
use Atlasphp\Atlas\Contracts\PipelineContract;
use Closure;
class RequireAuthentication implements PipelineContract
{
public function handle(mixed $data, Closure $next): mixed
{
$userId = $data['context']?->getMeta('user_id');
if (! $userId) {
throw new UnauthorizedException('User must be authenticated');
}
return $next($data);
}
}
$registry->register('agent.before_execute', RequireAuthentication::class, priority: 1000);
Example: Attachment Auditing
Log multimodal attachments (images, documents, audio, video) for compliance and monitoring:
use Atlasphp\Atlas\Contracts\PipelineContract;
use Closure;
class AuditAttachments implements PipelineContract
{
public function handle(mixed $data, Closure $next): mixed
{
$context = $data['context'];
// Log current input attachments (Prism media objects)
if ($context?->hasAttachments()) {
foreach ($context->prismMedia as $media) {
AuditLog::create([
'type' => 'attachment_sent',
'media_type' => get_class($media),
'user_id' => $context->getMeta('user_id'),
'agent' => $data['agent']->key(),
'timestamp' => now(),
]);
}
}
return $next($data);
}
}
$registry->register('agent.before_execute', AuditAttachments::class, priority: 500);
See Chat Attachments for complete attachment documentation.
Example: Token Usage Logging
Log token usage for direct Prism text generation:
use Atlasphp\Atlas\Contracts\PipelineContract;
use Closure;
use Illuminate\Support\Facades\Log;
class LogTokenUsage implements PipelineContract
{
public function handle(mixed $data, Closure $next): mixed
{
$result = $next($data);
$response = $result['response'];
$metadata = $data['metadata'];
Log::channel('usage')->info('Text generation completed', [
'user_id' => $metadata['user_id'] ?? null,
'prompt_tokens' => $response->usage->promptTokens,
'completion_tokens' => $response->usage->completionTokens,
'total_tokens' => $response->usage->promptTokens + $response->usage->completionTokens,
]);
return $result;
}
}
$registry->register('text.after_text', LogTokenUsage::class);
Usage with metadata:
$response = Atlas::text()
->using('openai', 'gpt-4o')
->withMetadata(['user_id' => auth()->id()])
->withPrompt('Explain quantum computing')
->asText();
Example: Caching Embeddings
Cache embeddings to reduce API calls. Use metadata to pass a cache key:
use Atlasphp\Atlas\Contracts\PipelineContract;
use Closure;
use Illuminate\Support\Facades\Cache;
class CacheEmbeddings implements PipelineContract
{
public function handle(mixed $data, Closure $next): mixed
{
$cacheKey = $data['metadata']['cache_key'] ?? null;
if ($cacheKey && Cache::has($cacheKey)) {
$data['response'] = Cache::get($cacheKey);
return $data;
}
$result = $next($data);
if ($cacheKey) {
Cache::put($cacheKey, $result['response'], now()->addDay());
}
return $result;
}
}
$registry->register('embeddings.before_embeddings', CacheEmbeddings::class);
Usage:
$cacheKey = 'embeddings:' . md5($text);
$response = Atlas::embeddings()
->using('openai', 'text-embedding-3-small')
->withMetadata(['cache_key' => $cacheKey])
->fromInput($text)
->asEmbeddings();
Disabling Pipelines
Temporarily disable a pipeline:
$registry->setActive('agent.before_execute', false);
// Pipeline won't run
$response = Atlas::agent('agent')->chat('input');
// Re-enable
$registry->setActive('agent.before_execute', true);
Next Steps
- Error Handling — Handle pipeline errors