import concurrent.futures
import re
from functools import cached_property
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Union
import albumentations
import cv2
import numpy as np
import tifffile
from skimage.util import img_as_float32
from ..base import MaskDataset
from ..types import BundledPath
[docs]class BBBC020(MaskDataset):
"""Murine bone-marrow derived macrophages
The image set consists of 25 images, each consisting of three channels. The
samples were stained with DAPI and CD11b/APC. In addition to this, a merged
image is provided. DAPI labels the nuclei and CD11b/APC the cell surface.
Parameters
----------
root_dir : str
Path to root directory
output : {'both', 'image', 'mask'}, default: 'both'
Change outputs. 'both' returns {'image': image, 'mask': mask}.
transforms : albumentations.Compose, optional
An instance of Compose (albumentations pkg) that defines augmentation in
sequence.
num_samples : int, optional
Useful when ``transforms`` is set. Define the total length of the
dataset. If it is set, it overwrites ``__len__``.
grayscale : bool, default: False
Convert images to grayscale
grayscale_mode : {'equal', 'cv2', Sequence[float]}, default: 'equal'
How to convert to grayscale. If set to 'cv2', it follows opencv
implementation. Else if set to 'equal', it sums up values along channel
axis, then divides it by the number of expected channels.
image_ch : {'nuclei', 'cells'}, default: ('nuclei', 'cells')
Which channel(s) to load as image. Make sure to give it as a Sequence
when choosing a single channel.
anno_ch : {'nuclei', 'cells'}, default: ('nuclei',)
Which channel(s) to load as annotation. Make sure to give it as a
Sequence when choosing a single channel.
drop_missing_pairs : bool, default: True
Valid only if `output='both'`. It will drop images that do not have mask
pairs.
Warnings
--------
5 annotations are missing: ind={17,18,19,20,21}
[jw-30min 1, jw-30min 2, jw-30min 3, jw-30min 4, jw-30min 5]
- ./BBBC020_v1_images/jw-30min 1/jw-30min 1_(c1+c5).TIF
- ./BBBC020_v1_images/jw-30min 2/jw-30min 2_(c1+c5).TIF
- ./BBBC020_v1_images/jw-30min 3/jw-30min 3_(c1+c5).TIF
- ./BBBC020_v1_images/jw-30min 4/jw-30min 4_(c1+c5).TIF
- ./BBBC020_v1_images/jw-30min 5/jw-30min 5_(c1+c5).TIF
- BBBC020_v1_outlines_nuclei/jw-15min 5_c5_43.TIF exists but is corrupted
Notes
-----
- Annotations are instance segmented, where each instance is saved as a
single image file. It loads and aggregates them into a single array. A label
loaded later will override one loaded earlier. If you do not want this
behavior, make a subclass of this class and override the ``get_mask()``
method accordingly.
- 2 channels; R channel is the same as G, R==G!=B
Assign 0 to red channel
- BBBC has received a complaint that "BBB020_v1_outlines_nuclei" appears
incomplete and we have been unable to obtain the missing images from the
original contributor.
- Nuclei anno looks good
- Should separate nuclei and cells annotation; if ``anno_ch=None``,
``anno_dict`` becomes a mess.
References
----------
.. [1] https://bbbc.broadinstitute.org/BBBC020
See Also
--------
MaskDataset : Super class
Dataset : Base class
DatasetInterface : Interface
"""
# Dataset's acronym
acronym = 'BBBC020'
def __init__(
    self,
    root_dir: str,
    *,
    output: str = 'both',
    transforms: Optional[albumentations.Compose] = None,
    num_samples: Optional[int] = None,
    grayscale: bool = False,
    grayscale_mode: Union[str, Sequence[float]] = 'equal',
    # specific to this dataset
    image_ch: Sequence[str] = ('nuclei', 'cells'),
    anno_ch: Sequence[str] = ('nuclei',),
    drop_missing_pairs: bool = True,
    **kwargs
):
    """Store the dataset configuration and validate the channel selection.

    Parameters
    ----------
    root_dir : str
        Path to root directory.
    output : str, default: 'both'
        Which outputs to produce.
    transforms : albumentations.Compose, optional
        Augmentation pipeline.
    num_samples : int, optional
        Requested dataset length.
    grayscale : bool, default: False
        Whether to convert images to grayscale.
    grayscale_mode : str or Sequence[float], default: 'equal'
        How to convert to grayscale.
    image_ch : Sequence[str], default: ('nuclei', 'cells')
        Image channel(s); every entry must be 'nuclei' or 'cells'.
    anno_ch : Sequence[str], default: ('nuclei',)
        Annotation channel(s); every entry must be 'nuclei' or 'cells'.
    drop_missing_pairs : bool, default: True
        When the output is 'both', replace ``file_list`` and ``anno_dict``
        with the result of ``self._drop_missing_pairs()``.
    **kwargs
        Accepted for call compatibility; not used here.

    Raises
    ------
    ValueError
        If ``image_ch`` or ``anno_ch`` is empty or contains a name other
        than 'nuclei' or 'cells'.
    """
    valid_ch = ('nuclei', 'cells')
    # Every requested channel has to be known. Checking with ``any`` would
    # let a sequence such as ('nuclei', 'typo') slip through.
    if not image_ch or not all(ch in valid_ch for ch in image_ch):
        raise ValueError("Set `image_ch` in ('nuclei', 'cells') in sequence")
    if not anno_ch or not all(ch in valid_ch for ch in anno_ch):
        raise ValueError("Set `anno_ch` in ('nuclei', 'cells') in sequence")

    self._root_dir = root_dir
    self._output = output
    self._transforms = transforms
    self._num_samples = num_samples
    self._grayscale = grayscale
    self._grayscale_mode = grayscale_mode
    # specific to this dataset
    self._num_channels = 2  # explicit for `grayscale`
    self.image_ch = image_ch
    self.anno_ch = anno_ch
    self.drop_missing_pairs = drop_missing_pairs

    # NOTE(review): `self.output` and `_drop_missing_pairs()` are not defined
    # in this part of the file; presumably they come from the base class.
    if self.output == 'both' and self.drop_missing_pairs:
        self.file_list, self.anno_dict = self._drop_missing_pairs()
