# MIT License
#
# Copyright (C) HEART Authors 2024
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
# Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Utility methods for converting data types to ART compatible versions."""
import logging
import uuid
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any, Optional, Union
import numpy as np
from datasets import Dataset as HFDataset
from numpy.typing import ArrayLike, NDArray
from torch import Tensor, is_tensor
from torch.utils.data.dataset import Subset as TorchSubsetDataset
if TYPE_CHECKING:
from datasets import Dataset as HFDataset
logger: logging.Logger = logging.getLogger(__name__)
EMPTY_STRING = ""
[docs]
class ImageDataset:
"""
MAITE aligned dataset
Examples
--------
We can define a white-box attack and generate adversarial images:
>>> import numpy as np
>>> import torch
>>> import torchvision
>>> from heart_library.estimators.classification.pytorch import JaticPyTorchClassifier
>>> from heart_library.utils import ImageDataset
>>> from datasets import load_dataset
>>> from heart_library.metrics import AccuracyPerturbationMetric
>>> from heart_library.attacks.attack import JaticAttack
>>> from art.attacks.evasion import ProjectedGradientDescentPyTorch
>>> from copy import deepcopy
>>> from torchvision import transforms
Load an applicable dataset:
>>> data = load_dataset("CDAO/xview-subset-classification", split="test[0:14]")
Define the model:
>>> model = torchvision.models.resnet18(False)
>>> _ = model.eval()
Wrap the model:
>>> jptc = JaticPyTorchClassifier(
... model=model,
... loss=torch.nn.CrossEntropyLoss(),
... input_shape=(3, 224, 224),
... nb_classes=6,
... clip_values=(0, 1),
... )
Transform dataset:
>>> IMAGE_H, IMAGE_W = 224, 224
>>> preprocess = transforms.Compose([transforms.Resize((IMAGE_H, IMAGE_W)), transforms.ToTensor()])
>>> data = data.map(lambda x: {"image": preprocess(x["image"]), "label": x["label"]})
>>> to_image = lambda x: transforms.ToPILImage()(torch.Tensor(x))
Define and wrap the attacks:
>>> evasion_attack_undefended = ProjectedGradientDescentPyTorch(estimator=jptc, max_iter=10, eps=0.03)
>>> attack_undefended = JaticAttack(evasion_attack_undefended, norm=2)
Generate adversarial images:
>>> x_adv, y, metadata = attack_undefended(data=data)
>>> data_with_detections = ImageDataset(data, deepcopy(jptc(data)), metadata)
"""
metadata: dict[str, Any]
def __init__(
self,
images: list[NDArray[np.float32]],
targets: NDArray[np.float32],
metadata: list[dict[str, Any]],
metadata_id: Optional[str] = None,
) -> None:
"""ImageDataset initialization.
Args:
images (List[NDArray[np.float32]]): Array representation of images.
targets (NDArray[np.float32]): Targets.
metadata (List[dict[str, Any]]): Metadata.
"""
self._images = images
self._targets = targets
self._metadata = metadata
self.metadata = {"id": metadata_id if metadata_id is not None else str(uuid.uuid4())}
def __len__(self) -> int:
"""Returns image count.
Returns:
int: Image count.
"""
return len(self._images)
def __getitem__(self, ind: int) -> tuple[NDArray[np.float32], NDArray[np.float32], dict[str, Any]]:
"""Returns images, targets, metadata.
Args:
ind (int): Index of image, target, metadata combination to be returned.
Returns:
Tuple[NDArray[np.float32], NDArray[np.float32], dict[str, Any]]:
Specified image, target, metadata combination.
"""
return self._images[ind], self._targets[ind], self._metadata[ind]
[docs]
def hf_dataset_to_maite(
dataset: Any, # noqa ANN401
image_label: str = EMPTY_STRING,
target_label: str = EMPTY_STRING,
meta_label: str = EMPTY_STRING,
indices: Optional[Sequence[int]] = None,
) -> ImageDataset:
"""Convert HF dataset to MAITE aligned dataset
Args:
dataset (Any): Image data.
image_label (str, optional): Image label. Defaults to EMPTY_STRING.
target_label (str, optional): Target label. Defaults to EMPTY_STRING.
meta_label (str, optional): Metadata label. Defaults to EMPTY_STRING.
indices (Optional[Sequence[int]], optional): Indices. Defaults to None.
Raises:
ValueError: if image feature not found in dataset.
ValueError: if image feature not found in dataset.
Returns:
ImageDataset: MAITE aligned dataset.
"""
indices = [] if indices is None else indices
images: list[Any] = []
target_list: list[Any] = []
metadata: list[Any] = []
if len(indices) == 0:
for item in dataset:
# Generate list of images based on image label provided
images = __handle_image_labels(image_label, dataset, images, item)
# Generate lists of targets and metadata based on labels provided
target_list, metadata = __handle_target_meta_labels(
target_label,
target_list,
meta_label,
metadata,
dataset,
item,
)
else:
for idx in indices:
item = dataset[idx]
# Generate list of images based on image label provided
images = __handle_image_labels(image_label, dataset, images, item)
# Generate lists of targets and metadata based on labels provided
target_list, metadata = __handle_target_meta_labels(
target_label,
target_list,
meta_label,
metadata,
dataset,
item,
)
targets = np.array([None] * len(images)) if len(target_list) == 0 else np.array(target_list)
if len(metadata) == 0:
metadata = [{} for _ in range(len(images))]
return ImageDataset(images, targets, metadata)
def __handle_image_labels(
image_label: str,
dataset: "HFDataset",
images: list[Any],
item: dict[str, Any],
) -> list[Any]:
"""Generate list of images based on indices and image label provided.
Args:
image_label (str): Label of image feature in dataset.
dataset (Dataset): HF dataset.
images (list[Any]): Empty list for images to be appended to.
item (dict[str, Any]): Item within dataset.
Raises:
ValueError: Image feature not found in dataset.
Returns:
list[Any]: List of images.
"""
if image_label is not EMPTY_STRING and image_label in dataset.features:
images.append(item[image_label])
elif "img" in dataset.features:
images.append(item["img"])
elif "image" in dataset.features:
images.append(item["image"])
else:
raise ValueError("Image feature not found in dataset.")
return images
def __handle_target_meta_labels(
target_label: str,
target_list: list[Any],
meta_label: str,
metadata: list[Any],
dataset: "HFDataset",
item: dict[str, Any],
) -> tuple[list[Any], list[Any]]:
"""Generate lists of targets and metadata based on indices and labels provided.
Args:
target_label (str): Label of target feature in dataset.
target_list (list[Any]): Empty list for targets to be appended to.
meta_label (str): Label of metadata feature in dataset.
metadata (list[Any]): Empty list for metadata to be appended to.
dataset (Dataset): HF dataset.
item (dict[str, Any]): Item within dataset.
Returns:
tuple[list[Any], list[Any]]: Lists of targets and metadata.
"""
if target_label is not EMPTY_STRING:
target_list.append(item[target_label])
elif "label" in dataset.features:
target_list.append(item["label"])
if meta_label is not EMPTY_STRING:
metadata.append(item[meta_label])
return target_list, metadata
[docs]
def torch_subset_to_maite(dataset: Any) -> ImageDataset: # noqa ANN401
"""Convert Torch subset dataset to MAITE aligned dataset.
Args:
dataset (Any): Torch dataset.
Returns:
ImageDataset: MAITE aligned dataset.
"""
images = []
data = iter(dataset)
target_list = []
for item in data:
images.append(item[0])
target_list.append(item[1])
targets = np.array(target_list)
metadata: list[dict[str, Any]] = [{} for _ in range(len(images))]
return ImageDataset(images, targets, metadata)
def __handle_ndarray(
data: NDArray[np.float32],
images: np.ndarray,
targets: Optional[Any], # noqa: ANN401
metadata: list,
) -> tuple[NDArray[np.float32], None, list]:
"""Process ndarray format.
Args:
data (NDArray[np.float32]): Input data.
images (np.ndarray): Initialized image list.
targets (Optional[Any]): Initialized targets list.
metadata (list): Initialized metadata list.
Returns:
tuple[NDArray[np.float32], None, list]: Images, targets metadata.
"""
images = data
targets = None
metadata = []
return images, targets, metadata
def __handle_tensor(
data: Tensor,
images: NDArray[np.float32],
targets: Optional[Any], # noqa: ANN401
metadata: list,
) -> tuple[NDArray[np.float32], None, list]:
"""Process Tensor format.
Args:
data (NDArray[np.float32]): Input data.
images (np.ndarray): Initialized image list.
targets (Optional[Any]): Initialized targets list.
metadata (list): Initialized metadata list.
Returns:
tuple[NDArray[np.float32], None, list]: Images, targets metadata.
"""
images = np.asarray(data)
targets = None
metadata = []
return images, targets, metadata
def __handle_dict(
data: dict[str, Any],
images: NDArray[np.float32],
targets: Optional[Any], # noqa: ANN401
metadata: list,
) -> tuple[NDArray[np.float32], NDArray[np.float32], list[dict[str, Any]]]:
"""Process dict format.
Args:
data (NDArray[np.float32]): Input data.
images (np.ndarray): Initialized image list.
targets (Optional[Any]): Initialized targets list.
metadata (list): Initialized metadata list.
Returns:
tuple[NDArray[np.float32], None, list]: Images, targets metadata.
"""
images = data["images"]
# Generate targets
targets = __handle_dict_labels(data, targets)
metadata = data.get("metadata", [])
return images, targets, metadata
def __handle_tuple(
data: tuple,
images: NDArray[np.float32],
targets: Optional[Any], # noqa: ANN401
metadata: list,
) -> tuple[NDArray[np.float32], NDArray[np.float32], list[dict[str, Any]]]:
"""Process tuple format.
Args:
data (NDArray[np.float32]): Input data.
images (np.ndarray): Initialized image list.
targets (Optional[Any]): Initialized targets list.
metadata (list): Initialized metadata list.
Returns:
tuple[NDArray[np.float32], None, list]: Images, targets metadata.
"""
images = np.asarray(data[0]).astype(np.float32)
targets = data[1]
metadata = data[2]
# Generate targets if data is in tuple format.
targets = __handle_tuple_targets(targets)
return images, targets, metadata
def __handle_sequence(
data: Sequence,
images: NDArray[np.float32],
targets: Optional[Any], # noqa: ANN401
metadata: list,
) -> tuple[NDArray[np.float32], None, list]:
"""Process Sequence format.
Args:
data (NDArray[np.float32]): Input data.
images (np.ndarray): Initialized image list.
targets (Optional[Any]): Initialized targets list.
metadata (list): Initialized metadata list.
Returns:
tuple[NDArray[np.float32], None, list]: Images, targets metadata.
"""
# Generate images, targets, metadata if data is in Sequence format.
images, targets, metadata = __handle_sequence_targets(data, images, targets, metadata)
return images, targets, metadata
def __handle_dataset(
data: Any, # noqa: ANN401
images: NDArray[np.float32],
image_list: list,
targets: Optional[Any], # noqa: ANN401
metadata: list,
) -> tuple[NDArray[np.float32], Any, list]:
"""Process other dataset format.
Args:
data (NDArray[np.float32]): Input data.
images (np.ndarray): Initialized image list.
image_list (list): Temporary image list to be processed.
targets (Optional[Any]): Initialized targets list.
metadata (list): Initialized metadata list.
Returns:
tuple[NDArray[np.float32], None, list]: Images, targets metadata.
"""
# check for __getitem__ and __iter__.
image_list, targets, metadata = __handle_dataset_attr(data, image_list, targets, metadata)
# Check to see of image list is nonempty and same length as targets.
__check_dataset_lengths(image_list, targets)
# Handle dataset images of type PIL, NDArray, tensor, list.
images = __handle_dataset_images_types(image_list, images)
# Convert dataset targets to correct type.
targets = __handle_dataset_targets(targets)
return images, targets, metadata
def __handle_hf_torch_data(data: Any) -> ImageDataset: # noqa ANN401
"""Convert Hugging Face and Torch Subset data to MAITE aligned ImageDataset.
Args:
data (Any): Input data to be converted.
Returns:
ImageDataset: MAITE aligned image data.
"""
try:
from datasets import Dataset as HFDataset
from datasets.iterable_dataset import IterableDataset as HFIterableDataset
except ImportError as e:
raise ImportError("The 'datasets' package is required but not installed.") from e
# convert Hugging Face
if isinstance(data, (HFDataset, HFIterableDataset)):
data = hf_dataset_to_maite(data)
# convert Hugging Face if wrapped in Torch Subset
elif isinstance(data, TorchSubsetDataset) and isinstance(data.dataset, HFDataset):
data = hf_dataset_to_maite(data.dataset, indices=data.indices)
elif isinstance(data, TorchSubsetDataset):
data = torch_subset_to_maite(data)
return data
def __handle_dict_labels(data: dict[str, Any], targets: Any) -> NDArray[np.float32]: # noqa ANN401
"""Generate targets if data is dict format.
Args:
data (dict[str, Any]): Input data.
targets (Any): Empty initialization.
Returns:
NDArray[np.float32]: Targets in np array format.
"""
if "labels" in data:
targets = data["labels"]
# for object detection, convert to correct format if exist
if hasattr(targets[0], "boxes") and hasattr(targets[0], "scores"):
targets = [
{
"boxes": np.asarray(t.boxes).astype(np.float32),
"scores": np.asarray(t.scores).astype(np.float32),
"labels": np.asarray(t.labels).astype(np.int64),
}
for t in targets
]
elif isinstance(targets[0], np.ndarray):
targets = np.asarray(targets)
elif not any(targets):
targets = None
else:
targets = np.asarray(targets)
return targets
def __handle_tuple_targets(targets: Any) -> NDArray[np.float32]: # noqa ANN401
"""Generate targets if data is in tuple format.
Args:
targets (Any): Second item in data of tuple format.
Returns:
NDArray[np.float32]: Targets in np array format.
"""
# for object detection, convert to correct format if exist
if hasattr(targets[0], "boxes") and hasattr(targets[0], "scores"):
targets = [
{
"boxes": np.asarray(t.boxes).astype(np.float32),
"scores": np.asarray(t.scores).astype(np.float32),
"labels": np.asarray(t.labels).astype(np.int64),
}
for t in targets
]
elif isinstance(targets[0], np.ndarray) or is_tensor(targets[0]):
targets = np.asarray(targets)
elif not any(targets):
targets = None
else:
targets = np.asarray(targets)
return targets
def __handle_sequence_targets(
data: Any, # noqa: ANN401
images: NDArray[np.float32],
targets: Optional[Any], # noqa: ANN401
metadata: Any, # noqa: ANN401
) -> tuple[NDArray[np.float32], None, list]:
"""Generate images, targets, metadata if data is in Sequence format.
Args:
data (Any): Input data (images or videos). Video data must have
at least 5 dimensions (batch, channels, frames, height, width) or
(batch, frames, channels, height, width). Image data has no more
than 4 dimensions (batch, channels, height, width).
images (NDArray[np.float32]): Initialized image list.
targets (Optional[Any]): Initialized targets list.
metadata (Any): Initialized metadata list.
Returns:
tuple[NDArray[np.float32], None, list]: np array of images and empty targets, metadata.
"""
# here assuming data is a tensor - what if a numpy array?
# what if each batch (len 1 or >1), has different shape?
# - in this case will stack fail as different dim images
# - should auto pad or resize? what if this occurs in other data formats?
# a sequence of image batches
if np.asarray(data).ndim == 5: # video data
images = np.asarray(data)
targets = None
metadata = []
elif data[0].ndim == 4:
images = np.vstack([np.asarray(batch) for batch in data])
targets = None
metadata = []
# a sequence of single images
else:
images = np.stack([np.asarray(batch) for batch in data])
targets = None
metadata = []
return images, targets, metadata
def __handle_dataset_attr(
data: Any, # noqa: ANN401
image_list: list,
targets: Any, # noqa: ANN401
metadata: list,
) -> tuple[list, list, list]:
"""If dataset and has specified attributes, generate images, targets, metadata.
Args:
data (Any): Input data.
image_list (list): Initialized image list.
targets (list): Initialized targets list.
metadata (list): Initialized metadata list.
Raises:
ValueError: If dataset does not implement __getitem__ or __iter__.
Returns:
tuple[list, list, list]: images, targets, metadata
"""
if hasattr(data, "__getitem__"):
image_list = []
targets = []
metadata = []
for item in data:
image_list.append(item[0])
targets.append(item[1])
metadata.append(item[2])
elif hasattr(data, "__iter__"):
image_list = [item[0] for item in data]
targets = [item[1] for item in data]
metadata = [item[2] for item in data if len(item) == 3]
else:
raise ValueError("Dataset must implement __getitem__ or __iter__.")
return image_list, targets, metadata
def __check_dataset_lengths(image_list: list, targets: list) -> None:
"""Check to see of image list is nonempty and same length as targets.
Args:
image_list (list): Images.
targets (list): Targets.
Raises:
ValueError: If images and targets are not the same length.
ValueError: If image list is empty.
"""
# check images, targets and meta are same length
if len(image_list) != len(targets):
raise ValueError("Images and targets must be same length.")
# check not empty data
if not len(image_list) > 0:
raise ValueError("Images should not be empty.")
def __handle_dataset_images_types(image_list: list, images: NDArray[np.float32]) -> NDArray[np.float32]:
"""Handle dataset images of type PIL, NDArray, tensor, list.
Args:
image_list (list): Images.
images (NDArray[np.float32]): np array formatted images.
Returns:
NDArray[np.float32]: Images in NDArray format.
"""
try:
from PIL.Image import Image as PILImage
except ImportError as e:
raise ImportError("The 'Pillow' package is required but not installed.") from e
if isinstance(image_list[0], PILImage):
return np.asarray(image_list).transpose(0, 3, 1, 2).astype(np.float32)
if isinstance(image_list[0], np.ndarray) or is_tensor(image_list[0]):
return __handle_ndarray_or_tensor(image_list)
if isinstance(image_list[0], list):
return __handle_nested_list_images(image_list)
return images
def __handle_ndarray_or_tensor(image_list: list[ArrayLike]) -> NDArray[np.float32]:
"""Convert ndarray or tensor images to NDArray format.
Args:
image_list (list): List of ndarrays or tensors. Video ndarrays or tensors must have
at least 5 dimensions (batch, channels, frames, height, width) or
(batch, frames, channels, height, width). Image ndarrays or tensors have no more
than 4 dimensions (batch, channels, height, width).
Returns:
NDArray[np.float32]: Images in NDArray format.
"""
if np.asarray(image_list).ndim == 5: # video data
return np.asarray(image_list)
if image_list[0].ndim == 3:
return np.asarray(image_list).astype(np.float32)
return np.concatenate([np.asarray(batch) for batch in image_list])
def __handle_nested_list_images(image_list: list[list[ArrayLike]]) -> NDArray[np.float32]:
"""Convert nested list of images to NDArray format.
Args:
image_list (list): Nested list of images.
Returns:
NDArray[np.float32]: Images in NDArray format.
"""
return np.asarray(image_list).astype(np.float32)
def __handle_dataset_targets(targets: Any) -> Any: # noqa ANN401
"""Convert dataset targets to correct type.
Args:
targets (list): Targets.
Returns:
Any: Targets in NDArray or list if object detection.
"""
# for object detection, convert to correct format if exist
if hasattr(targets[0], "boxes") and hasattr(targets[0], "scores"):
targets = [
{
"boxes": np.asarray(t.boxes).astype(np.float32),
"scores": np.asarray(t.scores).astype(np.float32),
"labels": np.asarray(t.labels).astype(np.int64),
}
for t in targets
]
elif isinstance(targets[0], np.ndarray) or is_tensor(targets[0]):
targets = np.asarray(targets)
elif not any(targets):
targets = None
else:
targets = np.asarray(targets)
return targets
[docs]
def adjust_bboxes_resize(
bboxes: list[list[float]],
original_width: int,
original_height: int,
target_width: int = 640,
target_height: int = 640,
) -> list[list[float]]:
"""
Adjust bounding boxes for images resized to a fixed width and height (640x640).
Args:
bboxes (list of lists): Bounding boxes in [x, y, width, height] format.
original_width (int): Original width of the image.
original_height (int): Original height of the image.
target_width (int): Target width of the resized image.
target_height (int): Target height of the resized image.
Returns:
list of lists: Adjusted bounding boxes in [x_min, y_min, x_max, y_max] format.
"""
# Compute scaling factors
scale_x = target_width / original_width
scale_y = target_height / original_height
adjusted_bboxes = []
for bbox in bboxes:
x, y, width, height = bbox
x_min = x * scale_x
y_min = y * scale_y
x_max = (x + width) * scale_x
y_max = (y + height) * scale_y
adjusted_bboxes.append([x_min, y_min, x_max, y_max])
return adjusted_bboxes