Source code for pystac.utils

from __future__ import annotations

import os
import posixpath
import warnings
from datetime import datetime, timezone
from enum import Enum
from typing import (
    Any,
    Callable,
    TypeVar,
    Union,
    cast,
)
from urllib.parse import ParseResult as URLParseResult
from urllib.parse import urljoin, urlparse, urlunparse

import dateutil.parser

from pystac.errors import RequiredPropertyMissing

# Type alias for values accepted as an HREF argument: either a plain string or
# an ``os.PathLike`` object whose filesystem representation is a string.
HREF = Union[str, os.PathLike[str]]


def make_posix_style(href: HREF) -> str:
    """Return ``href`` as a string that uses forward slashes only.

    Doubled back slashes are collapsed first, so each ``\\\\`` pair and each
    remaining lone ``\\`` both end up as a single ``/``. This is how Windows
    style paths are turned into POSIX style ones.

    Args:
        href (Union[str, os.PathLike]) : The href string or path-like object.

    Returns:
        str : The converted href in string form.
    """
    converted = str(os.fspath(href))
    # Order matters: handle the doubled separator before the single one so a
    # pair of back slashes becomes one forward slash rather than two.
    for windows_separator in ("\\\\", "\\"):
        converted = converted.replace(windows_separator, "/")
    return converted
def safe_urlparse(href: str) -> URLParseResult:
    """Parse ``href`` with :func:`urllib.parse.urlparse`, normalizing Windows
    drive prefixes.

    :func:`urllib.parse.urlparse` reports the drive letter of a Windows path
    (e.g. ``"D:\\"``) as the ``scheme``. This wrapper detects that situation
    and instead returns a result with an empty ``scheme`` and ``netloc`` and
    the drive prefix kept as part of ``path``, so Windows and UNIX/LINUX file
    paths are handled the same way.

    Args:
        href (str) : The HREF to parse. May be a local file path or URL.

    Returns:
        urllib.parse.ParseResult : The named tuple representing the parsed
        HREF.
    """
    parsed = urlparse(href)
    scheme = parsed.scheme
    if scheme == "":
        return parsed

    lowered = href.lower()
    # "<scheme>:\" can only be a drive prefix followed by a back slash.
    drive_with_backslash = lowered.startswith(f"{scheme}:\\")
    # "<scheme>:/" is a drive prefix unless it is really "<scheme>://".
    drive_with_slash = lowered.startswith(f"{scheme}:/") and not lowered.startswith(
        f"{scheme}://"
    )
    if not (drive_with_backslash or drive_with_slash):
        return parsed

    # parsed.scheme is lower-cased, so recover the original casing of the
    # drive prefix from the input string itself.
    original_prefix = href[: len(scheme)]
    return URLParseResult(
        scheme="",
        netloc="",
        path=f"{original_prefix}:{parsed.path}",
        params=parsed.params,
        query=parsed.query,
        fragment=parsed.fragment,
    )
class StringEnum(str, Enum):
    """Base :class:`enum.Enum` for string-valued members.

    Both ``str()`` and ``repr()`` of a member yield the member's plain string
    value, so members serialize as that value.
    """

    def __str__(self) -> str:
        text = cast(str, self.value)
        return text

    def __repr__(self) -> str:
        text = str(self.value)
        return text
class JoinType(StringEnum):
    """DEPRECATED.

    .. deprecated:: 1.8.0
        No longer used internally by pystac.

    Allowed join types for :func:`~pystac.utils.join_path_or_url`.
    """

    PATH = "path"
    URL = "url"

    @staticmethod
    def from_parsed_uri(parsed_uri: URLParseResult) -> JoinType:
        """DEPRECATED.

        .. deprecated:: 1.8.0
            No longer used internally by pystac.

        Determines the appropriate join type based on the scheme of the parsed
        result: an empty scheme means a local path, anything else a URL.

        Args:
            parsed_uri (urllib.parse.ParseResult) : A named tuple representing
                the parsed URI.

        Returns:
            JoinType : The join type for the URI.
        """
        warnings.warn(
            message=(
                "from_parsed_uri is deprecated and will be removed in pystac "
                "version 2.0.0. It is no longer used internally by pystac."
            ),
            category=DeprecationWarning,
            # Attribute the warning to the caller rather than to this module;
            # otherwise Python's default filters hide it from users.
            stacklevel=2,
        )
        if parsed_uri.scheme == "":
            return JoinType.PATH
        return JoinType.URL
