Custom Token Verification
For advanced use cases, implement a customTokenVerifier
to add custom validation logic.
Basic Custom Verifier
Copy
Ask AI
from fastapps import WidgetMCPServer, TokenVerifier, AccessToken
class CustomVerifier(TokenVerifier):
async def verify_token(self, token: str) -> AccessToken | None:
try:
# Custom validation logic
payload = my_jwt_validation(token)
# Custom authorization checks
if not await self.check_user_status(payload["sub"]):
return None
return AccessToken(
token=token,
client_id=payload["azp"],
subject=payload["sub"],
scopes=payload.get("permissions", []),
claims=payload,
)
except Exception:
return None
async def check_user_status(self, user_id: str) -> bool:
# Check if user is active in your database
return True
# Use custom verifier
server = WidgetMCPServer(
name="my-widgets",
widgets=tools,
auth_issuer_url="https://tenant.auth0.com",
auth_resource_server_url="https://example.com/mcp",
token_verifier=CustomVerifier(),
)
Database Validation
Verify users against your database:Copy
Ask AI
import asyncpg
class DatabaseVerifier(TokenVerifier):
def __init__(self, issuer_url: str, db_url: str):
self.issuer_url = issuer_url
self.db_url = db_url
# Initialize JWT verification
from fastapps import JWTVerifier
self.jwt_verifier = JWTVerifier(issuer_url=issuer_url)
async def verify_token(self, token: str) -> AccessToken | None:
# First, verify JWT signature
access_token = await self.jwt_verifier.verify_token(token)
if not access_token:
return None
# Then check user in database
conn = await asyncpg.connect(self.db_url)
try:
user = await conn.fetchrow(
"SELECT active, banned FROM users WHERE id = $1",
access_token.subject
)
if not user or not user['active'] or user['banned']:
return None
return access_token
finally:
await conn.close()
Rate Limiting
Add rate limiting to token verification:Copy
Ask AI
from collections import defaultdict
from datetime import datetime, timedelta
class RateLimitedVerifier(TokenVerifier):
def __init__(self, issuer_url: str, max_requests: int = 100):
self.issuer_url = issuer_url
self.max_requests = max_requests
self.request_counts = defaultdict(list)
from fastapps import JWTVerifier
self.jwt_verifier = JWTVerifier(issuer_url=issuer_url)
async def verify_token(self, token: str) -> AccessToken | None:
# Verify JWT
access_token = await self.jwt_verifier.verify_token(token)
if not access_token:
return None
# Check rate limit
user_id = access_token.subject
now = datetime.utcnow()
# Remove old requests (older than 1 hour)
cutoff = now - timedelta(hours=1)
self.request_counts[user_id] = [
req_time for req_time in self.request_counts[user_id]
if req_time > cutoff
]
# Check if under rate limit
if len(self.request_counts[user_id]) >= self.max_requests:
return None # Rate limit exceeded
# Record this request
self.request_counts[user_id].append(now)
return access_token
Logging and Monitoring
Add comprehensive logging:Copy
Ask AI
import logging
class MonitoredVerifier(TokenVerifier):
def __init__(self, issuer_url: str):
self.issuer_url = issuer_url
self.logger = logging.getLogger(__name__)
from fastapps import JWTVerifier
self.jwt_verifier = JWTVerifier(issuer_url=issuer_url)
async def verify_token(self, token: str) -> AccessToken | None:
try:
access_token = await self.jwt_verifier.verify_token(token)
if access_token:
self.logger.info(
f"Auth success: user={access_token.subject}, "
f"scopes={access_token.scopes}"
)
else:
self.logger.warning(
f"Auth failed: token={token[:20]}..."
)
return access_token
except Exception as e:
self.logger.error(f"Auth error: {str(e)}")
return None
Security Best Practices
1. Always Use HTTPS in Production
Copy
Ask AI
# ✅ Good
auth_resource_server_url="https://yourdomain.com/mcp"
# ❌ Bad
auth_resource_server_url="http://yourdomain.com/mcp"
2. Use Environment Variables
Never hardcode credentials:Copy
Ask AI
import os
# ✅ Good
server = WidgetMCPServer(
name="my-widgets",
widgets=tools,
auth_issuer_url=os.getenv("AUTH_ISSUER_URL"),
auth_resource_server_url=os.getenv("AUTH_RESOURCE_SERVER_URL"),
auth_audience=os.getenv("AUTH_AUDIENCE"),
)
# ❌ Bad
server = WidgetMCPServer(
name="my-widgets",
widgets=tools,
auth_issuer_url="https://my-tenant.auth0.com", # Hardcoded
)
3. Require Specific Scopes
Copy
Ask AI
# ✅ Good: Specific scopes for different operations
@auth_required(scopes=["user", "write:documents"])
class CreateDocumentWidget(BaseWidget):
pass
# ⚠️ Less ideal: Generic scope
@auth_required(scopes=["user"])
class CreateDocumentWidget(BaseWidget):
pass
4. Short-Lived Tokens
Configure your OAuth provider to issue short-lived access tokens:- Recommended: 15 minutes to 1 hour
- Configure token lifetime in your OAuth provider settings
5. Validate Audience
Always specify audience in production:Copy
Ask AI
server = WidgetMCPServer(
name="my-widgets",
widgets=tools,
auth_issuer_url=os.getenv("AUTH_ISSUER_URL"),
auth_resource_server_url=os.getenv("AUTH_RESOURCE_SERVER_URL"),
auth_audience=os.getenv("AUTH_AUDIENCE"), # Prevents token reuse
)
6. Validate User Input
Never trust user-supplied IDs:Copy
Ask AI
@auth_required
class UserDataWidget(BaseWidget):
async def execute(self, input_data, context, user: UserContext):
# ✅ Good: Use authenticated user ID
user_data = await fetch_data(user.subject)
# ❌ Bad: Trust user-supplied ID
# user_data = await fetch_data(input_data.user_id)
return {"data": user_data}
7. Double-Check Critical Operations
Copy
Ask AI
@auth_required(scopes=["admin"])
class DeleteWidget(BaseWidget):
async def execute(self, input_data, context, user: UserContext):
# Double-check scope for critical operations
if not user.has_scope("admin"):
return {"error": "Unauthorized"}
# Proceed with deletion
await delete_resource(input_data.resource_id)
return {"success": True}
8. Log Authentication Events
Copy
Ask AI
import logging
@auth_required
class SensitiveWidget(BaseWidget):
async def execute(self, input_data, context, user: UserContext):
# Log access for audit trail
logging.info(
f"Sensitive data accessed: user={user.subject}, "
f"action={input_data.action}, "
f"timestamp={datetime.utcnow()}"
)
return {"data": "..."}
9. Handle Token Expiration
Tokens will expire - handle gracefully:Copy
Ask AI
async def execute(self, input_data, context, user: UserContext):
if not user.is_authenticated:
return {
"error": "Authentication required",
"message": "Please sign in again"
}
# Proceed with authenticated logic
10. Implement CORS Properly
Copy
Ask AI
from starlette.middleware.cors import CORSMiddleware
app = server.get_app()
app.add_middleware(
CORSMiddleware,
allow_origins=["https://chatgpt.com"], # Specific origins
allow_methods=["POST", "GET"], # Specific methods
allow_headers=["Authorization"], # Specific headers
allow_credentials=True,
)
Advanced Patterns
Multi-Tenant Authentication
Support multiple tenants with different scopes:Copy
Ask AI
@auth_required(scopes=["user"])
class MultiTenantWidget(BaseWidget):
async def execute(self, input_data, context, user: UserContext):
# Get tenant from custom claim
tenant_id = user.claims.get('https://example.com/tenant_id')
if not tenant_id:
return {"error": "No tenant associated with user"}
# Fetch tenant-specific data
data = await fetch_tenant_data(tenant_id, user.subject)
return {
"tenant_id": tenant_id,
"data": data
}
Hierarchical Permissions
Implement permission hierarchies:Copy
Ask AI
@auth_required
class HierarchicalWidget(BaseWidget):
PERMISSION_HIERARCHY = {
"super_admin": ["admin", "manager", "user"],
"admin": ["manager", "user"],
"manager": ["user"],
"user": []
}
async def execute(self, input_data, context, user: UserContext):
role = user.claims.get('role', 'user')
# Get all permissions for this role
permissions = self.PERMISSION_HIERARCHY.get(role, [])
return {
"role": role,
"permissions": permissions,
"access_level": len(permissions)
}
Conditional Scope Requirements
Require different scopes based on operation:Copy
Ask AI
@auth_required(scopes=["user"])
class ConditionalWidget(BaseWidget):
async def execute(self, input_data, context, user: UserContext):
operation = input_data.operation
# Check operation-specific scopes
if operation == "read":
if not user.has_scope("read:data"):
return {"error": "Missing read:data scope"}
return await self.read_data(user)
elif operation == "write":
if not user.has_scope("write:data"):
return {"error": "Missing write:data scope"}
return await self.write_data(user, input_data)
elif operation == "delete":
if not user.has_scope("delete:data"):
return {"error": "Missing delete:data scope"}
return await self.delete_data(user, input_data)
Using JWTVerifier Directly
You can useJWTVerifier
independently:
Copy
Ask AI
from fastapps import JWTVerifier
# Create verifier
verifier = JWTVerifier(
issuer_url="https://tenant.auth0.com",
audience="https://api.example.com",
required_scopes=["user", "read:data"]
)
# Verify a token
access_token = await verifier.verify_token(jwt_token)
if access_token:
print(f"User: {access_token.subject}")
print(f"Scopes: {access_token.scopes}")
print(f"Email: {access_token.claims.get('email')}")
else:
print("Token verification failed")
Next Steps
- Examples - See real-world implementations
- Troubleshooting - Debug authentication issues
- User Context - Access user information