API Reference

This section provides comprehensive documentation of pyngb's API, including all functions, classes, and modules.

Core Functions

Data Loading

pyngb.read_ngb(path, *, return_metadata=False, baseline_file=None, dynamic_axis='sample_temperature')

read_ngb(
    path: str,
    *,
    return_metadata: Literal[False] = False,
    baseline_file: None = None,
    dynamic_axis: str = "sample_temperature",
) -> pa.Table
read_ngb(
    path: str,
    *,
    return_metadata: Literal[True],
    baseline_file: None = None,
    dynamic_axis: str = "sample_temperature",
) -> tuple[FileMetadata, pa.Table]
read_ngb(
    path: str,
    *,
    return_metadata: Literal[False] = False,
    baseline_file: str,
    dynamic_axis: str = "sample_temperature",
) -> pa.Table
read_ngb(
    path: str,
    *,
    return_metadata: Literal[True],
    baseline_file: str,
    dynamic_axis: str = "sample_temperature",
) -> tuple[FileMetadata, pa.Table]

Read NETZSCH NGB file data with optional baseline subtraction.

This is the primary function for loading NGB files. By default, it returns a PyArrow table with embedded metadata. For direct metadata access, use return_metadata=True. When baseline_file is provided, baseline subtraction is performed automatically.

Parameters

path : str
    Path to the NGB file (.ngb-ss3 or similar extension).
    Supports absolute and relative paths.
return_metadata : bool, default False
    If False (default), return PyArrow table with embedded metadata.
    If True, return (metadata, data) tuple.
baseline_file : str or None, default None
    Path to baseline file (.ngb-bs3) for baseline subtraction.
    If provided, performs automatic baseline subtraction. The baseline file
    must have an identical temperature program to the sample file.
dynamic_axis : str, default "sample_temperature"
    Axis to use for dynamic segment alignment in baseline subtraction.
    Options: "time", "sample_temperature", "furnace_temperature"

Returns

pa.Table or tuple[FileMetadata, pa.Table]
    - If return_metadata=False: PyArrow table with embedded metadata
    - If return_metadata=True: (metadata dict, PyArrow table) tuple
    - If baseline_file provided: baseline-subtracted data

Raises

FileNotFoundError
    If the specified file does not exist
NGBStreamNotFoundError
    If required data streams are missing from the NGB file
NGBCorruptedFileError
    If the file structure is invalid or corrupted
zipfile.BadZipFile
    If the file is not a valid ZIP archive

Examples

Basic usage (recommended for most users):

from pyngb import read_ngb
import polars as pl

# Load NGB file
data = read_ngb("experiment.ngb-ss3")

# Convert to DataFrame for analysis
df = pl.from_arrow(data)
print(f"Shape: {df.height} rows x {df.width} columns")
# Shape: 2500 rows x 8 columns

# Access embedded metadata
import json
metadata = json.loads(data.schema.metadata[b'file_metadata'])
print(f"Sample: {metadata['sample_name']}")
print(f"Instrument: {metadata['instrument']}")
# Sample: Polymer Sample A
# Instrument: NETZSCH STA 449 F3 Jupiter

Advanced usage (for metadata-heavy workflows):

# Get metadata and data separately
metadata, data = read_ngb("experiment.ngb-ss3", return_metadata=True)

# Work with metadata directly
print(f"Operator: {metadata.get('operator', 'Unknown')}")
print(f"Sample mass: {metadata.get('sample_mass', 0)} mg")
print(f"Data points: {data.num_rows}")
# Operator: Jane Smith
# Sample mass: 15.2 mg
# Data points: 2500

# Use metadata for data processing
df = pl.from_arrow(data)
initial_mass = metadata['sample_mass']
df = df.with_columns(
    (pl.col('mass') / initial_mass * 100).alias('mass_percent')
)

Data analysis workflow:

# Simple analysis
data = read_ngb("sample.ngb-ss3")
df = pl.from_arrow(data)

# Basic statistics
if "sample_temperature" in df.columns:
    temp_range = df["sample_temperature"].min(), df["sample_temperature"].max()
    print(f"Temperature range: {temp_range[0]:.1f} to {temp_range[1]:.1f} °C")
# Temperature range: 25.0 to 800.0 °C

# Mass loss calculation
if "mass" in df.columns:
    mass_loss = (df["mass"].max() - df["mass"].min()) / df["mass"].max() * 100
    print(f"Mass loss: {mass_loss:.2f}%")
# Mass loss: 12.3%

Performance Notes
  • Fast binary parsing with NumPy optimization
  • Memory-efficient processing with PyArrow
  • Typical parsing time: 0.1-10 seconds depending on file size
  • Includes file hash for integrity verification (see the sketch below)
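
The file hash mentioned above is stored under the "file_hash" key of the returned metadata; a minimal sketch of reading it back (field names taken from the source listing below):

from pyngb import read_ngb

metadata, data = read_ngb("experiment.ngb-ss3", return_metadata=True)
file_hash = metadata.get("file_hash")
if file_hash is not None:
    # Contains the original file name, the hash method (BLAKE2b), and the digest
    print(file_hash["file"], file_hash["method"], file_hash["hash"])
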
See Also

NGBParser : Low-level parser for custom processing
BatchProcessor : Process multiple files efficiently

Source code in src/pyngb/api/loaders.py
def read_ngb(
    path: str,
    *,
    return_metadata: bool = False,
    baseline_file: str | None = None,
    dynamic_axis: str = "sample_temperature",
) -> Union[pa.Table, tuple[FileMetadata, pa.Table]]:
    """
    Read NETZSCH NGB file data with optional baseline subtraction.

    This is the primary function for loading NGB files. By default, it returns
    a PyArrow table with embedded metadata. For direct metadata access, use return_metadata=True.
    When baseline_file is provided, baseline subtraction is performed automatically.

    Parameters
    ----------
    path : str
        Path to the NGB file (.ngb-ss3 or similar extension).
        Supports absolute and relative paths.
    return_metadata : bool, default False
        If False (default), return PyArrow table with embedded metadata.
        If True, return (metadata, data) tuple.
    baseline_file : str or None, default None
        Path to baseline file (.ngb-bs3) for baseline subtraction.
        If provided, performs automatic baseline subtraction. The baseline file
        must have an identical temperature program to the sample file.
    dynamic_axis : str, default "sample_temperature"
        Axis to use for dynamic segment alignment in baseline subtraction.
        Options: "time", "sample_temperature", "furnace_temperature"

    Returns
    -------
    pa.Table or tuple[FileMetadata, pa.Table]
        - If return_metadata=False: PyArrow table with embedded metadata
        - If return_metadata=True: (metadata dict, PyArrow table) tuple
        - If baseline_file provided: baseline-subtracted data

    Raises
    ------
    FileNotFoundError
        If the specified file does not exist
    NGBStreamNotFoundError
        If required data streams are missing from the NGB file
    NGBCorruptedFileError
        If the file structure is invalid or corrupted
    zipfile.BadZipFile
        If the file is not a valid ZIP archive

    Examples
    --------
    Basic usage (recommended for most users):

    >>> from pyngb import read_ngb
    >>> import polars as pl
    >>>
    >>> # Load NGB file
    >>> data = read_ngb("experiment.ngb-ss3")
    >>>
    >>> # Convert to DataFrame for analysis
    >>> df = pl.from_arrow(data)
    >>> print(f"Shape: {df.height} rows x {df.width} columns")
    Shape: 2500 rows x 8 columns

    >>> # Access embedded metadata
    >>> import json
    >>> metadata = json.loads(data.schema.metadata[b'file_metadata'])
    >>> print(f"Sample: {metadata['sample_name']}")
    >>> print(f"Instrument: {metadata['instrument']}")
    Sample: Polymer Sample A
    Instrument: NETZSCH STA 449 F3 Jupiter

    Advanced usage (for metadata-heavy workflows):

    >>> # Get metadata and data separately
    >>> metadata, data = read_ngb("experiment.ngb-ss3", return_metadata=True)
    >>>
    >>> # Work with metadata directly
    >>> print(f"Operator: {metadata.get('operator', 'Unknown')}")
    >>> print(f"Sample mass: {metadata.get('sample_mass', 0)} mg")
    >>> print(f"Data points: {data.num_rows}")
    Operator: Jane Smith
    Sample mass: 15.2 mg
    Data points: 2500

    >>> # Use metadata for data processing
    >>> df = pl.from_arrow(data)
    >>> initial_mass = metadata['sample_mass']
    >>> df = df.with_columns(
    ...     (pl.col('mass') / initial_mass * 100).alias('mass_percent')
    ... )

    Data analysis workflow:

    >>> # Simple analysis
    >>> data = read_ngb("sample.ngb-ss3")
    >>> df = pl.from_arrow(data)
    >>>
    >>> # Basic statistics
    >>> if "sample_temperature" in df.columns:
    ...     temp_range = df["sample_temperature"].min(), df["sample_temperature"].max()
    ...     print(f"Temperature range: {temp_range[0]:.1f} to {temp_range[1]:.1f} °C")
    Temperature range: 25.0 to 800.0 °C

    >>> # Mass loss calculation
    >>> if "mass" in df.columns:
    ...     mass_loss = (df["mass"].max() - df["mass"].min()) / df["mass"].max() * 100
    ...     print(f"Mass loss: {mass_loss:.2f}%")
    Mass loss: 12.3%

    Performance Notes
    -----------------
    - Fast binary parsing with NumPy optimization
    - Memory-efficient processing with PyArrow
    - Typical parsing time: 0.1-10 seconds depending on file size
    - Includes file hash for integrity verification

    See Also
    --------
    NGBParser : Low-level parser for custom processing
    BatchProcessor : Process multiple files efficiently
    """
    parser = NGBParser()
    metadata, data = parser.parse(path)

    # Add file hash to metadata
    file_hash = get_hash(path)
    if file_hash is not None:
        metadata["file_hash"] = {
            "file": Path(path).name,
            "method": "BLAKE2b",
            "hash": file_hash,
        }

    # Handle baseline subtraction if requested
    if baseline_file is not None:
        from ..baseline import subtract_baseline

        # Validate dynamic_axis
        valid_axes = ["time", "sample_temperature", "furnace_temperature"]
        if dynamic_axis not in valid_axes:
            raise ValueError(
                f"dynamic_axis must be one of {valid_axes}, got '{dynamic_axis}'"
            )

        # Perform baseline subtraction (this will load baseline metadata internally)
        subtracted_df = subtract_baseline(
            path,
            baseline_file,
            dynamic_axis,  # type: ignore  # We validated it above
        )

        # Convert back to PyArrow
        data = subtracted_df.to_arrow()

    if return_metadata:
        return metadata, data

    # Attach metadata to the Arrow table
    data = set_metadata(data, tbl_meta={"file_metadata": metadata, "type": "STA"})
    return data

Usage Examples

# Basic data loading
from pyngb import read_ngb

# Method 1: Load as PyArrow table with embedded metadata (recommended)
table = read_ngb("sample.ngb-ss3")
print(f"Shape: {table.num_rows} x {table.num_columns}")

# Method 2: Get separate metadata and data
metadata, data = read_ngb("sample.ngb-ss3", return_metadata=True)
print(f"Sample: {metadata.get('sample_name', 'Unknown')}")

Baseline Subtraction

pyngb.subtract_baseline(sample_file, baseline_file, dynamic_axis='sample_temperature')

Subtract baseline data from sample data.

This function loads both sample (.ngb-ss3) and baseline (.ngb-bs3) files, validates that they have identical temperature programs, identifies isothermal and dynamic segments, and performs appropriate baseline subtraction. For isothermal segments, subtraction is done on the time axis. For dynamic segments, the user can choose the alignment axis.

Only the 'mass' and 'dsc_signal' columns are subtracted. All other columns (time, temperatures, flows) are retained from the sample file.

Parameters

sample_file : str
    Path to the sample file (.ngb-ss3)
baseline_file : str
    Path to the baseline file (.ngb-bs3). Must have identical temperature
    program to the sample file.
dynamic_axis : str, default="sample_temperature"
    Axis to use for dynamic segment alignment and subtraction.
    Options: "time", "sample_temperature", "furnace_temperature"

Returns

pl.DataFrame
    DataFrame with baseline-subtracted data

Raises

ValueError
    If temperature programs between sample and baseline are incompatible
FileNotFoundError
    If either file does not exist

Examples

# Basic subtraction using sample temperature axis for dynamic segments (default)
df = subtract_baseline("sample.ngb-ss3", "baseline.ngb-bs3")

# Use time axis for dynamic segment alignment
df = subtract_baseline(
    "sample.ngb-ss3",
    "baseline.ngb-bs3",
    dynamic_axis="time"
)
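
Because the result is an ordinary Polars DataFrame, it can be inspected and persisted with the usual Polars API; a brief sketch (the output filename is illustrative):

from pyngb import subtract_baseline

corrected = subtract_baseline("sample.ngb-ss3", "baseline.ngb-bs3")
print(corrected.columns)  # mass and dsc_signal are now baseline-corrected
corrected.write_parquet("sample_baseline_corrected.parquet")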

Source code in src/pyngb/baseline.py
def subtract_baseline(
    sample_file: str,
    baseline_file: str,
    dynamic_axis: Literal[
        "time", "sample_temperature", "furnace_temperature"
    ] = "sample_temperature",
) -> pl.DataFrame:
    """
    Subtract baseline data from sample data.

    This function loads both sample (.ngb-ss3) and baseline (.ngb-bs3) files,
    validates that they have identical temperature programs, identifies isothermal
    and dynamic segments, and performs appropriate baseline subtraction. For
    isothermal segments, subtraction is done on the time axis. For dynamic segments,
    the user can choose the alignment axis.

    Only the 'mass' and 'dsc_signal' columns are subtracted. All other columns
    (time, temperatures, flows) are retained from the sample file.

    Parameters
    ----------
    sample_file : str
        Path to the sample file (.ngb-ss3)
    baseline_file : str
        Path to the baseline file (.ngb-bs3). Must have identical temperature
        program to the sample file.
    dynamic_axis : str, default="sample_temperature"
        Axis to use for dynamic segment alignment and subtraction.
        Options: "time", "sample_temperature", "furnace_temperature"

    Returns
    -------
    pl.DataFrame
        DataFrame with baseline-subtracted data

    Raises
    ------
    ValueError
        If temperature programs between sample and baseline are incompatible
    FileNotFoundError
        If either file does not exist

    Examples
    --------
    >>> # Basic subtraction using sample temperature axis for dynamic segments (default)
    >>> df = subtract_baseline("sample.ngb-ss3", "baseline.ngb-bs3")

    >>> # Use time axis for dynamic segment alignment
    >>> df = subtract_baseline(
    ...     "sample.ngb-ss3",
    ...     "baseline.ngb-bs3",
    ...     dynamic_axis="time"
    ... )
    """
    from .api.loaders import read_ngb

    # Load both files
    sample_metadata, sample_table = read_ngb(sample_file, return_metadata=True)
    baseline_metadata, baseline_table = read_ngb(baseline_file, return_metadata=True)

    # Convert to Polars DataFrames
    sample_df = pl.from_arrow(sample_table)
    baseline_df = pl.from_arrow(baseline_table)

    # Ensure we have DataFrames
    if not isinstance(sample_df, pl.DataFrame):
        raise TypeError("Sample data could not be converted to DataFrame")
    if not isinstance(baseline_df, pl.DataFrame):
        raise TypeError("Baseline data could not be converted to DataFrame")

    # Create subtractor and process
    subtractor = BaselineSubtractor()
    result = subtractor.process_baseline_subtraction(
        sample_df, baseline_df, sample_metadata, baseline_metadata, dynamic_axis
    )

    return result

pyngb.BaselineSubtractor

Handles baseline subtraction operations for NGB data.
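
For custom workflows the class can be driven directly, mirroring what subtract_baseline does internally (compare its source further above); a sketch:

import polars as pl

from pyngb import read_ngb, BaselineSubtractor

# Load sample and baseline with their metadata, as subtract_baseline does
sample_metadata, sample_table = read_ngb("sample.ngb-ss3", return_metadata=True)
baseline_metadata, baseline_table = read_ngb("baseline.ngb-bs3", return_metadata=True)

subtractor = BaselineSubtractor()
result = subtractor.process_baseline_subtraction(
    pl.from_arrow(sample_table),
    pl.from_arrow(baseline_table),
    sample_metadata,
    baseline_metadata,
    dynamic_axis="sample_temperature",
)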

Source code in src/pyngb/baseline.py
class BaselineSubtractor:
    """Handles baseline subtraction operations for NGB data."""

    def identify_segments(
        self, df: pl.DataFrame, temperature_program: dict[str, dict[str, float]]
    ) -> tuple[list[tuple[int, int]], list[tuple[int, int]]]:
        """
        Identify isothermal and dynamic segments based on temperature program.

        Parameters
        ----------
        df : pl.DataFrame
            The data to analyze
        temperature_program : dict
            Temperature program metadata from the file

        Returns
        -------
        tuple[list[tuple[int, int]], list[tuple[int, int]]]
            (isothermal_segments, dynamic_segments) as lists of (start_idx, end_idx) tuples
        """
        isothermal_segments = []
        dynamic_segments = []

        # Sort stages by time (cumulative)
        stages = []
        cumulative_time = 0.0

        for stage_name, stage_data in temperature_program.items():
            stage_time = stage_data.get("time", 0.0)
            heating_rate = stage_data.get("heating_rate", 0.0)
            start_time = cumulative_time
            end_time = cumulative_time + stage_time

            stages.append(
                {
                    "start_time": start_time,
                    "end_time": end_time,
                    "heating_rate": heating_rate,
                    "temperature": stage_data.get("temperature", 0.0),
                }
            )

            cumulative_time = end_time

        # Map time ranges to DataFrame indices
        for stage in stages:
            if stage["end_time"] <= stage["start_time"]:
                continue  # Skip zero-duration stages

            # Find indices corresponding to this time range
            mask = (df["time"] >= stage["start_time"]) & (
                df["time"] < stage["end_time"]
            )
            indices = df.with_row_index().filter(mask)["index"].to_list()

            if len(indices) > 0:
                start_idx = min(indices)
                end_idx = max(indices) + 1  # +1 for exclusive end

                if abs(stage["heating_rate"]) < 0.01:  # Essentially zero heating rate
                    isothermal_segments.append((start_idx, end_idx))
                else:
                    dynamic_segments.append((start_idx, end_idx))

        return isothermal_segments, dynamic_segments

    def interpolate_baseline(
        self, sample_segment: pl.DataFrame, baseline_segment: pl.DataFrame, axis: str
    ) -> pl.DataFrame:
        """
        Interpolate baseline data to match sample data points.

        Parameters
        ----------
        sample_segment : pl.DataFrame
            Sample data segment
        baseline_segment : pl.DataFrame
            Baseline data segment
        axis : str
            Axis to interpolate on ("time", "sample_temperature", or "furnace_temperature")

        Returns
        -------
        pl.DataFrame
            Interpolated baseline data
        """
        if axis not in sample_segment.columns or axis not in baseline_segment.columns:
            logger.warning(f"Axis '{axis}' not found in data, falling back to 'time'")
            axis = "time"

        # Get sample axis values for interpolation
        sample_axis = sample_segment[axis].to_numpy()
        baseline_axis = baseline_segment[axis].to_numpy()

        # Create interpolated baseline DataFrame
        interpolated_data = {"axis_values": sample_axis}

        # Interpolate each column we need for subtraction
        for col in ["mass", "dsc_signal"]:
            if col in baseline_segment.columns:
                baseline_values = baseline_segment[col].to_numpy()

                # Remove any NaN values for interpolation
                valid_mask = ~(np.isnan(baseline_axis) | np.isnan(baseline_values))
                if np.sum(valid_mask) < 2:
                    # Not enough valid points for interpolation
                    interpolated_values = np.full_like(sample_axis, np.nan)
                else:
                    valid_baseline_axis = baseline_axis[valid_mask]
                    valid_baseline_values = baseline_values[valid_mask]

                    # Linear interpolation, extrapolate with constant values
                    interpolated_values = np.interp(
                        sample_axis, valid_baseline_axis, valid_baseline_values
                    )

                interpolated_data[col] = interpolated_values

        # Add the axis column
        interpolated_data[axis] = sample_axis

        return pl.DataFrame(interpolated_data)

    def subtract_segment(
        self, sample_segment: pl.DataFrame, baseline_segment: pl.DataFrame, axis: str
    ) -> pl.DataFrame:
        """
        Subtract baseline from sample for a single segment.

        Parameters
        ----------
        sample_segment : pl.DataFrame
            Sample data segment
        baseline_segment : pl.DataFrame
            Baseline data segment
        axis : str
            Axis to use for alignment

        Returns
        -------
        pl.DataFrame
            Sample data with baseline subtracted
        """
        # Interpolate baseline to match sample points
        interpolated_baseline = self.interpolate_baseline(
            sample_segment, baseline_segment, axis
        )

        # Start with the original sample data
        result = sample_segment.clone()

        # Subtract mass and dsc_signal if available
        for col in ["mass", "dsc_signal"]:
            if col in result.columns and col in interpolated_baseline.columns:
                baseline_values = interpolated_baseline[col]
                result = result.with_columns(
                    [(pl.col(col) - baseline_values).alias(col)]
                )

        return result

    def validate_temperature_programs(
        self, sample_metadata: FileMetadata, baseline_metadata: FileMetadata
    ) -> None:
        """
        Validate that sample and baseline have compatible temperature programs.

        Parameters
        ----------
        sample_metadata : FileMetadata
            Sample file metadata
        baseline_metadata : FileMetadata
            Baseline file metadata

        Raises
        ------
        ValueError
            If temperature programs are incompatible
        """
        sample_temp_prog = sample_metadata.get("temperature_program", {})
        baseline_temp_prog = baseline_metadata.get("temperature_program", {})

        if not sample_temp_prog:
            logger.warning("No temperature program found in sample file")
            return

        if not baseline_temp_prog:
            raise ValueError(
                "Baseline file has no temperature program metadata. "
                "Cannot validate compatibility with sample file."
            )

        # Check if both have the same number of stages
        if len(sample_temp_prog) != len(baseline_temp_prog):
            raise ValueError(
                f"Temperature program mismatch: sample has {len(sample_temp_prog)} stages, "
                f"baseline has {len(baseline_temp_prog)} stages"
            )

        # Check each stage for compatibility
        tolerance = 1e-3  # Tolerance for floating point comparison

        for stage_key in sample_temp_prog:
            if stage_key not in baseline_temp_prog:
                raise ValueError(
                    f"Stage '{stage_key}' missing in baseline temperature program"
                )

            sample_stage = sample_temp_prog[stage_key]
            baseline_stage = baseline_temp_prog[stage_key]

            # Check critical parameters
            critical_params = ["temperature", "heating_rate", "time"]

            for param in critical_params:
                sample_val = sample_stage.get(param, 0.0)
                baseline_val = baseline_stage.get(param, 0.0)

                if abs(sample_val - baseline_val) > tolerance:
                    raise ValueError(
                        f"Temperature program mismatch in stage '{stage_key}', parameter '{param}': "
                        f"sample={sample_val}, baseline={baseline_val}"
                    )

        logger.info("Temperature programs validated successfully")

    def process_baseline_subtraction(
        self,
        sample_df: pl.DataFrame,
        baseline_df: pl.DataFrame,
        sample_metadata: FileMetadata,
        baseline_metadata: FileMetadata,
        dynamic_axis: str = "time",
    ) -> pl.DataFrame:
        """
        Process complete baseline subtraction.

        Parameters
        ----------
        sample_df : pl.DataFrame
            Sample data
        baseline_df : pl.DataFrame
            Baseline data
        sample_metadata : FileMetadata
            Sample file metadata containing temperature program
        baseline_metadata : FileMetadata
            Baseline file metadata containing temperature program
        dynamic_axis : str
            Axis to use for dynamic segment subtraction

        Returns
        -------
        pl.DataFrame
            Processed data with baseline subtracted

        Raises
        ------
        ValueError
            If temperature programs are incompatible
        """
        # Validate temperature programs first
        self.validate_temperature_programs(sample_metadata, baseline_metadata)
        # Get temperature program
        temp_program = sample_metadata.get("temperature_program", {})
        if not temp_program:
            logger.warning("No temperature program found, treating all data as dynamic")
            # Treat entire dataset as one dynamic segment
            return self.subtract_segment(sample_df, baseline_df, dynamic_axis)

        # Identify segments
        isothermal_segments, dynamic_segments = self.identify_segments(
            sample_df, temp_program
        )

        logger.info(
            f"Found {len(isothermal_segments)} isothermal segments and {len(dynamic_segments)} dynamic segments"
        )

        # Process each segment
        processed_segments = []

        # Process isothermal segments (always use time axis)
        for start_idx, end_idx in isothermal_segments:
            sample_segment = sample_df.slice(start_idx, end_idx - start_idx)
            baseline_segment = baseline_df  # Use full baseline for interpolation

            processed_segment = self.subtract_segment(
                sample_segment, baseline_segment, "time"
            )
            processed_segments.append(processed_segment)

        # Process dynamic segments (use user-specified axis)
        for start_idx, end_idx in dynamic_segments:
            sample_segment = sample_df.slice(start_idx, end_idx - start_idx)
            baseline_segment = baseline_df  # Use full baseline for interpolation

            processed_segment = self.subtract_segment(
                sample_segment, baseline_segment, dynamic_axis
            )
            processed_segments.append(processed_segment)

        # If no segments found, process as single dynamic segment
        if not processed_segments:
            logger.warning(
                "No valid segments found, processing entire dataset as dynamic"
            )
            return self.subtract_segment(sample_df, baseline_df, dynamic_axis)

        # Combine all segments back together
        result = pl.concat(processed_segments)

        return result

Functions

identify_segments(df, temperature_program)

Identify isothermal and dynamic segments based on temperature program.

Parameters

df : pl.DataFrame
    The data to analyze
temperature_program : dict
    Temperature program metadata from the file

Returns

tuple[list[tuple[int, int]], list[tuple[int, int]]]
    (isothermal_segments, dynamic_segments) as lists of (start_idx, end_idx) tuples
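
The heating_rate of each stage decides the classification (near-zero rates count as isothermal). An illustrative sketch with a hypothetical two-stage program; stage names and values are invented, and time is assumed to use the same units in the data and the program:

import polars as pl

from pyngb import BaselineSubtractor

# Hypothetical data: a 10-unit isothermal hold followed by a 20-unit dynamic ramp
df = pl.DataFrame({"time": [float(t) for t in range(30)]})
temperature_program = {
    "stage_1": {"temperature": 25.0, "heating_rate": 0.0, "time": 10.0},
    "stage_2": {"temperature": 500.0, "heating_rate": 10.0, "time": 20.0},
}

iso, dyn = BaselineSubtractor().identify_segments(df, temperature_program)
print(iso)  # [(0, 10)]  - isothermal hold
print(dyn)  # [(10, 30)] - dynamic ramp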

Source code in src/pyngb/baseline.py
def identify_segments(
    self, df: pl.DataFrame, temperature_program: dict[str, dict[str, float]]
) -> tuple[list[tuple[int, int]], list[tuple[int, int]]]:
    """
    Identify isothermal and dynamic segments based on temperature program.

    Parameters
    ----------
    df : pl.DataFrame
        The data to analyze
    temperature_program : dict
        Temperature program metadata from the file

    Returns
    -------
    tuple[list[tuple[int, int]], list[tuple[int, int]]]
        (isothermal_segments, dynamic_segments) as lists of (start_idx, end_idx) tuples
    """
    isothermal_segments = []
    dynamic_segments = []

    # Sort stages by time (cumulative)
    stages = []
    cumulative_time = 0.0

    for stage_name, stage_data in temperature_program.items():
        stage_time = stage_data.get("time", 0.0)
        heating_rate = stage_data.get("heating_rate", 0.0)
        start_time = cumulative_time
        end_time = cumulative_time + stage_time

        stages.append(
            {
                "start_time": start_time,
                "end_time": end_time,
                "heating_rate": heating_rate,
                "temperature": stage_data.get("temperature", 0.0),
            }
        )

        cumulative_time = end_time

    # Map time ranges to DataFrame indices
    for stage in stages:
        if stage["end_time"] <= stage["start_time"]:
            continue  # Skip zero-duration stages

        # Find indices corresponding to this time range
        mask = (df["time"] >= stage["start_time"]) & (
            df["time"] < stage["end_time"]
        )
        indices = df.with_row_index().filter(mask)["index"].to_list()

        if len(indices) > 0:
            start_idx = min(indices)
            end_idx = max(indices) + 1  # +1 for exclusive end

            if abs(stage["heating_rate"]) < 0.01:  # Essentially zero heating rate
                isothermal_segments.append((start_idx, end_idx))
            else:
                dynamic_segments.append((start_idx, end_idx))

    return isothermal_segments, dynamic_segments

interpolate_baseline(sample_segment, baseline_segment, axis)

Interpolate baseline data to match sample data points.

Parameters

sample_segment : pl.DataFrame
    Sample data segment
baseline_segment : pl.DataFrame
    Baseline data segment
axis : str
    Axis to interpolate on ("time", "sample_temperature", or "furnace_temperature")

Returns

pl.DataFrame
    Interpolated baseline data
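
Interpolation resamples the baseline's mass and dsc_signal at the sample's axis values (linear interpolation via np.interp). An illustrative sketch with tiny hand-made segments; all numbers are invented:

import polars as pl

from pyngb import BaselineSubtractor

sample_segment = pl.DataFrame({
    "time": [0.0, 1.0, 2.0],
    "mass": [10.0, 9.8, 9.5],
    "dsc_signal": [0.1, 0.2, 0.3],
})
baseline_segment = pl.DataFrame({
    "time": [0.0, 2.0],
    "mass": [0.05, 0.03],
    "dsc_signal": [0.01, 0.02],
})

interpolated = BaselineSubtractor().interpolate_baseline(
    sample_segment, baseline_segment, axis="time"
)
print(interpolated["mass"].to_list())  # baseline mass resampled at t = 0, 1, 2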

Source code in src/pyngb/baseline.py
def interpolate_baseline(
    self, sample_segment: pl.DataFrame, baseline_segment: pl.DataFrame, axis: str
) -> pl.DataFrame:
    """
    Interpolate baseline data to match sample data points.

    Parameters
    ----------
    sample_segment : pl.DataFrame
        Sample data segment
    baseline_segment : pl.DataFrame
        Baseline data segment
    axis : str
        Axis to interpolate on ("time", "sample_temperature", or "furnace_temperature")

    Returns
    -------
    pl.DataFrame
        Interpolated baseline data
    """
    if axis not in sample_segment.columns or axis not in baseline_segment.columns:
        logger.warning(f"Axis '{axis}' not found in data, falling back to 'time'")
        axis = "time"

    # Get sample axis values for interpolation
    sample_axis = sample_segment[axis].to_numpy()
    baseline_axis = baseline_segment[axis].to_numpy()

    # Create interpolated baseline DataFrame
    interpolated_data = {"axis_values": sample_axis}

    # Interpolate each column we need for subtraction
    for col in ["mass", "dsc_signal"]:
        if col in baseline_segment.columns:
            baseline_values = baseline_segment[col].to_numpy()

            # Remove any NaN values for interpolation
            valid_mask = ~(np.isnan(baseline_axis) | np.isnan(baseline_values))
            if np.sum(valid_mask) < 2:
                # Not enough valid points for interpolation
                interpolated_values = np.full_like(sample_axis, np.nan)
            else:
                valid_baseline_axis = baseline_axis[valid_mask]
                valid_baseline_values = baseline_values[valid_mask]

                # Linear interpolation, extrapolate with constant values
                interpolated_values = np.interp(
                    sample_axis, valid_baseline_axis, valid_baseline_values
                )

            interpolated_data[col] = interpolated_values

    # Add the axis column
    interpolated_data[axis] = sample_axis

    return pl.DataFrame(interpolated_data)

subtract_segment(sample_segment, baseline_segment, axis)

Subtract baseline from sample for a single segment.

Parameters

sample_segment : pl.DataFrame
    Sample data segment
baseline_segment : pl.DataFrame
    Baseline data segment
axis : str
    Axis to use for alignment

Returns

pl.DataFrame
    Sample data with baseline subtracted

Source code in src/pyngb/baseline.py
def subtract_segment(
    self, sample_segment: pl.DataFrame, baseline_segment: pl.DataFrame, axis: str
) -> pl.DataFrame:
    """
    Subtract baseline from sample for a single segment.

    Parameters
    ----------
    sample_segment : pl.DataFrame
        Sample data segment
    baseline_segment : pl.DataFrame
        Baseline data segment
    axis : str
        Axis to use for alignment

    Returns
    -------
    pl.DataFrame
        Sample data with baseline subtracted
    """
    # Interpolate baseline to match sample points
    interpolated_baseline = self.interpolate_baseline(
        sample_segment, baseline_segment, axis
    )

    # Start with the original sample data
    result = sample_segment.clone()

    # Subtract mass and dsc_signal if available
    for col in ["mass", "dsc_signal"]:
        if col in result.columns and col in interpolated_baseline.columns:
            baseline_values = interpolated_baseline[col]
            result = result.with_columns(
                [(pl.col(col) - baseline_values).alias(col)]
            )

    return result

validate_temperature_programs(sample_metadata, baseline_metadata)

Validate that sample and baseline have compatible temperature programs.

Parameters

sample_metadata : FileMetadata
    Sample file metadata
baseline_metadata : FileMetadata
    Baseline file metadata

Raises

ValueError
    If temperature programs are incompatible
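
Validation can be run on its own before committing to a full subtraction; a sketch that surfaces the mismatch message (file names are placeholders):

from pyngb import read_ngb, BaselineSubtractor

sample_metadata, _ = read_ngb("sample.ngb-ss3", return_metadata=True)
baseline_metadata, _ = read_ngb("baseline.ngb-bs3", return_metadata=True)

try:
    BaselineSubtractor().validate_temperature_programs(sample_metadata, baseline_metadata)
except ValueError as exc:
    print(f"Incompatible temperature programs: {exc}")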

Source code in src/pyngb/baseline.py
def validate_temperature_programs(
    self, sample_metadata: FileMetadata, baseline_metadata: FileMetadata
) -> None:
    """
    Validate that sample and baseline have compatible temperature programs.

    Parameters
    ----------
    sample_metadata : FileMetadata
        Sample file metadata
    baseline_metadata : FileMetadata
        Baseline file metadata

    Raises
    ------
    ValueError
        If temperature programs are incompatible
    """
    sample_temp_prog = sample_metadata.get("temperature_program", {})
    baseline_temp_prog = baseline_metadata.get("temperature_program", {})

    if not sample_temp_prog:
        logger.warning("No temperature program found in sample file")
        return

    if not baseline_temp_prog:
        raise ValueError(
            "Baseline file has no temperature program metadata. "
            "Cannot validate compatibility with sample file."
        )

    # Check if both have the same number of stages
    if len(sample_temp_prog) != len(baseline_temp_prog):
        raise ValueError(
            f"Temperature program mismatch: sample has {len(sample_temp_prog)} stages, "
            f"baseline has {len(baseline_temp_prog)} stages"
        )

    # Check each stage for compatibility
    tolerance = 1e-3  # Tolerance for floating point comparison

    for stage_key in sample_temp_prog:
        if stage_key not in baseline_temp_prog:
            raise ValueError(
                f"Stage '{stage_key}' missing in baseline temperature program"
            )

        sample_stage = sample_temp_prog[stage_key]
        baseline_stage = baseline_temp_prog[stage_key]

        # Check critical parameters
        critical_params = ["temperature", "heating_rate", "time"]

        for param in critical_params:
            sample_val = sample_stage.get(param, 0.0)
            baseline_val = baseline_stage.get(param, 0.0)

            if abs(sample_val - baseline_val) > tolerance:
                raise ValueError(
                    f"Temperature program mismatch in stage '{stage_key}', parameter '{param}': "
                    f"sample={sample_val}, baseline={baseline_val}"
                )

    logger.info("Temperature programs validated successfully")

process_baseline_subtraction(sample_df, baseline_df, sample_metadata, baseline_metadata, dynamic_axis='time')

Process complete baseline subtraction.

Parameters

sample_df : pl.DataFrame
    Sample data
baseline_df : pl.DataFrame
    Baseline data
sample_metadata : FileMetadata
    Sample file metadata containing temperature program
baseline_metadata : FileMetadata
    Baseline file metadata containing temperature program
dynamic_axis : str
    Axis to use for dynamic segment subtraction

Returns

pl.DataFrame
    Processed data with baseline subtracted

Raises

ValueError
    If temperature programs are incompatible

Source code in src/pyngb/baseline.py
def process_baseline_subtraction(
    self,
    sample_df: pl.DataFrame,
    baseline_df: pl.DataFrame,
    sample_metadata: FileMetadata,
    baseline_metadata: FileMetadata,
    dynamic_axis: str = "time",
) -> pl.DataFrame:
    """
    Process complete baseline subtraction.

    Parameters
    ----------
    sample_df : pl.DataFrame
        Sample data
    baseline_df : pl.DataFrame
        Baseline data
    sample_metadata : FileMetadata
        Sample file metadata containing temperature program
    baseline_metadata : FileMetadata
        Baseline file metadata containing temperature program
    dynamic_axis : str
        Axis to use for dynamic segment subtraction

    Returns
    -------
    pl.DataFrame
        Processed data with baseline subtracted

    Raises
    ------
    ValueError
        If temperature programs are incompatible
    """
    # Validate temperature programs first
    self.validate_temperature_programs(sample_metadata, baseline_metadata)
    # Get temperature program
    temp_program = sample_metadata.get("temperature_program", {})
    if not temp_program:
        logger.warning("No temperature program found, treating all data as dynamic")
        # Treat entire dataset as one dynamic segment
        return self.subtract_segment(sample_df, baseline_df, dynamic_axis)

    # Identify segments
    isothermal_segments, dynamic_segments = self.identify_segments(
        sample_df, temp_program
    )

    logger.info(
        f"Found {len(isothermal_segments)} isothermal segments and {len(dynamic_segments)} dynamic segments"
    )

    # Process each segment
    processed_segments = []

    # Process isothermal segments (always use time axis)
    for start_idx, end_idx in isothermal_segments:
        sample_segment = sample_df.slice(start_idx, end_idx - start_idx)
        baseline_segment = baseline_df  # Use full baseline for interpolation

        processed_segment = self.subtract_segment(
            sample_segment, baseline_segment, "time"
        )
        processed_segments.append(processed_segment)

    # Process dynamic segments (use user-specified axis)
    for start_idx, end_idx in dynamic_segments:
        sample_segment = sample_df.slice(start_idx, end_idx - start_idx)
        baseline_segment = baseline_df  # Use full baseline for interpolation

        processed_segment = self.subtract_segment(
            sample_segment, baseline_segment, dynamic_axis
        )
        processed_segments.append(processed_segment)

    # If no segments found, process as single dynamic segment
    if not processed_segments:
        logger.warning(
            "No valid segments found, processing entire dataset as dynamic"
        )
        return self.subtract_segment(sample_df, baseline_df, dynamic_axis)

    # Combine all segments back together
    result = pl.concat(processed_segments)

    return result

Usage Examples

# Standalone baseline subtraction
from pyngb import subtract_baseline

# Default behavior (sample_temperature axis for dynamic segments)
corrected_df = subtract_baseline("sample.ngb-ss3", "baseline.ngb-bs3")

# Custom axis selection
corrected_df = subtract_baseline(
    "sample.ngb-ss3",
    "baseline.ngb-bs3",
    dynamic_axis="time"
)

# Integrated approach
from pyngb import read_ngb

corrected_data = read_ngb(
    "sample.ngb-ss3",
    baseline_file="baseline.ngb-bs3"
)

Batch Processing

BatchProcessor Class

pyngb.BatchProcessor

High-performance batch processing for multiple NGB files.

Provides parallel processing, progress tracking, error handling, and flexible output formats for processing collections of NGB files.

Examples:

>>> from pyngb.batch import BatchProcessor
>>>
>>> processor = BatchProcessor(max_workers=4)
>>> results = processor.process_directory("./data/", output_format="parquet")
>>> print(f"Processed {len(results)} files")
>>>
>>> # Custom processing with error handling
>>> results = processor.process_files(
...     file_list,
...     output_dir="./output/",
...     skip_errors=True
... )

Source code in src/pyngb/batch.py
class BatchProcessor:
    """High-performance batch processing for multiple NGB files.

    Provides parallel processing, progress tracking, error handling, and
    flexible output formats for processing collections of NGB files.

    Examples:
    >>> from pyngb.batch import BatchProcessor
        >>>
        >>> processor = BatchProcessor(max_workers=4)
        >>> results = processor.process_directory("./data/", output_format="parquet")
        >>> print(f"Processed {len(results)} files")
        >>>
        >>> # Custom processing with error handling
        >>> results = processor.process_files(
        ...     file_list,
        ...     output_dir="./output/",
        ...     skip_errors=True
        ... )
    """

    def __init__(self, max_workers: int | None = None, verbose: bool = True):
        """Initialize batch processor.

        Args:
            max_workers: Maximum number of parallel processes (default: CPU count)
            verbose: Whether to show progress information
        """
        self.max_workers = max_workers
        self.verbose = verbose
        self._setup_logging()

    def _setup_logging(self) -> None:
        """Configure logging for batch processing without altering global config."""
        if self.verbose and not logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(
                logging.Formatter(
                    "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
                )
            )
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)

    def process_directory(
        self,
        directory: Union[str, Path],
        pattern: str = "*.ngb-ss3",
        output_format: str = "parquet",
        output_dir: Union[str, Path] | None = None,
        skip_errors: bool = True,
    ) -> list[dict[str, str | float | None]]:
        """Process all NGB files in a directory.

        Args:
            directory: Directory containing NGB files
            pattern: File pattern to match (default: "*.ngb-ss3")
            output_format: Output format ("parquet", "csv", "both")
            output_dir: Output directory (default: same as input)
            skip_errors: Whether to continue processing if individual files fail

        Returns:
            List of processing results with status and metadata

        Examples:
            >>> processor = BatchProcessor()
            >>> results = processor.process_directory(
            ...     "./experiments/",
            ...     output_format="both",
            ...     skip_errors=True
            ... )
            >>>
            >>> # Check for errors
            >>> errors = [r for r in results if r['status'] == 'error']
            >>> print(f"Failed to process {len(errors)} files")
        """
        directory = Path(directory)
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory}")

        # Find all matching files
        files = list(directory.glob(pattern))
        if not files:
            logger.warning(
                f"No files matching pattern '{pattern}' found in {directory}"
            )
            return []

        logger.info(f"Found {len(files)} files to process")

        return self.process_files(
            files,  # type: ignore[arg-type]
            output_format=output_format,
            output_dir=output_dir or directory,
            skip_errors=skip_errors,
        )

    def process_files(
        self,
        files: list[Union[str, Path]],
        output_format: str = "parquet",
        output_dir: Union[str, Path] | None = None,
        skip_errors: bool = True,
    ) -> list[dict[str, str | float | None]]:
        """Process a list of NGB files with parallel execution.

        Args:
            files: List of file paths to process
            output_format: Output format ("parquet", "csv", "both")
            output_dir: Output directory
            skip_errors: Whether to continue if individual files fail

        Returns:
            List of processing results
        """
        if not files:
            return []

        output_dir = Path(output_dir) if output_dir else Path.cwd()
        output_dir.mkdir(parents=True, exist_ok=True)

        results = []
        start_time = time.perf_counter()

        if self.max_workers == 1:
            # Sequential processing for debugging
            for file_path in files:
                result = _process_single_file_worker(
                    str(file_path), output_format, str(output_dir), skip_errors
                )
                results.append(result)
                if self.verbose:
                    self._log_progress(len(results), len(files), start_time)
        else:
            # Parallel processing
            # Use 'spawn' to avoid fork-safety issues with PyArrow/Polars
            with ProcessPoolExecutor(
                max_workers=self.max_workers,
                mp_context=mp.get_context("spawn"),
            ) as executor:
                # Submit all tasks
                future_to_file = {
                    executor.submit(
                        _process_single_file_worker,
                        str(file_path),
                        output_format,
                        str(output_dir),
                        skip_errors,
                    ): str(file_path)
                    for file_path in files
                }

                # Collect results as they complete
                for future in as_completed(future_to_file):
                    src = future_to_file[future]
                    try:
                        result = future.result()
                    except Exception as e:
                        # Convert worker exception into an error record
                        result = {
                            "file": str(src),
                            "status": "error",
                            "rows": None,
                            "columns": None,
                            "sample_name": None,
                            "processing_time": 0.0,
                            "error": f"{type(e).__name__}: {e!s}",
                        }
                        logger.error(f"Failed to process {src}: {e!s}")
                    results.append(result)

                    if self.verbose:
                        self._log_progress(len(results), len(files), start_time)

        self._log_summary(results, start_time)
        return results

    # Note: per-file processing moved to module-level worker to be multiprocessing-safe

    def _log_progress(self, completed: int, total: int, start_time: float) -> None:
        """Log processing progress."""
        if (
            completed % 10 == 0 or completed == total
        ):  # Log every 10 files or at completion
            elapsed = time.perf_counter() - start_time
            rate = completed / elapsed if elapsed > 0 else 0
            eta = (total - completed) / rate if rate > 0 else 0

            logger.info(
                f"Progress: {completed}/{total} ({completed / total * 100:.1f}%) "
                f"- Rate: {rate:.1f} files/sec - ETA: {eta:.0f}s"
            )

    def _log_summary(self, results: list[dict], start_time: float) -> None:
        """Log processing summary."""
        total_time = time.perf_counter() - start_time
        successful = sum(1 for r in results if r["status"] == "success")
        failed = len(results) - successful

        total_rows = sum(r["rows"] or 0 for r in results if r["rows"])
        avg_rate = len(results) / total_time if total_time > 0 else 0

        logger.info(
            f"Batch processing completed in {total_time:.1f}s:\n"
            f"  ✅ Successful: {successful}\n"
            f"  ❌ Failed: {failed}\n"
            f"  📊 Total rows processed: {total_rows:,}\n"
            f"  ⚡ Average rate: {avg_rate:.1f} files/sec"
        )

Functions

__init__(max_workers=None, verbose=True)

Initialize batch processor.

Parameters:

max_workers : int | None, default None
    Maximum number of parallel processes (default: CPU count)
verbose : bool, default True
    Whether to show progress information
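
Setting max_workers=1 selects the sequential code path, which the implementation keeps for easier debugging of individual files; a short sketch (the file name is illustrative):

from pyngb.batch import BatchProcessor

# Sequential processing: useful when diagnosing a single problematic file
debug_processor = BatchProcessor(max_workers=1, verbose=True)
results = debug_processor.process_files(["problem_file.ngb-ss3"])
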
Source code in src/pyngb/batch.py
def __init__(self, max_workers: int | None = None, verbose: bool = True):
    """Initialize batch processor.

    Args:
        max_workers: Maximum number of parallel processes (default: CPU count)
        verbose: Whether to show progress information
    """
    self.max_workers = max_workers
    self.verbose = verbose
    self._setup_logging()

process_directory(directory, pattern='*.ngb-ss3', output_format='parquet', output_dir=None, skip_errors=True)

Process all NGB files in a directory.

Parameters:

directory : Union[str, Path], required
    Directory containing NGB files
pattern : str, default '*.ngb-ss3'
    File pattern to match
output_format : str, default 'parquet'
    Output format ("parquet", "csv", "both")
output_dir : Union[str, Path] | None, default None
    Output directory (default: same as input)
skip_errors : bool, default True
    Whether to continue processing if individual files fail

Returns:

list[dict[str, str | float | None]]
    List of processing results with status and metadata

Examples:

>>> processor = BatchProcessor()
>>> results = processor.process_directory(
...     "./experiments/",
...     output_format="both",
...     skip_errors=True
... )
>>>
>>> # Check for errors
>>> errors = [r for r in results if r['status'] == 'error']
>>> print(f"Failed to process {len(errors)} files")
Source code in src/pyngb/batch.py
def process_directory(
    self,
    directory: Union[str, Path],
    pattern: str = "*.ngb-ss3",
    output_format: str = "parquet",
    output_dir: Union[str, Path] | None = None,
    skip_errors: bool = True,
) -> list[dict[str, str | float | None]]:
    """Process all NGB files in a directory.

    Args:
        directory: Directory containing NGB files
        pattern: File pattern to match (default: "*.ngb-ss3")
        output_format: Output format ("parquet", "csv", "both")
        output_dir: Output directory (default: same as input)
        skip_errors: Whether to continue processing if individual files fail

    Returns:
        List of processing results with status and metadata

    Examples:
        >>> processor = BatchProcessor()
        >>> results = processor.process_directory(
        ...     "./experiments/",
        ...     output_format="both",
        ...     skip_errors=True
        ... )
        >>>
        >>> # Check for errors
        >>> errors = [r for r in results if r['status'] == 'error']
        >>> print(f"Failed to process {len(errors)} files")
    """
    directory = Path(directory)
    if not directory.exists():
        raise FileNotFoundError(f"Directory not found: {directory}")

    # Find all matching files
    files = list(directory.glob(pattern))
    if not files:
        logger.warning(
            f"No files matching pattern '{pattern}' found in {directory}"
        )
        return []

    logger.info(f"Found {len(files)} files to process")

    return self.process_files(
        files,  # type: ignore[arg-type]
        output_format=output_format,
        output_dir=output_dir or directory,
        skip_errors=skip_errors,
    )

process_files(files, output_format='parquet', output_dir=None, skip_errors=True)

Process a list of NGB files with parallel execution.

Parameters:

files : list[Union[str, Path]], required
    List of file paths to process
output_format : str, default 'parquet'
    Output format ("parquet", "csv", "both")
output_dir : Union[str, Path] | None, default None
    Output directory
skip_errors : bool, default True
    Whether to continue if individual files fail

Returns:

list[dict[str, str | float | None]]
    List of processing results
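
A sketch of processing an explicit file list and summarizing the per-file status records (directory names are illustrative):

from pathlib import Path

from pyngb.batch import BatchProcessor

files = sorted(Path("./experiments/").glob("*.ngb-ss3"))

processor = BatchProcessor(max_workers=4)
results = processor.process_files(files, output_format="csv", output_dir="./output/")

failed = [r for r in results if r["status"] == "error"]
print(f"{len(results) - len(failed)} succeeded, {len(failed)} failed")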

Source code in src/pyngb/batch.py
def process_files(
    self,
    files: list[Union[str, Path]],
    output_format: str = "parquet",
    output_dir: Union[str, Path] | None = None,
    skip_errors: bool = True,
) -> list[dict[str, str | float | None]]:
    """Process a list of NGB files with parallel execution.

    Args:
        files: List of file paths to process
        output_format: Output format ("parquet", "csv", "both")
        output_dir: Output directory
        skip_errors: Whether to continue if individual files fail

    Returns:
        List of processing results
    """
    if not files:
        return []

    output_dir = Path(output_dir) if output_dir else Path.cwd()
    output_dir.mkdir(parents=True, exist_ok=True)

    results = []
    start_time = time.perf_counter()

    if self.max_workers == 1:
        # Sequential processing for debugging
        for file_path in files:
            result = _process_single_file_worker(
                str(file_path), output_format, str(output_dir), skip_errors
            )
            results.append(result)
            if self.verbose:
                self._log_progress(len(results), len(files), start_time)
    else:
        # Parallel processing
        # Use 'spawn' to avoid fork-safety issues with PyArrow/Polars
        with ProcessPoolExecutor(
            max_workers=self.max_workers,
            mp_context=mp.get_context("spawn"),
        ) as executor:
            # Submit all tasks
            future_to_file = {
                executor.submit(
                    _process_single_file_worker,
                    str(file_path),
                    output_format,
                    str(output_dir),
                    skip_errors,
                ): str(file_path)
                for file_path in files
            }

            # Collect results as they complete
            for future in as_completed(future_to_file):
                src = future_to_file[future]
                try:
                    result = future.result()
                except Exception as e:
                    # Convert worker exception into an error record
                    result = {
                        "file": str(src),
                        "status": "error",
                        "rows": None,
                        "columns": None,
                        "sample_name": None,
                        "processing_time": 0.0,
                        "error": f"{type(e).__name__}: {e!s}",
                    }
                    logger.error(f"Failed to process {src}: {e!s}")
                results.append(result)

                if self.verbose:
                    self._log_progress(len(results), len(files), start_time)

    self._log_summary(results, start_time)
    return results

NGBDataset Class

pyngb.NGBDataset

Dataset management for collections of NGB files.

Provides high-level operations for managing and analyzing collections of NGB files including metadata aggregation, summary statistics, and batch operations.

Examples:

>>> from pyngb.batch import NGBDataset
>>>
>>> # Create dataset from directory
>>> dataset = NGBDataset.from_directory("./experiments/")
>>>
>>> # Get overview
>>> summary = dataset.summary()
>>> print(f"Dataset contains {len(dataset)} files")
>>>
>>> # Export metadata
>>> dataset.export_metadata("experiment_summary.csv")
>>>
>>> # Filter by criteria
>>> polymer_samples = dataset.filter_by_metadata(
...     lambda meta: 'polymer' in meta.get('material', '').lower()
... )
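
The dictionary returned by summary() exposes aggregate fields (see the source listing below); a brief sketch of reporting a few of them:

from pyngb.batch import NGBDataset

dataset = NGBDataset.from_directory("./experiments/")
summary = dataset.summary()

print(f"Files: {summary['file_count']} ({summary.get('loadable_files', 0)} loadable)")
print(f"Instruments: {summary.get('unique_instruments')}")
print(f"Sample mass range: {summary.get('sample_mass_range')}")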

Source code in src/pyngb/batch.py
class NGBDataset:
    """Dataset management for collections of NGB files.

    Provides high-level operations for managing and analyzing
    collections of NGB files including metadata aggregation,
    summary statistics, and batch operations.

    Examples:
    >>> from pyngb.batch import NGBDataset
        >>>
        >>> # Create dataset from directory
        >>> dataset = NGBDataset.from_directory("./experiments/")
        >>>
        >>> # Get overview
        >>> summary = dataset.summary()
        >>> print(f"Dataset contains {len(dataset)} files")
        >>>
        >>> # Export metadata
        >>> dataset.export_metadata("experiment_summary.csv")
        >>>
        >>> # Filter by criteria
        >>> polymer_samples = dataset.filter_by_metadata(
        ...     lambda meta: 'polymer' in meta.get('material', '').lower()
        ... )
    """

    def __init__(self, files: list[Path]):
        """Initialize dataset with file list.

        Args:
            files: List of NGB file paths
        """
        self.files = files
        self._metadata_cache: dict[str, FileMetadata] = {}

    @classmethod
    def from_directory(
        cls, directory: Union[str, Path], pattern: str = "*.ngb-ss3"
    ) -> NGBDataset:
        """Create dataset from directory.

        Args:
            directory: Directory containing NGB files
            pattern: File pattern to match

        Returns:
            NGBDataset instance
        """
        directory = Path(directory)
        files = list(directory.glob(pattern))
        return cls(files)

    def __len__(self) -> int:
        """Return number of files in dataset."""
        return len(self.files)

    def summary(
        self,
    ) -> dict[str, int | float | list[str] | tuple[float, float] | None]:
        """Generate dataset summary statistics.

        Returns:
            Dictionary with summary information
        """
        if not self.files:
            return {"file_count": 0}

        # Load all metadata (cached)
        all_metadata = []
        for file_path in self.files:
            try:
                metadata = self._get_metadata(file_path)
                all_metadata.append(metadata)
            except Exception as e:
                logger.warning(f"Failed to load metadata for {file_path}: {e}")

        if not all_metadata:
            return {"file_count": len(self.files), "loadable_files": 0}

        # Extract statistics
        instruments = [m.get("instrument", "Unknown") for m in all_metadata]
        operators = [m.get("operator", "Unknown") for m in all_metadata]
        materials = [m.get("material", "Unknown") for m in all_metadata]

        sample_masses = [
            float(mass)
            for m in all_metadata
            if (mass := m.get("sample_mass")) is not None
        ]

        return {
            "file_count": len(self.files),
            "loadable_files": len(all_metadata),
            "unique_instruments": list(set(instruments)),
            "unique_operators": list(set(operators)),
            "unique_materials": list(set(materials)),
            "sample_mass_range": (min(sample_masses), max(sample_masses))
            if sample_masses
            else None,
            "avg_sample_mass": sum(sample_masses) / len(sample_masses)
            if sample_masses
            else None,
        }

    def export_metadata(
        self, output_path: Union[str, Path], format: str = "csv"
    ) -> None:
        """Export metadata for all files.

        Args:
            output_path: Output file path
            format: Output format ("csv", "json", "parquet")
        """
        all_metadata = []

        for file_path in self.files:
            try:
                metadata = self._get_metadata(file_path)
                # Flatten metadata for tabular export
                flat_meta = {
                    "file_path": str(file_path),
                    "file_name": file_path.name,
                    **metadata,
                }
                all_metadata.append(flat_meta)
            except Exception as e:
                logger.warning(f"Failed to load metadata for {file_path}: {e}")
                all_metadata.append(
                    {
                        "file_path": str(file_path),
                        "file_name": file_path.name,
                        "error": str(e),
                    }
                )

        if not all_metadata:
            logger.warning("No metadata to export")
            return

        # Convert to DataFrame for export
        df = pl.DataFrame(all_metadata)

        output_path = Path(output_path)
        if format.lower() == "csv":
            # Flatten nested data for CSV compatibility
            df_flattened = self._flatten_dataframe_for_csv(df)
            df_flattened.write_csv(output_path)
        elif format.lower() == "json":
            df.write_json(output_path)
        elif format.lower() == "parquet":
            df.write_parquet(output_path)
        else:
            raise ValueError(f"Unsupported format: {format}")

        logger.info(f"Exported metadata for {len(all_metadata)} files to {output_path}")

    def _flatten_dataframe_for_csv(self, df: pl.DataFrame) -> pl.DataFrame:
        """Flatten nested data structures for CSV export compatibility.

        Args:
            df: DataFrame with potentially nested data

        Returns:
            DataFrame with flattened data suitable for CSV export
        """
        import json

        # Create a new dataframe with flattened columns
        flattened_data = []

        for row in df.iter_rows(named=True):
            flattened_row = {}
            for key, value in row.items():
                if isinstance(value, (dict, list)):
                    # Convert nested structures to JSON strings
                    flattened_row[key] = (
                        json.dumps(value) if value is not None else None
                    )
                else:
                    flattened_row[key] = value
            flattened_data.append(flattened_row)

        return pl.DataFrame(flattened_data)

    def filter_by_metadata(
        self, predicate: Callable[[FileMetadata], bool]
    ) -> NGBDataset:
        """Filter dataset by metadata criteria.

        Args:
            predicate: Function that takes metadata dict and returns bool

        Returns:
            New NGBDataset with filtered files
        """
        filtered_files = []

        for file_path in self.files:
            try:
                metadata = self._get_metadata(file_path)
                if predicate(metadata):
                    filtered_files.append(file_path)
            except Exception as e:
                logger.warning(f"Failed to check metadata for {file_path}: {e}")

        return NGBDataset(filtered_files)

    def _get_metadata(self, file_path: Path) -> FileMetadata:
        """Get metadata for file with caching.

        Args:
            file_path: Path to NGB file

        Returns:
            File metadata
        """
        cache_key = str(file_path)

        if cache_key not in self._metadata_cache:
            metadata, _ = read_ngb(str(file_path), return_metadata=True)
            self._metadata_cache[cache_key] = metadata

        return self._metadata_cache[cache_key]

Functions

__init__(files)

Initialize dataset with file list.

Parameters:

files : list[Path], required
    List of NGB file paths
Source code in src/pyngb/batch.py
def __init__(self, files: list[Path]):
    """Initialize dataset with file list.

    Args:
        files: List of NGB file paths
    """
    self.files = files
    self._metadata_cache: dict[str, FileMetadata] = {}
from_directory(directory, pattern='*.ngb-ss3') classmethod

Create dataset from directory.

Parameters:

directory : Union[str, Path], required
    Directory containing NGB files
pattern : str, default '*.ngb-ss3'
    File pattern to match

Returns:

NGBDataset
    NGBDataset instance

Source code in src/pyngb/batch.py
@classmethod
def from_directory(
    cls, directory: Union[str, Path], pattern: str = "*.ngb-ss3"
) -> NGBDataset:
    """Create dataset from directory.

    Args:
        directory: Directory containing NGB files
        pattern: File pattern to match

    Returns:
        NGBDataset instance
    """
    directory = Path(directory)
    files = list(directory.glob(pattern))
    return cls(files)
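
For example, from_directory can be pointed at a directory with a custom glob pattern. A short sketch, assuming a hypothetical ./runs/ directory and a "calib*" naming scheme:

from pyngb.batch import NGBDataset

# Match only files whose names start with "calib" (hypothetical naming scheme)
calibration_runs = NGBDataset.from_directory("./runs/", pattern="calib*.ngb-ss3")
print(f"Found {len(calibration_runs)} calibration files")
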
summary()

Generate dataset summary statistics.

Returns:

dict[str, int | float | list[str] | tuple[float, float] | None]
    Dictionary with summary information

Source code in src/pyngb/batch.py
def summary(
    self,
) -> dict[str, int | float | list[str] | tuple[float, float] | None]:
    """Generate dataset summary statistics.

    Returns:
        Dictionary with summary information
    """
    if not self.files:
        return {"file_count": 0}

    # Load all metadata (cached)
    all_metadata = []
    for file_path in self.files:
        try:
            metadata = self._get_metadata(file_path)
            all_metadata.append(metadata)
        except Exception as e:
            logger.warning(f"Failed to load metadata for {file_path}: {e}")

    if not all_metadata:
        return {"file_count": len(self.files), "loadable_files": 0}

    # Extract statistics
    instruments = [m.get("instrument", "Unknown") for m in all_metadata]
    operators = [m.get("operator", "Unknown") for m in all_metadata]
    materials = [m.get("material", "Unknown") for m in all_metadata]

    sample_masses = [
        float(mass)
        for m in all_metadata
        if (mass := m.get("sample_mass")) is not None
    ]

    return {
        "file_count": len(self.files),
        "loadable_files": len(all_metadata),
        "unique_instruments": list(set(instruments)),
        "unique_operators": list(set(operators)),
        "unique_materials": list(set(materials)),
        "sample_mass_range": (min(sample_masses), max(sample_masses))
        if sample_masses
        else None,
        "avg_sample_mass": sum(sample_masses) / len(sample_masses)
        if sample_masses
        else None,
    }
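
The returned dictionary can be inspected directly; a minimal sketch using the keys from the return statement above:

dataset = NGBDataset.from_directory("./experiments/")
summary = dataset.summary()

print(f"Loadable: {summary['loadable_files']} / {summary['file_count']}")
if summary.get("sample_mass_range") is not None:
    low, high = summary["sample_mass_range"]
    print(f"Sample mass range: {low:.2f} - {high:.2f} mg")
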
export_metadata(output_path, format='csv')

Export metadata for all files.

Parameters:

output_path : Union[str, Path], required
    Output file path
format : str, default 'csv'
    Output format ("csv", "json", "parquet")
Source code in src/pyngb/batch.py
def export_metadata(
    self, output_path: Union[str, Path], format: str = "csv"
) -> None:
    """Export metadata for all files.

    Args:
        output_path: Output file path
        format: Output format ("csv", "json", "parquet")
    """
    all_metadata = []

    for file_path in self.files:
        try:
            metadata = self._get_metadata(file_path)
            # Flatten metadata for tabular export
            flat_meta = {
                "file_path": str(file_path),
                "file_name": file_path.name,
                **metadata,
            }
            all_metadata.append(flat_meta)
        except Exception as e:
            logger.warning(f"Failed to load metadata for {file_path}: {e}")
            all_metadata.append(
                {
                    "file_path": str(file_path),
                    "file_name": file_path.name,
                    "error": str(e),
                }
            )

    if not all_metadata:
        logger.warning("No metadata to export")
        return

    # Convert to DataFrame for export
    df = pl.DataFrame(all_metadata)

    output_path = Path(output_path)
    if format.lower() == "csv":
        # Flatten nested data for CSV compatibility
        df_flattened = self._flatten_dataframe_for_csv(df)
        df_flattened.write_csv(output_path)
    elif format.lower() == "json":
        df.write_json(output_path)
    elif format.lower() == "parquet":
        df.write_parquet(output_path)
    else:
        raise ValueError(f"Unsupported format: {format}")

    logger.info(f"Exported metadata for {len(all_metadata)} files to {output_path}")
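
Beyond CSV, the same call can target JSON or Parquet; a brief sketch using the format values listed above:

dataset = NGBDataset.from_directory("./experiments/")

# CSV flattens nested metadata to JSON strings (see _flatten_dataframe_for_csv)
dataset.export_metadata("metadata.csv", format="csv")

# Parquet and JSON keep nested structures as-is
dataset.export_metadata("metadata.parquet", format="parquet")
dataset.export_metadata("metadata.json", format="json")
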
filter_by_metadata(predicate)

Filter dataset by metadata criteria.

Parameters:

predicate : Callable[[FileMetadata], bool], required
    Function that takes metadata dict and returns bool

Returns:

NGBDataset
    New NGBDataset with filtered files

Source code in src/pyngb/batch.py
def filter_by_metadata(
    self, predicate: Callable[[FileMetadata], bool]
) -> NGBDataset:
    """Filter dataset by metadata criteria.

    Args:
        predicate: Function that takes metadata dict and returns bool

    Returns:
        New NGBDataset with filtered files
    """
    filtered_files = []

    for file_path in self.files:
        try:
            metadata = self._get_metadata(file_path)
            if predicate(metadata):
                filtered_files.append(file_path)
        except Exception as e:
            logger.warning(f"Failed to check metadata for {file_path}: {e}")

    return NGBDataset(filtered_files)
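
Because filter_by_metadata returns a new NGBDataset, filters can be chained. A short sketch, assuming the metadata carries the material and sample_mass fields used elsewhere on this page:

dataset = NGBDataset.from_directory("./experiments/")

polymers = dataset.filter_by_metadata(
    lambda meta: "polymer" in meta.get("material", "").lower()
)
small_polymers = polymers.filter_by_metadata(
    lambda meta: float(meta.get("sample_mass") or 0) < 10.0
)
small_polymers.export_metadata("small_polymer_samples.csv")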

Convenience Functions

pyngb.process_directory(directory, pattern='*.ngb-ss3', output_format='parquet', max_workers=None)

Process all NGB files in a directory.

Convenience function for quick batch processing.

Parameters:

directory : Union[str, Path], required
    Directory containing NGB files
pattern : str, default '*.ngb-ss3'
    File pattern to match
output_format : str, default 'parquet'
    Output format ("parquet", "csv", "both")
max_workers : int | None, default None
    Maximum parallel processes

Returns:

list[dict[str, str | float | None]]
    List of processing results

Examples:

from pyngb.batch import process_directory

results = process_directory("./data/", output_format="both")
successful = [r for r in results if r['status'] == 'success']
print(f"Successfully processed {len(successful)} files")

Source code in src/pyngb/batch.py
def process_directory(
    directory: Union[str, Path],
    pattern: str = "*.ngb-ss3",
    output_format: str = "parquet",
    max_workers: int | None = None,
) -> list[dict[str, str | float | None]]:
    """Process all NGB files in a directory.

    Convenience function for quick batch processing.

    Args:
        directory: Directory containing NGB files
        pattern: File pattern to match
        output_format: Output format ("parquet", "csv", "both")
        max_workers: Maximum parallel processes

    Returns:
        List of processing results

    Examples:
        >>> from pyngb.batch import process_directory
        >>>
        >>> results = process_directory("./data/", output_format="both")
        >>> successful = [r for r in results if r['status'] == 'success']
        >>> print(f"Successfully processed {len(successful)} files")
    """
    processor = BatchProcessor(max_workers=max_workers)
    return processor.process_directory(directory, pattern, output_format)

pyngb.process_files(files, output_format='parquet', max_workers=None)

Process a list of NGB files.

Convenience function for batch processing specific files.

Parameters:

files : list[Union[str, Path]], required
    List of file paths
output_format : str, default 'parquet'
    Output format ("parquet", "csv", "both")
max_workers : int | None, default None
    Maximum parallel processes

Returns:

list[dict[str, str | float | None]]
    List of processing results

Source code in src/pyngb/batch.py
def process_files(
    files: list[Union[str, Path]],
    output_format: str = "parquet",
    max_workers: int | None = None,
) -> list[dict[str, str | float | None]]:
    """Process a list of NGB files.

    Convenience function for batch processing specific files.

    Args:
        files: List of file paths
        output_format: Output format ("parquet", "csv", "both")
        max_workers: Maximum parallel processes

    Returns:
        List of processing results
    """
    processor = BatchProcessor(max_workers=max_workers)
    return processor.process_files(files, output_format=output_format)
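
A brief sketch of this convenience function, mirroring the process_directory example above (the file names are placeholders):

from pyngb.batch import process_files

results = process_files(
    ["run_01.ngb-ss3", "run_02.ngb-ss3"],
    output_format="csv",
    max_workers=2,
)
failed = [r for r in results if r["status"] == "error"]
print(f"{len(results) - len(failed)} succeeded, {len(failed)} failed")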

Batch Processing Examples

from pyngb import BatchProcessor, NGBDataset, process_directory

# Method 1: Using BatchProcessor class
processor = BatchProcessor(max_workers=4, verbose=True)
results = processor.process_files(
    ["file1.ngb-ss3", "file2.ngb-ss3"],
    output_format="both",
    output_dir="./output/"
)

# Method 2: Using convenience functions
results = process_directory(
    "./data/",
    pattern="*.ngb-ss3",
    output_format="parquet",
    max_workers=2
)

# Method 3: Dataset management
dataset = NGBDataset.from_directory("./experiments/")
summary = dataset.summary()
dataset.export_metadata("metadata.csv")

Data Validation

Validation Functions

pyngb.validate_sta_data(data, metadata=None)

Quick validation function that returns a list of issues.

Convenience function for basic validation without detailed reporting.

Parameters:

data : Union[Table, DataFrame], required
    STA data table or dataframe
metadata : FileMetadata | None, default None
    Optional metadata dictionary

Returns:

list[str]
    List of validation issues found

Examples:

from pyngb import read_ngb
from pyngb.validation import validate_sta_data

table = read_ngb("sample.ngb-ss3")
issues = validate_sta_data(table)

if issues:
    print("Validation issues found:")
    for issue in issues:
        print(f"  - {issue}")
else:
    print("Data validation passed!")

Source code in src/pyngb/validation.py
def validate_sta_data(
    data: Union[pa.Table, pl.DataFrame], metadata: FileMetadata | None = None
) -> list[str]:
    """Quick validation function that returns a list of issues.

    Convenience function for basic validation without detailed reporting.

    Args:
        data: STA data table or dataframe
        metadata: Optional metadata dictionary

    Returns:
        List of validation issues found

    Examples:
        >>> from pyngb import read_ngb
        >>> from pyngb.validation import validate_sta_data
        >>>
        >>> table = read_ngb("sample.ngb-ss3")
        >>> issues = validate_sta_data(table)
        >>>
        >>> if issues:
        ...     print("Validation issues found:")
        ...     for issue in issues:
        ...         print(f"  - {issue}")
        ... else:
        ...     print("Data validation passed!")
    """
    checker = QualityChecker(data, metadata)
    return checker.quick_check()
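
Metadata can also be supplied explicitly; when it is omitted and a PyArrow table is passed, the QualityChecker constructor shown below attempts to pull it from the table's embedded schema metadata. A minimal sketch of passing it by hand:

from pyngb import read_ngb
from pyngb.validation import validate_sta_data

metadata, table = read_ngb("sample.ngb-ss3", return_metadata=True)
issues = validate_sta_data(table, metadata=metadata)
print(f"Found {len(issues)} issues")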

QualityChecker Class

pyngb.QualityChecker

Comprehensive quality checking for STA data.

Performs various validation checks on STA data including:

- Data completeness and structure
- Physical validity of measurements
- Temperature profile analysis
- Statistical outlier detection
- Metadata consistency

Examples:

from pyngb import read_ngb
from pyngb.validation import QualityChecker

table = read_ngb("sample.ngb-ss3")
checker = QualityChecker(table)
result = checker.full_validation()

if not result.is_valid:
    print("Data validation failed!")
    print(result.report())

# Quick validation
issues = checker.quick_check()
print(f"Found {len(issues)} issues")

Source code in src/pyngb/validation.py
class QualityChecker:
    """Comprehensive quality checking for STA data.

    Performs various validation checks on STA data including:
    - Data completeness and structure
    - Physical validity of measurements
    - Temperature profile analysis
    - Statistical outlier detection
    - Metadata consistency

    Examples:
        >>> from pyngb import read_ngb
        >>> from pyngb.validation import QualityChecker
        >>>
        >>> table = read_ngb("sample.ngb-ss3")
        >>> checker = QualityChecker(table)
        >>> result = checker.full_validation()
        >>>
        >>> if not result.is_valid:
        ...     print("Data validation failed!")
        ...     print(result.report())
        >>>
        >>> # Quick validation
        >>> issues = checker.quick_check()
        >>> print(f"Found {len(issues)} issues")
    """

    df: pl.DataFrame
    metadata: FileMetadata
    result: ValidationResult

    def __init__(
        self, data: Union[pa.Table, pl.DataFrame], metadata: FileMetadata | None = None
    ):
        """Initialize quality checker.

        Args:
            data: STA data table or dataframe
            metadata: Optional metadata dictionary
        """
        if isinstance(data, pa.Table):
            df_temp = pl.from_arrow(data)
            # Ensure we have a DataFrame, not a Series
            self.df = (
                df_temp if isinstance(df_temp, pl.DataFrame) else df_temp.to_frame()
            )
            # Try to extract metadata from table
            if metadata is None:
                try:
                    if data.schema.metadata:  # type: ignore[attr-defined]
                        metadata = self._extract_metadata_from_table(data)
                except (AttributeError, KeyError):
                    # Schema has no metadata or metadata is not accessible
                    pass
        else:
            self.df = data

        self.metadata = metadata or {}
        self.result = ValidationResult()

    def _extract_metadata_from_table(self, table: pa.Table) -> FileMetadata:
        """Extract metadata from PyArrow table."""
        import json

        if b"file_metadata" in table.schema.metadata:
            metadata_json = table.schema.metadata[b"file_metadata"].decode()
            metadata: FileMetadata = json.loads(metadata_json)
            return metadata
        return {}

    def full_validation(self) -> ValidationResult:
        """Perform comprehensive validation of STA data.

        Returns:
            ValidationResult with detailed findings
        """
        self.result = ValidationResult()

        # Basic structure checks
        self._check_data_structure()

        # Column-specific checks
        if "sample_temperature" in self.df.columns:
            self._check_temperature_data()

        if "time" in self.df.columns:
            self._check_time_data()

        if "mass" in self.df.columns:
            self._check_mass_data()

        if "dsc_signal" in self.df.columns:
            self._check_dsc_data()

        # Cross-column consistency checks
        self._check_data_consistency()

        # Metadata validation
        if self.metadata:
            self._check_metadata_consistency()

        # Statistical checks
        self._check_statistical_properties()

        return self.result

    def quick_check(self) -> list[str]:
        """Perform quick validation and return list of issues.

        Returns:
            List of issue descriptions
        """
        issues = []

        # Check for required columns
        required_cols = ["time", "sample_temperature"]
        missing_cols = [col for col in required_cols if col not in self.df.columns]
        if missing_cols:
            issues.append(f"Missing required columns: {missing_cols}")

        # Check for empty data
        if self.df.height == 0:
            issues.append("Dataset is empty")
            return issues

        # Check for null values
        null_counts = self.df.null_count()
        for row in null_counts.iter_rows(named=True):
            for col, count in row.items():
                if count > 0:
                    percentage = (count / self.df.height) * 100
                    issues.append(
                        f"Column '{col}' has {count} null values ({percentage:.1f}%)"
                    )

        # Quick temperature check
        if "sample_temperature" in self.df.columns:
            temp_stats = self.df.select("sample_temperature").describe()
            temp_min = temp_stats.filter(pl.col("statistic") == "min")[
                "sample_temperature"
            ][0]
            temp_max = temp_stats.filter(pl.col("statistic") == "max")[
                "sample_temperature"
            ][0]

            if temp_min == temp_max:
                issues.append("Temperature is constant (no heating/cooling)")
            elif temp_min < -50 or temp_max > 2000:
                issues.append(
                    f"Unusual temperature range: {temp_min:.1f} to {temp_max:.1f}°C"
                )

        return issues

    def _check_data_structure(self) -> None:
        """Check basic data structure."""
        # Check if data exists
        if self.df.height == 0:
            self.result.add_error("Dataset is empty")
            return

        # Check for required columns
        required_cols = ["time", "sample_temperature"]
        missing_cols = [col for col in required_cols if col not in self.df.columns]
        if missing_cols:
            self.result.add_error(f"Missing required columns: {missing_cols}")
        else:
            self.result.add_pass("Required columns present")

        # Check data types
        schema_info = []
        for col, dtype in zip(self.df.columns, self.df.dtypes):
            schema_info.append(f"{col}: {dtype}")
        self.result.add_info(f"Data schema: {', '.join(schema_info)}")

        # Check for duplicate rows
        duplicate_count = self.df.height - self.df.unique().height
        if duplicate_count > 0:
            self.result.add_warning(f"Found {duplicate_count} duplicate rows")
        else:
            self.result.add_pass("No duplicate rows")

    def _check_temperature_data(self) -> None:
        """Validate temperature measurements."""
        temp_col = self.df.select("sample_temperature")

        # Check for null values
        null_count = temp_col.null_count().item()
        if null_count > 0:
            percentage = (null_count / self.df.height) * 100
            self.result.add_warning(
                f"Temperature has {null_count} null values ({percentage:.1f}%)"
            )

        # Get temperature statistics
        temp_stats = temp_col.describe()
        temp_min = temp_stats.filter(pl.col("statistic") == "min")[
            "sample_temperature"
        ][0]
        temp_max = temp_stats.filter(pl.col("statistic") == "max")[
            "sample_temperature"
        ][0]

        # Check temperature range
        if temp_min == temp_max:
            self.result.add_error("Temperature is constant throughout experiment")
        elif temp_max - temp_min < 10:
            self.result.add_warning(
                f"Small temperature range: {temp_max - temp_min:.1f}°C"
            )
        else:
            self.result.add_pass("Temperature range is reasonable")

        # Check for physically realistic temperatures
        if temp_min < -273:  # Below absolute zero
            self.result.add_error(f"Temperature below absolute zero: {temp_min:.1f}°C")
        elif temp_min < -50:
            self.result.add_warning(f"Very low minimum temperature: {temp_min:.1f}°C")

        if temp_max > 2000:
            self.result.add_warning(f"Very high maximum temperature: {temp_max:.1f}°C")

        # Check for temperature profile monotonicity
        temp_data = temp_col.to_numpy().flatten()
        temp_diff = np.diff(temp_data)

        if np.all(temp_diff >= 0):
            self.result.add_info(
                "Temperature profile is monotonically increasing (heating)"
            )
        elif np.all(temp_diff <= 0):
            self.result.add_info(
                "Temperature profile is monotonically decreasing (cooling)"
            )
        else:
            # Mixed heating/cooling
            heating_points: int = int(np.sum(temp_diff > 0))
            cooling_points: int = int(np.sum(temp_diff < 0))
            self.result.add_info(
                f"Mixed temperature profile: {heating_points} heating, {cooling_points} cooling points"
            )

    def _check_time_data(self) -> None:
        """Validate time measurements."""
        time_col = self.df.select("time")

        # Check for null values
        null_count = time_col.null_count().item()
        if null_count > 0:
            percentage = (null_count / self.df.height) * 100
            self.result.add_warning(
                f"Time has {null_count} null values ({percentage:.1f}%)"
            )

        # Check time progression
        time_data = time_col.to_numpy().flatten()
        time_diff = np.diff(time_data)

        if np.all(time_diff >= 0):
            self.result.add_pass("Time progresses monotonically")
        else:
            backwards_count: int = int(np.sum(time_diff < 0))
            self.result.add_error(f"Time goes backwards {backwards_count} times")

        # Check for reasonable time intervals
        if len(time_diff) > 0:
            positive_intervals = time_diff[time_diff > 0]
            if len(positive_intervals) > 0:
                avg_interval = np.mean(positive_intervals)
                if avg_interval < 0.1:  # Less than 0.1 second intervals
                    self.result.add_info(
                        f"Very high time resolution: {avg_interval:.3f}s average interval"
                    )
                elif avg_interval > 60:  # More than 1 minute intervals
                    self.result.add_warning(
                        f"Low time resolution: {avg_interval:.1f}s average interval"
                    )

    def _check_mass_data(self) -> None:
        """Validate mass measurements."""
        mass_col = self.df.select("mass")

        # Check for null values
        null_count = mass_col.null_count().item()
        if null_count > 0:
            percentage = (null_count / self.df.height) * 100
            self.result.add_warning(
                f"Mass has {null_count} null values ({percentage:.1f}%)"
            )

        # Get mass statistics
        mass_stats = mass_col.describe()
        mass_min = mass_stats.filter(pl.col("statistic") == "min")["mass"][0]
        mass_max = mass_stats.filter(pl.col("statistic") == "max")["mass"][0]

        # Check mass against sample mass from metadata if available
        if (
            hasattr(self, "metadata")
            and self.metadata
            and "sample_mass" in self.metadata
        ):
            sample_mass = self.metadata["sample_mass"]

            # Calculate total mass loss (most negative value represents maximum loss)
            max_mass_loss = abs(mass_min) if mass_min < 0 else 0

            if sample_mass > 0:
                mass_loss_percentage = (max_mass_loss / sample_mass) * 100

                # Check if mass loss exceeds sample mass (with 10% tolerance for measurement uncertainty)
                if max_mass_loss > sample_mass * 1.1:
                    self.result.add_error(
                        f"Mass loss ({max_mass_loss:.3f}mg) exceeds sample mass ({sample_mass:.3f}mg) by more than tolerance"
                    )
                elif mass_loss_percentage > 100:
                    self.result.add_warning(
                        f"Mass loss ({mass_loss_percentage:.1f}%) appears to exceed sample mass"
                    )
                else:
                    self.result.add_pass(
                        f"Mass loss ({mass_loss_percentage:.1f}%) is within expected range"
                    )
            else:
                self.result.add_warning(
                    "Sample mass in metadata is zero or negative - cannot validate mass loss"
                )
        else:
            self.result.add_info(
                "No sample mass in metadata - skipping mass loss validation"
            )

        # Check for extremely high maximum mass values (instrument limits)
        if mass_max > 1000:  # More than 1g
            self.result.add_warning(f"Very high mass reading: {mass_max:.1f}mg")

        # Check mass loss/gain
        initial_mass = mass_col[0, 0]
        final_mass = mass_col[-1, 0]

        # For thermal analysis, initial mass is typically zeroed, so calculate relative to that zero point
        # Check for reasonable mass change patterns
        mass_change = final_mass - initial_mass

        if abs(mass_change) < 0.001:  # Less than 1 μg change
            self.result.add_info(f"Very small mass change: {mass_change:.3f}mg")
        elif mass_change > 5:  # Mass gain > 5mg (unusual)
            self.result.add_warning(f"Significant mass gain: {mass_change:.3f}mg")
        else:
            self.result.add_pass("Mass change is within reasonable range")

    def _check_dsc_data(self) -> None:
        """Validate DSC measurements."""
        dsc_col = self.df.select("dsc_signal")

        # Check for null values
        null_count = dsc_col.null_count().item()
        if null_count > 0:
            percentage = (null_count / self.df.height) * 100
            self.result.add_warning(
                f"DSC has {null_count} null values ({percentage:.1f}%)"
            )

        # Get DSC statistics
        dsc_stats = dsc_col.describe()
        dsc_min = dsc_stats.filter(pl.col("statistic") == "min")["dsc_signal"][0]
        dsc_max = dsc_stats.filter(pl.col("statistic") == "max")["dsc_signal"][0]
        dsc_std = dsc_stats.filter(pl.col("statistic") == "std")["dsc_signal"][0]

        # Check for constant DSC signal (no thermal events)
        if dsc_std < 0.001:
            self.result.add_warning(
                "DSC signal is nearly constant - no thermal events detected"
            )
        else:
            self.result.add_pass("DSC signal shows variation")

        # Check for extreme values
        if abs(dsc_max) > 1000 or abs(dsc_min) > 1000:
            self.result.add_warning(
                f"Extreme DSC values detected: {dsc_min:.1f} to {dsc_max:.1f} μV"
            )

    def _check_data_consistency(self) -> None:
        """Check consistency between different measurements."""
        # Check if all columns have the same length (should be guaranteed by DataFrame)
        self.result.add_pass("All columns have consistent length")

        # Check for synchronized time/temperature if both present
        if "time" in self.df.columns and "sample_temperature" in self.df.columns:
            # Check if temperature changes correlate with time
            time_data = self.df.select("time").to_numpy().flatten()
            temp_data = self.df.select("sample_temperature").to_numpy().flatten()

            # Simple correlation check
            if len(time_data) > 1 and len(temp_data) > 1:
                correlation = np.corrcoef(time_data, temp_data)[0, 1]
                if abs(correlation) > 0.8:
                    self.result.add_pass(
                        f"Time and temperature are well correlated (r={correlation:.3f})"
                    )
                else:
                    self.result.add_info(
                        f"Time and temperature correlation: r={correlation:.3f}"
                    )

    def _check_metadata_consistency(self) -> None:
        """Check metadata for consistency and completeness."""
        required_metadata = ["instrument", "sample_name", "operator"]
        missing_metadata = [
            field for field in required_metadata if not self.metadata.get(field)
        ]

        if missing_metadata:
            self.result.add_warning(f"Missing metadata fields: {missing_metadata}")
        else:
            self.result.add_pass("Essential metadata fields present")

    def _check_statistical_properties(self) -> None:
        """Check statistical properties for anomalies."""
        numeric_columns = [
            col
            for col, dtype in zip(self.df.columns, self.df.dtypes)
            if dtype in [pl.Float64, pl.Float32, pl.Int64, pl.Int32]
        ]

        for col in numeric_columns:
            data = self.df.select(col).to_numpy().flatten()

            # Check for outliers using IQR method
            if len(data) > 10:  # Only check if enough data points
                q1 = np.percentile(data, 25)
                q3 = np.percentile(data, 75)
                iqr = q3 - q1

                if iqr > 0:
                    lower_bound = q1 - 1.5 * iqr
                    upper_bound = q3 + 1.5 * iqr

                    outliers: int = int(
                        np.sum((data < lower_bound) | (data > upper_bound))
                    )
                    outlier_percentage = (outliers / len(data)) * 100

                    if outlier_percentage > 5:
                        self.result.add_warning(
                            f"Column '{col}' has {outliers} outliers ({outlier_percentage:.1f}%)"
                        )

Functions

__init__(data, metadata=None)

Initialize quality checker.

Parameters:

data : Union[Table, DataFrame], required
    STA data table or dataframe
metadata : FileMetadata | None, default None
    Optional metadata dictionary
Source code in src/pyngb/validation.py
def __init__(
    self, data: Union[pa.Table, pl.DataFrame], metadata: FileMetadata | None = None
):
    """Initialize quality checker.

    Args:
        data: STA data table or dataframe
        metadata: Optional metadata dictionary
    """
    if isinstance(data, pa.Table):
        df_temp = pl.from_arrow(data)
        # Ensure we have a DataFrame, not a Series
        self.df = (
            df_temp if isinstance(df_temp, pl.DataFrame) else df_temp.to_frame()
        )
        # Try to extract metadata from table
        if metadata is None:
            try:
                if data.schema.metadata:  # type: ignore[attr-defined]
                    metadata = self._extract_metadata_from_table(data)
            except (AttributeError, KeyError):
                # Schema has no metadata or metadata is not accessible
                pass
    else:
        self.df = data

    self.metadata = metadata or {}
    self.result = ValidationResult()
quick_check()

Perform quick validation and return list of issues.

Returns:

list[str]
    List of issue descriptions

Source code in src/pyngb/validation.py
def quick_check(self) -> list[str]:
    """Perform quick validation and return list of issues.

    Returns:
        List of issue descriptions
    """
    issues = []

    # Check for required columns
    required_cols = ["time", "sample_temperature"]
    missing_cols = [col for col in required_cols if col not in self.df.columns]
    if missing_cols:
        issues.append(f"Missing required columns: {missing_cols}")

    # Check for empty data
    if self.df.height == 0:
        issues.append("Dataset is empty")
        return issues

    # Check for null values
    null_counts = self.df.null_count()
    for row in null_counts.iter_rows(named=True):
        for col, count in row.items():
            if count > 0:
                percentage = (count / self.df.height) * 100
                issues.append(
                    f"Column '{col}' has {count} null values ({percentage:.1f}%)"
                )

    # Quick temperature check
    if "sample_temperature" in self.df.columns:
        temp_stats = self.df.select("sample_temperature").describe()
        temp_min = temp_stats.filter(pl.col("statistic") == "min")[
            "sample_temperature"
        ][0]
        temp_max = temp_stats.filter(pl.col("statistic") == "max")[
            "sample_temperature"
        ][0]

        if temp_min == temp_max:
            issues.append("Temperature is constant (no heating/cooling)")
        elif temp_min < -50 or temp_max > 2000:
            issues.append(
                f"Unusual temperature range: {temp_min:.1f} to {temp_max:.1f}°C"
            )

    return issues
full_validation()

Perform comprehensive validation of STA data.

Returns:

ValidationResult
    ValidationResult with detailed findings

Source code in src/pyngb/validation.py
def full_validation(self) -> ValidationResult:
    """Perform comprehensive validation of STA data.

    Returns:
        ValidationResult with detailed findings
    """
    self.result = ValidationResult()

    # Basic structure checks
    self._check_data_structure()

    # Column-specific checks
    if "sample_temperature" in self.df.columns:
        self._check_temperature_data()

    if "time" in self.df.columns:
        self._check_time_data()

    if "mass" in self.df.columns:
        self._check_mass_data()

    if "dsc_signal" in self.df.columns:
        self._check_dsc_data()

    # Cross-column consistency checks
    self._check_data_consistency()

    # Metadata validation
    if self.metadata:
        self._check_metadata_consistency()

    # Statistical checks
    self._check_statistical_properties()

    return self.result

ValidationResult Class

pyngb.ValidationResult

Container for validation results.

Stores validation issues, warnings, and overall status.

Source code in src/pyngb/validation.py
class ValidationResult:
    """Container for validation results.

    Stores validation issues, warnings, and overall status.
    """

    def __init__(self):
        self.errors: list[str] = []
        self.warnings: list[str] = []
        self.info: list[str] = []
        self.passed_checks: list[str] = []

    def add_error(self, message: str) -> None:
        """Add an error message."""
        self.errors.append(message)
        logger.error(f"Validation error: {message}")

    def add_warning(self, message: str) -> None:
        """Add a warning message."""
        self.warnings.append(message)
        logger.warning(f"Validation warning: {message}")

    def add_info(self, message: str) -> None:
        """Add an info message."""
        self.info.append(message)
        logger.info(f"Validation info: {message}")

    def add_pass(self, check_name: str) -> None:
        """Mark a check as passed."""
        self.passed_checks.append(check_name)

    @property
    def is_valid(self) -> bool:
        """Return True if no errors were found."""
        return len(self.errors) == 0

    @property
    def has_warnings(self) -> bool:
        """Return True if warnings were found."""
        return len(self.warnings) > 0

    def summary(self) -> dict[str, int | bool]:
        """Get validation summary."""
        return {
            "is_valid": self.is_valid,
            "has_warnings": self.has_warnings,
            "error_count": len(self.errors),
            "warning_count": len(self.warnings),
            "checks_passed": len(self.passed_checks),
            "total_issues": len(self.errors) + len(self.warnings),
        }

    def report(self) -> str:
        """Generate a formatted validation report."""
        lines = ["=== STA Data Validation Report ===\n"]

        # Summary
        summary = self.summary()
        status = "✅ VALID" if summary["is_valid"] else "❌ INVALID"
        lines.append(f"Overall Status: {status}")
        lines.append(f"Checks Passed: {summary['checks_passed']}")
        lines.append(f"Errors: {summary['error_count']}")
        lines.append(f"Warnings: {summary['warning_count']}\n")

        # Errors
        if self.errors:
            lines.append("🔴 ERRORS:")
            for error in self.errors:
                lines.append(f"  • {error}")
            lines.append("")

        # Warnings
        if self.warnings:
            lines.append("🟡 WARNINGS:")
            for warning in self.warnings:
                lines.append(f"  • {warning}")
            lines.append("")

        # Info
        if self.info:
            lines.append("INFO:")
            for info in self.info:
                lines.append(f"  • {info}")
            lines.append("")

        return "\n".join(lines)

Attributes

is_valid property

Return True if no errors were found.

has_warnings property

Return True if warnings were found.

Functions

summary()

Get validation summary.

Source code in src/pyngb/validation.py
def summary(self) -> dict[str, int | bool]:
    """Get validation summary."""
    return {
        "is_valid": self.is_valid,
        "has_warnings": self.has_warnings,
        "error_count": len(self.errors),
        "warning_count": len(self.warnings),
        "checks_passed": len(self.passed_checks),
        "total_issues": len(self.errors) + len(self.warnings),
    }
report()

Generate a formatted validation report.

Source code in src/pyngb/validation.py
def report(self) -> str:
    """Generate a formatted validation report."""
    lines = ["=== STA Data Validation Report ===\n"]

    # Summary
    summary = self.summary()
    status = "✅ VALID" if summary["is_valid"] else "❌ INVALID"
    lines.append(f"Overall Status: {status}")
    lines.append(f"Checks Passed: {summary['checks_passed']}")
    lines.append(f"Errors: {summary['error_count']}")
    lines.append(f"Warnings: {summary['warning_count']}\n")

    # Errors
    if self.errors:
        lines.append("🔴 ERRORS:")
        for error in self.errors:
            lines.append(f"  • {error}")
        lines.append("")

    # Warnings
    if self.warnings:
        lines.append("🟡 WARNINGS:")
        for warning in self.warnings:
            lines.append(f"  • {warning}")
        lines.append("")

    # Info
    if self.info:
        lines.append("INFO:")
        for info in self.info:
            lines.append(f"  • {info}")
        lines.append("")

    return "\n".join(lines)

Validation Examples

from pyngb.validation import QualityChecker, validate_sta_data
import polars as pl

# Load data
table = read_ngb("sample.ngb-ss3")
df = pl.from_arrow(table)

# Method 1: Quick validation
issues = validate_sta_data(df)
print(f"Found {len(issues)} issues")

# Method 2: Comprehensive validation
checker = QualityChecker(df)
result = checker.full_validation()

print(f"Valid: {result.is_valid}")
print(f"Errors: {result.summary()['error_count']}")
print(f"Warnings: {result.summary()['warning_count']}")

# Get detailed report
print(result.report())

Core Parser Classes

NGBParser

pyngb.NGBParser

Main parser for NETZSCH STA NGB files with enhanced error handling.

This is the primary interface for parsing NETZSCH NGB files. It orchestrates the parsing of metadata and measurement data from the various streams within an NGB file.

The parser handles the complete workflow:

1. Opens and validates the NGB ZIP archive
2. Extracts metadata from stream_1.table
3. Processes measurement data from stream_2.table and stream_3.table
4. Returns structured data with embedded metadata

Example

parser = NGBParser()
metadata, data_table = parser.parse("sample.ngb-ss3")
print(f"Sample: {metadata.get('sample_name', 'Unknown')}")
print(f"Data shape: {data_table.num_rows} x {data_table.num_columns}")
Sample: Test Sample 1
Data shape: 2500 x 8

Advanced Configuration

config = PatternConfig()
config.column_map["custom_id"] = "custom_column"
parser = NGBParser(config)

Attributes:

config
    Pattern configuration for parsing
markers
    Binary markers for data identification
binary_parser
    Low-level binary parsing engine
metadata_extractor
    Metadata extraction engine
data_processor
    Data stream processing engine

Thread Safety

This parser is not thread-safe. Create separate instances for concurrent parsing operations.

Source code in src/pyngb/core/parser.py
class NGBParser:
    """Main parser for NETZSCH STA NGB files with enhanced error handling.

    This is the primary interface for parsing NETZSCH NGB files. It orchestrates
    the parsing of metadata and measurement data from the various streams within
    an NGB file.

    The parser handles the complete workflow:
    1. Opens and validates the NGB ZIP archive
    2. Extracts metadata from stream_1.table
    3. Processes measurement data from stream_2.table and stream_3.table
    4. Returns structured data with embedded metadata

    Example:
        >>> parser = NGBParser()
        >>> metadata, data_table = parser.parse("sample.ngb-ss3")
        >>> print(f"Sample: {metadata.get('sample_name', 'Unknown')}")
        >>> print(f"Data shape: {data_table.num_rows} x {data_table.num_columns}")
        Sample: Test Sample 1
        Data shape: 2500 x 8

    Advanced Configuration:
        >>> config = PatternConfig()
        >>> config.column_map["custom_id"] = "custom_column"
        >>> parser = NGBParser(config)

    Attributes:
        config: Pattern configuration for parsing
        markers: Binary markers for data identification
        binary_parser: Low-level binary parsing engine
        metadata_extractor: Metadata extraction engine
        data_processor: Data stream processing engine

    Thread Safety:
        This parser is not thread-safe. Create separate instances for
        concurrent parsing operations.
    """

    def __init__(self, config: PatternConfig | None = None) -> None:
        self.config = config or PatternConfig()
        self.markers = BinaryMarkers()
        self.binary_parser = BinaryParser(self.markers)
        self.metadata_extractor = MetadataExtractor(self.config, self.binary_parser)
        self.data_processor = DataStreamProcessor(self.config, self.binary_parser)

    def validate_ngb_structure(self, zip_file: zipfile.ZipFile) -> list[str]:
        """Validate that the ZIP file has the expected NGB structure.

        Args:
            zip_file: Open ZIP file to validate

        Returns:
            List of available streams

        Raises:
            NGBStreamNotFoundError: If required streams are missing
        """
        available_streams = zip_file.namelist()
        logger.debug(f"Available streams: {available_streams}")

        # Check for required streams
        # stream_1 and stream_2 are required for basic operation; stream_3 is optional
        required_streams = ["Streams/stream_1.table", "Streams/stream_2.table"]
        missing_streams = [
            stream for stream in required_streams if stream not in available_streams
        ]

        if missing_streams:
            raise NGBStreamNotFoundError(f"Missing required streams: {missing_streams}")

        return available_streams

    def parse(self, path: str) -> tuple[FileMetadata, pa.Table]:
        """Parse NGB file and return metadata and Arrow table.

        Opens an NGB file, extracts all metadata and measurement data,
        and returns them as separate objects for flexible use.

        Args:
            path: Path to the .ngb-ss3 file to parse

        Returns:
            Tuple of (metadata_dict, pyarrow_table) where:
            - metadata_dict contains instrument settings, sample info, etc.
            - pyarrow_table contains the measurement data columns

        Raises:
            FileNotFoundError: If the specified file doesn't exist
            NGBStreamNotFoundError: If required streams are missing
            NGBCorruptedFileError: If file structure is invalid
            zipfile.BadZipFile: If file is not a valid ZIP archive

        Example:
            >>> metadata, data = parser.parse("experiment.ngb-ss3")
            >>> print(f"Instrument: {metadata.get('instrument', 'Unknown')}")
            >>> print(f"Columns: {data.column_names}")
            >>> print(f"Temperature range: {data['sample_temperature'].min()} to {data['sample_temperature'].max()}")
            Instrument: NETZSCH STA 449 F3 Jupiter
            Columns: ['time', 'sample_temperature', 'mass', 'dsc_signal', 'purge_flow']
            Temperature range: 25.0 to 800.0
        """
        path_obj = Path(path)
        if not path_obj.exists():
            raise FileNotFoundError(f"File not found: {path}")

        metadata: FileMetadata = {}
        data_df = pl.DataFrame()

        try:
            with zipfile.ZipFile(path, "r") as z:
                # Validate NGB file structure
                available_streams = self.validate_ngb_structure(z)

                # stream_1: metadata
                with z.open("Streams/stream_1.table") as stream:
                    stream_data = stream.read()
                    tables = self.binary_parser.split_tables(stream_data)
                    metadata = self.metadata_extractor.extract_metadata(tables)

                # stream_2: primary data
                if "Streams/stream_2.table" in available_streams:
                    with z.open("Streams/stream_2.table") as stream:
                        stream_data = stream.read()
                        data_df = self.data_processor.process_stream_2(stream_data)

                # stream_3: additional data merged into existing df
                if "Streams/stream_3.table" in available_streams:
                    with z.open("Streams/stream_3.table") as stream:
                        stream_data = stream.read()
                        data_df = self.data_processor.process_stream_3(
                            stream_data, data_df
                        )

        except zipfile.BadZipFile as e:
            logger.error("Invalid ZIP archive: %s", e)
            raise
        except NGBStreamNotFoundError:
            # Re-raise our custom exceptions as-is
            raise
        except Exception as e:
            logger.error("Failed to parse NGB file: %s", e)
            raise

        # Convert to PyArrow at API boundary for cross-language compatibility
        # and metadata embedding. This is the single conversion point from
        # internal Polars processing to external PyArrow interface.
        return metadata, data_df.to_arrow()

Functions

__init__(config=None)
Source code in src/pyngb/core/parser.py
def __init__(self, config: PatternConfig | None = None) -> None:
    self.config = config or PatternConfig()
    self.markers = BinaryMarkers()
    self.binary_parser = BinaryParser(self.markers)
    self.metadata_extractor = MetadataExtractor(self.config, self.binary_parser)
    self.data_processor = DataStreamProcessor(self.config, self.binary_parser)
parse(path)

Parse NGB file and return metadata and Arrow table.

Opens an NGB file, extracts all metadata and measurement data, and returns them as separate objects for flexible use.

Parameters:

path : str, required
    Path to the .ngb-ss3 file to parse

Returns:

tuple[FileMetadata, Table]
    Tuple of (metadata_dict, pyarrow_table) where:
    - metadata_dict contains instrument settings, sample info, etc.
    - pyarrow_table contains the measurement data columns

Raises:

FileNotFoundError
    If the specified file doesn't exist
NGBStreamNotFoundError
    If required streams are missing
NGBCorruptedFileError
    If file structure is invalid
zipfile.BadZipFile
    If file is not a valid ZIP archive

Example

metadata, data = parser.parse("experiment.ngb-ss3")
print(f"Instrument: {metadata.get('instrument', 'Unknown')}")
print(f"Columns: {data.column_names}")
print(f"Temperature range: {data['sample_temperature'].min()} to {data['sample_temperature'].max()}")
Instrument: NETZSCH STA 449 F3 Jupiter
Columns: ['time', 'sample_temperature', 'mass', 'dsc_signal', 'purge_flow']
Temperature range: 25.0 to 800.0

Source code in src/pyngb/core/parser.py
def parse(self, path: str) -> tuple[FileMetadata, pa.Table]:
    """Parse NGB file and return metadata and Arrow table.

    Opens an NGB file, extracts all metadata and measurement data,
    and returns them as separate objects for flexible use.

    Args:
        path: Path to the .ngb-ss3 file to parse

    Returns:
        Tuple of (metadata_dict, pyarrow_table) where:
        - metadata_dict contains instrument settings, sample info, etc.
        - pyarrow_table contains the measurement data columns

    Raises:
        FileNotFoundError: If the specified file doesn't exist
        NGBStreamNotFoundError: If required streams are missing
        NGBCorruptedFileError: If file structure is invalid
        zipfile.BadZipFile: If file is not a valid ZIP archive

    Example:
        >>> metadata, data = parser.parse("experiment.ngb-ss3")
        >>> print(f"Instrument: {metadata.get('instrument', 'Unknown')}")
        >>> print(f"Columns: {data.column_names}")
        >>> print(f"Temperature range: {data['sample_temperature'].min()} to {data['sample_temperature'].max()}")
        Instrument: NETZSCH STA 449 F3 Jupiter
        Columns: ['time', 'sample_temperature', 'mass', 'dsc_signal', 'purge_flow']
        Temperature range: 25.0 to 800.0
    """
    path_obj = Path(path)
    if not path_obj.exists():
        raise FileNotFoundError(f"File not found: {path}")

    metadata: FileMetadata = {}
    data_df = pl.DataFrame()

    try:
        with zipfile.ZipFile(path, "r") as z:
            # Validate NGB file structure
            available_streams = self.validate_ngb_structure(z)

            # stream_1: metadata
            with z.open("Streams/stream_1.table") as stream:
                stream_data = stream.read()
                tables = self.binary_parser.split_tables(stream_data)
                metadata = self.metadata_extractor.extract_metadata(tables)

            # stream_2: primary data
            if "Streams/stream_2.table" in available_streams:
                with z.open("Streams/stream_2.table") as stream:
                    stream_data = stream.read()
                    data_df = self.data_processor.process_stream_2(stream_data)

            # stream_3: additional data merged into existing df
            if "Streams/stream_3.table" in available_streams:
                with z.open("Streams/stream_3.table") as stream:
                    stream_data = stream.read()
                    data_df = self.data_processor.process_stream_3(
                        stream_data, data_df
                    )

    except zipfile.BadZipFile as e:
        logger.error("Invalid ZIP archive: %s", e)
        raise
    except NGBStreamNotFoundError:
        # Re-raise our custom exceptions as-is
        raise
    except Exception as e:
        logger.error("Failed to parse NGB file: %s", e)
        raise

    # Convert to PyArrow at API boundary for cross-language compatibility
    # and metadata embedding. This is the single conversion point from
    # internal Polars processing to external PyArrow interface.
    return metadata, data_df.to_arrow()

Advanced Parser Usage

from pyngb import NGBParser, PatternConfig

# Custom configuration
config = PatternConfig()
config.column_map["custom_id"] = "custom_column"
config.metadata_patterns["custom_field"] = (b"\x99\x99", b"\x88\x88")

# Create parser with custom config
parser = NGBParser(config)
metadata, data = parser.parse("sample.ngb-ss3")

Configuration Classes

PatternConfig

pyngb.PatternConfig dataclass

Configuration for metadata and column patterns.

This class defines the binary patterns used to locate and extract specific metadata fields, temperature program data, calibration constants, and data columns from NGB files.

The patterns are defined as tuples of (category_bytes, field_bytes) that are used to construct regex patterns for finding specific data fields in the binary stream.

Attributes:

  • metadata_patterns (dict[str, tuple[bytes, bytes]]): Maps field names to (category, field) byte patterns
  • temp_prog_patterns (dict[str, bytes]): Patterns for temperature program extraction
  • cal_constants_patterns (dict[str, bytes]): Patterns for calibration constant extraction
  • column_map (dict[str, str]): Maps hex column IDs to human-readable column names

Example

>>> config = PatternConfig()
>>> config.column_map["8d"] = "time"
>>> config.metadata_patterns["sample_id"] = (b"\x30\x75", b"\x98\x08")

Note

Modifying these patterns may break compatibility with certain NGB file versions. Use caution when customizing.

Source code in src/pyngb/constants.py
@dataclass
class PatternConfig:
    """Configuration for metadata and column patterns.

    This class defines the binary patterns used to locate and extract
    specific metadata fields, temperature program data, calibration constants,
    and data columns from NGB files.

    The patterns are defined as tuples of (category_bytes, field_bytes) that
    are used to construct regex patterns for finding specific data fields
    in the binary stream.

    Attributes:
        metadata_patterns: Maps field names to (category, field) byte patterns
        temp_prog_patterns: Patterns for temperature program extraction
        cal_constants_patterns: Patterns for calibration constant extraction
        column_map: Maps hex column IDs to human-readable column names

    Example:
        >>> config = PatternConfig()
        >>> config.column_map["8d"] = "time"
        >>> config.metadata_patterns["sample_id"] = (b"\\x30\\x75", b"\\x98\\x08")

    Note:
        Modifying these patterns may break compatibility with certain
        NGB file versions. Use caution when customizing.
    """

    metadata_patterns: dict[str, tuple[bytes, bytes]] = field(
        default_factory=lambda: {
            # Core metadata
            "instrument": (rb"\x75\x17", rb"\x59\x10"),
            "project": (rb"\x72\x17", rb"\x3c\x08"),
            "date_performed": (rb"\x72\x17", rb"\x3e\x08"),
            "lab": (rb"\x72\x17", rb"\x34\x08"),
            "operator": (rb"\x72\x17", rb"\x35\x08"),
            "crucible_type": (rb"\x7e\x17", rb"\x40\x08"),
            "comment": (rb"\x72\x17", rb"\x3d\x08"),
            "furnace_type": (rb"\x7a\x17", rb"\x40\x08"),
            "carrier_type": (rb"\x79\x17", rb"\x40\x08"),
            # Sample descriptors
            "sample_id": (rb"\x30\x75", rb"\x98\x08"),
            "sample_name": (rb"\x30\x75", rb"\x40\x08"),
            # Mass fields: crucible_mass pattern ALSO matches reference crucible mass (structural disambiguation required)
            "sample_mass": (rb"\x30\x75", rb"\x9e\x0c"),
            "crucible_mass": (rb"\x7e\x17", rb"\x9e\x0c"),
            # Additional
            "material": (rb"\x30\x75", rb"\x62\x09"),
            # Note: MFC fields are handled separately in _extract_mfc_metadata
            # to avoid conflicts with the general pattern matching
        }
    )
    temp_prog_patterns: dict[str, bytes] = field(
        default_factory=lambda: {
            "stage_type": b"\x3f\x08",
            "temperature": b"\x17\x0e",
            "heating_rate": b"\x13\x0e",
            "acquisition_rate": b"\x14\x0e",
            "time": b"\x15\x0e",
        }
    )

    # Temperature program binary structure constants
    temp_prog_type_separator: bytes = b"\x00\x00\x01\x00\x00\x00"
    temp_prog_data_type: bytes = b"\x0c"
    temp_prog_field_separator: bytes = b"\x00\x17\xfc\xff\xff"
    temp_prog_value_prefix: bytes = b"\x04\x80\x01"
    cal_constants_patterns: dict[str, bytes] = field(
        default_factory=lambda: {
            f"p{i}": bytes([0x4F + i, 0x04]) if i < 5 else b"\xc3\x04" for i in range(6)
        }
    )
    column_map: dict[str, str] = field(
        default_factory=lambda: {
            "8d": "time",
            "8e": "sample_temperature",
            "9c": "dsc_signal",
            "9d": "purge_flow_1",
            "9e": "purge_flow_2",
            "90": "protective_flow",
            "87": "mass",
            "30": "furnace_temperature",
            "32": "furnace_power",
            "33": "h_foil_temperature",
            "34": "uc_module",
            "35": "environmental_pressure",
            "36": "environmental_acceleration_x",
            "37": "environmental_acceleration_y",
            "38": "environmental_acceleration_z",
        }
    )

BinaryMarkers

pyngb.BinaryMarkers dataclass

Binary markers for parsing NGB files.

These byte sequences mark important boundaries and structures within the binary NGB file format. They are used to locate data sections, separate tables, and identify data types.

Attributes:

  • END_FIELD (bytes): Marks the end of a data field
  • TYPE_PREFIX (bytes): Precedes data type identifier
  • TYPE_SEPARATOR (bytes): Separates type from value data
  • END_TABLE (bytes): Marks the end of a table
  • TABLE_SEPARATOR (bytes): Separates individual tables in a stream
  • START_DATA (bytes): Marks the beginning of data payload
  • END_DATA (bytes): Marks the end of data payload

Source code in src/pyngb/constants.py
@dataclass(frozen=True)
class BinaryMarkers:
    """Binary markers for parsing NGB files.

    These byte sequences mark important boundaries and structures within
    the binary NGB file format. They are used to locate data sections,
    separate tables, and identify data types.

    Attributes:
        END_FIELD: Marks the end of a data field
        TYPE_PREFIX: Precedes data type identifier
        TYPE_SEPARATOR: Separates type from value data
        END_TABLE: Marks the end of a table
        TABLE_SEPARATOR: Separates individual tables in a stream
        START_DATA: Marks the beginning of data payload
        END_DATA: Marks the end of data payload
    """

    END_FIELD: bytes = b"\x01\x00\x00\x00\x02\x00\x01\x00\x00"
    TYPE_PREFIX: bytes = b"\x17\xfc\xff\xff"
    TYPE_SEPARATOR: bytes = b"\x80\x01"
    END_TABLE: bytes = b"\x18\xfc\xff\xff\x03"
    TABLE_SEPARATOR: bytes = b"\x00\x00\x01\x00\x00\x00\x0c\x00\x17\xfc\xff\xff\x1a\x80\x01\x01\x80\x02\x00\x00"
    START_DATA: bytes = b"\xa0\x01"
    END_DATA: bytes = (
        b"\x01\x00\x00\x00\x02\x00\x01\x00\x00\x00\x03\x00\x18\xfc\xff\xff\x03\x80\x01"
    )

Configuration Examples

from pyngb.constants import PatternConfig, BinaryMarkers

# Examine default configuration
config = PatternConfig()
print("Column mappings:", config.column_map)
print("Metadata patterns:", list(config.metadata_patterns.keys()))

# Binary markers for advanced use
markers = BinaryMarkers()
print("Start data marker:", markers.START_DATA)
print("End data marker:", markers.END_DATA)

Data Types and Enums

DataType Enum

pyngb.DataType

Bases: Enum

Binary data type identifiers used in NGB files.

These constants map to the binary identifiers used in NETZSCH NGB files to specify the data type of values stored in the binary format.

Examples:

>>> DataType.FLOAT64.value
b'\x05'
>>> data_type == DataType.FLOAT32.value
True
Source code in src/pyngb/constants.py
class DataType(Enum):
    """Binary data type identifiers used in NGB files.

    These constants map to the binary identifiers used in NETZSCH NGB files
    to specify the data type of values stored in the binary format.

    Examples:
        >>> DataType.FLOAT64.value
        b'\\x05'
        >>> data_type == DataType.FLOAT32.value
        True
    """

    INT32 = b"\x03"  # 32-bit signed integer (little-endian)
    FLOAT32 = b"\x04"  # 32-bit IEEE 754 float (little-endian)
    FLOAT64 = b"\x05"  # 64-bit IEEE 754 double (little-endian)
    STRING = b"\x1f"  # UTF-8 string with 4-byte length prefix

FileMetadata Type

pyngb.FileMetadata

Bases: TypedDict

Type definition for file metadata dictionary.

Mass-related fields grouped together after core identifying fields. Reference masses are structurally derived; crucible_mass pattern also matches reference_crucible_mass and is disambiguated using signature fragments (see SAMPLE_CRUCIBLE_SIG_FRAGMENT / REF_CRUCIBLE_SIG_FRAGMENT).

Source code in src/pyngb/constants.py
class FileMetadata(TypedDict, total=False):
    """Type definition for file metadata dictionary.

    Mass-related fields grouped together after core identifying fields. Reference masses
    are structurally derived; crucible_mass pattern also matches reference_crucible_mass and
    is disambiguated using signature fragments (see SAMPLE_CRUCIBLE_SIG_FRAGMENT / REF_CRUCIBLE_SIG_FRAGMENT).
    """

    instrument: str
    project: str
    date_performed: str
    lab: str
    operator: str
    crucible_type: str
    comment: str
    furnace_type: str
    carrier_type: str
    sample_id: str
    sample_name: str
    # Mass group
    sample_mass: float
    crucible_mass: float
    reference_mass: float
    reference_crucible_mass: float
    # Other descriptors
    material: str
    application_version: str
    licensed_to: str
    temperature_program: dict[str, dict[str, Any]]
    calibration_constants: dict[str, float]
    file_hash: dict[str, str]
    # MFC (Mass Flow Controller) metadata
    purge_1_mfc_gas: str
    purge_2_mfc_gas: str
    protective_mfc_gas: str
    purge_1_mfc_range: float
    purge_2_mfc_range: float
    protective_mfc_range: float
    # Control parameters (PID settings)
    furnace_xp: float
    furnace_tn: float
    furnace_tv: float
    sample_xp: float
    sample_tn: float
    sample_tv: float

Data Type Examples

from pyngb.constants import DataType, FileMetadata

# Data type identifiers
print("Float64 identifier:", DataType.FLOAT64.value)
print("String identifier:", DataType.STRING.value)

# Metadata structure (TypedDict)
metadata_example: FileMetadata = {
    "instrument": "NETZSCH STA 449 F3",
    "sample_name": "Test Sample",
    "sample_mass": 15.5,
    "operator": "Lab Technician"
}
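
Each identifier corresponds to a fixed little-endian layout, so a single value can be decoded with the standard struct module. A minimal sketch; the decoders mapping below is illustrative and not part of pyngb:

import struct

from pyngb.constants import DataType

# Illustrative mapping from DataType identifiers to little-endian struct formats
decoders = {
    DataType.INT32.value: "<i",
    DataType.FLOAT32.value: "<f",
    DataType.FLOAT64.value: "<d",
}

raw = b"\x00\x00\x00\x00\x00\x00\xf0\x3f"  # 1.0 encoded as a little-endian float64
value = struct.unpack(decoders[DataType.FLOAT64.value], raw)[0]
print(value)  # 1.0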

Exception Hierarchy

Base Exception

pyngb.NGBParseError

Bases: Exception

Base exception for NGB file parsing errors.

Source code in src/pyngb/exceptions.py
class NGBParseError(Exception):
    """Base exception for NGB file parsing errors."""

Specific Exceptions

pyngb.NGBCorruptedFileError

Bases: NGBParseError

Raised when NGB file is corrupted or has invalid structure.

Source code in src/pyngb/exceptions.py
class NGBCorruptedFileError(NGBParseError):
    """Raised when NGB file is corrupted or has invalid structure."""

pyngb.NGBUnsupportedVersionError

Bases: NGBParseError

Raised when NGB file version is not supported.

Source code in src/pyngb/exceptions.py
class NGBUnsupportedVersionError(NGBParseError):
    """Raised when NGB file version is not supported."""

pyngb.NGBDataTypeError

Bases: NGBParseError

Raised when encountering unknown or invalid data type.

Source code in src/pyngb/exceptions.py
class NGBDataTypeError(NGBParseError):
    """Raised when encountering unknown or invalid data type."""

pyngb.NGBStreamNotFoundError

Bases: NGBParseError

Raised when expected stream is not found in NGB file.

Source code in src/pyngb/exceptions.py
class NGBStreamNotFoundError(NGBParseError):
    """Raised when expected stream is not found in NGB file."""

Exception Handling Examples

from pyngb import read_ngb, NGBParseError, NGBCorruptedFileError

try:
    table = read_ngb("sample.ngb-ss3")
except NGBCorruptedFileError:
    print("File appears to be corrupted")
except NGBParseError as e:
    print(f"Parsing error: {e}")
except FileNotFoundError:
    print("File not found")

Internal Modules

Binary Parser Module

pyngb.binary.parser.BinaryParser

Handles binary data parsing operations with memory optimization.

This class provides low-level binary parsing functionality for NGB files, including table splitting, data extraction, and value parsing. It uses memory-efficient techniques like memoryview to minimize copying.

The parser maintains compiled regex patterns for performance and includes a pluggable data type registry for extensibility.

Example

>>> parser = BinaryParser()
>>> tables = parser.split_tables(binary_stream_data)
>>> data = parser.extract_data_array(tables[0], DataType.FLOAT64.value)
[1.0, 2.0, 3.0, ...]

Attributes:

  • markers: Binary markers used for parsing
  • _compiled_patterns (dict[str, Pattern[bytes]]): Cache of compiled regex patterns
  • _data_type_registry: Registry of data type handlers

Performance Notes
  • Uses memoryview to avoid unnecessary memory copies
  • Caches compiled regex patterns for repeated use
  • Leverages NumPy frombuffer for fast array parsing

Functions

parse_value(data_type, value) staticmethod

Parse binary value based on data type.

Parameters:

  • data_type (bytes, required): Data type identifier from DataType enum
  • value (bytes, required): Binary data to parse

Returns:

  • Any: Parsed value or None if parsing fails

Raises:

  • ValueError: If data length doesn't match expected type size
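
As a minimal sketch, a single value can be round-tripped through parse_value using a struct-packed payload (the payload below is illustrative):

import struct

from pyngb.binary.parser import BinaryParser
from pyngb.constants import DataType

# 3.14 packed as a little-endian 64-bit double, mimicking an NGB field payload
payload = struct.pack("<d", 3.14)
value = BinaryParser.parse_value(DataType.FLOAT64.value, payload)
print(value)  # 3.14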

split_tables(data)

Split binary data into tables using the known separator.

NGB streams contain multiple tables separated by a specific byte sequence. This method efficiently splits the stream into individual tables for further processing.

Parameters:

  • data (bytes, required): Binary data from an NGB stream

Returns:

  • list[bytes]: List of binary table data chunks

Example

>>> stream_data = load_stream_from_ngb()
>>> tables = parser.split_tables(stream_data)
>>> print(f"Found {len(tables)} tables")
Found 15 tables

Note

If no separator is found, returns the entire data as a single table.

handle_corrupted_data(data, context='')

Handle corrupted or malformed data gracefully.

Parameters:

  • data (bytes, required): Potentially corrupted binary data
  • context (str, default ''): Context information for logging

Returns:

  • list[float]: Empty list for corrupted data

validate_data_integrity(table)

Validate that a table has proper START_DATA and END_DATA markers.

Parameters:

  • table (bytes, required): Binary table data to validate

Returns:

  • bool: True if table has valid structure, False otherwise

extract_data_array(table, data_type)

Extract array of numerical data with memory optimization.

Extracts arrays of floating-point data from binary tables using efficient memory operations and NumPy for fast conversion.

Parameters:

  • table (bytes, required): Binary table data containing the array
  • data_type (bytes, required): Data type identifier (from DataType enum)

Returns:

  • list[float]: List of floating-point values, empty list if no data found

Raises:

  • NGBDataTypeError: If data type is not supported

Example

>>> table_data = get_table_from_stream()
>>> values = parser.extract_data_array(table_data, DataType.FLOAT64.value)
>>> print(f"Extracted {len(values)} data points")
Extracted 1500 data points

Performance

Uses NumPy frombuffer which is 10-50x faster than struct.iter_unpack for large arrays.
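
These methods can be combined into a defensive loop over the tables of a stream. A minimal sketch, assuming stream_data already holds the raw bytes of an NGB stream and using the default markers:

from pyngb.binary.parser import BinaryParser
from pyngb.constants import DataType

parser = BinaryParser()
values_per_table = []
for table in parser.split_tables(stream_data):
    if parser.validate_data_integrity(table):
        # Table has proper START_DATA/END_DATA markers; extract the float64 payload
        values_per_table.append(parser.extract_data_array(table, DataType.FLOAT64.value))
    else:
        # Malformed table: log and fall back to an empty list
        values_per_table.append(parser.handle_corrupted_data(table, context="stream_2"))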

Binary Handlers Module

pyngb.binary.handlers.DataTypeRegistry

Registry for data type handlers with pluggable architecture.

This registry manages a collection of data type handlers that can process different binary data formats found in NGB files. New handlers can be registered to extend support for additional data types.

The registry uses a chain-of-responsibility pattern to find the appropriate handler for each data type.

Example

>>> registry = DataTypeRegistry()
>>> registry.parse_data(b'\x05', binary_data)  # Uses Float64Handler
[1.0, 2.0, 3.0]

Add custom handler

>>> class CustomHandler:
...     def can_handle(self, data_type): return data_type == b'\x06'
...     def parse_data(self, data): return [42.0]
>>> registry.register(CustomHandler())

Attributes:

  • _handlers (list[DataTypeHandler]): List of registered data type handlers

Note

Handlers are checked in registration order. Register more specific handlers before more general ones.

Functions

register(handler)

Register a new data type handler.

parse_data(data_type, data)

Parse data using appropriate handler.

Parameters:

  • data_type (bytes, required): Binary data type identifier
  • data (bytes | memoryview, required): Binary data to parse

Returns:

  • list[float]: List of parsed float values

Raises:

  • NGBDataTypeError: If no handler is found for the data type

pyngb.binary.handlers.Float64Handler

Handler for 64-bit IEEE 754 double precision floating point data.

This handler processes binary data containing arrays of 64-bit doubles stored in little-endian format. Uses NumPy's frombuffer for optimal performance.

Example

>>> handler = Float64Handler()
>>> handler.can_handle(b'\x05')  # DataType.FLOAT64.value
True
>>> data = b'\x00\x00\x00\x00\x00\x00\xf0\x3f'  # 1.0 as double
>>> handler.parse_data(data)
[1.0]

pyngb.binary.handlers.Float32Handler

Handler for 32-bit IEEE 754 single precision floating point data.

This handler processes binary data containing arrays of 32-bit floats stored in little-endian format. Uses NumPy's frombuffer for optimal performance.

Example

>>> handler = Float32Handler()
>>> handler.can_handle(b'\x04')  # DataType.FLOAT32.value
True
>>> data = b'\x00\x00\x80\x3f'  # 1.0 as float
>>> handler.parse_data(data)
[1.0]

pyngb.binary.handlers.Int32Handler

Handler for 32-bit signed integer data.

This handler processes binary data containing arrays of 32-bit integers stored in little-endian format. Uses NumPy's frombuffer for optimal performance.

Example

>>> handler = Int32Handler()
>>> handler.can_handle(b'\x03')  # DataType.INT32.value
True
>>> data = b'\x2a\x00\x00\x00'  # 42 as little-endian int32
>>> handler.parse_data(data)
[42.0]

Metadata Extraction Module

pyngb.extractors.metadata.MetadataExtractor

Extracts metadata from NGB tables with improved type safety.

Functions

extract_field(table, field_name)

Extract a single metadata field (value only).

extract_metadata(tables)

Extract all metadata from tables with type safety.
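
MetadataExtractor operates on the split tables of stream_1. A minimal sketch that mirrors the flow of NGBParser.parse() shown earlier; the file path is a placeholder:

import zipfile

from pyngb.binary.parser import BinaryParser
from pyngb.constants import PatternConfig
from pyngb.extractors.metadata import MetadataExtractor

binary_parser = BinaryParser()
extractor = MetadataExtractor(PatternConfig(), binary_parser)

with zipfile.ZipFile("experiment.ngb-ss3", "r") as z:
    with z.open("Streams/stream_1.table") as stream:
        tables = binary_parser.split_tables(stream.read())

metadata = extractor.extract_metadata(tables)
print(metadata.get("sample_name"))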

Stream Processing Module

pyngb.extractors.streams.DataStreamProcessor

Processes data streams from NGB files with optimized parsing.

Functions

process_stream_2(stream_data)

Process primary data stream (stream_2).

process_stream_3(stream_data, existing_df)

Process secondary data stream (stream_3).
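
A minimal sketch of running both streams through DataStreamProcessor, again mirroring NGBParser.parse(); the file path is a placeholder:

import zipfile

import polars as pl

from pyngb.binary.parser import BinaryParser
from pyngb.constants import PatternConfig
from pyngb.extractors.streams import DataStreamProcessor

processor = DataStreamProcessor(PatternConfig(), BinaryParser())

data_df = pl.DataFrame()
with zipfile.ZipFile("experiment.ngb-ss3", "r") as z:
    with z.open("Streams/stream_2.table") as stream:
        data_df = processor.process_stream_2(stream.read())
    if "Streams/stream_3.table" in z.namelist():
        with z.open("Streams/stream_3.table") as stream:
            data_df = processor.process_stream_3(stream.read(), data_df)

print(data_df.columns)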

Utility Functions

File Utilities

pyngb.util.get_hash(path, max_size_mb=1000)

Generate file hash for metadata.

Parameters:

  • path (str, required): Path to the file to hash
  • max_size_mb (int, default 1000): Maximum file size in MB to hash

Returns:

  • Optional[str]: BLAKE2b hash as hex string, or None if hashing fails

Raises:

  • OSError: If there are file system related errors
  • PermissionError: If file access is denied

Source code in src/pyngb/util.py
def get_hash(path: str, max_size_mb: int = 1000) -> Optional[str]:
    """Generate file hash for metadata.

    Args:
        path: Path to the file to hash
        max_size_mb: Maximum file size in MB to hash (default: 1000MB)

    Returns:
        BLAKE2b hash as hex string, or None if hashing fails

    Raises:
        OSError: If there are file system related errors
        PermissionError: If file access is denied
    """
    try:
        # Pre-flight: ensure blake2b constructor is callable. If a hashing backend
        # failure occurs (e.g., during unit tests that patch blake2b to raise),
        # surface it as an unexpected error per contract.
        try:
            _ = hashlib.blake2b()  # type: ignore[call-arg]
        except Exception as e:  # pragma: no cover - exercised in tests via patch
            logger.error(
                "Unexpected error while generating hash for file %s: %s", path, e
            )
            return None
        # Check file size before hashing
        file_size = Path(path).stat().st_size
        max_size_bytes = max_size_mb * 1024 * 1024

        if file_size > max_size_bytes:
            logger.warning(
                "File too large for hashing (%d MB > %d MB): %s",
                file_size // (1024 * 1024),
                max_size_mb,
                path,
            )
            return None

        with open(path, "rb") as file:
            return hashlib.blake2b(file.read()).hexdigest()
    except FileNotFoundError:
        logger.warning("File not found while generating hash: %s", path)
        return None
    except PermissionError:
        logger.error("Permission denied while generating hash for file: %s", path)
        return None
    except OSError as e:
        logger.error("OS error while generating hash for file %s: %s", path, e)
        return None
    except Exception as e:
        logger.error("Unexpected error while generating hash for file %s: %s", path, e)
        return None

pyngb.util.set_metadata(tbl, col_meta={}, tbl_meta={})

Store table- and column-level metadata as json-encoded byte strings.

Provided by: https://stackoverflow.com/a/69553667/25195764

Table-level metadata is stored in the table's schema. Column-level metadata is stored in the table columns' fields.

To update the metadata, first new fields are created for all columns. Next a schema is created using the new fields and updated table metadata. Finally a new table is created by replacing the old one's schema, but without copying any data.

Parameters:

  • tbl (pyarrow.Table, required): The table to store metadata in
  • col_meta (dict[str, Any], default {}): A json-serializable dictionary with column metadata in the form
    {'column_1': {'some': 'data', 'value': 1}, 'column_2': {'more': 'stuff', 'values': [1, 2, 3]}}
  • tbl_meta (dict[str, Any], default {}): A json-serializable dictionary with table-level metadata

Returns:

  • pyarrow.Table: The table with updated metadata

Source code in src/pyngb/util.py
def set_metadata(
    tbl, col_meta: dict[str, Any] = {}, tbl_meta: dict[str, Any] = {}
) -> pa.Table:
    """Store table- and column-level metadata as json-encoded byte strings.

    Provided by: https://stackoverflow.com/a/69553667/25195764

    Table-level metadata is stored in the table's schema.
    Column-level metadata is stored in the table columns' fields.

    To update the metadata, first new fields are created for all columns.
    Next a schema is created using the new fields and updated table metadata.
    Finally a new table is created by replacing the old one's schema, but
    without copying any data.

    Args:
        tbl (pyarrow.Table): The table to store metadata in
        col_meta: A json-serializable dictionary with column metadata in the form
            {
                'column_1': {'some': 'data', 'value': 1},
                'column_2': {'more': 'stuff', 'values': [1,2,3]}
            }
        tbl_meta: A json-serializable dictionary with table-level metadata.

    Returns:
        pyarrow.Table: The table with updated metadata
    """
    # Create updated column fields with new metadata
    if col_meta or tbl_meta:
        fields = []
        for col in tbl.schema.names:
            if col in col_meta:
                # Get updated column metadata
                metadata = tbl.field(col).metadata or {}
                for k, v in col_meta[col].items():
                    if isinstance(v, bytes):
                        metadata[k] = v
                    elif isinstance(v, str):
                        metadata[k] = v.encode("utf-8")
                    else:
                        metadata[k] = json.dumps(v).encode("utf-8")
                # Update field with updated metadata
                fields.append(tbl.field(col).with_metadata(metadata))
            else:
                fields.append(tbl.field(col))

        # Get updated table metadata
        tbl_metadata = tbl.schema.metadata or {}
        for k, v in tbl_meta.items():
            if isinstance(v, bytes):
                tbl_metadata[k] = v
            elif isinstance(v, str):
                tbl_metadata[k] = v.encode("utf-8")
            else:
                tbl_metadata[k] = json.dumps(v).encode("utf-8")

        # Create new schema with updated field metadata and updated table metadata
        schema = pa.schema(fields, metadata=tbl_metadata)

        # With updated schema build new table (shouldn't copy data)
        # tbl = pa.Table.from_batches(tbl.to_batches(), schema)
        tbl = tbl.cast(schema)

    return tbl

Utility Examples

from pyngb.util import get_hash, set_metadata
import pyarrow as pa

# Generate file hash
file_hash = get_hash("sample.ngb-ss3")
print(f"File hash: {file_hash}")

# Add metadata to PyArrow table
table = pa.table({"data": [1, 2, 3]})
table_with_meta = set_metadata(
    table,
    tbl_meta={"source": "experiment_1", "version": "1.0"}
)
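
Column-level metadata works the same way and ends up as JSON-encoded byte strings on the Arrow schema. A small illustrative extension of the example above (column names and values are placeholders):

# Attach column-level metadata and read it back from the Arrow field
table_with_col_meta = set_metadata(
    table,
    col_meta={"data": {"units": "mg", "description": "sample mass"}}
)
print(table_with_col_meta.field("data").metadata)
# {b'units': b'mg', b'description': b'sample mass'}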

Advanced Usage Patterns

Custom Data Type Handlers

from pyngb.binary.handlers import DataTypeHandler, DataTypeRegistry
import struct

class CustomFloatHandler(DataTypeHandler):
    def can_handle(self, data_type: bytes) -> bool:
        return data_type == b'\x99'  # Custom type identifier

    def parse(self, data: bytes) -> list[float]:
        # Parse as 32-bit floats
        return [struct.unpack('<f', data[i:i+4])[0]
                for i in range(0, len(data), 4)]

# Register custom handler
registry = DataTypeRegistry()
registry.register(CustomFloatHandler())

Custom Validation Rules

from pyngb.validation import QualityChecker, ValidationResult

class CustomQualityChecker(QualityChecker):
    def domain_specific_validation(self):
        """Add domain-specific validation rules."""
        result = ValidationResult()

        # Custom rule: Check for reasonable mass loss
        if "mass" in self.data.columns:
            mass_col = self.data["mass"]
            initial_mass = mass_col.max()
            final_mass = mass_col.min()
            mass_loss_percent = (initial_mass - final_mass) / initial_mass * 100

            if mass_loss_percent > 50:
                result.add_warning(f"High mass loss: {mass_loss_percent:.1f}%")
            elif mass_loss_percent < 0:
                result.add_error("Negative mass loss detected")
            else:
                result.add_pass(f"Normal mass loss: {mass_loss_percent:.1f}%")

        return result
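
A sketch of how such a checker might be invoked, assuming QualityChecker accepts the data as a Polars DataFrame in its constructor and exposes it as self.data (the constructor is not documented in this reference):

import polars as pl
from pyngb import read_ngb

df = pl.from_arrow(read_ngb("sample.ngb-ss3"))
checker = CustomQualityChecker(df)  # assumed constructor signature
result = checker.domain_specific_validation()
print(result)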

Memory-Efficient Processing

from pyngb import read_ngb
import polars as pl

def process_large_file_efficiently(file_path: str, chunk_size: int = 10000):
    """Process large files in chunks to manage memory."""
    table = read_ngb(file_path)

    results = []
    for i in range(0, table.num_rows, chunk_size):
        # Process chunk
        chunk = table.slice(i, min(chunk_size, table.num_rows - i))
        df_chunk = pl.from_arrow(chunk)

        # Perform analysis on chunk
        chunk_result = df_chunk.select([
            pl.col("time").mean().alias("avg_time"),
            pl.col("sample_temperature").mean().alias("avg_temp")
        ])

        results.append(chunk_result)

    # Combine results
    final_result = pl.concat(results)
    return final_result

Performance Considerations

Best Practices

  1. Use PyArrow Tables: More memory-efficient than Pandas DataFrames
  2. Batch Processing: Process multiple files in parallel when possible
  3. Chunk Large Files: Use slicing for very large datasets
  4. Cache Metadata: Extract metadata once and reuse
  5. Choose Appropriate Formats: Parquet for storage, CSV for sharing (see the sketch below this list)
  6. Optimize Conversions (v0.0.2+): Pass Polars DataFrames directly to validation functions
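
As a minimal sketch of the formats point above, both outputs can be written from a single loaded table (file names are placeholders):

import polars as pl
import pyarrow.parquet as pq
from pyngb import read_ngb

table = read_ngb("sample.ngb-ss3")
pq.write_table(table, "sample.parquet")        # compact columnar storage
pl.from_arrow(table).write_csv("sample.csv")   # portable text for sharing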

Optimized Data Processing (v0.0.2+)

import polars as pl
from pyngb import read_ngb
from pyngb.validation import validate_sta_data, check_temperature_profile

# Efficient workflow with minimal conversions
table = read_ngb("sample.ngb-ss3")
df = pl.from_arrow(table)  # Single conversion

# All operations use the DataFrame directly (no additional conversions)
issues = validate_sta_data(df)           # Zero conversion overhead
temp_analysis = check_temperature_profile(df)  # Zero conversion overhead

# Previous approach (pre-v0.0.2) required multiple conversions:
# validate_sta_data(table)  # Internal PyArrow → Polars conversion
# check_temperature_profile(table)  # Another PyArrow → Polars conversion

Memory Management

import gc
from pyngb import read_ngb

def memory_conscious_processing(files: list[str]):
    """Process files with explicit memory management."""
    for file_path in files:
        # Load and process
        table = read_ngb(file_path)

        # Process immediately (process_table is a placeholder for your own analysis)
        process_table(table)

        # Explicitly delete reference
        del table

        # Force garbage collection periodically
        gc.collect()

Parallel Processing

from concurrent.futures import ProcessPoolExecutor
from pyngb import read_ngb

def process_single_file(file_path: str):
    """Load one file and summarize it. Defined at module level so it can be
    pickled and sent to worker processes by ProcessPoolExecutor."""
    table = read_ngb(file_path)
    # Perform processing
    return {"file": file_path, "rows": table.num_rows}

def parallel_file_processing(files: list[str], max_workers: int = 4):
    """Process files in parallel across multiple processes."""
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(process_single_file, files))

    return results

Error Handling Patterns

Robust File Processing

from pyngb import read_ngb, NGBParseError
import logging

def robust_file_processing(files: list[str]):
    """Process files with comprehensive error handling."""
    results = []

    for file_path in files:
        try:
            table = read_ngb(file_path)
            results.append({
                "file": file_path,
                "status": "success",
                "rows": table.num_rows,
                "columns": table.num_columns
            })

        except NGBParseError as e:
            logging.error(f"Parse error in {file_path}: {e}")
            results.append({
                "file": file_path,
                "status": "parse_error",
                "error": str(e)
            })

        except FileNotFoundError:
            logging.error(f"File not found: {file_path}")
            results.append({
                "file": file_path,
                "status": "not_found"
            })

        except Exception as e:
            logging.error(f"Unexpected error in {file_path}: {e}")
            results.append({
                "file": file_path,
                "status": "error",
                "error": str(e)
            })

    return results

Command Line Interface

pyngb provides a comprehensive CLI for data processing and baseline subtraction:

Basic Usage

python -m pyngb input.ngb-ss3 [options]

Arguments

  • input: Path to the input NGB file (required)
  • -o, --output: Output directory (default: current directory)
  • -f, --format: Output format: parquet, csv, or all (default: parquet)
  • -v, --verbose: Enable verbose logging
  • -b, --baseline: Path to baseline file for baseline subtraction
  • --dynamic-axis: Axis for dynamic segment alignment: time, sample_temperature, or furnace_temperature (default: sample_temperature)

Examples

# Basic conversion
python -m pyngb sample.ngb-ss3

# CSV output with verbose logging
python -m pyngb sample.ngb-ss3 -f csv -v

# Baseline subtraction with default settings
python -m pyngb sample.ngb-ss3 -b baseline.ngb-bs3

# Baseline subtraction with time axis alignment
python -m pyngb sample.ngb-ss3 -b baseline.ngb-bs3 --dynamic-axis time

# All formats with custom output directory
python -m pyngb sample.ngb-ss3 -b baseline.ngb-bs3 -f all -o ./results/

Output Files

  • Without baseline: {input_name}.{format}
  • With baseline: {input_name}_baseline_subtracted.{format}

For more examples and detailed usage patterns, see the Quick Start Guide and Development Guide.