from __future__ import annotations
import importlib.resources
import json
import numbers
from abc import abstractmethod
from collections.abc import Iterable
from copy import deepcopy
from enum import Enum
from functools import lru_cache
from typing import (
TYPE_CHECKING,
Any,
Generic,
Protocol,
TypeVar,
Union,
)
import pystac
from pystac.utils import get_required
if TYPE_CHECKING:
from pystac.collection import Collection
from pystac.item import Item
def __getattr__(name: str) -> Any:
if name == "FIELDS_JSON_URL":
import warnings
warnings.warn(
"FIELDS_JSON_URL is deprecated and will be removed in v2",
DeprecationWarning,
)
return (
"https://cdn.jsdelivr.net/npm/@radiantearth/"
"stac-fields/fields-normalized.json"
)
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
class _Comparable_x(Protocol):
"""Protocol for annotating comparable types.
For matching __lt__ that takes an 'x' parameter
(e.g. float)
"""
@abstractmethod
def __lt__(self: T, x: T) -> bool:
return NotImplemented
class _Comparable_other(Protocol):
"""Protocol for annotating comparable types.
For matching __lt___ that takes an 'other' parameter
(e.g. datetime)
"""
@abstractmethod
def __lt__(self: T, other: T) -> bool:
return NotImplemented
T = TypeVar("T", bound=Union[_Comparable_x, _Comparable_other])
[docs]
class RangeSummary(Generic[T]):
minimum: T
maximum: T
def __init__(self, minimum: T, maximum: T):
self.minimum = minimum
self.maximum = maximum
[docs]
def to_dict(self) -> dict[str, Any]:
return {"minimum": self.minimum, "maximum": self.maximum}
[docs]
def update_with_value(self, v: T) -> None:
self.minimum = min(self.minimum, v)
self.maximum = max(self.maximum, v)
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> RangeSummary[T]:
minimum: T = get_required(d.get("minimum"), "RangeSummary", "minimum")
maximum: T = get_required(d.get("maximum"), "RangeSummary", "maximum")
return cls(minimum=minimum, maximum=maximum)
def __eq__(self, o: object) -> bool:
if not isinstance(o, RangeSummary):
return NotImplemented
return self.to_dict() == o.to_dict()
def __repr__(self) -> str:
return self.to_dict().__repr__()
@lru_cache(maxsize=None)
def _get_fields_json(url: str | None) -> dict[str, Any]:
if url is None:
# Every time pystac is released this file gets pulled from
# https://cdn.jsdelivr.net/npm/@radiantearth/stac-fields/fields-normalized.json
jsonfields: dict[str, Any] = json.loads(
importlib.resources.files("pystac.static")
.joinpath("fields-normalized.json")
.read_text()
)
return jsonfields
return pystac.StacIO.default().read_json(url)
[docs]
class SummaryStrategy(Enum):
ARRAY = "v"
RANGE = "r"
SCHEMA = "s"
DONT_SUMMARIZE = False
DEFAULT = True
[docs]
class Summarizer:
"""The Summarizer computes summaries from values, following the definition of fields
to summarize.
The fields to summarize can be provided as a JSON file or as a dictionary of
field names and SummaryStrategys. If nothing is provided, a default JSON file
will be used.
Only fields that are in the Item `properties` can be summarized.
Thus it is not possible to summarize the top-level fields such as `id` or `assets`.
For more information about the structure of the fields JSON file, see:
https://github.com/stac-utils/stac-fields
The default JSON file used is a snapshot of the following file at the time of
the pystac release:
https://cdn.jsdelivr.net/npm/@radiantearth/stac-fields/fields-normalized.json
Args:
fields: A string containing the path to the json file with field descriptions.
Alternatively, a dict with the field names as keys and SummaryStrategys
as values.
If nothing is passed, a default file with field descriptions will be used.
"""
summaryfields: dict[str, SummaryStrategy]
def __init__(self, fields: str | dict[str, SummaryStrategy] | None = None):
if isinstance(fields, dict):
self._set_field_definitions(fields)
else:
jsonfields = _get_fields_json(fields)
self._set_field_definitions(jsonfields["metadata"])
def _set_field_definitions(self, fields: dict[str, Any]) -> None:
self.summaryfields = {}
for name, desc in fields.items():
strategy: SummaryStrategy = SummaryStrategy.DEFAULT
if isinstance(desc, SummaryStrategy):
strategy = desc
elif isinstance(desc, dict):
strategy_value = desc.get("summary", True)
try:
strategy = SummaryStrategy(strategy_value)
except ValueError:
pass
if strategy != SummaryStrategy.DONT_SUMMARIZE:
self.summaryfields[name] = strategy
def _update_with_item(self, summaries: Summaries, item: Item) -> None:
for k, v in item.properties.items():
if k in self.summaryfields:
strategy = self.summaryfields[k]
if strategy == SummaryStrategy.RANGE or (
strategy == SummaryStrategy.DEFAULT
and isinstance(v, numbers.Number)
and not isinstance(v, bool)
):
rangesummary: RangeSummary[Any] | None = summaries.get_range(k)
if rangesummary is None:
summaries.add(k, RangeSummary(v, v))
else:
rangesummary.update_with_value(v)
elif strategy == SummaryStrategy.ARRAY or (
strategy == SummaryStrategy.DEFAULT and isinstance(v, list)
):
listsummary: list[Any] = summaries.get_list(k) or []
if not isinstance(v, list):
v = [v]
for element in v:
if element not in listsummary:
listsummary.append(element)
summaries.add(k, listsummary)
else:
summary: list[Any] = summaries.get_list(k) or []
if v not in summary:
summary.append(v)
summaries.add(k, summary)
[docs]
def summarize(self, source: Collection | Iterable[Item]) -> Summaries:
"""Creates summaries from items"""
summaries = Summaries.empty()
if isinstance(source, pystac.Collection):
for item in source.get_items(recursive=True):
self._update_with_item(summaries, item)
else:
for item in source:
self._update_with_item(summaries, item)
return summaries
DEFAULT_MAXCOUNT = 25
[docs]
class Summaries:
_summaries: dict[str, Any]
lists: dict[str, list[Any]]
other: dict[str, Any]
ranges: dict[str, RangeSummary[Any]]
schemas: dict[str, dict[str, Any]]
maxcount: int
def __init__(
self, summaries: dict[str, Any], maxcount: int = DEFAULT_MAXCOUNT
) -> None:
self._summaries = summaries
self.maxcount = maxcount
self.lists = {}
self.ranges = {}
self.schemas = {}
self.other = {}
for prop_key, summary in summaries.items():
self.add(prop_key, summary)
[docs]
def get_list(self, prop: str) -> list[Any] | None:
return self.lists.get(prop)
[docs]
def get_range(self, prop: str) -> RangeSummary[Any] | None:
return self.ranges.get(prop)
[docs]
def get_schema(self, prop: str) -> dict[str, Any] | None:
return self.schemas.get(prop)
[docs]
def add(
self,
prop_key: str,
summary: list[Any] | RangeSummary[Any] | dict[str, Any],
) -> None:
if isinstance(summary, list):
self.lists[prop_key] = summary
elif isinstance(summary, dict):
if "minimum" in summary:
self.ranges[prop_key] = RangeSummary[Any].from_dict(summary)
else:
self.schemas[prop_key] = summary
elif isinstance(summary, RangeSummary):
self.ranges[prop_key] = summary
else:
self.other[prop_key] = summary
[docs]
def remove(self, prop_key: str) -> None:
self.lists.pop(prop_key, None)
self.ranges.pop(prop_key, None)
self.schemas.pop(prop_key, None)
self.other.pop(prop_key, None)
[docs]
def update(self, summaries: Summaries) -> None:
self.lists.update(summaries.lists)
self.ranges.update(summaries.ranges)
self.schemas.update(summaries.schemas)
self.other.update(summaries.other)
[docs]
def combine(self, summaries: Summaries) -> None:
for listname, listvalue in summaries.lists.items():
if listname in self.lists:
self.lists[listname].extend(listvalue)
else:
self.lists[listname] = listvalue
for rangename, rang in summaries.ranges.items():
if rangename in self.ranges:
self.ranges[rangename].update_with_value(rang.minimum)
self.ranges[rangename].update_with_value(rang.maximum)
else:
self.ranges[rangename] = rang
for schemaname, schema in summaries.schemas.items():
if schemaname in self.schemas:
self.schemas[schemaname].update(schema)
else:
self.schemas[schemaname] = schema
for k, v in summaries.other.items():
if k in self.other:
self.other[k].update(v)
else:
self.other[k] = v
[docs]
def is_empty(self) -> bool:
return not (
any(self.lists) or any(self.ranges) or any(self.schemas) or any(self.other)
)
[docs]
def clone(self) -> Summaries:
"""Clones this object.
Returns:
Summaries: The clone of this object
"""
cls = self.__class__
summaries = cls(summaries=deepcopy(self._summaries), maxcount=self.maxcount)
summaries.lists = deepcopy(self.lists)
summaries.other = deepcopy(self.other)
summaries.ranges = deepcopy(self.ranges)
summaries.schemas = deepcopy(self.schemas)
return summaries
[docs]
def to_dict(self) -> dict[str, Any]:
return {
**{k: v for k, v in self.lists.items() if len(v) < self.maxcount},
**{k: v.to_dict() for k, v in self.ranges.items()},
**self.schemas,
**self.other,
}
[docs]
@classmethod
def empty(cls) -> Summaries:
return Summaries({})