From 056b42a12a48307834fa876ed26925f362825108 Mon Sep 17 00:00:00 2001
From: Stephan Philips <s.g.j.philips@tudelft.nl>
Date: Sat, 6 Jun 2020 22:47:15 +0200
Subject: [PATCH] added job queueing to the tool set (needed for calibartion
 mangagment systems)

---
 core_tools/job_mgnt/__init__.py               |   0
 core_tools/job_mgnt/job_meta.py               |  50 +++++++
 core_tools/job_mgnt/job_mgmt.py               | 131 ++++++++++++++++++
 core_tools/job_mgnt/{ => legacy}/README.md    |   0
 .../job_mgnt/{ => legacy}/calibrations.py     |   0
 core_tools/job_mgnt/{ => legacy}/init_test.py |   0
 .../job_mgnt/{ => legacy}/init_test.sqlite    | Bin
 .../job_mgnt/{ => legacy}/job_manager.py      |   0
 .../job_mgnt/{ => legacy}/qubit_class.py      |   0
 .../job_mgnt/{ => legacy}/readout_Test.py     |   0
 .../job_mgnt/{ => legacy}/readout_Test2.py    |   0
 core_tools/job_mgnt/{ => legacy}/sequencer.py |   0
 core_tools/job_mgnt/{ => legacy}/test.py      |   0
 13 files changed, 181 insertions(+)
 create mode 100644 core_tools/job_mgnt/__init__.py
 create mode 100644 core_tools/job_mgnt/job_meta.py
 create mode 100644 core_tools/job_mgnt/job_mgmt.py
 rename core_tools/job_mgnt/{ => legacy}/README.md (100%)
 rename core_tools/job_mgnt/{ => legacy}/calibrations.py (100%)
 rename core_tools/job_mgnt/{ => legacy}/init_test.py (100%)
 rename core_tools/job_mgnt/{ => legacy}/init_test.sqlite (100%)
 rename core_tools/job_mgnt/{ => legacy}/job_manager.py (100%)
 rename core_tools/job_mgnt/{ => legacy}/qubit_class.py (100%)
 rename core_tools/job_mgnt/{ => legacy}/readout_Test.py (100%)
 rename core_tools/job_mgnt/{ => legacy}/readout_Test2.py (100%)
 rename core_tools/job_mgnt/{ => legacy}/sequencer.py (100%)
 rename core_tools/job_mgnt/{ => legacy}/test.py (100%)

