Overview

Airweave’s rate limiting system prevents exhausting customer API quotas by enforcing configurable limits on external source API calls. It uses Redis-based distributed rate limiting with atomic Lua scripts to handle concurrent requests across horizontally scaled instances.
Rate limiting is opt-in via the SOURCE_RATE_LIMITING feature flag and is configured per organization.

Rate Limit Levels

Each source defines its rate limiting strategy in the source table via the rate_limit_level field:
Level: org
Scope: All users/connections in the organization share the same limit
Use case: Sources with org-wide API quotas
Examples:
  • Google Drive (quota per Google Workspace org)
  • GitHub (quota per GitHub org)
  • Salesforce (quota per Salesforce instance)
Redis key format:
source_rate_limit:{org_id}:google_drive:org:org

Configuration Model

Rate limits are configured in the source_rate_limit table:
CREATE TABLE source_rate_limit (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    organization_id UUID NOT NULL REFERENCES organization(id),
    source_short_name VARCHAR NOT NULL,  -- e.g., "google_drive", "notion"
    "limit" INTEGER NOT NULL,            -- Max requests per window (quoted: LIMIT is reserved in PostgreSQL)
    window_seconds INTEGER NOT NULL DEFAULT 60,  -- Time window in seconds
    created_at TIMESTAMP DEFAULT NOW(),
    modified_at TIMESTAMP DEFAULT NOW(),
    
    UNIQUE(organization_id, source_short_name)
);
source_short_name
string
required
Source identifier from the source table
Examples: "google_drive", "notion", "github"
Special case: "pipedream_proxy" for Pipedream OAuth proxy limits
limit
integer
required
Maximum number of requests allowed within the time window
Must be: > 0
Examples:
  • 100 requests per window
  • 1000 requests per window
window_seconds
integer
default:"60"
Time window duration in seconds
Common values:
  • 60 = 1 minute
  • 300 = 5 minutes
  • 3600 = 1 hour
  • 86400 = 1 day
Default: 60 seconds (1 minute)
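The constraints above (positive limit, 60-second default window) can be captured in a small validation sketch (a hypothetical class; the real schema layer may differ):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class SourceRateLimitConfig:
    """Validated rate limit configuration for one source."""
    source_short_name: str
    limit: int                # max requests per window, must be > 0
    window_seconds: int = 60  # default: 1 minute

    def __post_init__(self) -> None:
        if self.limit <= 0:
            raise ValueError("limit must be > 0")
        if self.window_seconds <= 0:
            raise ValueError("window_seconds must be > 0")

cfg = SourceRateLimitConfig("google_drive", limit=100)
print(cfg.window_seconds)  # → 60
```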

API Endpoints

List Source Rate Limits

GET /api/v1/source-rate-limits
Returns: All sources with their rate limit configurations
Response:
[
  {
    "source_short_name": "pipedream_proxy",
    "rate_limit_level": "org",
    "limit": 1000,
    "window_seconds": 300,
    "id": "abc-123-def-456"
  },
  {
    "source_short_name": "google_drive",
    "rate_limit_level": "org",
    "limit": 100,
    "window_seconds": 60,
    "id": "xyz-789-ghi-012"
  },
  {
    "source_short_name": "notion",
    "rate_limit_level": "connection",
    "limit": 50,
    "window_seconds": 60,
    "id": null  // Not configured yet
  },
  {
    "source_short_name": "file_upload",
    "rate_limit_level": null,
    "limit": null,
    "window_seconds": null,
    "id": null
  }
]
Notes:
  • Pipedream proxy always appears first
  • Sources sorted by rate_limit_level (supported first)
  • id=null means no custom limit configured (uses defaults or unlimited)
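The ordering rules in the notes (Pipedream proxy first, then sources with a supported rate_limit_level) can be sketched as a sort key; the entries below are trimmed to the relevant fields, for illustration only:

```python
def sort_key(entry: dict) -> tuple:
    # Pipedream proxy always first, then sources that support rate
    # limiting, then everything else; ties broken alphabetically.
    return (
        entry["source_short_name"] != "pipedream_proxy",
        entry["rate_limit_level"] is None,
        entry["source_short_name"],
    )

entries = [
    {"source_short_name": "file_upload", "rate_limit_level": None},
    {"source_short_name": "notion", "rate_limit_level": "connection"},
    {"source_short_name": "pipedream_proxy", "rate_limit_level": "org"},
    {"source_short_name": "google_drive", "rate_limit_level": "org"},
]
ordered = sorted(entries, key=sort_key)
print([e["source_short_name"] for e in ordered])
# → ['pipedream_proxy', 'google_drive', 'notion', 'file_upload']
```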

Set Rate Limit

PUT /api/v1/source-rate-limits/{source_short_name}
Content-Type: application/json

{
  "limit": 100,
  "window_seconds": 60
}
Creates a new limit if one doesn’t exist, or updates the existing one. Example:
curl -X PUT http://localhost:8001/api/v1/source-rate-limits/google_drive \
  -H "Content-Type: application/json" \
  -d '{
    "limit": 100,
    "window_seconds": 60
  }'
Response: 200 OK
{
  "id": "abc-123-def-456",
  "organization_id": "org-789",
  "source_short_name": "google_drive",
  "limit": 100,
  "window_seconds": 60,
  "created_at": "2024-03-02T12:00:00Z",
  "modified_at": "2024-03-02T12:00:00Z"
}

Delete Rate Limit

DELETE /api/v1/source-rate-limits/{source_short_name}
Removes the rate limit configuration (reverts to unlimited or default)
Response: 204 No Content

Sliding Window Algorithm

Airweave uses a Redis sorted set sliding window algorithm for accurate, distributed rate limiting:

How It Works

1. Request arrives

Backend calls check_and_increment(org_id, source_short_name, connection_id)
2. Lookup configuration

Fetches rate limit config from database (cached in Redis for 5 minutes):
config = await get_limit_config(org_id, source_short_name)
# Returns: {"limit": 100, "window_seconds": 60}
3. Build Redis key

Creates key based on rate_limit_level:
# Org-level (Google Drive)
key = f"source_rate_limit:{org_id}:google_drive:org:org"

# Connection-level (Notion)
key = f"source_rate_limit:{org_id}:notion:connection:{connection_id}"
4. Execute Lua script

Atomically checks count and increments if under limit:
-- Remove old entries outside window
redis.call('ZREMRANGEBYSCORE', key, 0, window_start)

-- Count current requests in window
local current_count = redis.call('ZCOUNT', key, window_start, current_time)

-- Check limit
if current_count >= limit then
    return {-1, retry_after}  -- Over limit
end

-- Add current request
redis.call('ZADD', key, current_time, unique_id)
redis.call('EXPIRE', key, window_seconds * 2)

return {current_count + 1, 0}  -- Success
5. Handle result

  • Under limit: Request proceeds, counter incremented
  • Over limit: SourceRateLimitExceededException raised with retry_after

Atomic Lua Script

The complete Lua script ensuring race-condition-free rate limiting:
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window_start = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
local window_seconds = tonumber(ARGV[4])
local expire_seconds = tonumber(ARGV[5])
local unique_id = ARGV[6]

-- Remove old entries outside sliding window
redis.call('ZREMRANGEBYSCORE', key, 0, window_start)

-- Count current requests in window
local current_count = redis.call('ZCOUNT', key, window_start, current_time)

-- Check if over limit
if current_count >= limit then
    -- Calculate retry_after from oldest entry
    local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
    local retry_after = window_seconds
    
    if oldest and oldest[2] then
        local oldest_timestamp = tonumber(oldest[2])
        retry_after = math.max(0.1, (oldest_timestamp + window_seconds) - current_time)
    end
    
    return {-1, retry_after}
end

-- Add current request to window
redis.call('ZADD', key, current_time, unique_id)
redis.call('EXPIRE', key, expire_seconds)

return {current_count + 1, 0}
Key features:
  • Atomic: All operations in single Lua script (no race conditions)
  • Sliding window: Continuously slides, not fixed buckets
  • Automatic cleanup: Old entries removed, TTL set for auto-deletion
  • Accurate retry: Calculates exact retry_after from oldest entry

Pipedream Proxy Limits

Pipedream’s OAuth proxy has infrastructure limits shared across all sources and users in an organization:
PIPEDREAM_PROXY_LIMIT
int
default:"1000"
Default limit for the Pipedream proxy
From Pipedream documentation
PIPEDREAM_PROXY_WINDOW
int
default:"300"
Default window (5 minutes)
Configuration:
# Set custom limit via API
curl -X PUT http://localhost:8001/api/v1/source-rate-limits/pipedream_proxy \
  -H "Content-Type: application/json" \
  -d '{"limit": 2000, "window_seconds": 300}'
Usage:
from airweave.core.source_rate_limiter_service import SourceRateLimiter

# Check before any Pipedream OAuth request
await SourceRateLimiter.check_pipedream_proxy_limit(org_id)

# If limit exceeded:
# SourceRateLimitExceededException: Pipedream proxy rate limit exceeded, 
# retry after 120.5s

Implementation Details

Redis Data Structure

Sorted Set with:
  • Score: Request timestamp (Unix epoch)
  • Member: Unique request ID (UUID)
  • Key: Namespaced by org, source, level, connection
Example:
ZADD source_rate_limit:org-123:google_drive:org:org \
  1709481234.567 "req-uuid-1"
  1709481235.123 "req-uuid-2"
  1709481236.789 "req-uuid-3"

ZCOUNT source_rate_limit:org-123:google_drive:org:org \
  1709481180.000 1709481240.000
# Returns: 3

Caching Strategy

Rate limit configurations cached in Redis:
cache_key = f"source_rate_limit_config:{org_id}:{source_short_name}"
ttl = 300  # 5 minutes

# Cache hit
cached = await redis.get(cache_key)
if cached:
    return json.loads(cached)

# Cache miss - fetch from database
config = await db.fetch_limit(org_id, source_short_name)
await redis.setex(cache_key, ttl, json.dumps(config))
Benefits:
  • Reduces database load
  • Fast lookups on every request
  • 5-minute TTL allows config changes to propagate quickly
rate_limit_level from source table cached:
cache_key = f"source_metadata:{source_short_name}:rate_limit_level"
ttl = 600  # 10 minutes

cached = await redis.get(cache_key)
if cached:
    return cached if cached != "None" else None

# Fetch from source table
source = await crud.source.get_by_short_name(db, source_short_name)
await redis.setex(cache_key, ttl, source.rate_limit_level or "None")
Longer TTL: Source metadata rarely changes
Absence of config is also cached:
if not rate_limit_obj:
    # Cache empty dict to indicate "no limit configured"
    await redis.setex(cache_key, ttl, "{}")
    return None
Prevents: Repeated database lookups for unconfigured sources

Error Handling

from airweave.core.exceptions import SourceRateLimitExceededException

try:
    await SourceRateLimiter.check_and_increment(
        org_id=org_id,
        source_short_name="google_drive",
        source_connection_id=connection_id
    )
    
    # Request allowed - proceed with API call
    response = await source_api.fetch_documents()
    
except SourceRateLimitExceededException as e:
    # Rate limit exceeded
    logger.warning(
        f"Rate limit exceeded for {e.source_short_name}. "
        f"Retry after {e.retry_after:.2f}s"
    )
    
    # Option 1: Retry with backoff
    await asyncio.sleep(e.retry_after)
    await retry_request()
    
    # Option 2: Queue for later
    await queue_for_retry(request, delay=e.retry_after)
    
    # Option 3: Fail sync job
    raise SyncFailureError(f"Rate limit exceeded, retry in {e.retry_after}s")

Example Configurations

Quota: 1000 requests per 100 seconds per org
Configuration:
{
  "source_short_name": "google_drive",
  "limit": 900,  // 10% safety margin
  "window_seconds": 100
}
Behavior: All users/connections in org share 900 req/100s limit

Monitoring & Debugging

View Current Counts

# Check current request count in window
redis-cli ZCOUNT source_rate_limit:org-123:google_drive:org:org \
  $(date -d '60 seconds ago' +%s).0 $(date +%s).0

# List all requests in window with timestamps
redis-cli ZRANGE source_rate_limit:org-123:google_drive:org:org \
  0 -1 WITHSCORES

Debug Logs

# Enable debug logging
import logging
logging.getLogger("airweave.core.source_rate_limiter_service").setLevel(logging.DEBUG)
Sample output:
[DEBUG] [SourceRateLimit] Checking source rate limit: org=org-123, source=google_drive, connection=None, rate_limit_level=org
[DEBUG] [SourceRateLimit] Using custom limit from DB: 100 req/60s
[DEBUG] [SourceRateLimit] ✅ Request allowed - 45/100 requests in window. org=org-123, source=google_drive, rate_limit_level=org, window=60s

Test Rate Limiting

import asyncio
from airweave.core.source_rate_limiter_service import SourceRateLimiter
from airweave.core.exceptions import SourceRateLimitExceededException

async def test_rate_limit():
    org_id = "test-org-123"
    
    # Set low limit for testing
    # (via API: PUT /api/v1/source-rate-limits/test_source)
    
    for i in range(150):
        try:
            await SourceRateLimiter.check_and_increment(
                org_id=org_id,
                source_short_name="test_source"
            )
            print(f"Request {i+1}: ✅ Allowed")
            await asyncio.sleep(0.1)
        except SourceRateLimitExceededException as e:
            print(f"Request {i+1}: ❌ Blocked (retry after {e.retry_after:.2f}s)")
            break

asyncio.run(test_rate_limit())

Troubleshooting

Check:
  1. Feature flag enabled:
    ctx.has_feature(FeatureFlag.SOURCE_RATE_LIMITING)
    
  2. Source has rate_limit_level set (not null)
  3. Rate limit configured in source_rate_limit table
  4. check_and_increment called before API requests
Debug:
# Check source metadata
SELECT short_name, rate_limit_level FROM source WHERE short_name='google_drive';

# Check configured limit
SELECT * FROM source_rate_limit WHERE source_short_name='google_drive';
Symptom: Rate limit triggered even though the actual count is low
Causes:
  1. Old entries not cleaned: Check Redis sorted set has correct TTL
  2. Clock skew: Servers have different times
  3. Multiple orgs: Wrong org_id used
Solution:
# Manually clear rate limit key
redis-cli DEL source_rate_limit:org-123:google_drive:org:org

# Check server time sync
date && docker exec airweave-redis redis-cli TIME
Symptom: Config changes not taking effect
Cause: Cache TTL (5 minutes for config, 10 minutes for metadata)
Solution:
# Force cache invalidation
redis-cli DEL source_rate_limit_config:org-123:google_drive
redis-cli DEL source_metadata:google_drive:rate_limit_level
Or wait up to 10 minutes for automatic expiration.
Symptom:
ConnectionError: Error connecting to Redis
Solution:
  1. Verify Redis is running:
    docker ps | grep redis
    redis-cli ping
    
  2. Check connection settings in .env:
    REDIS_HOST=localhost
    REDIS_PORT=6379
    

Best Practices

Leave 10-20% safety margin to account for:
  • Other API consumers in the org
  • Quota calculation differences
  • Burst traffic
Example: If quota is 1000 req/min, set limit to 900
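The margin rule can be written as a small helper (illustrative only):

```python
import math

def safe_limit(provider_quota: int, margin: float = 0.10) -> int:
    """Apply a safety margin to a provider quota (10% by default)."""
    return math.floor(provider_quota * (1 - margin))

print(safe_limit(1000))        # → 900
print(safe_limit(1000, 0.20))  # → 800
```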
Match window to source API’s reset interval:
  • Per-second: window_seconds=1
  • Per-minute: window_seconds=60
  • Per-hour: window_seconds=3600
  • Per-day: window_seconds=86400
Avoid: Mismatched windows (e.g., 90-second window for per-minute quota)
Track actual API calls vs configured limits:
-- Count requests in last 60 seconds
SELECT COUNT(*) FROM api_request_log 
WHERE source='google_drive' 
  AND org_id='org-123'
  AND timestamp > NOW() - INTERVAL '60 seconds';
Adjust limits based on actual usage patterns.
If source has separate quotas per user:
  • Set rate_limit_level='connection' in source table
  • Configure limit per connection, not org
Example: Notion workspaces have independent rate limits

Next Steps

Source Connections

Configure source connections with rate limits

API Reference

Complete API documentation

Webhooks

Set up webhooks for rate limit events

Monitoring

Monitor rate limit usage in production