Skip to main content

Taruvi SDK Guide for APP Mode Functions

Complete reference for using the Taruvi SDK within APP mode serverless functions.


Function Signature

APP mode functions must use this exact signature:

def main(params, user_data, sdk_client):
"""
APP mode function entry point.

Args:
params: User parameters plus params['__function__'] and params['request'] metadata
user_data: Dict with id, username, email, full_name
sdk_client: Pre-authenticated Taruvi SDK client

Returns:
Any JSON-serializable data
"""
pass

Parameters Explained

params - Dictionary containing:

  • User-provided parameters from function invocation
  • params['__function__'] - Function metadata (name, slug, execution_mode, etc.)
  • params['request'] - HTTP request metadata (method, path, full_path, query_params, headers, content_type, scheme, host, origin, remote_addr, body). Empty dict {} for scheduled/event-triggered functions. Sensitive headers (Authorization, Cookie, X-Session-Token) are excluded.

user_data - Dictionary containing authenticated user info:

{
"id": 123,
"username": "alice",
"email": "alice@example.com",
"full_name": "Alice Smith"
}

sdk_client - Pre-authenticated Taruvi SDK client

  • Already authenticated with current user context
  • Ready to use - no need to call auth methods
  • Provides access to all platform features

Database Module

CRUD operations on DataTables.

Methods

sdk_client.database.from_(table_name)           # Returns QueryBuilder (chainable)
sdk_client.database.get(table_name, record_id) # Get single record
sdk_client.database.create(table_name, data) # Create record
sdk_client.database.update(table_name, record_id, data) # Update record
sdk_client.database.delete(table_name, record_id) # Delete record

QueryBuilder Chaining

.filter(field, operator, value)  # Operators: eq, ne, gt, gte, lt, lte, in, contains
.sort(field, direction) # direction: "asc" or "desc"
.page_size(limit) # Limit results
.page(number) # Page number
.populate(*fields) # Eager load relationships
.search(query) # Full-text search
.aggregate(*expressions) # e.g., "sum(price)", "count(*)"
.group_by(*fields) # Group aggregation results
.having(condition) # Filter aggregated results
.edges() # Target edges table for relationships
.format(type) # "tree" or "graph" for hierarchical data
.include(direction) # "descendants", "ancestors", or "both"
.depth(n) # Max traversal depth
.types(relationship_types) # Filter by relationship types
.execute() # Returns {"data": [...], "total": N}
.first() # Returns first record or None
.count() # Returns count of matching records

Examples

Simple Query:

def main(params, user_data, sdk_client):
result = sdk_client.database.from_("users").execute()
return {"users": result["data"], "total": result["total"]}

Query with Filters:

def main(params, user_data, sdk_client):
result = sdk_client.database.from_("users")\
.filter("status", "eq", "active")\
.filter("age", "gte", 18)\
.sort("created_at", "desc")\
.page_size(10)\
.execute()

return {"users": result["data"], "total": result["total"]}

Get Single Record:

def main(params, user_data, sdk_client):
user_id = params.get("user_id")
user = sdk_client.database.get("users", user_id)
return {"user": user}

Create Record:

def main(params, user_data, sdk_client):
new_user = sdk_client.database.create("users", {
"username": params.get("username"),
"email": params.get("email"),
"status": "active"
})
return {"created": True, "user": new_user}

Update Record:

def main(params, user_data, sdk_client):
user_id = params.get("user_id")
updated_user = sdk_client.database.update("users", user_id, {
"status": "inactive"
})
return {"updated": True, "user": updated_user}

Delete Record:

def main(params, user_data, sdk_client):
user_id = params.get("user_id")
sdk_client.database.delete("users", record_id=user_id)
return {"deleted": True, "user_id": user_id}

Functions Module

Execute other serverless functions and retrieve results.

Methods

sdk_client.functions.execute(function_slug, params, is_async=False, timeout=None)
sdk_client.functions.get_result(task_id)
sdk_client.functions.list(limit=None, offset=None)
sdk_client.functions.get(function_slug)
sdk_client.functions.list_invocations(function_slug, status=None, limit=None, offset=None)

Examples

Execute Sync Function:

def main(params, user_data, sdk_client):
result = sdk_client.functions.execute(
"send-email",
params={
"to": "user@example.com",
"subject": "Welcome"
}
)
return {"email_sent": True, "result": result}

Execute Async Function:

def main(params, user_data, sdk_client):
result = sdk_client.functions.execute(
"process-large-dataset",
params={"dataset_id": 123},
is_async=True
)

