"""
Backend for ``alt.datasets.Loader``.

Notes
-----
Extending would be more ergonomic if `read`, `scan`, `_constraints` were available under a single export::

    from altair.datasets import ext, reader
    import polars as pl

    impls = (
        ext.read(pl.read_parquet, ext.is_parquet),
        ext.read(pl.read_csv, ext.is_csv),
        ext.read(pl.read_json, ext.is_json),
    )
    user_reader = reader(impls)
    user_reader.dataset("airports")
"""

from __future__ import annotations

from collections import Counter
from collections.abc import Mapping
from importlib import import_module
from importlib.util import find_spec
from itertools import chain
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar, Generic, Literal, cast, overload
from urllib.request import build_opener as _build_opener

from narwhals.stable import v1 as nw
from narwhals.stable.v1.typing import IntoDataFrameT, IntoExpr
from packaging.requirements import Requirement

from altair.datasets import _readimpl
from altair.datasets._cache import CsvCache, DatasetCache, SchemaCache, _iter_metadata
from altair.datasets._constraints import is_parquet
from altair.datasets._exceptions import AltairDatasetsError, module_not_found
from altair.datasets._readimpl import IntoFrameT, is_available

if TYPE_CHECKING:
    import sys
    from collections.abc import Callable, Sequence
    from typing import NoReturn
    from urllib.request import OpenerDirector

    import pandas as pd
    import polars as pl
    import pyarrow as pa

    from altair.datasets._readimpl import BaseImpl, R, Read, Scan
    from altair.datasets._typing import Dataset, Extension, Metadata
    from altair.vegalite.v6.schema._typing import OneOrSeq

    if sys.version_info >= (3, 13):
        from typing import TypeIs, TypeVar
    else:
        from typing_extensions import TypeIs, TypeVar
    if sys.version_info >= (3, 12):
        from typing import Unpack
    else:
        from typing_extensions import Unpack
    if sys.version_info >= (3, 11):
        from typing import LiteralString
    else:
        from typing_extensions import LiteralString
    if sys.version_info >= (3, 10):
        from typing import TypeAlias
    else:
        from typing_extensions import TypeAlias
    _Polars: TypeAlias = Literal["polars"]
    _Pandas: TypeAlias = Literal["pandas"]
    _PyArrow: TypeAlias = Literal["pyarrow"]
    _PandasAny: TypeAlias = Literal[_Pandas, "pandas[pyarrow]"]
    _Backend: TypeAlias = Literal[_Polars, _PandasAny, _PyArrow]
    _CuDF: TypeAlias = Literal["cudf"]
    _Dask: TypeAlias = Literal["dask"]
    _DuckDB: TypeAlias = Literal["duckdb"]
    _Ibis: TypeAlias = Literal["ibis"]
    _PySpark: TypeAlias = Literal["pyspark"]
    _NwSupport: TypeAlias = Literal[
        _Polars, _Pandas, _PyArrow, _CuDF, _Dask, _DuckDB, _Ibis, _PySpark
    ]
    _NwSupportT = TypeVar(
        "_NwSupportT",
        _Polars,
        _Pandas,
        _PyArrow,
        _CuDF,
        _Dask,
        _DuckDB,
        _Ibis,
        _PySpark,
    )

_SupportProfile: TypeAlias = Mapping[
    Literal["supported", "unsupported"], "Sequence[Dataset]"
]
"""
Dataset support varies between backends and available dependencies.

Any name listed in ``"unsupported"`` will raise an error on::

    from altair.datasets import load

    load("7zip")

Instead, they can be loaded via::

    import altair as alt
    from altair.datasets import url

    alt.Chart(url("7zip"))
"""


