Skip to content

egse.control

This module defines the abstract class for any Control Server and some convenience functions.

Classes:

Name Description
ControlServer

Base class for all device control servers and for the Storage Manager and Configuration Manager.

Functions:

Name Description
is_control_server_active

Checks if the Control Server is running.

ControlServer

ControlServer()

Base class for all device control servers and for the Storage Manager and Configuration Manager.

A Control Server reads commands from a ZeroMQ socket and executes these commands by calling the execute() method of the commanding protocol class.

The subclass shall define the following:

  • Define the device protocol class -> self.device_protocol
  • Bind the command socket to the device protocol -> self.dev_ctrl_cmd_sock
  • Register the command socket in the poll set -> self.poller

Methods:

Name Description
after_serve

This method needs to be overridden by the subclass if certain actions need to be executed after the control

before_serve

This method needs to be overridden by the subclass if certain actions need to be executed before the control

get_average_execution_times

Returns the average execution times of all functions that have been monitored by this process.

get_commanding_port

Returns the commanding port used by the Control Server.

get_communication_protocol

Returns the communication protocol used by the Control Server.

get_ip_address

Returns the IP address of the current host.

get_monitoring_port

Returns the monitoring port used by the Control Server.

get_process_status

Returns the process status of the Control Server.

get_service_port

Returns the service port used by the Control Server.

get_storage_mnemonic

Returns the storage mnemonics used by the Control Server.

handle_scheduled_tasks

Executes or reschedules tasks in the serve() event loop.

is_storage_manager_active

Checks if the Storage Manager is active.

notify_listeners

Notifies registered listeners about an event.

quit

Interrupts the Control Server.

register_as_listener

Registers a listener with the specified proxy.

register_to_storage_manager

Registers this Control Server to the Storage Manager.

schedule_task

Schedules a task to run in the control server event loop.

serve

Activation of the Control Server.

set_hk_delay

Sets the delay time for housekeeping.

set_logging_level

Sets the logging level to the given level.

set_mon_delay

Sets the delay time for monitoring.

set_scheduled_task_delay

Sets the delay time between successive executions of scheduled tasks.

store_housekeeping_information

Sends housekeeping information to the Storage Manager.

unregister_as_listener

Removes a registered listener from the specified proxy.

unregister_from_storage_manager

Unregisters the Control Server from the Storage Manager.

after_serve

after_serve()

This method needs to be overridden by the subclass if certain actions need to be executed after the control server has been deactivated.

before_serve

before_serve()

This method needs to be overridden by the subclass if certain actions need to be executed before the control server is activated.

get_average_execution_times

get_average_execution_times()

Returns the average execution times of all functions that have been monitored by this process.

Returns:

Type Description
dict

Dictionary with the average execution times of all functions that have been monitored by this process. The dictionary keys are the function names, and the values are the average execution times in ms.

get_commanding_port abstractmethod

get_commanding_port()

Returns the commanding port used by the Control Server.

Returns:

Type Description
int

Commanding port used by the Control Server, as specified in the settings.

get_communication_protocol abstractmethod

get_communication_protocol()

Returns the communication protocol used by the Control Server.

Returns:

Type Description
str

Communication protocol used by the Control Server, as specified in the settings.

get_ip_address

get_ip_address()

Returns the IP address of the current host.

get_monitoring_port abstractmethod

get_monitoring_port()

Returns the monitoring port used by the Control Server.

Returns:

Type Description
int

Monitoring port used by the Control Server, as specified in the settings.

get_process_status

get_process_status()

Returns the process status of the Control Server.

Returns:

Type Description
dict

Dictionary with the process status of the Control Server.

get_service_port abstractmethod

get_service_port()

Returns the service port used by the Control Server.

Returns:

Type Description
int

Service port used by the Control Server, as specified in the settings.

get_storage_mnemonic

get_storage_mnemonic()

Returns the storage mnemonics used by the Control Server.

This is a string that will appear in the filename with the housekeeping information of the device, as a way of identifying the device. If this is not implemented in the subclass, then the class name will be used.

Returns:

Type Description
str

Storage mnemonics used by the Control Server, as specified in the settings.

handle_scheduled_tasks

handle_scheduled_tasks()

Executes or reschedules tasks in the serve() event loop.

is_storage_manager_active

is_storage_manager_active()

Checks if the Storage Manager is active.

This method has to be implemented by the subclass if you need to store information.

Note: You might want to set a specific timeout when checking for the Storage Manager.

Note: If this method returns True, the following methods shall also be implemented by the subclass:

  • register_to_storage_manager()
  • unregister_from_storage_manager()
  • store_housekeeping_information()

Returns:

Type Description
bool

True if the Storage Manager is active; False otherwise.

notify_listeners

notify_listeners(event_id=0, context=None)

Notifies registered listeners about an event.

This function creates an Event object with the provided event_id and context and notifies all registered listeners with the created event.

Parameters:

Name Type Description Default
event_id int

The identifier for the event. Defaults to 0.

0
context dict

Additional context information associated with the event. Defaults to None.

None
Note

The notification is performed by the notify_listeners method of the listeners object associated with this instance. The notification is executed in a daemon thread to avoid blocking the commanding chain.

quit

quit()

Interrupts the Control Server.

register_as_listener

register_as_listener(proxy, listener)

Registers a listener with the specified proxy.

This function attempts to add the provided listener to the specified proxy. It employs a retry mechanism to handle potential ConnectionError exceptions, making up to 5 attempts to add the listener.

Parameters:

Name Type Description Default
proxy Type

A callable object representing the proxy to which the listener will be added.

required
listener dict

The listener to be registered. Should be a dictionary containing listener details.

required

