Overview
The Pipeline DSL provides a Pythonic way to build pipelines with type safety and IDE autocomplete.

Basic Pipeline
Instead of JSON:
# JSON definition (verbose)
pipeline = client.pipelines.create(
    name="Deploy",
    definition={
        "steps": [
            {
                "id": "add_model",
                "action": "model_add",
                "params": {"model_uri": "..."}
            }
        ]
    }
)
With the DSL:
from budai.dsl import Pipeline, Step

# Pythonic DSL
with Pipeline(name="Deploy") as p:
    add_model = Step.model_add(
        model_uri="meta-llama/Llama-3.2-1B-Instruct",
        model_source="hugging_face"
    )
    deploy = Step.deployment_create(
        model_id=add_model.output.model_id,
        cluster_id="cluster_prod"
    ).depends_on(add_model)

pipeline = client.pipelines.create_from_dsl(p)
Step Actions
Control Flow
Copy
from budai.dsl import Step
# Log
log = Step.log(
message="Starting deployment",
level="info"
)
# Conditional
condition = Step.conditional(
condition=lambda: deploy.output.status == "active"
)
# Transform
transform = Step.transform(
operation="json_extract",
input=api_call.output,
template="{{ data.results }}"
)
# Set output
output = Step.set_output(
key="deployment_url",
value=deploy.output.endpoint_url
)
# Fail
fail = Step.fail(
message="Deployment failed",
error_code="DEPLOY_ERROR"
)
Model Operations
Copy
# Add model from HuggingFace
add_hf = Step.model_add(
model_uri="meta-llama/Llama-3.2-1B",
model_source="hugging_face"
)
# Add cloud model
add_cloud = Step.add_cloud_model(
provider="openai",
model_id="gpt-4o",
api_key=secrets.OPENAI_KEY
)
# Benchmark model
benchmark = Step.model_benchmark(
model_id=add_hf.output.model_id,
test_prompts=["Hello", "Test"],
metrics=["latency", "throughput"]
)
# Delete model
delete = Step.model_delete(
model_id=old_model.output.model_id
)
Deployment Operations
Copy
# Create deployment
deploy = Step.deployment_create(
model_id=add_model.output.model_id,
cluster_id="cluster_prod",
deployment_name="llama-inference",
replicas=2
)
# Scale deployment
scale = Step.deployment_scale(
deployment_id=deploy.output.deployment_id,
replicas=5
)
# Configure rate limit
rate_limit = Step.configure_rate_limiting(
deployment_id=deploy.output.deployment_id,
rate_limit=100,
time_window="minute"
)
# Delete deployment
delete_deploy = Step.deployment_delete(
deployment_id=old_deploy.output.deployment_id
)
Cluster Operations
Copy
# Health check
health = Step.cluster_health(
cluster_id="cluster_prod",
timeout_seconds=30
)
# Conditional on health
deploy = Step.deployment_create(...).depends_on(health).when(
lambda: health.output.status == "healthy"
)
Integration
Copy
# HTTP request
webhook = Step.http_request(
url="https://api.example.com/webhook",
method="POST",
headers={"Content-Type": "application/json"},
body={"event": "deployment_complete"}
)
# Notification
notify = Step.notification(
message=f"Deployment {deploy.output.deployment_id} complete",
channel="slack",
priority="normal"
)
Dependencies
Linear Dependencies
Copy
with Pipeline(name="Linear Flow") as p:
    step1 = Step.log(message="Step 1")
    step2 = Step.log(message="Step 2").depends_on(step1)
    step3 = Step.log(message="Step 3").depends_on(step2)
Multiple Dependencies
Copy
with Pipeline(name="Fan-in") as p:
    health = Step.cluster_health(cluster_id="cluster_prod")
    check_model = Step.log(message="Check model")

    # Deploy depends on both
    deploy = Step.deployment_create(...).depends_on(health, check_model)
Parallel Execution
Copy
with Pipeline(name="Parallel") as p:
    # These run in parallel
    deploy1 = Step.deployment_create(model_id="model_1", cluster_id="cluster_1")
    deploy2 = Step.deployment_create(model_id="model_2", cluster_id="cluster_2")
    deploy3 = Step.deployment_create(model_id="model_3", cluster_id="cluster_3")

    # Wait for all to complete
    notify = Step.notification(
        message="All deployments complete"
    ).depends_on(deploy1, deploy2, deploy3)
