Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • qutech-qdlabs/core_tools
1 result
Show changes
Commits on Source (2)
# Changelog # Changelog
All notable changes to core_tools will be documented in this file. All notable changes to core_tools will be documented in this file.
## \[1.5.12] - 2025-03-07
- Video mode: Fix general settings in favorites
- "gates" object now uses virtual matrix as shown in GUI and not the normalized matrix.
## \[1.5.11] - 2025-02-20 ## \[1.5.11] - 2025-02-20
- Added snapshot data_writer - Added snapshot data_writer
......
from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from qcodes import Parameter
import numpy as np import numpy as np
from qcodes import Parameter
FAST = "FAST" from pulse_lib.sequencer import index_param
SLOW = "SLOW"
# WARNING: Mode Fast sometimes gives corrupted data on Keysight.
# Mode Fast does not work with any of the other backends.
MODE = SLOW
class PulseLibParameter(Parameter):
def add_setpoints(self, setpoints, sequencer, lowest_level):
self.flat_index = 0
self.setpoints = setpoints
self.sequencer = sequencer
self.lowest_level = lowest_level
def get_raw(self):
current_val = self.setpoints[self.flat_index%len(self.setpoints)]
self.flat_index += 1
if self.flat_index >= np.prod(self.sequencer.shape):
self.flat_index = 0
return current_val
def set_raw(self, value):
if self.lowest_level:
if self.flat_index == 0 and hasattr(self.sequencer, 'starting_lambda'):
# TODO: What is this starting_lambda hack??
self.sequencer.starting_lambda(self.sequencer)
if MODE == SLOW or self.flat_index == 0:
self.sequencer.upload(np.unravel_index(self.flat_index, self.sequencer.shape))
# TODO: Change to loop using sequencer.params
index = np.unravel_index(self.flat_index, self.sequencer.shape)
self.sequencer.play(index, release=True)
if hasattr(self.sequencer, 'm_param'):
# NOTE: This is a hack to set the index for measurement_converter
self.sequencer.m_param.setIndex(tuple(index))
if MODE == SLOW:
# Wait is not needed for Keysight, because the digitizer call is blocking.
# self.sequencer.uploader.wait_until_AWG_idle()
pass
if MODE==FAST and self.flat_index < np.prod(self.sequencer.shape) - 1:
# WARNING: upload during play regularly results in corrupt data in Keysight.
self.sequencer.upload(np.unravel_index(self.flat_index+1, self.sequencer.shape))
class SequenceStartAction: class SequenceStartAction:
...@@ -58,54 +12,49 @@ class SequenceStartAction: ...@@ -58,54 +12,49 @@ class SequenceStartAction:
def __call__(self): def __call__(self):
sequence = self._sequence sequence = self._sequence
if hasattr(sequence, 'starting_lambda'): sequence.upload()
sequence.starting_lambda(sequence) sequence.play()
sequence.upload((0, ))
sequence.play((0, ))
if hasattr(sequence, 'm_param'):
sequence.m_param.setIndex((0, ))
def pulselib_2_qcodes(awg_sequence): def get_pulselib_sweeps(sequence) -> list[sweep_info]:
''' '''
convert pulse sequencer object in qcodes parameters that are usable in sweeps. Returns sweep parameters for the axes in the pulse sequence.
Args: Args:
awg_sequence (pulselib.sequencer.sequencer) : sequence object sequence (pulselib.sequencer.sequencer) : sequence object
Returns: Returns:
set_param (list<PulseLibParameter>) : set paramters for the pulselib to be used in the sweep list[sweep_info] : sweeps along the axes of the sequence.
''' '''
set_param = list() set_params = []
if awg_sequence.shape == (1,): if sequence.shape == (1,):
return set_param return set_params
for i in range(len(awg_sequence.shape)): seq_params = sequence.params
# TODO: Use sequencer.params
param = PulseLibParameter(name=awg_sequence.labels[i].replace(" ", "_"), # Note: reverse order, because axis=0 is fastest running and must thus be last.
label=awg_sequence.labels[i], for param in seq_params[::-1]:
unit=awg_sequence.units[i]) sweep = sweep_info(param)
param.add_setpoints(awg_sequence.setpoints[i], awg_sequence, False) sweep.set_values(param.values)
set_param.append(sweep_info(param, n_points = len(awg_sequence.setpoints[i]))) set_params.append(sweep)
set_param[0].param.lowest_level=True return set_params
return set_param[::-1]
@dataclass @dataclass
class sweep_info(): class sweep_info:
''' '''
data class that hold the sweep info for one of the paramters. data class that hold the sweep info for one of the paramters.
''' '''
param : Parameter = None param: Parameter = None
start : float = 0 start: float = 0
stop : float = 0 stop: float = 0
n_points : int = 50 n_points: int = 50
delay : float = 0 delay: float = 0
def __post_init__(self): def __post_init__(self):
self._values = None self._values = None
self.original_value = None self.original_value = None
if not isinstance(self.param, PulseLibParameter): if not isinstance(self.param, index_param):
self.original_value = self.param() self.original_value = self.param()
def reset_param(self): def reset_param(self):
...@@ -121,6 +70,3 @@ class sweep_info(): ...@@ -121,6 +70,3 @@ class sweep_info():
return np.linspace(self.start, self.stop, self.n_points) return np.linspace(self.start, self.stop, self.n_points)
else: else:
return self._values return self._values
def check_OD_scan(sequence, minstr):
raise Exception('This function was broken beyond repair. Do not use it. [SdS]')
import logging import logging
from qcodes.instrument.specialized_parameters import ElapsedTimeParameter import time
from core_tools.data.measurement import Measurement, AbortMeasurement
from pulse_lib.sequencer import sequencer
from core_tools.data.measurement import Measurement, AbortMeasurement
from core_tools.sweeps.sweep_utility import ( from core_tools.sweeps.sweep_utility import (
SequenceStartAction, SequenceStartAction,
pulselib_2_qcodes, sweep_info get_pulselib_sweeps, sweep_info
) )
from core_tools.job_mgnt.job_meta import job_meta from core_tools.job_mgnt.job_meta import job_meta
from core_tools.job_mgnt.job_mgmt import queue_mgr, ExperimentJob from core_tools.job_mgnt.job_mgmt import queue_mgr, ExperimentJob
from qcodes.instrument.specialized_parameters import ElapsedTimeParameter
import numpy as np from pulse_lib.sequencer import sequencer
import time
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class scan_generic(metaclass=job_meta): class scan_generic(metaclass=job_meta):
'''
function that handeles the loop action and defines the run class.
'''
def __init__(self, *args, name='', reset_param=False, silent=False): def __init__(self, *args, name='', reset_param=False, silent=False):
''' '''
init of the scan function
Args: Args:
args (*list) : provide here the sweep info and meaurment parameters args (list) : sweep info and meaurment parameters
reset_param (bool) : reset the setpoint parametes to their original value after the meaurement reset_param (bool) : reset the setpoint parametes to their original value after the meaurement
silent (bool) : If True do not print dataset id and progress bar silent (bool) : If True do not print dataset id and progress bar
''' '''
...@@ -46,16 +41,16 @@ class scan_generic(metaclass=job_meta): ...@@ -46,16 +41,16 @@ class scan_generic(metaclass=job_meta):
self.set_vars.append(arg) self.set_vars.append(arg)
set_points.append(arg.param) set_points.append(arg.param)
elif isinstance(arg, sequencer): elif isinstance(arg, sequencer):
if arg.shape != (1, ): sequence = arg
set_vars_pulse_lib = pulselib_2_qcodes(arg) if hasattr(sequence, 'starting_lambda'):
for var in set_vars_pulse_lib: raise Exception("starting_lambda is not supported anymore")
self.meas.register_set_parameter(var.param, var.n_points) sweeps_pulse_lib = get_pulselib_sweeps(sequence)
self.set_vars.append(var) for var in sweeps_pulse_lib:
set_points.append(var.param) self.meas.register_set_parameter(var.param, var.n_points)
else: self.set_vars.append(var)
# Sequence without looping parameters. Only upload, no setpoints set_points.append(var.param)
self.actions.append(SequenceStartAction(arg)) self.actions.append(SequenceStartAction(sequence))
self.meas.add_snapshot('sequence', arg.metadata) self.meas.add_snapshot('sequence', sequence.metadata)
elif arg is None: elif arg is None:
continue continue
else: else:
...@@ -104,12 +99,12 @@ class scan_generic(metaclass=job_meta): ...@@ -104,12 +99,12 @@ class scan_generic(metaclass=job_meta):
for param in self.set_vars: for param in self.set_vars:
try: try:
param.reset_param() param.reset_param()
except: except Exception:
logger.error(f'Failed to reset parameter {param.param.name}') logger.error(f'Failed to reset parameter {param.param.name}')
return self.meas.dataset return self.meas.dataset
def put(self, priority = 1): def put(self, priority=1):
''' '''
put the job in a queue. put the job in a queue.
''' '''
...@@ -120,8 +115,8 @@ class scan_generic(metaclass=job_meta): ...@@ -120,8 +115,8 @@ class scan_generic(metaclass=job_meta):
def abort_measurement(self): def abort_measurement(self):
self.meas.abort() self.meas.abort()
def _loop(self, set_param, to_save, dataset): def _loop(self, set_params, to_save, dataset):
if len(set_param) == 0: if len(set_params) == 0:
for action in self.actions: for action in self.actions:
action() action()
m_data = [] m_data = []
...@@ -131,12 +126,13 @@ class scan_generic(metaclass=job_meta): ...@@ -131,12 +126,13 @@ class scan_generic(metaclass=job_meta):
dataset.add_result(*to_save, *m_data) dataset.add_result(*to_save, *m_data)
self.n += 1 self.n += 1
else: else:
param_info = set_param[0] param_info = set_params[0]
for value in param_info.values(): for value in param_info.values():
if not isinstance(param_info.param, ElapsedTimeParameter): param = param_info.param
param_info.param(value) if not isinstance(param, ElapsedTimeParameter):
param(value)
time.sleep(param_info.delay) time.sleep(param_info.delay)
self._loop(set_param[1:], to_save + ((param_info.param, param_info.param()),), dataset) self._loop(set_params[1:], to_save + ((param, param()),), dataset)
def do0D(*m_instr, name='', silent=False): def do0D(*m_instr, name='', silent=False):
...@@ -163,12 +159,12 @@ def do1D(param, start, stop, n_points, delay, *m_instr, name='', reset_param=Fal ...@@ -163,12 +159,12 @@ def do1D(param, start, stop, n_points, delay, *m_instr, name='', reset_param=Fal
silent (bool) : If True do not print dataset id and progress bar silent (bool) : If True do not print dataset id and progress bar
''' '''
m_param = sweep_info(param, start, stop, n_points, delay) m_param = sweep_info(param, start, stop, n_points, delay)
return scan_generic(m_param, *m_instr,name=name, reset_param=reset_param, silent=silent) return scan_generic(m_param, *m_instr, name=name, reset_param=reset_param, silent=silent)
def do2D(param_1, start_1, stop_1, n_points_1, delay_1, def do2D(param_1, start_1, stop_1, n_points_1, delay_1,
param_2, start_2, stop_2, n_points_2, delay_2, *m_instr, name='', param_2, start_2, stop_2, n_points_2, delay_2, *m_instr, name='',
reset_param=False, silent=False): reset_param=False, silent=False):
''' '''
do a 2D scan do a 2D scan
......