blazefl.core.ThreadPoolClientTrainer

class blazefl.core.ThreadPoolClientTrainer(*args, **kwargs)

Bases: BaseClientTrainer[UplinkPackage, DownlinkPackage], Protocol[UplinkPackage, DownlinkPackage]

__init__(*args, **kwargs)

Methods

  • __init__(*args, **kwargs)

  • get_client_device(cid)

  • local_process(payload, cid_list) – Process the downlink payload from the server for a list of client IDs.

  • progress_fn(it) – A no-op progress function that can be overridden to provide custom progress tracking.

  • uplink_package() – Prepare the data package to be sent from the client to the server.

  • worker(cid, device, payload, stop_event) – Process a single client's training task in a thread.

Attributes

cache: list[UplinkPackage]

device: str

device_count: int

get_client_device(cid: int) → str

local_process(payload: DownlinkPackage, cid_list: list[int]) → None

Process the downlink payload from the server for a list of client IDs.

Parameters:
  • payload (DownlinkPackage) – The data package received from the server.

  • cid_list (list[int]) – A list of client IDs to process.

Returns:

None
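
The pattern that local_process describes (one task per client ID, run on a thread pool, with results collected for a later uplink) can be sketched with the standard library alone. This is an illustrative stand-in, not BlazeFL's implementation: the class ToyThreadPoolTrainer, the dict-based payload, and the behaviour of get_client_device and uplink_package shown here are all assumptions made for the example.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass, field


@dataclass
class ToyThreadPoolTrainer:
    """Hypothetical stand-in; it does not import or subclass anything from BlazeFL."""

    num_parallels: int = 2
    device: str = "cpu"
    cache: list[dict] = field(default_factory=list)
    stop_event: threading.Event = field(default_factory=threading.Event)

    def get_client_device(self, cid: int) -> str:
        # Assumption: every client runs on the trainer's single device.
        return self.device

    def worker(
        self, cid: int, device: str, payload: dict, stop_event: threading.Event
    ) -> dict:
        # Placeholder "training": derive a per-client value from the payload.
        return {"cid": cid, "device": device, "value": payload["weight"] + cid}

    def progress_fn(self, it: list[Future]) -> list[Future]:
        # No-op hook that returns its input unchanged.
        return it

    def local_process(self, payload: dict, cid_list: list[int]) -> None:
        # Submit one task per client ID, then collect each result into the cache.
        with ThreadPoolExecutor(max_workers=self.num_parallels) as executor:
            futures = [
                executor.submit(
                    self.worker,
                    cid,
                    self.get_client_device(cid),
                    payload,
                    self.stop_event,
                )
                for cid in cid_list
            ]
            for future in self.progress_fn(futures):
                self.cache.append(future.result())

    def uplink_package(self) -> list[dict]:
        # Assumption: hand back the cached results and reset the cache.
        package, self.cache = self.cache, []
        return package
```

Calling local_process({"weight": 10}, [0, 1, 2]) on an instance and then uplink_package() returns one result dict per client ID. Because the futures are consumed in submission order here, the results come back in the order of cid_list.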

num_parallels: int

progress_fn(it: list[Future[UplinkPackage]]) → Iterable[Future[UplinkPackage]]

A no-op progress function that can be overridden to provide custom progress tracking.

Parameters:

it (list[Future[UplinkPackage]]) – A list of Future objects representing the results of client processing.

Returns:

The original iterable.

Return type:

Iterable[Future[UplinkPackage]]
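
One possible override of progress_fn is sketched below as a standalone function. It yields futures in completion order and prints a running count. The name counting_progress_fn is hypothetical, and the sketch assumes the caller simply iterates over whatever progress_fn returns; that assumption should be checked against the BlazeFL source before relying on it.

```python
from collections.abc import Iterator
from concurrent.futures import Future, ThreadPoolExecutor, as_completed


def counting_progress_fn(it: list[Future]) -> Iterator[Future]:
    """Yield futures as they complete, printing a running count."""
    total = len(it)
    for done, future in enumerate(as_completed(it), start=1):
        print(f"clients finished: {done}/{total}")
        yield future


# Usage with plain futures; pow(2, n) stands in for a client task.
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(pow, 2, n) for n in range(4)]
    results = sorted(f.result() for f in counting_progress_fn(futures))
```

Because as_completed yields futures in the order they finish, results arrive in completion order rather than submission order; the usage above sorts them for that reason.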

stop_event: Event

worker(cid: int, device: str, payload: DownlinkPackage, stop_event: Event) → UplinkPackage

Process a single client’s training task in a thread.

Parameters:
  • cid (int) – The client ID.

  • device (str) – The device to use for processing this client.

  • payload (DownlinkPackage) – The data package received from the server.

  • stop_event (threading.Event) – Event to signal stopping the worker.

Returns:

The uplink package containing the client’s results.

Return type:

UplinkPackage
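
A worker with this signature might honour stop_event by checking it between units of work. The sketch below shows that cooperative-cancellation pattern with the standard library only. The function toy_worker, the "steps" key in the payload, and the returned dict are hypothetical placeholders for a real training loop and a real UplinkPackage.

```python
import threading


def toy_worker(
    cid: int, device: str, payload: dict, stop_event: threading.Event
) -> dict:
    """Run up to payload["steps"] placeholder steps, stopping early if asked."""
    steps_done = 0
    for _ in range(payload["steps"]):
        if stop_event.is_set():
            break  # Cooperative cancellation: leave the loop between steps.
        steps_done += 1  # Stand-in for one local training step.
    return {"cid": cid, "device": device, "steps_done": steps_done}
```

Checking the event once per step keeps the worker responsive to a stop request without interrupting a step part-way through.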