Source code for shareloc.dtm_reader

#!/usr/bin/env python
# coding: utf8
#
# Copyright (c) 2022 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of Shareloc
# (see https://github.com/CNES/shareloc).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Image DTM Image to handle DTM image data.
Inherits from Image Class with DTM particularities.
"""

# Standard imports
import logging

# Third party imports
import numpy as np
from rasterio.fill import fillnodata
from scipy import interpolate

# Shareloc imports
from shareloc.image import Image
from shareloc.proj_utils import (
    coordinates_conversion,
    transform_index_to_physical_point,
    transform_physical_point_to_index,
)


# pylint: disable=invalid-name
[docs] class dtm_reader(Image): """ class dtm_reader to handle DTM image data Inherits from Image Class with DTM particularities """ # pylint: disable=too-many-arguments def __init__( self, dtm_filename, geoid_filename=None, roi=None, roi_is_in_physical_space=False, fill_nodata="rio_fillnodata", fill_value=None, ): """ constructor :param dtm_filename: dtm filename :type dtm_filename: string :param geoid_filename: geoid filename, if None datum is ellispoid :type geoid_filename: string :param roi: region of interest [row_min,col_min,row_max,col_max] or [xmin,y_min,x_max,y_max] if roi_is_in_physical_space activated :type roi: list :param roi_is_in_physical_space: roi value in physical space :type roi_is_in_physical_space: bool :param fill_nodata: fill_nodata strategy in None/'constant'/'min'/'median'/'max'/'mean'/'rio_fillnodata'/ :type fill_nodata: str :param fill_value: fill value for constant strategy. fill value is used for 'roi_fillnodata' residuals nodata, if None 'min' is used :type fill_value: float """ super().__init__(dtm_filename, read_data=True, roi=roi, roi_is_in_physical_space=roi_is_in_physical_space)
[docs] self.dtm_filename = dtm_filename
[docs] self.geoid_filename = geoid_filename
[docs] self.stats = {}
if self.mask is not None: self.mask[np.isnan(self.data)] = 0 else: # no nodata in dtm, we create a mask with nans self.mask = (1 - np.isnan(self.data)) * 255 valid_data = self.data[self.mask[:, :] == 255] self.stats["min"] = np.min(valid_data) self.stats["max"] = np.max(valid_data) self.stats["mean"] = np.mean(valid_data) self.stats["median"] = np.median(valid_data) if fill_nodata is not None: self.fill_nodata(strategy=fill_nodata, fill_value=fill_value)
[docs] self.alt_data = self.data[:, :].astype("float64")
if geoid_filename is not None: logging.debug("remove geoid height") self.grid_row, self.grid_col = np.mgrid[0 : self.nb_rows : 1, 0 : self.nb_columns : 1] lat, lon = transform_index_to_physical_point(self.transform, self.grid_row, self.grid_col) positions = np.vstack([lon.flatten(), lat.flatten()]).transpose() if self.epsg != 4326: positions = coordinates_conversion(positions, self.epsg, 4326) geoid_height = interpolate_geoid_height(geoid_filename, positions) self.alt_data += geoid_height.reshape(lon.shape) else: logging.debug("no geoid file is given dtm is assumed to be w.r.t ellipsoid")
[docs] self.trans_inv = self.trans_inv.to_gdal()
[docs] self.transform = self.transform.to_gdal()
[docs] def fill_nodata(self, strategy="rio_fillnodata", max_search_distance=100.0, smoothing_iterations=0, fill_value=0.0): """ fill nodata in DTM image, nan values in dtm are also filled :param strategy: fill strategy ('constant'/'min'/'median'/'max'/'mean'/'rio_fillnodata'/) :type strategy: str :param max_search_distance: fill max_search_distance :type max_search_distance: float :param smoothing_iterations: smoothing_iterations :type smoothing_iterations: int :param fill_value: fill value for constant strategy. fill value is used for 'roi_fillnodata' residuals nodata, if None 'min' is used :type fill_value: float """ if strategy in self.stats: self.data[self.mask[:, :] == 0] = self.stats[strategy] elif strategy == "rio_fillnodata": self.data = fillnodata(self.data, self.mask[:, :], max_search_distance, smoothing_iterations) if np.sum(self.data[self.mask[:, :] == 0] == self.nodata) != 0: if fill_value is None: fill_value = self.stats["min"] logging.info("Shareloc dtm_reader: not all nodata have been filled, fill with %d", fill_value) self.data[self.data[:, :] == self.nodata] = fill_value elif strategy == "constant": self.data[self.mask[:, :] == 0] = fill_value else: logging.warning("Shareloc dtm_reader: fill nodata strategy not available")
[docs] def interpolate_geoid_height(geoid_filename, positions, interpolation_method="linear"): """ terrain to index conversion retrieve geoid height above ellispoid :param geoid_filename: geoid_filename :type geoid_filename: str :param positions: geodetic coordinates :type positions: 2D numpy array: (number of points, [long coord, lat coord]) :param interpolation_method: default is 'linear' (interpn interpolation method) :type interpolation_method: str :return: geoid height :rtype: numpy array (number of points) """ geoid_image = Image(geoid_filename, read_data=True) # Check longitude overlap is not present, rounding to handle egm2008 with rounded pixel size if geoid_image.nb_columns * geoid_image.pixel_size_col - 360 < 10**-8: logging.debug("add one pixel overlap on longitudes") geoid_image.nb_columns += 1 # Check if we can add a column geoid_image.data = np.column_stack((geoid_image.data[:, :], geoid_image.data[:, 0])) # Prepare grid for interpolation row_indexes = np.arange(0, geoid_image.nb_rows, 1) col_indexes = np.arange(0, geoid_image.nb_columns, 1) points = (row_indexes, col_indexes) # add modulo lon/lat min_lon = geoid_image.origin_col + geoid_image.pixel_size_col / 2 max_lon = ( geoid_image.origin_col + geoid_image.nb_columns * geoid_image.pixel_size_col - geoid_image.pixel_size_col / 2 ) positions[:, 0] += ((positions[:, 0] + min_lon) < 0) * 360.0 positions[:, 0] -= ((positions[:, 0] - max_lon) > 0) * 360.0 if np.any(np.abs(positions[:, 1]) > 90.0): raise RuntimeError("Geoid cannot handle latitudes greater than 90 deg.") indexes_geoid = transform_physical_point_to_index(geoid_image.trans_inv, positions[:, 1], positions[:, 0]) return interpolate.interpn( points, geoid_image.data[:, :], indexes_geoid, method=interpolation_method, bounds_error=False, fill_value=np.nan, )