Skip to main content

Chat Completions

Basic Chat

from bud import BudClient

# Create a client; no arguments are passed in this example.
client = BudClient()

# The conversation is a list of role/content messages: a system message
# followed by the user's question.
conversation = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Explain quantum computing"},
]

completion = client.chat.completions.create(
    model="gpt-4",
    messages=conversation,
)

# Print the text of the first returned choice.
answer = completion.choices[0].message.content
print(answer)

Streaming Chat

# Request a streamed completion: with stream=True the result is iterated
# chunk by chunk below instead of being read as one finished response.
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a story"}],
    stream=True,
)

for chunk in stream:
    # OpenAI-compatible streams can emit chunks whose `choices` list is empty
    # (for example a trailing usage-only chunk); indexing choices[0] on such
    # a chunk would raise IndexError, so skip them.
    if not chunk.choices:
        continue

    # The content of a delta may be empty or missing; print only real text.
    piece = chunk.choices[0].delta.content
    if piece:
        print(piece, end="", flush=True)

Function Calling

# JSON-Schema-style description of the arguments accepted by get_weather:
# "location" is required, "unit" is optional and limited to two values.
weather_parameters = {
    "type": "object",
    "properties": {
        "location": {"type": "string"},
        "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
    },
    "required": ["location"],
}

# A single function-type tool definition wrapping the schema above.
get_weather_tool = {
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get current weather",
        "parameters": weather_parameters,
    },
}

# The list of tools offered to the model.
tools = [get_weather_tool]

# Ask a question and pass the tool definitions with tool_choice="auto".
weather_question = [{"role": "user", "content": "What's the weather in Paris?"}]

response = client.chat.completions.create(
    model="gpt-4",
    messages=weather_question,
    tools=tools,
    tool_choice="auto",
)

# tool_calls is checked for truthiness first, so nothing is printed when the
# first choice carries no tool calls.
requested_calls = response.choices[0].message.tool_calls
if requested_calls:
    for call in requested_calls:
        print(f"Function: {call.function.name}")
        print(f"Arguments: {call.function.arguments}")

Embeddings

Single Text

# Embed one string; the vector is read from the first item of response.data.
sentence = "Machine learning is transforming industries"

response = client.embeddings.create(
    model="text-embedding-3-small",
    input=sentence,
)

first_item = response.data[0]
embedding = first_item.embedding
print(f"Embedding dimension: {len(embedding)}")

Batch Embeddings

# Several inputs can be embedded in one request by passing a list.
texts = [
    "First document about AI",
    "Second document about ML",
    "Third document about data science",
]

response = client.embeddings.create(
    model="text-embedding-3-small",
    input=texts,
)

# Report the vector length of each returned item, numbered from 0.
for index, embedded in enumerate(response.data):
    print(f"Document {index}: {len(embedded.embedding)} dimensions")

With Chunking

# Chunking options passed with the request.
# NOTE(review): with strategy "token", chunk_size and chunk_overlap are
# presumably measured in tokens - confirm against the SDK reference.
chunking_options = {
    "enabled": True,
    "strategy": "token",
    "chunk_size": 512,
    "chunk_overlap": 50,
}

response = client.embeddings.create(
    model="text-embedding-3-small",
    input="Very long document text...",
    chunking=chunking_options,
)

Pipelines

Simple Pipeline

from bud import Pipeline, Action

# Two log actions are declared inside the Pipeline context; `end` is chained
# after `start` with .after(...).
with Pipeline("hello-world") as pipe:
    start = (
        Action("start", type="log")
        .with_config(message="Starting pipeline")
    )

    end = (
        Action("end", type="log")
        .after(start)
        .with_config(message="Pipeline completed")
    )

# Register the pipeline using its DAG and the name given to Pipeline(...).
pipeline = client.pipelines.create(
    dag=pipe.to_dag(),
    name=pipe.name,
)

Data Processing Pipeline

# A linear four-step pipeline: fetch -> validate -> transform -> save.
# Each step is chained to the previous one with .after(...).
with Pipeline("process-data") as pipe:
    # Step 1: GET the source data from the API.
    fetch = (
        Action("fetch", type="api_call")
        .with_config(url="https://api.example.com/data", method="GET")
    )

    # Step 2: condition action configured with "data.count > 0".
    validate = (
        Action("validate", type="condition")
        .after(fetch)
        .with_config(condition="data.count > 0")
    )

    # Step 3: transform action using the "normalize" operation.
    transform = (
        Action("transform", type="transform")
        .after(validate)
        .with_config(operation="normalize")
    )

    # Step 4: insert into the processed_data table.
    save = (
        Action("save", type="database")
        .after(transform)
        .with_config(table="processed_data", operation="insert")
    )

# Register the pipeline together with a human-readable description.
pipeline = client.pipelines.create(
    dag=pipe.to_dag(),
    name=pipe.name,
    description="Fetch, validate, transform, and save data",
)

Parallel Processing

