Source code for pystac.stac_io

from __future__ import annotations

import json
import logging
import os
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Callable
from urllib.error import HTTPError
from urllib.request import Request, urlopen

import pystac
from pystac.serialization import (
    identify_stac_object,
    identify_stac_object_type,
    merge_common_properties,
    migrate_to_latest,
)
from pystac.utils import HREF, safe_urlparse

# Use orjson if available
try:
    import orjson
except ImportError:
    orjson = None  # type: ignore[assignment]

# Is urllib3 available?
try:
    import urllib3  # noqa
except ImportError:
    HAS_URLLIB3 = False
else:
    HAS_URLLIB3 = True

if TYPE_CHECKING:
    from pystac.catalog import Catalog
    from pystac.stac_object import STACObject


logger = logging.getLogger(__name__)


class StacIO(ABC):
    """Abstract base for the I/O layer used to read and write STAC JSON.

    Concrete subclasses supply :meth:`read_text` and :meth:`write_text`; the
    JSON and STAC-object helpers defined here are built on those two methods.
    """

    # Zero-argument callable producing the StacIO handed out by ``default()``.
    _default_io: Callable[[], StacIO] | None = None

    def __init__(self, headers: dict[str, str] | None = None):
        # A missing (or empty) mapping is normalised to a fresh empty dict.
        self.headers = headers if headers else {}

    @abstractmethod
    def read_text(self, source: HREF, *args: Any, **kwargs: Any) -> str:
        """Read text from the given source.

        ``source`` is either a string or an :class:`os.PathLike` object
        (:class:`~pystac.Link` is a path-like object). A string must be a URI
        or a local path to read from. Passing a :class:`~pystac.Link` lets an
        implementation use extra link information, such as the paging
        information carried by the extended links described in the
        `STAC API spec
        <https://github.com/radiantearth/stac-api-spec/tree/master/item-search#paging>`__.

        Args:
            source : The source to read from.
            *args : Arbitrary positional arguments that may be utilized by the
                concrete implementation.
            **kwargs : Arbitrary keyword arguments that may be utilized by the
                concrete implementation.

        Returns:
            str: The text found at ``source``.
        """
        raise NotImplementedError

    @abstractmethod
    def write_text(
        self,
        dest: HREF,
        txt: str,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Write the given text to the given destination.

        ``dest`` is either a string or an :class:`os.PathLike` object
        (:class:`~pystac.Link` is a path-like object). A string must be a URI
        or a local path to write to. Passing a :class:`~pystac.Link` lets an
        implementation use extra link information.

        Args:
            dest : The destination to write to.
            txt : The text to write.
        """
        raise NotImplementedError

    def json_loads(self, txt: str, *args: Any, **kwargs: Any) -> dict[str, Any]:
        """Deserialize a JSON string into a dictionary.

        Used internally by :class:`StacIO` instances; sub-classes may override
        it to customise deserialization. When ``orjson`` is available it is
        used and the extra arguments are ignored; otherwise they are forwarded
        to :func:`json.loads`.

        Args:
            txt : The JSON string to deserialize to a dictionary.
        """
        if orjson is None:
            parsed: dict[str, Any] = json.loads(txt, *args, **kwargs)
            return parsed
        fast_parsed: dict[str, Any] = orjson.loads(txt)
        return fast_parsed

    def json_dumps(self, json_dict: dict[str, Any], *args: Any, **kwargs: Any) -> str:
        """Serialize a dictionary to a JSON string indented by two spaces.

        Used internally by :class:`StacIO` instances; sub-classes may override
        it to customise serialization. With ``orjson`` available only the
        keyword arguments are forwarded (to ``orjson.dumps``); otherwise both
        positional and keyword arguments go to :func:`json.dumps`.

        Args:
            json_dict : The dictionary to serialize
        """
        if orjson is None:
            return json.dumps(json_dict, *args, indent=2, **kwargs)
        encoded = orjson.dumps(json_dict, option=orjson.OPT_INDENT_2, **kwargs)
        return encoded.decode("utf-8")

    def stac_object_from_dict(
        self,
        d: dict[str, Any],
        href: HREF | None = None,
        root: Catalog | None = None,
        preserve_dict: bool = True,
    ) -> STACObject:
        """Deserialize a :class:`~pystac.STACObject` sub-class from a dictionary.

        Args:
            d : The dictionary to deserialize
            href : Optional href to associate with the STAC object
            root : Optional root :class:`~pystac.Catalog` to associate with the
                STAC object.
            preserve_dict: If ``False``, the dict parameter ``d`` may be
                modified during this method call. Otherwise the dict is not
                mutated. Defaults to ``True``, which results in a deepcopy of
                the parameter. Set to ``False`` when possible to avoid the
                performance hit of a deepcopy.

        Raises:
            ValueError : If the identified object type is not a catalog,
                collection or item.
        """
        if href is None:
            href_str = None
        else:
            href_str = str(os.fspath(href))

        if identify_stac_object_type(d) == pystac.STACObjectType.ITEM:
            collection_cache = (
                root._resolved_objects.as_collection_cache()
                if root is not None
                else None
            )
            # Merge common properties in case this is an older STAC object.
            merge_common_properties(
                d, json_href=href_str, collection_cache=collection_cache
            )

        info = identify_stac_object(d)
        d = migrate_to_latest(d, info)

        # Keyword arguments shared by every ``from_dict`` call below.
        shared: dict[str, Any] = {
            "href": href_str,
            "root": root,
            "migrate": False,
            "preserve_dict": preserve_dict,
        }
        object_type = info.object_type

        if object_type == pystac.STACObjectType.CATALOG:
            catalog = pystac.Catalog.from_dict(d, **shared)
            # Only catalogs are tagged with the StacIO that produced them.
            catalog._stac_io = self
            return catalog

        if object_type == pystac.STACObjectType.COLLECTION:
            return pystac.Collection.from_dict(d, **shared)

        if object_type == pystac.STACObjectType.ITEM:
            return pystac.Item.from_dict(d, **shared)

        raise ValueError(f"Unknown STAC object type {object_type}")

    def read_json(self, source: HREF, *args: Any, **kwargs: Any) -> dict[str, Any]:
        """Read a dict from the given source.

        See :func:`StacIO.read_text <pystac.StacIO.read_text>` for usage of
        str vs Link as a parameter.

        Args:
            source : The source from which to read.
            *args : Additional positional arguments to be passed to
                :meth:`StacIO.read_text`.
            **kwargs : Additional keyword arguments to be passed to
                :meth:`StacIO.read_text`.

        Returns:
            dict: A dict representation of the JSON contained in the file at
            the given source.
        """
        raw_text = self.read_text(source, *args, **kwargs)
        return self.json_loads(raw_text)

    def read_stac_object(
        self,
        source: HREF,
        root: Catalog | None = None,
        *args: Any,
        **kwargs: Any,
    ) -> STACObject:
        """Read a STACObject from a JSON file at the given source.

        See :func:`StacIO.read_text <pystac.StacIO.read_text>` for usage of
        str vs Link as a parameter.

        Args:
            source : The source from which to read.
            root : Optional root of the catalog for this object. If provided,
                the root's resolved object cache can be used to search for
                previously resolved instances of the STAC object.
            *args : Additional positional arguments to be passed to
                :meth:`StacIO.read_json`.
            **kwargs : Additional keyword arguments to be passed to
                :meth:`StacIO.read_json`.

        Returns:
            STACObject: The deserialized STACObject from the serialized JSON
            contained in the file at the given uri.
        """
        loaded = self.read_json(source, *args, **kwargs)
        # The freshly parsed dict has no other owner, so it may be mutated.
        return self.stac_object_from_dict(
            loaded, href=source, root=root, preserve_dict=False
        )

    def save_json(
        self,
        dest: HREF,
        json_dict: dict[str, Any],
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Write a dict to the given destination as JSON.

        See :func:`StacIO.write_text <pystac.StacIO.write_text>` for usage of
        str vs Link as a parameter.

        Args:
            dest : The destination file to write the text to.
            json_dict : The JSON dict to write.
            *args : Additional positional arguments to be passed to
                :meth:`StacIO.json_dumps`.
            **kwargs : Additional keyword arguments to be passed to
                :meth:`StacIO.json_dumps`.
        """
        serialized = self.json_dumps(json_dict, *args, **kwargs)
        self.write_text(dest, serialized)

    @classmethod
    def set_default(cls, stac_io_class: Callable[[], StacIO]) -> None:
        """Set the zero-argument callable used to build the default StacIO."""
        cls._default_io = stac_io_class

    @classmethod
    def default(cls) -> StacIO:
        """Return a new instance built by the registered default factory.

        If no factory has been registered, :class:`DefaultStacIO` is
        registered first and then used.
        """
        factory = cls._default_io
        if factory is None:
            factory = DefaultStacIO
            cls._default_io = factory
        return factory()
class DefaultStacIO(StacIO):
    """Default :class:`StacIO` implementation.

    Reads from URLs with :func:`urllib.request.urlopen` and from local paths
    with :func:`open`; writes only to the local file system.
    """

    def read_text(self, source: HREF, *_: Any, **__: Any) -> str:
        """A concrete implementation of :meth:`StacIO.read_text
        <pystac.StacIO.read_text>`. Converts the ``source`` argument to a string (if
        it is not already) and delegates to :meth:`DefaultStacIO.read_text_from_href`
        for opening and reading the file."""
        href = str(os.fspath(source))
        return self.read_text_from_href(href)

    def read_text_from_href(self, href: str) -> str:
        """Reads file as a UTF-8 string.

        If ``href`` has a "scheme" (e.g. if it starts with "https://") then this will
        use :func:`urllib.request.urlopen` to open the file and read the contents;
        otherwise, :func:`open` will be used to open a local file.

        Args:
            href : The URI of the file to open.

        Raises:
            Exception : If the URL request fails with an
                :class:`urllib.error.HTTPError` (chained as the cause).
        """
        href_contents: str
        if _is_url(href):
            try:
                # Lazy %-formatting: the message is only built when DEBUG is on.
                logger.debug("GET %s Headers: %s", href, self.headers)
                req = Request(href, headers=self.headers)
                with urlopen(req) as f:
                    href_contents = f.read().decode("utf-8")
            except HTTPError as e:
                raise Exception(f"Could not read uri {href}") from e
        else:
            with open(href, encoding="utf-8") as f:
                href_contents = f.read()
        return href_contents

    def write_text(self, dest: HREF, txt: str, *_: Any, **__: Any) -> None:
        """A concrete implementation of :meth:`StacIO.write_text
        <pystac.StacIO.write_text>`. Converts the ``dest`` argument to a string (if it
        is not already) and delegates to :meth:`DefaultStacIO.write_text_to_href` for
        opening and writing the file."""
        href = str(os.fspath(dest))
        return self.write_text_to_href(href, txt)

    def write_text_to_href(self, href: str, txt: str) -> None:
        """Writes text to file using UTF-8 encoding.

        This implementation uses :func:`open` and therefore can only write to the local
        file system. Missing parent directories are created first.

        Args:
            href : The path to which the file will be written.
            txt : The string content to write to the file.

        Raises:
            NotImplementedError : If ``href`` is a URL rather than a local path.
        """
        if _is_url(href):
            raise NotImplementedError("DefaultStacIO cannot write to urls")
        href = os.fspath(href)
        dirname = os.path.dirname(href)
        if dirname != "":
            # exist_ok avoids the check-then-create race when another writer
            # creates the same directory between an isdir() test and makedirs().
            os.makedirs(dirname, exist_ok=True)
        with open(href, "w", encoding="utf-8") as f:
            f.write(txt)
class DuplicateKeyReportingMixin(StacIO):
    """A mixin for :class:`pystac.StacIO` implementations that will report
    on duplicate keys in the JSON being read in.

    See https://github.com/stac-utils/pystac/issues/313
    """

    def json_loads(self, txt: str, *_: Any, **__: Any) -> dict[str, Any]:
        """Overwrites :meth:`StacIO.json_loads <pystac.StacIO.json_loads>` as the
        internal method used by :class:`DuplicateKeyReportingMixin` for deserializing
        a JSON string to a dictionary while checking for duplicate object keys.

        Extra positional and keyword arguments are accepted and ignored.

        Raises:

            pystac.DuplicateObjectKeyError : If a duplicate object key is found.
        """
        # Always uses the standard-library parser so the pairs hook runs for
        # every JSON object.
        result: dict[str, Any] = json.loads(
            txt, object_pairs_hook=self._report_duplicate_object_names
        )
        return result

    def read_json(self, source: HREF, *args: Any, **kwargs: Any) -> dict[str, Any]:
        """Overwrites :meth:`StacIO.read_json <pystac.StacIO.read_json>` for
        deserializing a JSON file to a dictionary while checking for duplicate object
        keys.

        Raises:

            pystac.DuplicateObjectKeyError : If a duplicate object key is found. The
                message names the source, and the original error is attached as
                the cause.
        """
        txt = self.read_text(source, *args, **kwargs)
        try:
            return self.json_loads(txt, source=source)
        except pystac.DuplicateObjectKeyError as e:
            url = str(os.fspath(source))
            msg = str(e) + f" in {url}"
            # Chain explicitly so the traceback presents the original error as
            # the direct cause rather than as an error raised during handling.
            raise pystac.DuplicateObjectKeyError(msg) from e

    @staticmethod
    def _report_duplicate_object_names(
        object_pairs: list[tuple[str, Any]],
    ) -> dict[str, Any]:
        """Build a dict from decoded key/value pairs, rejecting repeated keys.

        Raises:
            pystac.DuplicateObjectKeyError : On the first key that appears twice.
        """
        result: dict[str, Any] = {}
        for key, value in object_pairs:
            if key in result:
                raise pystac.DuplicateObjectKeyError(
                    f'Found duplicate object name "{key}"'
                )
            result[key] = value
        return result
def _is_url(href: str) -> bool:
    """Return ``True`` when ``href`` parses with a non-empty URL scheme."""
    parsed = safe_urlparse(href)
    return parsed.scheme != ""


if HAS_URLLIB3:
    from typing import cast

    from urllib3 import PoolManager
    from urllib3.util import Retry

    class RetryStacIO(DefaultStacIO):
        """A customized StacIO that retries requests, using
        :py:class:`urllib3.util.retry.Retry`.

        The headers are passed to :py:class:`DefaultStacIO` and sent with every
        URL read. If retry is not provided, a default retry is used.

        To use this class, you'll need to install PySTAC with urllib3:

        .. code-block:: shell

            pip install pystac[urllib3]
        """

        retry: Retry
        """The :py:class:`urllib3.util.retry.Retry` to use with all reading network
        requests."""

        def __init__(
            self,
            headers: dict[str, str] | None = None,
            retry: Retry | None = None,
        ):
            super().__init__(headers)
            self.retry = retry or Retry()

        def read_text_from_href(self, href: str) -> str:
            """Reads file as a UTF-8 string, with retry support.

            URLs are fetched with a urllib3 ``GET`` using :attr:`retry` and the
            instance headers; anything else is delegated to
            :meth:`DefaultStacIO.read_text_from_href`.

            Args:
                href : The URI of the file to open.
            """
            if not _is_url(href):
                return super().read_text_from_href(href)

            # TODO provide a pooled StacIO to enable more efficient network
            # access (probably named `PooledStacIO`).
            # The context manager releases the manager's connection pools once
            # the (fully preloaded) response body has been decoded.
            with PoolManager() as http:
                try:
                    response = http.request(
                        "GET",
                        href,
                        # Forward the configured headers; None lets urllib3
                        # fall back to the manager's own defaults.
                        headers=self.headers or None,
                        retries=self.retry,  # type: ignore
                    )
                    return cast(str, response.data.decode("utf-8"))
                # NOTE(review): HTTPError here is urllib.error.HTTPError (the
                # module-level import); urllib3 raises its own exception types,
                # so this handler likely never fires for urllib3 failures --
                # confirm before widening it, since callers may rely on the
                # raw urllib3 exceptions propagating.
                except HTTPError as e:
                    raise Exception(f"Could not read uri {href}") from e