Skip to content

Job

Job

Job(process, fds_file, output_dir, monitor, chid)

Represents a running or completed FDS simulation job.

PARAMETER DESCRIPTION
process

The running FDS process

TYPE: Popen

fds_file

Path to the FDS input file

TYPE: Path

output_dir

Directory where outputs are written

TYPE: Path

monitor

Progress monitor instance

TYPE: ProgressMonitor

chid

Case identifier

TYPE: str

Examples:

>>> job = runner.run(fds_file="test.fds", wait=False)
>>> while job.is_running():
...     print(f"Progress: {job.progress}%")
...     time.sleep(5)
>>> results = job.get_results()
Source code in src/pyfds/execution/runner.py
def __init__(
    self,
    process: "subprocess.Popen[bytes]",
    fds_file: Path,
    output_dir: Path,
    monitor: ProgressMonitor | None,
    chid: str,
):
    self._process = process
    self.fds_file = fds_file
    self.output_dir = output_dir
    self._monitor = monitor
    self.chid = chid
    self._stdout: str | None = None
    self._stderr: str | None = None
    self._exit_code: int | None = None

Attributes

progress property

progress

Get current progress percentage.

RETURNS DESCRIPTION
float

Progress percentage (0-100)

progress_info property

progress_info

Get detailed progress information.

RETURNS DESCRIPTION
(ProgressInfo, optional)

Detailed progress info, or None if not available

estimated_time_remaining property

estimated_time_remaining

Get estimated time remaining in seconds.

RETURNS DESCRIPTION
(float, optional)

Estimated seconds remaining, or None if not available

exit_code property

exit_code

Get process exit code.

RETURNS DESCRIPTION
(int, optional)

Exit code if process finished, None otherwise

Functions

is_running

is_running()

Check if the job is still running.

RETURNS DESCRIPTION
bool

True if job is running, False otherwise

Source code in src/pyfds/execution/runner.py
def is_running(self) -> bool:
    """
    Check if the job is still running.

    Returns
    -------
    bool
        True if job is running, False otherwise
    """
    if self._process.poll() is None:
        return True

    # Process finished, capture output
    if self._stdout is None:
        stdout, stderr = self._process.communicate()
        self._stdout = stdout.decode() if stdout else ""
        self._stderr = stderr.decode() if stderr else ""
        self._exit_code = self._process.returncode

    return False

wait

wait(timeout=None)

Wait for job to complete and return results.

PARAMETER DESCRIPTION
timeout

Maximum time to wait in seconds

TYPE: float DEFAULT: None

RETURNS DESCRIPTION
Results

Simulation results object

RAISES DESCRIPTION
FDSTimeoutError

If timeout is exceeded

ExecutionError

If FDS execution failed

Source code in src/pyfds/execution/runner.py
def wait(self, timeout: float | None = None) -> "Results":
    """
    Wait for job to complete and return results.

    Parameters
    ----------
    timeout : float, optional
        Maximum time to wait in seconds

    Returns
    -------
    Results
        Simulation results object

    Raises
    ------
    FDSTimeoutError
        If timeout is exceeded
    ExecutionError
        If FDS execution failed
    """
    try:
        stdout, stderr = self._process.communicate(timeout=timeout)
        self._stdout = stdout.decode() if stdout else ""
        self._stderr = stderr.decode() if stderr else ""
        self._exit_code = self._process.returncode
    except subprocess.TimeoutExpired as e:
        self.kill()
        raise FDSTimeoutError(
            f"FDS execution exceeded timeout of {timeout} seconds",
            fds_file=str(self.fds_file),
        ) from e
    finally:
        if self._monitor is not None:
            self._monitor.stop()

    # Check for errors
    if self._exit_code != 0:
        # Parse .out file for error messages
        out_file = self.output_dir / f"{self.chid}.out"
        errors = parse_out_file_for_errors(out_file)
        error_msg = "\n".join(errors) if errors else "Unknown error"

        raise ExecutionError(
            f"FDS execution failed: {error_msg}",
            exit_code=self._exit_code,
            stdout=self._stdout,
            stderr=self._stderr,
            fds_file=str(self.fds_file),
        )

    # Import here to avoid circular import
    from pyfds.analysis.results import Results

    return Results(chid=self.chid, output_dir=self.output_dir)

kill

kill()

Kill the running job.

Source code in src/pyfds/execution/runner.py
def kill(self) -> None:
    """Kill the running job."""
    if self.is_running():
        self._process.kill()
        if self._monitor is not None:
            self._monitor.stop()

request_stop

request_stop()

Request graceful shutdown by creating CHID.stop file.

From FDS User Guide §3.4: "To stop a calculation before its scheduled time, create a file in the same directory as the output files called CHID.stop. The existence of this file stops the program gracefully, causing it to dump out the latest flow variables for viewing in Smokeview."

This method creates the stop file and logs the action. FDS will check for the file and stop gracefully after completing the current timestep.

Examples:

>>> job = sim.run(wait=False)
>>> # ... simulation running ...
>>> job.request_stop()  # Ask FDS to stop gracefully
>>> job.wait()  # Wait for graceful shutdown
Source code in src/pyfds/execution/runner.py
def request_stop(self) -> None:
    """
    Request graceful shutdown by creating CHID.stop file.

    From FDS User Guide §3.4:
    "To stop a calculation before its scheduled time, create a file in
    the same directory as the output files called CHID.stop. The existence
    of this file stops the program gracefully, causing it to dump out the
    latest flow variables for viewing in Smokeview."

    This method creates the stop file and logs the action. FDS will check
    for the file and stop gracefully after completing the current timestep.

    Examples
    --------
    >>> job = sim.run(wait=False)
    >>> # ... simulation running ...
    >>> job.request_stop()  # Ask FDS to stop gracefully
    >>> job.wait()  # Wait for graceful shutdown
    """
    stop_file = self.output_dir / f"{self.chid}.stop"
    stop_file.touch()
    logger.info(f"Created stop file: {stop_file}")
    logger.info(
        "FDS will stop gracefully after current timestep completes. "
        "Use wait() or get_results() to wait for shutdown."
    )

get_results

get_results()

Get results (waits for completion if still running).

RETURNS DESCRIPTION
Results

Simulation results object

Source code in src/pyfds/execution/runner.py
def get_results(self) -> "Results":
    """
    Get results (waits for completion if still running).

    Returns
    -------
    Results
        Simulation results object
    """
    return self.wait()

Overview

The Job class represents a running or completed FDS simulation. It provides methods to monitor progress, wait for completion, and retrieve results.

Jobs are typically created by calling FDSRunner.run() with wait=False.

Creating Jobs

from pyfds import run_fds
from pyfds.execution import FDSRunner

# Non-blocking execution returns a Job
runner = FDSRunner()
job = runner.run("simulation.fds", wait=False)

# Or using convenience function
job = run_fds("simulation.fds", wait=False)

Monitoring Progress

Check if Running

job = run_fds("simulation.fds", wait=False)

while job.is_running():
    print("Simulation still running...")
    time.sleep(5)

print("Simulation complete!")

Get Progress Percentage

job = run_fds("simulation.fds", wait=False)

while job.is_running():
    progress = job.progress
    print(f"Progress: {progress:.1f}%")
    time.sleep(10)

Get Detailed Progress Info

job = run_fds("simulation.fds", wait=False)

while job.is_running():
    info = job.progress_info
    if info:
        print(f"Time step: {info.current_time:.1f}s / {info.total_time:.1f}s")
        print(f"Progress: {info.percent_complete:.1f}%")
        print(f"ETA: {info.eta_seconds:.0f} seconds")
    time.sleep(10)

Estimated Time Remaining

job = run_fds("simulation.fds", wait=False)

while job.is_running():
    eta = job.estimated_time_remaining
    if eta is not None:
        print(f"Estimated time remaining: {eta:.0f} seconds")
    time.sleep(10)

Waiting for Completion

Wait Indefinitely

job = run_fds("simulation.fds", wait=False)

# Do other work...

# Wait for completion and get results
results = job.wait()
print(f"Max HRR: {results.hrr.max()}")

Wait with Timeout

from pyfds.exceptions import FDSTimeoutError

job = run_fds("simulation.fds", wait=False)

