Skip to content
Snippets Groups Projects
Commit 30b9829d authored by Daan Bijl's avatar Daan Bijl
Browse files

cleaned up database connection management between writer, exporter and...

cleaned up database connection management between writer, exporter and uploader. incorporated local database queries for exporter
parent 27c01479
No related branches found
No related tags found
No related merge requests found
This diff is collapsed.
from dataclasses import dataclass
from typing import Optional
from datetime import timedelta as TimeDelta
from typing import Optional, Tuple, List
from datetime import datetime, timedelta
from psycopg2._psycopg import cursor as Cursor
from psycopg2.extras import RealDictCursor
@dataclass
class ExportAction:
    """One pending export task for a single measurement.

    Records which kind of change triggered the export, plus bookkeeping
    for optimistic concurrency and retry back-off.
    """
    uuid: int                                # measurement identifier
    id: Optional[int] = None                 # row id in coretools_export_updates
    modify_count: int = 0                    # concurrency counter; deletes are guarded on it
    new_measurement: bool = False            # a new measurement was created
    data_changed: bool = False               # raw data changed since last export
    completed: bool = False                  # raw data is final (raw_final)
    update_star: bool = False                # star flag changed — TODO confirm semantics
    update_name: bool = False                # name changed — TODO confirm semantics
    fail_count: int = 0                      # number of failed export attempts
    resume_after: Optional[datetime] = None  # back-off: do not retry before this time
def export_new_measurement(c: Cursor, ct_uid: int, is_complete: bool):
......@@ -78,45 +88,169 @@ def export_changed_data(c: Cursor, ct_uid: int):
)
def get_export_action(c: RealDictCursor) -> Optional[ExportAction]:
    """Fetch the next due export action, or None when nothing is due.

    Picks the oldest (lowest-uuid) row of coretools_export_updates whose
    back-off time has already passed.

    :param c: a RealDictCursor — rows come back as dicts so the row can be
        splatted straight into ExportAction.
    """
    c.execute(
        query="""
            SELECT * FROM coretools_export_updates
            WHERE resume_after < %(now)s
            ORDER BY uuid LIMIT 1
        """,
        vars={
            "now": datetime.now(),
        },
    )
    action_data = c.fetchone()
    if action_data:
        return ExportAction(**action_data)
    return None
def uuid_exists(c: Cursor, uuid) -> bool:
    # Stub: should report whether *uuid* is already known (presumably in the
    # local export tables — TODO confirm intended table when implementing).
    raise NotImplementedError()
def get_expired_export_action(c: RealDictCursor, expiration_time: datetime) -> Optional[ExportAction]:
    """Find one exported-but-not-final measurement that has expired.

    Looks for the oldest row in coretools_exported that was exported
    successfully (export_state = 1), is not raw-final, and started before
    *expiration_time*. Returns an ExportAction marked completed=True so the
    measurement gets re-exported as final, or None when nothing qualifies.
    """
    query = """
        SELECT uuid
        FROM coretools_exported
        WHERE raw_final = False
        AND measurement_start_time < %(expiration_time)s
        AND export_state = 1
        ORDER BY uuid
        LIMIT 1
    """
    c.execute(query=query, vars={"expiration_time": expiration_time})
    row = c.fetchone()
    return ExportAction(row['uuid'], completed=True) if row else None
def get_expired_measurement_action(c: Cursor) -> Optional[ExportAction]:
    # Stub: should return an ExportAction for an expired measurement, or None.
    raise NotImplementedError()
