Source code for Instruments_Libraries.InstrumentSelect

"""
Created on Mon Jan 31 08:55:42 2022
Refactored on Tue Feb 27 2025

@author: Martin.Mihaylov
@author: Maxim Weizel
"""

from __future__ import annotations

import logging
import re
import typing 
import pyvisa as visa

if typing.TYPE_CHECKING:
    from Instruments_Libraries.APPH import APPH
    from Instruments_Libraries.AQ6370D import AQ6370D
    from Instruments_Libraries.CoBrite import CoBrite
    from Instruments_Libraries.FSWP50 import FSWP50
    from Instruments_Libraries.GPP4323 import GPP4323
    from Instruments_Libraries.KEITHLEY2612 import KEITHLEY2612
    from Instruments_Libraries.LU1000 import LU1000_Cband
    from Instruments_Libraries.MG3694C import MG3694C
    from Instruments_Libraries.MS2760A import MS2760A
    from Instruments_Libraries.MS4647B import MS4647B
    from Instruments_Libraries.PM100D import PM100D
    from Instruments_Libraries.RD3005 import RD3005
    from Instruments_Libraries.SMA100B import SMA100B
    from Instruments_Libraries.UXR import UXR

from pyvisa.errors import VisaIOError

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# =============================================================================
# Auto-Discovery Helper
# =============================================================================


