How to Build Multi-Source Data Agents with Airbyte and OpenAI in 2025

Building AI agents that can intelligently query data across multiple sources is one of the most powerful use cases for LLMs in 2025. However, most developers struggle with the plumbing: how do you give an agent safe, structured access to disparate databases, APIs, and data warehouses?

Airbyte Agents solve this by providing a unified context layer that lets your agent understand and query multiple data sources without hardcoding connection logic. This guide walks you through building a production-ready multi-source data agent.

Why Traditional Approaches Fall Short

Before Airbyte Agents, developers typically handled multi-source data access in one of three brittle ways:

  1. Hard-coded SQL connectors: Each data source needs custom connection logic, authentication handling, and schema management. Adding a new source means modifying your agent code.

  2. Unified data warehouse approach: Syncing all data to a single warehouse introduces latency, complexity, and costs. Your agent becomes as slow as your slowest data pipeline.

  3. LLM-generated queries: Letting the model write raw SQL or API calls is dangerous and unreliable without strict guardrails.

Airbyte Agents abstract away these problems by providing agents with a standardized interface to any connected data source.

Architecture: How Airbyte Agents Work

Airbyte Agents operate on a simple principle: normalize data source metadata and connections into a queryable format that LLMs understand natively.

Here's the flow:

  1. Connection Registry: Airbyte maintains all your configured connections (PostgreSQL, Snowflake, Stripe API, MongoDB, etc.)
  2. Schema Discovery: Your agent automatically gets context about available tables, fields, and relationships
  3. Query Translation: The agent formulates requests using Airbyte's schema vocabulary
  4. Execution & Caching: Queries execute through Airbyte connectors with built-in result caching
  5. Response Assembly: Results are formatted and returned to your agent's reasoning loop

The key advantage: your agent never writes raw SQL or authentication code. It just understands "what data exists" and "how to ask for it."
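
To make the query-translation step concrete, here is a sketch of the kind of structured request an agent emits instead of raw SQL. The field names below are illustrative assumptions, not Airbyte's actual schema vocabulary:

# Hypothetical structured request the agent emits instead of raw SQL.
# Field names are illustrative assumptions, not Airbyte's wire format.
agent_request = {
    "source": "postgres_prod",    # resolved from the connection registry
    "stream": "transactions",     # found via schema discovery
    "fields": ["customer_id", "amount"],
    "filters": [{"field": "created_at", "op": ">=", "value": "2025-01-01"}],
    "limit": 100,
}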

Step 1: Set Up Airbyte with Multiple Sources

First, ensure you have Airbyte running (Cloud or Self-Hosted) with at least two configured sources. For this guide, we'll use:

  • PostgreSQL: Customer transaction data
  • Stripe API: Payment and subscription data
# If using Airbyte Cloud, configure sources via UI
# If self-hosted, use Docker Compose
docker-compose up -d

# Access at http://localhost:8000

In the Airbyte UI, add your sources and test connections. Don't create any destinations yet—we'll query sources directly through agents.

Step 2: Initialize Your Agent with Airbyte Context

Create a Python environment with the necessary dependencies:

pip install openai langchain langchain-openai python-dotenv requests
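
The setup below reads its credentials from environment variables. With python-dotenv installed, you can keep them in a local .env file (placeholder values shown):

# .env (placeholders; substitute your real keys)
OPENAI_API_KEY=sk-...
AIRBYTE_API_URL=http://localhost:8000/api/v1
AIRBYTE_API_TOKEN=your-airbyte-api-token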

Here's the core setup:

import os
import json
from typing import Any, Dict

from dotenv import load_dotenv
from langchain.agents import AgentType, Tool, initialize_agent
from langchain_openai import ChatOpenAI

# Load OPENAI_API_KEY, AIRBYTE_API_URL, and AIRBYTE_API_TOKEN from .env
load_dotenv()

# Airbyte configuration
AIRBYTE_API_URL = os.getenv("AIRBYTE_API_URL", "http://localhost:8000/api/v1")
AIRBYTE_API_TOKEN = os.getenv("AIRBYTE_API_TOKEN")

class AirbyteSourceContext:
    """Manages agent context for Airbyte sources"""
    
    def __init__(self, api_url: str, api_token: str):
        self.api_url = api_url
        self.api_token = api_token
        self.sources = {}
        self.schema_cache = {}
    
    def discover_sources(self, force: bool = False) -> Dict[str, Any]:
        """Fetch all configured sources from Airbyte (force=True refreshes caches)"""
        if self.sources and not force:
            return self.sources
        self.schema_cache = {}  # drop stale schemas on refresh
        
        # In production, GET {self.api_url}/sources with an
        # Authorization: Bearer {self.api_token} header.
        # For demo, using hardcoded config
        self.sources = {
            "postgres_prod": {
                "type": "postgres",
                "tables": ["customers", "transactions", "products"]
            },
            "stripe_live": {
                "type": "stripe",
                "tables": ["charges", "subscriptions", "invoices"]
            }
        }
        return self.sources
    
    def get_schema(self, source_name: str) -> str:
        """Get human-readable schema for a source"""
        if source_name in self.schema_cache:
            return self.schema_cache[source_name]
        
        # In production, fetch the discovered catalog from Airbyte.
        # For demo, using hardcoded per-source schemas
        demo_schemas = {
            "postgres_prod": """
        Source: postgres_prod
        Tables:
        - customers (id INT, email VARCHAR, signup_date TIMESTAMP, stripe_customer_id VARCHAR)
        - transactions (id INT, customer_id INT, amount DECIMAL, created_at TIMESTAMP)
        - products (id INT, name VARCHAR, price DECIMAL)
        """,
            "stripe_live": """
        Source: stripe_live
        Tables:
        - charges (id VARCHAR, customer_id VARCHAR, amount INT, status VARCHAR)
        - subscriptions (id VARCHAR, customer_id VARCHAR, status VARCHAR)
        - invoices (id VARCHAR, customer_id VARCHAR, amount_due INT, status VARCHAR)
        """,
        }
        schema_info = demo_schemas.get(source_name, f"Unknown source: {source_name}")
        
        self.schema_cache[source_name] = schema_info
        return schema_info

# Initialize context
airbyte = AirbyteSourceContext(AIRBYTE_API_URL, AIRBYTE_API_TOKEN)
airbyte.discover_sources()
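
A quick sanity check that the context layer is wired up, using the demo class above:

print(json.dumps(airbyte.discover_sources(), indent=2))
print(airbyte.get_schema("postgres_prod"))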

Step 3: Create Structured Query Tools

Now bind Airbyte sources to agent tools:

import re  # used for the read-only query guard below

def query_postgres(query: str) -> str:
    """Query PostgreSQL source through Airbyte"""
    # In production, route through the Airbyte API
    # (see the execution sketch after the tool definitions)
    try:
        # Block write/DDL statements; word-boundary matching avoids
        # false positives like a column named "dropped_at"
        if re.search(r"\b(DROP|DELETE|TRUNCATE|ALTER|UPDATE|INSERT)\b", query, re.IGNORECASE):
            return "Error: Destructive operations not allowed"
        
        # Demo: echo the query instead of executing it
        result = f"Executed: {query}\nReturned 42 rows"
        return result
    except Exception as e:
        return f"Query error: {str(e)}"

def query_stripe(query: str) -> str:
    """Query Stripe source through Airbyte"""
    try:
        # Demo: echo the request. In production, route through the
        # Airbyte API (see the execution sketch after the tool definitions)
        result = f"Queried Stripe: {query}\nReturned 15 records"
        return result
    except Exception as e:
        return f"Stripe API error: {str(e)}"

# Define tools for LangChain
tools = [
    Tool(
        name="QueryPostgres",
        func=query_postgres,
        description="""Query customer transaction data from PostgreSQL.
        Schema: customers (id, email, signup_date), transactions (id, customer_id, amount, created_at)
        Use SQL syntax. Example: SELECT * FROM customers WHERE signup_date > '2025-01-01'"""
    ),
    Tool(
        name="QueryStripe",
        func=query_stripe,
        description="""Query payment data from Stripe.
        Available tables: charges (id, customer_id, amount, status), subscriptions (id, customer_id, status)
        Use natural language queries. Example: Get all charges for customer cus_abc123"""
    )
]
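
The tool bodies above are stubs. In production, each call would route through Airbyte's API. The sketch below assumes a hypothetical /query endpoint and payload shape purely for illustration; check the API reference for your Airbyte version for the actual routes:

import requests

def execute_via_airbyte(source_name: str, query: str) -> str:
    """Sketch: route a tool call through the Airbyte API.
    The /query endpoint and payload shape are assumptions for
    illustration; consult your Airbyte deployment's API docs."""
    response = requests.post(
        f"{AIRBYTE_API_URL}/query",  # hypothetical endpoint
        headers={"Authorization": f"Bearer {AIRBYTE_API_TOKEN}"},
        json={"source": source_name, "query": query},
        timeout=30,
    )
    response.raise_for_status()
    return response.text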

Step 4: Initialize and Run Your Agent

# Initialize LangChain agent (ChatOpenAI, since gpt-4o-mini is a chat model)
llm = ChatOpenAI(temperature=0, model="gpt-4o-mini")

agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)

# Example agent execution
query = """Find customers who signed up in January 2025 and have made at least 
3 transactions in the last month. For each, check if they have active Stripe subscriptions."""

result = agent.run(query)
print(f"Agent result:\n{result}")

Comparison: Direct Query vs. Agent-Based Access

| Aspect | Direct SQL/API | Airbyte Agents |
|--------|----------------|----------------|
| Setup time | Hours (auth, validation per source) | Minutes (configure once) |
| Maintenance burden | High (hardcoded connections) | Low (abstracted layer) |
| Schema changes | Manual updates | Auto-discovered |
| Access control | Custom per query | Centralized in Airbyte |
| Caching | Manual implementation | Built-in |
| Multi-source queries | Complex joins/unions | Natural agent reasoning |

Best Practices for Production

1. Implement Query Validation: Never let agents execute arbitrary queries. Use allowlists:

ALLOWED_TABLES = {
    "postgres": ["customers", "transactions"],
    "stripe": ["charges", "subscriptions"]
}
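
A minimal enforcement sketch: extract the referenced tables and reject anything outside the allowlist. The regex extraction is a simplification; a real SQL parser (e.g., sqlglot) is more robust:

import re

def check_allowlist(source: str, query: str) -> bool:
    """Reject queries that reference tables outside the allowlist."""
    referenced = re.findall(r"\b(?:FROM|JOIN)\s+(\w+)", query, re.IGNORECASE)
    return all(table in ALLOWED_TABLES[source] for table in referenced)

assert check_allowlist("postgres", "SELECT * FROM customers") is True
assert check_allowlist("postgres", "SELECT * FROM products") is False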

2. Cache Schema Context: Embed your Airbyte schema in the system prompt rather than fetching repeatedly:

system_prompt = f"""You are a data analyst with access to these sources:
{airbyte.get_schema('postgres_prod')}
{airbyte.get_schema('stripe_live')}

Use the QueryPostgres tool for customer data. Use QueryStripe for payment data."""

3. Use Connection Pooling: For high-volume queries, configure Airbyte's connection pooling settings in docker-compose.yml.

4. Audit Agent Queries: Log all executed queries for compliance:

import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("agent_audit")
logger.info(f"Agent executed: {query}")
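
To audit every tool call automatically, you can wrap the tool functions in a small logging decorator (a generic sketch, not an Airbyte feature):

import functools

def audited(tool_name: str):
    """Decorator that logs every query a tool executes."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(query: str) -> str:
            logger.info("tool=%s query=%s", tool_name, query)
            return func(query)
        return wrapper
    return decorator

# Wrap the tool functions *before* building the Tool list, e.g.:
#     @audited("QueryPostgres")
#     def query_postgres(query: str) -> str: ...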

Common Gotchas

  • Schema Drift: Airbyte caches schemas. Refresh after adding new tables: airbyte.discover_sources(force=True)
  • Rate Limiting: Stripe API has strict rate limits. Use exponential backoff in retry logic (a sketch follows this list)
  • Timezone Mismatches: PostgreSQL and Stripe use different timezone conventions. Normalize in your schema context
  • Cost Explosion: Each agent query can trigger multiple data source hits. Implement query budgets
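
For the rate-limiting gotcha, here is a simple exponential-backoff wrapper (a generic sketch; libraries like tenacity offer the same with less code):

import time

def with_backoff(func, max_retries: int = 5, base_delay: float = 1.0):
    """Retry func() with exponential backoff on failure."""
    for attempt in range(max_retries):
        try:
            return func()
        except Exception:
            if attempt == max_retries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, 8s, ...

result = with_backoff(lambda: query_stripe("Get all charges for customer cus_abc123"))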

Next Steps

With multi-source agents in place, you can:

  1. Build chatbots that answer business questions across databases
  2. Implement automated data quality checks
  3. Create self-service BI tools for non-technical users
  4. Build data synchronization workflows triggered by agent decisions

Airbyte Agents abstract the complexity of managing multiple data connections, letting your agent focus on reasoning rather than plumbing.