Imahn Shekhzadeh

Software Developer | Consultant | Alumnus of the German Academic Scholarship Foundation | MSc Physics

About Me

I am a software developer. Currently, I am a consultant at d-fine in Frankfurt. Previously, I had a research position at the Computer Science department of the University of Geneva, where my research interests were at the intersection of ML/AI and the physical sciences (ML\(4\)Science). My research stay was funded by the Swiss National Science Foundation. I earned an MSc degree in Physics at the University of Hamburg. In my MSc thesis, I worked on and developed L2LFlows. During my BSc and MSc studies in Physics, I was a scholarship holder of the German Academic Scholarship Foundation (German Studienstiftung des deutschen Volkes). I am also a member of the Hamburg Mathematical Society.

If you want to write me an encrypted e-mail, use my public PGP key.

Blog

DistributedDataParallel in PyTorch

This script demonstrates what sampler.set_epoch(epoch) does in a distributed setup in PyTorch. To test its effect, comment out sampler.set_epoch() and observe how in the same rank for the same batch index (yet for another epoch), the data remains the same. In this particular example, the test is done with an infinite loop over the dataloader.
import argparse
import logging
import os
import random
from typing import Generator, List

import numpy as np
import torch
from torch import Tensor
from torch import distributed as dist
from torch.utils.data import DataLoader, DistributedSampler, TensorDataset


def infiniteloop(dataloader) -> Generator[List[Tensor], None, None]:
    while True:
        for data in iter(dataloader):
            yield data


def setup(
    rank: int,
    world_size: int,
    master_addr: str = "localhost",
    master_port: str = "12355",
    backend: str = "nccl",
) -> None:
    """
    Initialize the distributed environment.

    Args:
        rank: Rank of the current process.
        world_size: Number of processes participating in the job.
        master_addr: IP address of the master node.
        master_port: Port number of the master node.
        backend: Backend to use.
    """

    os.environ["MASTER_ADDR"] = master_addr
    os.environ["MASTER_PORT"] = master_port

    # initialize the process group
    dist.init_process_group(
        backend=backend,
        rank=rank,
        world_size=world_size,
    )


