Overview
Transformers are custom data processing functions that transform entities from one schema to another before indexing. They enable:
Data enrichment: Add computed fields or metadata
Schema mapping: Convert between entity definitions
Filtering: Remove sensitive or irrelevant data
Aggregation: Combine multiple entities into summaries
Transformers run after data extraction but before chunking and embedding.
Each transformer is defined in the database with:
name
Human-readable transformer name. Example: "Enrich Support Tickets"
description
Optional description of what the transformer does. Example: "Adds customer sentiment and priority scores to support tickets"
method_name
Python function name to invoke. Example: "enrich_support_ticket"
module_name
Python module path where the function is defined. Example: "airweave.platform.transformers.support"
input_entity_definition_ids
List of entity definition IDs this transformer accepts as input. Example: ["abc-123-def"] (Support Ticket entity)
output_entity_definition_ids
List of entity definition IDs this transformer produces as output. Example: ["xyz-789-ghi"] (Enriched Support Ticket entity)
config_schema
JSON Schema defining configuration parameters for the transformer. Example:
{
  "type": "object",
  "properties": {
    "sentiment_threshold": {
      "type": "number",
      "minimum": 0,
      "maximum": 1,
      "default": 0.5
    },
    "priority_weights": {
      "type": "object",
      "properties": {
        "urgency": { "type": "number" },
        "impact": { "type": "number" }
      }
    }
  }
}
organization_id
Organization that owns this transformer (null for system transformers)
Database Schema
CREATE TABLE transformer (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR NOT NULL,
    description VARCHAR,
    method_name VARCHAR NOT NULL,
    module_name VARCHAR NOT NULL,
    input_entity_definition_ids JSON NOT NULL,   -- Array of UUIDs
    output_entity_definition_ids JSON NOT NULL,  -- Array of UUIDs
    config_schema JSON NOT NULL,                 -- JSON Schema
    organization_id UUID REFERENCES organization(id),
    created_at TIMESTAMP DEFAULT NOW(),
    modified_at TIMESTAMP DEFAULT NOW(),
    created_by_email VARCHAR,
    modified_by_email VARCHAR
);
API Endpoints
Manage transformers via REST API:
Response (a list of transformer objects):
[
  {
    "id": "abc-123-def-456",
    "name": "Enrich Support Tickets",
    "description": "Adds sentiment and priority scores",
    "method_name": "enrich_support_ticket",
    "module_name": "airweave.platform.transformers.support",
    "input_entity_definition_ids": ["entity-def-1"],
    "output_entity_definition_ids": ["entity-def-2"],
    "config_schema": { ... },
    "organization_id": "org-123",
    "created_by_email": "admin@example.com",
    "modified_by_email": "admin@example.com"
  }
]
POST /api/v1/transformers
Content-Type: application/json
{
  "name": "Code Comment Extractor",
  "description": "Extracts docstrings and inline comments from code",
  "method_name": "extract_code_comments",
  "module_name": "airweave.platform.transformers.code",
  "input_entity_definition_ids": ["code-file-entity-id"],
  "output_entity_definition_ids": ["comment-entity-id"],
  "config_schema": {
    "type": "object",
    "properties": {
      "include_inline": { "type": "boolean", "default": true },
      "min_length": { "type": "integer", "default": 10 }
    }
  }
}
Response: 201 Created with the transformer object
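As a rough sketch, the create request above could be built from Python using only the standard library. The base URL below is a placeholder and no authentication header is shown; both are assumptions, since this page does not specify them. The helper name `build_create_request` is also hypothetical.

```python
import json
import urllib.request

# Assumption: placeholder base URL; replace with your deployment's address.
BASE_URL = "http://localhost:8001"


def build_create_request(payload: dict, base_url: str = BASE_URL) -> urllib.request.Request:
    """Build (but do not send) a POST request for /api/v1/transformers."""
    return urllib.request.Request(
        url=f"{base_url}/api/v1/transformers",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


payload = {
    "name": "Code Comment Extractor",
    "description": "Extracts docstrings and inline comments from code",
    "method_name": "extract_code_comments",
    "module_name": "airweave.platform.transformers.code",
    "input_entity_definition_ids": ["code-file-entity-id"],
    "output_entity_definition_ids": ["comment-entity-id"],
    "config_schema": {
        "type": "object",
        "properties": {
            "include_inline": {"type": "boolean", "default": True},
            "min_length": {"type": "integer", "default": 10},
        },
    },
}

request = build_create_request(payload)
# To send it, pass `request` to urllib.request.urlopen(); per the text above,
# a successful call should come back as 201 Created.
```

Building the request separately from sending it keeps the payload easy to inspect or log before it reaches the server.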
PUT /api/v1/transformers/{transformer_id}
Content-Type: application/json
{
  "name": "Updated Transformer Name",
  "description": "Updated description",
  ...
}
Response: 200 OK with the updated transformer object
Implementation Example
Create a transformer function:
Basic Transformer
# File: backend/airweave/platform/transformers/support.py
from typing import Any, Dict, List


async def enrich_support_ticket(
    entities: List[Dict[str, Any]],
    config: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """Enrich support tickets with sentiment and priority.

    Args:
        entities: List of support ticket entities from input definition
        config: Configuration from transformer config_schema

    Returns:
        List of enriched entities matching output definition
    """
    # Extract configuration once; it is the same for every ticket
    sentiment_threshold = config.get("sentiment_threshold", 0.5)
    priority_weights = config.get("priority_weights", {})
    # Fall back per key so a partial priority_weights object cannot raise KeyError
    urgency_weight = priority_weights.get("urgency", 0.6)
    impact_weight = priority_weights.get("impact", 0.4)

    enriched = []
    for ticket in entities:
        # Analyze sentiment (simplified example; analyze_sentiment is a
        # placeholder for your own async scoring function)
        text = ticket.get("description", "")
        sentiment_score = await analyze_sentiment(text)

        # Calculate priority as a weighted sum of urgency and impact
        urgency = ticket.get("urgency", 0)
        impact = ticket.get("impact", 0)
        priority = urgency * urgency_weight + impact * impact_weight

        # Create enriched entity
        enriched_ticket = {
            **ticket,  # Original fields
            "sentiment_score": sentiment_score,
            "sentiment_label": "positive" if sentiment_score > sentiment_threshold else "negative",
            "calculated_priority": priority,
            "priority_label": "high" if priority > 0.7 else "medium" if priority > 0.4 else "low",
        }
        enriched.append(enriched_ticket)
    return enriched
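The same signature also covers the filtering case mentioned in the overview. The following is a minimal sketch, not a shipped transformer: the entity fields (`status`, `ssn`, `credit_card`) and the config keys (`blocked_fields`, `skip_statuses`) are hypothetical names chosen for illustration.

```python
import asyncio
from typing import Any, Dict, List


async def filter_sensitive_fields(
    entities: List[Dict[str, Any]],
    config: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """Drop unwanted entities and strip sensitive fields from the rest."""
    # Hypothetical config keys with illustrative defaults
    blocked = set(config.get("blocked_fields", ["ssn", "credit_card"]))
    skip_statuses = set(config.get("skip_statuses", ["spam"]))

    filtered = []
    for entity in entities:
        # Remove irrelevant entities entirely
        if entity.get("status") in skip_statuses:
            continue
        # Copy the entity without the blocked fields (input is not mutated)
        filtered.append({k: v for k, v in entity.items() if k not in blocked})
    return filtered


tickets = [
    {"id": "1", "status": "open", "ssn": "000-00-0000", "title": "Login fails"},
    {"id": "2", "status": "spam", "title": "Buy now"},
]
result = asyncio.run(filter_sensitive_fields(tickets, {}))
print(result)  # [{'id': '1', 'status': 'open', 'title': 'Login fails'}]
```

Note that a filtering transformer may return fewer entities than it receives, which the list-in, list-out signature allows.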
All transformer functions must follow this signature:
async def transformer_name(
    entities: List[Dict[str, Any]],
    config: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """Transformer docstring.

    Args:
        entities: Input entities matching input_entity_definition_ids
        config: Configuration validated against config_schema

    Returns:
        Output entities matching output_entity_definition_ids
    """
    pass
Requirements:
Must be async
Takes exactly 2 parameters: entities and config
Returns list of dictionaries (entities)
Can be in any module (specify via module_name)
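These requirements can be checked mechanically before registering a function. The helper below is a hypothetical sketch built on the standard `inspect` module; whether the platform itself enforces the parameter names is not stated here, so treat the name check as an assumption.

```python
import inspect
from typing import Any, Callable, Dict, List


def check_transformer_signature(func: Callable[..., Any]) -> List[str]:
    """Return a list of problems; an empty list means the function conforms."""
    problems = []
    # Requirement: must be async
    if not inspect.iscoroutinefunction(func):
        problems.append("function must be declared with 'async def'")
    # Requirement: exactly two parameters, entities and config
    params = list(inspect.signature(func).parameters)
    if params != ["entities", "config"]:
        problems.append(f"expected parameters (entities, config), got {tuple(params)}")
    return problems


async def good(entities: List[Dict[str, Any]], config: Dict[str, Any]) -> List[Dict[str, Any]]:
    return entities


def bad(entities):
    return entities


print(check_transformer_signature(good))
print(check_transformer_signature(bad))
```

The return type is not checked here, since it can only be verified reliably by running the function.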
Configuration Schema
Define transformer parameters using JSON Schema:
Simple Config
{
  "type": "object",
  "properties": {
    "enabled": {
      "type": "boolean",
      "default": true,
      "description": "Enable/disable this transformer"
    },
    "threshold": {
      "type": "number",
      "minimum": 0,
      "maximum": 1,
      "default": 0.5
    }
  },
  "required": ["threshold"]
}
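One detail worth keeping in mind: in JSON Schema, `default` is an annotation, and validators generally do not fill it in for you. With the schema above, a config that omits `threshold` would therefore fail the `required` check even though a default is declared. The sketch below shows one way to apply top-level defaults yourself; both helper names are hypothetical, and nested objects are deliberately not handled.

```python
from typing import Any, Dict, List


def apply_schema_defaults(schema: Dict[str, Any], config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of config with top-level schema defaults filled in."""
    merged = dict(config)
    for name, spec in schema.get("properties", {}).items():
        if name not in merged and "default" in spec:
            merged[name] = spec["default"]
    return merged


def missing_required(schema: Dict[str, Any], config: Dict[str, Any]) -> List[str]:
    """List required top-level keys that are absent from config."""
    return [name for name in schema.get("required", []) if name not in config]


schema = {
    "type": "object",
    "properties": {
        "enabled": {"type": "boolean", "default": True},
        "threshold": {"type": "number", "minimum": 0, "maximum": 1, "default": 0.5},
    },
    "required": ["threshold"],
}

raw_config: Dict[str, Any] = {}
print(missing_required(schema, raw_config))       # ['threshold']
full_config = apply_schema_defaults(schema, raw_config)
print(full_config)                                # {'enabled': True, 'threshold': 0.5}
print(missing_required(schema, full_config))      # []
```

Inside a transformer, `config.get("threshold", 0.5)` achieves the same effect per key, at the cost of repeating the default in code.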
Execution Pipeline
Transformers are executed during the sync pipeline:
Entity Extraction
Source connector extracts raw entities from API
Transformer Lookup
System looks up transformers configured for this entity definition
Execution
Transformers execute in configured order:
for transformer in transformers:
    entities = await invoke_transformer(
        transformer.module_name,
        transformer.method_name,
        entities,
        transformer_config
    )
Schema Validation
Output entities are validated against the output entity definition schema
Continue Pipeline
Transformed entities proceed to chunking → embedding → indexing
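The body of `invoke_transformer` is not shown on this page. One plausible shape, given that transformers are stored as a `module_name` plus `method_name`, is a dynamic import followed by an awaited call. The version below is an illustrative sketch under that assumption, not the actual implementation; the in-memory `demo_transformers` module exists only so the example runs standalone.

```python
import asyncio
import importlib
import sys
import types
from typing import Any, Dict, List


async def invoke_transformer(
    module_name: str,
    method_name: str,
    entities: List[Dict[str, Any]],
    config: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """Import module_name, look up method_name, and await it."""
    module = importlib.import_module(module_name)  # raises ModuleNotFoundError if absent
    func = getattr(module, method_name)            # raises AttributeError if absent
    return await func(entities, config)


# Demo setup: register a throwaway module in sys.modules.
demo = types.ModuleType("demo_transformers")


async def add_flag(entities, config):
    return [{**e, "flag": config.get("flag", True)} for e in entities]


demo.add_flag = add_flag
sys.modules["demo_transformers"] = demo

out = asyncio.run(invoke_transformer("demo_transformers", "add_flag", [{"id": "1"}], {}))
print(out)  # [{'id': '1', 'flag': True}]
```

Because each transformer receives the previous one's output, the order in which they are configured determines which fields are visible at each step.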
Best Practices
Keep transformers focused
Use config for flexibility
Make transformers configurable via config_schema:
# Instead of hardcoding
if sentiment_score > 0.5:  # ❌ Hardcoded
    ...

# Use config
threshold = config.get("sentiment_threshold", 0.5)  # ✅ Configurable
if sentiment_score > threshold:
    ...
Don't let a single entity failure break the entire batch:
import logging

logger = logging.getLogger(__name__)

# Inside the transformer function; transform_entity stands in for your per-entity logic
results = []
for entity in entities:
    try:
        transformed = await transform_entity(entity, config)
        results.append(transformed)
    except Exception as e:
        logger.error(f"Failed to transform entity {entity.get('id')}: {e}")
        # Option 1: Skip entity
        continue
        # Option 2: Return original entity (use instead of `continue`)
        # results.append(entity)
return results
Document input/output schemas
Optimize for batch processing
Process entities in batches when possible:
# ❌ Inefficient: One API call per entity
for entity in entities:
    sentiment = await api.analyze(entity["text"])

# ✅ Efficient: Batch API call
texts = [e["text"] for e in entities]
sentiments = await api.analyze_batch(texts)
for entity, sentiment in zip(entities, sentiments):
    entity["sentiment"] = sentiment
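When the external service offers no batch endpoint, bounded concurrency is a common fallback. The sketch below uses `asyncio.Semaphore` and `asyncio.gather`; the function name, the `max_concurrency` parameter, and the stand-in `fake_analyze` are all hypothetical and not part of the documented API.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def analyze_concurrently(
    entities: List[Dict[str, Any]],
    analyze: Callable[[str], Awaitable[float]],
    max_concurrency: int = 5,
) -> List[Dict[str, Any]]:
    """Call analyze() once per entity, with at most max_concurrency calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def analyze_one(entity: Dict[str, Any]) -> Dict[str, Any]:
        async with semaphore:
            sentiment = await analyze(entity["text"])
        # Return a copy so the input entities are left untouched
        return {**entity, "sentiment": sentiment}

    # gather() returns results in the same order as the inputs
    return await asyncio.gather(*(analyze_one(e) for e in entities))


async def fake_analyze(text: str) -> float:
    """Stand-in for a real per-item API call."""
    await asyncio.sleep(0)
    return 1.0 if "great" in text else 0.0


results = asyncio.run(
    analyze_concurrently([{"text": "great product"}, {"text": "broken"}], fake_analyze)
)
print([r["sentiment"] for r in results])  # [1.0, 0.0]
```

The semaphore limit is worth tuning against the rate limits of whatever service sits behind `analyze`.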
Use Cases
Data Enrichment
Add computed or external data. Examples:
Sentiment analysis on customer feedback
Geocoding addresses to lat/lng
Fetching stock prices for company mentions
Calculating metrics from raw data
Adding taxonomy/category labels
Schema Normalization
Map different source schemas to a unified format. Examples:
Salesforce → Internal CRM schema
GitHub Issues → Jira tickets
Gmail → Unified email format
Multiple HR systems → Single employee schema
Data Cleaning
Remove or fix data quality issues. Examples:
Redacting PII (SSN, credit cards)
Normalizing phone numbers
Fixing malformed dates
Removing duplicate fields
Stripping HTML/markdown
Aggregation
Combine multiple entities. Examples:
Email threads from individual messages
Conversation summaries from chat messages
Project timelines from task updates
Customer journey from touchpoints
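To make the aggregation case concrete, here is a minimal sketch that builds email threads from individual messages. The input fields (`thread_id`, `sent_at`, `body`), the output fields, and the `separator` config key are hypothetical; real entity definitions will differ.

```python
import asyncio
from collections import defaultdict
from typing import Any, Dict, List


async def aggregate_email_threads(
    entities: List[Dict[str, Any]],
    config: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """Group message entities by thread and emit one summary entity per thread."""
    separator = config.get("separator", "\n---\n")

    # Bucket messages by their thread identifier
    threads: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for message in entities:
        threads[message["thread_id"]].append(message)

    summaries = []
    for thread_id, messages in threads.items():
        # ISO 8601 timestamps sort chronologically as plain strings
        ordered = sorted(messages, key=lambda m: m["sent_at"])
        summaries.append({
            "id": thread_id,
            "message_count": len(ordered),
            "content": separator.join(m["body"] for m in ordered),
        })
    return summaries


messages = [
    {"thread_id": "t1", "sent_at": "2024-01-02T10:00:00", "body": "Reply"},
    {"thread_id": "t1", "sent_at": "2024-01-01T09:00:00", "body": "Hello"},
    {"thread_id": "t2", "sent_at": "2024-01-03T08:00:00", "body": "Other"},
]
thread_entities = asyncio.run(aggregate_email_threads(messages, {"separator": " | "}))
print(thread_entities)
```

An aggregation transformer of this kind only sees the entities passed in a single call, so messages from one thread that arrive in different batches would produce separate summaries.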
Troubleshooting
Transformer not executing
Symptom: ModuleNotFoundError: No module named 'airweave.platform.transformers.custom'
Solution: Ensure the module exists at the specified path: backend/airweave/platform/transformers/custom.py
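A quick way to confirm whether Python can locate a module path before saving it as `module_name` is `importlib.util.find_spec`. The helper name below is hypothetical; the check must run in the same environment (and with the same `sys.path`) as the sync process to be meaningful.

```python
import importlib.util


def module_is_importable(module_name: str) -> bool:
    """Return True if module_name can be located on the current sys.path."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package of a dotted name does not exist
        return False


print(module_is_importable("json"))
print(module_is_importable("airweave.platform.transformers.custom"))
```

Note that `find_spec` imports the parent packages of a dotted name, so any import-time side effects in those packages will run.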
Config validation failures
Symptom: Transformer fails with config errors
Solution: Validate config against schema before saving:
from jsonschema import validate, ValidationError

try:
    validate(instance=config, schema=transformer.config_schema)
except ValidationError as e:
    print(f"Config invalid: {e.message}")
Symptom: Transformed entities fail validation
Solution: Ensure output matches the output_entity_definition_ids schema:
# Check required fields are present
required_fields = ["id", "title", "content"]
for entity in output_entities:
    for field in required_fields:
        if field not in entity:
            raise ValueError(f"Missing required field: {field}")
Next Steps
Entity Definitions: Define input and output schemas
Chunking: Configure chunking after transformation
Embeddings: Set up embeddings for transformed entities
API Reference: Complete API documentation