
Core Functions

API for core operations.

pyngb.read_ngb(path, *, return_metadata=False, baseline_file=None, dynamic_axis='sample_temperature')

read_ngb(
    path: str,
    *,
    return_metadata: Literal[False] = False,
    baseline_file: None = None,
    dynamic_axis: str = "sample_temperature",
) -> pa.Table
read_ngb(
    path: str,
    *,
    return_metadata: Literal[True],
    baseline_file: None = None,
    dynamic_axis: str = "sample_temperature",
) -> tuple[FileMetadata, pa.Table]
read_ngb(
    path: str,
    *,
    return_metadata: Literal[False] = False,
    baseline_file: str,
    dynamic_axis: str = "sample_temperature",
) -> pa.Table
read_ngb(
    path: str,
    *,
    return_metadata: Literal[True],
    baseline_file: str,
    dynamic_axis: str = "sample_temperature",
) -> tuple[FileMetadata, pa.Table]

Read NETZSCH NGB file data with optional baseline subtraction.

This is the primary function for loading NGB files. By default, it returns a PyArrow table with embedded metadata. For direct metadata access, use return_metadata=True. When baseline_file is provided, baseline subtraction is performed automatically.

Parameters

path : str
    Path to the NGB file (.ngb-ss3 or similar extension).
    Supports absolute and relative paths.
return_metadata : bool, default False
    If False (default), return PyArrow table with embedded metadata.
    If True, return (metadata, data) tuple.
baseline_file : str or None, default None
    Path to baseline file (.ngb-bs3) for baseline subtraction.
    If provided, performs automatic baseline subtraction. The baseline file
    must have an identical temperature program to the sample file.
dynamic_axis : str, default "sample_temperature"
    Axis to use for dynamic segment alignment in baseline subtraction.
    Options: "time", "sample_temperature", "furnace_temperature".

Returns

pa.Table or tuple[FileMetadata, pa.Table]
    - If return_metadata=False: PyArrow table with embedded metadata
    - If return_metadata=True: (metadata dict, PyArrow table) tuple
    - If baseline_file provided: baseline-subtracted data

Raises

FileNotFoundError
    If the specified file does not exist
NGBStreamNotFoundError
    If required data streams are missing from the NGB file
NGBCorruptedFileError
    If the file structure is invalid or corrupted
zipfile.BadZipFile
    If the file is not a valid ZIP archive
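
For example, a defensive load can catch the standard-library failures directly (a minimal sketch; the NGB-specific exceptions above can be handled the same way once imported from the package):

>>> import zipfile
>>> try:
...     data = read_ngb("experiment.ngb-ss3")
... except FileNotFoundError:
...     print("file does not exist")
... except zipfile.BadZipFile:
...     print("not a valid NGB archive")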

Examples

Basic usage (recommended for most users):

>>> from pyngb import read_ngb
>>> import polars as pl
>>>
>>> # Load NGB file
>>> data = read_ngb("experiment.ngb-ss3")
>>>
>>> # Convert to DataFrame for analysis
>>> df = pl.from_arrow(data)
>>> print(f"Shape: {df.height} rows x {df.width} columns")
Shape: 2500 rows x 8 columns

>>> # Access embedded metadata
>>> import json
>>> metadata = json.loads(data.schema.metadata[b'file_metadata'])
>>> print(f"Sample: {metadata['sample_name']}")
>>> print(f"Instrument: {metadata['instrument']}")
Sample: Polymer Sample A
Instrument: NETZSCH STA 449 F3 Jupiter

Advanced usage (for metadata-heavy workflows):

>>> # Get metadata and data separately
>>> metadata, data = read_ngb("experiment.ngb-ss3", return_metadata=True)
>>>
>>> # Work with metadata directly
>>> print(f"Operator: {metadata.get('operator', 'Unknown')}")
>>> print(f"Sample mass: {metadata.get('sample_mass', 0)} mg")
>>> print(f"Data points: {data.num_rows}")
Operator: Jane Smith
Sample mass: 15.2 mg
Data points: 2500

>>> # Use metadata for data processing
>>> df = pl.from_arrow(data)
>>> initial_mass = metadata['sample_mass']
>>> df = df.with_columns(
...     (pl.col('mass') / initial_mass * 100).alias('mass_percent')
... )

Data analysis workflow:

>>> # Simple analysis
>>> data = read_ngb("sample.ngb-ss3")
>>> df = pl.from_arrow(data)
>>>
>>> # Basic statistics
>>> if "sample_temperature" in df.columns:
...     temp_range = df["sample_temperature"].min(), df["sample_temperature"].max()
...     print(f"Temperature range: {temp_range[0]:.1f} to {temp_range[1]:.1f} °C")
Temperature range: 25.0 to 800.0 °C

>>> # Mass loss calculation
>>> if "mass" in df.columns:
...     mass_loss = (df["mass"].max() - df["mass"].min()) / df["mass"].max() * 100
...     print(f"Mass loss: {mass_loss:.2f}%")
Mass loss: 12.3%

Performance Notes

  • Fast binary parsing with NumPy optimization
  • Memory-efficient processing with PyArrow
  • Typical parsing time: 0.1-10 seconds depending on file size
  • Includes file hash for integrity verification (accessed as shown below)
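
The hash entry is stored under the "file_hash" key of the returned metadata; a minimal sketch reading it back (the keys mirror those set in the source below):

>>> metadata, data = read_ngb("experiment.ngb-ss3", return_metadata=True)
>>> file_hash = metadata["file_hash"]
>>> # file_hash is a dict: {'file': <name>, 'method': 'BLAKE2b', 'hash': <digest>}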

See Also

NGBParser : Low-level parser for custom processing
BatchProcessor : Process multiple files efficiently

Source code in src/pyngb/api/loaders.py
def read_ngb(
    path: str,
    *,
    return_metadata: bool = False,
    baseline_file: str | None = None,
    dynamic_axis: str = "sample_temperature",
) -> Union[pa.Table, tuple[FileMetadata, pa.Table]]:
    """
    Read NETZSCH NGB file data with optional baseline subtraction.

    This is the primary function for loading NGB files. By default, it returns
    a PyArrow table with embedded metadata. For direct metadata access, use return_metadata=True.
    When baseline_file is provided, baseline subtraction is performed automatically.

    Parameters
    ----------
    path : str
        Path to the NGB file (.ngb-ss3 or similar extension).
        Supports absolute and relative paths.
    return_metadata : bool, default False
        If False (default), return PyArrow table with embedded metadata.
        If True, return (metadata, data) tuple.
    baseline_file : str or None, default None
        Path to baseline file (.ngb-bs3) for baseline subtraction.
        If provided, performs automatic baseline subtraction. The baseline file
        must have an identical temperature program to the sample file.
    dynamic_axis : str, default "sample_temperature"
        Axis to use for dynamic segment alignment in baseline subtraction.
        Options: "time", "sample_temperature", "furnace_temperature"

    Returns
    -------
    pa.Table or tuple[FileMetadata, pa.Table]
        - If return_metadata=False: PyArrow table with embedded metadata
        - If return_metadata=True: (metadata dict, PyArrow table) tuple
        - If baseline_file provided: baseline-subtracted data

    Raises
    ------
    FileNotFoundError
        If the specified file does not exist
    NGBStreamNotFoundError
        If required data streams are missing from the NGB file
    NGBCorruptedFileError
        If the file structure is invalid or corrupted
    zipfile.BadZipFile
        If the file is not a valid ZIP archive

    Examples
    --------
    Basic usage (recommended for most users):

    >>> from pyngb import read_ngb
    >>> import polars as pl
    >>>
    >>> # Load NGB file
    >>> data = read_ngb("experiment.ngb-ss3")
    >>>
    >>> # Convert to DataFrame for analysis
    >>> df = pl.from_arrow(data)
    >>> print(f"Shape: {df.height} rows x {df.width} columns")
    Shape: 2500 rows x 8 columns

    >>> # Access embedded metadata
    >>> import json
    >>> metadata = json.loads(data.schema.metadata[b'file_metadata'])
    >>> print(f"Sample: {metadata['sample_name']}")
    >>> print(f"Instrument: {metadata['instrument']}")
    Sample: Polymer Sample A
    Instrument: NETZSCH STA 449 F3 Jupiter

    Advanced usage (for metadata-heavy workflows):

    >>> # Get metadata and data separately
    >>> metadata, data = read_ngb("experiment.ngb-ss3", return_metadata=True)
    >>>
    >>> # Work with metadata directly
    >>> print(f"Operator: {metadata.get('operator', 'Unknown')}")
    >>> print(f"Sample mass: {metadata.get('sample_mass', 0)} mg")
    >>> print(f"Data points: {data.num_rows}")
    Operator: Jane Smith
    Sample mass: 15.2 mg
    Data points: 2500

    >>> # Use metadata for data processing
    >>> df = pl.from_arrow(data)
    >>> initial_mass = metadata['sample_mass']
    >>> df = df.with_columns(
    ...     (pl.col('mass') / initial_mass * 100).alias('mass_percent')
    ... )

    Data analysis workflow:

    >>> # Simple analysis
    >>> data = read_ngb("sample.ngb-ss3")
    >>> df = pl.from_arrow(data)
    >>>
    >>> # Basic statistics
    >>> if "sample_temperature" in df.columns:
    ...     temp_range = df["sample_temperature"].min(), df["sample_temperature"].max()
    ...     print(f"Temperature range: {temp_range[0]:.1f} to {temp_range[1]:.1f} °C")
    Temperature range: 25.0 to 800.0 °C

    >>> # Mass loss calculation
    >>> if "mass" in df.columns:
    ...     mass_loss = (df["mass"].max() - df["mass"].min()) / df["mass"].max() * 100
    ...     print(f"Mass loss: {mass_loss:.2f}%")
    Mass loss: 12.3%

    Performance Notes
    -----------------
    - Fast binary parsing with NumPy optimization
    - Memory-efficient processing with PyArrow
    - Typical parsing time: 0.1-10 seconds depending on file size
    - Includes file hash for integrity verification

    See Also
    --------
    NGBParser : Low-level parser for custom processing
    BatchProcessor : Process multiple files efficiently
    """
    parser = NGBParser()
    metadata, data = parser.parse(path)

    # Add file hash to metadata
    file_hash = get_hash(path)
    if file_hash is not None:
        metadata["file_hash"] = {
            "file": Path(path).name,
            "method": "BLAKE2b",
            "hash": file_hash,
        }

    # Handle baseline subtraction if requested
    if baseline_file is not None:
        from ..baseline import subtract_baseline

        # Validate dynamic_axis
        valid_axes = ["time", "sample_temperature", "furnace_temperature"]
        if dynamic_axis not in valid_axes:
            raise ValueError(
                f"dynamic_axis must be one of {valid_axes}, got '{dynamic_axis}'"
            )

        # Perform baseline subtraction (this will load baseline metadata internally)
        subtracted_df = subtract_baseline(
            path,
            baseline_file,
            dynamic_axis,  # type: ignore  # We validated it above
        )

        # Convert back to PyArrow
        data = subtracted_df.to_arrow()

    if return_metadata:
        return metadata, data

    # Attach metadata to the Arrow table
    data = set_metadata(data, tbl_meta={"file_metadata": metadata, "type": "STA"})
    return data

pyngb.NGBParser

Main parser for NETZSCH STA NGB files with enhanced error handling.

This is the primary interface for parsing NETZSCH NGB files. It orchestrates the parsing of metadata and measurement data from the various streams within an NGB file.

The parser handles the complete workflow:

1. Opens and validates the NGB ZIP archive
2. Extracts metadata from stream_1.table
3. Processes measurement data from stream_2.table and stream_3.table
4. Returns structured data with embedded metadata
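
Since an NGB file is a ZIP archive, the stream layout can be inspected with the standard library alone (a sketch; stream paths as referenced in the parser source):

>>> import zipfile
>>> with zipfile.ZipFile("sample.ngb-ss3") as z:
...     names = z.namelist()  # expect entries like 'Streams/stream_1.table'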

Example

>>> parser = NGBParser()
>>> metadata, data_table = parser.parse("sample.ngb-ss3")
>>> print(f"Sample: {metadata.get('sample_name', 'Unknown')}")
>>> print(f"Data shape: {data_table.num_rows} x {data_table.num_columns}")
Sample: Test Sample 1
Data shape: 2500 x 8

Advanced Configuration

>>> config = PatternConfig()
>>> config.column_map["custom_id"] = "custom_column"
>>> parser = NGBParser(config)

Attributes:

config
    Pattern configuration for parsing
markers
    Binary markers for data identification
binary_parser
    Low-level binary parsing engine
metadata_extractor
    Metadata extraction engine
data_processor
    Data stream processing engine

Thread Safety

This parser is not thread-safe. Create separate instances for concurrent parsing operations.
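
A minimal sketch of that pattern, assuming a list of .ngb-ss3 paths (the file names are hypothetical); each task constructs its own parser rather than sharing one:

from concurrent.futures import ThreadPoolExecutor

from pyngb import NGBParser

def parse_one(path: str):
    # One NGBParser per task: instances must not be shared across threads
    parser = NGBParser()
    return parser.parse(path)

paths = ["run_01.ngb-ss3", "run_02.ngb-ss3"]  # hypothetical file names
with ThreadPoolExecutor() as pool:
    results = list(pool.map(parse_one, paths))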

Source code in src/pyngb/core/parser.py
class NGBParser:
    """Main parser for NETZSCH STA NGB files with enhanced error handling.

    This is the primary interface for parsing NETZSCH NGB files. It orchestrates
    the parsing of metadata and measurement data from the various streams within
    an NGB file.

    The parser handles the complete workflow:
    1. Opens and validates the NGB ZIP archive
    2. Extracts metadata from stream_1.table
    3. Processes measurement data from stream_2.table and stream_3.table
    4. Returns structured data with embedded metadata

    Example:
        >>> parser = NGBParser()
        >>> metadata, data_table = parser.parse("sample.ngb-ss3")
        >>> print(f"Sample: {metadata.get('sample_name', 'Unknown')}")
        >>> print(f"Data shape: {data_table.num_rows} x {data_table.num_columns}")
        Sample: Test Sample 1
        Data shape: 2500 x 8

    Advanced Configuration:
        >>> config = PatternConfig()
        >>> config.column_map["custom_id"] = "custom_column"
        >>> parser = NGBParser(config)

    Attributes:
        config: Pattern configuration for parsing
        markers: Binary markers for data identification
        binary_parser: Low-level binary parsing engine
        metadata_extractor: Metadata extraction engine
        data_processor: Data stream processing engine

    Thread Safety:
        This parser is not thread-safe. Create separate instances for
        concurrent parsing operations.
    """

    def __init__(self, config: PatternConfig | None = None) -> None:
        self.config = config or PatternConfig()
        self.markers = BinaryMarkers()
        self.binary_parser = BinaryParser(self.markers)
        self.metadata_extractor = MetadataExtractor(self.config, self.binary_parser)
        self.data_processor = DataStreamProcessor(self.config, self.binary_parser)

    def validate_ngb_structure(self, zip_file: zipfile.ZipFile) -> list[str]:
        """Validate that the ZIP file has the expected NGB structure.

        Args:
            zip_file: Open ZIP file to validate

        Returns:
            List of available streams

        Raises:
            NGBStreamNotFoundError: If required streams are missing
        """
        available_streams = zip_file.namelist()
        logger.debug(f"Available streams: {available_streams}")

        # Check for required streams
        # stream_1 and stream_2 are required for basic operation; stream_3 is optional
        required_streams = ["Streams/stream_1.table", "Streams/stream_2.table"]
        missing_streams = [
            stream for stream in required_streams if stream not in available_streams
        ]

        if missing_streams:
            raise NGBStreamNotFoundError(f"Missing required streams: {missing_streams}")

        return available_streams

    def parse(self, path: str) -> tuple[FileMetadata, pa.Table]:
        """Parse NGB file and return metadata and Arrow table.

        Opens an NGB file, extracts all metadata and measurement data,
        and returns them as separate objects for flexible use.

        Args:
            path: Path to the .ngb-ss3 file to parse

        Returns:
            Tuple of (metadata_dict, pyarrow_table) where:
            - metadata_dict contains instrument settings, sample info, etc.
            - pyarrow_table contains the measurement data columns

        Raises:
            FileNotFoundError: If the specified file doesn't exist
            NGBStreamNotFoundError: If required streams are missing
            NGBCorruptedFileError: If file structure is invalid
            zipfile.BadZipFile: If file is not a valid ZIP archive

        Example:
            >>> metadata, data = parser.parse("experiment.ngb-ss3")
            >>> print(f"Instrument: {metadata.get('instrument', 'Unknown')}")
            >>> print(f"Columns: {data.column_names}")
            >>> print(f"Temperature range: {data['sample_temperature'].min()} to {data['sample_temperature'].max()}")
            Instrument: NETZSCH STA 449 F3 Jupiter
            Columns: ['time', 'sample_temperature', 'mass', 'dsc_signal', 'purge_flow']
            Temperature range: 25.0 to 800.0
        """
        path_obj = Path(path)
        if not path_obj.exists():
            raise FileNotFoundError(f"File not found: {path}")

        metadata: FileMetadata = {}
        data_df = pl.DataFrame()

        try:
            with zipfile.ZipFile(path, "r") as z:
                # Validate NGB file structure
                available_streams = self.validate_ngb_structure(z)

                # stream_1: metadata
                with z.open("Streams/stream_1.table") as stream:
                    stream_data = stream.read()
                    tables = self.binary_parser.split_tables(stream_data)
                    metadata = self.metadata_extractor.extract_metadata(tables)

                # stream_2: primary data
                if "Streams/stream_2.table" in available_streams:
                    with z.open("Streams/stream_2.table") as stream:
                        stream_data = stream.read()
                        data_df = self.data_processor.process_stream_2(stream_data)

                # stream_3: additional data merged into existing df
                if "Streams/stream_3.table" in available_streams:
                    with z.open("Streams/stream_3.table") as stream:
                        stream_data = stream.read()
                        data_df = self.data_processor.process_stream_3(
                            stream_data, data_df
                        )

        except zipfile.BadZipFile as e:
            logger.error("Invalid ZIP archive: %s", e)
            raise
        except NGBStreamNotFoundError:
            # Re-raise our custom exceptions as-is
            raise
        except Exception as e:
            logger.error("Failed to parse NGB file: %s", e)
            raise

        # Convert to PyArrow at API boundary for cross-language compatibility
        # and metadata embedding. This is the single conversion point from
        # internal Polars processing to external PyArrow interface.
        return metadata, data_df.to_arrow()

Functions

validate_ngb_structure(zip_file)

Validate that the ZIP file has the expected NGB structure.

Parameters:

zip_file : ZipFile, required
    Open ZIP file to validate

Returns:

list[str]
    List of available streams

Raises:

NGBStreamNotFoundError
    If required streams are missing
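
A minimal usage sketch (assumes an NGB archive on disk; the method is called on an already-open ZipFile):

>>> import zipfile
>>> parser = NGBParser()
>>> with zipfile.ZipFile("experiment.ngb-ss3", "r") as z:
...     streams = parser.validate_ngb_structure(z)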

Source code in src/pyngb/core/parser.py
def validate_ngb_structure(self, zip_file: zipfile.ZipFile) -> list[str]:
    """Validate that the ZIP file has the expected NGB structure.

    Args:
        zip_file: Open ZIP file to validate

    Returns:
        List of available streams

    Raises:
        NGBStreamNotFoundError: If required streams are missing
    """
    available_streams = zip_file.namelist()
    logger.debug(f"Available streams: {available_streams}")

    # Check for required streams
    # stream_1 and stream_2 are required for basic operation; stream_3 is optional
    required_streams = ["Streams/stream_1.table", "Streams/stream_2.table"]
    missing_streams = [
        stream for stream in required_streams if stream not in available_streams
    ]

    if missing_streams:
        raise NGBStreamNotFoundError(f"Missing required streams: {missing_streams}")

    return available_streams

parse(path)

Parse NGB file and return metadata and Arrow table.

Opens an NGB file, extracts all metadata and measurement data, and returns them as separate objects for flexible use.

Parameters:

path : str, required
    Path to the .ngb-ss3 file to parse

Returns:

tuple[FileMetadata, Table]
    Tuple of (metadata_dict, pyarrow_table) where:
    - metadata_dict contains instrument settings, sample info, etc.
    - pyarrow_table contains the measurement data columns

Raises:

FileNotFoundError
    If the specified file doesn't exist
NGBStreamNotFoundError
    If required streams are missing
NGBCorruptedFileError
    If file structure is invalid
zipfile.BadZipFile
    If file is not a valid ZIP archive

Example

>>> metadata, data = parser.parse("experiment.ngb-ss3")
>>> print(f"Instrument: {metadata.get('instrument', 'Unknown')}")
>>> print(f"Columns: {data.column_names}")
>>> print(f"Temperature range: {data['sample_temperature'].min()} to {data['sample_temperature'].max()}")
Instrument: NETZSCH STA 449 F3 Jupiter
Columns: ['time', 'sample_temperature', 'mass', 'dsc_signal', 'purge_flow']
Temperature range: 25.0 to 800.0

Source code in src/pyngb/core/parser.py
def parse(self, path: str) -> tuple[FileMetadata, pa.Table]:
    """Parse NGB file and return metadata and Arrow table.

    Opens an NGB file, extracts all metadata and measurement data,
    and returns them as separate objects for flexible use.

    Args:
        path: Path to the .ngb-ss3 file to parse

    Returns:
        Tuple of (metadata_dict, pyarrow_table) where:
        - metadata_dict contains instrument settings, sample info, etc.
        - pyarrow_table contains the measurement data columns

    Raises:
        FileNotFoundError: If the specified file doesn't exist
        NGBStreamNotFoundError: If required streams are missing
        NGBCorruptedFileError: If file structure is invalid
        zipfile.BadZipFile: If file is not a valid ZIP archive

    Example:
        >>> metadata, data = parser.parse("experiment.ngb-ss3")
        >>> print(f"Instrument: {metadata.get('instrument', 'Unknown')}")
        >>> print(f"Columns: {data.column_names}")
        >>> print(f"Temperature range: {data['sample_temperature'].min()} to {data['sample_temperature'].max()}")
        Instrument: NETZSCH STA 449 F3 Jupiter
        Columns: ['time', 'sample_temperature', 'mass', 'dsc_signal', 'purge_flow']
        Temperature range: 25.0 to 800.0
    """
    path_obj = Path(path)
    if not path_obj.exists():
        raise FileNotFoundError(f"File not found: {path}")

    metadata: FileMetadata = {}
    data_df = pl.DataFrame()

    try:
        with zipfile.ZipFile(path, "r") as z:
            # Validate NGB file structure
            available_streams = self.validate_ngb_structure(z)

            # stream_1: metadata
            with z.open("Streams/stream_1.table") as stream:
                stream_data = stream.read()
                tables = self.binary_parser.split_tables(stream_data)
                metadata = self.metadata_extractor.extract_metadata(tables)

            # stream_2: primary data
            if "Streams/stream_2.table" in available_streams:
                with z.open("Streams/stream_2.table") as stream:
                    stream_data = stream.read()
                    data_df = self.data_processor.process_stream_2(stream_data)

            # stream_3: additional data merged into existing df
            if "Streams/stream_3.table" in available_streams:
                with z.open("Streams/stream_3.table") as stream:
                    stream_data = stream.read()
                    data_df = self.data_processor.process_stream_3(
                        stream_data, data_df
                    )

    except zipfile.BadZipFile as e:
        logger.error("Invalid ZIP archive: %s", e)
        raise
    except NGBStreamNotFoundError:
        # Re-raise our custom exceptions as-is
        raise
    except Exception as e:
        logger.error("Failed to parse NGB file: %s", e)
        raise

    # Convert to PyArrow at API boundary for cross-language compatibility
    # and metadata embedding. This is the single conversion point from
    # internal Polars processing to external PyArrow interface.
    return metadata, data_df.to_arrow()