try:
    results = job.wait(timeout=3600)  # 1 hour timeout
except FDSTimeoutError:
    print("Simulation timed out")
    job.kill()

Controlling Jobs

Kill Job

job = run_fds("simulation.fds", wait=False)

# Cancel if taking too long
if job.progress < 10 and time.time() - start_time > 300:
    print("Simulation not making progress, killing...")
    job.kill()

Request Stop

job = run_fds("simulation.fds", wait=False)

# Gracefully request FDS to stop
job.request_stop()

# Wait for graceful shutdown
try:
    results = job.wait(timeout=60)
except FDSTimeoutError:
    # Force kill if graceful shutdown fails
    job.kill()

Job Properties

Exit Code

job = run_fds("simulation.fds", wait=False)
job.wait()

if job.exit_code == 0:
    print("Simulation completed successfully")
else:
    print(f"Simulation failed with exit code {job.exit_code}")

File Paths

job = run_fds("simulation.fds", wait=False)

print(f"Input file: {job.fds_file}")
print(f"Output directory: {job.output_dir}")
print(f"CHID: {job.chid}")

Error Handling

Execution Errors

from pyfds.exceptions import ExecutionError

job = run_fds("simulation.fds", wait=False)

try:
    results = job.wait()
except ExecutionError as e:
    print(f"Execution failed: {e}")
    print(f"Exit code: {e.exit_code}")
    print(f"Stdout: {e.stdout}")
    print(f"Stderr: {e.stderr}")

Timeout Handling

from pyfds.exceptions import FDSTimeoutError

job = run_fds("long_simulation.fds", wait=False)

try:
    results = job.wait(timeout=7200)  # 2 hours
except FDSTimeoutError as e:
    print(f"Simulation timed out: {e}")
    job.kill()
    # Inspect partial results
    print(f"Made it to {job.progress:.1f}% before timeout")

Common Patterns

Progress Bar

import time
from tqdm import tqdm

job = run_fds("simulation.fds", wait=False)

with tqdm(total=100, desc="Simulation Progress") as pbar:
    last_progress = 0
    while job.is_running():
        current = job.progress
        pbar.update(current - last_progress)
        last_progress = current
        time.sleep(5)
    pbar.update(100 - last_progress)

results = job.wait()

Parallel Jobs

from concurrent.futures import ThreadPoolExecutor

def run_simulation(fds_file):
    """Run simulation and return results."""
    job = run_fds(fds_file, wait=False)
    return job.wait()

# Run multiple simulations in parallel
sim_files = ["sim1.fds", "sim2.fds", "sim3.fds"]

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(run_simulation, f) for f in sim_files]
    results = [f.result() for f in futures]

Monitoring Multiple Jobs

jobs = []
for fds_file in ["sim1.fds", "sim2.fds", "sim3.fds"]:
    job = run_fds(fds_file, wait=False)
    jobs.append(job)

# Monitor all jobs
while any(job.is_running() for job in jobs):
    for i, job in enumerate(jobs):
        if job.is_running():
            print(f"Job {i+1}: {job.progress:.1f}%")
    time.sleep(10)

# Collect results
results = [job.wait() for job in jobs]

Conditional Stopping

job = run_fds("simulation.fds", wait=False)

while job.is_running():
    # Check progress info
    info = job.progress_info
    if info and info.current_time > 300:
        # Check some condition from partial output
        out_file = job.output_dir / f"{job.chid}.out"
        if out_file.exists():
            content = out_file.read_text()
            if "INSTABILITY" in content:
                print("Instability detected, stopping simulation")
                job.request_stop()
                break
    time.sleep(10)

try:
    results = job.wait(timeout=60)
except FDSTimeoutError:
    job.kill()

Integration with Results

# Start simulation
job = run_fds("simulation.fds", wait=False)

# Monitor and wait
while job.is_running():
    print(f"Progress: {job.progress:.1f}%")
    time.sleep(10)

# Get results
results = job.wait()

# Analyze results
print(f"Peak HRR: {results.hrr.max():.1f} kW")
print(f"Max temperature: {results.max_temp:.1f} °C")

# Plot results
results.plot_hrr()
results.plot_temperature("TEMP_1")

See Also