Commit a75d7067 authored by Gabriel Couture's avatar Gabriel Couture

Merge branch 'fix/change-path-when-retrieving-tree' into 'master'

Fix paths when retrieving and writing patient forest

See merge request !43
parents 1d1d2f31 790a05cf
Pipeline #607 passed with stage
in 3 minutes and 30 seconds
......@@ -5,7 +5,7 @@ from pyorthanc.study import Study
from pyorthanc.series import Series
from pyorthanc.instance import Instance
from pyorthanc.util import build_patient_forest, trim_patient_forest, \
retrieve_and_write_patients_forest_to_given_path
retrieve_and_write_patients
__all__ = [
'Orthanc',
......@@ -16,5 +16,5 @@ __all__ = [
'Instance',
'build_patient_forest',
'trim_patient_forest',
'retrieve_and_write_patients_forest_to_given_path'
'retrieve_and_write_patients'
]
......@@ -64,6 +64,16 @@ class Instance:
"""
return self.identifier
def get_uid(self) -> str:
"""Get SOPInstanceUID
Returns
-------
str
SOPInstanceUID
"""
return self.get_main_information()['MainDicomTags']['SOPInstanceUID']
def get_main_information(self) -> Dict:
"""Get instance information
......
......@@ -55,6 +55,16 @@ class Series:
"""
return self.identifier
def get_uid(self) -> str:
"""Get SeriesInstanceUID
Returns
-------
str
SeriesInstanceUID
"""
return self.get_main_information()['MainDicomTags']['SeriesInstanceUID']
def get_main_information(self) -> Dict:
"""Get series main information
......
......@@ -108,7 +108,20 @@ class Study:
str
Study ID
"""
return self.get_main_information()['MainDicomTags']['StudyID']
try:
return self.get_main_information()['MainDicomTags']['StudyID']
except KeyError:
return ''
def get_uid(self) -> str:
"""Get StudyInstanceUID
Returns
-------
str
StudyInstanceUID
"""
return self.get_main_information()['MainDicomTags']['StudyInstanceUID']
def get_parent_patient_identifier(self) -> str:
"""Get the Orthanc identifier of the parent patient
......
......@@ -5,8 +5,8 @@ from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict, Callable, Optional
from pyorthanc import Study, Series, Instance
from pyorthanc.patient import Patient
from pyorthanc.orthanc import Orthanc
from pyorthanc.patient import Patient
def build_patient_forest(
......@@ -143,9 +143,7 @@ def trim_patient_forest(patient_forest: List[Patient]) -> List[Patient]:
return list(patients)
def retrieve_and_write_patients_forest_to_given_path(
patient_forest: List[Patient],
path: str) -> None:
def retrieve_and_write_patients(patient_forest: List[Patient], path: str) -> None:
"""Retrieve and write patients to given path
Parameters
......@@ -155,30 +153,77 @@ def retrieve_and_write_patients_forest_to_given_path(
path
Path where you want to write the files.
"""
anonymized_patient_counter = 1
os.makedirs(path, exist_ok=True)
for patient in patient_forest:
retrieve_patient(patient, path)
def retrieve_patient(patient: Patient, path: str) -> None:
patient_path = _make_patient_path(path, patient.get_id())
os.makedirs(patient_path, exist_ok=True)
for study in patient.get_studies():
retrieve_study(study, patient_path)
def retrieve_study(study: Study, patient_path: str) -> None:
study_path = _make_study_path(patient_path, study.get_id())
os.makedirs(study_path, exist_ok=True)
for series in study.get_series():
retrieve_series(series, study_path)
def retrieve_series(series: Series, study_path: str) -> None:
series_path = _make_series_path(study_path, series.get_modality())
os.makedirs(series_path, exist_ok=True)
for instance in series.get_instances():
retrieve_instance(instance, series_path)
def retrieve_instance(instance: Instance, series_path) -> None:
path = os.path.join(series_path, instance.get_uid() + '.dcm')
dicom_file_bytes = instance.get_dicom_file_content()
with open(path, 'wb') as file_handler:
file_handler.write(dicom_file_bytes)
def _make_patient_path(path: str, patient_id: str) -> str:
patient_directories = os.listdir(path)
if patient_id.strip() == '':
patient_id = 'anonymous-patient'
return os.path.join(path, _make_path_name(patient_id, patient_directories))
return os.path.join(path, patient_id)
def _make_study_path(patient_path: str, study_id: str) -> str:
study_directories = os.listdir(patient_path)
if study_id.strip() == '':
study_id = 'anonymous-study'
return os.path.join(patient_path, _make_path_name(study_id, study_directories))
if patient.get_id().strip() == '':
patient_path = os.path.join(path, f'anonymized-patient-{anonymized_patient_counter}')
anonymized_patient_counter += 1
else:
patient_path = os.path.join(path, patient.get_id())
used_study_paths: List[str] = [] # Sometime there are many studies with the same "ID" name.
def _make_series_path(study_path: str, modality: str) -> str:
series_directories = os.listdir(study_path)
for j, study in enumerate(patient.get_studies()):
study_path = os.path.join(patient_path, 'anonymized-study' if study.get_id() == '' else study.get_id())
return os.path.join(study_path, _make_path_name(modality, series_directories))
if study_path in used_study_paths:
study_path += str(j + 1)
used_study_paths.append(study_path)
os.makedirs(study_path, exist_ok=True)
def _make_path_name(name: str, directories: List[str], increment: int = 1, has_increment: bool = False) -> str:
if not has_increment:
name = f'{name}-{increment}'
has_increment = True
else:
name = '-'.join(name.split('-')[:-1])
for series in study.get_series():
for k, instance in enumerate(series.get_instances()):
instance_path = os.path.join(study_path, f'{series.get_modality()}-{k+1}.dcm')
if name in directories:
return _make_path_name(name, directories, increment + 1, has_increment)
dicom_file_bytes = instance.get_dicom_file_content()
with open(instance_path, 'wb') as file_handler:
file_handler.write(dicom_file_bytes)
return name
......@@ -50,7 +50,7 @@ with open('./README.md', 'r') as file_handler:
setup(
name='pyorthanc',
version='0.2.12',
version='0.2.13',
packages=find_packages(),
url='https://gitlab.physmed.chudequebec.ca/gacou54/pyorthanc',
license='MIT',
......
......@@ -44,6 +44,11 @@ class TestInstance(unittest.TestCase):
{key: value for key, value in a_instance.INFORMATION.items() if key not in keys_to_exclude},
)
def test_givenAInstance_whenGettingSOPInstanceUID_thenResultIsExpectedUID(self):
result = self.instance.get_uid()
self.assertEqual(result, a_instance.INFORMATION['MainDicomTags']['SOPInstanceUID'])
def test_givenAInstance_whenGettingFileSize_thenResultIsAInt(self):
result = self.instance.get_file_size()
......
......@@ -53,6 +53,11 @@ class TestSeries(unittest.TestCase):
self.assertEqual(result, a_series.PARENT_STUDY)
def test_givenASeries_whenGettingSeriesInstanceUID_thenResultIsExpectedUID(self):
result = self.series.get_uid()
self.assertEqual(result, a_series.INFORMATION['MainDicomTags']['SeriesInstanceUID'])
def test_givenASeries_whenBuildingInstances_thenPatientHasInstances(self):
self.series.build_instances()
......
......@@ -63,6 +63,11 @@ class TestStudy(unittest.TestCase):
self.assertEqual(result, a_study.ID)
def test_givenAStudy_whenGettingStudyInstanceUID_thenResultIsExpectedUID(self):
result = self.study.get_uid()
self.assertEqual(result, a_study.INFORMATION['MainDicomTags']['StudyInstanceUID'])
def test_givenAStudy_whenGettingReferringPhysicianName_thenResultIsExpectedReferringPhysicianName(self):
result = self.study.get_referring_physician_name()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment