Skip to content

FastAPIProvider

fastagency.adapters.fastapi.FastAPIProvider #

FastAPIProvider(
    fastapi_url: str,
    initiate_workflow_path: str = "/fastagency/initiate_workflow",
    discovery_path: str = "/fastagency/discovery",
    ws_path: str = "/fastagency/ws",
)

Bases: ProviderProtocol

Initialize the fastapi workflows.

Source code in fastagency/adapters/fastapi/base.py
def __init__(
    self,
    fastapi_url: str,
    initiate_workflow_path: str = "/fastagency/initiate_workflow",
    discovery_path: str = "/fastagency/discovery",
    ws_path: str = "/fastagency/ws",
) -> None:
    """Initialize the fastapi workflows."""
    self._workflows: dict[
        str, tuple[Callable[[WorkflowsProtocol, UIBase, str, str], str], str]
    ] = {}

    self.fastapi_url = (
        fastapi_url[:-1] if fastapi_url.endswith("/") else fastapi_url
    )
    self.ws_url = "ws" + self.fastapi_url[4:]

    self.is_broker_running: bool = False

    self.initiate_workflow_path = initiate_workflow_path
    self.discovery_path = discovery_path
    self.ws_path = ws_path

discovery_path instance-attribute #

discovery_path = discovery_path

fastapi_url instance-attribute #

fastapi_url = (
    fastapi_url[:-1] if endswith("/") else fastapi_url
)

initiate_workflow_path instance-attribute #

initiate_workflow_path = initiate_workflow_path

is_broker_running instance-attribute #

is_broker_running: bool = False

names property #

names: list[str]

ws_path instance-attribute #

ws_path = ws_path

ws_url instance-attribute #

ws_url = 'ws' + fastapi_url[4:]

get_description #

get_description(name: str) -> str
Source code in fastagency/adapters/fastapi/base.py
def get_description(self, name: str) -> str:
    return self._get_description(name)

run #

run(
    name: str,
    ui: UI,
    user_id: Optional[str] = None,
    **kwargs: Any
) -> str
Source code in fastagency/adapters/fastapi/base.py
def run(
    self,
    name: str,
    ui: UI,
    user_id: Optional[str] = None,
    **kwargs: Any,
) -> str:
    workflow_uuid = ui._workflow_uuid

    initiate_workflow = self._send_initiate_chat_msg(
        name, workflow_uuid=workflow_uuid, user_id=user_id, params=kwargs
    )
    user_id = initiate_workflow.user_id if initiate_workflow.user_id else "None"
    workflow_uuid = initiate_workflow.workflow_uuid.hex

    _from_server_subject = f"chat.client.messages.{user_id}.{workflow_uuid}"
    _to_server_subject = f"chat.server.messages.{user_id}.{workflow_uuid}"

    async def _setup_and_run() -> None:
        await self._run_websocket_subscriber(
            ui,
            name,
            user_id,
            _from_server_subject,
            _to_server_subject,
            kwargs,
        )

    async def run_lifespan() -> None:
        if not self.is_broker_running:
            self.is_broker_running = True
            await _setup_and_run()
        else:
            await _setup_and_run()

    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    loop.run_until_complete(run_lifespan())

    return "FastAPIWorkflows.run() completed"