Bases: MessageProcessorMixin, CreateWorkflowUIMixin
Initialize the console UI object.
Source code in fastagency/ui/mesop/mesop.py
def __init__(
    self,
    super_conversation: "Optional[MesopUI]" = None,
    *,
    security_policy: Optional[me.SecurityPolicy] = None,
    styles: Optional[MesopHomePageStyles] = None,
    keep_alive: Optional[bool] = False,
    auth: Optional[AuthProtocol] = None,
) -> None:
    """Initialize the Mesop UI object.

    A conversation created without a ``super_conversation`` gets its own
    input and output queues and calls ``keep_me_alive()``. Every instance is
    registered via ``MesopUI.register``. The home page is created only when
    ``MesopUI._me`` is still unset.

    Args:
        super_conversation (Optional[MesopUI], optional): The super conversation. Defaults to None.
        security_policy (Optional[me.SecurityPolicy], optional): The security policy. Defaults to None.
        styles (Optional[MesopHomePageStyles], optional): The styles. Defaults to None.
        keep_alive (Optional[bool]): Whether keep-alive messages should be inserted. Defaults to False.
        auth (Optional[AuthProtocol]): The auth settings to use. Defaults to None.

    Raises:
        Exception: Any error raised during setup is logged and re-raised.
    """
    logger.info(f"Initializing MesopUI: {self}")
    try:
        has_no_parent = super_conversation is None

        self.id: str = uuid4().hex
        self.super_conversation: Optional[MesopUI] = super_conversation
        self.sub_conversations: list[MesopUI] = []
        # Only a conversation without a parent owns queues.
        self._in_queue: Optional[Queue[str]] = Queue() if has_no_parent else None
        self._out_queue: Optional[Queue[MesopMessage]] = (
            Queue() if has_no_parent else None
        )
        self._keep_me_alive = keep_alive
        self._keep_alive_thread: Optional[threading.Thread] = None

        if has_no_parent:
            self.keep_me_alive()

        MesopUI.register(self)

        if MesopUI._me is None:
            # Imported here rather than at module level, as in the original.
            from .main import create_home_page, me

            create_home_page(
                self,
                security_policy=security_policy,
                styles=styles,
                auth=auth,
            )
            MesopUI._me = me
    except Exception as e:
        logger.error(e, exc_info=True)
        raise
    logger.info(f"Initialized MesopUI: {self}")
is_root_conversation property
is_root_conversation : bool
root_conversation property
sub_conversations instance-attribute
super_conversation instance-attribute
create Source code in fastagency/ui/mesop/mesop.py
@contextmanager
def create(self, app: Runnable, import_string: str) -> Iterator[None]:
    """Prepare a temporary Mesop entry script for the duration of the context.

    Stores ``app`` and ``import_string`` on ``MesopUI``, writes a one-line
    ``main.py`` into a temporary directory, marks ``MESOP_FLAGS`` as parsed,
    records the script path and this instance on ``MesopUI``, then yields.
    The temporary directory is removed when the context exits.

    Args:
        app: The application stored in ``MesopUI._app``.
        import_string: The import string stored in ``MesopUI._import_string``.

    Yields:
        None.
    """
    logger.info(f"Creating MesopUI with import string: {import_string}")
    MesopUI._app = app
    MesopUI._import_string = import_string

    with TemporaryDirectory() as tmp:
        script_path = Path(tmp) / "main.py"
        script_path.write_text("""import fastagency.ui.mesop.main""")

        MESOP_FLAGS.mark_as_parsed()
        MesopUI._main_path = str(script_path)
        MesopUI._created_instance = self

        yield
create_subconversation create_subconversation () -> MesopUI
Source code in fastagency/ui/mesop/mesop.py
def create_subconversation(self) -> "MesopUI":
    """Create a ``MesopUI`` whose super conversation is this one.

    The new conversation is appended to ``self.sub_conversations``.

    Returns:
        The newly created sub-conversation.
    """
    child = MesopUI(self)
    self.sub_conversations.append(child)
    return child
create_workflow_ui create_workflow_ui ( workflow_uuid : str ) -> UI
Source code in fastagency/base.py
def create_workflow_ui(self: UIBase, workflow_uuid: str) -> "UI":
    """Return a ``UI`` built from this object and ``workflow_uuid``.

    Args:
        workflow_uuid: Passed to ``UI`` as ``workflow_uuid``.

    Returns:
        ``UI(uibase=self, workflow_uuid=workflow_uuid)``.
    """
    workflow_ui = UI(uibase=self, workflow_uuid=workflow_uuid)
    return workflow_ui
do_not_keep_me_alive do_not_keep_me_alive () -> None
Source code in fastagency/ui/mesop/mesop.py
def do_not_keep_me_alive(self) -> None:
    """Clear the keep-alive flag by setting ``self._keep_me_alive`` to False."""
    self._keep_me_alive = False
error Source code in fastagency/messages.py
def error(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    short: Optional[str] = None,
    long: Optional[str] = None,
) -> Optional[str]:
    """Build an ``Error`` message and hand it to ``process_message``.

    A fresh hex UUID is generated when ``uuid`` is falsy.

    Returns:
        Whatever ``process_message`` returns.
    """
    if not uuid:
        uuid = str(uuid4().hex)
    error_message = Error(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        short=short,
        long=long,
    )
    return self.process_message(error_message)
function_call_execution Source code in fastagency/messages.py
def function_call_execution(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    retval: Any = None,
) -> Optional[str]:
    """Build a ``FunctionCallExecution`` message and hand it to ``process_message``.

    A fresh hex UUID is generated when ``uuid`` is falsy.

    Returns:
        Whatever ``process_message`` returns.
    """
    if not uuid:
        uuid = str(uuid4().hex)
    execution = FunctionCallExecution(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        function_name=function_name,
        call_id=call_id,
        retval=retval,
    )
    return self.process_message(execution)
get_conversation classmethod
Source code in fastagency/ui/mesop/mesop.py
@classmethod
def get_conversation(cls, id: str) -> "MesopUI":
    """Look up a conversation in ``cls._registry`` by its id.

    Args:
        id: Key into ``cls._registry``.

    Returns:
        The object stored under ``id``; a missing key propagates the
        registry's lookup error.
    """
    conversation = cls._registry[id]
    return conversation
get_created_instance classmethod
Source code in fastagency/ui/mesop/mesop.py
@classmethod
def get_created_instance(cls) -> "MesopUI":
    """Return the instance stored in ``cls._created_instance``.

    Raises:
        RuntimeError: If ``cls._created_instance`` is None.
    """
    instance = cls._created_instance
    if instance is not None:
        return instance
    raise RuntimeError("MesopUI has not been created yet.")
get_message_stream Source code in fastagency/ui/mesop/mesop.py
def get_message_stream(self) -> Generator[MesopMessage, None, None]:
    """Yield messages taken from ``self.out_queue``.

    Iteration stops after yielding the first message whose ``io_message``
    satisfies ``self._is_stream_braker``; that message is included.
    """
    done = False
    while not done:
        item = self.out_queue.get()
        # Decide before yielding so the check order matches the queue read.
        done = self._is_stream_braker(item.io_message)
        yield item
handle_wsgi Source code in fastagency/ui/mesop/mesop.py
def handle_wsgi(
    self,
    app: "Runnable",
    environ: dict[str, Any],
    start_response: Callable[..., Any],
) -> list[bytes]:
    """Serve one WSGI request by delegating to ``MesopUI._me``.

    Records this instance and ``app`` on ``MesopUI`` before delegating.

    Args:
        app: The application stored in ``MesopUI._app``.
        environ: The WSGI environment, passed through to ``MesopUI._me``.
        start_response: The WSGI start-response callable, passed through.

    Returns:
        The value returned by ``MesopUI._me(environ, start_response)``.

    Raises:
        RuntimeError: If ``MesopUI._me`` is None.
    """
    logger.debug(f"Starting MesopUI using WSGI interface with app: {app}")
    MesopUI._created_instance = self
    MesopUI._app = app

    # A missing static-file helper is only logged; the request still proceeds.
    if configure_static_file_serving is None:  # pragma: no cover
        logger.error("configure_static_file_serving is None")

    if MesopUI._me is None:
        logger.error("MesopUI._me is None")
        raise RuntimeError("MesopUI._me is None")

    return MesopUI._me(environ, start_response)  # type: ignore[no-any-return]
keep_alive Source code in fastagency/messages.py
def keep_alive(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
) -> Optional[str]:
    """Build a ``KeepAlive`` message and hand it to ``process_message``.

    A fresh hex UUID is generated when ``uuid`` is falsy.

    Returns:
        Whatever ``process_message`` returns.
    """
    if not uuid:
        uuid = str(uuid4().hex)
    ping = KeepAlive(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
    )
    return self.process_message(ping)
