Skip to content

OAuth2PasswordBearer

fastagency.api.openapi.security.OAuth2PasswordBearer #

Bases: BaseSecurity

OAuth2 Password Bearer security class.

in_value class-attribute #

in_value: Literal['bearer'] = 'bearer'

name instance-attribute #

name: str

token_url instance-attribute #

token_url: str

type class-attribute #

type: Literal['oauth2'] = 'oauth2'

Parameters #

Bases: BaseModel

OAuth2 Password Bearer security class.

bearer_token class-attribute instance-attribute #

bearer_token: Optional[str] = None

password class-attribute instance-attribute #

password: Optional[str] = None

token_url class-attribute instance-attribute #

token_url: Optional[str] = None

username class-attribute instance-attribute #

username: Optional[str] = None

apply #

apply(
    q_params: dict[str, Any],
    body_dict: dict[str, Any],
    security: BaseSecurity,
) -> None
Source code in fastagency/api/openapi/security.py
def apply(
    self,
    q_params: dict[str, Any],
    body_dict: dict[str, Any],
    security: BaseSecurity,
) -> None:
    if not self.bearer_token:
        if security.token_url is None:  # type: ignore
            raise ValueError("Token URL is not defined")
        self.bearer_token = self.get_token(security.token_url)  # type: ignore

    if "headers" not in body_dict:
        body_dict["headers"] = {}

    body_dict["headers"]["Authorization"] = f"Bearer {self.bearer_token}"

check_credentials #

check_credentials(values: dict[str, Any]) -> Any
Source code in fastagency/api/openapi/security.py
@model_validator(mode="before")
def check_credentials(cls, values: dict[str, Any]) -> Any:  # noqa
    username = values.get("username")
    password = values.get("password")
    bearer_token = values.get("bearer_token")

    if not bearer_token and (not username or not password):
        # If bearer_token is not provided, both username and password must be defined
        raise ValueError(
            "Both username and password are required if bearer_token is not provided."
        )

    return values

get_security_class #

get_security_class() -> type[BaseSecurity]
Source code in fastagency/api/openapi/security.py
def get_security_class(self) -> type[BaseSecurity]:
    return OAuth2PasswordBearer

get_token #

get_token(token_url: str) -> str
Source code in fastagency/api/openapi/security.py
def get_token(self, token_url: str) -> str:
    # Get the token
    request = requests.post(
        token_url,
        data={
            "username": self.username,
            "password": self.password,
        },
        timeout=5,
    )
    request.raise_for_status()
    return request.json()["access_token"]  # type: ignore

accept #

accept(security_params: BaseSecurityParameters) -> bool
Source code in fastagency/api/openapi/security.py
def accept(self, security_params: "BaseSecurityParameters") -> bool:
    return isinstance(self, security_params.get_security_class())

get_security_class classmethod #

get_security_class(
    type: str, schema_parameters: dict[str, Any]
) -> BaseSecurityType
Source code in fastagency/api/openapi/security.py
@classmethod
def get_security_class(
    cls, type: str, schema_parameters: dict[str, Any]
) -> BaseSecurityType:
    sub_classes = cls.__subclasses__()

    for sub_class in sub_classes:
        if sub_class.is_supported(type, schema_parameters):
            return sub_class

    logger.error(
        f"Unsupported type '{type}' and schema_parameters '{schema_parameters}' combination"
    )
    return UnsuportedSecurityStub

get_security_parameters classmethod #

get_security_parameters(
    schema_parameters: dict[str, Any]
) -> str
Source code in fastagency/api/openapi/security.py
@classmethod
def get_security_parameters(cls, schema_parameters: dict[str, Any]) -> str:
    name = schema_parameters.get("name")
    token_url = f'{schema_parameters.get("server_url")}/{schema_parameters["flows"]["password"]["tokenUrl"]}'
    return f'{cls.__name__}(name="{name}", token_url="{token_url}")'

is_supported classmethod #

is_supported(
    type: str, schema_parameters: dict[str, Any]
) -> bool
Source code in fastagency/api/openapi/security.py
@classmethod
def is_supported(cls, type: str, schema_parameters: dict[str, Any]) -> bool:
    return type == cls.type and "password" in schema_parameters.get("flows", {})