task_id = result['invocation']['celery_task_id']
return {"task_started": True, "task_id": task_id}

Get Async Result:

def main(params, user_data, sdk_client):
task_id = params.get("task_id")
result = sdk_client.functions.get_result(task_id)

return {
"status": result['data']['status'],
"result": result['data'].get('result')
}

List Functions:

def main(params, user_data, sdk_client):
functions = sdk_client.functions.list(limit=10)
return {"functions": functions['data']}

Secrets Module

Access secrets securely without hardcoding credentials.

Methods

sdk_client.secrets.get(key, app=None, tags=None)
sdk_client.secrets.list(keys=None, search=None, app=None, tags=None, secret_type=None, include_metadata=False, page=None, page_size=None)

Examples

Get Single Secret:

def main(params, user_data, sdk_client):
api_key = sdk_client.secrets.get("SENDGRID_API_KEY")

# Use the secret
send_email_with_api_key(api_key["value"])

return {"sent": True}

Get Multiple Secrets:

def main(params, user_data, sdk_client):
secrets = sdk_client.secrets.list(keys=[
"SENDGRID_API_KEY",
"STRIPE_SECRET_KEY",
"AWS_ACCESS_KEY"
])

sendgrid_key = secrets["SENDGRID_API_KEY"]["value"]
stripe_key = secrets["STRIPE_SECRET_KEY"]["value"]

return {"keys_loaded": True}

List Secrets:

def main(params, user_data, sdk_client):
secrets = sdk_client.secrets.list(
search="API",
tags=["production"]
)
return {"secrets": secrets['data']}

⚠️ Critical Rules

  • Never hardcode secrets in function code
  • Always use secrets.get() to retrieve credentials
  • Handle missing secrets gracefully with try/except
  • Never log secret values - only log keys

Analytics Module

Execute saved analytics queries with parameters.

Methods

sdk_client.analytics.execute(query_slug, params=None)

Returns

{
"data": [...], # Query results
"total": 100, # Row count
"execution_key": "..." # Execution tracking key
}

Examples

Simple Query Execution:

def main(params, user_data, sdk_client):
result = sdk_client.analytics.execute("monthly-revenue")
return {"revenue": result["data"]}

Query with Parameters:

def main(params, user_data, sdk_client):
result = sdk_client.analytics.execute(
"user-signups",
params={
"start_date": "2024-01-01",
"end_date": "2024-12-31",
"group_by": "month"
}
)
return {
"signups": result["data"],
"total": result["total"]
}

Query with Dynamic Filters:

def main(params, user_data, sdk_client):
region = params.get("region", "US")
category = params.get("category", "all")

result = sdk_client.analytics.execute(
"sales-by-region",
params={
"region": region,
"product_category": category
}
)
return result["data"]

Notes

  • Queries must be created first (use MCP analytics tools or API)
  • Supports Jinja2 templating in query definitions
  • Uses secrets for database credentials
  • Results are cached based on execution_key

Storage Module

Upload, download, and manage files in storage buckets.

Methods

# Bucket management
sdk_client.storage.list_buckets(search=None, visibility=None, app_category=None, ordering=None, page=None, page_size=None)
sdk_client.storage.create_bucket(name, slug=None, visibility="private", file_size_limit=None, allowed_mime_types=None, app_category=None, max_size_bytes=None, max_objects=None)
sdk_client.storage.get_bucket(slug)
sdk_client.storage.update_bucket(slug, name=None, visibility=None, file_size_limit=None, allowed_mime_types=None, app_category=None, max_size_bytes=None, max_objects=None)
sdk_client.storage.delete_bucket(slug)

# File operations (chainable via from_)
sdk_client.storage.from_(bucket_name).upload(files, paths, metadatas=None)
sdk_client.storage.from_(bucket_name).download(file_path)
sdk_client.storage.from_(bucket_name).list()
sdk_client.storage.from_(bucket_name).filter(search=None, mimetype=None, visibility=None, ordering=None, page=None, page_size=None).list()
sdk_client.storage.from_(bucket_name).update(file_path, metadata=None, visibility=None)
sdk_client.storage.from_(bucket_name).delete(paths)
sdk_client.storage.from_(bucket_name).copy_object(source_path, destination_path, destination_bucket=None)
sdk_client.storage.from_(bucket_name).move_object(source_path, destination_path)

Upload Parameters

The upload() method uses the batch upload endpoint internally:

