Skip to content

FDSRunner

Auto-Generated Documentation

The API reference will be automatically populated from source code docstrings using mkdocstrings once the implementation is complete.

Overview

The FDSRunner class executes FDS simulations and manages the FDS process.

Basic Usage

from pyfds import Simulation
from pyfds.execution import FDSRunner

# Create simulation
sim = Simulation(chid='test')
sim.add(Time(t_end=600.0))
sim.add(Mesh(ijk=Grid3D.of(50, 50, 25), xb=Bounds3D.of(0, 5, 0, 5, 0, 2.5)))
sim.write('test.fds')

# Run FDS
runner = FDSRunner()
job = runner.run('test.fds', wait=True)

if job.is_complete():
    print("Simulation completed successfully!")

Running Simulations

Synchronous Execution

Wait for simulation to complete:

runner = FDSRunner()
job = runner.run('simulation.fds', wait=True)

# Job is complete when this returns
print(f"Exit code: {job.exit_code}")

Asynchronous Execution

Run in background:

runner = FDSRunner()
job = runner.run('simulation.fds', wait=False)

# Do other work while simulation runs
print(f"Job PID: {job.pid}")
print(f"Status: {job.status}")

# Wait when ready
job.wait()

With Threading

Parallel execution with OpenMP:

# Run with 4 OpenMP threads
runner = FDSRunner()
job = runner.run('simulation.fds', n_threads=4, wait=True)

With MPI

Distributed execution (requires MPI-enabled FDS):

# Run with 4 MPI processes
runner = FDSRunner()
job = runner.run('simulation.fds', n_mpi=4, wait=True)

Configuration

FDS Executable

Specify FDS executable path:

runner = FDSRunner(fds_command='/path/to/fds')
job = runner.run('simulation.fds')

Or use environment variable:

export FDS_COMMAND=/path/to/fds

Working Directory

Set working directory for execution:

runner = FDSRunner(working_dir='/path/to/simulations')
job = runner.run('test.fds')

Checking FDS Installation

Verify Installation

from pyfds.execution import check_fds_installed

if check_fds_installed():
    print("FDS is installed")
else:
    print("FDS not found")

Get FDS Version

from pyfds.execution import get_fds_version

version = get_fds_version()
print(f"FDS version: {version}")
# Output: FDS version: 6.8.0

Job Management

Job Object

The run() method returns a Job object:

job = runner.run('simulation.fds', wait=False)

# Job properties
print(job.chid)          # 'simulation'
print(job.pid)           # Process ID
print(job.status)        # 'running', 'completed', 'failed'
print(job.start_time)    # Start timestamp
print(job.elapsed_time)  # Elapsed seconds

Monitoring Progress

def progress_callback(job, progress):
    """Called periodically with progress updates."""
    print(f"Progress: {progress:.1f}%")
    print(f"Time: {job.current_time:.1f}/{job.total_time:.1f}s")

job = runner.run(
    'simulation.fds',
    wait=False,
    monitor=True,
    callback=progress_callback,
    callback_interval=10.0  # Every 10 seconds
)

job.wait()

Streaming Output

job = runner.run('simulation.fds', wait=False)

# Stream FDS output in real-time
for line in job.stream_output():
    print(line, end='')

    # Check for specific messages
    if 'STOP' in line:
        print("\nSimulation stopping...")

Error Handling

Check for Errors

job = runner.run('simulation.fds', wait=True)

if job.has_failed():
    print(f"Simulation failed: {job.error_message}")
    print(f"Exit code: {job.exit_code}")
else:
    print("Simulation completed successfully")

Timeout

Set maximum execution time:

runner = FDSRunner()
job = runner.run(
    'simulation.fds',
    wait=True,
    timeout=3600  # Max 1 hour
)

if job.status == 'timeout':
    print("Simulation timed out")

Retry on Failure

