...
Experiment | Run | Comment |
---|
tmoc00118 | 222 | Generic TMO dark data |
tmoacr019 | 4,5,6 | xtcav dark,lasing-off,lasing-on (cpo thinks, not certain) |
rixx43518 | 34 | A DAQ "fly scan" of motor (see ami#FlyScan:MeanVs.ScanValue) |
rixx43518 | 45 | A DAQ "step scan" of two motors |
rixl1013320 | 63 | Rix stepping delay scan of both Vitara delay and ATM delay stage (lxt_ttc scan) at a single mono energy |
rixl1013320 | 93 | Rix continuous mono scan with laser on/off data at a single time delay |
rixx1003821 | 55 | An infinite sequence with two slow andors running at different rates |
rixx1003821 | 68 | A finite burst sequence with one andor |
uedcom103 | 7 | epix10ka data |
ueddaq02 | 569 | epix10ka data |
...
psana can scale to support high-rate analysis. For example, many hdf5 files of small user-defined data (described above in Example Script Producing Small HDF5 File) can be written, one per "SRV" node in the diagram below. The total number of SRV nodes is set by the environment variable PS_SRV_NODES (default 0). psana joins these hdf5 files into what appears to be a single file using the hdf5 "virtual dataset" feature. Similarly, multiple nodes can be used for filtering ("EB" nodes in the diagram below), and multiple nodes can be used to process big data in the main psana event loop ("BD" nodes in the diagram below). The one piece that currently cannot be scaled to multiple nodes is the SMD0 (SMallData) task, which reads the timestamps and fseek offsets from each tiny .smd.xtc2 file produced by the DAQ (typically one file per detector or per detector segment, although a file can contain more than one segment or detector). This task joins together the relevant data for each shot ("event build") using the timestamp. The SMD0 task is multi-threaded, with one thread per detector. For highest performance, the SMD0 threads should have an entire MPI node to themselves.
[Diagram: psana SMD0, EB, BD, and SRV nodes]
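The "event build" step can be pictured with a small, self-contained sketch. This is not psana's implementation: the `event_build` function, the stream names, and the `(timestamp, offset)` tuples are hypothetical stand-ins for what SMD0 reads from each .smd.xtc2 file, and each stream is assumed to be sorted by timestamp already.

```python
import heapq
from itertools import groupby

def event_build(streams):
    """Group (timestamp, offset) records from several streams by timestamp.

    streams: dict mapping a (hypothetical) stream name to a list of
    (timestamp, offset) tuples sorted by timestamp.
    Returns a list of (timestamp, {stream_name: offset}) in time order.
    """
    # Tag every record with its stream name; each list stays sorted by timestamp.
    tagged = (
        [(ts, name, offset) for ts, offset in records]
        for name, records in streams.items()
    )
    # heapq.merge combines already-sorted inputs into one sorted sequence.
    merged = heapq.merge(*tagged)
    events = []
    # Records that share a timestamp belong to the same shot.
    for ts, group in groupby(merged, key=lambda rec: rec[0]):
        events.append((ts, {name: offset for _, name, offset in group}))
    return events

streams = {
    "timing": [(100, 0), (101, 64), (102, 128)],
    "andor": [(102, 0)],                 # slow detector: data on one shot only
    "atmopal": [(100, 0), (102, 4096)],
}
for ts, contributions in event_build(streams):
    print(ts, sorted(contributions))
```

In this toy input, the shot at timestamp 101 only has a timing contribution, which mirrors how slow detectors are absent from most events.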
Running a large job
The following shows how to set up a slurm job script to run a large job. The script uses setup_hosts_openmpi.sh (also provided below) to assign a single node to SMD0 (see diagram above) and distribute all other tasks (EB, BD, and SRV) over the remaining nodes. After sourcing setup_hosts_openmpi.sh, you can use $PS_N_RANKS and $PS_HOST_FILE in your mpirun command.
...
Code Block |
---|
language | bash |
---|
title | submit_large_psana2.sh |
---|
|
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --job-name=run_large_psana2
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
#SBATCH --nodes=3
#SBATCH --exclusive
#SBATCH --time=10:00
# Configure psana2 parallelization
source setup_hosts_openmpi.sh
# Run your job with #ranks <= (#nodes - 1) * 120 + 1 or use $PS_N_RANKS
mpirun -np $PS_N_RANKS --hostfile $PS_HOST_FILE python test_mpi.py |
...
Code Block |
---|
language | bash |
---|
title | setup_hosts_openmpi.sh |
---|
|
############################################################
# First node must be exclusive to smd0
# * For openmpi, slots=1 must be assigned to the first node.
############################################################
# Get the list of hosts by expanding the shorthand node list
# into a line-by-line node list
host_list=$(scontrol show hostnames $SLURM_JOB_NODELIST)
hosts=($host_list)
# Write out to host file by putting rank 0 on the first node
host_file="slurm_host_${SLURM_JOB_ID}"
for i in "${!hosts[@]}"; do
    if [[ "$i" == "0" ]]; then
        echo ${hosts[$i]} slots=1 > $host_file
    else
        echo ${hosts[$i]} >> $host_file
    fi
done
# Export hostfile for mpirun
export PS_HOST_FILE=$host_file
# Calculate no. of ranks available in the job.
export PS_N_RANKS=$(( SLURM_CPUS_ON_NODE * ( SLURM_JOB_NUM_NODES - 1 ) + 1 )) |
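The host-file layout and rank count produced by setup_hosts_openmpi.sh can be checked with a short Python sketch. The function names and host names below are hypothetical, and the value 120 is taken from the comment in submit_large_psana2.sh; in a real job the per-node count comes from SLURM_CPUS_ON_NODE.

```python
def build_hostfile_lines(hosts):
    """Return host-file lines; the first node gets slots=1 (reserved for SMD0)."""
    return [f"{h} slots=1" if i == 0 else h for i, h in enumerate(hosts)]

def n_ranks(n_nodes, cpus_per_node):
    """One rank on the first node plus cpus_per_node ranks on every other node."""
    return cpus_per_node * (n_nodes - 1) + 1

hosts = ["node001", "node002", "node003"]  # hypothetical host names
print("\n".join(build_hostfile_lines(hosts)))
print(n_ranks(len(hosts), 120))  # 3 nodes, 120 CPUs each -> 241
```

With three nodes, only one rank lands on the first node, so the job runs 241 ranks rather than 360.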
...
Historical background: we went back and forth about how to manage the MPI helper-ranks. The alternative would have been to use callbacks instead of run/step/event loops to more effectively hide the helper-ranks from user code, but callbacks would have been user-unfriendly in a different way: writing loops is a more natural coding approach for many users. We felt the loop approach (with more fragile Detector objects that can be None) was the lesser of two evils.
Running psplot_live
From any rix-daq node, source the psana2 environment, then run:
Code Block |
---|
language | bash |
---|
title | submit_large_psana2.sh |
---|
|
(ps-4.6.3) rix-daq:scripts> psplot_live ANDOR
Python 3.9.16 | packaged by conda-forge | (main, Feb 1 2023, 21:39:03)
Type 'copyright', 'credits' or 'license' for more information
IPython 8.14.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]: |
The above command starts psplot_live, which listens for your analysis jobs (with plotting) and provides an interactive session. You can use the interactive session to list, kill, and reactivate plots. To monitor more than one plot, separate the plot names with spaces (e.g. psplot_live ANDOR ATMOPAL).
Below is an example analysis script (monitoring two plots: ANDOR and ATMOPAL) and a job submission script that communicate directly with psplot_live. If you are converting a python script that works with psplot (not live), the main difference is that you have to pass psmon_publish=publish as an additional DataSource argument. Other changes may also be needed; please let us know if that is the case.
Code Block |
---|
language | py |
---|
title | run_andor.py |
---|
linenumbers | true |
---|
|
from psana import DataSource
from psmon import publish
from psmon.plots import Image,XYPlot
import os, sys, time
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
os.environ['PS_SRV_NODES']='1'
os.environ['PS_SMD_N_EVENTS']='1'
# passing exp and runnum
exp=sys.argv[1]
runnum=int(sys.argv[2])
mount_dir = '/sdf/data/lcls/drpsrcf/ffb'
#mount_dir = '/cds/data/drpsrcf'
xtc_dir = os.path.join(mount_dir, exp[:3], exp, 'xtc')
ds = DataSource(exp=exp,run=runnum,dir=xtc_dir,intg_det='andor_vls',
        batch_size=1,
        psmon_publish=publish,
        detectors=['timing','andor_vls','atmopal'],
        max_events=0,
        live=True)
# we will remove this for batch processing and use "psplot" instead
# publish.local = True
def my_smalldata(data_dict):
    if 'unaligned_andor_norm' in data_dict:
        andor_norm = data_dict['unaligned_andor_norm'][0]
        myplot = XYPlot(0,f"Andor (normalized) run:{runnum}",range(len(andor_norm)),andor_norm)
        publish.send('ANDOR',myplot)
    if 'sum_atmopal' in data_dict:
        atmopal_sum = data_dict['sum_atmopal']
        myplot = XYPlot(0,f"Atmopal (sum) run:{runnum}",range(len(atmopal_sum)), atmopal_sum)
        publish.send('ATMOPAL', myplot)
for myrun in ds.runs():
    andor = myrun.Detector('andor_vls')
    atmopal = myrun.Detector('atmopal')
    timing = myrun.Detector('timing')
    smd = ds.smalldata(filename='mysmallh5.h5',batch_size=5, callbacks=[my_smalldata])
    norm = 0
    ndrop_inhibit = 0
    sum_atmopal = None
    cn_andor_events = 0
    cn_intg_events = 0
    ts_st = None
    for nstep,step in enumerate(myrun.steps()):
        print('step:',nstep)
        for nevt,evt in enumerate(step.events()):
            if ts_st is None: ts_st = evt.timestamp
            cn_intg_events += 1
            andor_img = andor.raw.value(evt)
            atmopal_img = atmopal.raw.image(evt)
            if atmopal_img is not None:
                if sum_atmopal is None:
                    sum_atmopal = atmopal_img[0,:]
                else:
                    sum_atmopal += atmopal_img[0,:]
            # also need to check for events missing due to damage
            # (or compare against expected number of events)
            ndrop_inhibit += timing.raw.inhibitCounts(evt)
            smd.event(evt, mydata=nevt) # high rate data saved to h5
            # need to check Matt's new timing-system data on every
            # event to make sure we haven't missed normalization
            # data due to deadtime
            norm+=nevt # fake normalization
            if andor_img is not None:
                cn_andor_events += 1
                #print('andor data on evt:',nevt,'ndrop_inhibit:',ndrop_inhibit)
                print(f'BD{rank-1}: #andor_events: {cn_andor_events} #intg_event:{cn_intg_events} st: {ts_st} en:{evt.timestamp}')
                # check that the high-read readout group (2) didn't
                # miss any events due to deadtime
                if ndrop_inhibit[2]!=0: print('*** data lost due to deadtime')
                # need to prefix the name with "unaligned_" so
                # the low-rate andor dataset doesn't get padded
                # to align with the high rate datasets
                smd.event(evt, mydata=nevt,
                          unaligned_andor_norm=(andor_img/norm),
                          sum_atmopal=sum_atmopal)
                norm=0
                ndrop_inhibit=0
                sum_atmopal = None
                cn_intg_events = 0
                ts_st = None
    smd.done()
|
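The event loop in run_andor.py follows an accumulate-and-reset pattern: high-rate quantities are summed on every event, written out when the slow integrating detector has data, and then cleared. The sketch below shows only that pattern, without psana; the `integrate` function and the `fast`/`slow` dictionary keys are hypothetical and stand in for the atmopal and andor readouts.

```python
def integrate(events):
    """Yield one summary per slow-detector readout.

    events: iterable of dicts with keys 'fast' and 'slow'; either value
    is None on shots where that detector has no data.
    """
    fast_sum = None   # running sum of the high-rate quantity
    n_fast = 0        # number of events seen in the current interval
    for evt in events:
        n_fast += 1
        if evt["fast"] is not None:
            fast_sum = evt["fast"] if fast_sum is None else fast_sum + evt["fast"]
        if evt["slow"] is not None:
            # Slow detector read out: report what accumulated, then reset.
            yield {"slow": evt["slow"], "fast_sum": fast_sum, "n_fast": n_fast}
            fast_sum = None
            n_fast = 0

events = [
    {"fast": 1.0, "slow": None},
    {"fast": 2.0, "slow": None},
    {"fast": 3.0, "slow": 10.0},   # slow detector reads out here
    {"fast": None, "slow": None},  # fast detector missing on this shot
    {"fast": 4.0, "slow": 20.0},
]
for summary in integrate(events):
    print(summary)
```

Resetting the accumulators right after each slow readout keeps every summary tied to exactly one integration interval, which is what the `norm=0`, `sum_atmopal = None`, and related resets do in the script above.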
And an sbatch script:
Code Block |
---|
language | bash |
---|
title | submit_run_andor.sh |
---|
|
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --account=<your account here>
#SBATCH --job-name=run_andor
#SBATCH --nodes=1
#SBATCH --ntasks=5
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
##SBATCH --exclusive
#SBATCH -t 00:05:00
t_start=`date +%s`
exp=$1
runnum=$2
mpirun -n 5 python run_andor.py $exp $runnum
t_end=`date +%s`
echo PSJobCompleted TotalElapsed $((t_end-t_start)) |
After creating the above two scripts, you can submit the job with:
Code Block |
---|
|
sbatch submit_run_andor.sh rixc00121 121 |
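The two arguments are passed through to run_andor.py as `exp` and `runnum`. As a quick check of where the script would then look for data, the sketch below repeats its path construction. The helper name `xtc_dir_for` is hypothetical, and reading the first three characters of the experiment name as the instrument is an interpretation of `exp[:3]`, not something stated on this page.

```python
import os

def xtc_dir_for(exp, mount_dir="/sdf/data/lcls/drpsrcf/ffb"):
    """Build <mount_dir>/<exp[:3]>/<exp>/xtc, as run_andor.py does."""
    # exp[:3] is assumed here to be the instrument name (e.g. 'rix').
    return os.path.join(mount_dir, exp[:3], exp, "xtc")

print(xtc_dir_for("rixc00121"))
```

If the job finds no data, comparing this path against the actual file system location is a reasonable first step.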
You should see the psplot(s) pop up automatically:
[Screenshot: psplot windows]
To view the list of psplots:
Code Block |
---|
|
In [1]: ls()
ID  SLURM_JOB_ID  EXP        RUN  NODE                               PORT   STATUS
1   43195784      rixc00121  121  sdfmilan005.sdf.slac.stanford.edu  12323  PLOTTED |
If you close the plot window, the process is automatically removed from the list:
Code Block |
---|
|
In [2]: ls()
ID  SLURM_JOB_ID  EXP        RUN  NODE                               PORT   STATUS |
You can submit your analysis job again (any increasing run numbers are always monitored). Old jobs (a previously submitted run number from the same node and the same psplot port) are NOT shown automatically (STATUS: RECEIVED). You can reactivate them with the show(ID) command.
Code Block |
---|
|
In [2]: ls()
ID  SLURM_JOB_ID  EXP        RUN  NODE                              PORT   STATUS
1   3275205       rixc00121  121  sdfiana001.sdf.slac.stanford.edu  12323  RECEIVED
In [3]: show(1)
Main received {'msgtype': 3} from db-zmq-server |
To kill a single plot, use kill(ID); to kill all plots, use kill_all().
Code Block |
---|
|
In [5]: kill(1)
In [6]: ls()
ID  SLURM_JOB_ID  EXP        RUN  NODE                               PORT   STATUS |