class Reader(Generic[IntoDataFrameT, IntoFrameT]):
    """
    Modular file reader, targeting remote & local tabular resources.

    .. warning::
        Use ``reader(...)`` instead of instantiating ``Reader`` directly.
    """

    _read: Sequence[Read[IntoDataFrameT]]
    """Eager file read functions."""

    _scan: Sequence[Scan[IntoFrameT]]
    """Lazy file read functions."""

    _name: str
    """
    Used in error messages, repr and matching ``@overload``(s).

    Otherwise, has no concrete meaning.
    """

    _implementation: nw.Implementation
    """
    Corresponding `narwhals implementation`_.

    .. _narwhals implementation:
        https://github.com/narwhals-dev/narwhals/blob/9b6a355530ea46c590d5a6d1d0567be59c0b5742/narwhals/utils.py#L61-L290
    """

    # Shared by every instance: one URL opener and the bundled metadata table.
    _opener: ClassVar[OpenerDirector] = _build_opener()
    _metadata_path: ClassVar[Path] = (
        Path(__file__).parent / "_metadata" / "metadata.parquet"
    )

    def __init__(
        self,
        read: Sequence[Read[IntoDataFrameT]],
        scan: Sequence[Scan[IntoFrameT]],
        name: str,
        implementation: nw.Implementation,
    ) -> None:
        """Store the read/scan implementations and create a matching ``SchemaCache``."""
        self._read = read
        self._scan = scan
        self._name = name
        self._implementation = implementation
        self._schema_cache = SchemaCache(implementation=implementation)

    def __repr__(self) -> str:
        """List the registered read (and, if any, scan) implementations, one per line."""
        from textwrap import indent

        PREFIX = " " * 4
        NL = "\n"
        body = f"read\n{indent(NL.join(str(el) for el in self._read), PREFIX)}"
        if self._scan:
            body += f"\nscan\n{indent(NL.join(str(el) for el in self._scan), PREFIX)}"
        return f"Reader[{self._name}] {self._implementation!r}\n{body}"

    def read_fn(self, meta: Metadata, /) -> Callable[..., IntoDataFrameT]:
        """Return the first eager read function that accepts ``meta``."""
        return self._solve(meta, self._read)

    def scan_fn(self, meta: Metadata | Path | str, /) -> Callable[..., IntoFrameT]:
        """
        Return the first lazy scan function that accepts ``meta``.

        A non-mapping argument is reduced to a ``{"suffix": ...}`` mapping first.
        """
        meta = meta if isinstance(meta, Mapping) else {"suffix": _into_suffix(meta)}
        return self._solve(meta, self._scan)

    @property
    def cache(self) -> DatasetCache:
        """A ``DatasetCache`` bound to this reader (constructed on every access)."""
        return DatasetCache(self)

    def _handle_pyarrow_date_error(self, e: Exception, name: str) -> NoReturn:
        """
        Handle PyArrow date parsing errors with informative error messages, see https://github.com/apache/arrow/issues/41488.

        This method never returns: it raises ``AltairDatasetsError`` (chained
        from ``e``) when ``e`` looks like a PyArrow CSV date conversion failure,
        and re-raises ``e`` itself otherwise.
        """
        if "CSV conversion error to date" in str(e) and "pyarrow" in str(
            type(e).__module__
        ):
            message = (
                f"PyArrow cannot parse date format in dataset '{name}'. "
                f"This is a known limitation of PyArrow's CSV reader for non-ISO date formats.\n\n"
                f"Alternatives:\n"
                f"1. Use a different backend: data.{name}(engine='pandas') or data.{name}(engine='polars')\n"
                f"2. Convert dates manually after loading as strings\n\n"
                f"Original error: {e}"
            )
            raise AltairDatasetsError(message) from e
        raise e

    def dataset(
        self,
        name: Dataset | LiteralString,
        suffix: Extension | None = None,
        /,
        **kwds: Any,
    ) -> IntoDataFrameT:
        """
        Load the dataset matching ``name`` (and optionally ``suffix``) eagerly.

        Parameters
        ----------
        name
            Dataset name, or a file name when it contains a ``"."``.
        suffix
            Optional file extension used to narrow the metadata query.
        **kwds
            Forwarded to the selected read function; they take precedence
            over any schema defaults added by ``_merge_kwds``.

        Raises
        ------
        ValueError
            When the metadata query has no results.
        AltairDatasetsError
            When no read function accepts the dataset, or PyArrow fails to
            parse a date column.
        """
        frame = self._query(name, suffix)
        meta = next(_iter_metadata(frame))
        fn = self.read_fn(meta)
        fn_kwds = self._merge_kwds(meta, kwds)
        # The property builds a new ``DatasetCache`` per access; bind it once.
        cache = self.cache
        if cache.is_active():
            fp = cache._maybe_download(meta)
            try:
                return fn(fp, **fn_kwds)
            except Exception as e:
                # Always raises: either a translated error or ``e`` itself.
                self._handle_pyarrow_date_error(e, name)
        else:
            with self._opener.open(meta["url"]) as f:
                try:
                    return fn(f, **fn_kwds)
                except Exception as e:
                    # Always raises: either a translated error or ``e`` itself.
                    self._handle_pyarrow_date_error(e, name)

    def url(
        self, name: Dataset | LiteralString, suffix: Extension | None = None, /
    ) -> str:
        """
        Return the remote address recorded in the metadata for a dataset.

        Raises
        ------
        AltairDatasetsError
            When the resource is parquet and ``vegafusion`` is not available.
        TypeError
            When the stored ``"url"`` value is not a ``str``.
        """
        frame = self._query(name, suffix)
        meta = next(_iter_metadata(frame))
        if is_parquet(meta.items()) and not is_available("vegafusion"):
            raise AltairDatasetsError.from_url(meta)
        url = meta["url"]
        if isinstance(url, str):
            return url
        else:
            msg = f"Expected 'str' but got {type(url).__name__!r}\nfrom {url!r}."
            raise TypeError(msg)

    # TODO: (Multiple)
    # - Settle on a better name
    # - Add method to `Loader`
    # - Move docs to `Loader.{new name}`
    def open_markdown(self, name: Dataset, /) -> None:
        """
        Learn more about a dataset, opening `vega-datasets/datapackage.md`_ with the default browser.

        Additional info *may* include: `description`_, `schema`_, `sources`_, `licenses`_.

        .. _vega-datasets/datapackage.md:
            https://github.com/vega/vega-datasets/blob/main/datapackage.md
        .. _description:
            https://datapackage.org/standard/data-resource/#description
        .. _schema:
            https://datapackage.org/standard/table-schema/#schema
        .. _sources:
            https://datapackage.org/standard/data-package/#sources
        .. _licenses:
            https://datapackage.org/standard/data-package/#licenses
        """
        import webbrowser

        from altair.utils import VERSIONS

        # The page anchor is the file name with every "." removed.
        ref = self._query(name).get_column("file_name").item(0).replace(".", "")
        tag = VERSIONS["vega-datasets"]
        url = f"https://github.com/vega/vega-datasets/blob/v{tag}/datapackage.md#{ref}"
        webbrowser.open(url)

    @overload
    def profile(self, *, show: Literal[False] = ...) -> _SupportProfile: ...
    @overload
    def profile(self, *, show: Literal[True]) -> None: ...
    def profile(self, *, show: bool = False) -> _SupportProfile | None:
        """
        Describe which datasets can be loaded as tabular data.

        Parameters
        ----------
        show
            Print a densely formatted repr *instead of* returning a mapping.
        """
        relevant_columns = set(
            chain.from_iterable(impl._relevant_columns for impl in self._read)
        )
        frame = self._scan_metadata().select("dataset_name", *relevant_columns)
        # A dataset is supported when any read implementation includes it.
        inc_expr = nw.any_horizontal(impl._include_expr for impl in self._read)
        result: _SupportProfile = {
            "unsupported": _dataset_names(frame, ~inc_expr),
            "supported": _dataset_names(frame, inc_expr),
        }
        if show:
            import pprint

            pprint.pprint(result, compact=True, sort_dicts=False)
            return None
        return result

    def _query(
        self, name: Dataset | LiteralString, suffix: Extension | None = None, /
    ) -> nw.DataFrame[IntoDataFrameT]:
        """
        Query a tabular version of `vega-datasets/datapackage.json`_.

        Applies a filter, erroring out when no results would be returned.

        .. _vega-datasets/datapackage.json:
            https://github.com/vega/vega-datasets/blob/main/datapackage.json
        """
        constraints = _into_constraints(name, suffix)
        frame = self._scan_metadata(**constraints).collect()
        if not frame.is_empty():
            return frame
        else:
            msg = f"Found no results for:\n {constraints!r}"
            raise ValueError(msg)

    def _merge_kwds(self, meta: Metadata, kwds: dict[str, Any], /) -> Mapping[str, Any]:
        """
        Extend user-provided arguments with dataset & library-specific defaults.

        .. important:: User-provided arguments have a higher precedence.
        """
        if self._schema_cache.is_active() and (
            schema := self._schema_cache.schema_kwds(meta)
        ):
            # ``kwds`` is the right operand, so its keys override the schema's.
            kwds = schema | kwds if kwds else schema
        return kwds

    @property
    def _metadata_frame(self) -> nw.LazyFrame[IntoFrameT]:
        """Lazily scan the bundled metadata file with a matching scan function."""
        fp = self._metadata_path
        return nw.from_native(self.scan_fn(fp)(fp)).lazy()

    def _scan_metadata(
        self, *predicates: OneOrSeq[IntoExpr], **constraints: Unpack[Metadata]
    ) -> nw.LazyFrame[IntoFrameT]:
        """Return the metadata frame, filtered when any predicate or constraint is given."""
        if predicates or constraints:
            return self._metadata_frame.filter(*predicates, **constraints)
        return self._metadata_frame

    def _solve(
        self, meta: Metadata, impls: Sequence[BaseImpl[R]], /
    ) -> Callable[..., R]:
        """
        Return the first function that satisfies dataset constraints.

        See Also
        --------
        ``altair.datasets._readimpl.BaseImpl.unwrap_or_skip``
        """
        items = meta.items()
        # Falsy results from ``unwrap_or_skip`` are skipped.
        it = (some for impl in impls if (some := impl.unwrap_or_skip(items)))
        if fn_or_err := next(it, None):
            if _is_err(fn_or_err):
                raise fn_or_err.from_tabular(meta, self._name)
            return fn_or_err
        raise AltairDatasetsError.from_tabular(meta, self._name)


