Skip to main content

HTTP API Integration

REST API Calls

import aiohttp
import asyncio
from fastapps import BaseWidget

class APIIntegrationWidget(BaseWidget):
    """Widget that fetches a single resource from an external REST API.

    The API key is read from the ``EXTERNAL_API_KEY`` setting and sent as a
    bearer token with the request.
    """

    identifier = "api_integration"
    title = "API Integration Widget"
    input_schema = APIIntegrationInput
    invoking = "Fetching data from API…"
    invoked = "Data retrieved!"

    async def execute(self, inputs: APIIntegrationInput, ctx):
        """Fetch the resource named by ``inputs.resource_id``.

        Args:
            inputs: Validated widget input; ``resource_id`` is read from it.
            ctx: Execution context; ``ctx.settings`` supplies the API key.

        Returns:
            A dict with ``status``, ``data`` (the decoded JSON body) and
            ``source``.

        Raises:
            ValueError: If ``EXTERNAL_API_KEY`` is not configured.
            Exception: If the API answers with any status other than 200.
        """
        from urllib.parse import quote

        api_key = ctx.settings.get("EXTERNAL_API_KEY")
        if not api_key:
            raise ValueError("API key not configured")

        # Percent-encode the id so characters such as "/", "?" or "#" in the
        # input cannot change the request path or append a query string.
        resource_id = quote(str(inputs.resource_id), safe="")
        url = f"https://api.example.com/data/{resource_id}"

        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

        async with aiohttp.ClientSession() as session:
            # GET request
            async with session.get(url, headers=headers) as response:
                if response.status != 200:
                    raise Exception(f"API request failed: {response.status}")
                data = await response.json()

        return {
            "status": "success",
            "data": data,
            "source": "external_api"
        }

class MultipleAPIsWidget(BaseWidget):
    """Widget that queries weather, news and maps APIs concurrently.

    A failure of one source does not fail the widget: that source's entry is
    ``None`` and the error text is collected under ``errors``.
    """

    identifier = "multiple_apis"
    title = "Multiple APIs Widget"
    input_schema = MultipleAPIsInput
    invoking = "Fetching from multiple sources…"
    invoked = "All data retrieved!"

    async def execute(self, inputs: MultipleAPIsInput, ctx):
        """Fetch data for ``inputs.city`` from all three sources.

        Returns:
            A dict with ``city``, ``weather``, ``news``, ``maps`` (each the
            decoded JSON or ``None`` on failure) and ``errors`` (a list of
            stringified exceptions).
        """
        # Fetch from multiple APIs concurrently. return_exceptions=True makes
        # gather hand back raised exceptions as results instead of raising.
        results = await asyncio.gather(
            self.fetch_weather_api(inputs.city),
            self.fetch_news_api(inputs.city),
            self.fetch_maps_api(inputs.city),
            return_exceptions=True,
        )

        # BaseException (not Exception) so a cancelled child, which surfaces
        # as asyncio.CancelledError, is treated as a failure rather than data.
        weather, news, maps = (
            None if isinstance(result, BaseException) else result
            for result in results
        )

        return {
            "city": inputs.city,
            "weather": weather,
            "news": news,
            "maps": maps,
            "errors": [str(r) for r in results if isinstance(r, BaseException)]
        }

    async def _fetch_json(self, url, params):
        """GET ``url`` with ``params`` as the query string and decode JSON.

        Passing ``params`` lets aiohttp URL-encode the values, so spaces or
        ``&`` in a city name cannot corrupt the query string.

        Raises:
            aiohttp.ClientResponseError: If the response status is 400 or
                higher, so an error body is never returned as data.
        """
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as response:
                response.raise_for_status()
                return await response.json()

    async def fetch_weather_api(self, city):
        """Return the current-weather JSON for ``city``."""
        return await self._fetch_json(
            "https://api.weather.com/v1/current", {"city": str(city)}
        )

    async def fetch_news_api(self, city):
        """Return the news-articles JSON for ``city``."""
        return await self._fetch_json(
            "https://api.news.com/v1/articles", {"location": str(city)}
        )

    async def fetch_maps_api(self, city):
        """Return the geocoding JSON for ``city``."""
        return await self._fetch_json(
            "https://api.maps.com/v1/geocode", {"address": str(city)}
        )

GraphQL Integration

import aiohttp
from fastapps import BaseWidget

