How to Build Multi-Source Data Agents with Airbyte and LangChain in 2025
Building AI agents that can reason over data from multiple sources is one of the hardest problems in modern LLM applications. Your agent might need to query a PostgreSQL database, fetch data from a Stripe API, and pull historical records from a data warehouse—all within a single decision-making loop. Without proper context injection, the agent flails blindly.
Airbyte Agents solve this by giving your LLM direct access to normalized data connectors across dozens of platforms. Combined with LangChain, you can build agents that understand your entire data landscape in seconds, not weeks.
Why Standard Agent Patterns Fall Short
Most developers start by handing an agent a list of tool definitions—database query functions, API wrappers, file readers. The agent calls them sequentially, but:
- Context explosion: Each tool call returns raw data the agent must parse and reason over
- Connector hell: Building 20 different data connectors (Stripe, Salesforce, PostgreSQL, S3, etc.) takes months
- Latency: Multiple tool calls to fetch context means slow decisions
- Type mismatches: Normalizing schemas across systems is manual and fragile
Airbyte Agents pre-normalize and contextualize data from all your sources, so the agent receives structured, query-ready datasets instead of raw API responses.
Architecture: Airbyte + LangChain Integration
The pattern has three layers:
- Data layer: Airbyte connectors sync from your sources into a unified destination (Postgres, Snowflake, DuckDB)
- Retrieval layer: LangChain agents query normalized tables and fetch relevant context
- Decision layer: The LLM receives clean, structured data and makes decisions
Unlike traditional ETL, Airbyte Agents can run incremental syncs on-demand within an agent's reasoning loop, keeping context fresh without re-syncing everything.
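The three layers can be sketched as a single loop. Everything below is a hypothetical stand-in for the real components built in the steps that follow:

```python
# Minimal sketch of the three-layer pattern; every function here is a
# hypothetical placeholder, not a real Airbyte or LangChain API.

def sync_sources() -> None:
    """Data layer: Airbyte connectors land normalized tables in one destination."""
    pass  # e.g. trigger incremental syncs via the Airbyte API (Step 3)

def fetch_context(question: str) -> dict:
    """Retrieval layer: query the normalized tables relevant to the question."""
    return {"tables_consulted": ["stripe_charges"], "rows": 3}

def decide(question: str, context: dict) -> str:
    """Decision layer: the LLM reasons over clean, structured context."""
    return f"answered using {context['tables_consulted']}"

def answer(question: str) -> str:
    sync_sources()
    context = fetch_context(question)
    return decide(question, context)

print(answer("Which customers churned?"))
```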
Step 1: Set Up Airbyte with Multiple Connectors
Start by spinning up Airbyte (self-hosted or Airbyte Cloud). For this example, we'll connect three sources:
- PostgreSQL (production database)
- Stripe (billing data)
- Google Sheets (manual configurations)
# If running self-hosted with Docker
docker run -d \
  -p 8000:8000 \
  -p 8001:8001 \
  -v airbyte_db:/data \
  -e DATABASE_HOST=localhost \
  airbyte/server:latest
Navigate to http://localhost:8000, create connections for each source, and configure them to write to a PostgreSQL destination.
Key setup choice: Use the same destination schema for all sources. This allows your agent to join and correlate data across platforms seamlessly.
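Once every source lands in the same schema, cross-source correlation is ordinary SQL. A minimal sketch of the idea, using an in-memory SQLite database and hypothetical table names in place of the Airbyte-synced tables:

```python
import sqlite3

# Hypothetical tables mirroring what Airbyte might land in one shared schema.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE postgres_customers (id INTEGER, email TEXT, stripe_id TEXT)")
conn.execute("CREATE TABLE stripe_charges (customer_id TEXT, amount INTEGER)")
conn.executemany(
    "INSERT INTO postgres_customers VALUES (?, ?, ?)",
    [(1, "a@example.com", "cus_1"), (2, "b@example.com", "cus_2")],
)
conn.executemany(
    "INSERT INTO stripe_charges VALUES (?, ?)",
    [("cus_1", 500), ("cus_1", 700), ("cus_2", 300)],
)

# Because both tables live in one schema, a single JOIN correlates sources.
rows = conn.execute("""
    SELECT c.email, SUM(s.amount) AS total
    FROM postgres_customers c
    JOIN stripe_charges s ON c.stripe_id = s.customer_id
    GROUP BY c.email
    ORDER BY total DESC
""").fetchall()
print(rows)  # [('a@example.com', 1200), ('b@example.com', 300)]
```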
Step 2: Build a LangChain Agent with Airbyte Context
Once data is synced, create a LangChain agent that treats normalized tables as tools.
from langchain.agents import Tool, initialize_agent, AgentType
from langchain.chat_models import ChatOpenAI
from sqlalchemy import create_engine, MetaData, inspect, text
import json

# Connect to destination database
engine = create_engine("postgresql://user:pass@localhost/airbyte_db")
metadata = MetaData()
metadata.reflect(bind=engine)

# Introspect available tables from Airbyte syncs
inspector = inspect(engine)
tables = inspector.get_table_names()

def query_table(table_name: str, filters: str = "") -> str:
    """Query a normalized Airbyte table. Demo only: table names and filters
    are interpolated directly, so restrict inputs in production."""
    query = f"SELECT * FROM {table_name}"
    if filters:
        query += f" WHERE {filters}"
    query += " LIMIT 10"
    with engine.connect() as conn:
        result = conn.execute(text(query))
        rows = result.fetchall()
    # default=str handles Decimal and datetime values from the database
    return json.dumps([dict(row._mapping) for row in rows], default=str)

# Create one tool per table; the agent passes an optional WHERE clause
tools = []
for table in tables:
    tools.append(
        Tool(
            name=f"query_{table}",
            func=lambda filters="", t=table: query_table(t, filters),
            description=(
                f"Query the {table} table from Airbyte. Input is an optional "
                "SQL WHERE clause. Use this to fetch customer, transaction, "
                "or configuration data."
            ),
        )
    )

# Initialize agent (OPENAI_FUNCTIONS requires a chat model, so use ChatOpenAI)
llm = ChatOpenAI(model="gpt-4", temperature=0)
agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.OPENAI_FUNCTIONS,
    verbose=True,
)

# Run agent
result = agent.run(
    "Find all customers from the Stripe connector who spent over $1000 in the last 30 days "
    "and check their most recent transaction status."
)
print(result)
This agent can now:
- Understand which tables exist from Airbyte syncs
- Write SQL WHERE clauses to filter data
- Correlate information across sources
- Return structured results for downstream processing
Step 3: Add Incremental Refresh for Fresh Context
By default, Airbyte syncs run on a schedule. For agents making real-time decisions, add on-demand refresh:
import time

from airbyte_api_client import ApiClient, Configuration
from airbyte_api_client.api.connections_api import ConnectionsApi
from sqlalchemy import text

config = Configuration()
config.api_key["X-Airbyte-Secret-Header"] = "your-api-key"
api_client = ApiClient(config)

def refresh_source_and_query(connection_id: str, query: str):
    """Trigger an Airbyte sync, wait for completion, then query.
    Reuses the `engine` created in Step 2."""
    # Trigger a manual sync
    api = ConnectionsApi(api_client)
    job = api.trigger_connection_manual(connection_id)
    job_id = job.job_id

    # Poll until the job leaves its pending/running states
    status = "pending"
    while status in ("pending", "running"):
        time.sleep(2)
        job_status = api.get_job(job_id)
        status = job_status.job.status

    # Now execute the query on fresh data
    with engine.connect() as conn:
        result = conn.execute(text(query))
        return result.fetchall()
This pattern ensures your agent always operates on the freshest data without maintaining custom sync logic.
Step 4: Handle Complex Multi-Source Scenarios
Real agents need to join and correlate data. Use LangChain's tool definitions to guide the LLM:
from sqlalchemy import text

def correlation_query(customer_id: str) -> str:
    """Get a unified customer view across all sources."""
    query = text("""
        SELECT
            c.id, c.email,
            COUNT(s.id) AS stripe_transactions,
            SUM(s.amount) AS total_spent,
            gs.company_name,
            gs.contract_status
        FROM postgres_customers c
        LEFT JOIN stripe_charges s ON c.stripe_id = s.customer_id
        LEFT JOIN google_sheets_config gs ON c.id = gs.customer_id
        WHERE c.id = :customer_id
        GROUP BY c.id, c.email, gs.company_name, gs.contract_status
    """)
    with engine.connect() as conn:
        # Bind the parameter instead of interpolating it into the SQL string
        result = conn.execute(query, {"customer_id": customer_id})
        row = result.fetchone()
    return json.dumps(dict(row._mapping) if row else {}, default=str)

tools.append(
    Tool(
        name="get_unified_customer",
        func=correlation_query,
        description="Get a complete customer profile joining PostgreSQL, Stripe, and Google Sheets data.",
    )
)
Common Pitfalls to Avoid
1. Over-syncing large tables: Airbyte will sync full history. Use connector settings to limit scope (e.g., last 90 days of transactions).
2. Schema conflicts: If multiple sources have "user_id" columns, rename them during Airbyte setup (e.g., stripe_user_id, postgres_user_id).
3. Agent hallucinating non-existent columns: Introspect schema dynamically and pass table schemas to the LLM in the system prompt:
schema_info = {}
for table in tables:
    columns = inspector.get_columns(table)
    schema_info[table] = [col["name"] for col in columns]

system_prompt = f"""
You have access to the following data tables from Airbyte:

{json.dumps(schema_info, indent=2)}

Use the query tools to fetch relevant data before making decisions.
"""
4. Ignoring rate limits: The Airbyte API has rate limits. Cache frequently accessed data or use batch queries instead of row-by-row lookups.
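One simple mitigation for the rate-limit pitfall is a short-lived cache in front of the query tools, so repeated agent calls within one reasoning loop hit the cache rather than the source. A minimal sketch; the `ttl_cache` decorator and `fetch_customer_list` stand-in are hypothetical, not part of Airbyte or LangChain:

```python
import time
from functools import wraps

def ttl_cache(seconds: int = 300):
    """Cache a function's results for a fixed window to avoid repeated lookups."""
    def decorator(func):
        store = {}  # args -> (value, timestamp)
        @wraps(func)
        def wrapper(*args):
            now = time.time()
            if args in store:
                value, stamp = store[args]
                if now - stamp < seconds:
                    return value  # served from cache, no source hit
            value = func(*args)
            store[args] = (value, now)
            return value
        return wrapper
    return decorator

calls = {"n": 0}  # track how often the underlying source is actually hit

@ttl_cache(seconds=60)
def fetch_customer_list():
    # Hypothetical stand-in for a query_table("postgres_customers") call.
    calls["n"] += 1
    return ["cus_1", "cus_2"]

fetch_customer_list()
fetch_customer_list()  # second call is served from the cache
print(calls["n"])
```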
Testing Your Agent
Before deploying, validate with known queries:
test_queries = [
    "How many active customers do we have?",
    "Which customers have failed payments in the last week?",
    "Show me the top 5 customers by lifetime value.",
]

for query in test_queries:
    print(f"\nQuery: {query}")
    result = agent.run(query)
    print(f"Result: {result}")
Validate that:
- The agent picks the correct tables
- Results match manual queries
- SQL syntax is valid
- Response times are acceptable (< 5 seconds per query)
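The "results match manual queries" check can be automated by pairing each test prompt with a ground-truth value from a manual SQL query and comparing it against the agent's free-text answer. A sketch under that assumption; `extract_number` and `check_against_baseline` are hypothetical helpers, not LangChain APIs:

```python
import re

def extract_number(answer_text: str):
    """Pull the first number out of an agent's free-text answer."""
    match = re.search(r"-?\d+(?:\.\d+)?", answer_text.replace(",", ""))
    return float(match.group()) if match else None

def check_against_baseline(agent_answer: str, expected: float,
                           tolerance: float = 0.0) -> bool:
    """Compare the agent's answer to a value from a manual SQL query."""
    value = extract_number(agent_answer)
    return value is not None and abs(value - expected) <= tolerance

# Example: a manual COUNT(*) said we have 42 active customers.
print(check_against_baseline("We currently have 42 active customers.", 42))
```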
Deployment Considerations
When moving to production:
- Use Airbyte Cloud for managed infrastructure and high availability
- Add observability: Log which data sources each agent query touches
- Implement caching: Store frequently accessed context (e.g., customer lists) in a vector database for faster retrieval
- Set up alerts: Monitor sync failures and query latencies
- Version control: Store Airbyte connection configs and LangChain agent definitions in Git
Next Steps
Once the basic agent works:
- Add memory: Use LangChain's ConversationMemory to give agents conversation history
- Expand sources: Add CRM (Salesforce), analytics (Mixpanel), and code repositories (GitHub)
- Implement retrieval augmentation: Use embeddings to find relevant context before querying
- A/B test prompts: Experiment with system messages to improve agent decision quality
Airbyte Agents dramatically reduce the friction of building context-aware AI applications. By normalizing data upfront, you let your LLM focus on reasoning instead of parsing APIs.