Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • qutech-qdlabs/core_tools
1 result
Show changes
Commits on Source (9)
Showing
with 151 additions and 164 deletions
# Changelog
All notable changes to core_tools will be documented in this file.
## \[1.5.12] - 2025-03-07
- Video mode: Fix general settings in favorites
- "gates" object now uses virtual matrix as shown in GUI and not the normalized matrix.
## \[1.5.11] - 2025-02-20
- Added snapshot data_writer
......
......@@ -2,6 +2,7 @@ import logging
import os
import re
from io import StringIO
from typing import Any
from PyQt5 import QtWidgets
from ruamel.yaml import YAML
......@@ -51,7 +52,7 @@ C:\Users\<user>\.core_tools\videomode on Windows.
"""
def yaml_string(data: dict[str, any]) -> str:
def yaml_string(data: dict[str, Any]) -> str:
if data in [{}, ""]:
return ""
yaml = YAML()
......@@ -156,10 +157,10 @@ class Favorites:
if self._ui_check_2D.isChecked():
settings["2D"] = self._parse_settings_text(self._ui_settings_2D, "2D")
if self._ui_check_gen.isChecked():
settings["3D"] = self._parse_settings_text(self._ui_settings_gen, "gen")
settings["gen"] = self._parse_settings_text(self._ui_settings_gen, "gen")
return settings
def load_selected(self, active_settings: dict[str, any]):
def load_selected(self, active_settings: dict[str, Any]):
currentItem = self._ui_name_list.currentItem()
name = currentItem.text() if currentItem is not None else ""
if name == "":
......
from abc import abstractmethod
import copy
from functools import partial
import logging
from abc import abstractmethod
from functools import partial
from typing import Any
from PyQt5 import QtCore, QtWidgets
......@@ -16,7 +17,7 @@ class Settings:
self._group_name = group_name
self._update_plot = update_plot
self._gui_elements: dict[str, GuiElement] = {}
self._values: dict[str, any] = {}
self._values: dict[str, Any] = {}
def add(self, name: str, widget, plot_setting: bool = False):
if isinstance(widget, QtWidgets.QCheckBox):
......@@ -34,7 +35,7 @@ class Settings:
self._gui_elements[name] = gui_element
self._values[name] = gui_element.get_value()
def update_value(self, name: str, value: any):
def update_value(self, name: str, value: Any):
self._values[name] = value
if self._gui_elements[name].plot_setting:
self._update_plot()
......@@ -44,10 +45,10 @@ class Settings:
def __getitem__(self, name: str):
return self._values[name]
def set_value(self, name: str, value: any):
def set_value(self, name: str, value: Any):
self._gui_elements[name].set_value(value)
def update(self, values: dict[str, any]):
def update(self, values: dict[str, Any]):
for name, value in values.items():
if name not in self._values:
logger.warning(f"Setting {self._group_name}:{name} does not exist")
......@@ -55,7 +56,7 @@ class Settings:
if self._values[name] != value:
self.set_value(name, value)
def to_dict(self) -> dict[str, any]:
def to_dict(self) -> dict[str, Any]:
return copy.deepcopy(self._values)
......@@ -179,7 +180,7 @@ class CheckboxList(GuiElement):
self._value_changed(sorted(self._checked))
def set_value(self, value: list[str]):
self._checked = set(value)
self._checked = set([v for v in value if v in self._check_boxes])
for name, check_box in self._check_boxes.items():
check_box.setChecked(name in self._checked)
......
import logging
from typing import Any
from pulse_lib.base_pulse import pulselib
......@@ -9,8 +10,8 @@ logger = logging.getLogger(__name__)
class PulselibSettings:
def __init__(self, pulse_lib: pulselib):
self._pulse_lib = pulse_lib
self._attenuations: dict[str, any] = None
self._v_gate_projection: dict[str, any] = None
self._attenuations: dict[str, Any] = None
self._v_gate_projection: dict[str, Any] = None
# TODO change this check in a required update to 1.7.31+
if hasattr(pulselib, "get_virtual_gate_projection"):
......@@ -37,4 +38,3 @@ class PulselibSettings:
self._attenuations != attenuations
or self._v_gate_projection != v_gate_projection
)
from collections import defaultdict
from typing import Any
import time
import logging
import numpy as np
......@@ -42,7 +43,7 @@ class KeysightFastScanParameter(FastScanParameterBase):
# Create dict with digitizers and used channel numbers.
# dict[digitizer, List[channel_numbers]]
self.dig_channel_nums: dict[any, list[int]] = defaultdict(set)
self.dig_channel_nums: dict[Any, list[int]] = defaultdict(set)
channels = [ch for ch, _, _ in scan_config.channel_map.values()]
if digitizer is not None:
for ch in channels:
......@@ -450,12 +451,12 @@ class FastScanGenerator(FastScanGeneratorBase):
else:
for ch_num in dig_channels:
hvi_dig_channels[self.digitizer.name].add(ch_num)
video_mode_channels = {name:list(channels) for name, channels in hvi_dig_channels.items()}
video_mode_channels = {name: list(channels) for name, channels in hvi_dig_channels.items()}
if not hasattr(my_seq, 'schedule_params'):
raise Exception('Update pulse-lib to v1.7.11+')
my_seq.schedule_params["acquisition_period"]= acquisition_period
my_seq.schedule_params["acquisition_period"] = acquisition_period
my_seq.schedule_params["number_of_points"] = n_pts
my_seq.schedule_params["number_of_lines"] = n_lines
my_seq.schedule_params["start_delay"] = start_delay
......
from abc import abstractmethod
from collections.abc import Sequence
from dataclasses import dataclass, field
from typing import Callable
from typing import Any, Callable
import numpy as np
from qcodes import MultiParameter
......@@ -214,7 +214,7 @@ class FastScanParameterBase(MultiParameter):
def snapshot_base(self,
update: bool | None = True,
params_to_skip_update: Sequence[str] | None = None
) -> dict[any, any]:
) -> dict[Any, Any]:
snapshot = super().snapshot_base(update, params_to_skip_update)
snapshot.update({"parameters": self.config.snapshot()})
return snapshot
......
import logging
import os
from collections.abc import Sequence
from typing import Callable
from typing import Any, Callable
import numpy as np
import pyqtgraph as pg
......@@ -1154,7 +1154,7 @@ class vm_data_param(MultiParameter):
def snapshot_base(self,
update: bool | None = True,
params_to_skip_update: Sequence[str] | None = None
) -> dict[any, any]:
) -> dict[Any, Any]:
snapshot = super().snapshot_base(update, params_to_skip_update)
snapshot["parameters"] = self.param.snapshot().get("parameters", {})
return snapshot
......
from dataclasses import dataclass
from functools import partial
from typing import Any
from PyQt5.QtCore import QThread
from PyQt5 import QtWidgets, QtGui
from PyQt5 import QtCore
......@@ -15,14 +16,16 @@ logger = logging.getLogger(__name__)
colormap = colormaps["viridis"]
colormap._init()
lut = np.array(colormap.colors)*255 # Convert matplotlib colormap from 0-1 to 0-255 for Qt
lut = np.array(colormap.colors)*255 # Convert matplotlib colormap from 0-1 to 0-255 for Qt
@dataclass
class plot_widget_data:
plot_widget: pg.PlotWidget # widget.
plot_items: list # line in the plot.
color_bar: any = None
cross: tuple[any] = None
plot_widget: pg.PlotWidget # widget.
plot_items: list # line in the plot.
color_bar: Any = None
cross: tuple[Any] = None
class plot_param:
def __init__(self, multi_parameter, i):
......@@ -54,7 +57,7 @@ class plot_param:
def get_index(self, *values):
indices = []
for i,value in enumerate(values):
for i, value in enumerate(values):
xrange = self.xrange(i)
index = int((value-xrange[0]) / (xrange[1]-xrange[0]) * self.shape[i])
if index < 0 or index >= self.shape[i]:
......@@ -99,7 +102,7 @@ class live_plot(QThread):
# getter for the scan.
self.parameter_getter = parameter_getter
self.plot_params = [plot_param(parameter_getter, i) for i in range(self.n_plots)]
self.shape = parameter_getter.shapes[0] #assume all the shapes are the same.
self.shape = parameter_getter.shapes[0] # assume all the shapes are the same.
self.plot_widgets = []
# plot properties
......@@ -184,8 +187,8 @@ class live_plot(QThread):
self.active = False
self.set_busy(False)
while self.plt_finished != True:
time.sleep(0.01) # 10 ms interval to make sure gil releases.
while not self.plt_finished:
time.sleep(0.01) # 10 ms interval to make sure gil releases.
self.timer.stop()
self.update_plot()
......@@ -257,7 +260,7 @@ class _1D_live_plot(live_plot):
xrange = param.xrange(0)[1]
self.x_data = np.linspace(-xrange, xrange, self.plot_data[i].size)
curve = plot_1D.plot(self.x_data, self.plot_data[i], pen=(255,0,0))
curve = plot_1D.plot(self.x_data, self.plot_data[i], pen=(255, 0, 0))
plot_data = plot_widget_data(plot_1D, [curve])
plot_data.proxy = pg.SignalProxy(plot_1D.scene().sigMouseMoved, rateLimit=10,
slot=partial(self.mouse_moved, plot_1D, i))
......@@ -293,7 +296,7 @@ class _1D_live_plot(live_plot):
x, ix = self._get_plot_coords(plot, index, event[0])
if ix is None:
return
v = self.plot_data[index][ix] # TODO @@@ check with diff ...
v = self.plot_data[index][ix] # TODO @@@ check with diff ...
if self._on_mouse_moved:
plot_param = self.plot_params[index]
self._on_mouse_moved(x, plot_param.name, v)
......@@ -317,7 +320,7 @@ class _1D_live_plot(live_plot):
x_voltage_str = self._format_dc_voltage(self.gate_x_voltage)
self.gate_values_label.setText(
f'DC {gate_x}:{x_voltage_str}')
except:
except Exception:
logger.error('Plotting failed', exc_info=True)
# slow down to reduce error burst
time.sleep(1.0)
......@@ -338,7 +341,7 @@ class _1D_live_plot(live_plot):
for i in range(self.n_plots):
buffer_data = self.buffer_data[i]
y = input_data[i]
buffer_data = np.roll(buffer_data,1,0)
buffer_data = np.roll(buffer_data, 1, 0)
buffer_data[0] = y
self.buffer_data[i] = buffer_data
self.plot_data[i] = np.sum(buffer_data[:self.average_scans], 0)/self.average_scans
......@@ -384,15 +387,15 @@ class _2D_live_plot(live_plot):
plot_2D.setTitle(param.label, size='10pt')
min_max = pg.LabelItem(parent=plot_2D.graphicsItem())
min_max.anchor(itemPos=(1,0), parentPos=(1,0))
min_max.anchor(itemPos=(1, 0), parentPos=(1, 0))
self.min_max.append(min_max)
icol = i % n_col
irow = i // n_col
self.top_layout.addWidget(plot_2D, irow, icol, 1, 1)
range0 = param.xrange(0)[1] # y value
range1 = param.xrange(1)[1] # x value
range0 = param.xrange(0)[1] # y value
range1 = param.xrange(1)[1] # x value
shape = param.shape
tr = QtGui.QTransform()
tr.translate(-range1, -range0)
......@@ -445,8 +448,8 @@ class _2D_live_plot(live_plot):
if plot.sceneBoundingRect().contains(coordinates):
# filter on min/max
mouse_point = plot.plotItem.vb.mapSceneToView(coordinates)
x,y = mouse_point.x(), mouse_point.y()
iy,ix = self.plot_params[index].get_index(y,x)
x, y = mouse_point.x(), mouse_point.y()
iy, ix = self.plot_params[index].get_index(y, x)
if iy is not None and ix is not None:
return x, y, ix, iy
return None, None, None, None
......@@ -466,7 +469,7 @@ class _2D_live_plot(live_plot):
x, y, ix, iy = self._get_plot_coords(plot, index, event[0])
if iy is None or ix is None:
return
v = self.plot_data[index][ix,iy]
v = self.plot_data[index][ix, iy]
if self._on_mouse_moved:
plot_param = self.plot_params[index]
self._on_mouse_moved(x, y, plot_param.name, v)
......@@ -498,16 +501,16 @@ class _2D_live_plot(live_plot):
plot_data = self.plot_data[i]
if self._filter_background:
sigma = self.plot_params[i].shape[0] * self._background_rel_sigma
plot_data = plot_data - ndimage.gaussian_filter(plot_data, sigma, mode = 'nearest')
plot_data = plot_data - ndimage.gaussian_filter(plot_data, sigma, mode='nearest')
if self._filter_noise:
plot_data = ndimage.gaussian_filter(plot_data, self._noise_sigma, mode = 'nearest')
plot_data = ndimage.gaussian_filter(plot_data, self._noise_sigma, mode='nearest')
if self.gradient == 'Off':
if self.enhanced_contrast:
plot_data = compress_range(plot_data, upper=99.5, lower=0.5)
mn, mx = np.min(plot_data), np.max(plot_data)
self.min_max[i].setText(f"min:{mn:4.0f} mV<br/>max:{mx:4.0f} mV")
if color_bar:
color_bar.setLevels(values=(mn,mx))
color_bar.setLevels(values=(mn, mx))
if img_item.lut is None:
img_item.setLookupTable(color_bar.colorMap().getLookupTable())
else:
......@@ -521,7 +524,7 @@ class _2D_live_plot(live_plot):
mn, mx = np.min(plot_data), np.max(plot_data)
self.min_max[i].setText(f"min:{mn:4.0f} a.u.<br/>max:{mx:4.0f} a.u.")
if color_bar:
color_bar.setLevels(values=(mn,mx))
color_bar.setLevels(values=(mn, mx))
if img_item.lut is None:
img_item.setLookupTable(color_bar.colorMap().getLookupTable())
else:
......@@ -571,7 +574,7 @@ class _2D_live_plot(live_plot):
for i in range(self.n_plots):
buffer_data = self.buffer_data[i]
xy = input_data[i][:, :].T
buffer_data = np.roll(buffer_data,1,0)
buffer_data = np.roll(buffer_data, 1, 0)
buffer_data[0] = xy
self.buffer_data[i] = buffer_data
self.plot_data[i] = np.sum(buffer_data[:self.average_scans], 0)/self.average_scans
......@@ -587,5 +590,3 @@ class _2D_live_plot(live_plot):
time.sleep(1.0)
self.plt_finished = True
from typing import Any
from core_tools.GUI.param_viewer.param_viewer_GUI_window import Ui_MainWindow
from PyQt5 import QtCore, QtWidgets
import qcodes as qc
......@@ -11,9 +12,9 @@ logger = logging.getLogger(__name__)
@dataclass
class param_data_obj:
param_parameter: any
gui_input_param: any
division: any
param_parameter: Any
gui_input_param: Any
division: Any
name: str
......
......@@ -2,14 +2,27 @@ import inspect
import logging
import os
from enum import Enum
from typing import Any
from abc import ABC, abstractmethod
try:
from spyder_kernels.customize.spydercustomize import runcell
runcell_version = 5
except Exception:
runcell = None
try:
from IPython import get_ipython # pyright: ignore
ipython = get_ipython()
if ipython is not None:
runcell = ipython.magics_manager.magics['line']['runcell']
runcell_version = 6
except KeyError:
# no runcell magic
pass
logger = logging.getLogger(__name__)
......@@ -48,10 +61,20 @@ class Cell(Command):
raise Exception('runcell not available. Upgrade to Spyder 4+ to use Cell()')
def __call__(self):
command = f"runcell({self.cell}, '{self.python_file}')"
# print(command)
logger.info(command)
runcell(self.cell, self.python_file)
if runcell_version == 5:
command = f"runcell({self.cell}, '{self.python_file}')"
print(command)
logger.info(command)
runcell(self.cell, self.python_file)
elif runcell_version == 6:
if isinstance(self.cell, int):
command = f"-i {self.cell} {self.python_file}"
else:
command = f"-n '{self.cell}' {self.python_file}"
print("runcell", command)
logger.info(f"runcell {command}")
result = runcell(command)
print(result)
class Function(Command):
......@@ -74,7 +97,7 @@ class Function(Command):
kwargs: default arguments to pass with function.
'''
def __init__(self, func: any, command_name: str | None = None, **kwargs):
def __init__(self, func: Any, command_name: str | None = None, **kwargs):
if command_name is None:
command_name = func.__name__
signature = inspect.signature(func)
......
......@@ -2,6 +2,7 @@ import logging
import inspect
import os
from enum import Enum
from typing import Any
from PyQt5 import QtCore, QtWidgets
......@@ -63,7 +64,7 @@ class ScriptRunner(QtWidgets.QMainWindow, Ui_MainWindow):
if not instance_ready:
self.app.exec()
def add_function(self, func: any, command_name: str | None = None, **kwargs):
def add_function(self, func: Any, command_name: str | None = None, **kwargs):
'''
Adds a function to be run as command in ScriptRunner.
......@@ -228,7 +229,7 @@ if __name__ == "__main__":
def fit(x: float, mode: Mode):
print(f'fit {x}, {mode}')
path = os.path.dirname(__file__)
path = os.path.dirname(__file__).replace('\\', '/')
ui = ScriptRunner()
ui.add_function(sayHi)
......@@ -237,11 +238,11 @@ if __name__ == "__main__":
ui.add_function(fit, 'Fit it', mode=Mode.CENTER, x=1.0)
ui.add_function(fit, 'Fit it', mode='center', x=1.0)
ui.add_function(fit, 'Fit it', mode='CENTER', x=1.0)
# ui.add_cell('Say Hi', path+'/test_script.py')
# ui.add_cell(2, path+'/test_script.py', 'Magic Button')
# ui.add_cell('Oops', path+'/test_script.py')
# ui.add_cell('Syntax Error', path+'/test_script.py')
ui.add_cell('Say Hi', path+'/test_script.py')
ui.add_cell(2, path+'/test_script.py', 'Magic Button')
ui.add_cell('Oops', path+'/test_script.py')
ui.add_cell('Syntax Error', path+'/test_script.py')
# NOTE:
# To start servicing http requests run:
ui.run_server()
# ui.run_server()
......@@ -108,8 +108,9 @@ class WebRequestHandler(BaseHTTPRequestHandler):
)
def run_web_server(command_list: list[Command]):
server_address = ('127.0.0.1', 8001)
def run_web_server(command_list: list[Command], server_address: tuple[str, int] | None = None):
if server_address is None:
server_address = ('0.0.0.0', 8001)
httpd = HTTPServer(server_address, WebRequestHandler)
httpd.commands = command_list
try:
......
......@@ -10,4 +10,4 @@ from core_tools.startup.gui import (
start_script_runner,
)
__version__ = "1.5.11"
__version__ = "1.5.12"
import logging
from dataclasses import dataclass
from typing import Any
import numpy as np
from numpy import ndarray
......@@ -41,7 +42,7 @@ class _Action:
class DataWriter:
def __init__(self, name, *args, snapshot_data: dict[str, any] | None = None):
def __init__(self, name, *args, snapshot_data: dict[str, Any] | None = None):
self._measurement = Measurement(name, silent=True)
self._actions = []
self._set_params = []
......@@ -103,7 +104,7 @@ class DataWriter:
self._loop(iaction + 1, isetpoint)
def write_data(name: str, *args, snapshot_data: dict[str, any] | None = None):
def write_data(name: str, *args, snapshot_data: dict[str, Any] | None = None):
'''
Creates a dataset `name` using the specified Axis and Data.
......
import json
import logging
from typing import Any
import qcodes as qc
from qcodes.utils.helpers import NumpyJSONEncoder
......@@ -100,7 +101,7 @@ def create_new_data_set(experiment_name, measurement_snapshot, *m_params):
return data_set(ds)
def _reduce_snapshot(snapshot: dict[str, any]):
def _reduce_snapshot(snapshot: dict[str, Any]):
if "__class__" in snapshot:
exclude_keys = [
"__class__",
......
......@@ -15,6 +15,7 @@ class gates(qc.Instrument):
gates class, generate qcodes parameters for the real gates and the virtual gates
It also manages the virtual gate matrix.
"""
def __init__(self, name, hardware, dac_sources, dc_gain={}):
'''
gates object
......@@ -52,9 +53,9 @@ class gates(qc.Instrument):
self._dac_params[gate_name] = dac_sources[source_index].parameters[f'dac{int(ch_num)}']
self._all_gate_names.append(gate_name)
self._real_gates.append(gate_name)
self.add_parameter(gate_name, set_cmd = partial(self._set_voltage, gate_name),
self.add_parameter(gate_name, set_cmd=partial(self._set_voltage, gate_name),
get_cmd=partial(self._get_voltage, gate_name),
unit = "mV")
unit="mV")
# make virtual gates:
for virt_gate_set in self.hardware.virtual_gates:
......@@ -135,7 +136,7 @@ class gates(qc.Instrument):
self.parameters[real_gate].set(old_voltages[real_gate] + ratio * delta)
except Exception as ex:
logger.warning(f'Failed to set virtual gate voltage to {voltage:.1f} mV; Reverting all voltages. '
f'Exception: {ex}')
f'Exception: {ex}')
for real_gate, ratio in projection[gate_name].items():
self.set(real_gate, old_voltages[real_gate])
raise
......@@ -199,7 +200,7 @@ class gates(qc.Instrument):
for virt_gate_convertor in self._virt_gate_convertors:
real_voltages = [v[name] for name in virt_gate_convertor.real_gates]
virtual_voltages = np.matmul(virt_gate_convertor.r2v_matrix, real_voltages)
virtual_voltages = np.matmul(virt_gate_convertor.r2v_matrix, real_voltages)
for vg_name, vg_voltage in zip(virt_gate_convertor.virtual_gates, virtual_voltages):
v[vg_name] = vg_voltage
self.parameters[vg_name].cache.set(vg_voltage)
......
import numpy as np
class VirtualGateMatrixView:
'''
Data to convert real gate voltages to virtual gate voltages and v.v.
......@@ -10,6 +11,7 @@ class VirtualGateMatrixView:
virtual_gates (list[str]): names of virtual gates
r2v_matrix (2D array-like): matrix to convert voltages of real gates to voltages of virtual gates.
'''
def __init__(self, name, real_gates, virtual_gates, r2v_matrix, indices):
self.name = name
self._real_gates = real_gates
......@@ -34,7 +36,7 @@ class VirtualGateMatrixView:
@property
def r2v_matrix(self):
# note: self._r2v_matrix may be changed externally. Create indexed copy here.
r2v_matrix = self._r2v_matrix[self._indices][:,self._indices]
r2v_matrix = self._r2v_matrix[self._indices][:, self._indices]
return r2v_matrix
......@@ -112,16 +114,16 @@ class VirtualGateMatrix:
def get_element(self, i, j, v2r=True):
if v2r:
return self._v2r_matrix[i,j]
return self._v2r_matrix[i, j]
else:
return self._r2v_matrix[i,j]
return self._r2v_matrix[i, j]
def set_element(self, i, j, value, v2r=True):
if v2r:
self._v2r_matrix[i,j] = value
self._v2r_matrix[i, j] = value
self._r2v_matrix[:] = np.linalg.inv(self._v2r_matrix)
else:
self._r2v_matrix[i,j] = value
self._r2v_matrix[i, j] = value
self._v2r_matrix[:] = np.linalg.inv(self._r2v_matrix)
self._calc_normalized()
......@@ -145,7 +147,7 @@ class VirtualGateMatrix:
if self._normalization:
# divide rows by diagonal value
norm = no_norm/np.diag(no_norm)[:,None]
norm = no_norm/np.diag(no_norm)[:, None]
else:
norm = no_norm
......@@ -156,7 +158,7 @@ class VirtualGateMatrix:
real_gate_names = []
virtual_gate_names = []
for i,name in enumerate(self.real_gate_names):
for i, name in enumerate(self.real_gate_names):
if name in available_gates:
gate_indices.append(i)
real_gate_names.append(name)
......@@ -165,6 +167,5 @@ class VirtualGateMatrix:
return VirtualGateMatrixView(self.name,
real_gate_names,
virtual_gate_names,
self._norm_r2v_matrix,
self._r2v_matrix,
gate_indices)
from dataclasses import dataclass, field
from typing import Any
import time
import threading
......@@ -8,7 +9,7 @@ from queue import PriorityQueue
@dataclass(order=True)
class ExperimentJob:
priority: float
job: any = field(compare=False)
job: Any = field(compare=False)
seq_nr: int = 0
def __post_init__(self):
......
......@@ -2,6 +2,7 @@ import logging
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any
import numpy as np
from qcodes import Parameter
......@@ -239,7 +240,7 @@ class _MParam:
class _Block:
setter: Setter | None = None
value: float | None = None
actions: list[any] = field(default_factory=list)
actions: list[Any] = field(default_factory=list)
@property
def loop_length(self):
......
from __future__ import annotations
from dataclasses import dataclass
from qcodes import Parameter
import numpy as np
FAST = "FAST"
SLOW = "SLOW"
# WARNING: Mode Fast sometimes gives corrupted data on Keysight.
# Mode Fast does not work with any of the other backends.
MODE = SLOW
class PulseLibParameter(Parameter):
def add_setpoints(self, setpoints, sequencer, lowest_level):
self.flat_index = 0
self.setpoints = setpoints
self.sequencer = sequencer
self.lowest_level = lowest_level
def get_raw(self):
current_val = self.setpoints[self.flat_index%len(self.setpoints)]
self.flat_index += 1
if self.flat_index >= np.prod(self.sequencer.shape):
self.flat_index = 0
return current_val
def set_raw(self, value):
if self.lowest_level:
if self.flat_index == 0 and hasattr(self.sequencer, 'starting_lambda'):
# TODO: What is this starting_lambda hack??
self.sequencer.starting_lambda(self.sequencer)
if MODE == SLOW or self.flat_index == 0:
self.sequencer.upload(np.unravel_index(self.flat_index, self.sequencer.shape))
# TODO: Change to loop using sequencer.params
index = np.unravel_index(self.flat_index, self.sequencer.shape)
self.sequencer.play(index, release=True)
if hasattr(self.sequencer, 'm_param'):
# NOTE: This is a hack to set the index for measurement_converter
self.sequencer.m_param.setIndex(tuple(index))
if MODE == SLOW:
# Wait is not needed for Keysight, because the digitizer call is blocking.
# self.sequencer.uploader.wait_until_AWG_idle()
pass
if MODE==FAST and self.flat_index < np.prod(self.sequencer.shape) - 1:
# WARNING: upload during play regularly results in corrupt data in Keysight.
self.sequencer.upload(np.unravel_index(self.flat_index+1, self.sequencer.shape))
from qcodes import Parameter
from pulse_lib.sequencer import index_param
class SequenceStartAction:
......@@ -58,54 +12,49 @@ class SequenceStartAction:
def __call__(self):
sequence = self._sequence
if hasattr(sequence, 'starting_lambda'):
sequence.starting_lambda(sequence)
sequence.upload((0, ))
sequence.play((0, ))
if hasattr(sequence, 'm_param'):
sequence.m_param.setIndex((0, ))
sequence.upload()
sequence.play()
def pulselib_2_qcodes(awg_sequence):
def get_pulselib_sweeps(sequence) -> list[sweep_info]:
'''
convert pulse sequencer object in qcodes parameters that are usable in sweeps.
Returns sweep parameters for the axes in the pulse sequence.
Args:
awg_sequence (pulselib.sequencer.sequencer) : sequence object
sequence (pulselib.sequencer.sequencer) : sequence object
Returns:
set_param (list<PulseLibParameter>) : set paramters for the pulselib to be used in the sweep
list[sweep_info] : sweeps along the axes of the sequence.
'''
set_param = list()
if awg_sequence.shape == (1,):
return set_param
for i in range(len(awg_sequence.shape)):
# TODO: Use sequencer.params
param = PulseLibParameter(name=awg_sequence.labels[i].replace(" ", "_"),
label=awg_sequence.labels[i],
unit=awg_sequence.units[i])
param.add_setpoints(awg_sequence.setpoints[i], awg_sequence, False)
set_param.append(sweep_info(param, n_points = len(awg_sequence.setpoints[i])))
set_params = []
if sequence.shape == (1,):
return set_params
seq_params = sequence.params
# Note: reverse order, because axis=0 is fastest running and must thus be last.
for param in seq_params[::-1]:
sweep = sweep_info(param)
sweep.set_values(param.values)
set_params.append(sweep)
set_param[0].param.lowest_level=True
return set_param[::-1]
return set_params
@dataclass
class sweep_info():
class sweep_info:
'''
data class that hold the sweep info for one of the paramters.
'''
param : Parameter = None
start : float = 0
stop : float = 0
n_points : int = 50
delay : float = 0
param: Parameter = None
start: float = 0
stop: float = 0
n_points: int = 50
delay: float = 0
def __post_init__(self):
self._values = None
self.original_value = None
if not isinstance(self.param, PulseLibParameter):
if not isinstance(self.param, index_param):
self.original_value = self.param()
def reset_param(self):
......@@ -121,6 +70,3 @@ class sweep_info():
return np.linspace(self.start, self.stop, self.n_points)
else:
return self._values
def check_OD_scan(sequence, minstr):
raise Exception('This function was broken beyond repair. Do not use it. [SdS]')