Skip to content
Snippets Groups Projects
Commit 95ea1763 authored by Stephan Philips's avatar Stephan Philips
Browse files

added generic scan functions in qcodes new dataset format

parent f7ba7adb
No related branches found
No related tags found
No related merge requests found
Showing
with 487 additions and 0 deletions
File deleted
File deleted
File deleted
File deleted
File deleted
File deleted
File deleted
import tqdm
class progress_bar():
def __init__(self, n_tot):
self.bar = tqdm.tqdm(total = n_tot, unit='points', ncols=80)
self.n = 0
def __add__(self, n):
self.bar.update(n)
self.n += n
return self
def close(self):
self.bar.close()
if __name__ == '__main__':
import time
p = progress_bar(50)
for i in range(50):
time.sleep(0.01)
p += 1
p.close()
\ No newline at end of file
from core_tools.sweeps.sweep_utility import sweep_info
from qcodes import Parameter
class PulseLibParameter(Parameter):
setpoints = None
flat_index = 0
def add_setpoints(self, setpoints, sequencer, lowest_level):
self.setpoints = setpoints
self.sequencer = sequencer
self.lowest_level = lowest_level
def get_raw(self):
current_val = self.setpoints[self.flat_index%len(self.setpoints)]
self.flat_index += 1
if self.flat_index > np.prod(self.sequencer.shape):
self.flat_index = 0
return current_val
def set_raw(self, value):
if self.lowest_level:
if self.flat_index == 0:
self.sequencer.upload(np.unravel_index(flat_index, self.sequencer.shape))
index = np.unravel_index(flat_index, self.shape)
self.sequencer.play(index)
if flat_index < np.prod(self.shape) - 1:
self.sequencer.upload(np.unravel_index(flat_index+1, self.shape))
self.sequencer.uploader.wait_until_AWG_idle()
'''
@ sander, how can we make sure that a unused upload is removed when the garbage collector collects this?
(e.g. when a set is performed to reset parameters -- normally this does not happen, but user might accidentatly do this.)
'''
def pulselib_2_qcodes(awg_sequence):
'''
convert pulse sequencer object in qcodes parameters that are usable in sweeps.
Args:
awg_sequence (pulselib.sequencer.sequencer) : sequence object
Returns:
set_param (list<PulseLibParameter>) : set paramters for the pulselib to be used in the sweep
'''
set_param = list()
for i in range(len(awg_sequence.shape)):
param = PulseLibParameter(name = awg_sequence.labels[i].replace(" ", "_"), label=awg_sequence.labels[i], unit= awg_sequence.units[i])
param.add_setpoints(awg_sequence.setpoints[i], awg_sequence, False)
set_param.append(sweep_info(param, n_points = len(awg_sequence.setpoints[i])))
set_param[0].param.lowest_level=True
return set_param[::-1]
if __name__ == '__main__':
from qcodes import Parameter, MultiParameter, new_experiment
import numpy as np
import time
new_experiment("name", "testing")
class test_AWG_sequence0D(object):
"""docstring for test_AWG_sequence"""
def __init__(self):
super(test_AWG_sequence0D, self).__init__()
self.shape = (1, )
self.uploader = uploader()
self.units = ("V",)
self.labels = ("y axis",)
self.setpoints = (np.linspace(20,50,1),)
def play(self, idx):
time.sleep(0.01)
pass
def upload(self, idx):
pass
class test_AWG_sequence1D(object):
"""docstring for test_AWG_sequence"""
def __init__(self):
super(test_AWG_sequence1D, self).__init__()
self.shape = (50, )
self.uploader = uploader()
self.units = ("V",)
self.labels = ("y axis",)
self.setpoints = (np.linspace(20,50,50),)
def play(self, idx):
time.sleep(0.01)
pass
def upload(self, idx):
pass
class test_AWG_sequence2D(object):
"""docstring for test_AWG_sequence"""
def __init__(self):
super(test_AWG_sequence2D, self).__init__()
self.shape = (50, 50)
self.units = ("V", "V")
self.labels = ("x axis", "y axis")
self.setpoints = (np.linspace(20,50,50), np.linspace(50,125,50))
self.uploader = uploader()
def play(self, idx):
time.sleep(0.01)
pass
def upload(self, idx):
pass
class dummy_parameter(Parameter):
def __init__(self, name, label=None, unit=None):
super().__init__(name=name,
instrument=None,
labels=( "digitzer_response"),
units=("unit1" ))
class dummy_multi_parameter(MultiParameter):
def __init__(self, name, label=None, unit=None):
super().__init__(name=name,
instrument=None,
names=("test12","test1234"),
shapes=( (200, ) , (200, ), ),
labels=( "digitzer_response", "D2"),
units=("unit1", "unit2"), )
self.setpoints = ( (np.linspace(70,500,200), ), (np.linspace(70,500,200), ))
self.setpoint_shapes = ( (200, ), (200, ))
self.setpoint_labels = ( ("I channel", ), ('Q channel', ))
self.setpoint_units = ( ("mV", ), ("mV", ))
self.setpoint_names = ( ("test_name", ), ("testname_2", ))
self.i = 2
def get_raw(self):
self.i +=1
return (np.linspace(0,500,200)+self.i, np.linspace(0,500,200)+self.i+100)
class dummy_multi_parameter_2dawg(MultiParameter):
def __init__(self, name, label=None, unit=None):
super().__init__(name=name,
instrument=None,
names=("test12","test1234"),
shapes=( tuple() , tuple() ),
labels=( "digitzer_response", "D2"),
units=("unit1", "unit2"), )
self.setpoints = ( tuple(), tuple())
self.setpoint_shapes = ( tuple(), tuple())
self.setpoint_labels = ( ("I channel", ), ('Q channel', ))
self.setpoint_units = ( ("mV", ), ("mV", ))
self.setpoint_names = ( ("I_channel", ), ("Q_channel", ))
self.i = 2
def get_raw(self):
self.i +=1
return (self.i, self.i+100)
class uploader(object):
"""docstring for uploader"""
def __init__(self, ):
super(uploader, self).__init__()
def wait_until_AWG_idle(self):
'''
check if the AWG is doing playback, when done, release this function
'''
time.sleep(0.01)
pass
measurment_parameter = dummy_multi_parameter("digitzer_1", label="qubit_1 (spin up)", unit="%")
measurment_parameter2D = dummy_multi_parameter_2dawg("digitzer_1", label="qubit_1 (spin up)", unit="%")
awg_sequence0D = test_AWG_sequence0D()
awg_sequence1D = test_AWG_sequence1D()
awg_sequence2D = test_AWG_sequence2D()
test = pulselib_2_qcodes(awg_sequence0D)
print(test)
\ No newline at end of file
from dataclasses import dataclass
from qcodes import Parameter
class KILL_EXP(Exception):
pass
@dataclass
class sweep_info():
'''
data class that hold the sweep info for one of the paramters.
-- also contains a looper - (should this one move to somewhere else?)
'''
_param : Parameter = None
start : float = 0
stop : float = 0
n_points : int = 50
delay : float = 0
def __post_init__(self):
self.param = self._param
@property
def param(self):
return self._param
@param.setter
def param(self, input_param):
self.param_val = input_param.get()
self._param = input_param
def reset_param(self):
self._param.set(self.param_val)
def get_measure_data(m_instr):
'''
measure date for given paramters in m_instr
Args:
m_instr (list<qc.Parameter>) : list with parameters to be measured
Returns
my_data (list<qc.Parameter>), np.ndarray/str/float/int>)
'''
my_data = []
for instr in m_instr:
my_data.append( (instr, instr.get()))
return my_data
class PulseLibParameter(Parameter):
setpoints = None
flat_index = 0
def add_setpoints(self, setpoints, sequencer, lowest_level):
self.setpoints = setpoints
self.sequencer = sequencer
self.lowest_level = lowest_level
def get_raw(self):
current_val = self.setpoints[self.flat_index%len(self.setpoints)]
self.flat_index += 1
if self.flat_index > np.prod(self.sequencer.shape):
self.flat_index = 0
return current_val
def set_raw(self, value):
if self.lowest_level:
if self.flat_index == 0:
self.sequencer.upload(np.unravel_index(flat_index, self.sequencer.shape))
index = np.unravel_index(flat_index, self.shape)
self.sequencer.play(index)
if flat_index < np.prod(self.shape) - 1:
self.sequencer.upload(np.unravel_index(flat_index+1, self.shape))
self.sequencer.uploader.wait_until_AWG_idle()
'''
@ sander, how can we make sure that a unused upload is removed when the garbage collector collects this?
(e.g. when a set is performed to reset parameters -- normally this does not happen, but user might accidentatly do this.)
'''
def pulselib_2_qcodes(awg_sequence):
'''
convert pulse sequencer object in qcodes parameters that are usable in sweeps.
Args:
awg_sequence (pulselib.sequencer.sequencer) : sequence object
Returns:
set_param (list<PulseLibParameter>) : set paramters for the pulselib to be used in the sweep
'''
set_param = list()
for i in range(len(awg_sequence.shape)):
param = PulseLibParameter(name = awg_sequence.labels[i].replace(" ", "_"), label=awg_sequence.labels[i], unit= awg_sequence.units[i])
param.add_setpoints(awg_sequence.setpoints[i], awg_sequence, False)
set_param.append(sweep_info(param, n_points = len(awg_sequence.setpoints[i])))
set_param[0].param.lowest_level=True
return set_param[::-1]
from qcodes.instrument.specialized_parameters import ElapsedTimeParameter
from qcodes.dataset.measurements import Measurement
from pulse_lib.sequencer import sequencer
from core_tools.sweeps.sweep_utility import pulselib_2_qcodes, sweep_info, get_measure_data, KILL_EXP
from core_tools.job_mgnt.job_meta import job_meta
import numpy as np
import time
class scan_generic(metaclass=job_meta):
'''
function that handeles the loop action and defines the run class.
'''
def __init__(self, *args, reset_param=False):
'''
init of the scan function
Args:
args (*list) : provide here the sweep info and meaurment parameters
reset_param (bool) : reset the setpoint parametes to their original value after the meaurement
'''
self.meas = Measurement()
self.set_vars = []
self.m_instr = []
self.reset_param = reset_param
set_points = []
for arg in args:
if isinstance(arg, sweep_info):
self.meas.register_parameter(arg.param)
self.set_vars.append(arg)
set_points.append(arg.param)
elif isinstance(arg, sequencer):
set_vars_pulse_lib = pulselib_2_qcodes(arg)
for var in set_vars_pulse_lib:
self.meas.register_parameter(var.param)
set_points.append(var.param)
self.set_vars += set_vars_pulse_lib
else:
self.m_instr.append(arg)
for instr in self.m_instr:
self.meas.register_parameter(instr, setpoints=tuple(set_points[::-1]))
self.n_tot = 1
for swp_info in self.set_vars:
self.n_tot *= swp_info.n_points
def run(self):
'''
run function
-- starts the meaurement and saves the data
-- optionally also resets the paramters
-- wrapped by the job_meta class (allows for progress bar to appear)
'''
with self.meas.run() as datasaver:
self._loop(self.set_vars, self.m_instr, tuple(), datasaver)
dataset = datasaver.dataset
if self.reset_param:
for param in self.set_vars:
param.reset_param()
return dataset
def _loop(self, set_param, m_instr, to_save, datasaver):
if len(set_param) == 0:
if self.KILL == False:
datasaver.add_result(*to_save, *get_measure_data(m_instr))
self.n += 1
else:
raise KILL_EXP
else:
param_info = set_param[0]
for value in np.linspace(param_info.start, param_info.stop, param_info.n_points):
if not isinstance(param_info.param, ElapsedTimeParameter):
param_info.param(value)
time.sleep(param_info.delay)
self._loop(set_param[1:], m_instr, to_save + ((param_info.param, param_info.param()),), datasaver)
def do0D(*m_instr):
'''
do a 0D scan
Args:
m_instr (*list) : list of parameters to measure
'''
return scan_generic(*m_instr, reset_param=False)
def do1D(param, start, stop, n_points, delay, *m_instr, reset_param=False):
'''
do a 1D scan
Args:
param (qc.Parameter) : parameter to be swept
start (float) : start value of the sweep
stop (float) : stop value of the sweep
delay (float) : time to wait after the set of the parameter
m_instr (*list) : list of parameters to measure
reset_param (bool) : reset the setpoint parametes to their original value after the meaurement
'''
m_param = sweep_info(param, start, stop, n_points, delay)
return scan_generic(m_param, *m_instr, reset_param=reset_param)
def do2D(param_1, start_1, stop_1, n_points_1, delay_1,
param_2, start_2, stop_2, n_points_2, delay_2, *m_instr, reset_param=False):
'''
do a 2D scan
Args:
param_1 (qc.Parameter) : parameter to be swept
start_1 (float) : start value of the sweep
stop_1 (float) : stop value of the sweep
delay_1 (float) : time to wait after the set of the parameter
param_2 (qc.Parameter) : parameter to be swept
start_2 (float) : start value of the sweep
stop_2 (float) : stop value of the sweep
delay_2 (float) : time to wait after the set of the parameter
m_instr (*list) : list of parameters to measure
reset_param (bool) : reset the setpoint parametes to their original value after the meaurement
'''
m_param_1 = sweep_info(param_1, start_1, stop_1, n_points_1, delay_1)
m_param_2 = sweep_info(param_2, start_2, stop_2, n_points_2, delay_2)
return scan_generic(m_param_2, m_param_1, *m_instr, reset_param=reset_param)
if __name__ == '__main__':
import os
import datetime
import numpy as np
import scipy.optimize as opt
import matplotlib.pyplot as plt
import qcodes as qc
from qcodes.dataset.plotting import plot_dataset
from qcodes.dataset.data_set import load_by_run_spec
from qcodes.dataset.sqlite.database import initialise_or_create_database_at
from qcodes.dataset.experiment_container import load_or_create_experiment
from qcodes.tests.instrument_mocks import MockParabola
station = qc.station.Station()
station.add_component(MockParabola(name='MockParabola'))
class MyCounter(qc.Parameter):
def __init__(self, name):
# only name is required
super().__init__(name, label='Times this has been read',
docstring='counts how many times get has been called '
'but can be reset to any integer >= 0 by set')
self._count = 0
# you must provide a get method, a set method, or both.
def get_raw(self):
self._count += 1
return self._count
def set_raw(self, val):
self._count = val
now = str(datetime.datetime.now())
tutorial_db_path = os.path.join(os.getcwd(), 'linking_datasets_tutorial.db')
initialise_or_create_database_at(tutorial_db_path)
load_or_create_experiment('tutorial ' + now, 'no sample')
my_param = MyCounter('test_instr')
from qcodes.instrument.specialized_parameters import ElapsedTimeParameter
x = qc.Parameter(name='x', label='Voltage_x', unit='V',
set_cmd=None, get_cmd=None)
y = qc.Parameter(name='y', label='Voltage_y', unit='V',
set_cmd=None, get_cmd=None)
timer = ElapsedTimeParameter('time')
# do0D(my_param).run()
# do1D(x, 0, 100, 50, 0.1 , my_param, reset_param=True).run()
do2D(x, 0, 20, 20, 0.0, y, 0, 80, 30, 0.01, my_param).run()
do2D(x, 0, 20, 20, 0.0, timer, 0, 80, 30, 0.1, my_param).run()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment