Commit ac1c84be authored by Daan Bijl

first running draft of incorporating sqdl-coretools-sync into core-tools

parent 08d18b4f
@@ -56,44 +56,42 @@ class Exporter:
         self.export_path = cfg.get('export.path')
         self.inter_ds_delay = float(cfg.get('export.delay'))
         self.connection = SqlConnection()
-        self.uploader_db = UploaderDb(cfg.get('uploader.db_path'))
+        self.uploader_db = UploaderDb(cfg)
         self.uploader_queue = UploaderTaskQueue(self.uploader_db)
         self.scopes = cfg.get('export.scopes', {})
         self.setup_name_corrections = cfg.get('export.setup_name_corrections', {})
+        self.no_action_count = 0
+        self.loop_count = 0
+        self.process = psutil.Process()
 
-    def run(self) -> None:
-        no_action_cnt = 0
-        loop_cnt = 0
-        process = psutil.Process()
-        while True:
-            try:
-                loop_cnt += 1
-                done_work = self.export_one()
-                if not done_work:
-                    if no_action_cnt == 0:
-                        self.timer.log_times()
-                    if (no_action_cnt % 100) == 0:
-                        logger.info('Nothing to export')
-                    no_action_cnt += 1
-                    time.sleep(0.2)
-                else:
-                    no_action_cnt = 0
-                    unreachable = gc.collect()
-                    logger.info(f"GC unreachable: {unreachable} counts:{gc.get_count()} {gc.get_freeze_count()}")
-                    logger.info(f"MEM: {process.memory_info()}")
-                if loop_cnt % 1_000 == 0:
-                    logger.info("Close database connection to free memory")
-                    self.connection.close()
-                    unreachable = gc.collect()
-                    logger.info(f"GC2 unreachable: {unreachable} counts:{gc.get_count()} {gc.get_freeze_count()}")
-                    logger.info(f"MEM2: {process.memory_info()}")
-            except (psycopg2.Error, psycopg2.Warning):
-                logger.error("Database error", exc_info=True)
-                time.sleep(2.0)
-            except Exception:
-                logger.error("Unanticipated error", exc_info=True)
-                time.sleep(2.0)
+    def poll(self) -> None:
+        try:
+            self.loop_count += 1
+            done_work = self.export_one()
+            if not done_work:
+                if self.no_action_count == 0:
+                    self.timer.log_times()
+                if (self.no_action_count % 100) == 0:
+                    logger.info('Nothing to export')
+                self.no_action_count += 1
+            else:
+                self.no_action_count = 0
+                unreachable = gc.collect()
+                logger.info(f"GC unreachable: {unreachable} counts:{gc.get_count()} {gc.get_freeze_count()}")
+                logger.info(f"MEM: {self.process.memory_info()}")
+            if self.loop_count % 1_000 == 0:
+                logger.info("Close database connection to free memory")
+                self.connection.close()
+                unreachable = gc.collect()
+                logger.info(f"GC2 unreachable: {unreachable} counts:{gc.get_count()} {gc.get_freeze_count()}")
+                logger.info(f"MEM2: {self.process.memory_info()}")
+        except (psycopg2.Error, psycopg2.Warning):
+            logger.error("Database error", exc_info=True)
+        except Exception:
+            logger.error("Unanticipated error", exc_info=True)
 
     def export_one(self):
         self.timer = Timer()
@@ -401,7 +399,7 @@ def main(configuration_file: str):
         exporter = Exporter(cfg)
         if cfg.get('export.retry_failed', False):
             exporter.retry_failed_exports()
-        exporter.run()
+        exporter.poll()
     except Exception:
         logger.error('Error running exporter', exc_info=True)
         raise
...
@@ -98,7 +98,7 @@ class UploadLog(Base):
     id: Mapped[int] = mapped_column(primary_key=True)
     scope: Mapped[str | None]
-    ds_uid: Mapped[int]
+    ds_uid: Mapped[int] = mapped_column(BigInteger)
     upload_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
     message: Mapped[str]  # new dataset, uploaded files, exception xxx.
...
@@ -44,8 +44,8 @@ class SqdlUploader:
         if api_key:
             self.client.use_api_key(api_key)
-        db_path = self.cfg.get('uploader.database', '~/.sqdl_uploader/uploader.db')
-        self.db = UploaderDb(db_path)
+        self.db = UploaderDb(self.cfg)
         self.task_queue = UploaderTaskQueue(self.db)
         self.upload_registry = UploadRegistry(self.db)
         self.logger = UploadLogger(self.db)
@@ -58,6 +58,8 @@ class SqdlUploader:
         self.cleanup_abandoned_tasks()
         logger.info(f"Started uploader, pid:{self.pid}")
+        self.idle_cnt = 0
 
     def process_task(self) -> bool:
         start = time.perf_counter()
         task = self.task_queue.get_oldest_task(self.pid)
@@ -269,23 +271,19 @@ class SqdlUploader:
         if not alive:
             self.task_queue.release_task(task)
 
-    def run(self) -> None:
+    def poll(self) -> None:
         # NOTE: KeyboardInterrupt and SystemExit will not be caught.
-        idle_cnt = 0
-        while True:
-            try:
-                work_done = self.process_task()
-                if not work_done:
-                    idle_cnt += 1
-                    if idle_cnt % 300 == 0:
-                        logger.info('Nothing to upload')
-                    time.sleep(0.2)
-                else:
-                    idle_cnt = 0
-            except Exception:
-                # anticipated causes: database connection failure when trying to get task.
-                logger.error('Task processing failed', exc_info=True)
-                time.sleep(1.0)
+        try:
+            work_done = self.process_task()
+            if not work_done:
+                self.idle_cnt += 1
+                if self.idle_cnt % 300 == 0:
+                    logger.info('Nothing to upload')
+            else:
+                self.idle_cnt = 0
+        except Exception:
+            # anticipated causes: database connection failure when trying to get task.
+            logger.error('Task processing failed', exc_info=True)
 
 
 def fix_filename(filename):
@@ -302,7 +300,7 @@ def main(configuration_file: str, client: QDLClient = None):
     cfg = get_configuration()
     try:
         uploader = SqdlUploader(cfg, client=client)
-        uploader.run()
+        uploader.poll()
     except Exception:
         logger.error('Error running exporter', exc_info=True)
         raise
...
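The blocking run() loops removed above are replaced by single-shot poll() methods; the driving loop now lives in the new SQDLWriter module that follows. As an editorial reading aid only, a minimal sketch of how such poll() methods are driven externally; the drive() helper name and tick length are assumptions, not part of the commit:

import time

def drive(exporter, uploader, tick_seconds: float = 10.0) -> None:
    # Call each poll() once per tick; each call does at most one unit of work
    # (export one dataset / process one upload task) and then returns.
    while True:
        exporter.poll()
        uploader.poll()
        time.sleep(tick_seconds)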
import time
import logging
from typing import Tuple, Optional
import datetime
from core_tools.startup.config import get_configuration
from core_tools.data.SQL.SQL_connection_mgr import SQL_database_init as DatabaseInit
from core_tools.data.SQL.queries.dataset_sync_queries import sync_mgr_queries
from core_tools.data.export.coretools_export import Exporter
from core_tools.data.sqdl.sqdl_uploader import SqdlUploader as Uploader
import sqdl_client
from sqdl_client.client import QDLClient
from sqdl_client.utils.fakes.fake_client import QDLFake
logger = logging.getLogger(__name__)
def _create_fake_client() -> QDLClient:
    schema_name = "coretools-default"
    qdl_fake = QDLFake()
    qdl_fake.add_schema(
        schema_name,
        '''
        measurement_data(
            setup(min_length=5, type=str),
            sample(min_length=5),
            variables_measured(type=list),
            dimensions(type=list))
        ''')
    for scope_name in ["Test", ]:
        qdl_fake.add_scope(scope_name, 'Scope to test QDL', schema_name)
    return qdl_fake.client
class SQDLWriter:
    """
    Event loop that polls for measurement data to be uploaded to SQDL.
    Expects the core-tools configuration to be initialised (see core-tools/startup/config.py).
    Start the loop using the 'run' method.
    """

    def __init__(self):
        self.config = get_configuration()
        self.database = DatabaseInit()
        self.database._connect()
        self.create_export_tables_if_not_exist()  # done: create required tables if they do not exist yet
        # done: make sure that the exporter creates the required tables --> already done via the SQLAlchemy ORM

        self.exporter = Exporter(self.config)

        client = None
        if self.config.get("use-sqdl-testing-environment", default=True):  # to-do: change default to False when wrapping up
            logger.info("Using fake SQDL Client for testing purposes")
            client = _create_fake_client()
        self.uploader = Uploader(self.config, client=client)

        self.tick_rate = datetime.timedelta(
            seconds=self.config.get("tick_rate", default=10)
        )
        self.is_running = False
    def run(self):
        """
        Start the SQDL Writer event loop.
        """
        # okay to skip disconnect, since the loop only stops at Writer shutdown
        # (otherwise, use a try-finally block or create a context manager)
        self.database._connect()
        self.is_running = True
        next_tick = datetime.datetime.now() + self.tick_rate
        while self.is_running:
            # done: identify data that needs to be handled
            # checks the 'data_synchronized' column value
            uuids_for_data_to_update = sync_mgr_queries.get_sync_items_raw_data(self.database)

            # validate that the work done by db-sync does not significantly alter the contents of the postgresql database
            # done: check contents of the "sync raw data" method
            # - query on UUID for 'data location', 'sync location' and 'update count'
            # - data location and sync location are only used for the old output format
            # - update count is used to set 'data-synchronised' to true, so the entry is only processed once
            # - the sync process deletes all remote data, then re-writes local data to remote
            #   -- this means that the entire section is trivialised by having only local data
            #   -- need to set the 'data synchronised' column
            for uuid in uuids_for_data_to_update:
                logger.debug("sync data for uuid: '{}'".format(uuid))
                # (correction) done: part of the work was done by the database triggers on the remote database
                # -> insert into the Exporter's database that data has changed (see ExportAction)
                self.export_changed_measurement_data(uuid)
                self.register_data_as_synchronised(uuid)

            # checks the 'table_synchronized' column value
            uuids_for_meta_to_update = sync_mgr_queries.get_sync_items_meas_table(self.database)

            # done: check contents of the "sync table" method
            # [!] previously, the 'new-measurement' status could be derived from a 'uuid' existing locally and not
            #     existing remotely. Need to find a new mechanism to trigger 'export-new-measurement' (to be validated)
            #     >> the same problem applies to 'star changed' and 'name changed'
            #     >> should be that the 'data update count' column starts at 0 for new measurements, marking a
            #        clear beginning. Have to test for edge cases
            # - uuids for data to update will result in triggers for the 'measurement-parameter' table
            # - uuids for meta to update will result in triggers for the 'global-overview' table

            # cover behaviour that would usually be handled by triggers
            for uuid in uuids_for_meta_to_update:
                # to-do: parse data from 'global_measurement_overview'
                # to-do: check local data against SQDL remote data
                metadata = self.collect_measurement_info(uuid)
                if metadata is None:
                    continue

                if self.check_if_uuid_is_new(uuid):
                    self.export_new_measurement(
                        uuid=uuid,
                        completed=metadata.get("is_complete", False)
                    )
                else:
                    # done: assert if there are any edge cases that reach this point without ever running export_new_measurement()
                    star_changed, name_changed = self.check_for_measurement_changes(metadata)
                    self.export_changed_measurement(
                        uuid=uuid,
                        complete=metadata.get("is_complete", False),
                        star_changed=star_changed,
                        name_changed=name_changed,
                    )

                # done: label table as synchronised
                self.register_table_as_synchronised(uuid)

            # export datasets
            # to-do: check if there is a good way to export more than one action per tick.
            #        The ExportAction stack can grow quite a bit, since one event-loop pass checks for all local
            #        changes but only exports one (maybe temporary growth of the task stack is not harmful)
            self.exporter.poll()
            # done: modify exporter to use a poll method instead of a run method
            # done: modify exporter to use PostgreSQL instead of SQLite

            # upload datasets
            self.uploader.poll()
            # done: modify uploader to use a poll method instead of a run method
            # done: modify uploader to use PostgreSQL instead of SQLite --> (same class as exporter)

            # done: sleep for the remaining delta time if the loop is running too fast
            now = datetime.datetime.now()
            if next_tick > now:
                time.sleep((next_tick - now).total_seconds())
            logger.debug("tick")
            next_tick = datetime.datetime.now() + self.tick_rate
    def create_export_tables_if_not_exist(self):
        with self.database.conn_local.cursor() as cur:
            table_statement, index_statement = self._create_export_updates_table()
            cur.execute(table_statement)
            cur.execute(index_statement)

            table_statement, index_statement = self._create_exported_table()
            cur.execute(table_statement)
            cur.execute(index_statement)
    def _create_export_updates_table(self) -> Tuple[str, str]:
        """
        Provides the statements to create the coretools_export_updates table, if it does not already exist.

        The following five columns function as boolean flags that dictate operations:
            new_measurement - not really needed, but overwrites all data
            data_changed    - export previews, export metadata
            completed       - export raw data
            update_star     - compare with exported metadata
            update_name     - compare with exported metadata
        """
        create_table_statement = """
            CREATE TABLE IF NOT EXISTS coretools_export_updates (
                id INT GENERATED ALWAYS AS IDENTITY,
                uuid BIGINT NOT NULL UNIQUE,
                modify_count INT default 0,
                new_measurement BOOLEAN default FALSE,
                data_changed BOOLEAN default FALSE,
                completed BOOLEAN default FALSE,
                update_star BOOLEAN default FALSE,
                update_name BOOLEAN default FALSE,
                PRIMARY KEY(id)
            );
        """
        create_index_statement = """
            CREATE INDEX IF NOT EXISTS qdl_export_updates_uuid_index
            ON coretools_export_updates USING BTREE (uuid);
        """
        return create_table_statement, create_index_statement
    def _create_exported_table(self) -> Tuple[str, str]:
        """
        Provides the statements to create the coretools_exported table, if it does not already exist.
        """
        create_table_statement = """
            CREATE TABLE IF NOT EXISTS coretools_exported (
                id INT GENERATED ALWAYS AS IDENTITY,
                uuid BIGINT NOT NULL UNIQUE,
                path TEXT,
                measurement_start_time timestamp, -- export raw after timeout and not completed.
                raw_final BOOLEAN default FALSE, -- Set when completed or after timeout.
                -- export state
                export_state INT default 0, -- (0: todo, 1: done, 99: failed)
                export_errors TEXT,
                PRIMARY KEY(id)
            );
        """
        create_index_statement = """
            CREATE INDEX IF NOT EXISTS coretools_exported_uuid_index
            ON coretools_exported USING BTREE (uuid);
        """
        return create_table_statement, create_index_statement
    def export_new_measurement(self, uuid: int, completed: bool):
        """
        Original behaviour triggered on INSERT operations in the 'global_measurement_overview' table.
        """
        statement = """
            INSERT INTO coretools_export_updates(uuid, new_measurement, data_changed, completed)
            VALUES (%(uuid)s, TRUE, TRUE, %(completed)s)
            ON CONFLICT (uuid) DO
            UPDATE SET
                modify_count = coretools_export_updates.modify_count + 1,
                new_measurement = TRUE,
                completed = %(completed)s;
        """
        parameters = {
            "uuid": uuid,
            "completed": completed,
        }
        with self.database.conn_local.cursor() as cur:
            cur.execute(
                query=statement,
                vars=parameters,
            )
    def export_changed_measurement(self, uuid, complete: bool, star_changed: bool, name_changed: bool):
        """
        Original behaviour triggered on UPDATE operations in the 'global_measurement_overview' table.
        """
        statement = """
            INSERT INTO coretools_export_updates(uuid, update_star, update_name, completed)
            VALUES (%(uuid)s, %(star-changed)s, %(name-changed)s, %(completed)s)
            ON CONFLICT (uuid) DO
            UPDATE SET
                modify_count = coretools_export_updates.modify_count + 1,
                update_star = coretools_export_updates.update_star OR %(star-changed)s,
                update_name = coretools_export_updates.update_name OR %(name-changed)s,
                completed = %(completed)s;
        """
        parameters = {
            "uuid": uuid,
            "star-changed": star_changed,  # star value in global-overview not equal to new value
            "name-changed": name_changed,  # experiment name value in global-overview not equal to new value
            "completed": complete,
        }
        with self.database.conn_local.cursor() as cur:
            cur.execute(
                query=statement,
                vars=parameters,
            )
    def export_changed_measurement_data(self, uuid):
        """
        Original behaviour triggered on INSERT and UPDATE operations in the 'measurement_parameters' table.
        """
        statement = """
            INSERT INTO coretools_export_updates(uuid, data_changed)
            VALUES (%(uuid)s, TRUE)
            ON CONFLICT (uuid) DO
            UPDATE SET
                modify_count = coretools_export_updates.modify_count + 1,
                data_changed = TRUE;
        """
        parameters = {
            "uuid": uuid
        }
        with self.database.conn_local.cursor() as cur:
            cur.execute(
                query=statement,
                vars=parameters,
            )

    def register_data_as_synchronised(self, uuid) -> None:
        """
        Update 'global_measurement_overview' to register measurement data as synchronised.
        """
        statement = """
            UPDATE global_measurement_overview
            SET data_synchronized = TRUE
            WHERE uuid = %(uuid)s;
        """
        parameters = {
            "uuid": uuid
        }
        with self.database.conn_local.cursor() as cur:
            cur.execute(
                query=statement,
                vars=parameters,
            )

    def register_table_as_synchronised(self, uuid) -> None:
        """
        Update 'global_measurement_overview' to register the measurement table as synchronised.
        """
        statement = """
            UPDATE global_measurement_overview
            SET table_synchronized = TRUE
            WHERE uuid = %(uuid)s;
        """
        parameters = {
            "uuid": uuid
        }
        with self.database.conn_local.cursor() as cur:
            cur.execute(
                query=statement,
                vars=parameters,
            )
    def collect_measurement_info(self, uuid) -> Optional[dict]:
        """
        Select relevant data from 'global_measurement_overview' to use in data synchronisation.
        """
        # to-do: replace dict with static type (?)
        statement = """
            SELECT exp_name, starred, completed FROM global_measurement_overview WHERE uuid = %(uuid)s;
        """
        parameters = {
            "uuid": uuid
        }
        with self.database.conn_local.cursor() as cur:
            cur.execute(
                query=statement,
                vars=parameters,
            )
            result = cur.fetchone()
        if result is None:
            logger.error("Failed to fetch data, or no entry exists with uuid '{}'".format(uuid))
            return None

        # to-do: fetch SQDL status of the 'starred' and 'experiment-name' parameters, for comparison
        self.uploader.client.login()
        # schema: sqdl_client.api.v1.schema.SchemaAPI = self.uploader.client.api.schemas
        # logger.info(schema.list())
        # schema_instance = schema.retrieve_from_name("coretools-default")
        # logger.info(schema_instance.to_json())
        scope_api: sqdl_client.api.v1.scope.ScopeAPI = self.uploader.client.api.scope
        # logger.info(scope_api.list())
        scope = scope_api.retrieve_from_name("Test")
        # logger.info(scope.list_data_identifiers())

        logger.info("dataset uid: {}".format(uuid))
        try:
            dataset: sqdl_client.api.v1.dataset.Dataset = scope.retrieve_dataset_from_uid(uuid)
        except sqdl_client.exceptions.ObjectNotFoundException as err:
            logger.error(err)
            return None

        logger.info("dataset name: {}".format(dataset.name))
        logger.info("dataset rating: {}".format(dataset.rating))

        metadata = {
            "is_new": False,
            "is_complete": False,
            "name_changed": False,
            "star_changed": False,
        }
        return metadata
    def check_if_uuid_is_new(self, uuid) -> bool:
        """
        A measurement is considered new when its 'data_update_count' is 0.
        """
        statement = """
            SELECT data_update_count
            FROM global_measurement_overview
            WHERE uuid = %(uuid)s;
        """
        # to-do: fix the check for new data
        logger.warning(
            "column value 'data_update_count' equal to 0 is not a safe test for new data: "
            "a measurement can easily do multiple updates within one polling round, creating a race condition"
        )
        parameters = {
            "uuid": uuid
        }
        with self.database.conn_local.cursor() as cur:
            cur.execute(
                query=statement,
                vars=parameters,
            )
            result = cur.fetchone()
        assert result is not None, "Already-queried UUID cannot be None"
        return result[0] == 0
-from sqdl_coretools_sync.uploader.model import UploadLog
-from sqdl_coretools_sync.uploader.uploader_db import UploaderDb
+from core_tools.data.sqdl.model import UploadLog
+from core_tools.data.sqdl.uploader_db import UploaderDb
 
 
 class UploadLogger:
...
 import os
+import logging
 
-from sqlalchemy import create_engine
+from sqlalchemy import create_engine, URL
 from sqlalchemy.orm import sessionmaker, Session
 
-from sqdl_coretools_sync.uploader.model import create_database
+from core_tools.data.sqdl.model import create_database
+
+logger = logging.getLogger(__name__)
 
 
 class UploaderDb:
     echo_sql = False  # enable for debugging
 
-    def __init__(self, db_file=None):
-        if db_file is None:
-            db_file = ':memory:'
-        else:
-            db_file = os.path.expanduser(db_file)
-            print(db_file)
-            os.makedirs(os.path.dirname(db_file), exist_ok=True)
-
-        self.engine = create_engine("sqlite+pysqlite:///" + db_file,
-                                    echo=UploaderDb.echo_sql,
-                                    )
+    def __init__(self, config: dict = None, db_file=None):
+        if db_file is not None:
+            logger.warning("SQDL Uploader no longer uses SQLite: db_file parameter deprecated")
+        if config is None:
+            config = {}
+        connection_url = URL.create(
+            drivername="postgresql+psycopg2",
+            host=config.get("host", default="localhost"),
+            port=config.get("port", default=5432),
+            username=config.get("username", default="dbijl"),
+            database=config.get("database", default="core-tools"),
+        )
+        self.engine = create_engine(
+            connection_url,
+            echo=UploaderDb.echo_sql,
+        )
         create_database(self.engine)
         self.sessionmaker = sessionmaker(bind=self.engine)
...
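For context, a hedged usage sketch of the reworked UploaderDb: it only needs a configuration object exposing get(key, default=...) and a reachable PostgreSQL instance. The _DemoConfig shim and the key names and values below simply mirror the defaults read in __init__ above; they are illustrative assumptions, not a documented schema.

from core_tools.data.sqdl.uploader_db import UploaderDb

class _DemoConfig(dict):
    # minimal stand-in for the core-tools configuration object
    def get(self, key, default=None):
        return super().get(key, default)

cfg = _DemoConfig(host="localhost", port=5432, username="dbijl", database="core-tools")
db = UploaderDb(cfg)            # builds a postgresql+psycopg2 engine via URL.create
with db.session() as session:   # ORM session bound to that engine
    pass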
 from sqlalchemy import select, insert, delete, update, func
 from sqlalchemy.exc import IntegrityError
-from sqlalchemy.dialects.sqlite import insert as sqlite_upsert
+from sqlalchemy.dialects.postgresql import insert as psql_insert
 
-from sqdl_coretools_sync.uploader.model import UploadedDataset, UploadedFile
-from sqdl_coretools_sync.uploader.uploader_db import UploaderDb
+from core_tools.data.sqdl.model import UploadedDataset, UploadedFile
+from core_tools.data.sqdl.uploader_db import UploaderDb
 
 # %%
@@ -33,12 +32,14 @@ class UploadRegistry:
     def add_update_file(self, dataset_id, file_sqdl_uuid, filename, st_mtime_ns) -> None:
         with self.db.session() as session:
-            stmt = sqlite_upsert(UploadedFile).values(
+            stmt = psql_insert(UploadedFile).values(
                 dataset_id=dataset_id,
                 sqdl_uuid=file_sqdl_uuid,
                 filename=filename,
                 st_mtime_us=st_mtime_ns // 1000)
             stmt = stmt.on_conflict_do_update(
+                constraint="uploaded_file_sqdl_uuid_key",
                 set_=dict(sqdl_uuid=file_sqdl_uuid, st_mtime_us=st_mtime_ns // 1000))
             session.execute(stmt)
             session.commit()
@@ -51,20 +52,3 @@ class UploadRegistry:
         stmt = select(func.count()).select_from(UploadedFile)
         counts["uploaded files"] = session.scalar(stmt)
         return counts
-
-
-if __name__ == "__main__":
-    from datetime import datetime
-
-    db = UploaderDb('~/.sqdl_uploader/uploader.db')
-    uploader = UploadRegistry(db)
-    ds_id = uploader.get_create_dataset('a1234', 'test', 1234)
-    print(ds_id)
-    print(uploader.get_files(ds_id))
-    uploader.add_update_file(ds_id, 'f1234', 'test.txt',
-                             int(datetime.now().timestamp() * 1e9))
-    uploader.add_update_file(ds_id, 'f1235', 'test2.txt',
-                             int(datetime.now().timestamp() * 1e9))
-    print(uploader.get_files(ds_id))
@@ -3,9 +3,9 @@ from dataclasses import dataclass
 from typing import List
 
 from sqlalchemy import select, delete, update, func
-from sqlalchemy.dialects.sqlite import insert as sqlite_upsert
+from sqlalchemy.dialects.postgresql import insert as psql_insert
 
-from sqdl_coretools_sync.uploader.model import UploadTask
-from sqdl_coretools_sync.uploader.uploader_db import UploaderDb
+from core_tools.data.sqdl.model import UploadTask
+from core_tools.data.sqdl.uploader_db import UploaderDb
 
 logger = logging.getLogger(__name__)
@@ -141,9 +141,11 @@ class UploaderTaskQueue:
         scope = ds_locator.scope
         uid = ds_locator.uid
         path = ds_locator.path
 
         with self.db.session() as session:
-            stmt = sqlite_upsert(UploadTask).values(version_id=1, scope=scope, uid=uid, ds_path=path, **kwargs)
+            stmt = psql_insert(UploadTask).values(version_id=1, scope=scope, uid=uid, ds_path=path, **kwargs)
             stmt = stmt.on_conflict_do_update(
+                constraint="upload_task_uid_key",
                 set_=dict(version_id=UploadTask.version_id + 1, **kwargs))
             session.execute(stmt)
             session.commit()
@@ -161,39 +163,3 @@ class UploaderTaskQueue:
         counts["failed"] = results.get((True, False), 0)
         counts["retry"] = results.get((True, True), 0)
         return counts
-
-
-if __name__ == "__main__":
-    db = UploaderDb('~/.sqdl_uploader/uploader-test.db')
-    uploader = UploaderTaskQueue(db)
-
-    print(uploader.get_tasks())
-
-    uploader.add_dataset(1234, 'somewhere')
-    uploader.add_dataset(1235, 'else')
-    uploader.add_dataset(1299, 'there')
-    uploader._insert_or_update_task(1234, 'somewhere', failed=False, retry=False)
-    uploader._insert_or_update_task(1235, 'else', failed=False, retry=False)
-    uploader._insert_or_update_task(1299, 'there', failed=False, retry=False)
-    uploader.update_dataset(1234, 'somewhere')
-    uploader.update_dataset(1235, 'else', final=True)
-
-    print('All:', uploader.get_tasks())
-
-    print('Fail:', uploader.get_oldest_task())
-    task = uploader.get_oldest_task()
-    uploader.set_failed(task)
-
-    print('Delete', uploader.get_oldest_task())
-    task = uploader.get_oldest_task()
-    uploader.delete_task(task)
-    print('All:', uploader.get_tasks())
-
-    print('Update', uploader.get_oldest_task())
-    task = uploader.get_oldest_task()
-    uploader.update_rating(task.uid, task.ds_path)
-    uploader.update_name(task.uid, task.ds_path)
-    print(uploader.get_tasks())
-    uploader.delete_task(task)
-    print('After delete', uploader.get_tasks())
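Both upserts above switch from the SQLite to the PostgreSQL insert construct and now name the unique constraint that resolves the conflict (PostgreSQL's default name for a UNIQUE column constraint is <table>_<column>_key, matching "uploaded_file_sqdl_uuid_key" and "upload_task_uid_key"). A minimal, self-contained sketch of that pattern, using an illustrative table and constraint name rather than the commit's models:

from sqlalchemy import BigInteger, String
from sqlalchemy.dialects.postgresql import insert as psql_insert
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class DemoItem(Base):
    __tablename__ = "demo_item"
    id: Mapped[int] = mapped_column(primary_key=True)
    uid: Mapped[int] = mapped_column(BigInteger, unique=True)
    name: Mapped[str] = mapped_column(String)


def upsert_item(session: Session, uid: int, name: str) -> None:
    # INSERT ... ON CONFLICT ON CONSTRAINT demo_item_uid_key DO UPDATE SET name = ...
    stmt = psql_insert(DemoItem).values(uid=uid, name=name)
    stmt = stmt.on_conflict_do_update(
        constraint="demo_item_uid_key",  # PostgreSQL's default name for UNIQUE(uid)
        set_=dict(name=name),
    )
    session.execute(stmt)
    session.commit()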
from core_tools.startup.db_connection import connect_local_db
from core_tools.startup.sample_info import set_sample_info
from core_tools.startup.app_wrapper import run_app
from core_tools.data.sqdl.sqdl_writer import SQDLWriter


def sync_init():
    # to-do: validate if setting sample info is still required
    set_sample_info('Any', 'Any', 'Any')
    connect_local_db()
    print('Starting SQDL Sync')


def sync_main():
    writer = SQDLWriter()
    writer.run()


if __name__ == '__main__':
    run_app('sqdl_sync', sync_init, sync_main)