Source code for mipcandy.profiler
from collections import Counter
from dataclasses import dataclass
from inspect import stack
from os import PathLike
from time import time
from typing import Sequence, override

import torch
from psutil import cpu_percent, virtual_memory

from mipcandy.data import dump_allocated_tensors
from mipcandy.types import Device, AmbiguousShape
[docs]
@dataclass
class ProfilerFrame(object):
stack: str
cpu: float
mem: float
gpu: list[float] | None = None
gpu_mem: list[float] | None = None
[docs]
@override
def __str__(self) -> str:
r = f"[{self.stack}] CPU: {self.cpu:.2f}% @ Memory: {self.mem:.2f}%\n"
if self.gpu and self.gpu_mem:
for i, gpu in enumerate(self.gpu):
r += f"\t\tGPU {i}: {gpu:.2f}% @ Memory: {self.gpu_mem[i]:.2f}%\n"
return r
[docs]
def export(self, duration: float) -> str:
return f"{duration:.2f}s\t{self}"
[docs]
class _LineBreak(object):
def __init__(self, message: str) -> None:
self.message: str = message
[docs]
@override
def __str__(self) -> str:
return f"<{self.message}>\n"
[docs]
def export(self, duration: float) -> str:
return f"{duration:.2f}s\t{self}"
[docs]
class Profiler(object):
    """
    Append CPU, memory and (optionally) GPU usage samples to a plain-text report file.

    The report file is created, truncating any existing content, when the profiler is constructed. Every
    frame or line break written afterwards is prefixed with the number of seconds elapsed since the previous
    write to the report (not since construction).
    """

    def __init__(self, title: str, save_as: str | PathLike[str], *, gpus: Sequence[Device] = ()) -> None:
        """
        :param title: heading written on the first line of the report
        :param save_as: path of the report file; it is opened in write mode and therefore overwritten
        :param gpus: devices sampled by `record()`; an empty sequence disables GPU sampling
        """
        self.title: str = title
        self.save_as: str | PathLike[str] = save_as
        self.total_mem: float = self.get_total_mem()
        self.has_gpu: bool = len(gpus) > 0
        self._gpus: Sequence[Device] = gpus
        self.total_gpu_mem: list[float] = [self.get_total_gpu_mem(device) for device in gpus]
        with open(save_as, "w") as f:
            f.write(f"# {title}\nTotal memory: {self.total_mem}, Total GPU memory: {self.total_gpu_mem}\n\n")
        # psutil documents that the first non-blocking `cpu_percent()` call returns a meaningless 0.0; take
        # that throw-away reading here so the first recorded frame reports real usage.
        self.get_cpu_usage()
        # Time of the most recent write; `_save()` reports durations relative to it.
        self._t0: float = time()
        # Last tensor dump seen by `record_allocated_tensors()`: (total size, tensor descriptors).
        self._allocated_tensors: tuple[float, list[tuple[
            float, AmbiguousShape, torch.dtype, torch.device, bool, str]]] = (0, [])

    @staticmethod
    def get_cpu_usage() -> float:
        """
        :return: the value of psutil's `cpu_percent()`, i.e. CPU utilisation since its previous call
        """
        return cpu_percent()

    def get_mem_usage(self) -> float:
        """
        :return: used system memory as a percentage of `total_mem`
        """
        return 100 * virtual_memory().used / self.total_mem

    @staticmethod
    def get_total_mem() -> float:
        """
        :return: total system memory as reported by psutil's `virtual_memory().total`
        """
        return virtual_memory().total

    @staticmethod
    def get_gpu_usage(device: Device) -> float:
        """
        :param device: the device to query
        :return: the value of `torch.cuda.utilization(device)`
        """
        return torch.cuda.utilization(device)

    def get_gpu_mem_usage(self, device: Device) -> float:
        """
        :param device: the device to query; must be one of the devices passed as `gpus` at construction
        :return: used device memory as a percentage of that device's entry in `total_gpu_mem`
        :raises ValueError: if `device` is not one of the profiled devices
        """
        return 100 * torch.cuda.device_memory_used(device) / self.total_gpu_mem[self._gpus.index(device)]

    @staticmethod
    def get_total_gpu_mem(device: Device) -> float:
        """
        :param device: the device to query
        :return: `total_memory` from `torch.cuda.get_device_properties(device)`
        """
        return torch.cuda.get_device_properties(device).total_memory

    def _save(self, obj: ProfilerFrame | _LineBreak | str) -> None:
        """
        Append an entry to the report and restart the elapsed-time clock.

        :param obj: a plain string is written verbatim plus a newline; anything else is written through its
            `export()` method with the seconds elapsed since the previous write
        """
        with open(self.save_as, "a") as f:
            t = time()
            f.write(f"{obj}\n" if isinstance(obj, str) else f"{obj.export(t - self._t0)}\n")
        self._t0 = t

    @staticmethod
    def _format_entries(entries: list[str]) -> str:
        """
        :param entries: report lines to render
        :return: the entries joined one per line, each indented with a tab and terminated by a newline
        """
        return "".join(f"\t{entry}\n" for entry in entries)

    def record_allocated_tensors(self, *, limit: int = 10) -> str:
        """
        Compare the currently allocated tensors with the previous dump and append the difference to the report.

        Tensors are grouped by their descriptor tuple. Each listed entry is prefixed with how many more
        (positive, under "Added tensors") or fewer (negative, under "Removed tensors") tensors with that
        descriptor exist compared to the previous call. The current dump becomes the baseline for the next call.

        :param limit: maximum number of entries listed in each of the two sections
        :return: the text that was appended to the report
        """
        allocated_tensors = dump_allocated_tensors()
        tensors, prev_tensors = allocated_tensors[1], self._allocated_tensors[1]
        # Multiset counts make each lookup O(1) instead of rescanning both lists for every tensor.
        # NOTE(review): relies on every descriptor tuple being hashable -- confirm `AmbiguousShape` is a
        # tuple rather than a list.
        current, previous = Counter(tensors), Counter(prev_tensors)
        added_tensors = []
        removed_tensors = []
        # `dict.fromkeys` yields each distinct descriptor once, in first-seen order (current dump first).
        for tensor in dict.fromkeys(tensors + prev_tensors):
            num_diff = current[tensor] - previous[tensor]
            if num_diff == 0:
                continue
            sz, shape, dtype, device, requires_grad, grad_fn = tensor
            t = f"{num_diff} x {sz:8.1f} MB | {shape} | {dtype} | {device} | grad={requires_grad} | {grad_fn}"
            (added_tensors if num_diff > 0 else removed_tensors).append(t)
        r = (f"Total size diff: {allocated_tensors[0] - self._allocated_tensors[0]} MB\n"
             f"Added tensors:\n{self._format_entries(added_tensors[:limit])}"
             f"Removed tensors:\n{self._format_entries(removed_tensors[:limit])}")
        self._save(r)
        self._allocated_tensors = allocated_tensors
        return r

    def record(self, *, stack_trace_offset: int = 1) -> ProfilerFrame:
        """
        Sample current resource usage, append it to the report and return it.

        :param stack_trace_offset: number of innermost call-stack frames to leave out of the frame's label;
            the default of 1 leaves out this method itself
        :return: the recorded frame, labelled with the call chain from outermost to innermost caller
        """
        # `stack()` must be called directly in this method: frame 0 is this method, which is what the
        # default offset skips.
        callers = reversed(stack()[stack_trace_offset:])
        frame = ProfilerFrame(" -> ".join(f"{f.function}:{f.lineno}" for f in callers),
                              self.get_cpu_usage(), self.get_mem_usage())
        if self.has_gpu:
            # Go through the class's own hooks so that overriding them affects recorded frames.
            frame.gpu = [self.get_gpu_usage(device) for device in self._gpus]
            frame.gpu_mem = [self.get_gpu_mem_usage(device) for device in self._gpus]
        self._save(frame)
        return frame

    def line_break(self, message: str) -> _LineBreak:
        """
        Append a message-only marker to the report.

        :param message: text of the marker
        :return: the marker that was written
        """
        r = _LineBreak(message)
        self._save(r)
        return r