keep_me_alive Source code in fastagency/ui/mesop/mesop.py
def keep_me_alive(self) -> None:
    """Start the keep-alive worker thread if requested and not yet running.

    The worker loops while ``self._keep_me_alive`` is truthy: it sleeps for
    three seconds and then, if an output queue exists, puts a ``KeepAlive``
    message (wrapped by ``self._mesop_message``) on it. Nothing happens when
    the flag is falsy or a worker thread has already been created.
    """

    def keep_alive_worker() -> None:
        while self._keep_me_alive:
            time.sleep(3)
            # The flag may have been cleared while sleeping; do not queue a
            # keep-alive after keep-alive has been switched off.
            if not self._keep_me_alive:
                break
            if self._out_queue is not None:
                # todo: do something more elegant
                msg = KeepAlive(workflow_uuid="")
                mesop_msg = self._mesop_message(msg)
                logger.debug(f"putting keepalive {msg.uuid}")
                self._out_queue.put(mesop_msg)

    if self._keep_me_alive and self._keep_alive_thread is None:
        self._keep_alive_thread = threading.Thread(target=keep_alive_worker)
        self._keep_alive_thread.start()
multiple_choice Source code in fastagency/messages.py
def multiple_choice(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    choices: Optional[list[str]] = None,
    default: Optional[str] = None,
    single: bool = True,
) -> Optional[str]:
    """Build a ``MultipleChoice`` message and hand it to ``process_message``.

    A fresh hex UUID is generated when ``uuid`` is falsy, and a falsy
    ``choices`` is replaced by an empty list.

    Returns:
        Whatever ``process_message`` returns.
    """
    if not uuid:
        uuid = str(uuid4().hex)
    if not choices:
        choices = []
    question = MultipleChoice(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        prompt=prompt,
        choices=choices,
        default=default,
        single=single,
    )
    return self.process_message(question)
process_message Source code in fastagency/ui/mesop/mesop.py
def process_message(self, message: IOMessage) -> Optional[str]:
    """Process ``message`` by passing it to ``self.visit``.

    Returns:
        Whatever ``self.visit`` returns.
    """
    outcome = self.visit(message)
    return outcome
register classmethod
register ( conversation : MesopUI ) -> None
Source code in fastagency/ui/mesop/mesop.py
@classmethod
def register(cls, conversation: "MesopUI") -> None:
    """Store ``conversation`` in ``cls._registry`` under its ``id``."""
    key = conversation.id
    cls._registry[key] = conversation
respond respond ( message : str ) -> None
Source code in fastagency/ui/mesop/mesop.py
def respond(self, message: str) -> None:
    """Put ``message`` on ``self.in_queue``."""
    inbox = self.in_queue
    inbox.put(message)
respond_to classmethod
Source code in fastagency/ui/mesop/mesop.py
@classmethod
def respond_to(
    cls, conversation_id: str, message: str
) -> Generator[MesopMessage, None, None]:
    """Send ``message`` to a registered conversation and return its message stream.

    Args:
        conversation_id: Id passed to ``cls.get_conversation``.
        message: Text passed to the conversation's ``respond``.

    Returns:
        The generator returned by the conversation's ``get_message_stream``.
    """
    target = cls.get_conversation(conversation_id)
    target.respond(message)
    stream = target.get_message_stream()
    return stream
start Source code in fastagency/ui/mesop/mesop.py
def start(
    self,
    *,
    app: Runnable,
    import_string: str,
    name: Optional[str] = None,
    params: dict[str, Any],
    single_run: bool = False,
) -> None:
    """Store ``app`` on ``MesopUI`` and invoke ``mesop_main`` on the stored script path.

    Args:
        app: The application stored in ``MesopUI._app``.
        import_string: Not used in this method body.
        name: Not used in this method body.
        params: Not used in this method body.
        single_run: Only triggers a warning when truthy; it is not supported.
    """
    logger.info(
        f"Starting MesopUI: import_string={self._import_string}, main_path={self._main_path}"
    )
    if single_run:
        logger.warning("single_run parameter is currently not supported in MesopUI")

    MesopUI._app = app

    argv = ["mesop", self._main_path]
    mesop_main(argv)
