Bases: ProviderProtocol
Initialize the nats workflows.
PARAMETER | DESCRIPTION |
nats_url | The NATS URL. Defaults to None. TYPE: Optional[str] DEFAULT: None |
user | The user. Defaults to None. TYPE: Optional[str] DEFAULT: None |
password | The password. Defaults to None. TYPE: Optional[str] DEFAULT: None |
Source code in fastagency/adapters/nats/base.py
| def __init__(
self,
nats_url: Optional[str] = None,
user: Optional[str] = None,
password: Optional[str] = None,
) -> None:
"""Initialize the nats workflows.
Args:
nats_url (Optional[str], optional): The NATS URL. Defaults to None.
user (Optional[str], optional): The user. Defaults to None.
password (Optional[str], optional): The password. Defaults to None.
"""
self.nats_url = nats_url or "nats://localhost:4222"
self.user = user
self.password = password
self.broker = NatsBroker(self.nats_url, user=self.user, password=self.password)
self.app = FastStream(self.broker)
self._initiate_chat_subject: str = "chat.server.initiate_chat"
self.is_broker_running: bool = False
|
broker instance-attribute
broker = NatsBroker(nats_url, user=user, password=password)
is_broker_running instance-attribute
is_broker_running: bool = False
nats_url instance-attribute
nats_url = nats_url or 'nats://localhost:4222'
password instance-attribute
get_description
get_description(name: str) -> str
Source code in fastagency/adapters/nats/base.py
| def get_description(self, name: str) -> str:
description = asyncio.run(self._get_description(name))
logger.debug(f"Description: {description}")
# return "Student and teacher learning chat"
return description
|
run
Source code in fastagency/adapters/nats/base.py
| def run(
self,
name: str,
ui: UI,
user_id: Optional[str] = None,
**kwargs: Any,
) -> str:
# subscribe to whatever topic you need
# consume a message from the topic and call that visitor pattern (which is happening in NatsProvider)
workflow_uuid = ui._workflow_uuid
init_message = InitiateWorkflowModel(
user_id=user_id,
workflow_uuid=workflow_uuid,
params=kwargs,
name=name,
)
_from_server_subject = f"chat.client.messages.{user_id}.{workflow_uuid}"
_to_server_subject = f"chat.server.messages.{user_id}.{workflow_uuid}"
async def send_initiate_chat_msg() -> None:
await self.broker.publish(init_message, self._initiate_chat_subject)
logger.info("Initiate chat message sent")
@asynccontextmanager
async def lifespan() -> AsyncIterator[None]:
async with self.broker:
await self.broker.start()
logger.debug("Broker started")
try:
yield
finally:
await self.broker.close()
async def _setup_and_run() -> None:
await send_initiate_chat_msg()
await self._setup_subscriber(ui, _from_server_subject, _to_server_subject)
while True: # noqa: ASYNC110
await asyncio.sleep(0.1)
async def run_lifespan() -> None:
if not self.is_broker_running:
self.is_broker_running = True
async with lifespan():
await _setup_and_run()
else:
await _setup_and_run()
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run_lifespan())
return "NatsWorkflows.run() completed"
|