Source code for plio.io.io_gdal

import json
import math
import os
import warnings

import affine
import numpy as np
import pvl


from plio.io import extract_metadata, conditional_gdal
from plio.geofuncs import geofuncs
from plio.utils.utils import find_in_dict
from plio.io import gdal, ogr, osr

NP2GDAL_CONVERSION = {
  "byte": 1,
  "uint8": 1,
  "int8": 1,
  "uint16": 2,
  "int16": 3,
  "uint32": 4,
  "int32": 5,
  "float32": 6,
  "float64": 7,
  "complex64": 10,
  "complex128": 11,
}

GDAL2NP_CONVERSION = {v:k for k,v in NP2GDAL_CONVERSION.items()}

DEFAULT_PROJECTIONS = {'mars':'GEOGCS["Mars 2000",DATUM["D_Mars_2000",SPHEROID["Mars_2000_IAU_IAG",3396190.0,169.89444722361179]],PRIMEM["Greenwich",0],UNIT["Decimal_Degree",0.0174532925199433]]',
                       'moon':'GEOGCS["Moon 2000",DATUM["D_Moon_2000",SPHEROID["Moon_2000_IAU_IAG",1737400.0,0.0]],PRIMEM["Greenwich",0],UNIT["Decimal_Degree",0.0174532925199433]]'}
DEFAULT_RADII = {'mars': 3396190.0}


