diff --git a/.env_sample b/.env_sample
index 26323a4a624e2b042250b4c8e164624ff729cb0b..7c1c1ebf2e14ca4db585dbab7a188dc2298ccfd0 100644
--- a/.env_sample
+++ b/.env_sample
@@ -44,7 +44,7 @@ GRAFANA_POSTGRES_USER_PASSWORD=""  # Definition: Password for the Grafana post
 #MINIO_COMPRESSION=""  # Definition: Enables compression in minio. Can be either set to "ON" or "OFF". Default is "OFF". WARNING: Enabling compression will decrease performance.
 
 # Einstein configuration parameters
-#EINSTEIN=""  # Definition: Activate patient data sharing with Einstein Relativity Tool. Default is "False". Type: boolean. Example: "True"
+#EINSTEIN=""  # Definition: Activate patient data sharing with Einstein Relativity Tool. It must be activated here before data sharing can be enabled at the project level. Default is "False". Type: boolean. Example: "True"
 #EINSTEIN_ENDPOINT=""  # Definition: Einstein endpoint used to call Einstein API. If Einstein is installed with default Docker configuration on the same server it should be 'host.docker.internal:1543'. Format: {hostname}:{port}
 #EINSTEIN_API_USER=""  # Definition: API username of Einstein tool to trigger Einstein endpoints.
 #EINSTEIN_API_PASSWORD=""  # Definition: API user's password of Einstein tool to trigger Einstein endpoints.
diff --git a/api/app/api.py b/api/app/api.py
index 5c6229140064f858bfd03a42b94443f810bb3752..d899959a4a9e3e7ff3944cd4305dfc36c2bb21a6 100644
--- a/api/app/api.py
+++ b/api/app/api.py
@@ -363,7 +363,8 @@ def extract_sphn_iri_during_import(project: str, database: Database) -> str:
     schema = database.config.minio_client.get_object(bucket_name=bucket, object_name=sphn_schema.object_name)
     return get_schema_iri(content=schema.data)
 
-def upload_schemas(project: str, sphn_schema: UploadFile, project_specific_schema: UploadFile, exception_file: UploadFile, database: Database) -> str:
+def upload_schemas(project: str, sphn_schema: UploadFile, project_specific_schema: UploadFile, exception_file: UploadFile, database: Database,
+                   einstein: bool) -> str:
     """Upload schemas and exception file for project creation
 
     Args:
@@ -373,16 +374,17 @@ def upload_schemas(project: str, sphn_schema: UploadFile, project_specific_schem
         exception_file (UploadFile): exception file
         database (Database): database object
         sphn_base_iri (str): SPHN base IRI
+        einstein (bool): share data with Einstein tool
     """
     if project_specific_schema is None:
         content = sphn_schema.file.read()
-        database.check_pre_conditions(project=project, content=content, filename=sphn_schema.filename, type=PreCondition.PROJECT_CREATION, file_type=FileType.RDF, file_class=FileClass.SCHEMA)
+        database.check_pre_conditions(project=project, content=content, filename=sphn_schema.filename, type=PreCondition.PROJECT_CREATION, file_type=FileType.RDF, file_class=FileClass.SCHEMA, einstein=einstein)
         validate_rdf_file(content=content, filename=sphn_schema.filename)
         sphn_base_iri = get_schema_iri(content=content)
         write_to_minio(content=content, project_name=project, file_class=FileClass.SCHEMA, database=database, filename=sphn_schema.filename)
     else:
         content = project_specific_schema.file.read()
-        database.check_pre_conditions(project=project, content=content, filename=project_specific_schema.filename, type=PreCondition.PROJECT_CREATION, file_type=FileType.RDF, file_class=FileClass.SCHEMA)
+        database.check_pre_conditions(project=project, content=content, filename=project_specific_schema.filename, type=PreCondition.PROJECT_CREATION, file_type=FileType.RDF, file_class=FileClass.SCHEMA, einstein=einstein)
         validate_rdf_file(content=content, filename=project_specific_schema.filename)
         sphn_imported_iri = check_sphn_import(content=content)
         write_to_minio(content=content, project_name=project, file_class=FileClass.SCHEMA, database=database, filename=project_specific_schema.filename)
diff --git a/api/app/main.py b/api/app/main.py
index 9d8a2c897c4020b4c03072525da6474bf5736a98..c3ae18cdf871fc9a5aacec2f3b5cf4780f57b54e 100644
--- a/api/app/main.py
+++ b/api/app/main.py
@@ -101,6 +101,7 @@ async def create_project(project: str = Query(..., description="Name of the proj
                          exception_file: UploadFile = File(None, description="Exception file for SHACLer"),
                          output_format: str = Query("Turtle", enum = ["Turtle", "NQuads", "Trig"], description="Output format of the generated RDF data"),
                          compression: str = Query(False, enum = [True, False], description="Compression on single patient files. If activated, patient file is compressed to GZIP format"),
+                         share_with_einstein: bool = Query(False, enum = [True, False], description="Share project data (trig, n-quads) with Einstein tool"),
                          testing: bool = Query(False, enum = [True, False], description="Flag for testing purposes only. Do not activate", include_in_schema=False),
                          generate_sparql: bool = Query(True, enum = [True, False], description="Flag for testing purposes only which generates SPARQLer queries. Do not deactivate", include_in_schema=False),
                          current_user: User = Depends(authentication.get_standard_level_permissions)):
     """
@@ -110,8 +111,10 @@ async def create_project(project: str = Query(..., description="Name of the proj
     database = Database(data_provider_id=data_provider_id)
     database.project_exists(project=project)
     try:
-        sphn_base_iri = upload_schemas(project=project, sphn_schema=sphn_schema, project_specific_schema=project_specific_schema, exception_file=exception_file, database=database)
-        database.create_project(project_name=project, current_user=current_user.username, sphn_base_uri=sphn_base_iri, output_format=OutputFormat(output_format), compression=compression)
+        sphn_base_iri = upload_schemas(project=project, sphn_schema=sphn_schema, project_specific_schema=project_specific_schema, exception_file=exception_file, database=database,
+                                       einstein=share_with_einstein)
+        database.create_project(project_name=project, current_user=current_user.username, sphn_base_uri=sphn_base_iri, output_format=OutputFormat(output_format), compression=compression,
+                                share_with_einstein=share_with_einstein)
     except HTTPException:
         try:
             database.delete_project(project_name=project, minio_policy=minio_policy, delete_policies=False)
@@ -1000,6 +1003,7 @@ async def import_project_endpoint(project: str = Query(..., description="Name of
                                   data_provider_id: str = Query(None, description="Data provider identifier. If not provided value from .env file will be used"),
                                   output_format: str = Query("Turtle", enum = ["Turtle", "NQuads", "Trig"], description="Output format of the generated RDF data"),
                                   compression: str = Query(False, enum = [True, False], description="Compression on single patient files. If activated, patient file is compressed to GZIP format"),
+                                  share_with_einstein: bool = Query(False, enum = [True, False], description="Share project data (trig, n-quads) with Einstein tool"),
                                   current_user: User = Depends(authentication.get_standard_level_permissions)):
     """
     Import project
@@ -1011,10 +1015,11 @@ async def import_project_endpoint(project: str = Query(..., description="Name of
     with project_zip.file as input_file:
         filename = project_zip.filename
         content = input_file.read()
-        database.check_pre_conditions(project=project, content=content, filename=filename, type=PreCondition.PROJECT_CREATION, file_type=FileType.ZIP)
+        database.check_pre_conditions(project=project, content=content, filename=filename, type=PreCondition.PROJECT_CREATION, file_type=FileType.ZIP, einstein=share_with_einstein)
         import_project(project=project, content=content, database=database)
         sphn_base_uri = extract_sphn_iri_during_import(project=project, database=database)
-        database.create_project(project_name=project, current_user=current_user.username, sphn_base_uri=sphn_base_uri, output_format=OutputFormat(output_format), compression=compression)
+        database.create_project(project_name=project, current_user=current_user.username, sphn_base_uri=sphn_base_uri, output_format=OutputFormat(output_format), compression=compression,
+                                share_with_einstein=share_with_einstein)
         trigger_rml_generation(project=project, database=database)
         database.ddl_generation(project_name=project)
         database.grant_permissions(project=project)
diff --git a/api/app/responses.py b/api/app/responses.py
index 341dd576db726204ff4e833e00b035bb9d440a73..48bf9fda4dbf81b7b507d8939f32d51006df1c32 100644
--- a/api/app/responses.py
+++ b/api/app/responses.py
@@ -125,6 +125,11 @@ def get_responses(endpoint: Endpoint) -> dict:
                 "value": {
                     "detail": "Invalid project name 'Test-Project'. Project 'test-project' already exists. Case insensitive equality between project name is not allowed"
                 }
+            },
+            "Einstein deactivated": {
+                "value": {
+                    "detail": "To activate data sharing with Einstein for project 'test-project', the EINSTEIN parameter must be set to 'True' in the .env file"
+                }
             }
         }
     if endpoint == Endpoint.CREATE_PROJECT:
diff --git a/lib/connector.py b/lib/connector.py
index 5fb97fb9995cf577e56c5d8506eb8e3c0182b4a7..8f7b656cc0bf50da054f32bafc580ec4dc47a6ce 100644
--- a/lib/connector.py
+++ b/lib/connector.py
@@ -17,6 +17,7 @@ from datetime import datetime
 import sys
 import subprocess
 import shutil
+import traceback
 from typing import Union
 import json
 from itertools import islice
@@ -25,6 +26,7 @@ import csv
 import re
 import io
 import requests
+from botocore.exceptions import ClientError
 
 sys.path.append(os.path.abspath(os.path.dirname(__file__)))
 from de_identification import create_de_identification_rules
@@ -1239,7 +1241,7 @@ outputfolder={output_folder}
             dag_run_config (dict): configuration for the running Airflow dag
         """
         connector = Connector(project=project_name)
-        connector.create_einstein_bucket()
+        einstein = connector.database.is_einstein_activated(project=project_name) and connector.config.einstein
         patients_count = {'processed_patients': 0, 'failed_patients': 0, 'patients_with_warnings': 0}
         step = Steps.VALIDATION
         start_timestamp = datetime.now()
@@ -1281,7 +1283,7 @@ outputfolder={output_folder}
             with ThreadPoolExecutor(max_workers=patient_group['parallelization']) as executor:
                 futures = []
                 for i_chunk, chunk in enumerate(chunks):
-                    futures.append(executor.submit(execute_validation, i_chunk, chunk, project_name, validation_log_level, run_uuid, patient_group['max_java_heap_space'], sphn_base_iri, project_output_format, compression))
+                    futures.append(executor.submit(execute_validation, i_chunk, chunk, project_name, validation_log_level, run_uuid, patient_group['max_java_heap_space'], sphn_base_iri, project_output_format, compression, einstein))
                 for future in as_completed(futures):
                     patients_count_result, logs_folder = future.result()
                     for key in patients_count.keys():
@@ -1436,13 +1438,6 @@ outputfolder={output_folder}
                                              execution_time=start_timestamp, elapsed_time=(datetime.now() - start_timestamp).seconds, run_uuid=run_uuid)
             raise
 
-    def create_einstein_bucket(self):
-        """Create Einstein bucket if Einstein data sharing activated and bucket does not exist
-        """
-        bucket_name = 'einstein'
-        if self.config.einstein and not self.config.minio_client.bucket_exists(bucket_name=bucket_name):
-            self.config.minio_client.make_bucket(bucket_name=bucket_name)
-
     def notify_einstein(self, patient_data: dict, patients_count: dict, project_name: str):
         """Notify patient to Einstein Relativity
@@ -1588,7 +1583,7 @@ def execute_chunk_integration(i_chunk: int, chunk: dict, project_name: str, run_
 
 def execute_validation(i_chunk: int, chunk: list, project_name: str, validation_log_level: LogLevel, run_uuid: str, max_java_heap_space: int, sphn_base_iri: str, project_output_format: OutputFormat,
-                       compression: bool) -> dict:
+                       compression: bool, einstein: bool) -> dict:
     """Execute validation in parallel over patient chunks
 
     Args:
@@ -1601,6 +1596,7 @@ def execute_validation(i_chunk: int, chunk: list, project_name: str, validation_
         sphn_base_iri (str): SPHN schema base IRI
         project_output_format (OutputFormat): project output format
         compression (str): project files should be compressed
+        einstein (bool): data sharing with Einstein activated
 
     Returns:
         dict: patients count
@@ -1637,12 +1633,12 @@ def execute_validation(i_chunk: int, chunk: list, project_name: str, validation_
                 if not is_custom_query:
                     chunk_connector.update_quality_checks(project_name=project_name, patient_id=validation_result.patient_id, report=validation_result.report, validation_ok=validation_result.status)
                     chunk_connector.database.move_to_next_layer(project=project_name, patient_id=validation_result.patient_id, object_name=chunk[validation_result.patient_id], source_zone=DataPersistenceLayer.GRAPH_ZONE, validated_ok=validation_result.status,
-                                                                parallel=True, sphn_base_iri=sphn_base_iri, project_output_format=project_output_format, compression=compression, einstein_data=einstein_data)
+                                                                parallel=True, sphn_base_iri=sphn_base_iri, project_output_format=project_output_format, compression=compression, einstein_data=einstein_data, einstein=einstein)
                     patients_count['processed_patients'] += 1
                     if not validation_result.status:
                         patients_count['failed_patients'] += 1
                         chunk_connector.update_execution_errors_table(project_name=project_name, patient_id=validation_result.patient_id, run_uuid=run_uuid, step=step, error_level=ErrorLevel.ERROR, parallel=True)
-        if chunk_connector.config.einstein and einstein_data['patients']:
+        if einstein and einstein_data['patients']:
             chunk_connector.notify_einstein(patient_data=einstein_data, patients_count=patients_count, project_name=project_name)
         chunk_connector.cleanup_chunk_environment(test_folder=test_folder, logs_folder=logs_folder, properties_file=properties_file, step=step)
         chunk_connector.database.conn.commit()
diff --git a/lib/database.py b/lib/database.py
index 932c466a5bde633312e09f661cc9857f83814e2b..40c67c5698e606465307504b6225791a57404750 100644
--- a/lib/database.py
+++ b/lib/database.py
@@ -92,7 +92,7 @@ class Database(object):
         self.conn = None
         self.cursor = None
 
-    def create_project(self, project_name: str, current_user: str, sphn_base_uri: str, output_format: OutputFormat, compression: bool):
+    def create_project(self, project_name: str, current_user: str, sphn_base_uri: str, output_format: OutputFormat, compression: bool, share_with_einstein: bool):
         """
 
         :param project_name: name of the project
@@ -100,20 +100,22 @@ class Database(object):
         :param sphn_base_iri: base IRI of SPHN schema
         :param output_format: output format of generated RDF data
         :param compression: patient files should be compressed
+        :param share_with_einstein: share data with Einstein tool
         """
         credentials = self.get_credentials(user_type=UserType.INGESTION)
         current_password = credentials[current_user]
         self.create_connection()
-        sql = """INSERT INTO configuration (project_name, data_provider_id, api_user, api_password, minio_access_key, minio_secret_key, postgres_user, postgres_password, postgres_ui_email, postgres_ui_password, airflow_user, airflow_password, airflow_firstname, airflow_lastname, airflow_email, initialized, sphn_base_iri, output_format, compression)
-                 VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
+        sql = """INSERT INTO configuration (project_name, data_provider_id, api_user, api_password, minio_access_key, minio_secret_key, postgres_user, postgres_password, postgres_ui_email, postgres_ui_password, airflow_user, airflow_password, airflow_firstname, airflow_lastname, airflow_email, initialized, sphn_base_iri, output_format, compression, share_with_einstein)
+                 VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                  ON CONFLICT (project_name, data_provider_id) DO UPDATE SET api_user=EXCLUDED.api_user, api_password=EXCLUDED.api_password, minio_access_key=EXCLUDED.minio_access_key, minio_secret_key=EXCLUDED.minio_secret_key, postgres_user=EXCLUDED.postgres_user, postgres_password=EXCLUDED.postgres_password, \
                  postgres_ui_email=EXCLUDED.postgres_ui_email, postgres_ui_password=EXCLUDED.postgres_ui_password, airflow_user=EXCLUDED.airflow_user, airflow_password=EXCLUDED.airflow_password, airflow_firstname=EXCLUDED.airflow_firstname, airflow_lastname=EXCLUDED.airflow_lastname, airflow_email=EXCLUDED.airflow_email, \
-                 initialized=EXCLUDED.initialized, sphn_base_iri=EXCLUDED.sphn_base_iri, output_format=EXCLUDED.output_format, compression=EXCLUDED.compression
+                 initialized=EXCLUDED.initialized, sphn_base_iri=EXCLUDED.sphn_base_iri, output_format=EXCLUDED.output_format, compression=EXCLUDED.compression, share_with_einstein=EXCLUDED.share_with_einstein
                  """
         self.cursor.execute(sql, (project_name, self.config.data_provider_id, current_user, current_password, self.config.minio_access_key, self.pwd_context.hash(self.config.minio_secret_key),
                                   self.config.postgres_user, self.pwd_context.hash(self.config.postgres_password), self.config.postgres_ui_email, self.pwd_context.hash(self.config.postgres_ui_password), self.config.airflow_user,
-                                  self.pwd_context.hash(self.config.airflow_password), self.config.airflow_firstname, self.config.airflow_lastname, self.config.airflow_email, False, sphn_base_uri, output_format.value, compression))
+                                  self.pwd_context.hash(self.config.airflow_password), self.config.airflow_firstname, self.config.airflow_lastname, self.config.airflow_email, False, sphn_base_uri, output_format.value, compression,
+                                  share_with_einstein))
         self.close_connection()
 
     def delete_project(self, project_name: str, minio_policy: MinioPolicy, delete_policies: bool):
@@ -1258,7 +1260,8 @@ class Database(object):
         if exists:
             raise HTTPException(status_code=exception_type, detail=pre_conditions)
 
-    def check_pre_conditions(self, project: str, type: PreCondition, content: bytes = None, filename: str = None, file_type: FileType = None, file_class: FileClass = None, tabular_type: TabularDataType = None, csv_file_type: IngestCSVDataType = None):
+    def check_pre_conditions(self, project: str, type: PreCondition, content: bytes = None, filename: str = None, file_type: FileType = None, file_class: FileClass = None,
+                             tabular_type: TabularDataType = None, csv_file_type: IngestCSVDataType = None, einstein: bool = False):
         """
         Checks that pre-conditions are met for ingesting data
 
         :param project: name of the project
@@ -1269,6 +1272,7 @@ class Database(object):
         :param file_class: class of the file
         :param tabular_type: data type for tabular ingestion
         :param csv_file_type: data type of file for CSV ingestion
+        :param einstein: share data with Einstein tool
         """
         pre_conditions = None
         exception_type = None
@@ -1282,6 +1286,8 @@ class Database(object):
             pre_conditions = "Invalid project name '{}'. Project name must begin and end with a letter or number".format(project)
         elif project.lower() == 'einstein':
             pre_conditions = "'einstein' is a reserved word and cannot be used as project name"
+        elif einstein and not self.config.einstein:
+            pre_conditions = f"To activate data sharing with Einstein for project '{project}', the EINSTEIN parameter must be set to 'True' in the .env file"
         if pre_conditions is not None:
             exception_type = status.HTTP_400_BAD_REQUEST
         elif type != PreCondition.EXT_SCHEMA_UPLOAD:
@@ -1836,7 +1842,7 @@ class Database(object):
             return target_object_name.replace(extension, '.gz')
 
     def move_to_next_layer(self, project: str, patient_id: str, object_name: str, source_zone: DataPersistenceLayer, validated_ok: bool = False, file_type: FileType = None, patient_data: bytes = None, parallel: bool = False,
-                           copy_only: bool = False, sphn_base_iri: str = None,project_output_format: OutputFormat = None, compression: bool = False, einstein_data: dict = None, anonymized: bool = False):
+                           copy_only: bool = False, sphn_base_iri: str = None,project_output_format: OutputFormat = None, compression: bool = False, einstein_data: dict = None, anonymized: bool = False, einstein: bool = False):
         """Moves patient data to next data persistence layer and updates DB data persistence tables accordingly
 
         Args:
@@ -1854,6 +1860,7 @@ class Database(object):
             compression (bool): project files should be compressed
             einstein_data (dict): map with patients info to send to Einstein
             anonymized (bool): anonymization activated on scrambling de-id rule
+            einstein (bool, optional): data sharing with Einstein activated
         """
         bucket = Buckets.CONNECTOR.value
         timestmp = datetime.now()
@@ -1930,7 +1937,7 @@ class Database(object):
                     compressed_content.seek(0)
                     self.config.minio_client.put_object(bucket_name=bucket, object_name=target_object_name, data=compressed_content, length=compressed_file_size)
 
-            if self.config.einstein:
+            if einstein:
                 object_key = f'data/graph_zone/{os.path.basename(target_object_name)}'
                 if not compression:
                     converted_object.seek(0)
@@ -4049,3 +4056,19 @@ ORDER BY concept, attribute, count_distinct_values
         compression = result[1]
         self.close_connection()
         return OutputFormat(output_format), compression
+
+    def is_einstein_activated(self, project: str) -> bool:
+        """Check if Einstein data sharing is activated
+
+        Args:
+            project (str): name of the project
+
+        Returns:
+            bool: Einstein connection is active
+        """
+        self.create_connection()
+        query = "SELECT share_with_einstein FROM configuration WHERE data_provider_id=%s AND project_name=%s"
+        self.cursor.execute(query, (self.config.data_provider_id, project))
+        record = self.cursor.fetchone()
+        self.close_connection()
+        return record[0]
\ No newline at end of file
diff --git a/postgres/init/postgresql/00_create_tables.sql b/postgres/init/postgresql/00_create_tables.sql
index 706bed92923d7749096d0131837cfa0fd0f167b4..2513f574f2a899ab164bd3f7d81ea90b1fc04d14 100644
--- a/postgres/init/postgresql/00_create_tables.sql
+++ b/postgres/init/postgresql/00_create_tables.sql
@@ -32,6 +32,7 @@ CREATE TABLE IF NOT EXISTS configuration (
     sphn_base_iri varchar(100),
     output_format varchar(10),
     compression boolean,
+    share_with_einstein boolean,
     PRIMARY KEY(project_name, data_provider_id)
 );
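
Upgrade note (not part of the patch above): 00_create_tables.sql only runs when the Postgres volume is first initialized, so an already-deployed database will not pick up the new share_with_einstein column from this change alone. A minimal migration sketch, assuming a plain psql session against the existing database; the column name and table come from this diff, the default value is an assumption mirroring the API's share_with_einstein=False default:

    -- Hypothetical migration for existing deployments (PostgreSQL 9.6+).
    -- DEFAULT FALSE keeps Database.is_einstein_activated() from reading NULL
    -- for projects created before the upgrade.
    ALTER TABLE configuration ADD COLUMN IF NOT EXISTS share_with_einstein boolean DEFAULT FALSE;

Projects created or re-imported after the upgrade set the flag explicitly through the ON CONFLICT ... DO UPDATE clause in Database.create_project.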