...

To obtain the environment to run psana2, execute the following:

Code Block
# For S3DF users (sdfiana nodes)
source /sdf/group/lcls/ds/ana/sw/conda2/manage/bin/psconda.sh

# For PCDS users (psana nodes)
source /cds/sw/ds/ana/conda2/manage/bin/psconda.sh

...

Publicly accessible practice data is located in S3DF in the directory /sdf/data/lcls/ds/prj/public01/xtc.  Use of this data requires the additional "dir=/sdf/data/lcls/ds/prj/public01/xtc" keyword argument to the DataSource object.

Experiment    Run    Comment
tmoc00118     222    Generic TMO dark data
tmoacr019     4,5,6  xtcav dark, lasing-off, lasing-on (cpo thinks, not certain)
rixx43518     34     A DAQ "fly scan" of motor (see ami#FlyScan:MeanVs.ScanValue)
rixx43518     45     A DAQ "step scan" of two motors
rixl1013320   63     Rix stepping delay scan of both Vitara delay and ATM delay stage (lxt_ttc scan) at a single mono energy
rixl1013320   93     Rix continuous mono scan with laser on/off data at a single time delay
rixx1003821   55     An infinite sequence with two slow andors running at different rates
rixx1003821   68     A finite burst sequence with one andor
uedcom103     7      epix10ka data
ueddaq02      569    epix10ka data
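
For example, a minimal sketch that opens the tmoc00118 dark data from the table above using the additional "dir" keyword argument:

Code Block
languagepy
from psana import DataSource

# Point DataSource at the public practice data with the "dir" keyword argument
ds = DataSource(exp='tmoc00118', run=222,
                dir='/sdf/data/lcls/ds/prj/public01/xtc')
for run in ds.runs():
    for evt in run.events():
        pass  # analyze events here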

...

You can run this script with MPI the same way as shown in the previous example: mpirun -n 6 python example.py


NOTE: If you don't run with at least 6 cores, the destination selection code below will not function correctly.  Also, the destination selection code currently only works with 1 EB core (defined by the PS_SMD_NODES environment variable, which defaults to 1).

NOTE: Variable-length data names must have a "var_" prefix

These additional arguments for DataSource were added to this example

  • detectors=['detname1', 'detname2']

           List of detectors to be read from the disk. If you only need a few detectors for analysis, list their names here. The reading process will be faster since unused detector data is not read.

...

Code Block
languagepy
from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)  

# Event filtering and destination callback (runs on EB cores)
# Use this function to decide if you want to fetch large data for this event  
# and/or direct an event to process on a particular 'rank' 
# (this rank number should be between 1 and total no. of ranks
# (6 for "mpirun -n 6") minus 3 since 3 ranks are reserved for SMD0, EB, SRV
# processes). If a non-epics detector is needed in this routine, make sure to
# add the detector name in small_xtc kwarg for DataSource (see below).
# All epics and scan detectors are available automatically.
def smd_callback(run):
    opal = run.Detector('tmo_opal1')
    epics_det = run.Detector('IM2K4_XrayPower')

    n_bd_nodes = 3 # for mpirun -n 6, 3 ranks are reserved so there are 3 BigData ranks left

    for i_evt, evt in enumerate(run.events()):
        img = opal.raw.image(evt)
        epics_val = epics_det(evt)
        dest = (evt.timestamp % n_bd_nodes) + 1

        if epics_val is not None:
            # Set the destination (rank no.) where this event should be sent to
            evt.set_destination(dest)
            yield evt

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES']='1'

ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
        max_events  = 40,
        detectors   = ['epicsinfo', 'tmo_opal1', 'ebeam'],  # only reads these detectors (faster)
        smd_callback= smd_callback,                         # event-filtering/destination callback (see notes above)
        small_xtc   = ['tmo_opal1'],                        # detectors to be used in smalldata callback
        )

# batch_size is optional. specifies how often the dictionary of small
# user data is gathered.  if you write out large data (NOT RECOMMENDED) it needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

# used for variable-length data example
cnt = 0 
modulus = 4

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')

    runsum  = np.zeros((3),dtype=float) # beware of datatypes when summing: can overflow
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        if img is None or photonEnergy is None: continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += img[0,:3] # local sum on one mpi core
 
        # an example of variable-length data (must have "var_" name prefix)
        if cnt % modulus:
            x = np.arange(cnt%modulus)
            y = [[x+1, (x+1)**2] for x in range(cnt%modulus)]
            smd.event(evt, {'var_test' : { 'x': x, 'y': y }})
        else:
            # Note, this works either way, either not sending anything or
            # sending 0-length data.  It should be noted if there is *no*
            # data in the entire run, the var_array is *not* written to the
            # output!
            pass
            #smd.event(evt, {'var_test' : { 'x': [], 'y': [] }})
        cnt += 1

    # optional summary data for whole run
    if smd.summary:
        tot_runsum = smd.sum(runsum) # sum (or max/min) across all mpi cores. Must be numpy array or None.
        # pass either dictionary or kwargs
        smd.save_summary({'sum_over_run' : tot_runsum}, summary_int=1)
    smd.done()

...

psana can scale to allow for high-rate analysis.  For example, many hdf5 files of small user-defined data (described above in Example Script Producing Small HDF5 File) can be written, one per "SRV" node in the diagram below.  The total number of SRV nodes is defined by the environment variable PS_SRV_NODES (defaults to 0).  These hdf5 files are joined by psana into what appears to be one file using the hdf5 "virtual dataset" feature.  Similarly, multiple nodes can be used for filtering ("EB" nodes in the diagram below) and multiple nodes can be used to process big data in the main psana event loop ("BD" nodes in the diagram below).  The one piece that cannot currently be scaled to multiple nodes is the SMD0 (SmallData) task, which reads the timestamps and fseek offsets from each tiny .smd.xtc2 file produced by the DAQ (typically one per detector or per detector segment, although a file can contain more than one segment or detector).  SMD0 joins together the relevant data for each shot ("event build") using the timestamp.  It is multi-threaded, with one thread per detector, and for highest performance all SMD0 threads should be allocated an entire MPI node.

[Diagram: psana2 MPI task structure, showing the SMD0, EB, BD, and SRV nodes]
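
As a sketch (the values below are illustrative, not recommendations), the number of EB and SRV ranks can be increased with environment variables before the DataSource is created:

Code Block
languagepy
import os

# Illustrative values only: request 2 EventBuilder (EB) ranks and 2 SRV ranks.
# PS_SMD_NODES sets the number of EB ranks (defaults to 1);
# PS_SRV_NODES sets the number of SRV ranks (defaults to 0), one hdf5 file per SRV rank.
os.environ['PS_SMD_NODES'] = '2'
os.environ['PS_SRV_NODES'] = '2'

from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222,
                dir='/sdf/data/lcls/ds/prj/public01/xtc')
# Then launch with enough MPI ranks, e.g. mpirun -n 16 python example.py
# (1 SMD0 + 2 EB + 2 SRV ranks, leaving 11 BD ranks for the event loop).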

Running a large job

Below shows how to set up a Slurm job script to run a large job. The script uses setup_hosts_openmpi.sh (also provided below) to assign a single node to SMD0 (see diagram above) and distribute all other tasks (EB, BD, & SRV) to the rest of the available nodes. After sourcing setup_hosts_openmpi.sh, you can use $PS_N_RANKS and $PS_HOST_FILE in your mpirun command.

...

Code Block
languagebash
titlesubmit_large_psana2.sh
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --job-name=run_large_psana2
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
#SBATCH --nodes=3
#SBATCH --exclusive
#SBATCH --time=10:00

# Configure psana2 parallelization
source setup_hosts_openmpi.sh

# Run your job with #ranks <= (#nodes - 1) * 120 + 1, or simply use $PS_N_RANKS
mpirun -np $PS_N_RANKS --hostfile $PS_HOST_FILE python test_mpi.py

Here is the `setup_hosts_openmpi.sh` script. You can create it in your job folder.

Code Block
languagebash
titlesetup_hosts_openmpi.sh
############################################################
# First node must be exclusive to smd0
# * For openmpi, slots=1 must be assigned to the first node.
############################################################

# Get the list of hosts by expanding the shorthand node list into a
# line-by-line node list
host_list=$(scontrol show hostnames $SLURM_JOB_NODELIST)
hosts=($host_list)

# Write out to host file by putting rank 0 on the first node
host_file="slurm_host_${SLURM_JOB_ID}"
for i in "${!hosts[@]}"; do
    if [[ "$i" == "0" ]]; then
        echo ${hosts[$i]} slots=1 > $host_file
    else
        echo ${hosts[$i]} >> $host_file
    fi
done

# Export hostfile for mpirun  
export PS_HOST_FILE=$host_file

# Calculate no. of ranks available in the job. 
export PS_N_RANKS=$(( SLURM_CPUS_ON_NODE * ( SLURM_JOB_NUM_NODES - 1 ) + 1 ))
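
For example, with #SBATCH --nodes=3 on 120-core nodes this gives PS_N_RANKS = 120 * (3 - 1) + 1 = 241: one rank on the first node for SMD0 plus all cores on the two remaining nodes.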

...

psana also has some built-in Grafana monitoring that, with expert help, can be used to identify bottlenecks in an analysis.  Contact pcds-ana-l@slac.stanford.edu for guidance.

Sorting Small HDF5 File

The small hdf5 file is likely unsorted due to parallelism in psana2. If your output h5 is large (> 1 billion records), you can use the timestamp_sort_h5 tool by submitting the following job:

Code Block
languagebash
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --job-name=timestamp_sort_h5
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
#SBATCH --nodes=1
#SBATCH --exclusive
#SBATCH --time=10:00

timestamp_sort_h5 /sdf/data/lcls/drpsrcf/ffb/users/monarin/h5/mylargeh5.h5 /sdf/data/lcls/drpsrcf/ffb/users/monarin/h5/output/result.h5

Note that the first required argument is the unsorted hdf5 file and the second is the desired output file. There are other optional arguments, which can be listed by running timestamp_sort_h5 --help.

Why Is My Detector Object "None" On Some MPI Ranks?

In psana2 all mpi ranks execute the same code, but not all ranks can create a Detector object since there are “hidden” MPI helper-ranks shown in this diagram: MPITaskStructureToSupportScaling.  For those helper-ranks the Detector object will be None.  Those helper-ranks won’t enter psana2 loops over runs/steps/events, so as long as you only use a Detector object inside loops your code will run correctly without any special checks.  However, if you use the Detector object outside those loops you must check that the Detector object is not None before calling any methods.
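
A minimal sketch of this check (the experiment, run, and detector name are only illustrative):

Code Block
languagepy
from psana import DataSource

ds = DataSource(exp='tmoc00118', run=222,
                dir='/sdf/data/lcls/ds/prj/public01/xtc')
for run in ds.runs():
    det = run.Detector('tmo_opal1')  # may be None on the hidden helper-ranks
    for evt in run.events():         # helper-ranks do not iterate over events
        img = det.raw.image(evt)     # safe to use det here without a check
    # Outside the run/step/event loops, guard against None before
    # calling any Detector methods:
    if det is not None:
        print('Detector object available on this rank')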

Historical background: we went back and forth about how to manage the MPI helper-ranks.  The alternative would have been to use callbacks instead of run/step/event loops to more effectively hide the helper-ranks from user code, but callbacks would have been user-unfriendly in a different way: writing loops is a more natural coding approach for many users.  We felt the loop approach (with more fragile Detector objects that can be None) was the lesser of two evils.


Running psplot_live

From a rix-daq node, source the psana2 environment, then run:

Code Block
languagebash
(ps-4.6.3) rix-daq:scripts> psplot_live ANDOR
Python 3.9.16 | packaged by conda-forge | (main, Feb  1 2023, 21:39:03) 
Type 'copyright', 'credits' or 'license' for more information
IPython 8.14.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: 

The above command starts psplot_live, which listens to your analysis jobs (with plotting) and provides an interactive session. You can use the interactive session to list, kill, and reactivate plots. To monitor more than one plot, separate the plot names with spaces (e.g. psplot_live ANDOR ATMOPAL).


Below is an example analysis script (monitoring two plots: ANDOR and ATMOPAL) and a job submission script that communicate directly with psplot_live. If you are converting a python script that works with psplot (not live), the main difference is on line 25, where you have to set psmon_publish=publish as an additional DataSource argument. There may be other differences that need to be changed; please let us know in this case.

Code Block
languagepy
titlerun_andor.py
linenumberstrue
from psana import DataSource
from psmon import publish
from psmon.plots import Image,XYPlot
import os, sys, time
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
 

os.environ['PS_SRV_NODES']='1'
os.environ['PS_SMD_N_EVENTS']='1'


# passing exp and runnum
exp=sys.argv[1]
runnum=int(sys.argv[2])


mount_dir = '/sdf/data/lcls/drpsrcf/ffb'
#mount_dir = '/cds/data/drpsrcf'
xtc_dir = os.path.join(mount_dir, exp[:3], exp, 'xtc')
ds = DataSource(exp=exp,run=runnum,dir=xtc_dir,intg_det='andor_vls',
        batch_size=1, 
        psmon_publish=publish,
        detectors=['timing','andor_vls','atmopal'],
        max_events=0,
        live=True)


def my_smalldata(data_dict):
    if 'unaligned_andor_norm' in data_dict:
        andor_norm = data_dict['unaligned_andor_norm'][0]
        myplot = XYPlot(0,f"Andor (normalized) run:{runnum}",range(len(andor_norm)),andor_norm)
        publish.send('ANDOR',myplot)
    if 'sum_atmopal' in data_dict:
        atmopal_sum = data_dict['sum_atmopal']
        myplot = XYPlot(0,f"Atmopal (sum) run:{runnum}",range(len(atmopal_sum)), atmopal_sum)
        publish.send('ATMOPAL', myplot)
 
for myrun in ds.runs():
    andor = myrun.Detector('andor_vls')
    atmopal = myrun.Detector('atmopal')
    timing = myrun.Detector('timing')
    smd = ds.smalldata(filename='mysmallh5.h5',batch_size=5, callbacks=[my_smalldata])
    norm = 0
    ndrop_inhibit = 0
    sum_atmopal = None
    cn_andor_events = 0
    cn_intg_events = 0
    ts_st = None
    for nstep,step in enumerate(myrun.steps()):
        print('step:',nstep)
        for nevt,evt in enumerate(step.events()):
            if ts_st is None: ts_st = evt.timestamp
            cn_intg_events += 1
            andor_img = andor.raw.value(evt)
            atmopal_img = atmopal.raw.image(evt)
            if atmopal_img is not None:
                if sum_atmopal is None:
                    sum_atmopal = atmopal_img[0,:]
                else:
                    sum_atmopal += atmopal_img[0,:]
            # also need to check for events missing due to damage
            # (or compare against expected number of events)
            ndrop_inhibit += timing.raw.inhibitCounts(evt)
            smd.event(evt, mydata=nevt) # high rate data saved to h5
            # need to check Matt's new timing-system data on every
            # event to make sure we haven't missed normalization
            # data due to deadtime
            norm+=nevt # fake normalization
            if andor_img is not None:
                cn_andor_events += 1
                #print('andor data on evt:',nevt,'ndrop_inhibit:',ndrop_inhibit)
                print(f'BD{rank-1}: #andor_events: {cn_andor_events} #intg_event:{cn_intg_events} st: {ts_st} en:{evt.timestamp}')
                # check that the high-read readout group (2) didn't
                # miss any events due to deadtime
                if ndrop_inhibit[2]!=0: print('*** data lost due to deadtime')
                # need to prefix the name with "unaligned_" so
                # the low-rate andor dataset doesn't get padded
                # to align with the high rate datasets
                smd.event(evt, mydata=nevt,
                          unaligned_andor_norm=(andor_img/norm),
                          sum_atmopal=sum_atmopal)
                norm=0
                ndrop_inhibit=0
                sum_atmopal = None
                cn_intg_events = 0
                ts_st = None
    smd.done()

And an sbatch script:

Code Block
languagebash
titlesubmit_run_andor.sh
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --account=<your account here>
#SBATCH --job-name=run_andor
#SBATCH --nodes=1
#SBATCH --ntasks=5
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
##SBATCH --exclusive
#SBATCH -t 00:05:00

t_start=`date +%s`

exp=$1
runnum=$2
mpirun -n 5 python run_andor.py $exp $runnum

t_end=`date +%s`
echo PSJobCompleted TotalElapsed $((t_end-t_start))  

After creating the above two scripts, you can submit the job with:

Code Block
languagebash
titlesbatch
sbatch submit_run_andor.sh rixc00121 121

You should be able to see the psplot(s) pop up automatically.


To view the list of psplots:

Code Block
languagebash
In [1]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS    
1     43195784     rixc00121  121   sdfmilan005.sdf.slac.stanford.edu   12323 PLOTTED

If you close the plot window (or it exits with an error), the process is automatically removed from the list:

Code Block
languagebash
In [2]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS

You can submit your analysis job again (increasing run numbers are always monitored). Old jobs (a previously submitted run number from the same node and the same psplot port) will NOT be shown automatically (STATUS: RECEIVED); you can reactivate them using the show(ID) command.

Code Block
languagebash
In [2]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS    
1     3275205      rixc00121  121   sdfiana001.sdf.slac.stanford.edu    12323 RECEIVED  

In [3]: show(1)
Main received {'msgtype': 3} from db-zmq-server

To kill a plot, use kill(ID); to kill all plots, use kill_all().

Code Block
languagebash
In [5]: kill(1)

In [6]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS