Skip to main content

Custom Token Verification

For advanced use cases, implement a custom TokenVerifier to add custom validation logic.

Basic Custom Verifier

from fastapps import WidgetMCPServer, TokenVerifier, AccessToken

class CustomVerifier(TokenVerifier):
    async def verify_token(self, token: str) -> AccessToken | None:
        try:
            # Custom validation logic
            payload = my_jwt_validation(token)
            
            # Custom authorization checks
            if not await self.check_user_status(payload["sub"]):
                return None
            
            return AccessToken(
                token=token,
                client_id=payload["azp"],
                subject=payload["sub"],
                scopes=payload.get("permissions", []),
                claims=payload,
            )
        except Exception:
            return None
    
    async def check_user_status(self, user_id: str) -> bool:
        # Check if user is active in your database
        return True

# Use custom verifier
server = WidgetMCPServer(
    name="my-widgets",
    widgets=tools,
    auth_issuer_url="https://tenant.auth0.com",
    auth_resource_server_url="https://example.com/mcp",
    token_verifier=CustomVerifier(),
)

Database Validation

Verify users against your database:
import asyncpg

class DatabaseVerifier(TokenVerifier):
    def __init__(self, issuer_url: str, db_url: str):
        self.issuer_url = issuer_url
        self.db_url = db_url
        # Initialize JWT verification
        from fastapps import JWTVerifier
        self.jwt_verifier = JWTVerifier(issuer_url=issuer_url)
    
    async def verify_token(self, token: str) -> AccessToken | None:
        # First, verify JWT signature
        access_token = await self.jwt_verifier.verify_token(token)
        if not access_token:
            return None
        
        # Then check user in database
        conn = await asyncpg.connect(self.db_url)
        try:
            user = await conn.fetchrow(
                "SELECT active, banned FROM users WHERE id = $1",
                access_token.subject
            )
            
            if not user or not user['active'] or user['banned']:
                return None
            
            return access_token
        finally:
            await conn.close()

Rate Limiting

Add rate limiting to token verification:
from collections import defaultdict
from datetime import datetime, timedelta

class RateLimitedVerifier(TokenVerifier):
    def __init__(self, issuer_url: str, max_requests: int = 100):
        self.issuer_url = issuer_url
        self.max_requests = max_requests
        self.request_counts = defaultdict(list)
        
        from fastapps import JWTVerifier
        self.jwt_verifier = JWTVerifier(issuer_url=issuer_url)
    
    async def verify_token(self, token: str) -> AccessToken | None:
        # Verify JWT
        access_token = await self.jwt_verifier.verify_token(token)
        if not access_token:
            return None
        
        # Check rate limit
        user_id = access_token.subject
        now = datetime.utcnow()
        
        # Remove old requests (older than 1 hour)
        cutoff = now - timedelta(hours=1)
        self.request_counts[user_id] = [
            req_time for req_time in self.request_counts[user_id]
            if req_time > cutoff
        ]
        
        # Check if under rate limit
        if len(self.request_counts[user_id]) >= self.max_requests:
            return None  # Rate limit exceeded
        
        # Record this request
        self.request_counts[user_id].append(now)
        
        return access_token

Logging and Monitoring

Add comprehensive logging:
import logging

class MonitoredVerifier(TokenVerifier):
    def __init__(self, issuer_url: str):
        self.issuer_url = issuer_url
        self.logger = logging.getLogger(__name__)
        
        from fastapps import JWTVerifier
        self.jwt_verifier = JWTVerifier(issuer_url=issuer_url)
    
    async def verify_token(self, token: str) -> AccessToken | None:
        try:
            access_token = await self.jwt_verifier.verify_token(token)
            
            if access_token:
                self.logger.info(
                    f"Auth success: user={access_token.subject}, "
                    f"scopes={access_token.scopes}"
                )
            else:
                self.logger.warning(
                    f"Auth failed: token={token[:20]}..."
                )
            
            return access_token
            
        except Exception as e:
            self.logger.error(f"Auth error: {str(e)}")
            return None

Security Best Practices

1. Always Use HTTPS in Production

# ✅ Good
auth_resource_server_url="https://yourdomain.com/mcp"

# ❌ Bad
auth_resource_server_url="http://yourdomain.com/mcp"

2. Use Environment Variables

Never hardcode credentials:
import os

# ✅ Good
server = WidgetMCPServer(
    name="my-widgets",
    widgets=tools,
    auth_issuer_url=os.getenv("AUTH_ISSUER_URL"),
    auth_resource_server_url=os.getenv("AUTH_RESOURCE_SERVER_URL"),
    auth_audience=os.getenv("AUTH_AUDIENCE"),
)

# ❌ Bad
server = WidgetMCPServer(
    name="my-widgets",
    widgets=tools,
    auth_issuer_url="https://my-tenant.auth0.com",  # Hardcoded
)

3. Require Specific Scopes

# ✅ Good: Specific scopes for different operations
@auth_required(scopes=["user", "write:documents"])
class CreateDocumentWidget(BaseWidget):
    pass

