import concurrent.futures

from flwr.common.typing import Code, GetPropertiesIns, GetPropertiesRes
from flwr.server.client_proxy import ClientProxy


# Return shape of a polling round: (successful responses, failures). A failure is either the
# (client, response) pair whose status code was not OK or the exception raised by the poll.
PollResultsAndFailures = tuple[
    list[tuple[ClientProxy, GetPropertiesRes]],
    list[tuple[ClientProxy, GetPropertiesRes] | BaseException],
]


def _handle_finished_future_after_poll(
    future: concurrent.futures.Future,
    results: list[tuple[ClientProxy, GetPropertiesRes]],
    failures: list[tuple[ClientProxy, GetPropertiesRes] | BaseException],
) -> None:
    """
    Sort one completed polling future into the ``results`` or ``failures`` accumulator.

    Exactly one item is appended per call, and the lists are mutated in place:

    * the exception itself goes to ``failures`` when the future raised;
    * the ``(client, response)`` pair goes to ``results`` when the response status is ``Code.OK``;
    * the ``(client, response)`` pair goes to ``failures`` for any other status code.

    Args:
        future (concurrent.futures.Future): Future produced by submitting a client poll. It is expected to
            have completed already.
        results (list[tuple[ClientProxy, GetPropertiesRes]]): Accumulator of successful client responses.
        failures (list[tuple[ClientProxy, GetPropertiesRes] | BaseException]): Accumulator of raised
            exceptions and of responses whose status code is not OK.
    """
    raised = future.exception()
    if raised is not None:
        # The poll itself blew up; there is no response to inspect.
        failures.append(raised)
        return

    outcome: tuple[ClientProxy, GetPropertiesRes] = future.result()
    _, response = outcome

    # A response only counts as a success when the client reports an OK status.
    destination = results if response.status.code == Code.OK else failures
    destination.append(outcome)
def poll_client(
    client: ClientProxy,
    ins: GetPropertiesIns,
    timeout: float | None = None,
) -> tuple[ClientProxy, GetPropertiesRes]:
    """
    Get Properties of client.

    This is run for each client to extract the properties from the target client.

    Args:
        client (ClientProxy): Client proxy representing one of the clients managed by the server.
        ins (GetPropertiesIns): ins provides any configurations required to help the client retrieve the correct
            properties.
        timeout (float | None, optional): Value forwarded as the ``timeout`` argument of
            ``client.get_properties``. How it is enforced is up to the proxy implementation. Defaults to None,
            which matches the previous hard-coded behavior.

    Returns:
        tuple[ClientProxy, GetPropertiesRes]: The polled client paired with its properties response.
    """
    property_res: GetPropertiesRes = client.get_properties(ins=ins, timeout=timeout, group_id=None)
    return client, property_res


def poll_clients(
    client_instructions: list[tuple[ClientProxy, GetPropertiesIns]],
    max_workers: int | None,
    timeout: float | None,
) -> PollResultsAndFailures:
    """
    Poll clients concurrently on all selected clients.

    Args:
        client_instructions (list[tuple[ClientProxy, GetPropertiesIns]]): This is the set of instructions for the
            polling to be passed to each client. Each client is represented by a single ``ClientProxy`` in the
            list.
        max_workers (int | None): Maximum number of concurrent worker threads used by the server to poll the
            clients. This should be set if polling an extremely large number of clients. If None,
            ``ThreadPoolExecutor`` applies its own default.
        timeout (float | None): Timeout handed to each client's ``get_properties`` call via ``poll_client``. The
            executor itself waits without a deadline; enforcing the timeout is left to the communication stack.

    Returns:
        PollResultsAndFailures: Tuple of ``(results, failures)`` gathered from the concurrent polling.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Forward the caller's timeout to every poll; previously it was accepted but silently dropped.
        submitted_fs = {
            executor.submit(poll_client, client_proxy, property_ins, timeout)
            for client_proxy, property_ins in client_instructions
        }
        finished_fs, _ = concurrent.futures.wait(
            fs=submitted_fs,
            timeout=None,  # Handled in the respective communication stack
        )

    # Gather results
    results: list[tuple[ClientProxy, GetPropertiesRes]] = []
    failures: list[tuple[ClientProxy, GetPropertiesRes] | BaseException] = []
    for future in finished_fs:
        _handle_finished_future_after_poll(future=future, results=results, failures=failures)
    return results, failures