def run_workflow(
    *,
    wf: Workflows,
    ui: UI,
    name: Optional[str],
    initial_message: Optional[str] = None,
    single_run: bool = False,
) -> None:
    """Run a workflow, repeatedly unless ``single_run`` is set.

    Each iteration announces the workflow through the UI, runs it in a fresh
    sub-conversation and reports the result with a ``WorkflowCompleted``
    message.

    Args:
        wf (Workflows): The workflows object to use.
        ui (UI): The UI object to use.
        name (Optional[str]): The name of the workflow to run. If None, the
            first entry of ``wf.names`` is used.
        initial_message (Optional[str], optional): The initial message to send
            to the workflow on the first run. If None, the user is prompted
            for one through the UI; should the UI return None as well, "Hi!"
            is sent instead. Runs after the first one always prompt the user.
            Defaults to None.
        single_run (bool, optional): If True, the workflow will only be run
            once. Defaults to False.
    """
    # The default name only needs resolving once: after this point `name`
    # is never None again, so the check is loop-invariant.
    if name is None:
        name = wf.names[0]

    indent = " " * 2

    while True:
        description = wf.get_description(name)
        header = f"Starting a new workflow '{name}' with the following description:"

        if initial_message is None:
            # No message supplied: ask the user for one.
            initial_message = ui.process_message(
                TextInput(
                    sender="FastAgency",
                    recipient="user",
                    prompt=(
                        f"{header}\n\n{description}"
                        "\n\nPlease enter an initial message"
                    ),
                )
            )
        else:
            # A message was supplied: just tell the user what is about to run.
            ui.process_message(
                SystemMessage(
                    sender="FastAgency",
                    recipient="user",
                    message={
                        "body": (
                            header
                            + "\n\n"
                            + textwrap.indent(description, prefix=indent)
                            + "\n\nand using the following initial message:"
                            # Blank line so the indented message starts on its
                            # own line, mirroring the description section.
                            + "\n\n"
                            + textwrap.indent(initial_message, prefix=indent)
                        )
                    },
                )
            )

        result = wf.run(
            name=name,
            session_id="session_id",
            ui=ui.create_subconversation(),
            # The UI prompt above may itself yield None; fall back to a greeting.
            initial_message="Hi!" if initial_message is None else initial_message,
        )

        ui.process_message(
            WorkflowCompleted(
                sender="workflow",
                recipient="user",
                result=result,
            )
        )

        # Forget the message so that the next iteration prompts the user again.
        initial_message = None

        if single_run:
            break