diff --git a/core_tools/job_mgnt/__init__.py b/core_tools/job_mgnt/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/core_tools/job_mgnt/job_meta.py b/core_tools/job_mgnt/job_meta.py
new file mode 100644
index 00000000..c4852f96
--- /dev/null
+++ b/core_tools/job_mgnt/job_meta.py
@@ -0,0 +1,50 @@
+from core_tools.sweeps.progressbar import progress_bar
+from core_tools.sweeps.sweep_utility import KILL_EXP
+
+def run_wrapper(run_function):
+    def run(*args, **kwargs):
+        args[0].n = progress_bar(args[0].n_tot)
+        try:
+            returns = run_function(*args, **kwargs)
+        except KILL_EXP:
+            print('kill signal for the current experiment received.')
+            returns = None
+        
+        args[0].n.close()
+        args[0].n = 0
+
+        return returns
+    return run
+
+class job_meta(type):  
+    def __new__(cls,name, bases, dct):
+        if 'run' not in dct:
+            raise ValueError('Please define a run function in your job class.')
+
+        x = super().__new__(cls, name, bases, dct)
+        x.run = run_wrapper(dct['run'])
+
+        x.n_tot = 0
+        x.n = 0
+        x.KILL = False
+        return x
+        
+if __name__ == '__main__':
+    import time
+
+    class pulse_lib_sweep_virt(metaclass=job_meta):
+        def __init__(self, n_steps):
+            self.n_tot = n_steps
+
+        def run(self):
+            for i in range(self.n_tot):
+                time.sleep(0.01)
+                self.n += 1
+    
+    a = pulse_lib_sweep_virt(5)
+    a.run()
+    a.KILL = True
+
+    b = pulse_lib_sweep_virt(5)
+    b.run()
+    print(b.KILL)
\ No newline at end of file
diff --git a/core_tools/job_mgnt/job_mgmt.py b/core_tools/job_mgnt/job_mgmt.py
new file mode 100644
index 00000000..5a8ff019
--- /dev/null
+++ b/core_tools/job_mgnt/job_mgmt.py
@@ -0,0 +1,131 @@
+from dataclasses import dataclass, field
+from typing import Any
+import time
+
+import threading, queue
+from queue import PriorityQueue
+
+@dataclass(order=True)
+class ExperimentJob:
+    priority: float
+    job: Any = field(compare=False)
+
+    def kill(self):
+        self.job.KILL = True
+
+class queue_mgr():
+    def __init__(self):
+        self.q = PriorityQueue()
+        self.job_refs = list()
+
+        def worker():
+            while True:
+                n_jobs = self.q.qsize()
+                if n_jobs != 0:
+                    print('{} items queued.'.format(n_jobs))
+                    print('Starting new job.')
+                    job_object = self.q.get()
+                    try:
+                        print(job_object.job.KILL)
+                        if job_object.job.KILL != True:
+                            job_object.job.run()
+                    except:
+                        print('an exception in the job occurred? Going to the next job.')
+                    self.q.task_done()
+                else:
+                    # 200ms sleep.
+                    time.sleep(0.2)
+
+        self.worker_thread = threading.Thread(target=worker, daemon=True).start()
+
+    def put(self, job):
+        '''
+        put a job into the measurement queue
+
+        Args:
+            job (ExperimentJob) : job object
+        '''
+        self.q.put(job)
+        self.job_refs.append(job)
+
+    def kill(self, job):
+        '''
+        kill a certain job that has been submitted to the queue
+
+        Args:
+            job (ExperimentJob) : job object
+        '''
+        job.KILL = True
+
+    def killall(self):
+        '''
+        kill all the jobs
+        '''
+        for job in self.job_refs:
+            job.kill()
+
+        self.job_refs = []
+
+    def join(self):
+        self.q.join()
+
+    @property
+    def n_jobs(self):
+        return self.q.qsize()
+
+
+#%%
+if __name__ == '__main__':
+    from core_tools.sweeps.sweeps import do1D, do2D
+    import os
+    import qcodes as qc
+    from qcodes.dataset.sqlite.database import initialise_or_create_database_at
+    from qcodes.dataset.experiment_container import load_or_create_experiment
+    from qcodes.instrument.specialized_parameters import ElapsedTimeParameter
+    class MyCounter(qc.Parameter):
+        def __init__(self, name):
+            # only name is required
+            super().__init__(name, label='Times this has been read',
+                             docstring='counts how many times get has been called '
+                                       'but can be reset to any integer >= 0 by set')
+            self._count = 0
+
+        # you must provide a get method, a set method, or both.
+        def get_raw(self):
+            self._count += 1
+            return self._count
+
+        def set_raw(self, val):
+            self._count = val
+
+    tutorial_db_path = os.path.join(os.getcwd(), 'linking_datasets_tutorial.db')
+    initialise_or_create_database_at(tutorial_db_path)
+    load_or_create_experiment('tutorial', 'no sample')
+
+    my_param = MyCounter('test_instr')
+
+    x = qc.Parameter(name='x', label='Voltage_x', unit='V',
+              set_cmd=None, get_cmd=None)
+    y = qc.Parameter(name='y', label='Voltage_y', unit='V',
+              set_cmd=None, get_cmd=None)
+    timer = ElapsedTimeParameter('time')
+
+    scan1 = do2D(x, 0, 20, 20, 0.0, y, 0, 80, 30, 0.1, my_param)
+    scan2 = do2D(x, 0, 20, 20, 0.0, timer, 0, 80, 30, .1, my_param)
+    scan3 = do1D(x, 0, 100, 50, 0.1 , my_param, reset_param=True)
+
+    q = queue_mgr()
+    job1 = ExperimentJob(1, scan1)
+    job2 = ExperimentJob(1, scan2)
+    job3 = ExperimentJob(1, scan3)
+    q.put(job1)
+    q.put(job2)
+    q.put(job3)
+    
+    q.killall()
+    scan1 = do2D(x, 0, 20, 20, 0.0, y, 0, 80, 30, 0.1, my_param)
+    scan2 = do2D(x, 0, 20, 20, 0.0, timer, 0, 80, 30, .1, my_param)
+    job1 = ExperimentJob(1, scan1)
+    job2 = ExperimentJob(1, scan2)
+    q.put(job1)
+    q.put(job2)
\ No newline at end of file
diff --git a/core_tools/job_mgnt/README.md b/core_tools/job_mgnt/legacy/README.md
similarity index 100%
rename from core_tools/job_mgnt/README.md
rename to core_tools/job_mgnt/legacy/README.md
diff --git a/core_tools/job_mgnt/calibrations.py b/core_tools/job_mgnt/legacy/calibrations.py
similarity index 100%
rename from core_tools/job_mgnt/calibrations.py
rename to core_tools/job_mgnt/legacy/calibrations.py
diff --git a/core_tools/job_mgnt/init_test.py b/core_tools/job_mgnt/legacy/init_test.py
similarity index 100%
rename from core_tools/job_mgnt/init_test.py
rename to core_tools/job_mgnt/legacy/init_test.py
diff --git a/core_tools/job_mgnt/init_test.sqlite b/core_tools/job_mgnt/legacy/init_test.sqlite
similarity index 100%
rename from core_tools/job_mgnt/init_test.sqlite
rename to core_tools/job_mgnt/legacy/init_test.sqlite
diff --git a/core_tools/job_mgnt/job_manager.py b/core_tools/job_mgnt/legacy/job_manager.py
similarity index 100%
rename from core_tools/job_mgnt/job_manager.py
rename to core_tools/job_mgnt/legacy/job_manager.py
diff --git a/core_tools/job_mgnt/qubit_class.py b/core_tools/job_mgnt/legacy/qubit_class.py
similarity index 100%
rename from core_tools/job_mgnt/qubit_class.py
rename to core_tools/job_mgnt/legacy/qubit_class.py
diff --git a/core_tools/job_mgnt/readout_Test.py b/core_tools/job_mgnt/legacy/readout_Test.py
similarity index 100%
rename from core_tools/job_mgnt/readout_Test.py
rename to core_tools/job_mgnt/legacy/readout_Test.py
diff --git a/core_tools/job_mgnt/readout_Test2.py b/core_tools/job_mgnt/legacy/readout_Test2.py
similarity index 100%
rename from core_tools/job_mgnt/readout_Test2.py
rename to core_tools/job_mgnt/legacy/readout_Test2.py
diff --git a/core_tools/job_mgnt/sequencer.py b/core_tools/job_mgnt/legacy/sequencer.py
similarity index 100%
rename from core_tools/job_mgnt/sequencer.py
rename to core_tools/job_mgnt/legacy/sequencer.py
diff --git a/core_tools/job_mgnt/test.py b/core_tools/job_mgnt/legacy/test.py
similarity index 100%
rename from core_tools/job_mgnt/test.py
rename to core_tools/job_mgnt/legacy/test.py
-- 
GitLab