Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions .checkov.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,43 @@
"CKV_AWS_120"
]
},
{
"resource": "AWS::ApiGateway::Method.dataalldevapiauthtokenexchangePOSTD92E005E",
"check_ids": [
"CKV_AWS_59"
]
},
{
"resource": "AWS::ApiGateway::Method.dataalldevapiauthlogoutPOST5A8B3C2D",
"check_ids": [
"CKV_AWS_59"
]
},
{
"resource": "AWS::ApiGateway::Method.dataalldevapiauthlogoutPOST89141B56",
"check_ids": [
"CKV_AWS_59"
]
},
{
"resource": "AWS::ApiGateway::Method.dataalldevapiauthuserinfoGET9388EE8D",
"check_ids": [
"CKV_AWS_59"
]
},
{
"resource": "AWS::Lambda::Function.AuthHandler9DC767B7",
"check_ids": [
"CKV_AWS_115",
"CKV_AWS_116"
]
},
{
"resource": "AWS::Logs::LogGroup.authhandlerloggroup",
"check_ids": [
"CKV_AWS_158"
]
},
{
"resource": "AWS::Lambda::Function.AWSWorkerAA1523CA",
"check_ids": [
Expand Down
244 changes: 244 additions & 0 deletions backend/auth_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
import json
import logging
import os
import urllib.request
import urllib.parse
import base64
import binascii
from http.cookies import SimpleCookie

logger = logging.getLogger(__name__)
logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO'))


def handler(event, context):
"""Main Lambda handler - routes requests to appropriate function"""
path = event.get('path', '')
method = event.get('httpMethod', '')

if path == '/auth/token-exchange' and method == 'POST':
return token_exchange_handler(event)
elif path == '/auth/logout' and method == 'POST':
return logout_handler(event)
elif path == '/auth/userinfo' and method == 'GET':
return userinfo_handler(event)
else:
return error_response(
404, 'Auth endpoint not found. Valid routes: /auth/token-exchange, /auth/logout, /auth/userinfo', event
)


def error_response(status_code, message, event=None):
"""Return error response with CORS headers"""
response = {
'statusCode': status_code,
'headers': get_cors_headers(event) if event else {'Content-Type': 'application/json'},
'body': json.dumps({'error': message}),
}
return response


def get_cors_headers(event):
"""Get CORS headers for response"""
cloudfront_url = os.environ.get('CLOUDFRONT_URL', '')
if not cloudfront_url:
logger.debug('CLOUDFRONT_URL not set - authentication endpoints will reject cross-origin requests')

return {
'Content-Type': 'application/json',
'Access-Control-Allow-Origin': cloudfront_url,
'Access-Control-Allow-Credentials': 'true',
'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type',
}


def token_exchange_handler(event):
"""Exchange authorization code for tokens and set httpOnly cookies"""
try:
body = json.loads(event.get('body', '{}'))
code = body.get('code')
code_verifier = body.get('code_verifier')

if not code or not code_verifier:
return error_response(400, 'Missing code or code_verifier', event)

okta_url = os.environ.get('CUSTOM_AUTH_URL', '')
client_id = os.environ.get('CUSTOM_AUTH_CLIENT_ID', '')
redirect_uri = os.environ.get('CUSTOM_AUTH_REDIRECT_URL', '')

if not okta_url or not client_id:
return error_response(500, 'Missing Okta configuration', event)

# Call Okta token endpoint
token_url = f'{okta_url}/v1/token'
token_data = {
'grant_type': 'authorization_code',
'code': code,
'code_verifier': code_verifier,
'client_id': client_id,
'redirect_uri': redirect_uri,
}

data = urllib.parse.urlencode(token_data).encode('utf-8')
req = urllib.request.Request(
token_url,
data=data,
headers={'Content-Type': 'application/x-www-form-urlencoded'},
)

try:
# nosemgrep: python.lang.security.audit.dynamic-urllib-use-detected.dynamic-urllib-use-detected
with urllib.request.urlopen(req, timeout=10) as response:
tokens = json.loads(response.read().decode('utf-8'))
except urllib.error.HTTPError as e:
error_body = e.read().decode('utf-8')
logger.error(f'Token exchange failed: {error_body}')
return error_response(401, 'Authentication failed. Please try again.', event)

cookies = build_cookies(tokens)

return {
'statusCode': 200,
'headers': get_cors_headers(event),
'multiValueHeaders': {'Set-Cookie': cookies},
'body': json.dumps({'success': True}),
}

except Exception as e:
logger.error(f'Token exchange error: {str(e)}')
return error_response(500, 'Internal server error', event)


def build_cookies(tokens):
"""Build httpOnly cookies for tokens"""
cookies = []
secure = True
httponly = True
samesite = 'Lax'
max_age = 3600 # 1 hour

for token_name in ['access_token', 'id_token']:
if tokens.get(token_name):
cookie = SimpleCookie()
cookie[token_name] = tokens[token_name]
cookie[token_name]['path'] = '/'
cookie[token_name]['secure'] = secure
cookie[token_name]['httponly'] = httponly
cookie[token_name]['samesite'] = samesite
cookie[token_name]['max-age'] = max_age
cookies.append(cookie[token_name].OutputString())

return cookies


def logout_handler(event):
"""Clear all auth cookies and return Okta logout URL"""
# Get id_token from cookie for Okta logout
cookie_header = event.get('headers', {}).get('Cookie') or event.get('headers', {}).get('cookie', '')
cookies_in = SimpleCookie()
cookies_in.load(cookie_header)

id_token = None
id_token_cookie = cookies_in.get('id_token')
if id_token_cookie:
id_token = id_token_cookie.value

# Clear all auth cookies
cookies = []
for cookie_name in ['access_token', 'id_token', 'refresh_token']:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also logout from okta. Here we are deleting those cookies but that doesn't mean we will be logging out of Okta. Should we make a call to the okta endpoint to let Okta know that we want to logout ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good point! I'll implement Okta logout by redirecting to Okta's /v1/logout endpoint from the frontend after clearing cookies. This fully ends the Okta session so the user must re-authenticate on next login.

The flow will be:

Frontend calls /auth/logout to clear cookies
Frontend redirects to {okta_url}/v1/logout?id_token_hint=...&post_logout_redirect_uri=...
This is handled on the frontend side since it requires a browser redirect.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super nit: can you comment this flow as a comment here so that anyone looking at this will have clear reference

cookie = SimpleCookie()
cookie[cookie_name] = ''
cookie[cookie_name]['path'] = '/'
cookie[cookie_name]['max-age'] = 0
cookies.append(cookie[cookie_name].OutputString())

# Build Okta logout URL if we have the id_token
logout_url = None
okta_url = os.environ.get('CUSTOM_AUTH_URL', '')
post_logout_uri = os.environ.get('CLOUDFRONT_URL', '')

if id_token and okta_url and post_logout_uri:
logout_params = urllib.parse.urlencode(
{
'id_token_hint': id_token,
'post_logout_redirect_uri': post_logout_uri,
}
)
logout_url = f'{okta_url}/v1/logout?{logout_params}'

return {
'statusCode': 200,
'headers': get_cors_headers(event),
'multiValueHeaders': {'Set-Cookie': cookies},
'body': json.dumps({'success': True, 'logout_url': logout_url}),
}


def userinfo_handler(event):
"""Return user info from id_token cookie"""
try:
# Check both 'Cookie' and 'cookie' - API Gateway may normalize header casing
cookie_header = event.get('headers', {}).get('Cookie') or event.get('headers', {}).get('cookie', '')

cookies = SimpleCookie()
cookies.load(cookie_header)

id_token_cookie = cookies.get('id_token')
if not id_token_cookie:
return error_response(401, 'Not authenticated', event)

id_token = id_token_cookie.value

# Decode JWT payload (middle part of token)
# JWT format: header.payload.signature (base64url encoded)
parts = id_token.split('.')
if len(parts) != 3:
return error_response(401, 'Invalid token format', event)

payload = parts[1]

# Base64 requires padding to be multiple of 4 characters
# URL-safe base64 in JWTs often omits padding, so we add it back
padding = 4 - len(payload) % 4
if padding != 4:
payload += '=' * padding

decoded = base64.urlsafe_b64decode(payload)
claims = json.loads(decoded)

# Check if token is expired
import time

exp = claims.get('exp')
if exp and int(exp) < int(time.time()):
return error_response(401, 'Token expired', event)

email_claim = os.environ.get('CLAIMS_MAPPING_EMAIL', 'email')
user_id_claim = os.environ.get('CLAIMS_MAPPING_USER_ID', 'sub')

email = claims.get(email_claim, claims.get('email', claims.get('sub', '')))
user_id = claims.get(user_id_claim, claims.get('sub', ''))

return {
'statusCode': 200,
'headers': get_cors_headers(event),
'body': json.dumps(
{
'email': email,
'name': claims.get('name', email),
'sub': user_id,
'exp': exp, # Include expiration time for frontend to set up timer
}
),
}

except (binascii.Error, ValueError) as e:
logger.error(f'Failed to decode JWT payload: {str(e)}')
return error_response(401, 'Invalid token', event)
except json.JSONDecodeError as e:
logger.error(f'Failed to parse JWT claims: {str(e)}')
return error_response(401, 'Invalid token', event)
except Exception as e:
logger.error(f'Userinfo error: {str(e)}')
return error_response(500, 'Internal server error', event)
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import os
from http.cookies import SimpleCookie

from requests import HTTPError

Expand All @@ -23,10 +24,32 @@


def lambda_handler(incoming_event, context):
# Get the Token which is sent in the Authorization Header
# Get the Token - first try Cookie header, then Authorization header
logger.debug(incoming_event)
auth_token = incoming_event['headers']['Authorization']
headers = incoming_event.get('headers', {})

# Try to get access_token from Cookie header first (for cookie-based auth)
auth_token = None
cookie_header = headers.get('Cookie') or headers.get('cookie', '')

if cookie_header:
# Parse cookies to find access_token
cookies = SimpleCookie()
cookies.load(cookie_header)
access_token_cookie = cookies.get('access_token')
if access_token_cookie:
# Add Bearer prefix for consistency with existing validation
auth_token = f'Bearer {access_token_cookie.value}'
logger.debug('Using access_token from Cookie header')

# Fallback to Authorization header (for backward compatibility)
if not auth_token:
auth_token = headers.get('Authorization') or headers.get('authorization')
if auth_token:
logger.debug('Using token from Authorization header')

if not auth_token:
logger.warning('No authentication token found in Cookie or Authorization header')
return AuthServices.generate_deny_policy(incoming_event['methodArn'])

# Validate User is Active with Proper Access Token
Expand Down
Loading
Loading