def join_path_or_url(join_type: JoinType, *args: str) -> str:
    """DEPRECATED.

    .. deprecated:: 1.8.0
        No longer used internally by pystac.

    Functions similarly to :func:`os.path.join`, but can be used to join either
    a local file path or a URL.

    Args:
        join_type (JoinType) : One of ``JoinType.PATH`` or ``JoinType.URL``. If
            ``JoinType.PATH``, then :func:`os.path.join` is used for the join.
            If ``JoinType.URL``, then :func:`posixpath.join` is used.
        *args (str): Additional positional string arguments to be joined.

    Returns:
        str : The joined path
    """
    warnings.warn(
        message=(
            "join_path_or_url is deprecated and will be removed in pystac "
            "version 2.0.0. It is no longer used internally by pystac."
        ),
        category=DeprecationWarning,
        # Attribute the warning to the caller rather than to this module;
        # otherwise Python's default filters hide it from users.
        stacklevel=2,
    )
    if join_type == JoinType.PATH:
        return os.path.join(*args)
    return posixpath.join(*args)
def _relativize_path(
    source_path: str,
    start_path: str,
    start_is_dir: bool,
    relpath: Callable[[str, str], str],
) -> str:
    """Shared implementation of the relative-HREF helpers.

    Args:
        source_path : The ``path`` component of the HREF to make relative.
        start_path : The ``path`` component of the HREF to be relative to.
        start_is_dir : Whether ``start_path`` names a directory. When
            ``False`` its parent directory is used instead.
        relpath : Function computing the relative path from a start directory
            to a source path, called as ``relpath(source, start)``.

    Returns:
        str : The relative path, prefixed with ``./`` unless it is exactly
        ``./`` or already starts with ``../``.
    """
    base_dir = start_path if start_is_dir else os.path.dirname(start_path)

    # Both sides are compared without their leading slashes.
    relative = relpath(source_path.lstrip("/"), base_dir.lstrip("/"))

    # relpath normalizes away a trailing slash; restore it if the source had one.
    if source_path.endswith("/"):
        relative += "/"

    if relative == "./" or relative.startswith("../"):
        return relative
    return "./" + relative


def _make_relative_href_url(
    parsed_source: URLParseResult,
    parsed_start: URLParseResult,
    start_is_dir: bool = False,
) -> str:
    """Return the path of ``parsed_source`` relative to ``parsed_start``,
    computed with POSIX path semantics (for URLs)."""
    return _relativize_path(
        parsed_source.path, parsed_start.path, start_is_dir, posixpath.relpath
    )


def _make_relative_href_path(
    parsed_source: URLParseResult,
    parsed_start: URLParseResult,
    start_is_dir: bool = False,
) -> str:
    """Return the path of ``parsed_source`` relative to ``parsed_start``,
    computed with the OS-specific path library (for local file paths)."""

    def os_relpath(source: str, start: str) -> str:
        # posixpath doesn't play well with windows drive letters, so the
        # os-specific path library is used for relpath. This means windows
        # paths can only be handled on windows machines.
        return make_posix_style(os.path.relpath(source, start))

    return _relativize_path(
        parsed_source.path, parsed_start.path, start_is_dir, os_relpath
    )
def make_relative_href(
    source_href: str, start_href: str, start_is_dir: bool = False
) -> str:
    """Express ``source_href`` as a path relative to ``start_href``.

    Works for both local file paths and URLs. Both HREFs are first converted
    to POSIX style. If their scheme or network location differ, no relative
    path exists and the (POSIX style) ``source_href`` is returned as is.

    Args:
        source_href : The HREF to make relative.
        start_href : The HREF that the resulting HREF will be relative to.
        start_is_dir : If ``True``, ``start_href`` is treated as a directory.
            Otherwise, ``start_href`` is considered to be a path to
            a file. Defaults to ``False``.

    Returns:
        str: The relative HREF.
    """
    source_href = make_posix_style(source_href)
    start_href = make_posix_style(start_href)

    parsed_source = safe_urlparse(source_href)
    parsed_start = safe_urlparse(start_href)

    source_origin = (parsed_source.scheme, parsed_source.netloc)
    start_origin = (parsed_start.scheme, parsed_start.netloc)
    if source_origin != start_origin:
        return source_href

    # No scheme means both are local file paths; otherwise both are URLs.
    relativize = (
        _make_relative_href_path
        if parsed_start.scheme == ""
        else _make_relative_href_url
    )
    return relativize(parsed_source, parsed_start, start_is_dir)
