"""
Created on Tue Dec 14 12:58:27 2021
@author: Martin.Mihaylov
"""
import numpy as np
import pandas as pd
import pyvisa
from .BaseInstrument import BaseInstrument
[docs]
class AQ6370D(BaseInstrument):
"""
This class uses BaseInstrument to connect to a Yokogawa AQ6370D.
Need to have python 'pyvisa', 'pandas' and 'numpy' libraries installed!
"""
def __init__(self, resource_str: str, visa_library: str = "@py", **kwargs):
"""
Get name and identification.
Make a restart of the instrument in the beginning to get the instrument
to default settings.
"""
super().__init__(resource_str, visa_library=visa_library, **kwargs)
print(self.get_idn())
# =============================================================================
# Start Sweep
# =============================================================================
[docs]
def start_sweep(self):
"""
Makes a sweep
"""
self.write(":INITIATE")
# =============================================================================
# Stop Measurment
# =============================================================================
[docs]
def stop(self) -> None:
""" """
self.write("ABORt")
# =============================================================================
# ASK
# =============================================================================
[docs]
def get_display_auto_y(self) -> str:
"""
Queries the automatic setting function of
the subscale of the level axis.
Response 0 = OFF, 1 = ON
"""
data = self.query(":DISPLAY:TRACE:Y2:AUTO?")
if data == "0":
return "OFF"
else:
return "ON"
[docs]
def get_display_y_unit(self) -> str | None:
"""
Queries the units of the main scale of the level axis.
DBM = dBm
W = W
DBM/NM = dBm/nm or dBm/THz
W/NM = W/nm or W/THz
"""
data = self.query(":DISPLAY:TRACE:Y1:UNIT?")
if data == "0":
return "dBm"
elif data == "1":
return "W"
elif data == "2":
return "DBM/NM"
elif data == "3":
return "W/NM"
[docs]
def get_wavelength_start(self) -> float:
"""
Queries the measurement condition
measurement start wavelength
"""
return float(self.query(":SENSE:WAVELENGTH:START?"))
[docs]
def get_wavelength_stop(self) -> float:
"""
Queries the measurement condition
measurement start wavelength
"""
return float(self.query(":SENSE:WAVELENGTH:STOP?"))
[docs]
def get_center_wavelenght(self) -> float:
"""
Queries the synchronous sweep function.
"""
return float(self.query(":SENSe:WAVelength:CENTer?"))
[docs]
def get_unit_x(self) -> str | None:
"""
Queries the units for the X axis.
For AQ6370C, AQ6373 or AQ6373B
WAVelength = Wavelength
FREQuency = Frequency
"""
data = self.query(":UNIT:X?")
if data == "0":
return "WAVelength"
elif data == "1":
return "FREQuency"
elif data == "2":
return "WNUMber"
[docs]
def get_trace_state(self) -> str:
"""
Queries the display status of the specified
trace.
"""
data = self.query("TRACe:STATe?")
if data == "0":
return "Trace is OFF"
else:
return "Trace is ON"
[docs]
def get_trace_active(self) -> str:
"""
Queries the trace to be transferred.
Outputs - (TRA|TRB|TRC|TRD|TRE|TRF|TRG)
"""
return self.query(":TRACe:ACTive?")
[docs]
def get_central_wavelenght(self) -> float:
"""
Queries the center wavelength of the
X-axis of the display scale
"""
return float(self.query(":SENSE:WAVELENGTH:CENTER?"))
[docs]
def get_span(self) -> float:
"""
Queries the measurement condition
measurement span.
"""
return float(self.query(":SENSE:WAVELENGTH:SPAN?"))
[docs]
def get_trace_resolution(self, state: str) -> list:
"""
Queries the actual resolution data of the
specified trace.
Parameters
----------
state : str
Trace selected - ['TRA','TRB','TRC','TRD','TRE','TRF','TRG']
Raises
------
ValueError
Error message
"""
state_list = ["TRA", "TRB", "TRC", "TRD", "TRE", "TRF", "TRG"]
if state in state_list:
data = self.query("CALCULATE:ARESOLUTION? " + str(state)).split(",")
data = list(np.array(data, dtype=np.float32))
return data
else:
raise ValueError("Unknown input! See function description for more info.")
[docs]
def get_bw_resolution(self) -> float:
"""
Queries the measurement resolution.
"""
return float(self.query(":SENSE:BANDWIDTH?"))
[docs]
def get_sensitivity(self) -> str:
"""
Queries the measurement sensitivity.
"""
data = self.query(":SENSE:SENSE?")
if data == "0":
return "NHLD"
elif data == "1":
return "NAUT"
elif data == "2":
return "MID"
elif data == "3":
return "HIGH1"
elif data == "4":
return "HIGH2"
elif data == "5":
return "HIGH3"
else:
return "NORMAL"
[docs]
def get_average_count(self) -> float:
"""
Queries the number of times averaging for
each measured point.
"""
return float(self.query(":SENSE:AVERAGE:COUNT?"))
[docs]
def get_segment_points(self) -> float:
"""
Queries the number of sampling points
to be measured at one time when performing
SEGMENT MEASURE.
"""
return float(self.query(":SENSE:SWEEP:SEGMENT:POINTS?"))
[docs]
def get_sample_points(self) -> float:
"""
Queries the number of samples measured
"""
return float(self.query(":SENSE:SWEEP:POINTS?"))
[docs]
def get_sample_points_auto(self) -> str:
"""
Queries the function of automatically
setting the sampling number to be measured
Response 0 = OFF, 1 = ON
"""
data = self.query(":SENSE:SWEEP:POINTS:AUTO?")
if data == "0":
return "OFF"
else:
return "ON"
[docs]
def get_sweep_speed(self) -> str:
"""
Queries the sweep speed
1x|0: Standard
2x|1: Twice as fast as standard
"""
data = self.query(":SENSE:SWEEP:SPEED?")
if data == "0":
return "Standard"
else:
return "Twice as fast as standard"
[docs]
def get_trace_data_x(self, state: str) -> list:
"""
Queries the wavelength axis data of the
specified trace.
Parameters
----------
state : str
Name of the trace that should be extract/selected.
state = [TRA|TRB|TRC|TRD|TRE|TRF|TRG]
Raises
------
ValueError
Error message
"""
state_list = ["TRA", "TRB", "TRC", "TRD", "TRE", "TRF", "TRG"]
if state in state_list:
data = self.query(":TRACE:X? " + str(state)).split(",")
data = list(np.array(data, dtype=np.float32))
return data
else:
raise ValueError("Unknown input! See function description for more info.")
[docs]
def get_trace_data_y(self, state: str) -> list:
"""
Queries the level axis data of specified trace.
Parameters
----------
state : str
Name of the trace that should be extract/selected.
state = [TRA|TRB|TRC|TRD|TRE|TRF|TRG]
Raises
------
ValueError
Error message
"""
state_list = ["TRA", "TRB", "TRC", "TRD", "TRE", "TRF", "TRG"]
if state in state_list:
data = self.query(":TRACE:Y? " + str(state)).split(",")
data = list(np.array(data, dtype=np.float32))
return data
else:
raise ValueError("Unknown input! See function description for more info.")
[docs]
def get_sweep_mode(self) -> str | None:
"""
Queries the sweep mode
['SINGle','REPeat','AUTO','SEGMent']
"""
data = self.query(":INITiate:SMODe?")
if data == "1":
return "SINGle"
elif data == "2":
return "REPeat"
elif data == "3":
return "AUTO"
elif data == "4":
return "SEGMent"
[docs]
def get_trace_attribute(self, state: str) -> str:
"""
Queries the attributes of the specified
trace
['WRITe','FIX','MAX','MIN','RAVG','CALC']
WRITe = WRITE
FIX = FIX
MAX = MAX HOLD
MIN = MIN HOLD
RAVG = ROLL AVG
CALC = CALC
Parameters
----------
state : str
Name of the trace that should be extract/selected.
state_list = ['TRA','TRB','TRC','TRD','TRE','TRF','TRG']
Raises
------
ValueError
Error message
"""
state_list = ["TRA", "TRB", "TRC", "TRD", "TRE", "TRF", "TRG"]
if state in state_list:
data = self.query(":TRACE:ATTRIBUTE:" + str(state) + "?")
else:
raise ValueError("Unknown input! See function description for more info.")
if data == "0":
return "WRITE"
elif data == "1":
return "FIX"
elif data == "2":
return "MAX HOLD"
elif data == "3":
return "MIN HOLD"
elif data == "4":
return "ROLL AVG"
else:
return "CALC"
# =============================================================================
# SET
# =============================================================================
[docs]
def set_display_y_unit(self, state: str) -> None:
"""
Parameters
----------
state : str
Set the units of the main scale of the
level axis
['dBm','W','DBM/NM','W/NM']
Raises
------
ValueError
Error message
"""
state_list = ["dBm", "W", "DBM/NM", "W/NM"]
if state in state_list:
self.write(":DISPLAY:TRACE:Y1:UNIT " + str(state))
else:
raise ValueError("Unknown input! See function description for more info.")
[docs]
def set_wavelength_start(self, value: float, unit: str) -> None:
"""
Parameters
----------
value : int/float
Set the measurement condition
unit : str
units - [M|HZ].
Raises
------
ValueError
Error message
"""
unit_list = ["M", "HZ"]
if unit in unit_list:
self.write(":SENSE:WAVELENGTH:START " + str(value) + str(unit))
else:
raise ValueError("Unknown input! See function description for more info.")
[docs]
def set_wavelength_stop(self, value: float, unit: str) -> None:
"""
Set the measurement condition
measurement stop wavelength
[M|HZ]
"""
unit_list = ["M", "HZ"]
if unit in unit_list:
self.write(":SENSE:WAVELENGTH:STOP " + str(value) + str(unit))
else:
raise ValueError("Unknown input! See function description for more info.")
[docs]
def set_unit_x(self, unit: str) -> None:
"""
Parameters
----------
unit : str
Set the units for the X axis.
unit_list = ['WAV','FREQ','WNUM']
Raises
------
ValueError
Error message
"""
unit_list = ["WAV", "FREQ", "WNUM"]
if unit in unit_list:
self.write(":UNIT:X " + unit)
else:
raise ValueError("Unknown input! See function description for more info.")
[docs]
def set_center_wavelenght(self, value: float, unit: str) -> None:
"""
Parameters
----------
value : int/float
Set the center wavelength of the
X-axis of the display scale
unit : str
Units = ['M','HZ']
Raises
------
ValueError
Error message
"""
unit_list = ["M", "HZ"]
if unit in unit_list:
self.write(":SENSE:WAVELENGTH:CENTER " + str(value) + str(unit))
else:
raise ValueError("Unknown input! See function description for more info.")
[docs]
def set_span(self, value: float, unit: str) -> None:
"""
Parameters
----------
value : int/float
Set the measurement span.
unit : str
unit_lists = ['M','HZ']
"""
unit_list = ["M", "HZ"]
if unit in unit_list:
self.write(":SENSE:WAVELENGTH:SPAN " + str(value) + str(unit))
else:
raise ValueError("Unknown input! See function description for more info.")
[docs]
def set_bw_resolution(self, value: float, unit: str) -> None:
"""
Parameters
----------
value : int/float
Set the measurement resolution.
unit : str
unit_list = ['M','HZ']
Raises
------
ValueError
Error message
"""
unit_list = ["M", "HZ"]
if unit in unit_list:
self.write(":SENSE:BANDWIDTH:RESOLUTION " + str(value) + str(unit))
else:
raise ValueError("Unknown input! See function description for more info.")
[docs]
def set_sensitivity(self, unit: str) -> None:
"""
Parameters
----------
unit : str
Set the measurement sensitivity.
NHLD = NORMAL HOLD
NAUT = NORMAL AUTO
NORMal = NORMAL
MID = MID
HIGH1 = HIGH1 or HIGH1/CHOP
HIGH2 = HIGH2 or HIGH2/CHOP
HIGH3 = HIGH3 or HIGH3/CHOP
Raises
------
ValueError
Error message
"""
unit_list = ["NHLD", "NAUT", "MID", "HIGH1", "HIGH2", "HIGH3", "NORMAL"]
if unit in unit_list:
self.write(":SENSE:SENSE " + str(unit))
else:
raise ValueError("Unknown input! See function description for more info.")
[docs]
def set_average_count(self, value: int) -> None:
"""
Parameters
----------
value : int
Set the number of times averaging for
each measured point.
"""
value = int(value)
self.write(":SENSE:AVERAGE:COUNT " + str(value))
[docs]
def set_segment_points(self, value: int) -> None:
"""
Parameters
----------
value :int
Set the number of sampling points
to be measured at one time when performing
SEGMENT MEASURE.
"""
value = int(value)
self.write(":SENSE:SWEEP:SEGMENT:POINTS " + str(value))
[docs]
def set_sample_points(self, value: int) -> None:
"""
Parameters
----------
value : int
Set the number of samples measured
"""
value = int(value)
self.write(":SENSE:SWEEP:POINTS " + str(value))
[docs]
def set_sweep_speed(self, value: int) -> None:
"""
Parameters
----------
value : int
Set the sweep speed.
1 - Standard
2 - Twice as fast as standard
"""
value = int(value)
if value == 1:
self.write(":SENSE:SWEEP:SPEED 1x")
elif value == 2:
self.write(":SENSE:SWEEP:SPEED 2x")
else:
raise ValueError("Unknown input! See function description for more info.")
[docs]
def set_sample_points_auto(self, state: str) -> None:
"""
Parameters
----------
state : str
Set the function of automatically
setting the sampling number to be measured
['ON'|'OFF']
Raises
------
ValueError
Error message
"""
state_list = ["ON", "OFF"]
if state in state_list:
self.write(" :SENSE:SWEEP:POINTS:AUTO " + str(state))
else:
raise ValueError("Unknown input! See function description for more info.")
[docs]
def set_trace_active(self, state: str) -> None:
"""
Parameters
----------
state : str
Sets the active trace.
state_list = ['TRA','TRB','TRC','TRD','TRE','TRF','TRG']
Raises
------
ValueError
Error message
"""
state_list = ["TRA", "TRB", "TRC", "TRD", "TRE", "TRF", "TRG"]
if state in state_list:
self.write(":TRACE:ACTIVE " + str(state))
else:
raise ValueError("Unknown input! See function description for more info.")
[docs]
def set_sweep_mode(self, state: str) -> None:
"""
Parameters
----------
state : str
Set the sweep mode
['SINGle','REPeat','AUTO','SEGMent']
Raises
------
ValueError
Error message
"""
valid_state = self._check_scpi_param(state, ["SINGle", "REPeat", "AUTO", "SEGMent"])
self.write("INITIATE:SMODE " + valid_state)
[docs]
def set_trace_attribute(self, trace: str, state: str) -> None:
"""
Parameters
----------
trace : str
Sets the active trace.
state_list = ['TRA','TRB','TRC','TRD','TRE','TRF','TRG']
state : str
Set the attributes of the specified
trace
['WRITe','FIX','MAX','MIN','RAVG','CALC']
WRITe = WRITE
FIX = FIX
MAX = MAX HOLD
MIN = MIN HOLD
RAVG = ROLL AVG
CALC = CALC
Raises
------
ValueError
Error message
"""
valid_state = self._check_scpi_param(state, ["WRITe", "FIX", "MAX", "MIN", "RAVG", "CALC"])
valid_trace = self._check_scpi_param(
trace, ["TRA", "TRB", "TRC", "TRD", "TRE", "TRF", "TRG"]
)
self.write(f":TRACE:ATTRIBUTE:{valid_trace} {valid_state}")
# =============================================================================
# get Data
# =============================================================================
[docs]
def get_data(self, state: str) -> pd.DataFrame:
"""
Get data on X and Y Traces from OSA.
Data Output is CST File.
Data is Saved in X Column and Y Column
state = 'TRA','TRB','TRC','TRD','TRE','TRF','TRG'
"""
state_list = ["TRA", "TRB", "TRC", "TRD", "TRE", "TRF", "TRG"]
if state in state_list:
# Get Headers
header_x = self.get_unit_x()
header_y = self.get_display_y_unit()
# Get Data
data_x = self.get_trace_data_x(state)
data_y = self.get_trace_data_y(state)
# create CSV
data = {str(header_x): data_x, str(header_y): data_y}
data = pd.DataFrame(data, columns=[str(header_x), str(header_y)])
return data
else:
raise ValueError("Unknown input! See function description for more info.")
[docs]
def print_params_osa(self) -> None:
"""
Parameters set on the Yokogawa AQ6370D
"""
print("################ OSA Parameters ################")
print("Y-Axis units = ", self.get_display_y_unit())
print("X-Axis units = ", self.get_unit_x())
print("Start Wavelength = ", self.get_wavelength_start())
print("Stop Wavelength = ", self.get_wavelength_stop())
print("Bandwidth Resolution = ", self.get_bw_resolution())
print("Center Wavelength = ", self.get_center_wavelenght())
print("Span = ", self.get_span())
print("Output data format = ", self.get_data_format())
print("Displayed trace = ", self.get_trace_state())
print("Selected Trace = ", self.get_trace_active())
print("Averaging Points = ", self.get_average_count())
print("Sample Points = ", self.get_sample_points())
print("Sweep speed = ", self.get_sweep_speed())
print("Sweep Mode = ", self.get_sweep_mode())
print("################ OSA Parameters ################")
[docs]
def get_params_osa(self) -> tuple[list[str], list]:
"""
Parameters set on the Yokogawa AQ6370D
"""
header = [
"Y-Axis units",
"X-Axis units",
"Start Wavelength",
"Stop Wavelength",
"Bandwidth Resolution",
"Center Wavelength",
"Span",
"Output data format",
"Displayed trace",
"Selected Trace",
"Averaging Points",
"Sample Points",
"Sweep speed",
"Sweep Mode",
]
data: list = []
data.append(self.get_display_y_unit())
data.append(self.get_unit_x())
data.append(self.get_wavelength_start())
data.append(self.get_wavelength_stop())
data.append(self.get_bw_resolution())
data.append(self.get_center_wavelenght())
data.append(self.get_span())
data.append(self.get_data_format())
data.append(self.get_trace_state())
data.append(self.get_trace_active())
data.append(self.get_average_count())
data.append(self.get_sample_points())
data.append(self.get_sweep_speed())
data.append(self.get_sweep_mode())
return header, data