Bases: IOMessageVisitor
Initialize the console UI object.
PARAMETER | DESCRIPTION |
super_conversation | The super conversation. Defaults to None. TYPE: Optional[MesopUI] DEFAULT: None |
security_policy | The security policy. Defaults to None. TYPE: Optional[SecurityPolicy] DEFAULT: None |
styles | The styles. Defaults to None. TYPE: Optional[MesopHomePageStyles] DEFAULT: None |
Source code in fastagency/ui/mesop/base.py
| def __init__(
self,
super_conversation: "Optional[MesopUI]" = None,
*,
security_policy: Optional[me.SecurityPolicy] = None,
styles: Optional[MesopHomePageStyles] = None,
) -> None:
"""Initialize the console UI object.
Args:
super_conversation (Optional[MesopUI], optional): The super conversation. Defaults to None.
security_policy (Optional[me.SecurityPolicy], optional): The security policy. Defaults to None.
styles (Optional[MesopHomePageStyles], optional): The styles. Defaults to None.
"""
logger.info(f"Initializing MesopUI: {self}")
try:
self.id: str = uuid4().hex
self.super_conversation: Optional[MesopUI] = super_conversation
self.sub_conversations: list[MesopUI] = []
self._in_queue: Optional[Queue[str]] = None
self._out_queue: Optional[Queue[MesopMessage]] = None
if super_conversation is None:
self._in_queue = Queue()
self._out_queue = Queue()
MesopUI.register(self)
if MesopUI._me is None:
from .main import create_home_page, me
create_home_page(self, security_policy=security_policy, styles=styles)
MesopUI._me = me
except Exception as e:
logger.error(e, exc_info=True)
raise
logger.info(f"Initialized MesopUI: {self}")
|
is_root_conversation property
is_root_conversation: bool
root_conversation property
sub_conversations instance-attribute
super_conversation instance-attribute
create
Source code in fastagency/ui/mesop/base.py
| @contextmanager
def create(self, app: Runnable, import_string: str) -> Iterator[None]:
logger.info(f"Creating MesopUI with import string: {import_string}")
MesopUI._app = app
MesopUI._import_string = import_string
start_script = """import fastagency.ui.mesop.main"""
with TemporaryDirectory() as temp_dir:
main_path = Path(temp_dir) / "main.py"
with main_path.open("w") as f:
f.write(start_script)
MESOP_FLAGS.mark_as_parsed()
MesopUI._main_path = str(main_path)
MesopUI._created_instance = self
yield
|
create_subconversation
create_subconversation() -> MesopUI
Source code in fastagency/ui/mesop/base.py
| def create_subconversation(self) -> "MesopUI":
sub_conversation = MesopUI(self)
self.sub_conversations.append(sub_conversation)
return sub_conversation
|
get_conversation classmethod
Source code in fastagency/ui/mesop/base.py
| @classmethod
def get_conversation(cls, id: str) -> "MesopUI":
return cls._registry[id]
|
get_created_instance classmethod
Source code in fastagency/ui/mesop/base.py
| @classmethod
def get_created_instance(cls) -> "MesopUI":
created_instance = cls._created_instance
if created_instance is None:
raise RuntimeError("MesopUI has not been created yet.")
return created_instance
|
get_message_stream
Source code in fastagency/ui/mesop/base.py
| def get_message_stream(self) -> Generator[MesopMessage, None, None]:
while True:
message = self.out_queue.get()
if self._is_stream_braker(message.io_message):
yield message
break
yield message
|
handle_wsgi
Source code in fastagency/ui/mesop/base.py
| def handle_wsgi(
self,
app: "Runnable",
environ: dict[str, Any],
start_response: Callable[..., Any],
) -> list[bytes]:
logger.info(f"Starting MesopUI using WSGI interface with app: {app}")
MesopUI._created_instance = self
MesopUI._app = app
if MesopUI._me is None:
logger.error("MesopUI._me is None")
raise RuntimeError("MesopUI._me is None")
return MesopUI._me(environ, start_response) # type: ignore[no-any-return]
|
process_message
Source code in fastagency/ui/mesop/base.py
| def process_message(self, message: IOMessage) -> Optional[str]:
return self.visit(message)
|
register classmethod
register(conversation: MesopUI) -> None
Source code in fastagency/ui/mesop/base.py
| @classmethod
def register(cls, conversation: "MesopUI") -> None:
cls._registry[conversation.id] = conversation
|
respond
respond(message: str) -> None
Source code in fastagency/ui/mesop/base.py
| def respond(self, message: str) -> None:
self.in_queue.put(message)
|
respond_to classmethod
Source code in fastagency/ui/mesop/base.py
| @classmethod
def respond_to(
cls, conversation_id: str, message: str
) -> Generator[MesopMessage, None, None]:
conversation = cls.get_conversation(conversation_id)
conversation.respond(message)
return conversation.get_message_stream()
|
start
Source code in fastagency/ui/mesop/base.py
| def start(
self,
*,
app: Runnable,
import_string: str,
name: Optional[str] = None,
initial_message: Optional[str] = None,
single_run: bool = False,
) -> None:
logger.info(
f"Starting MesopUI: import_string={self._import_string}, main_path={self._main_path}"
)
if single_run:
logger.warning("single_run parameter is currently not supported in MesopUI")
MesopUI._app = app
mesop_main(["mesop", self._main_path])
|
unregister classmethod
unregister(conversation: MesopUI) -> None
Source code in fastagency/ui/mesop/base.py
| @classmethod
def unregister(cls, conversation: "MesopUI") -> None:
del cls._registry[conversation.id]
|
visit
Source code in fastagency/base.py
| def visit(self, message: IOMessage) -> Optional[str]:
method_name = f"visit_{message.type}"
method = getattr(self, method_name, self.visit_default)
return method(message)
|
visit_default
Source code in fastagency/ui/mesop/base.py
| def visit_default(self, message: IOMessage) -> None:
mesop_msg = self._mesop_message(message)
self._publish(mesop_msg)
|
visit_error
Source code in fastagency/base.py
| def visit_error(self, message: Error) -> Optional[str]:
return self.visit_default(message)
|
visit_function_call_execution
Source code in fastagency/base.py
| def visit_function_call_execution(
self, message: FunctionCallExecution
) -> Optional[str]:
return self.visit_default(message)
|
visit_multiple_choice
Source code in fastagency/ui/mesop/base.py
| def visit_multiple_choice(self, message: MultipleChoice) -> str:
mesop_msg = self._mesop_message(message)
self._publish(mesop_msg)
return self.in_queue.get()
|
visit_suggested_function_call
Source code in fastagency/base.py
| def visit_suggested_function_call(
self, message: SuggestedFunctionCall
) -> Optional[str]:
return self.visit_default(message)
|
visit_system_message
Source code in fastagency/base.py
| def visit_system_message(self, message: SystemMessage) -> Optional[str]:
return self.visit_default(message)
|
visit_text_input
Source code in fastagency/ui/mesop/base.py
| def visit_text_input(self, message: TextInput) -> str:
mesop_msg = self._mesop_message(message)
self._publish(mesop_msg)
return self.in_queue.get()
|
visit_text_message
Source code in fastagency/ui/mesop/base.py
| def visit_text_message(self, message: TextMessage) -> None:
mesop_msg = self._mesop_message(message)
self._publish(mesop_msg)
|
visit_workflow_completed
Source code in fastagency/base.py
| def visit_workflow_completed(self, message: WorkflowCompleted) -> Optional[str]:
return self.visit_default(message)
|