def get_image(self, p: Path) -> np.ndarray:
    """Read one image file and return it as a float32 array.

    Index 0 of the last axis is always zeroed (it duplicates index 1,
    R==G). When exactly one image channel is selected, only that plane
    is kept and expanded back to three channels.

    Parameters
    ----------
    p : Path
        Image file to read.

    Returns
    -------
    np.ndarray
        The image converted with ``img_as_float32``.

    Raises
    ------
    ValueError
        If a single channel is selected and it is neither 'cells' nor
        'nuclei'.
    """
    raw = tifffile.imread(p)
    # R==G, so drop the duplicate by zeroing index 0
    raw[..., 0] = 0

    selected = self.image_ch
    if len(selected) != 1:
        # Both channels requested: keep the full array
        return img_as_float32(raw)

    name = selected[0]
    if name == 'cells':
        plane = 1
    elif name == 'nuclei':
        plane = 2
    else:
        raise ValueError
    expanded = cv2.cvtColor(raw[..., plane], cv2.COLOR_GRAY2RGB)
    return img_as_float32(expanded)
def get_mask(self, lst_p: Union[BundledPath, List[BundledPath]]) -> np.ndarray:
    """Load per-instance annotation files and merge them into a label mask.

    The n-th file of a bundle (counting from 1) writes label ``n`` onto
    every pixel where that file is non-zero. Labels are written in list
    order, so where instances overlap the later file overrides the earlier
    one.

    Parameters
    ----------
    lst_p : BundledPath or list of BundledPath
        With one annotation channel, a sequence of annotation file paths.
        With two, a pair whose first item holds the 'cells' paths and
        whose second item holds the 'nuclei' paths.

    Returns
    -------
    np.ndarray
        Label mask allocated with ``np.zeros_like`` from the first file
        read. With two annotation channels, the 'cells' and 'nuclei'
        masks stacked along a new last axis, in that order.

    Raises
    ------
    ValueError
        If a bundle is empty, or if ``anno_ch`` does not hold exactly one
        or two channels. Errors raised while reading a file propagate to
        the caller.
    """
    def _merge_instances(paths: Sequence[Union[str, Path]]) -> np.ndarray:
        """Read ``paths`` in worker threads and write labels 1..N in order."""
        merged = None
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # `map` reads the files concurrently but yields the arrays in
            # input order, so the override order is deterministic. It also
            # re-raises a failed read here instead of dropping it silently.
            loaded = executor.map(tifffile.imread, paths)
            for ind, tif in enumerate(loaded, 1):
                if merged is None:
                    merged = np.zeros_like(tif)
                merged[tif.nonzero()] = ind
        if merged is None:
            raise ValueError('No annotation file to build a mask from')
        return merged

    num_anno_ch = len(self.anno_ch)
    if num_anno_ch == 1:
        return _merge_instances(lst_p)
    if num_anno_ch == 2:
        # 'cells' (ch=Green) first then 'nuclei' (ch=Blue)
        mask_cells = _merge_instances(lst_p[0])
        mask_nuclei = _merge_instances(lst_p[1])
        return np.stack((mask_cells, mask_nuclei), axis=-1)
    raise ValueError("Set `anno_ch` in ('nuclei', 'cells')")
