Skip to content

Commit

Permalink
Merge pull request #49 from lincc-frameworks/from_flat_and_lists
Browse files Browse the repository at this point in the history
Wrap `from_lists` and `from_flat`
  • Loading branch information
dougbrn authored Aug 27, 2024
2 parents 24fb348 + 745b5d1 commit cb71a92
Show file tree
Hide file tree
Showing 4 changed files with 311 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ classifiers = [
dynamic = ["version"]
requires-python = ">=3.9"
dependencies = [
'nested-pandas==0.1.3',
'nested-pandas==0.2.1',
'numpy',
'dask>=2024.3.0',
'dask[distributed]>=2024.3.0',
Expand Down
169 changes: 154 additions & 15 deletions src/nested_dask/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ def _rebuild(self, graph, func, args): # type: ignore
return collection


def _nested_meta_from_flat(flat, name):
    """Build an empty meta Series typed as a packed (nested) column.

    Parameters
    ----------
    flat:
        Flat dataframe whose column dtypes define the fields of the
        nested structure.
    name:
        Name assigned to the resulting meta Series.

    Returns
    -------
    pd.Series
        An empty Series carrying the `NestedDtype` derived from `flat`.
    """
    # Map each flat column to an underlying pyarrow type: pandas ArrowDtype
    # already exposes one; plain numpy dtypes are converted.
    arrow_types = {
        column: dtype.pyarrow_dtype if hasattr(dtype, "pyarrow_dtype") else pa.from_numpy_dtype(dtype)
        for column, dtype in flat.dtypes.items()
    }
    return pd.Series(name=name, dtype=NestedDtype.from_fields(arrow_types))


class NestedFrame(
_Frame, dd.DataFrame
): # can use dd.DataFrame instead of dx.DataFrame if the config is set true (default in >=2024.3.0)
Expand All @@ -70,17 +82,6 @@ def __getitem__(self, item):
else:
return super().__getitem__(item)

def _nested_meta_from_flat(self, flat, name):
"""construct meta for a packed series from a flat dataframe"""
pd_fields = flat.dtypes.to_dict() # grabbing pandas dtypes
pyarrow_fields = {} # grab underlying pyarrow dtypes
for field, dtype in pd_fields.items():
if hasattr(dtype, "pyarrow_dtype"):
pyarrow_fields[field] = dtype.pyarrow_dtype
else: # or convert from numpy types
pyarrow_fields[field] = pa.from_numpy_dtype(dtype)
return pd.Series(name=name, dtype=NestedDtype.from_fields(pyarrow_fields))

def __setitem__(self, key, value):
"""Adds custom __setitem__ behavior for nested columns"""

Expand All @@ -102,8 +103,8 @@ def __setitem__(self, key, value):
new_flat = new_flat.astype({col: pd.ArrowDtype(pa.string())})

# pack the modified df back into a nested column
meta = self._nested_meta_from_flat(new_flat, nested)
packed = new_flat.map_partitions(lambda x: pack(x), meta=meta)
meta = _nested_meta_from_flat(new_flat, nested)
packed = new_flat.map_partitions(lambda x: pack(x, dtype=meta.dtype), meta=meta)
return super().__setitem__(nested, packed)

# Adding a new nested structure from a column
Expand All @@ -114,8 +115,8 @@ def __setitem__(self, key, value):
value.name = col
value = value.to_frame()

meta = self._nested_meta_from_flat(value, new_nested)
packed = value.map_partitions(lambda x: pack(x), meta=meta)
meta = _nested_meta_from_flat(value, new_nested)
packed = value.map_partitions(lambda x: pack(x, dtype=meta.dtype), meta=meta)
return super().__setitem__(new_nested, packed)

return super().__setitem__(key, value)
Expand Down Expand Up @@ -280,6 +281,144 @@ def from_map(
)
return NestedFrame.from_dask_dataframe(nf)

@classmethod
def from_flat(cls, df, base_columns, nested_columns=None, index=None, name="nested"):
    """Create a NestedFrame with base and nested columns from a flat
    dataframe.

    Parameters
    ----------
    df: dd.DataFrame or nd.NestedFrame
        A flat dataframe.
    base_columns: list-like
        The columns that should be used as base (flat) columns in the
        output dataframe.
    nested_columns: list-like, or None
        The columns that should be packed into a nested column. All columns
        in the list will attempt to be packed into a single nested column
        with the name provided in `name`. If None, is defined as all
        columns not in `base_columns`.
    index: str, or None
        The name of a column to use as the new index. Typically, the index
        should have a unique value per row for base columns, and should
        repeat for nested columns. For example, a dataframe with two
        columns; a=[1,1,1,2,2,2] and b=[5,10,15,20,25,30] would want an
        index like [0,0,0,1,1,1] if a is chosen as a base column. If not
        provided the current index will be used.
    name:
        The name of the output column the `nested_columns` are packed into.

    Returns
    -------
    NestedFrame
        A NestedFrame with the specified nesting structure.
    """

    # Start the meta from the base (unpacked) columns.
    meta = npd.NestedFrame(df[base_columns]._meta)

    # Default: pack everything that is neither a base column nor the index.
    if nested_columns is None:
        nested_columns = [col for col in df.columns if col not in base_columns and col != index]

    # Extend the meta with the packed column when there is anything to pack.
    if len(nested_columns) > 0:
        meta = meta.join(pack(df[nested_columns]._meta, name))

    return df.map_partitions(
        lambda partition: npd.NestedFrame.from_flat(
            df=partition, base_columns=base_columns, nested_columns=nested_columns, index=index, name=name
        ),
        meta=meta,
    )

@classmethod
def from_lists(cls, df, base_columns=None, list_columns=None, name="nested"):
    """Create a NestedFrame with base and nested columns from a dataframe
    containing list-valued columns.

    Parameters
    ----------
    df: dd.DataFrame or nd.NestedFrame
        A dataframe with list columns.
    base_columns: list-like, or None
        Any columns that have non-list values in the input df. These will
        simply be kept as identical columns in the result.
    list_columns: list-like, or None
        The list-value columns that should be packed into a nested column.
        All columns in the list will attempt to be packed into a single
        nested column with the name provided in `name`. All columns
        in list_columns must have pyarrow list dtypes, otherwise the
        operation will fail. If None, is defined as all columns not in
        `base_columns`.
    name:
        The name of the output column the `list_columns` are packed into.

    Returns
    -------
    NestedFrame
        A NestedFrame with the specified nesting structure.

    Note
    ----
    As noted above, all columns in `list_columns` must have a pyarrow
    ListType dtype. This is needed for proper meta propagation. To convert
    a list column to this dtype, you can use this command structure:
    `nf = nf.astype({"colname": pd.ArrowDtype(pa.list_(pa.int64()))})`
    Where pa.int64 above should be replaced with the correct dtype of the
    underlying data accordingly.

    Additionally, it's a known issue in Dask
    (https://github.com/dask/dask/issues/10139) that columns with list
    values will by default be converted to the string type. This will
    interfere with the ability to recast these to pyarrow lists. We
    recommend setting the following dask config setting to prevent this:
    `dask.config.set({"dataframe.convert-string":False})`
    """

    # Resolve inputs for meta
    if base_columns is None:
        if list_columns is None:
            # with no inputs, assume all columns are list-valued
            list_columns = df.columns
            # BUG FIX: base_columns must not stay None here, otherwise
            # `df[base_columns]` below raises; no base columns means [].
            base_columns = []
        else:
            # if list_columns are defined, assume everything else is base
            base_columns = [col for col in df.columns if col not in list_columns]
    elif list_columns is None:
        # with defined base_columns, assume everything else is list
        list_columns = [col for col in df.columns if col not in base_columns]

    # from_lists should have at least one list column defined
    if len(list_columns) == 0:
        raise ValueError("No columns were assigned as list columns.")

    # reject any list columns that are not pyarrow list-dtyped
    for col in list_columns:
        dtype = df[col].dtype
        if not hasattr(dtype, "pyarrow_dtype") or not pa.types.is_list(dtype.pyarrow_dtype):
            raise TypeError(
                f"""List column '{col}' dtype ({dtype}) is not a pyarrow list dtype.
                Refer to the docstring for guidance on dtype requirements and assignment."""
            )

    # Meta: base columns kept as-is, list columns packed into one nested column.
    meta = npd.NestedFrame(df[base_columns]._meta)
    meta = meta.join(pack(df[list_columns]._meta, name))

    return df.map_partitions(
        lambda x: npd.NestedFrame.from_lists(
            df=x, base_columns=base_columns, list_columns=list_columns, name=name
        ),
        meta=meta,
    )

def compute(self, **kwargs):
    """Execute the task graph and return the result wrapped as a
    nested-pandas NestedFrame (rather than a plain pandas object)."""
    materialized = super().compute(**kwargs)
    return npd.NestedFrame(materialized)
Expand Down
4 changes: 4 additions & 0 deletions tests/nested_dask/test_io.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import nested_dask as nd
import pandas as pd
import pyarrow as pa


def test_read_parquet(test_dataset, tmp_path):
Expand All @@ -19,6 +21,8 @@ def test_read_parquet(test_dataset, tmp_path):
base = nd.read_parquet(test_save_path, calculate_divisions=True)
nested = nd.read_parquet(nested_save_path, calculate_divisions=True)

# this is read as a large_string, just make it a string
nested = nested.astype({"band": pd.ArrowDtype(pa.string())})
base = base.add_nested(nested, "nested")

# Check the loaded dataset against the original
Expand Down
Loading

0 comments on commit cb71a92

Please sign in to comment.