Source code for kwimage.structs.heatmap

"""
TODO:
    - [ ] Remove doctest dependency on ndsampler?

    - [ ] Remove the datakeys that tries to define what heatmap should represent
          (e.g. class_probs, keypoints, etc...) and instead just focus on a
          data structure that stores a [C, H, W] or [H, W] tensor?

CommandLine:
    xdoctest -m ~/code/kwimage/kwimage/structs/heatmap.py __doc__

Example:
    >>> # xdoctest: +REQUIRES(module:ndsampler)
    >>> # xdoctest: +REQUIRES(--mask)
    >>> from kwimage.structs.heatmap import *  # NOQA
    >>> import kwimage
    >>> import ndsampler
    >>> sampler = ndsampler.CocoSampler.demo('shapes')
    >>> iminfo, anns = sampler.load_image_with_annots(1)
    >>> image = iminfo['imdata']
    >>> input_dims = image.shape[0:2]
    >>> kp_classes = sampler.dset.keypoint_categories()
    >>> dets = kwimage.Detections.from_coco_annots(
    >>>     anns, sampler.dset.dataset['categories'],
    >>>     sampler.catgraph, kp_classes, shape=input_dims)
    >>> bg_size = [100, 100]
    >>> heatmap = dets.rasterize(bg_size, input_dims, soften=2)
    >>> # xdoctest: +REQUIRES(--show)
    >>> import kwplot
    >>> kwplot.autompl()
    >>> kwplot.figure(fnum=1, doclf=True)
    >>> kwplot.imshow(image)
    >>> heatmap.draw(invert=True, kpts=[0, 1, 2, 3, 4])

Example:
    >>> # xdoctest: +REQUIRES(module:ndsampler)
    >>> # xdoctest: +REQUIRES(--mask)
    >>> from kwimage.structs.heatmap import *  # NOQA
    >>> from kwimage.structs.detections import _dets_to_fcmaps
    >>> import kwimage
    >>> import ndsampler
    >>> sampler = ndsampler.CocoSampler.demo('shapes')
    >>> iminfo, anns = sampler.load_image_with_annots(1)
    >>> image = iminfo['imdata']
    >>> input_dims = image.shape[0:2]
    >>> kp_classes = sampler.dset.keypoint_categories()
    >>> dets = kwimage.Detections.from_coco_annots(
    >>>     anns, sampler.dset.dataset['categories'],
    >>>     sampler.catgraph, kp_classes, shape=input_dims)
    >>> bg_size = [100, 100]
    >>> bg_idxs = sampler.catgraph.index('background')
    >>> fcn_target = _dets_to_fcmaps(dets, bg_size, input_dims, bg_idxs)
    >>> fcn_target.keys()
    >>> print('fcn_target: ' + ub.urepr(ub.map_vals(lambda x: x.shape, fcn_target), nl=1))
    >>> # xdoctest: +REQUIRES(--show)
    >>> import kwplot
    >>> kwplot.autompl()
    >>> size_mask = fcn_target['size']
    >>> dxdy_mask = fcn_target['dxdy']
    >>> cidx_mask = fcn_target['cidx']
    >>> kpts_mask = fcn_target['kpts']
    >>> def _vizmask(dxdy_mask):
    >>>     dx, dy = dxdy_mask
    >>>     mag = np.sqrt(dx ** 2 + dy ** 2)
    >>>     mag /= (mag.max() + 1e-9)
    >>>     mask = (cidx_mask != 0).astype(np.float32)
    >>>     angle = np.arctan2(dy, dx)
    >>>     orimask = kwplot.make_orimask(angle, mask, alpha=mag)
    >>>     vecmask = kwplot.make_vector_field(
    >>>         dx, dy, stride=4, scale=0.1, thickness=1, tipLength=.2,
    >>>         line_type=16)
    >>>     return [vecmask, orimask]
    >>> vecmask, orimask = _vizmask(dxdy_mask)
    >>> raster = kwimage.overlay_alpha_layers(
    >>>     [vecmask, orimask, image], keepalpha=False)
    >>> raster = dets.draw_on((raster * 255).astype(np.uint8),
    >>>                       labels=True, alpha=None)
    >>> kwplot.imshow(raster)
    >>> kwplot.show_if_requested()
"""
import numpy as np
import ubelt as ub
import kwarray
import functools
from . import _generic


__docstubs__ = """
from kwimage._typing import SKImageGeometricTransform
"""


[docs] class _HeatmapDrawMixin(object): """ mixin methods for drawing heatmap details """
def _colorize_class_idx(self):
    """
    Replace every entry of the ``class_idx`` map with the color of its class.

    When ``classes`` exposes a ``graph``, colors are read from the ``color``
    attribute of its nodes; nodes without a color take the next unused
    color from ``kwimage.Color.distinct``. Without a graph every class is
    paired with one of those distinct colors in order.

    Returns:
        ndarray: the per-class color table indexed by the integer
            ``class_idx`` array.
    """
    import networkx as nx
    import kwimage

    # Ignore cases where index is negative?
    index_map = kwarray.ArrayAPI.numpy(self.data['class_idx']).astype(int)

    classes = self.meta['classes']
    fallback_colors = iter(kwimage.Color.distinct(len(classes)))

    if not hasattr(classes, 'graph'):
        name_to_color = ub.dzip(classes, fallback_colors)
    else:
        name_to_color = nx.get_node_attributes(classes.graph, 'color')
        for node in classes.graph.nodes:
            node_color = classes.graph.nodes[node].get('color', None)
            if node_color is None:
                # No color stored on the node, consume a fallback color
                node_color = next(fallback_colors)
            name_to_color[node] = kwimage.Color(node_color).as01()

    palette = np.array([name_to_color[cname] for cname in classes])
    return palette[index_map]
