Bases: MessageProcessorMixin, CreateWorkflowUIMixin
Initialize the console UI object.
PARAMETER | DESCRIPTION |
super_conversation | The super conversation. Defaults to None. TYPE: Optional[MesopUI] DEFAULT: None |
security_policy | The security policy. Defaults to None. TYPE: Optional[SecurityPolicy] DEFAULT: None |
styles | The styles. Defaults to None. TYPE: Optional[MesopHomePageStyles] DEFAULT: None |
keep_alive | Whether keep-alive messages should be inserted. Defaults to False. TYPE: Optional[bool] DEFAULT: False |
auth | The auth settings to use. Defaults to None. TYPE: Optional[AuthProtocol] DEFAULT: None |
Source code in fastagency/ui/mesop/mesop.py
def __init__(
    self,
    super_conversation: "Optional[MesopUI]" = None,
    *,
    security_policy: Optional[me.SecurityPolicy] = None,
    styles: Optional[MesopHomePageStyles] = None,
    keep_alive: Optional[bool] = False,
    auth: Optional[AuthProtocol] = None,
) -> None:
    """Initialize the MesopUI object.

    Args:
        super_conversation (Optional[MesopUI], optional): The super conversation. Defaults to None.
        security_policy (Optional[me.SecurityPolicy], optional): The security policy. Defaults to None.
        styles (Optional[MesopHomePageStyles], optional): The styles. Defaults to None.
        keep_alive (Optional[bool]): If keep alive messages should be inserted. Defaults to False.
        auth (Optional[AuthProtocol]): The auth settings to use. Defaults to None.

    Raises:
        Exception: Anything raised during setup is logged and re-raised unchanged.
    """
    logger.info(f"Initializing MesopUI: {self}")
    try:
        # Each conversation gets a fresh random hex id; it is the registry key.
        self.id: str = uuid4().hex
        self.super_conversation: Optional[MesopUI] = super_conversation
        self.sub_conversations: list[MesopUI] = []
        # Queues stay None unless this is a root conversation (see below).
        self._in_queue: Optional[Queue[str]] = None
        self._out_queue: Optional[Queue[MesopMessage]] = None
        self._keep_me_alive = keep_alive
        self._keep_alive_thread: Optional[threading.Thread] = None
        if super_conversation is None:
            # Only a conversation without a super conversation owns queues.
            self._in_queue = Queue()
            self._out_queue = Queue()
            # NOTE(review): the nesting of this call under the `if` is
            # reconstructed from flattened source — confirm against the repo.
            self.keep_me_alive()
        MesopUI.register(self)
        if MesopUI._me is None:
            # The home page is created once; later instances reuse MesopUI._me.
            # NOTE(review): local import presumably avoids a circular import — confirm.
            from .main import create_home_page, me
            create_home_page(
                self, security_policy=security_policy, styles=styles, auth=auth
            )
            MesopUI._me = me
    except Exception as e:
        # Log with traceback, then propagate to the caller.
        logger.error(e, exc_info=True)
        raise
    logger.info(f"Initialized MesopUI: {self}")
|
is_root_conversation property
is_root_conversation: bool
root_conversation property
sub_conversations instance-attribute
super_conversation instance-attribute
create
Source code in fastagency/ui/mesop/mesop.py
@contextmanager
def create(self, app: Runnable, import_string: str) -> Iterator[None]:
    """Prepare class-level MesopUI state for the duration of the context.

    Stores ``app`` and ``import_string`` on ``MesopUI``, writes a one-line
    launcher script to ``main.py`` inside a temporary directory, records the
    script path and this instance on the class, and then yields. The
    temporary directory is cleaned up when the context exits.

    Args:
        app: Stored as ``MesopUI._app``.
        import_string: Stored as ``MesopUI._import_string``.

    Yields:
        None.
    """
    logger.info(f"Creating MesopUI with import string: {import_string}")
    MesopUI._app = app
    MesopUI._import_string = import_string

    launcher_source = """import fastagency.ui.mesop.main"""

    with TemporaryDirectory() as workdir:
        launcher_path = Path(workdir) / "main.py"
        launcher_path.write_text(launcher_source)

        MESOP_FLAGS.mark_as_parsed()
        MesopUI._main_path = str(launcher_path)
        MesopUI._created_instance = self

        # Yield inside the ``with`` so the script exists while the caller runs.
        yield
