> ## Documentation Index
> Fetch the complete documentation index at: https://docs.budecosystem.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Pipeline DSL

> Build pipelines with Pythonic DSL instead of JSON definitions

## Overview

The Pipeline DSL provides a Pythonic way to build pipelines with type safety and IDE autocomplete.

## Basic Pipeline

Instead of JSON:

```python theme={null}
# JSON definition (verbose)
pipeline = client.pipelines.create(
    name="Deploy",
    definition={
        "steps": [
            {
                "id": "add_model",
                "action": "model_add",
                "params": {"model_uri": "..."}
            }
        ]
    }
)
```

Use DSL:

```python theme={null}
from budai.dsl import Pipeline, Step

# Pythonic DSL
with Pipeline(name="Deploy") as p:
    add_model = Step.model_add(
        model_uri="meta-llama/Llama-3.2-1B-Instruct",
        model_source="hugging_face"
    )

    deploy = Step.deployment_create(
        model_id=add_model.output.model_id,
        cluster_id="cluster_prod"
    ).depends_on(add_model)

pipeline = client.pipelines.create_from_dsl(p)
```

## Step Actions

### Control Flow

```python theme={null}
from budai.dsl import Step

# Log
log = Step.log(
    message="Starting deployment",
    level="info"
)

# Conditional
condition = Step.conditional(
    condition=lambda: deploy.output.status == "active"
)

# Transform
transform = Step.transform(
    operation="json_extract",
    input=api_call.output,
    template="{{ data.results }}"
)

# Set output
output = Step.set_output(
    key="deployment_url",
    value=deploy.output.endpoint_url
)

# Fail
fail = Step.fail(
    message="Deployment failed",
    error_code="DEPLOY_ERROR"
)
```

### Model Operations

```python theme={null}
# Add model from HuggingFace
add_hf = Step.model_add(
    model_uri="meta-llama/Llama-3.2-1B",
    model_source="hugging_face"
)

# Add cloud model
add_cloud = Step.add_cloud_model(
    provider="openai",
    model_id="gpt-4o",
    api_key=secrets.OPENAI_KEY
)

# Benchmark model
benchmark = Step.model_benchmark(
    model_id=add_hf.output.model_id,
    test_prompts=["Hello", "Test"],
    metrics=["latency", "throughput"]
)

# Delete model
delete = Step.model_delete(
    model_id=old_model.output.model_id
)
```

### Deployment Operations

```python theme={null}
# Create deployment
deploy = Step.deployment_create(
    model_id=add_model.output.model_id,
    cluster_id="cluster_prod",
    deployment_name="llama-inference",
    replicas=2
)

# Scale deployment
scale = Step.deployment_scale(
    deployment_id=deploy.output.deployment_id,
    replicas=5
)

# Configure rate limit
rate_limit = Step.configure_rate_limiting(
    deployment_id=deploy.output.deployment_id,
    rate_limit=100,
    time_window="minute"
)

# Delete deployment
delete_deploy = Step.deployment_delete(
    deployment_id=old_deploy.output.deployment_id
)
```

### Cluster Operations

```python theme={null}
# Health check
health = Step.cluster_health(
    cluster_id="cluster_prod",
    timeout_seconds=30
)

# Conditional on health
deploy = Step.deployment_create(...).depends_on(health).when(
    lambda: health.output.status == "healthy"
)
```

### Integration

```python theme={null}
# HTTP request
webhook = Step.http_request(
    url="https://api.example.com/webhook",
    method="POST",
    headers={"Content-Type": "application/json"},
    body={"event": "deployment_complete"}
)

# Notification
notify = Step.notification(
    message=f"Deployment {deploy.output.deployment_id} complete",
    channel="slack",
    priority="normal"
)
```

## Dependencies

### Linear Dependencies

```python theme={null}
with Pipeline(name="Linear Flow") as p:
    step1 = Step.log(message="Step 1")
    step2 = Step.log(message="Step 2").depends_on(step1)
    step3 = Step.log(message="Step 3").depends_on(step2)
```

### Multiple Dependencies

```python theme={null}
with Pipeline(name="Fan-in") as p:
    health = Step.cluster_health(cluster_id="cluster_prod")
    check_model = Step.log(message="Check model")

    # Deploy depends on both
    deploy = Step.deployment_create(...).depends_on(health, check_model)
```

### Parallel Execution

```python theme={null}
with Pipeline(name="Parallel") as p:
    # These run in parallel
    deploy1 = Step.deployment_create(model_id="model_1", cluster_id="cluster_1")
    deploy2 = Step.deployment_create(model_id="model_2", cluster_id="cluster_2")
    deploy3 = Step.deployment_create(model_id="model_3", cluster_id="cluster_3")

    # Wait for all to complete
    notify = Step.notification(
        message="All deployments complete"
    ).depends_on(deploy1, deploy2, deploy3)
```

## Conditional Execution

### When Clause