def colorize(self, channel=None, invert=False, with_alpha=1.0,
             interpolation='linear', imgspace=False, cmap=None):
    """
    Creates a colorized version of a heatmap channel suitable for
    visualization

    Args:
        channel (int | str | None): index of category to visualize, or a
            special code indicating how to visualize multiple classes.
            The string codes handled below are ``class_idx`` / ``idx``,
            ``offset``, ``diameter``, ``class_probs_max``,
            ``class_energy_max``, ``class_probs`` / ``class_probs_color``
            and ``class_energy`` / ``class_energy_color``; any other
            string raises ``KeyError``. If None, the first of
            ``class_idx``, ``class_probs``, ``class_energy`` present in
            ``self.data`` is used.

        invert (bool): if True, the single-channel mask is replaced by
            ``1 - mask`` before it is colormapped.

        with_alpha (float): alpha value forwarded to the colorizing
            helpers.

        interpolation (str): interpolation mode used when warping into
            image space.

        imgspace (bool): colorize the image after warping into the image
            space.

        cmap (str | None): colormap name given to
            ``kwplot.make_heatmask``; defaults to ``'plasma'``.

    Returns:
        ndarray: the colorized mask.

    Raises:
        KeyError: if ``channel`` is an unrecognized string code.
        Exception: if ``channel`` is None and no default can be chosen.

    CommandLine:
        xdoctest -m ~/code/kwimage/kwimage/structs/heatmap.py _HeatmapDrawMixin.colorize --show

    Ignore:
        import xdev
        from kwimage.structs.heatmap import *  # NOQA
        globals().update(xdev.get_func_kwargs(Heatmap.colorize))

    Example:
        >>> # xdoctest: +REQUIRES(module:kwplot)
        >>> self = Heatmap.random(rng=0, dims=(32, 32))
        >>> colormask1 = self.colorize(0, imgspace=False)
        >>> colormask2 = self.colorize(0, imgspace=True)
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.imshow(colormask1, pnum=(1, 2, 1), fnum=1, title='output space')
        >>> kwplot.imshow(colormask2, pnum=(1, 2, 2), fnum=1, title='image space')
        >>> kwplot.show_if_requested()

    Example:
        >>> # xdoctest: +REQUIRES(module:kwplot)
        >>> self = Heatmap.random(rng=0, dims=(32, 32))
        >>> colormask1 = self.colorize('diameter', imgspace=False)
        >>> colormask2 = self.colorize('diameter', imgspace=True)
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.imshow(colormask1, pnum=(1, 2, 1), fnum=1, title='output space')
        >>> kwplot.imshow(colormask2, pnum=(1, 2, 2), fnum=1, title='image space')
        >>> kwplot.show_if_requested()

    Ignore:
        >>> # xdoctest: +REQUIRES(module:kwplot)
        >>> self = Heatmap.random(rng=0, dims=(32, 32))
        >>> self.data['class_energy'] = (self.data['class_probs'] - .5) * 10
        >>> colormask1 = self.colorize('class_energy_color', imgspace=False)
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.imshow(colormask1, fnum=1, title='output space')
        >>> kwplot.show_if_requested()

    Ignore:
        >>> # xdoctest: +REQUIRES(module:kwplot)
        >>> import kwarray
        >>> import kwimage
        >>> rng = kwarray.ensure_rng(0)
        >>> class_probs = np.zeros((2, 32, 32))
        >>> class_probs[0] = kwimage.Polygon.random(rng=rng).scale(16).translate(16).fill(class_probs[0], value=0.5)
        >>> class_probs[1] = kwimage.Polygon.random(rng=rng).scale(32).fill(class_probs[1], value=0.5)
        >>> self = kwimage.Heatmap(class_probs=class_probs)
        >>> canvas = self.colorize()
        >>> canvas = kwimage.overlay_alpha_images(canvas, np.zeros_like(canvas[:, :, 0:3]))
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.imshow(canvas)
    """
    import kwplot
    # Pick a default channel from whichever class map is available
    if channel is None:
        if 'class_idx' in self.data:
            channel = 'class_idx'
        elif 'class_probs' in self.data:
            channel = 'class_probs'
        elif 'class_energy' in self.data:
            channel = 'class_energy'
        else:
            raise Exception('unsure how to default channel')

    def _per_channel_color(data, with_alpha, classes=None):
        # Another hacky mode
        # data = a.data['class_energy']
        # Builds one RGBA layer per class (solid class color, channel
        # values as alpha) and composites the layers together.
        import kwimage

        if len(data.shape) == 2:
            # add in prefix channel if its not there
            data = data[None, :, :]

        # Define default colors
        default_cidx_to_color = kwimage.Color.distinct(len(data))

        # try and read colors from classes CategoryTree
        try:
            cidx_to_color = []
            for cidx in range(len(data)):
                node = classes[cidx]
                color = classes.graph.nodes[node].get('color', None)
                if True:
                    # A missing color fails this assertion, which the
                    # except below turns into "use all default colors".
                    assert color is not None
                if color is None:
                    # fallback, ignore conflicts
                    color = default_cidx_to_color[cidx]
                else:
                    color = kwimage.Color(color).as01()
                cidx_to_color.append(color)
        except Exception:
            # fallback on default colors
            cidx_to_color = default_cidx_to_color

        # Each class gets its own color, and modulates the alpha
        layers = []
        for cidx, chan in enumerate(data):
            color = cidx_to_color[cidx]
            layer = np.empty(tuple(chan.shape) + (4,))
            layer[..., 3] = chan
            layer[..., 0:3] = color
            layers.append(layer)
        colormask = kwimage.overlay_alpha_layers(layers)
        colormask[..., 3] *= with_alpha
        return colormask

    if channel in ['class_idx', 'idx']:
        # HACK
        # Hard class assignments: color each pixel by its class index
        import kwimage
        colormask = self._colorize_class_idx()
        colormask = kwimage.ensure_alpha_channel(colormask, with_alpha)
        if imgspace:
            import torch
            # _warp_imgspace works on channel-first tensors
            chw = torch.Tensor(colormask.transpose(2, 0, 1))
            colormask = self._warp_imgspace(chw, interpolation=interpolation).transpose(1, 2, 0)
        return colormask

    if isinstance(channel, str):
        # TODO: this is a bit hacky / inefficient, needs cleanup
        if imgspace:
            # Warp the whole heatmap into image space, then read from it
            mat = self.tf_data_to_img.params
            output_dims = self.img_dims
            a = self.warp(mat, version='old', output_dims=output_dims).numpy()
        else:
            a = self

        if channel == 'offset':
            mask = np.linalg.norm(a.offset, axis=0)
        elif channel == 'diameter':
            mask = np.linalg.norm(a.diameter, axis=0)
        elif channel == 'class_probs_max':
            if 'class_probs' in a.data:
                data = a.data['class_probs']
            else:
                # HACK HACK HACK
                # No probabilities: rescale the energies into [0, 1]
                data = a.data['class_energy']
                low = min(0, data.min())
                high = max(1, data.max())
                data = (data - low) / (high - low)
            mask = data.max(axis=0)
        elif channel == 'class_energy_max':
            mask = a.data['class_energy'].max(axis=0)
            mask -= mask.min()
        elif channel == 'class_probs_color' or channel == 'class_probs':
            if 'class_probs' in a.data:
                data = a.data['class_probs']
            else:
                # HACK HACK HACK
                # No probabilities: rescale the energies into [0, 1]
                data = a.data['class_energy']
                low = min(0, data.min())
                high = max(1, data.max())
                data = (data - low) / (high - low)
            classes = self.classes
            colormask = _per_channel_color(data, with_alpha, classes)
            return colormask
        elif channel == 'class_energy_color' or channel == 'class_energy':
            # Another hacky mode
            import scipy
            import scipy.special
            data = a.data['class_energy']
            if 1:
                # Assume 0-1 range, but stretch beyond if needed
                low = min(0, data.min())
                high = max(1, data.max())
                data = (data - low) / (high - low)
            else:
                # Disabled alternative: softmax over the class axis
                data = scipy.special.softmax(data, axis=0)
            classes = self.classes
            colormask = _per_channel_color(data, with_alpha, classes)
            return colormask
        else:
            raise KeyError(channel)
        # Normalize so the largest value is 1 (guarding against zero)
        mask = mask / np.maximum(mask.max(), 1e-9)
    else:
        # Otherwise channel indexes a single class probability map
        if imgspace:
            mask = self.upscale(channel, interpolation=interpolation)[0]
        else:
            mask = self.class_probs[channel]

    if invert:
        mask = 1 - mask

    if cmap is None:
        cmap = 'plasma'
    colormask = kwplot.make_heatmask(mask, with_alpha=with_alpha, cmap=cmap)
    return colormask
def draw_stacked(self, image=None, dsize=(224, 224), ignore_class_idxs=None,
                 top=None, chosen_cxs=None):
    """
    Draws per-class probabilities and stacks them into a single image

    Args:
        image (ndarray | None): optional image shown as the first tile.
            When ``tf_data_to_img`` is set, the image is first warped into
            the heatmap space with the inverse of that transform.

        dsize (Tuple[int, int]): size passed to ``cv2.resize`` for every
            tile.

        ignore_class_idxs (Collection[int] | None): class indexes that
            are never drawn. Defaults to no ignored classes.

        top (int | None): if given (and ``chosen_cxs`` is not), only the
            ``top`` classes with the highest mean probability are drawn.

        chosen_cxs (Iterable[int] | None): explicit class indexes to
            draw. If None, they are chosen via ``top`` or all classes are
            drawn.

    Returns:
        ndarray: the tiles stacked along ``axis=1``.

    Example:
        >>> # xdoctest: +REQUIRES(module:kwplot)
        >>> self = Heatmap.random(rng=0, dims=(32, 32))
        >>> stacked = self.draw_stacked()
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.imshow(stacked)
    """
    import kwimage
    import matplotlib as mpl
    import cv2

    # Avoid a mutable default argument; None means "ignore nothing"
    if ignore_class_idxs is None:
        ignore_class_idxs = ()

    mat = None
    if image is not None:
        tf = self.tf_data_to_img
        if tf is not None:
            mat = np.linalg.inv(tf.params)

    try:
        # The colormap registry is the supported lookup;
        # ``cm.get_cmap`` was removed in Matplotlib 3.9.
        cmap = mpl.colormaps['magma']
    except AttributeError:
        # Older Matplotlib without the registry
        cmap = mpl.cm.get_cmap('magma')

    # cv2 sizes are (width, height)
    level_dsize = self.class_probs.shape[-2:][::-1]

    if chosen_cxs is None:
        if top is not None:
            # Find the categories with the most "heat"
            cx_to_score = self.class_probs.mean(2).mean(1)
            for cx in ignore_class_idxs:
                cx_to_score[cx] = -np.inf
            chosen_cxs = kwarray.ArrayAPI.numpy(cx_to_score).argsort()[::-1][:top]
        else:
            chosen_cxs = np.arange(self.class_probs.shape[0])

    if image is not None:
        if mat is not None:
            # warp image into dataspace
            dataspace_img = cv2.warpAffine(image, mat[0:2], dsize=level_dsize)
        else:
            dataspace_img = image
        small_img = cv2.resize(dataspace_img, dsize)
        colorized = [small_img]
    else:
        colorized = []

    for cx in chosen_cxs:
        if cx in ignore_class_idxs:
            continue
        if self.classes:
            node = self.classes[cx]
        else:
            node = 'cx={}'.format(cx)

        c = self.class_probs[cx]
        c = cmap(c)
        # Drop the alpha channel and convert to uint8 RGB
        c = (c[..., 0:3] * 255.0).astype(np.uint8)
        c = cv2.resize(c, dsize)
        c = kwimage.draw_text_on_image(c, '{}'.format(node), (0, 20), fontScale=.5)
        colorized.append(c)

    stacked = kwimage.stack_images(colorized, overlap=-3, axis=1)
    return stacked
def draw(self, channel=None, image=None, imgspace=None, **kwargs):
    """
    Accepts same args as draw_on, but uses matplotlib

    The heatmap is rendered with ``draw_on`` and the result is shown with
    ``kwplot.imshow``. Nothing is returned.

    Args:
        channel (int | str): category index to visualize, or special key

        image (ndarray | None): canvas to draw on. If None, a zero-filled
            float32 canvas with 4 channels is created, sized by
            ``img_dims`` when ``imgspace`` is truthy and by ``bounds``
            otherwise.

        imgspace (bool | None): forwarded to ``draw_on``.

        **kwargs: forwarded to ``draw_on``.
    """
    # If draw doesnt exist use draw_on
    import kwplot
    if image is None:
        canvas_dims = self.img_dims if imgspace else self.bounds
        image = np.zeros(tuple(canvas_dims) + (4,), dtype=np.float32)
    canvas = self.draw_on(image, channel=channel, imgspace=imgspace, **kwargs)
    kwplot.imshow(canvas)
