Source code for fl4health.utils.early_stopper

from __future__ import annotations

import copy
from collections.abc import Callable
from logging import INFO, WARNING
from pathlib import Path
from typing import TYPE_CHECKING, Any

import torch.nn as nn
from flwr.common.logger import log
from torch.optim import Optimizer
from torch.optim.lr_scheduler import LRScheduler

from fl4health.checkpointing.checkpointer import PerRoundStateCheckpointer
from fl4health.reporting.reports_manager import ReportsManager
from fl4health.utils.logging import LoggingMode
from fl4health.utils.losses import TrainingLosses
from fl4health.utils.metrics import MetricManager
from fl4health.utils.snapshotter import (
    AbstractSnapshotter,
    LRSchedulerSnapshotter,
    NumberSnapshotter,
    OptimizerSnapshotter,
    SerializableObjectSnapshotter,
    T,
    TorchModuleSnapshotter,
)

if TYPE_CHECKING:
    from fl4health.clients.basic_client import BasicClient


class EarlyStopper:
    """
    Client plugin that stops local training when the validation loss stops improving.

    The best client state seen so far is snapshotted (in memory, or on disk when a snapshot directory is
    given) so that it can be restored before the model is sent to the server.
    """

    def __init__(
        self,
        client: BasicClient,
        patience: int | None = 1,
        interval_steps: int = 5,
        snapshot_dir: Path | None = None,
    ) -> None:
        """
        Early stopping class is a plugin for the client that allows to stop local training based on the
        validation loss. Each time the validation loss improves, the state of the client is saved; that state
        can be restored with ``load_snapshot``. If the client starts to overfit, ``should_stop`` signals that
        the training process should be stopped.

        Args:
            client (BasicClient): The client to be monitored.
            patience (int | None, optional): Number of validation cycles without improvement to wait before
                stopping the training. If it is None the client never stops, but the best state is still
                tracked. Defaults to 1.
            interval_steps (int): Frequency, in training steps, at which the validation loss is evaluated.
                Defaults to 5.
            snapshot_dir (Path | None, optional): Rather than keeping the best state in memory, checkpoint it
                to the given directory. If it is not given, the best state is kept in memory. Defaults to None.
        """
        self.client = client
        self.patience = patience
        self.count_down = patience
        self.interval_steps = interval_steps
        self.best_score: float | None = None
        self.snapshot_ckpt: dict[str, tuple[AbstractSnapshotter, Any]] = {}

        # Maps a client attribute name to (snapshotter used to save/load it, expected type of the attribute).
        self.snapshot_attrs: dict = {
            "model": (TorchModuleSnapshotter(self.client), nn.Module),
            "optimizers": (OptimizerSnapshotter(self.client), Optimizer),
            "lr_schedulers": (
                LRSchedulerSnapshotter(self.client),
                LRScheduler,
            ),
            "learning_rate": (NumberSnapshotter(self.client), float),
            "total_steps": (NumberSnapshotter(self.client), int),
            "total_epochs": (NumberSnapshotter(self.client), int),
            "reports_manager": (
                SerializableObjectSnapshotter(self.client),
                ReportsManager,
            ),
            "train_loss_meter": (
                SerializableObjectSnapshotter(self.client),
                TrainingLosses,
            ),
            "train_metric_manager": (
                SerializableObjectSnapshotter(self.client),
                MetricManager,
            ),
        }

        # Always define both attributes so that in-memory mode (no snapshot_dir) can be detected with a plain
        # ``is None`` check instead of failing with AttributeError in save_snapshot/load_snapshot.
        self.checkpointer: PerRoundStateCheckpointer | None = None
        self.checkpoint_name: str | None = None

        if snapshot_dir is not None:
            # TODO: Move to generic checkpointer
            self.checkpointer = PerRoundStateCheckpointer(snapshot_dir)
            self.checkpoint_name = f"temp_{self.client.client_name}.pt"
        else:
            log(INFO, "Snapshot is being persisted in memory")

    def add_default_snapshot_attr(
        self, name: str, snapshot_class: Callable[[BasicClient], AbstractSnapshotter], input_type: type[T]
    ) -> None:
        """
        Register (or overwrite) a client attribute to be included in every snapshot.

        Args:
            name (str): Name of the client attribute to snapshot.
            snapshot_class (Callable[[BasicClient], AbstractSnapshotter]): Callable that, given the client,
                builds the snapshotter responsible for saving and loading the attribute.
            input_type (type[T]): Expected type of the attribute, forwarded to the snapshotter on save/load.
        """
        self.snapshot_attrs.update({name: (snapshot_class(self.client), input_type)})

    def delete_default_snapshot_attr(self, name: str) -> None:
        """
        Remove an attribute from the set of attributes that are snapshotted.

        Args:
            name (str): Name of the attribute to stop snapshotting.

        Raises:
            KeyError: If ``name`` is not currently registered.
        """
        del self.snapshot_attrs[name]

    def save_snapshot(self) -> None:
        """
        Creates a snapshot of the client state and, if a checkpointer is configured, saves it to the
        checkpoint. Otherwise the snapshot is kept in memory.
        """
        for attr, (snapshotter, expected_type) in self.snapshot_attrs.items():
            self.snapshot_ckpt.update(snapshotter.save(attr, expected_type))

        checkpointer, checkpoint_name = self.checkpointer, self.checkpoint_name
        if checkpointer is not None and checkpoint_name is not None:
            log(
                INFO,
                f"Saving client best state to checkpoint at {checkpointer.checkpoint_dir} "
                f"with name {checkpoint_name}.",
            )
            checkpointer.save_checkpoint(checkpoint_name, self.snapshot_ckpt)
            # The state now lives on disk; drop the in-memory copy.
            self.snapshot_ckpt.clear()
        else:
            log(
                WARNING,
                "Checkpointing directory is not provided. Client best state will be kept in the memory.",
            )
            # Deep-copy so the retained snapshot does not share objects with what the snapshotters returned.
            self.snapshot_ckpt = copy.deepcopy(self.snapshot_ckpt)

    def load_snapshot(self, attributes: list[str] | None = None) -> None:
        """
        Load the saved snapshot back into the respective client attributes.

        A checkpoint on disk takes precedence when a checkpointer is configured and the checkpoint exists;
        otherwise the snapshot held in memory is used.

        Args:
            attributes (list[str] | None): List of attributes to load from the snapshot. If None, all
                registered attributes are loaded. Defaults to None.

        Raises:
            AssertionError: If there is neither a checkpoint on disk nor a snapshot in memory.
        """
        checkpointer, checkpoint_name = self.checkpointer, self.checkpoint_name
        checkpoint_on_disk = (
            checkpointer is not None
            and checkpoint_name is not None
            and checkpointer.checkpoint_exists(checkpoint_name)
        )

        # Raised explicitly (rather than via an ``assert`` statement) so the check survives ``python -O``.
        if not checkpoint_on_disk and not self.snapshot_ckpt:
            raise AssertionError("No checkpoint to load")

        if attributes is None:
            attributes = list(self.snapshot_attrs.keys())

        if checkpointer is not None:
            log(INFO, f"Loading client best state {attributes} from checkpoint at {checkpointer.checkpoint_dir}")
        else:
            log(INFO, f"Loading client best state {attributes} from memory")

        if checkpoint_on_disk and checkpointer is not None and checkpoint_name is not None:
            self.snapshot_ckpt = checkpointer.load_checkpoint(checkpoint_name)

        for attr in attributes:
            snapshotter, expected_type = self.snapshot_attrs[attr]
            snapshotter.load(self.snapshot_ckpt, attr, expected_type)

    def should_stop(self, steps: int) -> bool:
        """
        Determine if the client should stop training based on early stopping criteria.

        Validation is only run when ``steps`` is a multiple of ``interval_steps``. When the validation loss
        improves on the best one seen so far, the count down is reset to ``patience`` and a snapshot is saved.
        Otherwise the count down is decremented (unless ``patience`` is None) and training should stop once
        it reaches zero.

        Args:
            steps (int): Number of steps since the start of the training.

        Returns:
            bool: True if training should stop, otherwise False.
        """
        if steps % self.interval_steps != 0:
            return False

        val_loss, _ = self.client._validate_or_test(
            loader=self.client.val_loader,
            loss_meter=self.client.val_loss_meter,
            metric_manager=self.client.val_metric_manager,
            logging_mode=LoggingMode.EARLY_STOP_VALIDATION,
            include_losses_in_metrics=False,
        )

        if val_loss is None:
            return False

        if self.best_score is None or val_loss < self.best_score:
            # New best validation loss: remember it, reset the patience budget and snapshot the client.
            self.best_score = val_loss
            self.count_down = self.patience
            self.save_snapshot()
            return False

        # A count down of None (patience=None) means the stopper never requests a stop.
        if self.count_down is not None:
            self.count_down -= 1
            if self.count_down <= 0:
                return True

        return False