Parameter | Type | Required | Description
files | list[tuple[str, BinaryIO]] | Yes | List of (filename, file_object) tuples
paths | list[str] | Yes | Storage paths for each file (must match files length)
metadatas | list[dict] | No | Optional metadata dict per file (max 2KB each per S3 limits)

Constraints: Max 10 files per request. Each file is validated against the bucket's file_size_limit and allowed_mime_types.

Response: Returns the data field from the API response containing uploaded_count, failed_count, successful, and failed arrays.

Examples

Upload File:

def main(params, user_data, sdk_client):
with open("report.pdf", "rb") as f:
result = sdk_client.storage.from_("documents").upload(
files=[("report.pdf", f)],
paths=["uploads/report.pdf"],
metadatas=[{"department": "finance"}]
)

return {"uploaded": True, "result": result}

Upload Multiple Files:

def main(params, user_data, sdk_client):
files = [
("photo1.jpg", open("photo1.jpg", "rb")),
("photo2.jpg", open("photo2.jpg", "rb")),
]
result = sdk_client.storage.from_("images").upload(
files=files,
paths=["gallery/photo1.jpg", "gallery/photo2.jpg"],
metadatas=[{"album": "vacation"}, {"album": "vacation"}]
)
return {"uploaded_count": result.get("uploaded_count", 0)}

Download File:

def main(params, user_data, sdk_client):
file_path = params.get("file_path")
file_data = sdk_client.storage.from_("documents").download(file_path)

# file_data is raw bytes
return {"downloaded": True, "size": len(file_data)}

List and Filter Files:

def main(params, user_data, sdk_client):
# List all files
files = sdk_client.storage.from_("documents").list()

# List with filters
images = (
sdk_client.storage.from_("uploads")
.filter(mimetype_category="image", ordering="-created_at", page_size=20)
.list()
)
return {"files": files, "images": images}

Update File Metadata:

def main(params, user_data, sdk_client):
result = sdk_client.storage.from_("documents").update(
"reports/q1.pdf",
metadata={"status": "reviewed", "reviewer": "alice"},
visibility="public"
)
return {"updated": result}

Delete Files:

def main(params, user_data, sdk_client):
paths = params.get("paths", [])
sdk_client.storage.from_("documents").delete(paths)
return {"deleted": True, "count": len(paths)}

Copy Object:

def main(params, user_data, sdk_client):
# Copy within same bucket
result = sdk_client.storage.from_("images").copy_object(
"users/123/avatar.jpg",
"users/456/avatar.jpg"
)

# Copy to different bucket
result = sdk_client.storage.from_("uploads").copy_object(
"temp/file.pdf",
"archive/file.pdf",
destination_bucket="archives"
)
return {"copied": result}

Move/Rename Object:

def main(params, user_data, sdk_client):
result = sdk_client.storage.from_("documents").move_object(
"temp/draft.pdf",
"final/report-2024.pdf"
)
return {"moved": result}

Create Bucket with Quotas:

def main(params, user_data, sdk_client):
bucket = sdk_client.storage.create_bucket(
name="User Uploads",
slug="user-uploads",
visibility="private",
file_size_limit=10485760, # 10MB per file
allowed_mime_types=["image/*", "application/pdf"],
app_category="attachments",
max_size_bytes=1073741824, # 1GB total bucket quota
max_objects=1000 # Max 1000 files
)
return {"created": True, "bucket": bucket}

Users Module

Manage users and retrieve user information.

Methods

sdk_client.users.get(username)                                    # Get user by username
sdk_client.users.create(data) # Create user (dict with username, email, password, confirm_password, ...)
sdk_client.users.update(username, data) # Update user (dict with fields to update)
sdk_client.users.delete(username) # Delete user
sdk_client.users.list(search=None, is_active=None, is_staff=None, roles=None, page=None, page_size=None, **kwargs) # List users (kwargs for reference attribute filters)
sdk_client.users.apps(username) # Get user's apps
sdk_client.users.assign_roles(roles, usernames, expires_at=None) # Bulk assign roles
sdk_client.users.revoke_roles(roles, usernames) # Bulk revoke roles

Examples

Get User:

def main(params, user_data, sdk_client):
username = params.get("username")
user = sdk_client.users.get(username)
return {"user": user["data"]}

List Users:

def main(params, user_data, sdk_client):
users = sdk_client.users.list(
search="alice",
is_active=True,
page=1,
page_size=10
)
return {"users": users["data"], "total": users["total"]}

List Users by Reference Attribute:

def main(params, user_data, sdk_client):
dept_id = params.get("department_id")
users = sdk_client.users.list(department_id=dept_id, is_active=True)
return {"users": users["data"], "total": users["total"]}

Create User:

def main(params, user_data, sdk_client):
new_user = sdk_client.users.create({
"username": params.get("username"),
"email": params.get("email"),
"password": params.get("password"),
"confirm_password": params.get("password"),
"first_name": params.get("first_name"),
"last_name": params.get("last_name"),
"is_active": True
})
return {"created": True, "user": new_user}

Assign Roles:

def main(params, user_data, sdk_client):
result = sdk_client.users.assign_roles(
roles=["admin", "editor"],
usernames=["alice", "bob"]
)
return {"message": result["message"]}

Get User's Apps:

def main(params, user_data, sdk_client):
username = params.get("username")
apps = sdk_client.users.apps(username)
return {"apps": apps}

Update User:

def main(params, user_data, sdk_client):
username = params.get("username")
updated_user = sdk_client.users.update(username, {
"first_name": params.get("first_name"),
"is_active": True
})
return {"updated": True, "user": updated_user}

Policy Module

Check authorization permissions and filter allowed resources using Cerbos-based policies.

Methods

sdk_client.policy.check_resources(resources, principal=None, aux_data=None)
sdk_client.policy.filter_allowed(resources, actions, principal=None, aux_data=None)
sdk_client.policy.get_allowed_actions(resource, actions=None, principal=None, aux_data=None)

Resource Kind Format

The kind field in resources follows a specific format based on the entity type:

Format: {entity_type}:{resource_name}

Entity Type | Format | Example
DataTable | datatable:{table_name} | datatable:users, datatable:orders
Function | function:{function_slug} | function:send-email, function:process-data
Storage | storage:{bucket_name} | storage:documents, storage:images
Query | query:{query_slug} | query:monthly-revenue, query:user-signups
Custom | {custom_type}:{name} | invoice:sales_invoices, project:active_projects

System Entity Types and Actions

Entity Type | Available Actions
datatable | read, update, create, delete
function | execute
query | execute
storage | read, update, create, delete

Principal Parameter

⚠️ Important: In APP mode functions, the principal parameter should be set to None. The principal is automatically populated from the API key/JWT token used to authenticate the SDK client.

# ✅ CORRECT - Let the system auto-populate principal from API key
result = sdk_client.policy.check_resources(
resources=[...],
principal=None # Auto-populated from authenticated context
)

# ❌ WRONG - Don't manually specify principal in APP mode
result = sdk_client.policy.check_resources(
resources=[...],
principal={"id": user_data["id"], "roles": ["user"]} # Not needed!
)

Examples

Check Permissions on DataTable:

def main(params, user_data, sdk_client):
result = sdk_client.policy.check_resources(
resources=[{
"resource": {
"kind": "datatable:users", # Format: entity_type:resource_name
"id": "users",
"attr": {"owner_id": user_data["id"]} # Optional resource attributes
},
"actions": ["read", "update", "delete"]
}],
principal=None # Auto-populated from API key
)

can_read = result["results"][0]["actions"]["read"] == "EFFECT_ALLOW"
can_update = result["results"][0]["actions"]["update"] == "EFFECT_ALLOW"
can_delete = result["results"][0]["actions"]["delete"] == "EFFECT_ALLOW"

return {
"can_read": can_read,
"can_update": can_update,
"can_delete": can_delete
}

Check Function Execution Permission:

def main(params, user_data, sdk_client):
result = sdk_client.policy.check_resources(
resources=[{
"resource": {
"kind": "function:send-email", # Format: function:{slug}
"id": "send-email"
},
"actions": ["execute"]
}],
principal=None # Auto-populated from API key
)

can_execute = result["results"][0]["actions"]["execute"] == "EFFECT_ALLOW"
return {"can_execute": can_execute}

Check Storage Permissions:

def main(params, user_data, sdk_client):
result = sdk_client.policy.check_resources(
resources=[{
"resource": {
"kind": "storage:documents", # Format: storage:{bucket_name}
"id": "documents",
"attr": {"visibility": "private"}
},
"actions": ["read", "create", "delete"]
}],
principal=None # Auto-populated from API key
)

actions = result["results"][0]["actions"]
return {
"can_read": actions["read"] == "EFFECT_ALLOW",
"can_create": actions["create"] == "EFFECT_ALLOW",
"can_delete": actions["delete"] == "EFFECT_ALLOW"
}

Check Custom Resource Type:

def main(params, user_data, sdk_client):
# For custom entity types (non-system), use your own type:name format
result = sdk_client.policy.check_resources(
resources=[{
"resource": {
"kind": "invoice:sales_invoices", # Custom type: invoice
"id": "inv_001",
"attr": {
"owner_id": user_data["id"],
"status": "pending",
"amount": 1500.00
}
},
"actions": ["read", "update", "approve", "delete"]
}],
principal=None # Auto-populated from API key
)

return {"permissions": result["results"][0]["actions"]}

Check Multiple Resources at Once:

def main(params, user_data, sdk_client):
result = sdk_client.policy.check_resources(
resources=[
{
"resource": {"kind": "datatable:users", "id": "users"},
"actions": ["read", "update"]
},
{
"resource": {"kind": "datatable:orders", "id": "orders"},
"actions": ["read", "create"]
},
{
"resource": {"kind": "function:process-order", "id": "process-order"},
"actions": ["execute"]
}
],
principal=None # Auto-populated from API key
)

return {
"users_permissions": result["results"][0]["actions"],
"orders_permissions": result["results"][1]["actions"],
"function_permissions": result["results"][2]["actions"]
}

Filter Allowed Resources:

def main(params, user_data, sdk_client):
all_tables = [
{"kind": "datatable:users", "id": "users"},
{"kind": "datatable:orders", "id": "orders"},
{"kind": "datatable:admin_logs", "id": "admin_logs"}
]

allowed = sdk_client.policy.filter_allowed(
resources=all_tables,
actions=["read", "update"],
principal=None # Auto-populated from API key
)

return {"allowed_tables": allowed}

Using Auxiliary Data:

def main(params, user_data, sdk_client):
# aux_data provides additional context for policy evaluation
result = sdk_client.policy.check_resources(
resources=[{
"resource": {
"kind": "datatable:sensitive_data",
"id": "sensitive_data",
"attr": {"classification": "confidential"}
},
"actions": ["read"]
}],
principal=None, # Auto-populated from API key
aux_data={
"ip_address": params.get("client_ip", "unknown"),
"time_of_day": "business_hours",
"device_type": "desktop"
}
)

return {"can_access": result["results"][0]["actions"]["read"] == "EFFECT_ALLOW"}

Response Format

The check_resources method returns a response in this format:

{
"requestId": "unique-request-id",
"results": [
{
"resource": {
"id": "users",
"kind": "datatable:users",
"policyVersion": "default",
"scope": "tenant_id_app_slug"
},
"actions": {
"read": "EFFECT_ALLOW",
"update": "EFFECT_ALLOW",
"delete": "EFFECT_DENY"
},
"validationErrors": [],
"meta": {
"actions": {...},
"effectiveDerivedRoles": [...]
}
}
],
"cerbosCallId": "..."
}

Notes

  • Scope Injection: The system automatically injects the scope ({tenant_id}_{app_slug}) into all resource checks for multi-tenant isolation
  • Policy Version: Defaults to "default" unless specified otherwise
  • Resource Attributes: Use attr to pass resource-specific attributes that policies can evaluate (e.g., owner_id, status, department)
  • Effect Values: Actions return either "EFFECT_ALLOW" or "EFFECT_DENY"

Auth Module

ℹ️ Usually not needed in APP mode functions

The sdk_client is already authenticated with the current user context.

Methods

sdk_client.auth.signInWithPassword(username, password)
sdk_client.auth.signInWithToken(token, token_type) # token_type: 'jwt', 'api_key', 'session_token'
sdk_client.auth.refreshToken(refresh_token)
sdk_client.auth.signOut()
sdk_client.is_authenticated # Property on the client, not auth module

In most cases, you don't need auth methods because sdk_client is already authenticated:

def main(params, user_data, sdk_client):
# ✅ sdk_client is already authenticated as the calling user
# ✅ Just use it directly
users = sdk_client.database.from_("users").execute()
return users

Advanced Usage (If Needed)

If you need to authenticate as a different user or service account:

def main(params, user_data, sdk_client):
# Get service account API key from secrets
service_key = sdk_client.secrets.get("SERVICE_ACCOUNT_KEY")

# Authenticate as service account
service_client = sdk_client.auth.signInWithToken(
token=service_key["value"],
token_type='api_key'
)

# Use service_client for privileged operations
admin_data = service_client.database.from_("admin_logs").execute()

return {"admin_data": admin_data}

External SDK Usage

Auth methods are primarily for external SDK usage (scripts, CLIs, standalone apps):