def draw_on(self, image=None, channel=None, invert=False, with_alpha=1.0,
            interpolation='linear', vecs=False, kpts=None, imgspace=None):
    """
    Overlays a heatmap channel on top of an image

    Args:
        image (ndarray): image to draw on, if unspecified one is created.

        channel (int | str): category index to visualize, or special key.
            special keys are: class_idx, class_probs, class_energy.
            If None, the first of those keys present in ``self.data`` is
            used.

        invert (bool): forwarded to ``colorize``.

        with_alpha (float): alpha of the heatmap layer; vector layers use
            a fraction of it.

        interpolation (str): interpolation used when warping layers.

        vecs (bool): if True and ``offset`` data exists, draw the center
            offset vector field.

        kpts (bool | int | List[int] | None): keypoint class indexes whose
            offset vector fields are drawn. ``True`` selects every
            keypoint class; a single index is wrapped in a list.

        imgspace (bool): colorize the image after warping into the image
            space. If None, it is enabled when the image has the same
            height / width as ``img_dims``.

    Returns:
        ndarray: the composited image, converted back to the dtype
            convention of the input image.

    TODO:
        - [ ] Find a way to visualize offset, diameter, and class_probs
              either individually or all at the same time

    CommandLine:
        xdoctest -m /home/joncrall/code/kwimage/kwimage/structs/heatmap.py

    Example:
        >>> # xdoctest: +REQUIRES(module:kwplot)
        >>> import kwarray
        >>> import kwimage
        >>> image = kwimage.grab_test_image('astro')
        >>> probs = kwimage.gaussian_patch(image.shape[0:2])[None, :]
        >>> probs = probs / probs.max()
        >>> class_probs = kwarray.ArrayAPI.cat([probs, 1 - probs], axis=0)
        >>> self = kwimage.Heatmap(class_probs=class_probs, offset=5 * np.random.randn(2, *probs.shape[1:]))
        >>> toshow = self.draw_on(image, 0, vecs=True, with_alpha=0.85)
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.imshow(toshow)

    Example:
        >>> # xdoctest: +REQUIRES(module:kwplot)
        >>> # xdoctest: +REQUIRES(module:ndsampler)
        >>> import kwimage
        >>> self = kwimage.Heatmap.random(dims=(200, 200), dets='coco', keypoints=True)
        >>> image = kwimage.grab_test_image('astro')
        >>> toshow = self.draw_on(image, 0, vecs=False, with_alpha=0.85)
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.imshow(toshow)

    Example:
        >>> # xdoctest: +REQUIRES(module:kwplot)
        >>> # xdoctest: +REQUIRES(module:ndsampler)
        >>> import kwimage
        >>> self = kwimage.Heatmap.random(dims=(200, 200), dets='coco', keypoints=True)
        >>> kpts = [6]
        >>> self = self.warp(self.tf_data_to_img.params)
        >>> image = kwimage.grab_test_image('astro')
        >>> image = kwimage.ensure_alpha_channel(image)
        >>> toshow = self.draw_on(image, 0, with_alpha=0.85, kpts=kpts)
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.imshow(toshow)

    Example:
        >>> # xdoctest: +REQUIRES(module:kwplot)
        >>> # xdoctest: +REQUIRES(module:ndsampler)
        >>> import kwimage
        >>> mask = np.random.rand(32, 32)
        >>> self = kwimage.Heatmap(
        >>>     class_probs=mask,
        >>>     img_dims=mask.shape[0:2],
        >>>     tf_data_to_img=np.eye(3),
        >>> )
        >>> canvas = self.draw_on()
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.imshow(canvas)

    Ignore:
        import xdev
        globals().update(xdev.get_func_kwargs(Heatmap.draw_on))
    """
    import kwimage

    if image is None:
        if imgspace:
            image = np.zeros(self.img_dims)
        else:
            image = np.zeros((*self.shape[-2:], 3))

    if channel is None:
        if 'class_idx' in self.data:
            channel = 'class_idx'
        elif 'class_probs' in self.data:
            channel = 'class_probs'
        elif 'class_energy' in self.data:
            channel = 'class_energy'
        else:
            raise Exception('unsure how to default channel')

    if imgspace is None:
        # An image already sized like the image space implies imgspace
        if np.all(image.shape[0:2] == np.array(self.img_dims)):
            imgspace = True

    colormask = self.colorize(channel, invert=invert,
                              with_alpha=with_alpha,
                              interpolation=interpolation,
                              imgspace=imgspace)

    # Remember the input dtype so the result can be converted back
    dtype_fixer = _generic._consistent_dtype_fixer(image)
    image = kwimage.ensure_float01(image)

    layers = []

    vec_colors = kwimage.Color.distinct(2)
    vec_alpha = .5

    if kpts is not None:
        # TODO: make a nicer keypoint offset vector visualization
        if kpts is True:
            if self.data.get('keypoints', None) is not None:
                keypoints = self.data['keypoints']
                # shape[1] is the number of keypoint classes (an int);
                # the previous ``len(keypoints.shape[1])`` raised TypeError
                kpts = list(range(keypoints.shape[1]))
        if not ub.iterable(kpts):
            kpts = [kpts]
        # Reserve the first color for the offset vectors when drawn
        E = int(bool(vecs))
        vec_colors = kwimage.Color.distinct(len(kpts) + E)

    if vecs:
        if self.data.get('offset', None) is not None:
            # Hack
            # Visualize center offset vectors
            dy, dx = kwarray.ArrayAPI.numpy(self.data['offset'])
            color = vec_colors[0]
            vecmask = kwimage.make_vector_field(
                dx, dy, stride=4, scale=1.0, alpha=with_alpha * vec_alpha,
                color=color)
            # Each further vector layer is drawn a bit more transparent
            vec_alpha = max(.1, vec_alpha - .1)
            import torch
            chw = torch.Tensor(vecmask.transpose(2, 0, 1))
            vecalign = self._warp_imgspace(chw, interpolation=interpolation)
            vecalign = vecalign.transpose(1, 2, 0)
            layers.append(vecalign)

    if kpts is not None:
        import torch
        # TODO: make a nicer keypoint offset vector visualization
        if self.data.get('keypoints', None) is not None:
            keypoints = self.data['keypoints']
            for i, k in enumerate(kpts):
                color = vec_colors[i + E]
                dy, dx = kwarray.ArrayAPI.numpy(keypoints[:, k])
                vecmask = kwimage.make_vector_field(
                    dx, dy, stride=8, scale=0.5,
                    alpha=with_alpha * vec_alpha, color=color)
                vec_alpha = max(.1, vec_alpha - .1)
                chw = torch.Tensor(vecmask.transpose(2, 0, 1))
                vecalign = self._warp_imgspace(chw, interpolation=interpolation)
                vecalign = vecalign.transpose(1, 2, 0)
                layers.append(vecalign)

    # Vector layers first, then the heatmap, with the image underneath
    layers.append(colormask)
    layers.append(image)

    overlaid = kwimage.overlay_alpha_layers(layers)
    overlaid = dtype_fixer(overlaid, copy=False)
    return overlaid
[docs] class _HeatmapWarpMixin(object): """ mixin method having to do with warping and aligning heatmaps """
[docs] def _align_other(self, other): """ Warp another Heatmap (with the same underlying imgdims) into the same space as this heatmap. This lets us perform elementwise operations on the two heatmaps (like geometric mean). Args: other (Heatmap): the heatmap to align with `self` Returns: Heatmap: warped version of `other` that aligns with `self`. Example: >>> # xdoctest: +REQUIRES(module:torch) >>> from kwimage.structs.heatmap import * # NOQA >>> self = Heatmap.random((120, 130), img_dims=(200, 210), classes=2, nblips=10, rng=0) >>> other = Heatmap.random((60, 70), img_dims=(200, 210), classes=2, nblips=10, rng=1) >>> other2 = self._align_other(other) >>> assert self.shape != other.shape >>> assert self.shape == other2.shape >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> kwplot.imshow(self.colorize(0, imgspace=False), fnum=1, pnum=(2, 2, 1)) >>> kwplot.imshow(self.colorize(1, imgspace=False), fnum=1, pnum=(2, 2, 2)) >>> kwplot.imshow(other.colorize(0, imgspace=False), fnum=1, pnum=(2, 2, 3)) >>> kwplot.imshow(other.colorize(1, imgspace=False), fnum=1, pnum=(2, 2, 4)) """ if self is other: return other # The heatmaps must belong to the same image space assert self.classes == other.classes assert np.all(self.img_dims == other.img_dims) img_to_self = np.linalg.inv(self.tf_data_to_img.params) other_to_img = other.tf_data_to_img.params other_to_self = np.matmul(img_to_self, other_to_img) mat = other_to_self output_dims = self.class_probs.shape[1:] # other now exists in the same space as self new_other = other.warp(mat, output_dims=output_dims) return new_other
def _align(self, mask, interpolation='linear'):
    """
    Align a linear combination of heatmap channels with the original image

    The mask is warped with the top two rows of ``tf_data_to_img`` onto a
    canvas sized by ``img_dims`` and the result is clipped to [0, 1].

    DEPRECATE
    """
    import kwimage
    import cv2
    matrix = self.tf_data_to_img.params[0:3]
    # cv2 expects the output size as (width, height)
    out_size = tuple(int(d) for d in self.img_dims[::-1])
    cv2_flags = kwimage.im_cv2._coerce_interpolation(interpolation)
    warped = cv2.warpAffine(mask, matrix[0:2], dsize=tuple(out_size),
                            flags=cv2_flags)
    return np.clip(warped, 0, 1)
def _warp_imgspace(self, chw, interpolation='linear'):
    """
    Warp a channel-first tensor into the image space of this heatmap.

    Args:
        chw (Tensor): tensor to warp (the callers in this file build it
            with ``torch.Tensor``).
        interpolation (str): mode forwarded to ``kwimage.warp_tensor``.

    Returns:
        ndarray: the warped data. The input is returned as a numpy array
            without warping when neither ``tf_data_to_img`` nor
            ``img_dims`` is set, or when there is no transform and
            ``img_dims`` equals ``dims``.
    """
    import kwimage
    tf = self.tf_data_to_img
    if tf is None and self.img_dims is None:
        return chw.cpu().numpy()

    import torch
    # If img dims are the same then we dont need a transform we
    # know its identity
    if tf is None and self.img_dims == self.dims:
        return chw.cpu().numpy()

    target_dims = self.img_dims
    matrix = torch.Tensor(self.tf_data_to_img.params[0:3])
    # warp_tensor is given a leading batch dimension, removed again below
    warped = kwimage.warp_tensor(
        chw[None, :], matrix, output_dims=target_dims,
        mode=interpolation
    )
    return warped[0].cpu().numpy()
