"""Client supporting model merging across client models and subsequent federated evaluation."""

import datetime
from abc import abstractmethod
from collections.abc import Sequence
from pathlib import Path

import torch
import torch.nn as nn
from flwr.client import NumPyClient
from flwr.common.typing import Config, NDArrays, Scalar
from torch.utils.data import DataLoader

from fl4health.parameter_exchange.full_exchanger import FullParameterExchanger
from fl4health.parameter_exchange.parameter_exchanger_base import ParameterExchanger
from fl4health.reporting.base_reporter import BaseReporter
from fl4health.reporting.reports_manager import ReportsManager
from fl4health.utils.client import move_data_to_device
from fl4health.utils.metrics import Metric, MetricManager
from fl4health.utils.random import generate_hash
from fl4health.utils.typing import TorchInputType, TorchTargetType


class ModelMergeClient(NumPyClient):
    def __init__(
        self,
        data_path: Path,
        model_path: Path,
        metrics: Sequence[Metric],
        device: torch.device,
        reporters: Sequence[BaseReporter] | None = None,
        client_name: str | None = None,
    ) -> None:
        """
        ModelMergeClient to support functionality to simply perform model merging across client models and
        subsequently evaluate.

        Args:
            data_path (Path): Path to the data to be used to load the data for client-side training.
            model_path (Path): Path to the checkpoint of the client model to be used in model merging.
            metrics (Sequence[Metric]): Metrics to be computed based on the labels and predictions of the
                client model.
            device (torch.device): Device indicator for where to send the model, batches, labels etc. Often
                "cpu" or "cuda".
            reporters (Sequence[BaseReporter] | None, optional): A sequence of FL4Health reporters which the
                client should send data to. Defaults to None.
            client_name (str | None, optional): An optional client name that uniquely identifies a client.
                If not passed, a hash is randomly generated.
        """
        self.data_path = data_path
        self.model_path = model_path
        self.metrics = metrics
        self.device = device
        self.client_name = client_name if client_name is not None else generate_hash()
        self.initialized = False
        self.test_metric_manager = MetricManager(metrics=self.metrics, metric_manager_name="test")

        # Initialize reporters with client information.
        self.reports_manager = ReportsManager(reporters)
        self.reports_manager.initialize(id=self.client_name)

        # Populated by setup_client once the first config arrives from the server.
        self.model: nn.Module
        self.test_loader: DataLoader
        self.num_test_samples: int

    def setup_client(self, config: Config) -> None:
        """
        Sets up Merge Client by initializing model, dataloader and parameter exchanger with user defined
        methods. Subsequently, sets initialized attribute to True.

        Args:
            config (Config): The configuration from the server.
        """
        self.model = self.get_model(config)
        self.test_loader = self.get_test_data_loader(config)
        self.num_test_samples = len(self.test_loader.dataset)  # type: ignore
        self.parameter_exchanger = self.get_parameter_exchanger(config)
        self.initialized = True

    def get_parameters(self, config: Config) -> NDArrays:
        """
        Determines which parameters are sent back to the server for aggregation. This uses a parameter
        exchanger to determine parameters sent.

        For the ModelMergeClient, we assume that self.setup_client has already been called as it does not
        support client polling so get_parameters is called from fit and thus should be initialized by this
        point.

        Args:
            config (Config): The config is sent by the FL server to allow for customization in the function
                if desired.

        Returns:
            NDArrays: These are the parameters to be sent to the server. At minimum they represent the
            relevant model parameters to be aggregated, but can contain more information.
        """
        assert self.model is not None
        return self.parameter_exchanger.push_parameters(self.model, config=config)

    def set_parameters(self, parameters: NDArrays, config: Config) -> None:
        """
        Sets the local model parameters transferred from the server using a parameter exchanger to
        coordinate how parameters are set.

        For the ModelMergeClient, we assume that initially parameters are being set to the parameters in
        the nn.Module returned by the user defined get_model method. Thus, set_parameters is only called
        once after model merging has occurred and before federated evaluation.

        Args:
            parameters (NDArrays): Parameters have information about model state to be added to the
                relevant client model but may contain more information than that.
            config (Config): The config is sent by the FL server to allow for customization in the function
                if desired.
        """
        assert self.initialized
        self.parameter_exchanger.pull_parameters(parameters, self.model)

    def fit(self, parameters: NDArrays, config: Config) -> tuple[NDArrays, int, dict[str, Scalar]]:
        """
        Initializes client, validates local client model on local test data and returns parameters, test
        dataset length and test metrics. Importantly, parameters from Server, which is empty, is not used
        to initialize the client model.

        Note: Since we only assume the client provides a test_loader, client evaluation and sample counts
        are always based off the client test_loader.

        Args:
            parameters (NDArrays): Not used.
            config (Config): The config from the server.

        Returns:
            tuple[NDArrays, int, dict[str, Scalar]]: The local model parameters along with the number of
            samples in the local test dataset and the computed metrics of the local model on the local
            test dataset.

        Raises:
            AssertionError: If model is initialized prior to fit method being called which should not
                happen in the case of the ModelMergeClient.
        """
        assert not self.initialized
        self.setup_client(config)

        self.reports_manager.report(
            data={"host_type": "client", "fit_start": datetime.datetime.now()},
        )

        val_metrics = self.validate()

        self.reports_manager.report(
            data={"fit_metrics": val_metrics, "host_type": "client", "fit_end": datetime.datetime.now()},
        )

        return self.get_parameters(config), self.num_test_samples, val_metrics

    def _move_data_to_device(self, data: TorchInputType | TorchTargetType) -> TorchTargetType | TorchInputType:
        """
        Moving data to self.device where data is intended to be either input to the model or the targets
        that the model is trying to achieve.

        Args:
            data (TorchInputType | TorchTargetType): The data to move to self.device. Can be a
                TorchInputType or a TorchTargetType.

        Raises:
            TypeError: Raised if data is not one of the types specified by TorchInputType or
                TorchTargetType.

        Returns:
            TorchTargetType | TorchInputType: The data argument except now it's been moved to self.device.
        """
        # Currently we expect both inputs and targets to be either tensors or dictionaries of tensors.
        if isinstance(data, torch.Tensor):
            return data.to(self.device)
        elif isinstance(data, dict):
            return {key: value.to(self.device) for key, value in data.items()}
        else:
            raise TypeError(
                "data must be of type torch.Tensor or dict[str, torch.Tensor]. \
                If definition of TorchInputType or TorchTargetType has \
                changed this method might need to be updated or split into \
                two"
            )

    def validate(self) -> dict[str, Scalar]:
        """
        Validate the model on the test dataset.

        Returns:
            dict[str, Scalar]: A dictionary of metrics computed on the test set.
        """
        self.model.eval()
        self.test_metric_manager.clear()
        with torch.no_grad():
            # "model_input" (rather than "input") avoids shadowing the builtin.
            for model_input, target in self.test_loader:
                model_input = move_data_to_device(model_input, self.device)
                target = move_data_to_device(target, self.device)
                preds = {"predictions": self.model(model_input)}
                self.test_metric_manager.update(preds, target)

        return self.test_metric_manager.compute()

    def evaluate(self, parameters: NDArrays, config: Config) -> tuple[float, int, dict[str, Scalar]]:
        """
        Evaluate the provided parameters using the locally held dataset.

        Args:
            parameters (NDArrays): The current model parameters.
            config (Config): Configuration object from the server.

        Returns:
            tuple[float, int, dict[str, Scalar]]: The float represents the loss which is assumed to be 0
            for the ModelMergeClient. The int represents the number of batches in the local test loader and
            the dictionary is the computed metrics on the test set.
        """
        self.set_parameters(parameters, config)
        metrics = self.validate()
        # NOTE(review): this reports len(self.test_loader) (number of BATCHES), while fit reports
        # self.num_test_samples (number of examples). Looks inconsistent — confirm which count the
        # server-side aggregation expects before changing.
        return 0.0, len(self.test_loader), metrics

    def get_parameter_exchanger(self, config: Config) -> ParameterExchanger:
        """
        Parameter exchange is assumed to always be full for model merging clients. However, this
        functionality may be overridden if a different exchanger is needed.

        Used in non-standard way for ModelMergeClient as set_parameters is only called for evaluate as
        parameters should initially be set to the parameters in the nn.Module returned by get_model.

        Args:
            config (Config): Configuration object from the server.

        Returns:
            FullParameterExchanger: The parameter exchanger used to set and get parameters.
        """
        return FullParameterExchanger()

    @abstractmethod
    def get_model(self, config: Config) -> nn.Module:
        """
        User defined method that returns PyTorch model. This is the local model that will be communicated
        to the server for merging.

        Args:
            config (Config): The config from the server.

        Returns:
            nn.Module: The client model.

        Raises:
            NotImplementedError: To be defined in child class.
        """
        raise NotImplementedError

    @abstractmethod
    def get_test_data_loader(self, config: Config) -> DataLoader:
        """
        User defined method that returns a PyTorch Test DataLoader.

        Args:
            config (Config): The config from the server.

        Returns:
            DataLoader: Client test data loader.

        Raises:
            NotImplementedError: To be defined in child class.
        """
        raise NotImplementedError