|
create_subconversation
create_subconversation() -> MesopUI
Source code in fastagency/ui/mesop/mesop.py
def create_subconversation(self) -> "MesopUI":
    """Create a MesopUI whose super conversation is this one and track it.

    Returns:
        The new sub-conversation, already appended to
        ``self.sub_conversations``.
    """
    child = MesopUI(self)
    self.sub_conversations.append(child)
    return child
|
create_workflow_ui
create_workflow_ui(workflow_uuid: str) -> UI
Source code in fastagency/base.py
def create_workflow_ui(self: UIBase, workflow_uuid: str) -> "UI":
    """Return a ``UI`` built from this UI base and the given workflow UUID.

    Args:
        workflow_uuid: Passed to ``UI`` as ``workflow_uuid``.

    Returns:
        The constructed ``UI`` instance.
    """
    workflow_ui = UI(uibase=self, workflow_uuid=workflow_uuid)
    return workflow_ui
|
do_not_keep_me_alive
do_not_keep_me_alive() -> None
Source code in fastagency/ui/mesop/mesop.py
def do_not_keep_me_alive(self) -> None:
    """Clear the keep-alive flag on this instance.

    The keep-alive worker loops while ``self._keep_me_alive`` is truthy, so
    clearing the flag lets that loop finish.
    """
    self._keep_me_alive = False
|
error
Source code in fastagency/messages.py
def error(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    short: Optional[str] = None,
    long: Optional[str] = None,
) -> Optional[str]:
    """Create an ``Error`` message and hand it to ``process_message``.

    Args:
        workflow_uuid: Passed through to the message.
        sender: Passed through to the message. Defaults to None.
        recipient: Passed through to the message. Defaults to None.
        auto_reply: Passed through to the message. Defaults to False.
        uuid: Message id; when falsy a new ``uuid4().hex`` is used.
        short: Passed through to the message. Defaults to None.
        long: Passed through to the message. Defaults to None.

    Returns:
        The value returned by ``self.process_message``.
    """
    if not uuid:
        uuid = uuid4().hex
    error_message = Error(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        short=short,
        long=long,
    )
    return self.process_message(error_message)
|
function_call_execution
Source code in fastagency/messages.py
def function_call_execution(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    retval: Any = None,
) -> Optional[str]:
    """Create a ``FunctionCallExecution`` message and hand it to ``process_message``.

    Args:
        workflow_uuid: Passed through to the message.
        sender: Passed through to the message. Defaults to None.
        recipient: Passed through to the message. Defaults to None.
        auto_reply: Passed through to the message. Defaults to False.
        uuid: Message id; when falsy a new ``uuid4().hex`` is used.
        function_name: Passed through to the message. Defaults to None.
        call_id: Passed through to the message. Defaults to None.
        retval: Passed through to the message. Defaults to None.

    Returns:
        The value returned by ``self.process_message``.
    """
    if not uuid:
        uuid = uuid4().hex
    execution_message = FunctionCallExecution(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        function_name=function_name,
        call_id=call_id,
        retval=retval,
    )
    return self.process_message(execution_message)
|
get_conversation classmethod
Source code in fastagency/ui/mesop/mesop.py
@classmethod
def get_conversation(cls, id: str) -> "MesopUI":
    """Return the conversation stored in ``cls._registry`` under ``id``.

    Args:
        id: Key of the conversation to look up.

    Returns:
        The registered conversation.

    Note:
        A missing id surfaces whatever the registry's item lookup raises
        (presumably ``KeyError`` if the registry is a dict — confirm).
    """
    conversation = cls._registry[id]
    return conversation
|
get_created_instance classmethod
Source code in fastagency/ui/mesop/mesop.py
@classmethod
def get_created_instance(cls) -> "MesopUI":
    """Return the instance recorded in ``cls._created_instance``.

    Returns:
        The recorded instance.

    Raises:
        RuntimeError: If ``cls._created_instance`` is None.
    """
    instance = cls._created_instance
    if instance is not None:
        return instance
    raise RuntimeError("MesopUI has not been created yet.")
