# Source code for pythologist_reader.formats.inform.immunoprofile
from pythologist_reader.formats.inform.frame import CellFrameInForm, preliminary_threshold_read
from pythologist_reader.formats.inform.sets import CellSampleInForm, CellProjectInForm
from pythologist_reader.formats.inform.custom import CellFrameInFormLineArea, CellFrameInFormCustomMask
import os, re, sys
from tempfile import mkdtemp
from glob import glob
from shutil import copytree, copy, rmtree
import pandas as pd
from pythologist_image_utilities import read_tiff_stack, make_binary_image_array, binary_image_dilation
from uuid import uuid4
class CellProjectInFormImmunoProfile(CellProjectInForm):
    """
    Read an ImmunoProfile sample

    Samples are added one folder at a time with :meth:`add_sample_path`; each
    sample is written to the file at ``self.h5path`` and registered in the
    project key table.
    """
    def __init__(self, *argv, **kwargs):
        super().__init__(*argv, **kwargs)
        # if we are creating a new project go ahead and give a default name until otherwise set.
        # Fall back to the instance attribute when 'mode' was not given by keyword,
        # so a positional or omitted mode no longer raises KeyError here.
        mode = kwargs['mode'] if 'mode' in kwargs else getattr(self, 'mode', None)
        if mode == 'w':
            self.project_name = 'ImmunoProfile'
        return

    def create_cell_sample_class(self):
        """Return a new, empty sample object of the type this project stores."""
        return CellSampleInFormImmunoProfile()

    def add_sample_path(self, path,
                        sample_name=None,
                        export_names=['FOXP3', 'PD1_PDL1'],
                        channel_abbreviations={
                            'PD-L1 (Opal 520)': 'PDL1',
                            'Foxp3 (Opal 570)': 'FOXP3',
                            'PD-1 (Opal 620)': 'PD1'},
                        verbose=False,
                        microns_per_pixel=0.496,
                        invasive_margin_width_microns=40,
                        invasive_margin_drawn_line_width_pixels=10,
                        skip_margin=False,
                        skip_segmentation_processing=False,
                        skip_all_regions=False,
                        deidentify=False,
                        **kwargs):
        """
        Read a sample in as a single project folder and add it to the CellProjectInFormImmunoProfile

        such as ``IP-99-A00001``:

        | IP-99-A00001/
        | └── INFORM_ANALYSIS
        |     ├── FOXP3
        |     ├── GIMP
        |     └── PD1_PDL1

        Args:
            path (str): location of the project directory
            sample_name (str): name of the immunoprofile sample (default: rightmost directory in path), can be overridden by 'deidentify' set to True .. results in the uuid4 for the sample being used
            export_names (list): specify the names of the exports to read
            channel_abbreviations (dict): dictionary of shortcuts to translate to simpler channel names
            verbose (bool): if true print extra details
            microns_per_pixel (float): conversion factor; if None the value already set on the project is used and left unchanged
            invasive_margin_width_microns (int): size of invasive margin in microns
            invasive_margin_drawn_line_width_pixels (int): size of the line drawn for invasive margins in pixels
            skip_margin (bool): if false (default) read in margin line and define a margin according to steps. if true, only read a tumor and stroma.
            skip_segmentation_processing (bool): if false (default) read segmentations, else skip to run faster
            skip_all_regions (bool): if false (default) read the tumor / margin annotation images. if true, do not apply any of them.
            deidentify (bool): if false (default) use sample names and frame names derived from the folders. If true use the uuid4s.
            **kwargs: accepted for call compatibility and ignored.

        Returns:
            sample_id, sample_name (tuple) returns the uuid4 assigned as the sample_id, and the sample_name that were given to this sample that was added

        Raises:
            ValueError: if the project is read-only, the path is not an ImmunoProfile
                folder, or no microns_per_pixel is available.
        """
        if self.mode == 'r':
            raise ValueError("Error: cannot write to a path in read-only mode.")

        # Validate the folder layout before touching any project state.
        abspath = os.path.abspath(path)
        if not os.path.isdir(abspath):
            raise ValueError("Error project path must be a directory")
        bpath1 = os.path.join(abspath, 'INFORM_ANALYSIS')
        if not os.path.isdir(bpath1):
            raise ValueError("expecting an INFORM_ANALYSIS directory as a child directory of IP path")

        # abspath has no trailing separator, so the default name is never the empty string
        if sample_name is None:
            sample_name = os.path.basename(abspath)

        # fix the margin width
        # None means "keep whatever the project already has"; resolve it before dividing
        effective_mpp = microns_per_pixel if microns_per_pixel is not None else self.microns_per_pixel
        if effective_mpp is None:
            raise ValueError("microns_per_pixel must be given or already set on the project")
        grow_margin_steps = int(invasive_margin_width_microns / effective_mpp
                                - invasive_margin_drawn_line_width_pixels / 2)
        if verbose:
            sys.stderr.write("To reach a margin width in each direction of " + str(invasive_margin_width_microns)
                             + "um we will grow the line by " + str(grow_margin_steps) + " pixels\n")
        if microns_per_pixel is not None:
            self.microns_per_pixel = microns_per_pixel
        if verbose:
            sys.stderr.write("microns_per_pixel " + str(self.microns_per_pixel) + "\n")

        # TODO: tumor channel auto-detection from the cell_seg_data header was sketched
        # here at one point but is not implemented.

        if verbose:
            sys.stderr.write("Reading sample " + path + " for sample " + sample_name + "\n")

        # Read in one sample FOR this project
        cellsample = self.create_cell_sample_class()
        cellsample.read_path(
            path,
            sample_name=sample_name,
            # hand over copies so the shared default arguments can never be mutated downstream
            channel_abbreviations=(dict(channel_abbreviations)
                                   if channel_abbreviations is not None else None),
            verbose=verbose,
            require=True,
            require_score=True,
            skip_margin=skip_margin,
            skip_segmentation_processing=skip_segmentation_processing,
            skip_all_regions=skip_all_regions,
            export_names=list(export_names),
            deidentify=deidentify,
            steps=grow_margin_steps,
        )
        if deidentify:
            cellsample.sample_name = cellsample.id

        # Save the sample TO this project
        cellsample.to_hdf(self.h5path, location='samples/' + cellsample.id, mode='a')

        # Append this sample to the project key table (created on first use)
        current = self.key
        if current is None:
            current = pd.DataFrame([{'sample_id': cellsample.id,
                                     'sample_name': cellsample.sample_name}])
            current.index.name = 'db_id'
        else:
            iteration = max(current.index) + 1
            addition = pd.DataFrame([{'db_id': iteration,
                                      'sample_id': cellsample.id,
                                      'sample_name': cellsample.sample_name}]).set_index('db_id')
            current = pd.concat([current, addition])
        current.to_hdf(self.h5path, key='info', mode='r+', complib='zlib', complevel=9, format='table')
        return cellsample.id, cellsample.sample_name
