TIFNJK_E41221588/venv/Lib/site-packages/altair/datasets/_reader.py

573 lines
18 KiB
Python

"""
Backend for ``alt.datasets.Loader``.
Notes
-----
Extending would be more ergonomic if `read`, `scan`, `_constraints` were available under a single export::
from altair.datasets import ext, reader
import polars as pl
impls = (
ext.read(pl.read_parquet, ext.is_parquet),
ext.read(pl.read_csv, ext.is_csv),
ext.read(pl.read_json, ext.is_json),
)
user_reader = reader(impls)
user_reader.dataset("airports")
"""
from __future__ import annotations
from collections import Counter
from collections.abc import Mapping
from importlib import import_module
from importlib.util import find_spec
from itertools import chain
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar, Generic, Literal, cast, overload
from urllib.request import build_opener as _build_opener
from narwhals.stable import v1 as nw
from narwhals.stable.v1.typing import IntoDataFrameT, IntoExpr
from packaging.requirements import Requirement
from altair.datasets import _readimpl
from altair.datasets._cache import CsvCache, DatasetCache, SchemaCache, _iter_metadata
from altair.datasets._constraints import is_parquet
from altair.datasets._exceptions import AltairDatasetsError, module_not_found
from altair.datasets._readimpl import IntoFrameT, is_available
if TYPE_CHECKING:
import sys
from collections.abc import Callable, Sequence
from urllib.request import OpenerDirector
import pandas as pd
import polars as pl
import pyarrow as pa
from altair.datasets._readimpl import BaseImpl, R, Read, Scan
from altair.datasets._typing import Dataset, Extension, Metadata
from altair.vegalite.v6.schema._typing import OneOrSeq
if sys.version_info >= (3, 13):
from typing import TypeIs, TypeVar
else:
from typing_extensions import TypeIs, TypeVar
if sys.version_info >= (3, 12):
from typing import Unpack
else:
from typing_extensions import Unpack
if sys.version_info >= (3, 11):
from typing import LiteralString
else:
from typing_extensions import LiteralString
if sys.version_info >= (3, 10):
from typing import TypeAlias
else:
from typing_extensions import TypeAlias
_Polars: TypeAlias = Literal["polars"]
_Pandas: TypeAlias = Literal["pandas"]
_PyArrow: TypeAlias = Literal["pyarrow"]
_PandasAny: TypeAlias = Literal[_Pandas, "pandas[pyarrow]"]
_Backend: TypeAlias = Literal[_Polars, _PandasAny, _PyArrow]
_CuDF: TypeAlias = Literal["cudf"]
_Dask: TypeAlias = Literal["dask"]
_DuckDB: TypeAlias = Literal["duckdb"]
_Ibis: TypeAlias = Literal["ibis"]
_PySpark: TypeAlias = Literal["pyspark"]
_NwSupport: TypeAlias = Literal[
_Polars, _Pandas, _PyArrow, _CuDF, _Dask, _DuckDB, _Ibis, _PySpark
]
_NwSupportT = TypeVar(
"_NwSupportT",
_Polars,
_Pandas,
_PyArrow,
_CuDF,
_Dask,
_DuckDB,
_Ibis,
_PySpark,
)
_SupportProfile: TypeAlias = Mapping[
Literal["supported", "unsupported"], "Sequence[Dataset]"
]
"""
Dataset support varies between backends and available dependencies.
Any name listed in ``"unsupported"`` will raise an error on::
from altair.datasets import load
load("7zip")
Instead, they can be loaded via::
import altair as alt
from altair.datasets import url
alt.Chart(url("7zip"))
"""
class Reader(Generic[IntoDataFrameT, IntoFrameT]):
"""
Modular file reader, targeting remote & local tabular resources.
.. warning::
Use ``reader(...)`` instead of instantiating ``Reader`` directly.
"""
_read: Sequence[Read[IntoDataFrameT]]
"""Eager file read functions."""
_scan: Sequence[Scan[IntoFrameT]]
"""Lazy file read functions."""
_name: str
"""
Used in error messages, repr and matching ``@overload``(s).
Otherwise, has no concrete meaning.
"""
_implementation: nw.Implementation
"""
Corresponding `narwhals implementation`_.
.. _narwhals implementation:
https://github.com/narwhals-dev/narwhals/blob/9b6a355530ea46c590d5a6d1d0567be59c0b5742/narwhals/utils.py#L61-L290
"""
_opener: ClassVar[OpenerDirector] = _build_opener()
_metadata_path: ClassVar[Path] = (
Path(__file__).parent / "_metadata" / "metadata.parquet"
)
def __init__(
self,
read: Sequence[Read[IntoDataFrameT]],
scan: Sequence[Scan[IntoFrameT]],
name: str,
implementation: nw.Implementation,
) -> None:
self._read = read
self._scan = scan
self._name = name
self._implementation = implementation
self._schema_cache = SchemaCache(implementation=implementation)
def __repr__(self) -> str:
from textwrap import indent
PREFIX = " " * 4
NL = "\n"
body = f"read\n{indent(NL.join(str(el) for el in self._read), PREFIX)}"
if self._scan:
body += f"\nscan\n{indent(NL.join(str(el) for el in self._scan), PREFIX)}"
return f"Reader[{self._name}] {self._implementation!r}\n{body}"
def read_fn(self, meta: Metadata, /) -> Callable[..., IntoDataFrameT]:
return self._solve(meta, self._read)
def scan_fn(self, meta: Metadata | Path | str, /) -> Callable[..., IntoFrameT]:
meta = meta if isinstance(meta, Mapping) else {"suffix": _into_suffix(meta)}
return self._solve(meta, self._scan)
@property
def cache(self) -> DatasetCache:
return DatasetCache(self)
def _handle_pyarrow_date_error(self, e: Exception, name: str) -> None:
"""Handle PyArrow date parsing errors with informative error messages, see https://github.com/apache/arrow/issues/41488."""
if "CSV conversion error to date" in str(e) and "pyarrow" in str(
type(e).__module__
):
message = (
f"PyArrow cannot parse date format in dataset '{name}'. "
f"This is a known limitation of PyArrow's CSV reader for non-ISO date formats.\n\n"
f"Alternatives:\n"
f"1. Use a different backend: data.{name}(engine='pandas') or data.{name}(engine='polars')\n"
f"2. Convert dates manually after loading as strings\n\n"
f"Original error: {e}"
)
raise AltairDatasetsError(message) from e
raise e
def dataset(
self,
name: Dataset | LiteralString,
suffix: Extension | None = None,
/,
**kwds: Any,
) -> IntoDataFrameT:
frame = self._query(name, suffix)
meta = next(_iter_metadata(frame))
fn = self.read_fn(meta)
fn_kwds = self._merge_kwds(meta, kwds)
if self.cache.is_active():
fp = self.cache._maybe_download(meta)
try:
return fn(fp, **fn_kwds)
except Exception as e:
self._handle_pyarrow_date_error(e, name)
raise
else:
with self._opener.open(meta["url"]) as f:
try:
return fn(f, **fn_kwds)
except Exception as e:
self._handle_pyarrow_date_error(e, name)
raise
def url(
self, name: Dataset | LiteralString, suffix: Extension | None = None, /
) -> str:
frame = self._query(name, suffix)
meta = next(_iter_metadata(frame))
if is_parquet(meta.items()) and not is_available("vegafusion"):
raise AltairDatasetsError.from_url(meta)
url = meta["url"]
if isinstance(url, str):
return url
else:
msg = f"Expected 'str' but got {type(url).__name__!r}\nfrom {url!r}."
raise TypeError(msg)
# TODO: (Multiple)
# - Settle on a better name
# - Add method to `Loader`
# - Move docs to `Loader.{new name}`
def open_markdown(self, name: Dataset, /) -> None:
"""
Learn more about a dataset, opening `vega-datasets/datapackage.md`_ with the default browser.
Additional info *may* include: `description`_, `schema`_, `sources`_, `licenses`_.
.. _vega-datasets/datapackage.md:
https://github.com/vega/vega-datasets/blob/main/datapackage.md
.. _description:
https://datapackage.org/standard/data-resource/#description
.. _schema:
https://datapackage.org/standard/table-schema/#schema
.. _sources:
https://datapackage.org/standard/data-package/#sources
.. _licenses:
https://datapackage.org/standard/data-package/#licenses
"""
import webbrowser
from altair.utils import VERSIONS
ref = self._query(name).get_column("file_name").item(0).replace(".", "")
tag = VERSIONS["vega-datasets"]
url = f"https://github.com/vega/vega-datasets/blob/v{tag}/datapackage.md#{ref}"
webbrowser.open(url)
@overload
def profile(self, *, show: Literal[False] = ...) -> _SupportProfile: ...
@overload
def profile(self, *, show: Literal[True]) -> None: ...
def profile(self, *, show: bool = False) -> _SupportProfile | None:
"""
Describe which datasets can be loaded as tabular data.
Parameters
----------
show
Print a densely formatted repr *instead of* returning a mapping.
"""
relevant_columns = set(
chain.from_iterable(impl._relevant_columns for impl in self._read)
)
frame = self._scan_metadata().select("dataset_name", *relevant_columns)
inc_expr = nw.any_horizontal(impl._include_expr for impl in self._read)
result: _SupportProfile = {
"unsupported": _dataset_names(frame, ~inc_expr),
"supported": _dataset_names(frame, inc_expr),
}
if show:
import pprint
pprint.pprint(result, compact=True, sort_dicts=False)
return None
return result
def _query(
self, name: Dataset | LiteralString, suffix: Extension | None = None, /
) -> nw.DataFrame[IntoDataFrameT]:
"""
Query a tabular version of `vega-datasets/datapackage.json`_.
Applies a filter, erroring out when no results would be returned.
.. _vega-datasets/datapackage.json:
https://github.com/vega/vega-datasets/blob/main/datapackage.json
"""
constraints = _into_constraints(name, suffix)
frame = self._scan_metadata(**constraints).collect()
if not frame.is_empty():
return frame
else:
msg = f"Found no results for:\n {constraints!r}"
raise ValueError(msg)
def _merge_kwds(self, meta: Metadata, kwds: dict[str, Any], /) -> Mapping[str, Any]:
"""
Extend user-provided arguments with dataset & library-specfic defaults.
.. important:: User-provided arguments have a higher precedence.
"""
if self._schema_cache.is_active() and (
schema := self._schema_cache.schema_kwds(meta)
):
kwds = schema | kwds if kwds else schema
return kwds
@property
def _metadata_frame(self) -> nw.LazyFrame[IntoFrameT]:
fp = self._metadata_path
return nw.from_native(self.scan_fn(fp)(fp)).lazy()
def _scan_metadata(
self, *predicates: OneOrSeq[IntoExpr], **constraints: Unpack[Metadata]
) -> nw.LazyFrame[IntoFrameT]:
if predicates or constraints:
return self._metadata_frame.filter(*predicates, **constraints)
return self._metadata_frame
def _solve(
self, meta: Metadata, impls: Sequence[BaseImpl[R]], /
) -> Callable[..., R]:
"""
Return the first function that satisfies dataset constraints.
See Also
--------
``altair.datasets._readimpl.BaseImpl.unwrap_or_skip``
"""
items = meta.items()
it = (some for impl in impls if (some := impl.unwrap_or_skip(items)))
if fn_or_err := next(it, None):
if _is_err(fn_or_err):
raise fn_or_err.from_tabular(meta, self._name)
return fn_or_err
raise AltairDatasetsError.from_tabular(meta, self._name)
def _dataset_names(
frame: nw.LazyFrame, *predicates: OneOrSeq[IntoExpr]
) -> Sequence[Dataset]:
# NOTE: helper function for `Reader.profile`
return (
frame.filter(*predicates)
.select("dataset_name")
.collect()
.get_column("dataset_name")
.to_list()
)
class _NoParquetReader(Reader[IntoDataFrameT, IntoFrameT]):
def __repr__(self) -> str:
return f"{super().__repr__()}\ncsv_cache\n {self.csv_cache!r}"
@property
def csv_cache(self) -> CsvCache:
if not hasattr(self, "_csv_cache"):
self._csv_cache = CsvCache()
return self._csv_cache
@property
def _metadata_frame(self) -> nw.LazyFrame[IntoFrameT]:
data = cast("dict[str, Any]", self.csv_cache.rotated)
impl = self._implementation
return nw.maybe_convert_dtypes(nw.from_dict(data, backend=impl)).lazy()
@overload
def reader(
read_fns: Sequence[Read[IntoDataFrameT]],
scan_fns: tuple[()] = ...,
*,
name: str | None = ...,
implementation: nw.Implementation = ...,
) -> Reader[IntoDataFrameT, nw.LazyFrame[IntoDataFrameT]]: ...
@overload
def reader(
read_fns: Sequence[Read[IntoDataFrameT]],
scan_fns: Sequence[Scan[IntoFrameT]],
*,
name: str | None = ...,
implementation: nw.Implementation = ...,
) -> Reader[IntoDataFrameT, IntoFrameT]: ...
def reader(
read_fns: Sequence[Read[IntoDataFrameT]],
scan_fns: Sequence[Scan[IntoFrameT]] = (),
*,
name: str | None = None,
implementation: nw.Implementation = nw.Implementation.UNKNOWN,
) -> (
Reader[IntoDataFrameT, IntoFrameT]
| Reader[IntoDataFrameT, nw.LazyFrame[IntoDataFrameT]]
):
name = name or Counter(el._inferred_package for el in read_fns).most_common(1)[0][0]
if implementation is nw.Implementation.UNKNOWN:
implementation = _into_implementation(Requirement(name))
if scan_fns:
return Reader(read_fns, scan_fns, name, implementation)
if stolen := _steal_eager_parquet(read_fns):
return Reader(read_fns, stolen, name, implementation)
else:
return _NoParquetReader[IntoDataFrameT](read_fns, (), name, implementation)
def infer_backend(
*, priority: Sequence[_Backend] = ("polars", "pandas[pyarrow]", "pandas", "pyarrow")
) -> Reader[Any, Any]:
"""
Return the first available reader in order of `priority`.
Notes
-----
- ``"polars"``: can natively load every dataset (including ``(Geo|Topo)JSON``)
- ``"pandas[pyarrow]"``: can load *most* datasets, guarantees ``.parquet`` support
- ``"pandas"``: supports ``.parquet``, if `fastparquet`_ is installed
- ``"pyarrow"``: least reliable
.. _fastparquet:
https://github.com/dask/fastparquet
"""
it = (_from_backend(name) for name in priority if is_available(_requirements(name)))
if reader := next(it, None):
return reader
raise AltairDatasetsError.from_priority(priority)
@overload
def _from_backend(name: _Polars, /) -> Reader[pl.DataFrame, pl.LazyFrame]: ...
@overload
def _from_backend(name: _PandasAny, /) -> Reader[pd.DataFrame, pd.DataFrame]: ...
@overload
def _from_backend(name: _PyArrow, /) -> Reader[pa.Table, pa.Table]: ...
# FIXME: The order this is defined in makes splitting the module complicated
# - Can't use a classmethod, since some result in a subclass used
def _from_backend(name: _Backend, /) -> Reader[Any, Any]:
"""
Reader initialization dispatcher.
FIXME: Works, but defining these in mixed shape functions seems off.
"""
if not _is_backend(name):
msg = f"Unknown backend {name!r}"
raise TypeError(msg)
implementation = _into_implementation(name)
if name == "polars":
rd, sc = _readimpl.pl_only()
return reader(rd, sc, name=name, implementation=implementation)
elif name == "pandas[pyarrow]":
return reader(_readimpl.pd_pyarrow(), name=name, implementation=implementation)
elif name == "pandas":
return reader(_readimpl.pd_only(), name=name, implementation=implementation)
elif name == "pyarrow":
return reader(_readimpl.pa_any(), name=name, implementation=implementation)
def _is_backend(obj: Any) -> TypeIs[_Backend]:
return obj in {"polars", "pandas", "pandas[pyarrow]", "pyarrow"}
def _is_err(obj: Any) -> TypeIs[type[AltairDatasetsError]]:
return obj is AltairDatasetsError
def _into_constraints(
name: Dataset | LiteralString, suffix: Extension | None, /
) -> Metadata:
"""Transform args into a mapping to column names."""
m: Metadata = {}
if "." in name:
m["file_name"] = name
elif suffix is None:
m["dataset_name"] = name
elif suffix.startswith("."):
m = {"dataset_name": name, "suffix": suffix}
else:
from typing import get_args
from altair.datasets._typing import Extension
msg = (
f"Expected 'suffix' to be one of {get_args(Extension)!r},\n"
f"but got: {suffix!r}"
)
raise TypeError(msg)
return m
def _into_implementation(
backend: _NwSupport | _PandasAny | Requirement, /
) -> nw.Implementation:
primary = _import_guarded(backend)
impl = nw.Implementation.from_backend(primary)
if impl is not nw.Implementation.UNKNOWN:
return impl
msg = f"Package {primary!r} is not supported by `narwhals`."
raise ValueError(msg)
def _into_suffix(obj: Path | str, /) -> Any:
if isinstance(obj, Path):
return obj.suffix
elif isinstance(obj, str):
return obj
else:
msg = f"Unexpected type {type(obj).__name__!r}"
raise TypeError(msg)
def _steal_eager_parquet(
read_fns: Sequence[Read[IntoDataFrameT]], /
) -> Sequence[Scan[nw.LazyFrame[IntoDataFrameT]]] | None:
if convertable := next((rd for rd in read_fns if rd.include <= is_parquet), None):
return (_readimpl.into_scan(convertable),)
return None
@overload
def _import_guarded(req: _PandasAny, /) -> _Pandas: ...
@overload
def _import_guarded(req: _NwSupportT, /) -> _NwSupportT: ...
@overload
def _import_guarded(req: Requirement, /) -> LiteralString: ...
def _import_guarded(req: Any, /) -> LiteralString:
requires = _requirements(req)
for name in requires:
if spec := find_spec(name):
import_module(spec.name)
else:
raise module_not_found(str(req), requires, missing=name)
return requires[0]
def _requirements(req: Requirement | str, /) -> tuple[Any, ...]:
req = Requirement(req) if isinstance(req, str) else req
return (req.name, *req.extras)