Commit 27c01479 authored by Daan Bijl

refactor sqdl model definitions

parent 415edf63
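The refactor replaces the stateless operation classes in core_tools.data.sqdl.model (CoreOperations, ExportOperations, LogOperations, UploadOperations, VersionOperations, TaskQueueOperations) with module-level functions that take a psycopg2 cursor as their first argument, so callers import the module instead of instantiating a class for every call. A minimal sketch of the pattern, assuming the names shown in the diff below:

from typing import List
from psycopg2._psycopg import cursor as Cursor

# before: stateless class, instantiated at every call site
#     class CoreOperations:
#         def get_data_to_sync(self, c: Cursor) -> List[int]:
#             ...
#     uids = CoreOperations().get_data_to_sync(cursor)

# after: plain module-level function; callers import the module
def get_data_to_sync(c: Cursor) -> List[int]:
    c.execute("SELECT uuid FROM global_measurement_overview WHERE NOT data_synchronized;")
    return [r[0] for r in c.fetchall()]

# caller:
#     from core_tools.data.sqdl.model import core
#     uids = core.get_data_to_sync(cursor)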
......@@ -12,8 +12,6 @@ import numpy as np
import psycopg2
from psycopg2._psycopg import connection as Connection
import core_tools as ct
from core_tools.startup.config import get_configuration
from core_tools.data.ds.data_set import load_by_uuid
from core_tools.data.sqdl.export.psql_commands import SqlConnection
......@@ -22,7 +20,8 @@ from core_tools.data.sqdl.export.data_export import export_data
from core_tools.data.sqdl.export.data_preview import generate_previews
# from core_tools.data.sqdl.uploader_db import UploaderDb
from core_tools.data.sqdl.model.task_queue import TaskQueueOperations, DatasetInfo
from core_tools.data.sqdl.model import task_queue
from core_tools.data.sqdl.model.task_queue import DatasetInfo
# from core_tools.data.sqdl.uploader_task_queue import UploaderTaskQueue, DatasetLocator
logger = logging.getLogger(__name__)
......@@ -58,7 +57,6 @@ class Exporter:
def __init__(self, cfg: Dict):
self.export_path = cfg.get('sqdl.export_path')
self.connection = SqlConnection()
self.uploader = TaskQueueOperations()
self.scopes = cfg.get('sqdl.scopes', {})
self.setup_name_corrections = cfg.get('sqdl.setup_name_corrections', {})
......@@ -328,13 +326,13 @@ class Exporter:
c = conn.cursor()
if sqdl_update.upload_dataset or sqdl_update.upload_raw_data:
# self.uploader_queue.update_dataset(ds_locator, final=sqdl_update.raw_final)
self.uploader.update_dataset(c, dsi=ds_info, is_finished=sqdl_update.raw_final)
task_queue.update_dataset(c, dsi=ds_info, is_finished=sqdl_update.raw_final)
if sqdl_update.update_star:
# self.uploader_queue.update_rating(ds_locator)
self.uploader.update_rating(c, dsi=ds_info)
task_queue.update_rating(c, dsi=ds_info)
if sqdl_update.update_name:
# self.uploader_queue.update_name(ds_locator)
self.uploader.update_name(c, dsi=ds_info)
task_queue.update_name(c, dsi=ds_info)
@property
def measurement_expiration_time(self):
......
......@@ -3,57 +3,55 @@ from typing import List
from psycopg2._psycopg import cursor as Cursor
def core_method(message: str):
print("from core: {}".format(message))
class CoreOperations:
def get_data_to_sync(self, c: Cursor) -> List[int]:
c.execute(
query="""
SELECT uuid
FROM global_measurement_overview
WHERE NOT data_synchronized
;
"""
)
records = c.fetchall()
return [r[0] for r in records]
def get_table_to_sync(self, c: Cursor) -> List[int]:
c.execute(
query="""
SELECT uuid
FROM global_measurement_overview
WHERE NOT table_synchronized
;
"""
)
records = c.fetchall()
return [r[0] for r in records]
def set_data_as_synced(self, c: Cursor, ct_uid: int) -> bool:
c.execute(
query="""
UPDATE global_measurement_overview
SET data_synchronized = TRUE
WHERE uuid = %(uid)s;
""",
vars={
"uid": ct_uid
}
)
return c.rowcount == 1
def set_table_as_synced(self, c: Cursor, ct_uid: int) -> bool:
c.execute(
query="""
UPDATE global_measurement_overview
SET table_synchronized = TRUE
WHERE uuid = %(uid)s;
""",
vars={
"uid": ct_uid
}
)
return c.rowcount == 1
def get_data_to_sync(c: Cursor) -> List[int]:
c.execute(
query="""
SELECT uuid
FROM global_measurement_overview
WHERE NOT data_synchronized
;
"""
)
records = c.fetchall()
return [r[0] for r in records]
def get_table_to_sync(c: Cursor) -> List[int]:
c.execute(
query="""
SELECT uuid
FROM global_measurement_overview
WHERE NOT table_synchronized
;
"""
)
records = c.fetchall()
return [r[0] for r in records]
def set_data_as_synced(c: Cursor, ct_uid: int) -> bool:
c.execute(
query="""
UPDATE global_measurement_overview
SET data_synchronized = TRUE
WHERE uuid = %(uid)s;
""",
vars={
"uid": ct_uid
}
)
return c.rowcount == 1
def set_table_as_synced(c: Cursor, ct_uid: int) -> bool:
c.execute(
query="""
UPDATE global_measurement_overview
SET table_synchronized = TRUE
WHERE uuid = %(uid)s;
""",
vars={
"uid": ct_uid
}
)
return c.rowcount == 1
......@@ -18,96 +18,105 @@ class ExportAction:
pass
class ExportOperations:
def export_new_measurement(self, c: Cursor, ct_uid: int, is_complete: bool):
c.execute(
query="""
INSERT INTO coretools_export_updates (
uuid, new_measurement, data_changed, completed
) VALUES (
%(uid)s, TRUE, TRUE, %(completed)s
) ON CONFLICT ( uuid ) DO UPDATE SET
modify_count = coretools_export_updates.modify_count + 1,
new_measurement = TRUE,
completed = %(completed)s;
""",
vars={
"uid": ct_uid,
"completed": is_complete,
}
)
def export_changed_measurement(self, c: Cursor, ct_uid: int, meta: SyncStatus):
c.execute(
query="""
INSERT INTO coretools_export_updates (
uuid, update_star, update_name, completed
) VALUES (
%(uuid)s, %(star-changed)s, %(name-changed)s, %(completed)s
) ON CONFLICT ( uuid ) DO UPDATE SET
modify_count = coretools_export_updates.modify_count + 1,
update_star = coretools_export_updates.update_star OR %(star-changed)s,
update_name = coretools_export_updates.update_name OR %(name-changed)s,
completed = %(completed)s;
""",
vars={
"uuid": ct_uid,
"star-changed": meta.changed_rating, # star value in global-overview not equal to new value
"name-changed": meta.changed_name, # experiment name value in global-overview not equal to new value
"completed": meta.is_complete
}
)
def export_changed_data(self, c: Cursor, ct_uid: int):
c.execute(
query="""
INSERT INTO coretools_export_updates (
uuid, data_changed
) VALUES (
%(uid)s, TRUE
)
ON CONFLICT ( uuid ) DO UPDATE SET
modify_count = coretools_export_updates.modify_count + 1,
data_changed = TRUE
;
""",
vars={
"uid": ct_uid
}
)
def get_export_action(self, c: Cursor) -> Optional[ExportAction]:
raise NotImplementedError()
def uuid_exists(self, c: Cursor, uuid) -> bool:
raise NotImplementedError()
def get_expired_measurement_action(self, c: Cursor) -> Optional[ExportAction]:
raise NotImplementedError()
def set_export_error(self, c: Cursor, uuid, exception, code=99) -> None:
raise NotImplementedError()
def set_exported(self, c: Cursor, measurement, path: str, is_complete: bool = False) -> None:
raise NotImplementedError()
def set_resume_after(self, c: Cursor, action: ExportAction, wait_time: TimeDelta) -> None:
raise NotImplementedError()
def increment_fail_count(self, c: Cursor, action: ExportAction) -> None:
raise NotImplementedError()
def retry_failed_exports(self, c: Cursor) -> None:
c.execute(
query="""
SELECT uuid, gg
FROM coretools_exported
WHERE export_state BETWEEN 10 AND 100
ORDER BY uuid
;
"""
)
records = c.fetchall()
if len(records) == 0:
return None
raise NotImplementedError()
def export_new_measurement(c: Cursor, ct_uid: int, is_complete: bool):
c.execute(
query="""
INSERT INTO coretools_export_updates (
uuid, new_measurement, data_changed, completed
) VALUES (
%(uid)s, TRUE, TRUE, %(completed)s
) ON CONFLICT ( uuid ) DO UPDATE SET
modify_count = coretools_export_updates.modify_count + 1,
new_measurement = TRUE,
completed = %(completed)s;
""",
vars={
"uid": ct_uid,
"completed": is_complete,
}
)
def export_changed_measurement(c: Cursor, ct_uid: int, meta: SyncStatus):
c.execute(
query="""
INSERT INTO coretools_export_updates (
uuid, update_star, update_name, completed
) VALUES (
%(uuid)s, %(star-changed)s, %(name-changed)s, %(completed)s
) ON CONFLICT ( uuid ) DO UPDATE SET
modify_count = coretools_export_updates.modify_count + 1,
update_star = coretools_export_updates.update_star OR %(star-changed)s,
update_name = coretools_export_updates.update_name OR %(name-changed)s,
completed = %(completed)s;
""",
vars={
"uuid": ct_uid,
"star-changed": meta.changed_rating, # star value in global-overview not equal to new value
"name-changed": meta.changed_name, # experiment name value in global-overview not equal to new value
"completed": meta.is_complete
}
)
def export_changed_data(c: Cursor, ct_uid: int):
c.execute(
query="""
INSERT INTO coretools_export_updates (
uuid, data_changed
) VALUES (
%(uid)s, TRUE
)
ON CONFLICT ( uuid ) DO UPDATE SET
modify_count = coretools_export_updates.modify_count + 1,
data_changed = TRUE
;
""",
vars={
"uid": ct_uid
}
)
def get_export_action(c: Cursor) -> Optional[ExportAction]:
raise NotImplementedError()
def uuid_exists(c: Cursor, uuid) -> bool:
raise NotImplementedError()
def get_expired_measurement_action(c: Cursor) -> Optional[ExportAction]:
raise NotImplementedError()
def set_export_error(c: Cursor, uuid, exception, code=99) -> None:
raise NotImplementedError()
def set_exported(c: Cursor, measurement, path: str, is_complete: bool = False) -> None:
raise NotImplementedError()
def set_resume_after(c: Cursor, action: ExportAction, wait_time: TimeDelta) -> None:
raise NotImplementedError()
def increment_fail_count(c: Cursor, action: ExportAction) -> None:
raise NotImplementedError()
def retry_failed_exports(c: Cursor) -> None:
c.execute(
query="""
SELECT uuid, gg
FROM coretools_exported
WHERE export_state BETWEEN 10 AND 100
ORDER BY uuid
;
"""
)
records = c.fetchall()
if len(records) == 0:
return None
raise NotImplementedError()
......@@ -14,28 +14,27 @@ class UploadLog:
message: str
class LogOperations:
def log(self, c: Cursor, scope: str, ct_uid: int, message: str):
def log(c: Cursor, scope: str, ct_uid: int, message: str):
"""
Insert a message into the 'upload_log' table.
"""
query = """
INSERT INTO upload_log (
scope,
ct_uid,
upload_timestamp,
message
) VALUES (
%(scope)s,
%(uid)s,
%(ts)s,
%(msg)s
);
"""
Insert a message into the 'upload_log' table.
"""
query = """
INSERT INTO upload_log (
scope,
ct_uid,
upload_timestamp,
message
) VALUES (
%(scope)s,
%(uid)s,
%(ts)s,
%(msg)s
);
"""
values = {
"scope": scope,
"uid": ct_uid,
"ts": datetime.now(),
"msg": message,
}
c.execute(query=query, vars=values)
values = {
"scope": scope,
"uid": ct_uid,
"ts": datetime.now(),
"msg": message,
}
c.execute(query=query, vars=values)
This diff is collapsed.
......@@ -26,99 +26,101 @@ class SQDLDataset:
files: List[SQDLFile]
class UploadOperations:
def create_dataset(self, c: Cursor, scope: str, ct_uid: int, sqdl_uuid: UUID) -> int:
"""
Create a new dataset entry in the 'sqdl_dataset' table. Return the row index of the new entry. If an entry with the specified UUID already exists, return the row index of the existing entry instead.
"""
try:
c.execute(
query="""
INSERT INTO sqdl_dataset (
scope, coretools_uid, sqdl_uuid
) VALUES (
%(scope)s, %(uid)s, %(uuid)s
)
RETURNING idx;
""",
vars={
"scope": scope,
"uid": ct_uid,
"uuid": sqdl_uuid,
}
)
index = c.fetchone()[0]
return index
except IntegrityError:
c.execute(
query="""
SELECT idx
FROM sqdl_dataset
WHERE sqdl_uuid = %(uuid)s
""",
vars={
"uuid": sqdl_uuid
}
)
index = c.fetchone()[0]
logger.warning("Dataset with UUID '{}' already exists in table 'sqdl_dataset' at index '{}'.".format(sqdl_uuid, index))
return index
def get_counts(self, c: Cursor) -> Dict[str, int]:
"""
"""
counts = {}
c.execute(query="SELECT COUNT(idx) FROM sqdl_dataset;")
counts["uploaded-datasets"] = c.fetchone()[0]
c.execute(query="SELECT COUNT(idx) FROM sqdl_file;")
counts["uploaded-files"] = c.fetchone()[0]
return counts
def get_files_for_dataset(self, c: Cursor, parent_idx: int) -> List[SQDLFile]:
"""
Get all the SQDLFile entries associated with the SQDLDataset that has the provided index.
"""
def parse_row(row: Tuple) -> SQDLFile:
return SQDLFile(
index=row[0],
dataset_index=row[1],
sqdl_uuid=row[2],
filename=row[3],
last_modified=row[4]
)
def create_dataset(c: Cursor, scope: str, ct_uid: int, sqdl_uuid: UUID) -> int:
"""
Create a new dataset entry in the 'sqdl_dataset' table. Return the row index of the new entry. If an entry with the specified UUID already exists, return the row index of the existing entry instead.
"""
try:
c.execute(
query="""
SELECT idx, dataset_index, sqdl_uuid, filename, last_modified
FROM sqdl_file
WHERE dataset_index = %(parent)s;
INSERT INTO sqdl_dataset (
scope, coretools_uid, sqdl_uuid
) VALUES (
%(scope)s, %(uid)s, %(uuid)s
)
RETURNING idx;
""",
vars={"parent": parent_idx}
vars={
"scope": scope,
"uid": ct_uid,
"uuid": sqdl_uuid,
}
)
records = c.fetchall()
return [parse_row(r) for r in records]
def create_or_update_file(self, c: Cursor, parent_idx: int, sqdl_uuid: UUID, filename: str, last_modified: int) -> None:
"""
Create new SQDLFile entry. If entry with UUID already exists, update the last-modified timestamp instead.
"""
# todo: original uploader registry also updates uuid on conflict, but uuid is at all times the conflicting column, so that should do nothing.
index = c.fetchone()[0]
return index
except IntegrityError:
c.execute(
query="""
INSERT INTO sqdl_file (
dataset_index, sqdl_uuid, filename, last_modified
) VALUES (
%(idx)s, %(uuid)s, %(fn)s, %(lm)s
) ON CONFLICT (sqdl_uuid) DO UPDATE SET
last_modified = %(lm)s
;
SELECT idx
FROM sqdl_dataset
WHERE sqdl_uuid = %(uuid)s
""",
vars={
"idx": parent_idx,
"uuid": sqdl_uuid,
"fn": filename,
"lm": last_modified,
"uuid": sqdl_uuid
}
)
index = c.fetchone()[0]
logger.warning("Dataset with UUID '{}' already exists in table 'sqdl_dataset' at index '{}'.".format(sqdl_uuid, index))
return index
def get_counts(c: Cursor) -> Dict[str, int]:
"""
"""
counts = {}
c.execute(query="SELECT COUNT(idx) FROM sqdl_dataset;")
counts["uploaded-datasets"] = c.fetchone()[0]
c.execute(query="SELECT COUNT(idx) FROM sqdl_file;")
counts["uploaded-files"] = c.fetchone()[0]
return counts
def get_files_for_dataset(c: Cursor, parent_idx: int) -> List[SQDLFile]:
"""
Get all the SQDLFile entries associated with the SQDLDataset that has the provided index.
"""
def parse_row(row: Tuple) -> SQDLFile:
return SQDLFile(
index=row[0],
dataset_index=row[1],
sqdl_uuid=row[2],
filename=row[3],
last_modified=row[4]
)
c.execute(
query="""
SELECT idx, dataset_index, sqdl_uuid, filename, last_modified
FROM sqdl_file
WHERE dataset_index = %(parent)s;
""",
vars={"parent": parent_idx}
)
records = c.fetchall()
return [parse_row(r) for r in records]
def create_or_update_file(c: Cursor, parent_idx: int, sqdl_uuid: UUID, filename: str, last_modified: int) -> None:
"""
Create new SQDLFile entry. If entry with UUID already exists, update the last-modified timestamp instead.
"""
# todo: original uploader registry also updates uuid on conflict, but uuid is at all times the conflicting column, so that should do nothing.
c.execute(
query="""
INSERT INTO sqdl_file (
dataset_index, sqdl_uuid, filename, last_modified
) VALUES (
%(idx)s, %(uuid)s, %(fn)s, %(lm)s
) ON CONFLICT (sqdl_uuid) DO UPDATE SET
last_modified = %(lm)s
;
""",
vars={
"idx": parent_idx,
"uuid": sqdl_uuid,
"fn": filename,
"lm": last_modified,
}
)
from psycopg2._psycopg import cursor as Cursor
class VersionOperations:
def read(c: Cursor) -> str:
"""
"""
def read(self, c: Cursor) -> str:
"""
"""
query = """
SELECT major, minor, patch
FROM coretools_version;
"""
c.execute(query=query)
record = c.fetchall()
assert len(record) == 1, "Either no or more than one database version exists"
return "{}.{}.{}".format(*record[0])
query = """
SELECT major, minor, patch
FROM coretools_version;
"""
c.execute(query=query)
record = c.fetchall()
assert len(record) == 1, "Either no or more than one database version exists"
return "{}.{}.{}".format(*record[0])
......@@ -14,6 +14,9 @@ from core_tools.data.sqdl.model.export import SyncStatus
import sqdl_client
from sqdl_client.client import QDLClient
from psycopg2 import InterfaceError
from requests.exceptions import ConnectionError
__database_version__ = "1.1.0"
logger = logging.getLogger(__name__)
......@@ -27,17 +30,16 @@ class SQDLWriter():
"""
def __init__(self):
# configuration
config = get_configuration()
# local database
self.database = DatabaseInit()
# todo: validate that this is actually a local database
self.database._connect()
self.connection = self.database.conn_local
self.validate_version()
# core
# initialise
self.exporter = Exporter(config)
self.uploader = Uploader(
config,
......@@ -46,11 +48,11 @@ class SQDLWriter():
dev_mode=config.get("sqdl.dev_mode", default=True)
)
)
# event loop
self.tick_rate = datetime.timedelta(
seconds=config.get("sqdl.tick_rate", default=6)
)
# prepare for run
self.is_running = False
self.next_tick = None
self.database._disconnect()
......@@ -69,47 +71,45 @@ class SQDLWriter():
self.next_tick = datetime.datetime.now() + self.tick_rate
while self.is_running:
if self.connection.closed > 0:
try:
self.queue_datasets_for_export()
self.exporter.poll(self.connection)
self.uploader.poll(self.connection)
except InterfaceError:
logger.warning("Connection to local database lost. Reconnecting...")
self.database._disconnect()
self.database._connect()
self.connection = self.database.conn_local
assert self.connection.closed == 0, "failed to reconnect"
# todo: instead of passing connection every time, set connection for exporter and uploader here once. If any of the components closes the connection, the reconnect will be triggered.
self.reconnect()
self.queue_datasets_for_export()
# todo: check if there is a good way to export more than one action
# ExportAction stack can grow quite a bit, since one event loop check for all local changes,
# but only exports one (maybe temporary growing of the task stack is not harmful)
self.exporter.poll(self.connection)
self.uploader.poll(self.connection)
except ConnectionError:
logger.warning("Failed to connect to SQDL. ")
self.sleep_to_limit_rate()
finally:
self.sleep_to_limit_rate()
except Exception as exc:
logger.exception("An Exception with the following message occured: {}".format(exc))
logger.exception("An unhandled exception with the following message occured: {}".format(exc))
finally:
self.database._disconnect()
logger.info("Stopping SQDL Writer event loop...")
def queue_datasets_for_export(self):
def queue_datasets_for_export(self) -> None:
"""
Covers the behaviour that would originally be done by db-sync and the remote database triggers.
Looks up measurement data that needs to be synchronized from the local database, and creates the appropriate ExportActions.
"""
with self.connection:
cursor = self.connection.cursor()
uids_for_data_to_update = core.CoreOperations().get_data_to_sync(cursor)
uids_for_data_to_update = core.get_data_to_sync(cursor)
for ct_uid in uids_for_data_to_update:
logger.debug("sync data for core-tools UID: '{}'".format(ct_uid))
export.ExportOperations().export_changed_data(cursor, ct_uid)
core.CoreOperations().set_data_as_synced(cursor, ct_uid)
export.export_changed_data(cursor, ct_uid)
core.set_data_as_synced(cursor, ct_uid)
with self.connection:
cursor = self.connection.cursor()
uids_for_meta_to_update = core.CoreOperations().get_table_to_sync(cursor)
uids_for_meta_to_update = core.get_table_to_sync(cursor)
# cover behaviour that would usually be handled by triggers
for ct_uid in uids_for_meta_to_update:
......@@ -120,10 +120,10 @@ class SQDLWriter():
with self.connection:
cursor = self.connection.cursor()
if sync_status.is_new:
export.ExportOperations().export_new_measurement(cursor, ct_uid, sync_status.is_complete)
export.export_new_measurement(cursor, ct_uid, sync_status.is_complete)
else:
export.ExportOperations().export_changed_measurement(cursor, ct_uid, sync_status)
core.CoreOperations().set_table_as_synced(cursor, ct_uid)
export.export_changed_measurement(cursor, ct_uid, sync_status)
core.set_table_as_synced(cursor, ct_uid)
def collect_measurement_sync_status(self, uuid: str) -> Optional[SyncStatus]:
"""
......@@ -189,7 +189,7 @@ class SQDLWriter():
"""
with self.connection as conn:
cursor = conn.cursor()
database_version = version.VersionOperations().read(cursor)
database_version = version.read(cursor)
assert database_version == __database_version__, "Database is not up to date: expected '{}', found '{}'".format(__database_version__, database_version)
......@@ -199,3 +199,9 @@ class SQDLWriter():
seconds = (self.next_tick - now).total_seconds()
time.sleep(seconds)
self.next_tick = datetime.datetime.now() + self.tick_rate
def reconnect(self):
self.database._disconnect()
self.database._connect()
self.connection = self.database.conn_local
assert self.connection.closed == 0, "failed to reconnect"
......@@ -10,10 +10,7 @@ from .dataset_scanner import DatasetScanner, FileInfo
from .exceptions import InvalidNameError, NoScopeError, DatasetError
from .metadata_formatter import MetadataFormatter
from core_tools.data.sqdl.model.task_queue import TaskQueueOperations
from core_tools.data.sqdl.model.upload import UploadOperations
from core_tools.data.sqdl.model.log import LogOperations
from core_tools.data.sqdl.model import log, upload, task_queue
import psutil
import core_tools as ct
......@@ -43,11 +40,7 @@ class SqdlUploader:
api_key = cfg.get('sqdl.api_key')
if api_key:
self.client.use_api_key(api_key)
self.connection = conn
self.task_queue = TaskQueueOperations()
self.upload_registry = UploadOperations()
self.logger = LogOperations()
self.metadata_formatter = MetadataFormatter()
# load scopes to fix them when not set during export
......@@ -55,7 +48,7 @@ class SqdlUploader:
if cfg.get('sqdl.retry_failed_uploads', False):
with self.connection:
c = self.connection.cursor()
self.task_queue.retry_all_failed(c)
task_queue.retry_all_failed(c)
self.pid = os.getpid()
self.cleanup_abandoned_tasks()
logger.info(f"Started uploader, pid:{self.pid}")
......@@ -64,12 +57,11 @@ class SqdlUploader:
def process_task(self) -> bool:
start = time.perf_counter()
# task = self.task_queue.get_oldest_task(self.pid)
with self.connection:
c = self.connection.cursor()
task = self.task_queue.claim_oldest_task(c, self.pid)
task = task_queue.claim_oldest_task(c, self.pid)
if task is None:
task = self.task_queue.claim_newest_retry_task(c, self.pid)
task = task_queue.claim_newest_retry_task(c, self.pid)
if task is None:
return False
......@@ -89,7 +81,7 @@ class SqdlUploader:
if task.update_dataset or task.is_ready:
with self.connection:
c = self.connection.cursor()
ds_upload_id = self.upload_registry.create_dataset(c, task.scope, task.coretools_uid, sqdl_ds.uuid)
ds_upload_id = upload.create_dataset(c, task.scope, task.coretools_uid, sqdl_ds.uuid)
files = self.sort_files(ds_scanner.get_files(), desc)
self.upload_files(task, ds_upload_id, files, sqdl_ds)
......@@ -105,10 +97,10 @@ class SqdlUploader:
with self.connection:
c = self.connection.cursor()
deleted = self.task_queue.delete_task(c, task)
deleted = task_queue.delete_task(c, task)
if not deleted:
logger.debug(f'Task {task.coretools_uid} has been modified during upload. Release task')
self.task_queue.release_task(c, task)
task_queue.release_task(c, task)
# log success
self.log(task, 'Uploaded')
......@@ -119,7 +111,7 @@ class SqdlUploader:
logger.error(f"Exception processing {task.coretools_uid} '{ex}' {task.dataset_path}")
with self.connection:
c = self.connection.cursor()
self.task_queue.set_failed(c, task)
task_queue.set_failed(c, task)
self.log(task, f'{type(ex)}: {ex}')
except (ConnectionError, ReadTimeout) as ex:
......@@ -127,7 +119,7 @@ class SqdlUploader:
logger.error(f"Exception processing {task.coretools_uid} '{ex}'", exc_info=True)
with self.connection:
c = self.connection.cursor()
self.task_queue.release_task(c, task)
task_queue.release_task(c, task)
time.sleep(1.0)
except RequestException as ex:
......@@ -136,7 +128,7 @@ class SqdlUploader:
logger.info(f'Response: {ex.response.url}; {ex.response.headers}')
with self.connection:
c = self.connection.cursor()
self.task_queue.set_failed(c, task)
task_queue.set_failed(c, task)
self.log(task, f'{type(ex)}: {ex}')
time.sleep(0.5)
......@@ -147,7 +139,7 @@ class SqdlUploader:
logger.error(f'Exception processing {task.coretools_uid} {task.dataset_path}', exc_info=True)
with self.connection:
c = self.connection.cursor()
self.task_queue.set_failed(c, task)
task_queue.set_failed(c, task)
self.log(task, f'{type(ex)}: {ex}')
time.sleep(0.5)
return True
......@@ -155,7 +147,7 @@ class SqdlUploader:
def log(self, task, message):
with self.connection:
c = self.connection.cursor()
self.logger.log(c, task.scope, task.coretools_uid, message)
log.log(c, task.scope, task.coretools_uid, message)
def get_scope(self, desc):
# is it in the json file?
......@@ -173,9 +165,6 @@ class SqdlUploader:
metadata = self.metadata_formatter.format(desc)
sqdl_api = self.client.api
logger.info(sqdl_api.version)
logger.info(sqdl_api.get_user_info())
try:
logger.info("retrieving scope by name: '{}'".format(scope_name))
scope = sqdl_api.scope.retrieve_from_name(scope_name)
......@@ -241,10 +230,9 @@ class SqdlUploader:
return sorted(files, key=lambda fi: fi.seq_number)
def upload_files(self, task, ds_upload_id, file_entries: list[FileInfo], sqdl_ds: Dataset):
with self.connection:
c = self.connection.cursor()
uploaded_files_list = self.upload_registry.get_files_for_dataset(c, parent_idx=ds_upload_id)
uploaded_files_list = upload.get_files_for_dataset(c, parent_idx=ds_upload_id)
uploaded_files = {uf.filename: uf for uf in uploaded_files_list}
logger.debug(f"{len(uploaded_files)} different files in registry for {task.coretools_uid}: {[uploaded_files.keys()]}")
......@@ -274,19 +262,12 @@ class SqdlUploader:
logger.error(f"Cannot upload modified file '{fi.name}'. It's immutable. {task}; {fi}; {uploaded_files.get(fi.name)}")
continue
logger.info("FILE DATA: file url '{}'".format(sqdl_file.url))
logger.info("FILE DATA: file name '{}'".format(sqdl_file.name))
logger.info("FILE DATA: presigned url '{}'".format(sqdl_file.presigned_url))
logger.info("FILE DATA: presigned upload data:")
for k, v in sqdl_file.presigned_upload_data.items():
logger.info("FILE DATA: key = '{}', value = '{}'".format(k, v))
make_immutable = fi.file_type == 'raw' and task.is_ready
self.upload_file(fi, sqdl_file, make_immutable)
with self.connection:
c = self.connection.cursor()
self.upload_registry.create_or_update_file(
upload.create_or_update_file(
c,
parent_idx=ds_upload_id,
sqdl_uuid=sqdl_file.uuid,
......@@ -316,7 +297,7 @@ class SqdlUploader:
pids = psutil.pids()
with self.connection:
c = self.connection.cursor()
tasks = self.task_queue.get_claimed_tasks(c)
tasks = task_queue.get_claimed_tasks(c)
for task in tasks:
alive = task.is_claimed_by in pids
......@@ -325,7 +306,7 @@ class SqdlUploader:
# todo: map more efficiently - collect all dead tasks, then release them all in one transaction, instead of doing a transaction per loop
with self.connection:
c = self.connection.cursor()
self.task_queue.release_task(c, task)
task_queue.release_task(c, task)
def poll(self, conn: Connection) -> None:
self.connection = conn
......
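With the operation classes gone, callers hold a shared connection and hand a fresh cursor to each module-level call inside its own transaction. A hedged sketch of the resulting calling pattern, simplified from SqdlUploader.process_task above (error handling and the actual export/upload work are omitted):

from core_tools.data.sqdl.model import log, task_queue

def process_one_task(connection, pid: int) -> bool:
    # claim the oldest pending task in its own transaction
    with connection:
        c = connection.cursor()
        task = task_queue.claim_oldest_task(c, pid)
    if task is None:
        return False

    # ... export metadata and upload files for this task ...

    with connection:
        c = connection.cursor()
        if task_queue.delete_task(c, task):
            log.log(c, task.scope, task.coretools_uid, "Uploaded")
        else:
            # the task was modified during upload; put it back on the queue
            task_queue.release_task(c, task)
    return True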