Overview

Transformers are custom data processing functions that transform entities from one schema to another before indexing. They enable:
  • Data enrichment: Add computed fields or metadata
  • Schema mapping: Convert between entity definitions
  • Filtering: Remove sensitive or irrelevant data
  • Aggregation: Combine multiple entities into summaries
Transformers run after data extraction but before chunking and embedding.
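
As a rough illustration of the filtering case, the sketch below drops sensitive fields from each entity. The function name, the `SENSITIVE_FIELDS` set, and the `fields` config key are all hypothetical; only the `(entities, config)` shape comes from this page.

```python
import asyncio
from typing import Any, Dict, List

# Hypothetical field names treated as sensitive for this illustration.
SENSITIVE_FIELDS = {"email", "phone"}


async def drop_sensitive_fields(
    entities: List[Dict[str, Any]],
    config: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """Return copies of the entities without the blocked fields."""
    # "fields" is an assumed config key, not part of any documented schema.
    blocked = set(config.get("fields", SENSITIVE_FIELDS))
    return [
        {key: value for key, value in entity.items() if key not in blocked}
        for entity in entities
    ]


cleaned = asyncio.run(
    drop_sensitive_fields(
        [{"id": "1", "subject": "Login issue", "email": "a@example.com"}],
        {},
    )
)
print(cleaned)
```

The input entities are copied rather than mutated, so the original data stays available to whatever runs the transformer.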

Transformer Model

Each transformer is defined in the database with:
name (string, required)
Human-readable transformer name. Example: "Enrich Support Tickets"

description (string)
Optional description of what the transformer does. Example: "Adds customer sentiment and priority scores to support tickets"

method_name (string, required)
Python function name to invoke. Example: "enrich_support_ticket"

module_name (string, required)
Python module path where the function is defined. Example: "airweave.platform.transformers.support"

input_entity_definition_ids (list[UUID], required)
List of entity definition IDs this transformer accepts as input. Example: ["abc-123-def"] (Support Ticket entity)

output_entity_definition_ids (list[UUID], required)
List of entity definition IDs this transformer produces as output. Example: ["xyz-789-ghi"] (Enriched Support Ticket entity)

config_schema (JSONSchema, required)
JSON Schema defining configuration parameters for the transformer. Example:
{
  "type": "object",
  "properties": {
    "sentiment_threshold": {
      "type": "number",
      "minimum": 0,
      "maximum": 1,
      "default": 0.5
    },
    "priority_weights": {
      "type": "object",
      "properties": {
        "urgency": {"type": "number"},
        "impact": {"type": "number"}
      }
    }
  }
}

organization_id (UUID)
Organization that owns this transformer (null for system transformers)
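
For readers who prefer to see the model as a type, here is a minimal sketch that mirrors the fields above as a plain dataclass. The class name `TransformerRecord` is hypothetical, and this is not the project's actual model class.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from uuid import UUID, uuid4


@dataclass
class TransformerRecord:
    """Illustrative in-memory mirror of the transformer fields."""

    name: str
    method_name: str
    module_name: str
    input_entity_definition_ids: List[UUID]
    output_entity_definition_ids: List[UUID]
    config_schema: Dict[str, Any]
    description: Optional[str] = None
    organization_id: Optional[UUID] = None  # None for system transformers


record = TransformerRecord(
    name="Enrich Support Tickets",
    method_name="enrich_support_ticket",
    module_name="airweave.platform.transformers.support",
    input_entity_definition_ids=[uuid4()],  # placeholder IDs
    output_entity_definition_ids=[uuid4()],
    config_schema={"type": "object", "properties": {}},
)
print(record.name, record.organization_id)
```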

Database Schema

CREATE TABLE transformer (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR NOT NULL,
    description VARCHAR,
    method_name VARCHAR NOT NULL,
    module_name VARCHAR NOT NULL,
    input_entity_definition_ids JSON NOT NULL,   -- Array of UUIDs
    output_entity_definition_ids JSON NOT NULL,  -- Array of UUIDs
    config_schema JSON NOT NULL,                 -- JSON Schema
    organization_id UUID REFERENCES organization(id),
    created_at TIMESTAMP DEFAULT NOW(),
    modified_at TIMESTAMP DEFAULT NOW(),
    created_by_email VARCHAR,
    modified_by_email VARCHAR
);

API Endpoints

Manage transformers via the REST API:

List Transformers

GET /api/v1/transformers
Response:
[
  {
    "id": "abc-123-def-456",
    "name": "Enrich Support Tickets",
    "description": "Adds sentiment and priority scores",
    "method_name": "enrich_support_ticket",
    "module_name": "airweave.platform.transformers.support",
    "input_entity_definition_ids": ["entity-def-1"],
    "output_entity_definition_ids": ["entity-def-2"],
    "config_schema": {...},
    "organization_id": "org-123",
    "created_by_email": "admin@example.com",
    "modified_by_email": "admin@example.com"
  }
]

Create Transformer

POST /api/v1/transformers
Content-Type: application/json

{
  "name": "Code Comment Extractor",
  "description": "Extracts docstrings and inline comments from code",
  "method_name": "extract_code_comments",
  "module_name": "airweave.platform.transformers.code",
  "input_entity_definition_ids": ["code-file-entity-id"],
  "output_entity_definition_ids": ["comment-entity-id"],
  "config_schema": {
    "type": "object",
    "properties": {
      "include_inline": {"type": "boolean", "default": true},
      "min_length": {"type": "integer", "default": 10}
    }
  }
}
Response: 201 Created with the new transformer object
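
A minimal sketch of building this request from Python with only the standard library. The base URL is an assumption, authentication is not shown because this page does not describe it, and the request is built but not sent.

```python
import json
import urllib.request

# Assumed base URL; replace with the address of your deployment.
BASE_URL = "http://localhost:8001"


def build_create_request(payload: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request for /api/v1/transformers."""
    return urllib.request.Request(
        url=f"{BASE_URL}/api/v1/transformers",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_request(
    {
        "name": "Code Comment Extractor",
        "description": "Extracts docstrings and inline comments from code",
        "method_name": "extract_code_comments",
        "module_name": "airweave.platform.transformers.code",
        "input_entity_definition_ids": ["code-file-entity-id"],
        "output_entity_definition_ids": ["comment-entity-id"],
        "config_schema": {"type": "object", "properties": {}},
    }
)
print(request.get_method(), request.full_url)
# To send it: urllib.request.urlopen(request)
```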

Update Transformer

PUT /api/v1/transformers/{transformer_id}
Content-Type: application/json

{
  "name": "Updated Transformer Name",
  "description": "Updated description",
  ...
}
Response: 200 OK with the updated transformer object

Implementation Example

Create a transformer function:
# File: backend/airweave/platform/transformers/support.py

from typing import Any, Dict, List

async def enrich_support_ticket(
    entities: List[Dict[str, Any]],
    config: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Enrich support tickets with sentiment and priority.
    
    Args:
        entities: List of support ticket entities from input definition
        config: Configuration from transformer config_schema
    
    Returns:
        List of enriched entities matching output definition
    """
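    # Read configuration once, outside the loop
    sentiment_threshold = config.get("sentiment_threshold", 0.5)
    # Merge with defaults so a partial priority_weights config cannot raise KeyError
    priority_weights = {"urgency": 0.6, "impact": 0.4, **config.get("priority_weights", {})}

    enriched = []

    for ticket in entities: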
        
        # Analyze sentiment (simplified example; analyze_sentiment is assumed to be defined elsewhere)
        text = ticket.get("description", "")
        sentiment_score = await analyze_sentiment(text)
        
        # Calculate priority
        urgency = ticket.get("urgency", 0)
        impact = ticket.get("impact", 0)
        priority = (
            urgency * priority_weights["urgency"] + 
            impact * priority_weights["impact"]
        )
        
        # Create enriched entity
        enriched_ticket = {
            **ticket,  # Original fields
            "sentiment_score": sentiment_score,
            "sentiment_label": "positive" if sentiment_score > sentiment_threshold else "negative",
            "calculated_priority": priority,
            "priority_label": "high" if priority > 0.7 else "medium" if priority > 0.4 else "low"
        }
        
        enriched.append(enriched_ticket)
    
    return enriched
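
The example calls `analyze_sentiment` without defining it. To run the example locally, a toy stand-in like the one below would do. The word lists and the scoring rule are invented for illustration and are not a real sentiment model.

```python
import asyncio

# Hypothetical word lists, chosen only to make the example runnable.
POSITIVE_WORDS = {"thanks", "great", "resolved", "love"}
NEGATIVE_WORDS = {"broken", "angry", "fail", "terrible"}


async def analyze_sentiment(text: str) -> float:
    """Return a score in [0, 1]; 0.5 means neutral or no signal."""
    words = [word.strip(".,!?").lower() for word in text.split()]
    positive = sum(word in POSITIVE_WORDS for word in words)
    negative = sum(word in NEGATIVE_WORDS for word in words)
    total = positive + negative
    if total == 0:
        return 0.5
    # Share of matched words that are positive.
    return positive / total


print(asyncio.run(analyze_sentiment("Thanks, this is great!")))
```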

Transformer Function Signature

All transformer functions must follow this signature:
async def transformer_name(
    entities: List[Dict[str, Any]],
    config: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Transformer docstring.
    
    Args:
        entities: Input entities matching input_entity_definition_ids
        config: Configuration validated against config_schema
    
    Returns:
        Output entities matching output_entity_definition_ids
    """
    pass
Requirements:
  • Must be async
  • Takes exactly two parameters: entities and config
  • Returns a list of dictionaries (entities)
  • Can live in any importable module (specified via module_name)
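
The first two requirements can be checked before registering a function. The helper below is a sketch using the standard `inspect` module. `check_transformer_signature` is a hypothetical name, not part of the platform.

```python
import inspect
from typing import Any, Callable, Dict, List


def check_transformer_signature(func: Callable[..., Any]) -> List[str]:
    """Return a list of problems; an empty list means the checks passed."""
    problems = []
    if not inspect.iscoroutinefunction(func):
        problems.append("function must be declared with async def")
    params = list(inspect.signature(func).parameters)
    if params != ["entities", "config"]:
        problems.append(f"expected parameters (entities, config), got {params}")
    return problems


async def good(
    entities: List[Dict[str, Any]], config: Dict[str, Any]
) -> List[Dict[str, Any]]:
    return entities


def bad(entities):  # not async, and missing the config parameter
    return entities


print(check_transformer_signature(good))
print(check_transformer_signature(bad))
```

The return type is not checked here, since it is only known once the function has run.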

Configuration Schema

Define transformer parameters using JSON Schema:
{
  "type": "object",
  "properties": {
    "enabled": {
      "type": "boolean",
      "default": true,
      "description": "Enable/disable this transformer"
    },
    "threshold": {
      "type": "number",
      "minimum": 0,
      "maximum": 1,
      "default": 0.5
    }
  },
  "required": ["threshold"]
}
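
In JSON Schema, `default` is an annotation: validators do not insert it into the data, and a default does not satisfy `required`. The sketch below shows one way a caller might handle top-level keys. The helper `resolve_config` is hypothetical and is not a JSON Schema validator, because it ignores types, ranges, and nested objects.

```python
from typing import Any, Dict


def resolve_config(schema: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
    """Check top-level required keys, then fill in top-level defaults."""
    # Required keys must be supplied explicitly; defaults do not count.
    missing = [key for key in schema.get("required", []) if key not in config]
    if missing:
        raise ValueError(f"Missing required config keys: {missing}")
    resolved = dict(config)
    for key, spec in schema.get("properties", {}).items():
        if key not in resolved and "default" in spec:
            resolved[key] = spec["default"]
    return resolved


schema = {
    "type": "object",
    "properties": {
        "enabled": {"type": "boolean", "default": True},
        "threshold": {"type": "number", "minimum": 0, "maximum": 1, "default": 0.5},
    },
    "required": ["threshold"],
}

print(resolve_config(schema, {"threshold": 0.8}))
```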

Execution Pipeline

Transformers are executed during the sync pipeline:
1. Entity Extraction

The source connector extracts raw entities from the API.

2. Transformer Lookup

The system looks up the transformers configured for this entity definition.

3. Execution

Transformers execute in the configured order:
for transformer in transformers:
    entities = await invoke_transformer(
        transformer.module_name,
        transformer.method_name,
        entities,
        transformer_config
    )

4. Schema Validation

Output entities are validated against the output entity definition schema.

5. Continue Pipeline

Transformed entities proceed to chunking → embedding → indexing.
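
The execution step calls `invoke_transformer`, which this page does not define. Below is one plausible sketch of it using `importlib`. It is an assumption about how dynamic dispatch could work, not the platform's actual implementation, and the `demo_transformers` module exists only so the example runs on its own.

```python
import asyncio
import importlib
import sys
import types
from typing import Any, Dict, List

Entity = Dict[str, Any]


async def invoke_transformer(
    module_name: str,
    method_name: str,
    entities: List[Entity],
    config: Dict[str, Any],
) -> List[Entity]:
    """Import module_name, look up method_name, and await it."""
    module = importlib.import_module(module_name)
    func = getattr(module, method_name)
    return await func(entities, config)


async def add_flag(entities: List[Entity], config: Dict[str, Any]) -> List[Entity]:
    return [{**entity, "flag": config.get("flag", True)} for entity in entities]


# Register a throwaway module so the example needs no project code.
demo = types.ModuleType("demo_transformers")
demo.add_flag = add_flag
sys.modules["demo_transformers"] = demo

result = asyncio.run(
    invoke_transformer("demo_transformers", "add_flag", [{"id": "1"}], {"flag": False})
)
print(result)
```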

Best Practices

Each transformer should do one thing well:
Good:
  • extract_code_comments - Single purpose
  • calculate_priority - Specific calculation
  • redact_pii - Clear responsibility
Avoid:
  • process_everything - Too broad
  • enrich_and_filter_and_map - Multiple concerns
Make transformers configurable via config_schema:
# Instead of hardcoding
if sentiment_score > 0.5:  # ❌ Hardcoded
    ...

# Use config
threshold = config.get("sentiment_threshold", 0.5)  # ✅ Configurable
if sentiment_score > threshold:
    ...
Don't let a single entity failure break the entire batch:
import logging

logger = logging.getLogger(__name__)

results = []

for entity in entities:
    try:
        transformed = await transform_entity(entity, config)
        results.append(transformed)
    except Exception as e:
        logger.error(f"Failed to transform entity {entity.get('id')}: {e}")
        # Option 1: Skip entity
        continue
        # Option 2: Return original entity
        # results.append(entity)

return results
Clearly document the expected entity structure:
async def my_transformer(
    entities: List[Dict[str, Any]],
    config: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Transform support tickets.
    
    Input schema (from Zendesk):
    {
        "id": "123",
        "subject": "...",
        "description": "...",
        "priority": "high"|"medium"|"low",
        "status": "open"|"closed"
    }
    
    Output schema (enriched):
    {
        ... (all input fields) ...
        "sentiment_score": 0.0-1.0,
        "urgency_level": 1-5,
        "estimated_resolution_time": "2h"|"1d"|"1w"
    }
    
    Config schema:
    {
        "sentiment_model": "basic"|"advanced",
        "urgency_weights": {"priority": 0.6, "age": 0.4}
    }
    """
Process entities in batches when possible:
# ❌ Inefficient: One API call per entity
for entity in entities:
    sentiment = await api.analyze(entity["text"])

# ✅ Efficient: Batch API call
texts = [e["text"] for e in entities]
sentiments = await api.analyze_batch(texts)

for entity, sentiment in zip(entities, sentiments):
    entity["sentiment"] = sentiment
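
When the external API limits request size, one batch for all entities may be too large. The sketch below splits entities into fixed-size chunks. `fake_analyze_batch` is a made-up stand-in for `api.analyze_batch`, and the batch size is arbitrary.

```python
import asyncio
from typing import Any, Dict, Iterator, List


def chunked(items: List[Any], size: int) -> Iterator[List[Any]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


async def fake_analyze_batch(texts: List[str]) -> List[float]:
    """Hypothetical stand-in for api.analyze_batch; scores by text length."""
    return [min(len(text) / 10, 1.0) for text in texts]


async def add_sentiment(
    entities: List[Dict[str, Any]], batch_size: int = 2
) -> List[Dict[str, Any]]:
    results: List[Dict[str, Any]] = []
    for batch in chunked(entities, batch_size):
        scores = await fake_analyze_batch([entity["text"] for entity in batch])
        # Build new dicts instead of mutating the inputs.
        results.extend(
            {**entity, "sentiment": score} for entity, score in zip(batch, scores)
        )
    return results


enriched = asyncio.run(
    add_sentiment([{"text": "a"}, {"text": "bb"}, {"text": "ccc"}])
)
print(enriched)
```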

Use Cases

Add computed or external data:
Examples:
  • Sentiment analysis on customer feedback
  • Geocoding addresses to lat/lng
  • Fetching stock prices for company mentions
  • Calculating metrics from raw data
  • Adding taxonomy/category labels

Troubleshooting

Check:
  1. Transformer registered in database
  2. input_entity_definition_ids matches source entities
  3. module_name and method_name are correct
  4. Function signature matches protocol
Debug:
# Add logging to transformer
logger.info(f"Transformer {transformer.name} executing on {len(entities)} entities")
Symptom:
ModuleNotFoundError: No module named 'airweave.platform.transformers.custom'
Solution: Ensure module exists at specified path:
backend/airweave/platform/transformers/custom.py
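A quick way to confirm that a module path resolves, before saving it as module_name, is `importlib.util.find_spec`. The helper name below is hypothetical. For dotted names, `find_spec` imports the parent packages as a side effect.

```python
import importlib.util


def module_is_importable(module_name: str) -> bool:
    """Return True if Python can locate module_name on the current path."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # Raised when a parent package is missing or the name is malformed.
        return False


print(module_is_importable("json"))
print(module_is_importable("definitely_missing_pkg_xyz.custom"))
```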
Symptom: Transformer fails with config errors
Solution: Validate config against schema before saving:
from jsonschema import validate, ValidationError

try:
    validate(instance=config, schema=transformer.config_schema)
except ValidationError as e:
    print(f"Config invalid: {e.message}")
Symptom: Transformed entities fail validation
Solution: Ensure the output matches the schema referenced by output_entity_definition_ids:
# Check required fields are present
required_fields = ["id", "title", "content"]
for entity in output_entities:
    for field in required_fields:
        if field not in entity:
            raise ValueError(f"Missing required field: {field}")

Next Steps

Entity Definitions

Define input and output schemas

Chunking

Configure chunking after transformation

Embeddings

Set up embeddings for transformed entities

API Reference

Complete API documentation