The Metrics Hub
Overview
The Metrics Hub is a centralized ingestion and persistence service for telemetry and housekeeping metrics. Services and control servers send metric points to the hub, and the hub batches these points before writing them to the configured time-series backend.
This provides a single ingestion path and removes direct backend coupling from producers.
Core responsibilities:
- centralized ingestion for producers
- payload normalization and validation
- batch persistence to backend repositories
- runtime status and write-path observability
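The batching step can be pictured as a buffer that is written out either when it is full or when it has waited long enough. The sketch below is a minimal illustration, assuming a flush on whichever of `BATCH_SIZE` or `FLUSH_INTERVAL` is reached first; the `Batcher` class, its `write` callback, and the `tick()` method are hypothetical and are not the hub's actual implementation.

```python
import time
from typing import Callable


class Batcher:
    """Hypothetical buffer that flushes on size or age, whichever comes first."""

    def __init__(
        self,
        write: Callable[[list], None],
        batch_size: int,
        flush_interval: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._write = write                    # stands in for the backend repository write (assumption)
        self._batch_size = batch_size          # plays the role of BATCH_SIZE
        self._flush_interval = flush_interval  # plays the role of FLUSH_INTERVAL (seconds assumed)
        self._clock = clock
        self._buffer: list = []
        self._last_flush = clock()

    def add(self, point: dict) -> None:
        """Buffer one point and flush immediately once the batch is full."""
        self._buffer.append(point)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def tick(self) -> None:
        """Call periodically so a partial batch is not held indefinitely."""
        if self._buffer and self._clock() - self._last_flush >= self._flush_interval:
            self.flush()

    def flush(self) -> None:
        """Hand the buffered points to the writer and restart the timer."""
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._write(batch)
        self._last_flush = self._clock()
```

Injecting `clock` keeps the flush logic testable without sleeping; a real service would typically also flush any remaining points on shutdown.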
Socket Endpoints
The Metrics Hub currently exposes two ZeroMQ sockets:
| Purpose | Pattern | Client Socket | Default Endpoint |
|---|---|---|---|
| Metric Ingestion | PUSH-PULL | PUSH | tcp://localhost:6130 |
| Health/Control | ROUTER-DEALER | DEALER | tcp://localhost:6132 |
Note: These default port values are defined in `cgse_core/settings.yaml` and can be overridden in the local settings.
Ports can be fixed, or set to 0 to let the OS allocate a free port dynamically. Clients discover the active endpoints through the Registry, which is essential when a port is configured as 0 because the actual port is only known once the hub has bound its socket.
Metrics Payload
The canonical payload shape matches `DataPoint.as_dict()`:

```python
{
    "measurement": "camera_tm",
    "tags": {"device_id": "cam_01"},
    "fields": {"temperature": 23.4},
    "time": 1774354482.517,  # optional, Unix timestamp in seconds
}
```
Normalization Rules
- `measurement` must be a non-empty string
- `fields` must be a dictionary with at least one non-`None` value
- `None` field values are removed, not fatal
- `None` tag values are removed, not fatal
- payloads with only `None` fields are dropped

This lets producers include optional telemetry fields without an otherwise valid point being rejected when some of those fields are `None`.
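The rules above can be expressed as a small pure function. This is a minimal sketch, not the hub's code: the name `normalize_payload`, raising `ValueError` for structurally invalid input, and returning `None` for a point that is dropped are all assumptions made for illustration.

```python
from typing import Any, Optional


def normalize_payload(payload: dict) -> Optional[dict]:
    """Hypothetical normalizer mirroring the documented rules.

    Raises ValueError for invalid payloads and returns None when no
    non-None field value remains (the point is dropped).
    """
    measurement = payload.get("measurement")
    if not isinstance(measurement, str) or not measurement:
        raise ValueError("measurement must be a non-empty string")

    fields = payload.get("fields")
    if not isinstance(fields, dict):
        raise ValueError("fields must be a dictionary")

    # None field values are filtered out instead of rejecting the point.
    clean_fields = {k: v for k, v in fields.items() if v is not None}
    if not clean_fields:
        return None  # no usable field left: drop the whole point

    # None tag values are filtered out as well.
    tags = payload.get("tags") or {}
    clean_tags = {k: v for k, v in tags.items() if v is not None}

    result: dict[str, Any] = {
        "measurement": measurement,
        "tags": clean_tags,
        "fields": clean_fields,
    }
    if payload.get("time") is not None:
        result["time"] = payload["time"]  # time stays optional
    return result
```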
Client Access
Any core service, control server, or script can send points to the hub.
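Synchronous:

```python
from egse.metrics import DataPoint
from egse.metricshub.client import MetricsHubSender

# Build a point with one tag and one field
point = (
    DataPoint.measurement("camera_tm")
    .tag("device_id", "cam_01")
    .field("temperature", 23.4)
)

# Send the point to the hub's ingestion (PUSH) endpoint
with MetricsHubSender() as sender:
    sender.send(point)
```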
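Asynchronous:

```python
import asyncio

from egse.metrics import DataPoint
from egse.metricshub.client import AsyncMetricsHubSender


async def main() -> None:
    point = (
        DataPoint.measurement("camera_tm")
        .tag("device_id", "cam_01")
        .field("temperature", 23.4)
    )
    # Assumption: AsyncMetricsHubSender is used as an async context manager
    async with AsyncMetricsHubSender() as sender:
        await sender.send(point)


asyncio.run(main())
```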
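The fluent `DataPoint` calls above assemble the canonical payload described under Metrics Payload. To make that mapping concrete, here is a minimal stand-in builder, `SketchPoint`. It is hypothetical and is not `egse.metrics.DataPoint`; its `time()` method and the choice to omit `"time"` when it is unset are assumptions.

```python
from __future__ import annotations

from typing import Any, Optional


class SketchPoint:
    """Hypothetical stand-in that mimics a fluent DataPoint-style builder."""

    def __init__(self, name: str) -> None:
        self._name = name
        self._tags: dict[str, Any] = {}
        self._fields: dict[str, Any] = {}
        self._time: Optional[float] = None

    @classmethod
    def measurement(cls, name: str) -> SketchPoint:
        return cls(name)

    def tag(self, key: str, value: Any) -> SketchPoint:
        self._tags[key] = value
        return self  # returning self is what makes the calls chainable

    def field(self, key: str, value: Any) -> SketchPoint:
        self._fields[key] = value
        return self

    def time(self, timestamp: float) -> SketchPoint:
        self._time = timestamp
        return self

    def as_dict(self) -> dict[str, Any]:
        d: dict[str, Any] = {
            "measurement": self._name,
            "tags": dict(self._tags),
            "fields": dict(self._fields),
        }
        if self._time is not None:  # assumption: "time" omitted when unset
            d["time"] = self._time
        return d
```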
Control Actions
The control endpoint supports three actions:
- `health`: liveness check
- `info`: runtime information and statistics
- `terminate`: graceful shutdown request
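On the server side, three actions like these map naturally onto a dispatch table. The sketch below is hypothetical: the request shape (`{"action": ...}`), the reply keys, and the `ControlHandler` class are assumptions for illustration, not the hub's wire protocol.

```python
from typing import Any, Callable


class ControlHandler:
    """Hypothetical dispatcher for the health/info/terminate actions."""

    def __init__(self, stats: dict) -> None:
        self.stats = stats
        self.running = True
        self._actions: dict[str, Callable[[], dict]] = {
            "health": self._health,
            "info": self._info,
            "terminate": self._terminate,
        }

    def handle(self, request: dict) -> dict:
        action = request.get("action")
        handler = self._actions.get(action)
        if handler is None:
            return {"success": False, "error": f"unknown action: {action!r}"}
        return handler()

    def _health(self) -> dict:
        return {"success": True, "status": "ok"}

    def _info(self) -> dict:
        return {"success": True, "stats": dict(self.stats)}

    def _terminate(self) -> dict:
        # Graceful shutdown: the main loop is expected to check `running`,
        # flush pending batches, and exit.
        self.running = False
        return {"success": True, "status": "terminating"}
```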
Status And Health
Synchronous:

```python
from egse.metricshub.client import MetricsHubClient

with MetricsHubClient() as client:
    ok = client.health_check()     # liveness check
    info = client.server_status()  # runtime info and statistics
```
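Asynchronous:

```python
import asyncio
from egse.metricshub.client import AsyncMetricsHubClient

async def main():
    async with AsyncMetricsHubClient() as client:  # assumption: async context manager
        ok = await client.health_check()
        info = await client.server_status()

asyncio.run(main())
```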
The `info` response contains:
- collector and request ports
- backend details (name, repository class, reachability)
- batching configuration
- runtime statistics (`received`, `written`, `dropped`, `errors`, queue size)
Running The Service
Start, stop, and inspect the service from the command line with the `cgse` CLI:

```bash
uv run cgse mh start
uv run cgse mh stop
uv run cgse mh status
```

Alternatively, run the server module directly:

```bash
python -m egse.metricshub.server start
python -m egse.metricshub.server status
python -m egse.metricshub.server stop
```
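After `start`, a script may need to wait until the hub answers before sending points. The following is a minimal polling sketch, assuming a `check` callable that returns `True` once the hub is healthy; the `wait_until_healthy` helper and its timeout and interval defaults are hypothetical.

```python
import time
from typing import Callable


def wait_until_healthy(
    check: Callable[[], bool],
    timeout: float = 10.0,
    interval: float = 0.5,
) -> bool:
    """Hypothetical helper: poll `check` until it returns True or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            if check():
                return True
        except Exception:
            # Treat connection errors during startup as "not healthy yet".
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

In practice, `check` could be a small function that opens a `MetricsHubClient` and returns `client.health_check()`.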
Configuration
Metrics Hub settings are defined in the `Metrics Hub` section of `cgse_core/settings.yaml`, or in the local settings.
Important settings:
- `COLLECTOR_PORT` (metric ingestion socket)
- `REQUESTS_PORT` (health/control socket)
- `BACKEND` (`influxdb` or `duckdb`)
- `BATCH_SIZE`
- `FLUSH_INTERVAL`

Environment variables can override these settings. Typical examples:
- `CGSE_METRICS_BACKEND`
- `CGSE_METRICS_BATCH_SIZE`
- `CGSE_METRICS_FLUSH_INTERVAL`
- backend-specific variables (`CGSE_INFLUX_*`, `CGSE_DUCKDB_PATH`)
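The override order can be sketched as: start from the YAML values, then let an environment variable replace a setting whenever it is set. This is a hypothetical helper; the setting-to-variable mapping follows the examples listed here, while the `effective_settings` name and the `int`/`float` coercion are assumptions.

```python
import os
from typing import Any, Mapping, Optional

# Hypothetical mapping: setting name -> (environment variable, type converter).
ENV_OVERRIDES = {
    "BACKEND": ("CGSE_METRICS_BACKEND", str),
    "BATCH_SIZE": ("CGSE_METRICS_BATCH_SIZE", int),
    "FLUSH_INTERVAL": ("CGSE_METRICS_FLUSH_INTERVAL", float),
}


def effective_settings(
    yaml_settings: Mapping[str, Any],
    environ: Optional[Mapping[str, str]] = None,
) -> dict:
    """Return the YAML settings with any environment overrides applied."""
    env = os.environ if environ is None else environ
    merged = dict(yaml_settings)
    for key, (var, convert) in ENV_OVERRIDES.items():
        raw = env.get(var)
        if raw:  # unset or empty variables leave the YAML value in place
            merged[key] = convert(raw)  # env values are strings; coerce the type
    return merged
```

Passing `environ` explicitly keeps the function easy to test; omitting it reads the real process environment.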
Control Server Integration
Control servers send their metrics through `MetricsHubSender`, so they no longer need direct backend credentials.
When the Metrics Hub ports are dynamic (port value 0), the collector endpoint is resolved through the Registry service.
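The resolution rule can be sketched as follows: a non-zero configured port is used directly, while a port of 0 forces a lookup. The `lookup` callable stands in for a Registry query and is hypothetical, as are the function name and the `"metrics_hub"` service key; the real lookup goes through the Registry service.

```python
from typing import Callable, Optional


def resolve_collector_endpoint(
    host: str,
    configured_port: int,
    lookup: Callable[[str], Optional[str]],
    service_name: str = "metrics_hub",  # hypothetical registry key
) -> str:
    """Hypothetical resolver for the collector (PUSH) endpoint."""
    if configured_port != 0:
        # Static port from settings: no discovery needed.
        return f"tcp://{host}:{configured_port}"
    # Port 0 means the OS chose the port at bind time, so ask the registry.
    endpoint = lookup(service_name)
    if endpoint is None:
        raise LookupError(f"{service_name!r} is not registered")
    return endpoint
```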
Monitoring And Troubleshooting
The hub periodically logs statistics (every `STATS_INTERVAL`) and reports its operational state through the `status` command.
To help debug traffic with optional fields, the status output includes counters for filtered `None` fields and tags, and for payloads dropped because all of their fields were `None`.
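Interval-based statistics reporting can be sketched with a set of counters and an elapsed-time check. Counter names such as `received` and `written` follow the status fields listed earlier; `filtered_none_fields`, the `HubStats` class, cumulative (never reset) counters, and the log format are assumptions for illustration.

```python
import logging
from collections import Counter
from typing import Optional

logger = logging.getLogger("metricshub.sketch")


class HubStats:
    """Hypothetical counters with interval-based logging (cf. STATS_INTERVAL)."""

    def __init__(self, stats_interval: float, now: float) -> None:
        self.counts: Counter = Counter()  # cumulative in this sketch (assumption)
        self._interval = stats_interval
        self._last_log = now

    def incr(self, name: str, amount: int = 1) -> None:
        self.counts[name] += amount

    def maybe_log(self, now: float) -> Optional[str]:
        """Log a summary if the interval has elapsed and return the logged line."""
        if now - self._last_log < self._interval:
            return None
        self._last_log = now
        line = ", ".join(f"{k}={v}" for k, v in sorted(self.counts.items()))
        logger.info("Metrics Hub stats: %s", line)
        return line
```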