"""
Wrapper functions around `sim.py` to help run/track multiple simulations
"""
import os
import collections
from typing import Callable, Optional
import copy
import pandas as pd
import numpy as np
from mpire import WorkerPool
import aplusml.sim as sim
# (stray "[docs]" source-link text from the rendered documentation page removed here)
def test_diff_thresholds(simulation: sim.Simulation,
                         all_patients: list[sim.Patient],
                         thresholds: list[float],
                         utility_unit: str = '',
                         positive_outcome_state_ids: Optional[list[str]] = None,
                         **kwargs) -> pd.DataFrame:
    """Tests different model threshold values to find optimal cutoff point for binary predictions.

    For each threshold value, writes it to ``simulation.variables['model_threshold']['value']``,
    runs the simulation and calculates utility metrics. The simulation must therefore contain a
    ``model_threshold`` variable (used to binarize probabilistic predictions). Each run is fed the
    patient list returned by the previous run.

    After testing all thresholds, ``model_threshold`` is set to the threshold with the highest
    mean utility (the first one on ties) and the simulation is run one more time with the same
    ``**kwargs`` that were used during the sweep, so the final state is produced under the same
    settings that were evaluated. The patient list returned by that last run is not returned to
    the caller; only state that ``simulation.run()`` changes in place is visible afterwards.

    Args:
        simulation (sim.Simulation): Simulation object containing workflow definition
        all_patients (list[sim.Patient]): List of patients to simulate through workflow
        thresholds (list[float]): List of threshold values to test (between 0 and 1)
        utility_unit (str, optional): Name of utility unit to optimize (e.g. ``'qaly'``, ``'usd'``).
            Used as the key into each patient's summed utilities. Defaults to ``''``.
        positive_outcome_state_ids (list[str], optional): State IDs that represent positive outcomes
            (treatment administered). A patient counts as "work" when the ``state_id`` of the last
            entry in its ``history`` is in this list. Defaults to ``None``, which means
            ``['positive_end_state']``.
        **kwargs: Additional arguments passed to every ``simulation.run()`` call

    Returns:
        pd.DataFrame: One row per threshold tested, with columns

            - ``threshold``: Threshold value tested
            - ``mean_utility``: Mean utility achieved across all patients
            - ``std_utility``: Standard deviation of utilities (``np.std``, i.e. ``ddof=0``)
            - ``sem_utility``: ``std_utility`` divided by the square root of the number of patients
            - ``mean_work_per_timestep``: Number of patients ending in a positive outcome state
              divided by ``simulation.current_timestep + 1``

        If ``thresholds`` is empty, an empty dataframe with these columns is returned and the
        simulation is neither run nor modified.

    Raises:
        AssertionError: If ``model_threshold`` variable is not defined in ``simulation.variables``
    """
    if positive_outcome_state_ids is None:
        positive_outcome_state_ids = ['positive_end_state']

    # Raised explicitly (rather than via ``assert``) so the check still runs under ``python -O``
    # while keeping the documented exception type.
    if 'model_threshold' not in simulation.variables:
        raise AssertionError("ERROR - The key 'model_threshold' must exist in 'simulation.variables' but is currently missing")

    columns = ['threshold', 'mean_utility', 'std_utility', 'sem_utility', 'mean_work_per_timestep']
    rows = []
    for threshold in thresholds:
        simulation.variables['model_threshold']['value'] = threshold
        all_patients = simulation.run(all_patients, **kwargs)
        utilities = [p.get_sum_utilities(simulation)[utility_unit] for p in all_patients]
        n_positive = sum(1 for p in all_patients
                         if p.history[-1].state_id in positive_outcome_state_ids)
        std_utility = np.std(utilities)
        rows.append({
            'threshold': threshold,
            'mean_utility': np.mean(utilities),
            'std_utility': std_utility,
            'sem_utility': std_utility / np.sqrt(len(all_patients)),
            'mean_work_per_timestep': n_positive / (simulation.current_timestep + 1),
        })

    df = pd.DataFrame(rows, columns=columns)
    if df.empty:
        # Nothing was tested, so there is no best threshold to apply.
        return df

    # Best model threshold
    best_threshold = df['threshold'].iloc[df['mean_utility'].argmax()]
    simulation.variables['model_threshold']['value'] = best_threshold
    # Re-run at the best threshold with the same run arguments used during the sweep
    simulation.run(all_patients, **kwargs)
    return df
