Source code for pyrealtime.serial_layer

from pyrealtime.layer import ProducerMixin, ThreadLayer, TransformMixin, EncoderMixin, DecoderMixin
import time


def find_serial_port(name):
    """Utility function to scan available serial ports by name.

    Port descriptions are searched first; device names (e.g. ``COM3``) are only
    searched when no description contains ``name``. If nothing matches, an
    error and the list of available ports are printed.

    The returned value is the port's device name, which is what
    ``serial.Serial(port, ...)`` is called with in the layers' ``initialize``
    methods.

    :param name: The name of the serial port to scan for. The first available
        port whose description (or, failing that, device name) contains
        ``name`` is returned.
    :return: The device name of the first matching port, or ``None`` if no
        port matches.
    :raises ModuleNotFoundError: If PySerial cannot be imported.
    """
    try:
        import serial
        import serial.tools.list_ports
    except ImportError as err:
        raise ModuleNotFoundError("PySerial not found") from err

    ports = list(serial.tools.list_ports.comports())

    # Stop at the first hit so the result matches the documented contract
    # (previously every match overwrote the last, returning the final one).
    port = next((p.device for p in ports if name in p.description), None)
    if port is None:
        port = next((p.device for p in ports if name in p.device), None)

    if port is None:
        print("Error: could not find port: %s." % name)
        print("Available ports:")
        for p in ports:
            print("%s: %s" % (p, p.description))
    return port
class SerialWriteLayer(TransformMixin, EncoderMixin, ThreadLayer):
    """Sends data to a serial port."""

    def __init__(self, port_in, baud_rate, device_name, *args, auto_reconnect=True, **kwargs):
        """
        :param port_in: Source of data to send
        :param baud_rate: Baud rate of the serial port (e.g. 9600, 115200, etc).
            See pyserial documentation for more details
        :param device_name: Full or partial name of the device (e.g. 'COM2' or
            'Arduino'). The port will be obtained using
            :func:`~pyrealtime.serial_layer.find_serial_port`.
        :param auto_reconnect: Stored on the instance; this class does not
            read it anywhere else.
        """
        # The port is opened lazily in initialize() unless from_port() supplies one.
        self.ser = None
        self.baud_rate = baud_rate
        self.device_name = device_name
        self.auto_reconnect = auto_reconnect
        super().__init__(port_in, *args, **kwargs)

    @classmethod
    def from_port(cls, port_in, serial, *args, **kwargs):
        """Creates a layer from an existing serial object

        :param port_in: Source of data to send
        :param serial: Serial port object to write to. It is stored as-is and
            must provide ``write()``.
        :return: The new layer, with ``baud_rate`` and ``device_name`` set to ``None``.
        """
        # Pass the leading parameters positionally: mixing keywords with *args
        # made any extra positional argument collide with ``port_in``.
        layer = cls(port_in, None, None, *args, **kwargs)
        layer.ser = serial
        return layer

    def initialize(self):
        """Open the serial port unless one was already supplied.

        :raises ModuleNotFoundError: If PySerial cannot be imported.
        """
        try:
            import serial
        except ImportError as err:
            raise ModuleNotFoundError("PySerial not found") from err

        if self.ser is None:
            port = find_serial_port(self.device_name)
            self.ser = serial.Serial(port, self.baud_rate, timeout=5)

    def transform(self, data):
        """Encode ``data`` with ``self._encode`` and write the result to the port."""
        self.ser.write(self._encode(data))
