Bases: Workflows
Initialize the workflows.
Source code in fastagency/runtimes/autogen/base.py
| def __init__(self) -> None:
"""Initialize the workflows."""
self._workflows: dict[str, tuple[Callable[[UI, str, str], str], str]] = {}
|
get_description
get_description(name: str) -> str
Source code in fastagency/runtimes/autogen/base.py
| def get_description(self, name: str) -> str:
_, description = self._workflows.get(name, (None, "Description not available!"))
return description
|
register
register(
name: str,
description: str,
*,
fail_on_redefintion: bool = False
) -> Callable[[Workflow], Workflow]
Source code in fastagency/runtimes/autogen/base.py
| def register(
self, name: str, description: str, *, fail_on_redefintion: bool = False
) -> Callable[[Workflow], Workflow]:
def decorator(func: Workflow) -> Workflow:
if name in self._workflows:
if fail_on_redefintion:
raise ValueError(f"A workflow with name '{name}' already exists.")
else:
logger.warning(f"Overwriting workflow with name '{name}'")
self._workflows[name] = func, description
return func
return decorator
|
run
run(
name: str, session_id: str, ui: UI, initial_message: str
) -> str
Source code in fastagency/runtimes/autogen/base.py
| def run(self, name: str, session_id: str, ui: UI, initial_message: str) -> str:
workflow, description = self._workflows[name]
iostream = IOStreamAdapter(ui)
with IOStream.set_default(iostream):
return workflow(ui, initial_message, session_id)
|