def _make_absolute_href_url(
    parsed_source: URLParseResult,
    parsed_start: URLParseResult,
    start_is_dir: bool = False,
) -> str:
    """Resolve ``parsed_source`` against the URL ``parsed_start``.

    Args:
        parsed_source : The parsed HREF to make absolute.
        parsed_start : The parsed HREF used as the base for resolution.
        start_is_dir : If ``True``, the path of ``parsed_start`` is treated as
            a directory (with or without a trailing slash). Otherwise its
            last path segment is treated as a file and dropped.

    Returns:
        str : The absolute URL. If ``parsed_source`` already has a scheme it
        is returned unchanged.
    """
    # If the source is already absolute, just return it
    if parsed_source.scheme != "":
        return urlunparse(parsed_source)

    if start_is_dir:
        start_dir = parsed_start.path
        # urljoin treats the last segment of a base without a trailing slash
        # as a file name and replaces it, which would silently drop the final
        # directory. Make the directory explicit.
        if not start_dir.endswith("/"):
            start_dir += "/"
    else:
        # Drop the file name, keeping a trailing slash so urljoin works properly
        start_dir = parsed_start.path.rsplit("/", 1)[0] + "/"

    # Join the start directory to the relative path and find the absolute path
    abs_path = urljoin(start_dir, parsed_source.path).replace("\\", "/")

    return urlunparse(
        (
            parsed_start.scheme,
            parsed_start.netloc,
            abs_path,
            parsed_source.params,
            parsed_source.query,
            parsed_source.fragment,
        )
    )


def _make_absolute_href_path(
    parsed_source: URLParseResult,
    parsed_start: URLParseResult,
    start_is_dir: bool = False,
) -> str:
    """Resolve ``parsed_source`` against the local file path ``parsed_start``.

    Args:
        parsed_source : The parsed HREF to make absolute.
        parsed_start : The parsed HREF used as the base for resolution.
        start_is_dir : If ``True``, the path of ``parsed_start`` is treated as
            a directory. Otherwise its parent directory is used.

    Returns:
        str : The resolved path in POSIX style. If the path of
        ``parsed_source`` is already absolute it is returned unchanged.
    """
    # If the source is already absolute, just return it
    if os.path.isabs(parsed_source.path):
        return urlunparse(parsed_source)

    # If the start path is not a directory, get the parent directory
    start_dir = (
        parsed_start.path if start_is_dir else os.path.dirname(parsed_start.path)
    )

    # Join the start directory to the relative path and find the absolute path
    abs_path = make_posix_style(
        os.path.abspath(os.path.join(start_dir, parsed_source.path))
    )

    # Account for the normalization of abspath for things like /vsitar//
    # prefixes by restoring the original start_dir text when abspath
    # modified the start_dir.
    # NOTE(review): a relative start_dir (including "") also differs from its
    # abspath, so this substitution swaps the cwd-based prefix back to the
    # relative text and the result is then not absolute. Confirm whether that
    # is intended.
    normalized_start_dir = make_posix_style(os.path.abspath(start_dir))
    if start_dir != normalized_start_dir:
        abs_path = abs_path.replace(normalized_start_dir, start_dir)

    return abs_path
