decent_dp package

Submodules

decent_dp.ddp module

class decent_dp.ddp.DecentralizedDataParallel(model: Module, optim_fn: Callable[[List[Tuple[str, Tensor]]], Optimizer], lr_scheduler_fn: Callable[[Optimizer], LRScheduler] | None = None, topology: str = 'complete', scaler: GradScaler | None = None, grad_clip_norm: float = 0.0, param_as_bucket_view: bool = True, sync_buffer_in_global_avg: bool = False, bucket_size_in_mb: int = 25, _local_world_size: int | None = None)[source]

Bases: Module

Decentralized data parallel wrapper for PyTorch module

  1. During the backward pass of the first iteration, the wrapper places hooks to trace the order in which the parameters are used.

  2. The parameters are then split into buckets; an optimizer and an LR scheduler are created for each bucket, and a hook is added on the last parameter of each bucket to perform the bucket-wise update and communication.

  3. During the backward passes of the training loop, these hooks are triggered to perform the bucket-wise update and communication.

Warning:

The wrapper currently does not support “channels_last” memory format

Warning:

The wrapper assumes that the parameter will only be used once in the backward pass

Parameters:
  • model (Module) – PyTorch module to be wrapped

  • optim_fn (OPTIM_FN_TYPE) – Function to create the optimizer, which takes a list of tuples of parameters and their names

  • lr_scheduler_fn (Optional[LR_SCHEDULER_FN_TYPE], optional) – Function to create the learning rate scheduler, which takes the optimizer as input. Defaults to None.

  • topology (str, optional) – Topology of the decentralized communication graph. Defaults to ‘complete’.

  • scaler (Optional[GradScaler], optional) – Gradient scaler for mixed precision training. Defaults to None.

  • grad_clip_norm (float, optional) – Gradient clipping norm, set to 0.0 if no gradient clipping is applied. Defaults to 0.0.

  • param_as_bucket_view (bool, optional) – Whether to use the parameter as a view of part of the contiguous buffer. Defaults to True.

  • sync_buffer_in_global_avg (bool, optional) – Whether to synchronize the float buffers in the global average. Defaults to False.

  • bucket_size_in_mb (int, optional) – Size of the bucket in MB. Defaults to 25 MB.

  • local_world_size (Optional[int], optional) – Provide the local world size if not using the environment variable. Defaults to None.
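
Example (a minimal sketch, not from the library docs: the model, data, and hyperparameter values are placeholders, and it relies on the behavior described above, i.e. the backward hooks carry out the bucket-wise update and communication, so no explicit optimizer.step() appears):

   # Minimal sketch of wrapping a model with DecentralizedDataParallel.
   # Run under a distributed launcher (e.g. torchrun); model and data are placeholders.
   from functools import partial

   import torch
   from decent_dp.ddp import DecentralizedDataParallel
   from decent_dp.optim import optim_fn_adamw, lr_scheduler_fn_cosine_with_warmup
   from decent_dp.utils import initialize_dist

   rank, world_size = initialize_dist()

   model = torch.nn.Linear(128, 10)                      # placeholder model
   ddp_model = DecentralizedDataParallel(
       model,
       optim_fn=partial(optim_fn_adamw, lr=1e-3),
       lr_scheduler_fn=partial(lr_scheduler_fn_cosine_with_warmup,
                               t_max=1000, t_warmup=100),
       topology='ring',
   )

   criterion = torch.nn.CrossEntropyLoss()
   data = [(torch.randn(32, 128), torch.randint(0, 10, (32,)))
           for _ in range(100)]                          # synthetic data
   for x, y in data:
       loss = criterion(ddp_model(x), y)
       # The backward hooks perform the bucket-wise update and communication,
       # so the loop only calls backward().
       loss.backward()

   ddp_model.global_avg()   # synchronize parameters across workers for evaluation
   ddp_model.eval()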

FLOAT_DTYPES = [torch.float16, torch.float32, torch.float64]

eval()[source]

Set the module in evaluation mode

forward(*args, **kwargs)[source]

Forward pass of the model

global_avg()[source]

Perform a global average on the parameters (and on the buffers if sync_buffer_in_global_avg is True). This function is called at the end of the training loop to synchronize the parameters across all workers for evaluation

named_parameters(prefix: str = '', recurse: bool = True, remove_duplicate: bool = True) Iterator[Tuple[str, Parameter]][source]

Get the named parameters of the model

parameters(recurse: bool = True) Iterator[Parameter][source]

Get the parameters of the model

Parameters:

recurse (bool, optional) – Whether to get the parameters recursively. Defaults to True.

Yields:

Iterator[Parameter] – The iterator of the parameters

set_accumulate_grad(enable: bool = True)[source]

Set the gradient accumulation mode

Parameters:

enable (bool, optional) – Whether to accumulate the gradients. Defaults to True.
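
Example (a hedged sketch reusing the names from the class-level example above; the micro-batch split is illustrative, and it assumes the bucket-wise update and communication only run on backward passes where accumulation is disabled):

   # Accumulate gradients over accum_steps micro-batches (illustrative value).
   accum_steps = 4
   for step, (x, y) in enumerate(data):
       is_last_micro_batch = (step % accum_steps == accum_steps - 1)
       ddp_model.set_accumulate_grad(not is_last_micro_batch)
       loss = criterion(ddp_model(x), y)
       loss.backward()   # assumed: update/communication happen only when
                         # accumulation is disabled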

train(mode: bool = True)[source]

Set the module in training mode

Parameters:

mode (bool, optional) – Whether to set the module in training mode. Defaults to True.

decent_dp.ddp.OPTIM_FN_TYPE

Data type for the optimizer function

alias of Callable[[List[Tuple[str, Tensor]]], Optimizer]
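
For illustration, a sketch of a user-defined function matching this type (the parameter-group split is an arbitrary example, not a library convention):

   from typing import List, Tuple

   import torch
   from torch import Tensor
   from torch.optim import Optimizer

   def my_optim_fn(params: List[Tuple[str, Tensor]]) -> Optimizer:
       # Illustrative policy: no weight decay on 1-D parameters (biases, norms).
       decay = [p for _, p in params if p.ndim > 1]
       no_decay = [p for _, p in params if p.ndim <= 1]
       return torch.optim.AdamW(
           [{'params': decay, 'weight_decay': 0.1},
            {'params': no_decay, 'weight_decay': 0.0}],
           lr=1e-3,
       )

The wrapper creates one such optimizer per parameter bucket (see the class description above).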

decent_dp.optim module

class decent_dp.optim.AccumAdam(params: Any, lr: float = 0.001, betas: Tuple[float, float] = (0.9, 0.999), eps: float = 1e-08, weight_decay: float = 3.0517578125e-05, accum_iter: int = 4)[source]

Bases: Optimizer

AccumAdam optimizer

Parameters:
  • params (Any) – parameters list or groups

  • lr (float, optional) – base learning rate. Defaults to 1e-3.

  • betas (Tuple[float, float], optional) – beta1 and beta2. Defaults to (0.9, 0.999).

  • eps (float, optional) – epsilon. Defaults to 1e-8.

  • weight_decay (float, optional) – weight decay. Defaults to 3.0517578125e-05.

  • accum_iter (int, optional) – number of accumulation steps, which should scale up with the number of workers. Defaults to 4.

step(closure=None)[source]

Perform a single optimization step to update the parameters.

Parameters:

closure (Callable) – A closure that reevaluates the model and returns the loss. Optional for most optimizers.

class decent_dp.optim.AccumAdamW(params: Any, lr: float = 0.001, betas: Tuple[float, float] = (0.9, 0.999), eps: float = 1e-08, weight_decay: float = 0, accum_iter: int = 4)[source]

Bases: Optimizer

AccumAdamW optimizer

Parameters:
  • params (Any) – parameters list or groups

  • lr (float, optional) – base learning rate. Defaults to 1e-3.

  • betas (Tuple[float, float], optional) – beta1 and beta2. Defaults to (0.9, 0.999).

  • eps (float, optional) – epsilon. Defaults to 1e-8.

  • weight_decay (float, optional) – weight decay. Defaults to 0.

  • accum_iter (int, optional) – number of accumulation steps, which should scale up with the number of workers. Defaults to 4.

step(closure=None)[source]

Perform a single optimization step to update the parameters.

Parameters:

closure (Callable) – A closure that reevaluates the model and returns the loss. Optional for most optimizers.

decent_dp.optim.lr_scheduler_fn_cosine_with_warmup(optimizer: Optimizer, t_max: int, t_warmup: int, cosine_eta_min: float = 1e-06, warmup_decay: float = 0.01) LRScheduler[source]

An example of a function that creates a learning rate scheduler that combines a warmup and a cosine annealing schedule.

Returns:

a learning rate scheduler with the linear warmup followed by the cosine annealing

Return type:

LRScheduler
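
Since the wrapper's lr_scheduler_fn receives only the optimizer, the remaining arguments can be bound in advance, e.g. with functools.partial (values are illustrative):

   from functools import partial

   from decent_dp.optim import lr_scheduler_fn_cosine_with_warmup

   # Bind the schedule lengths; the result only takes the optimizer argument.
   lr_scheduler_fn = partial(lr_scheduler_fn_cosine_with_warmup,
                             t_max=10000, t_warmup=500)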

decent_dp.optim.optim_fn_accum_adam(params: List[Tuple[str, Tensor]], lr: float = 0.001, beta1: float = 0.9, beta2: float = 0.999, eps: float = 1e-08, weight_decay: float = 3.0517578125e-05, accum_iter: int = 4) Optimizer[source]

An example of a function that creates an AccumAdam optimizer with the given parameters and their names.

To change the hyperparameters of the optimizer, you can wrap it with functools.partial and pass the new values.

Returns:

an AccumAdam optimizer

Return type:

Optimizer
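
For example (values are illustrative):

   from functools import partial

   from decent_dp.optim import optim_fn_accum_adam

   # Override the defaults; the result still only takes the parameter list.
   optim_fn = partial(optim_fn_accum_adam, lr=3e-4, weight_decay=1e-4, accum_iter=8)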

decent_dp.optim.optim_fn_accum_adamw(params: List[Tuple[str, Tensor]], lr: float = 0.001, beta1: float = 0.9, beta2: float = 0.999, eps: float = 1e-08, weight_decay: float = 0.1, accum_iter: int = 4) Optimizer[source]

An example of a function that creates an AccumAdamW optimizer with the given parameters and their names.

To change the hyperparameters of the optimizer, you can wrap it with functools.partial and pass the new values.

Returns:

an AccumAdamW optimizer

Return type:

Optimizer

decent_dp.optim.optim_fn_adam(params: List[Tuple[str, Tensor]], lr: float = 0.001, beta1: float = 0.9, beta2: float = 0.999, weight_decay: float = 3.0517578125e-05, eps: float = 1e-08) Optimizer[source]

An example of a function that creates an Adam optimizer with the given parameters and their names.

To change the hyperparameters of the optimizer, you can wrap it with functools.partial and pass the new values.

Returns:

an Adam optimizer

Return type:

Optimizer

decent_dp.optim.optim_fn_adamw(params: List[Tuple[str, Tensor]], lr: float = 0.001, beta1: float = 0.9, beta2: float = 0.999, weight_decay: float = 0.1, eps: float = 1e-08) Optimizer[source]

An example of a function that creates an AdamW optimizer with the given parameters and their names.

To change the hyperparameters of the optimizer, you can wrap it with functools.partial and pass the new values.

Returns:

an AdamW optimizer

Return type:

Optimizer

decent_dp.topo module

class decent_dp.topo.Edge(ranks: List[int], weight: float, group: ProcessGroup | None = None)[source]

Bases: object

Edge class for defining communication patterns among workers.

Weight defines the fraction of the message that each worker keeps. For example, if the weight is 0.3, the worker keeps 30% of its own message and shares 70% with the other workers: $x_i \leftarrow x_i \cdot \text{weight} + \sum_{j \in \text{ranks}} x_j \cdot (1 - \text{weight}) / \text{len(ranks)}$. The weight should be between 0 and 1 for convergence.

Parameters:
  • ranks (List[int]) – List of ranks of workers that communicate in this edge

  • weight (float) – Weight of the edge, i.e. the fraction of the message that each worker keeps

  • group (Optional[ProcessGroup]) – Process group for the edge, which will be created by the Topology class

group: ProcessGroup | None = None
ranks: List[int]
weight: float
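
A small numeric sketch of the mixing rule above (plain tensor arithmetic for illustration only; the actual communication is handled inside the wrapper):

   import torch

   def mix(x_i: torch.Tensor, xs: list, weight: float) -> torch.Tensor:
       # x_i <- weight * x_i + (1 - weight) / len(ranks) * sum_j x_j
       return weight * x_i + (1.0 - weight) * torch.stack(xs).mean(dim=0)

   x0, x1 = torch.tensor([1.0]), torch.tensor([3.0])
   print(mix(x0, [x0, x1], weight=0.3))   # tensor([1.7000])
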
class decent_dp.topo.Topology(local_world_size)[source]

Bases: object

get_edge(step: int) Edge[source]

Get the edge for the given iteration

class decent_dp.topo.TopologyReg[source]

Bases: object

classmethod register(name: str) Callable[source]

registry: Dict[str, type[Topology]] = {'alternating-exp-ring': <class 'decent_dp.topo.AlternatingExpRingTopology'>, 'complete': <class 'decent_dp.topo.CompleteTopology'>, 'one-peer-exp': <class 'decent_dp.topo.OnePeerExpTopology'>, 'ring': <class 'decent_dp.topo.RingTopology'>}
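
The registered names above are the valid values of the topology argument of DecentralizedDataParallel, e.g. (reusing the names from the class-level example):

   # Pick a registered topology by name when wrapping the model.
   ddp_model = DecentralizedDataParallel(model, optim_fn=optim_fn,
                                         topology='one-peer-exp')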

decent_dp.utils module

decent_dp.utils.initialize_dist() Tuple[int, int][source]

A utility function to initialize the distributed environment

Returns:

rank and world size

Return type:

Tuple[int, int]
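
A minimal sketch (assuming the processes are started by a standard launcher such as torchrun, which sets the usual distributed environment variables):

   # Launch with e.g.: torchrun --nproc_per_node=4 train.py
   from decent_dp.utils import initialize_dist

   rank, world_size = initialize_dist()
   print(f'worker {rank} of {world_size} initialized')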

Module contents