Skip to main content

Async Tools

Use async/await for non-blocking operations:
import asyncio
import aiohttp
from fastapps import BaseWidget

class AsyncWeatherWidget(BaseWidget):
    identifier = "async_weather"
    title = "Async Weather Widget"
    input_schema = WeatherInput
    invoking = "Fetching weather data…"
    invoked = "Weather data ready!"

    async def execute(self, inputs: WeatherInput, ctx):
        async with aiohttp.ClientSession() as session:
            # Fetch multiple weather sources concurrently
            tasks = [
                self.fetch_weather(session, inputs.city, "openweather"),
                self.fetch_weather(session, inputs.city, "weather_api"),
                self.fetch_weather(session, inputs.city, "accuweather")
            ]
            
            results = await asyncio.gather(*tasks, return_exceptions=True)
            
            return {
                "city": inputs.city,
                "sources": len([r for r in results if not isinstance(r, Exception)]),
                "weather": self.merge_weather_data(results)
            }
    
    async def fetch_weather(self, session, city, provider):
        # Implementation details...
        pass

State Management

Use FastApps context for persistent state:
class StatefulWidget(BaseWidget):
    identifier = "stateful"
    title = "Stateful Widget"
    input_schema = StatefulInput
    invoking = "Processing…"
    invoked = "Done!"

    async def execute(self, inputs: StatefulInput, ctx):
        # Get existing state
        user_preferences = await ctx.state.get("user_preferences", {})
        session_count = await ctx.state.get("session_count", 0)
        
        # Update state
        await ctx.state.set("session_count", session_count + 1)
        await ctx.state.set("last_action", inputs.action)
        
        # Use state in logic
        if session_count > 5:
            user_preferences["experienced_user"] = True
            await ctx.state.set("user_preferences", user_preferences)
        
        return {
            "action": inputs.action,
            "session_count": session_count + 1,
            "preferences": user_preferences
        }

Database Integration

Connect to databases with proper connection management:
import asyncpg
from fastapps import BaseWidget

class DatabaseWidget(BaseWidget):
    identifier = "database"
    title = "Database Widget"
    input_schema = DatabaseInput
    invoking = "Querying database…"
    invoked = "Data retrieved!"

    async def execute(self, inputs: DatabaseInput, ctx):
        # Get database connection from settings
        db_url = ctx.settings.get("DATABASE_URL")
        
        if not db_url:
            raise ValueError("Database URL not configured")
        
        try:
            conn = await asyncpg.connect(db_url)
            
            # Your database queries
            if inputs.query_type == "users":
                rows = await conn.fetch("SELECT * FROM users WHERE active = $1", True)
                data = [dict(row) for row in rows]
            elif inputs.query_type == "analytics":
                rows = await conn.fetch("SELECT * FROM analytics WHERE date >= $1", inputs.start_date)
                data = self.process_analytics(rows)
            else:
                raise ValueError(f"Unknown query type: {inputs.query_type}")
            
            return {
                "query_type": inputs.query_type,
                "count": len(data),
                "data": data
            }
            
        finally:
            await conn.close()

File Processing

Handle file uploads and processing:
import tempfile
import os
from fastapps import BaseWidget

class FileProcessingWidget(BaseWidget):
    identifier = "file_processor"
    title = "File Processing Widget"
    input_schema = FileProcessingInput
    invoking = "Processing file…"
    invoked = "File processed!"

    async def execute(self, inputs: FileProcessingInput, ctx):
        # Validate file
        if inputs.file_size > 10 * 1024 * 1024:  # 10MB limit
            raise ValueError("File too large")
        
        # Create temporary file
        with tempfile.NamedTemporaryFile(delete=False, suffix=inputs.file_extension) as tmp_file:
            tmp_file.write(inputs.file_content)
            tmp_path = tmp_file.name
        
        try:
            # Process file based on type
            if inputs.file_extension == ".pdf":
                result = await self.process_pdf(tmp_path)
            elif inputs.file_extension == ".csv":
                result = await self.process_csv(tmp_path)
            elif inputs.file_extension in [".jpg", ".png"]:
                result = await self.process_image(tmp_path)
            else:
                raise ValueError(f"Unsupported file type: {inputs.file_extension}")
            
            return {
                "file_name": inputs.file_name,
                "file_size": inputs.file_size,
                "processing_result": result
            }
            
        finally:
            # Clean up temporary file
            os.unlink(tmp_path)
    
    async def process_pdf(self, file_path):
        # PDF processing logic
        pass
    
    async def process_csv(self, file_path):
        # CSV processing logic
        pass
    
    async def process_image(self, file_path):
        # Image processing logic
        pass