def upscale(self, channel=None, interpolation='linear'):
    """
    Warp the heatmap with the image dimensions

    Args:
        channel (int | None): if None, all of ``class_probs`` is warped,
            otherwise only ``class_probs[channel]`` (with a leading
            axis of size one added).

        interpolation (str): forwarded to ``_warp_imgspace``.

    Returns:
        ndarray: the result of ``_warp_imgspace``.

    TODO:
        - [ ] Needs refactor

    Example:
        >>> # xdoctest: +REQUIRES(module:torch)
        >>> self = Heatmap.random(rng=0, dims=(32, 32))
        >>> colormask = self.upscale()
    """
    import torch
    if channel is not None:
        # Keep a leading channel axis for the selected map
        chw = torch.Tensor(self.class_probs[channel])[None, :]
    else:
        chw = torch.Tensor(self.class_probs)
    return self._warp_imgspace(chw, interpolation=interpolation)
# @profile
def warp(self, mat=None, input_dims=None, output_dims=None,
         interpolation='linear', modify_spatial_coords=True,
         int_interpolation='nearest', mat_is_xy=True, version=None):
    """
    Warp all spatial maps. If the map contains spatial data, that data is
    also warped (ignoring the translation component).

    Args:
        mat (ArrayLike): transformation matrix. If None,
            ``tf_data_to_img`` is used. skimage ``AffineTransform`` and
            ``kwimage.Affine`` objects are unpacked to their matrix.

        input_dims (tuple): unused, only exists for compatibility

        output_dims (tuple): size of the output heatmap. If None, it is
            derived by warping the corners of the current dims.

        interpolation (str): see `kwimage.warp_tensor`

        modify_spatial_coords (bool): if True, the values stored under the
            keys in ``__spatialkeys__`` are themselves transformed
            (without translation), not just relocated.

        int_interpolation (str): interpolation used for integer types
            (should be nearest)

        mat_is_xy (bool): set to false if the matrix is in yx space instead
            of xy space

        version (str | None): either ``'old'`` or ``'new'``; selects when
            the x/y axes of ``mat`` are swapped. With ``'old'`` they are
            swapped if ``mat_is_xy`` is True, with ``'new'`` if it is
            False. None emits a warning and behaves like ``'old'``.

    Returns:
        Heatmap: this heatmap warped into a new spatial dimension

    Raises:
        KeyError: if ``version`` is not None, ``'old'`` or ``'new'``.

    Ignore:
        # Verify swapping rows 0 and 1 and then swapping columns 0 and 1
        # Produces a matrix that works with permuted coordinates
        # It does.
        import sympy
        a, b, c, d, e, f, g, h, i, x, y, z = sympy.symbols('a, b, c, d, e, f, g, h, i, x, y, z')
        M1 = sympy.Matrix([[a, b, c], [d, e, f], [g, h, i]])
        M2 = sympy.Matrix([[e, d, f], [b, a, c], [h, g, i]])
        xy = sympy.Matrix([[x], [y], [z]])
        yx = sympy.Matrix([[y], [x], [z]])

        R1 = M1.multiply(xy)
        R2 = M2.multiply(yx)
        R3 = sympy.Matrix([[R1[1]], [R1[0]], [R1[2]],])
        assert R2 == R3

    Example:
        >>> # xdoctest: +REQUIRES(module:torch)
        >>> from kwimage.structs.heatmap import *  # NOQA
        >>> self = Heatmap.random(rng=0, keypoints=True)
        >>> S = 3.0
        >>> mat = np.eye(3) * S
        >>> mat[-1, -1] = 1
        >>> newself = self.warp(mat, np.array(self.dims) * S).numpy()
        >>> assert newself.offset.shape[0] == 2
        >>> assert newself.diameter.shape[0] == 2
        >>> f1 = newself.offset.max() / self.offset.max()
        >>> assert f1 == S
        >>> f2 = newself.diameter.max() / self.diameter.max()
        >>> assert f2 == S

    Example:
        >>> import kwimage
        >>> # xdoctest: +REQUIRES(module:ndsampler)
        >>> self = kwimage.Heatmap.random(dims=(100, 100), dets='coco', keypoints=True)
        >>> image = np.zeros(self.img_dims)
        >>> # xdoctest: +REQUIRES(module:kwplot)
        >>> toshow = self.draw_on(image, 1, vecs=True, with_alpha=0.85)
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.figure(fnum=1, doclf=True)
        >>> kwplot.imshow(toshow)
    """
    # ``warnings`` must be bound on every path. It used to be imported only
    # inside two optional branches, so the integer-interpolation warning
    # below could raise UnboundLocalError.
    import warnings
    import kwimage
    import skimage
    import torch

    if mat is None:
        mat = self.tf_data_to_img.params

    if isinstance(mat, skimage.transform.AffineTransform):
        mat = mat.params
    elif isinstance(mat, kwimage.Affine):
        mat = mat.matrix

    newdata = {}
    newmeta = self.meta.copy()

    impl = kwarray.ArrayAPI.coerce('tensor')

    if version is None:
        warnings.warn(ub.paragraph(
            '''
            The old mat_is_xy logic has changed. Please ensure your
            application works with the old logic. Then set version='old' or
            'new'. Both disable this warning message.
            '''))
        version = 'old'

    # Change if matrix is in X/Y or Y/X coords.
    if version == 'new':
        if not mat_is_xy:
            mat = mat[[1, 0, 2], :][:, [1, 0, 2]]
    elif version == 'old':
        if mat_is_xy:
            mat = mat[[1, 0, 2], :][:, [1, 0, 2]]
    else:
        raise KeyError(version)

    mat = impl.asarray(mat)
    mat_np = impl.numpy(mat)

    tf = skimage.transform.AffineTransform(matrix=mat_np)
    # hack: need to get a version of the matrix without any translation
    tf_notrans = _remove_translation(tf)
    mat_notrans = torch.Tensor(tf_notrans.params)

    if output_dims is None:
        # If output dimensions are not specified warp the existing dims
        # according to scale. NOTE: old behavior was to use the img_dims
        # but this has problems when we are making something smaller.
        def _auto_select_warped_output_shape(mat):
            h, w = self.dims
            # Warp corners of the box and determine a new output shape
            corners = kwimage.Coords(np.array([
                [0., 0],
                [w, 0],
                [w, h],
                [0, h],
            ]))
            corners2 = corners.warp(mat.numpy())
            wh2 = corners2.data.clip(1, None).max(axis=0)
            w2, h2 = np.ceil(wh2).astype(int).tolist()
            # NOTE(review): returned as (w2, h2) while dims are unpacked
            # as (h, w) above - confirm the intended axis order.
            output_dims = (w2, h2)
            return output_dims

        output_dims = _auto_select_warped_output_shape(mat_notrans)

        if self.img_dims is not None:
            warnings.warn(
                'NOTE: automatic selection of output_dims has changed. '
                'Previously it would use the img_dims, but now it calculates '
                'the output_dims based on mat and the current dims. '
                'Please check that your code still works and specify '
                'output_dims explicitly to supress this message.')
        # output_dims = self.img_dims

    # Modify data_to_img so the new heatmap will also properly upscale to
    # the image coordinates.
    inv_tf = skimage.transform.AffineTransform(matrix=tf._inv_matrix)
    # newmeta['tf_data_to_img'] = self.tf_data_to_img + inv_tf

    # NOTE: The old models were working with the above code, but I think
    # thats because there was no translation factor. I'm pretty sure the
    # code on the bottom is correct. Obviously if something messes up, it
    # should probably be reverted. Left-vs-right is hard.
    if self.tf_data_to_img is not None:
        newmeta['tf_data_to_img'] = inv_tf + self.tf_data_to_img

    for k, v in self.data.items():
        if v is not None:
            v = kwarray.ArrayAPI.tensor(v)

            # For spatial keys we need to transform the underlying values
            # in addition to where those values are located.
            if modify_spatial_coords:
                if k in self.__spatialkeys__:
                    pts = impl.contiguous(impl.T(v))
                    pts = kwimage.warp_points(mat_notrans, pts)
                    v = impl.contiguous(impl.T(pts))

            if kwarray.ArrayAPI.dtype_kind(v) == 'i':
                # use different interpolation for integer types
                if int_interpolation != 'nearest':
                    warnings.warn('Using non-nearest int interpolation')
                new_v = kwimage.warp_tensor(
                    v[None, :].float(), mat, output_dims=output_dims,
                    mode=int_interpolation)[0]
            else:
                new_v = kwimage.warp_tensor(
                    v[None, :].float(), mat, output_dims=output_dims,
                    mode=interpolation)[0]
            newdata[k] = impl.asarray(new_v)

    newself = self.__class__(newdata, newmeta)
    return newself
def scale(self, factor, output_dims=None, interpolation='linear'):
    """
    Scale the heatmap

    Args:
        factor (float | Tuple[float, float]): a single factor used for
            both axes, or one factor per axis.
        output_dims (tuple | None): forwarded to ``warp``.
        interpolation (str): forwarded to ``warp``.

    Returns:
        Heatmap: result of ``warp`` with the scaling matrix and
            ``version='old'``.
    """
    if ub.iterable(factor):
        s1, s2 = factor
    else:
        s1 = s2 = factor
    # Homogeneous 3x3 matrix with the factors on the diagonal
    scale_mat = np.diag([s1, s2, 1.])
    return self.warp(scale_mat, output_dims=output_dims, version='old',
                     interpolation=interpolation)
def translate(self, offset, output_dims=None, interpolation='linear'):
    """
    Translate the heatmap

    Args:
        offset (float | Tuple[float, float]): a single shift used for
            both axes, or one shift per axis.
        output_dims (tuple | None): forwarded to ``warp``.
        interpolation (str): forwarded to ``warp``.

    Returns:
        Heatmap: result of ``warp`` with the translation matrix and
            ``version='old'``.
    """
    if ub.iterable(offset):
        tx, ty = offset
    else:
        tx = ty = offset
    # Homogeneous 3x3 matrix with the shifts in the last column
    shift_mat = np.array([
        [1, 0, tx],
        [0, 1, ty],
        [0, 0, 1.],
    ])
    return self.warp(shift_mat, output_dims=output_dims, version='old',
                     interpolation=interpolation)