@cached_property
def file_list(self) -> List[Path]:
    """Sorted image paths found under ``BBBC020_v1_images``.

    With a single image channel, 'cells' selects the ``*_c1.TIF`` files
    and 'nuclei' the ``*_c5.TIF`` files. Otherwise the merged
    ``*_(c1+c5).TIF`` files are listed.

    Raises
    ------
    ValueError
        If a single channel is selected and it is neither 'cells' nor
        'nuclei'.
    """
    parent = 'BBBC020_v1_images'
    channels = self.image_ch

    # Merged files are used unless exactly one channel is requested
    suffix = '(c1+c5)'
    if len(channels) == 1:
        only = channels[0]
        if only == 'cells':
            suffix = 'c1'
        elif only == 'nuclei':
            suffix = 'c5'
        else:
            raise ValueError

    matches = self.root_dir.glob(f'{parent}/*/*_{suffix}.TIF')
    return sorted(matches)
@staticmethod
def _sort_key(p: Path):
    """Return the integer formed by the digits that end ``p.stem``.

    Raises
    ------
    ValueError
        If the stem does not end with a digit.
    """
    trailing = re.search(r'\d+$', p.stem)
    if trailing:
        return int(trailing[0])
    raise ValueError
@cached_property
def anno_dict(self) -> Dict[int, BundledPath]:
    """Map image indices to their annotation files.

    For every entry of ``file_list`` the name of its parent directory
    (via ``Path.stem``) is used as a prefix to find ``<prefix>_*.TIF``
    files in ``BBBC020_v1_outlines_<channel>``. Zero-byte files are
    skipped, and the rest are ordered by ``_sort_key``.

    Returns
    -------
    dict
        Keys are positions in ``file_list``. With one annotation channel
        each value is a list of paths. With two channels each value is
        ``[cells_paths, nuclei_paths]``, and an image is included only
        when both lists are non-empty. Images without annotations are
        omitted.

    Raises
    ------
    ValueError
        If ``anno_ch`` does not hold exactly one or two channels (checked
        while iterating, so only when ``file_list`` is non-empty), or if
        ``_sort_key`` rejects a matched file name.
    """
    root_dir = self.root_dir
    channels = self.anno_ch

    def _valid_annos(channel: str, prefix: str) -> List[Path]:
        """List non-empty outline files of ``channel`` for one image."""
        parent = root_dir / f'BBBC020_v1_outlines_{channel}'
        candidates = sorted(parent.glob(f'{prefix}_*.TIF'), key=self._sort_key)
        # Zero-byte files cannot be read as images; leave them out
        return [f for f in candidates if f.stat().st_size > 0]

    anno_dict = {}
    for i, p in enumerate(self.file_list):
        k = p.parent.stem
        if len(channels) == 1:
            anno_list = _valid_annos(channels[0], k)
        elif len(channels) == 2:
            # Each list must come from the directory of the same name;
            # reading them from the opposite directories swaps the two
            # mask channels.
            anno_list_cells = _valid_annos('cells', k)
            anno_list_nuclei = _valid_annos('nuclei', k)
            if anno_list_cells and anno_list_nuclei:
                # 'cells' (ch=Green) first then 'nuclei' (ch=Blue)
                anno_list = [anno_list_cells, anno_list_nuclei]
            else:
                anno_list = []
        else:
            raise ValueError("Set `anno_ch` in ('nuclei', 'cells')")
        if anno_list:
            anno_dict[i] = anno_list
    return anno_dict