# Source code for Instruments_Libraries.UXR

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Dec  1 20:17:32 2024

@author: Maxim Weizel
"""

import numpy as np
import pyvisa as visa
from os.path import splitext
from datetime import datetime


[docs] class UXR: """ This class is using pyvisa to connect to Instruments. Please install PyVisa before using it. """ def __init__( self, resource_str="TCPIP0::KEYSIGH-Q75EBO9.local::hislip0::INSTR", num_channel=2, visa_library="@ivi", # If you have problems try "@py"! # Or try setting Keysight visa32.dll as primary! ): self._resource = visa.ResourceManager(visa_library).open_resource( str(resource_str), read_termination="\n", query_delay=0.5 ) print(self.IDN()) # Internal Variables and Predefined Lists self._types_channel = list(range(1, num_channel + 1)) self._waveform_format = "ASC" self._types_channel = list(range(1, num_channel + 1)) self._StateLS_mapping = { "on": "ON", "off": "OFF", 1: "ON", 0: "OFF", "1": "ON", "0": "OFF", True: "ON", False: "OFF", } # Default Settings self.system_header("off") # Default is off and should stay off!!! self.waveform_byteorder("LSBFirst") self.waveform_format("WORD") # Data Aquisition is only implemented for WORD yet. self.waveform_streaming("off")
[docs] def query(self, message): return self._resource.query(message)
[docs] def query_binary_values( self, message, datatype="h", container=np.array, data_points=int(0), **kwargs ): return self._resource.query_binary_values( message, datatype=datatype, container=container, data_points=data_points, **kwargs, )
[docs] def write(self, message): return self._resource.write(message)
[docs] def Close(self): self._resource.close()
# ============================================================================= # Checks and Validations # ============================================================================= def _validate_generic(self, value: str | int, valid_List: list) -> str: if value not in valid_List: raise ValueError(f"Invalid value given! Value can be one of {valid_List}.") return value def _validate_channel(self, channel: int) -> int: channel = int(channel) if channel not in self._types_channel: raise ValueError( f"Invalid channel number given! Channel Number can be one of {self._types_channel}." ) return channel def _validate_state(self, state: int | str) -> str: state_normalized = self._StateLS_mapping.get( state.lower() if isinstance(state, str) else int(state) ) if state_normalized is None: raise ValueError("Invalid state given! State can be [ON,OFF,1,0,True,False].") return state_normalized # ============================================================================= # * (Common) Commands # =============================================================================
[docs] def clear_status(self) -> None: """The ``*CLS`` command clears all status and error registers.""" self.write("*CLS")
[docs] def IDN(self) -> str: """The ``*IDN?`` query returns the company name, oscilloscope model number, serial number, and software version by returning this string: Keysight Technologies,<Model #>,<USXXXXXXXX>,<Rev #>[,<Options>] Returns ------- str Keysight Technologies,DSO9404A,USXXXXXXXX,XX.XX.XXXX """ return self.query("*IDN?")
[docs] def OPC(self) -> int: """Places a “1” into the output queue when all device operations have been completed Returns ------- TYPE str 1 or 0 """ return int(self.query("*OPC?"))
[docs] def reset(self) -> None: """The ``*RST`` command performs a default setup which is the same as pressing the oscilloscope front panel [Default Setup] key. """ self.write("*RST")
# ============================================================================= # : (Root Level) Commands # =============================================================================
[docs] def aquisition_done(self) -> int: """The :ADER? query reads the Acquisition Done Event Register and returns 1 or 0. After the Acquisition Done Event Register is read, the register is cleared. The returned value 1 indicates an acquisition completed event has occurred and 0 indicates an acquisition completed event has not occurred. Returns ------- int {1 | 0} """ return int(self.query(":ADER?"))
[docs] def aquisition_state(self) -> str: """The :ASTate? query returns the acquisition state. Returns ------- str {ARM | TRIG | ATRIG | ADONE} """ return self.query(":ASTate?")
[docs] def autoscale(self) -> None: """The :AUToscale command causes the oscilloscope to evaluate all input waveforms and find the optimum conditions for displaying the waveform. """ self.write(":AUToscale")
[docs] def autoscale_channels(self, value: str | None = None) -> str: """The :AUToscale:CHANnels command selects whether to apply autoscale to all of the input channels or just the input channels that are currently displayed. Parameters ---------- value : str, optional {ALL | DISPlayed}, if None then query Returns ------- str {ALL | DISP} Raises ------ ValueError Expected one of: {ALL | DISP | DISPlayed } """ _types = ["ALL", "DISPLAYED", "DISP"] if value is not None: value = self._validate_generic(value.upper(), _types) self.write(f":AUToscale:CHANnels {value}") else: # Query return self.query(":AUToscale:CHANnels?")
[docs] def digitize(self, channel_num: int | None = None) -> None: """This command initializes the selected channels or functions, then acquires them according to the current oscilloscope settings. When all waveforms are completely acquired, the oscilloscope is stopped. To Do: input can be: [CHANnel<N> | DIFF<D> | COMMonmode<C>] Parameters ---------- channel_num : int Number of the Channel Raises ------ channel_numError Expected one of: channel number """ if channel_num is not None: channel_num = self._validate_channel(channel_num) self.write(f":DIGitize CHANnel{channel_num}") else: self.write(":DIGitize")
[docs] def run_state(self) -> str: """The :RSTate? query returns the run state: Returns ------- str {RUN | STOP | SING} """ return self.query(":RSTate?")
[docs] def run(self) -> None: """ Set the scope in run mode. """ self.write(":RUN")
[docs] def single(self) -> None: """ Take a single acquisition """ self.write(":SING")
[docs] def status(self, key: str | None = None, value: int | None = None) -> int: """The :STATus? query shows whether the specified channel, function, wmemory, histogram, measurement trend, measurement spectrum, or equalized waveform is on or off. TODO: Each type has a different range of values that is excepted. No Checking is implemented. Parameters ---------- key : str, optional if None return status of Channel1 value : int, optional For Channel [1,2], for Function <=16 Returns ------- int A return value of 1 means on and a return value of 0 means off Raises ------ ValueError Expected one of: CHANNEL, FUNCTION, HIST, ... etc. """ _types_key = [ "CHAN", "CHANNEL", "DIFF", "COMM", "COMMONMODE", "FUNC", "FUNCTION", "HIST", "HISTOGRAM", "WMEM", "WMEMORY", "CLOC", "CLOCK" "MTR", "MTREND", "MSP", "MSPECTRUM", "EQU", "EQUALIZED", "XT", ] if key is not None: key = self._validate_generic(key.upper(), _types_key) if int(value) <= 16: # For CHAN <=2, for FUNC <=16, ... etc. return int(self.query(f":STATus? {key}{value}")) else: return int(self.query(f":STATus? CHANnel1"))
[docs] def stop(self) -> None: """ Set the scope in stop mode. """ self.write(":STOP")
# ============================================================================= # :CHANnel<N> Commands # =============================================================================
[docs] def channel_display(self, channel: int, state: int | str | None = None) -> int: """The :CHANnel<N>:DISPlay command turns the display of the specified channel on or off. Parameters ---------- channel : int An integer, analog input channel 1 or 2 state : int, str, optional ON, 1, OFF, 0 Returns ------- int The :CHANnel<N>:DISPlay? query returns the current display condition for the specified channel """ channel = self._validate_channel(channel) if state is not None: state = self._validate_state(state) self.write(f":CHANnel{channel}:DISPlay {state}") else: # query return int(self.query(f":CHANnel{channel}:DISPlay?"))
[docs] def channel_range(self, channel: int, range_value: float | None = None) -> float: """The :CHANnel<N>:RANGe command defines the full-scale vertical axis of the selected channel. The values represent the full-scale deflection factor of the vertical axis in volts. These values change as the probe attenuation factor is changed. Parameters ---------- channel : int An integer, analog input channel 1 or 2 range_value : float, optional A real number for the full-scale voltage of the specified channel number, by default None Returns ------- float full-scale vertical axis of the selected channel Raises ------ ValueError For Channel expected one of: num_channels ValueError For range_value expected to be < 2V """ channel = self._validate_channel(channel) if range_value is not None: if range_value <= 4: # 2V Full Scale Range self.write(f":CHANnel{channel}:RANGe {range_value}") else: raise ValueError("Invalid Argument. Expected to be <= 4V") else: # query return float(self.query(f":CHANnel{channel}:RANGe?"))
[docs] def channel_scale(self, channel: int, scale_value: float | None = None) -> float: """The :CHANnel<N>:SCALe command sets the vertical scale, or units per division, of the selected channel. This command is the same as the front-panel channel scale. Parameters ---------- channel : int An integer, analog input channel 1 or 2 scale_value : float, optional A real number for the vertical scale of the channel in units per division, by default None Returns ------- float A real number for the vertical scale of the channel in units per division Raises ------ ValueError For Channel expected one of: num_channels ValueError For range_value expected to be < 500mV/div """ channel = self._validate_channel(channel) if scale_value is not None: if scale_value <= 0.5: # 500mV/div Scale self.write(f":CHANnel{channel}:SCALe {scale_value}") else: raise ValueError("Invalid Argument. Expected to be <= 500mV/div") else: # query return float(self.query(f":CHANnel{channel}:SCALe?"))
# ============================================================================= # :DISPlay Commands # =============================================================================
[docs] def screenshot( self, path: str = "./screenshot.png", with_time: bool = True, time_fmt: str = "%Y-%m-%d_%H-%M-%S", divider: str = "_", timeout: float = 5000, ): """Save screen to {path} with {image_type}: bmp, jpg, gif, tif, png Adapted from: https://github.com/microsoft/Qcodes/blob/main/src/qcodes/instrument_drivers/Keysight/Infiniium.py """ # we lazy import PIL here to avoid importing pillow when unused from PIL.Image import open as pil_open time_str = datetime.now().strftime(time_fmt) if with_time else "" img_name, img_type = splitext(path) img_path = f"{img_name}{divider if with_time else ''}{time_str}{img_type.lower()}" old_timeout = self._resource.timeout # save current timeout self._resource.timeout = timeout # 5 seconds in milliseconds try: with open(img_path, "wb") as f: screen_bytes = self.query_binary_values( f":DISPlay:DATA? {img_type.upper()[1:]}", # without . # https://docs.python.org/3/library/struct.html#format-characters datatype="B", # Capitcal B for unsigned byte container=bytes, ) f.write(screen_bytes) # type: ignore[arg-type] print(f"Screen image written to {img_path}") except Exception as e: self._resource.timeout = old_timeout # restore original timeout print(f"Failed to save screenshot, Error occurred: \n{e}") finally: self._resource.timeout = old_timeout # restore original timeout
# ============================================================================= # :FUNCtion Commands # =============================================================================
[docs] def function_display(self, function_num: int, state: int | str | None = None) -> int: """The :FUNCtion<N>:DISPlay command turns the display of the specified function_num on or off. Parameters ---------- function_num : int Function Number state : int, str, optional ON, 1, OFF, 0 Returns ------- int The :FUNCtion<N>:DISPlay? query returns the current display condition for the specified function_num Raises ------ ValueError For function_num expected one of: 1-16 """ if int(function_num) < 1 or int(function_num) > 16: raise ValueError("Invalid Argument. Expected one of: 1-16") if state is not None: state = self._validate_state(state) self.write(f":FUNCtion{function_num}:DISPlay {state}") else: # query return int(self.query(f":FUNCtion{function_num}:DISPlay?"))
# ============================================================================= # :SYSTem Commands # =============================================================================
[docs] def system_header(self, state: int | str | None = None) -> int: """!!!! SHOULD BE OFF !!!! The :SYSTem:HEADer command specifies whether the instrument will output a header for query responses. When :SYSTem:HEADer is set to ON, the query responses include the command header. Parameters ---------- state : int | str | None, optional {{ON | 1} | {OFF | 0}}, by default None Returns ------- int {1 | 0} Raises ------ ValueError Expected one of: {{ON | 1} | {OFF | 0}} """ if state is not None: state = self._validate_state(state) self.write(f":SYSTem:HEADer {state}") else: # query return int(self.query(":SYSTem:HEADer?"))
# ============================================================================= # :WAVeform Commands # =============================================================================
[docs] def waveform_byteorder(self, value: str = "LSBFIRST") -> str: """The :WAVeform:BYTeorder command selects the order in which bytes are transferred from (or to) the oscilloscope using WORD and LONG formats Parameters ---------- value : str, optional byteorder {MSBF, LSBF}, by default LSBFIRST Returns ------- str byteorder {MSBF, LSBF} Raises ------ ValueError Expected one of: MSBFIRST, LSBFIRST """ _types = ["MSBF", "MSBFIRST", "LSBF", "LSBFIRST"] if value is not None: value = self._validate_generic(value.upper(), _types) self.write(f":WAVeform:BYTeorder {value}") else: # Query return self.query(":WAVeform:BYTeorder?")
[docs] def waveform_data( self, start: int | None = None, size: int | None = None, datatype: str = "h", container: type = np.array, data_points: int = 0, **kwargs, ) -> np.ndarray: """ The :WAVeform:DATA? query outputs waveform data to the computer over the remote interface. The data is copied from a waveform memory, function, or channel previously specified with the :WAVeform:SOURce command. Parameters ---------- start : int, optional Starting point in the source memory for the first waveform point to transfer, by default None. size : int, optional Number of points in the source memory to transfer. If larger than available data, size is adjusted to the maximum available, by default None. datatype : str, optional Data type for binary values as defined in Python struct, by default "h" (short). container : type, optional Type of container to hold the data, by default np.array. data_points : int, optional Expected number of data points, by default 0. kwargs : dict, optional Additional arguments passed to the query_binary_values method. Returns ------- np.ndarray Acquired data. Raises ------ ValueError If `start` or `size` are invalid (non-integers or negative). NotImplementedError If the waveform format is not "WORD". """ # Validate start and size if start is not None and (not isinstance(start, int) or start < 0): raise ValueError("`start` must be a non-negative integer.") if size is not None and (not isinstance(size, int) or size < 0): raise ValueError("`size` must be a non-negative integer.") # Construct the SCPI message if start is not None and size is not None: message = f":WAVeform:DATA? {start},{size}" elif start is not None: message = f":WAVeform:DATA? {start}" else: message = ":WAVeform:DATA?" 
# Query the waveform data if self._waveform_format == "WORD": try: return self.query_binary_values( message, datatype=datatype, container=container, data_points=data_points, **kwargs, ) except Exception as e: print("Error:", e) else: raise NotImplementedError( f"Unsupported waveform format: {self._waveform_format}. " "Only 'WORD' format is currently supported." )
[docs] def waveform_format(self, value: str | None = None) -> str: """The :WAVeform:FORMat command sets the data transmission mode for waveform data output. This command controls how the data is formatted when it is sent from the oscilloscope, and pertains to all waveforms. To Do: Only WORD is tested. There is a FLOAT type? Parameters ---------- value : str, optional One of {ASCii | BINary | BYTE | WORD }, by default None Returns ------- str {ASC | BIN | BYTE | WORD } Raises ------ ValueError Expected one of: {ASCii | BINary | BYTE | WORD} """ _types = ["ASC", "ASCII", "BIN", "BINARY", "BYTE", "WORD"] if value is not None: value = self._validate_generic(value.upper(), _types) self.write(f":WAVeform:FORMat {value}") self._waveform_format = self.query(":WAVeform:FORMat?").upper() else: # Query self._waveform_format = self.query(":WAVeform:FORMat?").upper() return self._waveform_format
[docs] def waveform_points(self) -> int: """The :WAVeform:POINts? query returns the points value in the current waveform preamble. Returns ------- int Number of points in the current waveform """ return int(self.query(":WAVeform:POINts?"))
[docs] def waveform_source(self, key: str | None = None, value: int | None = None) -> str: """The :WAVeform:SOURce command selects a channel, function, waveform memory, or histogram as the waveform source TODO: No checks implemented Parameters ---------- key : str | None, optional One of: {CHANnel<N> | DIFF<D> | COMMonmode<C> | FUNCtion<F> | HISTogram | WMEMory<R> | CLOCk | MTRend | MSPectrum | EQUalized | XT<X> | PNOise | INPut | CORRected | ERRor | LFPR | NREDuced}, by default None value : int | None, optional Number e.g. 1 for Channel1, by default None Returns ------- str The :WAVeform:SOURce? query returns the currently selected waveform source. """ if key is not None and value is not None: self.write(f":WAVeform:SOURce {key}{value}") else: return self.query(":WAVeform:SOURce?")
[docs] def waveform_streaming(self, state: int | str | None = None) -> int: """When enabled, :WAVeform:STReaming allows more than 999,999,999 bytes of data to be transferred from the Infiniium oscilloscope to a PC when using the :WAVeform:DATA? query. Parameters ---------- state : int | str | None, optional {{ON | 1} | {OFF | 0}}, by default None Returns ------- int {1 | 0} """ if state is not None: state = self._validate_state(state) self.write(f":WAVeform:STReaming {state}") else: # query return int(self.query(":WAVeform:STReaming?"))
[docs] def waveform_x_increment(self) -> float: """The :WAVeform:XINCrement? query returns the duration between consecutive data points for the currently specified waveform source. Returns ------- float A real number representing the duration between data points on the X axis. """ return float(self.query(":WAVeform:XINCrement?"))
[docs] def waveform_x_origin(self) -> float: """The :WAVeform:XORigin? query returns the X-axis value of the first data point in the data record. Returns ------- float A real number representing the X-axis value of the first data point in the data record. """ return float(self.query(":WAVeform:XORigin?"))
[docs] def waveform_y_increment(self) -> float: """The :WAVeform:YINCrement? query returns the y-increment voltage value for the currently specified source. Returns ------- float A real number in exponential format. """ return float(self.query(":WAVeform:YINCrement?"))
[docs] def waveform_y_origin(self) -> float: """The :WAVeform:YORigin? query returns the y-origin voltage value for the currently specified source. The voltage value returned is the voltage value represented by the waveform data digital code 00000. Returns ------- float A real number in exponential format. """ return float(self.query(":WAVeform:YORigin?"))