Overview

The Pipeline DSL provides a Pythonic way to build pipelines with type safety and IDE autocomplete.

Basic Pipeline

Instead of a raw JSON definition:
# JSON definition (verbose)
pipeline = client.pipelines.create(
    name="Deploy",
    definition={
        "steps": [
            {
                "id": "add_model",
                "action": "model_add",
                "params": {"model_uri": "..."}
            }
        ]
    }
)
Use the DSL instead; steps created inside the with block are attached to the pipeline automatically:
from budai.dsl import Pipeline, Step

# Pythonic DSL
with Pipeline(name="Deploy") as p:
    add_model = Step.model_add(
        model_uri="meta-llama/Llama-3.2-1B-Instruct",
        model_source="hugging_face"
    )

    deploy = Step.deployment_create(
        model_id=add_model.output.model_id,
        cluster_id="cluster_prod"
    ).depends_on(add_model)

pipeline = client.pipelines.create_from_dsl(p)

Step Actions

Control Flow

from budai.dsl import Step

# Log
log = Step.log(
    message="Starting deployment",
    level="info"
)

# Conditional (deploy refers to a step defined earlier in the pipeline)
condition = Step.conditional(
    condition=lambda: deploy.output.status == "active"
)

# Transform (api_call refers to an earlier http_request step)
transform = Step.transform(
    operation="json_extract",
    input=api_call.output,
    template="{{ data.results }}"
)

# Set output
output = Step.set_output(
    key="deployment_url",
    value=deploy.output.endpoint_url
)

# Fail
fail = Step.fail(
    message="Deployment failed",
    error_code="DEPLOY_ERROR"
)
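
Control-flow steps compose like any others. A minimal sketch that fetches data with the http_request step (shown under Integration below), extracts a field, and either fails the run or publishes the result; treating an output reference as truthy in a condition is an assumption here:

with Pipeline(name="Fetch and Extract") as p:
    api_call = Step.http_request(
        url="https://api.example.com/results",
        method="GET"
    )

    # Pull the results array out of the response
    extract = Step.transform(
        operation="json_extract",
        input=api_call.output,
        template="{{ data.results }}"
    ).depends_on(api_call)

    # Abort the run if nothing came back
    Step.fail(
        message="No results returned",
        error_code="EMPTY_RESULTS"
    ).when(lambda: not extract.output)

    # Otherwise expose the extracted data as a pipeline output
    Step.set_output(
        key="results",
        value=extract.output
    ).unless(lambda: not extract.output)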

Model Operations

# Add model from HuggingFace
add_hf = Step.model_add(
    model_uri="meta-llama/Llama-3.2-1B",
    model_source="hugging_face"
)

# Add cloud model (secrets stands in for your secret store here)
add_cloud = Step.add_cloud_model(
    provider="openai",
    model_id="gpt-4o",
    api_key=secrets.OPENAI_KEY
)

# Benchmark model
benchmark = Step.model_benchmark(
    model_id=add_hf.output.model_id,
    test_prompts=["Hello", "Test"],
    metrics=["latency", "throughput"]
)

# Delete model (old_model refers to a step defined earlier)
delete = Step.model_delete(
    model_id=old_model.output.model_id
)
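
These steps chain naturally: add a model, benchmark it, and gate cleanup on the results. A sketch, assuming the benchmark step exposes its measured metrics on its output (latency_ms is an illustrative field name):

with Pipeline(name="Add and Vet Model") as p:
    add = Step.model_add(
        model_uri="meta-llama/Llama-3.2-1B",
        model_source="hugging_face"
    )

    bench = Step.model_benchmark(
        model_id=add.output.model_id,
        test_prompts=["Hello", "Test"],
        metrics=["latency", "throughput"]
    ).depends_on(add)

    # Remove the model again if it benchmarks too slowly
    Step.model_delete(
        model_id=add.output.model_id
    ).when(lambda: bench.output.latency_ms > 500)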

Deployment Operations

# Create deployment
deploy = Step.deployment_create(
    model_id=add_model.output.model_id,
    cluster_id="cluster_prod",
    deployment_name="llama-inference",
    replicas=2
)

# Scale deployment
scale = Step.deployment_scale(
    deployment_id=deploy.output.deployment_id,
    replicas=5
)

# Configure rate limit
rate_limit = Step.configure_rate_limiting(
    deployment_id=deploy.output.deployment_id,
    rate_limit=100,
    time_window="minute"
)

# Delete deployment (old_deploy refers to a step defined earlier)
delete_deploy = Step.deployment_delete(
    deployment_id=old_deploy.output.deployment_id
)
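
Together these support a simple rollover: stand up the new deployment first, then retire the old one only once the new one reports active. A sketch, with the old deployment's ID hard-coded for illustration:

with Pipeline(name="Rollover") as p:
    new_deploy = Step.deployment_create(
        model_id="model_new",
        cluster_id="cluster_prod",
        deployment_name="llama-inference-v2",
        replicas=2
    )

    # Retire the previous deployment only after the new one is live
    Step.deployment_delete(
        deployment_id="dep_old_123"  # illustrative ID
    ).depends_on(new_deploy).when(
        lambda: new_deploy.output.status == "active"
    )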

Cluster Operations

# Health check
health = Step.cluster_health(
    cluster_id="cluster_prod",
    timeout_seconds=30
)

# Conditional on health
deploy = Step.deployment_create(...).depends_on(health).when(
    lambda: health.output.status == "healthy"
)

Integration

# HTTP request
webhook = Step.http_request(
    url="https://api.example.com/webhook",
    method="POST",
    headers={"Content-Type": "application/json"},
    body={"event": "deployment_complete"}
)

# Notification
notify = Step.notification(
    message=f"Deployment {deploy.output.deployment_id} complete",
    channel="slack",
    priority="normal"
)
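
Integration steps usually sit at the end of a dependency chain so they can report on earlier work. A sketch that posts a deployment's details to a webhook once it completes (embedding an output reference in the request body is an assumption):

with Pipeline(name="Report") as p:
    deploy = Step.deployment_create(
        model_id="model_123",
        cluster_id="cluster_prod"
    )

    Step.http_request(
        url="https://api.example.com/webhook",
        method="POST",
        headers={"Content-Type": "application/json"},
        body={
            "event": "deployment_complete",
            "deployment_id": deploy.output.deployment_id
        }
    ).depends_on(deploy)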

Dependencies

Linear Dependencies

with Pipeline(name="Linear Flow") as p:
    step1 = Step.log(message="Step 1")
    step2 = Step.log(message="Step 2").depends_on(step1)
    step3 = Step.log(message="Step 3").depends_on(step2)

Multiple Dependencies

with Pipeline(name="Fan-in") as p:
    health = Step.cluster_health(cluster_id="cluster_prod")
    check_model = Step.log(message="Check model")

    # Deploy depends on both
    deploy = Step.deployment_create(...).depends_on(health, check_model)

Parallel Execution

with Pipeline(name="Parallel") as p:
    # These run in parallel
    deploy1 = Step.deployment_create(model_id="model_1", cluster_id="cluster_1")
    deploy2 = Step.deployment_create(model_id="model_2", cluster_id="cluster_2")
    deploy3 = Step.deployment_create(model_id="model_3", cluster_id="cluster_3")

    # Wait for all to complete
    notify = Step.notification(
        message="All deployments complete"
    ).depends_on(deploy1, deploy2, deploy3)
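
Because depends_on is variadic, fan-out/fan-in graphs can also be built in a loop; a sketch, assuming steps created inside ordinary Python collections register with the pipeline as usual:

with Pipeline(name="Parallel Rollout") as p:
    deploys = [
        Step.deployment_create(model_id=f"model_{i}", cluster_id=f"cluster_{i}")
        for i in range(1, 4)
    ]

    # Unpack the list so the notification waits on every deployment
    Step.notification(
        message="All deployments complete"
    ).depends_on(*deploys)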

Conditional Execution

When Clause

with Pipeline(name="Conditional Deploy") as p:
    health = Step.cluster_health(cluster_id="cluster_prod")

    # Only deploy if healthy
    deploy = Step.deployment_create(...).when(
        lambda: health.output.status == "healthy"
    )

    # Alert if unhealthy
    alert = Step.notification(
        message="Cluster unhealthy!"
    ).when(
        lambda: health.output.status != "healthy"
    )

Unless Clause

# Execute unless condition is true
deploy = Step.deployment_create(...).unless(
    lambda: health.output.status == "unhealthy"
)
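
Reading unless as the negation of when, the two gates below should be equivalent (assuming both clauses evaluate their conditions the same way):

# These express the same gate
deploy = Step.deployment_create(...).when(
    lambda: health.output.status != "unhealthy"
)
deploy = Step.deployment_create(...).unless(
    lambda: health.output.status == "unhealthy"
)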

Output References

Accessing Step Outputs

with Pipeline(name="Output Chaining") as p:
    add_model = Step.model_add(...)

    # Reference output directly
    deploy = Step.deployment_create(
        model_id=add_model.output.model_id,  # Type-safe reference
        cluster_id="cluster_prod"
    )

    # Set pipeline output
    Step.set_output(
        key="endpoint",
        value=deploy.output.endpoint_url
    )

Output Types

# IDE knows output structure
add_model = Step.model_add(...)

# Autocompletes: model_id, model_name, size_bytes
model_id: str = add_model.output.model_id

deploy = Step.deployment_create(...)

# Autocompletes: deployment_id, endpoint_url, status
endpoint: str = deploy.output.endpoint_url
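
One way to picture how these references stay type-safe: each action declares its output schema as a typed model, so add_model.output is a typed handle rather than a raw dict. A conceptual sketch, not the SDK's actual internals:

from dataclasses import dataclass

@dataclass
class ModelAddOutput:
    model_id: str
    model_name: str
    size_bytes: int

@dataclass
class DeploymentCreateOutput:
    deployment_id: str
    endpoint_url: str
    status: str

# With output schemas declared like this, an IDE can autocomplete
# add_model.output.model_id and flag typos before the pipeline runs.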

Error Handling

Retry Configuration

with Pipeline(name="Resilient") as p:
    deploy = Step.deployment_create(...).retry(
        max_attempts=3,
        backoff_multiplier=2
    )

Failure Paths

with Pipeline(name="Error Handling") as p:
    deploy = Step.deployment_create(...)

    # Success path (deploy.status is the step's execution status)
    notify_success = Step.notification(
        message="Deploy succeeded"
    ).when(lambda: deploy.status == "completed")

    # Failure path
    notify_failure = Step.notification(
        message="Deploy failed",
        priority="high"
    ).when(lambda: deploy.status == "failed")

Complex Example

Full deployment workflow with DSL:
from budai import BudClient
from budai.dsl import Pipeline, Step

client = BudClient(api_key="your-key")

with Pipeline(name="Production Deploy", description="Full deployment workflow") as p:

    # 1. Verify cluster health
    health = Step.cluster_health(
        cluster_id="cluster_prod",
        timeout_seconds=30
    )

    # 2. Add model (only if cluster healthy)
    add_model = Step.model_add(
        model_uri="meta-llama/Llama-3.2-1B-Instruct",
        model_source="hugging_face"
    ).depends_on(health).when(
        lambda: health.output.status == "healthy"
    ).retry(max_attempts=3)

    # 3. Benchmark model
    benchmark = Step.model_benchmark(
        model_id=add_model.output.model_id,
        test_prompts=["Hello", "Test"],
        metrics=["latency", "throughput"]
    ).depends_on(add_model)

    # 4. Deploy model
    deploy = Step.deployment_create(
        model_id=add_model.output.model_id,
        cluster_id="cluster_prod",
        deployment_name="llama-production",
        replicas=3
    ).depends_on(benchmark)

    # 5. Configure autoscaling
    autoscale = Step.deployment_scale(
        deployment_id=deploy.output.deployment_id,
        auto_scaling={
            "min_replicas": 2,
            "max_replicas": 10,
            "target_cpu": 70
        }
    ).depends_on(deploy)

    # 6. Set pipeline outputs
    Step.set_output(
        key="deployment_id",
        value=deploy.output.deployment_id
    )
    Step.set_output(
        key="endpoint_url",
        value=deploy.output.endpoint_url
    )

    # 7. Send success notification
    Step.notification(
        message=f"Deployed {add_model.output.model_name} successfully",
        channel="slack"
    ).depends_on(autoscale)

# Create pipeline from DSL
pipeline = client.pipelines.create_from_dsl(p)

# Execute
execution = client.executions.create(pipeline_id=pipeline.id)

DSL vs JSON Comparison

JSON (21 lines):
definition = {
    "steps": [
        {
            "id": "add_model",
            "action": "model_add",
            "params": {
                "model_uri": "meta-llama/Llama-3.2-1B",
                "model_source": "hugging_face"
            }
        },
        {
            "id": "deploy",
            "action": "deployment_create",
            "params": {
                "model_id": "{{steps.add_model.output.model_id}}",
                "cluster_id": "cluster_prod"
            },
            "depends_on": ["add_model"]
        }
    ]
}
Equivalent DSL (9 lines):
with Pipeline(name="Deploy") as p:
    add = Step.model_add(
        model_uri="meta-llama/Llama-3.2-1B",
        model_source="hugging_face"
    )
    deploy = Step.deployment_create(
        model_id=add.output.model_id,
        cluster_id="cluster_prod"
    ).depends_on(add)

Next Steps