...

Experiment   Run    Comment
tmoc00118    222    Generic TMO dark data
tmoacr019    4,5,6  xtcav dark, lasing-off, lasing-on (cpo thinks, not certain)
rixx43518    34     A DAQ "fly scan" of motor (see ami#FlyScan:MeanVs.ScanValue)
rixx43518    45     A DAQ "step scan" of two motors
rixl1013320  63     Rix stepping delay scan of both Vitara delay and ATM delay stage (lxt_ttc scan) at a single mono energy
rixl1013320  93     Rix continuous mono scan with laser on/off data at a single time delay
rixx1003821  55     An infinite sequence with two slow andors running at different rates
rixx1003821  68     A finite burst sequence with one andor
uedcom103    7      epix10ka data
ueddaq02     569    epix10ka data

...

psana can scale to allow for high-rate analysis.  For example, many hdf5 files of small user-defined data (described above in Example Script Producing Small HDF5 File) can be written, one per "SRV" node in the diagram below.  The total number of SRV nodes is set by the environment variable PS_SRV_NODES (default 0).  These hdf5 files are joined by psana into what appears to be a single file using the hdf5 "virtual dataset" feature.  Similarly, multiple nodes can be used for filtering ("EB" nodes in the diagram below) and multiple nodes can be used to process big data in the main psana event loop ("BD" nodes in the diagram below).  The one piece that cannot (currently) be scaled to multiple nodes is the SMD0 (SmallData) task, which reads the timestamps and fseek offsets from each tiny .smd.xtc2 file produced by the DAQ (typically one per detector or per detector segment, although a file can contain more than one segment or detector).  This task joins together the relevant data for each shot ("event build") using the timestamp.  The SMD0 task is multi-threaded, with one thread per detector.  For highest performance it is important that all SMD0 threads be allocated an entire MPI node.

Image: diagram of psana2 parallelization (SMD0, EB, BD, and SRV nodes)
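As a rough sketch of how the SRV pieces are used from a script (the experiment, run, file, and dataset names below are placeholders, not taken from this page): set PS_SRV_NODES before the DataSource is created and pass a single output filename to ds.smalldata(); psana spreads the writing over the SRV ranks while the virtual dataset presents the result as one file.

Code Block
languagepy
titlewrite_small_h5.py (sketch)
# Minimal sketch: launch under MPI with enough ranks for SMD0/EB/BD plus the SRV ranks,
# e.g. mpirun -n 7 python write_small_h5.py
import os
os.environ['PS_SRV_NODES'] = '2'       # two hdf5-writer (SRV) ranks

from psana import DataSource

ds = DataSource(exp='rixc00121', run=121)              # placeholder experiment/run
for myrun in ds.runs():
    smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5)
    for nevt, evt in enumerate(myrun.events()):
        smd.event(evt, mydata=nevt)                    # small per-event user data
    smd.done()

The resulting mysmallh5.h5 can then be read back like an ordinary hdf5 file; the top-level dataset name below is an assumption (check the actual layout with h5ls), not something documented here.

Code Block
languagepy
titleread_small_h5.py (sketch)
import h5py

with h5py.File('mysmallh5.h5', 'r') as f:
    f.visit(print)              # list everything the SRV ranks wrote
    print(f['mydata'][:10])     # assumed dataset name, matching smd.event(...) above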

Running a large job

Below shows how to set up a slurm job script to run a large job. The script uses setup_hosts_openmpi.sh (also provided below) to assign a single node to SMD0 (see diagram above) and to distribute all other tasks (EB, BD, & SRV) across the remaining nodes. After sourcing setup_hosts_openmpi.sh, you can use $PS_N_RANKS and $PS_HOST_FILE in your mpirun command.

...

Code Block
languagebash
titlesubmit_large_psana2.sh
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --job-name=run_large_psana2
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
#SBATCH --nodes=3
#SBATCH --exclusive
#SBATCH --time=10:00

# Configure psana2 parallelization
source setup_hosts_openmpi.sh

# Run your job with #ranks <= (#nodes - 1) * 120 + 1 or use $PS_N_RANKS 
mpirun -np $PS_N_RANKS --hostfile $PS_HOST_FILE python test_mpi.py

...

Code Block
languagebash
titlesetup_hosts_openmpi.sh
############################################################
# First node must be exclusive to smd0
# * For openmpi, slots=1 must be assigned to the first node.
############################################################

# Get the list of hosts by expanding the shorthand node list into a
# line-by-line node list
host_list=$(scontrol show hostnames $SLURM_JOB_NODELIST)
hosts=($host_list)

# Write out to host file by putting rank 0 on the first node
host_file="slurm_host_${SLURM_JOB_ID}"
for i in "${!hosts[@]}"; do
    if [[ "$i" == "0" ]]; then
        echo ${hosts[$i]} slots=1 > $host_file
    else
        echo ${hosts[$i]} >> $host_file
    fi
done

# Export hostfile for mpirun  
export PS_HOST_FILE=$host_file

# Calculate no. of ranks available in the job. 
export PS_N_RANKS=$(( SLURM_CPUS_ON_NODE * ( SLURM_JOB_NUM_NODES - 1 ) + 1 ))

...
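To make the rank accounting concrete, here is a worked instance of the PS_N_RANKS formula from setup_hosts_openmpi.sh (assuming the 120 usable cores per milano node quoted in the submit script comment): with --nodes=3, the first node is held back for SMD0, so the job gets one SMD0 rank plus 240 worker ranks.

Code Block
languagepy
titleps_n_ranks_example.py (sketch)
# Worked example of PS_N_RANKS = SLURM_CPUS_ON_NODE * (SLURM_JOB_NUM_NODES - 1) + 1,
# assuming 120 usable cores per node (the figure quoted in submit_large_psana2.sh)
cpus_on_node = 120
num_nodes = 3                  # --nodes=3 in the submit script above

ps_n_ranks = cpus_on_node * (num_nodes - 1) + 1
print(ps_n_ranks)              # 241: rank 0 (SMD0) alone on node 1, 240 EB/BD/SRV ranks on nodes 2-3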

Historical background: we went back and forth about how to manage the MPI helper-ranks.  The alternative would have been to use callbacks instead of run/step/event loops to more effectively hide the helper-ranks from user code, but callbacks would have been user-unfriendly in a different way: writing loops is a more natural coding approach for many users.  We felt the loop approach (with more fragile Detector objects that can be None) was the lesser of two evils.


Running psplot_live

From the rix-daq node (or any drp node), source the psana2 environment, then run:

Code Block
languagebash
titlepsplot_live
(ps-4.6.3) rix-daq:scripts> psplot_live ANDOR

This assumes that an analysis job has been submitted (an example analysis script is shown further below). The command opens an interactive IPython session:

Python 3.9.16 | packaged by conda-forge | (main, Feb  1 2023, 21:39:03) 
Type 'copyright', 'credits' or 'license' for more information
IPython 8.14.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: 

The above command starts psplot_live, which listens for your analysis jobs (with plotting) and provides an interactive session. You can use the interactive session to list, kill, and reactivate plots. To monitor more than one plot, separate the plot names with a space (e.g. psplot_live ANDOR ATMOPAL).


Below is an example of an analysis script (monitoring two plots: ANDOR and ATMOPAL) and a job-submission script that communicate directly with psplot_live. If you are converting a python script that already works with psplot (not live), the main difference is shown on line 25, where psmon_publish=publish is passed as an additional DataSource argument. There may be other differences that need to be changed; please let us know in that case.

Code Block
languagepy
titlerun_andor.py
linenumberstrue
from psana import DataSource
from psmon import publish
from psmon.plots import Image,XYPlot
import os, sys, time
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
 

os.environ['PS_SRV_NODES']='1'
os.environ['PS_SMD_N_EVENTS']='1'


# passing exp and runnum
exp=sys.argv[1]
runnum=int(sys.argv[2])


mount_dir = '/sdf/data/lcls/drpsrcf/ffb'
#mount_dir = '/cds/data/drpsrcf'
xtc_dir = os.path.join(mount_dir, exp[:3], exp, 'xtc')
ds = DataSource(exp=exp,run=runnum,dir=xtc_dir,intg_det='andor_vls',
        batch_size=1, 
        psmon_publish=publish,
        detectors=['timing','andor_vls','atmopal'],
        max_events=0,
        live=True)


def my_smalldata(data_dict):
    if 'unaligned_andor_norm' in data_dict:
        andor_norm = data_dict['unaligned_andor_norm'][0]
        myplot = XYPlot(0,f"Andor (normalized) run:{runnum}",range(len(andor_norm)),andor_norm)
        publish.send('ANDOR',myplot)
    if 'sum_atmopal' in data_dict:
        atmopal_sum = data_dict['sum_atmopal']
        myplot = XYPlot(0,f"Atmopal (sum) run:{runnum}",range(len(atmopal_sum)), atmopal_sum)
        publish.send('ATMOPAL', myplot)
 
for myrun in ds.runs():
    andor = myrun.Detector('andor_vls')
    atmopal = myrun.Detector('atmopal')
    timing = myrun.Detector('timing')
    smd = ds.smalldata(filename='mysmallh5.h5',batch_size=5, callbacks=[my_smalldata])
    norm = 0
    ndrop_inhibit = 0
    sum_atmopal = None
    cn_andor_events = 0
    cn_intg_events = 0
    ts_st = None
    for nstep,step in enumerate(myrun.steps()):
        print('step:',nstep)
        for nevt,evt in enumerate(step.events()):
            if ts_st is None: ts_st = evt.timestamp
            cn_intg_events += 1
            andor_img = andor.raw.value(evt)
            atmopal_img = atmopal.raw.image(evt)
            if atmopal_img is not None:
                if sum_atmopal is None:
                    sum_atmopal = atmopal_img[0,:]
                else:
                    sum_atmopal += atmopal_img[0,:]
            # also need to check for events missing due to damage
            # (or compare against expected number of events)
            ndrop_inhibit += timing.raw.inhibitCounts(evt)
            smd.event(evt, mydata=nevt) # high rate data saved to h5
            # need to check Matt's new timing-system data on every
            # event to make sure we haven't missed normalization
            # data due to deadtime
            norm+=nevt # fake normalization
            if andor_img is not None:
                cn_andor_events += 1
                #print('andor data on evt:',nevt,'ndrop_inhibit:',ndrop_inhibit)
                print(f'BD{rank-1}: #andor_events: {cn_andor_events} #intg_event:{cn_intg_events} st: {ts_st} en:{evt.timestamp}')
                # check that the high-read readout group (2) didn't
                # miss any events due to deadtime
                if ndrop_inhibit[2]!=0: print('*** data lost due to deadtime')
                # need to prefix the name with "unaligned_" so
                # the low-rate andor dataset doesn't get padded
                # to align with the high rate datasets
                smd.event(evt, mydata=nevt,
                          unaligned_andor_norm=(andor_img/norm),
                          sum_atmopal=sum_atmopal)
                norm=0
                ndrop_inhibit=0
                sum_atmopal = None
                cn_intg_events = 0
                ts_st = None
    smd.done()

And an sbatch script:

Code Block
languagebash
titlesubmit_run_andor.sh
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --account=<your account here>
#SBATCH --job-name=run_andor
#SBATCH --nodes=1
#SBATCH --ntasks=5
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
##SBATCH --exclusive
#SBATCH -t 00:05:00

t_start=`date +%s`

exp=$1
runnum=$2
mpirun -n 5 python run_andor.py $exp $runnum

t_end=`date +%s`
echo PSJobCompleted TotalElapsed $((t_end-t_start))  

After creating the above two scripts, you can submit the job with:

Code Block
languagebash
titlesbatch
sbatch submit_run_andor.sh rixc00121 121

You should be able to see the psplot window(s) pop up automatically.

Image: psplot window(s) displaying the monitored plots

To view the list of psplots:

Code Block
languagebash
titlepsplot_live
In [1]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS
1     43195784     rixc00121  121   sdfmilan005.sdf.slac.stanford.edu   12323 PLOTTED

If you close the plot window (or it errors out), the plot is automatically removed from the list:

Code Block
languagebash
titlepsplot_live
In [2]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS

You can submit your analysis job again (new, higher run numbers are always monitored). Old jobs (a previously submitted run number from the same node and psplot port) will NOT be shown automatically (STATUS: RECEIVED); you can reactivate them with the show(ID) command.

Code Block
languagebash
titlepsplot_live
In [2]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS
1     3275205      rixc00121  121   sdfiana001.sdf.slac.stanford.edu    12323 RECEIVED

In [3]: show(1)
Main received {'msgtype': 3} from db-zmq-server

To kill a plot, use kill(ID); to kill all plots, use kill_all().

Code Block
languagebash
titlepsplot_live
In [5]: kill(1)

In [6]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS
