Chat Completions
Basic Chat
Copy
from bud import BudClient

# Construct the client with no explicit arguments.
# NOTE(review): presumably credentials are picked up from the environment — confirm.
client = BudClient()

# The conversation: a system instruction followed by the user's question.
conversation = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Explain quantum computing"},
]

response = client.chat.completions.create(model="gpt-4", messages=conversation)

# Print the message text of the first returned choice.
print(response.choices[0].message.content)
Streaming Chat
Copy
# Request a streamed completion; `client` is the BudClient created earlier.
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Write a story"}],
    stream=True,
)

# Print each non-empty content delta as it arrives, without adding newlines.
for chunk in stream:
    delta_text = chunk.choices[0].delta.content
    # Chunks whose delta carries no content (falsy) are skipped.
    if delta_text:
        # flush=True so partial output is shown immediately.
        print(delta_text, end="", flush=True)
Function Calling
Copy
# Tool definitions offered to the model: a single `get_weather` function
# whose parameters are described with a JSON-Schema-style object.
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"},
                    "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
                },
                # Only `location` is mandatory; `unit` may be omitted.
                "required": ["location"],
            },
        },
    }
]

response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "What's the weather in Paris?"}],
    tools=tools,
    tool_choice="auto",
)

# Report every tool call requested in the first choice, if there are any.
tool_calls = response.choices[0].message.tool_calls
if tool_calls:
    for tool_call in tool_calls:
        print(f"Function: {tool_call.function.name}")
        # NOTE(review): arguments are printed as returned; their exact type
        # (e.g. a JSON string) is not visible here — confirm before parsing.
        print(f"Arguments: {tool_call.function.arguments}")
Embeddings
Single Text
Copy
# Embed one piece of text; `client` is the BudClient created earlier.
text = "Machine learning is transforming industries"
response = client.embeddings.create(model="text-embedding-3-small", input=text)

# The vector for the single input is the first entry of `response.data`.
embedding = response.data[0].embedding
print(f"Embedding dimension: {len(embedding)}")
Batch Embeddings
Copy
# Several inputs are embedded in a single request by passing a list.
texts = [
    "First document about AI",
    "Second document about ML",
    "Third document about data science",
]

response = client.embeddings.create(
    model="text-embedding-3-small",
    input=texts,
)

# Report the vector length of each returned item, numbered from 0.
for i, item in enumerate(response.data):
    print(f"Document {i}: {len(item.embedding)} dimensions")
With Chunking
Copy
# Chunking options passed along with the embedding request.
# NOTE(review): presumably chunk_size / chunk_overlap are measured in tokens
# because strategy is "token" — confirm against the SDK reference.
chunking_options = {
    "enabled": True,
    "strategy": "token",
    "chunk_size": 512,
    "chunk_overlap": 50,
}

response = client.embeddings.create(
    model="text-embedding-3-small",
    input="Very long document text...",
    chunking=chunking_options,
)
Pipelines
Simple Pipeline
Copy
from bud import Pipeline, Action

# Declare a two-step pipeline: "end" is ordered after "start".
with Pipeline("hello-world") as p:
    start = Action("start", type="log").with_config(
        message="Starting pipeline"
    )
    end = Action("end", type="log").after(start).with_config(
        message="Pipeline completed"
    )

# Register the pipeline using the DAG and name taken from the builder.
# `client` is the BudClient created earlier.
pipeline = client.pipelines.create(dag=p.to_dag(), name=p.name)
Data Processing Pipeline
Copy
# A linear four-step pipeline: fetch -> validate -> transform -> save.
with Pipeline("process-data") as p:
    # Fetch data from API
    fetch = Action("fetch", type="api_call").with_config(
        url="https://api.example.com/data",
        method="GET"
    )

    # Validate data
    validate = Action("validate", type="condition").after(fetch).with_config(
        condition="data.count > 0"
    )

    # Transform data
    transform = Action("transform", type="transform").after(validate).with_config(
        operation="normalize"
    )

    # Save to database
    save = Action("save", type="database").after(transform).with_config(
        table="processed_data",
        operation="insert"
    )

# Register the pipeline with a human-readable description.
pipeline = client.pipelines.create(
    dag=p.to_dag(),
    name=p.name,
    description="Fetch, validate, transform, and save data"
)
Parallel Processing
Copy
# Fan-out / fan-in: three tasks depend only on "start"; "end" depends on all three.
with Pipeline("parallel-workflow") as p:
    start = Action("start", type="log")

    # Run these in parallel
    task1 = Action("task1", type="process").after(start)
    task2 = Action("task2", type="process").after(start)
    task3 = Action("task3", type="process").after(start)

    # Wait for all to complete
    end = Action("end", type="log").after(task1, task2, task3)