def get_args() -> argparse.Namespace:
    """Get arguments passed via CLI."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--master_addr",
        type=str,
        default=None,
        help="IP address of the master node.",
    )
    parser.add_argument(
        "--master_port",
        type=str,
        default=None,
        help="Port of the master node.",
    )
    return parser.parse_args()


def seed_worker(worker_id: int) -> None:
    """
    Seed the worker for the dataloader. Function copy-pasted from [1].

    Args:
        worker_id: Worker ID.

    References:
        [1] https://pytorch.org/docs/stable/notes/randomness.html#dataloader
    """
    worker_seed = torch.initial_seed() % 2**32
    np.random.seed(worker_seed)
    random.seed(worker_seed)


def run(
    rank: int | torch.device, world_size: int, args: argparse.Namespace
) -> None:
    """
    Run test.

    Args:
        rank: Rank of the current process. Can be `torch.device("cpu")` if no
            GPU is available.
        world_size: Number of processes participating in distributed training.
            If `world_size` is 1, no distributed training is used.
    """
    if world_size > 1:
        setup(
            rank=rank,
            world_size=world_size,
            master_addr=args.master_addr,
            master_port=args.master_port,
        )

    num_samples = 12
    batch_size = 4
    # When using a single GPU per process and per
    # DistributedDataParallel, we need to divide the batch size
    # ourselves based on the total number of GPUs of the current node.
    batch_size = int(batch_size / world_size)
    num_epochs = 2

    tensor = torch.randn(
        num_samples, 2, generator=torch.Generator().manual_seed(2)
    )
    dataset = TensorDataset(tensor)
    sampler = DistributedSampler(dataset) if world_size > 1 else None
    dataloader = DataLoader(
        dataset=dataset,
        sampler=sampler,
        shuffle=False if world_size > 1 else True,
        worker_init_fn=seed_worker,
        generator=torch.Generator().manual_seed(0),
        batch_size=batch_size,
        num_workers=4,
    )
    datalooper = infiniteloop(dataloader)
    num_batches__per_epoch = len(dataloader)
    logging.info(f"# Batches/epoch: {num_batches__per_epoch}")

    for epoch in range(num_epochs):
        if sampler is not None:
            # necessary to ensure shuffling of the data
            # https://pytorch.org/docs/stable/data.html
            sampler.set_epoch(epoch)

        for batch_idx in range(num_batches__per_epoch):
            data = next(datalooper)
            logging.info(
                f"\n\nRank: {rank}, Epoch: {epoch}, Batch: {batch_idx}, Data:\n{data}"
            )


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s"
    )

    args = get_args()
    world_size = int(os.getenv("WORLD_SIZE", 1))
    logging.info(
        f"{args}\nWorld_size: {world_size}\nPyTorch version: "
        f"{torch.__version__}"
    )

    run(rank=int(os.getenv("RANK", 0)), world_size=world_size, args=args)

typeguard

While Python allows duck typing, it can be very useful to check for the type of arguments provided in functions. This can be achieved with the library typeguard. An example of its usage would be:
from typeguard import typechecked

@typechecked
def say_hi(s: str) -> str:
    return s


if __name__ == "__main__":
    str_output = say_hi("Hi!")        
    str_output = say_hi(2)  # will fail!
    print(str_output)

While str_output = say_hi("Hi!") will work, str_output = say_hi(2) will not, since the type of 2 is not a string! Since we use typeguard, we get the error: typeguard.TypeCheckError: argument "s" (int) is not an instance of str.

In case you should have several decorators, the documentation states:

You should always place this decorator closest to the original function, as it will not work when there is another decorator wrapping the function. For the same reason, when you use it on a class that has wrapping decorators on its methods, such methods will not be instrumented. In contrast, the import hook has no such restrictions.

Here is an example of a function using two decorators, one of which is @typechecked from the typeguard module, the other is custom written:

from typing import Any, Callable

from typeguard import typechecked


@typechecked
def uppercase_decorator(function: Callable[[str], str]) -> Callable[[str], str]:
    def wrapper(*args: str, **kwargs: str) -> str:
        func = function(*args, **kwargs)
        make_uppercase = func * 2
        return make_uppercase

    return wrapper

@uppercase_decorator
@typechecked
def say_hi(s: str = "hi") -> str:
    return s


if __name__ == "__main__":
    str_output = say_hi("Hi!")
    str_output = say_hi(2)  # will fail!
    print(str_output)

Note that it is also be possible to use the @typechecked decorator on a class, in which case all functions of the class use the decorator. Also, while it is possible to use the decorator on a function whose arguments do not specify any types, it makes the @typechecked decorator obsolete. Last but not least, it is also possible to specify return types, which will be typechecked.

Profiling your Python code

Profiling allows you to get some insight about the runtime of different parts of your script, and to analyze some potential bottlenecks. You can profile your code as follows, which will save runtime information of different parts in a log file called profile_*.log, where the * will be replaced by the day and time of when you run the script.

import cProfile
import pstats

from typeguard import typechecked


@typechecked
def main() -> None:
    pass


if __name__ == "__main__":
    pr = cProfile.Profile()
    pr.enable()

    main()

    pr.disable()
    stats = pstats.Stats(
        pr, 
        stream=open(
            f"profile_{dt.now().strftime('%dp%mp%Y_%Hp%Mp%S')}.log", "w"
        )
    )
    stats.sort_stats("time")
    stats.print_stats()

Microsoft Azure: Automated Deletion of Inactive Users

Imagine having a Microsoft Azure account where a list of users (basic_users) has access to the entire functionality of the Services and could hence do a lot of damage by running computational resources that could place a heavy burden on the admin of the account, e.g. a company. Hence, I came up with a shell script that can be run locally to identify inactive participants and remove them from their role (basic_users) so that they do not have access to their Microsoft Azure account anymore.

#!/bin/bash

# Define role users have
ROLE="basic_users"
# Define time offset of the query range for the `az monitor activity-log list` 
# command, in `##d##h` format
OFFSET="90d"
DATE=$(date +%dp%mp%Y_%Hp%Mp%S)

# Get list of all role assignments, skip first two lines.
# Also check whether first column (role assignee) contains an e-mail address.
# If user appears at least twice, we assume user has multiple distinct 
# roles, e.g. admin and basic roles.
az role assignment list \
    --resource-group company-azure-rg-name # put resource group name of your company here
    --output table | awk "
    NR > 2 && \$1 ~ /@/ {
        people[tolower(\$1)]++;
        if (\$2 ~ /$ROLE/)
        {
            role[tolower(\$1)] = \$2;
        }
    }
    END
    {
        for (person in people) {
            if (people[person] == 1 && role[person] ~ /$ROLE/)
            {
                print person;
            }
        }
    }
    " while read -r email; do

    # Get activity log for each role assignee
    output=$(
        az monitor activity-log list \
            --resource-group company-azure-rg-name \
            --caller "$email" \
            --max-events 1 \
            --output table \
            --offset ${OFFSET}
    )

    # If `output` is empty, then participant has been inactive for past `OFFSET` days
    if [[ "${output}" =~ ^[[:space:]]*$ ]]; then
        az role assignment delete \
        --assignee "$email" \
        --role "${ROLE}" \
        --resource-group "company-azure-rg-name"
    fi

done

Note:

git: Useful Commands

When often using git commands, you will notice that you almost always retype the same commands. This is obsolete and by using custom defined bash functions and commands, you can save a lot of effort in typing out the commands! In the following, I will provide you with the commands I came up with to simplify my life; note that the the l in front of each commands stands for lazy. Put the following commands into your .bash_profile or .bashrc file and source the file, e.g. source ~/.bash_profile.

# log currently checked out branch
bname() {
    branch=$(git branch --show-current)
    echo "$branch"
}            

# list all local and remote git branches
lb() {
    git branch -a
}

# create remote branch
# usage: `lbc new-branch`
lbc() {
    local branch_name="$1"
    
    git branch "$branch_name" && git push origin "$branch_name"
}

# delete branch
# usage: `lbd branch-name`
lbd() {
    local branch="$1"
    local current_branch=$(git branch --show-current)
    local exists_locally=$(git branch --list "$branch")
    local exists_remotely=$(git ls-remote --heads origin "$branch")

    if [[ "$branch" == "main" || "$branch" == "master" ]]; then
        echo "Deletion of 'main' or 'master' branch is not allowed."
        return
    fi

    if [[ "$branch" == "$current_branch" ]]; then
        echo "You are currently on branch $branch. Switching to 'main' before \
            deletion..."
        git switch main || git checkout master || { echo "Failed to switch \
            branches. Aborting."; return; }
    fi

    if [[ -n $exists_locally ]]; then
        echo "Deleting local branch: $branch"
        git branch -D "$branch"
    fi

    if [[ -n $exists_remotely ]]; then
        echo "Deleting remote branch: $branch"
        git push origin --delete "$branch"
    fi
}

# switch branch and create remotely if necessary
# usage: `lsw new-branch-name`
lsw() {
    local branch_name="$1"
    local exists_remotely=$(git ls-remote --heads origin "$branch")

    git switch -c "$branch_name"
    
    if [[ -n $exists_remotely ]]; then
        git push origin "$branch_name"
    fi
}

# cloning
# usage:
#     `lcl git@github.com:ImahnShekhzadeh/infra.git`
#     `lcl git@github.com:ImahnShekhzadeh/infra.git infra`
#     `lcl git@github.com:ImahnShekhzadeh/infra.git infra main`
lcl() {
    local dir_name="${2:-$(pwd)}"
    local branch_name="${3:-main}"
			
    git clone "$1" "$dir_name" && cd "$dir_name" && lsw "$branch_name"
}

# usage: lpush "add new files"
lpush() {
    (
	#  use subshell to change directory to Git root and perform actions 
	cd "$(git rev-parse --show-toplevel)" || exit
	git add . && git commit -a -m "$1" && git push origin $(bname) && $(llog)
    )			
}

# log
llog() {
    git log --decorate --graph --all
}

# update local repo to incorporate remote changes
lupd() {
    git fetch origin "$(bname)" && $(llog)
}

# force push
lforce() {
    git push origin "$(bname)" --force
}

# compare
ldiff() {
    git status && git diff --color
}

# pull remote changes
lpull() {
    git pull origin "$(bname)"
}

# show changes
# usage: `lsh`, `lsh commit-SHA`
lsh() {
    local commit_hash="${1:-HEAD}"
    git show "$commit_hash" 
}

# remove directory or file
# usage: `lrm dir`, `lrm file`
lrm() {
    git rm -r "$1"
}

# interactive rebase
# usage: `lreb`, `lreb 5`
lreb() {
    local num=${1:-5}
    git rebase -i HEAD~"$num"
}

# reset entire repo or specific file to state of `HEAD` or specific commit hash.
# usage: `lres`, `lres commit-hash` or `lres commit-hash file-path`
lres() {
    if [[ $# -eq 0 ]] || [[ $# -eq 1 ]]; then
        local commit_hash=${1:-HEAD} 
	git reset --hard "$commit_hash"
    elif [ $# -eq 2 ]; then
	local commit_hash="$1"
	local file_path="$2"
	git restore --source="$commit_hash" "$file_path"
    else
	echo "Usage: `lres commit_hash file_path` or `lres commit_hash`"
    fi
}

Publications

Calibrating Neural Simulation-Based Inference with Differentiable Coverage Probability (NeurIPS, 2023)

Authors: Maciej Falkiewicz, Naoya Takeishi, Imahn Shekhzadeh, Antoine Wehenkel, Arnaud Delaunoy, Gilles Louppe, Alexandros Kalousis

Abstract: Bayesian inference allows expressing the uncertainty of posterior belief under a probabilistic model given prior information and the likelihood of the evidence. Predominantly, the likelihood function is only implicitly established by a simulator posing the need for simulation-based inference (SBI). However, the existing algorithms can yield overconfident posteriors (Hermans *et al.*, 2022) defeating the whole purpose of credibility if the uncertainty quantification is inaccurate. We propose to include a calibration term directly into the training objective of the neural model in selected amortized SBI techniques. By introducing a relaxation of the classical formulation of calibration error we enable end-to-end backpropagation. The proposed method is not tied to any particular neural model and brings moderate computational overhead compared to the profits it introduces. It is directly applicable to existing computational pipelines allowing reliable black-box posterior inference. We empirically show on six benchmark problems that the proposed method achieves competitive or better results in terms of coverage and expected posterior density than the previously existing approaches.

L2LFlows: generating high-fidelity 3D calorimeter images (NeurIPS, 2023)

Authors: Sascha Diefenbacher, Engin Eren, Frank Gaede, Gregor Kasieczka, Claudius Krause, Imahn Shekhzadeh, David Shih

Abstract: We explore the use of normalizing flows to emulate Monte Carlo detector simulations of photon showers in a high-granularity electromagnetic calorimeter prototype for the International Large Detector (ILD). Our proposed method โ€” which we refer to as "Layer-to-Layer Flows" (L2LFlows) โ€” is an evolution of the CaloFlow architecture adapted to a higher-dimensional setting (30 layers of 10 ร— 10 voxels each). The main innovation of L2LFlows consists of introducing 30 separate normalizing flows, one for each layer of the calorimeter, where each flow is conditioned on the previous five layers in order to learn the layer-to-layer correlations. We compare our results to the BIB-AE, a state-of-the-art generative network trained on the same dataset and find our model has a significantly improved fidelity.

Code: https://gitlab.com/Imahn/l2lflows

Advancing Generative Modelling of Calorimeter Showers on Three Frontiers (NeurIPS ML\(4\)PhysicalSciences Workshop, 2023)

Authors: Erik Buhmann, Sascha Diefenbacher, Engin Eren, Frank Gaede, Gregor Kasieczka, William Korcari, Anatolii Korol, Claudius Krause, Katja Krรผger, Peter McKeown, Imahn Shekhzadeh, David Shih

Abstract: Generative machine learning can be used to augment and speed-up traditional physics simulations, i.e. the simulation of elementary particles in the detector of collider experiments. Like many physics data, these calorimeter showers can either be represented as images or as permutation-invariant lists of measurements, i.e. as point clouds. We advance the generative models for calorimeter showers on three frontiers: (1) increasing the number of conditional features for precise energy- and angle-wise generation with the bounded bottleneck auto-encoder (BIB-AE), (2) improving generation fidelity using a normalizing flow model, dubbed "Layer-to-Layer-Flows" (L\(2\)LFlows), (3) developing a diffusion model for geometry-independent calorimeter point cloud scalable to \(\mathcal O\)(1000) points, called CaloClouds, and distilling it into a consistency model for fast single-shot sampling.