def make_absolute_href(
    source_href: str, start_href: str | None = None, start_is_dir: bool = False
) -> str:
    """Resolve ``source_href`` to an absolute HREF.

    Works for both local file paths and URLs. A relative ``source_href`` is
    joined onto ``start_href``; both are converted to POSIX style first.

    Args:
        source_href : The HREF to make absolute.
        start_href : The HREF that will be used as the basis for resolving
            relative paths, if ``source_href`` is a relative path. Defaults to
            the current working directory, in which case it is always treated
            as a directory.
        start_is_dir : If ``True``, ``start_href`` is treated as a directory.
            Otherwise, ``start_href`` is considered to be a path to
            a file. Defaults to ``False``.

    Returns:
        str: The absolute HREF.
    """
    if start_href is None:
        start_href, start_is_dir = os.getcwd(), True

    parsed_source = safe_urlparse(make_posix_style(source_href))
    parsed_start = safe_urlparse(make_posix_style(start_href))

    # A scheme on either side means URL semantics apply; otherwise both are
    # local file paths.
    involves_url = parsed_source.scheme != "" or parsed_start.scheme != ""
    resolve = _make_absolute_href_url if involves_url else _make_absolute_href_path
    return resolve(parsed_source, parsed_start, start_is_dir)
def is_absolute_href(href: str) -> bool:
    """Tell whether an HREF is absolute.

    Works for both local file paths and URLs: an HREF with a scheme is
    absolute, and so is a scheme-less HREF whose path is an absolute path.

    Args:
        href : The HREF to consider.

    Returns:
        bool: ``True`` if the given HREF is absolute, ``False`` if it is
        relative.
    """
    parsed = safe_urlparse(href)
    if parsed.scheme != "":
        return True
    return os.path.isabs(parsed.path)
def datetime_to_str(dt: datetime, timespec: str = "auto") -> str:
    """Format a :class:`datetime.datetime` as an ISO8601 string in the
    `RFC 3339, section 5.6
    <https://datatracker.ietf.org/doc/html/rfc3339#section-5.6>`__ format
    required by the :stac-spec:`STAC Spec
    <master/item-spec/common-metadata.md#date-and-time>`.

    A datetime without ``tzinfo`` is taken to be UTC, and a ``+00:00`` offset
    is written as ``Z``.

    Args:
        dt : The datetime to convert.
        timespec: An optional argument that specifies the number of additional
            terms of the time to include. Valid options are 'auto', 'hours',
            'minutes', 'seconds', 'milliseconds' and 'microseconds'. The
            default value is 'auto'.

    Returns:
        str: The ISO8601 (RFC 3339) formatted string representing the datetime.
    """
    aware = dt if dt.tzinfo is not None else dt.replace(tzinfo=timezone.utc)
    formatted = aware.isoformat(timespec=timespec)

    utc_offset = "+00:00"
    if not formatted.endswith(utc_offset):
        return formatted
    return formatted[: -len(utc_offset)] + "Z"
def str_to_datetime(s: str) -> datetime:
    """Converts a string timestamp to a :class:`datetime.datetime` instance
    using :meth:`dateutil.parser.isoparse` under the hood.

    The input string must be in a format accepted by
    :meth:`dateutil.parser.isoparse`, i.e. an ISO 8601 timestamp.

    Args:
        s (str) : The string to convert to :class:`datetime.datetime`.

    Returns:
        datetime.datetime: The datetime represented by the string.
    """
    return dateutil.parser.isoparse(s)
def now_in_utc() -> datetime:
    """Return the current time as a timezone-aware datetime in UTC."""
    return datetime.now(tz=timezone.utc)
def now_to_rfc3339_str() -> str:
    """Return the current UTC time formatted as an RFC 3339 string."""
    current = now_in_utc()
    return datetime_to_str(current)
