From 62e881e597c0aea298c66d7ca585068b72407605 Mon Sep 17 00:00:00 2001 From: Nicola Stoira <nicola.stoira@accenture.com> Date: Wed, 12 Jun 2024 14:10:27 +0200 Subject: [PATCH 1/4] Add project name print statement and pre-checks execution times in results --- lib/connector.py | 6 +++-- lib/pre_checks.py | 63 ++++++++++++++++++++++++++++------------------- 2 files changed, 42 insertions(+), 27 deletions(-) diff --git a/lib/connector.py b/lib/connector.py index 43cbdf6a..30096c83 100644 --- a/lib/connector.py +++ b/lib/connector.py @@ -499,7 +499,7 @@ outputfolder={output_folder} run_uuid (str): UUID of the run dag_run_config (dict): configuration for the running Airflow dag """ - + print(f"Running pre-checks/de-identification for project '{project_name}'") connector = Connector(project=project_name) patients_count = {"processed_patients": 0, "failed_patients": 0, "patients_with_warnings": 0} step = Steps.PRE_CHECK_DE_ID @@ -686,7 +686,7 @@ outputfolder={output_folder} run_uuid (str): UUID of the run dag_run_config (dict): configuration for the running Airflow dag """ - + print(f"Running integration for project '{project_name}'") connector = Connector(project=project_name) patients_count = {"processed_patients": 0, "failed_patients": 0} patients_with_warnings = 0 @@ -1251,6 +1251,7 @@ outputfolder={output_folder} severity_level (QCSeverityLevel): QC severity level revalidate (bool): revalidate all the data """ + print(f"Running validation for project '{project_name}'") connector = Connector(project=project_name) einstein = connector.database.is_einstein_activated(project=project_name) and connector.config.einstein patients_count = {'processed_patients': 0, 'failed_patients': 0, 'patients_with_warnings': 0} @@ -1397,6 +1398,7 @@ outputfolder={output_folder} run_uuid (str): UUID of the run profile (str): statistics profile """ + print(f"Running statistics for project '{project_name}'") connector = Connector(project=project_name) patients_count = {'processed_patients': 0, 'failed_patients': 0} patients_with_warnings = 0 diff --git a/lib/pre_checks.py b/lib/pre_checks.py index b366737d..cadb0e9c 100644 --- a/lib/pre_checks.py +++ b/lib/pre_checks.py @@ -21,6 +21,7 @@ from jsonschema import FormatChecker import sys import os import rfc3987 +from datetime import datetime sys.path.append(os.path.abspath(os.path.dirname(__file__))) from enums import Buckets, PreCheckLevel, PreCheckStatus, PreCheckType @@ -161,20 +162,22 @@ class PreCheckResult(object): """ Class defining Pre-Check Result object """ - def __init__(self, status: PreCheckStatus, check: PreCheck, message: str = None): + def __init__(self, status: PreCheckStatus, check: PreCheck, start_time: datetime, execution_time: int, message: str = None): """Class constructor Args: status (PreCheckStatus): status of the exectution check (PreCheck): check object + start_time (datetime): start time of the check + execution_time (int): execution time in seconds message (str, optional): check exit message. Defaults to None. """ self.status = status self.level = check.level if self.status == PreCheckStatus.SUCCESS: - self.message = f"Pre-Check '{check.check_name}' of type '{check.type.value}' PASSED" + self.message = f"Pre-Check '{check.check_name}' of type '{check.type.value}' PASSED. Start time: {start_time}. Execution time: {execution_time}" else: - self.message = f"Pre-Check '{check.check_name}' of type '{check.type.value}' FAILED with severity level '{self.level.value.lower()}': {message}" + self.message = f"Pre-Check '{check.check_name}' of type '{check.type.value}' FAILED with severity level '{self.level.value.lower()}'. Start time: {start_time}. Execution time: {execution_time}. Error message: {message}" class ReplaceCharsCheck(PreCheck): @@ -205,11 +208,12 @@ class ReplaceCharsCheck(PreCheck): PreCheckResult: object with results of execution """ try: + start_time = datetime.now() filtered_keys = [key for key in keys if re.sub("(\/[0-9]*\/)", "/", key).endswith(self.applies_to_field)] self.replace_characters(data=patient_data, key_paths=filtered_keys) - return PreCheckResult(status=PreCheckStatus.SUCCESS, check=self) + return PreCheckResult(status=PreCheckStatus.SUCCESS, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds) except Exception as exc: - return PreCheckResult(status=PreCheckStatus.FAILED, check=self, message=traceback.format_exc()) + return PreCheckResult(status=PreCheckStatus.FAILED, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds, message=traceback.format_exc()) def replace_characters(self, data: dict, key_paths: list): """Loop through the data and replace characters @@ -253,11 +257,12 @@ class ReplaceRegexCheck(PreCheck): PreCheckResult: object with results of execution """ try: + start_time = datetime.now() filtered_keys = [key for key in keys if re.sub("(\/[0-9]*\/)", "/", key).endswith(self.applies_to_field)] self.replace_characters(data=patient_data, key_paths=filtered_keys) - return PreCheckResult(status=PreCheckStatus.SUCCESS, check=self) + return PreCheckResult(status=PreCheckStatus.SUCCESS, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds) except Exception as exc: - return PreCheckResult(status=PreCheckStatus.FAILED, check=self, message=traceback.format_exc()) + return PreCheckResult(status=PreCheckStatus.FAILED, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds, message=traceback.format_exc()) def replace_characters(self, data: dict, key_paths: list): """Loop through the data and replace according to regex expression @@ -300,26 +305,29 @@ class RegexCheck(PreCheck): """ self.check_result = None try: + start_time = datetime.now() filtered_keys = [key for key in keys if re.sub("(\/[0-9]*\/)", "/", key).endswith(self.applies_to_field)] - self.apply_regex(data=patient_data, key_paths=filtered_keys) + self.apply_regex(data=patient_data, key_paths=filtered_keys, start_time=start_time) if self.check_result is not None: return self.check_result else: - return PreCheckResult(status=PreCheckStatus.SUCCESS, check=self) + return PreCheckResult(status=PreCheckStatus.SUCCESS, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds) except Exception as exc: - return PreCheckResult(status=PreCheckStatus.FAILED, check=self, message=traceback.format_exc()) + return PreCheckResult(status=PreCheckStatus.FAILED, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds, message=traceback.format_exc()) - def apply_regex(self, data: dict, key_paths: list): + def apply_regex(self, data: dict, key_paths: list, start_time: datetime): """Loop through data and check the regex expression Args: data (dict): patient data key_paths (list): list of key paths to update + start_time (datetime): start time of the check """ for key_path in key_paths: current, last_key = PreCheck._transverse_data(key_path=key_path, data=data) if not bool(re.match(f"^{self.regex}$", current[last_key])): - self.check_result = PreCheckResult(status=PreCheckStatus.FAILED, check=self, message=f"Value '{current[last_key]}' of field '{key_path}' does not match regex '{self.regex}'") + self.check_result = PreCheckResult(status=PreCheckStatus.FAILED, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds, + message=f"Value '{current[last_key]}' of field '{key_path}' does not match regex '{self.regex}'") return class ValidationCheck(PreCheck): @@ -354,6 +362,7 @@ class ValidationCheck(PreCheck): PreCheckResult: object with results of execution """ try: + start_time = datetime.now() if self.check_formats: format_checker = FormatChecker() format_checker.checkers['date'] = (PreCheck._date_checker, ValueError) @@ -366,14 +375,14 @@ class ValidationCheck(PreCheck): errs=v.iter_errors(patient_data) errors = [err for err in errs] if not errors: - return PreCheckResult(status=PreCheckStatus.SUCCESS, check=self) + return PreCheckResult(status=PreCheckStatus.SUCCESS, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds) else: error_message = "" for err in errors: error_message += f"\nJSON Path: {err.json_path} -- Error: {err.message}" - return PreCheckResult(status=PreCheckStatus.FAILED, check=self, message=error_message) + return PreCheckResult(status=PreCheckStatus.FAILED, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds, message=error_message) except Exception as exc: - return PreCheckResult(status=PreCheckStatus.FAILED, check=self, message=traceback.format_exc()) + return PreCheckResult(status=PreCheckStatus.FAILED, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds, message=traceback.format_exc()) class DataTypeCheck(PreCheck): @@ -397,13 +406,14 @@ class DataTypeCheck(PreCheck): PreCheckResult: object with results of execution """ try: + start_time = datetime.now() error_message = self.check_data_type(data=patient_data) if error_message is not None: - return PreCheckResult(status=PreCheckStatus.FAILED, check=self, message=error_message) + return PreCheckResult(status=PreCheckStatus.FAILED, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds, message=error_message) else: - return PreCheckResult(status=PreCheckStatus.SUCCESS, check=self) + return PreCheckResult(status=PreCheckStatus.SUCCESS, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds) except: - return PreCheckResult(status=PreCheckStatus.FAILED, check=self, message=traceback.format_exc()) + return PreCheckResult(status=PreCheckStatus.FAILED, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds, message=traceback.format_exc()) def check_data_type(self, data: bytes) -> str: """Check data types @@ -466,14 +476,15 @@ class IRIValidationCheck(PreCheck): PreCheckResult: object with results of execution """ try: + start_time = datetime.now() filtered_keys = [key for key in keys if key.split('/')[-1] == 'iri'] error_messages = self.validate_IRIs(data=patient_data, keys=filtered_keys) if error_messages: - return PreCheckResult(status=PreCheckStatus.FAILED, check=self, message="\n" + "\n".join(error_messages)) + return PreCheckResult(status=PreCheckStatus.FAILED, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds, message="\n" + "\n".join(error_messages)) else: - return PreCheckResult(status=PreCheckStatus.SUCCESS, check=self) + return PreCheckResult(status=PreCheckStatus.SUCCESS, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds) except: - return PreCheckResult(status=PreCheckStatus.FAILED, check=self, message=traceback.format_exc()) + return PreCheckResult(status=PreCheckStatus.FAILED, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds, message=traceback.format_exc()) def validate_IRIs(self, data: dict, keys: list) -> list: """Validate IRIs values @@ -514,10 +525,11 @@ class RdfValidationCheck(PreCheck): PreCheckResult: object with results of execution """ try: + start_time = datetime.now() validate_rdf_file(content=patient_data) - return PreCheckResult(status=PreCheckStatus.SUCCESS, check=self) + return PreCheckResult(status=PreCheckStatus.SUCCESS, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds) except Exception as exc: - return PreCheckResult(status=PreCheckStatus.FAILED, check=self, message=exc) + return PreCheckResult(status=PreCheckStatus.FAILED, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds, message=exc) class JSONPreProcessing(PreCheck): """ @@ -543,6 +555,7 @@ class JSONPreProcessing(PreCheck): PreCheckResult: object with results of execution """ try: + start_time = datetime.now() # Push down sourceConcept ID and type code_term_paths = set() JSONPreProcessing._extract_code_term_paths(property_map=self.schema, code_keys=code_term_paths, key_path='') @@ -552,9 +565,9 @@ class JSONPreProcessing(PreCheck): # Expand array fields in supporting concepts JSONPreProcessing._expand_supporting_array_fields(data=patient_data) - return PreCheckResult(status=PreCheckStatus.SUCCESS, check=self) + return PreCheckResult(status=PreCheckStatus.SUCCESS, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds) except Exception: - return PreCheckResult(status=PreCheckStatus.FAILED, check=self, message=traceback.format_exc()) + return PreCheckResult(status=PreCheckStatus.FAILED, check=self, start_time=start_time, execution_time=(datetime.now() - start_time).seconds, message=traceback.format_exc()) @staticmethod def _extract_code_term_paths(property_map: dict, code_keys: set, key_path: str, in_array_field: bool = False, in_one_of: bool = False): -- GitLab From e9ac6eb127c4661af8fa1d2c39b7cf71ccffeb97 Mon Sep 17 00:00:00 2001 From: Nicola Stoira <nicola.stoira@accenture.com> Date: Wed, 12 Jun 2024 14:40:10 +0200 Subject: [PATCH 2/4] Update unit tests --- api/tests/test_main.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/api/tests/test_main.py b/api/tests/test_main.py index f83801d6..8394d165 100644 --- a/api/tests/test_main.py +++ b/api/tests/test_main.py @@ -906,10 +906,10 @@ class IngestProcessTest(object): database.create_connection() sql = "SELECT message FROM logging WHERE project_name=%s and data_provider_id=%s AND patient_id='1' AND step='Pre-check/De-ID' AND status='Pre-Check'" database.cursor.execute(sql, (project_name, database.config.data_provider_id)) - patient_1_pre_checks = [log.strip('\t') for log in database.cursor.fetchone()[0].strip('\n').split('\n')] + patient_1_pre_checks = [log.strip('\t').split('.')[0] for log in database.cursor.fetchone()[0].strip('\n').split('\n')] sql = "SELECT message FROM logging WHERE project_name=%s and data_provider_id=%s AND patient_id='2' AND step='Pre-check/De-ID' AND status='Pre-Check'" database.cursor.execute(sql, (project_name, database.config.data_provider_id)) - patient_2_pre_checks = [log.strip('\t') for log in database.cursor.fetchone()[0].strip('\n').split('\n')] + patient_2_pre_checks = [log.strip('\t').split('.')[0] for log in database.cursor.fetchone()[0].strip('\n').split('\n')] sql = "SELECT message FROM logging WHERE project_name=%s and data_provider_id=%s AND patient_id='1' AND step='Pre-check/De-ID' AND status='De-Identification'" database.cursor.execute(sql, (project_name, database.config.data_provider_id)) patient_1_de_id = [log.strip('\t') for log in database.cursor.fetchone()[0].strip('\n').split('\n')] @@ -921,9 +921,9 @@ class IngestProcessTest(object): "Pre-Check 'defaultIDReplaceCharsCheck' of type 'replaceCharsCheck' PASSED", "Pre-Check 'defaultTermIDReplaceCharsCheck' of type 'replaceCharsCheck' PASSED", "Pre-Check 'defaultValidationCheck' of type 'validationCheck' PASSED", - "Pre-Check 'testRegexCheck' of type 'regexCheck' FAILED with severity level 'warn': Value 'allergen_two' of field 'content/sphn:Allergy/1/sphn:hasAllergen/id' does not match regex '^allergen_[0-9]*$'", + "Pre-Check 'testRegexCheck' of type 'regexCheck' FAILED with severity level 'warn''", "Pre-Check 'jsonPreProcessing' of type 'jsonPreProcessing' PASSED"] - assert patient_2_pre_checks == ["Pre-Check 'testDataTypeCheck' of type 'dataTypeCheck' FAILED with severity level 'warn': ", "Values not matching data type 'xsd:dateTime': ['2022-04-24']"] + assert patient_2_pre_checks == ["Pre-Check 'testDataTypeCheck' of type 'dataTypeCheck' FAILED with severity level 'warn'"] assert patient_1_de_id == ["De-Identification rule 'testScrambleField' of type 'scrambleField' EXECUTED", "De-Identification rule 'scrambleAllergenID' of type 'scrambleField' EXECUTED", "De-Identification rule 'scrambleAllergenCode' of type 'scrambleField' EXECUTED", "De-Identification rule 'testDateShift' of type 'dateShift' EXECUTED", "De-Identification rule 'testSubstituteFieldList' of type 'substituteFieldList' EXECUTED", "De-Identification rule 'testSubstituteFieldListSupporting' of type 'substituteFieldList' EXECUTED", -- GitLab From b0dca45048c31d36a8a56efc420a15a01e5a7793 Mon Sep 17 00:00:00 2001 From: Nicola Stoira <nicola.stoira@accenture.com> Date: Wed, 12 Jun 2024 15:14:02 +0200 Subject: [PATCH 3/4] Remove single quote --- api/tests/test_main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/tests/test_main.py b/api/tests/test_main.py index 8394d165..1af3de2a 100644 --- a/api/tests/test_main.py +++ b/api/tests/test_main.py @@ -921,7 +921,7 @@ class IngestProcessTest(object): "Pre-Check 'defaultIDReplaceCharsCheck' of type 'replaceCharsCheck' PASSED", "Pre-Check 'defaultTermIDReplaceCharsCheck' of type 'replaceCharsCheck' PASSED", "Pre-Check 'defaultValidationCheck' of type 'validationCheck' PASSED", - "Pre-Check 'testRegexCheck' of type 'regexCheck' FAILED with severity level 'warn''", + "Pre-Check 'testRegexCheck' of type 'regexCheck' FAILED with severity level 'warn'", "Pre-Check 'jsonPreProcessing' of type 'jsonPreProcessing' PASSED"] assert patient_2_pre_checks == ["Pre-Check 'testDataTypeCheck' of type 'dataTypeCheck' FAILED with severity level 'warn'"] assert patient_1_de_id == ["De-Identification rule 'testScrambleField' of type 'scrambleField' EXECUTED", "De-Identification rule 'scrambleAllergenID' of type 'scrambleField' EXECUTED", -- GitLab From 15e8fbcf13c23bb84e84eaa733e7b01ae07c1697 Mon Sep 17 00:00:00 2001 From: Nicola Stoira <nicola.stoira@accenture.com> Date: Wed, 12 Jun 2024 15:52:36 +0200 Subject: [PATCH 4/4] Fix test --- api/tests/test_main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/tests/test_main.py b/api/tests/test_main.py index 1af3de2a..2d8b57aa 100644 --- a/api/tests/test_main.py +++ b/api/tests/test_main.py @@ -923,7 +923,7 @@ class IngestProcessTest(object): "Pre-Check 'defaultValidationCheck' of type 'validationCheck' PASSED", "Pre-Check 'testRegexCheck' of type 'regexCheck' FAILED with severity level 'warn'", "Pre-Check 'jsonPreProcessing' of type 'jsonPreProcessing' PASSED"] - assert patient_2_pre_checks == ["Pre-Check 'testDataTypeCheck' of type 'dataTypeCheck' FAILED with severity level 'warn'"] + assert patient_2_pre_checks == ["Pre-Check 'testDataTypeCheck' of type 'dataTypeCheck' FAILED with severity level 'warn'", "Values not matching data type 'xsd:dateTime': ['2022-04-24']"] assert patient_1_de_id == ["De-Identification rule 'testScrambleField' of type 'scrambleField' EXECUTED", "De-Identification rule 'scrambleAllergenID' of type 'scrambleField' EXECUTED", "De-Identification rule 'scrambleAllergenCode' of type 'scrambleField' EXECUTED", "De-Identification rule 'testDateShift' of type 'dateShift' EXECUTED", "De-Identification rule 'testSubstituteFieldList' of type 'substituteFieldList' EXECUTED", "De-Identification rule 'testSubstituteFieldListSupporting' of type 'substituteFieldList' EXECUTED", -- GitLab