def _dataset_names(
    frame: nw.LazyFrame, *predicates: OneOrSeq[IntoExpr]
) -> Sequence[Dataset]:
    """Collect the ``"dataset_name"`` column of the rows matching ``predicates``."""
    # NOTE: helper function for `Reader.profile`
    return (
        frame.filter(*predicates)
        .select("dataset_name")
        .collect()
        .get_column("dataset_name")
        .to_list()
    )


class _NoParquetReader(Reader[IntoDataFrameT, IntoFrameT]):
    """``Reader`` variant that builds its metadata frame from a ``CsvCache``."""

    def __repr__(self) -> str:
        return f"{super().__repr__()}\ncsv_cache\n {self.csv_cache!r}"

    @property
    def csv_cache(self) -> CsvCache:
        """A ``CsvCache`` created on first access and reused afterwards."""
        if not hasattr(self, "_csv_cache"):
            self._csv_cache = CsvCache()
        return self._csv_cache

    @property
    def _metadata_frame(self) -> nw.LazyFrame[IntoFrameT]:
        """Build the metadata frame from ``csv_cache.rotated`` instead of scanning a file."""
        data = cast("dict[str, Any]", self.csv_cache.rotated)
        impl = self._implementation
        return nw.maybe_convert_dtypes(nw.from_dict(data, backend=impl)).lazy()


