FastAPIProvider(
fastapi_url: str,
initiate_workflow_path: str = "/fastagency/initiate_workflow",
discovery_path: str = "/fastagency/discovery",
ws_path: str = "/fastagency/ws",
)
Bases: ProviderProtocol
Initialize the fastapi workflows.
Source code in fastagency/adapters/fastapi/base.py
| def __init__(
self,
fastapi_url: str,
initiate_workflow_path: str = "/fastagency/initiate_workflow",
discovery_path: str = "/fastagency/discovery",
ws_path: str = "/fastagency/ws",
) -> None:
"""Initialize the fastapi workflows."""
self._workflows: dict[
str, tuple[Callable[[WorkflowsProtocol, UIBase, str, str], str], str]
] = {}
self.fastapi_url = (
fastapi_url[:-1] if fastapi_url.endswith("/") else fastapi_url
)
self.ws_url = "ws" + self.fastapi_url[4:]
self.is_broker_running: bool = False
self.initiate_workflow_path = initiate_workflow_path
self.discovery_path = discovery_path
self.ws_path = ws_path
|
discovery_path instance-attribute
fastapi_url instance-attribute
initiate_workflow_path instance-attribute
is_broker_running instance-attribute
is_broker_running: bool = False
ws_path instance-attribute
ws_url instance-attribute
ws_url = 'ws' + fastapi_url[4:]
get_description
get_description(name: str) -> str
Source code in fastagency/adapters/fastapi/base.py
| def get_description(self, name: str) -> str:
return self._get_description(name)
|
run
Source code in fastagency/adapters/fastapi/base.py
| def run(
self,
name: str,
ui: UI,
user_id: Optional[str] = None,
**kwargs: Any,
) -> str:
workflow_uuid = ui._workflow_uuid
initiate_workflow = self._send_initiate_chat_msg(
name, workflow_uuid=workflow_uuid, user_id=user_id, params=kwargs
)
user_id = initiate_workflow.user_id if initiate_workflow.user_id else "None"
workflow_uuid = initiate_workflow.workflow_uuid.hex
_from_server_subject = f"chat.client.messages.{user_id}.{workflow_uuid}"
_to_server_subject = f"chat.server.messages.{user_id}.{workflow_uuid}"
async def _setup_and_run() -> None:
await self._run_websocket_subscriber(
ui,
name,
user_id,
_from_server_subject,
_to_server_subject,
kwargs,
)
async def run_lifespan() -> None:
if not self.is_broker_running:
self.is_broker_running = True
await _setup_and_run()
else:
await _setup_and_run()
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run_lifespan())
return "FastAPIWorkflows.run() completed"
|