[docs]class GeoDataset(object): """ Geospatial dataset object that represents. Parameters ---------- file_name : str The name of the input image, including its full path. Attributes ---------- base_name : str The base name of the input image, extracted from the full path. bounding_box : object The bounding box of the image in lat/lon space geotransform : object Geotransform reference OGR object as an array of size 6 containing the affine transformation coefficients for transforming from raw sample/line to projected x/y. xproj = geotransform[0] + sample * geotransform[1] + line * geotransform[2] yproj = geotransform[3] + sample * geotransform[4] + line * geotransform[5] geospatial_coordinate_system : object Geospatial coordinate system OSR object. latlon_extent : list of two tuples containing the latitide/longitude boundaries. This list is in the form [(lowerlat, lowerlon), (upperlat, upperlon)]. pixel_width : float The width of the image pixels (i.e. displacement in the x-direction). Note: This is the second value geotransform array. pixel_height : float The height of the image pixels (i.e. displacement in the y-direction). Note: This is the sixth (last) value geotransform array. spatial_reference : object Spatial reference system OSR object. standard_parallels : list of the standard parallels used by the map projection found in the metadata using the spatial reference for this GeoDataset. unit_type : str Name of the unit used by the raster, e.g. 'm' or 'ft'. x_rotation : float The geotransform coefficient that represents the rotation about the x-axis. Note: This is the third value geotransform array. xy_extent : list of two tuples containing the sample/line boundaries. The first value is the upper left corner of the upper left pixel and the second value is the lower right corner of the lower right pixel. This list is in the form [(minx, miny), (maxx, maxy)]. xy_corners : list of tuple corner coordinates in the form: [upper left, lower left, lower right, upper right] y_rotation : float The geotransform coefficient that represents the rotation about the y-axis. Note: This is the fifth value geotransform array. coordinate_transformation : object The coordinate transformation from the spatial reference system to the geospatial coordinate system. inverse_coordinate_transformation : object The coordinate transformation from the geospatial coordinate system to the spatial reference system. scale : tuple The name and value of the linear projection units of the spatial reference system. This tuple is of type string/float of the form (unit name, value). To transform a linear distance to meters, multiply by this value. If no units are available ("Meters", 1) will be returned. spheroid : tuple The spheroid found in the metadata using the spatial reference system. This is of the form (semi-major, semi-minor, inverse flattening). raster_size : tuple The dimensions of the raster, i.e. (number of samples, number of lines). central_meridian : float The central meridian of the map projection from the metadata. no_data_value : float Special value used to indicate pixels that are not valid. metadata : dict A dictionary of available image metadata footprint : object An OGR footprint object """ def __init__(self, file_name): """ Initialization method to set the file name and open the file using GDAL. Parameters ---------- file_name : str The file name to set and open. """ self.file_name = file_name if not gdal: raise ImportError('No module name gdal.') self.dataset = gdal.Open(file_name) if self.dataset is None: raise IOError('File not found :', file_name) def __repr__(self): return os.path.basename(self.file_name) def __reduce__(self): return self.__class__, (self.file_name,) @property def base_name(self): if not getattr(self, '_base_name', None): self._base_name = os.path.splitext(os.path.basename(self.file_name))[0] return self._base_name @property def geotransform(self): """ Where the array is in the form: [top left x, w-e pixel resolution, x-rotation, top left y, y-rotation, n-s pixel resolution] """ if not getattr(self, '_geotransform', None): try: self._geotransform = self.dataset.GetGeoTransform() except: coords = json.loads(self.footprint.ExportToJson())['coordinates'][0][0] ul, ur, lr, ll = geofuncs.find_four_corners(coords) xsize, ysize = self.raster_size xstart = ul[0] xscale = (ur[0] - xstart) / xsize xskew = (ll[0] - xstart) / ysize ystart = ul[1] yskew = (ur[1] - ystart) / xsize yscale = (ll[1] - ystart) / ysize self._geotransform = [xstart, xscale, xskew, ystart, yskew, yscale] return self._geotransform @property def forward_affine(self): self._fa = affine.Affine.from_gdal(*self.geotransform) return self._fa @property def inverse_affine(self): # If det(A) == 0, the transformation is degenerate if self.forward_affine.is_degenerate: return None self._ia = ~self.forward_affine return self._ia @property def standard_parallels(self): if not getattr(self, '_standard_parallels', None): self._standard_parallels = extract_metadata.get_standard_parallels(self.spatial_reference) return self._standard_parallels @property def unit_type(self): if not getattr(self, '_unit_type', None): self._unit_type = self.dataset.GetRasterBand(1).GetUnitType() return self._unit_type @property def north_up(self): return True if self.footprint: return geofuncs.is_clockwise(json.loads(self.footprint.ExportToJson())['coordinates'][0][0]) else: return True @property def spatial_reference(self): if not getattr(self, '_srs', None): self._srs = osr.SpatialReference() proj = self.dataset.GetProjection() if proj: self._srs.ImportFromWkt(self.dataset.GetProjection()) else: target = find_in_dict(self.metadata, 'TargetName') self._srs.ImportFromWkt(DEFAULT_PROJECTIONS[target.lower()]) try: self._srs.MorphToESRI() self._srs.MorphFromESRI() except: pass #pragma: no cover #Setup the GCS self._gcs = self._srs.CloneGeogCS() return self._srs @property def nbands(self): if not getattr(self, '_nbands', None): self._nbands = self.dataset.RasterCount return self._nbands @property def geospatial_coordinate_system(self): if not getattr(self, '_gcs', None): self._gcs = self.spatial_reference.CloneGeogCS() return self._gcs @property def metadata(self): if not hasattr(self, '_metadata'): try: self._metadata = pvl.load(self.file_name) except: self._metadata = self.dataset.GetMetadata() return self._metadata @property def footprint(self): if not hasattr(self, '_footprint'): # Try to get the footprint from the image try: polygon_pvl = find_in_dict(self.metadata, 'Polygon') start_polygon_byte = find_in_dict(polygon_pvl, 'StartByte') num_polygon_bytes = find_in_dict(polygon_pvl, 'Bytes') # I too dislike the additional open here. Not sure a good option with open(self.file_name, 'r') as f: f.seek(start_polygon_byte - 1) # Sloppy unicode to string because GDAL pukes on unicode stream = str(f.read(num_polygon_bytes)) self._footprint = ogr.CreateGeometryFromWkt(stream) except: self._footprint = None # If the image does not have a footprint, try getting the image from projected # coordinates try: # Get the lat lon corners lat = [i[0] for i in self.latlon_corners] lon = [i[1] for i in self.latlon_corners] # Compute a ogr geometry for the tiff which # provides leverage for overlaps ring = ogr.Geometry(ogr.wkbLinearRing) for point in [*zip(lon, lat)]: ring.AddPoint(*point) ring.AddPoint(lon[0], lat[0]) poly = ogr.Geometry(ogr.wkbPolygon) poly.AddGeometry(ring) poly.FlattenTo2D() self._footprint = poly except: self._footprint = None return self._footprint @property def xy_corners(self): return [(0, 0), (0, self.dataset.RasterYSize), (self.dataset.RasterXSize,self.dataset.RasterYSize), (self.dataset.RasterXSize, 0)] @property def proj_corners(self): # The corner coordinates in projected space raise NotImplementedError @property def latlon_extent(self): if not getattr(self, '_latlon_extent', None): if self.footprint: fp = self.footprint # If we have a footprint, do not worry about computing a lat/lon transform minx, maxx, miny, maxy = fp.GetEnvelope() self._latlon_extent = [(minx, miny), (maxx, maxy)] else: # Attempt to compute a basic bounding box footprint self._latlon_extent = [] xy_extent = self.xy_extent maxlon, minlat = self.pixel_to_latlon(*xy_extent[0]) minlon, maxlat = self.pixel_to_latlon(*xy_extent[1]) self._latlon_extent.append((minlat, minlon)) self._latlon_extent.append((maxlat, maxlon)) return self._latlon_extent @property def latlon_corners(self): if not getattr(self, '_latlon_corners', None): if self.footprint: fp = self.footprint minx, maxx, miny, maxy = fp.GetEnvelope() self._latlon_corners = [(minx, maxy), (minx, miny), (maxx, miny), (maxx, maxy)] else: self._latlon_corners = [] for x, y in self.xy_corners: x, y = self.pixel_to_latlon(x,y) self._latlon_corners.append((x,y)) return self._latlon_corners @property def xy_extent(self): return [(0, 0), (self.dataset.RasterXSize, self.dataset.RasterYSize)] @property def proj_extent(self): # The extent in projected space raise NotImplementedError @property def pixel_area(self): if not getattr(self, '_pixel_area', None): extent = self.xy_extent self._pixel_area = extent[1][0] * extent[1][1] return self._pixel_area @property def pixel_width(self): if not getattr(self, '_pixel_width', None): self._pixel_width = self.geotransform[1] return self._pixel_width @property def pixel_height(self): if not getattr(self, '_pixel_height', None): self._pixel_height = self.geotransform[5] return self._pixel_height @property def x_rotation(self): if not getattr(self, '_x_rotation', None): self._x_rotation = self.geotransform[2] return self._x_rotation @property def y_rotation(self): if not getattr(self, '_y_rotation', None): self._y_rotation = self.geotransform[4] return self._y_rotation @property def coordinate_transformation(self): if not getattr(self, '_ct', None): self._ct = osr.CoordinateTransformation(self.spatial_reference, self.geospatial_coordinate_system) return self._ct @property def inverse_coordinate_transformation(self): if not getattr(self, '_ict', None): self._ict = osr.CoordinateTransformation(self.geospatial_coordinate_system, self.spatial_reference) return self._ict @property def no_data_value(self): if not getattr(self, '_no_data_value', None): self._no_data_value = self.dataset.GetRasterBand(1).GetNoDataValue() return self._no_data_value @property def scale(self): if not getattr(self, '_scale', None): unitname = self.spatial_reference.GetLinearUnitsName() value = self.spatial_reference.GetLinearUnits() self._scale = (unitname, value) return self._scale @property def spheroid(self): if not getattr(self, '_spheroid', None): self._spheroid = extract_metadata.get_spheroid(self.spatial_reference) return self._spheroid @property def raster_size(self): if not getattr(self, '_raster_size', None): self._raster_size = (self.dataset.RasterXSize, self.dataset.RasterYSize) return self._raster_size @property def central_meridian(self): if not getattr(self, '_central_meridian', None): self._central_meridian = extract_metadata.get_central_meridian(self.spatial_reference) return self._central_meridian
[docs] def pixel_to_latlon(self, x, y): """ Convert from pixel space (i.e. sample/line) to lat/lon space. Parameters ---------- x : float x-coordinate to be transformed. y : float y-coordinate to be transformed. Returns ------- lat, lon : tuple (Latitude, Longitude) corresponding to the given (x,y). """ lon, lat = self.forward_affine * (x,y) lon, lat, _ = self.coordinate_transformation.TransformPoint(lon, lat) return lat, lon
[docs] def latlon_to_pixel(self, lat, lon): """ Convert from lat/lon space to pixel space (i.e. sample/line). Parameters ---------- lat: float Latitude to be transformed. lon : float Longitude to be transformed. Returns ------- x, y : tuple (Sample, line) position corresponding to the given (latitude, longitude). """ lon, lat, _ = self.inverse_coordinate_transformation.TransformPoint(lon, lat) px, py = map(int, self.inverse_affine * (lon, lat)) return px, py
[docs] def read_array(self, band=1, pixels=None, dtype=None): """ Extract the required data as a NumPy array Parameters ---------- band : int The image band number to be extracted as a NumPy array. Default band=1. pixels : list [xstart, ystart, xcount, ycount]. Default pixels=None. dtype : str The NumPy dtype for the output array. Defaults to the band dtype. Returns ------- array : ndarray The dataset for the specified band. """ band = self.dataset.GetRasterBand(band) if dtype is None: dtype = GDAL2NP_CONVERSION[band.DataType] dtype = getattr(np, dtype) if not pixels: array = band.ReadAsArray().astype(dtype) #if self.north_up == False: # array = np.flipud(array) else: # Check that the read start is not outside of the image xstart, ystart, xcount, ycount = pixels xmax, ymax = map(int, self.xy_extent[1]) # If the image is south up, flip the roi #if self.north_up == False: # ystart = ymax - (ystart + ycount) if xstart < 0: xstart = 0 if ystart < 0: ystart = 0 if xstart + xcount > xmax: xcount = xmax - xstart if ystart + ycount > ymax: ycount = ymax - ystart array = band.ReadAsArray(xstart, ystart, xcount, ycount).astype(dtype) return array
def compute_overlap(self, geodata, **kwargs): return geofuncs.compute_overlap(self, geodata, **kwargs)
[docs]def array_to_raster(array, file_name, projection=None, geotransform=None, outformat='GTiff', ndv=None, bittype='GDT_Float64'): """ Converts the given NumPy array to a raster format using the GeoDataset class. Parameters ---------- array : ndarray The data to be written via GDAL file_name : str The output file PATH (relative or absolute) projection : object A GDAL readable projection object, WKT string, PROJ4 string, etc. Default: None geotransform : object A six parameter geotransformation Default:None. outformat : str A GDAL supported output format Default: 'GTiff'. ndv : float The no data value for the given band. Default: None. bittype : str A GDAL supported bittype, e.g. GDT_Int32 Default: GDT_Float64 """ if not gdal: raise ImportError('No module named gdal.') driver = gdal.GetDriverByName(outformat) try: y, x, bands = array.shape single = False except: bands = 1 y, x = array.shape single = True dataset = driver.Create(file_name, x, y, bands, getattr(gdal, bittype)) if geotransform: dataset.SetGeoTransform(geotransform) if projection: if isinstance(projection, str): dataset.SetProjection(projection) else: dataset.SetProjection(projection.ExportToWkt()) if single == True: bnd = dataset.GetRasterBand(1) if ndv != None: bnd.SetNoDataValue(ndv) bnd.WriteArray(array) dataset.FlushCache() else: for i in range(1, bands + 1): bnd = dataset.GetRasterBand(i) if ndv != None: bnd.SetNoDataValue(ndv) bnd.WriteArray(array[:,:,i - 1]) dataset.FlushCache()
@conditional_gdal def match_rasters(match_to, match_from, destination, resampling_method='GRA_Bilinear', ndv=0): """ Match a source raster to a match raster, including resolution and extent. Parameters ========== match_to : object A GeoDataSet object to be matched to match_from : object A GeoDataSet object to be clipped destination : str PATH where the output will be written resampling_method : str Resampling method to use. Options include: {GRA_NearestNeighbor, GRA_Bilinear (Default), GRA_Cubic, GRA_CubicSpline, GRA_Lanczos, GRA_Average, GRA_Mode} Examples ======== Open an example Apollo 15 Geotiff >>> from plio import get_path >>> from plio.io.io_gdal import GeoDataSet >>> ds = GeoDataSet(get_path('AS15-M-0296_SML_geo.tif')) """ import gdalconst # import here so Sphinx can build the docos, mocking is not working # TODO: If a destination is not provided create an in-memory GeoDataSet object match_to_srs = match_to.dataset.GetProjection() match_to_gt = match_to.geotransform width, height = match_to.raster_size match_from__srs = match_from.dataset.GetProjection() match_from__gt = match_from.geotransform dst = gdal.GetDriverByName('GTiff').Create(destination, width, height, 1, gdalconst.GDT_Float64) dst.SetGeoTransform(match_to_gt) dst.SetProjection(match_to_srs) dst.GetRasterBand(1).SetNoDataValue(ndv) gdal.ReprojectImage(match_from.dataset, dst, None, None, getattr(gdalconst, resampling_method))