class CellSampleInFormImmunoProfile(CellSampleInForm):
    """
    One ImmunoProfile sample: the frames found in the first export folder of an
    ``INFORM_ANALYSIS`` directory, with thresholds merged in from the other
    export folders and regions taken from the ``GIMP`` annotation images.
    """
    def create_cell_frame_class(self):
        return CellFrameInFormLineArea()  # this will be called when we read the HDF

    def create_cell_frame_class_line_area(self):
        """Frame type used when an invasive margin line image is applied."""
        return CellFrameInFormLineArea()

    def create_cell_frame_class_custom_mask(self):
        """Frame type used when only a tumor mask (or no region) is applied."""
        return CellFrameInFormCustomMask()

    def read_path(self, path, sample_name=None,
                  channel_abbreviations=None,
                  verbose=False,
                  require=True,
                  require_score=True,
                  steps=76,
                  skip_margin=False,
                  skip_segmentation_processing=False,
                  skip_all_regions=False,
                  export_names=[],
                  deidentify=False):
        """
        Read every frame of an ImmunoProfile sample folder into this sample.

        Frames are discovered as ``*_cell_seg_data.txt`` files in
        ``<path>/INFORM_ANALYSIS/<export_names[0]>``. Score files with the same
        prefix in the remaining ``export_names`` folders are merged into each
        frame's thresholds. ``<prefix>Tumor.tif`` and
        ``<prefix>Invasive_Margin.tif`` are looked up in the sibling ``GIMP`` folder.

        Args:
            path (str): the sample directory (parent of ``INFORM_ANALYSIS``)
            sample_name (str): name for the sample (default: ``path``)
            channel_abbreviations (dict): passed through to the frame reader
            verbose (bool): if true write progress messages to stderr
            require (bool): passed through to the frame reader
            require_score (bool): passed through to the frame reader (tumor-mask frames only, see note in code)
            steps (int): number of steps to grow the margin line by
            skip_margin (bool): if true ignore margin images and read tumor and stroma only
            skip_segmentation_processing (bool): passed through to the frame reader
            skip_all_regions (bool): if true do not apply any tumor or margin image
            export_names (list): export folder names; the first is the primary export
            deidentify (bool): if true name each frame by its id instead of its file prefix

        Raises:
            ValueError: on missing export names, a non-directory path, no
                cell_seg_data files, or a frame without a score file.
        """
        if not export_names:
            raise ValueError("You need to know the names of the export(s)")
        if sample_name is None:
            sample_name = path
        if not os.path.isdir(path):
            raise ValueError('Path input must be a directory')
        absdir = os.path.abspath(path)
        exportdir = os.path.join(absdir, 'INFORM_ANALYSIS', export_names[0])
        # GIMP and the alternate exports are siblings of the primary export folder
        parent = os.path.split(exportdir)[0]

        suffix = 'cell_seg_data.txt'
        # sorted so frame order (and therefore db_id) does not depend on os.listdir order
        segs = sorted(x for x in os.listdir(exportdir) if x.endswith('_' + suffix))
        if len(segs) == 0:
            raise ValueError("There needs to be cell_seg_data in the folder.")

        frames = []
        if skip_margin and verbose:
            sys.stderr.write("FORCE SKIP ANY MARGIN FILES.. Tumor and Stroma Only\n")
        if skip_all_regions and verbose:
            sys.stderr.write("FORCE SKIP ALL REGION ANNOTATIONS .. Processed image will be annotated as a region 'Any'\n")

        for file in segs:
            # everything before 'cell_seg_data.txt', including the trailing underscore
            prefix = file[:-len(suffix)]
            frame = prefix.rstrip('_')
            data = os.path.join(exportdir, file)
            score = os.path.join(exportdir, prefix + 'score_data.txt')
            tfile = os.path.join(exportdir, prefix + 'tissue_seg_data.txt')
            tumor = os.path.join(parent, 'GIMP', prefix + 'Tumor.tif')
            margin = os.path.join(parent, 'GIMP', prefix + 'Invasive_Margin.tif')
            if not os.path.exists(score):
                raise ValueError('Missing score file ' + score)
            if verbose:
                sys.stderr.write('Acquiring frame ' + data + "\n")

            # Only hand over alternate exports that really have a score file for this
            # frame, and report the missing ones here where the verbose flag is known.
            alt_folders = []
            for altfolder in export_names[1:]:
                altpath = os.path.join(parent, altfolder, prefix + 'score_data.txt')
                if os.path.exists(altpath):
                    alt_folders.append(altfolder)
                elif verbose:
                    sys.stderr.write("WARNING: Missing a score file in the alternate folder " + str(altpath) + "\n")

            # arguments shared by both frame types
            raw_args = dict(
                frame_name=frame,
                cell_seg_data_file=data,
                score_data_file=score,
                tissue_seg_data_file=tfile if os.path.exists(tfile) else None,
                binary_seg_image_file=os.path.join(exportdir, prefix + 'binary_seg_maps.tif'),
                component_image_file=os.path.join(exportdir, prefix + 'component_data.tif'),
                channel_abbreviations=channel_abbreviations,
                verbose=verbose,
                require=require,
                skip_segmentation_processing=skip_segmentation_processing,
            )

            if os.path.exists(margin) and not skip_margin and not skip_all_regions:
                if verbose:
                    sys.stderr.write("LINE AREA TYPE\n")
                cid = self.create_cell_frame_class_line_area()
                # NOTE(review): require_score is not forwarded for line-area frames,
                # unlike the tumor-mask branch below -- confirm whether that is intended.
                cid.read_raw(**raw_args)
                # Must update the score file before refactoring regions
                update_with_other_scores(cid, parent, prefix, alt_folders)
                if verbose:
                    sys.stderr.write("growing margin by " + str(steps) + " steps\n")
                # skip_all_regions is necessarily False in this branch
                cid.set_line_area(margin, tumor, steps=steps, verbose=verbose)
            else:
                if verbose:
                    sys.stderr.write("TUMOR MASK ONLY TYPE\n")
                cid = self.create_cell_frame_class_custom_mask()
                cid.read_raw(require_score=require_score, **raw_args)
                # Must update the score file before refactoring regions
                update_with_other_scores(cid, parent, prefix, alt_folders)
                stroma_name = 'Stroma-No-Margin'
                if os.path.exists(margin) and skip_margin:
                    stroma_name = 'Stroma-Ignore-Margin'
                if not skip_all_regions:
                    cid.set_area(tumor, 'Tumor', stroma_name, verbose=verbose)

            frame_id = cid.id
            if deidentify:
                cid.frame_name = frame_id
            self._frames[frame_id] = cid
            # Keep the key consistent with the frame: a de-identified frame is listed
            # under its id, not under the file-derived name.
            # NOTE(review): frame_path still records the source directory even when
            # deidentify is set -- confirm whether it should be blanked as well.
            frames.append({'frame_id': frame_id,
                           'frame_name': frame_id if deidentify else frame,
                           'frame_path': absdir})

        if verbose:
            sys.stderr.write("finished tumor and stroma and margin\n")
        self._key = pd.DataFrame(frames)
        self._key.index.name = 'db_id'
        self.sample_name = sample_name