Conditional Execution
When Clause
Copy
with Pipeline(name="Conditional Deploy") as p:
    health = Step.cluster_health(cluster_id="cluster_prod")

    # Only deploy if healthy
    deploy = Step.deployment_create(...).when(
        lambda: health.output.status == "healthy"
    )

    # Alert if unhealthy
    alert = Step.notification(
        message="Cluster unhealthy!"
    ).when(
        lambda: health.output.status != "healthy"
    )
Unless Clause
Copy
# Execute unless condition is true
deploy = Step.deployment_create(...).unless(
lambda: health.output.status == "unhealthy"
)
Output References
Accessing Step Outputs
Copy
with Pipeline(name="Output Chaining") as p:
    add_model = Step.model_add(...)

    # Reference output directly
    deploy = Step.deployment_create(
        model_id=add_model.output.model_id,  # Type-safe reference
        cluster_id="cluster_prod"
    )

    # Set pipeline output
    Step.set_output(
        key="endpoint",
        value=deploy.output.endpoint_url
    )
Output Types
Copy
# IDE knows output structure
add_model = Step.model_add(...)
# Autocompletes: model_id, model_name, size_bytes
model_id: str = add_model.output.model_id
deploy = Step.deployment_create(...)
# Autocompletes: deployment_id, endpoint_url, status
endpoint: str = deploy.output.endpoint_url
Error Handling
Retry Configuration
Copy
with Pipeline(name="Resilient") as p:
    deploy = Step.deployment_create(...).retry(
        max_attempts=3,
        backoff_multiplier=2
    )
Failure Paths
Copy
with Pipeline(name="Error Handling") as p:
    deploy = Step.deployment_create(...)

    # Success path
    notify_success = Step.notification(
        message="Deploy succeeded"
    ).when(lambda: deploy.status == "completed")

    # Failure path
    notify_failure = Step.notification(
        message="Deploy failed",
        priority="high"
    ).when(lambda: deploy.status == "failed")
Complex Example
Full deployment workflow with DSL:
from budai import BudClient
from budai.dsl import Pipeline, Step

client = BudClient(api_key="your-key")

with Pipeline(name="Production Deploy", description="Full deployment workflow") as p:
    # 1. Verify cluster health
    health = Step.cluster_health(
        cluster_id="cluster_prod",
        timeout_seconds=30
    )

    # 2. Add model (only if cluster healthy)
    add_model = Step.model_add(
        model_uri="meta-llama/Llama-3.2-1B-Instruct",
        model_source="hugging_face"
    ).depends_on(health).when(
        lambda: health.output.status == "healthy"
    ).retry(max_attempts=3)

    # 3. Benchmark model
    benchmark = Step.model_benchmark(
        model_id=add_model.output.model_id,
        test_prompts=["Hello", "Test"],
        metrics=["latency", "throughput"]
    ).depends_on(add_model)

    # 4. Deploy model
    deploy = Step.deployment_create(
        model_id=add_model.output.model_id,
        cluster_id="cluster_prod",
        deployment_name="llama-production",
        replicas=3
    ).depends_on(benchmark)

    # 5. Configure autoscaling
    autoscale = Step.deployment_scale(
        deployment_id=deploy.output.deployment_id,
        auto_scaling={
            "min_replicas": 2,
            "max_replicas": 10,
            "target_cpu": 70
        }
    ).depends_on(deploy)

    # 6. Set pipeline outputs
    Step.set_output(
        key="deployment_id",
        value=deploy.output.deployment_id
    )
    Step.set_output(
        key="endpoint_url",
        value=deploy.output.endpoint_url
    )

    # 7. Send success notification
    Step.notification(
        message=f"Deployed {add_model.output.model_name} successfully",
        channel="slack"
    ).depends_on(autoscale)

# Create pipeline from DSL
pipeline = client.pipelines.create_from_dsl(p)

# Execute
execution = client.executions.create(pipeline_id=pipeline.id)
DSL vs JSON Comparison
JSON (verbose):
definition = {
    "steps": [
        {
            "id": "add_model",
            "action": "model_add",
            "params": {
                "model_uri": "meta-llama/Llama-3.2-1B",
                "model_source": "hugging_face"
            }
        },
        {
            "id": "deploy",
            "action": "deployment_create",
            "params": {
                "model_id": "{{steps.add_model.output.model_id}}",
                "cluster_id": "cluster_prod"
            },
            "depends_on": ["add_model"]
        }
    ]
}
Equivalent DSL:
with Pipeline(name="Deploy") as p:
    add = Step.model_add(
        model_uri="meta-llama/Llama-3.2-1B",
        model_source="hugging_face"
    )
    deploy = Step.deployment_create(
        model_id=add.output.model_id,
        cluster_id="cluster_prod"
    ).depends_on(add)