Skip to content

PrismaBackendDB

fastagency.studio.db.prisma.PrismaBackendDB #

Bases: BackendDBProtocol, PrismaBaseDB

ENV_VAR class-attribute instance-attribute #

ENV_VAR = 'PY_DATABASE_URL'

create_auth_token async #

create_auth_token(
    auth_token_uuid: Union[str, UUID],
    name: str,
    user_uuid: Union[str, UUID],
    deployment_uuid: Union[str, UUID],
    hashed_auth_token: str,
    expiry: str,
    expires_at: datetime,
) -> dict[str, Any]
Source code in fastagency/studio/db/prisma.py
async def create_auth_token(
    self,
    auth_token_uuid: Union[str, UUID],
    name: str,
    user_uuid: Union[str, UUID],
    deployment_uuid: Union[str, UUID],
    hashed_auth_token: str,
    expiry: str,
    expires_at: datetime,
) -> dict[str, Any]:
    """Insert a new auth token record and return it as a plain dict.

    Args:
        auth_token_uuid: Identifier written to the record's ``uuid`` field.
        name: Value written to the ``name`` field.
        user_uuid: Identifier written to the ``user_uuid`` field.
        deployment_uuid: Identifier written to the ``deployment_uuid`` field.
        hashed_auth_token: Value written to the ``auth_token`` field.
        expiry: Value written to the ``expiry`` field.
        expires_at: Value written to the ``expires_at`` field.

    Returns:
        The ``model_dump()`` of the record returned by the create call.
    """
    # Identifiers may arrive as UUID objects; they are persisted as strings.
    token_id = str(auth_token_uuid)
    owner_id = str(user_uuid)
    deployment_id = str(deployment_uuid)

    async with self._get_db_connection() as db:
        record = await db.authtoken.create(  # type: ignore[attr-defined]
            data={
                "uuid": token_id,
                "name": name,
                "user_uuid": owner_id,
                "deployment_uuid": deployment_id,
                "auth_token": hashed_auth_token,
                "expiry": expiry,
                "expires_at": expires_at,
            }
        )
    # Serialised only after the connection context has been left.
    return record.model_dump()  # type: ignore[no-any-return,union-attr]

create_model async #

create_model(
    model_uuid: Union[str, UUID],
    user_uuid: Union[str, UUID],
    type_name: str,
    model_name: str,
    json_str: str,
) -> dict[str, Any]
Source code in fastagency/studio/db/prisma.py
async def create_model(
    self,
    model_uuid: Union[str, UUID],
    user_uuid: Union[str, UUID],
    type_name: str,
    model_name: str,
    json_str: str,
) -> dict[str, Any]:
    """Insert a new model record and return it as a plain dict.

    Args:
        model_uuid: Identifier written to the record's ``uuid`` field.
        user_uuid: Identifier written to the ``user_uuid`` field.
        type_name: Value written to the ``type_name`` field.
        model_name: Value written to the ``model_name`` field.
        json_str: Value written to the ``json_str`` field.

    Returns:
        The ``model_dump()`` of the record returned by the create call.
    """
    # Identifiers may arrive as UUID objects; they are persisted as strings.
    model_id = str(model_uuid)
    owner_id = str(user_uuid)

    async with self._get_db_connection() as db:
        record = await db.model.create(
            data={
                "uuid": model_id,
                "user_uuid": owner_id,
                "type_name": type_name,
                "model_name": model_name,
                "json_str": json_str,  # type: ignore[typeddict-item]
            }
        )
    # Serialised only after the connection context has been left.
    return record.model_dump()  # type: ignore[no-any-return]

delete_auth_token async #

delete_auth_token(
    auth_token_uuid: Union[str, UUID],
    deployment_uuid: Union[str, UUID],
    user_uuid: Union[str, UUID],
) -> dict[str, Any]
Source code in fastagency/studio/db/prisma.py
async def delete_auth_token(
    self,
    auth_token_uuid: Union[str, UUID],
    deployment_uuid: Union[str, UUID],
    user_uuid: Union[str, UUID],
) -> dict[str, Any]:
    """Delete the auth token matching all three identifiers.

    Args:
        auth_token_uuid: Matched against the record's ``uuid`` field.
        deployment_uuid: Matched against the ``deployment_uuid`` field.
        user_uuid: Matched against the ``user_uuid`` field.

    Returns:
        The ``model_dump()`` of the deleted record.

    Raises:
        KeyNotFoundError: If the delete call returns ``None``.
    """
    # Identifiers may arrive as UUID objects; the filter uses their string form.
    token_id = str(auth_token_uuid)
    deployment_id = str(deployment_uuid)
    owner_id = str(user_uuid)

    async with self._get_db_connection() as db:
        removed = await db.authtoken.delete(
            where={  # type: ignore[typeddict-unknown-key]
                "uuid": token_id,
                "deployment_uuid": deployment_id,
                "user_uuid": owner_id,
            },
        )
    # A None result means no record matched the filter.
    if removed is None:
        raise KeyNotFoundError(f"auth_token_uuid {auth_token_uuid} not found")
    return removed.model_dump()  # type: ignore[no-any-return,union-attr]

delete_model async #

delete_model(
    model_uuid: Union[str, UUID]
) -> dict[str, Any]
Source code in fastagency/studio/db/prisma.py
async def delete_model(self, model_uuid: Union[str, UUID]) -> dict[str, Any]:
    """Delete the model record whose ``uuid`` equals ``model_uuid``.

    Args:
        model_uuid: Identifier of the record to delete; converted with
            ``str()`` before it is used in the filter.

    Returns:
        The ``model_dump()`` of the deleted record.

    Raises:
        KeyNotFoundError: If the delete call returns ``None``.
    """
    lookup = {"uuid": str(model_uuid)}
    async with self._get_db_connection() as db:
        removed = await db.model.delete(where=lookup)
    # A None result means no record matched the filter.
    if removed is None:
        raise KeyNotFoundError(f"model_uuid {model_uuid} not found")
    return removed.model_dump()  # type: ignore[no-any-return,union-attr]

