Bases: BaseSecurity
OAuth2 Password Bearer security class.
in_value class-attribute
in_value: Literal['bearer'] = 'bearer'
token_url instance-attribute
Parameters
Bases: BaseModel
OAuth2 Password Bearer security parameters class.
bearer_token class-attribute
instance-attribute
password class-attribute
instance-attribute
token_url class-attribute
instance-attribute
username class-attribute
instance-attribute
apply
Source code in fastagency/api/openapi/security.py
def apply(
    self,
    q_params: dict[str, Any],
    body_dict: dict[str, Any],
    security: BaseSecurity,
) -> None:
    """Add an ``Authorization: Bearer <token>`` header to the request.

    When no bearer token is stored on the instance yet, one is obtained
    through ``self.get_token`` using ``security.token_url`` and kept on
    the instance for later calls.

    Args:
        q_params: Query parameters of the request; not used by this method.
        body_dict: Request arguments. Its ``"headers"`` entry is created
            when absent and updated in place.
        security: Security scheme object expected to expose ``token_url``.

    Raises:
        ValueError: If a token has to be fetched but ``security.token_url``
            is ``None``.
    """
    if not self.bearer_token:
        token_url = security.token_url  # type: ignore
        if token_url is None:
            raise ValueError("Token URL is not defined")
        self.bearer_token = self.get_token(token_url)

    # Reuse an existing headers mapping so other headers are preserved.
    headers = body_dict.setdefault("headers", {})
    headers["Authorization"] = f"Bearer {self.bearer_token}"
check_credentials
Source code in fastagency/api/openapi/security.py
@model_validator(mode="before")
@classmethod
def check_credentials(cls, values: Any) -> Any:
    """Require either a bearer token or a username/password pair.

    Runs as a "before" model validator, i.e. on the raw input prior to
    field validation.

    Args:
        values: Raw input handed to the model. Only ``dict`` input is
            inspected here.

    Returns:
        The input, unchanged.

    Raises:
        ValueError: If ``bearer_token`` is missing/empty and ``username``
            or ``password`` is missing/empty.
    """
    # A "before" validator is not guaranteed to receive a dict; anything
    # else is passed through for pydantic's own validation instead of
    # failing here with AttributeError on ``.get``.
    if not isinstance(values, dict):
        return values

    has_token = bool(values.get("bearer_token"))
    has_login = bool(values.get("username")) and bool(values.get("password"))
    if not has_token and not has_login:
        # If bearer_token is not provided, both username and password must be defined
        raise ValueError(
            "Both username and password are required if bearer_token is not provided."
        )
    return values
get_security_class
Source code in fastagency/api/openapi/security.py
def get_security_class(self) -> type[BaseSecurity]:
    """Return the ``OAuth2PasswordBearer`` class itself (not an instance)."""
    security_class = OAuth2PasswordBearer
    return security_class
get_token
get_token(token_url: str) -> str
Source code in fastagency/api/openapi/security.py
def get_token(self, token_url: str) -> str:
    """Fetch an access token using the stored username and password.

    Sends the credentials as form data in a POST request to ``token_url``
    (5 second timeout) and returns the ``"access_token"`` entry of the
    JSON response.

    Args:
        token_url: URL of the token endpoint.

    Returns:
        The value stored under ``"access_token"`` in the response JSON.

    Raises:
        Whatever ``raise_for_status()`` raises for an error status, and
        ``KeyError`` if the response JSON has no ``"access_token"`` key.
    """
    credentials = {
        "username": self.username,
        "password": self.password,
    }
    response = requests.post(token_url, data=credentials, timeout=5)
    response.raise_for_status()
    payload = response.json()
    return payload["access_token"]  # type: ignore
accept
Source code in fastagency/api/openapi/security.py
def accept(self, security_params: "BaseSecurityParameters") -> bool:
    """Tell whether ``security_params`` is meant for this security object.

    Args:
        security_params: Parameters object providing ``get_security_class()``.

    Returns:
        ``True`` when this object is an instance of the class returned by
        ``security_params.get_security_class()``, ``False`` otherwise.
    """
    expected_class = security_params.get_security_class()
    return isinstance(self, expected_class)
get_security_class classmethod
get_security_class(
type: str, schema_parameters: dict[str, Any]
) -> BaseSecurityType
Source code in fastagency/api/openapi/security.py
@classmethod
def get_security_class(
    cls, type: str, schema_parameters: dict[str, Any]
) -> BaseSecurityType:
    """Pick the subclass that supports the given security scheme.

    The direct subclasses of ``cls`` are asked in turn via
    ``is_supported(type, schema_parameters)``; the first one answering
    truthily is returned.

    Args:
        type: Security scheme type string passed on to ``is_supported``.
        schema_parameters: Scheme parameters passed on to ``is_supported``.

    Returns:
        The first supporting subclass, or ``UnsuportedSecurityStub`` (after
        logging an error) when no subclass supports the combination.
    """
    matching = next(
        (
            candidate
            for candidate in cls.__subclasses__()
            if candidate.is_supported(type, schema_parameters)
        ),
        None,
    )
    if matching is not None:
        return matching

    logger.error(
        f"Unsupported type '{type}' and schema_parameters '{schema_parameters}' combination"
    )
    return UnsuportedSecurityStub
get_security_parameters classmethod
get_security_parameters(
schema_parameters: dict[str, Any]
) -> str
Source code in fastagency/api/openapi/security.py
@classmethod
def get_security_parameters(cls, schema_parameters: dict[str, Any]) -> str:
    """Render the constructor call for this security class as source text.

    The token URL is taken from ``flows.password.tokenUrl``. A relative
    value is appended to ``server_url`` with exactly one ``/`` between
    the two parts; a value that is already an absolute ``http(s)`` URL is
    used as-is instead of being glued onto the server URL.

    Args:
        schema_parameters: Scheme parameters. Must contain
            ``["flows"]["password"]["tokenUrl"]``; ``"name"`` and
            ``"server_url"`` are optional.

    Returns:
        A string of the form
        ``<ClassName>(name="<name>", token_url="<token_url>")``.

    Raises:
        KeyError: If the ``flows.password.tokenUrl`` entry is missing.
    """
    name = schema_parameters.get("name")
    token_url = schema_parameters["flows"]["password"]["tokenUrl"]

    is_absolute = token_url.lower().startswith(("http://", "https://"))
    server_url = schema_parameters.get("server_url")
    # Without a server URL there is nothing to prefix; the old code
    # produced the literal text "None/<tokenUrl>" in that case.
    if not is_absolute and server_url is not None:
        base = str(server_url).rstrip("/")
        relative = token_url.lstrip("/")
        token_url = f"{base}/{relative}"

    return f'{cls.__name__}(name="{name}", token_url="{token_url}")'
is_supported classmethod
Source code in fastagency/api/openapi/security.py
@classmethod
def is_supported(cls, type: str, schema_parameters: dict[str, Any]) -> bool:
    """Report whether this class handles the given security scheme.

    Args:
        type: Security scheme type string, compared against ``cls.type``.
        schema_parameters: Scheme parameters; its optional ``"flows"``
            entry is checked for a ``"password"`` key.

    Returns:
        ``True`` only when ``type`` equals ``cls.type`` and a
        ``"password"`` flow is present.
    """
    if not (type == cls.type):
        return False
    flows = schema_parameters.get("flows", {})
    return "password" in flows