Overview

The Python SDK provides full control over pipeline creation, execution, and monitoring. Pipelines are DAG-based workflows that automate deployment tasks.

Creating Pipelines

Basic Pipeline

from budai import BudClient

client = BudClient(api_key="your-key")

pipeline = client.pipelines.create(
    name="Deploy Model",
    description="Automated model deployment workflow",
    definition={
        "steps": [
            {
                "id": "add_model",
                "action": "model_add",
                "params": {
                    "model_uri": "meta-llama/Llama-3.2-1B-Instruct",
                    "model_source": "hugging_face"
                }
            },
            {
                "id": "deploy",
                "action": "deployment_create",
                "params": {
                    "model_id": "{{steps.add_model.output.model_id}}",
                    "cluster_id": "cluster_prod",
                    "deployment_name": "llama-deployment"
                },
                "depends_on": ["add_model"]
            }
        ]
    }
)

print(f"Pipeline created: {pipeline.id}")

With Error Handling

pipeline = client.pipelines.create(
    name="Resilient Deployment",
    definition={
        "steps": [
            {
                "id": "health_check",
                "action": "cluster_health",
                "params": {"cluster_id": "cluster_prod"},
                "retry": {
                    "max_attempts": 3,
                    "backoff_multiplier": 2
                }
            },
            {
                "id": "deploy",
                "action": "deployment_create",
                "params": {...},
                "depends_on": ["health_check"],
                "condition": "steps.health_check.output.status == 'healthy'"
            },
            {
                "id": "notify_failure",
                "action": "notification",
                "params": {
                    "message": "Deployment failed",
                    "channel": "slack"
                },
                "depends_on": ["deploy"],
                "condition": "steps.deploy.status == 'failed'"
            }
        ]
    }
)

Actions in Pipelines

Actions are the building blocks of pipelines. Each action represents a discrete step in your workflow.

Action Structure

Every action has:
  • Unique ID - Identifies the step in the DAG, set via id
  • Action type - Determines what the step does, set via action (e.g., log, transform, api_call)
  • Configuration - Action-specific settings via params
  • Dependencies - Which steps must complete first, set via depends_on
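
A step that combines all four parts might look like this (the step IDs, action type, and params here are illustrative):

{
    "id": "transform_data",
    "action": "transform",
    "params": {"expression": "record.value * 2"},
    "depends_on": ["fetch_data"]
}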

Common Action Types

# Log action
{
    "id": "logger",
    "action": "log",
    "params": {
        "message": "Processing started",
        "level": "info"
    }
}

# API call action
{
    "id": "fetch_data",
    "action": "api_call",
    "params": {
        "url": "https://api.example.com/data",
        "method": "POST",
        "headers": {"Content-Type": "application/json"}
    }
}

# Conditional branching
{
    "id": "check_status",
    "action": "condition",
    "params": {
        "condition": "value > 100",
        "on_true": "path_a",
        "on_false": "path_b"
    }
}

See the Actions Reference for a complete list of available action types.

Managing Pipelines

Listing Pipelines

# Get all pipelines
pipelines = client.pipelines.list()

for pipeline in pipelines:
    print(f"{pipeline.id}: {pipeline.name}")

# Filter by name
pipelines = client.pipelines.list(name_contains="deploy")

# Pagination
pipelines = client.pipelines.list(limit=50, offset=0)
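
To walk every page, advance the offset until a page comes back empty (a sketch, assuming list() returns an empty list once results are exhausted):

all_pipelines = []
offset = 0
page_size = 50
while True:
    page = client.pipelines.list(limit=page_size, offset=offset)
    if not page:
        break
    all_pipelines.extend(page)
    offset += page_size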

Getting Pipeline Details

pipeline = client.pipelines.get("pipe_abc123")

print(f"Name: {pipeline.name}")
print(f"Created: {pipeline.created_at}")
print(f"Steps: {len(pipeline.definition['steps'])}")
print(f"Status: {pipeline.status}")

Updating Pipelines

# Update pipeline definition
updated = client.pipelines.update(
    pipeline_id="pipe_abc123",
    definition={
        "steps": [
            # Updated step definitions
        ]
    }
)

# Update metadata only
updated = client.pipelines.update(
    pipeline_id="pipe_abc123",
    name="New Pipeline Name",
    description="Updated description"
)

Deleting Pipelines

client.pipelines.delete(pipeline_id="pipe_abc123")

Deleting a pipeline does not affect existing executions or created resources (deployments, models, etc.).

Executing Pipelines

Synchronous Execution

# Execute and wait for completion
execution = client.executions.create(
    pipeline_id="pipe_abc123",
    wait=True
)

if execution.status == "completed":
    print(f"Success! Output: {execution.outputs}")
else:
    print(f"Failed: {execution.error}")

Asynchronous Execution

# Start execution without waiting
execution = client.executions.create(
    pipeline_id="pipe_abc123",
    wait=False
)

print(f"Execution started: {execution.id}")

# Check status later
execution = client.executions.get(execution.id)
print(f"Status: {execution.status}")

With Parameters

execution = client.executions.create(
    pipeline_id="pipe_abc123",
    params={
        "cluster_id": "cluster_prod",
        "model_id": "model_xyz789",
        "replicas": 3,
        "environment": "production"
    }
)

These values fill the {{params.*}} placeholders referenced in the pipeline definition (see Pipeline Templates below).

Execution Priority

# High priority - runs immediately
urgent = client.executions.create(
    pipeline_id="pipe_abc123",
    priority="high"
)

# Low priority - runs when resources available
background = client.executions.create(
    pipeline_id="pipe_batch_process",
    priority="low"
)

Monitoring Executions

Listing Executions

# Get all executions
executions = client.executions.list()

# Filter by pipeline
pipeline_runs = client.executions.list(pipeline_id="pipe_abc123")

# Filter by status
failed = client.executions.list(status="failed")

# Filter by date
recent = client.executions.list(created_after="2024-01-01")

Execution Status

Status      Description
pending     Queued for execution
running     Currently executing
completed   Finished successfully
failed      Execution failed
cancelled   Manually cancelled
timeout     Exceeded time limit
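
The last four statuses are terminal. A small membership check keeps polling code tidy (a convenience sketch, not part of the SDK):

TERMINAL_STATUSES = {"completed", "failed", "cancelled", "timeout"}

def is_finished(execution):
    return execution.status in TERMINAL_STATUSES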

Polling for Completion

import time

execution = client.executions.create(
    pipeline_id="pipe_abc123",
    wait=False
)

while execution.status in ["pending", "running"]:
    time.sleep(5)
    execution = client.executions.get(execution.id)
    print(f"Status: {execution.status} ({execution.progress}%)")

print(f"Final status: {execution.status}")

Getting Execution Details

execution = client.executions.get("exec_abc123")

print(f"Pipeline: {execution.pipeline_id}")
print(f"Status: {execution.status}")
print(f"Started: {execution.started_at}")
print(f"Completed: {execution.completed_at}")
print(f"Duration: {execution.duration_seconds}s")

Execution Outputs

Pipeline Outputs

execution = client.executions.get("exec_abc123")

# Access pipeline-level outputs
print(execution.outputs)
# {
#   "deployment_id": "deploy_xyz",
#   "endpoint_url": "https://...",
#   "status": "success"
# }

# Access specific output
deployment_id = execution.outputs.get("deployment_id")

Step Outputs

# Access individual step outputs
for step_id, step in execution.steps.items():
    print(f"{step_id}:")
    print(f"  Status: {step.status}")
    print(f"  Output: {step.output}")
    print(f"  Duration: {step.duration_seconds}s")

Execution Logs

Retrieving Logs

# Get all logs
logs = client.executions.logs(execution_id="exec_abc123")

for log in logs:
    print(f"[{log.timestamp}] [{log.level}] {log.message}")

Filtered Logs

# Only errors
error_logs = client.executions.logs(
    execution_id="exec_abc123",
    level="error"
)

# Specific step
step_logs = client.executions.logs(
    execution_id="exec_abc123",
    step_id="deploy"
)

# Time range
recent_logs = client.executions.logs(
    execution_id="exec_abc123",
    since="5m"  # Last 5 minutes
)

Streaming Logs

# Stream logs in real-time
logs = client.executions.logs(
    execution_id="exec_abc123",
    follow=True
)

for log in logs:
    print(f"{log.message}")
    if log.level == "error":
        # Handle errors immediately (alert_team is a placeholder for
        # your own alerting hook)
        alert_team(log.message)

Managing Executions

Cancelling Executions

# Cancel running execution
client.executions.cancel(execution_id="exec_abc123")

# Verify cancellation
execution = client.executions.get("exec_abc123")
print(execution.status)  # "cancelled"

Retrying Failed Executions

# Retry with same parameters
new_execution = client.executions.retry(execution_id="exec_abc123")

# Retry with different parameters
new_execution = client.executions.retry(
    execution_id="exec_abc123",
    params={
        "cluster_id": "cluster_backup"  # Try different cluster
    }
)

Execution Callbacks

execution = client.executions.create(
    pipeline_id="pipe_abc123",
    callbacks={
        "on_complete": "https://api.example.com/webhook/complete",
        "on_failure": "https://api.example.com/webhook/failure"
    }
)

# Webhooks receive POST requests with execution data:
# {
#   "execution_id": "exec_abc123",
#   "status": "completed",
#   "outputs": {...}
# }
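
A minimal receiver for these callbacks might look like this (a sketch using Flask, which is not part of the SDK; the route path matches the on_complete URL registered above):

from flask import Flask, request

app = Flask(__name__)

@app.route("/webhook/complete", methods=["POST"])
def on_complete():
    payload = request.get_json()
    # Payload fields follow the shape shown above
    print(f"Execution {payload['execution_id']}: {payload['status']}")
    return "", 204

if __name__ == "__main__":
    app.run(port=8000)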

Execution Tags

execution = client.executions.create(
    pipeline_id="pipe_abc123",
    tags=["production", "customer-123", "v1.2.0"]
)

# Search by tags
customer_runs = client.executions.list(tags=["customer-123"])

Ephemeral Pipelines

Execute pipelines without persisting them:

# One-time execution
execution = client.executions.run_ephemeral(
    pipeline_definition={
        "steps": [
            {
                "id": "quick_deploy",
                "action": "deployment_create",
                "params": {
                    "model_id": "model_xyz",
                    "cluster_id": "cluster_prod"
                }
            }
        ]
    }
)

Pipeline Templates

Use templates for common workflows:

# Create template
template = {
    "steps": [
        {
            "id": "add_model",
            "action": "model_add",
            "params": {
                "model_uri": "{{params.model_uri}}",
                "model_source": "{{params.model_source}}"
            }
        },
        {
            "id": "deploy",
            "action": "deployment_create",
            "params": {
                "model_id": "{{steps.add_model.output.model_id}}",
                "cluster_id": "{{params.cluster_id}}"
            },
            "depends_on": ["add_model"]
        }
    ]
}

# Reuse template with different parameters
execution1 = client.executions.run_ephemeral(
    pipeline_definition=template,
    params={
        "model_uri": "meta-llama/Llama-3.2-1B",
        "model_source": "hugging_face",
        "cluster_id": "cluster_prod"
    }
)

execution2 = client.executions.run_ephemeral(
    pipeline_definition=template,
    params={
        "model_uri": "mistralai/Mistral-7B",
        "model_source": "hugging_face",
        "cluster_id": "cluster_dev"
    }
)

Batch Executions

Run multiple executions in parallel:

pipeline_id = "pipe_abc123"
param_sets = [
    {"cluster_id": "cluster_1", "model_id": "model_1"},
    {"cluster_id": "cluster_2", "model_id": "model_2"},
    {"cluster_id": "cluster_3", "model_id": "model_3"},
]

executions = []
for params in param_sets:
    execution = client.executions.create(
        pipeline_id=pipeline_id,
        params=params,
        wait=False
    )
    executions.append(execution)

# Wait for all to complete
import time
while any(e.status in ["pending", "running"] for e in executions):
    time.sleep(5)
    executions = [client.executions.get(e.id) for e in executions]

# Check results
for execution in executions:
    print(f"{execution.id}: {execution.status}")

Execution Metrics

execution = client.executions.get("exec_abc123")

# Performance metrics
print(f"Duration: {execution.duration_seconds}s")
print(f"Steps completed: {execution.completed_steps}/{execution.total_steps}")

# Resource usage
print(f"CPU time: {execution.cpu_seconds}s")
print(f"Memory peak: {execution.peak_memory_mb} MB")

# Calculate success rate
history = client.executions.list(
    pipeline_id="pipe_abc123",
    limit=10,
    sort_by="created_at",
    sort_order="desc"
)

total = len(history)
successful = len([e for e in history if e.status == "completed"])
success_rate = (successful / total) * 100 if total else 0.0

print(f"Success rate: {success_rate:.1f}%")

Best Practices

  • Use descriptive IDs: Name steps clearly for easier debugging
  • Add logging: Include log actions for visibility
  • Handle failures: Use conditional branching for error paths
  • Parameterize: Make pipelines reusable with params
  • Test incrementally: Test each step before adding the next
  • Monitor executions: Set up callbacks and tags for tracking
  • Clean up: Delete old executions to save storage (see the sketch below)
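
A sketch for the clean-up point, assuming the SDK exposes a client.executions.delete method (hypothetical; it is not shown in this guide):

# Remove old failed runs to save storage
# (client.executions.delete is an assumed method, not documented above)
for execution in client.executions.list(status="failed"):
    client.executions.delete(execution_id=execution.id)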

Error Handling

from budai import NotFoundError, TimeoutError

try:
    execution = client.executions.create(
        pipeline_id="pipe_abc123",
        wait=True,
        timeout=60
    )
except TimeoutError:
    print("Execution timed out while waiting")
    # The run may still be in progress server-side; look up the most
    # recent execution for this pipeline to check on it
    recent = client.executions.list(
        pipeline_id="pipe_abc123",
        limit=1,
        sort_by="created_at",
        sort_order="desc"
    )
    if recent:
        print(f"Current status: {recent[0].status}")
except NotFoundError:
    print("Pipeline not found")

Next Steps