# ⚠️ Less ideal: Generic scope
@auth_required(scopes=["user"])
class CreateDocumentWidget(BaseWidget):
    pass

4. Short-Lived Tokens

Configure your OAuth provider to issue short-lived access tokens:
  • Recommended: 15 minutes to 1 hour
  • Configure token lifetime in your OAuth provider settings

5. Validate Audience

Always specify audience in production:
server = WidgetMCPServer(
    name="my-widgets",
    widgets=tools,
    auth_issuer_url=os.getenv("AUTH_ISSUER_URL"),
    auth_resource_server_url=os.getenv("AUTH_RESOURCE_SERVER_URL"),
    auth_audience=os.getenv("AUTH_AUDIENCE"),  # Prevents token reuse
)

6. Validate User Input

Never trust user-supplied IDs:
@auth_required
class UserDataWidget(BaseWidget):
    async def execute(self, input_data, context, user: UserContext):
        # ✅ Good: Use authenticated user ID
        user_data = await fetch_data(user.subject)
        
        # ❌ Bad: Trust user-supplied ID
        # user_data = await fetch_data(input_data.user_id)
        
        return {"data": user_data}

7. Double-Check Critical Operations

@auth_required(scopes=["admin"])
class DeleteWidget(BaseWidget):
    async def execute(self, input_data, context, user: UserContext):
        # Double-check scope for critical operations
        if not user.has_scope("admin"):
            return {"error": "Unauthorized"}
        
        # Proceed with deletion
        await delete_resource(input_data.resource_id)
        return {"success": True}

8. Log Authentication Events

import logging

@auth_required
class SensitiveWidget(BaseWidget):
    async def execute(self, input_data, context, user: UserContext):
        # Log access for audit trail
        logging.info(
            f"Sensitive data accessed: user={user.subject}, "
            f"action={input_data.action}, "
            f"timestamp={datetime.utcnow()}"
        )
        
        return {"data": "..."}

9. Handle Token Expiration

Tokens will expire - handle gracefully:
async def execute(self, input_data, context, user: UserContext):
    if not user.is_authenticated:
        return {
            "error": "Authentication required",
            "message": "Please sign in again"
        }
    
    # Proceed with authenticated logic

10. Implement CORS Properly

from starlette.middleware.cors import CORSMiddleware

app = server.get_app()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://chatgpt.com"],  # Specific origins
    allow_methods=["POST", "GET"],          # Specific methods
    allow_headers=["Authorization"],        # Specific headers
    allow_credentials=True,
)

Advanced Patterns

Multi-Tenant Authentication

Support multiple tenants with different scopes:
@auth_required(scopes=["user"])
class MultiTenantWidget(BaseWidget):
    async def execute(self, input_data, context, user: UserContext):
        # Get tenant from custom claim
        tenant_id = user.claims.get('https://example.com/tenant_id')
        
        if not tenant_id:
            return {"error": "No tenant associated with user"}
        
        # Fetch tenant-specific data
        data = await fetch_tenant_data(tenant_id, user.subject)
        
        return {
            "tenant_id": tenant_id,
            "data": data
        }

Hierarchical Permissions

Implement permission hierarchies:
@auth_required
class HierarchicalWidget(BaseWidget):
    PERMISSION_HIERARCHY = {
        "super_admin": ["admin", "manager", "user"],
        "admin": ["manager", "user"],
        "manager": ["user"],
        "user": []
    }
    
    async def execute(self, input_data, context, user: UserContext):
        role = user.claims.get('role', 'user')
        
        # Get all permissions for this role
        permissions = self.PERMISSION_HIERARCHY.get(role, [])
        
        return {
            "role": role,
            "permissions": permissions,
            "access_level": len(permissions)
        }

Conditional Scope Requirements

Require different scopes based on operation:
@auth_required(scopes=["user"])
class ConditionalWidget(BaseWidget):
    async def execute(self, input_data, context, user: UserContext):
        operation = input_data.operation
        
        # Check operation-specific scopes
        if operation == "read":
            if not user.has_scope("read:data"):
                return {"error": "Missing read:data scope"}
            return await self.read_data(user)
        
        elif operation == "write":
            if not user.has_scope("write:data"):
                return {"error": "Missing write:data scope"}
            return await self.write_data(user, input_data)
        
        elif operation == "delete":
            if not user.has_scope("delete:data"):
                return {"error": "Missing delete:data scope"}
            return await self.delete_data(user, input_data)

Using JWTVerifier Directly

You can use JWTVerifier independently:
from fastapps import JWTVerifier

# Create verifier
verifier = JWTVerifier(
    issuer_url="https://tenant.auth0.com",
    audience="https://api.example.com",
    required_scopes=["user", "read:data"]
)

# Verify a token
access_token = await verifier.verify_token(jwt_token)

if access_token:
    print(f"User: {access_token.subject}")
    print(f"Scopes: {access_token.scopes}")
    print(f"Email: {access_token.claims.get('email')}")
else:
    print("Token verification failed")

Next Steps

I