Controller

class lsst.ts.MTMount.mock.Controller(command_port, log, reconnect=False)

Bases: object

Simulate the most basic responses from the low-level controller (Operation Manager).

Acknowledge every command and mark it as done. Also output a few other replies to exercise the code.

Parameters:
command_port : int

Port for reading commands (that the CSC writes). The reply port is one greater than the command port.

log : logging.Logger

Logger.

reconnect : bool, optional

Try to reconnect if the connection is lost? Defaults to False for unit tests.

Attributes Summary

connected

Methods Summary

add_all_devices() Add all mock devices.
add_device(device_class, **kwargs) Add a mock device.
close([cancel_read_loop, shutdown]) Close the controller.
connect()
connect_callback(server)
delete_command_queue() Delete the command queue, if present.
do_both_axes_move(command)
do_both_axes_stop(command)
do_both_axes_track(command)
handle_command(command)
monitor_command(command, task)
put_axis_telemetry(device_id, tai) Warning: this is minimal and simplistic.
put_camera_cable_wrap_telemetry(tai) Warning: this is minimal and simplistic.
read_loop()
reply_to_command(command)
set_command_queue([maxsize]) Create or replace the command queue.
signal_handler()
telemetry_loop() Warning: this is minimal and simplistic.
write_ack(command, timeout) Report a command as acknowledged.
write_done(command) Report a command as done.
write_noack(command, explanation) Report a command as failed.

Attributes Documentation

connected

Methods Documentation

add_all_devices()

Add all mock devices.

The devices are added to self.device_dict.

add_device(device_class, **kwargs)

Add a mock device.

The device is added to self.device_dict.

Parameters:
device_class : mock.Device

Mock device class.

**kwargs : dict

Additional arguments for device_class.

close(cancel_read_loop=True, shutdown=True)

Close the controller.

Parameters:
cancel_read_loop : bool, optional

If True (default), cancel the read loop task. Set to False when calling from read_loop.

shutdown : bool, optional

Set done_task result? True by default. Use False if reconnecting.

connect()
connect_callback(server)
delete_command_queue()

Delete the command queue, if present.

This sets self.command_queue=None.

do_both_axes_move(command)
do_both_axes_stop(command)
do_both_axes_track(command)
handle_command(command)
monitor_command(command, task)
put_axis_telemetry(device_id, tai)

Warning: this is minimal and simplistic.

put_camera_cable_wrap_telemetry(tai)

Warning: this is minimal and simplistic.

read_loop()
reply_to_command(command)
set_command_queue(maxsize=0)

Create or replace the command queue.

This sets attribute self.command_queue to an asyncio.Queue and uses it to record commands as they are received.

Parameters:
maxsize : int, optional

Maximum number of items on the queue. If <= 0 then there is no limit. If the queue fills up, the newest items are dropped.

signal_handler()
telemetry_loop()

Warning: this minimal and simplistic.

write_ack(command, timeout)

Report a command as acknowledged.

Parameters:
command : Command

Command to report as acknowledged.

timeout : float or None

Timeout for the command (seconds).

write_done(command)

Report a command as done.

Parameters:
command : Command

Command to report as done.

write_noack(command, explanation)

Report a command as failed.

Parameters:
command : Command

Command to report as failed.

explanation : str

Reason for the failure.
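
The three reply methods above suggest a simple command lifecycle: acknowledge, then done, or a failure report with an explanation. The sketch below illustrates that sequence with a stand-in class. ReplyRecorder and reply_to are hypothetical, the replies are plain tuples rather than the real reply messages, and the rule used to reject a command is an assumption made only for this example.

```python
from dataclasses import dataclass, field


@dataclass
class ReplyRecorder:
    """Hypothetical stand-in that records replies instead of writing them."""

    replies: list = field(default_factory=list)

    def write_ack(self, command, timeout):
        # Report a command as acknowledged, with its timeout in seconds.
        self.replies.append(("ack", command, timeout))

    def write_done(self, command):
        # Report a command as done.
        self.replies.append(("done", command))

    def write_noack(self, command, explanation):
        # Report a command as failed, with the reason.
        self.replies.append(("noack", command, explanation))


def reply_to(recorder, command, supported=True):
    """Assumed flow: ack then done on success, noack on failure."""
    if not supported:
        recorder.write_noack(command, explanation="unsupported command")
        return
    recorder.write_ack(command, timeout=1.0)
    recorder.write_done(command)


recorder = ReplyRecorder()
reply_to(recorder, "both_axes_stop")
reply_to(recorder, "unknown", supported=False)
```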