[docs] class _HeatmapAlgoMixin(object): """ Algorithmic operations on heatmaps """
@classmethod
def combine(cls, heatmaps, root_index=None, dtype=np.float32):
    """
    Combine multiple heatmaps into a single heatmap.

    Every heatmap is aligned to the root heatmap; ``class_probs`` are
    merged with ``_gmean`` and ``offset``, ``diameter`` and ``keypoints``
    are averaged.

    Args:
        heatmaps (Sequence[Heatmap]): multiple heatmaps to combine into one

        root_index (int): which heatmap in the sequence to align other
            heatmaps with. If None, the heatmap with the largest shape
            product is used.

        dtype (type): dtype the class probabilities are cast to before
            they are merged.

    Returns:
        Heatmap: the combined heatmap

    Example:
        >>> # xdoctest: +REQUIRES(module:torch)
        >>> from kwimage.structs.heatmap import *  # NOQA
        >>> a = Heatmap.random((120, 130), img_dims=(200, 210), classes=2, nblips=10, rng=0)
        >>> b = Heatmap.random((60, 70), img_dims=(200, 210), classes=2, nblips=10, rng=1)
        >>> c = Heatmap.random((40, 30), img_dims=(200, 210), classes=2, nblips=10, rng=1)
        >>> heatmaps = [a, b, c]
        >>> newself = Heatmap.combine(heatmaps, root_index=2)
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.imshow(a.colorize(0, imgspace=1), fnum=1, pnum=(4, 2, 1))
        >>> kwplot.imshow(a.colorize(1, imgspace=1), fnum=1, pnum=(4, 2, 2))
        >>> kwplot.imshow(b.colorize(0, imgspace=1), fnum=1, pnum=(4, 2, 3))
        >>> kwplot.imshow(b.colorize(1, imgspace=1), fnum=1, pnum=(4, 2, 4))
        >>> kwplot.imshow(c.colorize(0, imgspace=1), fnum=1, pnum=(4, 2, 5))
        >>> kwplot.imshow(c.colorize(1, imgspace=1), fnum=1, pnum=(4, 2, 6))
        >>> kwplot.imshow(newself.colorize(0, imgspace=1), fnum=1, pnum=(4, 2, 7))
        >>> kwplot.imshow(newself.colorize(1, imgspace=1), fnum=1, pnum=(4, 2, 8))
        >>> # xdoctest: +REQUIRES(--show)
        >>> kwplot.imshow(a.colorize('offset', imgspace=1), fnum=2, pnum=(4, 1, 1))
        >>> kwplot.imshow(b.colorize('offset', imgspace=1), fnum=2, pnum=(4, 1, 2))
        >>> kwplot.imshow(c.colorize('offset', imgspace=1), fnum=2, pnum=(4, 1, 3))
        >>> kwplot.imshow(newself.colorize('offset', imgspace=1), fnum=2, pnum=(4, 1, 4))
        >>> # xdoctest: +REQUIRES(--show)
        >>> kwplot.imshow(a.colorize('diameter', imgspace=1), fnum=3, pnum=(4, 1, 1))
        >>> kwplot.imshow(b.colorize('diameter', imgspace=1), fnum=3, pnum=(4, 1, 2))
        >>> kwplot.imshow(c.colorize('diameter', imgspace=1), fnum=3, pnum=(4, 1, 3))
        >>> kwplot.imshow(newself.colorize('diameter', imgspace=1), fnum=3, pnum=(4, 1, 4))
    """
    import warnings

    # arithmetic mean over the stacked heatmaps
    mean_over_maps = functools.partial(np.mean, axis=0)

    # If the root is not specified use the largest heatmap
    if root_index is None:
        sizes = [np.prod(h.shape) for h in heatmaps]
        root_index = ub.argmax(sizes)
    root = heatmaps[root_index]

    aligned = [root._align_other(h).numpy() for h in heatmaps]
    reference = aligned[root_index]

    # Use the appropriate mean for each type of data
    merged = {}
    if 'class_probs' in reference.data:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'divide by zero')
            stacked = np.array(
                [h.class_probs.astype(dtype) for h in aligned],
                dtype=dtype)
            merged['class_probs'] = _gmean(stacked, clobber=True)
            # drop the reference to the large temporary
            stacked = None
    if 'offset' in reference.data:
        merged['offset'] = mean_over_maps([h.offset for h in aligned])
    if 'diameter' in reference.data:
        merged['diameter'] = mean_over_maps([h.diameter for h in aligned])
    has_keypoints = (
        'keypoints' in reference.data and
        reference.data['keypoints'] is not None
    )
    if has_keypoints:
        merged['keypoints'] = mean_over_maps(
            [h.data['keypoints'] for h in aligned])

    return reference.__class__(merged, reference.meta)
def detect(self, channel, invert=False, min_score=0.01, num_min=10,
           max_dims=None, min_dims=None, dim_thresh_space='image'):
    """
    Lossy conversion from a Heatmap to a Detections object.

    For efficiency, the detections are returned in the same space as the
    heatmap, which is usually some downsampled version of the image space.
    This is because it is more efficient to transform the detections into
    image-space after non-max suppression is applied.

    Args:
        channel (int | ArrayLike): class index to detect objects in.
            Alternatively, channel can be a custom probability map as long
            as its dimensions agree with the heatmap.

        invert (bool): if True, inverts the probabilities in the chosen
            channel. (Useful if you have a background channel but want to
            detect foreground objects).

        min_score (float): probability threshold required for a pixel to be
            converted into a detection. Defaults to 0.01

        num_min (int): always return at least `num_min` of the highest
            scoring detections even if they aren't above the `min_score`
            threshold. Defaults to 10.

        max_dims (Tuple[int, int]): maximum height / width of detections
            By default these are expected to be in image-space.
            A scalar is used for both height and width.

        min_dims (Tuple[int, int]): minimum height / width of detections
            By default these are expected to be in image-space.
            A scalar is used for both height and width.

        dim_thresh_space (str):
            When dim_thresh_space=='native', dimension thresholds (e.g.
            min_dims and max_dims) are specified in the native heatmap
            space (i.e. usually a downsampled space). If
            dim_thresh_space=='image', then dimension thresholds are
            interpreted in the original image space. Defaults to 'image'

    Returns:
        kwimage.Detections: raw detections.

            Note that these detections will not have class_idx populated

            It is the users responsibility to run non-max suppression on
            these results to remove duplicate detections.

    Raises:
        KeyError: if ``dim_thresh_space`` is neither 'image' nor 'native'.

    SeeAlso:
        Detections.rasterize

    Example:
        >>> # xdoctest: +REQUIRES(module:ndsampler)
        >>> from kwimage.structs.heatmap import *  # NOQA
        >>> import ndsampler
        >>> self = Heatmap.random(rng=2, dims=(32, 32))
        >>> dets = self.detect(channel=0, max_dims=7, num_min=None)
        >>> img_dets = dets.warp(self.tf_data_to_img)
        >>> assert img_dets.boxes.to_xywh().width.max() <= 7
        >>> assert img_dets.boxes.to_xywh().height.max() <= 7
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> dets1 = dets.sort().take(range(30))
        >>> colormask1 = self.colorize(0, imgspace=False)
        >>> kwplot.imshow(colormask1, pnum=(1, 2, 1), fnum=1, title='output space')
        >>> dets1.draw()
        >>> # Transform heatmap and detections into image space.
        >>> dets2 = dets1.warp(self.tf_data_to_img)
        >>> colormask2 = self.colorize(0, imgspace=True)
        >>> kwplot.imshow(colormask2, pnum=(1, 2, 2), fnum=1, title='image space')
        >>> dets2.draw()

    Example:
        >>> # xdoctest: +REQUIRES(module:ndsampler)
        >>> from kwimage.structs.heatmap import *  # NOQA
        >>> import torch
        >>> import ndsampler
        >>> catgraph = ndsampler.CategoryTree.demo()
        >>> class_energy = torch.rand(len(catgraph), 32, 32)
        >>> class_probs = catgraph.hierarchical_softmax(class_energy, dim=0)
        >>> self = Heatmap.random(rng=0, dims=(32, 32), classes=catgraph, keypoints=True)
        >>> print(ub.urepr(ub.map_vals(lambda x: x.shape, self.data), nl=1))
        >>> self.data['class_probs'] = class_probs.numpy()
        >>> channel = catgraph.index('background')
        >>> dets = self.detect(channel, invert=True)
        >>> class_idx, scores = catgraph.decision(dets.probs, dim=1)
        >>> dets.data['class_idx'] = class_idx
        >>> dets.data['scores'] = scores
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> dets1 = dets.sort().take(range(10))
        >>> colormask1 = self.colorize(0, imgspace=False)
        >>> kwplot.imshow(colormask1, pnum=(1, 2, 1), fnum=1, title='output space')
        >>> dets1.draw(radius=1.0)
        >>> # Transform heatmap and detections into image space.
        >>> colormask2 = self.colorize(0, imgspace=True)
        >>> dets2 = dets1.warp(self.tf_data_to_img)
        >>> kwplot.imshow(colormask2, pnum=(1, 2, 2), fnum=1, title='image space')
        >>> dets2.draw(radius=1.0)
    """
    # Accept numpy integer scalars as class indices too; otherwise they
    # would fall through and be treated as a probability map.
    if isinstance(channel, (int, np.integer)):
        probs = self.class_probs[channel]
    else:
        probs = channel
    if invert:
        probs = 1 - probs

    # The two thresholds are independent, so each one is normalized to a
    # length-2 array on its own.
    if max_dims is not None:
        max_dims = max_dims if ub.iterable(max_dims) else (max_dims, max_dims)
        max_dims = np.array(max_dims)
    if min_dims is not None:
        min_dims = min_dims if ub.iterable(min_dims) else (min_dims, min_dims)
        min_dims = np.array(min_dims)

    # Convert the dims to a native space if necessary
    if dim_thresh_space == 'image':
        # The thresholds are ordered (height, width), so the transform
        # scale is reversed before dividing.
        # NOTE(review): assumes ``tf_data_to_img.scale`` is ordered (x, y)
        # -- confirm against the transform type in use.
        scale_dims = self.tf_data_to_img.scale[::-1]
        if max_dims is not None:
            max_dims = max_dims / scale_dims
        if min_dims is not None:
            min_dims = min_dims / scale_dims
    elif dim_thresh_space != 'native':
        raise KeyError(dim_thresh_space)

    dets = _prob_to_dets(
        probs, diameter=self.data.get('diameter', None),
        offset=self.data.get('offset', None),
        class_probs=self.data.get('class_probs', None),
        keypoints=self.data.get('keypoints', None),
        min_score=min_score, num_min=num_min,
        max_dims=max_dims, min_dims=min_dims,
    )
    # Propagate the category metadata onto the returned detections
    if dets.data.get('keypoints', None) is not None:
        kp_classes = self.meta['kp_classes']
        dets.data['keypoints'].meta['classes'] = kp_classes
        dets.meta['kp_classes'] = kp_classes
    dets.meta['classes'] = self.classes
    return dets
