diff --git a/core_tools/GUI/script_runner/commands.py b/core_tools/GUI/script_runner/commands.py new file mode 100644 index 0000000000000000000000000000000000000000..633448c7ce7fb7013543787c855d86fb1c9c50b9 --- /dev/null +++ b/core_tools/GUI/script_runner/commands.py @@ -0,0 +1,120 @@ +import inspect +import logging +import os +from enum import Enum + +from abc import ABC, abstractmethod + +try: + from spyder_kernels.customize.spydercustomize import runcell +except Exception: + runcell = None + + +logger = logging.getLogger(__name__) + + +class Command(ABC): + def __init__(self, name, parameters, defaults={}): + self.name = name + self.parameters = parameters + self.defaults = defaults + + @abstractmethod + def __call__(self): + pass + + +class Cell(Command): + ''' + A reference to an IPython cell with Python code to be run as command in + ScriptRunner. + + If command_name is not specified then cell+filename with be used as command name. + + Args: + cell: name or number of cell to run. + python_file: filename of Python file. + command_name: Optional name to show for the command in ScriptRunner. + ''' + + def __init__(self, cell: str | int, python_file: str, command_name: str | None = None): + filename = os.path.basename(python_file) + name = f'{cell} ({filename})' if command_name is None else command_name + super().__init__(name, {}) + self.cell = cell + self.python_file = python_file + if runcell is None: + raise Exception('runcell not available. Upgrade to Spyder 4+ to use Cell()') + + def __call__(self): + command = f"runcell({self.cell}, '{self.python_file}')" + # print(command) + logger.info(command) + runcell(self.cell, self.python_file) + + +class Function(Command): + ''' + A reference to a function to be run as command in ScriptRunner. + + The argument types of the function are displayed in the GUI. The entered + data is converted to the specified type. Currently the types `str`, `int`, `float` and `bool` + are supported. + If the type of the argument is not specified, then a string will be passed to the function. + + If command_name is not specified, then the name of func is used as + command name. + + The additional keyword arguments will be entered as default values in the GUI. + + Args: + func: function to execute. + command_name: Optional name to show for the command in ScriptRunner. + kwargs: default arguments to pass with function. + ''' + + def __init__(self, func: any, command_name: str | None = None, **kwargs): + if command_name is None: + command_name = func.__name__ + signature = inspect.signature(func) + parameters = {p.name: p for p in signature.parameters.values()} + defaults = {} + for name, parameter in parameters.items(): + if parameter.default is not inspect._empty: + defaults[name] = parameter.default + if parameter.annotation is inspect._empty: + logger.warning(f'No annotation for `{name}`, assuming string') + defaults.update(**kwargs) + super().__init__(command_name, parameters, defaults) + self.func = func + + def _convert_arg(self, param_name, value): + annotation = self.parameters[param_name].annotation + if annotation is inspect._empty: + # no type specified. Pass string. + return value + if isinstance(annotation, str): + raise Exception('Cannot convert to type specified as a string') + if issubclass(annotation, bool): + return value in [True, 1, 'True', 'true', '1'] + if issubclass(annotation, Enum): + return annotation[value] + return annotation(value) + + def __call__(self, **kwargs): + call_args = {} + for name in self.parameters: + try: + call_args[name] = self.defaults[name] + except KeyError: + pass + try: + call_args[name] = self._convert_arg(name, kwargs[name]) + except KeyError: + pass + args_list = [f'{name}={repr(value)}' for name, value in call_args.items()] + command = f'{self.func.__name__}({", ".join(args_list)})' + print(command) + logger.info(command) + return self.func(**call_args) diff --git a/core_tools/GUI/script_runner/script_runner_main.py b/core_tools/GUI/script_runner/script_runner_main.py index 1589200c224f699abb1a0649f7cc6fd5328196f5..204766326761926af04a5273b59a992f890f24a5 100644 --- a/core_tools/GUI/script_runner/script_runner_main.py +++ b/core_tools/GUI/script_runner/script_runner_main.py @@ -2,17 +2,15 @@ import logging import inspect import os from enum import Enum -from abc import ABC, abstractmethod from PyQt5 import QtCore, QtWidgets + +from core_tools.GUI.keysight_videomaps.liveplotting import liveplotting from core_tools.GUI.script_runner.script_runner_gui import Ui_MainWindow +from core_tools.GUI.script_runner.commands import Function, Cell +from core_tools.GUI.script_runner.web_server import run_web_server from core_tools.GUI.qt_util import qt_log_exception -from core_tools.GUI.keysight_videomaps.liveplotting import liveplotting -try: - from spyder_kernels.customize.spydercustomize import runcell -except Exception: - runcell = None logger = logging.getLogger(__name__) @@ -45,6 +43,7 @@ class ScriptRunner(QtWidgets.QMainWindow, Ui_MainWindow): super(QtWidgets.QMainWindow, self).__init__() self.setupUi(self) + self.video_mode_running = False self.video_mode_label = QtWidgets.QLabel("VideoMode: <unknown") self.video_mode_label.setMargin(2) @@ -96,10 +95,11 @@ class ScriptRunner(QtWidgets.QMainWindow, Ui_MainWindow): python_file: filename of Python file. command_name: Optional name to show for the command in ScriptRunner. ''' - if runcell is None: - raise Exception('runcell not available. Upgrade to Spyder 4+ to use add_cell()') self._add_command(Cell(cell, python_file, command_name)) + def run_server(self): + run_web_server(self.commands) + @qt_log_exception def closeEvent(self, event): self.timer.stop() @@ -213,112 +213,6 @@ class ScriptRunner(QtWidgets.QMainWindow, Ui_MainWindow): liveplotting.last_instance._2D_start_stop() -class Command(ABC): - def __init__(self, name, parameters, defaults={}): - self.name = name - self.parameters = parameters - self.defaults = defaults - - @abstractmethod - def __call__(self): - pass - - -class Cell(Command): - ''' - A reference to an IPython cell with Python code to be run as command in - ScriptRunner. - - If command_name is not specified then cell+filename with be used as command name. - - Args: - cell: name or number of cell to run. - python_file: filename of Python file. - command_name: Optional name to show for the command in ScriptRunner. - ''' - - def __init__(self, cell: str | int, python_file: str, command_name: str | None = None): - filename = os.path.basename(python_file) - name = f'{cell} ({filename})' if command_name is None else command_name - super().__init__(name, {}) - self.cell = cell - self.python_file = python_file - if runcell is None: - raise Exception('runcell not available. Upgrade to Spyder 4+ to use Cell()') - - def __call__(self): - command = f"runcell({self.cell}, '{self.python_file}')" - print(command) - logger.info(command) - runcell(self.cell, self.python_file) - - -class Function(Command): - ''' - A reference to a function to be run as command in ScriptRunner. - - The argument types of the function are displayed in the GUI. The entered - data is converted to the specified type. Currently the types `str`, `int`, `float` and `bool` - are supported. - If the type of the argument is not specified, then a string will be passed to the function. - - If command_name is not specified, then the name of func is used as - command name. - - The additional keyword arguments will be entered as default values in the GUI. - - Args: - func: function to execute. - command_name: Optional name to show for the command in ScriptRunner. - kwargs: default arguments to pass with function. - ''' - - def __init__(self, func: any, command_name: str | None = None, **kwargs): - if command_name is None: - command_name = func.__name__ - signature = inspect.signature(func) - parameters = {p.name: p for p in signature.parameters.values()} - defaults = {} - for name, parameter in parameters.items(): - if parameter.default is not inspect._empty: - defaults[name] = parameter.default - if parameter.annotation is inspect._empty: - logger.warning(f'No annotation for `{name}`, assuming string') - defaults.update(**kwargs) - super().__init__(command_name, parameters, defaults) - self.func = func - - def _convert_arg(self, param_name, value): - annotation = self.parameters[param_name].annotation - if annotation is inspect._empty: - # no type specified. Pass string. - return value - if isinstance(annotation, str): - raise Exception('Cannot convert to type specified as a string') - if issubclass(annotation, bool): - return value in [True, 1, 'True', 'true', '1'] - if issubclass(annotation, Enum): - return annotation[value] - return annotation(value) - - def __call__(self, **kwargs): - call_args = {} - for name in self.parameters: - try: - call_args[name] = self.defaults[name] - except KeyError: - pass - try: - call_args[name] = self._convert_arg(name, kwargs[name]) - except KeyError: - pass - args_list = [f'{name}={repr(value)}' for name, value in call_args.items()] - command = f'{self.func.__name__}({", ".join(args_list)})' - print(command) - logger.info(command) - return self.func(**call_args) - - if __name__ == "__main__": def sayHi(name: str, times: int = 1): @@ -343,7 +237,11 @@ if __name__ == "__main__": ui.add_function(fit, 'Fit it', mode=Mode.CENTER, x=1.0) ui.add_function(fit, 'Fit it', mode='center', x=1.0) ui.add_function(fit, 'Fit it', mode='CENTER', x=1.0) - ui.add_cell('Say Hi', path+'/test_script.py') - ui.add_cell(2, path+'/test_script.py', 'Magic Button') - ui.add_cell('Oops', path+'/test_script.py') - ui.add_cell('Syntax Error', path+'/test_script.py') + # ui.add_cell('Say Hi', path+'/test_script.py') + # ui.add_cell(2, path+'/test_script.py', 'Magic Button') + # ui.add_cell('Oops', path+'/test_script.py') + # ui.add_cell('Syntax Error', path+'/test_script.py') + + # NOTE: + # To start servicing http requests run: + ui.run_server() diff --git a/core_tools/GUI/script_runner/web_server.py b/core_tools/GUI/script_runner/web_server.py new file mode 100644 index 0000000000000000000000000000000000000000..bfa70d4c8f55dc53c417da62415b2df607628b5f --- /dev/null +++ b/core_tools/GUI/script_runner/web_server.py @@ -0,0 +1,147 @@ +import json +from datetime import datetime +from functools import cached_property +from http.cookies import SimpleCookie +from http.server import HTTPServer, BaseHTTPRequestHandler, HTTPStatus +from urllib.parse import parse_qsl, urlparse + +from core_tools.GUI.script_runner.commands import Command + + +class WebRequestHandler(BaseHTTPRequestHandler): + @cached_property + def url(self): + return urlparse(self.path) + + @cached_property + def query_data(self): + return dict(parse_qsl(self.url.query)) + + @cached_property + def post_data(self): + content_length = int(self.headers.get("Content-Length", 0)) + return self.rfile.read(content_length) + + @cached_property + def form_data(self): + return dict(parse_qsl(self.post_data.decode("utf-8"))) + + @cached_property + def cookies(self): + return SimpleCookie(self.headers.get("Cookie")) + + def log_request(self, code='-', size='-'): + pass + + def do_GET(self): + if self.url.path == "/functions": + # return list of functions with arguments + response = json.dumps(self.get_function_descriptions()).encode("utf-8") + self.send_response(HTTPStatus.OK) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(response) + else: + self.send_error(HTTPStatus.NOT_FOUND) + + def do_POST(self): + if self.url.path == "/run": + try: + data = self.form_data + cmd_name = data["__name__"] + kwargs = { + k: v + for k, v in data.items() if k != "__name__" + } + for command in self.server.commands: + if command.name == cmd_name: + try: + result = command(**kwargs) + except Exception as ex: + print(ex) + raise + break + else: + self.send_error(HTTPStatus.NOT_FOUND) + response = json.dumps(result).encode("utf-8") + self.send_response(HTTPStatus.OK) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(response) + except Exception as ex: + response = json.dumps(str(ex)).encode("utf-8") + self.send_response(HTTPStatus.BAD_REQUEST) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(response) + else: + self.send_error(HTTPStatus.NOT_FOUND) + + def get_function_descriptions(self): + functions = [] + for command in self.server.commands: + command: Command = command + parameters = command.parameters + functions.append({ + "__name__": command.name, + "args": [{ + "name": name, + "type": parameter.annotation.__name__, + } + for name, parameter in parameters.items()], + }) + return functions + + def get_response(self): + return json.dumps( + { + "now": datetime.now().isoformat(), + "path": self.url.path, + "query_data": self.query_data, + "post_data": self.post_data.decode("utf-8"), + "form_data": self.form_data, + "cookies": { + name: cookie.value + for name, cookie in self.cookies.items() + }, + } + ) + + +def run_web_server(command_list: list[Command]): + server_address = ('127.0.0.1', 8001) + httpd = HTTPServer(server_address, WebRequestHandler) + httpd.commands = command_list + try: + print(f"Server running at http://{server_address[0]}:{server_address[1]}") + print("Interrupt kernel to stop server") + httpd.serve_forever() + except KeyboardInterrupt: + httpd.shutdown() + + +# %% + +if __name__ == "__main__": + from enum import Enum + from core_tools.GUI.script_runner.commands import Function + + def sayHi(name: str, times: int = 1): + for _ in range(times): + print(f'Hi {name}') + return name + + class Mode(str, Enum): + LEFT = 'left' + CENTER = 'center' + RIGHT = 'right' + + def fit(x: float, mode: Mode): + print(f'fit {x}, {mode}') + + command_list = [ + Function(sayHi), + Function(fit), + ] + + run_web_server(command_list)