Skip to content

run_workflow

fastagency.base.run_workflow

run_workflow(
    wf: Workflows,
    ui: UI,
    name: Optional[str],
    initial_message: Optional[str] = None,
) -> None

Run a workflow.

PARAMETER DESCRIPTION
wf

The workflows object to use.

TYPE: Workflows

ui

The UI object to use.

TYPE: UI

name

The name of the workflow to run. If None, the first workflow listed in wf.names is run.

TYPE: Optional[str]

initial_message

The initial message to send to the workflow. If None, the user is prompted for one through the UI ("Hi!" is used only if the UI returns no message). Defaults to None.

TYPE: Optional[str] DEFAULT: None

Source code in fastagency/base.py
def run_workflow(
    wf: Workflows,
    ui: UI,
    name: Optional[str],
    initial_message: Optional[str] = None,
) -> None:
    """Run a workflow, then keep re-running it with newly requested messages.

    Each round announces the workflow to the user, runs it in a
    subconversation created from ``ui`` and reports the result with a
    ``WorkflowCompleted`` message. The loop has no exit condition of its own,
    so this function only stops when one of the calls it makes raises.

    Args:
        wf (Workflows): The workflows object to use.
        ui (UI): The UI object to use.
        name (Optional[str]): The name of the workflow to run. If None, the first name in ``wf.names`` is used.
        initial_message (Optional[str], optional): The initial message for the first round. If None, the user is asked for one through ``ui``. Every later round asks the user again. Defaults to None.
    """
    # The workflow name cannot change between rounds, so resolve it once.
    if name is None:
        name = wf.names[0]

    while True:
        description = wf.get_description(name)
        heading = f"Starting a new workflow '{name}' with the following description:"

        if initial_message is None:
            # No message supplied: show the description and ask the user for one.
            initial_message = ui.process_message(
                TextInput(
                    sender="FastAgency",
                    recipient="user",
                    prompt=(
                        heading
                        + "\n\n"
                        + f"{description}"
                        + "\n\nPlease enter an initial message"
                    ),
                )
            )
        else:
            # Message supplied by the caller: just tell the user what will run.
            ui.process_message(
                SystemMessage(
                    sender="FastAgency",
                    recipient="user",
                    message={
                        "body": (
                            heading
                            + "\n\n"
                            + textwrap.indent(description, prefix=" " * 2)
                            + "\n\nand using the following initial message:"
                            # Blank line before the message, mirroring the
                            # description section; without it the message is
                            # glued onto the heading line.
                            + "\n\n"
                            + textwrap.indent(initial_message, prefix=" " * 2)
                        )
                    },
                )
            )

        result = wf.run(
            name=name,
            session_id="session_id",
            ui=ui.create_subconversation(),
            # The UI may return None for the prompt above; fall back to a greeting.
            initial_message="Hi!" if initial_message is None else initial_message,
        )

        ui.process_message(
            WorkflowCompleted(
                sender="workflow",
                recipient="user",
                result=result,
            )
        )

        # Force the next round to prompt the user for a new message.
        initial_message = None