FastAPIProvider(
    fastapi_url: str,
    initiate_workflow_path: str = "/fastagency/initiate_workflow",
    discovery_path: str = "/fastagency/discovery",
    ws_path: str = "/fastagency/ws",
)
  Bases: ProviderProtocol
 Initialize the fastapi workflows.
  Source code in fastagency/adapters/fastapi/base.py
 |  | def __init__(
    self,
    fastapi_url: str,
    initiate_workflow_path: str = "/fastagency/initiate_workflow",
    discovery_path: str = "/fastagency/discovery",
    ws_path: str = "/fastagency/ws",
) -> None:
    """Initialize the fastapi workflows."""
    self._workflows: dict[
        str, tuple[Callable[[WorkflowsProtocol, UIBase, str, str], str], str]
    ] = {}
    self.fastapi_url = (
        fastapi_url[:-1] if fastapi_url.endswith("/") else fastapi_url
    )
    self.ws_url = "ws" + self.fastapi_url[4:]
    self.is_broker_running: bool = False
    self.initiate_workflow_path = initiate_workflow_path
    self.discovery_path = discovery_path
    self.ws_path = ws_path
 | 
     instance-attribute  
 discovery_path = discovery_path
 
     instance-attribute  
 fastapi_url = (
    fastapi_url[:-1] if fastapi_url.endswith("/") else fastapi_url
)
 
     instance-attribute  
 initiate_workflow_path = initiate_workflow_path
 
     instance-attribute  
 is_broker_running: bool = False
 
     instance-attribute  
 ws_path = ws_path
 
     instance-attribute  
 ws_url = 'ws' + fastapi_url[4:]
 
     
 get_description(name: str) -> str
  Source code in fastagency/adapters/fastapi/base.py
 |  | def get_description(self, name: str) -> str:
    return self._get_description(name)
 | 
        
 run(name: str, ui: UI, user_id: Optional[str] = None, **kwargs: Any) -> str
  Source code in fastagency/adapters/fastapi/base.py
 |  | def run(
    self,
    name: str,
    ui: UI,
    user_id: Optional[str] = None,
    **kwargs: Any,
) -> str:
    workflow_uuid = ui._workflow_uuid
    initiate_workflow = self._send_initiate_chat_msg(
        name, workflow_uuid=workflow_uuid, user_id=user_id, params=kwargs
    )
    user_id = initiate_workflow.user_id if initiate_workflow.user_id else "None"
    workflow_uuid = initiate_workflow.workflow_uuid.hex
    _from_server_subject = f"chat.client.messages.{user_id}.{workflow_uuid}"
    _to_server_subject = f"chat.server.messages.{user_id}.{workflow_uuid}"
    async def _setup_and_run() -> None:
        await self._run_websocket_subscriber(
            ui,
            name,
            user_id,
            _from_server_subject,
            _to_server_subject,
            kwargs,
        )
    async def run_lifespan() -> None:
        if not self.is_broker_running:
            self.is_broker_running = True
            await _setup_and_run()
        else:
            await _setup_and_run()
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    loop.run_until_complete(run_lifespan())
    return "FastAPIWorkflows.run() completed"
 |