class GraphQLWidget(BaseWidget):
    """Widget that runs the fixed ``GetUserData`` GraphQL query for one user."""

    identifier = "graphql"
    title = "GraphQL Widget"
    input_schema = GraphQLInput
    invoking = "Querying GraphQL API…"
    invoked = "Query executed!"

    async def execute(self, inputs: GraphQLInput, ctx):
        """POST the query for ``inputs.user_id`` and return the user record.

        Returns:
            A dict with ``user`` (taken from ``data.user`` in the response),
            plus the ``query`` text and ``variables`` that were sent.

        Raises:
            Exception: If the response JSON contains an ``errors`` key.
        """
        document = """
        query GetUserData($userId: ID!) {
            user(id: $userId) {
                id
                name
                email
                posts {
                    id
                    title
                    content
                    createdAt
                }
            }
        }
        """
        params = {"userId": inputs.user_id}
        body = dict(query=document, variables=params)
        request_headers = {"Content-Type": "application/json"}

        async with aiohttp.ClientSession() as http:
            async with http.post(
                "https://api.example.com/graphql",
                json=body,
                headers=request_headers,
            ) as reply:
                result = await reply.json()

        # The body is fully read above, so the reply can be inspected after
        # the connection has been released.
        if "errors" in result:
            raise Exception(f"GraphQL errors: {result['errors']}")

        return {
            "user": result["data"]["user"],
            "query": document,
            "variables": params,
        }

Database Integration

PostgreSQL with AsyncPG

import asyncpg
from fastapps import BaseWidget

class DatabaseWidget(BaseWidget):
    """Widget that runs one of two predefined PostgreSQL queries via asyncpg."""

    identifier = "database"
    title = "Database Widget"
    input_schema = DatabaseInput
    invoking = "Querying database…"
    invoked = "Query executed!"

    async def execute(self, inputs: DatabaseInput, ctx):
        """Run the query selected by ``inputs.query_type``.

        ``"users"`` returns active users, newest first, limited to
        ``inputs.limit``. ``"analytics"`` returns per-day transaction counts
        and totals between ``inputs.start_date`` and ``inputs.end_date``.
        Any other query type yields ``None``.

        Raises:
            ValueError: If ``DATABASE_URL`` is not configured.
        """
        dsn = ctx.settings.get("DATABASE_URL")
        if not dsn:
            raise ValueError("Database URL not configured")

        connection = await asyncpg.connect(dsn)

        try:
            if inputs.query_type == "users":
                sql = """
                    SELECT id, name, email, created_at 
                    FROM users 
                    WHERE active = $1 
                    ORDER BY created_at DESC 
                    LIMIT $2
                """
                records = await connection.fetch(sql, True, inputs.limit)
                users = list(map(dict, records))
                return {"users": users, "count": len(users)}

            if inputs.query_type == "analytics":
                sql = """
                    SELECT 
                        DATE(created_at) as date,
                        COUNT(*) as count,
                        SUM(amount) as total_amount
                    FROM transactions 
                    WHERE created_at >= $1 AND created_at <= $2
                    GROUP BY DATE(created_at)
                    ORDER BY date
                """
                records = await connection.fetch(
                    sql, inputs.start_date, inputs.end_date
                )
                analytics = list(map(dict, records))
                period = f"{inputs.start_date} to {inputs.end_date}"
                return {"analytics": analytics, "period": period}

            # Unrecognised query types produce no result.
            return None
        finally:
            # Always release the connection, whichever branch ran.
            await connection.close()

MongoDB with Motor

from motor.motor_asyncio import AsyncIOMotorClient
from fastapps import BaseWidget

class MongoWidget(BaseWidget):
    """Widget that runs a ``find`` or ``aggregate`` against MongoDB via Motor."""

    identifier = "mongodb"
    title = "MongoDB Widget"
    input_schema = MongoInput
    invoking = "Querying MongoDB…"
    invoked = "Query executed!"

    async def execute(self, inputs: MongoInput, ctx):
        """Run the operation selected by ``inputs.query_type``.

        The target is ``inputs.database`` / ``inputs.collection``. Any query
        type other than ``"find"`` or ``"aggregate"`` yields ``None``.

        Raises:
            ValueError: If ``MONGODB_URL`` is not configured.
        """
        uri = ctx.settings.get("MONGODB_URL")
        if not uri:
            raise ValueError("MongoDB URL not configured")

        client = AsyncIOMotorClient(uri)
        target = client[inputs.database][inputs.collection]

        try:
            operation = inputs.query_type
            if operation == "find":
                return await self._run_find(target, inputs)
            if operation == "aggregate":
                return await self._run_aggregate(target, inputs)
            return None
        finally:
            client.close()

    async def _run_find(self, target, inputs):
        """Return documents matching ``inputs.filter`` (all when falsy)."""
        cursor = target.find(inputs.filter or {})
        if inputs.limit:
            cursor = cursor.limit(inputs.limit)

        documents = await cursor.to_list(length=inputs.limit)
        return {"documents": documents, "count": len(documents)}

    async def _run_aggregate(self, target, inputs):
        """Return every result of the ``inputs.pipeline`` aggregation."""
        results = await target.aggregate(inputs.pipeline).to_list(length=None)
        return {"results": results, "count": len(results)}

Third-Party Service Integration

OpenAI API

import openai
from fastapps import BaseWidget

class OpenAIIntegrationWidget(BaseWidget):
    """Widget that calls the OpenAI completion or embedding endpoint.

    NOTE(review): ``openai.Completion.acreate`` / ``openai.Embedding.acreate``
    look like the pre-1.0 ``openai`` SDK interface — confirm the installed
    SDK version before relying on this example.
    """

    identifier = "openai_integration"
    title = "OpenAI Integration Widget"
    input_schema = OpenAIIntegrationInput
    invoking = "Processing with OpenAI…"
    invoked = "OpenAI processing complete!"

    async def execute(self, inputs: OpenAIIntegrationInput, ctx):
        """Run the OpenAI call selected by ``inputs.service``.

        Returns:
            For ``"completion"``: ``service``, ``response`` (text of the first
            choice) and ``usage``. For ``"embedding"``: ``service``,
            ``embedding`` (first embedding vector) and ``usage``.

        Raises:
            ValueError: If ``OPENAI_API_KEY`` is not configured, or if
                ``inputs.service`` is not a supported service.
        """
        api_key = ctx.settings.get("OPENAI_API_KEY")
        if not api_key:
            raise ValueError("OpenAI API key not configured")

        openai.api_key = api_key

        if inputs.service == "completion":
            # Compare against None rather than using `or`: a temperature of 0
            # is a legitimate request and must not be replaced by the default.
            temperature = 0.7 if inputs.temperature is None else inputs.temperature

            response = await openai.Completion.acreate(
                model=inputs.model or "text-davinci-003",
                prompt=inputs.prompt,
                max_tokens=inputs.max_tokens or 100,
                temperature=temperature
            )

            return {
                "service": "completion",
                "response": response.choices[0].text,
                "usage": response.usage
            }

        if inputs.service == "embedding":
            response = await openai.Embedding.acreate(
                model="text-embedding-ada-002",
                input=inputs.text
            )

            return {
                "service": "embedding",
                "embedding": response.data[0].embedding,
                "usage": response.usage
            }

        # Fail loudly instead of silently returning None for an unknown service.
        raise ValueError(f"Unsupported service: {inputs.service}")

Stripe Integration

import stripe
from fastapps import BaseWidget

class StripeWidget(BaseWidget):
    """Widget that creates or retrieves a Stripe PaymentIntent."""

    identifier = "stripe"
    title = "Stripe Integration Widget"
    input_schema = StripeInput
    invoking = "Processing payment…"
    invoked = "Payment processed!"

    async def execute(self, inputs: StripeInput, ctx):
        """Run the Stripe operation selected by ``inputs.operation``.

        Returns:
            For ``"create_payment_intent"``: ``operation``, ``client_secret``,
            ``status``, ``amount`` and ``currency``. For
            ``"retrieve_payment"``: ``operation`` and a ``payment`` dict with
            ``id``, ``amount``, ``currency`` and ``status``.

        Raises:
            ValueError: If ``STRIPE_SECRET_KEY`` is not configured, or if
                ``inputs.operation`` is not a supported operation.
        """
        import asyncio
        from functools import partial

        api_key = ctx.settings.get("STRIPE_SECRET_KEY")
        if not api_key:
            raise ValueError("Stripe API key not configured")

        stripe.api_key = api_key

        # The Stripe calls used here are synchronous; run them in the default
        # executor so they do not block the event loop while waiting on HTTP.
        loop = asyncio.get_running_loop()

        if inputs.operation == "create_payment_intent":
            intent = await loop.run_in_executor(
                None,
                partial(
                    stripe.PaymentIntent.create,
                    amount=self._to_minor_units(inputs.amount),
                    currency=inputs.currency,
                    metadata={"widget_id": inputs.widget_id},
                ),
            )

            return {
                "operation": "create_payment_intent",
                "client_secret": intent.client_secret,
                "status": intent.status,
                "amount": inputs.amount,
                "currency": inputs.currency
            }

        if inputs.operation == "retrieve_payment":
            payment = await loop.run_in_executor(
                None,
                partial(stripe.PaymentIntent.retrieve, inputs.payment_intent_id),
            )

            return {
                "operation": "retrieve_payment",
                "payment": {
                    "id": payment.id,
                    "amount": payment.amount / 100,  # Convert from cents
                    "currency": payment.currency,
                    "status": payment.status
                }
            }

        # Fail loudly instead of silently returning None for an unknown operation.
        raise ValueError(f"Unsupported operation: {inputs.operation}")

    @staticmethod
    def _to_minor_units(amount):
        """Convert a major-unit amount (e.g. dollars) to integer cents.

        Goes through ``Decimal`` because binary floats misrepresent values
        like 19.99 (``19.99 * 100`` is 1998.9999…, which ``int`` would
        truncate to 1998). Fractions of a cent are rounded half-up.

        NOTE(review): assumes a two-decimal currency, as the original
        ``* 100`` did — confirm handling for zero-decimal currencies.
        """
        from decimal import Decimal, ROUND_HALF_UP

        cents = (Decimal(str(amount)) * 100).quantize(
            Decimal("1"), rounding=ROUND_HALF_UP
        )
        return int(cents)

