Airweave is a context retrieval layer that connects to external data sources, syncs content, and makes it searchable through vector embeddings. This guide explains the architecture and how components work together.
High-Level Overview
┌─────────────────────────────────────────────────────────────────┐
│ External APIs │
│ (Slack, Notion, GitHub, Google Drive, Asana, ...) │
└────────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ SOURCE CONNECTORS │
│ Extract entities from external systems via API │
│ • OAuth/API key authentication │
│ • Pagination & rate limiting │
│ • Convert to standardized entities │
└────────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ SYNC ORCHESTRATOR │
│ Manages the data pipeline from source to destination │
│ • Temporal workflows for reliability │
│ • Progress tracking & error handling │
│ • Incremental & full sync modes │
└────────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ ENTITY PIPELINE │
│ Transforms raw entities into searchable vectors │
│ 1. Hash computation (change detection) │
│ 2. Text chunking (for large documents) │
│ 3. Vector embedding (OpenAI, Voyage, etc.) │
│ 4. Metadata enrichment │
└────────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ VECTOR STORE (Qdrant/Vespa) │
│ High-performance vector database for semantic search │
│ • Hybrid search (dense + sparse vectors) │
│ • Metadata filtering │
│ • Relevance scoring │
└────────────────────────┬────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ SEARCH API │
│ Query interface for AI agents │
│ • REST API │
│ • Python SDK │
│ • MCP (Model Context Protocol) server │
└─────────────────────────────────────────────────────────────────┘
Core Components
Backend Structure
The Airweave backend is organized into domain-driven modules:
backend/airweave/
├── platform/ # Source connectors & sync engine
│ ├── sources/ # External API integrations
│ ├── entities/ # Data schemas
│ ├── configs/ # Auth & config schemas
│ ├── sync/ # Sync orchestration
│ │ ├── orchestrator.py # Main sync coordinator
│ │ ├── entity_pipeline.py # Entity processing pipeline
│ │ ├── processors/ # Chunk, embed, dispatch
│ │ └── handlers/ # Destination writers
│ ├── destinations/ # Vector store adapters (Qdrant, Vespa)
│ ├── temporal/ # Workflow & activity definitions
│ └── storage/ # File storage (ARF format)
│
├── domains/ # Business logic domains
│ ├── source_connections/ # Connection management
│ ├── sync/ # Sync lifecycle
│ ├── search/ # Search orchestration
│ └── webhooks/ # Event publishing
│
├── adapters/ # Infrastructure adapters
│ ├── temporal/ # Temporal Cloud/OSS client
│ ├── storage/ # Azure Blob, S3, local filesystem
│ ├── webhooks/ # Svix integration
│ └── event_bus/ # In-memory event bus
│
├── api/ # REST API endpoints
│ └── v1/endpoints/ # FastAPI routes
│
├── core/ # Cross-cutting concerns
│ ├── config.py # Settings management
│ ├── logging.py # Structured logging
│ └── protocols/ # Interface definitions
│
├── models/ # Database models (SQLAlchemy)
├── schemas/ # API schemas (Pydantic)
└── crud/ # Database operations
Airweave is undergoing an architectural refactor to improve testability and modularity. The domains/ and adapters/ structure follows the new design, while some legacy code remains in core/. See backend/airweave/architecture-refactor.md for details.
Data Flow
Sync Process
When a sync is triggered, here’s what happens:
Sync initialization
Temporal workflow starts:
User triggers sync via API or schedule
SyncWorkflow starts in Temporal
Sync job created in database with PENDING status
Workflow validates connection and fetches credentials
Location: platform/temporal/workflows/sync.py
Source data extraction
Source connector generates entities:

# Example: Notion connector
async def generate_entities(self) -> AsyncGenerator[BaseEntity, None]:
    # Fetch from Notion API
    for page in await self._fetch_pages():
        yield NotionPageEntity(
            entity_id=page["id"],
            title=page["title"],
            content=page["content"],
            created_at=page["created_time"],
            updated_at=page["last_edited_time"],
        )
Key aspects:
Async generator pattern for memory efficiency
Pagination handled by connector
Rate limiting with exponential backoff
OAuth token refresh if needed
Location: platform/sources/{source_name}.py
Entity processing pipeline
Each entity flows through the pipeline: Entity → Hash → Chunk → Embed → Dispatch
1. Hash Computation:

# Compute hash for change detection
hash_value = compute_hash(entity)
if hash_value == stored_hash:
    return  # No changes, skip processing

2. Chunking (for large documents):

# Split large content into chunks
for chunk in chunk_text(entity.content, max_tokens=512):
    yield ChunkedEntity(...)

3. Embedding:

# Generate vector embeddings
embedding = await embedder.embed(entity.content)
entity.dense_embedding = embedding

4. Dispatch:

# Write to vector store
await destination.bulk_insert([entity])
Location: platform/sync/entity_pipeline.py
Vector storage
Entities stored in Qdrant/Vespa:

{
  "id": "doc-123",
  "vector": [0.123, 0.456, ...],  // 3072-dim dense embedding
  "payload": {
    "entity_id": "notion-page-abc",
    "title": "Product Roadmap Q1 2024",
    "source_type": "notion",
    "created_at": "2024-01-15T10:30:00Z",
    "breadcrumbs": [{...}],
    "access_control": {...}
  }
}
Hybrid search support:
Dense vectors for semantic similarity
Sparse vectors (BM25) for keyword matching
Metadata filters for scoping
Location: platform/destinations/qdrant.py
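One common way to combine the dense and sparse result lists mentioned above is reciprocal rank fusion (RRF). This is a generic sketch of the technique, not necessarily the fusion method Airweave or Qdrant uses internally:

```python
def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Fuse several ranked result lists; each list contributes 1/(k + rank + 1) per doc."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    # Highest fused score first
    return sorted(scores, key=scores.get, reverse=True)
```

A document that ranks well in both the semantic and the keyword list rises above one that dominates only a single list, which is exactly the behavior hybrid search is after.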
Sync completion
Workflow finalizes:
Update sync job status to COMPLETED
Record metrics (entities synced, errors, duration)
Publish webhook events for external systems
Update cursor for incremental sync (if supported)
Location: platform/temporal/activities/sync.py
Key Architectural Patterns
Source Connector Pattern
All source connectors follow a standard interface:
from typing import AsyncGenerator

from airweave.platform.sources._base import BaseSource
from airweave.platform.decorators import source

@source(
    name="MyApp",
    short_name="myapp",
    auth_methods=[AuthenticationMethod.OAUTH_BROWSER],
    oauth_type=OAuthType.WITH_REFRESH,
    config_class=MyAppConfig,
    labels=["Productivity"],
)
class MyAppSource(BaseSource):
    """MyApp source connector."""

    @classmethod
    async def create(cls, access_token: str, config: dict) -> "MyAppSource":
        """Factory method for creating a source instance."""
        instance = cls()
        instance.access_token = access_token
        instance.config = config
        return instance

    async def validate(self) -> bool:
        """Test the connection."""
        # Ping the API to validate credentials
        return await self._ping_api()

    async def generate_entities(self) -> AsyncGenerator[BaseEntity, None]:
        """Yield entities from the source."""
        async for item in self._fetch_items():
            yield MyAppEntity(...)
Key design decisions:
Async generators - Memory-efficient streaming of large datasets
Decorator-based registration - Automatic discovery and metadata
Factory pattern - Credentials injected at runtime
Base class helpers - OAuth token refresh, rate limiting, file downloads
Entity Schema Pattern
Entities use Pydantic for validation and AirweaveField for metadata:
from datetime import datetime

from airweave.platform.entities._base import ChunkEntity
from airweave.platform.entities._airweave_field import AirweaveField

class MyAppDocumentEntity(ChunkEntity):
    """Schema for MyApp documents."""

    document_id: str = AirweaveField(
        ...,
        description="Unique document ID",
        is_entity_id=True,   # Used as primary identifier
        embeddable=False,    # Don't include in embeddings
    )
    title: str = AirweaveField(
        ...,
        is_name=True,        # Display name
        embeddable=True,     # Include in search
    )
    content: str = AirweaveField(
        ...,
        embeddable=True,     # Main searchable content
    )
    created_at: datetime = AirweaveField(
        ...,
        is_created_at=True,  # Used for temporal relevance
        unhashable=True,     # Don't use for change detection
    )
Field metadata controls:
Embedding: What gets vectorized for search
Hashing: What's used for change detection
Display: How it appears in results
Indexing: What can be filtered on
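The interaction between hashing and `unhashable` fields can be sketched with a plain dict. This is an illustrative model of the idea, not Airweave's actual `compute_hash`:

```python
import hashlib
import json

def compute_hash(data: dict, unhashable: set[str]) -> str:
    """Hash only the fields that participate in change detection."""
    # Sort keys so the hash is order-independent and stable across runs
    hashable = {k: v for k, v in sorted(data.items()) if k not in unhashable}
    return hashlib.sha256(json.dumps(hashable, default=str).encode()).hexdigest()
```

Marking a volatile field like `updated_at` as unhashable means a source that touches timestamps on every fetch doesn't force a full re-embed of unchanged documents.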
Temporal Workflow Pattern
Sync reliability through Temporal workflows:
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

@workflow.defn
class SyncWorkflow:
    """Durable workflow for sync execution."""

    @workflow.run
    async def run(self, sync_id: str, org_id: str, force_full: bool = False):
        # 1. Create sync job
        sync_job_id = await workflow.execute_activity(
            create_sync_job_activity,
            args=[sync_id, org_id],
            start_to_close_timeout=timedelta(seconds=30),
        )
        # 2. Execute sync (with retry & timeout)
        await workflow.execute_activity(
            run_sync_activity,
            args=[sync_id, sync_job_id, org_id, force_full],
            start_to_close_timeout=timedelta(hours=2),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(seconds=60),
                maximum_attempts=3,
            ),
        )
Benefits:
Durability - Survives worker crashes and restarts
Retries - Automatic exponential backoff
Observability - Full execution history
Cancellation - Graceful sync cancellation
Event-Driven Architecture
Domains publish events; subscribers react independently:
# Domain publishes event
await event_bus.publish(
    SyncLifecycleEvent(
        event_type=SyncEventType.COMPLETED,
        sync_id=sync.id,
        sync_job_id=job.id,
        organization_id=org.id,
        entities_synced=1234,
        duration_seconds=45.2,
    )
)

# Subscribers handle side effects
class WebhookSubscriber:
    async def handle(self, event: SyncLifecycleEvent):
        # Send webhook to external systems
        await self.webhook_publisher.publish(event)

class AnalyticsSubscriber:
    async def handle(self, event: SyncLifecycleEvent):
        # Track in PostHog
        await self.analytics.track(event)
Decoupling benefits:
Sync engine doesn’t know about webhooks or analytics
Easy to add new subscribers without changing core code
Failures in subscribers don’t affect sync
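A minimal in-memory bus that delivers all three decoupling benefits fits in a dozen lines. This is an illustrative sketch, not Airweave's actual `event_bus` adapter:

```python
import asyncio

class EventBus:
    """Minimal in-memory pub/sub; subscriber failures don't reach the publisher."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, handler):
        self._subscribers.append(handler)

    async def publish(self, event):
        for handler in self._subscribers:
            try:
                await handler(event)
            except Exception:
                pass  # log and continue: one bad subscriber can't break the sync
```

Adding an analytics or webhook subscriber is a one-line `subscribe()` call; the publishing domain never learns who is listening.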
Technology Stack
Backend
FastAPI - Modern async Python web framework
SQLAlchemy - ORM for PostgreSQL
Pydantic - Data validation and serialization
Temporal - Durable workflow orchestration
httpx - Async HTTP client for API calls
Tenacity - Retry logic with backoff
Data Storage
PostgreSQL - Primary relational database
Qdrant - Vector database for semantic search
Vespa (optional) - Alternative vector store
Redis - Real-time progress updates
Azure Blob / S3 - File storage (ARF format)
Infrastructure
Docker Compose - Local development stack
Temporal Cloud/OSS - Workflow execution
Azure Key Vault - Credential encryption
Svix - Webhook delivery
Search Architecture
When an AI agent queries Airweave:
Query: "What were the key decisions from last week's product meeting?"
│
▼
┌─────────────────────────┐
│ Query Processing │
│ • Parse query │
│ • Extract filters │
│ • Generate embedding │
└───────────┬─────────────┘
│
▼
┌─────────────────────────┐
│ Vector Search │
│ • Hybrid search │
│ • Metadata filtering │
│ • Relevance scoring │
└───────────┬─────────────┘
│
▼
┌─────────────────────────┐
│ Result Ranking │
│ • Temporal relevance │
│ • Source diversity │
│ • Reranking (optional) │
└───────────┬─────────────┘
│
▼
Results
Search features:
Hybrid search - Combines semantic similarity with keyword matching
Temporal relevance - Boost recent content
Access control - Filter by user permissions
Source filtering - Limit to specific connectors
Metadata filters - Custom field filtering
Scalability Considerations
Horizontal Scaling
API servers - Stateless FastAPI instances behind load balancer
Temporal workers - Scale by adding more worker processes
Vector stores - Qdrant and Vespa support clustering
Incremental sync - Only process changed entities (via cursor)
Lazy loading - Stream entities instead of loading all into memory
Batch processing - Group operations for efficiency
Caching - Redis for frequently accessed data
Connection pooling - Reuse HTTP connections
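The batch-processing point above can be sketched as a small async utility that groups a streamed entity sequence into fixed-size batches for bulk writes. The helper name is hypothetical, not part of Airweave's API:

```python
from typing import AsyncGenerator, AsyncIterable, TypeVar

T = TypeVar("T")

async def batched(items: AsyncIterable[T], size: int) -> AsyncGenerator[list[T], None]:
    """Group a stream into fixed-size batches for bulk vector-store writes."""
    batch: list[T] = []
    async for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch  # flush the final partial batch
```

Pairing this with a `bulk_insert` destination call turns thousands of single-row writes into a handful of round trips while keeping the lazy-loading property of the generator pipeline.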
Rate Limiting
# Per-connection rate limiting
@source(
    rate_limit_level=RateLimitLevel.CONNECTION,  # or ORG, or None
)
class MyAppSource(BaseSource):
    async def generate_entities(self):
        async with self.rate_limiter.acquire():
            # API call protected by rate limiter
            await self._fetch_data()
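A limiter with that `async with ... acquire()` shape can be sketched as a sliding window over recent call timestamps. This is a minimal illustrative implementation, not Airweave's actual rate limiter:

```python
import asyncio
import contextlib
import time

class RateLimiter:
    """Simple sliding-window limiter: at most `max_calls` per `period` seconds."""

    def __init__(self, max_calls: int, period: float):
        self.max_calls = max_calls
        self.period = period
        self._timestamps: list[float] = []
        self._lock = asyncio.Lock()

    @contextlib.asynccontextmanager
    async def acquire(self):
        async with self._lock:
            while True:
                now = time.monotonic()
                # Drop timestamps that have aged out of the window
                self._timestamps = [t for t in self._timestamps if now - t < self.period]
                if len(self._timestamps) < self.max_calls:
                    break
                # Sleep until the oldest call leaves the window
                await asyncio.sleep(self.period - (now - self._timestamps[0]))
            self._timestamps.append(time.monotonic())
        yield
```

Holding the lock while waiting also serializes callers, so a burst of concurrent tasks drains at the configured rate instead of stampeding the upstream API.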
Security Architecture
Credential Storage
Encryption at rest - Azure Key Vault or AWS Secrets Manager
Scoped access - Per-organization credential isolation
OAuth token refresh - Automatic token renewal
Credential rotation - Support for key rotation
Access Control
Entity-level ACLs - Fine-grained permissions from source systems
Organization isolation - Multi-tenant data separation
API authentication - JWT tokens for API access
Webhook signatures - HMAC verification for webhooks
Monitoring & Observability
Logging
# Structured logging with context
self.logger.info(
    "Fetching pages from Notion",
    extra={
        "sync_id": str(sync_id),
        "organization_id": str(org_id),
        "page_count": len(pages),
    },
)
Metrics
Sync metrics - Duration, entities synced, errors
API metrics - Request rate, latency, errors
Search metrics - Query latency, result quality
Worker metrics - Queue depth, task completion
Tracing
Temporal execution history - Full workflow trace
Activity retries - Automatic retry logging
Error context - Stack traces with correlation IDs
Testing Strategy
Airweave uses a multi-layer testing approach:
Unit Tests
# Test pure logic without I/O
def test_hash_computation():
    entity = MyEntity(content="test")
    hash1 = compute_hash(entity)
    hash2 = compute_hash(entity)
    assert hash1 == hash2
Integration Tests
# Test with fake adapters
async def test_sync_flow(fake_source, fake_vector_store):
    await sync_orchestrator.run(
        source=fake_source,
        destination=fake_vector_store,
    )
    assert fake_vector_store.entity_count == 10
End-to-End Tests (Monke)
# Test real connector with real API
./monke.sh github
Monke creates test data, syncs it through Airweave, and verifies vector search results. See the Monke README.
Future Directions
Airweave is actively evolving:
Ongoing Refactor
Domain-driven design - Cleaner separation of concerns
Dependency injection - Better testability
Protocol-based interfaces - Easier to swap implementations
See backend/airweave/architecture-refactor.md for the full plan.
Planned Features
Streaming sync - Real-time updates via webhooks
Multi-modal search - Images, audio, video
Advanced RAG - Query rewriting, multi-hop reasoning
Federated search expansion - More sources using query-time search
Contributing to Architecture
When adding new features:
Follow existing patterns - Study similar connectors
Use protocols - Define interfaces before implementations
Write tests first - TDD for complex logic
Document decisions - Update architecture docs
Add a Connector: Build a new source integration
Contributing Guide: Development workflow and standards
Additional Resources
Questions? Join our Discord community!