[docs] def find_resource(model_regex: str, resource_filter: str = "?*INSTR", specific_address: str | None = None) -> str: """ Find a VISA resource matching the given model regex. If specific_address is provided, it is returned directly (after verification if possible). Returns the matching VISA resource string. """ import pyvisa # <- ensure pyvisa is defined in this scope rm = pyvisa.ResourceManager() if specific_address: logger.info(f"Using specific address: {specific_address}") return specific_address try: resources = rm.list_resources(resource_filter) except ValueError: resources = [] logger.info(f"Scanning {len(resources)} resources for pattern '{model_regex}'...") for res in resources: # Only consider TCPIP and relevant serial ports if not (res.startswith("TCPIP") or res.startswith("ASRL")): continue try: with rm.open_resource(res) as inst: inst.timeout = 500 # ms idn = None try: idn = inst.query("*IDN?").strip() except Exception: pass if idn and re.search(model_regex, idn, re.IGNORECASE): logger.info(f"Found match at {res}: {idn}") return res except Exception: continue # Last-resort fallback for link-local (169.254.x.x) devices if auto-discovery fails link_local_candidates = [r for r in resources if r.startswith("TCPIP::169.254.")] for res in link_local_candidates: try: with rm.open_resource(res) as inst: inst.timeout = 500 idn = inst.query("*IDN?").strip() if idn and re.search(model_regex, idn, re.IGNORECASE): logger.info(f"Found link-local match at {res}: {idn}") return res except Exception: continue raise RuntimeError(f"No instrument found matching pattern: {model_regex}")
# ============================================================================= # Instrument Factory Functions # =============================================================================
[docs] def OSA(resource: str | None = None) -> "AQ6370D": """ Connect to Yokogawa AQ6370D. Priority: 1. Use user-provided resource 2. Auto-discovery 3. Fallback to known link-local IPs """ from Instruments_Libraries.AQ6370D import AQ6370D import pyvisa # <- ensure pyvisa is defined in this scope if resource: logger.info(f"User provided resource: {resource}") return AQ6370D(resource) # 1️⃣ Try auto-discovery try: resource = find_resource(r"AQ6370") logger.info(f"AQ6370D found via auto-discovery: {resource}") return AQ6370D(resource) except RuntimeError: logger.warning("AQ6370D not found by auto-discovery.") # 2️⃣ Try fallback IPs observed in PyVISA fallback_ips = [ "TCPIP::169.254.2.21::INSTR", # current device seen on pyvisa "TCPIP::169.254.58.101::INSTR" # legacy fallback (optional) ] for ip in fallback_ips: try: logger.info(f"Trying fallback IP: {ip}") return AQ6370D(ip) except Exception as e: logger.warning(f"Failed to connect to {ip}: {e}") raise RuntimeError( "AQ6370D could not be found. Please provide the correct resource string " "or ensure the device is on the network and responds to *IDN? queries." )
[docs] def Laser_CoBrite(resource: str | None = None) -> CoBrite: # noqa: N802 from Instruments_Libraries.CoBrite import CoBrite if resource is None: resource = find_resource(r"COBRITE") return CoBrite(resource)
[docs] def SourceMeter(resource: str | None = None) -> KEITHLEY2612: # noqa: N802 from Instruments_Libraries.KEITHLEY2612 import KEITHLEY2612 if resource is None: resource = find_resource(r"Keithley.*2612") return KEITHLEY2612(resource)
[docs] def PowerSupply(resource: str | None = None) -> list: """ Scan and connect to all available RD3005 / KA3005 power supplies. Parameters ---------- resource : str | None Optional COM port or VISA resource string to connect to a specific device. Returns ------- list[RD3005] List of connected power supply instances. """ from Instruments_Libraries.RD3005 import RD3005 import serial.tools.list_ports import logging logger = logging.getLogger(__name__) connected_devices = [] target_ids = ["KORAD", "RND 320-KA3005P"] # If a specific resource is provided if resource: logger.info(f"Using provided resource: {resource}") connected_devices.append(RD3005(resource)) return connected_devices # Otherwise, scan all COM ports ports = serial.tools.list_ports.comports() filtered_ports = [p for p in ports if "bluetooth" not in p.description.lower()] filtered_ports.sort(key=lambda p: (0 if "rd3005" in p.description.lower() or "ka3005" in p.description.lower() else 1, p.device)) logger.info(f"Filtered COM ports: {[p.device for p in filtered_ports]}") for port in filtered_ports: inst = None try: logger.info(f"Trying port {port.device} ({port.description})") inst = RD3005(port.device) idn = inst.get_idn() if idn and any(tid in idn for tid in target_ids): logger.info(f"Power supply found on {port.device}: {idn}") connected_devices.append(inst) else: inst.close() except Exception as e: logger.warning(f"Error connecting to {port.device}: {e}") if inst: try: inst.close() except Exception: pass continue if not connected_devices: raise RuntimeError("No suitable power supply found.") return connected_devices
[docs] def PowerMeter(index: int = 0, resource: str | None = None) -> PM100D: # noqa: N802 """ Auto-detect a connected Thorlabs PM100-series power meter. """ from Instruments_Libraries.PM100D import PM100D if resource: return PM100D(resource) # Use auto-discovery logic similar to finding all PM100s and picking by index rm = visa.ResourceManager() matches = [] try: resources = rm.list_resources("?*::INSTR") except ValueError: resources = [] for res in resources: if "USB" not in res: continue # PM100 is typically USB try: with rm.open_resource(res) as inst: inst.timeout = 200 idn = None try: query_func = getattr(inst, "query", None) if callable(query_func): # Check if it supports query try: idn = query_func("*IDN?").strip() except Exception: pass if idn is None: write_func = getattr(inst, "write", None) read_func = getattr(inst, "read", None) if callable(write_func) and callable(read_func): # Fallback to write/read try: write_func("*IDN?") idn = read_func().strip() except Exception: pass except Exception: pass if idn and "PM100" in idn: matches.append(res) except Exception: continue if not matches: raise RuntimeError("No Thorlabs PM100-series power meter found.") if not (0 <= index < len(matches)): raise IndexError(f"index {index} out of range (found {len(matches)} device(s)).") return PM100D(matches[index])
[docs] def LU1000(resource: str | None = "USB") -> LU1000_Cband: # noqa: N802 from Instruments_Libraries.LU1000 import LU1000_Cband if resource is None: resource = "USB" return LU1000_Cband(resource)
[docs] def SpecAnalyser(resource: str | None = None) -> MS2760A: # noqa: N802 from Instruments_Libraries.MS2760A import MS2760A # Defaults to localhost per user request for hardcoded localhost if resource is None: resource = "127.0.0.1" return MS2760A(resource)
[docs] def SigGen(resource: str | None = None, visa_library: str = "@ivi") -> MG3694C: from Instruments_Libraries.MG3694C import MG3694C if resource is None: try: # Try auto-discovery resource = find_resource(r"MG369") if resource is None: # fallback to the IP discovered on the network resource = "TCPIP0::169.254.236.243::INSTR" print(f"MG3694C not found by auto-discovery, using fallback IP {resource}") except RuntimeError: resource = "TCPIP0::169.254.236.243::INSTR" print(f"MG3694C not found by auto-discovery, using fallback IP {resource}") return MG3694C(resource_str=resource, visa_library=visa_library)
[docs] def RnS_SMA100B(resource: str | None = None, visa_library: str = "@ivi") -> SMA100B: # noqa: N802 from Instruments_Libraries.SMA100B import SMA100B if resource is None: resource = find_resource(r"SMA100B") return SMA100B(resource_str=resource, visa_library=visa_library)
[docs] def VNA(resource: str | None = None) -> MS4647B: # noqa: N802 from Instruments_Libraries.MS4647B import MS4647B if resource: return MS4647B(resource) # Try auto-discovery first try: resource = find_resource(r"MS4647B") return MS4647B(resource) except RuntimeError: # Auto-discovery failed, scan all TCPIP resources rm = visa.ResourceManager() for res in rm.list_resources(): if "TCPIP" in res: try: return MS4647B(res) except Exception: continue # Fallback to hardcoded IP fallback = "TCPIP0::169.254.80.88::INSTR" return MS4647B(fallback)
[docs] def PhaseNoiseAnalyzer_APPH(resource: str | None = None) -> APPH: # noqa: N802 from Instruments_Libraries.APPH import APPH if resource is None: # Original logic looked for 'USB0' # We can try find_resource with regex try: resource = find_resource(r"APPH") except RuntimeError: # Fallback to finding any USB0 rm = visa.ResourceManager() for res in rm.list_resources(): if res.startswith("USB0"): resource = res break if resource is None: raise RuntimeError("APPH not found.") return APPH(resource)
[docs] def PowerSupply_GPP4323(resource: str | None = None) -> GPP4323: """ Scan and connect to the first available GW-Instek GPP-4323 power supply. Parameters ---------- resource : str | None Optional COM port or VISA resource string to connect to a specific device. Returns ------- GPP4323 Connected GPP4323 instance. """ import serial.tools.list_ports import re import logging from Instruments_Libraries.GPP4323 import GPP4323 from Instruments_Libraries.BaseInstrument import BaseInstrument logger = logging.getLogger(__name__) # If a resource was explicitly provided, connect directly if resource: if resource.upper().startswith("COM"): resource = BaseInstrument.com_to_asrl(resource) logger.info(f"Using provided resource: {resource}") return GPP4323(resource) # Regex to match the GPP4323 ID serial_regex = r'^GW INSTEK,GPP-4323.*' # List all COM ports ports = list(serial.tools.list_ports.comports()) # Filter out Bluetooth devices filtered_ports = [port for port in ports if "bluetooth" not in port.description.lower()] # Sort ports so "gpp" in description is prioritized filtered_ports.sort(key=lambda port: (0 if "gpp" in port.description.lower() else 1, port.device)) logger.info(f"Filtered COM ports: {[p.device for p in filtered_ports]}") selected_port = None GPP = None # Iterate over filtered COM ports for port in filtered_ports: try: logger.info(f"Trying port {port.device} ({port.description})") resource_str = BaseInstrument.com_to_asrl(port.device) GPP = GPP4323(resource_str) idn = GPP.get_idn() if re.match(serial_regex, idn.upper()): logger.info(f"GPP4323 found on {port.device}: {idn}") selected_port = port.device break # stop at first matching device else: GPP.close() except Exception as e: logger.warning(f"Error connecting to {port.device}: {e}") if GPP: try: GPP.close() except Exception: pass continue if selected_port is None: raise RuntimeError("No suitable GPP4323 Power Supply found.") # Return the connected instance return GPP
[docs] def UXR_1002A(resource: str | None = None) -> UXR: # noqa: N802 from Instruments_Libraries.UXR import UXR if resource is None: # Keep original hardcoded fallback resource = "TCPIP0::KEYSIGH-Q75EBO9.local::hislip0::INSTR" my_UXR = UXR(resource) # noqa: N806 # Preservation of original init settings logic # Note: exception handling is now up to caller or BaseInstrument try: my_UXR.system_header("off") my_UXR.waveform_byteorder("LSBFirst") my_UXR.waveform_format("WORD") my_UXR.waveform_streaming("off") except Exception as e: logger.warning(f"Failed to set initial UXR settings: {e}") return my_UXR
[docs] def RnS_FSWP50(resource: str | None = None) -> FSWP50: # noqa: N802 from Instruments_Libraries.FSWP50 import FSWP50 if resource is None: # Original hardcoded IP resource = "169.254.253.126" return FSWP50(resource)
# ============================================================================= # Main Factory / Selector # =============================================================================
[docs] def InstInit(num) -> typing.Any: # noqa: N802 """ Initialize instrument based on selection string. Recommended to use specific functions (e.g. SpecAnalyser()) directly instead. """ map_func = { " Anrtisu Spectrum Analyzer MS2760A ": SpecAnalyser, " Anritsu Signal Generator MG3694C ": SigGen, " Anritsu Vectro Analyzer MS4647B ": VNA, " Power Meter ThorLabs PM100D ": PowerMeter, " Novoptel Laser LU1000 ": LU1000, " Yokogawa Optical Spectrum Analyzer AQ6370D ": OSA, " KEITHLEY Source Meter 2612 ": SourceMeter, " Power Supply KA3005 ": PowerSupply, " CoBrite Tunable Laser ": Laser_CoBrite, " AnaPico AG,APPH20G ": PhaseNoiseAnalyzer_APPH, " 4-Channels Power Suppy GPP4323 ": PowerSupply_GPP4323, " Rohde and Schwarz SMA100B ": RnS_SMA100B, " Keysight UXR0702A ": UXR_1002A, " Rohde and Schwarz FSWP50 ": RnS_FSWP50, } func = map_func.get(num) if func: return func() else: raise ValueError("Invalid Instrument Selected")