Airweave is a context retrieval layer that connects to external data sources, syncs content, and makes it searchable through vector embeddings. This guide explains the architecture and how components work together.

High-Level Overview

┌─────────────────────────────────────────────────────────────────┐
│                         External APIs                            │
│  (Slack, Notion, GitHub, Google Drive, Asana, ...)              │
└────────────────────────┬────────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────────┐
│                    SOURCE CONNECTORS                             │
│  Extract entities from external systems via API                 │
│  • OAuth/API key authentication                                 │
│  • Pagination & rate limiting                                   │
│  • Convert to standardized entities                             │
└────────────────────────┬────────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────────┐
│                    SYNC ORCHESTRATOR                             │
│  Manages the data pipeline from source to destination          │
│  • Temporal workflows for reliability                           │
│  • Progress tracking & error handling                           │
│  • Incremental & full sync modes                                │
└────────────────────────┬────────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────────┐
│                    ENTITY PIPELINE                               │
│  Transforms raw entities into searchable vectors                │
│  1. Hash computation (change detection)                         │
│  2. Text chunking (for large documents)                         │
│  3. Vector embedding (OpenAI, Voyage, etc.)                     │
│  4. Metadata enrichment                                         │
└────────────────────────┬────────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────────┐
│                  VECTOR STORE (Qdrant/Vespa)                     │
│  High-performance vector database for semantic search          │
│  • Hybrid search (dense + sparse vectors)                       │
│  • Metadata filtering                                           │
│  • Relevance scoring                                            │
└────────────────────────┬────────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────────┐
│                     SEARCH API                                   │
│  Query interface for AI agents                                  │
│  • REST API                                                     │
│  • Python SDK                                                   │
│  • MCP (Model Context Protocol) server                          │
└─────────────────────────────────────────────────────────────────┘

Core Components

Backend Structure

The Airweave backend is organized into domain-driven modules:
backend/airweave/
├── platform/              # Source connectors & sync engine
│   ├── sources/           # External API integrations
│   ├── entities/          # Data schemas
│   ├── configs/           # Auth & config schemas
│   ├── sync/              # Sync orchestration
│   │   ├── orchestrator.py      # Main sync coordinator
│   │   ├── entity_pipeline.py   # Entity processing pipeline
│   │   ├── processors/          # Chunk, embed, dispatch
│   │   └── handlers/            # Destination writers
│   ├── destinations/      # Vector store adapters (Qdrant, Vespa)
│   ├── temporal/          # Workflow & activity definitions
│   └── storage/           # File storage (ARF format)

├── domains/               # Business logic domains
│   ├── source_connections/  # Connection management
│   ├── sync/                # Sync lifecycle
│   ├── search/              # Search orchestration
│   └── webhooks/            # Event publishing

├── adapters/              # Infrastructure adapters
│   ├── temporal/          # Temporal Cloud/OSS client
│   ├── storage/           # Azure Blob, S3, local filesystem
│   ├── webhooks/          # Svix integration
│   └── event_bus/         # In-memory event bus

├── api/                   # REST API endpoints
│   └── v1/endpoints/      # FastAPI routes

├── core/                  # Cross-cutting concerns
│   ├── config.py          # Settings management
│   ├── logging.py         # Structured logging
│   └── protocols/         # Interface definitions

├── models/                # Database models (SQLAlchemy)
├── schemas/               # API schemas (Pydantic)
└── crud/                  # Database operations

Airweave is undergoing an architectural refactor to improve testability and modularity. The domains/ and adapters/ structure follows the new design, while some legacy code remains in core/. See backend/airweave/architecture-refactor.md for details.

Data Flow

Sync Process

When a sync is triggered, here’s what happens:
Step 1: Sync initialization

Temporal workflow starts:
  1. User triggers sync via API or schedule
  2. SyncWorkflow starts in Temporal
  3. Sync job created in database with PENDING status
  4. Workflow validates connection and fetches credentials
Location: platform/temporal/workflows/sync.py
Step 2: Source data extraction

Source connector generates entities:
# Example: Notion connector
async def generate_entities(self) -> AsyncGenerator[BaseEntity, None]:
    # Fetch from Notion API
    for page in await self._fetch_pages():
        yield NotionPageEntity(
            entity_id=page["id"],
            title=page["title"],
            content=page["content"],
            created_at=page["created_time"],
            updated_at=page["last_edited_time"]
        )
Key aspects:
  • Async generator pattern for memory efficiency
  • Pagination handled by connector
  • Rate limiting with exponential backoff
  • OAuth token refresh if needed
Location: platform/sources/{source_name}.py
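The pagination and backoff behavior above can be sketched as a self-contained toy. The `fetch_page` stub, its cursors, and the retry loop below are invented for illustration; real connectors use base-class helpers and actual API clients:

```python
import asyncio

# Fake paginated API: three pages keyed by cursor, value = (items, next_cursor).
_PAGES = {None: (["a", "b"], "p2"), "p2": (["c"], "p3"), "p3": (["d"], None)}

class RateLimited(Exception):
    pass

async def fetch_page(cursor, _fail_once={"p2": True}):
    # Simulate a transient 429 on the second page, first attempt only.
    if _fail_once.get(cursor):
        _fail_once[cursor] = False
        raise RateLimited()
    return _PAGES[cursor]

async def generate_items(max_retries=3):
    """Yield items page by page, retrying each page with exponential backoff."""
    cursor = None
    while True:
        for attempt in range(max_retries):
            try:
                items, next_cursor = await fetch_page(cursor)
                break
            except RateLimited:
                # Exponential backoff: doubles each attempt (shortened here).
                await asyncio.sleep(0.01 * (2 ** attempt))
        else:
            raise RuntimeError("retries exhausted")
        for item in items:
            yield item  # stream items; never buffer all pages in memory
        if next_cursor is None:
            return
        cursor = next_cursor

async def main():
    return [x async for x in generate_items()]

print(asyncio.run(main()))  # ['a', 'b', 'c', 'd']
```

The async-generator shape mirrors the connector interface: the caller pulls items one at a time, so a million-entity source never needs a million-entity list.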
Step 3: Entity processing pipeline

Each entity flows through the pipeline:
Entity → Hash → Chunk → Embed → Dispatch
1. Hash Computation:
# Compute hash for change detection
hash_value = compute_hash(entity)
if hash_value == stored_hash:
    return  # Unchanged: skip further processing
2. Chunking (for large documents):
# Split large content into chunks
for chunk in chunk_text(entity.content, max_tokens=512):
    yield ChunkedEntity(...)
3. Embedding:
# Generate vector embeddings
embedding = await embedder.embed(entity.content)
entity.dense_embedding = embedding
4. Dispatch:
# Write to vector store
await destination.bulk_insert([entity])
Location: platform/sync/entity_pipeline.py
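The four stages can be sketched end-to-end with stand-in implementations. The `compute_hash`, `chunk_text`, and `embed` functions here are toys (word-count chunking, a fake embedding), not Airweave's actual processors:

```python
import hashlib

def compute_hash(text: str) -> str:
    return hashlib.sha256(text.encode()).hexdigest()

def chunk_text(text: str, max_words: int = 4):
    """Naive word-count chunker standing in for token-based chunking."""
    words = text.split()
    for i in range(0, len(words), max_words):
        yield " ".join(words[i:i + max_words])

def embed(text: str):
    """Toy deterministic 'embedding' standing in for a real model call."""
    return [len(text), sum(map(ord, text)) % 97]

def process_entity(content: str, stored_hashes: dict, store: list) -> int:
    """Hash -> skip unchanged -> chunk -> embed -> dispatch; returns chunks written."""
    h = compute_hash(content)
    if stored_hashes.get("doc") == h:
        return 0  # unchanged: skip the expensive chunk/embed/write stages
    stored_hashes["doc"] = h
    written = 0
    for chunk in chunk_text(content):
        store.append({"text": chunk, "vector": embed(chunk)})  # dispatch
        written += 1
    return written

store, hashes = [], {}
first = process_entity("the quick brown fox jumps over", hashes, store)
second = process_entity("the quick brown fox jumps over", hashes, store)
print(first, second)  # 2 0
```

The key property the sketch shows: hashing happens first, so an unchanged entity costs one hash comparison instead of a chunking pass and an embedding call.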
Step 4: Vector storage

Entities stored in Qdrant/Vespa:
{
  "id": "doc-123",
  "vector": [0.123, 0.456, ...],  // 3072-dim dense embedding
  "payload": {
    "entity_id": "notion-page-abc",
    "title": "Product Roadmap Q1 2024",
    "source_type": "notion",
    "created_at": "2024-01-15T10:30:00Z",
    "breadcrumbs": [{...}],
    "access_control": {...}
  }
}
Hybrid search support:
  • Dense vectors for semantic similarity
  • Sparse vectors (BM25) for keyword matching
  • Metadata filters for scoping
Location: platform/destinations/qdrant.py
Step 5: Sync completion

Workflow finalizes:
  1. Update sync job status to COMPLETED
  2. Record metrics (entities synced, errors, duration)
  3. Publish webhook events for external systems
  4. Update cursor for incremental sync (if supported)
Location: platform/temporal/activities/sync.py
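The cursor update in step 4 can be illustrated with a minimal "updated since" filter. The record shapes and `run_sync` helper are hypothetical; the real cursor handling lives in the sync domain:

```python
from datetime import datetime, timezone

RECORDS = [
    {"id": "a", "updated_at": datetime(2024, 1, 10, tzinfo=timezone.utc)},
    {"id": "b", "updated_at": datetime(2024, 1, 20, tzinfo=timezone.utc)},
    {"id": "c", "updated_at": datetime(2024, 2, 1, tzinfo=timezone.utc)},
]

def run_sync(cursor):
    """Process only records changed since `cursor`; return (ids, new cursor)."""
    changed = [r for r in RECORDS if cursor is None or r["updated_at"] > cursor]
    new_cursor = max((r["updated_at"] for r in RECORDS), default=cursor)
    return [r["id"] for r in changed], new_cursor

ids, cursor = run_sync(None)    # full sync: no cursor, all records
print(ids)                      # ['a', 'b', 'c']
ids, cursor = run_sync(cursor)  # incremental: nothing changed since cursor
print(ids)                      # []
```

Persisting the returned cursor at the end of a successful run is what makes the next sync incremental rather than full.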

Key Architectural Patterns

Source Connector Pattern

All source connectors follow a standard interface:
from airweave.platform.sources._base import BaseSource
from airweave.platform.decorators import source

@source(
    name="MyApp",
    short_name="myapp",
    auth_methods=[AuthenticationMethod.OAUTH_BROWSER],
    oauth_type=OAuthType.WITH_REFRESH,
    config_class=MyAppConfig,
    labels=["Productivity"],
)
class MyAppSource(BaseSource):
    """MyApp source connector."""

    @classmethod
    async def create(cls, access_token: str, config: dict) -> "MyAppSource":
        """Factory method for creating source instance."""
        instance = cls()
        instance.access_token = access_token
        instance.config = config
        return instance

    async def validate(self) -> bool:
        """Test the connection."""
        # Ping the API to validate credentials
        return await self._ping_api()

    async def generate_entities(self) -> AsyncGenerator[BaseEntity, None]:
        """Yield entities from the source."""
        async for item in self._fetch_items():
            yield MyAppEntity(...)
Key design decisions:
  • Async generators - Memory-efficient streaming of large datasets
  • Decorator-based registration - Automatic discovery and metadata
  • Factory pattern - Credentials injected at runtime
  • Base class helpers - OAuth token refresh, rate limiting, file downloads

Entity Schema Pattern

Entities use Pydantic for validation and AirweaveField for metadata:
from airweave.platform.entities._base import ChunkEntity
from airweave.platform.entities._airweave_field import AirweaveField

class MyAppDocumentEntity(ChunkEntity):
    """Schema for MyApp documents."""

    document_id: str = AirweaveField(
        ...,
        description="Unique document ID",
        is_entity_id=True,    # Used as primary identifier
        embeddable=False       # Don't include in embeddings
    )

    title: str = AirweaveField(
        ...,
        is_name=True,         # Display name
        embeddable=True       # Include in search
    )

    content: str = AirweaveField(
        ...,
        embeddable=True       # Main searchable content
    )

    created_at: datetime = AirweaveField(
        ...,
        is_created_at=True,   # Used for temporal relevance
        unhashable=True       # Don't use for change detection
    )
Field metadata controls:
  • Embedding: What gets vectorized for search
  • Hashing: What’s used for change detection
  • Display: How it appears in results
  • Indexing: What can be filtered on
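One way such per-field flags can drive behavior downstream is sketched below, using a toy field registry rather than the actual `AirweaveField` internals:

```python
import hashlib

# Toy field registry: flags analogous to embeddable / unhashable above.
FIELDS = {
    "title":      {"embeddable": True,  "unhashable": False},
    "content":    {"embeddable": True,  "unhashable": False},
    "created_at": {"embeddable": False, "unhashable": True},
}

def embeddable_text(entity: dict) -> str:
    """Concatenate only the fields marked embeddable, for vectorization."""
    return " ".join(str(entity[f]) for f, m in FIELDS.items() if m["embeddable"])

def content_hash(entity: dict) -> str:
    """Hash only fields that participate in change detection."""
    stable = [str(entity[f]) for f, m in FIELDS.items() if not m["unhashable"]]
    return hashlib.sha256("|".join(stable).encode()).hexdigest()

doc = {"title": "Roadmap", "content": "Q1 goals", "created_at": "2024-01-15"}
touched = {**doc, "created_at": "2024-02-01"}  # timestamp-only change

print(embeddable_text(doc))                        # Roadmap Q1 goals
print(content_hash(doc) == content_hash(touched))  # True: no re-sync needed
```

Marking volatile fields like timestamps as unhashable is what prevents a metadata-only touch from triggering a full re-embed of the entity.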

Temporal Workflow Pattern

Sync reliability through Temporal workflows:
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

@workflow.defn
class SyncWorkflow:
    """Durable workflow for sync execution."""

    @workflow.run
    async def run(self, sync_id: str, org_id: str, force_full: bool = False):
        # 1. Create sync job
        sync_job_id = await workflow.execute_activity(
            create_sync_job_activity,
            args=[sync_id, org_id],
            start_to_close_timeout=timedelta(seconds=30)
        )

        # 2. Execute sync (with retry & timeout)
        await workflow.execute_activity(
            run_sync_activity,
            args=[sync_id, sync_job_id, org_id, force_full],
            start_to_close_timeout=timedelta(hours=2),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(seconds=60),
                maximum_attempts=3
            )
        )
Benefits:
  • Durability - Survives worker crashes and restarts
  • Retries - Automatic exponential backoff
  • Observability - Full execution history
  • Cancellation - Graceful sync cancellation

Event-Driven Architecture

Domains publish events; subscribers react independently:
# Domain publishes event
await event_bus.publish(
    SyncLifecycleEvent(
        event_type=SyncEventType.COMPLETED,
        sync_id=sync.id,
        sync_job_id=job.id,
        organization_id=org.id,
        entities_synced=1234,
        duration_seconds=45.2
    )
)

# Subscribers handle side effects
class WebhookSubscriber:
    async def handle(self, event: SyncLifecycleEvent):
        # Send webhook to external systems
        await self.webhook_publisher.publish(event)

class AnalyticsSubscriber:
    async def handle(self, event: SyncLifecycleEvent):
        # Track in PostHog
        await self.analytics.track(event)
Decoupling benefits:
  • Sync engine doesn’t know about webhooks or analytics
  • Easy to add new subscribers without changing core code
  • Failures in subscribers don’t affect sync
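A minimal in-memory bus with that failure isolation might look like the sketch below (illustrative only, not Airweave's `event_bus` adapter):

```python
import asyncio
from dataclasses import dataclass

@dataclass
class SyncCompleted:
    sync_id: str
    entities_synced: int

class EventBus:
    """Minimal in-memory pub/sub: one subscriber failing never affects the rest."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, handler):
        self._subscribers.append(handler)

    async def publish(self, event):
        for handler in self._subscribers:
            try:
                await handler(event)
            except Exception:
                pass  # isolate subscriber failures from the publisher

received = []

async def webhook_handler(event):
    received.append(("webhook", event.sync_id))

async def flaky_handler(event):
    raise RuntimeError("analytics backend down")

async def main():
    bus = EventBus()
    bus.subscribe(webhook_handler)
    bus.subscribe(flaky_handler)  # its failure is swallowed by the bus
    await bus.publish(SyncCompleted(sync_id="s1", entities_synced=1234))

asyncio.run(main())
print(received)  # [('webhook', 's1')]
```

The publisher only knows the event type; adding an analytics or audit subscriber later is a registration call, not a change to the sync engine.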

Technology Stack

Backend

  • FastAPI - Modern async Python web framework
  • SQLAlchemy - ORM for PostgreSQL
  • Pydantic - Data validation and serialization
  • Temporal - Durable workflow orchestration
  • httpx - Async HTTP client for API calls
  • Tenacity - Retry logic with backoff

Data Storage

  • PostgreSQL - Primary relational database
  • Qdrant - Vector database for semantic search
  • Vespa (optional) - Alternative vector store
  • Redis - Real-time progress updates
  • Azure Blob / S3 - File storage (ARF format)

Infrastructure

  • Docker Compose - Local development stack
  • Temporal Cloud/OSS - Workflow execution
  • Azure Key Vault - Credential encryption
  • Svix - Webhook delivery

Search Architecture

When an AI agent queries Airweave:
Query: "What were the key decisions from last week's product meeting?"

            ▼
┌─────────────────────────┐
│  Query Processing       │
│  • Parse query          │
│  • Extract filters      │
│  • Generate embedding   │
└───────────┬─────────────┘
            │
            ▼
┌─────────────────────────┐
│  Vector Search          │
│  • Hybrid search        │
│  • Metadata filtering   │
│  • Relevance scoring    │
└───────────┬─────────────┘
            │
            ▼
┌─────────────────────────┐
│  Result Ranking         │
│  • Temporal relevance   │
│  • Source diversity     │
│  • Reranking (optional) │
└───────────┬─────────────┘
            │
            ▼
       Results
Search features:
  • Hybrid search - Combines semantic similarity with keyword matching
  • Temporal relevance - Boost recent content
  • Access control - Filter by user permissions
  • Source filtering - Limit to specific connectors
  • Metadata filters - Custom field filtering
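One common way to fuse dense and sparse rankings is reciprocal rank fusion (RRF); the sketch below is a standard formulation and not necessarily the exact scoring Airweave applies:

```python
def reciprocal_rank_fusion(dense_ranked, sparse_ranked, k=60):
    """Merge two ranked ID lists; each list contributes 1/(k + rank) per doc."""
    scores = {}
    for ranking in (dense_ranked, sparse_ranked):
        for rank, doc_id in enumerate(ranking):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    return sorted(scores, key=scores.get, reverse=True)

dense = ["doc-3", "doc-1", "doc-7"]   # semantic-similarity order
sparse = ["doc-1", "doc-9", "doc-3"]  # BM25 keyword order
print(reciprocal_rank_fusion(dense, sparse))
# ['doc-1', 'doc-3', 'doc-9', 'doc-7']
```

Documents appearing in both rankings (doc-1, doc-3) accumulate score from each list, which is why they outrank single-list hits regardless of raw similarity values.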

Scalability Considerations

Horizontal Scaling

  • API servers - Stateless FastAPI instances behind load balancer
  • Temporal workers - Scale by adding more worker processes
  • Vector stores - Qdrant and Vespa support clustering

Performance Optimizations

  • Incremental sync - Only process changed entities (via cursor)
  • Lazy loading - Stream entities instead of loading all into memory
  • Batch processing - Group operations for efficiency
  • Caching - Redis for frequently accessed data
  • Connection pooling - Reuse HTTP connections
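Batch processing can be as simple as grouping a streamed sequence into fixed-size batches before each bulk write. This is a generic helper, not Airweave's actual batching code:

```python
from itertools import islice

def batched(iterable, size):
    """Group a stream into fixed-size batches without materializing it all."""
    it = iter(iterable)
    while batch := list(islice(it, size)):
        yield batch

# e.g. bulk-insert points in groups instead of one request per entity
print(list(batched(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

Because `islice` pulls lazily from the source iterator, this composes directly with the async-generator entity stream described earlier: memory stays bounded by the batch size.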

Rate Limiting

# Per-connection rate limiting
@source(
    rate_limit_level=RateLimitLevel.CONNECTION,  # or ORG, or None
)
class MyAppSource(BaseSource):
    async def generate_entities(self):
        async with self.rate_limiter.acquire():
            # API call protected by rate limiter
            await self._fetch_data()
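A minimal async limiter compatible with the `async with ... .acquire()` usage above might look like this sliding-window sketch (invented class and parameters; not the real `rate_limiter` implementation):

```python
import asyncio
import time

class RateLimiter:
    """At most `rate` acquisitions per rolling `period` seconds (approximate)."""

    def __init__(self, rate: int, period: float):
        self.rate, self.period = rate, period
        self._timestamps = []
        self._lock = asyncio.Lock()

    def acquire(self):
        return self  # usable as: async with limiter.acquire(): ...

    async def __aenter__(self):
        async with self._lock:
            now = time.monotonic()
            # Drop timestamps that have aged out of the window.
            self._timestamps = [t for t in self._timestamps
                                if now - t < self.period]
            if len(self._timestamps) >= self.rate:
                # Wait until the oldest call in the window expires.
                await asyncio.sleep(self.period - (now - self._timestamps[0]))
            self._timestamps.append(time.monotonic())

    async def __aexit__(self, *exc):
        return False

async def main():
    limiter = RateLimiter(rate=2, period=0.1)
    start = time.monotonic()
    for _ in range(4):
        async with limiter.acquire():
            pass  # API call would go here
    return time.monotonic() - start

elapsed = asyncio.run(main())
print(f"4 calls took {elapsed:.2f}s")  # third call waited out the window
```

A production limiter would also handle concurrent waiters fairly and share state per connection or per organization, matching the `RateLimitLevel` options shown above.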

Security Architecture

Credential Storage

  • Encryption at rest - Azure Key Vault or AWS Secrets Manager
  • Scoped access - Per-organization credential isolation
  • OAuth token refresh - Automatic token renewal
  • Credential rotation - Support for key rotation

Access Control

  • Entity-level ACLs - Fine-grained permissions from source systems
  • Organization isolation - Multi-tenant data separation
  • API authentication - JWT tokens for API access
  • Webhook signatures - HMAC verification for webhooks

Monitoring & Observability

Logging

# Structured logging with context
self.logger.info(
    "Fetching pages from Notion",
    extra={
        "sync_id": str(sync_id),
        "organization_id": str(org_id),
        "page_count": len(pages)
    }
)

Metrics

  • Sync metrics - Duration, entities synced, errors
  • API metrics - Request rate, latency, errors
  • Search metrics - Query latency, result quality
  • Worker metrics - Queue depth, task completion

Tracing

  • Temporal execution history - Full workflow trace
  • Activity retries - Automatic retry logging
  • Error context - Stack traces with correlation IDs

Testing Strategy

Airweave uses a multi-layer testing approach:

Unit Tests

# Test pure logic without I/O
def test_hash_computation():
    entity = MyEntity(content="test")
    hash1 = compute_hash(entity)
    hash2 = compute_hash(entity)
    assert hash1 == hash2

Integration Tests

# Test with fake adapters
async def test_sync_flow(fake_source, fake_vector_store):
    await sync_orchestrator.run(
        source=fake_source,
        destination=fake_vector_store
    )
    assert fake_vector_store.entity_count == 10

End-to-End Tests (Monke)

# Test real connector with real API
./monke.sh github
Monke creates test data, syncs it through Airweave, and verifies vector search results. See the Monke README.

Future Directions

Airweave is actively evolving:

Ongoing Refactor

  • Domain-driven design - Cleaner separation of concerns
  • Dependency injection - Better testability
  • Protocol-based interfaces - Easier to swap implementations
See backend/airweave/architecture-refactor.md for the full plan.

Planned Features

  • Streaming sync - Real-time updates via webhooks
  • Multi-modal search - Images, audio, video
  • Advanced RAG - Query rewriting, multi-hop reasoning
  • Federated search expansion - More sources using query-time search

Contributing to Architecture

When adding new features:
  1. Follow existing patterns - Study similar connectors
  2. Use protocols - Define interfaces before implementations
  3. Write tests first - TDD for complex logic
  4. Document decisions - Update architecture docs

Add a Connector

Build a new source integration

Contributing Guide

Development workflow and standards

Additional Resources

Questions? Join our Discord community!