How to Connect Multiple Data Sources to Airbyte Agents with Context Management (2025)
Building data pipelines that intelligently handle multiple source systems is challenging. Airbyte Agents introduce context-aware orchestration—but connecting disparate sources while maintaining execution context requires understanding how agent state flows across connectors.
This guide walks you through implementing multi-source Airbyte Agents with proper context management, addressing the specific pain point of data-dependent workflows where source A feeds into source B conditionally.
Understanding Airbyte Agents Context Architecture
Airbyte Agents extend traditional sync operations by maintaining execution context across multiple data source connections. Unlike standard connectors that operate independently, agents can:
- Preserve state between source connections
- Make routing decisions based on upstream data
- Handle complex dependency chains
- Access metadata from previous sync operations
The key difference from traditional Airbyte is that agents act as intelligent orchestrators rather than simple data movers.
Prerequisites and Setup
Before implementing multi-source context, ensure you have:
- Airbyte 0.40.0+ installed (agent support available from this version)
- At least two configured data sources (e.g., PostgreSQL + Salesforce, or Shopify + Google Analytics)
- Network connectivity between your Airbyte instance and all source systems
- Appropriate API credentials or connection strings for each source
# Verify Airbyte agent capability
curl http://localhost:8000/api/v1/health | grep -i agent
Step-by-Step Multi-Source Agent Configuration
Step 1: Define Your Data Sources
Start by creating individual source connections for each system. This is the standard Airbyte workflow, but the naming convention matters for context tracking:
sources:
  source_crm:
    type: salesforce
    config:
      client_id: ${SALESFORCE_CLIENT_ID}
      client_secret: ${SALESFORCE_CLIENT_SECRET}
      instance_url: https://your-instance.salesforce.com
    metadata:
      priority: 1
      required_for: [source_orders]
  source_orders:
    type: postgresql
    config:
      host: orders-db.internal
      database: production
      username: ${DB_USER}
      password: ${DB_PASSWORD}
    metadata:
      priority: 2
      depends_on: [source_crm]
The metadata section is critical—it declares dependencies that the agent uses for intelligent scheduling.
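Under the hood, scheduling from these declarations reduces to a topological sort over the depends_on edges. Here is a minimal sketch of that idea in Python (the dictionary mirrors the YAML above; this is illustrative, not Airbyte's internal scheduler):

from graphlib import TopologicalSorter  # stdlib, Python 3.9+

# Dependency metadata as declared in the YAML above.
sources = {
    "source_crm": [],                 # no upstream dependencies
    "source_orders": ["source_crm"],  # depends_on: [source_crm]
}

# static_order() yields sources with every dependency before its dependents,
# which is the order a context-aware agent would have to sync them in.
sync_order = list(TopologicalSorter(sources).static_order())
print(sync_order)  # ['source_crm', 'source_orders']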
Step 2: Create Agent Configuration with Context Persistence
Agent-specific configuration differs from standard connectors. You're defining how the agent manages state across sources:
{
  "agent_config": {
    "name": "multi_source_crm_order_agent",
    "type": "context_aware_orchestrator",
    "sources": ["source_crm", "source_orders"],
    "context_management": {
      "persistence_type": "memory",
      "ttl_seconds": 3600,
      "state_keys": [
        "last_crm_sync_time",
        "crm_record_count",
        "extraction_filters"
      ]
    },
    "execution_strategy": "sequential_with_feedback",
    "error_handling": {
      "retry_policy": "exponential_backoff",
      "max_retries": 3,
      "propagate_failures": false
    }
  }
}
The context_management section controls how agent state persists between source syncs. Memory persistence suits workflows that finish within a single execution; use postgres or redis persistence for long-running, multi-hour workflows.
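To make the persistence_type switch concrete, here is a hypothetical sketch of the kind of store interface it implies; only the memory backend is shown, and none of this is Airbyte's actual implementation:

import time

class MemoryContextStore:
    """In-process context store with per-key TTL (persistence_type: memory)."""

    def __init__(self, ttl_seconds=3600):
        self.ttl = ttl_seconds
        self._data = {}  # key -> (value, expires_at)

    def set(self, key, value):
        self._data[key] = (value, time.monotonic() + self.ttl)

    def get(self, key, default=None):
        value, expires_at = self._data.get(key, (default, None))
        if expires_at is not None and time.monotonic() > expires_at:
            del self._data[key]  # expired entries behave as if absent
            return default
        return value

A redis or postgres backend would satisfy the same set/get interface while surviving process restarts, which is why those options suit long-running workflows.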
Step 3: Implement Conditional Logic Based on Source Data
This is where agents shine—using data from source_crm to filter source_orders:
# Agent transformation rule
from datetime import datetime

from airbyte.agents import ContextAwareAgent, SourceContext

class CrmOrderAgent(ContextAwareAgent):
    def execute(self, context: SourceContext):
        # First sync: CRM data
        crm_data = self.fetch_source('source_crm', {
            'incremental': True,
            'cursor_field': 'updated_at'
        })

        # Store CRM account IDs in context
        account_ids = [record['AccountId'] for record in crm_data]
        context.set('active_accounts', account_ids)
        context.set('sync_timestamp', datetime.now().isoformat())

        # Second sync: orders filtered by the active accounts from the CRM.
        # Account IDs are strings, so quote them; see the chunked helper
        # below for large ID lists.
        quoted_ids = ','.join(f"'{account_id}'" for account_id in account_ids)
        filter_condition = f"account_id IN ({quoted_ids})"
        order_data = self.fetch_source('source_orders', {
            'incremental': True,
            'filter': filter_condition,
            'cursor_field': 'created_at'
        })

        return {
            'crm_records': len(crm_data),
            'order_records': len(order_data),
            'execution_context': context.to_dict()
        }
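One practical wrinkle in the filter above: account IDs are strings that need quoting, and most databases cap the length of an IN list. A small illustrative helper (not part of the agent API) that handles both:

def build_in_filter(column, values, chunk_size=1000):
    """Build "col IN (...) OR col IN (...)" with quoted, escaped values.

    Many databases cap IN-list length, so long lists are split into
    chunks. For untrusted input, prefer driver-level parameter binding
    over string formatting.
    """
    def quote(value):
        return "'" + str(value).replace("'", "''") + "'"

    chunks = [values[i:i + chunk_size] for i in range(0, len(values), chunk_size)]
    return " OR ".join(
        f"{column} IN ({', '.join(quote(v) for v in chunk)})" for chunk in chunks
    )

# Usage: filter_condition = build_in_filter('account_id', account_ids)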
Step 4: Configure Destination Handling with Context
When writing to your destination, preserve the execution context:
{
  "destination_config": {
    "type": "postgres",
    "write_strategy": "upsert",
    "tables": {
      "crm_accounts": {
        "source": "source_crm",
        "unique_key": "AccountId"
      },
      "orders_synced": {
        "source": "source_orders",
        "unique_key": "OrderId",
        "context_fields": ["_agent_sync_time", "_agent_version"]
      }
    },
    "metadata_table": "agent_execution_log",
    "context_columns": true
  }
}
Setting context_columns: true automatically adds agent context fields to every destination record.
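The effect is equivalent to stamping every outgoing record before the write. A hedged sketch of that enrichment (the field names come from the config above; add_context_columns and AGENT_VERSION are illustrative, not part of Airbyte's API):

from datetime import datetime, timezone

AGENT_VERSION = "1.0.0"  # illustrative; a real agent would report its own version

def add_context_columns(records, sync_time=None):
    """Stamp every outgoing record with the agent context fields."""
    sync_time = sync_time or datetime.now(timezone.utc).isoformat()
    return [
        {**record, "_agent_sync_time": sync_time, "_agent_version": AGENT_VERSION}
        for record in records
    ]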
Common Multi-Source Context Issues and Solutions
Issue: Context Loss Between Source Syncs
Problem: Agent context resets when transitioning between sources.
Solution: Explicitly persist context to shared storage:
context.persist_to_redis(
    key=f"agent_execution_{execution_id}",
    ttl=3600
)
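If persist_to_redis is unavailable in your setup, the same effect takes a few lines with the redis-py client; a minimal sketch, assuming the context serializes to JSON:

import json
import redis  # pip install redis

r = redis.Redis(host="localhost", port=6379, db=0)

def persist_context(execution_id, context_dict, ttl=3600):
    """Write the context dict to Redis with a TTL, keyed by execution."""
    r.setex(f"agent_execution_{execution_id}", ttl, json.dumps(context_dict))

def restore_context(execution_id):
    """Read the context back; returns None if it expired or never existed."""
    raw = r.get(f"agent_execution_{execution_id}")
    return json.loads(raw) if raw is not None else None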
Issue: Inconsistent Filtering Across Sources
Problem: The CRM sync yields 1,000 active accounts, but the order source returns orders for only 950 of them.
Solution: Implement validation checkpoints:
matched_accounts = order_records.get('account_ids', [])
if len(account_ids) > len(matched_accounts):
    context.log_warning(
        f"Account-order mismatch: {len(account_ids)} CRM accounts, "
        f"{len(matched_accounts)} accounts with orders"
    )
    context.set('data_quality_issue', True)
Issue: Memory Bloat with Large Context
Problem: Context storage grows excessively with high-volume syncs.
Solution: Use selective context persistence:
import hashlib, json

# Bulky raw payload: kept only for this execution, discarded after the sync
context.set_transient('full_crm_records', crm_data)
# Compact fingerprint of the payload, retained for audit
context.set_persistent('crm_hash', hashlib.md5(
    json.dumps(crm_data, sort_keys=True).encode()
).hexdigest())
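If serializing the full dataset just to hash it is itself too heavy, hash it incrementally instead; a sketch (SHA-256 swapped in for MD5 here to give a collision-resistant audit fingerprint):

import hashlib
import json

def stable_records_hash(records):
    """Hash records one at a time so the full JSON dump never sits in memory."""
    digest = hashlib.sha256()
    for record in records:
        digest.update(json.dumps(record, sort_keys=True).encode())
    return digest.hexdigest()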
Monitoring Agent Context Execution
Add observability to track context flow:
# Query agent execution logs
curl -X GET "http://localhost:8000/api/v1/agent_logs?agent=multi_source_crm_order_agent&limit=50"
# Example response:
{
  "executions": [
    {
      "execution_id": "exec_abc123",
      "status": "success",
      "sources_synced": 2,
      "context_keys_used": 3,
      "duration_seconds": 127,
      "records_processed": 1950
    }
  ]
}
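The same endpoint is straightforward to poll from a script. A hedged sketch using requests (the agent_logs path and response shape are taken from the example above, so adjust to what your deployment actually exposes):

import requests

API = "http://localhost:8000/api/v1"

def recent_failures(agent_name, limit=50):
    """Return executions that did not finish with status 'success'."""
    resp = requests.get(
        f"{API}/agent_logs",
        params={"agent": agent_name, "limit": limit},
        timeout=10,
    )
    resp.raise_for_status()
    executions = resp.json().get("executions", [])
    return [e for e in executions if e.get("status") != "success"]

for failure in recent_failures("multi_source_crm_order_agent"):
    print(failure["execution_id"], failure.get("status"))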
Best Practices for Multi-Source Agents
- Explicit dependency declaration: Always specify depends_on metadata to prevent race conditions
- Context validation: Use schema validation on context values before dependent sources consume them (see the sketch after this list)
- Incremental state management: Store cursors in context to avoid full re-syncs
- Error boundaries: Wrap each source fetch in try/except to prevent cascade failures
- Context cleanup: Implement TTL on temporary context fields to prevent memory leaks
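For the context-validation practice above, even a tiny schema check before a dependent source reads the context catches most wiring mistakes. A minimal sketch with plain type checks (the key names match the agent code from Step 3):

CONTEXT_SCHEMA = {
    "active_accounts": list,
    "sync_timestamp": str,
}

def validate_context(context_dict):
    """Fail fast if a dependent source would read a missing or mistyped key."""
    for key, expected_type in CONTEXT_SCHEMA.items():
        if key not in context_dict:
            raise KeyError(f"context missing required key: {key}")
        if not isinstance(context_dict[key], expected_type):
            raise TypeError(
                f"context[{key!r}] should be {expected_type.__name__}, "
                f"got {type(context_dict[key]).__name__}"
            )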
Conclusion
Airbyte Agents with multi-source context management enable sophisticated data workflows that are impractical with standard connectors alone. The key is treating context as a first-class citizen in your pipeline: declaring dependencies explicitly, validating state transitions, and monitoring execution flow.
Start with two sources and build up complexity as you become comfortable with context persistence patterns.