def__init__(self,nats_url:Optional[str]=None,user:Optional[str]=None,password:Optional[str]=None,)->None:"""Initialize the nats workflows. Args: nats_url (Optional[str], optional): The NATS URL. Defaults to None. user (Optional[str], optional): The user. Defaults to None. password (Optional[str], optional): The password. Defaults to None. """self.nats_url=nats_urlor"nats://localhost:4222"self.user=userself.password=passwordself.broker=NatsBroker(self.nats_url,user=self.user,password=self.password)self.app=FastStream(self.broker)self._initiate_chat_subject:str="chat.server.initiate_chat"self.is_broker_running:bool=False
defrun(self,name:str,ui:UI,user_id:Optional[str]=None,**kwargs:Any,)->str:# subscribe to whatever topic you need# consume a message from the topic and call that visitor pattern (which is happening in NatsProvider)workflow_uuid=ui._workflow_uuidinit_message=InitiateWorkflowModel(user_id=user_id,workflow_uuid=workflow_uuid,params=kwargs,name=name,)_from_server_subject=f"chat.client.messages.{user_id}.{workflow_uuid}"_to_server_subject=f"chat.server.messages.{user_id}.{workflow_uuid}"asyncdefsend_initiate_chat_msg()->None:awaitself.broker.publish(init_message,self._initiate_chat_subject)logger.info("Initiate chat message sent")@asynccontextmanagerasyncdeflifespan()->AsyncIterator[None]:asyncwithself.broker:awaitself.broker.start()logger.debug("Broker started")try:yieldfinally:awaitself.broker.close()asyncdef_setup_and_run()->None:awaitsend_initiate_chat_msg()awaitself._setup_subscriber(ui,_from_server_subject,_to_server_subject)whileTrue:# noqa: ASYNC110awaitasyncio.sleep(0.1)asyncdefrun_lifespan()->None:ifnotself.is_broker_running:self.is_broker_running=Trueasyncwithlifespan():await_setup_and_run()else:await_setup_and_run()try:loop=asyncio.get_event_loop()exceptRuntimeError:loop=asyncio.new_event_loop()asyncio.set_event_loop(loop)loop.run_until_complete(run_lifespan())return"NatsWorkflows.run() completed"