@overload
def reader(
    read_fns: Sequence[Read[IntoDataFrameT]],
    scan_fns: tuple[()] = ...,
    *,
    name: str | None = ...,
    implementation: nw.Implementation = ...,
) -> Reader[IntoDataFrameT, nw.LazyFrame[IntoDataFrameT]]: ...


@overload
def reader(
    read_fns: Sequence[Read[IntoDataFrameT]],
    scan_fns: Sequence[Scan[IntoFrameT]],
    *,
    name: str | None = ...,
    implementation: nw.Implementation = ...,
) -> Reader[IntoDataFrameT, IntoFrameT]: ...


def reader(
    read_fns: Sequence[Read[IntoDataFrameT]],
    scan_fns: Sequence[Scan[IntoFrameT]] = (),
    *,
    name: str | None = None,
    implementation: nw.Implementation = nw.Implementation.UNKNOWN,
) -> (
    Reader[IntoDataFrameT, IntoFrameT]
    | Reader[IntoDataFrameT, nw.LazyFrame[IntoDataFrameT]]
):
    """
    Build a ``Reader`` from eager read functions and optional lazy scan functions.

    Parameters
    ----------
    read_fns
        Eager read implementations.
    scan_fns
        Lazy scan implementations. When empty, an eager parquet reader from
        ``read_fns`` is converted into a scan function if one exists;
        otherwise a ``_NoParquetReader`` is returned.
    name
        Reader name. Defaults to the most common ``_inferred_package`` among
        ``read_fns``.
    implementation
        Narwhals implementation. When left as ``UNKNOWN`` it is derived
        from ``name``.
    """
    name = name or Counter(el._inferred_package for el in read_fns).most_common(1)[0][0]
    if implementation is nw.Implementation.UNKNOWN:
        implementation = _into_implementation(Requirement(name))
    if scan_fns:
        return Reader(read_fns, scan_fns, name, implementation)
    if stolen := _steal_eager_parquet(read_fns):
        return Reader(read_fns, stolen, name, implementation)
    else:
        return _NoParquetReader[IntoDataFrameT](read_fns, (), name, implementation)


