The Notification Hub¶
Overview¶
The Notification Hub serves as a centralized event distribution system for the core services and control servers. Instead of having services directly subscribe to each other, all services publish their events to the hub, which then redistributes them to interested subscribers via ZeroMQ PUB-SUB sockets.
Benefits¶
- Decoupling: Services don't need to know about each other directly
- Scalability: Easy to add new subscribers without modifying existing services
- Reliability: Single point of event distribution with consistent delivery
- Simplicity: One subscription endpoint for all system events
Socket Endpoints¶
The Notification Hub exposes three ZeroMQ sockets:
Purpose | Pattern | Your Socket | Endpoint |
---|---|---|---|
Event Publishing | PUSH-PULL | PUSH | tcp://notification-hub:4245 |
Event Subscription | PUB-SUB | SUB | tcp://notification-hub:4246 |
Health Checks | ROUTER-DEALER | DEALER | tcp://notification-hub:4247 |
Note: port numbers can be changed in the local settings YAML file.
Events¶
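An event is created as a NotificationEvent with an event type, the name of the source service, and a data dictionary:

from egse.notifyhub.event import NotificationEvent

event = NotificationEvent(
    event_type="new_setup",
    source_service="cm_cs",
    data={"setup_id": "0001234"},
)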
A NotificationEvent also carries a timestamp and a correlation_id. These fields are filled in automatically when the event is created and must not be provided. An event can be converted into a plain dictionary with the .as_dict() method.
>>> print(event.as_dict())
{
    'event_type': 'new_setup',
    'source_service': 'cm_cs',
    'data': {'setup_id': '0001234'},
    'timestamp': 1756709627.25942,
    'correlation_id': '23b44f16-34f9-498a-85f7-5c57658ea363'
}
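The automatic filling of timestamp and correlation_id can be pictured with a small dataclass. This is a minimal sketch, not the actual NotificationEvent implementation: the class name SketchEvent is hypothetical, and the use of time.time() and uuid.uuid4() is an assumption based on the shape of the example output above.

```python
import time
import uuid
from dataclasses import asdict, dataclass, field


@dataclass
class SketchEvent:
    """Hypothetical stand-in for NotificationEvent (illustration only)."""

    event_type: str
    source_service: str
    data: dict
    # init=False: callers cannot pass these fields; they are filled on creation.
    timestamp: float = field(default_factory=time.time, init=False)
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()), init=False)

    def as_dict(self) -> dict:
        """Return the event as a plain dictionary."""
        return asdict(self)


event = SketchEvent(
    event_type="new_setup",
    source_service="cm_cs",
    data={"setup_id": "0001234"},
)
print(sorted(event.as_dict()))
```

In this sketch, passing timestamp or correlation_id to the constructor raises a TypeError, which mirrors the rule that these fields must not be provided.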
Publishing Events¶
Any core service, control server, or script can publish events to the notification hub using the EventPublisher class. For convenience, EventPublisher supports usage as a context manager, making it easy to send single events.
# Synchronous
from egse.notifyhub.services import EventPublisher

with EventPublisher() as publisher:
    publisher.publish(event)

# Asynchronous (inside a coroutine)
from egse.notifyhub.services import AsyncEventPublisher

async with AsyncEventPublisher() as publisher:
    await publisher.publish(event)
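When several events need to go out together, one option is to open the publisher once and publish them all inside the same context. The sketch below illustrates that idea without a running hub. RecordingPublisher and publish_all are hypothetical names introduced here; the only thing assumed about the real EventPublisher is the context-manager and publish() usage shown above.

```python
from contextlib import AbstractContextManager


class RecordingPublisher(AbstractContextManager):
    """Hypothetical stand-in shaped like EventPublisher (no network I/O)."""

    def __init__(self):
        self.sent = []
        self.closed = False

    def publish(self, event) -> None:
        # Record the event instead of sending it over a socket.
        self.sent.append(event)

    def __exit__(self, exc_type, exc, tb):
        self.closed = True
        return False  # never swallow exceptions


def publish_all(publisher_factory, events) -> int:
    """Open one publisher, publish every event, and return how many were sent."""
    count = 0
    with publisher_factory() as publisher:
        for event in events:
            publisher.publish(event)
            count += 1
    return count


recorder = RecordingPublisher()
sent = publish_all(lambda: recorder, [{"event_type": "new_setup"}, {"event_type": "new_setup"}])
print(sent, recorder.closed)
```

With the real class, publish_all(EventPublisher, events) would be the analogous call, assuming the default constructor arguments are suitable.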
Subscribing to Events¶
Listen to events from the notification hub with the EventSubscriber class. The usage for this class is quite different for synchronous and asynchronous contexts.
In a synchronous context, you need to poll the socket and handle the event inside your own event loop. The subscriber socket is available through the subscriber.socket property, so you can add it to a Poller object.
# Synchronous
from egse.notifyhub.services import EventSubscriber

def load_setup(event_data: dict):
    ...

subscriber = EventSubscriber(["new_setup"])
subscriber.register_handler("new_setup", load_setup)
subscriber.connect()

while True:
    ...
    # Handle an event only when one is waiting on the socket
    if subscriber.poll():
        subscriber.handle_event()

subscriber.disconnect()
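The relation between register_handler() and handle_event() can be pictured as a lookup from event type to callables. The following is a conceptual model only, not the EventSubscriber implementation: HandlerRegistry is a hypothetical name, and it assumes that a handler receives the full event dictionary.

```python
from collections import defaultdict
from typing import Callable


class HandlerRegistry:
    """Hypothetical model of handler registration and dispatch (illustration only)."""

    def __init__(self) -> None:
        # Maps an event type to the list of handlers registered for it.
        self._handlers = defaultdict(list)

    def register_handler(self, event_type: str, handler: Callable[[dict], None]) -> None:
        self._handlers[event_type].append(handler)

    def handle_event(self, event: dict) -> int:
        """Call every handler registered for the event's type; return how many ran."""
        handlers = self._handlers.get(event["event_type"], [])
        for handler in handlers:
            handler(event)
        return len(handlers)


loaded = []
registry = HandlerRegistry()
registry.register_handler("new_setup", lambda event: loaded.append(event["data"]["setup_id"]))

ran = registry.handle_event({"event_type": "new_setup", "data": {"setup_id": "0001234"}})
ignored = registry.handle_event({"event_type": "other", "data": {}})
print(ran, ignored, loaded)
```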
# Asynchronous (inside a coroutine)
import asyncio

from egse.notifyhub.services import AsyncEventSubscriber

async def load_setup(event_data: dict):
    ...

subscriber = AsyncEventSubscriber(["new_setup"])
subscriber.register_handler("new_setup", load_setup)
await subscriber.connect()

# Run the listener in the background while the application does its work
event_listener = asyncio.create_task(subscriber.start_listening())
...
subscriber.disconnect()
await event_listener  # add a wait_for with a timeout if needed
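The remark about wait_for in the asynchronous example can be sketched as follows. FakeListener is a hypothetical stand-in for AsyncEventSubscriber so that the example runs without a hub; only the start_listening() and disconnect() names are taken from the code above, and their behaviour here is an assumption.

```python
import asyncio


class FakeListener:
    """Hypothetical stand-in for AsyncEventSubscriber (no sockets involved)."""

    def __init__(self) -> None:
        self._running = False
        self.iterations = 0

    async def start_listening(self) -> None:
        self._running = True
        while self._running:
            self.iterations += 1
            await asyncio.sleep(0.01)  # stands in for waiting on the socket

    def disconnect(self) -> None:
        self._running = False


async def main() -> bool:
    listener = FakeListener()
    task = asyncio.create_task(listener.start_listening())
    await asyncio.sleep(0.05)  # the application does its work here
    listener.disconnect()
    try:
        # Bound the shutdown so a stuck listener cannot block forever.
        await asyncio.wait_for(task, timeout=1.0)
    except asyncio.TimeoutError:
        return False  # wait_for has already cancelled the task
    return True


stopped_cleanly = asyncio.run(main())
print(stopped_cleanly)
```

If the listener does not stop within the timeout, asyncio.wait_for cancels the task and raises a timeout error, so the caller can log the problem and continue shutting down.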
Health Checks¶
The Notification Hub provides a health check interface to monitor the status and availability of the hub. This allows services and scripts to verify connectivity and basic hub functionality. To perform a health check, use the NotificationHubClient class. The client connects to the hub's ROUTER-DEALER socket and sends a health check request. If the hub is available, health_check() returns True. If an error occurs or the hub is not available, it returns False.
# Synchronous
from egse.notifyhub.client import NotificationHubClient

with NotificationHubClient() as client:
    if not client.health_check():
        ...  # notification hub not available

# Asynchronous (inside a coroutine)
from egse.notifyhub.client import AsyncNotificationHubClient

with AsyncNotificationHubClient() as client:
    if not await client.health_check():
        ...  # notification hub not available
The health check has a default timeout of 5 seconds. If this is too long for your needs, pass a request_timeout argument (a float, in seconds) to the NotificationHubClient call.
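Because health_check() returns a plain boolean, it is straightforward to wrap it in a retry loop, for instance while waiting for the hub to start. The helper below is a hedged sketch: wait_for_hub and its parameters are hypothetical and not part of egse, and the example uses a stand-in callable instead of a real client.

```python
import time
from typing import Callable


def wait_for_hub(health_check: Callable[[], bool], attempts: int = 3, delay: float = 1.0) -> bool:
    """Call health_check up to `attempts` times; return True on the first success."""
    for attempt in range(attempts):
        if health_check():
            return True
        if attempt < attempts - 1:
            time.sleep(delay)  # wait before the next attempt
    return False


# Stand-in check that fails twice and then succeeds.
results = iter([False, False, True])
available = wait_for_hub(lambda: next(results), attempts=3, delay=0.01)
print(available)
```

With a real client, client.health_check could be passed as the callable from inside the with NotificationHubClient() as client: block.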
Monitoring¶
Every 30 seconds, the hub writes basic connection and event statistics to the log.
TODO: this should also go on the PUB channel as a StatsEvent.