Events API
Overview
The Events API provides a powerful pub/sub system for subscribing serverless functions to system events. Functions can be triggered automatically when specific events occur in the platform, enabling reactive workflows, data validation, and custom business logic.
Key Features:
- Event-driven architecture - Subscribe functions to system events
- Async execution - All functions execute asynchronously via Celery
- Blocking support - PRE events can block operations until completion
- Type-safe - Pydantic validation for event types and parameters
- Multi-tenant - Fully integrated with tenant isolation
Architecture
graph TD
A[System Event Trigger] --> B[Event Object Created]
B --> C[EventService.publish_event]
C --> D{Get Subscribers}
D --> E[Function 1]
D --> F[Function 2]
D --> G[Function N]
E --> H[Celery Task 1]
F --> I[Celery Task 2]
G --> J[Celery Task N]
H --> K{PRE Event?}
I --> K
J --> K
K -->|Yes| L[Block & Wait]
K -->|No| M[Fire & Forget]
L --> N[Continue Operation]
M --> N
Key Concepts
EventType Enum
Events are defined as an enum to ensure type safety:
from core.events.registry import EventType
class EventType(str, Enum):
PRE_USER_CREATE = "PRE_USER_CREATE"
POST_USER_CREATE = "POST_USER_CREATE"
Event Object
Events use Pydantic models for validation:
from core.events.registry import Event, EventType
event = Event(
event_type=EventType.POST_USER_CREATE,
params={
"user_id": 123,
"username": "john_doe",
"email": "john@example.com"
}
)
Subscribers
A Subscriber links a Function to an Event. When the event is published, all subscribed functions are executed.
Unique Constraint: Each (event, function) pair must be unique - a function can only subscribe once to each event type.
Async Execution
All functions execute asynchronously via Celery. The publish_event() method returns task IDs immediately without waiting for completion (unless explicitly blocked).
PRE vs POST Events
PRE Events (e.g., PRE_USER_CREATE):
- Fire before the main operation
- Can block execution by waiting for all tasks to complete
- Used for validation, enrichment, or prerequisite checks
- If any task fails, the entire operation is aborted
POST Events (e.g., POST_USER_CREATE):
- Fire after the main operation completes
- Non-blocking (fire and forget)
- Used for notifications, logging, analytics, or side effects
- Errors are logged but don't affect the main operation
Event Registry
Available Events
| Event Type | When Triggered | Blocking | Use Cases |
|---|---|---|---|
PRE_USER_CREATE | Before user.save() | Yes | Validate user data, check external systems, enrich attributes |
POST_USER_CREATE | After user.save() | No | Send welcome email, create default settings, log analytics |
Base URL Structure
Global Events:
/api/cloud/events/
App-Scoped Events:
/api/apps/{app_slug}/events/
API Endpoints
List All Events
Get a list of all available event types.
Endpoint:
GET /api/cloud/events/
Authentication: Required
Response:
{
"success": true,
"message": "Events retrieved successfully",
"data": {
"events": [
"PRE_USER_CREATE",
"POST_USER_CREATE"
]
}
}
Example:
curl -X GET https://api.example.com/api/cloud/events/ \
-H "Authorization: Bearer <token>"
Subscribe to Event
Subscribe a function to an event type.
Endpoint:
POST /api/cloud/events/{event_name}/subscribe/
Authentication: Required
Path Parameters:
event_name(string, required) - Event type name (e.g., "PRE_USER_CREATE")
Request Body:
{
"function_id": 123
}
Response:
{
"success": true,
"message": "Function 123 subscribed to event 'PRE_USER_CREATE' successfully",
"data": {
"id": 1,
"event": "PRE_USER_CREATE",
"function": 123,
"function_name": "Validate User Data",
"function_slug": "validate-user-data",
"app_name": "User Management",
"app_slug": "user-management",
"created_at": "2025-01-15T10:30:00Z",
"updated_at": "2025-01-15T10:30:00Z"
}
}
Error Responses:
400 Bad Request- Invalid event type
{
"success": false,
"message": "Invalid event type 'INVALID_EVENT'. Must be one of: PRE_USER_CREATE, POST_USER_CREATE"
}
400 Bad Request- Function already subscribed
{
"success": false,
"message": "Function 123 is already subscribed to event 'PRE_USER_CREATE'"
}
400 Bad Request- Function doesn't exist
{
"success": false,
"message": "Function with id 123 does not exist"
}
Example:
curl -X POST https://api.example.com/api/cloud/events/PRE_USER_CREATE/subscribe/ \
-H "Authorization: Bearer <token>" \
-H "Content-Type: application/json" \
-d '{"function_id": 123}'
Unsubscribe from Event
Unsubscribe a function from an event type.
Endpoint:
POST /api/cloud/events/{event_name}/unsubscribe/
Authentication: Required
Path Parameters:
event_name(string, required) - Event type name
Request Body:
{
"function_id": 123
}
Response:
{
"success": true,
"message": "Function 123 unsubscribed from event 'PRE_USER_CREATE' successfully",
"data": {}
}
Error Responses:
404 Not Found- Subscription doesn't exist
{
"success": false,
"message": "Function 123 was not subscribed to event 'PRE_USER_CREATE'"
}
Example:
curl -X POST https://api.example.com/api/cloud/events/PRE_USER_CREATE/unsubscribe/ \
-H "Authorization: Bearer <token>" \
-H "Content-Type: application/json" \
-d '{"function_id": 123}'
Get App Event Details
Get event details with subscribed functions filtered by app.
Endpoint:
GET /api/apps/{app_slug}/events/{event_name}/
Authentication: Required
Path Parameters:
app_slug(string, required) - App slugevent_name(string, required) - Event type name
Response:
{
"success": true,
"message": "Event 'PRE_USER_CREATE' details retrieved successfully",
"data": {
"event": "PRE_USER_CREATE",
"app": {
"id": 1,
"name": "User Management",
"slug": "user-management"
},
"total_subscribers": 2,
"subscribers": [
{
"subscriber_id": 1,
"event": "PRE_USER_CREATE",
"function_id": 123,
"function_name": "Validate User Data",
"function_slug": "validate-user-data",
"app_id": 1,
"app_name": "User Management",
"app_slug": "user-management",
"created_at": "2025-01-15T10:30:00Z"
},
{
"subscriber_id": 2,
"event": "PRE_USER_CREATE",
"function_id": 124,
"function_name": "Check Email Domain",
"function_slug": "check-email-domain",
"app_id": 1,
"app_name": "User Management",
"app_slug": "user-management",
"created_at": "2025-01-15T11:00:00Z"
}
]
}
}
Example:
curl -X GET https://api.example.com/api/apps/user-management/events/PRE_USER_CREATE/ \
-H "Authorization: Bearer <token>"
Integration Example: User Creation Flow
The Events system is integrated into the user creation flow to demonstrate PRE and POST event patterns. Here's the complete implementation from the UserCreateUpdateSerializer:
from celery.result import AsyncResult
from rest_framework import serializers
from core.events.services import EventService
from core.events.registry import Event, EventType
class UserCreateUpdateSerializer(serializers.ModelSerializer):
def create(self, validated_data):
"""Create user with PRE and POST event hooks"""
password = validated_data.pop("password", None)
user = User(**validated_data)
if password:
user.set_password(password)
else:
user.set_unusable_password()
# Get requesting user from context
request_user = self.context.get('request').user if self.context.get('request') else None
# Initialize event service
event_service = EventService()
# PRE_USER_CREATE event - Block until all functions complete
try:
# Create Event object (validation happens here)
pre_event = Event(
event_type=EventType.PRE_USER_CREATE,
params=validated_data # Send validated user data
)
# Publish PRE_USER_CREATE event
result = event_service.publish_event(pre_event, request_user)
# Block on all task IDs - wait for all functions to complete
for task_info in result.get('task_ids', []):
task_id = task_info.get('task_id')
if task_id:
# Task was created successfully - wait for it to complete
try:
async_result = AsyncResult(task_id)
# Block until task completes
async_result.get()
except Exception as task_error:
raise serializers.ValidationError(
f"PRE_USER_CREATE event failed for function {task_info['function_name']}: {str(task_error)}"
)
elif task_info.get('status') == 'error':
# Task creation failed
raise serializers.ValidationError(
f"PRE_USER_CREATE event failed for function {task_info['function_name']}: {task_info.get('error')}"
)
except serializers.ValidationError:
# Re-raise validation errors
raise
except Exception as e:
# Catch any other errors and wrap them
raise serializers.ValidationError(f"PRE_USER_CREATE event failed: {str(e)}")
# Save user to database (only after PRE events succeed)
user.save()
# POST_USER_CREATE event (non-blocking)
try:
# Create Event object
post_event = Event(
event_type=EventType.POST_USER_CREATE,
params=UserDetailSerializer(user).data # Send complete user object
)
# Publish POST_USER_CREATE event (fire and forget)
event_service.publish_event(post_event, request_user)
except Exception as e:
# Log error but don't fail user creation
logger.error(f"POST_USER_CREATE event failed: {str(e)}")
return user
Event Flow Explanation
Step 1: PRE_USER_CREATE Event (Blocking)
- Create
Eventobject with validated user data - Call
publish_event()which executes all subscribed functions asynchronously - Receive
task_idsarray with Celery task IDs - Loop through each task ID and block using
AsyncResult.get() - If any task fails → abort user creation with
ValidationError - If all tasks succeed → continue to save user
Step 2: Save User
Only executed if all PRE events succeed.
Step 3: POST_USER_CREATE Event (Non-blocking)
- Create
Eventobject with complete user object (including ID) - Call
publish_event()and ignore the response - Functions execute asynchronously in the background
- Errors are logged but don't affect user creation
Response Format
The publish_event() method returns:
{
'event': 'PRE_USER_CREATE',
'task_ids': [
{
'function_id': 123,
'function_name': 'Validate User Data',
'task_id': 'celery-task-uuid-1',
'invocation_id': 1
},
{
'function_id': 124,
'function_name': 'Check Email Domain',
'task_id': 'celery-task-uuid-2',
'invocation_id': 2
}
],
'total_subscribers': 2
}
Error case (when function execution fails):
{
'function_id': 125,
'function_name': 'Failed Function',
'status': 'error',
'error': 'Function execution failed',
'task_id': None
}
Best Practices
When to Use PRE Events
Use PRE events when you need to:
- Validate data before committing changes
- Enrich data with information from external systems
- Check prerequisites (e.g., quota limits, permissions)
- Block operations that don't meet criteria
Example PRE event functions:
- Email domain validation
- Credit check before user activation
- Data enrichment from CRM
- Duplicate detection
When to Use POST Events
Use POST events when you need to:
- Send notifications (email, SMS, webhooks)
- Log or track events for analytics
- Sync data to external systems
- Create side effects that shouldn't block the main operation
Example POST event functions:
- Send welcome email
- Create Stripe customer
- Log to analytics platform
- Sync to CRM
Error Handling
PRE Events:
- Always wrap in try/except to handle failures gracefully
- Provide clear error messages indicating which function failed
- Consider retry logic for transient failures
POST Events:
- Log errors but don't re-raise them
- Consider dead letter queues for failed tasks
- Monitor execution via invocation records
Performance Considerations
- Keep functions fast - PRE events block the main operation
- Use timeouts - Default timeout is 300 seconds (5 minutes)
- Monitor task queues - Ensure Celery workers are scaled appropriately
- Limit subscribers - Too many subscribers can slow down operations
Security
- Validate event params - Don't trust event data implicitly
- Use permissions - Control who can subscribe functions to events
- Sanitize sensitive data - Be careful with passwords, tokens in event params
Internal Use Only
Important: The Events API does not provide a public endpoint to publish events. Events can only be published internally within the application code using the EventService class.
This design ensures:
- Security - External users cannot trigger arbitrary events
- Control - Event publishing is tightly controlled by application logic
- Consistency - Events are only fired at appropriate lifecycle points
To trigger events in your code:
from core.events.services import EventService
from core.events.registry import Event, EventType
# Initialize service
event_service = EventService()
# Create event
event = Event(
event_type=EventType.PRE_USER_CREATE,
params={"username": "john_doe", "email": "john@example.com"}
)
# Publish event
result = event_service.publish_event(event, user=request.user)