"""
Created on Tue Feb 27 2025
@author: Refactoring Bot
"""
import logging
from typing import Any, cast
import pyvisa
from pyvisa.resources import MessageBasedResource
[docs]
class BaseInstrument:
"""
Base class for all instrument drivers.
Handles VISA connection, communication, logging, and error handling.
"""
def __init__(self, resource_str: str, visa_library: str = "@py", **kwargs):
"""
Initialize the instrument connection.
Parameters
----------
resource_str : str
The VISA resource string (e.g., 'TCPIP::192.168.1.1::INSTR') or just an IP address.
visa_library : str, optional
VISA library to use (e.g., '@ivi', '@py'). Default is '@py'.
**kwargs : dict
Additional arguments passed to `open_resource`.
"""
import re
# Auto-format IP address or localhost to TCPIP VISA string if needed
# We assume it's an IP/locahost if it perfectly matches an IPv4 format or 'localhost'
ip_pattern = re.compile(r"^(\d{1,3}\.){3}\d{1,3}$|^localhost$", re.IGNORECASE)
# If the user just gave an IP, add the TCPIP...INSTR decorators
if ip_pattern.match(resource_str):
resource_str = f"TCPIP::{resource_str}::INSTR"
# If it's an IP with a port (socket), example: 192.168.1.1:5025
elif re.match(r"^(\d{1,3}\.){3}\d{1,3}:\d+$|^localhost:\d+$", resource_str, re.IGNORECASE):
ip, port = resource_str.split(":")
resource_str = f"TCPIP::{ip}::{port}::SOCKET"
self.resource_str = resource_str
self.logger = logging.getLogger(f"{self.__class__.__name__}({resource_str})")
# Ensure a basic handler exists if none are configured
if not self.logger.hasHandlers() and not logging.getLogger().handlers:
logging.basicConfig(level=logging.INFO)
try:
self._rm = pyvisa.ResourceManager(visa_library)
self._resource = cast(
MessageBasedResource, self._rm.open_resource(resource_str, **kwargs)
)
self.logger.info(f"Connected to {resource_str}")
except Exception as e:
self.logger.error(f"Failed to connect to {resource_str}: {e}")
raise
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs]
@staticmethod
def com_to_asrl(com_port: str) -> str:
"""
Convert a Windows COM port string to a PyVISA ASRL resource string.
Parameters
----------
com_port : str
Windows COM port (e.g., "COM3").
Returns
-------
str
VISA ASRL string (e.g., "ASRL3::INSTR").
"""
return f"ASRL{com_port.upper().replace('COM','')}::INSTR"
# =============================================================================
# Communication Wrappers
# =============================================================================
[docs]
def write(self, command: str) -> None:
"""
Send a command to the instrument.
Parameters
----------
command : str
SCPI command string.
"""
try:
self.logger.debug(f"Write: {command}")
self._resource.write(command)
except Exception as e:
self.logger.error(f"Write failed: '{command}', Error: {e}")
raise
[docs]
def query(self, command: str) -> str:
"""
Send a query and return the response string.
The instrument's response is stripped of whitespace.
Parameters
----------
command : str
SCPI query string.
"""
try:
self.logger.debug(f"Query: {command}")
response = self._resource.query(command).strip()
self.logger.debug(f"Response: {response}")
return response
except Exception as e:
self.logger.error(f"Query failed: '{command}', Error: {e}")
raise
[docs]
def read(self) -> str:
"""
Read raw response string from the instrument.
"""
try:
self.logger.debug("Reading from instrument...")
response = self._resource.read()
self.logger.debug(f"Read: {response.strip()}")
return response
except Exception as e:
self.logger.error(f"Read failed: {e}")
raise
[docs]
def query_ascii_values(self, command: str, **kwargs) -> list[Any]:
"""
Query for a list of ASCII values (e.g., trace data).
Parameters
----------
command : str
SCPI query command.
**kwargs : dict
Additional arguments passed to `query_ascii_values`.
"""
try:
self.logger.debug(f"Query ASCII: {command}")
return cast(list[Any], self._resource.query_ascii_values(command, **kwargs))
except Exception as e:
self.logger.error(f"Query ASCII failed: '{command}', Error: {e}")
raise
[docs]
def query_str_list(self, command: str) -> list[str]:
"""
Query the instrument and return a list of strings.
Automatically removes SCPI quotes (' or ") and strips whitespace.
"""
response = self.query(command)
if not response or response.upper() == "NONE":
return []
return [s.strip().strip("'").strip('"') for s in response.split(",")]
[docs]
def query_float(self, command: str) -> float:
"""
Convenience method to query and parse a single float value.
Parameters
----------
command : str
SCPI query string.
"""
return float(self.query(command))
[docs]
def query_int(self, command: str) -> int:
"""
Convenience method to query and parse a single integer value.
Parameters
----------
command : str
SCPI query string.
"""
return int(float(self.query(command)))
# =============================================================================
# Common Instrument Commands
# =============================================================================
[docs]
def get_idn(self) -> str:
"""
Get the instrument identification string (``*IDN?``).
"""
return self.query("*IDN?")
[docs]
def reset(self) -> None:
"""
Reset the instrument (``*RST``).
"""
self.write("*RST")
self.logger.info("Instrument reset (*RST)")
[docs]
def clear(self) -> None:
"""
Clear the instrument status (``*CLS``).
"""
self.write("*CLS")
[docs]
def get_opc(self) -> int:
"""
Wait until operation complete (``*OPC?``).
Returns
-------
int
1 when operation is complete.
"""
return self.query_int("*OPC?")
[docs]
def wait(self) -> None:
"""
Wait for operation to complete (``*WAI``).
"""
self.write("*WAI")
[docs]
def close(self) -> None:
"""
Close the connection to the instrument.
"""
try:
if hasattr(self, "_resource"):
self._resource.close()
if hasattr(self, "_rm"):
self._rm.close()
self.logger.info(f"Connection to {self.resource_str} closed.")
except Exception as e:
self.logger.error(f"Error closing connection: {e}")
# =============================================================================
# Validate Variables
# =============================================================================
def _parse_state(self, state: str | int | float | bool) -> str:
"""
Helper to parse various input types into SCPI 'ON' or 'OFF' strings.
Supports bool (True/False), numeric (1/0, 1.0/0.0), and strings ('ON'/'OFF', '1'/'0').
"""
if isinstance(state, bool):
return "ON" if state else "OFF"
if isinstance(state, (int, float)):
if state == 1:
return "ON"
elif state == 0:
return "OFF"
s = str(state).strip().upper()
if s in ["ON", "1", "1.0"]:
return "ON"
elif s in ["OFF", "0", "0.0"]:
return "OFF"
else:
raise ValueError(f"Invalid state: '{state}'. Use True/False, 1/0, or 'ON'/'OFF'.")
def _check_scpi_param(self, user_input: str, allowed_params: list[str]) -> str:
"""
Validates user input against a list of allowed SCPI parameters.
Supports SCPI short forms (e.g., 'PHOT' for 'PHOTodiode') and is case-insensitive.
Returns the exact parameter string as provided in allowed_params.
"""
user_input_upper = str(user_input).strip().upper()
for param in allowed_params:
# Handle optional characters like [] if they exist
clean_param = param.replace("[", "").replace("]", "")
# Extract the mandatory short form, represented by uppercase letters
short_form = "".join(c for c in param if c.isupper())
if not short_form:
short_form = clean_param.upper()
long_form = clean_param.upper()
# Input must be at least the short form length, and match the long form's prefix
if len(user_input_upper) >= len(short_form) and long_form.startswith(user_input_upper):
return param
raise ValueError(
f"Invalid input '{user_input}'. Allowed parameters are: {allowed_params} "
"(case-insensitive, abbreviation allowed up to the capital letters)."
)
# =============================================================================
# Common aliases
# =============================================================================
Close = close
idn = get_idn
IDN = get_idn
opc = get_opc
OPC = get_opc