Async Tools
Use async/await for non-blocking operations:
```python
import asyncio

import aiohttp
from fastapps import BaseWidget


class AsyncWeatherWidget(BaseWidget):
    identifier = "async_weather"
    title = "Async Weather Widget"
    input_schema = WeatherInput
    invoking = "Fetching weather data…"
    invoked = "Weather data ready!"

    async def execute(self, inputs: WeatherInput, ctx):
        async with aiohttp.ClientSession() as session:
            # Fetch multiple weather sources concurrently
            tasks = [
                self.fetch_weather(session, inputs.city, "openweather"),
                self.fetch_weather(session, inputs.city, "weather_api"),
                self.fetch_weather(session, inputs.city, "accuweather"),
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            return {
                "city": inputs.city,
                "sources": len([r for r in results if not isinstance(r, Exception)]),
                "weather": self.merge_weather_data(results),
            }

    async def fetch_weather(self, session, city, provider):
        # Implementation details...
        pass
```
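`fetch_weather` is left as a stub above and `merge_weather_data` is called but not shown. A minimal sketch of both, written as additional methods on `AsyncWeatherWidget`; the provider URL table, the `q` query parameter, and the `temp` response field are placeholders for whatever your real weather APIs accept and return:

```python
    PROVIDER_URLS = {
        "openweather": "https://api.example.com/openweather",
        "weather_api": "https://api.example.com/weather_api",
        "accuweather": "https://api.example.com/accuweather",
    }

    async def fetch_weather(self, session, city, provider):
        # One GET per provider, with a per-request timeout
        timeout = aiohttp.ClientTimeout(total=10)
        async with session.get(
            self.PROVIDER_URLS[provider], params={"q": city}, timeout=timeout
        ) as resp:
            resp.raise_for_status()
            return await resp.json()

    def merge_weather_data(self, results):
        # gather(..., return_exceptions=True) leaves exceptions in the list;
        # keep only real payloads and average any temperatures they report.
        payloads = [r for r in results if not isinstance(r, Exception)]
        temps = [p["temp"] for p in payloads if isinstance(p, dict) and "temp" in p]
        return {"temp": sum(temps) / len(temps)} if temps else {}
```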
State Management
Use the FastApps context for persistent state:
```python
class StatefulWidget(BaseWidget):
    identifier = "stateful"
    title = "Stateful Widget"
    input_schema = StatefulInput
    invoking = "Processing…"
    invoked = "Done!"

    async def execute(self, inputs: StatefulInput, ctx):
        # Get existing state
        user_preferences = await ctx.state.get("user_preferences", {})
        session_count = await ctx.state.get("session_count", 0)

        # Update state
        await ctx.state.set("session_count", session_count + 1)
        await ctx.state.set("last_action", inputs.action)

        # Use state in logic
        if session_count > 5:
            user_preferences["experienced_user"] = True
            await ctx.state.set("user_preferences", user_preferences)

        return {
            "action": inputs.action,
            "session_count": session_count + 1,
            "preferences": user_preferences,
        }
```
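`StatefulInput` itself is not shown on this page. Assuming input schemas are Pydantic models (the `inputs.dict()` calls later on this page point that way), the schema for this example only needs the `action` field:

```python
from pydantic import BaseModel


class StatefulInput(BaseModel):
    # The only field this widget reads (inputs.action)
    action: str
```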
Database Integration
Connect to databases with proper connection management:
```python
import asyncpg
from fastapps import BaseWidget


class DatabaseWidget(BaseWidget):
    identifier = "database"
    title = "Database Widget"
    input_schema = DatabaseInput
    invoking = "Querying database…"
    invoked = "Data retrieved!"

    async def execute(self, inputs: DatabaseInput, ctx):
        # Get the database connection string from settings
        db_url = ctx.settings.get("DATABASE_URL")
        if not db_url:
            raise ValueError("Database URL not configured")

        # Connect outside the try block so `conn` is always bound in `finally`
        conn = await asyncpg.connect(db_url)
        try:
            # Your database queries
            if inputs.query_type == "users":
                rows = await conn.fetch("SELECT * FROM users WHERE active = $1", True)
                data = [dict(row) for row in rows]
            elif inputs.query_type == "analytics":
                rows = await conn.fetch(
                    "SELECT * FROM analytics WHERE date >= $1", inputs.start_date
                )
                data = self.process_analytics(rows)
            else:
                raise ValueError(f"Unknown query type: {inputs.query_type}")

            return {
                "query_type": inputs.query_type,
                "count": len(data),
                "data": data,
            }
        finally:
            await conn.close()
```
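Opening a fresh connection on every call is simple but adds a round trip each time. If your deployment keeps the widget alive between invocations, asyncpg's `create_pool` lets you reuse connections. A sketch of that pattern; the lazy, instance-level pool shown here is an illustrative convention, not a FastApps API:

```python
import asyncpg


class PooledDatabaseWidget(DatabaseWidget):
    _pool = None

    async def _get_pool(self, ctx):
        # Create the pool on first use, then reuse it for later calls
        if self._pool is None:
            self._pool = await asyncpg.create_pool(
                ctx.settings.get("DATABASE_URL"), min_size=1, max_size=10
            )
        return self._pool

    async def execute(self, inputs, ctx):
        pool = await self._get_pool(ctx)
        async with pool.acquire() as conn:
            rows = await conn.fetch("SELECT * FROM users WHERE active = $1", True)
            return {"count": len(rows), "data": [dict(r) for r in rows]}
```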
File Processing
Handle file uploads and processing:
```python
import tempfile
import os

from fastapps import BaseWidget


class FileProcessingWidget(BaseWidget):
    identifier = "file_processor"
    title = "File Processing Widget"
    input_schema = FileProcessingInput
    invoking = "Processing file…"
    invoked = "File processed!"

    async def execute(self, inputs: FileProcessingInput, ctx):
        # Validate file
        if inputs.file_size > 10 * 1024 * 1024:  # 10 MB limit
            raise ValueError("File too large")

        # Create temporary file
        with tempfile.NamedTemporaryFile(delete=False, suffix=inputs.file_extension) as tmp_file:
            tmp_file.write(inputs.file_content)
            tmp_path = tmp_file.name

        try:
            # Process file based on type
            if inputs.file_extension == ".pdf":
                result = await self.process_pdf(tmp_path)
            elif inputs.file_extension == ".csv":
                result = await self.process_csv(tmp_path)
            elif inputs.file_extension in [".jpg", ".png"]:
                result = await self.process_image(tmp_path)
            else:
                raise ValueError(f"Unsupported file type: {inputs.file_extension}")

            return {
                "file_name": inputs.file_name,
                "file_size": inputs.file_size,
                "processing_result": result,
            }
        finally:
            # Clean up the temporary file
            os.unlink(tmp_path)

    async def process_pdf(self, file_path):
        # PDF processing logic
        pass

    async def process_csv(self, file_path):
        # CSV processing logic
        pass

    async def process_image(self, file_path):
        # Image processing logic
        pass
```
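The three processors are stubs. As one concrete example, `process_csv` can be built on the standard library's `csv` module; the summary it returns here is arbitrary:

```python
import csv


async def process_csv(self, file_path):
    # Parse the CSV and return a small summary of its shape
    with open(file_path, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        rows = list(reader)
    return {"rows": len(rows), "columns": reader.fieldnames or []}
```

For large files, consider pushing the parse onto a worker thread with `asyncio.to_thread` so the event loop stays responsive.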
Caching
Implement caching for expensive operations:
```python
import asyncio
import hashlib
import json

from fastapps import BaseWidget


class CachedWidget(BaseWidget):
    identifier = "cached"
    title = "Cached Widget"
    input_schema = CachedInput
    invoking = "Processing…"
    invoked = "Done!"

    async def execute(self, inputs: CachedInput, ctx):
        # Generate cache key
        cache_key = self.generate_cache_key(inputs)

        # Try to get from cache
        cached_result = await ctx.state.get(f"cache:{cache_key}")
        if cached_result:
            ctx.logger.info(f"Cache hit for key: {cache_key}")
            return {
                **cached_result,
                "cached": True,
                "cache_key": cache_key,
            }

        # Expensive operation
        result = await self.expensive_operation(inputs)

        # Cache the result (with TTL)
        await ctx.state.set(f"cache:{cache_key}", result, ttl=3600)  # 1 hour

        return {
            **result,
            "cached": False,
            "cache_key": cache_key,
        }

    def generate_cache_key(self, inputs):
        # Create a deterministic cache key; default=str handles dates and
        # other values json can't serialize directly
        data_str = json.dumps(inputs.dict(), sort_keys=True, default=str)
        return hashlib.md5(data_str.encode()).hexdigest()

    async def expensive_operation(self, inputs):
        # Your expensive operation here
        await asyncio.sleep(2)  # Simulate an expensive operation
        return {"result": "expensive_data"}
```
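The `ttl=3600` argument assumes the state store supports expiry. If yours does not, a common fallback is to store a timestamp next to the value and check it on read; a sketch of that approach, reusing the same `ctx.state` calls as above:

```python
import time


async def cache_get(self, ctx, cache_key, max_age=3600):
    # Return the cached value only if it is younger than max_age seconds
    entry = await ctx.state.get(f"cache:{cache_key}")
    if entry and time.time() - entry["stored_at"] < max_age:
        return entry["value"]
    return None


async def cache_set(self, ctx, cache_key, value):
    await ctx.state.set(f"cache:{cache_key}", {"value": value, "stored_at": time.time()})
```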
Error Handling & Logging
Implement comprehensive error handling and logging:
```python
import traceback

from fastapps import BaseWidget


class ValidationError(Exception):
    """Custom validation error that carries the offending field name."""

    def __init__(self, message, field=None):
        super().__init__(message)
        self.field = field


class RobustWidget(BaseWidget):
    identifier = "robust"
    title = "Robust Widget"
    input_schema = RobustInput
    invoking = "Processing…"
    invoked = "Done!"

    async def execute(self, inputs: RobustInput, ctx):
        ctx.logger.info(f"Starting execution with inputs: {inputs.dict()}")

        try:
            # Validate inputs
            self.validate_inputs(inputs)

            # Main logic
            result = await self.main_logic(inputs, ctx)

            ctx.logger.info("Execution completed successfully")
            return result

        except ValidationError as e:
            ctx.logger.warning(f"Validation error: {e}")
            return {
                "error": "validation_error",
                "message": str(e),
                "field": getattr(e, "field", None),
            }

        except TimeoutError as e:
            ctx.logger.error(f"Timeout error: {e}")
            return {
                "error": "timeout_error",
                "message": "Operation timed out",
                "fallback": await self.get_fallback_data(),
            }

        except Exception as e:
            ctx.logger.exception(f"Unexpected error: {e}")
            ctx.logger.error(f"Traceback: {traceback.format_exc()}")
            return {
                "error": "internal_error",
                "message": "An unexpected error occurred",
                "error_id": ctx.logger.get_correlation_id(),
            }

    def validate_inputs(self, inputs):
        # Custom validation logic
        if inputs.amount < 0:
            raise ValidationError("Amount cannot be negative", field="amount")

    async def main_logic(self, inputs, ctx):
        # Your main business logic
        pass

    async def get_fallback_data(self):
        # Return fallback data when operations fail
        return {"status": "fallback", "data": "cached_data"}
```
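The `TimeoutError` branch only fires if something actually times out. One way to give `main_logic` a time budget is `asyncio.wait_for`; a sketch assuming a hypothetical `fetch_remote_data` helper (on Python 3.11+ `asyncio.wait_for` raises the built-in `TimeoutError` that the handler above catches; on older versions also catch `asyncio.TimeoutError`):

```python
import asyncio


async def main_logic(self, inputs, ctx):
    # Give the slow call 10 seconds; on expiry, the TimeoutError handler in
    # execute() returns the fallback payload. fetch_remote_data is hypothetical.
    return await asyncio.wait_for(self.fetch_remote_data(inputs), timeout=10)
```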
Background Tasks
Run background tasks without blocking the response:
```python
import asyncio
import uuid

from fastapps import BaseWidget


class BackgroundTaskWidget(BaseWidget):
    identifier = "background_task"
    title = "Background Task Widget"
    input_schema = BackgroundTaskInput
    invoking = "Starting task…"
    invoked = "Task started!"

    async def execute(self, inputs: BackgroundTaskInput, ctx):
        # Start background task
        task_id = self.generate_task_id()

        # Don't await it - let it run in the background
        asyncio.create_task(self.run_background_task(task_id, inputs, ctx))

        return {
            "task_id": task_id,
            "status": "started",
            "message": "Background task is running",
        }

    async def run_background_task(self, task_id, inputs, ctx):
        try:
            ctx.logger.info(f"Background task {task_id} started")

            # Long-running task
            for i in range(10):
                await asyncio.sleep(1)
                ctx.logger.info(f"Background task {task_id} progress: {i}/10")

            # Record the final state
            await ctx.state.set(f"task:{task_id}", {
                "status": "completed",
                "result": "background_task_completed",
            })
            ctx.logger.info(f"Background task {task_id} completed")

        except Exception as e:
            ctx.logger.exception(f"Background task {task_id} failed: {e}")
            await ctx.state.set(f"task:{task_id}", {
                "status": "failed",
                "error": str(e),
            })

    def generate_task_id(self):
        return str(uuid.uuid4())
```
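Because progress and the final result are written to `ctx.state` under `task:{task_id}`, a later call can poll that key. A sketch of such a status check; `TaskStatusInput` (a schema with a single `task_id` field) is assumed and not part of the example above:

```python
class TaskStatusWidget(BaseWidget):
    identifier = "task_status"
    title = "Task Status Widget"
    input_schema = TaskStatusInput
    invoking = "Checking task…"
    invoked = "Status ready!"

    async def execute(self, inputs: TaskStatusInput, ctx):
        # Read whatever the background task last wrote for this id
        task_state = await ctx.state.get(f"task:{inputs.task_id}")
        if task_state is None:
            return {"task_id": inputs.task_id, "status": "running_or_unknown"}
        return {"task_id": inputs.task_id, **task_state}
```

One caveat with fire-and-forget tasks: the event loop only keeps a weak reference to tasks created with `asyncio.create_task`, so hold the returned task (for example in a set on the widget) if it must not be garbage collected mid-run.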
Next Steps
- API Integration - Connect to external APIs
- Back to Tool Basics
- Back to Server Overview