Source code for pystac.extensions.storage

"""Implements the Storage Extension.

https://github.com/stac-extensions/storage
"""

from __future__ import annotations

import re
import warnings
from typing import (
    Any,
    Generic,
    Literal,
    TypeVar,
    cast,
)

import pystac
from pystac.errors import RequiredPropertyMissing
from pystac.extensions.base import (
    ExtensionManagementMixin,
    PropertiesExtension,
    SummariesExtension,
)
from pystac.extensions.hooks import ExtensionHooks
from pystac.serialization.identify import STACJSONDescription, STACVersionID
from pystac.utils import StringEnum, get_required, map_opt

#: Generalized version of :class:`~pystac.Catalog`, :class:`~pystac.Collection`,
#: :class:`~pystac.Item`, :class:`~pystac.Asset`, :class:`~pystac.Link`,
#: or :class:`~pystac.ItemAssetDefinition`.
#:
#: This is a constrained type variable: the classes listed below are the
#: object types that :class:`StorageExtension` is generic over.
T = TypeVar(
    "T",
    pystac.Catalog,
    pystac.Collection,
    pystac.Item,
    pystac.Asset,
    pystac.Link,
    pystac.ItemAssetDefinition,
)

#: Template of the extension's schema URI; ``{version}`` is filled in with
#: ``str.format`` using one of the version strings below.
SCHEMA_URI_PATTERN: str = (
    "https://stac-extensions.github.io/storage/v{version}/schema.json"
)
#: Extension version used to build the schema URI reported by this module.
DEFAULT_VERSION: str = "2.0.0"
#: Versions known to this module. Every entry other than ``DEFAULT_VERSION``
#: is turned into a previous extension ID by the extension hooks.
SUPPORTED_VERSIONS: list[str] = ["2.0.0", "1.0.0"]

#: Prefix shared by the field names of this extension.
PREFIX: str = "storage:"

# Field names
REFS_PROP: str = PREFIX + "refs"
SCHEMES_PROP: str = PREFIX + "schemes"

# Storage scheme object names (keys inside a single storage scheme dictionary)
TYPE_PROP: str = "type"
PLATFORM_PROP: str = "platform"
REGION_PROP: str = "region"
REQUESTER_PAYS_PROP: str = "requester_pays"


class StorageSchemeType(StringEnum):
    """Named constants for storage scheme type identifiers.

    The values are plain strings; ``"aws-s3"`` and ``"ms-azure"`` are also the
    identifiers written by the v1.0.0 -> v2.0.0 migration in this module.
    """

    AWS_S3 = "aws-s3"
    CUSTOM_S3 = "custom-s3"
    AZURE = "ms-azure"
