Connectors are how Airweave pulls data from external systems, turns it into entities, and makes it searchable for AI agents. This guide walks you through building a complete connector from scratch.

What You’ll Build

A complete source connector consists of:
  1. Authentication schema - Define credentials needed to connect
  2. Configuration schema - Optional settings for the connector
  3. Entity schemas - Data structures for each entity type
  4. Source implementation - The connector logic that fetches and yields entities
Before starting, review the Contributing Overview to set up your development environment.

Prerequisites

  • Airweave development environment set up
  • API documentation for the service you’re integrating
  • Test account or credentials for the service
  • Basic understanding of Python async/await patterns

Step-by-Step Guide

1

Plan your connector

Before writing code, make sure you understand:
Authentication type:
  • OAuth2 (browser flow, token-only, or with refresh)
  • API Key
  • Username/Password
  • Database connection
Data model:
  • What entities will you sync? (e.g., files, messages, issues)
  • What metadata is important? (title, author, timestamps)
  • Are there hierarchies? (workspaces → projects → tasks)
API constraints:
  • Rate limits
  • Pagination approach
  • File size limits
2

Create authentication schema

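Define credentials in backend/airweave/platform/configs/auth.py. Sources that authenticate with credentials you supply directly (such as an API key) need a credentials class there. OAuth sources do not: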
# For OAuth sources, no auth config needed
# Just specify oauth_type in @source decorator
For OAuth sources, set auth_config_class=None in the @source decorator. OAuth tokens are handled automatically.
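The connector in step 5 imports MyAppAuthConfig and reads credentials.api_key, and the unit test in step 8 builds MyAppAuthConfig(api_key="test-key"), but the class itself is not shown. Below is a minimal sketch for an API-key source. Airweave's real auth configs are Pydantic models, and in auth.py this would likely mirror the step 3 style (api_key: str = Field(..., title="API Key", ...)). To keep the snippet runnable without the Airweave package, it uses a stdlib dataclass stand-in, and the whitespace-stripping rule is a hypothetical example.

```python
from dataclasses import dataclass, field


@dataclass
class MyAppAuthConfig:
    """MyApp authentication credentials (stand-in for a Pydantic auth config)."""

    # The only credential the step 5 connector reads.
    api_key: str = field(
        metadata={"title": "API Key", "description": "Your MyApp API key"}
    )

    def __post_init__(self) -> None:
        # Hypothetical rule: reject empty keys early so a bad credential
        # fails before any network call is made.
        if not self.api_key or not self.api_key.strip():
            raise ValueError("api_key must not be empty")
        self.api_key = self.api_key.strip()
```

Validating in the config class means validate() in the source only has to check what the remote API thinks of the key, not whether a key was supplied at all.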
3

Create configuration schema

Define optional settings in backend/airweave/platform/configs/config.py:
class MyAppConfig(SourceConfig):
    """MyApp configuration schema."""

    workspace_id: str = Field(
        default="",
        title="Workspace ID",
        description="Specific workspace to sync. Leave empty for all workspaces."
    )

    include_archived: bool = Field(
        default=False,
        title="Include Archived",
        description="Whether to sync archived items"
    )

    file_extensions: list[str] = Field(
        default=[],
        title="File Extensions",
        description="File types to include (e.g., ['.pdf', '.docx']). Empty = all files."
    )

    @field_validator("file_extensions", mode="before")
    @classmethod
    def parse_file_extensions(cls, value):
        """Convert comma-separated string to list."""
        if isinstance(value, str):
            if not value.strip():
                return []
            return [ext.strip() for ext in value.split(",") if ext.strip()]
        return value
Common configuration patterns:
  • Workspace/project filters
  • Date ranges for incremental sync
  • Entity type selections
  • File type filters
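The file_extensions validator above accepts either a list or a comma-separated string. The sketch below pulls that normalization into a plain function and pairs it with a hypothetical should_include() filter that applies the "Empty = all files" rule from the field description. It mirrors the validator, but mapping None to an empty list and accepting extensions without a leading dot are assumptions beyond the original.

```python
import os
from typing import Iterable, List, Optional, Union


def parse_file_extensions(value: Union[str, List[str], None]) -> List[str]:
    """Normalize a comma-separated string (or list) of extensions to a list."""
    if value is None:
        return []
    if isinstance(value, str):
        # ".pdf, .docx," -> [".pdf", ".docx"]; blank entries are dropped.
        return [ext.strip() for ext in value.split(",") if ext.strip()]
    return list(value)


def should_include(filename: str, extensions: Optional[Iterable[str]]) -> bool:
    """Hypothetical filter: an empty extension list means include everything."""
    allowed = {
        ext.lower() if ext.startswith(".") else f".{ext.lower()}"
        for ext in (extensions or [])
    }
    if not allowed:
        return True
    _, ext = os.path.splitext(filename)
    return ext.lower() in allowed
```

Filtering on the file name before downloading avoids fetching content that would be discarded anyway.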
4

Define entity schemas

Create entity definitions in backend/airweave/platform/entities/myapp.py:
from datetime import datetime
from typing import Optional, List
from pydantic import Field

from airweave.platform.entities._airweave_field import AirweaveField
from airweave.platform.entities._base import ChunkEntity, Breadcrumb

class MyAppDocumentEntity(ChunkEntity):
    """Schema for MyApp document entities.

    Reference: https://docs.myapp.com/api/documents
    """

    # Use AirweaveField to control embedding and indexing behavior
    document_id: str = AirweaveField(
        ...,
        description="Unique document ID",
        embeddable=False,  # Don't include in embeddings
        is_entity_id=True  # Use as entity_id
    )

    title: str = AirweaveField(
        ...,
        description="Document title",
        embeddable=True,   # Include in embeddings
        is_name=True       # Use as display name
    )

    content: str = AirweaveField(
        ...,
        description="Full document text",
        embeddable=True    # Primary searchable content
    )

    author: Optional[str] = AirweaveField(
        None,
        description="Document author",
        embeddable=True
    )

    tags: List[str] = AirweaveField(
        default_factory=list,
        description="Document tags",
        embeddable=True
    )

    created_time: datetime = AirweaveField(
        ...,
        description="When document was created",
        is_created_at=True  # Used for temporal relevance
    )

    updated_time: Optional[datetime] = AirweaveField(
        None,
        description="Last update time",
        is_updated_at=True  # Used for change detection
    )

    url: str = AirweaveField(
        ...,
        description="URL to view document",
        embeddable=False,
        unhashable=True  # Don't include in change detection hash
    )
Entity Base Classes:
# For searchable text content
# (documents, messages, pages)
from airweave.platform.entities._airweave_field import AirweaveField
from airweave.platform.entities._base import ChunkEntity

class MyTextEntity(ChunkEntity):
    # Must have 'content' field for searchable text
    content: str = AirweaveField(..., embeddable=True)
AirweaveField Parameters:
  • embeddable=True - Include in vector embeddings
  • is_entity_id=True - Use as unique identifier
  • is_name=True - Use as display name
  • is_created_at=True - Creation timestamp
  • is_updated_at=True - Last modified timestamp
  • unhashable=True - Exclude from change detection
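To make unhashable=True concrete: change detection compares a hash of an entity's field values between syncs, and excluded fields do not contribute to it. The sketch below illustrates that idea with a plain dict and SHA-256. It is not Airweave's actual hashing code, and content_hash() is a hypothetical helper. The field names come from the step 4 example.

```python
import hashlib
import json
from typing import Any, Dict, Iterable


def content_hash(fields: Dict[str, Any], unhashable: Iterable[str] = ()) -> str:
    """Hash an entity's field values, skipping fields marked unhashable."""
    skip = set(unhashable)
    hashable = {k: v for k, v in fields.items() if k not in skip}
    # sort_keys makes the hash independent of field order;
    # default=str serializes datetimes and other non-JSON values.
    payload = json.dumps(hashable, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

This is why url is marked unhashable in MyAppDocumentEntity: if an API returns signed URLs that change on every request, hashing them would make every document look modified on every sync.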
5

Implement the source connector

Create your connector in backend/airweave/platform/sources/myapp.py:
from typing import Any, AsyncGenerator, Dict, Optional
import httpx

from airweave.platform.configs.auth import MyAppAuthConfig
from airweave.platform.configs.config import MyAppConfig
from airweave.platform.decorators import source
from airweave.platform.entities.myapp import MyAppDocumentEntity
from airweave.platform.sources._base import BaseSource
from airweave.schemas.source_connection import AuthenticationMethod

@source(
    name="MyApp",
    short_name="myapp",
    auth_methods=[AuthenticationMethod.DIRECT],
    auth_config_class=MyAppAuthConfig,
    config_class=MyAppConfig,
    labels=["Productivity", "Documents"],
    supports_continuous=False,
    rate_limit_level=None,
)
class MyAppSource(BaseSource):
    """MyApp source connector.

    Syncs documents and content from MyApp workspaces.
    """

    @classmethod
    async def create(
        cls,
        credentials: MyAppAuthConfig,
        config: Optional[Dict[str, Any]] = None
    ) -> "MyAppSource":
        """Create a new MyApp source instance.

        Args:
            credentials: Authentication credentials
            config: Optional configuration parameters

        Returns:
            Configured MyAppSource instance
        """
        instance = cls()
        instance.api_key = credentials.api_key
        instance.config = config or {}
        return instance

    async def validate(self) -> bool:
        """Validate the connection to MyApp.

        Returns:
            True if connection is valid
        """
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    "https://api.myapp.com/v1/user",
                    headers={"Authorization": f"Bearer {self.api_key}"},
                    timeout=10.0
                )
                return response.status_code == 200
        except Exception as e:
            self.logger.error(f"Validation failed: {e}")
            return False

    async def generate_entities(self) -> AsyncGenerator[MyAppDocumentEntity, None]:
        """Generate document entities from MyApp.

        Yields:
            MyAppDocumentEntity objects
        """
        async with httpx.AsyncClient() as client:
            # Fetch all documents with pagination
            cursor = None
            while True:
                params = {"limit": 100}
                if cursor:
                    params["cursor"] = cursor

                response = await client.get(
                    "https://api.myapp.com/v1/documents",
                    headers={"Authorization": f"Bearer {self.api_key}"},
                    params=params,
                    timeout=30.0
                )
                response.raise_for_status()
                data = response.json()

                # Yield each document as an entity
                for doc in data.get("documents", []):
                    yield MyAppDocumentEntity(
                        document_id=doc["id"],
                        title=doc["title"],
                        content=doc["content"],
                        author=doc.get("author"),
                        tags=doc.get("tags", []),
                        created_time=doc["created_at"],
                        updated_time=doc.get("updated_at"),
                        url=doc["url"]
                    )

                # Check for more pages
                cursor = data.get("next_cursor")
                if not cursor:
                    break
Key Implementation Points:
  • Use @source decorator with metadata
  • Implement create() classmethod for initialization
  • Implement validate() to test connection
  • Implement generate_entities() as async generator
  • Handle pagination properly
  • Use structured logging with self.logger
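The cursor loop in generate_entities() can be factored into a reusable async generator, which keeps pagination separate from entity construction and makes it easy to test. Below is a minimal sketch, assuming the API returns documents and next_cursor as in the example above. fetch_page is a hypothetical callable that you would implement with httpx. The repeated-cursor guard and the page cap are defensive additions, not part of the original loop.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, Optional

FetchPage = Callable[[Optional[str]], Awaitable[Dict[str, Any]]]


async def iterate_documents(
    fetch_page: FetchPage, max_pages: int = 10_000
) -> AsyncIterator[Dict[str, Any]]:
    """Yield every document across cursor-paginated responses."""
    cursor: Optional[str] = None
    seen_cursors = set()
    for _ in range(max_pages):
        data = await fetch_page(cursor)
        for doc in data.get("documents", []):
            yield doc
        cursor = data.get("next_cursor")
        if not cursor:
            return
        # Guard against APIs that repeat a cursor, which would loop forever.
        if cursor in seen_cursors:
            raise RuntimeError(f"Pagination cursor repeated: {cursor!r}")
        seen_cursors.add(cursor)
    raise RuntimeError(f"Stopped after {max_pages} pages; pagination may not end")


# Usage with an in-memory fake API (two pages).
PAGES = {
    None: {"documents": [{"id": "1"}, {"id": "2"}], "next_cursor": "c2"},
    "c2": {"documents": [{"id": "3"}], "next_cursor": None},
}


async def fake_fetch(cursor: Optional[str]) -> Dict[str, Any]:
    return PAGES[cursor]


async def main() -> list:
    return [doc["id"] async for doc in iterate_documents(fake_fetch)]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Inside the connector, generate_entities() would then reduce to async for doc in iterate_documents(self._fetch_page): yield MyAppDocumentEntity(...), where _fetch_page is a hypothetical method wrapping the httpx call.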
6

Add OAuth support (if needed)

For OAuth sources, additional configuration is required.
1. Update the @source decorator:
from airweave.schemas.source_connection import AuthenticationMethod, OAuthType

@source(
    name="MyApp",
    short_name="myapp",
    auth_methods=[
        AuthenticationMethod.OAUTH_BROWSER,
        AuthenticationMethod.OAUTH_TOKEN,
        AuthenticationMethod.AUTH_PROVIDER,
    ],
    oauth_type=OAuthType.WITH_REFRESH,  # or ACCESS_ONLY
    auth_config_class=None,  # No auth config for OAuth
    config_class=MyAppConfig,
    # ... rest of config
)
2. Add the integration to dev.integrations.yaml:
# backend/airweave/platform/auth/yaml/dev.integrations.yaml
myapp:
  name: MyApp
  client_id: ${MYAPP_CLIENT_ID}
  client_secret: ${MYAPP_CLIENT_SECRET}
  scopes:
    - read:documents
    - read:workspaces
  authorization_url: https://myapp.com/oauth/authorize
  token_url: https://myapp.com/oauth/token
  supports_refresh: true
3. Update the create() method:
@classmethod
async def create(
    cls,
    access_token: str,
    config: Optional[Dict[str, Any]] = None
) -> "MyAppSource":
    """Create from OAuth access token."""
    instance = cls()
    instance.access_token = access_token
    instance.config = config or {}
    return instance
4. Use the token manager for API calls:
async def _get_with_auth(self, url: str) -> dict:
    """Make authenticated request with token refresh."""
    access_token = await self.get_access_token()
    if not access_token:
        raise ValueError("No access token available")

    async with httpx.AsyncClient() as client:
        response = await client.get(
            url,
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=30.0
        )

        # Handle token expiration
        if response.status_code == 401 and self.token_manager:
            # Refresh and retry
            new_token = await self.token_manager.refresh_on_unauthorized()
            if new_token:
                response = await client.get(
                    url,
                    headers={"Authorization": f"Bearer {new_token}"},
                    timeout=30.0
                )

        response.raise_for_status()
        return response.json()
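The refresh-and-retry logic in _get_with_auth() comes down to this: send the request, and on a 401 ask for a fresh token once and retry. The sketch below isolates that control flow with stdlib stand-ins. send and refresh are hypothetical callables that stand in for the httpx request and token_manager.refresh_on_unauthorized(), and a (status, body) tuple stands in for an httpx.Response. This lets you exercise the logic without a live API.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Tuple

# (status_code, body) is a simplified stand-in for an httpx.Response.
Send = Callable[[str], Awaitable[Tuple[int, str]]]
Refresh = Callable[[], Awaitable[Optional[str]]]


async def request_with_refresh(
    token: str, send: Send, refresh: Optional[Refresh]
) -> Tuple[int, str]:
    """Send once; on 401, refresh the token at most once and retry."""
    status, body = await send(token)
    if status == 401 and refresh is not None:
        new_token = await refresh()
        if new_token:
            # Retry exactly once so a revoked grant cannot loop forever.
            status, body = await send(new_token)
    return status, body


async def demo() -> Tuple[int, str]:
    async def send(tok: str) -> Tuple[int, str]:
        return (200, "ok") if tok == "fresh" else (401, "expired")

    async def refresh() -> Optional[str]:
        return "fresh"

    return await request_with_refresh("stale", send, refresh)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Retrying only once is deliberate. If the refreshed token is also rejected, the problem is the grant or the scopes, and raise_for_status() in the original method then surfaces the 401 instead of hiding it.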
7

Handle rate limiting and retries

Add retries with exponential backoff, and honor the Retry-After header on 429 responses:
import asyncio
from typing import Optional

import httpx
from httpx import HTTPStatusError, TimeoutException
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_exponential,
)

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    retry=retry_if_exception_type((HTTPStatusError, TimeoutException)),
    reraise=True
)
async def _fetch_with_retry(self, url: str, params: Optional[dict] = None) -> dict:
    """Fetch data with exponential backoff retry."""
    async with httpx.AsyncClient() as client:
        response = await client.get(
            url,
            headers={"Authorization": f"Bearer {self.api_key}"},
            params=params,
            timeout=30.0
        )

        # Handle rate limiting: sleep for the server-requested delay, then
        # raise so tenacity retries (its backoff wait is added on top)
        if response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 60))
            self.logger.warning(f"Rate limited, waiting {retry_after}s")
            await asyncio.sleep(retry_after)
            raise HTTPStatusError("Rate limited", request=response.request, response=response)

        # Note: this retries every 4xx/5xx error, including ones such as 404
        # that will never succeed; narrow the retry predicate if needed
        response.raise_for_status()
        return response.json()
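int(response.headers.get("Retry-After", 60)) raises ValueError when a server sends the header as an HTTP date, which RFC 9110 allows alongside a number of seconds. A more tolerant parser is sketched below. The 60-second fallback and the 300-second cap are assumptions to tune per API, and parse_retry_after() is a hypothetical helper name.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(
    value: Optional[str],
    default: float = 60.0,
    cap: float = 300.0,
    now: Optional[datetime] = None,
) -> float:
    """Return seconds to wait for a Retry-After header value."""
    if not value:
        return default
    value = value.strip()
    try:
        seconds = float(int(value))  # delay-seconds form, e.g. "120"
    except ValueError:
        try:
            when = parsedate_to_datetime(value)  # HTTP-date form
        except (TypeError, ValueError):
            return default  # unparseable header: fall back
        if when.tzinfo is None:
            when = when.replace(tzinfo=timezone.utc)
        now = now or datetime.now(timezone.utc)
        seconds = (when - now).total_seconds()
    # Never wait a negative amount, and cap pathological values.
    return max(0.0, min(seconds, cap))
```

In _fetch_with_retry, the parsing line would become retry_after = parse_retry_after(response.headers.get("Retry-After")). The cap keeps a misconfigured server from stalling a sync for hours.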
8

Test your connector

1. Manual testing in the UI:
Start Airweave and create a test collection:
./start.sh
Navigate to http://localhost:3000, create a collection, and add your new source.
2. Unit testing:
Create tests in backend/tests/sources/test_myapp.py:
import pytest
from airweave.platform.sources.myapp import MyAppSource
from airweave.platform.configs.auth import MyAppAuthConfig

@pytest.mark.asyncio
async def test_myapp_source_creation():
    """Test MyApp source can be created."""
    credentials = MyAppAuthConfig(api_key="test-key")
    source = await MyAppSource.create(credentials)
    assert source.api_key == "test-key"

@pytest.mark.asyncio
async def test_myapp_generates_entities():
    """Test entity generation."""
    # Mock the API responses
    # Test entity generation
    # Verify entity structure
    pass
3. Integration testing with Monke:
See the Monke README for end-to-end testing.
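The second unit test in the example above is a placeholder. One way to fill it without network access is to route API calls through a single method and patch that method with unittest.mock.AsyncMock. The sketch below uses a simplified, hypothetical FakeDocsSource with a _fetch_page() helper rather than the real MyAppSource, so it runs on its own. To adapt it, patch whichever method your connector uses for HTTP calls.

```python
import asyncio
from typing import Any, AsyncGenerator, Dict, Optional
from unittest.mock import AsyncMock, patch


class FakeDocsSource:
    """Hypothetical stand-in with the same pagination shape as MyAppSource."""

    async def _fetch_page(self, cursor: Optional[str]) -> Dict[str, Any]:
        raise RuntimeError("network access is not allowed in unit tests")

    async def generate_entities(self) -> AsyncGenerator[Dict[str, Any], None]:
        cursor = None
        while True:
            data = await self._fetch_page(cursor)
            for doc in data.get("documents", []):
                yield {"document_id": doc["id"], "title": doc["title"]}
            cursor = data.get("next_cursor")
            if not cursor:
                break


async def collect_with_mocked_api() -> tuple:
    pages = [
        {"documents": [{"id": "1", "title": "A"}], "next_cursor": "c2"},
        {"documents": [{"id": "2", "title": "B"}], "next_cursor": None},
    ]
    source = FakeDocsSource()
    # side_effect hands out one page per awaited call, in order.
    with patch.object(source, "_fetch_page", new=AsyncMock(side_effect=pages)) as mock_fetch:
        entities = [e async for e in source.generate_entities()]
    return entities, mock_fetch.await_args_list
```

Under pytest, the same body would sit inside an @pytest.mark.asyncio test. Asserting on await_args_list checks both the yielded entities and that the cursor from page one was passed to the second request.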

Advanced Patterns

Hierarchical Entities with Breadcrumbs

For nested structures (workspaces → projects → tasks):
from airweave.platform.entities._base import Breadcrumb

yield MyAppTaskEntity(
    task_id=task["id"],
    title=task["title"],
    content=task["description"],
    breadcrumbs=[
        Breadcrumb(
            entity_id=workspace["id"],
            name=workspace["name"],
            entity_type="MyAppWorkspaceEntity"
        ),
        Breadcrumb(
            entity_id=project["id"],
            name=project["name"],
            entity_type="MyAppProjectEntity"
        )
    ]
)
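When the hierarchy is deeper or its depth varies, you can build breadcrumbs while traversing instead of writing each Breadcrumb(...) by hand. The sketch below walks a nested tree depth-first and hands each node the trail of its ancestors. Breadcrumb here is a local dataclass stand-in with the three fields used above (entity_id, name, entity_type). walk() and the children key are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List, Sequence, Tuple


@dataclass(frozen=True)
class Breadcrumb:
    """Stand-in for airweave.platform.entities._base.Breadcrumb."""

    entity_id: str
    name: str
    entity_type: str


def walk(
    node: Dict[str, Any],
    entity_types: Sequence[str],
    depth: int = 0,
    trail: Tuple[Breadcrumb, ...] = (),
) -> Iterator[Tuple[Dict[str, Any], List[Breadcrumb]]]:
    """Depth-first traversal yielding each node with its ancestors' breadcrumbs."""
    yield node, list(trail)
    # entity_types[depth] names this node's type; the list must cover every
    # level of the tree. A tuple trail means siblings never share a mutable list.
    child_trail = trail + (
        Breadcrumb(node["id"], node["name"], entity_types[depth]),
    )
    for child in node.get("children", []):
        yield from walk(child, entity_types, depth + 1, child_trail)
```

Each yielded pair maps directly onto an entity: construct MyAppTaskEntity(..., breadcrumbs=crumbs) for task nodes, and likewise for the other levels.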

File Downloads

For sources that include files:
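from typing import AsyncGenerator

from airweave.platform.entities._airweave_field import AirweaveField
from airweave.platform.entities._base import FileEntity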

class MyAppFileEntity(FileEntity):
    file_url: str = AirweaveField(..., description="Download URL")
    mime_type: str = AirweaveField(..., description="File MIME type")

async def generate_entities(self) -> AsyncGenerator[MyAppFileEntity, None]:
    for file in await self._fetch_files():
        # Download and process file
        file_content = await self.download_file(file["download_url"])

        yield MyAppFileEntity(
            file_id=file["id"],
            name=file["name"],
            file_url=file["download_url"],
            mime_type=file["mime_type"],
            file_content=file_content,  # FileEntity handles extraction
            created_time=file["created_at"]
        )
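The download step is where the file size limits from step 1 matter. Below is a minimal sketch that writes a streamed download to a temporary file while enforcing a maximum size. chunks is any async iterator of bytes. With httpx, it could come from response.aiter_bytes() inside async with client.stream("GET", url) as response. save_stream(), FileTooLargeError, and the 100 MB default are assumptions, not Airweave APIs.

```python
import asyncio
import os
import tempfile
from typing import AsyncIterator


class FileTooLargeError(Exception):
    """Raised when a download exceeds the configured size limit."""


async def save_stream(
    chunks: AsyncIterator[bytes], max_bytes: int = 100 * 1024 * 1024
) -> str:
    """Write chunks to a temp file and return its path; abort past max_bytes."""
    fd, path = tempfile.mkstemp(prefix="myapp-")
    total = 0
    try:
        with os.fdopen(fd, "wb") as fh:
            async for chunk in chunks:
                total += len(chunk)
                if total > max_bytes:
                    raise FileTooLargeError(f"download exceeded {max_bytes} bytes")
                fh.write(chunk)
    except BaseException:
        # Don't leave partial files behind on errors or cancellation.
        os.remove(path)
        raise
    return path
```

Counting bytes as they arrive, rather than trusting a Content-Length header, also covers servers that omit or misreport the size.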

Access Control

For sources with entity-level permissions:
from airweave.platform.entities._base import AccessControl

@source(
    # ... other config
    supports_access_control=True,
)
class MyAppSource(BaseSource):
    async def generate_entities(self):
        for doc in await self._fetch_documents():
            # Get permissions from API
            permissions = await self._fetch_permissions(doc["id"])

            yield MyAppDocumentEntity(
                # ... other fields
                access=AccessControl(
                    viewers=[
                        f"user:{email}" for email in permissions["users"]
                    ] + [
                        f"group:{group_id}" for group_id in permissions["groups"]
                    ],
                    is_public=doc.get("is_public", False)
                )
            )

    async def generate_access_control_memberships(self):
        """Generate group membership relationships."""
        # Yield ("group:eng", "user:alice@company.com") tuples
        pass
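generate_access_control_memberships() is left as a stub above. Its comment describes tuples such as ("group:eng", "user:alice@company.com"). Following that shape, a sketch could flatten an API's group listing into one tuple per member. The input shape {"group_id": {"users": [...], "groups": [...]}} is hypothetical. Nested groups are yielded as group: members rather than expanded.

```python
import asyncio
from typing import AsyncGenerator, Dict, List, Set, Tuple


async def generate_memberships(
    groups: Dict[str, Dict[str, List[str]]],
) -> AsyncGenerator[Tuple[str, str], None]:
    """Yield (group principal, member principal) pairs, skipping duplicates."""
    seen: Set[Tuple[str, str]] = set()
    for group_id, members in groups.items():
        group = f"group:{group_id}"
        # Use exactly the same principal format as the `viewers` list above
        # ("user:<email>", "group:<id>") so memberships and viewers line up.
        principals = [f"user:{email}" for email in members.get("users", [])]
        principals += [f"group:{sub_id}" for sub_id in members.get("groups", [])]
        for member in principals:
            if (group, member) not in seen:
                seen.add((group, member))
                yield group, member
```

In a real connector, this would be the body of generate_access_control_memberships(), with groups fetched from the service's API.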

Real-World Examples

Study these existing connectors for reference:

Slack

Federated search example
  • No sync, search at query time
  • OAuth with token refresh
  • Rate limiting with retry
sources/slack.py

Notion

Complex hierarchy example
  • Pages, databases, properties
  • Breadcrumb navigation
  • Lazy loading with semaphores
sources/notion.py

GitHub

File-based connector
  • Repository files and issues
  • Direct authentication
  • Branch filtering
sources/github.py

Asana

Task management example
  • Projects, tasks, attachments
  • OAuth integration
  • Workspace filtering
sources/asana.py
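The Notion entry mentions lazy loading with semaphores. A common way to bound concurrent API calls in asyncio is asyncio.Semaphore, sketched below with a hypothetical fetch coroutine. This illustrates the general pattern only, not the Notion connector's actual code. The limit of 5 is an arbitrary assumption to tune against each API's rate limits.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

T = TypeVar("T")


async def gather_bounded(
    items: Iterable[str],
    fetch: Callable[[str], Awaitable[T]],
    limit: int = 5,
) -> List[T]:
    """Run fetch(item) for every item with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run(item: str) -> T:
        async with semaphore:
            return await fetch(item)

    # gather returns results in the same order as the inputs.
    return list(await asyncio.gather(*(run(item) for item in items)))
```

Compared with fetching children one at a time, this keeps throughput high while capping the request rate. Compared with an unbounded gather, it avoids bursts that trigger 429 responses.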

Submitting Your Connector

Once your connector is complete:
  1. Run all checks:
    ruff check .
    ruff format .
    pytest
    
  2. Test end-to-end in the Airweave UI
  3. Update documentation if needed
  4. Submit a pull request following the Contributing Guidelines
Include in your PR description:
  • Link to API documentation
  • Test account setup instructions (if needed)
  • Screenshots of successful sync
  • Any special considerations

Getting Help

Stuck? We’re here to help:

Discord Community

Get real-time help from maintainers and contributors

Example Connectors

Browse existing connectors for reference
Thank you for expanding Airweave’s capabilities!