"""
This module provides functions :func:`imread` and :func:`imwrite` which are
wrappers around concrete readers/writers provided by other libraries. This
allows us to support a wider array of formats than any of individual backends.
"""
import os
import numpy as np
import warnings # NOQA
import cv2
from os.path import exists, dirname
import ubelt as ub
from . import im_cv2
from . import im_core
__all__ = [
'imread', 'imwrite', 'load_image_shape',
]
# Common image extensions
JPG_EXTENSIONS = (
'.jpg', '.jpeg'
)
# These should be supported by opencv / PIL
_WELL_KNOWN_EXTENSIONS = (
JPG_EXTENSIONS +
('.bmp', '.pgm', '.png', '.qoi',)
)
# Extensions that usually will require GDAL
GDAL_EXTENSIONS = (
'.ntf', '.nitf', '.ptif', '.cog.tiff', '.cog.tif',
'.r0', '.r1', '.r2', '.r3', '.r4', '.r5', '.nsf',
'.jp2', '.vrt',
)
# TODO: ITK Image formats
# https://insightsoftwareconsortium.github.io/itk-js/docs/image_formats.html
ITK_EXTENSIONS = (
'.mha',
'.nrrd', # http://teem.sourceforge.net/nrrd/format.html
'.mgh', # https://surfer.nmr.mgh.harvard.edu/fswiki/FsTutorial/MghFormat
'.mgz',
'.nii', # https://nifti.nimh.nih.gov/nifti-1
'.img',
'.mrb', # multiple-resolution-bitmap https://whatext.com/mrb
)
# ITK Demo data:
# https://data.kitware.com/#collection/57b5c9e58d777f126827f5a1
IMAGE_EXTENSIONS = (
_WELL_KNOWN_EXTENSIONS +
('.tif', '.tiff',) +
GDAL_EXTENSIONS +
ITK_EXTENSIONS
)
[docs]
def imread(fpath, space='auto', backend='auto', **kw):
"""
Reads image data in a specified format using some backend implementation.
Args:
fpath (str): path to the file to be read
space (str):
The desired colorspace of the image. Can by any colorspace accepted
by `convert_colorspace`, or it can be 'auto', in which case the
colorspace of the image is unmodified (except in the case where a
color image is read by opencv, in which case we convert BGR to RGB
by default). If None, then no modification is made to whatever
backend is used to read the image. Defaults to 'auto'.
New in version 0.7.10: when the backend does not resolve to "cv2"
the "auto" space resolves to None, thus the image is read as-is.
backend (str): which backend reader to use. By default
the file extension is used to determine this, but it can be
manually overridden. Valid backends are 'gdal', 'skimage', 'itk',
'pil', and 'cv2'. Defaults to 'auto'.
**kw : backend-specific arguments
The gdal backend accepts:
overview, ignore_color_table, nodata_method, band_indices
Returns:
ndarray: the image data in the specified color space.
Note:
if space is something non-standard like HSV or LAB, then the file must
be a normal 8-bit color image, otherwise an error will occur.
Note:
Some backends will respect EXIF orientation (skimage) and others will
not (gdal, cv2).
The scikit-image backend is itself another multi-backend plugin-based
image reader/writer.
Raises:
IOError - If the image cannot be read
ImportError - If trying to read a nitf without gdal
NotImplementedError - if trying to read a corner-case image
Example:
>>> # xdoctest: +REQUIRES(--network)
>>> import kwimage
>>> import ubelt as ub
>>> # Test a non-standard image, which encodes a depth map
>>> fpath = ub.grabdata(
>>> 'http://www.topcoder.com/contest/problem/UrbanMapper3D/JAX_Tile_043_DTM.tif',
>>> hasher='sha256', hash_prefix='64522acba6f0fb7060cd4c202ed32c5163c34e63d386afdada4190cce51ff4d4')
>>> img1 = kwimage.imread(fpath)
>>> # Check that write + read preserves data
>>> dpath = ub.Path.appdir('kwimage/test/imread').ensuredir()
>>> tmp_fpath = dpath / 'tmp0.tif'
>>> kwimage.imwrite(tmp_fpath, img1)
>>> img2 = kwimage.imread(tmp_fpath)
>>> assert np.all(img2 == img1)
>>> tmp_fpath.delete()
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(img1, pnum=(1, 2, 1), fnum=1, norm=True, title='tif orig')
>>> kwplot.imshow(img2, pnum=(1, 2, 2), fnum=1, norm=True, title='tif io round-trip')
Example:
>>> # xdoctest: +REQUIRES(--network)
>>> import kwimage
>>> import ubelt as ub
>>> img1 = kwimage.imread(ub.grabdata(
>>> 'http://i.imgur.com/iXNf4Me.png', fname='ada.png', hasher='sha256',
>>> hash_prefix='898cf2588c40baf64d6e09b6a93b4c8dcc0db26140639a365b57619e17dd1c77'))
>>> dpath = ub.Path.appdir('kwimage/test/imread').ensuredir()
>>> tmp_tif_fpath = dpath / 'tmp1.tif'
>>> tmp_png_fpath = dpath / 'tmp1.png'
>>> kwimage.imwrite(tmp_tif_fpath, img1)
>>> kwimage.imwrite(tmp_png_fpath, img1)
>>> tif_im = kwimage.imread(tmp_tif_fpath)
>>> png_im = kwimage.imread(tmp_png_fpath)
>>> assert np.all(tif_im == png_im)
>>> tmp_tif_fpath.delete()
>>> tmp_png_fpath.delete()
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(png_im, pnum=(1, 2, 1), fnum=1, title='tif io')
>>> kwplot.imshow(tif_im, pnum=(1, 2, 2), fnum=1, title='png io')
Example:
>>> # xdoctest: +REQUIRES(--network)
>>> import kwimage
>>> import ubelt as ub
>>> # FIXME: Dead link...
>>> tif_fpath = ub.grabdata(
>>> 'https://ghostscript.com/doc/tiff/test/images/rgb-3c-16b.tiff',
>>> fname='pepper.tif', hasher='sha256',
>>> hash_prefix='31ff3a1f416cb7281acfbcbb4b56ee8bb94e9f91489602ff2806e5a49abc03c0')
>>> img1 = kwimage.imread(tif_fpath)
>>> dpath = ub.Path.appdir('kwimage/test/imread').ensuredir()
>>> tmp_tif_fpath = dpath / 'tmp2.tif'
>>> tmp_png_fpath = dpath / 'tmp2.png'
>>> kwimage.imwrite(tmp_tif_fpath, img1)
>>> kwimage.imwrite(tmp_png_fpath, img1)
>>> tif_im = kwimage.imread(tmp_tif_fpath)
>>> png_im = kwimage.imread(tmp_png_fpath)
>>> assert np.all(tif_im == png_im)
>>> tmp_tif_fpath.delete()
>>> tmp_png_fpath.delete()
>>> assert np.all(tif_im == png_im)
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(png_im / 2 ** 16, pnum=(1, 2, 1), fnum=1)
>>> kwplot.imshow(tif_im / 2 ** 16, pnum=(1, 2, 2), fnum=1)
Example:
>>> # xdoctest: +REQUIRES(module:itk, --network)
>>> import kwimage
>>> import ubelt as ub
>>> # Grab an image that ITK can read
>>> fpath = ub.grabdata(
>>> url='https://data.kitware.com/api/v1/file/606754e32fa25629b9476f9e/download',
>>> fname='brainweb1e5a10f17Rot20Tx20.mha',
>>> hash_prefix='08f0812591691ae24a29788ba8cd1942e91', hasher='sha512')
>>> # Read the image (this is actually a DxHxW stack of images)
>>> img1_stack = kwimage.imread(fpath)
>>> # Check that write + read preserves data
>>> dpath = ub.Path.appdir('kwimage/test/imread').ensuredir()
>>> tmp_fpath = dpath / 'tmp3.mha'
>>> kwimage.imwrite(tmp_fpath, img1_stack)
>>> recon = kwimage.imread(tmp_fpath)
>>> assert not np.may_share_memory(recon, img1_stack)
>>> assert np.all(recon == img1_stack)
>>> tmp_fpath.delete()
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(kwimage.stack_images_grid(recon[0::20]),
>>> title='kwimage.imread with a .mha file')
>>> kwplot.show_if_requested()
Benchmark:
>>> import timerit
>>> import kwimage
>>> import ubelt as ub
>>> #
>>> dsize = (1920, 1080)
>>> img1 = kwimage.grab_test_image('amazon', dsize=dsize)
>>> ti = timerit.Timerit(10, bestof=3, verbose=1, unit='us')
>>> formats = {}
>>> dpath = ub.Path.appdir('kwimage/bench/im_io').ensuredir()
>>> space = 'auto'
>>> formats['png'] = kwimage.imwrite(join(dpath, '.png'), img1, space=space, backend='cv2')
>>> formats['jpg'] = kwimage.imwrite(join(dpath, '.jpg'), img1, space=space, backend='cv2')
>>> formats['tif_raw'] = kwimage.imwrite(join(dpath, '.raw.tif'), img1, space=space, backend='gdal', compress='RAW')
>>> formats['tif_deflate'] = kwimage.imwrite(join(dpath, '.deflate.tif'), img1, space=space, backend='gdal', compress='DEFLATE')
>>> formats['tif_lzw'] = kwimage.imwrite(join(dpath, '.lzw.tif'), img1, space=space, backend='gdal', compress='LZW')
>>> grid = [
>>> ('cv2', 'png'),
>>> ('cv2', 'jpg'),
>>> ('gdal', 'jpg'),
>>> ('turbojpeg', 'jpg'),
>>> ('gdal', 'tif_raw'),
>>> ('gdal', 'tif_lzw'),
>>> ('gdal', 'tif_deflate'),
>>> ('skimage', 'tif_raw'),
>>> ]
>>> backend, filefmt = 'cv2', 'png'
>>> for backend, filefmt in grid:
>>> for timer in ti.reset(f'imread-{filefmt}-{backend}'):
>>> with timer:
>>> kwimage.imread(formats[filefmt], space=space, backend=backend)
>>> # Test all formats in auto mode
>>> for filefmt in formats.keys():
>>> for timer in ti.reset(f'kwimage.imread-{filefmt}-auto'):
>>> with timer:
>>> kwimage.imread(formats[filefmt], space=space, backend='auto')
>>> ti.measures = ub.map_vals(ub.sorted_vals, ti.measures)
>>> print('ti.measures = {}'.format(ub.urepr(ti.measures['min'], nl=2, align=':')))
Timed best=42891.504 µs, mean=44008.439 ± 1409.2 µs for imread-png-cv2
Timed best=33146.808 µs, mean=34185.172 ± 656.3 µs for imread-jpg-cv2
Timed best=40120.306 µs, mean=41220.927 ± 1010.9 µs for imread-jpg-gdal
Timed best=30798.162 µs, mean=31573.070 ± 737.0 µs for imread-jpg-turbojpeg
Timed best=6223.170 µs, mean=6370.462 ± 150.7 µs for imread-tif_raw-gdal
Timed best=42459.404 µs, mean=46519.940 ± 5664.9 µs for imread-tif_lzw-gdal
Timed best=36271.175 µs, mean=37301.108 ± 861.1 µs for imread-tif_deflate-gdal
Timed best=5239.503 µs, mean=6566.574 ± 1086.2 µs for imread-tif_raw-skimage
ti.measures = {
'imread-tif_raw-skimage' : 0.0052395030070329085,
'imread-tif_raw-gdal' : 0.006223169999429956,
'imread-jpg-turbojpeg' : 0.030798161998973228,
'imread-jpg-cv2' : 0.03314680799667258,
'imread-tif_deflate-gdal': 0.03627117499127053,
'imread-jpg-gdal' : 0.040120305988239124,
'imread-tif_lzw-gdal' : 0.042459404008695856,
'imread-png-cv2' : 0.042891503995633684,
}
"""
fpath = os.fspath(fpath)
if backend == 'auto':
# TODO: memoize the extensions?
# TODO: each backend should maintain a list of supported (possibly
# overlapping) formats, and that should be used to build a mapping from
# formats to candidate backends. We should then filter down to a
# backend that actually exists.
# Determine the backend reader using the file extension
_fpath_lower = fpath.lower()
# Note: rset dataset (https://trac.osgeo.org/gdal/ticket/3457) support is hacked
if _fpath_lower.endswith(ITK_EXTENSIONS):
backend = 'itk'
elif _fpath_lower.endswith(GDAL_EXTENSIONS):
backend = 'gdal'
elif _fpath_lower.endswith(('.tif', '.tiff')):
if _have_gdal():
backend = 'gdal'
else:
backend = 'skimage'
elif _fpath_lower.endswith(JPG_EXTENSIONS):
if _have_turbojpg():
backend = 'turbojpeg'
else:
backend = 'cv2'
elif _fpath_lower.endswith('.svg'):
backend = 'svg' # a bit hacky, not a raster format
else:
# TODO: if we don't have an extension we could try to inspect the
# file header
USE_FILE_HEADER = 0
if USE_FILE_HEADER:
'''
for key in kwimage.grab_test_image_fpath.keys():
fpath = kwimage.grab_test_image_fpath(key)
with open(fpath, 'rb') as file:
header_bytes = file.read(4)
print(header_bytes)
'''
JPEG_HEADER = b'\xff\xd8\xff'
PNG_HEADER = b'\x89PNG'
NITF_HEADER = b'NITF'
with open(fpath, 'rb') as file:
header_bytes = file.read(4)
if header_bytes.startswith(JPEG_HEADER):
backend = 'cv2'
elif header_bytes.startswith(PNG_HEADER):
backend = 'cv2'
elif header_bytes.startswith(NITF_HEADER):
backend = 'gdal'
else:
backend = 'cv2'
else:
backend = 'cv2'
if space == 'auto' and backend != 'cv2':
# cv2 is the only backend that does weird things, we can
# default to auto and save the user the headache of specifying this
space = None
try:
if backend == 'gdal':
image, src_space, auto_dst_space = _imread_gdal(fpath, **kw)
elif backend == 'cv2':
image, src_space, auto_dst_space = _imread_cv2(fpath)
elif backend == 'turbojpeg':
image, src_space, auto_dst_space = _imread_turbojpeg(fpath)
elif backend == 'skimage':
image, src_space, auto_dst_space = _imread_skimage(fpath)
elif backend == 'pil':
image, src_space, auto_dst_space = _imread_pil(fpath)
elif backend == 'qoi':
image, src_space, auto_dst_space = _imread_qoi(fpath)
elif backend == 'itk':
src_space, auto_dst_space = None, None
import itk
itk_obj = itk.imread(fpath)
image = np.asarray(itk_obj)
elif backend == 'svg':
image, src_space, auto_dst_space = _imread_svg(fpath)
else:
raise KeyError('Unknown imread backend={!r}'.format(backend))
if space == 'auto':
dst_space = auto_dst_space
else:
dst_space = space
if dst_space is not None:
if src_space is None:
raise ValueError((
'Cannot convert to destination colorspace ({}) because'
' the source colorspace could not be determined. Use '
' space=None to return the raw data.'
).format(dst_space))
image = im_cv2.convert_colorspace(image, src_space=src_space,
dst_space=dst_space,
implicit=False)
return image
except Exception as ex:
print('ex = {!r}'.format(ex))
print('Error reading fpath = {!r}'.format(fpath))
raise
def _imread_qoi(fpath):
"""
"""
import qoi
image = qoi.read(fpath)
src_space, auto_dst_space = None, None
return image, src_space, auto_dst_space
def _imwrite_qoi(fpath, data):
"""
Only seems to allow RGB 255.
Ignore:
>>> from kwimage.im_io import imread, _imread_qoi, _imwrite_qoi
>>> import kwimage
>>> data = kwimage.ensure_uint255(kwimage.checkerboard())
>>> fpath = 'tmp.qoi'
>>> _imwrite_qoi(fpath, data)
>>> recon, _, _ = _imread_qoi(fpath)
"""
import kwimage
import qoi
data = kwimage.atleast_3channels(data)
qoi.write(fpath, data)
return fpath
def _imread_turbojpeg(fpath):
"""
See: https://www.learnopencv.com/efficient-image-loading/
References:
https://pypi.org/project/PyTurboJPEG/
Bash:
pip install PyTurboJPEG
sudo apt install libturbojpeg -y
Ignore:
>>> # xdoctest: +REQUIRES(--network)
>>> # xdoctest: +REQUIRES(turbojpeg)
>>> import kwimage
>>> rgb_fpath = kwimage.grab_test_image_fpath('amazon')
>>> assert rgb_fpath.endswith('.jpg')
>>> #
>>> rgb = kwimage.imread(rgb_fpath)
>>> gray = kwimage.convert_colorspace(rgb, 'rgb', 'gray')
>>> gray_fpath = rgb_fpath + '.gray.jpg'
>>> kwimage.imwrite(gray_fpath, gray)
>>> #
>>> fpath = gray_fpath
>>> #
>>> from kwimage.im_io import _imread_turbojpeg, _imread_skimage, _imread_cv2
>>> import timerit
>>> ti = timerit.Timerit(50, bestof=10, verbose=2)
>>> #
>>> for timer in ti.reset('turbojpeg'):
>>> with timer:
>>> im_turbo = _imread_turbojpeg(fpath)
>>> #
>>> for timer in ti.reset('cv2'):
>>> with timer:
>>> im_cv2 = _imread_cv2(fpath)
"""
import turbojpeg
jpeg = turbojpeg.TurboJPEG()
with open(fpath, 'rb') as file:
data = file.read()
(width, height, jpeg_subsample, jpeg_colorspace) = jpeg.decode_header(data)
# print('width = {!r}'.format(width))
# print('height = {!r}'.format(height))
# print('jpeg_subsample = {!r}'.format(jpeg_subsample))
# print('jpeg_colorspace = {!r}'.format(jpeg_colorspace))
if jpeg_colorspace == turbojpeg.TJCS_GRAY:
pixel_format = turbojpeg.TJPF_GRAY
src_space = 'gray'
auto_dst_space = 'gray'
else:
pixel_format = turbojpeg.TJPF_RGB
src_space = 'rgb'
auto_dst_space = 'rgb'
image = jpeg.decode(data, pixel_format=pixel_format)
return image, src_space, auto_dst_space
def _imread_pil(fpath):
from PIL import Image
pil_img = Image.open(fpath)
image = np.array(pil_img)
if pil_img.mode == 'RGB':
src_space = 'rgb'
auto_dst_space = 'rgb'
elif pil_img.mode == 'RGBA':
src_space = 'rgba'
auto_dst_space = 'rgba'
elif len(image.shape) == 2 or image.shape[2] == 1:
src_space = 'gray'
auto_dst_space = 'gray'
elif len(image.shape) == 3 or image.shape[2] == 3:
src_space = 'rgb'
auto_dst_space = 'rgb'
return image, src_space, auto_dst_space
def _imread_skimage(fpath):
import skimage.io
# with warnings.catch_warnings():
# warnings.simplefilter("ignore")
# skimage reads color in RGB by default
image = skimage.io.imread(fpath)
n_channels = im_core.num_channels(image)
if n_channels == 3:
src_space = 'rgb'
elif n_channels == 4:
src_space = 'rgba'
elif n_channels == 1:
src_space = 'gray'
else:
src_space = None
# raise NotImplementedError('unknown number of channels')
auto_dst_space = src_space
return image, src_space, auto_dst_space
def _imread_cv2(fpath):
# opencv reads color in BGR by default
image = cv2.imread(fpath, flags=cv2.IMREAD_UNCHANGED)
if image is None:
if exists(fpath):
# TODO: this could be a permissions error. We could test for that.
# and print a better error message in that case.
raise IOError('OpenCV cannot read this image: "{}", '
'but it exists'.format(fpath))
else:
raise IOError('OpenCV cannot read this image: "{}", '
'because it does not exist'.format(fpath))
n_channels = im_core.num_channels(image)
if n_channels == 3:
src_space = 'bgr'
auto_dst_space = 'rgb'
elif n_channels == 4:
src_space = 'bgra'
auto_dst_space = 'rgba'
elif n_channels == 1:
src_space = 'gray'
auto_dst_space = 'gray'
else:
raise NotImplementedError('unknown number of channels')
return image, src_space, auto_dst_space
def _imread_gdal(fpath, overview=None, ignore_color_table=False,
nodata_method=None, band_indices=None, nodata=None):
"""
gdal imread backend
Args:
overview (int | str):
if specified load a specific overview level.
Can be `coarsest` to use the lowest resolution overview.
ignore_color_table (bool):
if True and the image has a color table, return its indexes
instead of the colored image.
nodata_method (None | str):
if None, any nodata attributes are ignored. Otherwise specifies how
nodata values should be handled. If "ma", returns a masked array
instead of a normal ndarray. If "float", always returns a float
array where masked values are replaced with nan.
band_indices (None | List[int]):
if None, all bands are read, otherwise only specified
band indexes are read.
References:
[GDAL_Config_Options] https://gdal.org/user/configoptions.html
https://gis.stackexchange.com/questions/180961/reading-a-specific-overview-layer-from-geotiff-file-using-gdal-python
Ignore:
>>> import kwimage
>>> fpath = kwimage.grab_test_image_fpath('amazon')
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> # Test nodata values
>>> import kwimage
>>> from osgeo import gdal
>>> from osgeo import osr
>>> # Make a dummy geotiff
>>> imdata = kwimage.grab_test_image('airport')
>>> dpath = ub.Path.appdir('kwimage/test/geotiff').ensuredir()
>>> geo_fpath = dpath / 'dummy_geotiff.tif'
>>> # compute dummy values for a geotransform to CRS84
>>> img_h, img_w = imdata.shape[0:2]
>>> img_box = kwimage.Boxes([[0, 0, img_w, img_h]], 'xywh')
>>> wld_box = kwimage.Boxes([[-73.7595528, 42.6552404, 0.0001, 0.0001]], 'xywh')
>>> img_corners = img_box.corners()
>>> wld_corners = wld_box.corners()
>>> transform = kwimage.Affine.fit(img_corners, wld_corners)
>>> nodata = -9999
>>> srs = osr.SpatialReference()
>>> srs.ImportFromEPSG(4326)
>>> crs = srs.ExportToWkt()
>>> # Set a region to be nodata
>>> imdata = imdata.astype(np.int16)
>>> imdata[-100:] = nodata
>>> imdata[0:200:, -200:-180] = nodata
>>> mask = (imdata == nodata)
>>> # Windows seems to have issues with non-raw compress
>>> kwimage.imwrite(geo_fpath, imdata, backend='gdal', nodata_value=-9999,
>>> crs=crs, transform=transform, compress='RAW')
>>> # Read the geotiff with different methods
>>> raw_recon = kwimage.imread(geo_fpath, nodata_method=None)
>>> ma_recon = kwimage.imread(geo_fpath, nodata_method='ma')
>>> nan_recon = kwimage.imread(geo_fpath, nodata_method='float')
>>> # raw values should be read
>>> assert np.all(raw_recon[mask] == nodata)
>>> assert not np.any(raw_recon[~mask] == nodata)
>>> # nans should be in nodata places
>>> assert np.all(np.isnan(nan_recon[mask]))
>>> assert not np.any(np.isnan(nan_recon[~mask]))
>>> # check locations are masked correctly
>>> assert np.all(ma_recon[mask].mask)
>>> assert not np.any(ma_recon[~mask].mask)
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> # Test band specification
>>> import kwimage
>>> import pytest
>>> # Make a dummy geotiff
>>> imdata = kwimage.grab_test_image('airport')
>>> dpath = ub.Path.appdir('kwimage/test/geotiff').ensuredir()
>>> fpath1 = dpath / 'dummy_overviews_rgb.tif'
>>> kwimage.imwrite(fpath1, imdata, overviews=3, backend='gdal')
>>> band0 = kwimage.imread(fpath1, backend='gdal', band_indices=[0, 1])
>>> assert band0.shape[2] == 2
import timerit
ti = timerit.Timerit(100, bestof=10, verbose=2)
for timer in ti.reset('time'):
with timer:
band0 = kwimage.imread(fpath1, backend='gdal', band_indices=[0, 1])
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> # Test overview values
>>> import kwimage
>>> import pytest
>>> # Make a dummy geotiff
>>> imdata = kwimage.grab_test_image('airport')
>>> dpath = ub.Path.appdir('kwimage/test/geotiff').ensuredir()
>>> fpath1 = dpath / 'dummy_overviews_rgb.tif'
>>> fpath2 = dpath / 'dummy_overviews_gray.tif'
>>> kwimage.imwrite(fpath1, imdata, overviews=3, backend='gdal')
>>> kwimage.imwrite(fpath2, imdata[:, :, 0], overviews=3, backend='gdal')
>>> recon1_3a = kwimage.imread(fpath1, overview='coarsest', backend='gdal')
>>> recon1_3b = kwimage.imread(fpath1, overview=3, backend='gdal')
>>> recon1_None = kwimage.imread(fpath1, backend='gdal')
>>> recon1_0 = kwimage.imread(fpath1, overview=0, backend='gdal')
>>> recon1_1 = kwimage.imread(fpath1, overview=1, backend='gdal')
>>> recon1_2 = kwimage.imread(fpath1, overview=2, backend='gdal')
>>> recon1_3 = kwimage.imread(fpath1, overview=3, backend='gdal')
>>> with pytest.raises(ValueError):
>>> kwimage.imread(fpath1, overview=4, backend='gdal')
>>> assert recon1_0.shape == (868, 1156, 3)
>>> assert recon1_1.shape == (434, 578, 3)
>>> assert recon1_2.shape == (217, 289, 3)
>>> assert recon1_3.shape == (109, 145, 3)
>>> assert recon1_3a.shape == (109, 145, 3)
>>> assert recon1_3b.shape == (109, 145, 3)
>>> recon2_3a = kwimage.imread(fpath2, overview='coarsest', backend='gdal')
>>> recon2_3b = kwimage.imread(fpath2, overview=3, backend='gdal')
>>> recon2_0 = kwimage.imread(fpath2, overview=0, backend='gdal')
>>> assert recon2_0.shape == (868, 1156)
>>> assert recon2_3a.shape == (109, 145)
>>> assert recon2_3b.shape == (109, 145)
>>> # TODO: test an image with a color table
"""
try:
from osgeo import gdal
except ImportError:
import gdal
try:
if nodata is not None:
ub.schedule_deprecation(
modname='kwimage', name='nodata',
type='argument to _imread_gdal',
migration='use nodata_method instead',
deprecate='0.9.5', error='0.10.0', remove='0.11.0')
nodata_method = nodata
if nodata_method is not None:
if isinstance(nodata_method, str):
if nodata_method not in {'ma', 'nan', 'float'}:
raise KeyError('nodata_method={} must be ma, nan, (or float), or None'.format(nodata_method))
else:
raise TypeError(type(nodata_method))
gdal_dset = gdal.Open(os.fspath(fpath), gdal.GA_ReadOnly)
if gdal_dset is None:
raise IOError('GDAL cannot read: {!r}'.format(fpath))
gdalkw = {} # xoff, yoff, win_xsize, win_ysize
image, num_channels = _gdal_read(
gdal_dset, overview=overview,
ignore_color_table=ignore_color_table,
band_indices=band_indices, gdalkw=gdalkw,
nodata_method=nodata_method,
nodata_value=None,
)
# note this isn't a safe assumption, but it is an OK default heuristic
if num_channels == 1:
src_space = 'gray'
elif num_channels == 3:
src_space = 'rgb'
elif num_channels == 4:
src_space = 'rgba'
else:
# We have no hint of the source color space in this instance
src_space = None
except Exception:
raise
finally:
gdal_dset = None
auto_dst_space = src_space
return image, src_space, auto_dst_space
def _gdal_read(gdal_dset, overview, nodata=None, ignore_color_table=None,
band_indices=None, gdalkw=None, nodata_method=None,
nodata_value=None):
"""
Backend for reading data from an open gdal dataset
"""
if nodata is not None:
# backwards compat
nodata_method = nodata
# TODO:
# - [ ] Handle SubDatasets (e.g. ones produced by scikit-image)
# https://gdal.org/drivers/raster/gtiff.html#subdatasets
# See ../tests/test_io.py for experiments that trigger this
if len(gdal_dset.GetSubDatasets()):
raise NotImplementedError('subdatasets are not handled correctly')
# INTERLEAVE = gdal_dset.GetMetadata('IMAGE_STRUCTURE').get('INTERLEAVE', '')
# if INTERLEAVE == 'BAND':
# if len(gdal_dset.GetSubDatasets()) > 0:
# raise NotImplementedError('Cannot handle interleaved files yet')
total_num_channels = gdal_dset.RasterCount
if band_indices is None:
band_indices = range(total_num_channels)
num_channels = total_num_channels
else:
num_channels = len(band_indices)
default_bands = [gdal_dset.GetRasterBand(i + 1) for i in band_indices]
default_band0 = default_bands[0]
if overview:
overview_count = default_band0.GetOverviewCount()
if isinstance(overview, str):
if overview == 'coarsest':
overview = overview_count
else:
raise KeyError(overview)
if overview < 0:
ub.schedule_deprecation(
'kwimage', name='overviews',
type='as a negative integer argument to kwimage.imread',
migration='Use overviews="coarsest" to get the lowest resolution overview',
deprecate='0.9.21', error='1.0.0', remove='1.1.0',
)
# warnings.warn('Using negative overviews is deprecated. '
# 'Use coarsest to get the lowest resolution overview')
overview = max(overview_count + overview, 0)
if overview > overview_count:
raise ValueError('Image has no overview={}'.format(overview))
if overview > 0:
bands = [b.GetOverview(overview - 1) for b in default_bands]
if any(b is None for b in bands):
raise AssertionError(
'Band was None in {}'.format(gdal_dset.GetDescription()))
else:
bands = default_bands
else:
bands = default_bands
if num_channels == 1:
band = bands[0]
color_table = None if ignore_color_table else band.GetColorTable()
if color_table is None:
buf = band.ReadAsArray(**gdalkw)
if buf is None:
# Sometimes this works if you try again. I don't know why.
# It spits out annoying messages, not sure how to supress.
# TODO: need MWE and an issue describing this workaround.
buf = band.ReadAsArray(**gdalkw)
if buf is None:
raise IOError('GDal was unable to read this band')
image = np.array(buf)
else:
# The buffer is an index into the color table
buf = band.ReadAsArray(**gdalkw)
gdal_dtype = color_table.GetPaletteInterpretation()
dtype = _gdal_to_numpy_dtype(gdal_dtype)
num_colors = color_table.GetCount()
if num_colors <= 0:
raise AssertionError('invalid color table')
idx_to_color = []
for idx in range(num_colors):
color = color_table.GetColorEntry(idx)
idx_to_color.append(color)
# The color table specifies the real number of channels
num_channels = len(color)
idx_to_color = np.array(idx_to_color, dtype=dtype)
image = idx_to_color[buf]
if nodata_method is not None:
# TODO: not sure if this works right for
# color table images
band_nodata = band.GetNoDataValue()
mask = band_nodata == buf
if color_table is not None:
# Fix mask to align with color table
table_chans = idx_to_color.shape[1]
mask = np.tile(mask[:, :, None], (1, 1, table_chans))
else:
band0 = bands[0]
xsize = gdalkw.get('win_xsize', band0.XSize)
ysize = gdalkw.get('win_ysize', band0.YSize)
gdal_dtype = band0.DataType
dtype = _gdal_to_numpy_dtype(gdal_dtype)
shape = (ysize, xsize, num_channels)
# Preallocate and populate image
image = np.empty(shape, dtype=dtype)
if nodata_method is not None:
mask = np.empty(shape, dtype=bool)
for idx, band in enumerate(bands):
# load with less memory by specifing buf_obj
buf = image[:, :, idx]
ret = band.ReadAsArray(buf_obj=buf, **gdalkw)
# ret = buf = band.ReadAsArray(**gdalkw)
if ret is None:
raise IOError(ub.paragraph(
'''
GDAL was unable to read band: {}, {}'
from {!r}
'''.format(idx, band, gdal_dset.GetDescription())))
# image[:, :, idx] = buf
if nodata_method is not None:
band_nodata = band.GetNoDataValue()
mask_buf = mask[:, :, idx]
np.equal(buf, band_nodata, out=mask_buf)
# mask[:, :, idx] = (buf == band_nodata)
if nodata_method is not None:
if nodata_method == 'ma':
image = np.ma.array(image, mask=mask)
elif nodata_method in {'float', 'nan'}:
promote_dtype = np.result_type(image.dtype, np.float32)
image = image.astype(promote_dtype)
image[mask] = np.nan
else:
raise KeyError('nodata_method={}'.format(nodata_method))
return image, num_channels
[docs]
def imwrite(fpath, image, space='auto', backend='auto', **kwargs):
"""
Writes image data to disk.
Args:
fpath (PathLike): location to save the image
image (ndarray): image data
space (str | None):
the colorspace of the image to save. Can by any colorspace accepted
by `convert_colorspace`, or it can be 'auto', in which case we
assume the input image is either RGB, RGBA or grayscale. If None,
then absolutely no color modification is made and whatever backend
is used writes the image as-is.
New in version 0.7.10: when the backend does not resolve to "cv2",
the "auto" space resolves to None, thus the image is saved as-is.
backend (str):
Which backend writer to use. By default the file extension is used
to determine this. Valid backends are 'gdal', 'skimage', 'itk', and
'cv2'.
**kwargs : args passed to the backend writer.
When the backend is gdal, available options are:
compress (str): Common options are auto, DEFLATE, LZW, JPEG.
blocksize (int): size of tiled blocks (e.g. 256)
overviews (None | str | int | list): Number of overviews.
overview_resample (str): Common options NEAREST, CUBIC, LANCZOS
options (List[str]): other gdal options.
nodata (int): denotes a integer value as nodata.
metadata (dict): the metadata for the default empty domain.
transform (kwimage.Affine): Transform to CRS from pixel space
crs (str): The coordinate reference system for transform.
See :func:`_imwrite_cloud_optimized_geotiff` for more details each options.
When the backend is itk, see :func:`itk.imwrite` for options
When the backend is skimage, see :func:`skimage.io.imsave` for options
When the backend is cv2 see :func:`cv2.imwrite` for options.
Returns:
str: path to the written file
Note:
The image may be modified to preserve its colorspace depending on which
backend is used to write the image.
When saving as a jpeg or png, the image must be encoded with the uint8
data type. When saving as a tiff, any data type is allowed.
The scikit-image backend is itself another multi-backend plugin-based
image reader/writer.
Raises:
Exception : if the image cannot be written
Example:
>>> # xdoctest: +REQUIRES(--network)
>>> # This should be moved to a unit test
>>> from kwimage.im_io import _have_gdal # NOQA
>>> import kwimage
>>> import ubelt as ub
>>> import uuid
>>> dpath = ub.Path.appdir('kwimage/test/imwrite').ensuredir()
>>> test_image_paths = [
>>> #ub.grabdata('https://ghostscript.com/doc/tiff/test/images/rgb-3c-16b.tiff', fname='pepper.tif'),
>>> ub.grabdata('http://i.imgur.com/iXNf4Me.png', fname='ada.png'),
>>> #ub.grabdata('http://www.topcoder.com/contest/problem/UrbanMapper3D/JAX_Tile_043_DTM.tif'),
>>> ub.grabdata('https://upload.wikimedia.org/wikipedia/commons/f/fa/Grayscale_8bits_palette_sample_image.png', fname='parrot.png')
>>> ]
>>> for fpath in test_image_paths:
>>> for space in ['auto', 'rgb', 'bgr', 'gray', 'rgba']:
>>> img1 = kwimage.imread(fpath, space=space)
>>> print('Test im-io consistency of fpath = {!r} in {} space, shape={}'.format(fpath, space, img1.shape))
>>> # Write the image in TIF and PNG format
>>> tmp_tif_fpath = dpath / (str(uuid.uuid4()) + '.tif')
>>> tmp_png_fpath = dpath / (str(uuid.uuid4()) + '.png')
>>> kwimage.imwrite(tmp_tif_fpath, img1, space=space, backend='skimage')
>>> kwimage.imwrite(tmp_png_fpath, img1, space=space)
>>> tif_im = kwimage.imread(tmp_tif_fpath, space=space)
>>> png_im = kwimage.imread(tmp_png_fpath, space=space)
>>> assert np.all(tif_im == png_im), 'im-read/write inconsistency'
>>> if _have_gdal:
>>> tmp_tif2_fpath = dpath / (str(uuid.uuid4()) + '.tif')
>>> kwimage.imwrite(tmp_tif2_fpath, img1, space=space, backend='gdal')
>>> tif_im2 = kwimage.imread(tmp_tif2_fpath, space=space)
>>> assert np.all(tif_im == tif_im2), 'im-read/write inconsistency'
>>> tmp_tif2_fpath.delete()
>>> if space == 'gray':
>>> assert tif_im.ndim == 2
>>> assert png_im.ndim == 2
>>> elif space in ['rgb', 'bgr']:
>>> assert tif_im.shape[2] == 3
>>> assert png_im.shape[2] == 3
>>> elif space in ['rgba', 'bgra']:
>>> assert tif_im.shape[2] == 4
>>> assert png_im.shape[2] == 4
>>> tmp_tif_fpath.delete()
>>> tmp_png_fpath.delete()
Benchmark:
>>> import timerit
>>> import os
>>> import kwimage
>>> import tempfile
>>> #
>>> img1 = kwimage.grab_test_image('astro', dsize=(1920, 1080))
>>> space = 'auto'
>>> #
>>> file_sizes = {}
>>> #
>>> ti = timerit.Timerit(10, bestof=3, verbose=2)
>>> #
>>> for timer in ti.reset('imwrite-skimage-tif'):
>>> with timer:
>>> tmp = tempfile.NamedTemporaryFile(suffix='.tif')
>>> kwimage.imwrite(tmp.name, img1, space=space, backend='skimage')
>>> file_sizes[ti.label] = os.stat(tmp.name).st_size
>>> #
>>> for timer in ti.reset('imwrite-cv2-png'):
>>> with timer:
>>> tmp = tempfile.NamedTemporaryFile(suffix='.png')
>>> kwimage.imwrite(tmp.name, img1, space=space, backend='cv2')
>>> file_sizes[ti.label] = os.stat(tmp.name).st_size
>>> #
>>> for timer in ti.reset('imwrite-cv2-jpg'):
>>> with timer:
>>> tmp = tempfile.NamedTemporaryFile(suffix='.jpg')
>>> kwimage.imwrite(tmp.name, img1, space=space, backend='cv2')
>>> file_sizes[ti.label] = os.stat(tmp.name).st_size
>>> #
>>> for timer in ti.reset('imwrite-gdal-raw'):
>>> with timer:
>>> tmp = tempfile.NamedTemporaryFile(suffix='.tif')
>>> kwimage.imwrite(tmp.name, img1, space=space, backend='gdal', compress='RAW')
>>> file_sizes[ti.label] = os.stat(tmp.name).st_size
>>> #
>>> for timer in ti.reset('imwrite-gdal-lzw'):
>>> with timer:
>>> tmp = tempfile.NamedTemporaryFile(suffix='.tif')
>>> kwimage.imwrite(tmp.name, img1, space=space, backend='gdal', compress='LZW')
>>> file_sizes[ti.label] = os.stat(tmp.name).st_size
>>> #
>>> for timer in ti.reset('imwrite-gdal-zstd'):
>>> with timer:
>>> tmp = tempfile.NamedTemporaryFile(suffix='.tif')
>>> kwimage.imwrite(tmp.name, img1, space=space, backend='gdal', compress='ZSTD')
>>> file_sizes[ti.label] = os.stat(tmp.name).st_size
>>> #
>>> for timer in ti.reset('imwrite-gdal-deflate'):
>>> with timer:
>>> tmp = tempfile.NamedTemporaryFile(suffix='.tif')
>>> kwimage.imwrite(tmp.name, img1, space=space, backend='gdal', compress='DEFLATE')
>>> file_sizes[ti.label] = os.stat(tmp.name).st_size
>>> #
>>> for timer in ti.reset('imwrite-gdal-jpeg'):
>>> with timer:
>>> tmp = tempfile.NamedTemporaryFile(suffix='.tif')
>>> kwimage.imwrite(tmp.name, img1, space=space, backend='gdal', compress='JPEG')
>>> file_sizes[ti.label] = os.stat(tmp.name).st_size
>>> #
>>> file_sizes = ub.sorted_vals(file_sizes)
>>> import xdev
>>> file_sizes_human = ub.map_vals(lambda x: xdev.byte_str(x, 'MB'), file_sizes)
>>> print('ti.rankings = {}'.format(ub.urepr(ti.rankings, nl=2)))
>>> print('file_sizes = {}'.format(ub.urepr(file_sizes_human, nl=1)))
Example:
>>> # Test saving a multi-band file
>>> import kwimage
>>> import pytest
>>> import ubelt as ub
>>> dpath = ub.Path.appdir('kwimage/test/imwrite').ensuredir()
>>> # In this case the backend will not resolve to cv2, so
>>> # we should not need to specify space.
>>> data = np.random.rand(32, 32, 13).astype(np.float32)
>>> fpath = dpath / 'tmp1.tif'
>>> kwimage.imwrite(fpath, data)
>>> recon = kwimage.imread(fpath)
>>> assert np.all(recon == data)
>>> kwimage.imwrite(fpath, data, backend='skimage')
>>> recon = kwimage.imread(fpath, backend='skimage')
>>> assert np.all(recon == data)
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> # gdal should error when trying to read an image written by skimage
>>> with pytest.raises(NotImplementedError):
>>> kwimage.imread(fpath, backend='gdal')
>>> # In this case the backend will resolve to cv2, and thus we expect
>>> # a failure
>>> fpath = dpath / 'tmp1.png'
>>> with pytest.raises(NotImplementedError):
>>> kwimage.imwrite(fpath, data)
Example:
>>> import ubelt as ub
>>> import kwimage
>>> dpath = ub.Path.appdir('kwimage/badwrite').ensuredir()
>>> dpath.delete().ensuredir()
>>> imdata = kwimage.ensure_uint255(kwimage.grab_test_image())[:, :, 0]
>>> import pytest
>>> fpath = dpath / 'does-not-exist/img.jpg'
>>> with pytest.raises(IOError):
... kwimage.imwrite(fpath, imdata, backend='cv2')
>>> with pytest.raises(IOError):
... kwimage.imwrite(fpath, imdata, backend='skimage')
>>> # xdoctest: +SKIP
>>> # TODO: run tests conditionally
>>> with pytest.raises(IOError):
... kwimage.imwrite(fpath, imdata, backend='gdal')
>>> with pytest.raises((IOError, RuntimeError)):
... kwimage.imwrite(fpath, imdata, backend='itk')
"""
fpath = os.fspath(fpath)
if backend == 'auto':
_fpath_lower = fpath.lower()
if _fpath_lower.endswith(('.tif', '.tiff')):
if _have_gdal():
backend = 'gdal'
else:
backend = 'skimage'
elif _fpath_lower.endswith(GDAL_EXTENSIONS):
if _have_gdal():
backend = 'gdal'
elif _fpath_lower.endswith(ITK_EXTENSIONS):
backend = 'itk'
else:
backend = 'cv2'
if space == 'auto':
if backend != 'cv2':
# For non-cv2 backends, we can read / writ the image as is
# without worrying about channel ordering conversions
space = None
if space is not None:
n_channels = im_core.num_channels(image)
if space == 'auto':
if n_channels == 3:
auto_src_space = 'rgb'
elif n_channels == 4:
auto_src_space = 'rgba'
elif n_channels == 1:
auto_src_space = 'gray'
else:
# TODO: allow writing of arbitrary num channels using gdal
raise NotImplementedError('unknown number of channels')
src_space = auto_src_space
else:
src_space = space
if space is not None:
if backend == 'cv2':
# OpenCV writes images in BGR(A)/ grayscale
if n_channels == 3:
dst_space = 'bgr'
elif n_channels == 4:
dst_space = 'bgra'
elif n_channels == 1:
dst_space = 'gray'
else:
raise AssertionError('impossible state')
else:
# most writers like skimage and gdal write images in RGB(A)/ grayscale
if n_channels == 3:
dst_space = 'rgb'
elif n_channels == 4:
dst_space = 'rgba'
elif n_channels == 1:
dst_space = 'gray'
else:
raise AssertionError('impossible state')
image = im_cv2.convert_colorspace(
image, src_space=src_space, dst_space=dst_space,
implicit=False)
if backend == 'cv2':
try:
flag = cv2.imwrite(fpath, image, **kwargs)
except cv2.error as ex:
if 'could not find a writer for the specified extension' in str(ex):
raise ValueError(
'Image fpath {!r} does not have a known image extension '
'(e.g. png/jpg)'.format(fpath))
else:
raise
else:
# TODO: generalize error handling and diagnostics for all backends
if not flag:
if not exists(dirname(fpath)):
raise IOError((
'kwimage failed to write with opencv backend. '
'Reason: destination fpath {!r} is in a directory that '
'does not exist.').format(fpath))
elif image.size > 4e10:
raise IOError(
'kwimage failed to write with opencv backend. '
f'Reason: unknown, but could image with shape {image.shape} is too big.')
else:
raise IOError(
'kwimage failed to write with opencv backend. '
'Reason: unknown.')
elif backend == 'skimage':
import skimage.io
skimage.io.imsave(fpath, image, **kwargs)
elif backend == 'gdal':
_imwrite_cloud_optimized_geotiff(fpath, image, **kwargs)
elif backend == 'pil':
from PIL import Image
pil_img = Image.fromarray(image)
pil_img.save(fpath)
elif backend == 'itk':
import itk
itk_obj = itk.image_view_from_array(image)
itk.imwrite(itk_obj, fpath, **kwargs)
elif backend == 'turbojpeg':
raise NotImplementedError
else:
raise KeyError('Unknown imwrite backend={!r}'.format(backend))
return fpath
[docs]
def load_image_shape(fpath, backend='auto', include_channels=True):
"""
Determine the height/width/channels of an image without reading the entire
file.
Args:
fpath (str): path to an image
backend (str): can be "auto", "pil", or "gdal".
include_channels (bool): if False, only reads the height, width.
Returns:
Tuple[int, int, int] - shape of the image
Recall this library uses the convention that "shape" is refers to
height,width,channels array-style ordering and "size" is
width,height cv2-style ordering.
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> # Test the loading the shape works the same as loading the image and
>>> # testing the shape
>>> import kwimage
>>> temp_dpath = ub.Path.appdir('kwimage/tests/load_image_shape').ensuredir()
>>> data = kwimage.grab_test_image()
>>> datas = {
>>> 'rgb255': kwimage.ensure_uint255(data),
>>> 'rgb01': kwimage.ensure_float01(data),
>>> 'rgba01': kwimage.ensure_alpha_channel(data),
>>> }
>>> results = {}
>>> # These should be consistent
>>> # The was a problem where CV2_IMREAD_UNCHANGED read the alpha band,
>>> # but PIL did not, but maybe this is fixed now?
>>> for key, imdata in datas.items():
>>> fpath = temp_dpath / f'{key}.png'
>>> kwimage.imwrite(fpath, imdata)
>>> shapes = {}
>>> shapes['pil_load_shape'] = kwimage.load_image_shape(fpath, backend='pil')
>>> shapes['gdal_load_shape'] = kwimage.load_image_shape(fpath, backend='gdal')
>>> shapes['auto_load_shape'] = kwimage.load_image_shape(fpath, backend='auto')
>>> shapes['pil'] = kwimage.imread(fpath, backend='pil').shape
>>> shapes['cv2'] = kwimage.imread(fpath, backend='cv2').shape
>>> shapes['gdal'] = kwimage.imread(fpath, backend='gdal').shape
>>> shapes['skimage'] = kwimage.imread(fpath, backend='skimage').shape
>>> results[key] = shapes
>>> print('results = {}'.format(ub.urepr(results, nl=2, align=':', sort=0)))
>>> for shapes in results.values():
>>> assert ub.allsame(shapes.values())
>>> temp_dpath.delete()
Benchmark:
>>> # For large files, PIL is much faster
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> from osgeo import gdal
>>> from PIL import Image
>>> import timerit
>>> #
>>> import kwimage
>>> fpath = kwimage.grab_test_image_fpath()
>>> #
>>> ti = timerit.Timerit(100, bestof=10, verbose=2)
>>> for timer in ti.reset('gdal'):
>>> with timer:
>>> gdal_dset = gdal.Open(fpath, gdal.GA_ReadOnly)
>>> width = gdal_dset.RasterXSize
>>> height = gdal_dset.RasterYSize
>>> gdal_dset = None
>>> #
>>> for timer in ti.reset('PIL'):
>>> with timer:
>>> pil_img = Image.open(fpath)
>>> width, height = pil_img.size
>>> pil_img.close()
>>> # xdoctest: +REQUIRES(module:imagesize)
>>> # The imagesize module is quite fast
>>> import imagesize
>>> for timer in ti.reset('imagesize'):
>>> with timer:
>>> width, height = imagesize.get(fpath)
Timed gdal for: 100 loops, best of 10
time per loop: best=54.423 µs, mean=72.761 ± 15.9 µs
Timed PIL for: 100 loops, best of 10
time per loop: best=25.986 µs, mean=26.791 ± 1.2 µs
Timed imagesize for: 100 loops, best of 10
time per loop: best=5.092 µs, mean=5.195 ± 0.1 µs
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> import ubelt as ub
>>> import kwimage
>>> dpath = ub.Path.appdir('kwimage/tests', type='cache').ensuredir()
>>> fpath = dpath / 'foo.tif'
>>> kwimage.imwrite(fpath, np.random.rand(64, 64, 3))
>>> shape = kwimage.load_image_shape(fpath)
>>> assert shape == (64, 64, 3)
Ignore:
* Note: this seems to have an issue with PNG's with mode='LA',
which means that there really are two underlying channels, but it
kwimage.imread cv2 backend reads it as a 4 channel RGBA array.
"""
if backend == 'auto':
try:
shape = load_image_shape(fpath, backend='pil', include_channels=include_channels)
except Exception as pil_ex:
if not _have_gdal():
raise
try:
shape = load_image_shape(fpath, backend='gdal', include_channels=include_channels)
except Exception:
raise pil_ex
elif backend == 'pil':
from PIL import Image
fpath = os.fspath(fpath)
with Image.open(fpath) as pil_img:
width, height = pil_img.size
if include_channels:
num_channels = len(pil_img.getbands())
shape = (height, width, num_channels)
else:
shape = (height, width)
elif backend == 'gdal':
from osgeo import gdal
fpath = os.fspath(fpath)
gdal_dset = gdal.Open(fpath, gdal.GA_ReadOnly)
if gdal_dset is None:
raise Exception(gdal.GetLastErrorMsg())
width = gdal_dset.RasterXSize
height = gdal_dset.RasterYSize
if include_channels:
num_channels = gdal_dset.RasterCount
shape = (height, width, num_channels)
else:
shape = (height, width)
gdal_dset = None
elif backend == 'imagesize':
import imagesize
if include_channels:
raise NotImplementedError('no way to get number of channels with imagesize')
width, height = imagesize.get(fpath)
shape = (height, width)
else:
raise KeyError(backend)
return shape
def __inspect_optional_overhead():
r"""
Benchmark:
>>> from kwimage.im_io import _have_gdal, _have_turbojpg # NOQA
>>> def dis_instructions(func):
>>> import dis
>>> import io
>>> buf = io.StringIO()
>>> dis.disassemble(func.__code__, file=buf)
>>> _, text = buf.seek(0), buf.read()
>>> return text
>>> func = _have_turbojpg
>>> func = _have_gdal
>>> memo = ub.memoize(func)
>>> print(func_dis := dis_instructions(func))
>>> print(memo_dis := dis_instructions(memo))
>>> n = max(func_dis.count('\n'), memo_dis.count('\n'))
>>> sep = ' | \n' * n
>>> prefix = '| \n' * n
>>> print('\n\n')
>>> print(ub.hzcat([prefix, x, sep, y]))
Benchmark:
>>> from kwimage.im_io import _have_gdal, _have_turbojpg # NOQA
>>> funcs = []
>>> funcs += [_have_turbojpg]
>>> funcs += [_have_gdal]
>>> for func in funcs:
>>> memo = ub.memoize(func)
>>> print('func = {!r}'.format(func))
>>> print('memo = {!r}'.format(memo))
>>> import timerit
>>> ti = timerit.Timerit(100, bestof=10, verbose=1, unit='us')
>>> ti.reset('call func').call(func).report()
>>> ti.reset('call memo').call(memo).report()
"""
raise NotImplementedError
@ub.memoize
def _have_turbojpg():
"""
pip install PyTurboJPEG
"""
try:
import turbojpeg # NOQA
turbojpeg.TurboJPEG()
except Exception:
return False
else:
return True
def _have_gdal():
try:
from osgeo import gdal # NOQA
except Exception:
return False
else:
return True
def _imwrite_cloud_optimized_geotiff(fpath, data, compress='auto',
blocksize=256, overviews=None,
overview_resample='NEAREST',
interleave='PIXEL',
options=None,
nodata=None,
nodata_value=None,
metadata=None,
crs=None, transform=None):
"""
Writes data as a cloud-optimized geotiff using gdal
Args:
fpath (PathLike): file path to save the COG to.
data (ndarray[ndim=3]): Raw HWC image data to save. Dimensions should
be height, width, channels.
compress (bool): Can be JPEG (lossy) or LZW (lossless),
or DEFLATE (lossless). Can also be 'auto', which will try to
heuristically choose a sensible choice.
blocksize (int): size of tiled blocks
overviews (None | str | int | list):
If specified as a string, can be 'auto'.
if specified as a list, then uses exactly those overviews. If
specified as an integer a list is created using powers of two.
overview_resample (str): resampling method for
overview pyramid. Valid choices are: 'NEAREST', 'AVERAGE',
'BILINEAR', 'CUBIC', 'CUBICSPLINE', 'LANCZOS'.
options (List[str]): other gdal options. See [GDAL_GTiff_Options]_ for
details.
nodata_value (int):
if specified, uses this as the nodata value for each band.
transform (kwimage.Affine):
An affine transform from image coordinates into a specified
coordinate reference system (must set crs).
metadata (dict):
if specified this is interpreted as the metadata for the default
empty domain.
crs (str):
The coordinate reference system for the geo_transform.
Returns:
str: the file path where the data was written
References:
https://geoexamples.com/other/2019/02/08/cog-tutorial.html#create-a-cog-using-gdal-python
http://osgeo-org.1560.x6.nabble.com/gdal-dev-Creating-Cloud-Optimized-GeoTIFFs-td5320101.html
https://gdal.org/drivers/raster/cog.html
https://github.com/harshurampur/Geotiff-conversion
https://github.com/sshuair/cogeotiff
https://github.com/cogeotiff/rio-cogeo
https://gis.stackexchange.com/questions/1104/should-gdal-be-set-to-produce-geotiff-files-with-compression-which-algorithm-sh
.. [GDAL_GTiff_Options] https://gdal.org/drivers/raster/gtiff.html
https://gdal.org/drivers/raster/cog.html
Note:
Need to fix `CXXABI_1.3.11 not found` with conda gdal sometimes
CLI to reproduce:
python -c "import kwimage; kwimage.imread(kwimage.grab_test_image_fpath(), backend='gdal')"
This error seems to be fixed by using 2.3.3 instead of 3.x gdal, not
sure why, should look into that.
CommandLine:
xdoctest -m kwimage.im_io _imwrite_cloud_optimized_geotiff
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> from kwimage.im_io import * # NOQA
>>> from kwimage.im_io import _imwrite_cloud_optimized_geotiff
>>> dpath = ub.Path.appdir('kwimage/test/imwrite_cog').ensuredir()
>>> fpath = dpath / 'tmp0.cog.tif'
>>> data = np.random.randint(0, 255, (800, 800, 3), dtype=np.uint8)
>>> compress = 'JPEG'
>>> _imwrite_cloud_optimized_geotiff(fpath, data, compress='JPEG')
>>> _imwrite_cloud_optimized_geotiff(fpath, data, compress='LZW')
>>> data = (np.random.rand(100, 100, 4) * 255).astype(np.uint8)
>>> _imwrite_cloud_optimized_geotiff(fpath, data, compress='JPEG')
>>> _imwrite_cloud_optimized_geotiff(fpath, data, compress='LZW')
>>> _imwrite_cloud_optimized_geotiff(fpath, data, compress='DEFLATE')
>>> data = (np.random.rand(100, 100, 5) * 255).astype(np.uint8)
>>> _imwrite_cloud_optimized_geotiff(fpath, data, compress='LZW')
>>> _imwrite_cloud_optimized_geotiff(fpath, data, overviews=3)
>>> from osgeo import gdal
>>> ds = gdal.Open(os.fspath(fpath), gdal.GA_ReadOnly)
>>> filename = ds.GetDescription()
>>> main_band = ds.GetRasterBand(1)
>>> assert main_band.GetOverviewCount() == 3
>>> _imwrite_cloud_optimized_geotiff(fpath, data, overviews=[2, 4])
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> from kwimage.im_io import * # NOQA
>>> from kwimage.im_io import _imwrite_cloud_optimized_geotiff
>>> import kwimage
>>> # Test with uint16
>>> shape = (100, 100, 1)
>>> dtype = np.uint16
>>> dinfo = np.iinfo(np.uint16)
>>> data = kwimage.normalize(kwimage.gaussian_patch(shape))
>>> data = ((data - dinfo.min) * (dinfo.max - dinfo.min)).astype(dtype)
>>> dpath = ub.Path.appdir('kwimage/test/imwrite_cog').ensuredir()
>>> fpath = dpath / 'tmp1.tif'
>>> kwimage.imwrite(fpath, data)
>>> loaded = kwimage.imread(fpath)
>>> assert np.all(loaded.ravel() == data.ravel())
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.imshow(loaded / dinfo.max)
>>> kwplot.show_if_requested()
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> # Test GDAL options
>>> from kwimage.im_io import * # NOQA
>>> from kwimage.im_io import _imwrite_cloud_optimized_geotiff
>>> import kwimage
>>> data = kwimage.grab_test_image()
>>> dpath = ub.Path.appdir('kwimage/test/imwrite_cog').ensuredir()
>>> fpath = dpath / 'tmp2.tif'
>>> kwimage.imwrite(fpath, data, compress='LZW', interleave='PIXEL', blocksize=64, options=['NUM_THREADS=ALL_CPUS'])
>>> _ = ub.cmd('gdalinfo ' + fpath, verbose=3)
>>> loaded = kwimage.imread(fpath)
>>> assert np.all(loaded.ravel() == data.ravel())
>>> # xdoctest: +REQUIRES(--show)
>>> # xdoctest: +REQUIRES(module:kwplot)
>>> import kwplot
>>> kwplot.autompl()
>>> dinfo = np.iinfo(np.uint16)
>>> kwplot.imshow(loaded / dinfo.max)
>>> kwplot.show_if_requested()
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> # Test GDAL mdatadata
>>> import ubelt as ub
>>> import kwimage
>>> dpath = ub.Path.appdir('kwimage/tests', type='cache').ensuredir()
>>> fpath = dpath / 'foo.tif'
>>> kwimage.imwrite(fpath, np.random.rand(64, 64, 3), metadata={
>>> 'role': 'data',
>>> 'quantization': {'min': 0},
>>> })
>>> print(ub.cmd('gdalinfo ' + fpath)['out'])
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> # xdoctest: +REQUIRES(--slow)
>>> # Test GDAL options
>>> from kwimage.im_io import * # NOQA
>>> from kwimage.im_io import _imwrite_cloud_optimized_geotiff
>>> import kwimage
>>> orig_data = kwimage.grab_test_image()
>>> dpath = ub.Path.appdir('kwimage/test/imwrite_cog').ensuredir()
>>> fpath = dpath / 'tmp2.tif'
>>> imwrite_param_basis = {
>>> 'interleave': ['BAND', 'PIXEL'],
>>> 'compress': ['NONE', 'DEFLATE'],
>>> 'blocksize': [64, 128, None],
>>> 'overviews': [None, 'auto'],
>>> }
>>> data_param_basis = {
>>> 'dsize': [(256, 256), (532, 202)],
>>> 'dtype': ['float32', 'uint8'],
>>> }
>>> data_param_grid = list(ub.named_product(data_param_basis))
>>> imwrite_param_grid = list(ub.named_product(imwrite_param_basis))
>>> for data_kwargs in data_param_grid:
>>> data = kwimage.imresize(orig_data, dsize=data_kwargs['dsize'])
>>> data = data.astype(data_kwargs['dtype'])
>>> for imwrite_kwargs in imwrite_param_grid:
>>> print('data_kwargs = {}'.format(ub.urepr(data_kwargs, nl=1)))
>>> print('imwrite_kwargs = {}'.format(ub.urepr(imwrite_kwargs, nl=1)))
>>> kwimage.imwrite(fpath, data, **imwrite_kwargs)
>>> _ = ub.cmd('gdalinfo ' + fpath, verbose=3)
>>> loaded = kwimage.imread(fpath)
>>> assert np.all(loaded == data)
"""
from osgeo import gdal
if len(data.shape) == 2:
data = data[:, :, None]
if options is None:
options = []
y_size, x_size, num_bands = data.shape
data_set = None
if compress == 'auto':
compress = _gdal_auto_compress(data=data)
# if compress not in ['JPEG', 'LZW', 'DEFLATE', 'RAW']:
# raise KeyError('unknown compress={}'.format(compress))
# JPEG/LZW/PACKBITS/DEFLATE/CCITTRLE/CCITTFAX3/CCITTFAX4/LZMA/ZSTD/LERC/LERC_DEFLATE/LERC_ZSTD/WEBP/NONE
if compress == 'JPEG' and num_bands >= 5:
raise ValueError('Cannot use JPEG with more than 4 channels (got {})'.format(num_bands))
eType = _numpy_to_gdal_dtype(data.dtype)
if compress == 'JPEG':
if eType not in [gdal.GDT_Byte, gdal.GDT_UInt16]:
raise ValueError('JPEG compression must use 8 or 16 bit integers')
# NEAREST/AVERAGE/BILINEAR/CUBIC/CUBICSPLINE/LANCZOS
if isinstance(overviews, str):
if overviews == 'auto':
smallest_overview_dim = 512
# Compute as many overviews as needed to get both dimensions < smallest_overview_dim
# unless that would cause one dimension to disolve
max_x_overviews = int(np.log2(y_size))
max_y_overviews = int(np.log2(x_size))
max_overviews = min(max_x_overviews, max_y_overviews)
y_overviews = y_size // smallest_overview_dim
x_overviews = x_size // smallest_overview_dim
request_overviews = max(y_overviews, x_overviews)
overviews = min(max_overviews, request_overviews)
if overviews is None:
overviewlist = []
elif isinstance(overviews, int):
overviewlist = (2 ** np.arange(1, overviews + 1)).tolist()
else:
overviewlist = overviews
_options = [
# We are still using the GTiff Driver instead of COG to have control
# over interleave
'BIGTIFF=YES',
]
if blocksize is not None:
_options += [
'TILED=YES',
'BLOCKXSIZE={}'.format(blocksize),
'BLOCKYSIZE={}'.format(blocksize),
]
if compress == 'RAW':
compress = 'NONE'
_options += ['COMPRESS={}'.format(compress)]
if compress == 'JPEG' and num_bands == 3:
# Using YCBCR speeds up jpeg compression by quite a bit
_options += ['PHOTOMETRIC=YCBCR']
# https://gdal.org/drivers/raster/gtiff.html#creation-options
if interleave == 'BAND':
# For 1-band images I don' think this matters?
_options += ['INTERLEAVE=BAND']
elif interleave == 'PIXEL':
_options += ['INTERLEAVE=PIXEL']
else:
raise KeyError(interleave)
if overviewlist:
_options.append('COPY_SRC_OVERVIEWS=YES')
_options += options
_options = list(map(str, _options)) # python2.7 support
# Create an in-memory dataset where we will prepare the COG data structure
driver = gdal.GetDriverByName(str('MEM'))
data_set = driver.Create(str(''), x_size, y_size, num_bands, eType=eType)
if transform is not None or crs is not None:
# TODO: add ability to add RPC
if crs is None or transform is None:
raise ValueError('Specify transform and crs together')
# TODO: Allow transform to be a normal gdal object or something
# coercable to an affine object.
if isinstance(transform, tuple):
# assume we have a normal gdal tuple
aff_gdal = transform
else:
aff_gdal = transform.to_gdal()
data_set.SetProjection(crs)
data_set.SetGeoTransform(aff_gdal)
if metadata is not None:
data_set.SetMetadata(metadata)
if nodata is not None:
ub.schedule_deprecation(
modname='kwimage', name='nodata',
type='argument to _imwrite_gdal',
migration='use nodata_value instead',
deprecate='0.9.5', error='0.10.0', remove='0.11.0')
if nodata_value is None:
nodata_value = nodata
for i in range(num_bands):
band_data = np.ascontiguousarray(data[:, :, i])
band = data_set.GetRasterBand(i + 1)
band.WriteArray(band_data)
if nodata_value is not None:
band.SetNoDataValue(nodata_value)
# TODO:
# could set the color interpretation here
band = None
if overviewlist:
# Build the downsampled overviews (for fast zoom in / out)
data_set.BuildOverviews(str(overview_resample), overviewlist)
driver = None
# Copy the in-memory dataset to an on-disk GeoTiff
driver2 = gdal.GetDriverByName(str('GTiff'))
data_set2 = driver2.CreateCopy(os.fspath(fpath), data_set, options=_options)
data_set = None
# OK, so setting things to None turns out to be important. Gah!
# NOTE: if data_set2 is None here, that may be because the directory
# we are trying to write to does not exist.
if data_set2 is None:
last_gdal_error = gdal.GetLastErrorMsg()
if 'No such file or directory' in last_gdal_error:
ex_cls = IOError
else:
ex_cls = Exception
raise ex_cls(
'Unable to create gtiff driver for fpath={}, options={}, last_gdal_error={}'.format(
fpath, _options, last_gdal_error))
data_set2.FlushCache()
# Dereference everything
data_set2 = None
driver2 = None
return fpath
def _numpy_to_gdal_dtype(numpy_dtype):
"""
maps numpy dtypes to gdal dtypes
"""
from osgeo import gdal
if not hasattr(numpy_dtype, 'kind'):
# convert to the dtype instance object
numpy_dtype = numpy_dtype().dtype
kindsize = (numpy_dtype.kind, numpy_dtype.itemsize)
if kindsize == ('u', 1):
eType = gdal.GDT_Byte
elif kindsize == ('u', 2):
eType = gdal.GDT_UInt16
elif kindsize == ('u', 4):
eType = gdal.GDT_UInt32
elif kindsize == ('i', 2):
eType = gdal.GDT_Int16
elif kindsize == ('i', 4):
eType = gdal.GDT_Int32
elif kindsize == ('f', 4):
eType = gdal.GDT_Float32
elif kindsize == ('f', 8):
eType = gdal.GDT_Float64
elif kindsize == ('c', 8):
eType = gdal.GDT_CFloat32
elif kindsize == ('c', 16):
eType = gdal.GDT_CFloat64
else:
raise TypeError('Unsupported GDAL dtype for {}'.format(kindsize))
return eType
def _gdal_to_numpy_dtype(gdal_dtype):
"""
maps gdal dtypes to numpy dtypes
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> numpy_types = [np.uint8, np.uint16, np.int16, np.uint32, np.int32,
>>> np.float32, np.float64, np.complex64,
>>> np.complex128]
>>> for np_type in numpy_types:
>>> numpy_dtype1 = np_type().dtype
>>> gdal_dtype1 = _numpy_to_gdal_dtype(numpy_dtype1)
>>> numpy_dtype2 = _gdal_to_numpy_dtype(gdal_dtype1)
>>> gdal_dtype2 = _numpy_to_gdal_dtype(numpy_dtype2)
>>> assert gdal_dtype2 == gdal_dtype1
>>> assert _dtype_equality(numpy_dtype1, numpy_dtype2)
"""
from osgeo import gdal
_GDAL_DTYPE_LUT = {
gdal.GDT_Byte: np.uint8,
gdal.GDT_UInt16: np.uint16,
gdal.GDT_Int16: np.int16,
gdal.GDT_UInt32: np.uint32,
gdal.GDT_Int32: np.int32,
gdal.GDT_Float32: np.float32,
gdal.GDT_Float64: np.float64,
gdal.GDT_CInt16: np.complex_,
gdal.GDT_CInt32: np.complex_,
gdal.GDT_CFloat32: np.complex64,
gdal.GDT_CFloat64: np.complex128
}
return _GDAL_DTYPE_LUT[gdal_dtype]
def _gdal_auto_compress(src_fpath=None, data=None, data_set=None):
"""
Heuristic for automatically choosing gdal compression type
Args:
src_fpath (str): path to source image if known
data (ndarray): data pixels if known
data_set (gdal.Dataset): gdal dataset if known
Returns:
str: gdal compression code
References:
https://kokoalberti.com/articles/geotiff-compression-optimization-guide/
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> assert _gdal_auto_compress(src_fpath='foo.jpg') == 'JPEG'
>>> assert _gdal_auto_compress(src_fpath='foo.png') == 'LZW'
>>> if not ub.WIN32:
>>> assert _gdal_auto_compress(data=np.random.rand(3, 2)) == 'DEFLATE'
>>> assert _gdal_auto_compress(data=np.random.rand(3, 2, 3).astype(np.uint8)) == 'DEFLATE'
>>> assert _gdal_auto_compress(data=np.random.rand(3, 2, 4).astype(np.uint8)) == 'DEFLATE'
>>> assert _gdal_auto_compress(data=np.random.rand(3, 2, 1).astype(np.uint8)) == 'DEFLATE'
"""
compress = None
num_channels = None
dtype = None
if src_fpath is not None:
# the filepath might hint at which compress method is best.
ext = src_fpath[-5:].lower()
if ext.endswith(('.jpg', '.jpeg')):
compress = 'JPEG'
elif ext.endswith(('.png', '.png')):
compress = 'LZW'
if compress is None:
if data_set is not None:
if dtype is None:
main_band = data_set.GetRasterBand(1)
dtype = _gdal_to_numpy_dtype(main_band.DataType)
# if num_channels is None:
# data_set.RasterCount == 3
elif data is not None:
if dtype is None:
dtype = data.dtype
if num_channels is None:
if len(data.shape) == 3:
num_channels = data.shape[2]
# if compress is None:
# if _dtype_equality(dtype, np.uint8) and num_channels == 3:
# compress = 'JPEG'
if compress is None:
# which backend is best in this case?
if ub.WIN32:
# windows seems to have issues with deflate, not sure why
compress = 'LZW'
else:
compress = 'DEFLATE'
return compress
def _dtype_equality(dtype1, dtype2):
"""
Check for numpy dtype equality
References:
https://stackoverflow.com/questions/26921836/correct-way-to-test-for-numpy-dtype
Example:
dtype1 = np.empty(0, dtype=np.uint8).dtype
dtype2 = np.uint8
_dtype_equality(dtype1, dtype2)
"""
dtype1_ = getattr(dtype1, 'type', dtype1)
dtype2_ = getattr(dtype2, 'type', dtype2)
return dtype1_ == dtype2_
def _imread_svg(fpath):
"""
References:
https://pypi.org/project/svglib/
Ignore:
# xdoctest: +REQUIRES(module:svglib)
# xdoctest: +REQUIRES(module:reportlab)
from kwimage.im_io import * # NOQA
from kwimage.im_io import _imread_svg # NOQA
import kwimage
fpath = ub.grabdata('https://upload.wikimedia.org/wikipedia/commons/a/aa/Philips_PM5544.svg')
# This doesnt work quite how I would expect it to.
imdata, _, _ = _imread_svg(fpath)
image = kwimage.imread(fpath)
import kwplot
kwplot.autompl()
kwplot.imshow(image)
"""
from reportlab.graphics import renderPM
from svglib.svglib import svg2rlg
import io
from PIL import Image
file = io.BytesIO()
drawing = svg2rlg(fpath)
renderPM.drawToFile(drawing, file, fmt="PNG")
file.seek(0)
pil_img = Image.open(file)
imdata = np.asarray(pil_img)
src_space = auto_dst_space = 'rgb'
return imdata, src_space, auto_dst_space