suggested_function_call Source code in fastagency/messages.py
def suggested_function_call(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    function_name: Optional[str] = None,
    call_id: Optional[str] = None,
    arguments: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    """Build a ``SuggestedFunctionCall`` message and hand it to ``process_message``.

    A fresh hex UUID is generated when ``uuid`` is falsy, and a falsy
    ``arguments`` is replaced by an empty dict.

    Returns:
        Whatever ``process_message`` returns.
    """
    if not uuid:
        uuid = str(uuid4().hex)
    if not arguments:
        arguments = {}
    suggestion = SuggestedFunctionCall(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        function_name=function_name,
        call_id=call_id,
        arguments=arguments,
    )
    return self.process_message(suggestion)
system_message Source code in fastagency/messages.py
def system_message(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    message: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    """Build a ``SystemMessage`` and hand it to ``process_message``.

    A fresh hex UUID is generated when ``uuid`` is falsy, and a falsy
    ``message`` is replaced by an empty dict.

    Returns:
        Whatever ``process_message`` returns.
    """
    if not uuid:
        uuid = str(uuid4().hex)
    if not message:
        message = {}
    system_msg = SystemMessage(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        message=message,
    )
    return self.process_message(system_msg)
text_input Source code in fastagency/messages.py
def text_input(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    prompt: Optional[str] = None,
    suggestions: Optional[list[str]] = None,
    password: bool = False,
) -> Optional[str]:
    """Build a ``TextInput`` message and hand it to ``process_message``.

    A fresh hex UUID is generated when ``uuid`` is falsy, and a falsy
    ``suggestions`` is replaced by an empty list.

    Returns:
        Whatever ``process_message`` returns.
    """
    if not uuid:
        uuid = str(uuid4().hex)
    if not suggestions:
        suggestions = []
    request = TextInput(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        prompt=prompt,
        suggestions=suggestions,
        password=password,
    )
    return self.process_message(request)
text_message Source code in fastagency/messages.py
def text_message(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    body: Optional[str] = None,
) -> Optional[str]:
    """Build a ``TextMessage`` and hand it to ``process_message``.

    A fresh hex UUID is generated when ``uuid`` is falsy.

    Returns:
        Whatever ``process_message`` returns.
    """
    if not uuid:
        uuid = str(uuid4().hex)
    text = TextMessage(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        body=body,
    )
    return self.process_message(text)
unregister classmethod
unregister ( conversation : MesopUI ) -> None
Source code in fastagency/ui/mesop/mesop.py
@classmethod
def unregister(cls, conversation: "MesopUI") -> None:
    """Delete the ``cls._registry`` entry keyed by ``conversation.id``.

    A missing key propagates the registry's lookup error.
    """
    key = conversation.id
    del cls._registry[key]
visit Source code in fastagency/messages.py
def visit(self, message: IOMessage) -> Optional[str]:
    """Dispatch ``message`` to the ``visit_<type>`` method named after ``message.type``.

    Falls back to ``self.visit_default`` when no such attribute exists.

    Returns:
        Whatever the selected handler returns.
    """
    handler = getattr(self, f"visit_{message.type}", self.visit_default)
    return handler(message)
visit_default Source code in fastagency/ui/mesop/mesop.py
def visit_default(self, message: IOMessage) -> None:
    """Wrap ``message`` with ``self._mesop_message`` and publish it via ``self._publish``."""
    self._publish(self._mesop_message(message))
visit_error Source code in fastagency/messages.py
def visit_error(self, message: Error) -> Optional[str]:
    """Handle an ``Error`` message by delegating to ``self.visit_default``."""
    outcome = self.visit_default(message)
    return outcome
visit_execute_function visit_execute_function (
message : ExecuteFunctionEvent ,
) -> None
Source code in fastagency/ui/mesop/mesop.py
def visit_execute_function(self, message: "ExecuteFunctionEvent") -> None:
    """Wrap ``message`` with ``self._mesop_message`` and publish it via ``self._publish``."""
    self._publish(self._mesop_message(message))
visit_function_call_execution Source code in fastagency/messages.py
def visit_function_call_execution(
    self, message: FunctionCallExecution
) -> Optional[str]:
    """Handle a ``FunctionCallExecution`` message by delegating to ``self.visit_default``."""
    outcome = self.visit_default(message)
    return outcome
visit_input_request ( message : InputRequestEvent ) -> str
Source code in fastagency/ui/mesop/mesop.py
def visit_input_request(self, message: "InputRequestEvent") -> str:
    """Publish the wrapped ``message`` and return the next item from ``self.in_queue``.

    Returns:
        The value returned by ``self.in_queue.get()``.
    """
    self._publish(self._mesop_message(message))
    reply = self.in_queue.get()
    return reply
visit_keep_alive Source code in fastagency/messages.py
def visit_keep_alive(self, message: KeepAlive) -> Optional[str]:
    """Handle a ``KeepAlive`` message by delegating to ``self.visit_default``."""
    outcome = self.visit_default(message)
    return outcome
visit_multiple_choice Source code in fastagency/ui/mesop/mesop.py
def visit_multiple_choice(self, message: MultipleChoice) -> str:
    """Publish the wrapped ``message`` and return the next item from ``self.in_queue``.

    Returns:
        The value returned by ``self.in_queue.get()``.
    """
    self._publish(self._mesop_message(message))
    reply = self.in_queue.get()
    return reply
visit_run_completion visit_run_completion ( message : RunCompletionEvent ) -> None
Source code in fastagency/ui/mesop/mesop.py
def visit_run_completion(self, message: "RunCompletionEvent") -> None:
    """Do nothing for a ``RunCompletionEvent``."""
    # We can ignore the RunCompletionEvent as we handle RunResponse already
    return None
visit_suggested_function_call Source code in fastagency/messages.py
def visit_suggested_function_call(
    self, message: SuggestedFunctionCall
) -> Optional[str]:
    """Handle a ``SuggestedFunctionCall`` message by delegating to ``self.visit_default``."""
    outcome = self.visit_default(message)
    return outcome
visit_system_message Source code in fastagency/messages.py
def visit_system_message(self, message: SystemMessage) -> Optional[str]:
    """Handle a ``SystemMessage`` by delegating to ``self.visit_default``."""
    outcome = self.visit_default(message)
    return outcome
visit_termination visit_termination ( message : TerminationEvent ) -> None
Source code in fastagency/ui/mesop/mesop.py
def visit_termination(self, message: "TerminationEvent") -> None:
    """Do nothing for a ``TerminationEvent``; nothing is published."""
    return None
visit_text visit_text ( message : TextEvent ) -> None
Source code in fastagency/ui/mesop/mesop.py
def visit_text(self, message: "TextEvent") -> None:
    """Wrap ``message`` with ``self._mesop_message`` and publish it via ``self._publish``."""
    self._publish(self._mesop_message(message))
visit_text_input Source code in fastagency/ui/mesop/mesop.py
def visit_text_input(self, message: TextInput) -> str:
    """Publish the wrapped ``message`` and return the next item from ``self.in_queue``.

    Returns:
        The value returned by ``self.in_queue.get()``.
    """
    self._publish(self._mesop_message(message))
    reply = self.in_queue.get()
    return reply
visit_text_message Source code in fastagency/ui/mesop/mesop.py
def visit_text_message(self, message: TextMessage) -> None:
    """Wrap ``message`` with ``self._mesop_message`` and publish it via ``self._publish``."""
    self._publish(self._mesop_message(message))
visit_using_auto_reply visit_using_auto_reply (
message : UsingAutoReplyEvent ,
) -> None
Source code in fastagency/ui/mesop/mesop.py
def visit_using_auto_reply(self, message: "UsingAutoReplyEvent") -> None:
    """Do nothing for a ``UsingAutoReplyEvent``; nothing is published."""
    return None
visit_workflow_completed Source code in fastagency/messages.py
def visit_workflow_completed(self, message: WorkflowCompleted) -> Optional[str]:
    """Handle a ``WorkflowCompleted`` message by delegating to ``self.visit_default``."""
    outcome = self.visit_default(message)
    return outcome
visit_workflow_started Source code in fastagency/messages.py
def visit_workflow_started(self, message: WorkflowStarted) -> Optional[str]:
    """Handle a ``WorkflowStarted`` message by delegating to ``self.visit_default``."""
    outcome = self.visit_default(message)
    return outcome
workflow_completed Source code in fastagency/messages.py
def workflow_completed(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    result: Optional[str] = None,
) -> Optional[str]:
    """Build a ``WorkflowCompleted`` message and hand it to ``process_message``.

    A fresh hex UUID is generated when ``uuid`` is falsy.

    Returns:
        Whatever ``process_message`` returns.
    """
    if not uuid:
        uuid = str(uuid4().hex)
    completed = WorkflowCompleted(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        result=result,
    )
    return self.process_message(completed)
workflow_started Source code in fastagency/messages.py
def workflow_started(
    self,
    workflow_uuid: str,
    sender: Optional[str] = None,
    recipient: Optional[str] = None,
    auto_reply: bool = False,
    uuid: Optional[str] = None,
    name: Optional[str] = None,
    description: Optional[str] = None,
    params: Optional[dict[str, Any]] = None,
) -> Optional[str]:
    """Build a ``WorkflowStarted`` message and hand it to ``process_message``.

    A fresh hex UUID is generated when ``uuid`` is falsy, and a falsy
    ``params`` is replaced by an empty dict.

    Returns:
        Whatever ``process_message`` returns.
    """
    if not uuid:
        uuid = str(uuid4().hex)
    if not params:
        params = {}
    started = WorkflowStarted(
        sender=sender,
        recipient=recipient,
        auto_reply=auto_reply,
        uuid=uuid,
        workflow_uuid=workflow_uuid,
        name=name,
        description=description,
        params=params,
    )
    return self.process_message(started)