# Fan-out / fan-in: three process actions each follow `start`, and `end`
# follows all three of them.
with Pipeline("parallel-workflow") as pipe:
    start = Action("start", type="log")

    # The three tasks depend only on `start`, so they can run in parallel.
    # They are created in the order task1, task2, task3.
    task1, task2, task3 = [
        Action(task_name, type="process").after(start)
        for task_name in ("task1", "task2", "task3")
    ]

    # `end` waits for every task to complete.
    end = Action("end", type="log").after(task1, task2, task3)

pipeline = client.pipelines.create(
    dag=pipe.to_dag(),
    name=pipe.name,
)

Executions

Create and Monitor Execution

import time

# Start an execution of pipeline "pipeline-123" with one input parameter.
execution = client.executions.create(
    pipeline_id="pipeline-123",
    params={"input": "data.csv"},
)

print(f"Execution ID: {execution.id}")
print(f"Status: {execution.status}")

# Re-fetch the execution every 2 seconds for as long as its status is
# "pending" or "running"; any other status ends the loop.
while True:
    if execution.status not in ("pending", "running"):
        break
    time.sleep(2)
    execution = client.executions.get(execution.id)
    print(f"Status: {execution.status}")

print(f"Final status: {execution.status}")

Run and Wait

# Run the pipeline ephemerally; wait=True asks the call to wait for
# completion before returning.
run_params = {"input": "data.csv"}

result = client.executions.run_ephemeral(
    pipeline_id="pipeline-123",
    params=run_params,
    wait=True,
)

print(f"Result: {result}")

List Recent Executions

# Fetch up to 10 executions whose status is "completed".
recent_completed = client.executions.list(status="completed", limit=10)

# One line per execution: id, status and creation time.
for execution in recent_completed:
    summary = f"{execution.id}: {execution.status} - {execution.created_at}"
    print(summary)

Async Client

Async Chat Completion

import asyncio
from bud import AsyncBudClient

async def main():
    """Send one chat message with the async client and print the reply."""
    # The client is used as an async context manager for the whole request.
    async with AsyncBudClient() as async_client:
        greeting = [{"role": "user", "content": "Hello!"}]
        response = await async_client.chat.completions.create(
            model="gpt-4",
            messages=greeting,
        )
        reply = response.choices[0].message.content
        print(reply)


asyncio.run(main())

Concurrent Requests

async def process_multiple():
    """Issue five chat requests concurrently and print each reply in order."""
    async with AsyncBudClient() as async_client:

        def ask(number):
            # Build the request for one question; it is not awaited here.
            return async_client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": f"Question {number}"}],
            )

        pending = [ask(number) for number in range(5)]

        # gather() awaits all requests and returns results in input order.
        answers = await asyncio.gather(*pending)

        for number, answer in enumerate(answers):
            print(f"Response {number}: {answer.choices[0].message.content}")


asyncio.run(process_multiple())

Error Handling

from bud import BudClient
from bud.exceptions import BudAPIError, AuthenticationError

client = BudClient()

greeting = [{"role": "user", "content": "Hello"}]

# Handlers run from most specific to most general.
# NOTE(review): AuthenticationError is presumably a subclass of BudAPIError,
# which is why it is listed first - confirm against bud.exceptions.
try:
    response = client.chat.completions.create(
        model="gpt-4",
        messages=greeting,
    )
except AuthenticationError as auth_error:
    print(f"Authentication failed: {auth_error}")
except BudAPIError as api_error:
    # Reports the status_code and message attributes of the error.
    print(f"API error: {api_error.status_code} - {api_error.message}")
except Exception as unexpected:
    # Catch-all for anything that is not one of the SDK errors above.
    print(f"Unexpected error: {unexpected}")

Complete Application Example

from bud import BudClient, Pipeline, Action
import os

# Read the API key from the environment and pass it to the client.
# A missing BUD_API_KEY variable raises KeyError here.
api_key = os.environ["BUD_API_KEY"]
client = BudClient(api_key=api_key)

# Customer-insights pipeline: fetch -> sentiment -> generate-insights -> save.
# Each action is chained to the previous one with .after(...).
with Pipeline("customer-insights") as p:
    # Step 1: GET the customer records.
    fetch = (
        Action("fetch-data", type="api_call")
        .with_config(url="https://api.example.com/customers", method="GET")
    )

    # Step 2: classification with "sentiment-analyzer" on the "feedback" field.
    analyze = (
        Action("sentiment", type="classification")
        .after(fetch)
        .with_config(model="sentiment-analyzer", field="feedback")
    )

    # Step 3: chat action; the prompt contains a {{feedback}} placeholder.
    insights = (
        Action("generate-insights", type="chat")
        .after(analyze)
        .with_config(
            model="gpt-4",
            prompt="Analyze this customer feedback: {{feedback}}",
        )
    )

    # Step 4: database action targeting the customer_insights table.
    save = (
        Action("save", type="database")
        .after(insights)
        .with_config(table="customer_insights")
    )

# Register the pipeline from its DAG, its name and a description.
pipeline = client.pipelines.create(
    dag=p.to_dag(),
    name=p.name,
    description="Automated customer insights generation",
)

# Start an execution of the registered pipeline, addressed by its id.
run_params = {"customer_id": "12345"}
execution = client.executions.create(
    pipeline_id=pipeline.id,
    params=run_params,
)

print(f"Pipeline execution started: {execution.id}")