class Heatmap(_generic.Spatial, _HeatmapDrawMixin,
              _HeatmapWarpMixin, _HeatmapAlgoMixin):
    """
    Keeps track of a downscaled heatmap and how to transform it to overlay the
    original input image. Heatmaps generally are used to estimate class
    probabilities at each pixel. This data structure additionally contains
    logic to augment pixels with offset (dydx) and scale (diameter)
    information.

    Attributes:
        data (Dict[str, ArrayLike]): dictionary containing spatially aligned
            heatmap data. Valid keys are as follows.

            class_probs (ArrayLike[C, H, W] | ArrayLike[C, D, H, W]):
                A probability map for each class. C is the number of
                classes.

            offset (ArrayLike[2, H, W] | ArrayLike[3, D, H, W], optional):
                object center position offset in y,x / t,y,x coordinates

            diameter (ArrayLike[2, H, W] | ArrayLike[3, D, H, W], optional):
                object bounding box sizes in h,w / d,h,w coordinates

            keypoints (ArrayLike[2, K, H, W] | ArrayLike[3, K, D, H, W], optional):
                y/x offsets for K different keypoint classes

        meta (Dict[str, object]): dictionary containing miscellaneous metadata
            about the heatmap data. Valid keys are as follows.

            img_dims (Tuple[H, W] | Tuple[D, H, W]):
                original image dimension

            tf_data_to_img (SKImageGeometricTransform):
                transformation matrix (typically similarity or affine) that
                projects the given heatmap onto the image dimensions such
                that the image and heatmap are spatially aligned.

            classes (List[str] | ndsampler.CategoryTree):
                information about which index in ``data['class_probs']``
                corresponds to which semantic class.

        dims (Tuple): dimensions of the heatmap (See ``img_dims`` for the
            original image dimensions).

        **kwargs: any key that is accepted by the `data` or `meta`
            dictionaries can be specified as a keyword argument to this
            class and it will be properly placed in the appropriate internal
            dictionary.

    CommandLine:
        xdoctest -m ~/code/kwimage/kwimage/structs/heatmap.py Heatmap --show

    Example:
        >>> # xdoctest: +REQUIRES(module:torch)
        >>> from kwimage.structs.heatmap import *  # NOQA
        >>> import skimage
        >>> import kwimage
        >>> class_probs = kwimage.grab_test_image(dsize=(32, 32), space='gray')[None, ..., 0] / 255.0
        >>> img_dims = (220, 220)
        >>> tf_data_to_img = skimage.transform.AffineTransform(translation=(-18, -18), scale=(8, 8))
        >>> self = Heatmap(class_probs=class_probs, img_dims=img_dims,
        >>>                tf_data_to_img=tf_data_to_img)
        >>> aligned = self.upscale()
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.imshow(aligned[0])
        >>> kwplot.show_if_requested()

    Example:
        >>> # xdoctest: +REQUIRES(module:torch)
        >>> import kwimage
        >>> self = Heatmap.random()
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> self.draw()
    """
    # Valid keys for the data dictionary
    __datakeys__ = ['class_probs', 'offset', 'diameter', 'keypoints',
                    'class_idx', 'class_energy']

    # Valid keys for the meta dictionary
    __metakeys__ = ['img_dims', 'tf_data_to_img', 'classes', 'kp_classes']

    __spatialkeys__ = ['offset', 'diameter', 'keypoints']

    def __init__(self, data=None, meta=None, **kwargs):
        """
        Construct from explicit ``data`` / ``meta`` dicts, from another
        instance of this class, or from keyword arguments.

        Args:
            data (Dict | Heatmap | None): data dictionary, or an existing
                heatmap whose ``data`` and ``meta`` are reused (not copied).
            meta (Dict | None): metadata dictionary. Defaults to ``{}``.
            **kwargs: items from ``__datakeys__`` / ``__metakeys__``. The
                special keys ``datakeys`` / ``metakeys`` extend the
                accepted key lists for this call.

        Raises:
            ValueError: if kwargs are combined with a non-empty data/meta,
                or if an unrecognized keyword is given.
        """
        # Standardize input format
        if kwargs:
            if data or meta:
                raise ValueError('Cannot specify kwargs AND data/meta dicts')
            _datakeys = self.__datakeys__
            _metakeys = self.__metakeys__
            # Allow the user to specify custom data and meta keys
            if 'datakeys' in kwargs:
                _datakeys = _datakeys + list(kwargs.pop('datakeys'))
            if 'metakeys' in kwargs:
                _metakeys = _metakeys + list(kwargs.pop('metakeys'))
            # Perform input checks whenever kwargs is given
            data = {key: kwargs.pop(key) for key in _datakeys if key in kwargs}
            meta = {key: kwargs.pop(key) for key in _metakeys if key in kwargs}
            tf_data_to_img = meta.get('tf_data_to_img', None)
            if tf_data_to_img is not None:
                # A raw matrix is wrapped in an skimage affine transform
                if isinstance(tf_data_to_img, np.ndarray):
                    import skimage
                    meta['tf_data_to_img'] = skimage.transform.AffineTransform(
                        matrix=tf_data_to_img)
            # Anything left over was not a recognized data or meta key
            if kwargs:
                raise ValueError(
                    'Unknown kwargs: {}'.format(sorted(kwargs.keys())))
        elif isinstance(data, self.__class__):
            # Avoid runtime checks and assume the user is doing the right thing
            # if data and meta are explicitly specified
            meta = data.meta
            data = data.data
        if meta is None:
            meta = {}
        self.data = data
        self.meta = meta

    def __nice__(self):
        return '{} on img_dims={}'.format(self.shape, self.img_dims)

    def __getitem__(self, index):
        """ Index directly into ``class_probs`` """
        return self.class_probs[index]

    def __len__(self):
        """ Length of ``class_probs`` (its first axis) """
        return len(self.class_probs)

    @property
    def shape(self):
        """
        Shape of ``class_probs`` if available. Otherwise falls back to the
        shape of the last item in ``data`` that has a ``shape`` attribute,
        or None if there is no such item.
        """
        shape = None
        try:
            shape = self.class_probs.shape
        except Exception:
            # No usable class_probs; look for any other array-like value.
            # There is no early exit, so the last match wins.
            for key, value in self.data.items():
                try:
                    shape = value.shape
                except AttributeError:
                    pass
        return shape

    @property
    def bounds(self):
        """ The last two entries of ``shape`` """
        return self.shape[-2:]
        # return self.class_probs.shape[1:]

    @property
    def dims(self):
        """ space-time dimensions of this heatmap """
        # NOTE(review): only the last two axes are returned, so a
        # [C, D, H, W] heatmap reports (H, W) -- confirm this is intended.
        return self.shape[-2:]
        # return self.class_probs.shape[1:]

    def is_numpy(self):
        """ Whether the array backend of ``class_probs`` is numpy """
        return self._impl.is_numpy

    def is_tensor(self):
        """ Whether the array backend of ``class_probs`` is a tensor """
        return self._impl.is_tensor

    @property
    def _impl(self):
        """
        Returns the internal tensor/numpy ArrayAPI implementation

        Returns:
            kwarray.ArrayAPI
        """
        return kwarray.ArrayAPI.coerce(self.data['class_probs'])

    # @property
    # def device(self):
    #     """ If the backend is torch returns the data device, otherwise None """
    #     return self.data['class_probs'].device

    @classmethod
    def random(cls, dims=(10, 10), classes=3, diameter=True, offset=True,
               keypoints=False, img_dims=None, dets=None, nblips=10, noise=0.0,
               smooth_k=3, rng=None, ensure_background=True):
        """
        Creates dummy data, suitable for use in tests and benchmarks

        Args:
            dims (Tuple[int, int]): dimensions of the heatmap

            classes (int | List[str] | kwcoco.CategoryTree):
                foreground classes. An integer ``n`` is expanded to the
                names ``class_0 ... class_{n-1}``.

            diameter (bool): if True, include a "diameter" heatmap

            offset (bool): if True, include an "offset" heatmap

            keypoints (bool): if neither False nor None, ``kp_classes`` is
                stored in the metadata. Also forwarded to
                ``Detections.random`` when detections are generated.

            img_dims (Tuple): dimensions of an upscaled image the heatmap
                corresponds to. If None, a random scale and translation is
                chosen. (This should be removed and simply handled with a
                transform in the future).

            dets (kwimage.Detections | str | None): detections to rasterize
                into the heatmap. The string 'coco' loads detections from
                the ndsampler 'photos' demo. If None, random detections are
                generated.

            nblips (int): number of random detections to generate when
                ``dets`` is None.

            noise (float): multiplier for standard normal noise that is
                added to the class probabilities.

            smooth_k (int): kernel size for gaussian blur to smooth out the
                heatmaps.

            rng (int | RandomState | None): seed or random number generator

            ensure_background (bool): if True and detections are randomly
                generated, a 'background' class is appended when missing.

        Returns:
            Heatmap

        Example:
            >>> from kwimage.structs.heatmap import *  # NOQA
            >>> self = Heatmap.random((128, 128), img_dims=(200, 200),
            >>>     classes=3, nblips=10, rng=0, noise=0.1)
            >>> # xdoctest: +REQUIRES(--show)
            >>> import kwplot
            >>> kwplot.autompl()
            >>> kwplot.imshow(self.colorize(0, imgspace=0), fnum=1, pnum=(1, 4, 1), doclf=1)
            >>> kwplot.imshow(self.colorize(1, imgspace=0), fnum=1, pnum=(1, 4, 2))
            >>> kwplot.imshow(self.colorize(2, imgspace=0), fnum=1, pnum=(1, 4, 3))
            >>> kwplot.imshow(self.colorize(3, imgspace=0), fnum=1, pnum=(1, 4, 4))

        Ignore:
            self.detect(0).sort().non_max_supress()[-np.arange(1, 4)].draw()
            from kwimage.structs.heatmap import *  # NOQA
            import xdev
            globals().update(xdev.get_func_kwargs(Heatmap.random))

        Example:
            >>> # xdoctest: +REQUIRES(module:ndsampler)
            >>> import kwimage
            >>> self = kwimage.Heatmap.random(dims=(50, 200), dets='coco',
            >>>                               keypoints=True)
            >>> image = np.zeros(self.img_dims)
            >>> # xdoctest: +REQUIRES(module:kwplot)
            >>> toshow = self.draw_on(image, 1, vecs=True, kpts=0, with_alpha=0.85)
            >>> # xdoctest: +REQUIRES(--show)
            >>> import kwplot
            >>> kwplot.autompl()
            >>> kwplot.figure(fnum=1, doclf=True)
            >>> kwplot.imshow(toshow)

        Ignore:
            >>> kwplot.figure(fnum=1, doclf=True)
            >>> kwplot.imshow(image)
            >>> dets.draw()
            >>> dets.data['keypoints'].draw(radius=6)
            >>> dets.data['segmentations'].draw()
            >>> self.draw()
        """
        import kwimage
        rng = kwarray.ensure_rng(rng)

        # Sentinel: only the coco branch assigns real keypoint categories
        kp_classes = ub.NoParam

        if dets == 'coco':
            # special detections from the ndsampler coco demo
            import ndsampler
            sampler = ndsampler.CocoSampler.demo('photos')
            iminfo, anns = sampler.load_image_with_annots(1)
            image = iminfo['imdata']
            input_dims = image.shape[:2]
            kp_classes = sampler.dset.keypoint_categories()
            dets = kwimage.Detections.from_coco_annots(
                anns, cats=sampler.dset.dataset['categories'],
                shape=input_dims, kp_classes=kp_classes)
            img_dims = input_dims

        if isinstance(classes, int):
            classes = ['class_{}'.format(c) for c in range(classes)]

        # Pretend this heatmap corresponds to some upscaled subregion
        import skimage
        if img_dims is None:
            scale = 1 + rng.rand(2) * 2
            translation = rng.rand(2) * np.array(dims[::-1]) / 2
            tf_data_to_img = skimage.transform.AffineTransform(
                scale=scale, translation=translation)

            # The image extent is where the far heatmap corner lands
            wh_dims = dims[::-1]
            img_wh_dims = tuple(np.ceil(tf_data_to_img([wh_dims]))[0].astype(int).tolist())
            img_dims = img_wh_dims[::-1]
        else:
            img_dims = np.array(img_dims)
            tf_data_to_img = skimage.transform.AffineTransform(
                scale=(img_dims / dims)[::-1],
                translation=(0, 0),
            )

        # TODO: clean up method of making heatmap from detections

        if dets is None:
            # We are either given detections, or we make random ones
            dets = kwimage.Detections.random(num=nblips, scale=img_dims,
                                             keypoints=keypoints,
                                             classes=classes, rng=rng)
            if ensure_background:
                if 'background' not in dets.classes:
                    dets.classes.append('background')
                classes = dets.classes
        else:
            classes = dets.classes

        # assume we have background
        # bg_idx = dets.classes.index('background')

        # Warp detections into heatmap space
        transform = np.linalg.inv(tf_data_to_img.params)
        warped_dets = dets.warp(transform, input_dims=img_dims,
                                output_dims=dims)

        tf_notrans = _remove_translation(tf_data_to_img)
        bg_size = tf_notrans.inverse([100, 100])[0]
        self = warped_dets.rasterize(bg_size, input_dims=dims, soften=1,
                                     img_dims=img_dims,
                                     tf_data_to_img=tf_data_to_img)

        class_probs = self.data['class_probs']
        noise = (rng.randn(*class_probs.shape) * noise)
        class_probs += noise
        np.clip(class_probs, 0, None, out=class_probs)
        # class_probs = class_probs / class_probs.sum(axis=0)
        class_probs = np.array([smooth_prob(p, k=smooth_k) for p in class_probs])
        class_probs = class_probs / np.maximum(class_probs.sum(axis=0), 1e-9)
        # The two statements above build new arrays, so the result must be
        # stored back or the smoothing and renormalization are lost.
        self.data['class_probs'] = class_probs

        if not offset:
            self.data.pop('offset')
        if not diameter:
            self.data.pop('diameter')
        if keypoints is not False and keypoints is not None:
            # self.data['keypoints'] = keypoints
            if kp_classes is ub.NoParam:
                # No real keypoint categories; fall back to integer labels
                kp_classes = list(range(self.data['keypoints'].shape[1]))
            self.meta['kp_classes'] = kp_classes
        self.meta['classes'] = classes
        return self

    # --- Data Properties ---

    @property
    def class_probs(self):
        return self.data['class_probs']

    @property
    def offset(self):
        return self.data['offset']

    @property
    def diameter(self):
        return self.data['diameter']

    # --- Meta Properties ---

    @property
    def img_dims(self):
        return self.meta.get('img_dims', None)

    @property
    def tf_data_to_img(self):
        return self.meta.get('tf_data_to_img', None)

    @property
    def classes(self):
        return self.meta.get('classes', None)

    # ---

    def numpy(self):
        """
        Converts underlying data to numpy arrays

        Returns:
            Heatmap: new instance with converted data. The ``meta``
                dictionary is shared with this instance, not copied.
        """
        newdata = {}
        for key, val in self.data.items():
            if val is None:
                newval = val
            else:
                newval = kwarray.ArrayAPI.numpy(val)
            newdata[key] = newval
        newself = self.__class__(newdata, self.meta)
        return newself

    def tensor(self, device=ub.NoParam):
        """
        Converts underlying data to torch tensors

        Args:
            device: forwarded to ``kwarray.ArrayAPI.tensor``

        Returns:
            Heatmap: new instance with converted data. The ``meta``
                dictionary is shared with this instance, not copied.
        """
        newdata = {}
        for key, val in self.data.items():
            if val is None:
                newval = val
            else:
                newval = kwarray.ArrayAPI.tensor(val, device=device)
            newdata[key] = newval
        newself = self.__class__(newdata, self.meta)
        return newself
def _prob_to_dets(probs, diameter=None, offset=None, class_probs=None,
                  keypoints=None, min_score=0.01, num_min=10,
                  max_dims=None, min_dims=None):
    """
    Directly convert a one-channel probability map into a Detections object.

    Helper for Heatmap.detect

    It does this by converting each pixel above a threshold in a probability
    map to a detection with a specified diameter.

    Args:
        probs (ArrayLike[H, W]): a one-channel probability map indicating the
            likelihood that each particular pixel should be detected as an
            object.

        diameter (ArrayLike[2, H, W] | Tuple):
            H, W sizes for the bounding box at each pixel location.
            If passed as a tuple, then all boxes receive that diameter.
            Defaults to 1.

        offset (Tuple | ArrayLike[2, H, W]):
            Y, X offsets from the pixel location to the bounding box center.
            If passed as a tuple, then all boxes receive that offset.
            Defaults to 0.

        class_probs (ArrayLike[C, H, W], optional):
            probabilities for each class at each pixel location.
            If specified, this will populate the `probs` attribute of the
            returned Detections object.

        keypoints (ArrayLike[2, K, H, W], optional):
            Keypoint predictions for all keypoint classes

        min_score (float): probability threshold required for a pixel to be
            converted into a detection. Defaults to 0.01

        num_min (int): always return at least `num_min` of the highest
            scoring detections even if they aren't above the `min_score`
            threshold. Defaults to 10

        max_dims (Tuple | float, optional): maximum height / width of a
            detection. Only applied when ``diameter`` is given per-pixel.

        min_dims (Tuple | float, optional): minimum height / width of a
            detection. Only applied when ``diameter`` is given per-pixel.

    Returns:
        kwimage.Detections: raw detections. It is the users responsibility to
            run non-max suppression on these results to remove duplicate
            detections.

    Raises:
        ValueError: if ``diameter`` has more than two axes but its trailing
            shape does not agree with ``probs.shape``.

    Example:
        >>> # xdoctest: +REQUIRES(module:torch)
        >>> import torch
        >>> rng = np.random.RandomState(0)
        >>> probs = rng.rand(3, 3).astype(np.float32)
        >>> min_score = .5
        >>> diameter = [10, 10]
        >>> dets = _prob_to_dets(probs, diameter, min_score=min_score)
        >>> assert dets.boxes.data.dtype.kind == 'f'
        >>> assert len(dets) == 9
        >>> dets = _prob_to_dets(torch.FloatTensor(probs), diameter, min_score=min_score)
        >>> assert dets.boxes.data.dtype.is_floating_point
        >>> assert len(dets) == 9

    Example:
        >>> # xdoctest: +REQUIRES(module:torch)
        >>> import kwimage
        >>> from kwimage.structs.heatmap import *
        >>> from kwimage.structs.heatmap import _prob_to_dets
        >>> heatmap = kwimage.Heatmap.random(rng=0, dims=(3, 3), keypoints=True)
        >>> # Try with numpy
        >>> min_score = .5
        >>> dets = _prob_to_dets(heatmap.class_probs[0], heatmap.diameter,
        >>>                            heatmap.offset, heatmap.class_probs,
        >>>                            heatmap.data['keypoints'],
        >>>                            min_score)
        >>> assert dets.boxes.data.dtype.kind == 'f'
        >>> assert 'keypoints' in dets.data
        >>> dets_np = dets
        >>> # Try with torch
        >>> heatmap = heatmap.tensor()
        >>> dets = _prob_to_dets(heatmap.class_probs[0], heatmap.diameter,
        >>>                            heatmap.offset, heatmap.class_probs,
        >>>                            heatmap.data['keypoints'],
        >>>                            min_score)
        >>> assert dets.boxes.data.dtype.is_floating_point
        >>> assert len(dets) == len(dets_np)
        >>> dets_torch = dets
        >>> assert np.all(dets_torch.numpy().boxes.data == dets_np.boxes.data)

    Ignore:
        import kwil
        kwil.autompl()
        dets.draw(setlim=True, radius=.1)

    Example:
        >>> heatmap = Heatmap.random(rng=0, dims=(3, 3), diameter=1)
        >>> probs = heatmap.class_probs[0]
        >>> diameter = heatmap.diameter
        >>> offset = heatmap.offset
        >>> class_probs = heatmap.class_probs
        >>> min_score = 0.5
        >>> dets = _prob_to_dets(probs, diameter, offset, class_probs, None, min_score)
    """
    impl = kwarray.ArrayAPI.impl(probs)

    if diameter is None:
        diameter = 1

    if offset is None:
        offset = 0

    # A value is "uniform" (shared by every pixel) unless its trailing shape
    # matches the probability map.
    diameter_is_uniform = tuple(getattr(diameter, 'shape', []))[1:] != tuple(probs.shape)
    offset_is_uniform = tuple(getattr(offset, 'shape', []))[1:] != tuple(probs.shape)

    if diameter_is_uniform:
        if hasattr(diameter, 'shape'):
            if len(diameter.shape) > 2:
                raise ValueError('Trailing diameter shape={} does not agree with probs.shape={}'.format(
                    diameter.shape, probs.shape))
        if not ub.iterable(diameter):
            diameter = [diameter, diameter]

    if offset_is_uniform:
        if not ub.iterable(offset):
            offset = [offset, offset]
        # Coerce tuples / lists to an array so the ``offset[[1, 0]]`` swap
        # below works for every documented input type.
        offset = impl.asarray(offset)

    flags = probs > min_score
    if not diameter_is_uniform:
        # Size thresholds only make sense when each pixel has its own size
        if max_dims is not None:
            max_dims = max_dims if ub.iterable(max_dims) else (max_dims, max_dims)
            max_height, max_width = max_dims
            if max_height is not None:
                flags &= diameter[0] <= max_height
            if max_width is not None:
                flags &= diameter[1] <= max_width
        if min_dims is not None:
            min_dims = min_dims if ub.iterable(min_dims) else (min_dims, min_dims)
            min_height, min_width = min_dims
            if min_height is not None:
                flags &= diameter[0] >= min_height
            if min_width is not None:
                flags &= diameter[1] >= min_width

    # Ensure that some detections are returned even if none are above the
    # threshold.
    if num_min is not None:
        numel = impl.numel(flags)
        if flags.sum() < num_min:
            if impl.is_tensor:
                topxs = probs.view(-1).argsort()[max(0, numel - num_min):numel]
                flags.view(-1)[topxs] = 1
            else:
                idxs = kwarray.argmaxima(probs, num=num_min, ordered=False)
                # idxs = probs.argsort(axis=None)[-num_min:]
                flags.ravel()[idxs] = True

    # Row / column coordinates of every selected pixel
    yc, xc = impl.nonzero(flags)
    yc_ = impl.astype(yc, np.float32)
    xc_ = impl.astype(xc, np.float32)
    if diameter_is_uniform:
        h = impl.full_like(yc_, fill_value=diameter[0])
        w = impl.full_like(xc_, fill_value=diameter[1])
    else:
        h = impl.astype(diameter[0][flags], np.float32)
        w = impl.astype(diameter[1][flags], np.float32)
    cxywh = impl.cat([xc_[:, None], yc_[:, None], w[:, None], h[:, None]], axis=1)

    import kwimage
    ltrb = kwimage.Boxes(cxywh, 'cxywh').toformat('ltrb')
    scores = probs[flags]

    # TODO:
    # Can we extract the detected segmentation mask/poly here as well?

    dets = kwimage.Detections(boxes=ltrb, scores=scores)

    # Get per-class probs for each detection
    if class_probs is not None:
        det_probs = impl.T(class_probs[:, yc, xc])
        dets.data['probs'] = det_probs

    if offset is not None:
        # Offsets are stored (y, x); boxes are translated by (x, y)
        if offset_is_uniform:
            det_dxdy = offset[[1, 0]]
        else:
            det_dxdy = impl.T(offset[:, yc, xc][[1, 0]])
        dets.boxes.translate(det_dxdy, inplace=True)

    if keypoints is not None:
        # Take keypoint predictions for each remaining detection
        det_kpts_xy = impl.contiguous(impl.T(keypoints[:, :, yc, xc][[1, 0]]))
        # Translate keypoints to absolute coordinates
        det_kpts_xy[..., 0] += xc_[:, None]
        det_kpts_xy[..., 1] += yc_[:, None]

        # The shape of det_kpts_xy is [N, K, 2]

        # TODO: need to package kp_classes as well

        # A dense homogenous data structure is used here because the points
        # are never jagged; building a jagged PointsList per detection was a
        # bottleneck.
        det_coords = kwimage.Coords(det_kpts_xy)
        det_kpts = kwimage.Points({'xy': det_coords})

        dets.data['keypoints'] = det_kpts

    assert len(dets.scores.shape) == 1
    return dets