# External Python script (NOT an APP mode function)
from taruvi import Client

client = Client(api_url='https://api.taruvi.cloud', app_slug='my-app')

# Authenticate (returns NEW client instance)
auth_client = client.auth.signInWithPassword(
username='user@example.com',
password='password'
)

# Use authenticated client
result = auth_client.database.from_("users").execute()
users = result["data"] # execute() returns {"data": [...], "total": N}

Complete Examples

Email Notification Function

def main(params, user_data, sdk_client):
# Get parameters
recipient_id = params.get("user_id")
message = params.get("message")

# Fetch recipient from database
recipient = sdk_client.database.get("users", recipient_id)

# Get SendGrid API key from secrets
sendgrid = sdk_client.secrets.get("SENDGRID_API_KEY")

# Call email function
result = sdk_client.functions.execute(
"send-email",
params={
"to": recipient["email"],
"subject": "Notification",
"body": message,
"api_key": sendgrid["value"]
}
)

return {
"sent": True,
"recipient": recipient["email"],
"message_id": result.get("message_id")
}

Data Processing Pipeline

def main(params, user_data, sdk_client):
# Get input data
dataset_id = params.get("dataset_id")

# Fetch dataset
dataset = sdk_client.database.get("datasets", dataset_id)

# Check permissions - principal auto-populated from API key
can_process = sdk_client.policy.check_resources(
resources=[{
"resource": {
"kind": "datatable:datasets", # Format: entity_type:resource_name
"id": str(dataset_id),
"attr": {"owner_id": dataset.get("owner_id")}
},
"actions": ["read", "update"] # Use valid datatable actions
}],
principal=None # Auto-populated from API key
)

if can_process["results"][0]["actions"]["read"] != "EFFECT_ALLOW":
return {"error": "Permission denied"}

# Execute processing function asynchronously
result = sdk_client.functions.execute(
"process-dataset",
params={"dataset_id": dataset_id},
is_async=True
)

# Extract the async task ID so the caller can poll for the result later
task_id = result['invocation']['celery_task_id']

return {
"processing": True,
"task_id": task_id,
"dataset": dataset["name"]
}

Report Generation Function

def main(params, user_data, sdk_client):
# Get date range from params
start_date = params.get("start_date")
end_date = params.get("end_date")

# Execute analytics query
revenue_data = sdk_client.analytics.execute(
"monthly-revenue",
params={
"start_date": start_date,
"end_date": end_date,
"group_by": "month"
}
)

# Get user signups
signup_data = sdk_client.analytics.execute(
"user-signups",
params={
"start_date": start_date,
"end_date": end_date
}
)

# Generate report content
report_content = f"""
Revenue Report
Period: {start_date} to {end_date}

Total Revenue: ${sum(r['revenue'] for r in revenue_data['data'])}
New Signups: {signup_data['total']}
"""

# Upload report to storage
with open("/tmp/report.txt", "w") as f:
f.write(report_content)

with open("/tmp/report.txt", "rb") as f:
sdk_client.storage.from_("reports").upload(
files=[("report.txt", f)],
paths=[f"reports/{start_date}_to_{end_date}.txt"]
)

return {
"report_generated": True,
"total_revenue": sum(r['revenue'] for r in revenue_data['data']),
"new_signups": signup_data['total']
}

Critical Rules

❌ Don't Do This

  • Don't create new Client() - Use provided sdk_client
  • Don't call auth methods - sdk_client is already authenticated (the only exception is deliberately acting as a different user or service account)
  • Don't hardcode secrets - Always use secrets.get()
  • Don't skip error handling - User input can be anything
  • Don't assume records exist - Always check before operations

✅ Do This

  • Validate all params - Check for required fields
  • Handle exceptions gracefully - Use try/except blocks
  • Return meaningful data - Include success flags and error messages
  • Use secrets for credentials - Never hardcode API keys
  • Check permissions - Use policy module when needed
  • Log important events - For debugging and audit trails

Error Handling Pattern

def main(params, user_data, sdk_client):
try:
# Validate params
user_id = params.get("user_id")
if not user_id:
return {"success": False, "error": "user_id is required"}

# Fetch user
user = sdk_client.database.get("users", user_id)

# Get secret
api_key = sdk_client.secrets.get("API_KEY")

# Perform operation
result = perform_operation(user, api_key["value"])

return {"success": True, "result": result}

except Exception as e:
return {
"success": False,
"error": str(e),
"error_type": type(e).__name__
}