def geometry_to_bbox(geometry: dict[str, Any]) -> list[float]:
    """Extract the bounding box from a GeoJSON geometry.

    The ``coordinates`` member is walked recursively, so any nesting depth is
    handled. Coordinate values may be ints or floats, and any values after
    the first two of a position (e.g. an elevation) are ignored.

    Args:
        geometry : GeoJSON geometry dict

    Returns:
        list: Bounding box of the geometry as ``[min_x, min_y, max_x, max_y]``,
        where ``x`` and ``y`` are the first and second value of each position,
        formatted according to:
        https://tools.ietf.org/html/rfc7946#section-5

    Raises:
        KeyError: If ``geometry`` has no ``"coordinates"`` member.
        IndexError: If the geometry contains no positions.
        TypeError: If the coordinates contain a string where a position or a
            list of positions is expected.
    """
    xs: list[float] = []
    ys: list[float] = []

    def collect(node: Any) -> None:
        # Strings are iterable and indexable, so without this guard they
        # would recurse forever instead of failing cleanly.
        if isinstance(node, (str, bytes)):
            raise TypeError(f"Invalid GeoJSON coordinates: {node!r}")

        # A position is a sequence that starts with a number; anything else
        # is a container of positions or of further containers.
        if len(node) > 0 and isinstance(node[0], (int, float)):
            xs.append(node[0])
            ys.append(node[1])
            return

        for child in node:
            collect(child)

    collect(geometry["coordinates"])

    if not xs:
        raise IndexError("Cannot compute a bbox: geometry has no coordinates")

    return [min(xs), min(ys), max(xs), max(ys)]
# Generic type variables used by the ``Optional`` helpers in this module.
T = TypeVar("T")  # type of the wrapped (input) value
U = TypeVar("U")  # type of a mapped (output) value
def map_opt(fn: Callable[[T], U], v: T | None) -> U | None:
    """Apply ``fn`` to an optional value, passing ``None`` through untouched.

    Args:
        fn (Callable) : A function that maps the non-optional value of type
            ``T`` to another value. This function will be called on
            non-``None`` values of ``v``.
        v (Optional[T]) : The optional value to map.

    Returns:
        ``None`` if ``v`` is ``None``, otherwise the result of ``fn(v)``.

    Examples:

        Given an optional value like the following...

        .. code-block:: python

            maybe_item: Optional[pystac.Item] = ...

        ...you could replace...

        .. code-block:: python

            maybe_item_id: Optional[str] = None
            if maybe_item is not None:
                maybe_item_id = maybe_item.id

        ...with:

        .. code-block:: python

            maybe_item_id = map_opt(lambda item: item.id, maybe_item)
    """
    if v is None:
        return None
    return fn(v)
def get_opt(option: T | None) -> T:
    """Unwrap an ``Optional`` value, raising a :exc:`ValueError` if it is
    ``None``.

    Use this to get a properly typed value from an optional in contexts where
    you can be certain the value is not ``None``. If the value could actually
    be ``None``, it's best to handle that case explicitly instead of using
    this method.

    Args:
        option (Optional[T]) : Some ``Optional`` value

    Returns:
        The value of type T wrapped by the Optional[T]

    Raises:
        ValueError: If ``option`` is ``None``.

    Examples:

        .. code-block:: python

            d = {
                "some_key": "some_value"
            }

            # This passes
            val: str = get_opt(d.get("some_key"))

            # This raises a ValueError
            val: str = get_opt(d.get("does_not_exist"))
    """
    if option is not None:
        return option
    raise ValueError("Cannot get value from None")
def get_required(option: T | None, obj: str | Any, prop: str) -> T:
    """Unwrap an ``Optional`` value that comes from a required property of
    some object.

    If the option is ``None``, throws an
    :exc:`pystac.RequiredPropertyMissing` with the given obj and property.

    This method is primarily used internally to retrieve properly typed
    required properties from dictionaries. For an example usage, see the
    :attr:`pystac.extensions.eo.Band.name` source code.

    Args:
        option (Optional[T]) : The ``Optional`` value.
        obj (str, Any) : The object from which the value is being retrieved.
            This will be passed to the :exc:`~pystac.RequiredPropertyMissing`
            exception if ``option`` is ``None``.
        prop (str) : The name of the property being retrieved.

    Returns:
        T : The properly typed, non-``None`` value.
    """
    if option is not None:
        return option
    raise RequiredPropertyMissing(obj, prop)
def is_file_path(href: str) -> bool:
    """Check whether an HREF looks like a path to a file.

    The HREF counts as a file path when its path component ends with any kind
    of file extension. Unlike `os.path.isfile()` nothing is looked up on disk.

    Caution: There are cases for which this method may return wrong results!

    Args:
        href (str) : The HREF to consider.

    Returns:
        bool: ``True`` if the given HREF resembles a file path, ``False`` if
        it does not.
    """
    _, extension = os.path.splitext(urlparse(href).path)
    return extension != ""