class StorageScheme:
    """Helper class for storage scheme objects.

    Wraps the dictionary that holds a single storage scheme. The well-defined
    fields (``type``, ``platform``, ``region``, ``requester_pays``) are exposed
    as properties. Any other attribute that is read or assigned on an instance
    is looked up in / stored into that same dictionary, so arbitrary additional
    properties can be set as well.
    """

    # Names of the well-defined scheme fields. Each one has a matching property
    # below; the set itself is kept for backward compatibility.
    _known_fields = {"type", "platform", "region", "requester_pays"}

    _properties: dict[str, Any]

    def __init__(self, properties: dict[str, Any]):
        # Store the backing dict as a real instance attribute. Going through
        # the custom __setattr__ below would file it under the scheme
        # properties instead.
        super().__setattr__("_properties", properties)

    def __setattr__(self, name: str, value: Any) -> None:
        # Names defined on the class (the field properties in particular) use
        # the regular attribute machinery so that property setters run.
        if hasattr(type(self), name):
            object.__setattr__(self, name, value)
            return
        # Everything else is an arbitrary scheme property.
        props = object.__getattribute__(self, "_properties")
        props[name] = value

    def __getattr__(self, name: str) -> Any:
        # Only called when normal lookup fails: fall back to the scheme dict.
        props = object.__getattribute__(self, "_properties")
        if name in props:
            return props[name]
        raise AttributeError(name)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, StorageScheme):
            return NotImplemented
        return bool(self._properties == other._properties)

    def __repr__(self) -> str:
        # Read the raw value instead of the ``platform`` property: the property
        # raises when the field is missing, and this object is itself passed to
        # ``get_required`` for error reporting, so a raising __repr__ could
        # mask (or recurse inside) the real error.
        return f"<StorageScheme platform={self._properties.get(PLATFORM_PROP)}>"

    def apply(
        self,
        type: str,
        platform: str,
        region: str | None = None,
        requester_pays: bool | None = None,
        **kwargs: Any,
    ) -> None:
        """Set the properties of this StorageScheme object.

        Args:
            type (str): Type identifier for the platform.
            platform (str): The cloud provider where data is stored as URI or
                URI template to the API.
            region (str | None): The region where the data is stored. ``None``
                removes the field. Defaults to None.
            requester_pays (bool | None): requester pays or data manager/cloud
                provider pays. ``None`` removes the field. Defaults to None.
            kwargs (dict[str, Any]): Additional properties to set on scheme
        """
        self.type = type
        self.platform = platform
        self.region = region
        self.requester_pays = requester_pays
        self._properties.update(kwargs)

    @classmethod
    def create(
        cls,
        type: str,
        platform: str,
        region: str | None = None,
        requester_pays: bool | None = None,
        **kwargs: Any,
    ) -> StorageScheme:
        """Set the properties for a new StorageScheme object.

        Additional properties can be set through kwargs to fulfill any
        additional variables in a templated uri.

        Args:
            type (str): Type identifier for the platform.
            platform (str): The cloud provider where data is stored as URI or
                URI template to the API.
            region (str | None): The region where the data is stored.
                Defaults to None.
            requester_pays (bool | None): requester pays or data manager/cloud
                provider pays. Defaults to None.
            kwargs (dict[str, Any]): Additional properties to set on scheme

        Returns:
            StorageScheme: storage scheme
        """
        c = cls({})
        c.apply(
            type=type,
            platform=platform,
            region=region,
            requester_pays=requester_pays,
            **kwargs,
        )
        return c

    @property
    def type(self) -> str:
        """Get or set the required type property."""
        return cast(
            str,
            get_required(
                self._properties.get(TYPE_PROP),
                self,
                TYPE_PROP,
            ),
        )

    @type.setter
    def type(self, v: str) -> None:
        self._properties[TYPE_PROP] = v

    @property
    def platform(self) -> str:
        """Get or set the required platform property."""
        return cast(
            str,
            get_required(
                self._properties.get(PLATFORM_PROP),
                self,
                PLATFORM_PROP,
            ),
        )

    @platform.setter
    def platform(self, v: str) -> None:
        self._properties[PLATFORM_PROP] = v

    @property
    def region(self) -> str | None:
        """Get or set the optional region property.

        Setting ``None`` removes the field from the scheme.
        """
        return self._properties.get(REGION_PROP)

    @region.setter
    def region(self, v: str | None) -> None:
        if v is not None:
            self._properties[REGION_PROP] = v
        else:
            self._properties.pop(REGION_PROP, None)

    @property
    def requester_pays(self) -> bool | None:
        """Get or set the optional requester_pays property.

        Setting ``None`` removes the field from the scheme.
        """
        return self._properties.get(REQUESTER_PAYS_PROP)

    @requester_pays.setter
    def requester_pays(self, v: bool | None) -> None:
        if v is not None:
            self._properties[REQUESTER_PAYS_PROP] = v
        else:
            self._properties.pop(REQUESTER_PAYS_PROP, None)

    def to_dict(self) -> dict[str, Any]:
        """Returns the dictionary encoding of this object.

        The returned dictionary is the one backing this object, not a copy.

        Returns:
            dict[str, Any]: The dictionary encoding of this object
        """
        return self._properties