def infer_backend(
    *, priority: Sequence[_Backend] = ("polars", "pandas[pyarrow]", "pandas", "pyarrow")
) -> Reader[Any, Any]:
    """
    Return the first available reader in order of `priority`.

    Notes
    -----
    - ``"polars"``: can natively load every dataset (including ``(Geo|Topo)JSON``)
    - ``"pandas[pyarrow]"``: can load *most* datasets, guarantees ``.parquet`` support
    - ``"pandas"``: supports ``.parquet``, if `fastparquet`_ is installed
    - ``"pyarrow"``: least reliable

    .. _fastparquet:
        https://github.com/dask/fastparquet
    """
    # Lazy generator: a reader is only built for the first available backend.
    it = (_from_backend(name) for name in priority if is_available(_requirements(name)))
    if reader := next(it, None):
        return reader
    raise AltairDatasetsError.from_priority(priority)


@overload
def _from_backend(name: _Polars, /) -> Reader[pl.DataFrame, pl.LazyFrame]: ...
@overload
def _from_backend(name: _PandasAny, /) -> Reader[pd.DataFrame, pd.DataFrame]: ...
@overload
def _from_backend(name: _PyArrow, /) -> Reader[pa.Table, pa.Table]: ...


# FIXME: The order this is defined in makes splitting the module complicated
# - Can't use a classmethod, since some result in a subclass used
def _from_backend(name: _Backend, /) -> Reader[Any, Any]:
    """
    Reader initialization dispatcher.

    FIXME: Works, but defining these in mixed shape functions seems off.

    Raises
    ------
    TypeError
        When ``name`` is not one of the known backend names.
    """
    if not _is_backend(name):
        msg = f"Unknown backend {name!r}"
        raise TypeError(msg)
    implementation = _into_implementation(name)
    if name == "polars":
        rd, sc = _readimpl.pl_only()
        return reader(rd, sc, name=name, implementation=implementation)
    elif name == "pandas[pyarrow]":
        return reader(_readimpl.pd_pyarrow(), name=name, implementation=implementation)
    elif name == "pandas":
        return reader(_readimpl.pd_only(), name=name, implementation=implementation)
    elif name == "pyarrow":
        return reader(_readimpl.pa_any(), name=name, implementation=implementation)


