# Source code for fl4health.strategies.fedavg_sparse_coo_tensor

from collections import defaultdict
from collections.abc import Callable
from functools import reduce
from logging import WARNING

import torch
from flwr.common import MetricsAggregationFn, NDArrays, Parameters, ndarrays_to_parameters
from flwr.common.logger import log
from flwr.common.typing import FitRes, Scalar
from flwr.server.client_proxy import ClientProxy
from torch import Tensor

from fl4health.parameter_exchange.parameter_packer import SparseCooParameterPacker
from fl4health.strategies.basic_fedavg import BasicFedAvg
from fl4health.utils.functions import decode_and_pseudo_sort_results


class FedAvgSparseCooTensor(BasicFedAvg):
    def __init__(
        self,
        *,
        fraction_fit: float = 1.0,
        fraction_evaluate: float = 1.0,
        min_fit_clients: int = 2,
        min_evaluate_clients: int = 2,
        min_available_clients: int = 2,
        evaluate_fn: (
            Callable[[int, NDArrays, dict[str, Scalar]], tuple[float, dict[str, Scalar]] | None] | None
        ) = None,
        on_fit_config_fn: Callable[[int], dict[str, Scalar]] | None = None,
        on_evaluate_config_fn: Callable[[int], dict[str, Scalar]] | None = None,
        accept_failures: bool = True,
        initial_parameters: Parameters | None = None,
        fit_metrics_aggregation_fn: MetricsAggregationFn | None = None,
        evaluate_metrics_aggregation_fn: MetricsAggregationFn | None = None,
        weighted_aggregation: bool = True,
        weighted_eval_losses: bool = True,
    ) -> None:
        """
        A generalization of the FedAvg strategy where the server can receive any arbitrary subset of parameters from
        any arbitrary subset of the clients. Weighted average for parameters belonging to each received tensor is
        performed independently.

        Note that this strategy differs from FedAvgDynamicLayer in that it does not require clients to send entire
        layers (tensors). A client can send an arbitrary set of parameters within a certain tensor, and these
        parameters are packed according to the sparse COO format.

        For more information on the sparse COO format and sparse tensors in PyTorch, please see the following
        two pages:

        1. https://pytorch.org/docs/stable/generated/torch.sparse_coo_tensor.html
        2. https://pytorch.org/docs/stable/sparse.html

        Args:
            fraction_fit (float, optional): Fraction of clients used during training. Defaults to 1.0.
            fraction_evaluate (float, optional): Fraction of clients used during validation. Defaults to 1.0.
            min_fit_clients (int, optional): Minimum number of clients used during training. Defaults to 2.
            min_evaluate_clients (int, optional): Minimum number of clients used during validation. Defaults to 2.
            min_available_clients (int, optional): Minimum number of total clients in the system. Defaults to 2.
            evaluate_fn (Callable[[int, NDArrays, dict[str, Scalar]], tuple[float, dict[str, Scalar]] | None] | None):
                Optional function used for central server-side evaluation. Defaults to None.
            on_fit_config_fn (Callable[[int], dict[str, Scalar]] | None, optional): Function used to configure
                training by providing a configuration dictionary. Defaults to None.
            on_evaluate_config_fn (Callable[[int], dict[str, Scalar]] | None, optional): Function used to configure
                client-side validation by providing a Config dictionary. Defaults to None.
            accept_failures (bool, optional): Whether or not accept rounds containing failures. Defaults to True.
            initial_parameters (Parameters | None, optional): Initial global model parameters. Defaults to None.
            fit_metrics_aggregation_fn (MetricsAggregationFn | None, optional): Metrics aggregation function.
                Defaults to None.
            evaluate_metrics_aggregation_fn (MetricsAggregationFn | None, optional): Metrics aggregation function.
                Defaults to None.
            weighted_aggregation (bool, optional): Determines whether parameter aggregation is a linearly weighted
                average or a uniform average. FedAvg default is weighted average by client dataset counts.
                Defaults to True.
            weighted_eval_losses (bool, optional): Determines whether losses during evaluation are linearly weighted
                averages or a uniform average. FedAvg default is weighted average of the losses by client dataset
                counts. Defaults to True.
        """
        super().__init__(
            fraction_fit=fraction_fit,
            fraction_evaluate=fraction_evaluate,
            min_fit_clients=min_fit_clients,
            min_evaluate_clients=min_evaluate_clients,
            min_available_clients=min_available_clients,
            evaluate_fn=evaluate_fn,
            on_fit_config_fn=on_fit_config_fn,
            on_evaluate_config_fn=on_evaluate_config_fn,
            accept_failures=accept_failures,
            initial_parameters=initial_parameters,
            fit_metrics_aggregation_fn=fit_metrics_aggregation_fn,
            evaluate_metrics_aggregation_fn=evaluate_metrics_aggregation_fn,
            weighted_aggregation=weighted_aggregation,
            weighted_eval_losses=weighted_eval_losses,
        )
        # Packs/unpacks (values, indices, shapes, names) payloads exchanged with the clients.
        self.parameter_packer = SparseCooParameterPacker()

    def aggregate_fit(
        self,
        server_round: int,
        results: list[tuple[ClientProxy, FitRes]],
        failures: list[tuple[ClientProxy, FitRes] | BaseException],
    ) -> tuple[Parameters | None, dict[str, Scalar]]:
        """
        Aggregate the results from the federated fit round.

        The aggregation requires some special treatment, as the participating clients are allowed to exchange an
        arbitrary set of parameters. So before aggregation takes place alignment must be done using the tensor names
        packed in along with the weights in the client results.

        More precisely, this method performs the following steps:

        1. Align all tensors according to their names.
        2. For tensors that have the same name, construct the sparse COO tensors and convert them to dense tensors.
        3. Perform averaging on the dense tensors (can be weighted or unweighted).
        4. For every aggregated dense tensor, discard the zero values and retain all information needed to represent
           it in the sparse COO format.

        Args:
            server_round (int): Indicates the server round we're currently on.
            results (list[tuple[ClientProxy, FitRes]]): The client identifiers and the results of their local
                training that need to be aggregated on the server-side. In this scheme, the clients pack the tensor
                names into the results object along with the weight values to allow for alignment during aggregation.
            failures (list[tuple[ClientProxy, FitRes] | BaseException]): These are the results and exceptions from
                clients that experienced an issue during training, such as timeouts or exceptions.

        Returns:
            tuple[Parameters | None, dict[str, Scalar]]: The aggregated model weights and the metrics dictionary.
            For sparse tensor exchange we also pack in the names of all of the tensors that were aggregated in this
            phase to allow clients to insert the values into the proper areas of their models. ``(None, {})`` is
            returned when there are no results, or when failures occurred and failures are not accepted.
        """
        if not results:
            return None, {}
        # Do not aggregate if there are failures and failures are not accepted
        if not self.accept_failures and failures:
            return None, {}

        # Sorting the results by elements and sample counts. This is primarily to reduce numerical fluctuations in
        # summing the numpy arrays during aggregation. This ensures that addition will occur in the same order,
        # reducing numerical fluctuation.
        decoded_and_sorted_results = [
            (weights, sample_counts) for _, weights, sample_counts in decode_and_pseudo_sort_results(results)
        ]

        # For each tensor of the model, perform (weighted or uniform) averaging of all received weights from clients
        aggregated_tensors = self.aggregate(decoded_and_sorted_results)

        # Re-sparsify every aggregated dense tensor and collect the pieces needed to rebuild it on the client side.
        tensor_names: list[str] = []
        selected_parameters_all_tensors = []
        selected_indices_all_tensors = []
        tensor_shapes = []
        for tensor_name, aggregated_tensor in aggregated_tensors.items():
            selected_parameters, selected_indices, tensor_shape = self.parameter_packer.extract_coo_info_from_dense(
                aggregated_tensor
            )
            tensor_names.append(tensor_name)
            selected_parameters_all_tensors.append(selected_parameters)
            selected_indices_all_tensors.append(selected_indices)
            tensor_shapes.append(tensor_shape)

        packed_parameters = self.parameter_packer.pack_parameters(
            selected_parameters_all_tensors, (selected_indices_all_tensors, tensor_shapes, tensor_names)
        )

        # Aggregate custom metrics if aggregation fn was provided
        metrics_aggregated = {}
        if self.fit_metrics_aggregation_fn:
            fit_metrics = [(res.num_examples, res.metrics) for _, res in results]
            metrics_aggregated = self.fit_metrics_aggregation_fn(fit_metrics)
        elif server_round == 1:  # Only log this warning once
            log(WARNING, "No fit_metrics_aggregation_fn provided")

        return ndarrays_to_parameters(packed_parameters), metrics_aggregated

    def aggregate(self, results: list[tuple[NDArrays, int]]) -> dict[str, Tensor]:
        """
        Aggregate the different tensors across clients that have contributed to a certain tensor. This aggregation
        may be weighted or unweighted, depending on ``self.weighted_aggregation``. The called functions handle
        tensor alignment.

        Args:
            results (list[tuple[NDArrays, int]]): The weight results from each client's local training that need to
                be aggregated on the server-side and the number of training samples held on each client. In this
                scheme, the clients pack the tensor names into the results object along with the weight values to
                allow for alignment during aggregation.

        Returns:
            dict[str, Tensor]: A dictionary mapping the name of the tensor that was aggregated to the aggregated
            weights.
        """
        if self.weighted_aggregation:
            return self.weighted_aggregate(results)
        return self.unweighted_aggregate(results)

    def _sum_dense_tensors(
        self, results: list[tuple[NDArrays, int]], *, weight_by_examples: bool
    ) -> tuple[dict[str, Tensor], dict[str, int]]:
        """
        Unpack every client payload, densify each sparse COO tensor and accumulate per-name sums.

        A running sum is kept per tensor name rather than a list of every client's dense tensor, so only one
        accumulator per name is alive at a time. Additions happen in the order the results are provided, which is
        the same left-to-right order a final reduction over a list would use.

        Args:
            results (list[tuple[NDArrays, int]]): Packed client parameters and the client's number of training
                samples.
            weight_by_examples (bool): If True, each dense tensor is multiplied by the client's sample count and the
                normalizer for that name grows by the sample count. If False, tensors are summed as is and the
                normalizer grows by one per contribution.

        Returns:
            tuple[dict[str, Tensor], dict[str, int]]: The per-name summed dense tensors and the per-name normalizers
            (total sample count or number of contributions).
        """
        running_sums: dict[str, Tensor] = {}
        normalizers: defaultdict[str, int] = defaultdict(int)

        for packed_parameters, num_examples in results:
            nonzero_parameter_values, additional_info = self.parameter_packer.unpack_parameters(packed_parameters)
            parameter_indices, tensor_shapes, tensor_names = additional_info

            # Sanity check to ensure that they all have the same length and the length is > 0.
            assert (
                len(nonzero_parameter_values) == len(parameter_indices) == len(tensor_shapes) == len(tensor_names)
                and len(tensor_names) > 0
            ), "Values, indices, shapes and names must be non-empty and have the same length."

            for tensor_params, tensor_param_indices, tensor_shape, tensor_name in zip(
                nonzero_parameter_values, parameter_indices, tensor_shapes, tensor_names
            ):
                # The packed indices are transposed before being handed to torch.sparse_coo_tensor
                # (presumably stored one coordinate per row; torch expects one per column. Confirm against packer).
                dense_tensor = torch.sparse_coo_tensor(
                    indices=torch.tensor(tensor_param_indices.T),
                    values=torch.tensor(tensor_params),
                    size=torch.Size(tensor_shape),
                ).to_dense()

                if weight_by_examples:
                    contribution = dense_tensor * num_examples
                    normalizers[tensor_name] += num_examples
                else:
                    # Deliberately not multiplied by 1 so the tensor's dtype is left untouched.
                    contribution = dense_tensor
                    normalizers[tensor_name] += 1

                if tensor_name in running_sums:
                    # Out-of-place add: never mutates a tensor that is still referenced elsewhere.
                    running_sums[tensor_name] = torch.add(running_sums[tensor_name], contribution)
                else:
                    running_sums[tensor_name] = contribution

        return running_sums, normalizers

    def weighted_aggregate(self, results: list[tuple[NDArrays, int]]) -> dict[str, Tensor]:
        """
        "results" consist of four parts: the exchanged (nonzero) parameter values, their coordinates within the
        tensor to which they belong, the shape of that tensor, and finally the name of that tensor.

        The first three items constitute the information that is needed to construct the tensor in the sparse COO
        format and convert it to a regular dense tensor. The tensor name is used to align tensors to ensure that
        averaging is performed only among tensors with the same name.

        This method performs the following steps:

        1. Align all tensors according to their names.
        2. For tensors that have the same name, construct the sparse COO tensors and convert them to dense tensors.
        3. Perform weighted averaging on the dense tensors according to the number of training examples each client
           has.

        Note: this method performs weighted averaging.

        Args:
            results (list[tuple[NDArrays, int]]): The weight results from each client's local training that need to
                be aggregated on the server-side and the number of training samples held on each client. The weight
                results consist of four parts, as detailed above. In this scheme, the clients pack the layer names
                into the results object along with the weight values to allow for alignment during aggregation.

        Returns:
            dict[str, Tensor]: A dictionary mapping the name of the tensor that was aggregated to the aggregated
            weights.
        """
        weighted_sums, total_num_examples = self._sum_dense_tensors(results, weight_by_examples=True)
        return {name: tensor_sum / total_num_examples[name] for name, tensor_sum in weighted_sums.items()}

    def unweighted_aggregate(self, results: list[tuple[NDArrays, int]]) -> dict[str, Tensor]:
        """
        "results" consist of four parts: the exchanged (nonzero) parameter values, their coordinates within the
        tensor to which they belong, the shape of that tensor, and finally the name of that tensor.

        The first three items constitute the information that is needed to construct the tensor in the sparse COO
        format and convert it to a regular dense tensor. The tensor name is used to align tensors to ensure that
        averaging is performed only among tensors with the same name.

        This method performs the following steps:

        1. Align all tensors according to their names.
        2. For tensors that have the same name, construct the sparse COO tensors and convert them to dense tensors.
        3. Perform uniform averaging on the dense tensors across all clients.

        Note: this method performs uniform averaging.

        Args:
            results (list[tuple[NDArrays, int]]): The weight results from each client's local training that need to
                be aggregated on the server-side and the number of training samples held on each client. The weight
                results consist of four parts, as detailed above. In this scheme, the clients pack the layer names
                into the results object along with the weight values to allow for alignment during aggregation.

        Returns:
            dict[str, Tensor]: A dictionary mapping the name of the tensor that was aggregated to the aggregated
            weights.
        """
        tensor_sums, total_num_clients = self._sum_dense_tensors(results, weight_by_examples=False)
        return {name: tensor_sum / total_num_clients[name] for name, tensor_sum in tensor_sums.items()}