
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: qutech-qdlabs/core_tools
Commits on Source (2)
594 additions and 119 deletions
...@@ -12,7 +12,11 @@ All notable changes to core_tools will be documented in this file.
- Added ```.../startup/sqdl_sync.py``` and ```.../startup/launch_sqdl_sync.py```, which fill the role of ```.../startup/db_sync.py``` and ```.../startup/launch_db_sync.py``` for the purpose of synchronising data to SQDL.
- Added local database versioning.
  - Added a ```database_version``` table to the database schema.
  - Added a routine that checks the currently available local database version against the expectations of the current core-tools version. If new database versions are available, the modifications are applied automatically when connecting to the local database.
  - These additions are still compatible with older core-tools versions, whether users are syncing their local database to remote using ```db_sync``` or using a direct remote connection. (See the Changed section below.)
  - Added a definition for core-tools database v1.0.0 (replication of the current schema).
  - Added a definition for core-tools database v1.1.0 (includes the SQDL functionality, as well as a new Scope column; see the Changed section below).
### Changed
- Added a ```scope``` attribute to sample info.
...@@ -22,14 +26,12 @@ All notable changes to core_tools will be documented in this file.
- Added a warning about setting the ```scope``` config parameter to the core-tools start-up configuration method.
- Note that updating your remote database to include these columns is not required at this stage. The synchronisation process as defined by ```db_sync``` allows for this discrepancy between databases, facilitating a non-breaking upgrade.
- Replaced the generate-table section of the DatabaseManager (see the ```SQL_database_manager``` class in ```core_tools.data.SQL.SQL_database_mgr.py```), responsible for creating tables if they did not already exist, with the new versioning routine (see the Added section above).
- Added an optional ```address``` configuration for the local database as well, allowing for non-default PostgreSQL installations. If not specified, the config parameter defaults to ```localhost:5432```, which used to be the fixed value.
- Note that the connection manager uses the value ```localhost``` to distinguish between local and remote connections. In order to guarantee this functionality, misconfiguration of local and/or remote database ```address``` values will result in a ```RuntimeError``` when running ```core_tools.configure("...")```.
- Updated some examples in the DemoStation to use modern Mocks.
## \[1.5.11] - 2025-02-20
...
...@@ -4,6 +4,9 @@ from core_tools.data.SQL.queries.dataset_creation_queries import (
    measurement_overview_queries,
    measurement_parameters_queries)
from core_tools.data.SQL.queries.dataset_sync_queries import sync_mgr_queries
from core_tools.data.SQL.versioning import local_database_update_routine

import psycopg2
import time
import logging
...@@ -80,14 +83,19 @@ class SQL_database_manager(SQL_database_init):
        if (not db_mgr.SQL_conn_info_local.readonly
                and SQL_conn_info_local.host == "localhost"):
            conn_local = db_mgr.conn_local

            # todo: check that database versioning 1.0 creates the same schema as this class
            local_database_update_routine(conn_local)

            # sample_info_queries.generate_table(conn_local)
            sample_info_queries.add_sample(conn_local)
            # measurement_overview_queries.generate_table(conn_local)
            # measurement_overview_queries.update_local_table(conn_local)
            # measurement_parameters_queries.generate_table(conn_local)
            conn_local.commit()

        return SQL_database_manager.__instance

    @classmethod
...
...@@ -37,6 +37,7 @@ CREATE INDEX IF NOT EXISTS date_day_index ON global_measurement_overview USING B
CREATE INDEX IF NOT EXISTS data_synced_index ON global_measurement_overview USING BTREE (data_synchronized);
CREATE INDEX IF NOT EXISTS table_synced_index ON global_measurement_overview USING BTREE (table_synchronized);

ALTER TABLE global_measurement_overview ADD COLUMN IF NOT EXISTS data_update_count INT DEFAULT 0;

--- Measurement Parameters
CREATE TABLE IF NOT EXISTS measurement_parameters (
    id SERIAL primary key,
...
from psycopg2._psycopg import cursor as Cursor
def initialise_v1_0_0(c: Cursor) -> None:
c.execute(
query="""
CREATE TABLE IF NOT EXISTS global_measurement_overview (
id SERIAL,
uuid BIGINT NOT NULL unique,
exp_name text NOT NULL,
set_up text NOT NULL,
project text NOT NULL,
sample text NOT NULL,
creasted_by text NOT NULL, -- database account used when ds was created
start_time TIMESTAMP,
stop_time TIMESTAMP,
exp_data_location text, -- Database table name of parameter table. Older datasets. [SdS]
snapshot BYTEA,
metadata BYTEA,
keywords JSONB,
starred BOOL DEFAULT False,
completed BOOL DEFAULT False,
data_size int, -- Total size of data. Is written at finish.
data_cleared BOOL DEFAULT False, -- Note [SdS]: Column is not used
data_update_count int DEFAULT 0, -- number of times the data has been updated on local client
data_synchronized BOOL DEFAULT False, -- data + param table sync'd
table_synchronized BOOL DEFAULT False, -- global_measurements_overview sync'd
sync_location text -- Note [SdS]: Column is abused for migration to new measurement_parameters table
);
"""
)
c.execute(
query="""
CREATE INDEX IF NOT EXISTS id_indexed ON global_measurement_overview USING BTREE (id);
CREATE INDEX IF NOT EXISTS uuid_indexed ON global_measurement_overview USING BTREE (uuid);
CREATE INDEX IF NOT EXISTS starred_indexed ON global_measurement_overview USING BTREE (starred);
CREATE INDEX IF NOT EXISTS date_day_index ON global_measurement_overview USING BTREE (project, set_up, sample);
"""
)
c.execute(
query="""
CREATE INDEX IF NOT EXISTS data_synced_index ON global_measurement_overview USING BTREE (data_synchronized);
CREATE INDEX IF NOT EXISTS table_synced_index ON global_measurement_overview USING BTREE (table_synchronized);
"""
)
c.execute(
query="""
ALTER TABLE global_measurement_overview ADD COLUMN IF NOT EXISTS data_update_count INT DEFAULT 0;
"""
)
c.execute(
query="""
CREATE TABLE IF NOT EXISTS measurement_parameters (
id SERIAL primary key,
exp_uuid BIGINT NOT NULL,
param_index INT NOT NULL,
param_id BIGINT,
nth_set INT,
nth_dim INT,
param_id_m_param BIGINT,
setpoint BOOL,
setpoint_local BOOL,
name_gobal text,
name text NOT NULL,
label text NOT NULL,
unit text NOT NULL,
depencies jsonb,
shape jsonb,
write_cursor INT,
total_size INT,
oid INT
);
"""
)
c.execute(
query="""
CREATE INDEX IF NOT EXISTS exp_uuid_index ON measurement_parameters USING BTREE (exp_uuid);
CREATE INDEX IF NOT EXISTS oid_index ON measurement_parameters USING BTREE (oid);
"""
)
c.execute(
query="""
CREATE TABLE IF NOT EXISTS sample_info_overview (
sample_info_hash TEXT NOT NULL UNIQUE,
set_up TEXT NOT NULL,
project TEXT NOT NULL,
sample TEXT NOT NULL
);
"""
)
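The file above defines the full v1.0.0 schema; the next file, ```v1_1_0.py```, defines the follow-up migration. A minimal sketch of applying the v1.0.0 initialisation by hand with psycopg2 — the connection parameters are placeholders, and in normal use ```local_database_update_routine``` (defined further below) performs this call automatically:

```python
import psycopg2

from core_tools.data.SQL.model.versions.v1_0_0 import initialise_v1_0_0

# Placeholder credentials; a real setup takes these from the core-tools configuration.
conn = psycopg2.connect(host="localhost", port=5432, user="postgres",
                        password="<password>", dbname="<local_db>")

# psycopg2 connections are transactional: the 'with' block commits on success
# and rolls back if any of the CREATE/ALTER statements fail.
with conn:
    with conn.cursor() as cursor:
        initialise_v1_0_0(cursor)
conn.close()
```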
from psycopg2._psycopg import cursor as Cursor
def update_to_v1_1_0(c: Cursor) -> None:
c.execute(
query="""
CREATE TABLE IF NOT EXISTS coretools_export_updates (
id INT GENERATED ALWAYS AS IDENTITY,
uuid BIGINT NOT NULL UNIQUE,
modify_count INT DEFAULT 0,
new_measurement BOOLEAN DEFAULT FALSE, -- not really needed, but overwrite all data.
data_changed BOOLEAN DEFAULT FALSE, -- export previews, export metadata.
completed BOOLEAN DEFAULT FALSE, -- export raw data
update_star BOOLEAN DEFAULT FALSE, -- compare with exported metadata.
update_name BOOLEAN DEFAULT FALSE, -- compare with exported metadata.
resume_after timestamp DEFAULT '2020-01-01 00:00:00',
fail_count INT DEFAULT 0,
PRIMARY KEY(id)
);
"""
)
c.execute(
query="""
CREATE INDEX IF NOT EXISTS qdl_export_updates_uuid_index ON coretools_export_updates USING BTREE (uuid);
"""
)
c.execute(
query="""
CREATE TABLE IF NOT EXISTS coretools_exported (
id INT GENERATED ALWAYS AS IDENTITY,
uuid BIGINT NOT NULL UNIQUE,
path TEXT,
measurement_start_time timestamp, -- export raw after timeout and not completed.
raw_final BOOLEAN DEFAULT FALSE, -- Set when completed or after timeout.
-- export state
export_state INT DEFAULT 0, -- (0:todo, 1:done, 99: failed),
export_errors TEXT,
PRIMARY KEY(id)
);
"""
)
c.execute(
query="""
CREATE INDEX IF NOT EXISTS coretools_exported_uuid_index ON coretools_exported USING BTREE (uuid);
"""
)
c.execute(
query="""
CREATE TABLE IF NOT EXISTS upload_task_queue (
idx INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
task_iteration INT NOT NULL,
scope TEXT,
coretools_uid BIGINT UNIQUE NOT NULL,
dataset_path TEXT NOT NULL,
update_dataset BOOLEAN NOT NULL DEFAULT FALSE,
update_name BOOLEAN NOT NULL DEFAULT FALSE,
update_rating BOOLEAN NOT NULL DEFAULT FALSE,
is_ready BOOLEAN NOT NULL DEFAULT FALSE,
has_failed BOOLEAN NOT NULL DEFAULT FALSE,
should_retry BOOLEAN NOT NULL DEFAULT FALSE,
is_claimed_by INT
);
"""
)
c.execute(
query="""
CREATE INDEX IF NOT EXISTS task_queue_uid_index ON upload_task_queue (coretools_uid);
CREATE INDEX IF NOT EXISTS task_queue_is_claimed_by_has_failed_index ON upload_task_queue (is_claimed_by, has_failed);
CREATE INDEX IF NOT EXISTS task_queue_is_claimed_by_should_retry_index ON upload_task_queue (is_claimed_by, should_retry);
"""
)
c.execute(
query="""
CREATE TABLE IF NOT EXISTS upload_log (
idx INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
scope TEXT,
ct_uid BIGINT NOT NULL,
upload_timestamp TIMESTAMP NOT NULL,
message TEXT NOT NULL CHECK (message <> '')
);
"""
)
c.execute(
query="""
CREATE INDEX IF NOT EXISTS upload_log_id_index ON upload_log (idx);
"""
)
c.execute(
query="""
CREATE TABLE IF NOT EXISTS sqdl_dataset (
idx INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
scope TEXT NOT NULL,
coretools_uid BIGINT NOT NULL,
sqdl_uuid UUID NOT NULL
);
"""
)
c.execute(
query="""
CREATE INDEX IF NOT EXISTS qdl_dataset_uuid_index ON sqdl_dataset (sqdl_uuid);
"""
)
c.execute(
query="""
CREATE TABLE IF NOT EXISTS sqdl_file (
idx INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
dataset_index INT REFERENCES sqdl_dataset (idx),
sqdl_uuid UUID UNIQUE NOT NULL,
filename TEXT NOT NULL,
last_modified INT NOT NULL
);
"""
)
c.execute(
query="""
CREATE INDEX IF NOT EXISTS qdl_file_dataset_id_index ON sqdl_file (dataset_index);
CREATE INDEX IF NOT EXISTS qdl_file_dataset_uuid_index ON sqdl_file (sqdl_uuid);
"""
)
c.execute(
query="""
CREATE TABLE IF NOT EXISTS database_version (
major SMALLINT UNIQUE NOT NULL,
minor SMALLINT UNIQUE NOT NULL,
patch SMALLINT UNIQUE NOT NULL
);
"""
)
c.execute(
query="""
INSERT INTO database_version (
major, minor, patch
) VALUES (
1, 1, 0
)
"""
)
c.execute(
query="""
ALTER TABLE global_measurement_overview ADD COLUMN IF NOT EXISTS scope TEXT DEFAULT NULL;
ALTER TABLE sample_info_overview ADD COLUMN IF NOT EXISTS scope TEXT DEFAULT NULL;
"""
)
...@@ -3,6 +3,7 @@ from core_tools.data.SQL.SQL_common_commands import insert_row_in_table, update_
from core_tools.data.SQL.SQL_utility import generate_uuid
from core_tools.data.SQL.connect import SQL_conn_info_local, sample_info
from core_tools.data.SQL.versioning import get_database_version, DatabaseVersion

import psycopg2, json
...@@ -114,23 +115,39 @@ class measurement_overview_queries:
                or not is_valid_info(sample_info.sample)):
            raise Exception(f'Sample info not valid: {sample_info}')

        uuid = generate_uuid()

        if get_database_version(conn) >= DatabaseVersion(1, 1, 0):
            # Scope column is introduced in v1.1.0, all Local databases automatically update.
            # However, if only a Remote database is configured, then that one is not automatically updated
            # for safety/compatibility reasons, which means we still need the 'no scope' variant of the query.
            scope_value = sample_info.scope
            if not is_valid_info(scope_value):
                scope_value = None
            var_names = (
                'uuid', 'set_up', 'scope', 'project', 'sample',
                'creasted_by', 'exp_name', 'sync_location', 'exp_data_location',
                'start_time'
            )
            var_values = (
                uuid, str(sample_info.set_up), scope_value, str(sample_info.project), str(sample_info.sample),
                SQL_conn_info_local.user, exp_name, 'New measurement_parameters', '',
                psycopg2.sql.SQL("TO_TIMESTAMP({})").format(psycopg2.sql.Literal(start_time))
            )
        else:
            var_names = (
                'uuid', 'set_up', 'project', 'sample',
                'creasted_by', 'exp_name', 'sync_location', 'exp_data_location',
                'start_time'
            )
            var_values = (
                uuid, str(sample_info.set_up), str(sample_info.project), str(sample_info.sample),
                SQL_conn_info_local.user, exp_name, 'New measurement_parameters', '',
                psycopg2.sql.SQL("TO_TIMESTAMP({})").format(psycopg2.sql.Literal(start_time))
            )

        # NOTE: column sync_location is abused for migration to new format
        returning = ('id', 'uuid')
        query_outcome = insert_row_in_table(conn, measurement_overview_queries.table_name,
                                            var_names, var_values, returning)
...
import logging
from typing import Callable, Tuple, Dict

from core_tools.data.SQL.model.versions.v1_0_0 import initialise_v1_0_0
from core_tools.data.SQL.model.versions.v1_1_0 import update_to_v1_1_0

from psycopg2._psycopg import connection as Connection, cursor as Cursor, Error as PGError
from psycopg2.extras import RealDictCursor

logger = logging.getLogger(__name__)

UpdateOperation = Callable[[Cursor], None]


class DatabaseVersion:
    def __init__(self, major: int, minor: int, patch: int):
        self.major = major
        self.minor = minor
        self.patch = patch

    def next_patch(self):
        return DatabaseVersion(self.major, self.minor, self.patch + 1)

    def next_minor(self):
        return DatabaseVersion(self.major, self.minor + 1, 0)

    def next_major(self):
        return DatabaseVersion(self.major + 1, 0, 0)

    def __eq__(self, other):
        return (self.major == other.major) and (self.minor == other.minor) and (self.patch == other.patch)

    def __lt__(self, other):
        return (self.major < other.major) or (self.major == other.major and self.minor < other.minor) or (self.major == other.major and self.minor == other.minor and self.patch < other.patch)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __gt__(self, other):
        return not (self.__eq__(other) or self.__lt__(other))

    def __le__(self, other):
        return self.__eq__(other) or self.__lt__(other)

    def __ge__(self, other):
        return not self.__lt__(other)

    def __repr__(self) -> str:
        return "{}.{}.{}".format(self.major, self.minor, self.patch)

    def __hash__(self):
        return hash((self.major, self.minor, self.patch))


__REQUIRED_DATABASE_VERSION__ = DatabaseVersion(1, 1, 0)

__UPDATE_PATH__: Dict[DatabaseVersion, UpdateOperation] = {
    DatabaseVersion(1, 1, 0): update_to_v1_1_0,
}


def local_database_update_routine(conn: Connection):
    version = get_database_version(conn)
    if version.major == 0:
        with conn:
            cursor = conn.cursor()
            initialise_v1_0_0(cursor)
        version = DatabaseVersion(1, 0, 0)

    if version < __REQUIRED_DATABASE_VERSION__:
        logger.warning(
            "Expected local database version {}, but found {}. Performing updates. This can take a while, depending on the setup.".format(__REQUIRED_DATABASE_VERSION__, version)
        )
    while version < __REQUIRED_DATABASE_VERSION__:
        version = _update_database(conn, version)


def get_database_version(conn: Connection) -> DatabaseVersion:
    try:
        with conn:
            c = conn.cursor(cursor_factory=RealDictCursor)
            c.execute(
                query="SELECT major, minor, patch FROM database_version",
            )
            records = c.fetchall()
            assert len(records) == 1, "Found no entries or more than one entry in the database_version table"
            record = records[0]
            return DatabaseVersion(major=record["major"], minor=record["minor"], patch=record["patch"])
    except PGError as err:
        if err.pgcode == "42P01":
            # 42P01 is the psycopg2 error code for UndefinedTable
            logger.debug("No table 'database_version' found, returning v0.0.0")
            return DatabaseVersion(0, 0, 0)
        raise err


def _update_database(conn: Connection, current: DatabaseVersion) -> DatabaseVersion:
    next_version, update_operation = _check_for_database_updates(current)
    logger.info("Attempting local database upgrade from version {} to {}".format(current, next_version))
    _apply_database_update(conn, update_operation)
    logger.info("Update successful.")
    return next_version


def _check_for_database_updates(current: DatabaseVersion) -> Tuple[DatabaseVersion, UpdateOperation]:
    if current.next_patch() in __UPDATE_PATH__:
        next_version = current.next_patch()
    elif current.next_minor() in __UPDATE_PATH__:
        next_version = current.next_minor()
    elif current.next_major() in __UPDATE_PATH__:
        next_version = current.next_major()
    else:
        raise NotImplementedError("Unable to find an update path to version {}. Stuck at {}.".format(__REQUIRED_DATABASE_VERSION__, current))
    return next_version, __UPDATE_PATH__[next_version]


def _apply_database_update(conn: Connection, update: UpdateOperation):
    try:
        with conn:
            c = conn.cursor()
            update(c)
    except Exception as err:
        logger.exception(
            "Error during update. Changes are automatically rolled back to the previous successful update. Quitting with the following error: {}".format(err)
        )
        raise err
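The ```__UPDATE_PATH__``` registry above maps each target version to the function that performs that migration, and ```_check_for_database_updates``` walks it one step at a time. A hypothetical sketch of how a future v1.2.0 migration could be added — none of the ```v1_2_0``` names below exist in this change, they are illustrative only:

```python
# Hypothetical core_tools/data/SQL/model/versions/v1_2_0.py
from psycopg2._psycopg import cursor as Cursor


def update_to_v1_2_0(c: Cursor) -> None:
    # Example schema change only; a real migration would be defined here.
    c.execute(
        query="""
        ALTER TABLE global_measurement_overview ADD COLUMN IF NOT EXISTS example_column TEXT;
        """
    )


# In core_tools.data.SQL.versioning the registry would then be extended so that
# local_database_update_routine() upgrades step by step: 1.0.0 -> 1.1.0 -> 1.2.0,
# with _apply_database_update() rolling back a step if its transaction fails.
__REQUIRED_DATABASE_VERSION__ = DatabaseVersion(1, 2, 0)
__UPDATE_PATH__ = {
    DatabaseVersion(1, 1, 0): update_to_v1_1_0,
    DatabaseVersion(1, 2, 0): update_to_v1_2_0,
}
```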
...@@ -2,7 +2,6 @@ import gc
import logging
import psutil
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, Tuple, Dict
...@@ -15,7 +14,7 @@ from core_tools.data.ds.data_set import load_by_uuid
from core_tools.data.utils.timer import Timer
from core_tools.data.sqdl.export.data_export import export_data
from core_tools.data.sqdl.export.data_preview import generate_previews
from core_tools.data.sqdl.model import core, task_queue, export
from core_tools.data.sqdl.model.task_queue import DatasetInfo
from core_tools.data.sqdl.model.export import ExportAction
...@@ -36,8 +35,7 @@ class SqdlUpdate:
class Exporter:
    def __init__(self, cfg: Dict, conn: Connection):
        self.export_path = "{}/export".format(cfg.get('sqdl.base_path', "~/.sqdl"))
        self.connection = conn

        if cfg.get("sqdl.retry_failed_exports", default=False):
...@@ -356,20 +354,26 @@ class Exporter:
            return False
        return True

    def get_scope(self, coretools_uid: int) -> str:
        # Info is never None here, because otherwise it wouldn't be listed for export
        info = core.get_measurement_info(self.connection, coretools_uid)
        if info is None:
            raise Exception("Failed to extract information for measurement with id '{}'".format(coretools_uid))
        # try:
        #     scope = self.scopes[project]
        #     if isinstance(scope, Mapping):
        #         scope = scope[set_up]
        #     return scope
        # except KeyError:
        if info.scope is None:
            raise Exception(f"No scope for measurement with ID '{coretools_uid}'")
        return info.scope

    def fix_setup_name(self, setup):
        return self.setup_name_corrections.get(setup, setup)

    def export_measurement(self, measurement, action: ExportAction) -> Tuple[SqdlUpdate, str]:
        scope = self.get_scope(int(measurement.exp_uuid))
        measurement.set_up = self.fix_setup_name(measurement.set_up)
        updates = SqdlUpdate(measurement.exp_uuid, scope, raw_final=action.completed)
        try:
...
from dataclasses import dataclass
from typing import List, Optional

from psycopg2._psycopg import connection as Connection

@dataclass
class MeasurementInfo:
    coretools_uid: int
    sqdl_uuid: str
    scope: str
    experiment_name: str
    starred: bool
    completed: bool


def get_data_to_sync(conn: Connection) -> List[int]:
    with conn:
        c = conn.cursor()
...@@ -65,3 +77,28 @@ def set_table_as_synced(conn: Connection, ct_uid: int) -> bool:
        )
        synced = c.rowcount == 1
    return synced


def get_measurement_info(conn: Connection, coretools_uid: int) -> Optional[MeasurementInfo]:
    statement = """
        SELECT overview.uuid, datasets.sqdl_uuid, overview.scope, overview.exp_name, overview.starred, overview.completed
        FROM global_measurement_overview AS overview
        JOIN sqdl_dataset AS datasets
        ON overview.uuid = datasets.coretools_uid
        WHERE overview.uuid = %(ct-uid)s;
    """
    parameters = {
        "ct-uid": coretools_uid
    }

    with conn:
        cur = conn.cursor()
        cur.execute(
            query=statement,
            vars=parameters,
        )
        result = cur.fetchone()

    if result is not None:
        return MeasurementInfo(*result)
    return None
...@@ -86,15 +86,6 @@ def get_files_for_dataset(conn: Connection, parent_idx: int) -> List[SQDLFile]:
    """
    Get all the SQDLFile entries associated with the SQDLDataset that has the provided index.
    """
    with conn:
        c = conn.cursor()
        c.execute(
...@@ -106,14 +97,13 @@ def get_files_for_dataset(conn: Connection, parent_idx: int) -> List[SQDLFile]:
vars={"parent": parent_idx} vars={"parent": parent_idx}
) )
records = c.fetchall() records = c.fetchall()
return [parse_row(r) for r in records] return [SQDLFile(*r) for r in records]
def create_or_update_file(conn: Connection, parent_idx: int, sqdl_uuid: UUID, filename: str, last_modified: int) -> None: def create_or_update_file(conn: Connection, parent_idx: int, sqdl_uuid: UUID, filename: str, last_modified: int) -> None:
""" """
Create new SQDLFile entry. If entry with UUID already exists, update the last-modified timestamp instead. Create new SQDLFile entry. If entry with UUID already exists, update the last-modified timestamp instead.
""" """
# todo: original uploader registry also updates uuid on conflict, but uuid is at all times the conflicting column, so that should do nothing.
with conn: with conn:
c = conn.cursor() c = conn.cursor()
c.execute( c.execute(
......
...@@ -6,7 +6,7 @@ def read(conn: Connection) -> str:
    """
    query = """
        SELECT major, minor, patch
        FROM database_version;
    """
    with conn:
        c = conn.cursor()
...
import os
import time
import logging
from typing import Optional
...@@ -8,7 +9,9 @@ from core_tools.data.SQL.SQL_connection_mgr import SQL_database_init as Database
from core_tools.data.sqdl.export.coretools_export import Exporter
from core_tools.data.sqdl.uploader.sqdl_uploader import SqdlUploader as Uploader
from core_tools.data.SQL.versioning import get_database_version, __REQUIRED_DATABASE_VERSION__
from core_tools.data.sqdl.model import core, export
from core_tools.data.sqdl.model.export import SyncStatus
import sqdl_client
...@@ -36,18 +39,28 @@ class SQDLWriter():
        self.database = DatabaseInit()
        self.database._connect()
        if not self.database.local_conn_active:
            raise ValueError("Only remote database configured. Setup not compatible with SQDL sync.")
        self.connection = self.database.conn_local
        self.validate_version()

        base_path = config.get("sqdl.base_path", "~/.sqdl")
        self.base_path = os.path.expanduser(base_path)
        os.makedirs(self.base_path, exist_ok=True)
        os.makedirs("{}/export".format(base_path), exist_ok=True)

        # initialise
        self.exporter = Exporter(
            cfg=config,
            conn=self.connection
        )

        self.dev_mode = config.get("sqdl.dev_mode", default=True)
        self.use_personal_login = config.get("sqdl.use_personal_login", default=False)
        if self.dev_mode:
            logger.info("Initialising SQDL Writer/Client in developer mode...")
        self.uploader = Uploader(
            cfg=config,
            conn=self.connection,
...@@ -76,6 +89,13 @@ class SQDLWriter():
        self.exporter.connection = self.connection
        self.uploader.connection = self.connection

        # do not use 'login' functionality when doing local development
        if self.use_personal_login and not self.dev_mode:
            self.uploader.client.login()
        elif not self.use_personal_login and not self.dev_mode:
            key = self.read_local_api_key()
            self.uploader.client.use_api_key(key)

        self.is_running = True
        self.next_tick = datetime.datetime.now() + self.tick_rate
...@@ -102,6 +122,7 @@ class SQDLWriter():
        finally:
            self.database._disconnect()
            self.uploader.client.logout()
            logger.info("Stopping SQDL Writer event loop...")

    def queue_datasets_for_export(self) -> None:
...@@ -113,8 +134,6 @@ class SQDLWriter():
        for ct_uid in uids_for_data_to_update:
            logger.debug("sync data for core-tools UID: '{}'".format(ct_uid))
            export.export_changed_data(self.connection, ct_uid)
            core.set_data_as_synced(self.connection, ct_uid)
...@@ -136,70 +155,46 @@ class SQDLWriter():
        """
        Select relevant data from 'global_measurement_overview' to use in data synchronisation.
        """
        # todo: revise how changes in name and rating are handled, because without the intermediary remote database, we lose our method for tracking changes
        # in the current solution, 'local' becomes the authority on name and rating, which is not what we want

        info = core.get_measurement_info(self.connection, uuid)
        if info is None:
            logger.error("Failed to fetch data, or no entry exists with uuid '{}'".format(uuid))
            return None

        if info.scope is None:
            logger.warning("No Scope parameter for CoreTools UID '{}'. Skipping SQDL Sync.".format(info.coretools_uid))
            return None

        scope_api: sqdl_client.api.v1.scope.ScopeAPI = self.uploader.client.api.scope
        scope = scope_api.retrieve_from_name(info.scope)
        try:
            dataset: sqdl_client.api.v1.dataset.Dataset = scope.retrieve_dataset_from_uid(str(info.coretools_uid))
        except sqdl_client.exceptions.ObjectNotFoundException:
            logger.info("No dataset with CoreTools UID '{}'. Creating new export.".format(info.coretools_uid))
            sync_status = SyncStatus(
                is_new=True,
                is_complete=info.completed,
            )
            return sync_status

        sync_status = SyncStatus(
            is_new=False,
            is_complete=info.completed,
            changed_name=info.experiment_name != dataset.name,
            changed_rating=info.starred != (dataset.rating > 0)
        )
        return sync_status

    def validate_version(self) -> None:
        """
        Assert that the local database version matches requirements.
        """
        version = get_database_version(self.connection)
        assert version == __REQUIRED_DATABASE_VERSION__, "Local database is not up to date (expected '{}', found '{}'). Cannot sync to SQDL.".format(__REQUIRED_DATABASE_VERSION__, version)

    def sleep_to_limit_rate(self) -> None:
        """
...@@ -212,6 +207,26 @@
            time.sleep(seconds)
        self.next_tick = datetime.datetime.now() + self.tick_rate

    def read_local_api_key(self) -> Optional[str]:
        env_file = "{}/.env".format(self.base_path)
        if not os.path.exists(env_file):
            logger.warning("No .env file found. Check the 'Using SQDL' section of the documentation, or ask your local Admin for the right credentials.")
            return

        with open(env_file) as file:
            lines = file.readlines()

        for line in lines:
            if line.startswith("#"):
                continue
            key, value = line.strip().split(sep="=", maxsplit=1)
            if key == "API_KEY":
                return value

        logger.warning("Found .env file, but unable to extract parameter 'API_KEY'. Check the 'Using SQDL' section of the documentation for more details.")
        return

    def reconnect(self):
        self.database._disconnect()
        self.database._connect()
...
...@@ -44,7 +44,6 @@ class MetadataFormatter:
            'variables_measured': self.validate(desc['vars']),
            'dimensions': self.validate(desc['dims']),
            # TODO more metadata ?
        }
        if "project" in desc:
            metadata["project"] = desc["project"]
...
...@@ -2,7 +2,6 @@ import logging
import os
import re
import time
from datetime import datetime
from typing import Dict
...@@ -14,7 +13,7 @@ from core_tools.data.sqdl.model import log, upload, task_queue
import psutil

import core_tools as ct
from core_tools.startup.config import get_configuration
from sqdl_client.api.v1.dataset import Dataset
from sqdl_client.api.v1.file import File
from sqdl_client.client import QDLClient
...@@ -36,17 +35,15 @@ class SqdlUploader:
            self.client = QDLClient()
        else:
            self.client = client

        self.connection = conn
        self.metadata_formatter = MetadataFormatter()

        # load scope to fix them when not set during export
        self.local_scope = cfg.get('scope', None)

        if cfg.get('sqdl.retry_failed_uploads', False):
            task_queue.retry_all_failed(self.connection)

        self.pid = os.getpid()
        self.cleanup_abandoned_tasks()
        logger.info(f"Started uploader, pid:{self.pid}")
...@@ -133,13 +130,15 @@ class SqdlUploader:
    def get_scope(self, desc):
        # is it in the json file?
        scope = desc.get('scope', None)
        if scope is None:
            scope = self.local_scope
        # if not scope:
        #     scope = self.scopes.get(desc['project'])
        #     if isinstance(scope, Mapping):
        #         scope = scope.get(desc['setup'])
        if scope is None:
            raise NoScopeError(desc['uid'])
        return scope

    def get_create_sqdl_dataset(self, scope_name, desc) -> Dataset:
...
...@@ -55,7 +55,7 @@ def _generate_log_file_name():
def _configure_logging(cfg):
    if cfg.get('logging.disabled', False):
        return
    path = cfg.get('logging.file_location', '~/.core_tools/logs')
    file_level = cfg.get('logging.file_level', 'INFO')
    console_level = cfg.get('logging.console_level', 'WARNING')
    logger_levels = cfg.get('logging.logger_levels', {})
...
...@@ -31,6 +31,8 @@ def _config_remote_db(readonly):
    dbname = cfg['remote_database.database']
    address = cfg['remote_database.address']
    host,port = address.split(':')
    if host == "localhost":
        raise RuntimeError("Illegal host name '{}' for remote database. Not allowed to be 'localhost'.".format(host))
    SQL_conn_info_remote(host, int(port),
                         user, passwd, dbname,
                         readonly)
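For reference, a minimal sketch of the matching connection settings; apart from ```remote_database.address```, the key names are assumptions based on the changelog entry about the new optional local ```address``` parameter:

```yaml
local_database:
  # Optional since this change; defaults to localhost:5432 when omitted.
  address: localhost:5432

remote_database:
  # Must not be 'localhost'; core_tools.configure(...) raises a RuntimeError otherwise.
  address: db.example.org:5432
```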
......
...@@ -29,14 +29,20 @@ Configuration for SQDL functionality is specified in the ```sqdl``` section of y
```yaml
sqdl:
  tick_rate: 6                 # (int) Minimum period of the event loop in seconds
  dev_mode: false              # (bool) Whether or not to use the sqdl-client developer mode for local testing
  retry_failed_exports: false  # (bool) Whether or not to retry exporting previously failed exports when re-initialising the SQDL Writer
  retry_failed_uploads: false  # (bool) Whether or not to retry uploading previously failed uploads when re-initialising the SQDL Writer
  base_path: ~/.sqdl           # (str) Local path where the exported data files will be saved before uploading
  use_personal_login: false    # (bool) Whether or not to use personal credentials for signing into SQDL
  setup_name_correction:       # (section) Section used for renaming setups
    from: to                   # (str) Represent local setup 'from' as 'to' in storage
```
Additionally, the ```scope``` parameter is checked for at the top level, just like ```project```, ```setup``` and ```sample```.
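A minimal sketch of those top-level entries, with placeholder values:

```yaml
project: my_project
setup: my_setup
sample: my_sample
scope: my_scope    # used by the SQDL sync to decide where datasets are uploaded
```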
### SQDL Login
By default, the SQDL Writer is authenticated through the use of an API key. This key has to be provided by your local administrator, and should be unique to your setup.
This means that every measurement tool has its own API key, and it should not be copied between systems.

If you are not using the SQDL Writer from a shared system, it is possible to use your personal credentials to log into SQDL. To do so, set the ```use_personal_login``` parameter to true.
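When using the default API-key flow, the SQDL Writer reads the key from a ```.env``` file inside the configured ```base_path``` (```~/.sqdl``` by default). A minimal sketch, with a placeholder key value:

```
# ~/.sqdl/.env
API_KEY=<key-provided-by-your-admin>
```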