class SerialReadLayer(ProducerMixin, DecoderMixin, ThreadLayer):
    """Reads data from a serial port."""

    def __init__(self, baud_rate, device_name, *args, auto_reconnect=True, **kwargs):
        """
        :param baud_rate: Baud rate of the serial port (e.g. 9600, 115200, etc).
            See pyserial documentation for more details
        :param device_name: Full or partial name of the device (e.g. 'COM2' or
            'Arduino'). The port will be obtained using
            :func:`~pyrealtime.serial_layer.find_serial_port`.
        :param auto_reconnect: Stored on the instance; this class does not
            read it anywhere else.
        """
        # The port is opened lazily in initialize() unless from_port() supplies one.
        self.ser = None
        self.baud_rate = baud_rate
        self.device_name = device_name
        self.auto_reconnect = auto_reconnect
        # Set by on_disconnect(), cleared by a successful reconnect().
        self.disconnected = False
        super().__init__(*args, **kwargs)

    @classmethod
    def from_port(cls, serial, *args, **kwargs):
        """Creates a layer from an existing serial object

        :param serial: Serial port object to read from. It is stored as-is and
            must already be open, otherwise :meth:`initialize` raises.
        :return: The new layer, with ``baud_rate`` and ``device_name`` set to ``None``.
        """
        # Pass the leading parameters positionally: mixing keywords with *args
        # made any extra positional argument collide with ``baud_rate``.
        layer = cls(None, None, *args, **kwargs)
        layer.ser = serial
        return layer

    def initialize(self):
        """Open the serial port (up to five attempts) unless one was supplied.

        :raises ModuleNotFoundError: If PySerial cannot be imported.
        :raises RuntimeError: If no open serial port is available afterwards.
        """
        try:
            import serial
        except ImportError as err:
            raise ModuleNotFoundError("PySerial not found") from err

        if self.ser is None:
            port = find_serial_port(self.device_name)
            for _ in range(5):
                try:
                    self.ser = serial.Serial(port, self.baud_rate, timeout=5)
                    break
                except serial.SerialException:
                    time.sleep(.1)

        # self.ser is still None when every attempt failed; report that with
        # the same error instead of an AttributeError on ``None.is_open``.
        if self.ser is None or not self.ser.is_open:
            raise RuntimeError("Serial port not open")

    def get_input(self):
        """Read one line from the port and decode it with ``self._decode``.

        :return: The decoded line, or ``None`` when nothing was read or the
            port is (still) disconnected.
        """
        import serial

        self.reconnect()
        if self.disconnected:
            # Reopening failed; reading the closed port would only raise again
            # and re-run on_disconnect() on every cycle.
            return None

        try:
            line = self.ser.readline()
        except serial.SerialException:
            self.on_disconnect()
            return None

        if not line:
            return None
        return self._decode(line)

    def on_disconnect(self):
        """Mark the layer as disconnected and close the port."""
        self.disconnected = True
        self.ser.close()
        print("Serial port disconnected...")

    def reconnect(self):
        """Try to reopen the port if it is disconnected.

        Does nothing while connected. A failed attempt sleeps for one second
        and leaves ``self.disconnected`` set.
        """
        if not self.disconnected:
            return

        import serial

        print("Attempting to reopen port...")
        try:
            self.ser.open()
            self.disconnected = False
        except serial.SerialException:
            time.sleep(1)
class ByteSerialReadLayer(SerialReadLayer):
    """Reads fixed-size chunks of raw bytes from a serial port.

    When a preamble is given, incoming bytes are scanned one at a time until
    the most recent ``len(preamble)`` bytes equal the preamble; the chunk is
    read only after that.
    """

    def __init__(self, *args, num_bytes=1, preamble=None, **kwargs):
        """
        :param num_bytes: Number of bytes requested from the port per chunk.
        :param preamble: Optional byte sequence that must be seen before each
            chunk is read. ``None`` disables the search.
        """
        super().__init__(*args, **kwargs)
        self.num_bytes = num_bytes
        self.preamble = preamble
        # Sliding window over the last len(preamble) bytes, zero-filled at start.
        self.preamble_buffer = None if preamble is None else bytes(len(preamble))

    def get_input(self):
        """Read one chunk from the port and decode it with ``self._decode``.

        :return: The decoded chunk, or ``None`` when the port is disconnected,
            a read times out before the preamble is found, or no data bytes
            were read.
        """
        import serial

        self.reconnect()
        if self.disconnected:
            return None

        if self.preamble is not None:
            while True:
                try:
                    new_data = self.ser.read(1)
                except serial.SerialException:
                    self.on_disconnect()
                    return None

                if not new_data:
                    # Read timed out: hand control back instead of looping
                    # forever. preamble_buffer keeps its contents, so the
                    # search resumes where it left off on the next call.
                    return None

                self.preamble_buffer = self.preamble_buffer[1:] + new_data
                if self.preamble_buffer == self.preamble:
                    break

        try:
            data = self.ser.read(self.num_bytes)
        except serial.SerialException:
            self.on_disconnect()
            return None

        # NOTE(review): a short read (fewer than num_bytes) is still decoded,
        # as before — confirm decoders tolerate partial chunks.
        if not data:
            return None
        return self._decode(data)