How to Build Multi-Source AI Agents with Airbyte Agents and LangChain in 2025
Building AI agents that can intelligently query and process data across multiple data sources has been a persistent challenge for developers. Most agent frameworks struggle with context management when dealing with heterogeneous data sources—your PostgreSQL database, Stripe API, and S3 bucket all have different schemas, authentication requirements, and query patterns. Airbyte Agents solves this by providing agents with standardized context across multiple data sources.
In this guide, we'll walk through building a practical multi-source AI agent that can answer business questions by querying both your application database and external APIs.
The Challenge: Context Fragmentation in Multi-Source Agents
When you try to build agents using traditional LangChain tools, you quickly hit several problems:
- Schema inconsistency: Each data source has different schemas and authentication methods
- Context bloat: Passing raw connection strings and credentials to agents is a security nightmare
- Tool explosion: You need to write custom tools for each data source integration
- Query optimization: Agents don't understand the performance characteristics of different sources
Airbyte Agents provides a unified abstraction layer that handles these concerns automatically.
Architecture: How Airbyte Agents Provides Unified Context
Airbyte Agents works by:
- Connecting to your data sources using Airbyte's 300+ pre-built connectors
- Extracting and normalizing schemas from each source
- Providing agents with a unified interface to query across sources
- Managing authentication and rate limits transparently
This means your LLM-powered agent only needs to understand one query interface, while Airbyte handles the complexity of different data source APIs and formats.
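In code terms, that uniformity means the agent-side code never branches on source type: every connector you build in Step 2 below answers the same two calls. A minimal sketch, using the airbyte_agents API as this guide presents it:
# Same surface for every source — no per-source branching in agent code
for connector in (postgres_connector, stripe_connector, s3_connector):  # built in Step 2
    schema = connector.get_schema()        # normalized schema for prompt context
    # rows = connector.execute_query(...)  # uniform data access (see Step 4)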
Step-by-Step: Building Your First Multi-Source Agent
Step 1: Install Dependencies
First, set up your Python environment with the required packages:
pip install airbyte-agents langchain openai python-dotenv
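Note that this guide uses LangChain's classic agent API (initialize_agent, AgentType). On recent LangChain releases those entry points are deprecated or have moved, so if the imports below fail you may need to pin an older release:
pip install "langchain<0.1"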
Step 2: Configure Your Data Sources
Create a configuration file for your Airbyte connections. You'll need credentials for each source:
from airbyte_agents import AirbyteConnector
from langchain.agents import Tool
from dotenv import load_dotenv
import os

load_dotenv()  # load credentials from a local .env file into the environment
# Initialize connectors for your data sources
postgres_connector = AirbyteConnector(
    source_type="postgres",
    config={
        "host": os.getenv("DB_HOST"),
        "port": 5432,
        "database": os.getenv("DB_NAME"),
        "username": os.getenv("DB_USER"),
        "password": os.getenv("DB_PASSWORD"),
    }
)

stripe_connector = AirbyteConnector(
    source_type="stripe",
    config={
        "api_key": os.getenv("STRIPE_API_KEY"),
        "lookback_window": 30,  # Only sync the last 30 days
    }
)

s3_connector = AirbyteConnector(
    source_type="s3",
    config={
        "bucket": os.getenv("AWS_BUCKET"),
        "aws_access_key_id": os.getenv("AWS_KEY_ID"),
        "aws_secret_access_key": os.getenv("AWS_SECRET_KEY"),
    }
)
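For reference, the environment variables used above might live in a .env file like this (all values are placeholders):
# .env — placeholder values; never commit real credentials
DB_HOST=localhost
DB_NAME=myapp
DB_USER=agent_readonly
DB_PASSWORD=changeme
STRIPE_API_KEY=sk_test_...
AWS_BUCKET=my-analytics-bucket
AWS_KEY_ID=AKIA...
AWS_SECRET_KEY=...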
Step 3: Create Unified Schema Context
Airbyte automatically extracts schemas from your sources. You can inspect and filter what your agent can access:
# Get normalized schemas from all sources
schema_context = {
    "postgres": postgres_connector.get_schema(),
    "stripe": stripe_connector.get_schema(),
    "s3": s3_connector.get_schema(),
}
print(schema_context)
# Output shows tables/fields available in each source with types and descriptions
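The exact structure of the returned schema isn't documented here, but you can expect per-source entries along these lines (illustrative only — print yours to confirm; all field names below are hypothetical):
# Illustrative shape only — inspect your own output with print(schema_context)
# "postgres": {
#     "streams": [
#         {"name": "users",
#          "fields": {"id": "integer", "email": "string", "created_at": "timestamp"}},
#         {"name": "orders", "fields": {...}},
#     ]
# }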
Step 4: Build Query Tools for the Agent
Create LangChain tools that wrap Airbyte's unified query interface:
import json

def query_postgres(sql_query: str) -> str:
    """Execute a SQL query against the PostgreSQL database."""
    try:
        result = postgres_connector.execute_query(sql_query)
        return str(result)
    except Exception as e:
        return f"Error querying database: {e}"

def query_stripe(tool_input: str) -> str:
    """Query Stripe data (customers, invoices, charges, subscriptions).

    LangChain's Tool wrapper passes a single string, so the input is a JSON
    object: {"query_type": "...", "filters": {...}}.
    """
    try:
        params = json.loads(tool_input)
        result = stripe_connector.execute_query(params["query_type"], params.get("filters", {}))
        return str(result)
    except Exception as e:
        return f"Error querying Stripe: {e}"

def query_s3(tool_input: str) -> str:
    """Read files or list objects from S3.

    Input is a JSON object: {"path": "...", "operation": "read" or "list"}.
    """
    try:
        params = json.loads(tool_input)
        result = s3_connector.execute_query(params["path"], params["operation"])
        return str(result)
    except Exception as e:
        return f"Error querying S3: {e}"

# Create LangChain tools (each func must accept a single string input)
tools = [
    Tool(
        name="postgres_query",
        func=query_postgres,
        description="Run a SQL query against the PostgreSQL database. Use for user, order, and product data. Input: a SQL string."
    ),
    Tool(
        name="stripe_query",
        func=query_stripe,
        description='Query the Stripe API for payment and customer data. Supports: customers, charges, invoices, subscriptions. Input: JSON like {"query_type": "charges", "filters": {...}}.'
    ),
    Tool(
        name="s3_query",
        func=query_s3,
        description='Read CSV, JSON, or Parquet files from S3 for analytics and reporting. Input: JSON like {"path": "reports/x.csv", "operation": "read"}.'
    )
]
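Before wiring these into an agent, it's worth calling each tool function by hand to confirm connectivity. A quick smoke test (the table name and S3 prefix here are assumptions — substitute your own):
# Manual smoke test — run each tool once before handing it to the agent
print(query_postgres("SELECT COUNT(*) FROM users"))            # assumes a users table
print(query_stripe('{"query_type": "customers", "filters": {}}'))
print(query_s3('{"path": "reports/", "operation": "list"}'))   # assumes a reports/ prefix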
Step 5: Initialize the Agent with Context
Create an agent that understands all available schemas:
from langchain.chat_models import ChatOpenAI
from langchain.agents import initialize_agent, AgentType
from langchain.schema import SystemMessage
import json

llm = ChatOpenAI(model="gpt-4", temperature=0)
# Create a system prompt with all available schema context
system_context = f"""
You are a data analyst agent with access to three data sources:
1. PostgreSQL Database:
{json.dumps(schema_context['postgres'], indent=2)}
2. Stripe API:
{json.dumps(schema_context['stripe'], indent=2)}
3. S3 Data Lake:
{json.dumps(schema_context['s3'], indent=2)}
When answering questions:
- First determine which data sources are needed
- Use appropriate tools to query each source
- Combine results from multiple sources if needed
- Always specify exact column names and filters in SQL queries
"""
agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.OPENAI_FUNCTIONS,
    verbose=True,
    # initialize_agent has no top-level system_message parameter;
    # pass it through agent_kwargs as a SystemMessage instead
    agent_kwargs={"system_message": SystemMessage(content=system_context)},
)
Step 6: Query Your Multi-Source Agent
Now you can ask complex questions that span multiple sources:
# Example queries that require multiple sources
result = agent.run(
    "What are the top 5 customers by revenue in the last 30 days? "
    "Include their email from our database and their Stripe subscription status."
)
print(result)

# Another example: correlate database and S3 data
result = agent.run(
    "Find customers in our database who churned in the last quarter. "
    "Check their download activity from S3 logs to see if low engagement predicted churn."
)
print(result)
Best Practices: Optimizing Multi-Source Agents
Cache Schema Context
Schema extraction can be expensive. Cache it:
import os
import json
from datetime import datetime, timedelta

def get_or_refresh_schema(connector, cache_file, max_age_hours=24):
    """Return the cached schema if fresh enough; otherwise re-extract and cache it."""
    if os.path.exists(cache_file):
        file_age = datetime.now() - datetime.fromtimestamp(os.path.getmtime(cache_file))
        if file_age < timedelta(hours=max_age_hours):
            with open(cache_file, "r") as f:
                return json.load(f)
    schema = connector.get_schema()
    with open(cache_file, "w") as f:
        json.dump(schema, f)
    return schema
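Dropped into Step 3, the helper replaces the direct get_schema() calls (the cache filenames here are arbitrary):
# Build agent context from cached schemas, refreshed at most once a day
schema_context = {
    "postgres": get_or_refresh_schema(postgres_connector, "postgres_schema.json"),
    "stripe": get_or_refresh_schema(stripe_connector, "stripe_schema.json"),
    "s3": get_or_refresh_schema(s3_connector, "s3_schema.json"),
}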
Limit Agent Tool Access
Don't expose all tables to the agent. Create restricted connectors:
restrictive_postgres = AirbyteConnector(
    source_type="postgres",
    config={...},  # same credentials as above
    allowed_schemas=["public"],
    allowed_tables=["users", "orders", "products"],  # Exclude sensitive tables
    read_only=True,  # Prevent writes
)
Monitor and Log Agent Decisions
from langchain.callbacks.base import BaseCallbackHandler

class AgentLogger(BaseCallbackHandler):
    """Print every tool call the agent makes and a preview of each result."""

    def on_tool_start(self, serialized, input_str, **kwargs):
        print(f"🔧 Using tool: {serialized['name']} with input: {input_str}")

    def on_tool_end(self, output, **kwargs):
        print(f"✅ Tool output: {output[:200]}...")

agent.run(
    "Your question here",
    callbacks=[AgentLogger()],
)
Comparison: Airbyte Agents vs Custom Solutions
| Aspect | Airbyte Agents | Custom LangChain Tools | Hardcoded Scripts |
|--------|----------------|------------------------|-------------------|
| Setup time | 30 minutes | 2-3 hours | 1-2 days |
| Data source support | 300+ connectors | Whatever you build | Limited to what exists |
| Schema management | Automatic | Manual | Hardcoded |
| Authentication | Centralized | Per-tool | Scattered |
| Scalability | Handles new sources easily | Requires new tool code | Very brittle |
| Cost | Reasonable | Your engineer time | High engineer maintenance |
Troubleshooting Common Issues
Agent doesn't find the right data source: Ensure schema context is detailed. Add example queries to the system prompt.
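One concrete way to do that is to append a few routing examples to the Step 5 system prompt before building the agent (the questions below are hypothetical — use ones that match your own schema):
# Hypothetical few-shot routing examples appended to the Step 5 system prompt
system_context += """
Example routing:
- "How many users signed up last week?" -> postgres_query
- "What was charge volume yesterday?" -> stripe_query
- "List last night's export files" -> s3_query
"""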
Rate limiting from Stripe/external APIs: Airbyte handles this, but set appropriate lookback windows in connector config.
Agent hallucinating columns: This usually means schema context is incomplete. Test connector.get_schema() directly.
Next Steps
With Airbyte Agents, you can now:
- Build data analysis assistants that work across your entire data stack
- Create customer support bots that access both internal and external data
- Develop anomaly detection agents that correlate signals from multiple sources
The unified context approach solves the hardest problem in multi-source agents: making the agent understand that your database schema, API responses, and file formats all represent coherent data that can be combined intelligently.
Start with 2-3 data sources, test your agent's outputs carefully, and expand from there.