def _run_test(simulation: sim.Simulation,
              all_patients: list[sim.Patient],
              func_run_test: Optional[Callable],
              func_match_patient_to_property_column: Callable,
              is_refresh_patients: bool,
              l: str,
              k2v: dict,
              is_log: bool = False) -> pd.DataFrame:
    """Run one labelled simulation configuration and return its results.

    Worker used by ``run_test()`` for both its serial and parallel paths. Steps, in order:

    1. Optionally print the label of the run.
    2. Deep-copy ``simulation`` so the caller's object is not modified.
    3. Overwrite ``simulation.variables[key]`` with ``k2v[key]`` for every key in ``k2v``.
    4. Optionally rebuild the patients via ``sim.create_patients_for_simulation``
       (always with ``random_seed=0``).
    5. Either delegate to ``func_run_test`` or run the simulation and sum utilities.

    Args:
        simulation (sim.Simulation): Base simulation; only a deep copy of it is changed.
        all_patients (list[sim.Patient]): Patients to simulate.
        func_run_test (Optional[Callable]): Custom test routine, called as
            ``func_run_test(simulation, all_patients, l)`` and expected to return a dataframe.
            If falsy, the simulation is run once and utilities are summed instead.
        func_match_patient_to_property_column (Callable): Forwarded unchanged to
            ``sim.create_patients_for_simulation``; only used when ``is_refresh_patients`` is true.
        is_refresh_patients (bool): If ``True``, recreates the patient objects before running.
        l (str): Label for this test run, stored in the ``label`` column of the result.
        k2v (dict): Mapping of variable name to the new setting for this run.
        is_log (bool, optional): If ``True``, prints ``Run: <label>``. Defaults to ``False``.

    Returns:
        pd.DataFrame: With ``func_run_test``, whatever that function returned plus a ``label``
        column. Otherwise a single-row dataframe holding, per utility unit, the sum over all
        patients, plus the ``label`` column.
    """
    if is_log:
        print(f"Run: {l}")

    # Private copy: the variable overrides below must not leak into the caller's simulation
    simulation = copy.deepcopy(simulation)
    for var_name, var_setting in k2v.items():
        simulation.variables[var_name] = var_setting

    if is_refresh_patients:
        all_patients = sim.create_patients_for_simulation(
            simulation,
            all_patients,
            func_match_patient_to_property_column,
            random_seed=0,
        )

    if not func_run_test:
        # Default behaviour: one plain run, then total every utility unit across patients
        all_patients = simulation.run(all_patients)
        totals = collections.defaultdict(float)
        for patient in all_patients:
            patient_utilities: dict = patient.get_sum_utilities(simulation)
            for unit, amount in patient_utilities.items():
                totals[unit] += amount
        totals['label'] = l
        return pd.DataFrame([totals])

    result: pd.DataFrame = func_run_test(simulation, all_patients, l)
    result['label'] = l
    return result
# (stray "[docs]" source-link text from the rendered documentation page removed here)
def run_test(simulation: sim.Simulation,
             all_patients: list[sim.Patient],
             labels: list,
             keys2values: list[dict[dict]],
             df: Optional[pd.DataFrame] = None,
             func_run_test: Optional[Callable] = None,
             func_match_patient_to_property_column: Optional[Callable] = None,
             is_refresh_patients: bool = False,
             is_use_multi_processing: bool = False) -> pd.DataFrame:
    """Runs multiple simulation tests with different variable settings.

    This is the main entry point for running simulation experiments. It supports:

    1. Testing multiple configurations in parallel or serial
    2. Custom test functions (e.g. threshold testing)
    3. Patient property refreshing between runs
    4. Appending results to existing dataframes
    5. Multiprocessing for improved performance

    The function pairs each label with its corresponding variable settings from ``keys2values``
    (via ``zip``, so surplus entries in the longer list are ignored) and runs the simulation
    with those settings on a deep copy of ``simulation``. Results from all runs are combined
    into a single dataframe: rows of ``df`` (if given) come first, followed by the results of
    each run in the order of ``labels``. The ordering is the same for the serial and the
    parallel path.

    Args:
        simulation (sim.Simulation): Base simulation object
        all_patients (list[sim.Patient]): List of patients to simulate
        labels (list): Names for each test configuration
        keys2values (list[dict[dict]]): List of variable settings to test. Each dict maps
            variable names to new values/settings for that test run.
        df (pd.DataFrame, optional): Existing results to append to. It is copied, not
            modified. Defaults to None.
        func_run_test (Callable, optional): Custom function to run each test, called as
            ``func_run_test(simulation, all_patients, label)``. If ``None``, runs basic
            simulation. Defaults to ``None``.
        func_match_patient_to_property_column (Callable, optional): Function to match patients
            to properties in CSV. Required if refreshing patients. Defaults to ``None``.
        is_refresh_patients (bool, optional): If ``True``, recreates patients with new properties
            between runs. Defaults to ``False``.
        is_use_multi_processing (bool, optional): If ``True``, runs tests in parallel with one
            worker fewer than the number of CPUs reported by ``os.cpu_count()`` (at least one
            worker). Defaults to ``False``.

    Returns:
        pd.DataFrame: Combined results from all test runs. Format depends on ``func_run_test``,
        but always includes a ``label`` column identifying the test configuration.

    References:
        For usage examples, see:

        - :doc:`/usage/tutorial_pad`
    """
    df = df.copy() if df is not None else pd.DataFrame()

    if is_use_multi_processing:
        # os.cpu_count() may return None, and a single-core host must still get one worker
        n_jobs = max(1, (os.cpu_count() or 1) - 1)
        print('# of processes:', n_jobs)
        with WorkerPool(n_jobs, use_dill=True) as pool:
            # Tuple order must match the positional parameters of _run_test
            results = pool.map(_run_test, [(copy.deepcopy(simulation),
                                            all_patients,
                                            func_run_test,
                                            func_match_patient_to_property_column,
                                            is_refresh_patients,
                                            l, k2v) for l, k2v in zip(labels, keys2values)])
    else:
        # Keyword arguments keep the Callable and the bool flag from being swapped
        results = [
            _run_test(simulation=copy.deepcopy(simulation),
                      all_patients=all_patients,
                      func_run_test=func_run_test,
                      func_match_patient_to_property_column=func_match_patient_to_property_column,
                      is_refresh_patients=is_refresh_patients,
                      l=l,
                      k2v=k2v)
            for l, k2v in zip(labels, keys2values)
        ]

    # Single concat: existing rows first, then the new results in label order
    return pd.concat([df, *results])
if __name__ == "__main__":
    # Library module: running it directly as a script intentionally does nothing.
    pass