from abc import ABC, abstractmethod
from typing import Generic, TypeVar
import numpy as np
import torch
from flwr.common.typing import NDArray, NDArrays
from torch import Tensor
# Type of the additional information that a ParameterPacker bundles together with the model weights.
T = TypeVar("T")
[docs]
class ParameterPacker(ABC, Generic[T]):
    """
    Abstract interface for bundling model weights with additional information of type ``T``.

    Subclasses decide how the additional information is merged with the model weights into a single ``NDArrays``
    list and how such a list is split back into its two components.
    """

    @abstractmethod
    def pack_parameters(self, model_weights: NDArrays, additional_parameters: T) -> NDArrays:
        """
        Combine the model weights and the additional parameters into a single list of arrays.

        Args:
            model_weights (NDArrays): The model weights to be packed.
            additional_parameters (T): The additional information to be packed alongside the weights.

        Returns:
            NDArrays: A single list of arrays holding both the model weights and the additional information.

        Raises:
            NotImplementedError: If the method has not been overridden by a subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def unpack_parameters(self, packed_parameters: NDArrays) -> tuple[NDArrays, T]:
        """
        Split a packed list of arrays back into model weights and the additional parameters.

        Args:
            packed_parameters (NDArrays): A list of arrays containing model weights and additional information.

        Returns:
            tuple[NDArrays, T]: The model weights followed by the additional information.

        Raises:
            NotImplementedError: If the method has not been overridden by a subclass.
        """
        raise NotImplementedError
[docs]
class ParameterPackerWithControlVariates(ParameterPacker[NDArrays]):
    """Packer that places a list of control variates directly after the model weights."""

    def __init__(self, size_of_model_params: int) -> None:
        """
        Class to handle the exchange of control variates for the SCAFFOLD FL method.

        **NOTE** model params exchanged and control variates can be different sizes, for example, when layers are
        frozen or the state dictionary contains things like Batch Normalization layers.

        Args:
            size_of_model_params (int): This is the number of layers that are associated with the parameters of the
                model itself. This is used to split the control variates from the model parameters during
                unpacking.
        """
        super().__init__()
        self.size_of_model_params = size_of_model_params

    def pack_parameters(self, model_weights: NDArrays, additional_parameters: NDArrays) -> NDArrays:
        """
        Concatenate the model weights and the control variates into one list.

        Args:
            model_weights (NDArrays): The model weights; these come first in the packed list.
            additional_parameters (NDArrays): The control variates; these follow the model weights.

        Returns:
            NDArrays: A new list with the model weights followed by the control variates.
        """
        packed = model_weights + additional_parameters
        return packed

    def unpack_parameters(self, packed_parameters: NDArrays) -> tuple[NDArrays, NDArrays]:
        """
        Split a packed list into model weights and control variates.

        The first ``size_of_model_params`` entries are treated as model weights and everything after them as
        control variates.

        Args:
            packed_parameters (NDArrays): List holding the model weights followed by the control variates.

        Returns:
            tuple[NDArrays, NDArrays]: The model weights and the control variates, in that order.
        """
        boundary = self.size_of_model_params
        model_weights = packed_parameters[:boundary]
        control_variates = packed_parameters[boundary:]
        return model_weights, control_variates
[docs]
class ParameterPackerWithClippingBit(ParameterPacker[float]):
    """Packer that stores a single float as the final entry of the packed parameter list."""

    def pack_parameters(self, model_weights: NDArrays, additional_parameters: float) -> NDArrays:
        """
        Append the float to the model weights as one extra array.

        Args:
            model_weights (NDArrays): The model weights to be packed.
            additional_parameters (float): The value to pack; it is wrapped with ``np.array`` before appending.

        Returns:
            NDArrays: A new list with the model weights followed by the wrapped value.
        """
        wrapped_value = np.array(additional_parameters)
        return model_weights + [wrapped_value]

    def unpack_parameters(self, packed_parameters: NDArrays) -> tuple[NDArrays, float]:
        """
        Split the packed list into the model weights and the trailing value.

        Args:
            packed_parameters (NDArrays): List holding the model weights followed by one extra array.

        Returns:
            tuple[NDArrays, float]: The model weights and the scalar extracted from the final array.

        Raises:
            IndexError: If ``packed_parameters`` is empty.
        """
        # The last entry in the parameters list is assumed to be a clipping bound (even if we're evaluating)
        model_parameters = packed_parameters[:-1]
        clipping_bound = packed_parameters[-1]
        return model_parameters, clipping_bound.item()
[docs]
class ParameterPackerAdaptiveConstraint(ParameterPacker[float]):
    """Packer that stores one extra adaptive constraint variable as the final entry of the packed list."""

    def pack_parameters(self, model_weights: NDArrays, extra_adaptive_variable: float) -> NDArrays:
        """
        Append the extra adaptive variable to the model weights as one extra array.

        Args:
            model_weights (NDArrays): The model weights to be packed.
            extra_adaptive_variable (float): The value to pack; it is wrapped with ``np.array`` before appending.

        Returns:
            NDArrays: A new list with the model weights followed by the wrapped value.
        """
        wrapped_variable = np.array(extra_adaptive_variable)
        return model_weights + [wrapped_variable]

    def unpack_parameters(self, packed_parameters: NDArrays) -> tuple[NDArrays, float]:
        """
        Split the packed list into the model weights and the extra adaptive variable.

        Args:
            packed_parameters (NDArrays): List holding the model weights followed by one extra array.

        Returns:
            tuple[NDArrays, float]: The model weights and the final entry converted with ``float``.

        Raises:
            AssertionError: If ``packed_parameters`` is empty, so that no trailing entry exists.
        """
        # The last entry is an extra packed adaptive constraint variable (information to allow for adaptation)
        model_parameters = packed_parameters[:-1]
        trailing_entries = packed_parameters[-1:]
        # Exactly one trailing entry must be present
        assert len(trailing_entries) == 1
        return model_parameters, float(trailing_entries[0])
[docs]
class ParameterPackerWithLayerNames(ParameterPacker[list[str]]):
    """Packer that stores the names associated with the model weights as the final entry of the packed list."""

    def pack_parameters(self, model_weights: NDArrays, weights_names: list[str]) -> NDArrays:
        """
        Append the list of names to the model weights as one extra array.

        Args:
            model_weights (NDArrays): The model weights to be packed.
            weights_names (list[str]): The names to pack; the whole list is converted into a single array.

        Returns:
            NDArrays: A new list with the model weights followed by the array of names.
        """
        names_array = np.array(weights_names)
        return model_weights + [names_array]

    def unpack_parameters(self, packed_parameters: NDArrays) -> tuple[NDArrays, list[str]]:
        """
        Function to separate the model parameters from the layer names that have been packed with them.

        Args:
            packed_parameters (NDArrays): packed_parameters is a list containing model parameters followed by an
                NDArray that contains the corresponding names of those parameters.

        Returns:
            tuple[NDArrays, list[str]]: tuple of model parameters and the names of the layers to which they
            correspond

        Raises:
            IndexError: If ``packed_parameters`` is empty.
        """
        model_parameters = packed_parameters[:-1]
        names_array = packed_parameters[-1]
        return model_parameters, names_array.tolist()
[docs]
class SparseCooParameterPacker(ParameterPacker[tuple[NDArrays, NDArrays, list[str]]]):
    """
    This parameter packer is responsible for selecting an arbitrary set of parameters and then representing them in
    the sparse COO tensor format, which requires knowing the indices of the parameters within the tensor to which they
    belong, the shape of that tensor, and also the name of it.

    For more information on the sparse COO format and sparse tensors in PyTorch, please see the following
    two pages:

    1. https://pytorch.org/docs/stable/generated/torch.sparse_coo_tensor.html
    2. https://pytorch.org/docs/stable/sparse.html
    """

    def pack_parameters(
        self, model_parameters: NDArrays, additional_parameters: tuple[NDArrays, NDArrays, list[str]]
    ) -> NDArrays:
        """
        Concatenate the parameters, their indices, the tensor shapes and the tensor names into one list.

        Args:
            model_parameters (NDArrays): The selected parameter values.
            additional_parameters (tuple[NDArrays, NDArrays, list[str]]): The parameter indices, the tensor shapes
                and the tensor names, in that order.

        Returns:
            NDArrays: A new list holding the parameters, then the indices, then the shapes, and finally a single
            array built from the list of tensor names.
        """
        indices, shapes, names = additional_parameters
        numeric_entries = model_parameters + indices + shapes
        return numeric_entries + [np.array(names)]

    def unpack_parameters(self, packed_parameters: NDArrays) -> tuple[NDArrays, tuple[NDArrays, NDArrays, list[str]]]:
        """
        Split a packed list back into the parameters, their indices, the tensor shapes and the tensor names.

        Args:
            packed_parameters (NDArrays): List holding three equally sized groups of arrays (parameters, indices,
                shapes) followed by one array containing the tensor names.

        Returns:
            tuple[NDArrays, tuple[NDArrays, NDArrays, list[str]]]: The parameters, and a tuple of the indices, the
            shapes and the list of tensor names.

        Raises:
            AssertionError: If the length of ``packed_parameters`` is not of the form ``3 * k + 1``.
        """
        # The names of the tensors are wrapped in a list, which is then transformed into a single array before
        # packing, so the packed list consists of three equally sized groups plus that one array.
        total_entries = len(packed_parameters)
        assert total_entries % 3 == 1
        group_size = (total_entries - 1) // 3

        model_parameters, parameter_indices, tensor_shapes = [
            packed_parameters[group * group_size : (group + 1) * group_size] for group in range(3)
        ]
        tensor_names = packed_parameters[3 * group_size].tolist()
        return model_parameters, (parameter_indices, tensor_shapes, tensor_names)