Overview

This page explores the Dask Python library for the analysis of LCLS-II data.
Dask is a flexible library for parallel computing in Python (https://docs.dask.org/en/stable/).
It is mainly used for two purposes:
1) manipulating data that are larger than the memory available on the machine
2) scheduling and managing parallel computations

Video Presentation

From Riccardo Melchiorri, Dec. 5, 2022

video1328844330.mp4

Introduction to Dask.pptx

INTRODUCTION to Data Manipulation

The idea behind Dask is to avoid touching the "real" data until a result is actually required. Data manipulation is done through references: Dask stores the recipe for a calculation and executes it only when the result is requested. The script therefore keeps in memory the instructions for processing the data, not the data themselves. This makes it possible to "load" a vector that is bigger than the available memory of the machine; such a vector is called "lazy".

A simple example:

This is a numpy array that cannot be stored in memory.

This is a Dask array with the same shape as the numpy array; Dask has already split it into chunks.
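A minimal sketch of the comparison described above. The shape (200000 x 200000 float64 values, about 320 GB) and the chunk size (10000 x 10000) are illustrative assumptions, not values taken from the original example.

```python
import dask.array as da

# np.ones((200_000, 200_000)) would try to allocate about 320 GB at once
# and typically fails with a MemoryError on an ordinary machine.
shape = (200_000, 200_000)

# The Dask version only records how to build the array, split into chunks.
lazy = da.ones(shape, chunks=(10_000, 10_000))

print(lazy)                      # summary of shape, dtype and chunks; nothing is computed
print(lazy.nbytes / 1e9, "GB")   # nominal size of the full array
print(lazy.numblocks)            # number of chunks along each axis
```

Creating the lazy array is essentially instantaneous because no element is allocated until a result is requested.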


Dask offers the same interface as numpy (through dask.array) and as pandas (through dask.dataframe).


Up to this point the arrays and dataframes are "lazy": no calculation has been performed.

Even when an operation is requested on a lazy array, the result is still a lazy array.
To obtain the actual result, we need to call ".compute()" on the array. This takes time, because Dask now executes the instructions summarized in the following graph.


Example of the visualization of a calculation on an array of shape (1000, 1000, 100)

In order to exit the "dask domain", one must ".compute()" the result.
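The difference between a lazy result and a computed one can be sketched as follows. The array contents, the chunk size and the operation are assumptions chosen only for illustration; the shape follows the (1000, 1000, 100) example above.

```python
import dask.array as da
import numpy as np

x = da.ones((1000, 1000, 100), chunks=(125, 125, 100))

# Operations on a lazy array return another lazy array.
y = (x + 1).sum(axis=2)
print(type(y).__name__, y.shape)

# y.visualize(filename="graph.png") would draw the task graph
# (it requires the optional graphviz dependency, so it is not run here).

# .compute() executes the graph and returns a plain numpy array.
result = y.compute()
print(isinstance(result, np.ndarray), result.shape, result[0, 0])
```

Here the computed result has shape (1000, 1000) and easily fits in memory, even though the intermediate array is one hundred times larger.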

Caveats

  • Some commands trigger ".compute()" implicitly, for example "matplotlib.pyplot.plot", whereas "print" only shows information about the lazy array without computing it.
    If one writes a function and passes lazy arrays to it, one may want to use the decorator "@dask.delayed".
  • The result of ".compute()" is NOT lazy, therefore it is imperative that the resulting array or dataframe fits in the machine memory.
  • Avoid calculating unwanted results, i.e. avoid converting lazy arrays into numpy arrays when it is not needed.
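A minimal sketch of the "@dask.delayed" decorator mentioned in the first caveat. The function name and the values are hypothetical. Note that a lazy array passed to a delayed function is evaluated in full before the function runs, so that argument must fit in memory.

```python
import dask
import dask.array as da


@dask.delayed
def scale(value, factor):
    # Runs only when .compute() is called; `value` arrives as a concrete number.
    return value * factor


x = da.arange(10, chunks=5)
total = x.sum()           # lazy reduction, nothing computed yet
task = scale(total, 2)    # a Delayed object, still nothing computed

print(type(task).__name__)
print(task.compute())     # 90
```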

Reading hdf5 files

Dask has specific functions to read hdf5 files.
Since the data structure resembles a dataframe, "dask.dataframe.read_hdf" would seem the ideal way to read the file.

However, there are some known bugs in pandas when reading an hdf5 file, and "variable length strings are not supported yet".
For these reasons, reading an hdf5 file directly into a dataframe (dask or pandas) is not recommended.
Instead, it is possible to use the h5py library and store dask arrays in a dictionary:
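A minimal, self-contained sketch of such a script. It first writes a small stand-in file so that it can run anywhere; the file name and the '/intensity' dataset are hypothetical, and in practice fh would point to an existing data file.

```python
import os
import tempfile

import dask.array as da
import h5py
import numpy as np

# Stand-in file (assumption: real data would already exist on disk).
path = os.path.join(tempfile.mkdtemp(), "example.h5")
with h5py.File(path, "w") as out:
    out["/timestamp"] = np.arange(1000, dtype="f8")
    out["/intensity"] = np.ones((1000, 16))

fh = h5py.File(path, "r")     # kept open until the calculations are done
data = {}
for name in ("/timestamp", "/intensity"):
    # Wrap the h5py dataset (a pointer into the file), not fh[name][:],
    # which would read all the values into memory.
    data[name] = da.from_array(fh[name])

mean_intensity = data["/intensity"].mean(axis=1)   # still lazy
result = mean_intensity.compute()                  # the file is read here
fh.close()                                         # close only after compute()
```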



This script presents several important concepts.

  • The h5 file is kept open until the end of the calculations.
  • We store in the dictionary the pointers to the data in the file, not the data themselves:
    data['/timestamp']=fh['/timestamp'][:] reads the values into memory, whereas data['/timestamp']=fh['/timestamp'] only stores a reference to the dataset.
  • We store the lazy arrays in the dictionary:
    data['/timestamp']=da.from_array(fh['/timestamp'])

The last concept allows data['/timestamp'] to be used as a lazy array in calculations without the risk of saturating the memory.
We map the hdf5 file onto lazy arrays without "reading" the data.

As previously noted, Dask calculates its own chunks when none are given. In some cases this can cause errors, especially when extracting sub-arrays of arrays.

In this situation we are extracting a sub-array with the same chunk dimensions as the array.
To avoid possible errors, it is advisable to assign pre-calculated chunk dimensions when creating dask arrays:
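A minimal sketch of passing explicit chunks, assuming a hypothetical '/image' dataset written with a chunk shape of (100, 64, 64). The stand-in file, the dataset name and all sizes are illustrative assumptions.

```python
import os
import tempfile

import dask.array as da
import h5py
import numpy as np

# Stand-in file written with a known chunk shape.
path = os.path.join(tempfile.mkdtemp(), "images.h5")
with h5py.File(path, "w") as out:
    out.create_dataset("/image", data=np.ones((400, 64, 64)), chunks=(100, 64, 64))

with h5py.File(path, "r") as fh:
    dset = fh["/image"]
    # Reuse the chunk shape the data were written with, if the file has one;
    # the fallback value is an empirical choice.
    chunks = dset.chunks or (100, 64, 64)
    images = da.from_array(dset, chunks=chunks)

    sub = images[:150, :32, :32]     # lazy sub-array
    total = sub.sum().compute()      # computed while the file is still open

print(images.chunks)
print(sub.shape, total)
```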

The chunk size is mostly determined empirically; if the data are known to have been written in chunks, that value can be used to read them.
If the chunks are too small, "reading" the hdf5 file takes a long time (Dask needs to create all the chunks); if they are too big, extracting a sub-array might cause an error.

The h5py library reads vds (virtual dataset) files (https://docs.hdfgroup.org/archive/support/HDF5/docNewFeatures/NewFeaturesVirtualDatasetDocs.html) transparently. The user only needs to provide the manifest file (.hd5), and the library automatically uses the partial files (_part.hd5).
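The following sketch illustrates this transparency on a toy example. It builds two partial files and a manifest in a temporary folder and then reads only the manifest. All file names, the dataset name "data" and the sizes are hypothetical; the real manifests are produced by the data acquisition, not by this code.

```python
import os
import tempfile

import h5py
import numpy as np

folder = tempfile.mkdtemp()   # absolute path to a scratch folder

# Two partial files, each holding one row of the final dataset.
for i in range(2):
    with h5py.File(os.path.join(folder, f"run_part{i}.hd5"), "w") as part:
        part["data"] = np.full(100, i, dtype="i4")

# The manifest maps each partial file onto one row of a virtual dataset.
layout = h5py.VirtualLayout(shape=(2, 100), dtype="i4")
for i in range(2):
    source_path = os.path.join(folder, f"run_part{i}.hd5")   # stored as an absolute path
    layout[i] = h5py.VirtualSource(source_path, "data", shape=(100,))

manifest = os.path.join(folder, "run.hd5")
with h5py.File(manifest, "w", libver="latest") as f:
    f.create_virtual_dataset("data", layout, fillvalue=-1)

# Reading the manifest pulls the values from the partial files.
with h5py.File(manifest, "r") as f:
    combined = f["data"][:]

print(combined.shape, combined[0, 0], combined[1, 0])
```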

N.B. If the data are moved to a different location, the manifest file must be recreated, because the partial files are identified by an absolute path.
Copy the following Python script into the folder where the data are stored and run it. It automatically finds all the manifest files that need to be created (if there are several vds files to generate) and uses the absolute path of the current folder when generating the manifests.
vdsPathReplace.py

INTRODUCTION to Parallel Computing

Up to this point we have not given Dask any information on how to use multiple cores or multiple processes, nor on which partitions to use.
Dask provides a tool to monitor what it does while calculating a result: the dashboard.
Whenever a "client" is created, the dashboard can be opened in a browser at localhost, port 8787.
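A minimal sketch of creating a client and running a small calculation that then appears in the dashboard. It assumes the dask.distributed package is installed; processes=False (threads only, single process) and the array sizes are choices made here to keep the example small and are not taken from the original page.

```python
import dask.array as da
from dask.distributed import Client

# Local, thread-based client; a plain Client() would start worker processes instead.
client = Client(processes=False)
print(client.dashboard_link)   # address of the dashboard, typically on port 8787

a = da.ones((2000, 2000), chunks=(500, 500))
b = da.ones((2000, 2000), chunks=(500, 500))
c = (a + b).sum()              # lazy sum

total = c.compute()            # executed by the client's workers
print(total)

client.close()
```

If port 8787 is already in use, Dask picks another port, so the printed link is the reliable way to find the dashboard.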


Dashboard note: the Dask dashboard starts on localhost:8787 by default. You can access the status page by navigating to your Jupyter host's IP and port from a remote Firefox session (started on the login node). We have currently seen some problems starting Firefox on the S3DF login nodes. Alternatively, you can access the Dask dashboard from your local machine with the following steps:

  • On your local machine (assuming Mac), open ~/.ssh/config file with your text editor.
  • Paste the following text (make sure to update to your username in the User line) in the config file and save.
Host sdfiana*
  User yourusername
  ProxyJump s3dflogin.slac.stanford.edu
  • In the terminal on your local machine, run this command: 
ssh -C2qfTnN -L 8787:localhost:8787 sdfiana002

Notes on the command above:

    • The switch -C2qfTnN runs the tunnel as a background process, as explained by Hans-Thorsten Schwander in the comment section below.
    • The first port number (8787) is the port on your local machine and can be any free port. The example uses the same value as the second port number for simplicity.
    • localhost:8787 is the address shown in your Jupyter notebook after setting up the Dask client.
    • sdfiana002 is the host where your Jupyter notebook is running. You can run the following command in the notebook to see your hostname:

      %%bash
      hostname
  • On your local machine, open the browser and navigate to localhost:8787/status to access Dask dashboard.



The dashboard shows the calculation of the sum c defined previously; it is possible to observe that 8 workers are used (there are 8 lines).
The colors are associated with specific commands (described in the graph). The red color is associated with communication (data transfer) between workers.
Dask takes care of the parallelization by using the available resources, even when not explicitly defined. 

To take advantage of Clusters we install dask-jobqueue:

  • cluster defines the type of cluster used for the parallelization; in this example it is a SLURM cluster.
  • queue is the partition to be used; on S3DF it will be roma, milano, ...
  • cores defines the number of cores per job.
  • memory is the total memory per job.
  • processes is by default approximately the square root of the number of cores, so that processes times threads per process equals the number of cores.
    Since the code we use does not rely on threads, the number of processes can be set equal to the number of cores, leaving threads=1.
  • jobs is the number of SLURM jobs requested to perform the calculations; these are also called "workers" in Dask (strictly, each job starts as many Dask worker processes as given by processes).
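The parameters listed above could be passed to dask-jobqueue as in the following sketch. The helper names and the values (8 cores, 16GB, 4 jobs) are hypothetical, and site-specific options (for example an account or a walltime) are not shown. The cluster is only created inside start_cluster, so the block can be run on a machine without SLURM.

```python
def slurm_settings(queue, cores, memory):
    """Keyword arguments for dask_jobqueue.SLURMCluster (illustrative helper)."""
    return {
        "queue": queue,        # SLURM partition, e.g. roma or milano on S3DF
        "cores": cores,        # cores per job
        "memory": memory,      # total memory per job
        "processes": cores,    # one single-threaded process per core
    }


def start_cluster(settings, jobs):
    """Submit `jobs` SLURM jobs and connect a client to them."""
    # Imported here so the sketch can be read without dask-jobqueue installed.
    from dask.distributed import Client
    from dask_jobqueue import SLURMCluster

    cluster = SLURMCluster(**settings)
    cluster.scale(jobs=jobs)
    client = Client(cluster)
    return cluster, client


settings = slurm_settings(queue="milano", cores=8, memory="16GB")
print(settings)

# On a machine with access to SLURM:
# cluster, client = start_cluster(settings, jobs=4)
```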

It is important to notice that as long as the client is open, the script has access to the workers, cores, and SLURM jobs. 

The use of clusters can drastically reduce the execution time of a script. If a script takes about 6 minutes (380 s) without clusters, parallelization can reduce the execution time to less than half a minute when an appropriate number of cores and workers is used. As for the chunks, these parameters are mostly selected empirically, depending on the available resources.
Note that when chunks are not provided, Dask calculates them itself, which slows the process down. If the chunks are known (or knowable), it is advisable to provide them.


Remember at the end of the script to close:

  • fh.close() #(hdf file)
  • cluster.close() #workers
  • client.close() #communication to dashboard


Example of a Dask dashboard for a Dask-SLURM calculation with 30 cores, 30 workers (jobs) and 30 processes.

Issues with writing hdf5 files using distributed client

We can use dask libraries to write dask array or dask dataframe to hdf5 files as shown in the following examples:

# Dask array to hdf5 file
import dask.array as da
da_ts = da.from_array([1,2,3], chunks='auto')
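# to_hdf5 needs the file name and the dataset path inside the file ('/ts' is an example name)
da_ts.to_hdf5('out.h5', '/ts')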

# Dask dataframe to hdf5 file
import dask.dataframe as dd
import pandas as pd
d = {'col1': [1, 2, 3, 4], 'col2': [5, 6, 7, 8]}
df = dd.from_pandas(pd.DataFrame(data=d), npartitions=2)
df.to_hdf('/sdf/data/lcls/drpsrcf/ffb/users/monarin/h5/output.hdf', '/data') 

Both examples work when the distributed Client is not used. The first example (dask array) fails when we try to run on multiple processes using the Dask distributed client. This is because of a known bug in Dask (for more information, read this issue). The second example uses a dataframe, which only supports 2D data. This can be a problem since LCLS data can be multi-dimensional.


1 Comment

  1. for ssh local forward I recommend to skip the -vv  and add _-C2qfTnN_ which is quiet, enables compression, backgrounds the tunnel process, doesn't allocate a pty and redirects stdout. The terminal remains usable. Also a more general approach is outlined here Using a socks proxy with firefox