class StorageExtension(
    Generic[T],
    PropertiesExtension,
    ExtensionManagementMixin[pystac.Item | pystac.Collection | pystac.Catalog],
):
    """A class that can be used to extend the properties of a
    :class:`~pystac.Catalog`, :class:`~pystac.Collection`,
    :class:`~pystac.Item`, :class:`~pystac.Asset`, :class:`~pystac.Link`, or
    :class:`~pystac.ItemAssetDefinition` with properties from the
    :stac-ext:`Storage Extension <storage>`.

    This class is generic over the type of STAC Object to be extended (e.g.
    :class:`~pystac.Item`, :class:`~pystac.Collection`).

    To create a concrete instance of :class:`StorageExtension`, use the
    :meth:`StorageExtension.ext` method. For example:

    .. code-block:: python

       >>> item: pystac.Item = ...
       >>> storage_ext = StorageExtension.ext(item)
    """

    name: Literal["storage"] = "storage"

    @classmethod
    def get_schema_uri(cls) -> str:
        """Return the schema URI for the default version of the extension."""
        return SCHEMA_URI_PATTERN.format(version=DEFAULT_VERSION)

    # For type checking purposes only, these methods are overridden in mixins
    def apply(
        self,
        *,
        schemes: dict[str, StorageScheme] | None = None,
        refs: list[str] | None = None,
    ) -> None:
        raise NotImplementedError

    @property
    def schemes(self) -> dict[str, StorageScheme]:
        raise NotImplementedError

    @schemes.setter
    def schemes(self, v: dict[str, StorageScheme]) -> None:
        raise NotImplementedError

    def add_scheme(self, key: str, scheme: StorageScheme) -> None:
        raise NotImplementedError

    @property
    def refs(self) -> list[str]:
        raise NotImplementedError

    @refs.setter
    def refs(self, v: list[str]) -> None:
        raise NotImplementedError

    def add_ref(self, ref: str) -> None:
        raise NotImplementedError

    @classmethod
    def ext(cls, obj: T, add_if_missing: bool = False) -> StorageExtension[T]:
        """Extends the given STAC Object with properties from the
        :stac-ext:`Storage Extension <storage>`.

        This extension can be applied to instances of :class:`~pystac.Catalog`,
        :class:`~pystac.Collection`, :class:`~pystac.Item`,
        :class:`~pystac.Asset`, :class:`~pystac.Link`, or
        :class:`~pystac.ItemAssetDefinition`.

        Raises:

            pystac.ExtensionTypeError : If an invalid object type is passed.
        """
        # Items, Collections and Catalogs carry the extension themselves.
        # Collection is tested ahead of Catalog, as in the original ordering.
        if isinstance(obj, pystac.Item):
            cls.ensure_has_extension(obj, add_if_missing)
            return cast(StorageExtension[T], ItemStorageExtension(obj))
        if isinstance(obj, pystac.Collection):
            cls.ensure_has_extension(obj, add_if_missing)
            return cast(StorageExtension[T], CollectionStorageExtension(obj))
        if isinstance(obj, pystac.Catalog):
            cls.ensure_has_extension(obj, add_if_missing)
            return cast(StorageExtension[T], CatalogStorageExtension(obj))

        # Assets, Links and item asset definitions are checked via their owner.
        if isinstance(obj, pystac.Asset):
            cls.ensure_owner_has_extension(obj, add_if_missing)
            return cast(StorageExtension[T], AssetStorageExtension(obj))
        if isinstance(obj, pystac.Link):
            cls.ensure_owner_has_extension(obj, add_if_missing)
            return cast(StorageExtension[T], LinkStorageExtension(obj))
        if isinstance(obj, pystac.ItemAssetDefinition):
            cls.ensure_owner_has_extension(obj, add_if_missing)
            return cast(StorageExtension[T], ItemAssetsStorageExtension(obj))

        raise pystac.ExtensionTypeError(cls._ext_error_message(obj))

    @classmethod
    def summaries(
        cls, obj: pystac.Collection, add_if_missing: bool = False
    ) -> SummariesStorageExtension:
        """Returns the extended summaries object for the given collection."""
        cls.ensure_has_extension(obj, add_if_missing)
        return SummariesStorageExtension(obj)
class _SchemesMixin:
    """Mixin providing the ``storage:schemes`` accessors.

    Used by the Item, Collection and Catalog extension classes.
    """

    properties: dict[str, Any]
    _set_property: Any

    def apply(
        self,
        *,
        schemes: dict[str, StorageScheme] | None = None,
        refs: list[str] | None = None,
    ) -> None:
        """Set ``schemes`` on the extended object; ``refs`` is rejected here."""
        if refs is not None:
            raise ValueError("'refs' cannot be applied with this STAC object type.")
        if schemes is None:
            raise RequiredPropertyMissing(
                self,
                SCHEMES_PROP,
                "'schemes' property is required for this object type.",
            )
        self.schemes = schemes

    @property
    def schemes(self) -> dict[str, StorageScheme]:
        """Get or set the mapping of scheme key to :class:`StorageScheme`."""
        raw: dict[str, Any] = get_required(
            self.properties.get(SCHEMES_PROP), self, SCHEMES_PROP
        )
        wrapped: dict[str, StorageScheme] = {}
        for scheme_key, scheme_dict in raw.items():
            wrapped[scheme_key] = StorageScheme(scheme_dict)
        return wrapped

    @schemes.setter
    def schemes(self, v: dict[str, StorageScheme]) -> None:
        encoded: dict[str, Any] = {}
        for scheme_key, scheme in v.items():
            encoded[scheme_key] = scheme.to_dict()
        self._set_property(SCHEMES_PROP, encoded)

    def add_scheme(self, key: str, scheme: StorageScheme) -> None:
        """Store ``scheme`` under ``key``, starting a new mapping if needed."""
        existing = self.properties.get(SCHEMES_PROP, {})
        existing[key] = scheme.to_dict()
        self._set_property(SCHEMES_PROP, existing)