def update_with_other_scores(frame, parent, file_prefix, alt_folders, verbose=False):
    """
    Append thresholds from additional export folders to a frame.

    For each folder in ``alt_folders`` the file
    ``<parent>/<folder>/<file_prefix>score_data.txt`` is read and its rows are
    appended to the frame's ``thresholds`` table, with ``gate_index`` shifted
    past the indices already present. Folders without that file are skipped.

    Args:
        frame: frame object providing ``get_data`` / ``set_data``; modified in place
        parent (str): directory containing the export folders
        file_prefix (str): file name prefix of the frame (joined directly to ``score_data.txt``)
        alt_folders (list): names of the additional export folders to look in
        verbose (bool): if true write a warning to stderr for each missing score file

    Returns:
        None
    """
    # Now lets look for additional scores for this frame
    for altfolder in alt_folders:
        # see if there is an appropriate score in this
        altpath = os.path.join(parent, altfolder, file_prefix + 'score_data.txt')
        if not os.path.exists(altpath):
            if verbose:
                sys.stderr.write("WARNING: Missing a score file in the alternate folder " + str(altpath) + "\n")
            continue
        # If we are still here we have a score file
        # This part is a little hacky .. we borrow "preliminary_threshold_read" from the
        # CellFrameInForm module to parse the alternate score file against this frame's tables
        altscore = preliminary_threshold_read(altpath,
                                              frame.get_data('measurement_statistics'),
                                              frame.get_data('measurement_features'),
                                              frame.get_data('measurement_channels'),
                                              frame.get_data('regions')).reset_index().copy()
        thresholds = frame.get_data('thresholds')
        # shift the new gates past the existing ones; an empty table needs no shift
        current_max = max(thresholds.index) if len(thresholds.index) > 0 else -1
        altscore['gate_index'] = altscore['gate_index'] + (current_max + 1)
        newscore = pd.concat([thresholds.reset_index(), altscore], sort=True).set_index('gate_index')
        frame.set_data('thresholds', newscore)
    return