#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Mon Dec 1 20:17:32 2024
@author: Maxim Weizel
"""
import numpy as np
import pyvisa as visa
from os.path import splitext
from datetime import datetime
[docs]
class UXR:
"""
This class is using pyvisa to connect to Instruments. Please install PyVisa before using it.
"""
def __init__(
self,
resource_str="TCPIP0::KEYSIGH-Q75EBO9.local::hislip0::INSTR",
num_channel=2,
visa_library="@ivi", # If you have problems try "@py"!
# Or try setting Keysight visa32.dll as primary!
):
self._resource = visa.ResourceManager(visa_library).open_resource(
str(resource_str), read_termination="\n", query_delay=0.5
)
print(self.IDN())
# Internal Variables and Predefined Lists
self._types_channel = list(range(1, num_channel + 1))
self._waveform_format = "ASC"
self._types_channel = list(range(1, num_channel + 1))
self._StateLS_mapping = {
"on": "ON",
"off": "OFF",
1: "ON",
0: "OFF",
"1": "ON",
"0": "OFF",
True: "ON",
False: "OFF",
}
# Default Settings
self.system_header("off") # Default is off and should stay off!!!
self.waveform_byteorder("LSBFirst")
self.waveform_format("WORD") # Data Aquisition is only implemented for WORD yet.
self.waveform_streaming("off")
[docs]
def query(self, message):
return self._resource.query(message)
[docs]
def query_binary_values(
self, message, datatype="h", container=np.array, data_points=int(0), **kwargs
):
return self._resource.query_binary_values(
message,
datatype=datatype,
container=container,
data_points=data_points,
**kwargs,
)
[docs]
def write(self, message):
return self._resource.write(message)
[docs]
def Close(self):
self._resource.close()
# =============================================================================
# Checks and Validations
# =============================================================================
def _validate_generic(self, value: str | int, valid_List: list) -> str:
if value not in valid_List:
raise ValueError(f"Invalid value given! Value can be one of {valid_List}.")
return value
def _validate_channel(self, channel: int) -> int:
channel = int(channel)
if channel not in self._types_channel:
raise ValueError(
f"Invalid channel number given! Channel Number can be one of {self._types_channel}."
)
return channel
def _validate_state(self, state: int | str) -> str:
state_normalized = self._StateLS_mapping.get(
state.lower() if isinstance(state, str) else int(state)
)
if state_normalized is None:
raise ValueError("Invalid state given! State can be [ON,OFF,1,0,True,False].")
return state_normalized
# =============================================================================
# * (Common) Commands
# =============================================================================
[docs]
def clear_status(self) -> None:
"""The ``*CLS`` command clears all status and error registers."""
self.write("*CLS")
[docs]
def IDN(self) -> str:
"""The ``*IDN?`` query returns the company name, oscilloscope model number, serial
number, and software version by returning this string:
Keysight Technologies,<Model #>,<USXXXXXXXX>,<Rev #>[,<Options>]
Returns
-------
str
Keysight Technologies,DSO9404A,USXXXXXXXX,XX.XX.XXXX
"""
return self.query("*IDN?")
[docs]
def OPC(self) -> int:
"""Places a “1” into the output queue when all device
operations have been completed
Returns
-------
TYPE str
1 or 0
"""
return int(self.query("*OPC?"))
[docs]
def reset(self) -> None:
"""The ``*RST`` command performs a default setup which is the same as pressing the
oscilloscope front panel [Default Setup] key.
"""
self.write("*RST")
# =============================================================================
# : (Root Level) Commands
# =============================================================================
[docs]
def aquisition_done(self) -> int:
"""The :ADER? query reads the Acquisition Done Event Register and returns 1 or 0.
After the Acquisition Done Event Register is read, the register is cleared. The
returned value 1 indicates an acquisition completed event has occurred and 0
indicates an acquisition completed event has not occurred.
Returns
-------
int
{1 | 0}
"""
return int(self.query(":ADER?"))
[docs]
def aquisition_state(self) -> str:
"""The :ASTate? query returns the acquisition state.
Returns
-------
str
{ARM | TRIG | ATRIG | ADONE}
"""
return self.query(":ASTate?")
[docs]
def autoscale(self) -> None:
"""The :AUToscale command causes the oscilloscope to evaluate all input waveforms
and find the optimum conditions for displaying the waveform.
"""
self.write(":AUToscale")
[docs]
def autoscale_channels(self, value: str | None = None) -> str:
"""The :AUToscale:CHANnels command selects whether to apply autoscale to all of
the input channels or just the input channels that are currently displayed.
Parameters
----------
value : str, optional
{ALL | DISPlayed}, if None then query
Returns
-------
str
{ALL | DISP}
Raises
------
ValueError
Expected one of: {ALL | DISP | DISPlayed }
"""
_types = ["ALL", "DISPLAYED", "DISP"]
if value is not None:
value = self._validate_generic(value.upper(), _types)
self.write(f":AUToscale:CHANnels {value}")
else: # Query
return self.query(":AUToscale:CHANnels?")
[docs]
def digitize(self, channel_num: int | None = None) -> None:
"""This command initializes the selected channels or functions, then acquires
them according to the current oscilloscope settings. When all waveforms are
completely acquired, the oscilloscope is stopped.
To Do: input can be: [CHANnel<N> | DIFF<D> | COMMonmode<C>]
Parameters
----------
channel_num : int
Number of the Channel
Raises
------
channel_numError
Expected one of: channel number
"""
if channel_num is not None:
channel_num = self._validate_channel(channel_num)
self.write(f":DIGitize CHANnel{channel_num}")
else:
self.write(":DIGitize")
[docs]
def run_state(self) -> str:
"""The :RSTate? query returns the run state:
Returns
-------
str
{RUN | STOP | SING}
"""
return self.query(":RSTate?")
[docs]
def run(self) -> None:
"""
Set the scope in run mode.
"""
self.write(":RUN")
[docs]
def single(self) -> None:
"""
Take a single acquisition
"""
self.write(":SING")
[docs]
def status(self, key: str | None = None, value: int | None = None) -> int:
"""The :STATus? query shows whether the specified channel, function, wmemory,
histogram, measurement trend, measurement spectrum, or equalized waveform is
on or off.
TODO: Each type has a different range of values that is excepted. No Checking
is implemented.
Parameters
----------
key : str, optional
if None return status of Channel1
value : int, optional
For Channel [1,2], for Function <=16
Returns
-------
int
A return value of 1 means on and a return value of 0 means off
Raises
------
ValueError
Expected one of: CHANNEL, FUNCTION, HIST, ... etc.
"""
_types_key = [
"CHAN",
"CHANNEL",
"DIFF",
"COMM",
"COMMONMODE",
"FUNC",
"FUNCTION",
"HIST",
"HISTOGRAM",
"WMEM",
"WMEMORY",
"CLOC",
"CLOCK" "MTR",
"MTREND",
"MSP",
"MSPECTRUM",
"EQU",
"EQUALIZED",
"XT",
]
if key is not None:
key = self._validate_generic(key.upper(), _types_key)
if int(value) <= 16: # For CHAN <=2, for FUNC <=16, ... etc.
return int(self.query(f":STATus? {key}{value}"))
else:
return int(self.query(f":STATus? CHANnel1"))
[docs]
def stop(self) -> None:
"""
Set the scope in stop mode.
"""
self.write(":STOP")
# =============================================================================
# :CHANnel<N> Commands
# =============================================================================
[docs]
def channel_display(self, channel: int, state: int | str | None = None) -> int:
"""The :CHANnel<N>:DISPlay command turns the display of the specified channel on
or off.
Parameters
----------
channel : int
An integer, analog input channel 1 or 2
state : int, str, optional
ON, 1, OFF, 0
Returns
-------
int
The :CHANnel<N>:DISPlay? query returns the current display condition for the
specified channel
"""
channel = self._validate_channel(channel)
if state is not None:
state = self._validate_state(state)
self.write(f":CHANnel{channel}:DISPlay {state}")
else: # query
return int(self.query(f":CHANnel{channel}:DISPlay?"))
[docs]
def channel_range(self, channel: int, range_value: float | None = None) -> float:
"""The :CHANnel<N>:RANGe command defines the full-scale vertical axis of the
selected channel. The values represent the full-scale deflection factor of the
vertical axis in volts. These values change as the probe attenuation factor is changed.
Parameters
----------
channel : int
An integer, analog input channel 1 or 2
range_value : float, optional
A real number for the full-scale voltage of the specified channel number,
by default None
Returns
-------
float
full-scale vertical axis of the selected channel
Raises
------
ValueError
For Channel expected one of: num_channels
ValueError
For range_value expected to be < 2V
"""
channel = self._validate_channel(channel)
if range_value is not None:
if range_value <= 4: # 2V Full Scale Range
self.write(f":CHANnel{channel}:RANGe {range_value}")
else:
raise ValueError("Invalid Argument. Expected to be <= 4V")
else: # query
return float(self.query(f":CHANnel{channel}:RANGe?"))
[docs]
def channel_scale(self, channel: int, scale_value: float | None = None) -> float:
"""The :CHANnel<N>:SCALe command sets the vertical scale, or units per division, of
the selected channel. This command is the same as the front-panel channel scale.
Parameters
----------
channel : int
An integer, analog input channel 1 or 2
scale_value : float, optional
A real number for the vertical scale of the channel in units per division,
by default None
Returns
-------
float
A real number for the vertical scale of the channel in units per division
Raises
------
ValueError
For Channel expected one of: num_channels
ValueError
For range_value expected to be < 500mV/div
"""
channel = self._validate_channel(channel)
if scale_value is not None:
if scale_value <= 0.5: # 500mV/div Scale
self.write(f":CHANnel{channel}:SCALe {scale_value}")
else:
raise ValueError("Invalid Argument. Expected to be <= 500mV/div")
else: # query
return float(self.query(f":CHANnel{channel}:SCALe?"))
# =============================================================================
# :DISPlay Commands
# =============================================================================
[docs]
def screenshot(
self,
path: str = "./screenshot.png",
with_time: bool = True,
time_fmt: str = "%Y-%m-%d_%H-%M-%S",
divider: str = "_",
timeout: float = 5000,
):
"""Save screen to {path} with {image_type}: bmp, jpg, gif, tif, png
Adapted from:
https://github.com/microsoft/Qcodes/blob/main/src/qcodes/instrument_drivers/Keysight/Infiniium.py
"""
# we lazy import PIL here to avoid importing pillow when unused
from PIL.Image import open as pil_open
time_str = datetime.now().strftime(time_fmt) if with_time else ""
img_name, img_type = splitext(path)
img_path = f"{img_name}{divider if with_time else ''}{time_str}{img_type.lower()}"
old_timeout = self._resource.timeout # save current timeout
self._resource.timeout = timeout # 5 seconds in milliseconds
try:
with open(img_path, "wb") as f:
screen_bytes = self.query_binary_values(
f":DISPlay:DATA? {img_type.upper()[1:]}", # without .
# https://docs.python.org/3/library/struct.html#format-characters
datatype="B", # Capitcal B for unsigned byte
container=bytes,
)
f.write(screen_bytes) # type: ignore[arg-type]
print(f"Screen image written to {img_path}")
except Exception as e:
self._resource.timeout = old_timeout # restore original timeout
print(f"Failed to save screenshot, Error occurred: \n{e}")
finally:
self._resource.timeout = old_timeout # restore original timeout
# =============================================================================
# :FUNCtion Commands
# =============================================================================
[docs]
def function_display(self, function_num: int, state: int | str | None = None) -> int:
"""The :FUNCtion<N>:DISPlay command turns the display of the specified function_num on
or off.
Parameters
----------
function_num : int
Function Number
state : int, str, optional
ON, 1, OFF, 0
Returns
-------
int
The :FUNCtion<N>:DISPlay? query returns the current display condition for the
specified function_num
Raises
------
ValueError
For function_num expected one of: 1-16
"""
if int(function_num) < 1 or int(function_num) > 16:
raise ValueError("Invalid Argument. Expected one of: 1-16")
if state is not None:
state = self._validate_state(state)
self.write(f":FUNCtion{function_num}:DISPlay {state}")
else: # query
return int(self.query(f":FUNCtion{function_num}:DISPlay?"))
# =============================================================================
# :SYSTem Commands
# =============================================================================
# =============================================================================
# :WAVeform Commands
# =============================================================================