From e4ddc0dcbc4944ec591456ccf3e33c68ffe6cb37 Mon Sep 17 00:00:00 2001 From: sldesnoo-Delft <s.l.desnoo@tudelft.nl> Date: Fri, 7 Mar 2025 09:40:36 +0100 Subject: [PATCH] Cleanup of sweeps --- core_tools/sweeps/sweep_utility.py | 108 ++++++++--------------------- core_tools/sweeps/sweeps.py | 62 ++++++++--------- 2 files changed, 56 insertions(+), 114 deletions(-) diff --git a/core_tools/sweeps/sweep_utility.py b/core_tools/sweeps/sweep_utility.py index 38a53fe9..94ae8407 100644 --- a/core_tools/sweeps/sweep_utility.py +++ b/core_tools/sweeps/sweep_utility.py @@ -1,55 +1,9 @@ +from __future__ import annotations from dataclasses import dataclass -from qcodes import Parameter import numpy as np - -FAST = "FAST" -SLOW = "SLOW" - -# WARNING: Mode Fast sometimes gives corrupted data on Keysight. -# Mode Fast does not work with any of the other backends. -MODE = SLOW - - -class PulseLibParameter(Parameter): - - def add_setpoints(self, setpoints, sequencer, lowest_level): - self.flat_index = 0 - self.setpoints = setpoints - self.sequencer = sequencer - self.lowest_level = lowest_level - - def get_raw(self): - current_val = self.setpoints[self.flat_index%len(self.setpoints)] - - self.flat_index += 1 - if self.flat_index >= np.prod(self.sequencer.shape): - self.flat_index = 0 - - return current_val - - def set_raw(self, value): - if self.lowest_level: - if self.flat_index == 0 and hasattr(self.sequencer, 'starting_lambda'): - # TODO: What is this starting_lambda hack?? - self.sequencer.starting_lambda(self.sequencer) - - if MODE == SLOW or self.flat_index == 0: - self.sequencer.upload(np.unravel_index(self.flat_index, self.sequencer.shape)) - - # TODO: Change to loop using sequencer.params - index = np.unravel_index(self.flat_index, self.sequencer.shape) - self.sequencer.play(index, release=True) - if hasattr(self.sequencer, 'm_param'): - # NOTE: This is a hack to set the index for measurement_converter - self.sequencer.m_param.setIndex(tuple(index)) - if MODE == SLOW: - # Wait is not needed for Keysight, because the digitizer call is blocking. - # self.sequencer.uploader.wait_until_AWG_idle() - pass - if MODE==FAST and self.flat_index < np.prod(self.sequencer.shape) - 1: - # WARNING: upload during play regularly results in corrupt data in Keysight. - self.sequencer.upload(np.unravel_index(self.flat_index+1, self.sequencer.shape)) +from qcodes import Parameter +from pulse_lib.sequencer import index_param class SequenceStartAction: @@ -58,54 +12,49 @@ class SequenceStartAction: def __call__(self): sequence = self._sequence - if hasattr(sequence, 'starting_lambda'): - sequence.starting_lambda(sequence) - sequence.upload((0, )) - sequence.play((0, )) - if hasattr(sequence, 'm_param'): - sequence.m_param.setIndex((0, )) + sequence.upload() + sequence.play() -def pulselib_2_qcodes(awg_sequence): +def get_pulselib_sweeps(sequence) -> list[sweep_info]: ''' - convert pulse sequencer object in qcodes parameters that are usable in sweeps. + Returns sweep parameters for the axes in the pulse sequence. Args: - awg_sequence (pulselib.sequencer.sequencer) : sequence object + sequence (pulselib.sequencer.sequencer) : sequence object Returns: - set_param (list<PulseLibParameter>) : set paramters for the pulselib to be used in the sweep + list[sweep_info] : sweeps along the axes of the sequence. ''' - set_param = list() - if awg_sequence.shape == (1,): - return set_param - for i in range(len(awg_sequence.shape)): - # TODO: Use sequencer.params - param = PulseLibParameter(name=awg_sequence.labels[i].replace(" ", "_"), - label=awg_sequence.labels[i], - unit=awg_sequence.units[i]) - param.add_setpoints(awg_sequence.setpoints[i], awg_sequence, False) - set_param.append(sweep_info(param, n_points = len(awg_sequence.setpoints[i]))) + set_params = [] + if sequence.shape == (1,): + return set_params + seq_params = sequence.params + + # Note: reverse order, because axis=0 is fastest running and must thus be last. + for param in seq_params[::-1]: + sweep = sweep_info(param) + sweep.set_values(param.values) + set_params.append(sweep) - set_param[0].param.lowest_level=True - return set_param[::-1] + return set_params @dataclass -class sweep_info(): +class sweep_info: ''' data class that hold the sweep info for one of the paramters. ''' - param : Parameter = None - start : float = 0 - stop : float = 0 - n_points : int = 50 - delay : float = 0 + param: Parameter = None + start: float = 0 + stop: float = 0 + n_points: int = 50 + delay: float = 0 def __post_init__(self): self._values = None self.original_value = None - if not isinstance(self.param, PulseLibParameter): + if not isinstance(self.param, index_param): self.original_value = self.param() def reset_param(self): @@ -121,6 +70,3 @@ class sweep_info(): return np.linspace(self.start, self.stop, self.n_points) else: return self._values - -def check_OD_scan(sequence, minstr): - raise Exception('This function was broken beyond repair. Do not use it. [SdS]') diff --git a/core_tools/sweeps/sweeps.py b/core_tools/sweeps/sweeps.py index 5f63e1b5..27d9f0a3 100644 --- a/core_tools/sweeps/sweeps.py +++ b/core_tools/sweeps/sweeps.py @@ -1,32 +1,27 @@ import logging -from qcodes.instrument.specialized_parameters import ElapsedTimeParameter -from core_tools.data.measurement import Measurement, AbortMeasurement -from pulse_lib.sequencer import sequencer +import time +from core_tools.data.measurement import Measurement, AbortMeasurement from core_tools.sweeps.sweep_utility import ( SequenceStartAction, - pulselib_2_qcodes, sweep_info + get_pulselib_sweeps, sweep_info ) from core_tools.job_mgnt.job_meta import job_meta from core_tools.job_mgnt.job_mgmt import queue_mgr, ExperimentJob - -import numpy as np -import time +from qcodes.instrument.specialized_parameters import ElapsedTimeParameter +from pulse_lib.sequencer import sequencer logger = logging.getLogger(__name__) class scan_generic(metaclass=job_meta): - ''' - function that handeles the loop action and defines the run class. - ''' + def __init__(self, *args, name='', reset_param=False, silent=False): ''' - init of the scan function Args: - args (*list) : provide here the sweep info and meaurment parameters + args (list) : sweep info and meaurment parameters reset_param (bool) : reset the setpoint parametes to their original value after the meaurement silent (bool) : If True do not print dataset id and progress bar ''' @@ -46,16 +41,16 @@ class scan_generic(metaclass=job_meta): self.set_vars.append(arg) set_points.append(arg.param) elif isinstance(arg, sequencer): - if arg.shape != (1, ): - set_vars_pulse_lib = pulselib_2_qcodes(arg) - for var in set_vars_pulse_lib: - self.meas.register_set_parameter(var.param, var.n_points) - self.set_vars.append(var) - set_points.append(var.param) - else: - # Sequence without looping parameters. Only upload, no setpoints - self.actions.append(SequenceStartAction(arg)) - self.meas.add_snapshot('sequence', arg.metadata) + sequence = arg + if hasattr(sequence, 'starting_lambda'): + raise Exception("starting_lambda is not supported anymore") + sweeps_pulse_lib = get_pulselib_sweeps(sequence) + for var in sweeps_pulse_lib: + self.meas.register_set_parameter(var.param, var.n_points) + self.set_vars.append(var) + set_points.append(var.param) + self.actions.append(SequenceStartAction(sequence)) + self.meas.add_snapshot('sequence', sequence.metadata) elif arg is None: continue else: @@ -104,12 +99,12 @@ class scan_generic(metaclass=job_meta): for param in self.set_vars: try: param.reset_param() - except: + except Exception: logger.error(f'Failed to reset parameter {param.param.name}') return self.meas.dataset - def put(self, priority = 1): + def put(self, priority=1): ''' put the job in a queue. ''' @@ -120,8 +115,8 @@ class scan_generic(metaclass=job_meta): def abort_measurement(self): self.meas.abort() - def _loop(self, set_param, to_save, dataset): - if len(set_param) == 0: + def _loop(self, set_params, to_save, dataset): + if len(set_params) == 0: for action in self.actions: action() m_data = [] @@ -131,12 +126,13 @@ class scan_generic(metaclass=job_meta): dataset.add_result(*to_save, *m_data) self.n += 1 else: - param_info = set_param[0] + param_info = set_params[0] for value in param_info.values(): - if not isinstance(param_info.param, ElapsedTimeParameter): - param_info.param(value) + param = param_info.param + if not isinstance(param, ElapsedTimeParameter): + param(value) time.sleep(param_info.delay) - self._loop(set_param[1:], to_save + ((param_info.param, param_info.param()),), dataset) + self._loop(set_params[1:], to_save + ((param, param()),), dataset) def do0D(*m_instr, name='', silent=False): @@ -163,12 +159,12 @@ def do1D(param, start, stop, n_points, delay, *m_instr, name='', reset_param=Fal silent (bool) : If True do not print dataset id and progress bar ''' m_param = sweep_info(param, start, stop, n_points, delay) - return scan_generic(m_param, *m_instr,name=name, reset_param=reset_param, silent=silent) + return scan_generic(m_param, *m_instr, name=name, reset_param=reset_param, silent=silent) def do2D(param_1, start_1, stop_1, n_points_1, delay_1, - param_2, start_2, stop_2, n_points_2, delay_2, *m_instr, name='', - reset_param=False, silent=False): + param_2, start_2, stop_2, n_points_2, delay_2, *m_instr, name='', + reset_param=False, silent=False): ''' do a 2D scan -- GitLab