"""Authentication Router - Google OAuth""" import os import secrets from datetime import datetime, timedelta from typing import Optional from fastapi import APIRouter, HTTPException, Request, Response, Depends from fastapi.responses import RedirectResponse from pydantic import BaseModel from authlib.integrations.starlette_client import OAuth from jose import jwt, JWTError import httpx router = APIRouter() # Configuration from environment GOOGLE_CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID", "") GOOGLE_CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET", "") JWT_SECRET = os.getenv("JWT_SECRET", os.getenv("SECRET_KEY", "change-me-in-production")) JWT_ALGORITHM = "HS256" JWT_EXPIRY_HOURS = 24 * 7 # 1 week # Frontend URL for redirects after auth FRONTEND_URL = os.getenv("FRONTEND_URL", "https://cockpit.valuecurve.co") # Backend URL for OAuth callback (defaults to FRONTEND_URL for production where they share domain) BACKEND_URL = os.getenv("BACKEND_URL", FRONTEND_URL) # Allowed emails (invite-only) - comma-separated in env var ALLOWED_EMAILS_STR = os.getenv("ALLOWED_EMAILS", "") ALLOWED_EMAILS = set(email.strip().lower() for email in ALLOWED_EMAILS_STR.split(",") if email.strip()) # OAuth setup oauth = OAuth() oauth.register( name='google', client_id=GOOGLE_CLIENT_ID, client_secret=GOOGLE_CLIENT_SECRET, server_metadata_url='https://accounts.google.com/.well-known/openid-configuration', client_kwargs={'scope': 'openid email profile'}, ) class UserInfo(BaseModel): email: str name: str picture: Optional[str] = None class TokenData(BaseModel): email: str name: str picture: Optional[str] = None exp: datetime def create_token(user: UserInfo) -> str: """Create JWT token for user""" expire = datetime.utcnow() + timedelta(hours=JWT_EXPIRY_HOURS) payload = { "email": user.email, "name": user.name, "picture": user.picture, "exp": expire } return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM) def verify_token(token: str) -> Optional[TokenData]: """Verify JWT token and return user data""" try: payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) return TokenData(**payload) except JWTError: return None def get_token_from_cookie(request: Request) -> Optional[str]: """Extract token from cookie""" return request.cookies.get("auth_token") async def get_current_user(request: Request) -> TokenData: """Dependency to get current authenticated user""" token = get_token_from_cookie(request) if not token: raise HTTPException(status_code=401, detail="Not authenticated") user = verify_token(token) if not user: raise HTTPException(status_code=401, detail="Invalid or expired token") return user def is_email_allowed(email: str) -> bool: """Check if email is in allowed list (or if list is empty, allow all)""" if not ALLOWED_EMAILS: # If no allowed list configured, allow anyone with valid OAuth return True return email.lower() in ALLOWED_EMAILS @router.get("/login/google") async def login_google(request: Request): """Initiate Google OAuth login""" if not GOOGLE_CLIENT_ID or not GOOGLE_CLIENT_SECRET: raise HTTPException(status_code=500, detail="Google OAuth not configured") # Callback goes to backend URL (same as frontend in production, different locally) redirect_uri = f"{BACKEND_URL}/auth/callback/google" return await oauth.google.authorize_redirect(request, redirect_uri) @router.get("/callback/google") async def callback_google(request: Request): """Handle Google OAuth callback""" try: token = await oauth.google.authorize_access_token(request) user_info = token.get('userinfo') if not user_info: # Fetch user info from Google async with httpx.AsyncClient() as client: resp = await client.get( 'https://www.googleapis.com/oauth2/v3/userinfo', headers={'Authorization': f'Bearer {token["access_token"]}'} ) user_info = resp.json() email = user_info.get('email', '').lower() name = user_info.get('name', email.split('@')[0]) picture = user_info.get('picture') # Check if email is allowed if not is_email_allowed(email): # Redirect to login with error return RedirectResponse( url=f"{FRONTEND_URL}/login?error=not_authorized", status_code=302 ) # Create JWT token user = UserInfo(email=email, name=name, picture=picture) jwt_token = create_token(user) # Set cookie and redirect to app # Use secure=False for localhost (HTTP), secure=True for production (HTTPS) is_secure = FRONTEND_URL.startswith("https://") response = RedirectResponse(url=FRONTEND_URL, status_code=302) response.set_cookie( key="auth_token", value=jwt_token, httponly=True, secure=is_secure, samesite="lax", max_age=JWT_EXPIRY_HOURS * 3600 ) return response except Exception as e: import traceback print(f"OAuth error: {e}") traceback.print_exc() return RedirectResponse( url=f"{FRONTEND_URL}/login?error=oauth_failed", status_code=302 ) @router.get("/me") async def get_me(user: TokenData = Depends(get_current_user)): """Get current user info""" return { "email": user.email, "name": user.name, "picture": user.picture } @router.post("/logout") async def logout(): """Logout user by clearing cookie""" response = Response(content='{"message": "Logged out"}', media_type="application/json") response.delete_cookie(key="auth_token") return response @router.get("/status") async def auth_status(request: Request): """Check authentication status (doesn't require auth)""" token = get_token_from_cookie(request) if not token: return {"authenticated": False} user = verify_token(token) if not user: return {"authenticated": False} return { "authenticated": True, "user": { "email": user.email, "name": user.name, "picture": user.picture } } # Admin endpoint to manage allowed emails (protected) @router.get("/allowed-emails") async def get_allowed_emails(user: TokenData = Depends(get_current_user)): """Get list of allowed emails (admin only)""" # For now, just return the list - could add admin check later return {"allowed_emails": list(ALLOWED_EMAILS), "allow_all": len(ALLOWED_EMAILS) == 0}