Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • qutech-qdlabs/core_tools
1 result
Show changes
Commits on Source (5)
# Changelog # Changelog
All notable changes to core_tools will be documented in this file. All notable changes to core_tools will be documented in this file.
## \[1.5.11] - 2025-02-20
- Added snapshot data_writer
- Added simple HTTP server to start scripts/functions based on ScriptRunner
- Added example of live exporter writing json files
## \[1.5.10] - 2025-02-17 ## \[1.5.10] - 2025-02-17
- Removed unintended print statement. - Removed unintended print statement.
......
import inspect
import logging
import os
from enum import Enum
from abc import ABC, abstractmethod
try:
from spyder_kernels.customize.spydercustomize import runcell
except Exception:
runcell = None
logger = logging.getLogger(__name__)
class Command(ABC):
def __init__(self, name, parameters, defaults={}):
self.name = name
self.parameters = parameters
self.defaults = defaults
@abstractmethod
def __call__(self):
pass
class Cell(Command):
'''
A reference to an IPython cell with Python code to be run as command in
ScriptRunner.
If command_name is not specified then cell+filename with be used as command name.
Args:
cell: name or number of cell to run.
python_file: filename of Python file.
command_name: Optional name to show for the command in ScriptRunner.
'''
def __init__(self, cell: str | int, python_file: str, command_name: str | None = None):
filename = os.path.basename(python_file)
name = f'{cell} ({filename})' if command_name is None else command_name
super().__init__(name, {})
self.cell = cell
self.python_file = python_file
if runcell is None:
raise Exception('runcell not available. Upgrade to Spyder 4+ to use Cell()')
def __call__(self):
command = f"runcell({self.cell}, '{self.python_file}')"
# print(command)
logger.info(command)
runcell(self.cell, self.python_file)
class Function(Command):
'''
A reference to a function to be run as command in ScriptRunner.
The argument types of the function are displayed in the GUI. The entered
data is converted to the specified type. Currently the types `str`, `int`, `float` and `bool`
are supported.
If the type of the argument is not specified, then a string will be passed to the function.
If command_name is not specified, then the name of func is used as
command name.
The additional keyword arguments will be entered as default values in the GUI.
Args:
func: function to execute.
command_name: Optional name to show for the command in ScriptRunner.
kwargs: default arguments to pass with function.
'''
def __init__(self, func: any, command_name: str | None = None, **kwargs):
if command_name is None:
command_name = func.__name__
signature = inspect.signature(func)
parameters = {p.name: p for p in signature.parameters.values()}
defaults = {}
for name, parameter in parameters.items():
if parameter.default is not inspect._empty:
defaults[name] = parameter.default
if parameter.annotation is inspect._empty:
logger.warning(f'No annotation for `{name}`, assuming string')
defaults.update(**kwargs)
super().__init__(command_name, parameters, defaults)
self.func = func
def _convert_arg(self, param_name, value):
annotation = self.parameters[param_name].annotation
if annotation is inspect._empty:
# no type specified. Pass string.
return value
if isinstance(annotation, str):
raise Exception('Cannot convert to type specified as a string')
if issubclass(annotation, bool):
return value in [True, 1, 'True', 'true', '1']
if issubclass(annotation, Enum):
return annotation[value]
return annotation(value)
def __call__(self, **kwargs):
call_args = {}
for name in self.parameters:
try:
call_args[name] = self.defaults[name]
except KeyError:
pass
try:
call_args[name] = self._convert_arg(name, kwargs[name])
except KeyError:
pass
args_list = [f'{name}={repr(value)}' for name, value in call_args.items()]
command = f'{self.func.__name__}({", ".join(args_list)})'
print(command)
logger.info(command)
return self.func(**call_args)
...@@ -2,17 +2,15 @@ import logging ...@@ -2,17 +2,15 @@ import logging
import inspect import inspect
import os import os
from enum import Enum from enum import Enum
from abc import ABC, abstractmethod
from PyQt5 import QtCore, QtWidgets from PyQt5 import QtCore, QtWidgets
from core_tools.GUI.keysight_videomaps.liveplotting import liveplotting
from core_tools.GUI.script_runner.script_runner_gui import Ui_MainWindow from core_tools.GUI.script_runner.script_runner_gui import Ui_MainWindow
from core_tools.GUI.script_runner.commands import Function, Cell
from core_tools.GUI.script_runner.web_server import run_web_server
from core_tools.GUI.qt_util import qt_log_exception from core_tools.GUI.qt_util import qt_log_exception
from core_tools.GUI.keysight_videomaps.liveplotting import liveplotting
try:
from spyder_kernels.customize.spydercustomize import runcell
except Exception:
runcell = None
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -45,6 +43,7 @@ class ScriptRunner(QtWidgets.QMainWindow, Ui_MainWindow): ...@@ -45,6 +43,7 @@ class ScriptRunner(QtWidgets.QMainWindow, Ui_MainWindow):
super(QtWidgets.QMainWindow, self).__init__() super(QtWidgets.QMainWindow, self).__init__()
self.setupUi(self) self.setupUi(self)
self.video_mode_running = False self.video_mode_running = False
self.video_mode_label = QtWidgets.QLabel("VideoMode: <unknown") self.video_mode_label = QtWidgets.QLabel("VideoMode: <unknown")
self.video_mode_label.setMargin(2) self.video_mode_label.setMargin(2)
...@@ -96,10 +95,11 @@ class ScriptRunner(QtWidgets.QMainWindow, Ui_MainWindow): ...@@ -96,10 +95,11 @@ class ScriptRunner(QtWidgets.QMainWindow, Ui_MainWindow):
python_file: filename of Python file. python_file: filename of Python file.
command_name: Optional name to show for the command in ScriptRunner. command_name: Optional name to show for the command in ScriptRunner.
''' '''
if runcell is None:
raise Exception('runcell not available. Upgrade to Spyder 4+ to use add_cell()')
self._add_command(Cell(cell, python_file, command_name)) self._add_command(Cell(cell, python_file, command_name))
def run_server(self):
run_web_server(self.commands)
@qt_log_exception @qt_log_exception
def closeEvent(self, event): def closeEvent(self, event):
self.timer.stop() self.timer.stop()
...@@ -213,112 +213,6 @@ class ScriptRunner(QtWidgets.QMainWindow, Ui_MainWindow): ...@@ -213,112 +213,6 @@ class ScriptRunner(QtWidgets.QMainWindow, Ui_MainWindow):
liveplotting.last_instance._2D_start_stop() liveplotting.last_instance._2D_start_stop()
class Command(ABC):
def __init__(self, name, parameters, defaults={}):
self.name = name
self.parameters = parameters
self.defaults = defaults
@abstractmethod
def __call__(self):
pass
class Cell(Command):
'''
A reference to an IPython cell with Python code to be run as command in
ScriptRunner.
If command_name is not specified then cell+filename with be used as command name.
Args:
cell: name or number of cell to run.
python_file: filename of Python file.
command_name: Optional name to show for the command in ScriptRunner.
'''
def __init__(self, cell: str | int, python_file: str, command_name: str | None = None):
filename = os.path.basename(python_file)
name = f'{cell} ({filename})' if command_name is None else command_name
super().__init__(name, {})
self.cell = cell
self.python_file = python_file
if runcell is None:
raise Exception('runcell not available. Upgrade to Spyder 4+ to use Cell()')
def __call__(self):
command = f"runcell({self.cell}, '{self.python_file}')"
print(command)
logger.info(command)
runcell(self.cell, self.python_file)
class Function(Command):
'''
A reference to a function to be run as command in ScriptRunner.
The argument types of the function are displayed in the GUI. The entered
data is converted to the specified type. Currently the types `str`, `int`, `float` and `bool`
are supported.
If the type of the argument is not specified, then a string will be passed to the function.
If command_name is not specified, then the name of func is used as
command name.
The additional keyword arguments will be entered as default values in the GUI.
Args:
func: function to execute.
command_name: Optional name to show for the command in ScriptRunner.
kwargs: default arguments to pass with function.
'''
def __init__(self, func: any, command_name: str | None = None, **kwargs):
if command_name is None:
command_name = func.__name__
signature = inspect.signature(func)
parameters = {p.name: p for p in signature.parameters.values()}
defaults = {}
for name, parameter in parameters.items():
if parameter.default is not inspect._empty:
defaults[name] = parameter.default
if parameter.annotation is inspect._empty:
logger.warning(f'No annotation for `{name}`, assuming string')
defaults.update(**kwargs)
super().__init__(command_name, parameters, defaults)
self.func = func
def _convert_arg(self, param_name, value):
annotation = self.parameters[param_name].annotation
if annotation is inspect._empty:
# no type specified. Pass string.
return value
if isinstance(annotation, str):
raise Exception('Cannot convert to type specified as a string')
if issubclass(annotation, bool):
return value in [True, 1, 'True', 'true', '1']
if issubclass(annotation, Enum):
return annotation[value]
return annotation(value)
def __call__(self, **kwargs):
call_args = {}
for name in self.parameters:
try:
call_args[name] = self.defaults[name]
except KeyError:
pass
try:
call_args[name] = self._convert_arg(name, kwargs[name])
except KeyError:
pass
args_list = [f'{name}={repr(value)}' for name, value in call_args.items()]
command = f'{self.func.__name__}({", ".join(args_list)})'
print(command)
logger.info(command)
return self.func(**call_args)
if __name__ == "__main__": if __name__ == "__main__":
def sayHi(name: str, times: int = 1): def sayHi(name: str, times: int = 1):
...@@ -343,7 +237,11 @@ if __name__ == "__main__": ...@@ -343,7 +237,11 @@ if __name__ == "__main__":
ui.add_function(fit, 'Fit it', mode=Mode.CENTER, x=1.0) ui.add_function(fit, 'Fit it', mode=Mode.CENTER, x=1.0)
ui.add_function(fit, 'Fit it', mode='center', x=1.0) ui.add_function(fit, 'Fit it', mode='center', x=1.0)
ui.add_function(fit, 'Fit it', mode='CENTER', x=1.0) ui.add_function(fit, 'Fit it', mode='CENTER', x=1.0)
ui.add_cell('Say Hi', path+'/test_script.py') # ui.add_cell('Say Hi', path+'/test_script.py')
ui.add_cell(2, path+'/test_script.py', 'Magic Button') # ui.add_cell(2, path+'/test_script.py', 'Magic Button')
ui.add_cell('Oops', path+'/test_script.py') # ui.add_cell('Oops', path+'/test_script.py')
ui.add_cell('Syntax Error', path+'/test_script.py') # ui.add_cell('Syntax Error', path+'/test_script.py')
# NOTE:
# To start servicing http requests run:
ui.run_server()
import json
from datetime import datetime
from functools import cached_property
from http.cookies import SimpleCookie
from http.server import HTTPServer, BaseHTTPRequestHandler, HTTPStatus
from urllib.parse import parse_qsl, urlparse
from core_tools.GUI.script_runner.commands import Command
class WebRequestHandler(BaseHTTPRequestHandler):
@cached_property
def url(self):
return urlparse(self.path)
@cached_property
def query_data(self):
return dict(parse_qsl(self.url.query))
@cached_property
def post_data(self):
content_length = int(self.headers.get("Content-Length", 0))
return self.rfile.read(content_length)
@cached_property
def form_data(self):
return dict(parse_qsl(self.post_data.decode("utf-8")))
@cached_property
def cookies(self):
return SimpleCookie(self.headers.get("Cookie"))
def log_request(self, code='-', size='-'):
pass
def do_GET(self):
if self.url.path == "/functions":
# return list of functions with arguments
response = json.dumps(self.get_function_descriptions()).encode("utf-8")
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(response)
else:
self.send_error(HTTPStatus.NOT_FOUND)
def do_POST(self):
if self.url.path == "/run":
try:
data = self.form_data
cmd_name = data["__name__"]
kwargs = {
k: v
for k, v in data.items() if k != "__name__"
}
for command in self.server.commands:
if command.name == cmd_name:
try:
result = command(**kwargs)
except Exception as ex:
print(ex)
raise
break
else:
self.send_error(HTTPStatus.NOT_FOUND)
response = json.dumps(result).encode("utf-8")
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(response)
except Exception as ex:
response = json.dumps(str(ex)).encode("utf-8")
self.send_response(HTTPStatus.BAD_REQUEST)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(response)
else:
self.send_error(HTTPStatus.NOT_FOUND)
def get_function_descriptions(self):
functions = []
for command in self.server.commands:
command: Command = command
parameters = command.parameters
functions.append({
"__name__": command.name,
"args": [{
"name": name,
"type": parameter.annotation.__name__,
}
for name, parameter in parameters.items()],
})
return functions
def get_response(self):
return json.dumps(
{
"now": datetime.now().isoformat(),
"path": self.url.path,
"query_data": self.query_data,
"post_data": self.post_data.decode("utf-8"),
"form_data": self.form_data,
"cookies": {
name: cookie.value
for name, cookie in self.cookies.items()
},
}
)
def run_web_server(command_list: list[Command]):
server_address = ('127.0.0.1', 8001)
httpd = HTTPServer(server_address, WebRequestHandler)
httpd.commands = command_list
try:
print(f"Server running at http://{server_address[0]}:{server_address[1]}")
print("Interrupt kernel to stop server")
httpd.serve_forever()
except KeyboardInterrupt:
httpd.shutdown()
# %%
if __name__ == "__main__":
from enum import Enum
from core_tools.GUI.script_runner.commands import Function
def sayHi(name: str, times: int = 1):
for _ in range(times):
print(f'Hi {name}')
return name
class Mode(str, Enum):
LEFT = 'left'
CENTER = 'center'
RIGHT = 'right'
def fit(x: float, mode: Mode):
print(f'fit {x}, {mode}')
command_list = [
Function(sayHi),
Function(fit),
]
run_web_server(command_list)
...@@ -10,4 +10,4 @@ from core_tools.startup.gui import ( ...@@ -10,4 +10,4 @@ from core_tools.startup.gui import (
start_script_runner, start_script_runner,
) )
__version__ = "1.5.10" __version__ = "1.5.11"
...@@ -41,7 +41,7 @@ class _Action: ...@@ -41,7 +41,7 @@ class _Action:
class DataWriter: class DataWriter:
def __init__(self, name, *args): def __init__(self, name, *args, snapshot_data: dict[str, any] | None = None):
self._measurement = Measurement(name, silent=True) self._measurement = Measurement(name, silent=True)
self._actions = [] self._actions = []
self._set_params = [] self._set_params = []
...@@ -55,7 +55,10 @@ class DataWriter: ...@@ -55,7 +55,10 @@ class DataWriter:
self._add_data(arg) self._add_data(arg)
else: else:
raise TypeError(f"Unknown argument of type {type(arg)}") raise TypeError(f"Unknown argument of type {type(arg)}")
self._measurement.add_snapshot('data_writer', {'message': 'Data written by data writer'}) if snapshot_data:
self._measurement.add_snapshot('data_writer', snapshot_data)
else:
self._measurement.add_snapshot('data_writer', {'message': 'Data written by data writer'})
def _add_axis(self, axis): def _add_axis(self, axis):
param = ManualParameter(axis.name, label=axis.label, unit=axis.unit) param = ManualParameter(axis.name, label=axis.label, unit=axis.unit)
...@@ -100,13 +103,14 @@ class DataWriter: ...@@ -100,13 +103,14 @@ class DataWriter:
self._loop(iaction + 1, isetpoint) self._loop(iaction + 1, isetpoint)
def write_data(name: str, *args): def write_data(name: str, *args, snapshot_data: dict[str, any] | None = None):
''' '''
Creates a dataset `name` using the specified Axis and Data. Creates a dataset `name` using the specified Axis and Data.
Args: Args:
name: name of the dataset. name: name of the dataset.
args: list of Axis and Data objects. args: list of Axis and Data objects.
snapshot_data: Data to be added to the snapshot.
Example: Example:
write_data( write_data(
...@@ -131,7 +135,7 @@ def write_data(name: str, *args): ...@@ -131,7 +135,7 @@ def write_data(name: str, *args):
Data('z', 'z', 'a.u., <array with shape(len(values_a), len(values_b), len(values_c)>), Data('z', 'z', 'a.u., <array with shape(len(values_a), len(values_b), len(values_c)>),
) )
''' '''
return DataWriter(name, *args).run() return DataWriter(name, *args, snapshot_data=snapshot_data).run()
if __name__ == "__main__": if __name__ == "__main__":
......
...@@ -11,6 +11,11 @@ write_data( ...@@ -11,6 +11,11 @@ write_data(
'Demo_WriteData', 'Demo_WriteData',
Axis('x', 'x-array', 'a.u.', [1, 2, 3, 4]), Axis('x', 'x-array', 'a.u.', [1, 2, 3, 4]),
Data('y', 'y-array', 'a.u.', [3, 1, 4, 6]), Data('y', 'y-array', 'a.u.', [3, 1, 4, 6]),
snapshot_data={
"model": "some arbitrary data",
"param_a": 100,
"param_b": 10,
}
) )
write_data( write_data(
...@@ -19,7 +24,9 @@ write_data( ...@@ -19,7 +24,9 @@ write_data(
Axis('e12', 'detuning', 'mv', np.linspace(-10, 10, 5)), Axis('e12', 'detuning', 'mv', np.linspace(-10, 10, 5)),
Data('SD1', 'Sensor 1', 'mV', np.linspace(10, 20, 55).reshape((11, 5))), Data('SD1', 'Sensor 1', 'mV', np.linspace(10, 20, 55).reshape((11, 5))),
Data('SD2', 'Sensor 2', 'mV', np.linspace(0, -20, 55).reshape((11, 5))), Data('SD2', 'Sensor 2', 'mV', np.linspace(0, -20, 55).reshape((11, 5))),
snapshot_data={
"model": "linear numpy data",
"param_a": 100,
"param_b": 10,
}
) )
#%%
\ No newline at end of file
import json
import os
import time
import core_tools as ct
from core_tools.data.ds.reader import load_by_uuid
from core_tools.data.SQL.queries.dataset_gui_queries import query_for_measurement_results
from core_tools.data.SQL.SQL_connection_mgr import SQL_database_manager
from core_tools.data.ds.ds2xarray import ds2xarray
"""
TIP:
Switch project to 'Game', so all data is for Game
"""
class LiveExporter:
def __init__(self, project_name):
self.project_name = project_name
self.export_path = 'C:/measurements/export/Game/'
os.makedirs(self.export_path, exist_ok=True)
self.start_time = '2025-02-20'
self.active_ds = None
self.ds_written = 0
self.max_id = None
def export_ds_json(self, ds, variables: list[str] | None = None):
xds = ds2xarray(ds, snapshot=False)
if variables is not None:
xds = xds[vars]
d = xds.to_dict()
fname = self.export_path + f"ds_{ds.exp_uuid}.json"
tmp_file = fname + ".tmp"
with open(tmp_file, "w") as fp:
json.dump(d, fp, indent=1)
if os.path.exists(fname):
os.remove(fname)
os.rename(tmp_file, fname)
def export_active_ds(self):
active_ds = self.active_ds
if active_ds is not None:
active_ds.sync()
written = 0
for m_param in active_ds:
for name, descr in m_param:
written += descr.written()
if self.ds_written != written:
self.export_ds_json(active_ds)
self.ds_written = written
if active_ds.completed:
self.active_ds = None
self.ds_written = 0
def export_new_ds(self):
# New measurements detected. Do last update of active_ds
self.export_active_ds()
self.active_ds = None
self.ds_written = 0
res = query_for_measurement_results.search_query(
start_time=self.start_time,
name=None, # part of name
project=self.project_name, # optional
keywords=None, # optional
)
# Export all
ds = None
for e in res:
ds = load_by_uuid(e.uuid)
self. export_ds_json(ds)
self.start_time = ds.run_timestamp
self.max_id = ds.exp_id
# check if last ds needs updating
if ds is not None and not ds.completed:
self.active_ds = ds
def run(self):
try:
while True:
try:
self.export_active_ds()
# check for new measurements. This is a fast query on the database
new_max_id = query_for_measurement_results.detect_new_meaurements(
self.max_id,
project=self.project_name
)
if new_max_id != self.max_id:
self.export_new_ds()
time.sleep(0.1)
except Exception as ex:
"""
Things that could go wrong:
* writing or copying file fails because other application is accessing it.
"""
print(ex)
time.sleep(0.5)
except KeyboardInterrupt:
SQL_database_manager.disconnect()
raise
ct.configure('./setup_config/ct_config_laptop.yaml')
exporter = LiveExporter()
exporter.run()
from setuptools import setup, find_packages from setuptools import setup, find_packages
setup(name="core_tools", setup(name="core_tools",
version="1.5.10", version="1.5.11",
packages = find_packages(), packages = find_packages(),
python_requires=">=3.10", python_requires=">=3.10",
install_requires=[ install_requires=[
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# github_url = "https://github.com/<user or organization>/<project>/" # github_url = "https://github.com/<user or organization>/<project>/"
[version] [version]
current = "1.5.10" current = "1.5.11"
# Example of a semver regexp. # Example of a semver regexp.
# Make sure this matches current_version before # Make sure this matches current_version before
......