How to Build Multi-Source AI Agents with Airbyte Agents and LangChain in 2025

Building AI agents that can intelligently query and process data across multiple data sources has been a persistent challenge for developers. Most agent frameworks struggle with context management when dealing with heterogeneous data sources—your PostgreSQL database, Stripe API, and S3 bucket all have different schemas, authentication requirements, and query patterns. Airbyte Agents solves this by providing agents with standardized context across multiple data sources.

In this guide, we'll walk through building a practical multi-source AI agent that can answer business questions by querying both your application database and external APIs.

The Challenge: Context Fragmentation in Multi-Source Agents

When you try to build agents using traditional LangChain tools, you quickly hit several problems:

  • Schema inconsistency: Each data source has different schemas and authentication methods
  • Context bloat: Passing raw connection strings and credentials to agents is a security nightmare
  • Tool explosion: You need to write custom tools for each data source integration
  • Query optimization: Agents don't understand the performance characteristics of different sources

Airbyte Agents provides a unified abstraction layer that handles these concerns automatically.

Architecture: How Airbyte Agents Provides Unified Context

Airbyte Agents works by:

  1. Connecting to your data sources using Airbyte's 300+ pre-built connectors
  2. Extracting and normalizing schemas from each source
  3. Providing agents with a unified interface to query across sources
  4. Managing authentication and rate limits transparently

This means your LLM-powered agent only needs to understand one query interface, while Airbyte handles the complexity of different data source APIs and formats.
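To make the idea concrete, here is a minimal, library-free sketch of the dispatch pattern a unified query layer implements. The names (`UnifiedQueryInterface`, `register`, `query`) are illustrative, not Airbyte's actual API; in Airbyte Agents this routing, plus authentication and rate limiting, is handled for you:

```python
from typing import Any, Callable, Dict

# A handler knows how to execute a request against one backend.
Handler = Callable[[str], Any]

class UnifiedQueryInterface:
    """One entry point for the agent, many backends behind it."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def register(self, source: str, handler: Handler) -> None:
        self._handlers[source] = handler

    def query(self, source: str, request: str) -> Any:
        if source not in self._handlers:
            raise KeyError(f"Unknown source: {source}")
        return self._handlers[source](request)

# The agent sees one interface regardless of the backend.
uqi = UnifiedQueryInterface()
uqi.register("postgres", lambda sql: f"rows for: {sql}")
uqi.register("stripe", lambda q: f"stripe objects for: {q}")

print(uqi.query("postgres", "SELECT 1"))  # → rows for: SELECT 1
```

The design choice worth noticing: the agent never touches connection details, only a `(source, request)` pair, which is what keeps credentials out of the prompt.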

Step-by-Step: Building Your First Multi-Source Agent

Step 1: Install Dependencies

First, set up your Python environment with the required packages:

pip install airbyte-agents langchain openai python-dotenv

Step 2: Configure Your Data Sources

Initialize a connector for each data source, keeping credentials in environment variables. python-dotenv (installed in Step 1) loads them from a local .env file:

from airbyte_agents import AirbyteConnector
from langchain.agents import Tool
from dotenv import load_dotenv
import os

load_dotenv()  # read credentials from .env

# Initialize connectors for your data sources
postgres_connector = AirbyteConnector(
    source_type="postgres",
    config={
        "host": os.getenv("DB_HOST"),
        "port": 5432,
        "database": os.getenv("DB_NAME"),
        "username": os.getenv("DB_USER"),
        "password": os.getenv("DB_PASSWORD"),
    }
)

stripe_connector = AirbyteConnector(
    source_type="stripe",
    config={
        "api_key": os.getenv("STRIPE_API_KEY"),
        "lookback_window": 30,  # Last 30 days
    }
)

s3_connector = AirbyteConnector(
    source_type="s3",
    config={
        "bucket": os.getenv("AWS_BUCKET"),
        "aws_access_key_id": os.getenv("AWS_KEY_ID"),
        "aws_secret_access_key": os.getenv("AWS_SECRET_KEY"),
    }
)

Step 3: Create Unified Schema Context

Airbyte automatically extracts schemas from your sources. You can inspect and filter what your agent can access:

# Get normalized schemas from all sources
schema_context = {
    "postgres": postgres_connector.get_schema(),
    "stripe": stripe_connector.get_schema(),
    "s3": s3_connector.get_schema(),
}

print(schema_context)
# Output shows tables/fields available in each source with types and descriptions
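Before passing schema context to the agent, it is worth trimming it to what the agent actually needs. A small, self-contained sketch of that filtering step (the nested dict shape shown here is an assumption for illustration, not Airbyte's documented schema format):

```python
def filter_schema(schema: dict, allowed_tables: set) -> dict:
    """Keep only the tables the agent is allowed to see."""
    return {
        table: fields
        for table, fields in schema.items()
        if table in allowed_tables
    }

# Assumed shape: table name -> {column: type}
raw = {
    "users": {"id": "integer", "email": "string"},
    "orders": {"id": "integer", "total": "number"},
    "audit_log": {"id": "integer", "payload": "json"},  # keep away from the agent
}
safe = filter_schema(raw, {"users", "orders"})
print(sorted(safe))  # → ['orders', 'users']
```

Filtering here, before prompt construction, both shrinks the context window and keeps sensitive tables out of the LLM's view entirely.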

Step 4: Build Query Tools for the Agent

Create LangChain tools that wrap Airbyte's unified query interface:

def query_postgres(sql_query: str) -> str:
    """Execute a query against the PostgreSQL database."""
    try:
        result = postgres_connector.execute_query(sql_query)
        return str(result)
    except Exception as e:
        return f"Error querying database: {str(e)}"

import json

def query_stripe(query_json: str) -> str:
    """Query Stripe data (customers, invoices, charges, subscriptions).

    LangChain's Tool passes a single string, so parameters arrive as a
    JSON string: {"query_type": "...", "filters": {...}}.
    """
    try:
        params = json.loads(query_json)
        result = stripe_connector.execute_query(
            params["query_type"], params.get("filters", {})
        )
        return str(result)
    except Exception as e:
        return f"Error querying Stripe: {str(e)}"

def query_s3(query_json: str) -> str:
    """Read files or list objects from S3.

    Takes a JSON string: {"path": "...", "operation": "..."}.
    """
    try:
        params = json.loads(query_json)
        result = s3_connector.execute_query(params["path"], params["operation"])
        return str(result)
    except Exception as e:
        return f"Error querying S3: {str(e)}"

# Create LangChain tools
tools = [
    Tool(
        name="postgres_query",
        func=query_postgres,
        description="Query your PostgreSQL database. Use for user, order, and product data. Input: a SQL string."
    ),
    Tool(
        name="stripe_query",
        func=query_stripe,
        description='Query Stripe API for payment and customer data. Supports: customers, charges, invoices, subscriptions. Input: JSON string {"query_type": ..., "filters": {...}}.'
    ),
    Tool(
        name="s3_query",
        func=query_s3,
        description='Read CSV, JSON, or Parquet files from S3 for analytics and reporting. Input: JSON string {"path": ..., "operation": ...}.'
    )
]

Step 5: Initialize the Agent with Context

Create an agent that understands all available schemas:

from langchain.chat_models import ChatOpenAI
from langchain.agents import initialize_agent, AgentType
import json

llm = ChatOpenAI(model="gpt-4", temperature=0)

# Create a system prompt with all available schema context
system_context = f"""
You are a data analyst agent with access to three data sources:

1. PostgreSQL Database:
{json.dumps(schema_context['postgres'], indent=2)}

2. Stripe API:
{json.dumps(schema_context['stripe'], indent=2)}

3. S3 Data Lake:
{json.dumps(schema_context['s3'], indent=2)}

When answering questions:
- First determine which data sources are needed
- Use appropriate tools to query each source
- Combine results from multiple sources if needed
- Always specify exact column names and filters in SQL queries
"""

from langchain.schema import SystemMessage

agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.OPENAI_FUNCTIONS,
    verbose=True,
    agent_kwargs={"system_message": SystemMessage(content=system_context)},
)

Step 6: Query Your Multi-Source Agent

Now you can ask complex questions that span multiple sources:

# Example queries that require multiple sources
result = agent.run(
    "What are the top 5 customers by revenue in the last 30 days? "
    "Include their email from our database and their Stripe subscription status."
)

print(result)

# Another example: correlate database and S3 data
result = agent.run(
    "Find customers in our database who churned in the last quarter. "
    "Check their download activity from S3 logs to see if low engagement predicted churn."
)

print(result)

Best Practices: Optimizing Multi-Source Agents

Cache Schema Context

Schema extraction can be expensive. Cache it:

import json
import os
from datetime import datetime, timedelta

def get_or_refresh_schema(connector, cache_file, max_age_hours=24):
    if os.path.exists(cache_file):
        file_age = datetime.now() - datetime.fromtimestamp(os.path.getmtime(cache_file))
        if file_age < timedelta(hours=max_age_hours):
            with open(cache_file, 'r') as f:
                return json.load(f)
    
    schema = connector.get_schema()
    with open(cache_file, 'w') as f:
        json.dump(schema, f)
    return schema
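You can sanity-check the caching behavior without a live connector by exercising the helper with a stub. The stub below mimics only the `get_schema()` method used above; everything else is plain standard library:

```python
import json
import os
import tempfile
from datetime import datetime, timedelta

def get_or_refresh_schema(connector, cache_file, max_age_hours=24):
    # Same helper as above, repeated so this snippet runs standalone.
    if os.path.exists(cache_file):
        file_age = datetime.now() - datetime.fromtimestamp(os.path.getmtime(cache_file))
        if file_age < timedelta(hours=max_age_hours):
            with open(cache_file) as f:
                return json.load(f)
    schema = connector.get_schema()
    with open(cache_file, "w") as f:
        json.dump(schema, f)
    return schema

class StubConnector:
    """Stands in for an AirbyteConnector; counts schema extractions."""
    def __init__(self):
        self.calls = 0
    def get_schema(self):
        self.calls += 1
        return {"users": ["id", "email"]}

stub = StubConnector()
cache = os.path.join(tempfile.mkdtemp(), "schema.json")
first = get_or_refresh_schema(stub, cache)
second = get_or_refresh_schema(stub, cache)  # served from the cache file
print(stub.calls)  # → 1 (schema extracted only once)
```

The second call never hits the connector, which is exactly the behavior you want when schema extraction involves a network round trip.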

Limit Agent Tool Access

Don't expose all tables to the agent. Create restricted connectors:

restrictive_postgres = AirbyteConnector(
    source_type="postgres",
    config={...},
    allowed_schemas=["public"],
    allowed_tables=["users", "orders", "products"],  # Exclude sensitive tables
    read_only=True  # Prevent writes
)

Monitor and Log Agent Decisions

from langchain.callbacks.base import BaseCallbackHandler

class AgentLogger(BaseCallbackHandler):
    def on_tool_start(self, serialized, input_str, **kwargs):
        print(f"🔧 Using tool: {serialized['name']} with input: {input_str}")
    
    def on_tool_end(self, output, **kwargs):
        print(f"✅ Tool output: {output[:200]}...")

agent.run(
    "Your question here",
    callbacks=[AgentLogger()]
)

Comparison: Airbyte Agents vs Custom Solutions

| Aspect | Airbyte Agents | Custom LangChain Tools | Hardcoded Scripts |
|--------|----------------|------------------------|-------------------|
| Setup time | 30 minutes | 2-3 hours | 1-2 days |
| Data source support | 300+ connectors | Whatever you build | Limited to what exists |
| Schema management | Automatic | Manual | Hardcoded |
| Authentication | Centralized | Per-tool | Scattered |
| Scalability | Handles new sources easily | Requires new tool code | Very brittle |
| Cost | Reasonable | Your engineer time | High engineer maintenance |

Troubleshooting Common Issues

Agent doesn't find the right data source: Ensure schema context is detailed. Add example queries to the system prompt.

Rate limiting from Stripe/external APIs: Airbyte handles this, but set appropriate lookback windows in connector config.

Agent hallucinating columns: This usually means schema context is incomplete. Test connector.get_schema() directly.
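A cheap guardrail is to check the identifiers in the agent's SQL against the cached schema before executing it. A rough sketch follows; the regex-based extraction is deliberately simplistic and illustrative (a production version would use a real SQL parser such as sqlglot):

```python
import re

def unknown_columns(sql: str, schema: dict) -> set:
    """Return identifiers in a query that match no known column or table.

    schema maps table name -> list of column names.
    """
    known = {col for cols in schema.values() for col in cols}
    known |= set(schema)  # table names are valid identifiers too
    keywords = {"select", "from", "where", "and", "or", "order", "by",
                "group", "limit", "as", "on", "join", "desc", "asc"}
    identifiers = set(re.findall(r"[a-zA-Z_][a-zA-Z0-9_]*", sql.lower()))
    return identifiers - known - keywords

schema = {"users": ["id", "email"], "orders": ["id", "user_id", "total"]}
print(unknown_columns("SELECT email, signup_date FROM users", schema))
# → {'signup_date'}: flag the hallucinated column before running the query
```

Rejecting the query with a message naming the unknown column also gives the agent a concrete correction to act on in its next step.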

Next Steps

With Airbyte Agents, you can now:

  • Build data analysis assistants that work across your entire data stack
  • Create customer support bots that access both internal and external data
  • Develop anomaly detection agents that correlate signals from multiple sources

The unified context approach solves the hardest problem in multi-source agents: making the agent understand that your database schema, API responses, and file formats all represent coherent data that can be combined intelligently.

Start with 2-3 data sources, test your agent's outputs carefully, and expand from there.

Recommended Tools

  • Supabase: Open source Firebase alternative with Postgres
  • Render: Zero-DevOps cloud platform for web apps and APIs