Overview
The Python SDK provides full control over pipeline creation, execution, and monitoring. Pipelines are DAG-based workflows that automate deployment tasks.
Creating Pipelines
Basic Pipeline
from budai import BudClient

client = BudClient(api_key="your-key")

pipeline = client.pipelines.create(
    name="Deploy Model",
    description="Automated model deployment workflow",
    definition={
        "steps": [
            {
                "id": "add_model",
                "action": "model_add",
                "params": {
                    "model_uri": "meta-llama/Llama-3.2-1B-Instruct",
                    "model_source": "hugging_face"
                }
            },
            {
                "id": "deploy",
                "action": "deployment_create",
                "params": {
                    "model_id": "{{ steps.add_model.output.model_id }}",
                    "cluster_id": "cluster_prod",
                    "deployment_name": "llama-deployment"
                },
                "depends_on": ["add_model"]
            }
        ]
    }
)

print(f"Pipeline created: {pipeline.id}")
With Error Handling
pipeline = client.pipelines.create(
    name="Resilient Deployment",
    definition={
        "steps": [
            {
                "id": "health_check",
                "action": "cluster_health",
                "params": {"cluster_id": "cluster_prod"},
                "retry": {
                    "max_attempts": 3,
                    "backoff_multiplier": 2
                }
            },
            {
                "id": "deploy",
                "action": "deployment_create",
                "params": {...},
                "depends_on": ["health_check"],
                "condition": "steps.health_check.output.status == 'healthy'"
            },
            {
                "id": "notify_failure",
                "action": "notification",
                "params": {
                    "message": "Deployment failed",
                    "channel": "slack"
                },
                "condition": "steps.deploy.status == 'failed'"
            }
        ]
    }
)
Actions in Pipelines
Actions are the building blocks of pipelines. Each action represents a discrete step in your workflow.
Action Structure
Every action has four parts (combined in the sketch below):
Unique ID - Identifies the action in the DAG, via the id field
Action type - Determines what the action does, via the action field (e.g., log, transform, api_call)
Configuration - Action-specific settings via params
Dependencies - Which actions must complete first via depends_on
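Put together, a single step carrying all four parts might look like this (a sketch; the validate step, its URL, and the fetch_data dependency are illustrative):

{
    "id": "validate",                              # unique ID within the DAG
    "action": "api_call",                          # action type
    "params": {                                    # action-specific configuration
        "url": "https://api.example.com/validate",
        "method": "GET"
    },
    "depends_on": ["fetch_data"]                   # runs only after fetch_data completes
}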
Common Action Types
# Log action
{
    "id": "logger",
    "action": "log",
    "params": {
        "message": "Processing started",
        "level": "info"
    }
}

# API call action
{
    "id": "fetch_data",
    "action": "api_call",
    "params": {
        "url": "https://api.example.com/data",
        "method": "POST",
        "headers": {"Content-Type": "application/json"}
    }
}

# Conditional branching
{
    "id": "check_status",
    "action": "condition",
    "params": {
        "condition": "value > 100",
        "on_true": "path_a",
        "on_false": "path_b"
    }
}
See the Actions Reference for a complete list of available action types.
Managing Pipelines
Listing Pipelines
# Get all pipelines
pipelines = client.pipelines.list()
for pipeline in pipelines:
    print(f"{pipeline.id}: {pipeline.name}")

# Filter by name
pipelines = client.pipelines.list(name_contains="deploy")

# Pagination
pipelines = client.pipelines.list(limit=50, offset=0)
Getting Pipeline Details
pipeline = client.pipelines.get("pipe_abc123")

print(f"Name: {pipeline.name}")
print(f"Created: {pipeline.created_at}")
print(f"Steps: {len(pipeline.definition['steps'])}")
print(f"Status: {pipeline.status}")
Updating Pipelines
# Update pipeline definition
updated = client.pipelines.update(
    pipeline_id="pipe_abc123",
    definition={
        "steps": [
            # Updated step definitions
        ]
    }
)

# Update metadata only
updated = client.pipelines.update(
    pipeline_id="pipe_abc123",
    name="New Pipeline Name",
    description="Updated description"
)
Deleting Pipelines
client.pipelines.delete(pipeline_id="pipe_abc123")
Deleting a pipeline does not affect existing executions or created resources (deployments, models, etc.).
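For example, an execution started before the delete remains queryable afterwards (a sketch, assuming its ID was saved first):

execution_id = execution.id  # saved before deleting the pipeline

client.pipelines.delete(pipeline_id="pipe_abc123")

# The execution record, its outputs, and any created deployments survive
execution = client.executions.get(execution_id)
print(execution.status)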
Executing Pipelines
Synchronous Execution
# Execute and wait for completion
execution = client.executions.create(
    pipeline_id="pipe_abc123",
    wait=True
)

if execution.status == "completed":
    print(f"Success! Output: {execution.outputs}")
else:
    print(f"Failed: {execution.error}")
Asynchronous Execution
# Start execution without waiting
execution = client.executions.create(
    pipeline_id="pipe_abc123",
    wait=False
)
print(f"Execution started: {execution.id}")

# Check status later
execution = client.executions.get(execution.id)
print(f"Status: {execution.status}")
With Parameters
execution = client.executions.create(
    pipeline_id="pipe_abc123",
    params={
        "cluster_id": "cluster_prod",
        "model_id": "model_xyz789",
        "replicas": 3,
        "environment": "production"
    }
)
Execution Priority
# High priority - runs immediately
urgent = client.executions.create(
    pipeline_id="pipe_abc123",
    priority="high"
)

# Low priority - runs when resources are available
background = client.executions.create(
    pipeline_id="pipe_batch_process",
    priority="low"
)
Monitoring Executions
Listing Executions
# Get all executions
executions = client.executions.list()

# Filter by pipeline
pipeline_runs = client.executions.list(pipeline_id="pipe_abc123")

# Filter by status
failed = client.executions.list(status="failed")

# Filter by date
recent = client.executions.list(created_after="2024-01-01")
Execution Status
Status       Description
pending      Queued for execution
running      Currently executing
completed    Finished successfully
failed       Execution failed
cancelled    Manually cancelled
timeout      Exceeded time limit
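Only pending and running are non-terminal, so a small helper makes completion checks explicit (a sketch; TERMINAL_STATES and is_finished are illustrative names, not SDK members):

TERMINAL_STATES = {"completed", "failed", "cancelled", "timeout"}

def is_finished(execution) -> bool:
    # True once the execution has reached any terminal state
    return execution.status in TERMINAL_STATES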
Polling for Completion
import time

execution = client.executions.create(
    pipeline_id="pipe_abc123",
    wait=False
)

while execution.status in ["pending", "running"]:
    time.sleep(5)
    execution = client.executions.get(execution.id)
    print(f"Status: {execution.status} ({execution.progress}%)")

print(f"Final status: {execution.status}")
Getting Execution Details
execution = client.executions.get("exec_abc123")

print(f"Pipeline: {execution.pipeline_id}")
print(f"Status: {execution.status}")
print(f"Started: {execution.started_at}")
print(f"Completed: {execution.completed_at}")
print(f"Duration: {execution.duration_seconds}s")
Execution Outputs
Pipeline Outputs
execution = client.executions.get("exec_abc123")

# Access pipeline-level outputs
print(execution.outputs)
# {
#     "deployment_id": "deploy_xyz",
#     "endpoint_url": "https://...",
#     "status": "success"
# }

# Access a specific output
deployment_id = execution.outputs.get("deployment_id")
Step Outputs
# Access individual step outputs
for step_id, step in execution.steps.items():
    print(f"{step_id}:")
    print(f"  Status: {step.status}")
    print(f"  Output: {step.output}")
    print(f"  Duration: {step.duration_seconds}s")
Execution Logs
Retrieving Logs
# Get all logs
logs = client.executions.logs(execution_id="exec_abc123")

for log in logs:
    print(f"[{log.timestamp}] [{log.level}] {log.message}")
Filtered Logs
# Only errors
error_logs = client.executions.logs(
    execution_id="exec_abc123",
    level="error"
)

# Specific step
step_logs = client.executions.logs(
    execution_id="exec_abc123",
    step_id="deploy"
)

# Time range
recent_logs = client.executions.logs(
    execution_id="exec_abc123",
    since="5m"  # Last 5 minutes
)
Streaming Logs
# Stream logs in real-time
logs = client.executions.logs(
    execution_id="exec_abc123",
    follow=True
)

for log in logs:
    print(f"{log.message}")
    if log.level == "error":
        # Handle errors immediately
        alert_team(log.message)
Managing Executions
Cancelling Executions
# Cancel running execution
client.executions.cancel(execution_id="exec_abc123")

# Verify cancellation
execution = client.executions.get("exec_abc123")
print(execution.status)  # "cancelled"
Retrying Failed Executions
# Retry with same parameters
new_execution = client.executions.retry(execution_id="exec_abc123")

# Retry with different parameters
new_execution = client.executions.retry(
    execution_id="exec_abc123",
    params={
        "cluster_id": "cluster_backup"  # Try different cluster
    }
)
Execution Callbacks
execution = client.executions.create(
    pipeline_id="pipe_abc123",
    callbacks={
        "on_complete": "https://api.example.com/webhook/complete",
        "on_failure": "https://api.example.com/webhook/failure"
    }
)

# Webhooks receive POST requests with execution data:
# {
#     "execution_id": "exec_abc123",
#     "status": "completed",
#     "outputs": {...}
# }
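On the receiving end, any endpoint that accepts a JSON POST works. A minimal receiver sketch using Flask (the framework choice is an assumption; the field names follow the payload shown above):

from flask import Flask, request

app = Flask(__name__)

@app.route("/webhook/complete", methods=["POST"])
def on_complete():
    payload = request.get_json()
    # Fields per the payload above: execution_id, status, outputs
    print(f"Execution {payload['execution_id']} finished: {payload['status']}")
    return "", 204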
Execution Tags
execution = client.executions.create(
    pipeline_id="pipe_abc123",
    tags=["production", "customer-123", "v1.2.0"]
)

# Search by tags
customer_runs = client.executions.list(tags=["customer-123"])
Ephemeral Pipelines
Execute pipelines without persisting them:
# One-time execution
execution = client.executions.run_ephemeral(
    pipeline_definition={
        "steps": [
            {
                "id": "quick_deploy",
                "action": "deployment_create",
                "params": {
                    "model_id": "model_xyz",
                    "cluster_id": "cluster_prod"
                }
            }
        ]
    }
)
Pipeline Templates
Use templates for common workflows:
# Create template
template = {
    "steps": [
        {
            "id": "add_model",
            "action": "model_add",
            "params": {
                "model_uri": "{{ params.model_uri }}",
                "model_source": "{{ params.model_source }}"
            }
        },
        {
            "id": "deploy",
            "action": "deployment_create",
            "params": {
                "model_id": "{{ steps.add_model.output.model_id }}",
                "cluster_id": "{{ params.cluster_id }}"
            },
            "depends_on": ["add_model"]
        }
    ]
}

# Reuse template with different parameters
execution1 = client.executions.run_ephemeral(
    pipeline_definition=template,
    params={
        "model_uri": "meta-llama/Llama-3.2-1B",
        "model_source": "hugging_face",
        "cluster_id": "cluster_prod"
    }
)

execution2 = client.executions.run_ephemeral(
    pipeline_definition=template,
    params={
        "model_uri": "mistralai/Mistral-7B",
        "model_source": "hugging_face",
        "cluster_id": "cluster_dev"
    }
)
Batch Executions
Run multiple executions in parallel:
import time

pipeline_id = "pipe_abc123"
param_sets = [
    {"cluster_id": "cluster_1", "model_id": "model_1"},
    {"cluster_id": "cluster_2", "model_id": "model_2"},
    {"cluster_id": "cluster_3", "model_id": "model_3"},
]

executions = []
for params in param_sets:
    execution = client.executions.create(
        pipeline_id=pipeline_id,
        params=params,
        wait=False
    )
    executions.append(execution)

# Wait for all to complete
while any(e.status in ["pending", "running"] for e in executions):
    time.sleep(5)
    executions = [client.executions.get(e.id) for e in executions]

# Check results
for execution in executions:
    print(f"{execution.id}: {execution.status}")
Execution Metrics
execution = client.executions.get("exec_abc123")

# Performance metrics
print(f"Duration: {execution.duration_seconds}s")
print(f"Steps completed: {execution.completed_steps}/{execution.total_steps}")

# Resource usage
print(f"CPU time: {execution.cpu_seconds}s")
print(f"Memory peak: {execution.peak_memory_mb} MB")

# Calculate success rate over the last 10 runs
history = client.executions.list(
    pipeline_id="pipe_abc123",
    limit=10,
    sort_by="created_at",
    sort_order="desc"
)

total = len(history)
successful = len([e for e in history if e.status == "completed"])
if total:
    success_rate = (successful / total) * 100
    print(f"Success rate: {success_rate:.1f}%")
Best Practices
Use descriptive IDs: Name steps clearly for easier debugging
Add logging: Include log actions for visibility
Handle failures: Use conditional branching for error paths
Parameterize: Make pipelines reusable with params
Test incrementally: Test each step before adding the next
Monitor executions: Set up callbacks and tags for tracking (see the sketch below)
Clean up: Delete old executions to save storage
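Several of these practices combine naturally in a single call (a sketch using only parameters shown earlier; the values are illustrative):

execution = client.executions.create(
    pipeline_id="pipe_abc123",
    params={"environment": "production"},  # parameterized and reusable
    tags=["production", "v1.2.0"],  # searchable later via list(tags=...)
    callbacks={"on_failure": "https://api.example.com/webhook/failure"}  # failure alerting
)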
Error Handling
from budai import NotFoundError, TimeoutError

try:
    execution = client.executions.create(
        pipeline_id="pipe_abc123",
        wait=True,
        timeout=60
    )
except TimeoutError:
    print("Execution timed out")
    # The run may still be in progress server-side; check the latest execution
    recent = client.executions.list(
        pipeline_id="pipe_abc123",
        limit=1,
        sort_by="created_at",
        sort_order="desc"
    )
    if recent:
        print(f"Status: {recent[0].status}")
except NotFoundError:
    print("Pipeline not found")
Next Steps
Pipeline DSL - Pythonic pipeline builder with type safety
Actions Reference - Complete list of available action types