Commit ecad5416 authored by Daan Bijl

draft implementation sqdl-writer

parent a9e8fb9f
Showing with 1027 additions and 482 deletions
@@ -5,21 +5,25 @@ import time
 from collections.abc import Mapping
 from dataclasses import dataclass
 from datetime import datetime, timedelta
-from typing import Optional, Tuple
+from typing import Optional, Tuple, Dict
 import numpy as np
 import psycopg2
+from psycopg2._psycopg import connection as Connection
 import core_tools as ct
 from core_tools.startup.config import get_configuration
 from core_tools.data.ds.data_set import load_by_uuid
-from core_tools.data.export.psql_commands import SqlConnection
+from core_tools.data.sqdl.export.psql_commands import SqlConnection
 from core_tools.data.utils.timer import Timer
-from core_tools.data.export.data_export import export_data
-from core_tools.data.export.data_preview import generate_previews
-from core_tools.data.sqdl.uploader_db import UploaderDb
-from core_tools.data.sqdl.uploader_task_queue import UploaderTaskQueue, DatasetLocator
+from core_tools.data.sqdl.export.data_export import export_data
+from core_tools.data.sqdl.export.data_preview import generate_previews
+# from core_tools.data.sqdl.uploader_db import UploaderDb
+from core_tools.data.sqdl.model.task_queue import TaskQueueOperations, DatasetInfo
+# from core_tools.data.sqdl.uploader_task_queue import UploaderTaskQueue, DatasetLocator
 logger = logging.getLogger(__name__)
@@ -51,13 +55,17 @@ class SqdlUpdate:
 class Exporter:
-    def __init__(self, cfg):
+    def __init__(self, cfg: Dict):
         self.cfg = cfg
         self.export_path = cfg.get('export.path')
         self.inter_ds_delay = float(cfg.get('export.delay'))
         self.connection = SqlConnection()
-        self.uploader_db = UploaderDb(cfg)
-        self.uploader_queue = UploaderTaskQueue(self.uploader_db)
+        # self.uploader_db = UploaderDb(cfg)
+        # self.connection = self.uploader_db.engine.connect()
+        # self.uploader_queue = UploaderTaskQueue(self.uploader_db)
+        self.uploader = TaskQueueOperations()
         self.scopes = cfg.get('export.scopes', {})
         self.setup_name_corrections = cfg.get('export.setup_name_corrections', {})
@@ -66,10 +74,10 @@ class Exporter:
         self.process = psutil.Process()
-    def poll(self) -> None:
+    def poll(self, conn: Connection) -> None:
         try:
             self.loop_count += 1
-            done_work = self.export_one()
+            done_work = self.export_one(conn)
             if not done_work:
                 if self.no_action_count == 0:
                     self.timer.log_times()
@@ -93,7 +101,7 @@ class Exporter:
         except Exception:
             logger.error("Unanticipated error", exc_info=True)
-    def export_one(self):
+    def export_one(self, conn: Connection):
         self.timer = Timer()
         self.timer.time('query actions')
@@ -126,7 +134,7 @@ class Exporter:
         sqdl_update, ds_path = self.export_measurement(ds, action)
         sqdl_update.update_star |= action.update_star
         sqdl_update.update_name |= action.update_name
-        self.add_sqdl_update(sqdl_update, ds_path)
+        self.add_sqdl_update(conn, sqdl_update, ds_path)
         self.set_exported(ds, ds_path, action.completed)
         if action.id is not None:
             # id is None for expired measurements
@@ -315,14 +323,25 @@ class Exporter:
             ''',
             )
-    def add_sqdl_update(self, sqdl_update: SqdlUpdate, ds_path: str) -> None:
-        ds_locator = DatasetLocator(sqdl_update.scope, uid=sqdl_update.uuid, path=ds_path)
-        if sqdl_update.upload_dataset or sqdl_update.upload_raw_data:
-            self.uploader_queue.update_dataset(ds_locator, final=sqdl_update.raw_final)
-        if sqdl_update.update_star:
-            self.uploader_queue.update_rating(ds_locator)
-        if sqdl_update.update_name:
-            self.uploader_queue.update_name(ds_locator)
+    def add_sqdl_update(self, conn: Connection, sqdl_update: SqdlUpdate, ds_path: str) -> None:
+        # ds_locator = DatasetLocator(sqdl_update.scope, uid=sqdl_update.uuid, path=ds_path)
+        ds_info = DatasetInfo(
+            scope=sqdl_update.scope,
+            uid=sqdl_update.uuid,
+            path=ds_path
+        )
+        with conn:
+            c = conn.cursor()
+            if sqdl_update.upload_dataset or sqdl_update.upload_raw_data:
+                # self.uploader_queue.update_dataset(ds_locator, final=sqdl_update.raw_final)
+                self.uploader.update_dataset(c, dsi=ds_info, is_finished=sqdl_update.raw_final)
+            if sqdl_update.update_star:
+                # self.uploader_queue.update_rating(ds_locator)
+                self.uploader.update_rating(c, dsi=ds_info)
+            if sqdl_update.update_name:
+                # self.uploader_queue.update_name(ds_locator)
+                self.uploader.update_name(c, dsi=ds_info)
     @property
     def measurement_expiration_time(self):
@@ -354,7 +373,6 @@ class Exporter:
         scope = self.get_scope(measurement)
         measurement.set_up = self.fix_setup_name(measurement.set_up)
         updates = SqdlUpdate(measurement.exp_uuid, scope, raw_final=action.completed)
-        updates.scope = scope
         try:
             dsx, ds_path, var_descr = export_data(
                 measurement,
......
+import logging
 from core_tools.data.SQL.SQL_connection_mgr import SQL_database_manager
 from psycopg2.extras import RealDictCursor
+logger = logging.getLogger(__name__)
 class SqlConnection:
     def __init__(self, remote=False):
@@ -18,7 +23,8 @@ class SqlConnection:
             res = cur.fetchall()
             cur.close()
             return res
-        except Exception:
+        except Exception as err:
+            logger.error("error in export database connection: {}".format(err))
             connection.close()
             raise
......
from datetime import datetime
from typing import Optional
from sqlalchemy import BigInteger, ForeignKey, DateTime, Index
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
# declarative base class
class Base(DeclarativeBase):
pass
class UploadTask(Base):
__tablename__ = "upload_task"
id: Mapped[int] = mapped_column(primary_key=True)
version_id: Mapped[int] = mapped_column(nullable=False)
scope: Mapped[str | None]
uid: Mapped[int] = mapped_column(BigInteger, nullable=False, unique=True)
ds_path: Mapped[str]
''' client-app unique id '''
update_dataset: Mapped[bool] = mapped_column(default=False)
''' dataset is new or has changed '''
set_raw_final: Mapped[bool] = mapped_column(default=False)
''' raw file can be uploaded and finalized '''
update_name: Mapped[bool] = mapped_column(default=False)
''' compare name of dataset with name in SQDL and update '''
update_rating: Mapped[bool] = mapped_column(default=False)
''' compare rating of dataset with rating in SQDL and update '''
failed: Mapped[bool] = mapped_column(default=False)
retry: Mapped[bool] = mapped_column(default=False)
claimed_by: Mapped[Optional[int]] = mapped_column(nullable=True)
__mapper_args__ = {"version_id_col": version_id}
__table_args__ = (
Index('upload_claimed_retry', claimed_by, retry),
Index('upload_claimed_failed', claimed_by, failed),
)
def __repr__(self):
args = []
args.append(f'id={self.id}')
args.append(f'version_id={self.version_id}')
args.append(f'scope={self.scope}')
args.append(f'uid={self.uid}')
args.append(f'ds_path="{self.ds_path}"')
if self.update_dataset:
args.append('update_dataset=True')
if self.set_raw_final:
args.append('set_raw_final=True')
if self.update_name:
args.append('update_name=True')
if self.update_rating:
args.append('update_rating=True')
if self.failed:
args.append('failed=True')
if self.retry:
args.append('retry=True')
if self.claimed_by:
args.append(f"claimed_by: {self.claimed_by}")
return 'UploadTask(' + ', '.join(args) + ')'
class UploadedDataset(Base):
__tablename__ = "uploaded_dataset"
id: Mapped[int] = mapped_column(primary_key=True)
scope: Mapped[str]
uid: Mapped[int] = mapped_column(BigInteger, nullable=False)
sqdl_uuid: Mapped[str] = mapped_column(nullable=False, unique=True)
files: Mapped[list["UploadedFile"]] = relationship(back_populates="dataset")
def __repr__(self):
return f'UploadedDataset(scope={self.scope}, uid={self.uid}, sqdl_uuid={self.sqdl_uuid})'
class UploadedFile(Base):
__tablename__ = "uploaded_file"
id: Mapped[int] = mapped_column(primary_key=True)
dataset_id: Mapped[int] = mapped_column(ForeignKey("uploaded_dataset.id"), index=True) # link to dataset
sqdl_uuid: Mapped[str] = mapped_column(nullable=False, unique=True)
filename: Mapped[str]
st_mtime_us: Mapped[int] = mapped_column(BigInteger)
dataset: Mapped["UploadedDataset"] = relationship(back_populates="files")
def __repr__(self):
mtime = datetime.fromtimestamp(self.st_mtime_us / 1e6)
return f'UploadedFile(filename={self.filename}, mtime={mtime}, sqdl_uuid={self.sqdl_uuid})'
class UploadLog(Base):
__tablename__ = "upload_log"
id: Mapped[int] = mapped_column(primary_key=True)
scope: Mapped[str | None]
ds_uid: Mapped[int] = mapped_column(BigInteger)
upload_time: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
message: Mapped[str] # new dataset, uploaded files, exception xxx.
def __repr__(self):
return f'Log(scope={self.scope}, uid={self.ds_uid}, t={self.upload_time}: {self.message})'
def create_database(engine):
Base.metadata.create_all(engine)
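A minimal usage sketch for these models; the SQLite URL and the example values are placeholders, the real uploader presumably builds its engine from the core-tools configuration:

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

engine = create_engine("sqlite:///uploader_queue.db")  # placeholder URL
create_database(engine)  # creates all tables declared on Base

with Session(engine) as session:
    # version_id is filled in automatically via the version_id_col mapper setting
    session.add(UploadTask(scope="my_scope", uid=1234, ds_path="/data/ds_1234"))
    session.commit()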
from typing import List
from psycopg2._psycopg import cursor as Cursor
def core_method(message: str):
print("from core: {}".format(message))
class CoreOperations:
def get_data_to_sync(self, c: Cursor) -> List[int]:
c.execute(
query="""
SELECT uuid
FROM global_measurement_overview
WHERE NOT data_synchronized
;
"""
)
records = c.fetchall()
return [r[0] for r in records]
def get_table_to_sync(self, c: Cursor) -> List[int]:
c.execute(
query="""
SELECT uuid
FROM global_measurement_overview
WHERE NOT table_synchronized
;
"""
)
records = c.fetchall()
return [r[0] for r in records]
def set_data_as_synced(self, c: Cursor, ct_uid: int) -> bool:
c.execute(
query="""
UPDATE global_measurement_overview
SET data_synchronized = TRUE
WHERE uuid = %(uid)s;
""",
vars={
"uid": ct_uid
}
)
return c.rowcount == 1
def set_table_as_synced(self, c: Cursor, ct_uid: int) -> bool:
c.execute(
query="""
UPDATE global_measurement_overview
SET table_synchronized = TRUE
WHERE uuid = %(uid)s;
""",
vars={
"uid": ct_uid
}
)
return c.rowcount == 1
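A usage sketch for CoreOperations: list the datasets whose data still needs syncing and flag each one once it has been handled. The DSN is a placeholder; the real connection settings come from the core-tools SQL configuration.

import psycopg2

ops = CoreOperations()
with psycopg2.connect("dbname=coretools") as conn:  # placeholder DSN
    with conn.cursor() as c:
        for uid in ops.get_data_to_sync(c):
            # ... export the dataset here ...
            if not ops.set_data_as_synced(c, uid):
                print("uuid {} not found in global_measurement_overview".format(uid))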
from dataclasses import dataclass
from typing import Optional
from datetime import timedelta as TimeDelta
from psycopg2._psycopg import cursor as Cursor
@dataclass
class Metadata:
is_new: bool
changed_name: bool = False
changed_rating: bool = False
is_complete: bool = False
@dataclass
class ExportAction:
pass
class ExportOperations:
def export_new_measurement(self, c: Cursor, ct_uid: int, is_complete: bool):
c.execute(
query="""
INSERT INTO coretools_export_updates (
uuid, new_measurement, data_changed, completed
) VALUES (
%(uid)s, TRUE, TRUE, %(completed)s
) ON CONFLICT ( uuid ) DO UPDATE SET
modify_count = coretools_export_updates.modify_count + 1,
new_measurement = TRUE,
completed = %(completed)s;
""",
vars={
"uid": ct_uid,
"completed": is_complete,
}
)
def export_changed_measurement(self, c: Cursor, ct_uid: int, meta: Metadata):
c.execute(
query="""
INSERT INTO coretools_export_updates (
uuid, update_star, update_name, completed
) VALUES (
%(uuid)s, %(star-changed)s, %(name-changed)s, %(completed)s
) ON CONFLICT ( uuid ) DO UPDATE SET
modify_count = coretools_export_updates.modify_count + 1,
update_star = coretools_export_updates.update_star OR %(star-changed)s,
update_name = coretools_export_updates.update_name OR %(name-changed)s,
completed = %(completed)s;
""",
vars={
"uuid": ct_uid,
"star-changed": meta.changed_rating, # star value in global-overview not equal to new value
"name-changed": meta.changed_name, # experiment name value in global-overview not equal to new value
"completed": meta.is_complete
}
)
def export_changed_data(self, c: Cursor, ct_uid: int):
c.execute(
query="""
INSERT INTO coretools_export_updates (
uuid, data_changed
) VALUES (
%(uid)s, TRUE
)
ON CONFLICT ( uuid ) DO UPDATE SET
modify_count = coretools_export_updates.modify_count + 1,
data_changed = TRUE
;
""",
vars={
"uid": ct_uid
}
)
def get_export_action(self, c: Cursor) -> Optional[ExportAction]:
raise NotImplementedError()
def uuid_exists(self, c: Cursor, uuid) -> bool:
raise NotImplementedError()
def get_expired_measurement_action(self, c: Cursor) -> Optional[ExportAction]:
raise NotImplementedError()
def set_export_error(self, c: Cursor, uuid, exception, code=99) -> None:
raise NotImplementedError()
def set_exported(self, c: Cursor, measurement, path: str, is_complete: bool = False) -> None:
raise NotImplementedError()
def set_resume_after(self, c: Cursor, action: ExportAction, wait_time: TimeDelta) -> None:
raise NotImplementedError()
def increment_fail_count(self, c: Cursor, action: ExportAction) -> None:
raise NotImplementedError()
def retry_failed_exports(self, c: Cursor) -> None:
c.execute(
query="""
SELECT uuid, gg
FROM coretools_exported
WHERE export_state BETWEEN 10 AND 100
ORDER BY uuid
;
"""
)
records = c.fetchall()
if len(records) == 0:
return None
raise NotImplementedError()
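A usage sketch for ExportOperations: queue a new measurement and later record a rating change for the same uuid. The DSN and uuid value are placeholders.

import psycopg2

export_ops = ExportOperations()
with psycopg2.connect("dbname=coretools") as conn:  # placeholder DSN
    with conn.cursor() as c:
        export_ops.export_new_measurement(c, ct_uid=1700000000001, is_complete=False)
        export_ops.export_changed_measurement(
            c, ct_uid=1700000000001,
            meta=Metadata(is_new=False, changed_rating=True, is_complete=True),
        )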
from datetime import datetime
from typing import Optional
from dataclasses import dataclass
from psycopg2._psycopg import cursor as Cursor
@dataclass
class UploadLog:
index: int # defined by database (generated index)
scope: Optional[str]
ct_uid: int
upload_time: datetime
message: str
class LogOperations:
def log(self, c: Cursor, scope: str, ct_uid: int, message: str):
"""
Insert a message into the 'upload_log' table.
"""
query = """
INSERT INTO upload_log (
scope,
ct_uid,
upload_timestamp,
message
) VALUES (
%(scope)s,
%(uid)s,
%(ts)s,
%(msg)s
);
"""
values = {
"scope": scope,
"uid": ct_uid,
"ts": datetime.now(),
"msg": message,
}
c.execute(query=query, vars=values)
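A usage sketch for LogOperations; the DSN, uid and message are placeholders.

import psycopg2

log_ops = LogOperations()
conn = psycopg2.connect("dbname=uploader")  # placeholder DSN
with conn.cursor() as c:
    log_ops.log(c, scope="my_scope", ct_uid=1234, message="uploaded raw data")
conn.commit()
conn.close()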
import logging
from typing import Optional, List, Dict
from dataclasses import dataclass
from psycopg2._psycopg import cursor as Cursor
logger = logging.getLogger(__name__)
@dataclass
class DatasetInfo:
scope: str
uid: int
path: str
@dataclass
class UploadTask:
idx: int # defined by database (generated index)
task_iteration: int
scope: Optional[str]
coretools_uid: int
dataset_path: str
update_dataset: bool = False
update_name: bool = False
update_rating: bool = False
is_ready: bool = False
has_failed: bool = False
should_retry: bool = False
is_claimed_by: Optional[int] = None
class TaskQueueOperations:
def get_tasks(self, c: Cursor) -> List[UploadTask]:
c.execute(
query="""
SELECT idx, task_iteration, scope, coretools_uid, dataset_path, update_dataset, update_name, update_rating, is_ready, has_failed, should_retry, is_claimed_by
FROM upload_task_queue
;
"""
)
results = c.fetchall()
return [UploadTask(*r) for r in results]
def claim_oldest_task(self, c: Cursor, pid: int) -> Optional[UploadTask]:
c.execute(
query="""
SELECT idx, task_iteration, scope, coretools_uid, dataset_path, update_dataset, update_name, update_rating, is_ready, has_failed, should_retry, is_claimed_by
FROM upload_task_queue
WHERE NOT has_failed AND is_claimed_by IS NULL
ORDER BY idx
LIMIT 1
;
"""
)
result = c.fetchone()
if result is None:
return None
task = UploadTask(*result)
if not self.claim_task(c, task, pid):
return None
return task
def claim_newest_retry_task(self, c: Cursor, pid: int) -> Optional[UploadTask]:
c.execute(
query="""
SELECT idx, task_iteration, scope, coretools_uid, dataset_path, update_dataset, update_name, update_rating, is_ready, has_failed, should_retry, is_claimed_by
FROM upload_task_queue
WHERE should_retry AND is_claimed_by IS NULL
ORDER BY idx DESC
LIMIT 1
;
""",
vars={
"pid": pid
}
)
result = c.fetchone()
if result is None:
return None
task = UploadTask(*result)
if not self.claim_task(c, task, pid):
return None
return task
def claim_task(self, c: Cursor, task: UploadTask, pid: int) -> bool:
c.execute(
query="""
UPDATE upload_task_queue
SET
task_iteration = task_iteration + 1,
is_claimed_by = %(pid)s
WHERE
idx = %(idx)s
AND task_iteration = %(iter)s
;
""",
vars={
"pid": pid,
"idx": task.idx,
"iter": task.task_iteration
}
)
if c.rowcount != 1:
logger.warning("Failed to claim task with uid '{}'".format(task.coretools_uid))
return False
task.task_iteration += 1
return True
def release_task(self, c: Cursor, task: UploadTask) -> None:
c.execute(
query="""
UPDATE upload_task_queue
SET is_claimed_by = NULL
WHERE idx = %(idx)s
;
""",
vars={
"idx": task.idx,
}
)
if c.rowcount != 1:
logger.warning("Failed to release task with uid '{}'".format(task.coretools_uid))
def get_claimed_tasks(self, c: Cursor) -> List[UploadTask]:
c.execute(
query="""
SELECT idx, task_iteration, scope, coretools_uid, dataset_path, update_dataset, update_name, update_rating, is_ready, has_failed, should_retry, is_claimed_by
FROM upload_task_queue
WHERE is_claimed_by IS NOT NULL
ORDER BY idx
;
"""
)
results = c.fetchall()
return [UploadTask(*r) for r in results]
def add_dataset(self, c: Cursor, dsi: DatasetInfo, is_finished: bool = False) -> None:
# update_dataset = True
c.execute(
query="""
INSERT INTO upload_task_queue
( task_iteration, scope, coretools_uid, dataset_path, update_dataset, is_ready )
VALUES
( 1, %(scope)s, %(uid)s, %(path)s, TRUE, %(is_finished)s )
ON CONFLICT (coretools_uid) DO UPDATE SET
task_iteration = upload_task_queue.task_iteration + 1,
update_dataset = TRUE,
is_ready = %(is_finished)s
;
""",
vars={
"scope": dsi.scope,
"uid": dsi.uid,
"path": dsi.path,
"is_finished": is_finished
}
)
def update_dataset(self, c: Cursor, dsi: DatasetInfo, is_finished: bool = False) -> None:
# update_dataset = True
# has_failed = False
c.execute(
query="""
INSERT INTO upload_task_queue
( task_iteration, scope, coretools_uid, dataset_path, update_dataset, has_failed, is_ready )
VALUES
( 1, %(scope)s, %(uid)s, %(path)s, TRUE, FALSE, %(is_finished)s )
ON CONFLICT (coretools_uid) DO UPDATE SET
task_iteration = upload_task_queue.task_iteration + 1,
update_dataset = TRUE,
has_failed = FALSE,
is_ready = %(is_finished)s
;
""",
vars={
"scope": dsi.scope,
"uid": dsi.uid,
"path": dsi.path,
"is_finished": is_finished
}
)
def reload_dataset(self, c: Cursor, dsi: DatasetInfo, is_finished: bool = False) -> None:
# update_dataset = True
# should_retry = True
c.execute(
query="""
INSERT INTO upload_task_queue
( task_iteration, scope, coretools_uid, dataset_path, update_dataset, should_retry, is_ready )
VALUES
( 1, %(scope)s, %(uid)s, %(path)s, TRUE, TRUE, %(is_finished)s )
ON CONFLICT (coretools_uid) DO UPDATE SET
task_iteration = upload_task_queue.task_iteration + 1,
update_dataset = TRUE,
should_retry = TRUE,
is_ready = %(is_finished)s
;
""",
vars={
"scope": dsi.scope,
"uid": dsi.uid,
"path": dsi.path,
"is_finished": is_finished
}
)
def update_name(self, c: Cursor, dsi: DatasetInfo) -> None:
# update_name = True
c.execute(
query="""
INSERT INTO upload_task_queue
( task_iteration, scope, coretools_uid, dataset_path, update_name )
VALUES
( 1, %(scope)s, %(uid)s, %(path)s, TRUE )
ON CONFLICT (coretools_uid) DO UPDATE SET
task_iteration = upload_task_queue.task_iteration + 1,
update_name = TRUE
;
""",
vars={
"scope": dsi.scope,
"uid": dsi.uid,
"path": dsi.path,
}
)
def update_rating(self, c: Cursor, dsi: DatasetInfo) -> None:
# update_rating = True
c.execute(
query="""
INSERT INTO upload_task_queue
( task_iteration, scope, coretools_uid, dataset_path, update_rating )
VALUES
( 1, %(scope)s, %(uid)s, %(path)s, TRUE )
ON CONFLICT (coretools_uid) DO UPDATE SET
task_iteration = upload_task_queue.task_iteration + 1,
update_rating = TRUE
;
""",
vars={
"scope": dsi.scope,
"uid": dsi.uid,
"path": dsi.path,
}
)
def delete_task(self, c: Cursor, task: UploadTask) -> bool:
c.execute(
query="""
DELETE FROM upload_task_queue
WHERE idx = %(idx)s
AND task_iteration = %(iter)s
;
""",
vars={
"idx": task.idx,
"iter": task.task_iteration
}
)
return c.rowcount == 1
def get_all_failed_tasks(self, c: Cursor) -> List[UploadTask]:
c.execute(
query="""
SELECT
idx, task_iteration, scope, coretools_uid, dataset_path, update_dataset, update_name, update_rating, is_ready, has_failed, should_retry, is_claimed_by
FROM upload_task_queue
WHERE has_failed
ORDER BY idx
"""
)
result = c.fetchall()
return [UploadTask(*r) for r in result]
def set_failed(self, c: Cursor, task: UploadTask) -> None:
c.execute(
query="""
UPDATE upload_task_queue
SET
task_iteration = task_iteration + 1,
has_failed = TRUE,
should_retry = FALSE,
is_claimed_by = NULL
WHERE
idx = %(idx)s
AND task_iteration = %(iter)s
;
""",
vars={
"idx": task.idx,
"iter": task.task_iteration
}
)
if c.rowcount == 0:
self.release_task(c, task)
def retry_all_failed(self, c: Cursor) -> None:
c.execute(
query="""
UPDATE upload_task_queue
SET
task_iteration = task_iteration + 1,
should_retry = TRUE
WHERE
has_failed = TRUE
;
""",
vars={
}
)
def get_counts(self, c: Cursor) -> Dict[str, int]:
c.execute(
query="""
SELECT has_failed, should_retry, COUNT(*)
FROM upload_task_queue
GROUP BY has_failed, should_retry
;
"""
)
records = c.fetchall()
result = {}
for failed, retry, count in records:
result[(failed, retry)] = count
return {
"new": result.get((False, False), default=0),
"reload": result.get((False, True), default=0),
"failed": result.get((True, False), default=0),
"retry": result.get((True, True), default=0)
}
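A worker-loop sketch for TaskQueueOperations: claim the oldest task, hand it to an upload routine, then delete it on success or mark it failed. process_task and the DSN are placeholders; the delete relies on the task_iteration value that claim_task incremented as an optimistic lock.

import os
import psycopg2

queue = TaskQueueOperations()

def process_task(task: UploadTask) -> None:
    """Placeholder for the actual SQDL upload / metadata update."""
    ...

conn = psycopg2.connect("dbname=uploader")  # placeholder DSN
with conn.cursor() as c:
    task = queue.claim_oldest_task(c, pid=os.getpid())
conn.commit()  # make the claim visible to other workers

if task is not None:
    try:
        process_task(task)
        with conn.cursor() as c:
            queue.delete_task(c, task)  # only succeeds if task_iteration is unchanged
    except Exception:
        with conn.cursor() as c:
            queue.set_failed(c, task)
    conn.commit()
conn.close()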
import logging
from dataclasses import dataclass
from typing import List, Dict, Tuple
from uuid import UUID
from psycopg2._psycopg import cursor as Cursor, IntegrityError
logger = logging.getLogger(__name__)
@dataclass
class SQDLFile:
index: int # defined by database (generated index)
dataset_index: int # defined by database (foreign key)
sqdl_uuid: str
filename: str
last_modified: int # unix timestamp in microseconds (st_mtime_ns // 1000)
@dataclass
class SQDLDataset:
index: int # defined by database (generated index)
scope: str
sqdl_uuid: str
files: List[SQDLFile]
class UploadOperations:
def create_dataset(self, c: Cursor, scope: str, ct_uid: int, sqdl_uuid: UUID) -> int:
"""
Create a new dataset entry in the 'sqdl_dataset' table and return the row index of the new entry. If an entry with the specified UUID already exists, return the row index of the existing entry instead.
"""
try:
c.execute(
query="""
INSERT INTO sqdl_dataset (
scope, coretools_uid, sqdl_uuid
) VALUES (
%(scope)s, %(uid)s, %(uuid)s
)
RETURNING idx;
""",
vars={
"scope": scope,
"uid": ct_uid,
"uuid": sqdl_uuid,
}
)
index = c.fetchone()[0]
return index
except IntegrityError:
c.execute(
query="""
SELECT idx
FROM sqdl_dataset
WHERE sqdl_uuid = %(uuid)s
""",
vars={
"uuid": sqdl_uuid
}
)
index = c.fetchone()[0]
logger.warning("Dataset with UUID '{}' already exists in table 'sqdl_dataset' at index '{}'.".format(sqdl_uuid, index))
return index
def get_counts(self, c: Cursor) -> Dict[str, int]:
"""
"""
counts = {}
c.execute(query="SELECT COUNT(idx) FROM sqdl_dataset;")
counts["uploaded-datasets"] = c.fetchone()[0]
c.execute(query="SELECT COUNT(idx) FROM sqdl_file;")
counts["uploaded-files"] = c.fetchone()[0]
return counts
def get_files_for_dataset(self, c: Cursor, parent_idx: int) -> List[SQDLFile]:
"""
Get all the SQDLFile entries associated with the SQDLDataset that has the provided index.
"""
def parse_row(row: Tuple) -> SQDLFile:
return SQDLFile(
index=row[0],
dataset_index=row[1],
sqdl_uuid=row[2],
filename=row[3],
last_modified=row[4]
)
c.execute(
query="""
SELECT idx, dataset_index, sqdl_uuid, filename, last_modified
FROM sqdl_file
WHERE dataset_index = %(parent)s;
""",
vars={"parent": parent_idx}
)
records = c.fetchall()
return [parse_row(r) for r in records]
def create_or_update_file(self, c: Cursor, parent_idx: int, sqdl_uuid: UUID, filename: str, last_modified: int) -> None:
"""
Create new SQDLFile entry. If entry with UUID already exists, update the last-modified timestamp instead.
"""
# todo: original uploader registry also updates uuid on conflict, but uuid is at all times the conflicting column, so that should do nothing.
c.execute(
query="""
INSERT INTO sqdl_file (
dataset_index, sqdl_uuid, filename, last_modified
) VALUES (
%(idx)s, %(uuid)s, %(fn)s, %(lm)s
) ON CONFLICT (sqdl_uuid) DO UPDATE SET
last_modified = %(lm)s
;
""",
vars={
"idx": parent_idx,
"uuid": sqdl_uuid,
"fn": filename,
"lm": last_modified,
}
)
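A usage sketch for UploadOperations: register an uploaded dataset and one of its files. The UUIDs, filename and timestamp are placeholders; register_uuid() lets psycopg2 adapt uuid.UUID parameters.

import uuid
import psycopg2
from psycopg2.extras import register_uuid

register_uuid()  # adapt uuid.UUID values to the UUID columns

uploads = UploadOperations()
conn = psycopg2.connect("dbname=uploader")  # placeholder DSN
with conn.cursor() as c:
    ds_idx = uploads.create_dataset(c, scope="my_scope", ct_uid=1234, sqdl_uuid=uuid.uuid4())
    uploads.create_or_update_file(
        c,
        parent_idx=ds_idx,
        sqdl_uuid=uuid.uuid4(),
        filename="measurement.hdf5",          # placeholder filename
        last_modified=1_700_000_000_000_000,  # placeholder microsecond timestamp
    )
conn.commit()
conn.close()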
from psycopg2._psycopg import cursor as Cursor
class VersionOperations:
"""
"""
def read(self, c: Cursor) -> str:
"""
"""
query = """
SELECT major, minor, patch
FROM coretools_version;
"""
c.execute(query=query)
record = c.fetchall()
assert len(record) == 1, "Either no or more than one database version exists"
return "{}.{}.{}".format(*record[0])
CREATE TABLE IF NOT EXISTS global_measurement_overview (
id SERIAL,
uuid BIGINT NOT NULL unique,
exp_name text NOT NULL,
set_up text NOT NULL,
project text NOT NULL,
sample text NOT NULL,
creasted_by text NOT NULL, -- database account used when ds was created
start_time TIMESTAMP,
stop_time TIMESTAMP,
exp_data_location text, -- Database table name of parameter table. Older datasets. [SdS]
snapshot BYTEA,
metadata BYTEA,
keywords JSONB,
starred BOOL DEFAULT False,
completed BOOL DEFAULT False,
data_size int, -- Total size of data. Is written at finish.
data_cleared BOOL DEFAULT False, -- Note [SdS]: Column is not used
data_update_count int DEFAULT 0, -- number of times the data has been updated on local client
data_synchronized BOOL DEFAULT False, -- data + param table sync'd
table_synchronized BOOL DEFAULT False, -- global_measurements_overview sync'd
sync_location text); -- Note [SdS]: Column is abused for migration to new measurement_parameters table
CREATE INDEX IF NOT EXISTS id_indexed ON global_measurement_overview USING BTREE (id);
CREATE INDEX IF NOT EXISTS uuid_indexed ON global_measurement_overview USING BTREE (uuid);
CREATE INDEX IF NOT EXISTS starred_indexed ON global_measurement_overview USING BTREE (starred);
CREATE INDEX IF NOT EXISTS date_day_index ON global_measurement_overview USING BTREE (project, set_up, sample);
CREATE INDEX IF NOT EXISTS data_synced_index ON global_measurement_overview USING BTREE (data_synchronized);
CREATE INDEX IF NOT EXISTS table_synced_index ON global_measurement_overview USING BTREE (table_synchronized);
-- additions
---
CREATE TABLE coretools_export_updates (
id INT GENERATED ALWAYS AS IDENTITY,
uuid BIGINT NOT NULL UNIQUE,
modify_count INT DEFAULT 0,
new_measurement BOOLEAN DEFAULT FALSE, -- not really needed, but overwrite all data.
data_changed BOOLEAN DEFAULT FALSE, -- export previews, export metadata.
completed BOOLEAN DEFAULT FALSE, -- export raw data
update_star BOOLEAN DEFAULT FALSE, -- compare with exported metadata.
update_name BOOLEAN DEFAULT FALSE, -- compare with exported metadata.
resume_after timestamp DEFAULT '2020-01-01 00:00:00',
fail_count INT DEFAULT 0,
PRIMARY KEY(id)
);
CREATE INDEX IF NOT EXISTS qdl_export_updates_uuid_index ON coretools_export_updates USING BTREE (uuid);
---
CREATE TABLE coretools_exported (
id INT GENERATED ALWAYS AS IDENTITY,
uuid BIGINT NOT NULL UNIQUE,
path TEXT,
measurement_start_time timestamp, -- export raw after timeout and not completed.
raw_final BOOLEAN DEFAULT FALSE, -- Set when completed or after timeout.
-- export state
export_state INT DEFAULT 0, -- (0:todo, 1:done, 99: failed),
export_errors TEXT,
PRIMARY KEY(id)
);
CREATE INDEX IF NOT EXISTS coretools_exported_uuid_index ON coretools_exported USING BTREE (uuid);
---
CREATE TABLE upload_task_queue (
idx INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
task_iteration INT NOT NULL,
scope TEXT,
coretools_uid BIGINT UNIQUE NOT NULL,
dataset_path TEXT NOT NULL,
update_dataset BOOLEAN NOT NULL DEFAULT FALSE,
update_name BOOLEAN NOT NULL DEFAULT FALSE,
update_rating BOOLEAN NOT NULL DEFAULT FALSE,
is_ready BOOLEAN NOT NULL DEFAULT FALSE,
has_failed BOOLEAN NOT NULL DEFAULT FALSE,
should_retry BOOLEAN NOT NULL DEFAULT FALSE,
is_claimed_by INT
);
CREATE INDEX ON upload_task_queue (coretools_uid);
CREATE INDEX ON upload_task_queue (is_claimed_by, has_failed);
CREATE INDEX ON upload_task_queue (is_claimed_by, should_retry);
---
CREATE TABLE coretools_version (
major SMALLINT UNIQUE NOT NULL,
minor SMALLINT UNIQUE NOT NULL,
patch SMALLINT UNIQUE NOT NULL
);
---
CREATE TABLE upload_log (
idx INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
scope TEXT,
ct_uid BIGINT NOT NULL,
upload_timestamp TIMESTAMP NOT NULL,
message TEXT NOT NULL CHECK (message <> '')
);
CREATE INDEX ON upload_log (idx);
---
CREATE TABLE sqdl_dataset (
idx INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
scope TEXT NOT NULL,
coretools_uid BIGINT NOT NULL,
sqdl_uuid UUID NOT NULL
);
CREATE INDEX ON sqdl_dataset (sqdl_uuid);
---
CREATE TABLE sqdl_file (
idx INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
dataset_index INT REFERENCES sqdl_dataset (idx),
sqdl_uuid UUID UNIQUE NOT NULL,
filename TEXT NOT NULL,
last_modified BIGINT NOT NULL
);
CREATE INDEX ON sqdl_file (dataset_index);
CREATE INDEX ON sqdl_file (sqdl_uuid);
---
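An initialization sketch: apply the DDL above with psycopg2, assuming it is saved as "sqdl_writer_schema.sql"; the connection parameters are placeholders.

import psycopg2

with open("sqdl_writer_schema.sql") as f:  # assumed filename for the DDL above
    ddl = f.read()

conn = psycopg2.connect("dbname=uploader")  # placeholder DSN
with conn.cursor() as c:
    c.execute(ddl)  # the whole multi-statement script is sent in one call
conn.commit()
conn.close()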
@@ -30,9 +30,9 @@ class DatasetInfo:
 class DatasetReader:
     def __init__(self, scope_name: str | None = None):
-        client = QDLClient()
+        client = QDLClient(dev_mode=True)
         self.client = client
-        client.login()
+        # client.login()
         self.s3_session = requests.Session()
         if scope_name:
             self.set_scope(scope_name)
......
This diff is collapsed.
from core_tools.data.sqdl.model import UploadLog
from core_tools.data.sqdl.uploader_db import UploaderDb
class UploadLogger:
def __init__(self, db: UploaderDb):
self.db = db
def log(self, scope, ds_uid, message):
with self.db.session() as session:
session.add(
UploadLog(
scope=scope,
ds_uid=ds_uid,
message=message
)
)
session.commit()