# Register the pipeline using the DAG and name taken from the builder.
pipeline = client.pipelines.create(dag=p.to_dag(), name=p.name)
Executions
Create and Monitor Execution
Copy
import time

# Create execution
execution = client.executions.create(
    pipeline_id="pipeline-123",
    params={"input": "data.csv"}
)
print(f"Execution ID: {execution.id}")
print(f"Status: {execution.status}")

# Poll for completion: re-fetch the execution every 2 seconds for as long as
# it reports "pending" or "running", printing each observed status.
# NOTE(review): there is no timeout — if the execution never leaves these two
# states this loop runs forever; consider adding a deadline in real code.
while execution.status in ["pending", "running"]:
    time.sleep(2)
    execution = client.executions.get(execution.id)
    print(f"Status: {execution.status}")

print(f"Final status: {execution.status}")
Run and Wait
Copy
# Run ephemeral pipeline and wait for completion
run_params = {"input": "data.csv"}
result = client.executions.run_ephemeral(
    pipeline_id="pipeline-123",
    params=run_params,
    wait=True,
)

# Print whatever the call returned.
print(f"Result: {result}")
List Recent Executions
Copy
# Get recent executions: ask for at most 10 with status "completed".
executions = client.executions.list(
    status="completed",
    limit=10
)

# One line per execution: id, status and creation timestamp.
for execution in executions:
    print(f"{execution.id}: {execution.status} - {execution.created_at}")
Async Client
Async Chat Completion
Copy
import asyncio

from bud import AsyncBudClient


async def main():
    """Send one chat message with the async client and print the reply."""
    # The client is used as an async context manager for the request's duration.
    async with AsyncBudClient() as client:
        response = await client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": "Hello!"}]
        )
        print(response.choices[0].message.content)


asyncio.run(main())
Concurrent Requests
Copy
async def process_multiple():
    """Issue five chat requests concurrently and print each reply.

    Relies on `asyncio` and `AsyncBudClient` having been imported already.
    """
    async with AsyncBudClient() as client:
        # Build the request coroutines first; none is awaited individually.
        tasks = [
            client.chat.completions.create(
                model="gpt-4",
                messages=[{"role": "user", "content": f"Question {i}"}]
            )
            for i in range(5)
        ]

        # gather() awaits them together and returns results in input order.
        responses = await asyncio.gather(*tasks)

        for i, response in enumerate(responses):
            print(f"Response {i}: {response.choices[0].message.content}")


asyncio.run(process_multiple())
Error Handling
Copy
from bud import BudClient
from bud.exceptions import BudAPIError, AuthenticationError

client = BudClient()

try:
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Hello"}]
    )
except AuthenticationError as e:
    # Handled first so authentication failures get their own message.
    # NOTE(review): presumably AuthenticationError subclasses BudAPIError,
    # which would make this ordering required — confirm.
    print(f"Authentication failed: {e}")
except BudAPIError as e:
    # API-level failure: report the status code and message carried by the error.
    print(f"API error: {e.status_code} - {e.message}")
except Exception as e:
    # Last-resort handler for anything else; the error is printed, not re-raised.
    print(f"Unexpected error: {e}")
Complete Application Example
Copy
import os

from bud import BudClient, Pipeline, Action

# Initialize client
# The key is read from BUD_API_KEY; a missing variable raises KeyError here.
client = BudClient(api_key=os.environ["BUD_API_KEY"])

# Define data processing pipeline: fetch -> sentiment -> insights -> save.
with Pipeline("customer-insights") as p:
    # Fetch customer data
    fetch = Action("fetch-data", type="api_call").with_config(
        url="https://api.example.com/customers",
        method="GET"
    )

    # Analyze sentiment
    analyze = Action("sentiment", type="classification").after(fetch).with_config(
        model="sentiment-analyzer",
        field="feedback"
    )

    # Generate insights with LLM
    # NOTE(review): "{{feedback}}" looks like a template placeholder resolved
    # by the pipeline at run time — confirm the templating syntax.
    insights = Action("generate-insights", type="chat").after(analyze).with_config(
        model="gpt-4",
        prompt="Analyze this customer feedback: {{feedback}}"
    )

    # Save results
    save = Action("save", type="database").after(insights).with_config(
        table="customer_insights"
    )

# Register pipeline
pipeline = client.pipelines.create(
    dag=p.to_dag(),
    name=p.name,
    description="Automated customer insights generation"
)

# Execute pipeline using the id of the pipeline that was just registered.
execution = client.executions.create(
    pipeline_id=pipeline.id,
    params={"customer_id": "12345"}
)
print(f"Pipeline execution started: {execution.id}")