Overview
The Python SDK provides full control over pipeline creation, execution, and monitoring. Pipelines are workflows modeled as DAGs (directed acyclic graphs) that automate deployment tasks.
Creating Pipelines
Basic Pipeline
from budai import BudClient

client = BudClient(api_key="your-key")

pipeline = client.pipelines.create(
    name="Deploy Model",
    description="Automated model deployment workflow",
    definition={
        "steps": [
            {
                "id": "add_model",
                "action": "model_add",
                "params": {
                    "model_uri": "meta-llama/Llama-3.2-1B-Instruct",
                    "model_source": "hugging_face"
                }
            },
            {
                "id": "deploy",
                "action": "deployment_create",
                "params": {
                    "model_id": "{{steps.add_model.output.model_id}}",
                    "cluster_id": "cluster_prod",
                    "deployment_name": "llama-deployment"
                },
                "depends_on": ["add_model"]
            }
        ]
    }
)

print(f"Pipeline created: {pipeline.id}")
With Error Handling
pipeline = client.pipelines.create(
    name="Resilient Deployment",
    definition={
        "steps": [
            {
                "id": "health_check",
                "action": "cluster_health",
                "params": {"cluster_id": "cluster_prod"},
                "retry": {
                    "max_attempts": 3,
                    "backoff_multiplier": 2
                }
            },
            {
                "id": "deploy",
                "action": "deployment_create",
                "params": {...},
                "depends_on": ["health_check"],
                "condition": "steps.health_check.output.status == 'healthy'"
            },
            {
                "id": "notify_failure",
                "action": "notification",
                "params": {
                    "message": "Deployment failed",
                    "channel": "slack"
                },
                "condition": "steps.deploy.status == 'failed'"
            }
        ]
    }
)
Actions in Pipelines
Actions are the building blocks of pipelines. Each action represents a discrete step in your workflow.
Action Structure
Every action has:
- Unique ID - Identifies the action in the DAG, set via id
- Type - Determines what the action does, set via action (e.g., log, transform, api_call)
- Configuration - Action-specific settings via params
- Dependencies - Which actions must complete first via depends_on
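Putting the four fields together, here is a minimal sketch of a two-step fragment. The step IDs and the output.body field are illustrative, while the {{steps.<id>.output.<field>}} reference syntax matches the examples on this page:

# Hypothetical two-step fragment showing all four fields at once
steps = [
    {
        "id": "fetch_data",                    # unique ID in the DAG
        "action": "api_call",                  # action type
        "params": {                            # action-specific settings
            "url": "https://api.example.com/data",
            "method": "GET"
        }
    },
    {
        "id": "log_result",
        "action": "log",
        # output.body is an assumed field name, for illustration only
        "params": {"message": "Fetched: {{steps.fetch_data.output.body}}"},
        "depends_on": ["fetch_data"]           # runs only after fetch_data
    }
]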
Common Action Types
# Log action
{
    "id": "logger",
    "action": "log",
    "params": {
        "message": "Processing started",
        "level": "info"
    }
}

# API call action
{
    "id": "fetch_data",
    "action": "api_call",
    "params": {
        "url": "https://api.example.com/data",
        "method": "POST",
        "headers": {"Content-Type": "application/json"}
    }
}

# Conditional branching
{
    "id": "check_status",
    "action": "condition",
    "params": {
        "condition": "value > 100",
        "on_true": "path_a",
        "on_false": "path_b"
    }
}
See the Actions Reference for a complete list of available action types.
Managing Pipelines
Listing Pipelines
# Get all pipelines
pipelines = client.pipelines.list()
for pipeline in pipelines:
    print(f"{pipeline.id}: {pipeline.name}")
# Filter by name
pipelines = client.pipelines.list(name_contains="deploy")
# Pagination
pipelines = client.pipelines.list(limit=50, offset=0)
Getting Pipeline Details
pipeline = client.pipelines.get("pipe_abc123")
print(f"Name: {pipeline.name}")
print(f"Created: {pipeline.created_at}")
print(f"Steps: {len(pipeline.definition['steps'])}")
print(f"Status: {pipeline.status}")
Updating Pipelines
# Update pipeline definition
updated = client.pipelines.update(
    pipeline_id="pipe_abc123",
    definition={
        "steps": [
            # Updated step definitions
        ]
    }
)

# Update metadata only
updated = client.pipelines.update(
    pipeline_id="pipe_abc123",
    name="New Pipeline Name",
    description="Updated description"
)
Deleting Pipelines
client.pipelines.delete(pipeline_id="pipe_abc123")
Deleting a pipeline does not affect existing executions or created resources (deployments, models, etc.).
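For example, runs recorded before the deletion should remain queryable by the old pipeline ID (a small sketch using the listing call covered under Monitoring Executions below):

client.pipelines.delete(pipeline_id="pipe_abc123")

# Runs recorded before the deletion remain available
for run in client.executions.list(pipeline_id="pipe_abc123"):
    print(f"{run.id}: {run.status}")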
Executing Pipelines
Synchronous Execution
# Execute and wait for completion
execution = client.executions.create(
    pipeline_id="pipe_abc123",
    wait=True
)

if execution.status == "completed":
    print(f"Success! Output: {execution.outputs}")
else:
    print(f"Failed: {execution.error}")
Asynchronous Execution
# Start execution without waiting
execution = client.executions.create(
    pipeline_id="pipe_abc123",
    wait=False
)
print(f"Execution started: {execution.id}")
# Check status later
execution = client.executions.get(execution.id)
print(f"Status: {execution.status}")
With Parameters
execution = client.executions.create(
    pipeline_id="pipe_abc123",
    params={
        "cluster_id": "cluster_prod",
        "model_id": "model_xyz789",
        "replicas": 3,
        "environment": "production"
    }
)
Execution Priority
# High priority - runs immediately
urgent = client.executions.create(
    pipeline_id="pipe_abc123",
    priority="high"
)

# Low priority - runs when resources available
background = client.executions.create(
    pipeline_id="pipe_batch_process",
    priority="low"
)
Monitoring Executions
Listing Executions
# Get all executions
executions = client.executions.list()
# Filter by pipeline
pipeline_runs = client.executions.list(pipeline_id="pipe_abc123")
# Filter by status
failed = client.executions.list(status="failed")
# Filter by date
recent = client.executions.list(created_after="2024-01-01")
Execution Status
| Status | Description |
|---|---|
| pending | Queued for execution |
| running | Currently executing |
| completed | Finished successfully |
| failed | Execution failed |
| cancelled | Manually cancelled |
| timeout | Exceeded time limit |
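The last four statuses are terminal. A tiny sketch of the check the polling loop below relies on; the TERMINAL_STATUSES set is derived from this table, not an SDK constant:

# Derived from the status table above; not an SDK constant
TERMINAL_STATUSES = {"completed", "failed", "cancelled", "timeout"}

def is_terminal(status: str) -> bool:
    """True once an execution can no longer change state."""
    return status in TERMINAL_STATUSES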
Polling for Completion
import time

execution = client.executions.create(
    pipeline_id="pipe_abc123",
    wait=False
)

while execution.status in ["pending", "running"]:
    time.sleep(5)
    execution = client.executions.get(execution.id)
    print(f"Status: {execution.status} ({execution.progress}%)")

print(f"Final status: {execution.status}")
Getting Execution Details
execution = client.executions.get("exec_abc123")
print(f"Pipeline: {execution.pipeline_id}")
print(f"Status: {execution.status}")
print(f"Started: {execution.started_at}")
print(f"Completed: {execution.completed_at}")
print(f"Duration: {execution.duration_seconds}s")
Execution Outputs
Pipeline Outputs
execution = client.executions.get("exec_abc123")
# Access pipeline-level outputs
print(execution.outputs)
# {
# "deployment_id": "deploy_xyz",
# "endpoint_url": "https://...",
# "status": "success"
# }
# Access specific output
deployment_id = execution.outputs.get("deployment_id")
Step Outputs
# Access individual step outputs
for step_id, step in execution.steps.items():
    print(f"{step_id}:")
    print(f"  Status: {step.status}")
    print(f"  Output: {step.output}")
    print(f"  Duration: {step.duration_seconds}s")
Execution Logs
Retrieving Logs
# Get all logs
logs = client.executions.logs(execution_id="exec_abc123")
for log in logs:
    print(f"[{log.timestamp}] [{log.level}] {log.message}")
Filtered Logs
# Only errors
error_logs = client.executions.logs(
    execution_id="exec_abc123",
    level="error"
)

# Specific step
step_logs = client.executions.logs(
    execution_id="exec_abc123",
    step_id="deploy"
)

# Time range
recent_logs = client.executions.logs(
    execution_id="exec_abc123",
    since="5m"  # Last 5 minutes
)
Streaming Logs
# Stream logs in real-time
logs = client.executions.logs(
    execution_id="exec_abc123",
    follow=True
)

for log in logs:
    print(log.message)
    if log.level == "error":
        # Handle errors immediately
        alert_team(log.message)
Managing Executions
Cancelling Executions
# Cancel running execution
client.executions.cancel(execution_id="exec_abc123")
# Verify cancellation
execution = client.executions.get("exec_abc123")
print(execution.status) # "cancelled"
Retrying Failed Executions
# Retry with same parameters
new_execution = client.executions.retry(execution_id="exec_abc123")
# Retry with different parameters
new_execution = client.executions.retry(
    execution_id="exec_abc123",
    params={
        "cluster_id": "cluster_backup"  # Try a different cluster
    }
)
Execution Callbacks
execution = client.executions.create(
    pipeline_id="pipe_abc123",
    callbacks={
        "on_complete": "https://api.example.com/webhook/complete",
        "on_failure": "https://api.example.com/webhook/failure"
    }
)
# Webhooks receive POST requests with execution data:
# {
# "execution_id": "exec_abc123",
# "status": "completed",
# "outputs": {...}
# }
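A minimal sketch of a receiver for that payload, using only the standard library; the port and the catch-all routing are assumptions, and any HTTP framework works equally well:

import json
from http.server import BaseHTTPRequestHandler, HTTPServer

class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read and parse the JSON body shown in the comment above
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length))
        print(f"Execution {payload['execution_id']} finished: {payload['status']}")
        self.send_response(200)
        self.end_headers()

# Assumed port; in practice, point the on_complete/on_failure URLs here
HTTPServer(("", 8080), WebhookHandler).serve_forever()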
Execution Tags

execution = client.executions.create(
    pipeline_id="pipe_abc123",
    tags=["production", "customer-123", "v1.2.0"]
)

# Search by tags
customer_runs = client.executions.list(tags=["customer-123"])
Ephemeral Pipelines
Execute pipelines without persisting them:
# One-time execution
execution = client.executions.run_ephemeral(
    pipeline_definition={
        "steps": [
            {
                "id": "quick_deploy",
                "action": "deployment_create",
                "params": {
                    "model_id": "model_xyz",
                    "cluster_id": "cluster_prod"
                }
            }
        ]
    }
)
Pipeline Templates
Use templates for common workflows:
# Create template
template = {
    "steps": [
        {
            "id": "add_model",
            "action": "model_add",
            "params": {
                "model_uri": "{{params.model_uri}}",
                "model_source": "{{params.model_source}}"
            }
        },
        {
            "id": "deploy",
            "action": "deployment_create",
            "params": {
                "model_id": "{{steps.add_model.output.model_id}}",
                "cluster_id": "{{params.cluster_id}}"
            },
            "depends_on": ["add_model"]
        }
    ]
}
# Reuse template with different parameters
execution1 = client.executions.run_ephemeral(
    pipeline_definition=template,
    params={
        "model_uri": "meta-llama/Llama-3.2-1B",
        "model_source": "hugging_face",
        "cluster_id": "cluster_prod"
    }
)

execution2 = client.executions.run_ephemeral(
    pipeline_definition=template,
    params={
        "model_uri": "mistralai/Mistral-7B",
        "model_source": "hugging_face",
        "cluster_id": "cluster_dev"
    }
)
Batch Executions
Run multiple executions in parallel:
import time

pipeline_id = "pipe_abc123"
param_sets = [
    {"cluster_id": "cluster_1", "model_id": "model_1"},
    {"cluster_id": "cluster_2", "model_id": "model_2"},
    {"cluster_id": "cluster_3", "model_id": "model_3"},
]

executions = []
for params in param_sets:
    execution = client.executions.create(
        pipeline_id=pipeline_id,
        params=params,
        wait=False
    )
    executions.append(execution)

# Wait for all to complete
while any(e.status in ["pending", "running"] for e in executions):
    time.sleep(5)
    executions = [client.executions.get(e.id) for e in executions]

# Check results
for execution in executions:
    print(f"{execution.id}: {execution.status}")
Execution Metrics
execution = client.executions.get("exec_abc123")
# Performance metrics
print(f"Duration: {execution.duration_seconds}s")
print(f"Steps completed: {execution.completed_steps}/{execution.total_steps}")
# Resource usage
print(f"CPU time: {execution.cpu_seconds}s")
print(f"Memory peak: {execution.peak_memory_mb} MB")
# Calculate success rate
history = client.executions.list(
    pipeline_id="pipe_abc123",
    limit=10,
    sort_by="created_at",
    sort_order="desc"
)
total = len(history)
successful = len([e for e in history if e.status == "completed"])
success_rate = (successful / total) * 100
print(f"Success rate: {success_rate:.1f}%")
Best Practices
- Use descriptive IDs: Name steps clearly for easier debugging
- Add logging: Include log actions for visibility
- Handle failures: Use conditional branching for error paths
- Parameterize: Make pipelines reusable with params
- Test incrementally: Test each step before adding the next
- Monitor executions: Set up callbacks and tags for tracking (see the sketch after this list)
- Clean up: Delete old executions to save storage
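A small sketch combining several of these practices in one call. Each keyword argument (params, tags, callbacks) is documented individually earlier on this page; passing them together in a single create call is an assumption:

execution = client.executions.create(
    pipeline_id="pipe_abc123",
    params={"cluster_id": "cluster_prod"},  # parameterized and reusable
    tags=["production", "v1.2.0"],          # searchable later
    callbacks={                             # failure notification hook
        "on_failure": "https://api.example.com/webhook/failure"
    }
)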
Error Handling
from budai import NotFoundError, TimeoutError

try:
    execution = client.executions.create(
        pipeline_id="pipe_abc123",
        wait=True,
        timeout=60
    )
except TimeoutError:
    print("Execution timed out")
    # wait=True raised before `execution` was assigned, so look the run
    # up separately, e.g. client.executions.list(pipeline_id="pipe_abc123")
except NotFoundError:
    print("Pipeline not found")
Next Steps