```python theme={null}
with Pipeline(name="Conditional Deploy") as p:
    health = Step.cluster_health(cluster_id="cluster_prod")

    # Only deploy if healthy
    deploy = Step.deployment_create(...).when(
        lambda: health.output.status == "healthy"
    )

    # Alert if unhealthy
    alert = Step.notification(
        message="Cluster unhealthy!"
    ).when(
        lambda: health.output.status != "healthy"
    )
```

### Unless Clause

```python theme={null}
# Execute unless condition is true
deploy = Step.deployment_create(...).unless(
    lambda: health.output.status == "unhealthy"
)
```

## Output References

### Accessing Step Outputs

```python theme={null}
with Pipeline(name="Output Chaining") as p:
    add_model = Step.model_add(...)

    # Reference output directly
    deploy = Step.deployment_create(
        model_id=add_model.output.model_id,  # Type-safe reference
        cluster_id="cluster_prod"
    )

    # Set pipeline output
    Step.set_output(
        key="endpoint",
        value=deploy.output.endpoint_url
    )
```

### Output Types

```python theme={null}
# IDE knows output structure
add_model = Step.model_add(...)

# Autocompletes: model_id, model_name, size_bytes
model_id: str = add_model.output.model_id

deploy = Step.deployment_create(...)

# Autocompletes: deployment_id, endpoint_url, status
endpoint: str = deploy.output.endpoint_url
```

## Error Handling

### Retry Configuration

```python theme={null}
with Pipeline(name="Resilient") as p:
    deploy = Step.deployment_create(...).retry(
        max_attempts=3,
        backoff_multiplier=2
    )
```

### Failure Paths

```python theme={null}
with Pipeline(name="Error Handling") as p:
    deploy = Step.deployment_create(...)

    # Success path
    notify_success = Step.notification(
        message="Deploy succeeded"
    ).when(lambda: deploy.status == "completed")

    # Failure path
    notify_failure = Step.notification(
        message="Deploy failed",
        priority="high"
    ).when(lambda: deploy.status == "failed")
```

## Complex Example

Full deployment workflow with DSL:

```python theme={null}
from budai import BudClient
from budai.dsl import Pipeline, Step

client = BudClient(api_key="your-key")

with Pipeline(name="Production Deploy", description="Full deployment workflow") as p:

    # 1. Verify cluster health
    health = Step.cluster_health(
        cluster_id="cluster_prod",
        timeout_seconds=30
    )

    # 2. Add model (only if cluster healthy)
    add_model = Step.model_add(
        model_uri="meta-llama/Llama-3.2-1B-Instruct",
        model_source="hugging_face"
    ).depends_on(health).when(
        lambda: health.output.status == "healthy"
    ).retry(max_attempts=3)

    # 3. Benchmark model
    benchmark = Step.model_benchmark(
        model_id=add_model.output.model_id,
        test_prompts=["Hello", "Test"],
        metrics=["latency", "throughput"]
    ).depends_on(add_model)

    # 4. Deploy model
    deploy = Step.deployment_create(
        model_id=add_model.output.model_id,
        cluster_id="cluster_prod",
        deployment_name="llama-production",
        replicas=3
    ).depends_on(benchmark)

    # 5. Configure autoscaling
    autoscale = Step.deployment_scale(
        deployment_id=deploy.output.deployment_id,
        auto_scaling={
            "min_replicas": 2,
            "max_replicas": 10,
            "target_cpu": 70
        }
    ).depends_on(deploy)

    # 6. Set pipeline outputs
    Step.set_output(
        key="deployment_id",
        value=deploy.output.deployment_id
    )
    Step.set_output(
        key="endpoint_url",
        value=deploy.output.endpoint_url
    )

    # 7. Send success notification
    Step.notification(
        message=f"Deployed {add_model.output.model_name} successfully",
        channel="slack"
    ).depends_on(autoscale)

# Create pipeline from DSL
pipeline = client.pipelines.create_from_dsl(p)

# Execute
execution = client.executions.create(pipeline_id=pipeline.id)
```

## DSL vs JSON Comparison

**JSON** (70 lines):

```python theme={null}
definition = {
    "steps": [
        {
            "id": "add_model",
            "action": "model_add",
            "params": {
                "model_uri": "meta-llama/Llama-3.2-1B",
                "model_source": "hugging_face"
            }
        },
        {
            "id": "deploy",
            "action": "deployment_create",
            "params": {
                "model_id": "{{steps.add_model.output.model_id}}",
                "cluster_id": "cluster_prod"
            },
            "depends_on": ["add_model"]
        }
    ]
}
```

**DSL** (8 lines):

```python theme={null}
with Pipeline(name="Deploy") as p:
    add = Step.model_add(
        model_uri="meta-llama/Llama-3.2-1B",
        model_source="hugging_face"
    )
    deploy = Step.deployment_create(
        model_id=add.output.model_id,
        cluster_id="cluster_prod"
    ).depends_on(add)
```

## Next Steps

<CardGroup cols={2}>
  <Card title="Pipelines" icon="diagram-project" href="/api-sdk/python-sdk/pipelines">
    Pipeline management
  </Card>

  <Card title="Advanced" icon="gear" href="/api-sdk/python-sdk/advanced">
    Production patterns
  </Card>
</CardGroup>