Webhook Integration

import hmac
import hashlib
from fastapps import BaseWidget

class WebhookWidget(BaseWidget):
    """Widget that verifies an incoming webhook and dispatches it by event type."""

    identifier = "webhook"
    title = "Webhook Widget"
    input_schema = WebhookInput
    invoking = "Processing webhook…"
    invoked = "Webhook processed!"

    async def execute(self, inputs: WebhookInput, ctx):
        """Verify the signature, then handle ``inputs.event_type``.

        Returns:
            The handler's result dict, or ``{"status": "ignored", ...}`` for
            event types that have no handler.

        Raises:
            ValueError: If the signature does not match the payload.
        """
        # Verify webhook signature
        if not self.verify_webhook_signature(inputs.payload, inputs.signature, ctx):
            raise ValueError("Invalid webhook signature")

        # Process webhook payload
        if inputs.event_type == "payment.succeeded":
            return await self.handle_payment_succeeded(inputs.payload, ctx)
        elif inputs.event_type == "user.created":
            return await self.handle_user_created(inputs.payload, ctx)
        else:
            ctx.logger.warning(f"Unknown webhook event type: {inputs.event_type}")
            return {"status": "ignored", "event_type": inputs.event_type}

    def verify_webhook_signature(self, payload, signature, ctx):
        """Check ``signature`` against the HMAC-SHA256 hex digest of ``payload``.

        Args:
            payload: The raw request body as ``str`` or ``bytes``. The digest
                must be computed over the exact bytes that were sent, so an
                already-parsed payload cannot be verified.
            signature: The hex digest supplied by the sender.
            ctx: Execution context; ``WEBHOOK_SECRET`` is read from settings.

        Returns:
            ``True`` when the signature matches or no secret is configured;
            ``False`` for a mismatch, a missing signature, or a payload that
            is not raw text/bytes.
        """
        webhook_secret = ctx.settings.get("WEBHOOK_SECRET")
        if not webhook_secret:
            # NOTE(review): this fails open — without WEBHOOK_SECRET every
            # webhook is accepted. Kept as-is because it is the documented
            # behavior; confirm this is acceptable outside development.
            return True  # Skip verification if no secret configured

        if isinstance(payload, str):
            raw_body = payload.encode()
        elif isinstance(payload, (bytes, bytearray)):
            raw_body = bytes(payload)
        else:
            return False

        if isinstance(signature, str):
            provided = signature.encode()
        elif isinstance(signature, (bytes, bytearray)):
            provided = bytes(signature)
        else:
            return False

        expected = hmac.new(
            webhook_secret.encode(),
            raw_body,
            hashlib.sha256
        ).hexdigest().encode()

        # Compare as bytes: compare_digest raises TypeError for str arguments
        # containing non-ASCII characters, which a forged header could carry.
        return hmac.compare_digest(provided, expected)

    @staticmethod
    def _payload_as_mapping(payload):
        """Return ``payload`` as parsed JSON when it is raw text or bytes.

        Signature verification needs the raw body while the handlers need
        key access; an already-parsed payload is returned unchanged.
        """
        import json

        if isinstance(payload, (str, bytes, bytearray)):
            return json.loads(payload)
        return payload

    async def handle_payment_succeeded(self, payload, ctx):
        """Log the succeeded payment's id and report it as processed."""
        # Process successful payment
        event = self._payload_as_mapping(payload)
        ctx.logger.info(f"Payment succeeded: {event.get('id')}")
        return {"status": "processed", "event": "payment.succeeded"}

    async def handle_user_created(self, payload, ctx):
        """Log the new user's email and report the event as processed."""
        # Process new user
        event = self._payload_as_mapping(payload)
        ctx.logger.info(f"User created: {event.get('email')}")
        return {"status": "processed", "event": "user.created"}

