How to Build Multi-Source Data Agents with Airbyte in 2025
Building AI agents that can query and reason across multiple data sources is a complex problem. Most developers struggle with context management—how do you give an LLM access to data from PostgreSQL, Salesforce, and Stripe simultaneously without overwhelming the model or creating security vulnerabilities?
Airbyte Agents solves this by providing a framework to manage context across multiple data connectors automatically. This guide walks you through building your first multi-source data agent.
The Multi-Source Data Agent Problem
When you build AI agents for data analysis, you hit three major pain points:
- Context Management: Different data sources have different schemas, authentication methods, and rate limits
- Source Switching: Agents struggle to know which tool to use for which query
- Latency: Making individual API calls to each source for every agent reasoning step is slow
Traditional approaches require you to manually define tool definitions for each data source, manage connection state, and handle fallback logic. Airbyte Agents automates the context layer so your agent can focus on reasoning.
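For contrast, the manual approach means hand-writing a tool definition like the following for every table in every source, then keeping each one in sync with schema changes. This example uses Anthropic's tool-use schema and is purely illustrative, not part of Airbyte Agents:
# Manual approach: one hand-maintained tool definition per source (illustrative)
postgres_revenue_tool = {
    "name": "get_customer_revenue",
    "description": "Get total revenue for a customer from the Postgres warehouse",
    "input_schema": {
        "type": "object",
        "properties": {
            "customer_id": {"type": "string", "description": "Customer identifier"},
        },
        "required": ["customer_id"],
    },
}
# ...repeated for every table and source, plus connection, auth, and retry code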
Prerequisites
Before starting, ensure you have:
- Python 3.10 or later
- An Airbyte instance (cloud or self-hosted)
- At least two configured data sources (PostgreSQL, MySQL, API, or SaaS connector)
- OpenAI API key or compatible LLM endpoint
- pip installed with virtual environment support
Step 1: Install Airbyte SDK and Dependencies
Start by setting up your Python environment:
python3 -m venv airbyte-agent-env
source airbyte-agent-env/bin/activate # On Windows: airbyte-agent-env\Scripts\activate
pip install airbyte-sdk anthropic python-dotenv pydantic
The airbyte-sdk package provides the agent framework. We're using Anthropic's Claude for the LLM, but you can swap this for OpenAI's GPT models.
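To confirm the install succeeded, check the package metadata and that the module imports cleanly (airbyte_sdk matches the import name used later in this guide):
pip show airbyte-sdk
python -c "import airbyte_sdk; print('airbyte_sdk imports OK')"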
Step 2: Configure Data Source Credentials
Create a .env file to store your data source credentials securely:
# .env
AIRBYTE_API_URL=https://your-airbyte-instance.com/api/v1
AIRBYTE_API_KEY=your_airbyte_api_key
# Data source credentials
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=analytics_user
POSTGRES_PASSWORD=secure_password
POSTGRES_DATABASE=analytics_db
SALESFORCE_CLIENT_ID=your_sf_client_id
SALESFORCE_CLIENT_SECRET=your_sf_client_secret
ANTHROPIC_API_KEY=your_claude_api_key
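Before committing anything, make sure the .env file stays out of version control so credentials never land in your repository:
echo ".env" >> .gitignore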
Step 3: Initialize Your Agent with Data Source Context
Create the main agent file (data_agent.py):
import os
from dotenv import load_dotenv
from airbyte_sdk import AirbyteAgent, DataSourceConfig
from anthropic import Anthropic
load_dotenv()
# Define your data sources
data_sources = [
DataSourceConfig(
name="postgres_analytics",
type="postgres",
config={
"host": os.getenv("POSTGRES_HOST"),
"port": int(os.getenv("POSTGRES_PORT")),
"user": os.getenv("POSTGRES_USER"),
"password": os.getenv("POSTGRES_PASSWORD"),
"database": os.getenv("POSTGRES_DATABASE"),
},
schema_discovery=True, # Auto-introspect schema
),
DataSourceConfig(
name="salesforce_crm",
type="salesforce",
config={
"client_id": os.getenv("SALESFORCE_CLIENT_ID"),
"client_secret": os.getenv("SALESFORCE_CLIENT_SECRET"),
},
schema_discovery=True,
),
]
# Initialize the agent
agent = AirbyteAgent(
api_url=os.getenv("AIRBYTE_API_URL"),
api_key=os.getenv("AIRBYTE_API_KEY"),
data_sources=data_sources,
llm_provider="anthropic",
model="claude-3-5-sonnet-20241022",
)
print("Agent initialized with data sources:")
for source in agent.available_sources():
print(f" - {source.name}: {source.type}")
print(f" Tables: {', '.join(source.tables)}")
Run this to verify connectivity:
python data_agent.py
You should see all available tables from both sources printed.
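The output will look something like this, though the table names depend on your schemas (the listing below is illustrative):
Agent initialized with data sources:
  - postgres_analytics: postgres
    Tables: orders, customers, ml_predictions.churn_scores
  - salesforce_crm: salesforce
    Tables: Account, Case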
Step 4: Build Query Functions for Each Source
Create a module that handles source-specific queries (queries.py):
from airbyte_sdk import SQLQuery, APIQuery
# PostgreSQL queries
def get_customer_revenue(customer_id: str):
"""Query total revenue from PostgreSQL analytics warehouse."""
return SQLQuery(
source="postgres_analytics",
query="""
SELECT customer_id, SUM(amount) as total_revenue
FROM orders
WHERE customer_id = %s
GROUP BY customer_id
""",
parameters=[customer_id],
)
def get_customer_churn_risk(customer_id: str):
"""Query churn prediction score from PostgreSQL model outputs."""
return SQLQuery(
source="postgres_analytics",
query="""
SELECT customer_id, churn_probability, last_predicted_at
FROM ml_predictions.churn_scores
WHERE customer_id = %s
ORDER BY last_predicted_at DESC LIMIT 1
""",
parameters=[customer_id],
)
# Salesforce queries
def get_customer_account_data(customer_id: str):
"""Query account info from Salesforce CRM."""
return APIQuery(
source="salesforce_crm",
endpoint="/sobjects/Account",
filters={"Name": customer_id},
)
def get_customer_support_cases(customer_id: str):
"""Query open support cases from Salesforce."""
return APIQuery(
source="salesforce_crm",
endpoint="/sobjects/Case",
filters={"AccountId": customer_id, "Status": "Open"},
)
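Note that these functions return query objects rather than executing anything themselves; the agent runs them when it selects the matching tool. To sanity-check one in isolation, something along these lines should work, although the exact execution method (agent.execute here) is an assumption to verify against your SDK version:
# Smoke test a single query object directly.
# agent.execute and result.rows are assumed names; check your SDK's API.
result = agent.execute(get_customer_revenue("ACME-001"))
print(result.rows)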
Step 5: Run Multi-Source Queries Through the Agent
Extend your main agent file to handle conversations:
from queries import (
get_customer_revenue,
get_customer_churn_risk,
get_customer_account_data,
get_customer_support_cases,
)
# Register query functions with the agent
agent.register_tool(
name="get_customer_revenue",
function=get_customer_revenue,
description="Get total revenue for a customer from the analytics warehouse",
)
agent.register_tool(
name="get_customer_churn_risk",
function=get_customer_churn_risk,
description="Get churn prediction score for a customer",
)
agent.register_tool(
name="get_customer_account_data",
function=get_customer_account_data,
description="Get CRM account information from Salesforce",
)
agent.register_tool(
name="get_customer_support_cases",
function=get_customer_support_cases,
description="Get open support tickets from Salesforce",
)
# Example conversation
message = "What's the revenue and churn risk for customer ACME-001? Also, what open support cases do they have?"
response = agent.run(message)
print(f"Query: {message}")
print(f"Response: {response.text}")
print(f"\nSources queried: {', '.join(response.sources_used)}")
print(f"Execution time: {response.execution_time_ms}ms")
Step 6: Handle Context Window Limitations
When working with multiple data sources, you'll quickly hit context window limits. Airbyte Agents provides context controls for this, including result summarization and schema caching:
# Configure context management
agent.configure_context(
max_tokens=4000, # Reserve space for reasoning
summarize_large_results=True,
summary_strategy="importance", # Prioritize relevant rows
cache_schema_introspection=True, # Cache table schemas
)
This ensures the agent can reason over large datasets without wasting tokens on repetitive schema information.
Step 7: Add Error Handling for Source Failures
Multi-source queries will sometimes fail. Handle gracefully:
from airbyte_sdk import QueryError, SourceUnavailableError
try:
response = agent.run(
"Give me a customer health score combining revenue, churn risk, and support sentiment",
timeout_seconds=30,
)
except SourceUnavailableError as e:
print(f"Data source unavailable: {e.source_name}")
response = agent.run(
"Give me just the revenue and churn risk (Salesforce is down)",
available_sources=["postgres_analytics"],
)
except QueryError as e:
print(f"Query execution failed: {e.message}")
print(f"Failed source: {e.source_name}")
Performance Optimization Tips
1. Index your database columns used in agent queries:
CREATE INDEX idx_customer_id_orders ON orders(customer_id);
CREATE INDEX idx_customer_churn ON ml_predictions.churn_scores(customer_id);
2. Use result caching for frequently accessed data:
agent.enable_caching(
ttl_seconds=300, # Cache results for 5 minutes
max_cache_size_mb=100,
)
3. Batch queries when querying multiple customers:
customer_ids = ["ACME-001", "ACME-002", "ACME-003"]
results = agent.batch_run(
query="Summarize revenue and churn risk",
parameter_sets=[{"customer_id": cid} for cid in customer_ids],
parallel_workers=3,
)
Common Pitfalls to Avoid
- Over-fetching data: Let the agent decide what it needs. Don't return entire tables.
- Missing schema documentation: Use description fields to help the agent understand field meanings.
- Ignoring authentication timeouts: Salesforce tokens expire. Implement refresh logic (see the sketch after this list).
- Not setting query timeouts: Slow queries will block the agent. Set timeout_seconds on all queries.
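For the token-expiry pitfall, here is a minimal refresh sketch using Salesforce's OAuth 2.0 client credentials flow via the requests library. The instance URL is a placeholder, and wiring the fresh token back into the connector depends on your Airbyte configuration:
import os
import requests

def refresh_salesforce_token() -> str:
    """Fetch a fresh access token via the client credentials flow."""
    resp = requests.post(
        # Placeholder: replace with your My Domain instance URL
        "https://your-instance.my.salesforce.com/services/oauth2/token",
        data={
            "grant_type": "client_credentials",
            "client_id": os.getenv("SALESFORCE_CLIENT_ID"),
            "client_secret": os.getenv("SALESFORCE_CLIENT_SECRET"),
        },
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()["access_token"]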
Conclusion
Airbyte Agents abstracts away the complexity of managing multiple data sources for AI agents. By handling context management, schema discovery, and query routing automatically, you can focus on building intelligent features instead of infrastructure.
Start with two sources and gradually add more as your agent matures. Monitor query execution times and cache hit rates to optimize over time.
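For the monitoring piece, the execution_time_ms and sources_used fields shown in Step 5 are enough for a simple log line; a lightweight sketch:
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("data_agent")

response = agent.run("Summarize revenue and churn risk for ACME-001")
# Surface timing and routing so slow sources and cache misses show up over time
logger.info(
    "query took %sms across sources: %s",
    response.execution_time_ms,
    ", ".join(response.sources_used),
)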