|
get_message_stream
Source code in fastagency/ui/mesop/mesop.py
def get_message_stream(self) -> Generator[MesopMessage, None, None]:
    """Yield messages from ``self.out_queue`` up to and including a stream breaker.

    Each message is fetched with ``self.out_queue.get()``. The message for
    which ``self._is_stream_braker(message.io_message)`` is truthy is still
    yielded, and the generator finishes right after it.

    Yields:
        Messages taken from the outgoing queue, in order.
    """
    reached_breaker = False
    while not reached_breaker:
        outgoing = self.out_queue.get()
        # Decide before yielding, as the original did; stop after the yield.
        reached_breaker = self._is_stream_braker(outgoing.io_message)
        yield outgoing
|
handle_wsgi
Source code in fastagency/ui/mesop/mesop.py
def handle_wsgi(
    self,
    app: "Runnable",
    environ: dict[str, Any],
    start_response: Callable[..., Any],
) -> list[bytes]:
    """Serve one WSGI request through the callable stored in ``MesopUI._me``.

    Records this instance and ``app`` on the class before delegating.

    Args:
        app: Stored as ``MesopUI._app``.
        environ: WSGI environment, forwarded to ``MesopUI._me``.
        start_response: WSGI start_response callable, forwarded to ``MesopUI._me``.

    Returns:
        Whatever ``MesopUI._me(environ, start_response)`` returns.

    Raises:
        RuntimeError: If ``MesopUI._me`` is None.
    """
    logger.debug(f"Starting MesopUI using WSGI interface with app: {app}")
    MesopUI._created_instance = self
    MesopUI._app = app

    # Only logged, not raised: the request is still attempted below.
    if configure_static_file_serving is None:  # pragma: no cover
        logger.error("configure_static_file_serving is None")

    if MesopUI._me is None:
        logger.error("MesopUI._me is None")
        raise RuntimeError("MesopUI._me is None")

    return MesopUI._me(environ, start_response)  # type: ignore[no-any-return]
|
keep_alive
Source code in fastagency/messages.py
def keep_alive(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
) -> Optional[str]:
    """Create a ``KeepAlive`` message and hand it to ``process_message``.

    Args:
        workflow_uuid: Passed through to the message.
        sender: Passed through to the message. Defaults to None.
        recipient: Passed through to the message. Defaults to None.
        auto_reply: Passed through to the message. Defaults to False.
        uuid: Message id; when falsy a new ``uuid4().hex`` is used.

    Returns:
        The value returned by ``self.process_message``.
    """
    if not uuid:
        uuid = uuid4().hex
    keep_alive_message = KeepAlive(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
    )
    return self.process_message(keep_alive_message)
|
keep_me_alive
Source code in fastagency/ui/mesop/mesop.py
def keep_me_alive(self) -> None:
    """Start the keep-alive worker thread if enabled and not yet started.

    Does nothing when ``self._keep_me_alive`` is falsy or when
    ``self._keep_alive_thread`` is already set. Otherwise a thread is created,
    stored on ``self._keep_alive_thread`` and started.
    """

    def keep_alive_worker() -> None:
        # Runs until the flag is cleared; the flag is re-read after each sleep.
        while self._keep_me_alive:
            time.sleep(3)
            if not self._out_queue:
                continue
            # todo: do something more elegant
            heartbeat = KeepAlive(workflow_uuid="")
            wrapped = self._mesop_message(heartbeat)
            logger.debug(f"putting keepalive {heartbeat.uuid}")
            self._out_queue.put(wrapped)

    if not self._keep_me_alive or self._keep_alive_thread is not None:
        return
    self._keep_alive_thread = threading.Thread(target=keep_alive_worker)
    self._keep_alive_thread.start()
|
multiple_choice
Source code in fastagency/messages.py
def multiple_choice(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    choices: Optional[list[str]] = None,
    default: Optional[str] = None,
    single: bool = True,
) -> Optional[str]:
    """Create a ``MultipleChoice`` message and hand it to ``process_message``.

    Args:
        workflow_uuid: Passed through to the message.
        sender: Passed through to the message. Defaults to None.
        recipient: Passed through to the message. Defaults to None.
        auto_reply: Passed through to the message. Defaults to False.
        uuid: Message id; when falsy a new ``uuid4().hex`` is used.
        prompt: Passed through to the message. Defaults to None.
        choices: Passed through to the message; a falsy value becomes ``[]``.
        default: Passed through to the message. Defaults to None.
        single: Passed through to the message. Defaults to True.

    Returns:
        The value returned by ``self.process_message``.
    """
    if not uuid:
        uuid = uuid4().hex
    if not choices:
        choices = []
    choice_message = MultipleChoice(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        prompt=prompt,
        choices=choices,
        default=default,
        single=single,
    )
    return self.process_message(choice_message)
