Skip to content

AutoGenWorkflows

fastagency.runtimes.autogen.AutoGenWorkflows #

AutoGenWorkflows()

Bases: Workflows

Initialize an empty workflow registry.

Source code in fastagency/runtimes/autogen/base.py
def __init__(self) -> None:
    """Create an empty workflow registry.

    The registry maps a workflow name to a ``(callable, description)``
    pair; entries are added later through ``register``.
    """
    # Keyed by workflow name; each value is (workflow callable, description).
    self._workflows: dict[str, tuple[Callable[[UI, str, str], str], str]] = dict()

names property #

names: list[str]

get_description #

get_description(name: str) -> str
Source code in fastagency/runtimes/autogen/base.py
def get_description(self, name: str) -> str:
    """Return the description stored for the workflow called ``name``.

    Args:
        name: Name the workflow was registered under.

    Returns:
        The registered description, or the placeholder text
        ``"Description not available!"`` when no workflow with that
        name is registered.
    """
    # Unknown names are not an error here: fall back to a placeholder.
    if name not in self._workflows:
        return "Description not available!"

    # Each registry entry is a (workflow, description) pair.
    _workflow, text = self._workflows[name]
    return text

register #

register(
    name: str,
    description: str,
    *,
    fail_on_redefintion: bool = False
) -> Callable[[Workflow], Workflow]
Source code in fastagency/runtimes/autogen/base.py
def register(
    self, name: str, description: str, *, fail_on_redefintion: bool = False
) -> Callable[[Workflow], Workflow]:
    """Build a decorator that registers a workflow under ``name``.

    Args:
        name: Key under which the decorated workflow is stored.
        description: Text stored alongside the workflow.
        fail_on_redefintion: When true, applying the decorator to a name
            that is already registered raises instead of overwriting.
            (The spelling is part of the public keyword and is kept.)

    Returns:
        A decorator that stores the workflow and returns it unchanged.

    Raises:
        ValueError: Raised by the returned decorator (not by this call)
            when ``name`` is already registered and
            ``fail_on_redefintion`` is true.
    """

    def decorator(workflow_fn: Workflow) -> Workflow:
        if name in self._workflows:
            if fail_on_redefintion:
                raise ValueError(f"A workflow with name '{name}' already exists.")
            # Redefinition is allowed: log it and fall through to overwrite.
            logger.warning(f"Overwriting workflow with name '{name}'")

        # Store the (workflow, description) pair under the given name.
        self._workflows[name] = (workflow_fn, description)
        return workflow_fn

    return decorator

run #

run(
    name: str, session_id: str, ui: UI, initial_message: str
) -> str
Source code in fastagency/runtimes/autogen/base.py
def run(self, name: str, session_id: str, ui: UI, initial_message: str) -> str:
    """Look up the workflow registered as ``name`` and execute it.

    Args:
        name: Name the workflow was registered under.
        session_id: Passed to the workflow as its third argument.
        ui: Wrapped in an ``IOStreamAdapter`` and also passed to the
            workflow as its first argument.
        initial_message: Passed to the workflow as its second argument.

    Returns:
        Whatever the workflow callable returns.

    Raises:
        KeyError: If no workflow is registered under ``name``.
    """
    # Only the callable is needed here; the description is ignored.
    workflow_fn, _ = self._workflows[name]

    adapter = IOStreamAdapter(ui)

    # The adapter is set as the default IOStream only while the workflow runs.
    # Note the workflow's argument order differs from this method's.
    with IOStream.set_default(adapter):
        return workflow_fn(ui, initial_message, session_id)