-
Notifications
You must be signed in to change notification settings - Fork 3
feat: Add support to refresh federated auth access token #46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
c50220d
895e221
66b9979
8d31236
42c90f9
7fad890
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,11 @@ | ||
| import base64 | ||
| import contextlib | ||
| import json | ||
| import logging | ||
| import re | ||
| import uuid | ||
| import warnings | ||
| from typing import Any | ||
| from urllib.parse import quote | ||
|
|
||
| import google.oauth2.credentials | ||
|
|
@@ -14,6 +16,7 @@ | |
| from google.api_core.client_info import ClientInfo | ||
| from google.cloud import bigquery | ||
| from packaging.version import parse as parse_version | ||
| from pydantic import BaseModel, ValidationError | ||
| from sqlalchemy.engine import URL, create_engine, make_url | ||
| from sqlalchemy.exc import ResourceClosedError | ||
|
|
||
|
|
@@ -33,6 +36,18 @@ | |
| from deepnote_toolkit.sql.sql_utils import is_single_select_query | ||
| from deepnote_toolkit.sql.url_utils import replace_user_pass_in_pg_url | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class IntegrationFederatedAuthParams(BaseModel): | ||
| integrationId: str | ||
| authContextToken: str | ||
|
|
||
|
|
||
| class FederatedAuthResponseData(BaseModel): | ||
| integrationType: str | ||
| accessToken: str | ||
|
|
||
|
|
||
| def compile_sql_query( | ||
| skip_jinja_template_render, | ||
|
|
@@ -242,11 +257,89 @@ def _generate_temporary_credentials(integration_id): | |
|
|
||
| response = requests.post(url, timeout=10, headers=headers) | ||
|
|
||
| response.raise_for_status() | ||
|
|
||
| data = response.json() | ||
|
|
||
| return quote(data["username"]), quote(data["password"]) | ||
|
|
||
|
|
||
| def _get_federated_auth_credentials( | ||
| integration_id: str, user_pod_auth_context_token: str | ||
| ) -> FederatedAuthResponseData: | ||
| """Get federated auth credentials for the given integration ID and user pod auth context token.""" | ||
|
|
||
| url = get_absolute_userpod_api_url( | ||
| f"integrations/federated-auth-token/{integration_id}" | ||
| ) | ||
|
|
||
| # Add project credentials in detached mode | ||
| headers = get_project_auth_headers() | ||
| headers["UserPodAuthContextToken"] = user_pod_auth_context_token | ||
|
|
||
| response = requests.post(url, timeout=10, headers=headers) | ||
|
|
||
| response.raise_for_status() | ||
|
|
||
| data = FederatedAuthResponseData.model_validate(response.json()) | ||
|
|
||
| return data | ||
|
|
||
|
Comment on lines
+267
to
+287
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add error handling for JSON decode and validation.
🔎 Proposed fix def _get_federated_auth_credentials(
integration_id: str, user_pod_auth_context_token: str
) -> FederatedAuthResponseData:
"""Get federated auth credentials for the given integration ID and user pod auth context token."""
url = get_absolute_userpod_api_url(
f"integrations/federated-auth-token/{integration_id}"
)
# Add project credentials in detached mode
headers = get_project_auth_headers()
headers["UserPodAuthContextToken"] = user_pod_auth_context_token
response = requests.post(url, timeout=10, headers=headers)
response.raise_for_status()
+ try:
- data = FederatedAuthResponseData.model_validate(response.json())
+ data = FederatedAuthResponseData.model_validate(response.json())
+ except (ValueError, ValidationError) as e:
+ logger.error(
+ "Failed to parse federated auth response from %s: %s",
+ url,
+ response.text,
+ exc_info=e,
+ )
+ raise
return data
|
||
|
|
||
| def _handle_iam_params(sql_alchemy_dict: dict[str, Any]) -> None: | ||
| """Apply IAM credentials to the connection URL in-place.""" | ||
|
|
||
| if "iamParams" not in sql_alchemy_dict: | ||
| return | ||
|
|
||
| integration_id = sql_alchemy_dict["iamParams"]["integrationId"] | ||
|
|
||
| temporary_username, temporary_password = _generate_temporary_credentials( | ||
| integration_id | ||
| ) | ||
|
|
||
| sql_alchemy_dict["url"] = replace_user_pass_in_pg_url( | ||
| sql_alchemy_dict["url"], temporary_username, temporary_password | ||
| ) | ||
|
|
||
|
|
||
| def _handle_federated_auth_params(sql_alchemy_dict: dict[str, Any]) -> None: | ||
| """Fetch and apply federated auth credentials to connection params in-place.""" | ||
|
|
||
| if "federatedAuthParams" not in sql_alchemy_dict: | ||
| return | ||
|
|
||
| try: | ||
| federated_auth_params = IntegrationFederatedAuthParams.model_validate( | ||
| sql_alchemy_dict["federatedAuthParams"] | ||
| ) | ||
| except ValidationError as e: | ||
| logger.error( | ||
| "Invalid federated auth params, try updating toolkit version:", exc_info=e | ||
| ) | ||
| return | ||
|
|
||
| federated_auth = _get_federated_auth_credentials( | ||
| federated_auth_params.integrationId, federated_auth_params.authContextToken | ||
| ) | ||
|
|
||
| if federated_auth.integrationType == "trino": | ||
| sql_alchemy_dict["params"]["connect_args"]["http_headers"][ | ||
| "Authorization" | ||
| ] = f"Bearer {federated_auth.accessToken}" | ||
| elif federated_auth.integrationType == "big-query": | ||
| sql_alchemy_dict["params"]["access_token"] = federated_auth.accessToken | ||
|
Comment on lines
+326
to
+331
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing KeyError handling for nested dict access. Lines 319-321 assume 🔎 Proposed fix if federated_auth.integrationType == "trino":
- sql_alchemy_dict["params"]["connect_args"]["http_headers"][
- "Authorization"
- ] = f"Bearer {federated_auth.accessToken}"
+ try:
+ sql_alchemy_dict["params"]["connect_args"]["http_headers"][
+ "Authorization"
+ ] = f"Bearer {federated_auth.accessToken}"
+ except KeyError:
+ logger.error(
+ "Missing required connection structure for Trino federated auth"
+ )
+ return
elif federated_auth.integrationType == "big-query":
sql_alchemy_dict["params"]["access_token"] = federated_auth.accessToken🤖 Prompt for AI Agents |
||
| elif federated_auth.integrationType == "snowflake": | ||
| logger.warning( | ||
| "Snowflake federated auth is not supported yet, using the original connection URL" | ||
| ) | ||
| else: | ||
| logger.error( | ||
| "Unsupported integration type: %s, try updating toolkit version", | ||
| federated_auth.integrationType, | ||
| ) | ||
|
|
||
|
|
||
| @contextlib.contextmanager | ||
| def _create_sql_ssh_uri(ssh_enabled, sql_alchemy_dict): | ||
| server = None | ||
|
|
@@ -346,16 +439,9 @@ def _query_data_source( | |
| ): | ||
| sshEnabled = sql_alchemy_dict.get("ssh_options", {}).get("enabled", False) | ||
|
|
||
| if "iamParams" in sql_alchemy_dict: | ||
| integration_id = sql_alchemy_dict["iamParams"]["integrationId"] | ||
| _handle_iam_params(sql_alchemy_dict) | ||
|
|
||
| temporaryUsername, temporaryPassword = _generate_temporary_credentials( | ||
| integration_id | ||
| ) | ||
|
|
||
| sql_alchemy_dict["url"] = replace_user_pass_in_pg_url( | ||
| sql_alchemy_dict["url"], temporaryUsername, temporaryPassword | ||
| ) | ||
| _handle_federated_auth_params(sql_alchemy_dict) | ||
|
|
||
| with _create_sql_ssh_uri(sshEnabled, sql_alchemy_dict) as url: | ||
| if url is None: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Nitpick | 🔵 Trivial
Consider snake_case attributes with camelCase aliases.
The model fields use camelCase, which matches the API but violates Python conventions. For better ergonomics, use snake_case attributes with Pydantic aliases:
This allows idiomatic Python access (
federated_auth.access_token) while preserving API compatibility.🤖 Prompt for AI Agents