How to Connect Multiple Data Sources with Airbyte Agents for ETL Pipelines (2025)

Managing ETL workflows across multiple data sources is a common challenge for backend developers. You need to orchestrate connections between databases, APIs, and cloud storage while maintaining context about schema relationships, data freshness, and transformation logic. Airbyte Agents solve this by providing an agent-based framework that maintains context across disparate sources automatically.

This guide walks you through configuring Airbyte Agents to handle multi-source ETL pipelines without manually managing state between connections.

Why Airbyte Agents for Multi-Source ETL

Traditional Airbyte workflows require you to create separate connectors for each source-destination pair and manually handle orchestration. When dealing with 5+ data sources feeding into a data warehouse, this becomes brittle:

  • State management breaks across connector instances
  • Schema drift in one source cascades without warning
  • Context loss between sync jobs means duplicate logic
  • Incremental sync coordination requires external scheduling tools

Airbyte Agents introduce agentic reasoning—the system maintains awareness of all connected sources, their current state, and transformation requirements. The agent can dynamically decide which sources to sync, handle dependencies, and apply context-aware transformations.

Prerequisites and Architecture

Before setting up, ensure you have:

  • Airbyte Cloud or Self-Hosted instance (version 0.60.0+)
  • At least 2 data sources ready (PostgreSQL, MySQL, API, S3, etc.)
  • Target warehouse (Snowflake, BigQuery, or Postgres)
  • API credentials for all sources stored securely

The Airbyte Agent architecture works like this:

  1. Agent receives a synchronization goal ("sync customer data from 3 sources")
  2. Agent analyzes source schemas and active connections
  3. Agent determines optimal sync order (respecting foreign key dependencies)
  4. Agent executes syncs while maintaining shared context
  5. Agent applies unified transformations and validation

Step 1: Configure Your Data Sources with Context Tagging

Start by setting up your source connectors with metadata that helps the agent understand relationships:

# example-sources.yml
sources:
  - name: production_postgres
    type: postgres
    host: prod-db.example.com
    database: core
    metadata:
      domain: "customer_data"
      sync_priority: 1
      depends_on: null
      
  - name: analytics_mysql
    type: mysql
    host: analytics-db.example.com
    database: events
    metadata:
      domain: "customer_data"
      sync_priority: 2
      depends_on: "production_postgres"
      
  - name: crm_api_source
    type: http
    url: https://api.crm.example.com/v2
    metadata:
      domain: "customer_data"
      sync_priority: 3
      depends_on: "production_postgres"

The sync_priority and depends_on fields let the agent order the work: production_postgres (priority 1) syncs immediately, while analytics_mysql and crm_api_source both wait for it to complete, since each declares it as a dependency.
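The ordering logic this implies can be sketched with the standard library's graphlib: a topological sort over the depends_on graph, with ties broken by sync_priority. This is an illustration only; plan_sync_order is not part of the Airbyte SDK.

```python
from graphlib import TopologicalSorter

def plan_sync_order(sources: list[dict]) -> list[str]:
    """Order sources so dependencies sync first; ties broken by sync_priority."""
    graph: dict[str, set[str]] = {}
    priority: dict[str, int] = {}
    for src in sources:
        meta = src["metadata"]
        dep = meta.get("depends_on")
        graph[src["name"]] = {dep} if dep else set()
        priority[src["name"]] = meta["sync_priority"]

    ts = TopologicalSorter(graph)
    ts.prepare()  # raises CycleError if dependencies form a cycle
    order = []
    while ts.is_active():
        ready = sorted(ts.get_ready(), key=priority.get)
        order.extend(ready)
        ts.done(*ready)
    return order
```

Run against the three sources above, this yields production_postgres first, then analytics_mysql and crm_api_source in priority order.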

Step 2: Define Agent Workspace and Connection Pools

Create an agent workspace that groups related connectors:

# agent-workspace.yml
workspace:
  name: customer_360_agent
  description: Multi-source agent for unified customer view
  
  connection_pool:
    max_concurrent_syncs: 3
    timeout_minutes: 60
    retry_policy:
      max_attempts: 2
      backoff_multiplier: 2
      
  source_groups:
    - group_id: crm_sources
      sources:
        - production_postgres
        - crm_api_source
      shared_context:
        customer_id_column: "id"
        update_watermark_key: "updated_at"
        
    - group_id: events_sources
      sources:
        - analytics_mysql
      shared_context:
        event_timestamp_column: "event_time"
        partition_key: "date"

This configuration allows the agent to understand which sources share common keys and how to coordinate incremental syncs.
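To catch misconfiguration before the agent runs, the parsed workspace dict can be sanity-checked with a small validator. This sketch assumes the hypothetical schema shown above; validate_workspace is an illustrative helper, not an Airbyte API.

```python
def validate_workspace(workspace: dict, known_sources: set[str]) -> list[str]:
    """Return a list of problems found in an agent workspace config."""
    errors = []
    pool = workspace.get("connection_pool", {})
    if pool.get("max_concurrent_syncs", 1) < 1:
        errors.append("max_concurrent_syncs must be >= 1")
    for group in workspace.get("source_groups", []):
        # Every source a group references must exist as a configured connector
        for name in group.get("sources", []):
            if name not in known_sources:
                errors.append(
                    f"group {group['group_id']} references unknown source {name}"
                )
    return errors
```

Running this at startup turns silent typos in source names into explicit errors before any sync is triggered.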

Step 3: Implement Agent Logic in Python

Use the Airbyte Python SDK to define agent behavior:

from airbyte_agents import Agent, SourceContext, SyncPlan
from airbyte import Client
import asyncio
import logging
import os

logger = logging.getLogger(__name__)

class MultiSourceETLAgent(Agent):
    def __init__(self, workspace_config: dict):
        super().__init__()
        self.client = Client(token=os.getenv("AIRBYTE_API_TOKEN"))
        self.workspace_config = workspace_config
        self.source_contexts: dict[str, SourceContext] = {}
        
    async def analyze_sources(self) -> SyncPlan:
        """Analyze all sources and determine optimal sync order"""
        plan = SyncPlan()
        
        for source_config in self.workspace_config["sources"]:
            source_name = source_config["name"]
            
            # Check schema for changes
            schema = await self.client.get_source_schema(source_name)
            
            # Create source context maintaining state
            context = SourceContext(
                name=source_name,
                domain=source_config["metadata"]["domain"],
                last_sync_cursor=await self._get_last_cursor(source_name),
                schema_hash=self._hash_schema(schema)
            )
            self.source_contexts[source_name] = context
            
            # Add to plan respecting dependencies
            priority = source_config["metadata"]["sync_priority"]
            depends_on = source_config["metadata"].get("depends_on")
            plan.add_source(source_name, priority, depends_on)
            
        return plan
        
    async def execute_sync(self, sync_plan: SyncPlan) -> dict:
        """Execute syncs in planned order, maintaining context"""
        results = {}
        
        for batch in sync_plan.batches:  # Sources grouped by dependency
            batch_results = await asyncio.gather(*[
                self._sync_source_with_context(source)
                for source in batch
            ])
            for batch_result in batch_results:  # gather returns a list of per-source dicts
                results.update(batch_result)
            
        return results
        
    async def _sync_source_with_context(self, source_name: str) -> dict:
        """Sync a single source using maintained context"""
        context = self.source_contexts[source_name]
        
        logger.info(f"Syncing {source_name} from cursor {context.last_sync_cursor}")
        
        try:
            job = await self.client.trigger_sync(
                connection_id=source_name,
                sync_mode="incremental",
                cursor=context.last_sync_cursor
            )
            
            # Wait for completion
            job_status = await self.client.wait_for_job(job.id, timeout=3600)
            
            if job_status["status"] == "succeeded":
                context.last_sync_cursor = job_status["end_cursor"]
                logger.info(f"{source_name} synced successfully")
                return {source_name: {"status": "success", "rows": job_status["record_count"]}}
            else:
                logger.error(f"{source_name} sync failed: {job_status['error']}")
                return {source_name: {"status": "failed", "error": job_status["error"]}}
                
        except Exception as e:
            logger.error(f"Error syncing {source_name}: {str(e)}")
            return {source_name: {"status": "error", "error": str(e)}}

# Usage (inside an async entrypoint)
async def main():
    agent = MultiSourceETLAgent(workspace_config)
    sync_plan = await agent.analyze_sources()
    results = await agent.execute_sync(sync_plan)
    print(f"Sync results: {results}")

asyncio.run(main())

Comparison: Airbyte Agents vs. Traditional Orchestration

| Aspect | Airbyte Agents | Apache Airflow | Manual Python Scripts |
|--------|---|---|---|
| Context awareness | Automatic across sources | Requires DAG logic | Must implement manually |
| Dependency handling | Built-in, declarative | DAG definition overhead | Error-prone |
| Schema drift detection | Agent-powered | Manual operators | Not typically included |
| Incremental sync coordination | Native to agent | Requires external tracking | Must build cursor management |
| Setup time for 5+ sources | ~2 hours | ~8 hours | ~16 hours |
| Operational complexity | Low | Medium | High |

Step 4: Handle Schema Drift and Data Quality

The agent can automatically respond to schema changes across sources:

async def validate_and_reconcile_schemas(self) -> bool:
    """Detect schema drift across sources and trigger reconciliation"""
    current_schemas = {}
    
    for source_name, context in self.source_contexts.items():
        schema = await self.client.get_source_schema(source_name)
        current_schema_hash = self._hash_schema(schema)
        
        if current_schema_hash != context.schema_hash:
            logger.warning(f"Schema drift detected in {source_name}")
            
            # Agent decides: update destination schema or pause sync?
            if await self._is_breaking_change(context.schema_hash, current_schema_hash):
                logger.error(f"Breaking schema change in {source_name}. Pausing syncs.")
                return False
            else:
                logger.info(f"Non-breaking change in {source_name}. Proceeding.")
                context.schema_hash = current_schema_hash
                
    return True
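The _hash_schema and _is_breaking_change helpers referenced above can be sketched as plain functions. This shows one possible policy (removed or retyped columns are breaking; added columns are not), assuming schemas are represented as column-to-type dicts; the real representation in your deployment may differ.

```python
import hashlib
import json

def hash_schema(schema: dict) -> str:
    """Stable fingerprint of a source schema; any field change alters the hash."""
    canonical = json.dumps(schema, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).hexdigest()

def is_breaking_change(old: dict, new: dict) -> bool:
    """Removed columns or changed types are breaking; added columns are not."""
    for column, column_type in old.items():
        if column not in new or new[column] != column_type:
            return True
    return False
```

Hashing the canonical JSON (sorted keys, fixed separators) makes the fingerprint independent of key ordering, so only genuine schema changes trigger the drift path.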

Step 5: Monitor Agent Performance

Set up monitoring to track multi-source sync efficiency:

from datetime import datetime, timedelta, timezone

def get_agent_metrics(self) -> dict:
    """Retrieve agent performance metrics"""
    last_24h = datetime.now(timezone.utc) - timedelta(hours=24)
    
    metrics = {
        "total_syncs": len(self.source_contexts),
        "successful_syncs": sum(1 for c in self.source_contexts.values() if c.last_sync_status == "success"),
        "failed_syncs": sum(1 for c in self.source_contexts.values() if c.last_sync_status == "failed"),
        "avg_sync_time_minutes": self._calculate_avg_sync_time(),
        "schema_drift_events": self._count_schema_changes(since=last_24h),
        "data_freshness_gaps": self._identify_stale_sources(max_age_hours=6)
    }
    
    return metrics
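The _identify_stale_sources helper can be sketched as a pure function over last-successful-sync timestamps. This is an illustration of one freshness policy, not an SDK method:

```python
from datetime import datetime, timedelta, timezone

def identify_stale_sources(
    last_synced: dict[str, datetime], max_age_hours: int = 6
) -> list[str]:
    """Return sources whose last successful sync is older than max_age_hours."""
    cutoff = datetime.now(timezone.utc) - timedelta(hours=max_age_hours)
    return sorted(name for name, ts in last_synced.items() if ts < cutoff)
```

Feeding this into your alerting (step 2 under Next Steps) surfaces freshness gaps before downstream consumers notice them.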

Common Pitfalls and Solutions

Problem: Circular dependencies between sources

  • Solution: Airbyte Agents detect cycles and raise errors at validation time. Define clear dependency DAGs during workspace setup.

Problem: One slow source blocks all others

  • Solution: Configure independent source groups with separate connection pools and timeout thresholds.

Problem: Cursor management across agent restarts

  • Solution: Store cursors in a persistent backend (Postgres/Redis) that the agent queries on startup.

Next Steps

Once your multi-source agent is running:

  1. Implement transformation logic using dbt models triggered post-sync
  2. Add alerting for schema drift or sync failures
  3. Scale to 10+ sources by implementing source groups
  4. Optimize sync schedules based on agent metrics

Airbyte Agents transform ETL from a collection of brittle connectors into an intelligent, self-managing system that understands your entire data landscape.