Caching

Implement caching for expensive operations:
import hashlib
import json
from fastapps import BaseWidget

class CachedWidget(BaseWidget):
    identifier = "cached"
    title = "Cached Widget"
    input_schema = CachedInput
    invoking = "Processing…"
    invoked = "Done!"

    async def execute(self, inputs: CachedInput, ctx):
        # Generate cache key
        cache_key = self.generate_cache_key(inputs)
        
        # Try to get from cache
        cached_result = await ctx.state.get(f"cache:{cache_key}")
        if cached_result:
            ctx.logger.info(f"Cache hit for key: {cache_key}")
            return {
                **cached_result,
                "cached": True,
                "cache_key": cache_key
            }
        
        # Expensive operation
        result = await self.expensive_operation(inputs)
        
        # Cache the result (with TTL)
        await ctx.state.set(f"cache:{cache_key}", result, ttl=3600)  # 1 hour
        
        return {
            **result,
            "cached": False,
            "cache_key": cache_key
        }
    
    def generate_cache_key(self, inputs):
        # Create deterministic cache key
        data = inputs.dict(sort_keys=True)
        data_str = json.dumps(data, sort_keys=True)
        return hashlib.md5(data_str.encode()).hexdigest()
    
    async def expensive_operation(self, inputs):
        # Your expensive operation here
        await asyncio.sleep(2)  # Simulate expensive operation
        return {"result": "expensive_data"}

Error Handling & Logging

Comprehensive error handling and logging:
import traceback
from fastapps import BaseWidget

class RobustWidget(BaseWidget):
    identifier = "robust"
    title = "Robust Widget"
    input_schema = RobustInput
    invoking = "Processing…"
    invoked = "Done!"

    async def execute(self, inputs: RobustInput, ctx):
        ctx.logger.info(f"Starting execution with inputs: {inputs.dict()}")
        
        try:
            # Validate inputs
            self.validate_inputs(inputs)
            
            # Main logic
            result = await self.main_logic(inputs, ctx)
            
            ctx.logger.info(f"Execution completed successfully")
            return result
            
        except ValidationError as e:
            ctx.logger.warning(f"Validation error: {e}")
            return {
                "error": "validation_error",
                "message": str(e),
                "field": getattr(e, 'field', None)
            }
            
        except TimeoutError as e:
            ctx.logger.error(f"Timeout error: {e}")
            return {
                "error": "timeout_error",
                "message": "Operation timed out",
                "fallback": await self.get_fallback_data()
            }
            
        except Exception as e:
            ctx.logger.exception(f"Unexpected error: {e}")
            ctx.logger.error(f"Traceback: {traceback.format_exc()}")
            
            return {
                "error": "internal_error",
                "message": "An unexpected error occurred",
                "error_id": ctx.logger.get_correlation_id()
            }
    
    def validate_inputs(self, inputs):
        # Custom validation logic
        if inputs.amount < 0:
            raise ValidationError("Amount cannot be negative")
    
    async def main_logic(self, inputs, ctx):
        # Your main business logic
        pass
    
    async def get_fallback_data(self):
        # Return fallback data when operations fail
        return {"status": "fallback", "data": "cached_data"}

Background Tasks

Run background tasks without blocking the response:
import asyncio
from fastapps import BaseWidget

class BackgroundTaskWidget(BaseWidget):
    identifier = "background_task"
    title = "Background Task Widget"
    input_schema = BackgroundTaskInput
    invoking = "Starting task…"
    invoked = "Task started!"

    async def execute(self, inputs: BackgroundTaskInput, ctx):
        # Start background task
        task_id = self.generate_task_id()
        
        # Don't await - let it run in background
        asyncio.create_task(self.run_background_task(task_id, inputs, ctx))
        
        return {
            "task_id": task_id,
            "status": "started",
            "message": "Background task is running"
        }
    
    async def run_background_task(self, task_id, inputs, ctx):
        try:
            ctx.logger.info(f"Background task {task_id} started")
            
            # Long-running task
            for i in range(10):
                await asyncio.sleep(1)
                ctx.logger.info(f"Background task {task_id} progress: {i}/10")
            
            # Update final state
            await ctx.state.set(f"task:{task_id}", {
                "status": "completed",
                "result": "background_task_completed"
            })
            
            ctx.logger.info(f"Background task {task_id} completed")
            
        except Exception as e:
            ctx.logger.exception(f"Background task {task_id} failed: {e}")
            await ctx.state.set(f"task:{task_id}", {
                "status": "failed",
                "error": str(e)
            })
    
    def generate_task_id(self):
        import uuid
        return str(uuid.uuid4())

Next Steps

I