|
process_message
Source code in fastagency/ui/mesop/mesop.py
def process_message(self, message: IOMessage) -> Optional[str]:
    """Dispatch ``message`` through ``self.visit`` and return its result.

    Args:
        message: The message to process.

    Returns:
        The value returned by ``self.visit(message)``.
    """
    outcome = self.visit(message)
    return outcome
|
register classmethod
register(conversation: MesopUI) -> None
Source code in fastagency/ui/mesop/mesop.py
@classmethod
def register(cls, conversation: "MesopUI") -> None:
    """Store ``conversation`` in ``cls._registry`` under its ``id``.

    An existing entry with the same id is overwritten.

    Args:
        conversation: The conversation to register.
    """
    registry_key = conversation.id
    cls._registry[registry_key] = conversation
|
respond
respond(message: str) -> None
Source code in fastagency/ui/mesop/mesop.py
def respond(self, message: str) -> None:
    """Put ``message`` on this conversation's incoming queue.

    Args:
        message: The response text to enqueue on ``self.in_queue``.
    """
    incoming = self.in_queue
    incoming.put(message)
|
respond_to classmethod
Source code in fastagency/ui/mesop/mesop.py
@classmethod
def respond_to(
    cls, conversation_id: str, message: str
) -> Generator[MesopMessage, None, None]:
    """Send ``message`` to a registered conversation and return its message stream.

    Args:
        conversation_id: Id passed to ``cls.get_conversation``.
        message: Text passed to the conversation's ``respond``.

    Returns:
        The result of the conversation's ``get_message_stream()``.
    """
    target = cls.get_conversation(conversation_id)
    target.respond(message)
    stream = target.get_message_stream()
    return stream
|
start
Source code in fastagency/ui/mesop/mesop.py
def start(
    self,
    *,
    app: Runnable,
    import_string: str,
    name: Optional[str] = None,
    params: dict[str, Any],
    single_run: bool = False,
) -> None:
    """Launch Mesop on the launcher script recorded in ``self._main_path``.

    Args:
        app: Stored as ``MesopUI._app``.
        import_string: Not read here; the log line uses ``self._import_string``.
        name: Not read here.
        params: Not read here.
        single_run: When truthy, only a warning is logged; it is not supported.
    """
    logger.info(
        f"Starting MesopUI: import_string={self._import_string}, main_path={self._main_path}"
    )
    if single_run:
        logger.warning("single_run parameter is currently not supported in MesopUI")

    MesopUI._app = app
    launch_argv = ["mesop", self._main_path]
    mesop_main(launch_argv)