Configuration Management

from fastapps import BaseWidget

class ConfigurableWidget(BaseWidget):
    """Widget whose behavior is driven by values read from ``ctx.settings``."""

    identifier = "configurable"
    title = "Configurable Widget"
    input_schema = ConfigurableInput
    invoking = "Processing with configuration…"
    invoked = "Processing complete!"

    async def execute(self, inputs: ConfigurableInput, ctx):
        """Load the configuration and route to the new or old API.

        Returns:
            A dict with ``result`` (the chosen API's return value),
            ``config_used`` and ``feature_flags``.
        """
        # (result key, settings key, fallback) for every configurable value.
        spec = (
            ("api_timeout", "API_TIMEOUT", 30),
            ("max_retries", "MAX_RETRIES", 3),
            ("cache_ttl", "CACHE_TTL", 3600),
            ("feature_flags", "FEATURE_FLAGS", {}),
            ("api_endpoints", "API_ENDPOINTS", {}),
        )
        settings = ctx.settings
        config = {
            name: settings.get(setting_key, fallback)
            for name, setting_key, fallback in spec
        }

        # The "new_api" feature flag selects the implementation.
        flags = config["feature_flags"]
        handler = self.use_new_api if flags.get("new_api", False) else self.use_old_api
        outcome = await handler(inputs, config)

        return {
            "result": outcome,
            "config_used": config,
            "feature_flags": flags,
        }

    async def use_new_api(self, inputs, config):
        """Placeholder for the new-API implementation; returns ``None``."""

    async def use_old_api(self, inputs, config):
        """Placeholder for the old-API implementation; returns ``None``."""

Authenticated API Calls

When your widget requires authentication, you can access the user context to make authenticated, user-specific API calls:
from fastapps import BaseWidget, auth_required, UserContext
import aiohttp

@auth_required(scopes=["user"])
class AuthenticatedAPIWidget(BaseWidget):
    """Widget that fetches data for the authenticated user from an external API."""

    identifier = "authenticated_api"
    title = "Authenticated API Widget"
    input_schema = AuthenticatedAPIInput
    invoking = "Fetching user data…"
    invoked = "Data retrieved!"

    async def execute(self, inputs: AuthenticatedAPIInput, context, user: UserContext):
        """Fetch the authenticated user's data.

        Args:
            inputs: Validated widget input (not read by this method).
            context: Execution context; ``API_KEY`` is read from its settings.
            user: The authenticated user; ``subject`` and the ``email`` claim
                are read from it.

        Returns:
            A dict with ``user_id``, ``email``, ``data`` (the decoded JSON
            body) and ``personalized``.

        Raises:
            ValueError: If ``API_KEY`` is not configured.
            Exception: If the API answers with any status other than 200.
        """
        from urllib.parse import quote

        # Access authenticated user information
        user_id = user.subject
        user_email = user.claims.get('email')

        # Without this check a missing key would be sent as the literal
        # header "Bearer None".
        api_key = context.settings.get('API_KEY')
        if not api_key:
            raise ValueError("API key not configured")

        headers = {
            "Authorization": f"Bearer {api_key}",
            "X-User-ID": user_id,
            "Content-Type": "application/json"
        }

        # Percent-encode the subject so characters such as "/" or "?" in it
        # cannot change the request path or append a query string.
        url = f"https://api.example.com/users/{quote(str(user_id), safe='')}/data"

        # Make API call with user-specific data
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers) as response:
                if response.status != 200:
                    raise Exception(f"API request failed: {response.status}")
                data = await response.json()

        return {
            "user_id": user_id,
            "email": user_email,
            "data": data,
            "personalized": True
        }
Learn more about authentication in the Authentication guide.

Next Steps

I