Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • qutech-qdlabs/core_tools
1 result
Show changes
Commits on Source (2)
# Changelog
All notable changes to core_tools will be documented in this file.
## \[1.5.12] - 2025-03-07
- Video mode: Fix general settings in favorites
- "gates" object now uses virtual matrix as shown in GUI and not the normalized matrix.
## \[1.5.11] - 2025-02-20
- Added snapshot data_writer
......
from __future__ import annotations
from dataclasses import dataclass
from qcodes import Parameter
import numpy as np
FAST = "FAST"
SLOW = "SLOW"
# WARNING: Mode Fast sometimes gives corrupted data on Keysight.
# Mode Fast does not work with any of the other backends.
MODE = SLOW
class PulseLibParameter(Parameter):
def add_setpoints(self, setpoints, sequencer, lowest_level):
self.flat_index = 0
self.setpoints = setpoints
self.sequencer = sequencer
self.lowest_level = lowest_level
def get_raw(self):
current_val = self.setpoints[self.flat_index%len(self.setpoints)]
self.flat_index += 1
if self.flat_index >= np.prod(self.sequencer.shape):
self.flat_index = 0
return current_val
def set_raw(self, value):
if self.lowest_level:
if self.flat_index == 0 and hasattr(self.sequencer, 'starting_lambda'):
# TODO: What is this starting_lambda hack??
self.sequencer.starting_lambda(self.sequencer)
if MODE == SLOW or self.flat_index == 0:
self.sequencer.upload(np.unravel_index(self.flat_index, self.sequencer.shape))
# TODO: Change to loop using sequencer.params
index = np.unravel_index(self.flat_index, self.sequencer.shape)
self.sequencer.play(index, release=True)
if hasattr(self.sequencer, 'm_param'):
# NOTE: This is a hack to set the index for measurement_converter
self.sequencer.m_param.setIndex(tuple(index))
if MODE == SLOW:
# Wait is not needed for Keysight, because the digitizer call is blocking.
# self.sequencer.uploader.wait_until_AWG_idle()
pass
if MODE==FAST and self.flat_index < np.prod(self.sequencer.shape) - 1:
# WARNING: upload during play regularly results in corrupt data in Keysight.
self.sequencer.upload(np.unravel_index(self.flat_index+1, self.sequencer.shape))
from qcodes import Parameter
from pulse_lib.sequencer import index_param
class SequenceStartAction:
......@@ -58,54 +12,49 @@ class SequenceStartAction:
def __call__(self):
sequence = self._sequence
if hasattr(sequence, 'starting_lambda'):
sequence.starting_lambda(sequence)
sequence.upload((0, ))
sequence.play((0, ))
if hasattr(sequence, 'm_param'):
sequence.m_param.setIndex((0, ))
sequence.upload()
sequence.play()
def pulselib_2_qcodes(awg_sequence):
def get_pulselib_sweeps(sequence) -> list[sweep_info]:
'''
convert pulse sequencer object in qcodes parameters that are usable in sweeps.
Returns sweep parameters for the axes in the pulse sequence.
Args:
awg_sequence (pulselib.sequencer.sequencer) : sequence object
sequence (pulselib.sequencer.sequencer) : sequence object
Returns:
set_param (list<PulseLibParameter>) : set paramters for the pulselib to be used in the sweep
list[sweep_info] : sweeps along the axes of the sequence.
'''
set_param = list()
if awg_sequence.shape == (1,):
return set_param
for i in range(len(awg_sequence.shape)):
# TODO: Use sequencer.params
param = PulseLibParameter(name=awg_sequence.labels[i].replace(" ", "_"),
label=awg_sequence.labels[i],
unit=awg_sequence.units[i])
param.add_setpoints(awg_sequence.setpoints[i], awg_sequence, False)
set_param.append(sweep_info(param, n_points = len(awg_sequence.setpoints[i])))
set_params = []
if sequence.shape == (1,):
return set_params
seq_params = sequence.params
# Note: reverse order, because axis=0 is fastest running and must thus be last.
for param in seq_params[::-1]:
sweep = sweep_info(param)
sweep.set_values(param.values)
set_params.append(sweep)
set_param[0].param.lowest_level=True
return set_param[::-1]
return set_params
@dataclass
class sweep_info():
class sweep_info:
'''
data class that hold the sweep info for one of the paramters.
'''
param : Parameter = None
start : float = 0
stop : float = 0
n_points : int = 50
delay : float = 0
param: Parameter = None
start: float = 0
stop: float = 0
n_points: int = 50
delay: float = 0
def __post_init__(self):
self._values = None
self.original_value = None
if not isinstance(self.param, PulseLibParameter):
if not isinstance(self.param, index_param):
self.original_value = self.param()
def reset_param(self):
......@@ -121,6 +70,3 @@ class sweep_info():
return np.linspace(self.start, self.stop, self.n_points)
else:
return self._values
def check_OD_scan(sequence, minstr):
raise Exception('This function was broken beyond repair. Do not use it. [SdS]')
import logging
from qcodes.instrument.specialized_parameters import ElapsedTimeParameter
from core_tools.data.measurement import Measurement, AbortMeasurement
from pulse_lib.sequencer import sequencer
import time
from core_tools.data.measurement import Measurement, AbortMeasurement
from core_tools.sweeps.sweep_utility import (
SequenceStartAction,
pulselib_2_qcodes, sweep_info
get_pulselib_sweeps, sweep_info
)
from core_tools.job_mgnt.job_meta import job_meta
from core_tools.job_mgnt.job_mgmt import queue_mgr, ExperimentJob
import numpy as np
import time
from qcodes.instrument.specialized_parameters import ElapsedTimeParameter
from pulse_lib.sequencer import sequencer
logger = logging.getLogger(__name__)
class scan_generic(metaclass=job_meta):
'''
function that handeles the loop action and defines the run class.
'''
def __init__(self, *args, name='', reset_param=False, silent=False):
'''
init of the scan function
Args:
args (*list) : provide here the sweep info and meaurment parameters
args (list) : sweep info and meaurment parameters
reset_param (bool) : reset the setpoint parametes to their original value after the meaurement
silent (bool) : If True do not print dataset id and progress bar
'''
......@@ -46,16 +41,16 @@ class scan_generic(metaclass=job_meta):
self.set_vars.append(arg)
set_points.append(arg.param)
elif isinstance(arg, sequencer):
if arg.shape != (1, ):
set_vars_pulse_lib = pulselib_2_qcodes(arg)
for var in set_vars_pulse_lib:
self.meas.register_set_parameter(var.param, var.n_points)
self.set_vars.append(var)
set_points.append(var.param)
else:
# Sequence without looping parameters. Only upload, no setpoints
self.actions.append(SequenceStartAction(arg))
self.meas.add_snapshot('sequence', arg.metadata)
sequence = arg
if hasattr(sequence, 'starting_lambda'):
raise Exception("starting_lambda is not supported anymore")
sweeps_pulse_lib = get_pulselib_sweeps(sequence)
for var in sweeps_pulse_lib:
self.meas.register_set_parameter(var.param, var.n_points)
self.set_vars.append(var)
set_points.append(var.param)
self.actions.append(SequenceStartAction(sequence))
self.meas.add_snapshot('sequence', sequence.metadata)
elif arg is None:
continue
else:
......@@ -104,12 +99,12 @@ class scan_generic(metaclass=job_meta):
for param in self.set_vars:
try:
param.reset_param()
except:
except Exception:
logger.error(f'Failed to reset parameter {param.param.name}')
return self.meas.dataset
def put(self, priority = 1):
def put(self, priority=1):
'''
put the job in a queue.
'''
......@@ -120,8 +115,8 @@ class scan_generic(metaclass=job_meta):
def abort_measurement(self):
self.meas.abort()
def _loop(self, set_param, to_save, dataset):
if len(set_param) == 0:
def _loop(self, set_params, to_save, dataset):
if len(set_params) == 0:
for action in self.actions:
action()
m_data = []
......@@ -131,12 +126,13 @@ class scan_generic(metaclass=job_meta):
dataset.add_result(*to_save, *m_data)
self.n += 1
else:
param_info = set_param[0]
param_info = set_params[0]
for value in param_info.values():
if not isinstance(param_info.param, ElapsedTimeParameter):
param_info.param(value)
param = param_info.param
if not isinstance(param, ElapsedTimeParameter):
param(value)
time.sleep(param_info.delay)
self._loop(set_param[1:], to_save + ((param_info.param, param_info.param()),), dataset)
self._loop(set_params[1:], to_save + ((param, param()),), dataset)
def do0D(*m_instr, name='', silent=False):
......@@ -163,12 +159,12 @@ def do1D(param, start, stop, n_points, delay, *m_instr, name='', reset_param=Fal
silent (bool) : If True do not print dataset id and progress bar
'''
m_param = sweep_info(param, start, stop, n_points, delay)
return scan_generic(m_param, *m_instr,name=name, reset_param=reset_param, silent=silent)
return scan_generic(m_param, *m_instr, name=name, reset_param=reset_param, silent=silent)
def do2D(param_1, start_1, stop_1, n_points_1, delay_1,
param_2, start_2, stop_2, n_points_2, delay_2, *m_instr, name='',
reset_param=False, silent=False):
param_2, start_2, stop_2, n_points_2, delay_2, *m_instr, name='',
reset_param=False, silent=False):
'''
do a 2D scan
......