HTTP API Integration
REST API Calls
Copy
Ask AI
import aiohttp
import asyncio
from fastapps import BaseWidget
class APIIntegrationWidget(BaseWidget):
identifier = "api_integration"
title = "API Integration Widget"
input_schema = APIIntegrationInput
invoking = "Fetching data from API…"
invoked = "Data retrieved!"
async def execute(self, inputs: APIIntegrationInput, ctx):
api_key = ctx.settings.get("EXTERNAL_API_KEY")
if not api_key:
raise ValueError("API key not configured")
async with aiohttp.ClientSession() as session:
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
# GET request
async with session.get(
f"https://api.example.com/data/{inputs.resource_id}",
headers=headers
) as response:
if response.status == 200:
data = await response.json()
return {
"status": "success",
"data": data,
"source": "external_api"
}
else:
raise Exception(f"API request failed: {response.status}")
class MultipleAPIsWidget(BaseWidget):
identifier = "multiple_apis"
title = "Multiple APIs Widget"
input_schema = MultipleAPIsInput
invoking = "Fetching from multiple sources…"
invoked = "All data retrieved!"
async def execute(self, inputs: MultipleAPIsInput, ctx):
# Fetch from multiple APIs concurrently
tasks = [
self.fetch_weather_api(inputs.city),
self.fetch_news_api(inputs.city),
self.fetch_maps_api(inputs.city)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {
"city": inputs.city,
"weather": results[0] if not isinstance(results[0], Exception) else None,
"news": results[1] if not isinstance(results[1], Exception) else None,
"maps": results[2] if not isinstance(results[2], Exception) else None,
"errors": [str(r) for r in results if isinstance(r, Exception)]
}
async def fetch_weather_api(self, city):
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.weather.com/v1/current?city={city}") as response:
return await response.json()
async def fetch_news_api(self, city):
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.news.com/v1/articles?location={city}") as response:
return await response.json()
async def fetch_maps_api(self, city):
async with aiohttp.ClientSession() as session:
async with session.get(f"https://api.maps.com/v1/geocode?address={city}") as response:
return await response.json()
GraphQL Integration
Copy
Ask AI
import aiohttp
from fastapps import BaseWidget
class GraphQLWidget(BaseWidget):
identifier = "graphql"
title = "GraphQL Widget"
input_schema = GraphQLInput
invoking = "Querying GraphQL API…"
invoked = "Query executed!"
async def execute(self, inputs: GraphQLInput, ctx):
query = """
query GetUserData($userId: ID!) {
user(id: $userId) {
id
name
email
posts {
id
title
content
createdAt
}
}
}
"""
variables = {"userId": inputs.user_id}
payload = {
"query": query,
"variables": variables
}
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.example.com/graphql",
json=payload,
headers={"Content-Type": "application/json"}
) as response:
result = await response.json()
if "errors" in result:
raise Exception(f"GraphQL errors: {result['errors']}")
return {
"user": result["data"]["user"],
"query": query,
"variables": variables
}
Database Integration
PostgreSQL with AsyncPG
Copy
Ask AI
import asyncpg
from fastapps import BaseWidget
class DatabaseWidget(BaseWidget):
identifier = "database"
title = "Database Widget"
input_schema = DatabaseInput
invoking = "Querying database…"
invoked = "Query executed!"
async def execute(self, inputs: DatabaseInput, ctx):
db_url = ctx.settings.get("DATABASE_URL")
if not db_url:
raise ValueError("Database URL not configured")
conn = await asyncpg.connect(db_url)
try:
if inputs.query_type == "users":
rows = await conn.fetch("""
SELECT id, name, email, created_at
FROM users
WHERE active = $1
ORDER BY created_at DESC
LIMIT $2
""", True, inputs.limit)
users = [dict(row) for row in rows]
return {"users": users, "count": len(users)}
elif inputs.query_type == "analytics":
rows = await conn.fetch("""
SELECT
DATE(created_at) as date,
COUNT(*) as count,
SUM(amount) as total_amount
FROM transactions
WHERE created_at >= $1 AND created_at <= $2
GROUP BY DATE(created_at)
ORDER BY date
""", inputs.start_date, inputs.end_date)
analytics = [dict(row) for row in rows]
return {"analytics": analytics, "period": f"{inputs.start_date} to {inputs.end_date}"}
finally:
await conn.close()
MongoDB with Motor
Copy
Ask AI
from motor.motor_asyncio import AsyncIOMotorClient
from fastapps import BaseWidget
class MongoWidget(BaseWidget):
identifier = "mongodb"
title = "MongoDB Widget"
input_schema = MongoInput
invoking = "Querying MongoDB…"
invoked = "Query executed!"
async def execute(self, inputs: MongoInput, ctx):
mongo_url = ctx.settings.get("MONGODB_URL")
if not mongo_url:
raise ValueError("MongoDB URL not configured")
client = AsyncIOMotorClient(mongo_url)
db = client[inputs.database]
collection = db[inputs.collection]
try:
if inputs.query_type == "find":
cursor = collection.find(inputs.filter or {})
if inputs.limit:
cursor = cursor.limit(inputs.limit)
documents = await cursor.to_list(length=inputs.limit)
return {"documents": documents, "count": len(documents)}
elif inputs.query_type == "aggregate":
pipeline = inputs.pipeline
cursor = collection.aggregate(pipeline)
results = await cursor.to_list(length=None)
return {"results": results, "count": len(results)}
finally:
client.close()
Third-Party Service Integration
OpenAI API
Copy
Ask AI
import openai
from fastapps import BaseWidget
class OpenAIIntegrationWidget(BaseWidget):
identifier = "openai_integration"
title = "OpenAI Integration Widget"
input_schema = OpenAIIntegrationInput
invoking = "Processing with OpenAI…"
invoked = "OpenAI processing complete!"
async def execute(self, inputs: OpenAIIntegrationInput, ctx):
api_key = ctx.settings.get("OPENAI_API_KEY")
if not api_key:
raise ValueError("OpenAI API key not configured")
openai.api_key = api_key
if inputs.service == "completion":
response = await openai.Completion.acreate(
model=inputs.model or "text-davinci-003",
prompt=inputs.prompt,
max_tokens=inputs.max_tokens or 100,
temperature=inputs.temperature or 0.7
)
return {
"service": "completion",
"response": response.choices[0].text,
"usage": response.usage
}
elif inputs.service == "embedding":
response = await openai.Embedding.acreate(
model="text-embedding-ada-002",
input=inputs.text
)
return {
"service": "embedding",
"embedding": response.data[0].embedding,
"usage": response.usage
}
Stripe Integration
Copy
Ask AI
import stripe
from fastapps import BaseWidget
class StripeWidget(BaseWidget):
identifier = "stripe"
title = "Stripe Integration Widget"
input_schema = StripeInput
invoking = "Processing payment…"
invoked = "Payment processed!"
async def execute(self, inputs: StripeInput, ctx):
api_key = ctx.settings.get("STRIPE_SECRET_KEY")
if not api_key:
raise ValueError("Stripe API key not configured")
stripe.api_key = api_key
if inputs.operation == "create_payment_intent":
intent = stripe.PaymentIntent.create(
amount=int(inputs.amount * 100), # Convert to cents
currency=inputs.currency,
metadata={"widget_id": inputs.widget_id}
)
return {
"operation": "create_payment_intent",
"client_secret": intent.client_secret,
"status": intent.status,
"amount": inputs.amount,
"currency": inputs.currency
}
elif inputs.operation == "retrieve_payment":
payment = stripe.PaymentIntent.retrieve(inputs.payment_intent_id)
return {
"operation": "retrieve_payment",
"payment": {
"id": payment.id,
"amount": payment.amount / 100, # Convert from cents
"currency": payment.currency,
"status": payment.status
}
}
Webhook Integration
Copy
Ask AI
import hmac
import hashlib
from fastapps import BaseWidget
class WebhookWidget(BaseWidget):
identifier = "webhook"
title = "Webhook Widget"
input_schema = WebhookInput
invoking = "Processing webhook…"
invoked = "Webhook processed!"
async def execute(self, inputs: WebhookInput, ctx):
# Verify webhook signature
if not self.verify_webhook_signature(inputs.payload, inputs.signature, ctx):
raise ValueError("Invalid webhook signature")
# Process webhook payload
if inputs.event_type == "payment.succeeded":
return await self.handle_payment_succeeded(inputs.payload, ctx)
elif inputs.event_type == "user.created":
return await self.handle_user_created(inputs.payload, ctx)
else:
ctx.logger.warning(f"Unknown webhook event type: {inputs.event_type}")
return {"status": "ignored", "event_type": inputs.event_type}
def verify_webhook_signature(self, payload, signature, ctx):
webhook_secret = ctx.settings.get("WEBHOOK_SECRET")
if not webhook_secret:
return True # Skip verification if no secret configured
expected_signature = hmac.new(
webhook_secret.encode(),
payload.encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected_signature)
async def handle_payment_succeeded(self, payload, ctx):
# Process successful payment
ctx.logger.info(f"Payment succeeded: {payload.get('id')}")
return {"status": "processed", "event": "payment.succeeded"}
async def handle_user_created(self, payload, ctx):
# Process new user
ctx.logger.info(f"User created: {payload.get('email')}")
return {"status": "processed", "event": "user.created"}
Configuration Management
Copy
Ask AI
from fastapps import BaseWidget
class ConfigurableWidget(BaseWidget):
identifier = "configurable"
title = "Configurable Widget"
input_schema = ConfigurableInput
invoking = "Processing with configuration…"
invoked = "Processing complete!"
async def execute(self, inputs: ConfigurableInput, ctx):
# Get configuration from settings
config = {
"api_timeout": ctx.settings.get("API_TIMEOUT", 30),
"max_retries": ctx.settings.get("MAX_RETRIES", 3),
"cache_ttl": ctx.settings.get("CACHE_TTL", 3600),
"feature_flags": ctx.settings.get("FEATURE_FLAGS", {}),
"api_endpoints": ctx.settings.get("API_ENDPOINTS", {})
}
# Use configuration in your logic
if config["feature_flags"].get("new_api", False):
result = await self.use_new_api(inputs, config)
else:
result = await self.use_old_api(inputs, config)
return {
"result": result,
"config_used": config,
"feature_flags": config["feature_flags"]
}
async def use_new_api(self, inputs, config):
# Implementation using new API
pass
async def use_old_api(self, inputs, config):
# Implementation using old API
pass
Authenticated API Calls
When your widget requires authentication, you can access user context to make authenticated API calls:Copy
Ask AI
from fastapps import BaseWidget, auth_required, UserContext
import aiohttp
@auth_required(scopes=["user"])
class AuthenticatedAPIWidget(BaseWidget):
identifier = "authenticated_api"
title = "Authenticated API Widget"
input_schema = AuthenticatedAPIInput
invoking = "Fetching user data…"
invoked = "Data retrieved!"
async def execute(self, inputs: AuthenticatedAPIInput, context, user: UserContext):
# Access authenticated user information
user_id = user.subject
user_email = user.claims.get('email')
# Make API call with user-specific data
async with aiohttp.ClientSession() as session:
headers = {
"Authorization": f"Bearer {context.settings.get('API_KEY')}",
"X-User-ID": user_id,
"Content-Type": "application/json"
}
async with session.get(
f"https://api.example.com/users/{user_id}/data",
headers=headers
) as response:
if response.status == 200:
data = await response.json()
return {
"user_id": user_id,
"email": user_email,
"data": data,
"personalized": True
}
else:
raise Exception(f"API request failed: {response.status}")
Next Steps
- Authentication - Add OAuth authentication to your widgets
- Back to Advanced Patterns
- Back to Tool Basics
- Back to Server Overview