find_many_auth_token async #

find_many_auth_token(
    user_uuid: Union[str, UUID],
    deployment_uuid: Union[str, UUID],
) -> list[dict[str, Any]]
Source code in fastagency/studio/db/prisma.py
async def find_many_auth_token(
    self, user_uuid: Union[str, UUID], deployment_uuid: Union[str, UUID]
) -> list[dict[str, Any]]:
    """Return every auth token matching a user and a deployment.

    Args:
        user_uuid: Matched against the ``user_uuid`` field.
        deployment_uuid: Matched against the ``deployment_uuid`` field.

    Returns:
        One ``model_dump()`` dict per record returned by the query, in the
        order the query returned them.
    """
    # Identifiers may arrive as UUID objects; the filter uses their string form.
    deployment_id = str(deployment_uuid)
    owner_id = str(user_uuid)

    async with self._get_db_connection() as db:
        rows = await db.authtoken.find_many(
            where={
                "deployment_uuid": deployment_id,
                "user_uuid": owner_id,
            },
        )
    return [row.model_dump() for row in rows]

find_many_model async #

find_many_model(
    user_uuid: Union[str, UUID],
    type_name: Optional[str] = None,
) -> list[dict[str, Any]]
Source code in fastagency/studio/db/prisma.py
async def find_many_model(
    self, user_uuid: Union[str, UUID], type_name: Optional[str] = None
) -> list[dict[str, Any]]:
    """Return a user's model records, optionally restricted to one type.

    Args:
        user_uuid: Matched against the ``user_uuid`` field.
        type_name: When truthy, additionally matched against the
            ``type_name`` field. ``None`` and ``""`` add no type filter.

    Returns:
        One ``model_dump()`` dict per record returned by the query, in the
        order the query returned them.
    """
    criteria: dict[str, Any] = {"user_uuid": str(user_uuid)}
    # Truthiness test on purpose: an empty string is treated like None.
    if type_name:
        criteria.update(type_name=type_name)

    async with self._get_db_connection() as db:
        rows = await db.model.find_many(where=criteria)  # type: ignore[arg-type]
    return [row.model_dump() for row in rows]

find_model async #

find_model(model_uuid: Union[str, UUID]) -> dict[str, Any]
Source code in fastagency/studio/db/prisma.py
async def find_model(self, model_uuid: Union[str, UUID]) -> dict[str, Any]:
    """Fetch a single model row by UUID using a raw SQL query.

    The identifier is parsed with ``uuid.UUID`` before it is placed in the
    SQL text, so only a canonical hex-and-hyphen UUID string can ever reach
    the query. A value that is not a well-formed UUID is reported as not
    found without querying the database.

    Args:
        model_uuid: Identifier of the row to fetch, as a ``UUID`` or any
            string form accepted by ``uuid.UUID``.

    Returns:
        The row returned by ``query_first`` for the ``"Model"`` table.

    Raises:
        KeyNotFoundError: If ``model_uuid`` is not a well-formed UUID, or if
            the query returns no row.
    """
    model_uuid = str(model_uuid)
    try:
        # str(UUID(...)) contains only hex digits and hyphens, so it is safe
        # to embed in a quoted SQL literal.
        canonical_uuid = str(UUID(model_uuid))
    except ValueError as err:
        raise KeyNotFoundError(f"model_uuid {model_uuid} not found") from err

    async with self._get_db_connection() as db:
        model: Optional[dict[str, Any]] = await db.query_first(
            # Interpolated value is the validated canonical UUID from above.
            'SELECT * from "Model" where uuid='  # nosec: [B608]
            + f"'{canonical_uuid}'"
        )
    if not model:
        raise KeyNotFoundError(f"model_uuid {model_uuid} not found")
    return model

update_model async #

update_model(
    model_uuid: Union[str, UUID],
    user_uuid: Union[str, UUID],
    type_name: str,
    model_name: str,
    json_str: str,
) -> dict[str, Any]
Source code in fastagency/studio/db/prisma.py
async def update_model(
    self,
    model_uuid: Union[str, UUID],
    user_uuid: Union[str, UUID],
    type_name: str,
    model_name: str,
    json_str: str,
) -> dict[str, Any]:
    """Overwrite the fields of the model record identified by ``model_uuid``.

    Args:
        model_uuid: Matched against the record's ``uuid`` field.
        user_uuid: New value for the ``user_uuid`` field.
        type_name: New value for the ``type_name`` field.
        model_name: New value for the ``model_name`` field.
        json_str: New value for the ``json_str`` field.

    Returns:
        The ``model_dump()`` of the updated record.

    Raises:
        KeyNotFoundError: If the update call returns ``None``.
    """
    # Identifiers may arrive as UUID objects; they are used in string form.
    model_id = str(model_uuid)
    owner_id = str(user_uuid)

    async with self._get_db_connection() as db:
        record = await db.model.update(
            where={"uuid": model_id},  # type: ignore[arg-type]
            data={  # type: ignore[typeddict-unknown-key]
                "type_name": type_name,
                "model_name": model_name,
                "json_str": json_str,  # type: ignore[typeddict-item]
                "user_uuid": owner_id,
            },
        )
    # A None result means no record matched the filter.
    if record is None:
        raise KeyNotFoundError(f"model_uuid {model_uuid} not found")
    return record.model_dump()  # type: ignore[no-any-return,union-attr]