class _RefsMixin:
    """Mixin providing the ``storage:refs`` accessors.

    Used by the Asset, Link and ItemAssetDefinition extension classes.
    """

    properties: dict[str, Any]
    _set_property: Any

    def apply(
        self,
        *,
        schemes: dict[str, StorageScheme] | None = None,
        refs: list[str] | None = None,
    ) -> None:
        """Set ``refs`` on the extended object; ``schemes`` is rejected here."""
        if schemes is not None:
            raise ValueError("'schemes' cannot be applied with this STAC object type.")
        if refs is None:
            raise RequiredPropertyMissing(
                self, REFS_PROP, "'refs' property is required for this object type."
            )
        self.refs = refs

    @property
    def refs(self) -> list[str]:
        """Get or set the list of referenced scheme keys."""
        return get_required(self.properties.get(REFS_PROP), self, REFS_PROP)

    @refs.setter
    def refs(self, v: list[str]) -> None:
        self._set_property(REFS_PROP, v)

    def add_ref(self, ref: str) -> None:
        """Append ``ref`` unless it is already listed; create the list if unset."""
        try:
            existing = self.refs
        except RequiredPropertyMissing:
            # No refs stored yet: start a fresh list.
            self.refs = [ref]
        else:
            if ref not in existing:
                existing.append(ref)
                self.refs = existing
class ItemStorageExtension(_SchemesMixin, StorageExtension[pystac.Item]):
    """A concrete implementation of :class:`StorageExtension` on an
    :class:`~pystac.Item` that extends the properties of the Item to include
    properties defined in the :stac-ext:`Storage Extension <storage>`.

    This class should generally not be instantiated directly. Instead, call
    :meth:`StorageExtension.ext` on an :class:`~pystac.Item` to extend it.
    """

    item: pystac.Item
    """The :class:`~pystac.Item` being extended."""

    properties: dict[str, Any]
    """The :class:`~pystac.Item` properties, including extension properties."""

    def __init__(self, item: pystac.Item):
        self.item = item
        self.properties = item.properties

    def __repr__(self) -> str:
        return f"<ItemStorageExtension Item id={self.item.id}>"
class CatalogStorageExtension(_SchemesMixin, StorageExtension[pystac.Catalog]):
    """Storage extension view over a :class:`~pystac.Catalog`.

    Concrete :class:`StorageExtension` whose ``properties`` are the Catalog's
    ``extra_fields``, so the fields of the
    :stac-ext:`Storage Extension <storage>` are read from and written to the
    Catalog itself.

    Obtain instances through :meth:`StorageExtension.ext` rather than by
    calling this constructor directly.
    """

    catalog: pystac.Catalog
    """The :class:`~pystac.Catalog` being extended."""

    properties: dict[str, Any]
    """The :class:`~pystac.Catalog` properties, including extension properties."""

    def __init__(self, catalog: pystac.Catalog):
        self.properties = catalog.extra_fields
        self.catalog = catalog

    def __repr__(self) -> str:
        catalog_id = self.catalog.id
        return f"<CatalogStorageExtension Catalog id={catalog_id}>"
