# Downloaded from https://www.novoptel.de/Home/Downloads_de.php on 07.02.2025
# Copyright Novoptel GmbH
import re
from ctypes import create_string_buffer
from time import sleep
import ftd2xx as ftd
class NovoptelUSB:
    """Register-level access to a Novoptel instrument through an FTDI USB bridge.

    The constructor lists the attached FTDI devices (via ``ftd2xx``) and opens
    one of them, either by matching ``instrument_str`` against the device
    descriptions or by asking the user on stdin.  Afterwards :meth:`write` and
    :meth:`read` exchange 9-byte ASCII frames with the instrument.

    The class can be used as a context manager so the FTDI handle is closed
    even when an exception occurs::

        with NovoptelUSB("PM1000") as usb:
            value = usb.read(0x200)

    Attributes:
        baudrate: Baud rate configured on the FTDI device in :meth:`connect`.
        DEVNO: Index of the selected device in the FTDI device list, ``-1``
            while nothing is selected.
        isConnected: ``True`` between a successful :meth:`connect` and
            :meth:`close`.
        d: The open ``ftd2xx`` device handle, or ``None``.
    """

    # Parameters
    baudrate = 230400
    DEVNO = -1

    # Frame layout: one command letter, 3 hex digits of address, 4 hex digits
    # of data, carriage return.  The limits below follow from those widths.
    _MAX_ADDR = 0xFFF
    _MAX_DATA = 0xFFFF

    # read() polls the receive queue until at least this many bytes arrived
    # (presumably 4 hex digits plus a terminator -- confirm with the vendor
    # protocol description), giving up after _READ_POLL_LIMIT polls.
    _REPLY_MIN_BYTES = 5
    _READ_POLL_LIMIT = 1000
    _READ_POLL_INTERVAL = 0.001  # seconds between polls

    def __init__(self, instrument_str=None):
        """Find an FTDI device and try to open it.

        Args:
            instrument_str: If a string, it is used as a regular expression
                that must match the beginning of a device description; the
                last matching device is selected and connection errors are
                reported on stdout instead of raised.  Otherwise the devices
                are listed and the index is read from stdin (``-1`` quits).
        """
        self.isConnected = False
        self.d = None

        # list all connected ftdi devices
        # (flag 2 is passed through to ftd2xx; it appears to request the
        # device descriptions -- see the D2XX FT_ListDevices documentation)
        found = ftd.listDevices(2)
        if not found:
            print(" No Instrument found")
            return
        names = [entry.decode("UTF-8") for entry in found]

        if isinstance(instrument_str, str):  # Code from mweizel
            match = self._find_device(instrument_str, names)
            if match >= 0:
                self.DEVNO = match
            if self.DEVNO < 0:
                # Devices are attached, but none carries the requested name.
                print(" No Instrument found")
                return
            try:
                print(f"Try connecting to {names[self.DEVNO]}")
                self.connect()
            except Exception as exc:
                # Deliberately best-effort: the caller checks isConnected.
                self.DEVNO = -1
                print("Not Connected.")
                print(f" Reason: {exc}")
        else:  # Code from Novoptel
            for index, name in enumerate(names):
                print(" " + str(index) + ": " + name)
            print(" Select Instrument (-1 to Quit):")
            try:
                self.DEVNO = int(input())
            except ValueError:
                # Non-numeric input is treated like "quit".
                self.DEVNO = -1
                print(" Invalid selection")
            if self.DEVNO >= 0:
                self.connect()

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the device on leaving the ``with`` block; never suppresses."""
        self.close()
        return False

    @staticmethod
    def _find_device(instrument_str, names):
        """Return the index of the last name matched by ``instrument_str``.

        ``instrument_str`` is interpreted as a regular expression anchored at
        the start of each name.  Returns ``-1`` when nothing matches.
        """
        matcher = re.compile(rf"^{instrument_str}.*")
        hit = -1
        for index, name in enumerate(names):
            if matcher.match(name):  # if instrument_str is found
                hit = index
        return hit

    @classmethod
    def _frame(cls, command, addr, data=0):
        """Build the 9-byte ASCII frame ``<command><addr:3 hex><data:4 hex>\\r``.

        Raises:
            ValueError: If ``addr`` or ``data`` does not fit its hex field.
        """
        if not 0 <= addr <= cls._MAX_ADDR:
            raise ValueError(
                f"addr must be in 0..0x{cls._MAX_ADDR:X}, got {addr!r}"
            )
        if not 0 <= data <= cls._MAX_DATA:
            raise ValueError(
                f"data must be in 0..0x{cls._MAX_DATA:X}, got {data!r}"
            )
        return f"{command}{addr:03X}{data:04X}\r".encode("ascii")

    def connect(self):
        """Open the FTDI device number ``DEVNO`` and configure the link."""
        self.d = ftd.open(self.DEVNO)  # Open selected FTDI device
        self.d.setBaudRate(self.baudrate)
        # Arguments are word length, stop bits, parity as D2XX constants
        # (8 data bits; 0/0 should be one stop bit and no parity).
        self.d.setDataCharacteristics(8, 0, 0)
        self.isConnected = True
        print("Connected.")

    def close(self):
        """Close the device handle if one is open and mark as disconnected."""
        if self.d is not None:
            self.d.close()
            self.d = None
        self.isConnected = False
        print("Closed.")

    def write(self, addr, data):
        """Send a write frame for register ``addr`` with value ``data``.

        Args:
            addr: Register address, ``0 <= addr <= 0xFFF``.
            data: Value to write, ``0 <= data <= 0xFFFF``.

        Raises:
            ConnectionError: If no device is open.
            ValueError: If ``addr`` or ``data`` is out of range.
        """
        if self.d is None:
            raise ConnectionError("USB device is not connected")
        frame = self._frame("W", addr, data)
        sleep(0.01)  # short pause before each write, kept from the original
        self.d.write(frame)

    def read(self, addr):
        """Request register ``addr`` and return the reply as an integer.

        The receive buffers are purged, a read frame is sent and the receive
        queue is polled until at least five bytes are available.

        Args:
            addr: Register address, ``0 <= addr <= 0xFFF``.

        Returns:
            The reply parsed as a hexadecimal number, or ``-1`` if fewer than
            five bytes arrived before polling gave up.

        Raises:
            ConnectionError: If no device is open.
            ValueError: If ``addr`` is out of range or the reply is not
                valid hexadecimal text.
        """
        if self.d is None:
            raise ConnectionError("USB device is not connected")
        request = self._frame("R", addr)

        self.d.purge()  # clear buffers
        # send request command
        self.d.write(request)

        # wait for RX
        pending = 0
        for _ in range(self._READ_POLL_LIMIT):
            pending = self.d.getQueueStatus()
            if pending >= self._REPLY_MIN_BYTES:
                break
            sleep(self._READ_POLL_INTERVAL)

        # get RX (also drains a partial reply after a timeout)
        reply = self.d.read(pending)

        # return RX as integer
        if pending < self._REPLY_MIN_BYTES:
            return -1
        return int(reply.decode("utf-8"), 16)