Overview
Airweave’s rate limiting system prevents exhausting customer API quotas by enforcing configurable limits on external source API calls. It uses Redis-based distributed rate limiting with atomic Lua scripts to handle concurrent requests across horizontally scaled instances.
Rate limiting is opt-in via the SOURCE_RATE_LIMITING feature flag and is configured per organization.
Rate Limit Levels
Each source defines its rate limiting strategy in the source table via the rate_limit_level field:
Organization-Level
Level: org
Scope: All users/connections in the organization share the same limit
Use case: Sources with org-wide API quotas
Examples:
Google Drive (quota per Google Workspace org)
GitHub (quota per GitHub org)
Salesforce (quota per Salesforce instance)
Redis key format: source_rate_limit:{org_id}:google_drive:org:org
Connection-Level
Level: connection
Scope: Each user connection has its own separate limit
Use case: Sources with per-user API quotas
Examples:
Notion (quota per Notion workspace/user)
Slack (quota per user token)
Personal API integrations
Redis key format: source_rate_limit:{org_id}:notion:connection:{connection_id}
Not Supported
Level: null
Behavior: No rate limiting applied
Use case: Sources without meaningful rate limits or where limiting isn't needed
Examples:
File uploads (local processing)
Database connectors (internal network)
Sources with extremely high limits
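The three levels determine how the Redis key is built. A minimal sketch of that mapping (the helper name is hypothetical; the key formats follow the ones shown above):

```python
from typing import Optional


def build_rate_limit_key(
    org_id: str,
    source_short_name: str,
    rate_limit_level: Optional[str],
    connection_id: Optional[str] = None,
) -> Optional[str]:
    """Build the Redis key for a source, following the documented formats."""
    if rate_limit_level is None:
        return None  # rate limiting not supported for this source
    if rate_limit_level == "org":
        # All connections in the org share one key
        return f"source_rate_limit:{org_id}:{source_short_name}:org:org"
    if rate_limit_level == "connection":
        # Each connection gets its own key
        return f"source_rate_limit:{org_id}:{source_short_name}:connection:{connection_id}"
    raise ValueError(f"Unknown rate_limit_level: {rate_limit_level}")
```

A `None` return signals the caller to skip rate limiting entirely, matching the "Not Supported" level.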
Configuration Model
Rate limits are configured in the source_rate_limit table:
CREATE TABLE source_rate_limit (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    organization_id UUID NOT NULL REFERENCES organization(id),
    source_short_name VARCHAR NOT NULL,          -- e.g., "google_drive", "notion"
    "limit" INTEGER NOT NULL,                    -- Max requests per window (quoted: LIMIT is a reserved word)
    window_seconds INTEGER NOT NULL DEFAULT 60,  -- Time window in seconds
    created_at TIMESTAMP DEFAULT NOW(),
    modified_at TIMESTAMP DEFAULT NOW(),
    UNIQUE (organization_id, source_short_name)
);
source_short_name
Source identifier from the source table.
Examples: "google_drive", "notion", "github"
Special case: "pipedream_proxy" for Pipedream OAuth proxy limits
limit
Maximum number of requests allowed within the time window.
Must be: > 0
Examples: 100 or 1000 requests per window
window_seconds
Time window duration in seconds.
Common values:
60 = 1 minute
300 = 5 minutes
3600 = 1 hour
86400 = 1 day
Default: 60 seconds (1 minute)
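The constraints above (both values must be positive) can be checked with a small helper before writing a row; the function name is illustrative, not part of Airweave's API:

```python
def validate_rate_limit_config(limit: int, window_seconds: int) -> float:
    """Validate a source_rate_limit row and return its average requests/second."""
    if limit <= 0:
        raise ValueError("limit must be > 0")
    if window_seconds <= 0:
        raise ValueError("window_seconds must be > 0")
    # Average rate the configuration allows, useful for sanity-checking
    # against the provider's documented quota.
    return limit / window_seconds
```

For example, limit=900 over a 100-second window averages 9 requests per second.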
API Endpoints
List Source Rate Limits
GET /api/v1/source-rate-limits
Returns: All sources with their rate limit configurations
Response:
[
  {
    "source_short_name": "pipedream_proxy",
    "rate_limit_level": "org",
    "limit": 1000,
    "window_seconds": 300,
    "id": "abc-123-def-456"
  },
  {
    "source_short_name": "google_drive",
    "rate_limit_level": "org",
    "limit": 100,
    "window_seconds": 60,
    "id": "xyz-789-ghi-012"
  },
  {
    "source_short_name": "notion",
    "rate_limit_level": "connection",
    "limit": 50,
    "window_seconds": 60,
    "id": null  // Not configured yet
  },
  {
    "source_short_name": "file_upload",
    "rate_limit_level": null,
    "limit": null,
    "window_seconds": null,
    "id": null
  }
]
Notes:
Pipedream proxy always appears first
Sources sorted by rate_limit_level (supported first)
id=null means no custom limit configured (uses defaults or unlimited)
Set Rate Limit
PUT /api/v1/source-rate-limits/{source_short_name}
Content-Type: application/json
{
  "limit": 100,
  "window_seconds": 60
}
Creates a new limit if one doesn't exist, updates it if it does.
Example:
curl -X PUT http://localhost:8001/api/v1/source-rate-limits/google_drive \
-H "Content-Type: application/json" \
-d '{
"limit": 100,
"window_seconds": 60
}'
Response: 200 OK
{
  "id": "abc-123-def-456",
  "organization_id": "org-789",
  "source_short_name": "google_drive",
  "limit": 100,
  "window_seconds": 60,
  "created_at": "2024-03-02T12:00:00Z",
  "modified_at": "2024-03-02T12:00:00Z"
}
Delete Rate Limit
DELETE /api/v1/source-rate-limits/{source_short_name}
Removes the rate limit configuration (reverts to unlimited or default).
Response: 204 No Content
Sliding Window Algorithm
Airweave uses a Redis sorted set sliding window algorithm for accurate, distributed rate limiting:
How It Works
Request arrives
Backend calls check_and_increment(org_id, source_short_name, connection_id)
Lookup configuration
Fetches the rate limit config from the database (cached in Redis for 5 minutes):
config = await get_limit_config(org_id, source_short_name)
# Returns: {"limit": 100, "window_seconds": 60}
Build Redis key
Creates the key based on rate_limit_level:
# Org-level (Google Drive)
key = f"source_rate_limit:{org_id}:google_drive:org:org"
# Connection-level (Notion)
key = f"source_rate_limit:{org_id}:notion:connection:{connection_id}"
Execute Lua script
Atomically checks the count and increments if under the limit:
-- Remove old entries outside window
redis.call('ZREMRANGEBYSCORE', key, 0, window_start)
-- Count current requests in window
local current_count = redis.call('ZCOUNT', key, window_start, current_time)
-- Check limit
if current_count >= limit then
  return {-1, retry_after}  -- Over limit
end
-- Add current request
redis.call('ZADD', key, current_time, unique_id)
redis.call('EXPIRE', key, window_seconds * 2)
return {current_count + 1, 0}  -- Success
Handle result
Under limit: Request proceeds, counter incremented
Over limit: SourceRateLimitExceededException raised with retry_after
Atomic Lua Script
The complete Lua script ensuring race-condition-free rate limiting:
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window_start = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
local window_seconds = tonumber(ARGV[4])
local expire_seconds = tonumber(ARGV[5])
local unique_id = ARGV[6]

-- Remove old entries outside sliding window
redis.call('ZREMRANGEBYSCORE', key, 0, window_start)

-- Count current requests in window
local current_count = redis.call('ZCOUNT', key, window_start, current_time)

-- Check if over limit
if current_count >= limit then
  -- Calculate retry_after from oldest entry
  local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
  local retry_after = window_seconds
  if oldest and oldest[2] then
    local oldest_timestamp = tonumber(oldest[2])
    retry_after = math.max(0.1, (oldest_timestamp + window_seconds) - current_time)
  end
  return {-1, retry_after}
end

-- Add current request to window
redis.call('ZADD', key, current_time, unique_id)
redis.call('EXPIRE', key, expire_seconds)
return {current_count + 1, 0}
Key features:
Atomic: All operations run in a single Lua script (no race conditions)
Sliding window: The window slides continuously rather than using fixed buckets
Automatic cleanup: Old entries are removed and a TTL is set for auto-deletion
Accurate retry: Calculates the exact retry_after from the oldest entry
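For illustration, the same sliding-window logic can be mirrored in plain Python. This in-memory, single-process sketch follows the Lua script step by step but has none of the atomicity or distribution guarantees the Redis version provides:

```python
import time
import uuid
from typing import Optional


class SlidingWindowLimiter:
    """In-memory mirror of the Lua sliding-window logic (illustration only)."""

    def __init__(self, limit: int, window_seconds: float):
        self.limit = limit
        self.window_seconds = window_seconds
        # (timestamp, unique_id) pairs, playing the role of the sorted set
        self.entries: list = []

    def check_and_increment(self, now: Optional[float] = None):
        now = time.time() if now is None else now
        window_start = now - self.window_seconds
        # ZREMRANGEBYSCORE: drop entries that have slid out of the window
        self.entries = [(ts, rid) for ts, rid in self.entries if ts > window_start]
        # ZCOUNT: entries currently inside the window
        current_count = len(self.entries)
        if current_count >= self.limit:
            # retry_after computed from the oldest entry, like the Lua branch
            oldest_ts = min(ts for ts, _ in self.entries)
            retry_after = max(0.1, (oldest_ts + self.window_seconds) - now)
            return -1, retry_after
        # ZADD: record this request with a unique member
        self.entries.append((now, str(uuid.uuid4())))
        return current_count + 1, 0.0
```

With limit=3 over 60 seconds, the fourth request inside the window is rejected with a retry_after pointing at when the oldest entry expires.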
Pipedream Proxy Limits
Pipedream’s OAuth proxy has infrastructure limits shared across all sources and users in an organization:
Default window: 5 minutes
Configuration:
# Set custom limit via API
curl -X PUT http://localhost:8001/api/v1/source-rate-limits/pipedream_proxy \
-H "Content-Type: application/json" \
-d '{"limit": 2000, "window_seconds": 300}'
Usage:
from airweave.core.source_rate_limiter_service import SourceRateLimiter
# Check before any Pipedream OAuth request
await SourceRateLimiter.check_pipedream_proxy_limit(org_id)
# If limit exceeded:
# SourceRateLimitExceededException: Pipedream proxy rate limit exceeded,
# retry after 120.5s
Implementation Details
Redis Data Structure
Sorted Set with:
Score: Request timestamp (Unix epoch)
Member: Unique request ID (UUID)
Key: Namespaced by org, source, level, connection
Example:
ZADD source_rate_limit:org-123:google_drive:org:org \
1709481234.567 "req-uuid-1"
1709481235.123 "req-uuid-2"
1709481236.789 "req-uuid-3"
ZCOUNT source_rate_limit:org-123:google_drive:org:org \
1709481180.000 1709481240.000
# Returns: 3
Caching Strategy
Rate limit configurations are cached in Redis:
cache_key = f"source_rate_limit_config:{org_id}:{source_short_name}"
ttl = 300  # 5 minutes

# Cache hit
cached = await redis.get(cache_key)
if cached:
    return json.loads(cached)

# Cache miss - fetch from database
config = await db.fetch_limit(org_id, source_short_name)
await redis.setex(cache_key, ttl, json.dumps(config))
Benefits:
Reduces database load
Fast lookups on every request
5-minute TTL allows config changes to propagate quickly
Source Metadata Cache (10 minutes)
Absence of config is also cached:
if not rate_limit_obj:
    # Cache empty dict to indicate "no limit configured"
    await redis.setex(cache_key, ttl, "{}")
    return None
Prevents: Repeated database lookups for unconfigured sources
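A rough in-memory sketch of this get-or-set pattern with negative caching (a plain dict stands in for Redis, TTL handling is omitted, and `db_fetch` is a hypothetical callable, not Airweave's actual API):

```python
import json


class ConfigCache:
    """In-memory stand-in for the Redis config cache, including negative caching."""

    def __init__(self, db_fetch):
        self._db_fetch = db_fetch      # callable: (org_id, source) -> dict or None
        self._store = {}               # key -> JSON payload; "{}" means "no limit configured"

    def get_limit_config(self, org_id: str, source_short_name: str):
        key = f"source_rate_limit_config:{org_id}:{source_short_name}"
        cached = self._store.get(key)
        if cached is not None:
            payload = json.loads(cached)
            # An empty dict is the negative-cache marker: treat it as "no limit"
            return payload or None
        config = self._db_fetch(org_id, source_short_name)
        # Cache "{}" for unconfigured sources so we skip repeat DB lookups
        self._store[key] = json.dumps(config or {})
        return config
```

The second lookup for an unconfigured source is served from the negative cache without touching the database.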
Error Handling
from airweave.core.exceptions import SourceRateLimitExceededException
try:
    await SourceRateLimiter.check_and_increment(
        org_id=org_id,
        source_short_name="google_drive",
        source_connection_id=connection_id,
    )
    # Request allowed - proceed with API call
    response = await source_api.fetch_documents()
except SourceRateLimitExceededException as e:
    # Rate limit exceeded
    logger.warning(
        f"Rate limit exceeded for {e.source_short_name}. "
        f"Retry after {e.retry_after:.2f}s"
    )
    # Option 1: Retry with backoff
    await asyncio.sleep(e.retry_after)
    await retry_request()
    # Option 2: Queue for later
    await queue_for_retry(request, delay=e.retry_after)
    # Option 3: Fail sync job
    raise SyncFailureError(f"Rate limit exceeded, retry in {e.retry_after}s")
Example Configurations
Google Drive (org-level)
Quota: 1000 requests per 100 seconds per org
Configuration:
{
  "source_short_name": "google_drive",
  "limit": 900,          // 10% safety margin
  "window_seconds": 100
}
Behavior: All users/connections in the org share a 900 req/100s limit
Notion (connection-level)
Quota: 3 requests per second per workspace
Configuration:
{
  "source_short_name": "notion",
  "limit": 150,          // 3 req/s * 60s = 180, with margin
  "window_seconds": 60
}
Behavior: Each Notion workspace connection has a separate 150 req/min limit
GitHub (org-level)
Quota: 5000 requests per hour per org
Configuration:
{
  "source_short_name": "github",
  "limit": 4500,         // 10% safety margin
  "window_seconds": 3600
}
Behavior: All GitHub connections share a 4500 req/hour limit
Salesforce (org-level)
Quota: 15,000 API calls per 24 hours
Configuration:
{
  "source_short_name": "salesforce",
  "limit": 13500,        // 10% safety margin
  "window_seconds": 86400
}
Behavior: Org-wide daily limit
Monitoring & Debugging
View Current Counts
# Check current request count in window
redis-cli ZCOUNT source_rate_limit:org-123:google_drive:org:org \
  $(date -d '60 seconds ago' +%s).0 $(date +%s).0

# List all requests in window with timestamps
redis-cli ZRANGE source_rate_limit:org-123:google_drive:org:org \
  0 -1 WITHSCORES
Debug Logs
# Enable debug logging
import logging
logging.getLogger("airweave.core.source_rate_limiter_service").setLevel(logging.DEBUG)
Sample output:
[DEBUG] [SourceRateLimit] Checking source rate limit: org=org-123, source=google_drive, connection=None, rate_limit_level=org
[DEBUG] [SourceRateLimit] Using custom limit from DB: 100 req/60s
[DEBUG] [SourceRateLimit] ✅ Request allowed - 45/100 requests in window. org=org-123, source=google_drive, rate_limit_level=org, window=60s
Test Rate Limiting
import asyncio
from airweave.core.source_rate_limiter_service import SourceRateLimiter
from airweave.core.exceptions import SourceRateLimitExceededException

async def test_rate_limit():
    org_id = "test-org-123"
    # Set low limit for testing
    # (via API: PUT /api/v1/source-rate-limits/test_source)
    for i in range(150):
        try:
            await SourceRateLimiter.check_and_increment(
                org_id=org_id,
                source_short_name="test_source",
            )
            print(f"Request {i + 1}: ✅ Allowed")
            await asyncio.sleep(0.1)
        except SourceRateLimitExceededException as e:
            print(f"Request {i + 1}: ❌ Blocked (retry after {e.retry_after:.2f}s)")
            break

asyncio.run(test_rate_limit())
Troubleshooting
Rate limit not being enforced
Check:
Feature flag enabled:
ctx.has_feature(FeatureFlag.SOURCE_RATE_LIMITING)
Source has rate_limit_level set (not null)
Rate limit configured in the source_rate_limit table
check_and_increment called before API requests
Debug:
-- Check source metadata
SELECT short_name, rate_limit_level FROM source WHERE short_name = 'google_drive';
-- Check configured limit
SELECT * FROM source_rate_limit WHERE source_short_name = 'google_drive';
Requests blocked unexpectedly
Symptom: Rate limit triggered even though the actual count is low
Causes:
Old entries not cleaned: Check the Redis sorted set has the correct TTL
Clock skew: Servers have different times
Multiple orgs: Wrong org_id used
Solution:
# Manually clear rate limit key
redis-cli DEL source_rate_limit:org-123:google_drive:org:org

# Check server time sync
date && docker exec airweave-redis redis-cli TIME
Stale configuration
Symptom: Config changes not taking effect
Cause: Cache TTL (5 minutes for config, 10 minutes for metadata)
Solution:
# Force cache invalidation
redis-cli DEL source_rate_limit_config:org-123:google_drive
redis-cli DEL source_metadata:google_drive:rate_limit_level
Or wait up to 10 minutes for automatic expiration.
Redis connection errors
Symptom: ConnectionError: Error connecting to Redis
Solution:
Verify Redis is running:
docker ps | grep redis
redis-cli ping
Check connection settings in .env:
REDIS_HOST=localhost
REDIS_PORT=6379
Best Practices
Set limits below actual quotas
Leave a 10-20% safety margin to account for:
Other API consumers in the org
Quota calculation differences
Burst traffic
Example: If the quota is 1000 req/min, set the limit to 900
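The margin rule can be expressed as a one-line helper (the name is illustrative, not part of Airweave):

```python
import math


def limit_with_margin(quota: int, margin: float = 0.10) -> int:
    """Configured limit for a provider quota, leaving a safety margin."""
    if not 0 <= margin < 1:
        raise ValueError("margin must be in [0, 1)")
    # Round down so the configured limit never exceeds the intended share
    return math.floor(quota * (1.0 - margin))
```

This reproduces the example configurations above: a 1000-request quota becomes a 900-request limit, and Salesforce's 15,000 daily calls become 13,500.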
Choose appropriate windows
Match the window to the source API's reset interval:
Per-second: window_seconds=1
Per-minute: window_seconds=60
Per-hour: window_seconds=3600
Per-day: window_seconds=86400
Avoid: Mismatched windows (e.g., a 90-second window for a per-minute quota)
Monitor actual usage
Track actual API calls against configured limits:
-- Count requests in last 60 seconds
SELECT COUNT(*) FROM api_request_log
WHERE source = 'google_drive'
  AND org_id = 'org-123'
  AND timestamp > NOW() - INTERVAL '60 seconds';
Adjust limits based on actual usage patterns.
Use connection-level for per-user quotas
If a source has separate quotas per user:
Set rate_limit_level='connection' in the source table
Configure the limit per connection, not per org
Example: Notion workspaces have independent rate limits
Next Steps
Source Connections Configure source connections with rate limits
API Reference Complete API documentation
Webhooks Set up webhooks for rate limit events
Monitoring Monitor rate limit usage in production