Bases: Workflows
Initialize the workflows.
Source code in fastagency/runtime/autogen/base.py
| def __init__(self) -> None:
"""Initialize the workflows."""
self._workflows: dict[
str, tuple[Callable[[Workflows, UI, str, str], str], str]
] = {}
|
get_description
get_description(name: str) -> str
Source code in fastagency/runtime/autogen/base.py
| def get_description(self, name: str) -> str:
_, description = self._workflows.get(name, (None, "Description not available!"))
return description
|
register
register(
name: str,
description: str,
*,
fail_on_redefintion: bool = False
) -> Callable[[Workflow], Workflow]
Source code in fastagency/runtime/autogen/base.py
| def register(
self, name: str, description: str, *, fail_on_redefintion: bool = False
) -> Callable[[Workflow], Workflow]:
def decorator(func: Workflow) -> Workflow:
if name in self._workflows:
if fail_on_redefintion:
raise ValueError(f"A workflow with name '{name}' already exists.")
else:
logger.warning(f"Overwriting workflow with name '{name}'")
self._workflows[name] = func, description
return func
return decorator
|
register_api
register_api(
api: OpenAPI,
callers: Union[
ConversableAgent, Iterable[ConversableAgent]
],
executors: Union[
ConversableAgent, Iterable[ConversableAgent]
],
functions: Optional[
Union[
str,
Iterable[
Union[str, Mapping[str, Mapping[str, str]]]
],
]
] = None,
) -> None
Source code in fastagency/runtime/autogen/base.py
| def register_api(
self,
api: "OpenAPI",
callers: Union[ConversableAgent, Iterable[ConversableAgent]],
executors: Union[ConversableAgent, Iterable[ConversableAgent]],
functions: Optional[
Union[str, Iterable[Union[str, Mapping[str, Mapping[str, str]]]]]
] = None,
) -> None:
if not isinstance(callers, Iterable):
callers = [callers]
if not isinstance(executors, Iterable):
executors = [executors]
if isinstance(functions, str):
functions = [functions]
for caller in callers:
api._register_for_llm(caller, functions=functions)
for executor in executors:
api._register_for_execution(executor, functions=functions)
|
run
run(
name: str, session_id: str, ui: UI, initial_message: str
) -> str
Source code in fastagency/runtime/autogen/base.py
| def run(self, name: str, session_id: str, ui: UI, initial_message: str) -> str:
workflow, description = self._workflows[name]
iostream = IOStreamAdapter(ui)
with IOStream.set_default(iostream):
return workflow(self, ui, initial_message, session_id)
|