def delete_export_action(c: Cursor, action: ExportAction) -> bool:
c.execute(
query="""
DELETE FROM coretools_export_updates
WHERE id = %(id)s
AND modify_count = %(modify_count)s
""",
vars={
"id": action.id,
"modify_count": action.modify_count,
}
)
return c.rowcount > 0
def set_exported(c: Cursor, measurement, path: str, is_complete: bool = False) -> None:
uuid = measurement.exp_uuid
start_time = measurement.run_timestamp
completed = measurement.completed or is_complete
def set_export_error(c: Cursor, uuid, exception, code=99) -> None:
raise NotImplementedError()
c.execute(
query="""
INSERT INTO coretools_exported
( uuid, measurement_start_time, path, export_state, raw_final )
VALUES
( %(uuid)s, %(start_time)s, %(path)s, %(export_state)s, %(raw_final)s )
ON CONFLICT (uuid) DO UPDATE
SET
uuid = %(uuid)s,
measurement_start_time = %(start_time)s,
path = %(path)s,
export_state = %(export_state)s,
raw_final = %(raw_final)s
""",
vars={
"uuid": uuid,
"start_time": start_time,
"path": path,
"export_state": 1,
"raw_final": completed,
}
)
def set_exported(c: Cursor, measurement, path: str, is_complete: bool = False) -> None:
raise NotImplementedError()
def set_resume_after(c: Cursor, action: ExportAction, wait_time: float) -> None:
now = datetime.now()
resume_after = now + timedelta(seconds=wait_time)
c.execute(
query="""
UPDATE coretools_export_updates
SET resume_after = %(resume_after)s
WHERE id = %(id)s
""",
vars={
"id": action.id,
"resume_after": resume_after,
}
)
def set_resume_after(c: Cursor, action: ExportAction, wait_time: TimeDelta) -> None:
raise NotImplementedError()
def set_export_error(c: Cursor, uuid, message, code=99) -> None:
c.execute(
query="""
INSERT INTO coretools_exported
( uuid, export_state, export_errors )
VALUES
( %(uuid)s, %(export_state)s, %(export_errors)s )
ON CONFLICT (uuid) DO UPDATE
uuid = %(uuid)s,
export_state = %(export_state)s,
export_errors = %(export_errors)s
""",
vars={
"uuid": uuid,
"export_state": code,
"export_errors": message,
}
)
def increment_fail_count(c: Cursor, action: ExportAction) -> None:
raise NotImplementedError()
c.execute(
query="""
UPDATE coretools_export_updates
SET fail_count = fail_count + 1
WHERE id = %(id)s
""",
vars={
"id": action.id
}
)
def retry_failed_exports(c: Cursor) -> None:
def get_failed_exports(c: Cursor) -> List[Tuple[int, bool]]:
c.execute(
query="""
SELECT uuid, gg
SELECT uuid, raw_final
FROM coretools_exported
WHERE export_state BETWEEN 10 AND 100
ORDER BY uuid
;
"""
)
records = c.fetchall()
if len(records) == 0:
return None
raise NotImplementedError()
return c.fetchall()
def set_retry_export(c: Cursor, uuid, is_complete: bool) -> None:
c.execute(
query="""
INSERT INTO coretools_export_updates
( uuid, data_changed, completed )
VALUES
( %(uuid)s, TRUE, %(completed)s )
ON CONFLICT ( uuid ) DO UPDATE
SET
uuid = %(uuid)s,
data_changed = TRUE,
completed = %(completed)s
""",
vars={
"uuid": uuid,
"completed": is_complete
}
)
......@@ -40,12 +40,18 @@ class SQDLWriter():
self.validate_version()
# initialise
self.exporter = Exporter(config)
self.exporter = Exporter(
cfg=config,
conn=self.connection
)
dev_mode = config.get("sqdl.dev_mode", default=True)
if dev_mode:
logger.info("Initialising SQDL Writer/Client in developer mode...")
self.uploader = Uploader(
config,
cfg=config,
conn=self.connection,
client=QDLClient(
dev_mode=config.get("sqdl.dev_mode", default=True)
dev_mode=dev_mode,
)
)
self.tick_rate = datetime.timedelta(
......@@ -66,6 +72,8 @@ class SQDLWriter():
self.database._connect()
self.connection = self.database.conn_local
self.exporter.connection = self.connection
self.uploader.connection = self.connection
self.is_running = True
self.next_tick = datetime.datetime.now() + self.tick_rate
......@@ -73,8 +81,8 @@ class SQDLWriter():
while self.is_running:
try:
self.queue_datasets_for_export()
self.exporter.poll(self.connection)
self.uploader.poll(self.connection)
self.exporter.poll()
self.uploader.poll()
except InterfaceError:
logger.warning("Connection to local database lost. Reconnecting...")
......@@ -205,3 +213,6 @@ class SQDLWriter():
self.database._connect()
self.connection = self.database.conn_local
assert self.connection.closed == 0, "failed to reconnect"
self.exporter.connection = self.connection
self.uploader.connection = self.connection
......@@ -308,20 +308,14 @@ class SqdlUploader:
c = self.connection.cursor()
task_queue.release_task(c, task)
def poll(self) -> None:
    """Process at most one upload task and track consecutive idle polls.

    The connection is no longer passed per call — it is held on the
    instance (set by the owning writer). Exceptions from process_task
    propagate to the caller, which handles reconnects; logging stays
    throttled to every 300th idle poll to avoid spamming the log.
    """
    work_done = self.process_task()
    if not work_done:
        self.idle_cnt += 1
        if self.idle_cnt % 300 == 0:
            logger.info('Nothing to upload')
    else:
        self.idle_cnt = 0
def fix_filename(filename):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment