10/18/2024

Sequence Parallel (SP)

toy model

class ToyModel(nn.Module):
    """MLP based model"""

    def __init__(self):
        super().__init__()
        self.in_proj = nn.Linear(10, 32)
        self.relu = nn.ReLU()
        self.out_proj = nn.Linear(32, 5)

    def forward(self, x):
        return self.out_proj(self.relu(self.in_proj(x)))

 .

configuration

sp_model = parallelize_module(
    module=model,
    device_mesh=device_mesh,
    parallelize_plan={
        "in_proj": ColwiseParallel(input_layouts=Shard(0)),
        "out_proj": RowwiseParallel(output_layouts=Shard(0)),
    },
)

..






  1. Input Sharding:
    • The input sequence (shape [4 x 12 x 10]) is initially split along the sequence length dimension across 3 GPUs.
    • Each GPU receives a [4 x 4 x 10] shard of the input.
  2. All-Gather Operation:
    • An all-gather operation is performed to reconstruct the full input on each GPU.
    • After this, each GPU has the full [4 x 12 x 10] input.
  3. First Layer - in_proj (ColwiseParallel):
    • The weight matrix [10 x 32] is split column-wise across GPUs: [10 x 11], [10 x 11], [10 x 10].
    • Each GPU processes the full input [4 x 12 x 10] with its portion of the weight matrix.
    • The output on each GPU is [4 x 12 x 11], [4 x 12 x 11], and [4 x 12 x 10] respectively.
  4. ReLU Activation:
    • Applied element-wise to the output of the first layer on each GPU.
    • Shapes remain [4 x 12 x 11], [4 x 12 x 11], and [4 x 12 x 10] on the respective GPUs.
  5. Second Layer - out_proj (RowwiseParallel):
    • The weight matrix [32 x 5] is split row-wise across GPUs: [11 x 5], [11 x 5], [10 x 5].
    • Each GPU processes its input ([4 x 12 x 11], [4 x 12 x 11], [4 x 12 x 10]) with its portion of the weight matrix.
    • The output on each GPU is [4 x 12 x 5], representing partial sums for the full sequence.
  6. Reduce-Scatter Operation:
    • A reduce-scatter operation is performed to sum the partial results and distribute them across GPUs.
    • This results in each GPU having a portion of the final output, sharded along the sequence dimension.

Key Clarifications:

  • There are two collective operations: an all-gather at the beginning and a reduce-scatter at the end.
  • The GPUs do not produce equally sized first-layer outputs, because the 32 output columns split unevenly across 3 GPUs (11, 11, 10).
  • The sequence dimension (12 in this example) is not sharded during the middle layers; it is reconstructed at the start and re-sharded at the end.

This explanation shows how the input is gathered, processed in parallel, and then scattered again, allowing efficient parallel processing of the entire sequence across GPUs.
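To sanity-check these shapes without multiple GPUs, here is a small single-process sketch (not part of the original example) that imitates the 3-way column/row split with plain tensor ops; the already-full input stands in for the all-gather, and a plain sum plus chunking stands in for the reduce-scatter.

.

import torch

# Mimic the 3-"GPU" shape flow described above on one process (illustrative only).
full_input = torch.rand(4, 12, 10)                 # input after the all-gather
w1 = torch.rand(10, 32)                            # in_proj weight (math view)
w2 = torch.rand(32, 5)                             # out_proj weight (math view)

w1_shards = torch.split(w1, [11, 11, 10], dim=1)   # column-wise split of in_proj
w2_shards = torch.split(w2, [11, 11, 10], dim=0)   # row-wise split of out_proj

partials = []
for w1_s, w2_s in zip(w1_shards, w2_shards):
    h = torch.relu(full_input @ w1_s)              # [4, 12, 11] / [4, 12, 11] / [4, 12, 10]
    partials.append(h @ w2_s)                      # [4, 12, 5] partial sum per "GPU"

summed = sum(partials)                             # the sum a reduce-scatter would compute
output_shards = torch.chunk(summed, 3, dim=1)      # ...re-sharded along the sequence dim
print([tuple(p.shape) for p in partials])          # three (4, 12, 5) partials
print([tuple(o.shape) for o in output_shards])     # three (4, 4, 5) sequence shards

..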



full source code
.
import os
import sys
import torch
import torch.nn as nn
from torch.distributed._tensor import Shard
from torch.distributed.tensor.parallel import (
    parallelize_module,
    ColwiseParallel,
    RowwiseParallel,
)
from log_utils import rank_log, get_logger, verify_min_gpu_count
import torch.profiler

# ---- GPU check ------------
_min_gpu_count = 2
if not verify_min_gpu_count(min_gpus=_min_gpu_count):
    print(f"Unable to locate sufficient {_min_gpu_count} gpus to run this example. Exiting.")
    sys.exit()
# ---------------------------
from torch.distributed._tensor.device_mesh import init_device_mesh


"""
This is the script to test Sequence Parallel (SP) on a toy model in a
Megatron-LM SPMD style. We show an E2E working flow from forward,
backward and optimization.

We use the example of two `nn.Linear` layers with an element-wise `nn.ReLU`
in between to show an example of sequence parallelism, which was proposed in the paper:

https://arxiv.org/pdf/2205.05198.pdf.

Like tensor parallelism, we parallelize the first linear layer by column
and the second linear layer by row. But the input on each rank is now
different, so we need one all-gather for the input and one reduce-scatter
at the end of the second linear layer.
"""


class ToyModel(nn.Module):
    """MLP based model"""

    def __init__(self):
        super().__init__()
        self.in_proj = nn.Linear(10, 32)
        self.relu = nn.ReLU()
        self.out_proj = nn.Linear(32, 5)

    def forward(self, x):
        return self.out_proj(self.relu(self.in_proj(x)))


def main():
    logger = get_logger()
    # create a device mesh based on the given world_size.
    device_mesh = init_device_mesh(
        device_type="cuda", mesh_shape=(int(os.environ["WORLD_SIZE"]),)
    )
    _rank = device_mesh.get_rank()
    print(f"Starting PyTorch Sequence Parallel example on rank {_rank}.")
    rank_log(_rank, logger, f"Device Mesh created: {device_mesh=}")

    # create model and move it to GPU. init_device_mesh has already assigned gpu ids...
    model = ToyModel().to("cuda")

    # Custom parallelization plan for the model
    sp_model = parallelize_module(
        module=model,
        device_mesh=device_mesh,
        parallelize_plan={
            "in_proj": ColwiseParallel(input_layouts=Shard(0)),
            "out_proj": RowwiseParallel(output_layouts=Shard(0)),
        },
    )

    # Create an optimizer for the parallelized module.
    lr = 0.25
    optimizer = torch.optim.AdamW(sp_model.parameters(), lr=lr, foreach=True)

    # Perform a number of iterations of forward/backward
    # and optimization steps for the sharded module.
    num_iters = 10
    rank_log(_rank, logger, "Sequence Parallel training starting...")

    with torch.profiler.profile(
        activities=[
            torch.profiler.ProfilerActivity.CPU,
            torch.profiler.ProfilerActivity.CUDA,
        ],
        schedule=torch.profiler.schedule(wait=1, warmup=1, active=3, repeat=2),
        on_trace_ready=torch.profiler.tensorboard_trace_handler(f'./log/tensorboard/rank_{_rank}'),
        record_shapes=True,
        profile_memory=True,
        with_stack=True,
    ) as prof:
        for i in range(num_iters):
            # For SP, the input can be different across all ranks.
            inp = torch.rand(20, 10, device="cuda")
            output = sp_model(inp)
            output.sum().backward()
            optimizer.step()
            rank_log(_rank, logger, f"Sequence Parallel iter {i} completed")
            prof.step()

    rank_log(_rank, logger, "Sequence Parallel training completed!")

    # Print profiler results
    print(prof.key_averages().table(sort_by="cuda_time_total", row_limit=10))


if __name__ == "__main__":
    main()
..
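To launch it (the file name here is just an example), use torchrun, which sets the WORLD_SIZE environment variable the script reads:

    torchrun --nproc_per_node=2 sequence_parallel_example.py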


10/10/2024

FSDP and TP explanation for a 2-layer model

 FSDP and TP are complementary parallelism techniques:

  1. FSDP (Fully Sharded Data Parallelism):
    • Shards model parameters across GPUs
    • Each GPU holds a portion of each layer's parameters
    • During forward/backward pass, it gathers/scatters parameters as needed
    • Reduces memory usage per GPU, allowing larger models
  2. TP (Tensor Parallelism):
    • Splits individual tensors (layers) across GPUs
    • Each GPU computes a portion of a layer's operations
    • Useful for very large layers that don't fit on a single GPU

When combined:

  • FSDP handles overall model distribution
  • TP handles distribution of large individual layers
  • This allows for even larger models and better GPU utilization

Textual Representation:

  GPU 1         GPU 2         GPU 3         GPU 4
+--------+    +--------+    +--------+    +--------+
|  L1 P1 |    |  L1 P2 |    |  L2 P1 |    |  L2 P2 |
|   TP1  |    |   TP2  |    |   TP1  |    |   TP2  |
+--------+    +--------+    +--------+    +--------+
     |             |             |             |
     +-------------+             +-------------+
         Layer 1                     Layer 2

L1, L2: Layers 1 and 2
P1, P2: Parameter shards (FSDP)
TP1, TP2: Tensor Parallel splits
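As a rough sketch of how the two can be combined in PyTorch (assuming 4 GPUs, the ToyModel from above, and a recent PyTorch release; the mesh shape and dimension names are illustrative, not from the note above):

.

from torch.distributed._tensor.device_mesh import init_device_mesh
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.tensor.parallel import (
    parallelize_module,
    ColwiseParallel,
    RowwiseParallel,
)

# 2D mesh: "dp" dimension for FSDP sharding, "tp" dimension for tensor parallelism.
mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 2), mesh_dim_names=("dp", "tp"))

model = ToyModel().to("cuda")

# TP: split in_proj column-wise and out_proj row-wise across the "tp" dimension.
tp_model = parallelize_module(
    module=model,
    device_mesh=mesh_2d["tp"],
    parallelize_plan={
        "in_proj": ColwiseParallel(),
        "out_proj": RowwiseParallel(),
    },
)

# FSDP: shard the (already TP-split) parameters across the "dp" dimension.
model_2d = FSDP(tp_model, device_mesh=mesh_2d["dp"], use_orig_params=True)

..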

9/30/2024

How gradients are calculated for a batch

 Let's use a simplified example with just 2 data points and walk through the process with actual numbers. This will help illustrate how gradients are calculated and accumulated for a batch.

Let's assume we have a very simple model with one parameter w, currently set to 1.0. Our loss function is the square error, and we're using basic gradient descent with a learning rate of 0.1.

Data points:

  1. x1 = 2, y1 = 4
  2. x2 = 3, y2 = 5

Batch size = 2 (both data points in one batch)

Step 1: Forward pass

  • For x1: prediction = w * x1 = 1.0 * 2 = 2
  • For x2: prediction = w * x2 = 1.0 * 3 = 3

Step 2: Calculate losses

  • Loss1 = (prediction1 - y1)^2 = (2 - 4)^2 = 4
  • Loss2 = (prediction2 - y2)^2 = (3 - 5)^2 = 4
  • Total batch loss = (Loss1 + Loss2) / 2 = (4 + 4) / 2 = 4

Step 3: Backward pass (calculate gradients)

  • Gradient1 = 2 * (prediction1 - y1) * x1 = 2 * (2 - 4) * 2 = -8
  • Gradient2 = 2 * (prediction2 - y2) * x2 = 2 * (3 - 5) * 3 = -12

Step 4: Accumulate gradients

  • Total gradient = (Gradient1 + Gradient2) / 2 = (-8 + -12) / 2 = -10

Step 5: Update weight (once for the batch)

  • New w = old w - learning_rate * total gradient
  • New w = 1.0 - 0.1 * (-10) = 2.0

So, after processing this batch of 2 data points:

  • We calculated 2 individual gradients (-8 and -12)
  • We accumulated these into one total gradient (-10)
  • We performed one weight update, changing w from 1.0 to 2.0

This process would then repeat for the next batch. In this case, we've processed all our data, so this completes one epoch.
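The same numbers fall out of autograd; here is a tiny check (not part of the original walkthrough):

.

import torch

w = torch.tensor(1.0, requires_grad=True)
x = torch.tensor([2.0, 3.0])
y = torch.tensor([4.0, 5.0])

pred = w * x                       # [2.0, 3.0]
loss = ((pred - y) ** 2).mean()    # (4 + 4) / 2 = 4
loss.backward()

print(loss.item())                 # 4.0
print(w.grad.item())               # -10.0, the mean of -8 and -12

with torch.no_grad():
    w -= 0.1 * w.grad              # 1.0 - 0.1 * (-10) = 2.0
print(w.item())                    # 2.0

..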

9/28/2024

How many GPUs do I need to train an LLM?




This is a complicated question in general, but if we assume that you are using FSDP with 
FULL_SHARD, activation checkpointing, and DecoupledLionW, then a good rule of thumb is:

Your total cluster memory in GB should be larger than 12 * N (# billions of params).

E.g., to train a GPT-13B model, which has ~13 billion params, you should
have at least 12 * 13 = 156 GB of total memory across your GPUs.
You can accomplish this with 4x A100-40GB, 2x A100-80GB, etc.

If you run into OOM errors when using small device counts, 
reduce device_train_microbatch_size until it succeeds.

Keep in mind: even though training will work in these minimalist settings, 
you will get much better throughput_per_device 
if you use a larger cluster or devices with higher memory capacity, 
because this will enable you to use larger microbatch sizes.
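The rule of thumb is easy to encode; a tiny helper (illustrative only, not from the original note):

.

def min_cluster_memory_gb(n_billion_params: float) -> float:
    """12 GB of total cluster memory per billion parameters
    (FULL_SHARD + activation checkpointing + DecoupledLionW rule of thumb)."""
    return 12 * n_billion_params

print(min_cluster_memory_gb(13))  # 156 GB -> e.g. 4x A100-40GB or 2x A100-80GB

..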

9/22/2024

What is TorchOps.cpp.inc in torch-mlir

 

What is TorchOps.cpp.inc?

  • TorchOps.cpp.inc: This file contains implementations of the operations for the torch-mlir dialect. It is typically generated from .td (TableGen) files that define the dialect and its operations.
  • The .td (TableGen) files describe MLIR operations in a high-level, declarative form, and the CMake build process automatically generates .cpp.inc files (like TorchOps.cpp.inc) from these .td files.

How it gets generated:

  1. TableGen: The TableGen tool processes .td files that define the operations and attributes for the torch dialect.
  2. CMake Build: During the CMake build process, the mlir-tblgen tool is invoked to generate various .inc files, including TorchOps.cpp.inc.

Where It Is Generated:

The TorchOps.cpp.inc file is usually generated in the build directory under the subdirectories for the torch-mlir project. For example:


build/tools/torch-mlir/lib/Dialect/Torch/IR/TorchOps.cpp.inc

This file gets included in the compiled source code to provide the implementation of the Torch dialect operations.

How to Ensure It Is Generated:

If the file is missing, it's likely because there was an issue in the build process. Here’s how to ensure it’s generated:

  1. Ensure CMake and Ninja Build: Make sure the CMake and Ninja build process is working correctly by following the steps we discussed earlier. You can check that the TorchOps.cpp.inc file is generated by looking in the build directory:

    ls build/tools/torch-mlir/lib/Dialect/Torch/IR/
  2. Check for TableGen Files: Make sure that the .td files (such as TorchOps.td) are present in the source directory. These are used by mlir-tblgen to generate the .cpp.inc files.

Debugging if Not Generated:

If TorchOps.cpp.inc or similar files are not generated, ensure:

  • You are running the full build using ninja or make.
  • mlir-tblgen is being invoked during the build process (you should see log messages referencing mlir-tblgen).
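For reference, the generated op definitions come from an mlir-tblgen invocation along these lines (the include paths are illustrative, not the exact build command):

    mlir-tblgen -gen-op-defs TorchOps.td -I <mlir-and-torch-mlir-include-dirs> -o TorchOps.cpp.inc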

IREE test code and explanation

.

from iree import compiler, runtime
import numpy as np
import sys

def print_step(step):
    print(f'Step: {step}', file=sys.stderr)

# MLIR code as a string
module_str = '''
func.func @simple_add(%arg0: tensor<4xf32>, %arg1: tensor<4xf32>) -> tensor<4xf32> {
  %0 = arith.addf %arg0, %arg1 : tensor<4xf32>
  return %0 : tensor<4xf32>
}
'''

print_step('Compiling module')
compiled_module = compiler.compile_str(module_str, target_backends=['llvm-cpu'])

print_step('Creating runtime config')
config = runtime.Config('local-task')

print_step('Creating system context')
ctx = runtime.SystemContext(config=config)

print_step('Creating VM instance')
vm_instance = runtime.VmInstance()

print_step('Creating VM module')
vm_module = runtime.VmModule.from_flatbuffer(vm_instance, compiled_module, warn_if_copy=False)

print_step('Adding VM module to context')
ctx.add_vm_module(vm_module)

print_step('Getting device')
device = runtime.get_driver('local-task').create_default_device()
print(f'Device: {device}', file=sys.stderr)

print_step('Getting function')
f = ctx.modules.module.simple_add

print_step('Creating device arrays')
arg1 = runtime.asdevicearray(device, np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float32))
arg2 = runtime.asdevicearray(device, np.array([5.0, 6.0, 7.0, 8.0], dtype=np.float32))

print_step('Calling function')
result = f(arg1, arg2)

print_step('Getting result')
print(result.to_host())

print_step('Script completed successfully')

..

To run this code:

  1. Save it to a file, e.g., test_iree.py.
  2. Make sure you have IREE and its Python bindings installed and properly set up in your environment.
  3. Run the script using Python:
    python test_iree.py

This script will:

  1. Define a simple MLIR function that adds two 4-element float32 tensors.
  2. Compile this MLIR code to an IREE module.
  3. Set up the IREE runtime environment.
  4. Create input data as NumPy arrays.
  5. Execute the compiled function with the input data.
  6. Print the result.

The output should show each step of the process and finally print the result, which should be [ 6. 8. 10. 12.].

This example demonstrates the basic workflow for testing MLIR code with IREE using Python. You can modify the MLIR code string and input data to test different functions and operations as needed.