def smooth_prob(prob, k=3, inplace=False, eps=1e-9):
    """
    Gaussian-blur a probability map, then shift and rescale the result using
    the original values found at the blurred minimum and maximum positions.

    Args:
        prob (ndarray): the map to smooth
        k (int): side length of the square blur kernel
        inplace (bool): if True the result is written into ``prob`` and
            ``prob`` itself is returned.
        eps (float): lower bound on the denominator of the rescale factor

    Returns:
        ndarray: the smoothed map

    Note:
        even if inplace is true, we still need to make a copy of the input
        array, however, we do ensure that it is cleaned up before we leave the
        function scope.

        sigma=0.8 @ k=3, sigma=1.1 @ k=5, sigma=1.4 @ k=7
    """
    import cv2
    # opencv formula
    sigma = 0.3 * ((k - 1) * 0.5 - 1) + 0.8
    smoothed = cv2.GaussianBlur(prob, (k, k), sigma)

    # Locate the extremes of the blurred map
    lo_idx = np.unravel_index(smoothed.argmin(), smoothed.shape)
    hi_idx = np.unravel_index(smoothed.argmax(), smoothed.shape)

    # Both corrections are derived from the original values, so they must be
    # computed before ``prob`` is possibly overwritten below.
    offset = prob[lo_idx] - smoothed[lo_idx]
    gain = prob[hi_idx] / np.maximum((smoothed[hi_idx] + offset), eps)

    out = smoothed
    if inplace:
        prob[:] = smoothed
        out = prob

    np.add(out, offset, out=out)
    np.multiply(out, gain, out=out)
    return out
def _remove_translation(tf):
    """
    Removes the translation component of a transform

    Args:
        tf (skimage.transform.AffineTransform |
            skimage.transform.SimilarityTransform |
            skimage.transform.EuclideanTransform): transform to strip

    Returns:
        a new transform of the same type, rebuilt from the remaining
        parameters of ``tf`` with no translation given.

    Raises:
        TypeError: if ``tf`` is not one of the supported transform types.

    TODO:
        - [ ] Is this possible in more general cases? E.g. projective transforms?
    """
    import skimage
    if isinstance(tf, skimage.transform.AffineTransform):
        tf_notrans = skimage.transform.AffineTransform(
            scale=tf.scale, rotation=tf.rotation, shear=tf.shear)
    elif isinstance(tf, skimage.transform.SimilarityTransform):
        # Must be tested before EuclideanTransform, which is its base class
        tf_notrans = skimage.transform.SimilarityTransform(
            scale=tf.scale, rotation=tf.rotation)
    elif isinstance(tf, skimage.transform.EuclideanTransform):
        # A Euclidean (rigid) transform has no scale, only a rotation
        tf_notrans = skimage.transform.EuclideanTransform(
            rotation=tf.rotation)
    else:
        raise TypeError(tf)
    return tf_notrans
[docs] def _gmean(a, axis=0, clobber=False): """ Compute the geometric mean along the specified axis. Modification of the scipy.mstats method to be more memory efficient Example >>> rng = np.random.RandomState(0) >>> C, H, W = 8, 32, 32 >>> axis = 0 >>> a = rng.rand(2, C, H, W) >>> _gmean(a) """ assert isinstance(a, np.ndarray) if clobber: # NOTE: we reuse (a), we clobber the input array! log_a = np.log(a, out=a) else: log_a = np.log(a) # attempt to reuse memory when computing mean mem = log_a[tuple([slice(None)] * axis + [0])] mean_log_a = log_a.mean(axis=axis, out=mem) # And reuse memory again when computing the final result result = np.exp(mean_log_a, out=mean_log_a) return result