from pyrealtime.layer import ProducerMixin, ThreadLayer, TransformMixin, EncoderMixin, DecoderMixin
import time
[docs]def find_serial_port(name):
"""Utility function to scan available serial ports by name. The returned object is intended to be passed to the
:meth:`~pyrealtime.serial_layer.SerialWriteLayer.from_port` constructor of
:class:`~pyrealtime.serial_layer.SerialReadLayer` or :class:`~pyrealtime.serial_layer.SerialWriteLayer`.
:param name: The name of the serial port to scan for. It will return the first available port containing name.
:return: A closed serial port object
"""
try:
import serial
import serial.tools.list_ports
except ImportError:
raise ModuleNotFoundError("PySerial not found")
ports = list(serial.tools.list_ports.comports())
port = None
for p in ports:
if name in p.description:
port = p.device
if port is None:
for p in ports:
if name in p.device:
port = p.device
if port is None:
print("Error: could not find port: %s." % name)
print("Available ports:")
for p in ports:
print("%s: %s" % (p, p.description))
return port
[docs]class SerialWriteLayer(TransformMixin, EncoderMixin, ThreadLayer):
"""Sends data to a serial port
"""
def __init__(self, port_in, baud_rate, device_name, *args, auto_reconnect=True, **kwargs):
"""
:param port_in: Source of data to send
:param baud_rate: Baud rate or serial port (e.g. 9600, 115200, etc). See pyserial documentation for more details
:param device_name: Full or partial name of the device (e.g. 'COM2' or 'Arduino'). The port will be obtained using :func:`~pyrealtime.serial_layer.find_serial_port`.
"""
self.ser = None
self.baud_rate = baud_rate
self.device_name = device_name
self.auto_reconnect = auto_reconnect
super().__init__(port_in, *args, **kwargs)
[docs] @classmethod
def from_port(cls, port_in, serial, *args, **kwargs):
"""Creates a layer from an existing serial object
:param serial: Serial port object, either created using pyserial or from :func:`~pyrealtime.serial_layer.find_serial_port`.
"""
layer = cls(port_in=port_in, baud_rate=None, device_name=None, *args, **kwargs)
layer.ser = serial
return layer
def initialize(self):
try:
import serial
import serial.tools.list_ports
except ImportError:
raise ModuleNotFoundError("PySerial not found")
if self.ser is None:
port = find_serial_port(self.device_name)
self.ser = serial.Serial(port, self.baud_rate, timeout=5)
def transform(self, data):
self.ser.write(self._encode(data))
[docs]class SerialReadLayer(ProducerMixin, DecoderMixin, ThreadLayer):
"""Reads data from a serial port
"""
def __init__(self, baud_rate, device_name, *args, auto_reconnect=True, **kwargs):
"""
:param baud_rate: Baud rate or serial port (e.g. 9600, 115200, etc). See pyserial documentation for more details
:param device_name: Full or partial name of the device (e.g. 'COM2' or 'Arduino'). The port will be obtained using :func:`~pyrealtime.serial_layer.find_serial_port`.
"""
self.ser = None
self.baud_rate = baud_rate
self.device_name = device_name
self.auto_reconnect = auto_reconnect
self.disconnected = False
super().__init__(*args, **kwargs)
[docs] @classmethod
def from_port(cls, serial, *args, **kwargs):
"""Creates a layer from an existing serial object
:param serial: Serial port object, either created using pyserial or from :func:`~pyrealtime.serial_layer.find_serial_port`.
"""
layer = cls(baud_rate=None, device_name=None, *args, **kwargs)
layer.ser = serial
return layer
def initialize(self):
try:
import serial
except ImportError:
raise ModuleNotFoundError("PySerial not found")
if self.ser is None:
port = find_serial_port(self.device_name)
for i in range(5):
try:
self.ser = serial.Serial(port, self.baud_rate, timeout=5)
break
except serial.SerialException:
time.sleep(.1)
pass
if not self.ser.is_open:
raise RuntimeError("Serial port not open")
def get_input(self):
import serial
self.reconnect()
try:
line = self.ser.readline()
if line is None or len(line) == 0:
return None
except serial.serialutil.SerialException:
self.on_disconnect()
return None
return self._decode(line)
def on_disconnect(self):
self.disconnected = True
self.ser.close()
print("Serial port disconnected...")
def reconnect(self):
if self.disconnected:
import serial
print("Attempting to reopen port...")
try:
self.ser.open()
self.disconnected = False
except serial.SerialException:
time.sleep(1)
[docs]class ByteSerialReadLayer(SerialReadLayer):
def __init__(self, *args, num_bytes=1, preamble=None, **kwargs):
super().__init__(*args, **kwargs)
self.num_bytes = num_bytes
self.preamble = preamble
self.preamble_buffer = None if preamble is None else bytes(len(preamble))
def get_input(self):
import serial
self.reconnect()
if self.disconnected:
return None
if self.preamble is not None:
while True:
try:
new_data = self.ser.read(1)
except serial.SerialException:
self.on_disconnect()
return None
if len(new_data) == 1:
self.preamble_buffer = self.preamble_buffer[1:] + new_data
if self.preamble_buffer == self.preamble:
break
try:
data = self.ser.read(self.num_bytes)
except serial.SerialException:
self.on_disconnect()
return None
if data is None or len(data) == 0:
return None
return self._decode(data)