Skip to content

FastAPIAdapter

fastagency.adapters.fastapi.base.FastAPIAdapter #

FastAPIAdapter(
    provider: ProviderProtocol,
    *,
    initiate_workflow_path: str = "/fastagency/initiate_workflow",
    discovery_path: str = "/fastagency/discovery",
    ws_path: str = "/fastagency/ws",
    get_user_id: Optional[
        Callable[..., Optional[str]]
    ] = None
)

Bases: MessageProcessorMixin, CreateWorkflowUIMixin

Adapter that exposes a FastAgency provider through FastAPI routes.

PARAMETER DESCRIPTION
provider

The provider.

TYPE: ProviderProtocol

initiate_workflow_path

The initiate workflow path. Defaults to "/fastagency/initiate_workflow".

TYPE: str DEFAULT: '/fastagency/initiate_workflow'

discovery_path

The discovery path. Defaults to "/fastagency/discovery".

TYPE: str DEFAULT: '/fastagency/discovery'

ws_path

The websocket path. Defaults to "/fastagency/ws".

TYPE: str DEFAULT: '/fastagency/ws'

get_user_id

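Callable used as a FastAPI dependency to resolve the current user's id; when omitted, a callable that always returns None is used. Defaults to None.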

TYPE: Optional[Callable[..., Optional[str]]] DEFAULT: None

Source code in fastagency/adapters/fastapi/base.py
def __init__(
    self,
    provider: ProviderProtocol,
    *,
    initiate_workflow_path: str = "/fastagency/initiate_workflow",
    discovery_path: str = "/fastagency/discovery",
    ws_path: str = "/fastagency/ws",
    get_user_id: Optional[Callable[..., Optional[str]]] = None,
) -> None:
    """Adapter that exposes a FastAgency provider through FastAPI routes.

    Args:
        provider (ProviderProtocol): The provider whose workflows are served.
        initiate_workflow_path (str, optional): Path of the POST route that initiates a workflow. Defaults to "/fastagency/initiate_workflow".
        discovery_path (str, optional): Path of the GET route that lists the available workflows. Defaults to "/fastagency/discovery".
        ws_path (str, optional): Path of the websocket route. Defaults to "/fastagency/ws".
        get_user_id (Optional[Callable[..., Optional[str]]], optional): Callable used as a
            FastAPI dependency to resolve the current user id. When omitted, a callable
            that always returns None is used. Defaults to None.
    """
    self.provider = provider

    self.initiate_workflow_path = initiate_workflow_path
    self.discovery_path = discovery_path
    self.ws_path = ws_path

    # Fall back to a dependency that yields no user id.
    self.get_user_id = get_user_id or (lambda: None)

    # Open websockets keyed by workflow uuid (hex string).
    self.websockets: dict[str, WebSocket] = {}

    # Must run last: setup_routes() reads the paths and get_user_id set above.
    self.router = self.setup_routes()

discovery_path instance-attribute #

discovery_path = discovery_path

get_user_id instance-attribute #

get_user_id = get_user_id or (lambda: None)

initiate_workflow_path instance-attribute #

initiate_workflow_path = initiate_workflow_path

provider instance-attribute #

provider = provider

router instance-attribute #

router = setup_routes()

websockets instance-attribute #

websockets: dict[str, WebSocket] = {}

ws_path instance-attribute #

ws_path = ws_path

create #

create(app: Runnable, import_string: str) -> Iterator[None]
Source code in fastagency/adapters/fastapi/base.py
@contextmanager
def create(self, app: Runnable, import_string: str) -> Iterator[None]:
    """Not supported by this adapter; always raises.

    The body never yields, so the error is raised as soon as ``create`` is
    called rather than when the returned context manager is entered.

    Raises:
        NotImplementedError: Always, with the message "create".
    """
    unsupported = NotImplementedError("create")
    raise unsupported

create_provider classmethod #

create_provider(fastapi_url: str) -> ProviderProtocol
Source code in fastagency/adapters/fastapi/base.py
@classmethod
def create_provider(
    cls,
    fastapi_url: str,
) -> ProviderProtocol:
    """Create a ``FastAPIProvider`` for the given URL.

    Args:
        fastapi_url (str): Passed to ``FastAPIProvider`` as ``fastapi_url``.

    Returns:
        ProviderProtocol: The newly constructed ``FastAPIProvider``.
    """
    remote_provider = FastAPIProvider(fastapi_url=fastapi_url)
    return remote_provider

create_subconversation #

create_subconversation() -> UIBase
Source code in fastagency/adapters/fastapi/base.py
def create_subconversation(self) -> UIBase:
    """Return this same adapter; no separate sub-conversation object is created."""
    subconversation_ui = self
    return subconversation_ui

create_workflow_ui #

create_workflow_ui(workflow_uuid: str) -> UI
Source code in fastagency/base.py
def create_workflow_ui(self: UIBase, workflow_uuid: str) -> "UI":
    """Wrap this UI base in a ``UI`` bound to one workflow.

    Args:
        workflow_uuid (str): Identifier of the workflow the ``UI`` is created for.

    Returns:
        UI: A ``UI`` constructed with ``uibase=self`` and the given ``workflow_uuid``.
    """
    workflow_ui = UI(uibase=self, workflow_uuid=workflow_uuid)
    return workflow_ui

error #

error(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    short: Optional[str] = None,
    long: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def error(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    short: Optional[str] = None,
    long: Optional[str] = None,
) -> Optional[str]:
    """Build an ``Error`` message and hand it to ``process_message``.

    Args:
        workflow_uuid: Workflow the message belongs to.
        sender: Forwarded to the message unchanged.
        recipient: Forwarded to the message unchanged.
        auto_reply: Forwarded to the message unchanged.
        uuid: Message id; a fresh ``uuid4`` hex string is used when falsy.
        short: Forwarded to the message unchanged.
        long: Forwarded to the message unchanged.

    Returns:
        Whatever ``process_message`` returns for the built message.
    """
    if not uuid:
        # No id supplied by the caller: mint a new one.
        uuid = uuid4().hex

    error_message = Error(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        short=short,
        long=long,
    )
    return self.process_message(error_message)

function_call_execution #

function_call_execution(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    retval: Any = None,
) -> Optional[str]
Source code in fastagency/messages.py
def function_call_execution(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    retval: Any = None,
) -> Optional[str]:
    """Build a ``FunctionCallExecution`` message and hand it to ``process_message``.

    Args:
        workflow_uuid: Workflow the message belongs to.
        sender: Forwarded to the message unchanged.
        recipient: Forwarded to the message unchanged.
        auto_reply: Forwarded to the message unchanged.
        uuid: Message id; a fresh ``uuid4`` hex string is used when falsy.
        function_name: Forwarded to the message unchanged.
        call_id: Forwarded to the message unchanged.
        retval: Forwarded to the message unchanged.

    Returns:
        Whatever ``process_message`` returns for the built message.
    """
    if not uuid:
        # No id supplied by the caller: mint a new one.
        uuid = uuid4().hex

    execution_message = FunctionCallExecution(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        function_name=function_name,
        call_id=call_id,
        retval=retval,
    )
    return self.process_message(execution_message)

get_user_id_websocket async #

get_user_id_websocket(
    websocket: WebSocket,
) -> Optional[str]
Source code in fastagency/adapters/fastapi/base.py
async def get_user_id_websocket(self, websocket: WebSocket) -> Optional[str]:
    """Resolve the user id for a websocket by solving ``self.get_user_id`` as a dependency.

    A stub function that declares ``Depends(self.get_user_id)`` is turned into a
    dependant and solved against a ``Request`` built from the websocket's scope.
    The stub itself is never called; only its resolved ``user_id`` argument is used.

    Args:
        websocket (WebSocket): The incoming websocket connection.

    Returns:
        Optional[str]: The value resolved for the stub's ``user_id`` parameter.

    Raises:
        HTTPException: Propagated unchanged if raised while solving the dependency.
    """

    def get_user_id_depends_stub(
        user_id: Optional[str] = Depends(self.get_user_id),
    ) -> Optional[str]:
        raise RuntimeError(
            "Stub get_user_id_depends_stub called"
        )  # pragma: no cover

    dependant = get_dependant(path="", call=get_user_id_depends_stub)

    # Bind the scope before the try block so the finally clause can always
    # restore it, whatever fails inside.
    scope = websocket.scope
    # Temporarily present the connection as plain HTTP.
    # NOTE(review): presumably required because Request only accepts "http"
    # scopes - confirm against the Starlette version in use.
    scope["type"] = "http"
    try:
        async with AsyncExitStack() as cm:
            solved_dependency = await solve_dependencies(
                dependant=dependant,
                request=Request(scope=scope),  # Inject the request here
                body=None,
                dependency_overrides_provider=None,
                async_exit_stack=cm,
                embed_body_fields=False,
            )
    finally:
        # The scope object is shared with the websocket, so always undo the change.
        scope["type"] = "websocket"

    return solved_dependency.values["user_id"]  # type: ignore[no-any-return]

keep_alive #

keep_alive(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def keep_alive(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
) -> Optional[str]:
    """Build a ``KeepAlive`` message and hand it to ``process_message``.

    Args:
        workflow_uuid: Workflow the message belongs to.
        sender: Forwarded to the message unchanged.
        recipient: Forwarded to the message unchanged.
        auto_reply: Forwarded to the message unchanged.
        uuid: Message id; a fresh ``uuid4`` hex string is used when falsy.

    Returns:
        Whatever ``process_message`` returns for the built message.
    """
    if not uuid:
        # No id supplied by the caller: mint a new one.
        uuid = uuid4().hex

    keep_alive_message = KeepAlive(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
    )
    return self.process_message(keep_alive_message)

multiple_choice #

multiple_choice(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    choices: Optional[list[str]] = None,
    default: Optional[str] = None,
    single: bool = True,
) -> Optional[str]
Source code in fastagency/messages.py
def multiple_choice(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    choices: Optional[list[str]] = None,
    default: Optional[str] = None,
    single: bool = True,
) -> Optional[str]:
    """Build a ``MultipleChoice`` message and hand it to ``process_message``.

    Args:
        workflow_uuid: Workflow the message belongs to.
        sender: Forwarded to the message unchanged.
        recipient: Forwarded to the message unchanged.
        auto_reply: Forwarded to the message unchanged.
        uuid: Message id; a fresh ``uuid4`` hex string is used when falsy.
        prompt: Forwarded to the message unchanged.
        choices: Options to offer; an empty list is used when falsy.
        default: Forwarded to the message unchanged.
        single: Forwarded to the message unchanged.

    Returns:
        Whatever ``process_message`` returns for the built message.
    """
    if not uuid:
        # No id supplied by the caller: mint a new one.
        uuid = uuid4().hex
    if not choices:
        choices = []

    choice_message = MultipleChoice(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        prompt=prompt,
        choices=choices,
        default=default,
        single=single,
    )
    return self.process_message(choice_message)

process_message #

process_message(message: IOMessage) -> Optional[str]
Source code in fastagency/messages.py
def process_message(self, message: IOMessage) -> Optional[str]:
    """Dispatch a message through ``visit`` without letting errors escape.

    Args:
        message (IOMessage): The message to dispatch.

    Returns:
        Optional[str]: The value returned by ``visit``, or None if ``visit``
            raised (the error is logged with its traceback).
    """
    try:
        outcome = self.visit(message)
    except Exception as exc:
        # Best effort by design: record the failure and report "no response".
        logger.error(f"Error processing message ({message}): {exc}", exc_info=True)
        outcome = None
    return outcome

setup_routes #

setup_routes() -> APIRouter
Source code in fastagency/adapters/fastapi/base.py
def setup_routes(self) -> APIRouter:
    """Build the router with the initiate-workflow, websocket and discovery endpoints.

    Returns:
        APIRouter: A router with a POST route at ``self.initiate_workflow_path``,
            a websocket route at ``self.ws_path`` and a GET route at
            ``self.discovery_path``.
    """
    router = APIRouter()

    @router.post(self.initiate_workflow_path)
    async def initiate_chat(
        initiate_chat: InititateChatModel,
        user_id: Optional[str] = Depends(self.get_user_id),
    ) -> InitiateWorkflowModel:
        # Only allocates a workflow id and returns the initiation model; the
        # workflow itself is started by the websocket endpoint below.
        workflow_uuid: UUID = uuid4()

        init_msg = InitiateWorkflowModel(
            user_id=user_id,
            workflow_uuid=workflow_uuid,
            params=initiate_chat.params,
            name=initiate_chat.workflow_name,
        )

        return init_msg

    @router.websocket(self.ws_path)
    async def websocket_endpoint(
        websocket: WebSocket,
    ) -> None:
        # Resolve the user before accepting, so a failing dependency can be
        # answered with a denial response instead of an open socket.
        try:
            user_id = await self.get_user_id_websocket(websocket)
        except HTTPException as e:
            headers = getattr(e, "headers", None)
            await websocket.send_denial_response(
                Response(status_code=e.status_code, headers=headers)
            )
            return

        logger.info("Websocket connected")
        await websocket.accept()
        logger.info("Websocket accepted")

        # The first text frame must be a JSON-encoded InitiateWorkflowModel.
        init_msg_json = await websocket.receive_text()
        logger.info(f"Received message: {init_msg_json}")

        init_msg = InitiateWorkflowModel.model_validate_json(init_msg_json)

        # Register the socket so visit_default can find it by workflow uuid.
        workflow_uuid = init_msg.workflow_uuid.hex
        self.websockets[workflow_uuid] = websocket

        try:
            await asyncify(self.provider.run)(
                name=init_msg.name,
                ui=self.create_workflow_ui(workflow_uuid),
                user_id=user_id if user_id else "None",
                **init_msg.params,
            )
        except Exception as e:
            # exc_info attaches the exception traceback to the log record;
            # stack_info would only show where the logging call was made.
            logger.error(f"Error in websocket_endpoint: {e}", exc_info=True)
        finally:
            # Tolerate an already-removed entry so cleanup itself never raises.
            self.websockets.pop(workflow_uuid, None)

    @router.get(
        self.discovery_path,
        responses={
            404: {"detail": "Key Not Found"},
            504: {"detail": "Unable to connect to provider"},
        },
    )
    def discovery(
        user_id: Optional[str] = Depends(self.get_user_id),
    ) -> list[WorkflowInfo]:
        # A provider connection failure is reported as 504.
        try:
            names = self.provider.names
        except FastAgencyConnectionError as e:
            raise HTTPException(status_code=504, detail=str(e)) from e

        # A workflow name the provider cannot describe is reported as 404.
        try:
            descriptions = [self.provider.get_description(name) for name in names]
        except FastAgencyKeyError as e:
            raise HTTPException(status_code=404, detail=str(e)) from e

        return [
            WorkflowInfo(name=name, description=description)
            for name, description in zip(names, descriptions)
        ]

    return router

start #

start(
    *,
    app: Runnable,
    import_string: str,
    name: Optional[str] = None,
    params: dict[str, Any],
    single_run: bool = False
) -> None
Source code in fastagency/adapters/fastapi/base.py
def start(
    self,
    *,
    app: "Runnable",
    import_string: str,
    name: Optional[str] = None,
    params: dict[str, Any],
    single_run: bool = False,
) -> None:
    """Not supported by this adapter; always raises.

    Raises:
        NotImplementedError: Always, with the message "start".
    """
    unsupported = NotImplementedError("start")
    raise unsupported

suggested_function_call #

suggested_function_call(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    arguments: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def suggested_function_call(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    arguments: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    """Build a ``SuggestedFunctionCall`` message and hand it to ``process_message``.

    Args:
        workflow_uuid: Workflow the message belongs to.
        sender: Forwarded to the message unchanged.
        recipient: Forwarded to the message unchanged.
        auto_reply: Forwarded to the message unchanged.
        uuid: Message id; a fresh ``uuid4`` hex string is used when falsy.
        function_name: Forwarded to the message unchanged.
        call_id: Forwarded to the message unchanged.
        arguments: Call arguments; an empty dict is used when falsy.

    Returns:
        Whatever ``process_message`` returns for the built message.
    """
    if not uuid:
        # No id supplied by the caller: mint a new one.
        uuid = uuid4().hex
    if not arguments:
        arguments = {}

    suggestion_message = SuggestedFunctionCall(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        function_name=function_name,
        call_id=call_id,
        arguments=arguments,
    )
    return self.process_message(suggestion_message)

system_message #

system_message(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    message: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def system_message(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    message: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    """Build a ``SystemMessage`` and hand it to ``process_message``.

    Args:
        workflow_uuid: Workflow the message belongs to.
        sender: Forwarded to the message unchanged.
        recipient: Forwarded to the message unchanged.
        auto_reply: Forwarded to the message unchanged.
        uuid: Message id; a fresh ``uuid4`` hex string is used when falsy.
        message: Message payload; an empty dict is used when falsy.

    Returns:
        Whatever ``process_message`` returns for the built message.
    """
    if not uuid:
        # No id supplied by the caller: mint a new one.
        uuid = uuid4().hex
    if not message:
        message = {}

    system_msg = SystemMessage(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        message=message,
    )
    return self.process_message(system_msg)

text_input #

text_input(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    suggestions: Optional[list[str]] = None,
    password: bool = False,
) -> Optional[str]
Source code in fastagency/messages.py
def text_input(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    suggestions: Optional[list[str]] = None,
    password: bool = False,
) -> Optional[str]:
    """Build a ``TextInput`` message and hand it to ``process_message``.

    Args:
        workflow_uuid: Workflow the message belongs to.
        sender: Forwarded to the message unchanged.
        recipient: Forwarded to the message unchanged.
        auto_reply: Forwarded to the message unchanged.
        uuid: Message id; a fresh ``uuid4`` hex string is used when falsy.
        prompt: Forwarded to the message unchanged.
        suggestions: Suggested inputs; an empty list is used when falsy.
        password: Forwarded to the message unchanged.

    Returns:
        Whatever ``process_message`` returns for the built message.
    """
    if not uuid:
        # No id supplied by the caller: mint a new one.
        uuid = uuid4().hex
    if not suggestions:
        suggestions = []

    input_message = TextInput(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        prompt=prompt,
        suggestions=suggestions,
        password=password,
    )
    return self.process_message(input_message)

text_message #

text_message(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    body: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def text_message(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    body: Optional[str] = None,
) -> Optional[str]:
    """Build a ``TextMessage`` and hand it to ``process_message``.

    Args:
        workflow_uuid: Workflow the message belongs to.
        sender: Forwarded to the message unchanged.
        recipient: Forwarded to the message unchanged.
        auto_reply: Forwarded to the message unchanged.
        uuid: Message id; a fresh ``uuid4`` hex string is used when falsy.
        body: Forwarded to the message unchanged.

    Returns:
        Whatever ``process_message`` returns for the built message.
    """
    if not uuid:
        # No id supplied by the caller: mint a new one.
        uuid = uuid4().hex

    text_msg = TextMessage(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        body=body,
    )
    return self.process_message(text_msg)

visit #

visit(message: IOMessage) -> Optional[str]
Source code in fastagency/messages.py
def visit(self, message: IOMessage) -> Optional[str]:
    """Dispatch a message to the ``visit_<type>`` method matching ``message.type``.

    Args:
        message (IOMessage): The message to dispatch.

    Returns:
        Optional[str]: The result of ``visit_<message.type>`` when such an
            attribute exists, otherwise the result of ``visit_default``.
    """
    handler_name = f"visit_{message.type}"
    fallback = self.visit_default
    handler = getattr(self, handler_name, fallback)
    return handler(message)

visit_default #

visit_default(message: IOMessage) -> Optional[str]
Source code in fastagency/adapters/fastapi/base.py
def visit_default(self, message: IOMessage) -> Optional[str]:
    """Send a message over the websocket registered for its workflow.

    The message is serialized with ``json.dumps(message.model_dump())`` and sent
    as text. For an ``AskingMessage`` the next text frame received on the same
    websocket is returned as the answer.

    Args:
        message (IOMessage): The message to send.

    Returns:
        Optional[str]: The received text for an ``AskingMessage``, otherwise None.

    Raises:
        RuntimeError: If no websocket is registered under ``message.workflow_uuid``.
    """

    async def a_visit_default(
        self: FastAPIAdapter, message: IOMessage
    ) -> Optional[str]:
        workflow_uuid = message.workflow_uuid
        if workflow_uuid not in self.websockets:
            # Log and raise with the same text.
            not_found = (
                f"Workflow {workflow_uuid} not found in websockets: {self.websockets}"
            )
            logger.error(not_found)
            raise RuntimeError(not_found)

        websocket = self.websockets[workflow_uuid]  # type: ignore[index]
        payload = json.dumps(message.model_dump())
        await websocket.send_text(payload)

        if not isinstance(message, AskingMessage):
            return None
        # Asking messages wait for the peer's reply on the same socket.
        answer = await websocket.receive_text()
        return answer

    # Run the coroutine from this synchronous method.
    return syncify(a_visit_default)(self, message)

visit_error #

visit_error(message: Error) -> Optional[str]
Source code in fastagency/messages.py
def visit_error(self, message: Error) -> Optional[str]:
    """Handle an ``Error`` message by forwarding it to ``visit_default``."""
    default_handler = self.visit_default
    return default_handler(message)

visit_function_call_execution #

visit_function_call_execution(
    message: FunctionCallExecution,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_function_call_execution(
    self, message: FunctionCallExecution
) -> Optional[str]:
    """Handle a ``FunctionCallExecution`` message by forwarding it to ``visit_default``."""
    default_handler = self.visit_default
    return default_handler(message)

visit_keep_alive #

visit_keep_alive(message: KeepAlive) -> Optional[str]
Source code in fastagency/messages.py
def visit_keep_alive(self, message: KeepAlive) -> Optional[str]:
    """Handle a ``KeepAlive`` message by forwarding it to ``visit_default``."""
    default_handler = self.visit_default
    return default_handler(message)

visit_multiple_choice #

visit_multiple_choice(
    message: MultipleChoice,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_multiple_choice(self, message: MultipleChoice) -> Optional[str]:
    """Handle a ``MultipleChoice`` message by forwarding it to ``visit_default``."""
    default_handler = self.visit_default
    return default_handler(message)

visit_suggested_function_call #

visit_suggested_function_call(
    message: SuggestedFunctionCall,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_suggested_function_call(
    self, message: SuggestedFunctionCall
) -> Optional[str]:
    """Handle a ``SuggestedFunctionCall`` message by forwarding it to ``visit_default``."""
    default_handler = self.visit_default
    return default_handler(message)

visit_system_message #

visit_system_message(
    message: SystemMessage,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_system_message(self, message: SystemMessage) -> Optional[str]:
    """Handle a ``SystemMessage`` by forwarding it to ``visit_default``."""
    default_handler = self.visit_default
    return default_handler(message)

visit_text_input #

visit_text_input(message: TextInput) -> Optional[str]
Source code in fastagency/messages.py
def visit_text_input(self, message: TextInput) -> Optional[str]:
    """Handle a ``TextInput`` message by forwarding it to ``visit_default``."""
    default_handler = self.visit_default
    return default_handler(message)

visit_text_message #

visit_text_message(message: TextMessage) -> Optional[str]
Source code in fastagency/messages.py
def visit_text_message(self, message: TextMessage) -> Optional[str]:
    """Handle a ``TextMessage`` by forwarding it to ``visit_default``."""
    default_handler = self.visit_default
    return default_handler(message)

visit_workflow_completed #

visit_workflow_completed(
    message: WorkflowCompleted,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_workflow_completed(self, message: WorkflowCompleted) -> Optional[str]:
    """Handle a ``WorkflowCompleted`` message by forwarding it to ``visit_default``."""
    default_handler = self.visit_default
    return default_handler(message)

visit_workflow_started #

visit_workflow_started(
    message: WorkflowStarted,
) -> Optional[str]
Source code in fastagency/messages.py
def visit_workflow_started(self, message: WorkflowStarted) -> Optional[str]:
    """Handle a ``WorkflowStarted`` message by forwarding it to ``visit_default``."""
    default_handler = self.visit_default
    return default_handler(message)

workflow_completed #

workflow_completed(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    result: Optional[str] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def workflow_completed(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    result: Optional[str] = None,
) -> Optional[str]:
    """Build a ``WorkflowCompleted`` message and hand it to ``process_message``.

    Args:
        workflow_uuid: Workflow the message belongs to.
        sender: Forwarded to the message unchanged.
        recipient: Forwarded to the message unchanged.
        auto_reply: Forwarded to the message unchanged.
        uuid: Message id; a fresh ``uuid4`` hex string is used when falsy.
        result: Forwarded to the message unchanged.

    Returns:
        Whatever ``process_message`` returns for the built message.
    """
    if not uuid:
        # No id supplied by the caller: mint a new one.
        uuid = uuid4().hex

    completed_message = WorkflowCompleted(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        result=result,
    )
    return self.process_message(completed_message)

workflow_started #

workflow_started(
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    name: Optional[str] = None,
    description: Optional[str] = None,
    params: Optional[dict[str, Any]] = None,
) -> Optional[str]
Source code in fastagency/messages.py
def workflow_started(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    name: Optional[str] = None,
    description: Optional[str] = None,
    params: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    """Build a ``WorkflowStarted`` message and hand it to ``process_message``.

    Args:
        workflow_uuid: Workflow the message belongs to.
        sender: Forwarded to the message unchanged.
        recipient: Forwarded to the message unchanged.
        auto_reply: Forwarded to the message unchanged.
        uuid: Message id; a fresh ``uuid4`` hex string is used when falsy.
        name: Forwarded to the message unchanged.
        description: Forwarded to the message unchanged.
        params: Workflow parameters; an empty dict is used when falsy.

    Returns:
        Whatever ``process_message`` returns for the built message.
    """
    if not uuid:
        # No id supplied by the caller: mint a new one.
        uuid = uuid4().hex
    if not params:
        params = {}

    started_message = WorkflowStarted(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        name=name,
        description=description,
        params=params,
    )
    return self.process_message(started_message)