class CollectionStorageExtension(_SchemesMixin, StorageExtension[pystac.Collection]):
    """Storage extension view over a :class:`~pystac.Collection`.

    Concrete :class:`StorageExtension` whose ``properties`` are the
    Collection's ``extra_fields``, so the fields of the
    :stac-ext:`Storage Extension <storage>` are read from and written to the
    Collection itself.

    Obtain instances through :meth:`StorageExtension.ext` rather than by
    calling this constructor directly.
    """

    collection: pystac.Collection
    """The :class:`~pystac.Collection` being extended."""

    properties: dict[str, Any]
    """The :class:`~pystac.Collection` properties, including extension properties."""

    def __init__(self, collection: pystac.Collection):
        self.properties = collection.extra_fields
        self.collection = collection

    def __repr__(self) -> str:
        collection_id = self.collection.id
        return f"<CollectionStorageExtension Collection id={collection_id}>"
class AssetStorageExtension(_RefsMixin, StorageExtension[pystac.Asset]):
    """Storage extension view over a :class:`~pystac.Asset`.

    Concrete :class:`StorageExtension` whose ``properties`` are the Asset's
    ``extra_fields``, so the fields of the
    :stac-ext:`Storage Extension <storage>` are read from and written to the
    Asset itself.

    Obtain instances through :meth:`StorageExtension.ext` rather than by
    calling this constructor directly.
    """

    asset: pystac.Asset
    """The :class:`~pystac.Asset` being extended."""

    properties: dict[str, Any]
    """The :class:`~pystac.Asset` properties, including extension properties."""

    def __init__(self, asset: pystac.Asset):
        self.properties = asset.extra_fields
        self.asset = asset

    def __repr__(self) -> str:
        asset_href = self.asset.href
        return f"<AssetStorageExtension Asset href={asset_href}>"
class LinkStorageExtension(_RefsMixin, StorageExtension[pystac.Link]):
    """Storage extension view over a :class:`~pystac.Link`.

    Concrete :class:`StorageExtension` whose ``properties`` are the Link's
    ``extra_fields``, so the fields of the
    :stac-ext:`Storage Extension <storage>` are read from and written to the
    Link itself.

    Obtain instances through :meth:`StorageExtension.ext` rather than by
    calling this constructor directly.
    """

    link: pystac.Link
    """The :class:`~pystac.Link` being extended."""

    properties: dict[str, Any]
    """The :class:`~pystac.Link` properties, including extension properties."""

    def __init__(self, link: pystac.Link):
        self.properties = link.extra_fields
        self.link = link

    def __repr__(self) -> str:
        link_href = self.link.href
        return f"<LinkStorageExtension Link href={link_href}>"
class ItemAssetsStorageExtension(
    _RefsMixin, StorageExtension[pystac.ItemAssetDefinition]
):
    """Storage extension view over a :class:`~pystac.ItemAssetDefinition`.

    Concrete :class:`StorageExtension` whose ``properties`` are the
    definition's own ``properties``, so the fields of the
    :stac-ext:`Storage Extension <storage>` are read from and written to the
    ItemAssetDefinition itself.

    Obtain instances through :meth:`StorageExtension.ext` rather than by
    calling this constructor directly.
    """

    item_asset: pystac.ItemAssetDefinition
    """The :class:`~pystac.ItemAssetDefinition` being extended."""

    properties: dict[str, Any]
    """The :class:`~pystac.ItemAssetDefinition` properties, including extension
    properties."""

    def __init__(self, item_asset: pystac.ItemAssetDefinition):
        self.properties = item_asset.properties
        self.item_asset = item_asset

    def __repr__(self) -> str:
        definition = self.item_asset
        return f"<ItemAssetsStorageExtension ItemAssetDefinition={definition}>"
class SummariesStorageExtension(SummariesExtension):
    """A concrete implementation of
    :class:`~pystac.extensions.base.SummariesExtension` that extends the
    ``summaries`` field of a :class:`~pystac.Collection` to include properties
    defined in the :stac-ext:`Storage Extension <storage>`.
    """

    @property
    def schemes(self) -> list[dict[str, StorageScheme]] | None:
        """Get or set the summary of storage schemes for this Collection.

        Each list entry maps scheme keys to :class:`StorageScheme` objects. The
        value is ``None`` when no summary is stored.
        """

        def wrap_entries(
            entries: list[dict[str, Any]],
        ) -> list[dict[str, StorageScheme]]:
            # Wrap every raw scheme dictionary in a StorageScheme.
            return [
                {key: StorageScheme(raw) for key, raw in entry.items()}
                for entry in entries
            ]

        return map_opt(wrap_entries, self.summaries.get_list(SCHEMES_PROP))

    @schemes.setter
    def schemes(self, v: list[dict[str, StorageScheme]] | None) -> None:
        def encode_entries(
            entries: list[dict[str, StorageScheme]],
        ) -> list[dict[str, Any]]:
            # Turn every StorageScheme back into its dictionary encoding.
            return [
                {key: scheme.to_dict() for key, scheme in entry.items()}
                for entry in entries
            ]

        self._set_summary(SCHEMES_PROP, map_opt(encode_entries, v))