def _is_backend(obj: Any) -> TypeIs[_Backend]:
    """
    Return True when ``obj`` is one of the known backend names.

    Non-string input (including unhashable objects such as lists) yields
    ``False`` rather than raising from the set membership test.
    """
    return isinstance(obj, str) and obj in {
        "polars",
        "pandas",
        "pandas[pyarrow]",
        "pyarrow",
    }


def _is_err(obj: Any) -> TypeIs[type[AltairDatasetsError]]:
    """Return True when ``obj`` is the ``AltairDatasetsError`` class itself."""
    return obj is AltairDatasetsError


def _into_constraints(
    name: Dataset | LiteralString, suffix: Extension | None, /
) -> Metadata:
    """
    Transform args into a mapping to column names.

    - A ``name`` containing ``"."`` is matched against ``"file_name"``
      (``suffix`` is not consulted in that case).
    - Otherwise ``name`` is matched against ``"dataset_name"``, together with
      ``"suffix"`` when one starting with ``"."`` is given.

    Raises
    ------
    TypeError
        When ``suffix`` is given but does not start with ``"."``.
    """
    m: Metadata = {}
    if "." in name:
        m["file_name"] = name
    elif suffix is None:
        m["dataset_name"] = name
    elif suffix.startswith("."):
        m = {"dataset_name": name, "suffix": suffix}
    else:
        from typing import get_args

        from altair.datasets._typing import Extension

        msg = (
            f"Expected 'suffix' to be one of {get_args(Extension)!r},\n"
            f"but got: {suffix!r}"
        )
        raise TypeError(msg)
    return m


def _into_implementation(
    backend: _NwSupport | _PandasAny | Requirement, /
) -> nw.Implementation:
    """
    Import ``backend`` and return the matching ``narwhals`` implementation.

    Raises
    ------
    ValueError
        When ``narwhals`` reports the package as ``UNKNOWN``.
    """
    primary = _import_guarded(backend)
    impl = nw.Implementation.from_backend(primary)
    if impl is not nw.Implementation.UNKNOWN:
        return impl
    msg = f"Package {primary!r} is not supported by `narwhals`."
    raise ValueError(msg)


def _into_suffix(obj: Path | str, /) -> Any:
    """
    Return ``obj.suffix`` for a ``Path``; a ``str`` is returned unchanged.

    Raises
    ------
    TypeError
        For any other type.
    """
    if isinstance(obj, Path):
        return obj.suffix
    elif isinstance(obj, str):
        return obj
    else:
        msg = f"Unexpected type {type(obj).__name__!r}"
        raise TypeError(msg)


def _steal_eager_parquet(
    read_fns: Sequence[Read[IntoDataFrameT]], /
) -> Sequence[Scan[nw.LazyFrame[IntoDataFrameT]]] | None:
    """
    Convert the first read function satisfying ``include <= is_parquet`` into a scan function.

    Returns ``None`` when no such read function exists.
    """
    if convertable := next((rd for rd in read_fns if rd.include <= is_parquet), None):
        return (_readimpl.into_scan(convertable),)
    return None


@overload
def _import_guarded(req: _PandasAny, /) -> _Pandas: ...


@overload
def _import_guarded(req: _NwSupportT, /) -> _NwSupportT: ...


@overload
def _import_guarded(req: Requirement, /) -> LiteralString: ...


def _import_guarded(req: Any, /) -> LiteralString:
    """
    Import every module named by ``req`` and return the primary package name.

    Raises the error produced by ``module_not_found`` for the first module
    for which ``find_spec`` returns nothing.
    """
    requires = _requirements(req)
    for name in requires:
        if spec := find_spec(name):
            import_module(spec.name)
        else:
            raise module_not_found(str(req), requires, missing=name)
    return requires[0]


def _requirements(req: Requirement | str, /) -> tuple[Any, ...]:
    """
    Return ``(name, *extras)`` for a requirement or requirement string.

    ``Requirement.extras`` is a set, so the extras are sorted to keep the
    result (and the first missing module reported by ``_import_guarded``)
    stable from run to run.
    """
    req = Requirement(req) if isinstance(req, str) else req
    return (req.name, *sorted(req.extras))