Raises:

Type Description
ConnectionError

If the connection to the proxy encounters issues even after multiple retry attempts.

Note

The function runs in a separate daemon thread to avoid blocking the main thread.

register_to_storage_manager

register_to_storage_manager()

Registers this Control Server to the Storage Manager.

By doing so, the housekeeping information of the device will be sent to the Storage Manager, which will store the information in a dedicated CSV file.

This method has to be overwritten by the subclasses if they have housekeeping information that must be stored.

Subclasses need to overwrite this method if they have housekeeping information to be stored.

The following information is required for the registration:

  • origin: Storage mnemonic, which can be retrieved from self.get_storage_mnemonic()
  • persistence_class: Persistence layer (one of the TYPES in egse.storage.persistence)
  • prep: depending on the type of the persistence class (see respective documentation)

The egse.storage module provides a convenience method that can be called from the method in the subclass:

>>> from egse.storage import register_to_storage_manager  # noqa
Note

the egse.storage module might not be available, it is provided by the cgse-core package.

schedule_task

schedule_task(callback, after=0.0, when=None)

Schedules a task to run in the control server event loop.

The callback function will be executed as soon as possible in the serve() event loop.

Some simple scheduling options are available:

  • after: the task will only execute 'x' seconds after the time of scheduling. I.e. the task will be rescheduled until time > scheduled time + 'x' seconds.
  • when: the task will only execute when the condition is True.

The after and the when arguments can be combined.

Note
  • This function is intended to be used in order to prevent a deadlock.
  • Since the callback function is executed in the serve() event loop, it shall not block!

serve

serve()

Activation of the Control Server.

This comprises the following steps:

  • Executing the before_serve method;
  • Checking if the Storage Manager is active and registering the Control Server to it;
  • Start listening for keyboard interrupts;
  • Start accepting (listening to) commands;
  • Start sending out monitoring information;
  • Start sending out housekeeping information;
  • Start listening for quit commands;
  • After a quit command has been received:
    • Unregister from the Storage Manager;
    • Execute the after_serve method;
    • Close all sockets;
    • Clean up all threads.

set_hk_delay

set_hk_delay(seconds)

Sets the delay time for housekeeping.

The delay time is the time between two successive executions of the get_housekeeping() function of the device protocol.

It might happen that the delay time that is set is longer than what you requested. That is the case when the execution of the get_housekeeping() function takes longer than the requested delay time. That should prevent the server from blocking when a too short delay time is requested.

Parameters:

Name Type Description Default
seconds float

Number of seconds between the housekeeping calls

required

Returns:

Type Description
float

Delay that was set [ms].

set_logging_level

set_logging_level(level)

Sets the logging level to the given level.

Allowed logging levels are:

  • "CRITICAL" or "FATAL" or 50
  • "ERROR" or 40
  • "WARNING" or "WARN" or 30
  • "INFO" or 20
  • "DEBUG" or 10
  • "NOTSET" or 0

Parameters:

Name Type Description Default
level int | str

Logging level to use, specified as either a string or an integer

required

set_mon_delay

set_mon_delay(seconds)

Sets the delay time for monitoring.

The delay time is the time between two successive executions of the get_status() function of the device protocol.

It might happen that the delay time that is set is longer than what you requested. That is the case when the execution of the get_status() function takes longer than the requested delay time. That should prevent the server from blocking when a too short delay time is requested.

Parameters:

Name Type Description Default
seconds float

Number of seconds between the monitoring calls

required

Returns:

Type Description
float

Delay that was set [ms].

set_scheduled_task_delay

set_scheduled_task_delay(seconds)

Sets the delay time between successive executions of scheduled tasks.

Parameters:

Name Type Description Default
seconds

the time interval between two successive executions [seconds]

required

store_housekeeping_information

store_housekeeping_information(data)

Sends housekeeping information to the Storage Manager.

This method has to be overwritten by the subclasses if they want the device housekeeping information to be saved.

Parameters:

Name Type Description Default
data dict

a dictionary containing parameter name and value of all device housekeeping. There is also a timestamp that represents the date/time when the HK was received from the device.

required

unregister_as_listener

unregister_as_listener(proxy, listener)

Removes a registered listener from the specified proxy.

This function attempts to remove the provided listener from the specified proxy. It employs a retry mechanism to handle potential ConnectionError exceptions, making up to 5 attempts to add the listener.

Parameters:

Name Type Description Default
proxy Type

A callable object representing the proxy from which the listener will be removed.

required
listener dict

The listener to be removed. Should be a dictionary containing listener details.

required

Raises:

Type Description
ConnectionError

If the connection to the proxy encounters issues even after multiple retry attempts.

Note

The function runs in a separate thread but will block until the de-registration is finished. The reason being that this method is usually called in a after_serve block so it needs to finish before the ZeroMQ context is destroyed.

unregister_from_storage_manager

unregister_from_storage_manager()

Unregisters the Control Server from the Storage Manager.

This method has to be overwritten by the subclasses.

The following information is required for the registration:

  • origin: Storage mnemonic, which can be retrieved from self.get_storage_mnemonic()

The egse.storage module provides a convenience method that can be called from the method in the subclass:

>>> from egse.storage import unregister_from_storage_manager  # noqa
Note

the egse.storage module might not be available, it is provided by the cgse-core package.

is_control_server_active

is_control_server_active(endpoint=None, timeout=0.5)

Checks if the Control Server is running.

This function sends a Ping message to the Control Server and expects a Pong answer back within the timeout period.

Parameters:

Name Type Description Default
endpoint str

Endpoint to connect to, i.e. ://

:

None
timeout float

Timeout when waiting for a reply [s, default=0.5]

0.5