class StorageExtensionHooks(ExtensionHooks):
    """Extension hooks for the storage extension.

    Besides declaring the schema URIs, :meth:`migrate` rewrites storage
    extension v1.0.0 fields into the v2.0.0 ``storage:schemes`` /
    ``storage:refs`` layout.
    """

    schema_uri: str = SCHEMA_URI_PATTERN.format(version=DEFAULT_VERSION)
    prev_extension_ids = {
        SCHEMA_URI_PATTERN.format(version=v)
        for v in SUPPORTED_VERSIONS
        if v != DEFAULT_VERSION
    }
    stac_object_types = {
        pystac.STACObjectType.CATALOG,
        pystac.STACObjectType.COLLECTION,
        pystac.STACObjectType.ITEM,
    }

    # Mapping from v1.0.0 platform enum values to v2.0.0 type identifiers
    # Only AWS and Azure have defined v2.0.0 platform definitions
    _PLATFORM_TYPE_MAP: dict[str, str] = {
        "AWS": "aws-s3",
        "AZURE": "ms-azure",
    }

    # Mapping from v1.0.0 platform enum values to v2.0.0 platform URI templates
    _PLATFORM_URI_MAP: dict[str, str] = {
        "AWS": "https://{bucket}.s3.{region}.amazonaws.com",
        "AZURE": "https://{account}.blob.core.windows.net",
    }

    # Mapping from v1.0.0 platform enum values to scheme key prefixes
    _PLATFORM_KEY_PREFIX: dict[str, str] = {
        "AWS": "aws",
        "AZURE": "azure",
    }

    # Platforms that cannot be automatically migrated
    _UNSUPPORTED_PLATFORMS: set[str] = {"GCP", "IBM", "ALIBABA", "ORACLE", "OTHER"}

    # Regex patterns for parsing cloud storage URLs
    _S3_URL_PATTERN = re.compile(r"^s3://([^/]+)/")
    _AZURE_BLOB_PATTERN = re.compile(r"^https://([^.]+)\.blob\.core\.windows\.net/")

    def migrate(
        self, obj: dict[str, Any], version: STACVersionID, info: STACJSONDescription
    ) -> None:
        """Migrate storage extension v1.0.0 fields in ``obj`` to v2.0.0.

        Runs only when the v1.0.0 schema URI is listed in ``info.extensions``.
        For every asset whose platform is AWS or Azure and whose ``href``
        yields a bucket/account name, a storage scheme is built (identical
        schemes are shared), the asset's v1 ``storage:*`` fields are removed
        and ``storage:refs`` is set to the scheme key. Assets that cannot be
        migrated are left untouched and reported through ``UserWarning``.
        ``obj`` is modified in place; the base class ``migrate`` is always
        called afterwards.

        Args:
            obj: Dictionary encoding of the STAC object being migrated.
            version: STAC version of the object, passed on to the base class.
            info: Description of the object; its ``extensions`` are inspected.
        """
        if SCHEMA_URI_PATTERN.format(version="1.0.0") in info.extensions:
            # Items keep their fields under "properties"; otherwise the fields
            # are read from (and written to) the object itself.
            props = obj.get("properties", obj)

            # v1 defined item level storage properties can
            # be used across all assets
            item_platform = props.get(PREFIX + "platform")
            item_region = props.get(PREFIX + "region")
            item_requester_pays = props.get(PREFIX + "requester_pays")
            item_tier = props.get(PREFIX + "tier")

            schemes: dict[str, dict[str, Any]] = {}
            # Keyed by scheme *content* rather than by its hash value, so two
            # different schemes can never be merged by a hash collision.
            scheme_key_by_content: dict[frozenset[tuple[str, Any]], str] = {}
            assets_with_tier: list[str] = []
            assets_failed_parsing: list[str] = []
            unsupported_platforms: set[str] = set()
            migrated_assets: list[str] = []

            for asset_key, asset in obj.get("assets", {}).items():
                # Asset-level values take precedence over item-level ones.
                platform = asset.get(PREFIX + "platform", item_platform)
                region = asset.get(PREFIX + "region", item_region)
                requester_pays = asset.get(
                    PREFIX + "requester_pays", item_requester_pays
                )
                tier = asset.get(PREFIX + "tier", item_tier)
                # An explicit ``"href": None`` is treated like a missing href
                # (reported as a parse failure) instead of crashing the regex.
                href = asset.get("href") or ""

                if tier is not None:
                    assets_with_tier.append(asset_key)

                # cannot migrate assets without a platform
                if platform is None:
                    continue

                # cannot migrate assets with unsupported platforms
                platform_upper = platform.upper()
                if (
                    platform_upper in self._UNSUPPORTED_PLATFORMS
                    or platform_upper not in self._PLATFORM_TYPE_MAP
                ):
                    unsupported_platforms.add(platform_upper)
                    continue

                scheme: dict[str, Any] = {
                    "type": self._PLATFORM_TYPE_MAP[platform_upper],
                    "platform": self._PLATFORM_URI_MAP[platform_upper],
                }
                if region is not None:
                    scheme["region"] = region
                if requester_pays is not None:
                    scheme["requester_pays"] = requester_pays

                # Parse bucket/account info from href
                if platform_upper == "AWS":
                    if s3_match := self._S3_URL_PATTERN.match(href):
                        scheme["bucket"] = s3_match.group(1)
                    else:
                        assets_failed_parsing.append(asset_key)
                        continue
                elif platform_upper == "AZURE":
                    if azure_match := self._AZURE_BLOB_PATTERN.match(href):
                        scheme["account"] = azure_match.group(1)
                    else:
                        assets_failed_parsing.append(asset_key)
                        continue

                # Deduplicate schemes by content
                scheme_content = frozenset(scheme.items())
                if scheme_content in scheme_key_by_content:
                    scheme_key = scheme_key_by_content[scheme_content]
                else:
                    # Generate scheme key: provider-region or provider
                    # if key would collide, appends an int suffix
                    key_prefix = self._PLATFORM_KEY_PREFIX[platform_upper]
                    base_key = (
                        f"{key_prefix}-{region.lower()}" if region else key_prefix
                    )
                    scheme_key = base_key
                    counter = 1
                    while scheme_key in schemes:
                        scheme_key = f"{base_key}-{counter}"
                        counter += 1

                    schemes[scheme_key] = scheme
                    scheme_key_by_content[scheme_content] = scheme_key

                asset.pop(PREFIX + "platform", None)
                asset.pop(PREFIX + "region", None)
                asset.pop(PREFIX + "requester_pays", None)
                # NOTE(review): an asset-level storage:tier is removed here for
                # migrated assets, while the warning below says the property is
                # left in place - confirm which of the two is intended.
                asset.pop(PREFIX + "tier", None)
                asset[REFS_PROP] = [scheme_key]
                migrated_assets.append(asset_key)

            if assets_with_tier:
                warnings.warn(
                    "storage:tier was removed in storage extension v2.0.0 and cannot "
                    f"be migrated. Property left in place for: {assets_with_tier}",
                    UserWarning,
                )

            if assets_failed_parsing:
                warnings.warn(
                    "Could not parse bucket/account from href. "
                    f"The following assets were not migrated: {assets_failed_parsing}",
                    UserWarning,
                )

            if unsupported_platforms:
                warnings.warn(
                    "The following platforms cannot be automatically migrated to "
                    f"storage extension v2.0.0: {unsupported_platforms}",
                    UserWarning,
                )

            # Only remove item-level properties if all assets were migrated
            if (
                migrated_assets
                and not unsupported_platforms
                and not assets_failed_parsing
            ):
                props.pop(PREFIX + "platform", None)
                props.pop(PREFIX + "region", None)
                props.pop(PREFIX + "requester_pays", None)

            if schemes:
                props[SCHEMES_PROP] = schemes

        super().migrate(obj, version, info)
#: Module-level :class:`StorageExtensionHooks` instance.
# NOTE(review): presumably this is what pystac's extension hook registry picks
# up for the storage extension - confirm against the registration code.
STORAGE_EXTENSION_HOOKS: ExtensionHooks = StorageExtensionHooks()