def run_with_retry(fds_file, max_attempts=3):
    """Run simulation with automatic retry."""
    runner = FDSRunner()

    for attempt in range(max_attempts):
        print(f"Attempt {attempt + 1}/{max_attempts}")

        job = runner.run(fds_file, wait=True)

        if job.is_complete():
            return job

        print(f"Failed: {job.error_message}")
        if attempt < max_attempts - 1:
            print("Retrying...")

    raise RuntimeError("Max retry attempts exceeded")

job = run_with_retry('simulation.fds')

Advanced Usage

Custom Environment

Set environment variables for FDS:

import os

env = os.environ.copy()
env['OMP_NUM_THREADS'] = '4'
env['FDS_DEBUG'] = '1'

runner = FDSRunner(env=env)
job = runner.run('simulation.fds')

Capture Output

Redirect stdout/stderr:

runner = FDSRunner()
job = runner.run(
    'simulation.fds',
    wait=True,
    stdout='simulation.out',
    stderr='simulation.err'
)

# Read captured output
with open('simulation.out') as f:
    output = f.read()

Process Priority

Set process priority (Unix):

import os

runner = FDSRunner()
job = runner.run('simulation.fds', wait=False)

# Lower priority (nice +10)
os.nice(10)

Integration with Simulation

Direct Execution

Run directly from Simulation object:

sim = Simulation(chid='test')
sim.add(Time(t_end=600.0))
sim.add(Mesh(ijk=Grid3D.of(50, 50, 25), xb=Bounds3D.of(0, 5, 0, 5, 0, 2.5)))

# Write and run in one step
job = sim.run(wait=True, n_threads=4)

if job.is_complete():
    print("Done!")

With Validation

Validate before running:

sim = Simulation(chid='test')
# ... configure simulation ...

if not sim.is_valid():
    errors = sim.validate()
    print(f"Validation errors: {errors}")
else:
    job = sim.run(wait=True)

Common Patterns

Batch Execution

Run multiple simulations:

runner = FDSRunner()
jobs = []

for i, fds_file in enumerate(fds_files):
    print(f"Starting {fds_file}...")
    job = runner.run(fds_file, wait=False, n_threads=2)
    jobs.append(job)

# Wait for all to complete
for job in jobs:
    job.wait()
    status = "✓" if job.is_complete() else "✗"
    print(f"{status} {job.chid}")

Sequential Queue

Run simulations one at a time:

from pyfds.execution import JobQueue

queue = JobQueue(max_concurrent=1)

for fds_file in fds_files:
    queue.add(fds_file, n_threads=8)

queue.start()

while not queue.is_complete():
    print(f"Progress: {queue.completed_count}/{queue.total_count}")
    time.sleep(10)

Parallel Execution

Run multiple simulations in parallel:

from concurrent.futures import ThreadPoolExecutor

def run_simulation(fds_file):
    """Run single simulation."""
    runner = FDSRunner()
    job = runner.run(fds_file, wait=True, n_threads=2)
    return job.chid, job.is_complete()

# Run 4 simulations in parallel
with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(run_simulation, fds_files)

    for chid, success in results:
        print(f"{chid}: {'Success' if success else 'Failed'}")

Platform Support

Windows

# Use fds.exe on Windows
runner = FDSRunner(fds_command='fds.exe')
job = runner.run('simulation.fds')

Linux/macOS

# Standard FDS binary
runner = FDSRunner(fds_command='fds')
job = runner.run('simulation.fds')

Docker

Run FDS in Docker container:

runner = FDSRunner(
    fds_command='docker',
    fds_args=['run', '--rm', '-v', f'{os.getcwd()}:/workdir',
              'fds-image', 'fds']
)
job = runner.run('simulation.fds')

Troubleshooting

FDS Not Found

from pyfds.execution import check_fds_installed

if not check_fds_installed():
    print("FDS not found in PATH")
    print("Set FDS_COMMAND environment variable")
    print("Or install FDS: https://pages.nist.gov/fds-smv/")

Permission Denied

# Make FDS executable
chmod +x /path/to/fds

OpenMP Issues

# Set number of threads explicitly
export OMP_NUM_THREADS=4

See Also


Job → Back to API →