|
suggested_function_call
Source code in fastagency/messages.py
def suggested_function_call(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    arguments: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    """Create a ``SuggestedFunctionCall`` message and hand it to ``process_message``.

    Args:
        workflow_uuid: Passed through to the message.
        sender: Passed through to the message. Defaults to None.
        recipient: Passed through to the message. Defaults to None.
        auto_reply: Passed through to the message. Defaults to False.
        uuid: Message id; when falsy a new ``uuid4().hex`` is used.
        function_name: Passed through to the message. Defaults to None.
        call_id: Passed through to the message. Defaults to None.
        arguments: Passed through to the message; a falsy value becomes ``{}``.

    Returns:
        The value returned by ``self.process_message``.
    """
    if not uuid:
        uuid = uuid4().hex
    if not arguments:
        arguments = {}
    suggestion_message = SuggestedFunctionCall(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        function_name=function_name,
        call_id=call_id,
        arguments=arguments,
    )
    return self.process_message(suggestion_message)
|
system_message
Source code in fastagency/messages.py
def system_message(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    message: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    """Create a ``SystemMessage`` and hand it to ``process_message``.

    Args:
        workflow_uuid: Passed through to the message.
        sender: Passed through to the message. Defaults to None.
        recipient: Passed through to the message. Defaults to None.
        auto_reply: Passed through to the message. Defaults to False.
        uuid: Message id; when falsy a new ``uuid4().hex`` is used.
        message: Payload passed through; a falsy value becomes ``{}``.

    Returns:
        The value returned by ``self.process_message``.
    """
    if not uuid:
        uuid = uuid4().hex
    if not message:
        message = {}
    system_msg = SystemMessage(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        message=message,
    )
    return self.process_message(system_msg)
|
text_input
Source code in fastagency/messages.py
def text_input(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    suggestions: Optional[list[str]] = None,
    password: bool = False,
) -> Optional[str]:
    """Create a ``TextInput`` message and hand it to ``process_message``.

    Args:
        workflow_uuid: Passed through to the message.
        sender: Passed through to the message. Defaults to None.
        recipient: Passed through to the message. Defaults to None.
        auto_reply: Passed through to the message. Defaults to False.
        uuid: Message id; when falsy a new ``uuid4().hex`` is used.
        prompt: Passed through to the message. Defaults to None.
        suggestions: Passed through to the message; a falsy value becomes ``[]``.
        password: Passed through to the message. Defaults to False.

    Returns:
        The value returned by ``self.process_message``.
    """
    if not uuid:
        uuid = uuid4().hex
    if not suggestions:
        suggestions = []
    input_message = TextInput(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        prompt=prompt,
        suggestions=suggestions,
        password=password,
    )
    return self.process_message(input_message)
|
text_message
Source code in fastagency/messages.py
def text_message(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    body: Optional[str] = None,
) -> Optional[str]:
    """Create a ``TextMessage`` and hand it to ``process_message``.

    Args:
        workflow_uuid: Passed through to the message.
        sender: Passed through to the message. Defaults to None.
        recipient: Passed through to the message. Defaults to None.
        auto_reply: Passed through to the message. Defaults to False.
        uuid: Message id; when falsy a new ``uuid4().hex`` is used.
        body: Passed through to the message. Defaults to None.

    Returns:
        The value returned by ``self.process_message``.
    """
    if not uuid:
        uuid = uuid4().hex
    text_msg = TextMessage(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        body=body,
    )
    return self.process_message(text_msg)
|
unregister classmethod
unregister(conversation: MesopUI) -> None
Source code in fastagency/ui/mesop/mesop.py
@classmethod
def unregister(cls, conversation: "MesopUI") -> None:
    """Delete the entry for ``conversation.id`` from ``cls._registry``.

    Args:
        conversation: The conversation whose registry entry is removed.

    Note:
        An unregistered id surfaces whatever the registry's item deletion
        raises (presumably ``KeyError`` if the registry is a dict — confirm).
    """
    registry_key = conversation.id
    del cls._registry[registry_key]
|
visit
Source code in fastagency/messages.py
def visit(self, message: IOMessage) -> Optional[str]:
    """Dispatch ``message`` to the ``visit_<type>`` handler on this object.

    The handler name is built from ``message.type``. When no attribute with
    that name exists, ``self.visit_default`` is used instead.

    Args:
        message: The message to dispatch.

    Returns:
        The value returned by the selected handler.
    """
    handler = getattr(self, f"visit_{message.type}", self.visit_default)
    return handler(message)
|
visit_default
Source code in fastagency/ui/mesop/mesop.py
def visit_default(self, message: IOMessage) -> None:
    """Wrap ``message`` with ``self._mesop_message`` and publish it.

    Args:
        message: The message to wrap and pass to ``self._publish``.
    """
    self._publish(self._mesop_message(message))
|
visit_error
Source code in fastagency/messages.py
def visit_error(self, message: Error) -> Optional[str]:
    """Handle an ``Error`` message by delegating to ``self.visit_default``.

    Returns:
        The value returned by ``self.visit_default(message)``.
    """
    outcome = self.visit_default(message)
    return outcome
|
visit_function_call_execution
Source code in fastagency/messages.py
def visit_function_call_execution(
    self, message: FunctionCallExecution
) -> Optional[str]:
    """Handle a ``FunctionCallExecution`` message via ``self.visit_default``.

    Returns:
        The value returned by ``self.visit_default(message)``.
    """
    outcome = self.visit_default(message)
    return outcome
|
visit_keep_alive
Source code in fastagency/messages.py
def visit_keep_alive(self, message: KeepAlive) -> Optional[str]:
    """Handle a ``KeepAlive`` message by delegating to ``self.visit_default``.

    Returns:
        The value returned by ``self.visit_default(message)``.
    """
    outcome = self.visit_default(message)
    return outcome
|
visit_multiple_choice
Source code in fastagency/ui/mesop/mesop.py
def visit_multiple_choice(self, message: MultipleChoice) -> str:
    """Publish a ``MultipleChoice`` message and return the next incoming reply.

    Args:
        message: The message to wrap and publish.

    Returns:
        The next item taken from ``self.in_queue``.
    """
    self._publish(self._mesop_message(message))
    reply = self.in_queue.get()
    return reply
|
visit_suggested_function_call
Source code in fastagency/messages.py
def visit_suggested_function_call(
    self, message: SuggestedFunctionCall
) -> Optional[str]:
    """Handle a ``SuggestedFunctionCall`` message via ``self.visit_default``.

    Returns:
        The value returned by ``self.visit_default(message)``.
    """
    outcome = self.visit_default(message)
    return outcome
|
visit_system_message
Source code in fastagency/messages.py
def visit_system_message(self, message: SystemMessage) -> Optional[str]:
    """Handle a ``SystemMessage`` by delegating to ``self.visit_default``.

    Returns:
        The value returned by ``self.visit_default(message)``.
    """
    outcome = self.visit_default(message)
    return outcome
|
visit_text_input
Source code in fastagency/ui/mesop/mesop.py
def visit_text_input(self, message: TextInput) -> str:
    """Publish a ``TextInput`` message and return the next incoming reply.

    Args:
        message: The message to wrap and publish.

    Returns:
        The next item taken from ``self.in_queue``.
    """
    self._publish(self._mesop_message(message))
    reply = self.in_queue.get()
    return reply
|
visit_text_message
Source code in fastagency/ui/mesop/mesop.py
def visit_text_message(self, message: TextMessage) -> None:
    """Wrap a ``TextMessage`` with ``self._mesop_message`` and publish it.

    Args:
        message: The message to wrap and pass to ``self._publish``.
    """
    self._publish(self._mesop_message(message))
|
visit_workflow_completed
Source code in fastagency/messages.py
def visit_workflow_completed(self, message: WorkflowCompleted) -> Optional[str]:
    """Handle a ``WorkflowCompleted`` message via ``self.visit_default``.

    Returns:
        The value returned by ``self.visit_default(message)``.
    """
    outcome = self.visit_default(message)
    return outcome
|
visit_workflow_started
Source code in fastagency/messages.py
def visit_workflow_started(self, message: WorkflowStarted) -> Optional[str]:
    """Handle a ``WorkflowStarted`` message via ``self.visit_default``.

    Returns:
        The value returned by ``self.visit_default(message)``.
    """
    outcome = self.visit_default(message)
    return outcome
|
workflow_completed
Source code in fastagency/messages.py
def workflow_completed(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    result: Optional[str] = None,
) -> Optional[str]:
    """Create a ``WorkflowCompleted`` message and hand it to ``process_message``.

    Args:
        workflow_uuid: Passed through to the message.
        sender: Passed through to the message. Defaults to None.
        recipient: Passed through to the message. Defaults to None.
        auto_reply: Passed through to the message. Defaults to False.
        uuid: Message id; when falsy a new ``uuid4().hex`` is used.
        result: Passed through to the message. Defaults to None.

    Returns:
        The value returned by ``self.process_message``.
    """
    if not uuid:
        uuid = uuid4().hex
    completed_message = WorkflowCompleted(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        result=result,
    )
    return self.process_message(completed_message)
|
workflow_started
Source code in fastagency/messages.py
def workflow_started(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    name: Optional[str] = None,
    description: Optional[str] = None,
    params: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    """Create a ``WorkflowStarted`` message and hand it to ``process_message``.

    Args:
        workflow_uuid: Passed through to the message.
        sender: Passed through to the message. Defaults to None.
        recipient: Passed through to the message. Defaults to None.
        auto_reply: Passed through to the message. Defaults to False.
        uuid: Message id; when falsy a new ``uuid4().hex`` is used.
        name: Passed through to the message. Defaults to None.
        description: Passed through to the message. Defaults to None.
        params: Passed through to the message; a falsy value becomes ``{}``.

    Returns:
        The value returned by ``self.process_message``.
    """
    if not uuid:
        uuid = uuid4().hex
    if not params:
        params = {}
    started_message = WorkflowStarted(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        name=name,
        description=description,
        params=params,
    )
    return self.process_message(started_message)
|