FastAPIProvider (
fastapi_url : str ,
initiate_workflow_path : str = "/fastagency/initiate_workflow" ,
discovery_path : str = "/fastagency/discovery" ,
ws_path : str = "/fastagency/ws" ,
)
Bases: ProviderProtocol
Initialize the fastapi workflows.
Source code in fastagency/adapters/fastapi/base.py
def __init__(
    self,
    fastapi_url: str,
    initiate_workflow_path: str = "/fastagency/initiate_workflow",
    discovery_path: str = "/fastagency/discovery",
    ws_path: str = "/fastagency/ws",
) -> None:
    """Set up the provider's server URLs and endpoint paths.

    Args:
        fastapi_url: Base URL of the FastAPI server. One trailing "/" is
            dropped before the value is stored.
        initiate_workflow_path: Path stored as ``initiate_workflow_path``.
        discovery_path: Path stored as ``discovery_path``.
        ws_path: Path stored as ``ws_path``.
    """
    # Starts empty; maps a string key to a (callable, str) pair.
    # NOTE(review): presumably keyed by workflow name -- confirm with callers.
    self._workflows: dict[
        str, tuple[Callable[[WorkflowsProtocol, UIBase, str, str], str], str]
    ] = {}

    # Normalise the base URL: strip exactly one trailing slash, if present.
    base_url = fastapi_url
    if base_url.endswith("/"):
        base_url = base_url[:-1]
    self.fastapi_url = base_url

    # Swap the first four characters for "ws", so "http://h" becomes
    # "ws://h" and "https://h" becomes "wss://h".
    self.ws_url = "ws" + base_url[4:]
    self.is_broker_running: bool = False

    self.initiate_workflow_path = initiate_workflow_path
    self.discovery_path = discovery_path
    self.ws_path = ws_path
discovery_path instance-attribute
discovery_path = discovery_path
fastapi_url instance-attribute
fastapi_url = (
fastapi_url [: - 1 ] if fastapi_url . endswith ( "/" ) else fastapi_url
)
initiate_workflow_path instance-attribute
initiate_workflow_path = initiate_workflow_path
is_broker_running instance-attribute
is_broker_running : bool = False
ws_path instance-attribute
ws_path = ws_path
ws_url instance-attribute
ws_url = 'ws' + fastapi_url [ 4 :]
get_description
get_description ( name : str ) -> str
Source code in fastagency/adapters/fastapi/base.py
def get_description(self, name: str) -> str:
    """Return the description for ``name``.

    Thin public wrapper: the lookup is delegated unchanged to
    ``self._get_description``, which is defined outside this excerpt.

    Args:
        name: Identifier passed through to ``self._get_description``.

    Returns:
        The value produced by ``self._get_description(name)``.
    """
    description = self._get_description(name)
    return description
run
run ( name : str , ui : UI , user_id : Optional [ str ] = None , ** kwargs : Any ) -> str
Source code in fastagency/adapters/fastapi/base.py
def run(
    self,
    name: str,
    ui: UI,
    user_id: Optional[str] = None,
    **kwargs: Any,
) -> str:
    """Initiate workflow ``name`` on the server and block until its session ends.

    The call first asks the server to initiate the workflow via
    ``self._send_initiate_chat_msg``, then drives
    ``self._run_websocket_subscriber`` to completion on an asyncio event loop.

    Args:
        name: Name of the workflow to start.
        ui: UI object; its ``_workflow_uuid`` is sent with the initiation
            request and the object itself is handed to the subscriber.
        user_id: Optional user identifier forwarded to the server.
        **kwargs: Extra parameters, sent as ``params`` in the initiation
            request and passed on to the subscriber.

    Returns:
        The fixed string ``"FastAPIWorkflows.run() completed"``.

    Note:
        This method is synchronous and uses ``loop.run_until_complete``, so
        it must not be called from code already running inside that event
        loop (asyncio raises ``RuntimeError`` in that case).
    """
    initiate_workflow = self._send_initiate_chat_msg(
        name, workflow_uuid=ui._workflow_uuid, user_id=user_id, params=kwargs
    )

    # From here on the identifiers come from the server's reply. A falsy
    # user id becomes the literal string "None" in the subject names.
    user_id = initiate_workflow.user_id if initiate_workflow.user_id else "None"
    workflow_uuid = initiate_workflow.workflow_uuid.hex

    # No whitespace inside the subjects: the prefix and the replacement
    # fields are joined by single dots only.
    _from_server_subject = f"chat.client.messages.{user_id}.{workflow_uuid}"
    _to_server_subject = f"chat.server.messages.{user_id}.{workflow_uuid}"

    async def run_lifespan() -> None:
        # Record the first run; the subscriber is started either way.
        if not self.is_broker_running:
            self.is_broker_running = True
        await self._run_websocket_subscriber(
            ui,
            name,
            user_id,
            _from_server_subject,
            _to_server_subject,
            kwargs,
        )

    # Reuse the current event loop when one is usable. If there is none, or
    # the current one has already been closed, install a fresh loop, since
    # run_until_complete() on a closed loop would raise.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    loop.run_until_complete(run_lifespan())

    return "FastAPIWorkflows.run() completed"