...
Historical background: we went back and forth about how to manage the MPI helper-ranks. The alternative would have been to use callbacks instead of run/step/event loops to more effectively hide the helper-ranks from user code, but callbacks would have been user-unfriendly in a different way: writing loops is a more natural coding approach for many users. We felt the loop approach (with more fragile Detector objects that can be None) was the lesser of two evils.
Running psplot_live
From any rix-daq node, source the psana2 environment and then run:
Code Block |
---|
language | bash |
---|
|
(ps-4.6.3) rix-daq:scripts> psplot_live ANDOR
|
This assumes that an analysis script has been submitted (an example analysis script is shown below). psplot_live starts an interactive IPython session:
Code Block |
---|
|
Python 3.9.16 | packaged by conda-forge | (main, Feb 1 2023, 21:39:03)
Type 'copyright', 'credits' or 'license' for more information
IPython 8.14.0 -- An enhanced Interactive Python. Type '?' for help.
In [1]: |
The above command activates psplot_live, which listens to your analysis jobs (with plotting) and provides an interactive session. You can use the session to list, kill, and reactivate plots. To monitor more than one plot, separate the plot names with spaces (e.g. psplot_live ANDOR ATMOPAL).
Below is an example analysis script (monitoring two plots: ANDOR and ATMOPAL) and a job-submission script that communicate directly with psplot_live. If you are converting a python script that works with psplot (non-live), the main difference is that you must pass psmon_publish=publish as an additional DataSource argument. There may be other differences that need to be changed; please let us know if you run into one.
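The smalldata callback mechanism the script relies on can be pictured with plain Python (no psana required): smd.event(...) buffers named values, and after batch_size events the accumulated dictionary is handed to each registered callback, the way my_smalldata receives data_dict in the script below. The toy class and batching here are illustrative assumptions, not the real psana internals.

```python
# Toy model (assumption, not psana internals) of smalldata callbacks:
# event() buffers named values; every batch_size events the buffered
# dictionary is passed to the registered callbacks.
class ToySmallData:
    def __init__(self, batch_size, callbacks):
        self.batch_size = batch_size
        self.callbacks = callbacks
        self.buf = {}
        self.n = 0

    def event(self, **kwargs):
        # collect each named value into a per-key list
        for k, v in kwargs.items():
            self.buf.setdefault(k, []).append(v)
        self.n += 1
        if self.n % self.batch_size == 0:
            self._flush()

    def _flush(self):
        # hand the batched dictionary to every callback, then reset
        for cb in self.callbacks:
            cb(self.buf)
        self.buf = {}

seen = []
smd = ToySmallData(batch_size=2, callbacks=[lambda d: seen.append(dict(d))])
smd.event(mydata=0)
smd.event(mydata=1, sum_atmopal=[3.0])
print(seen)  # [{'mydata': [0, 1], 'sum_atmopal': [[3.0]]}]
```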
Code Block |
---|
language | py |
---|
title | run_andor.py |
---|
linenumbers | true |
---|
|
from psana import DataSource
from psmon import publish
from psmon.plots import Image,XYPlot
import os, sys, time
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

os.environ['PS_SRV_NODES']='1'
os.environ['PS_SMD_N_EVENTS']='1'

# exp and runnum are passed in as command-line arguments
exp=sys.argv[1]
runnum=int(sys.argv[2])

mount_dir = '/sdf/data/lcls/drpsrcf/ffb'
#mount_dir = '/cds/data/drpsrcf'
xtc_dir = os.path.join(mount_dir, exp[:3], exp, 'xtc')

ds = DataSource(exp=exp,run=runnum,dir=xtc_dir,intg_det='andor_vls',
                batch_size=1,
                psmon_publish=publish,
                detectors=['timing','andor_vls','atmopal'],
                max_events=0,
                live=True)

def my_smalldata(data_dict):
    if 'unaligned_andor_norm' in data_dict:
        andor_norm = data_dict['unaligned_andor_norm'][0]
        myplot = XYPlot(0,f"Andor (normalized) run:{runnum}",range(len(andor_norm)),andor_norm)
        publish.send('ANDOR',myplot)
    if 'sum_atmopal' in data_dict:
        atmopal_sum = data_dict['sum_atmopal']
        myplot = XYPlot(0,f"Atmopal (sum) run:{runnum}",range(len(atmopal_sum)),atmopal_sum)
        publish.send('ATMOPAL',myplot)

for myrun in ds.runs():
    andor = myrun.Detector('andor_vls')
    atmopal = myrun.Detector('atmopal')
    timing = myrun.Detector('timing')
    smd = ds.smalldata(filename='mysmallh5.h5',batch_size=5, callbacks=[my_smalldata])
    norm = 0
    ndrop_inhibit = 0
    sum_atmopal = None
    cn_andor_events = 0
    cn_intg_events = 0
    ts_st = None
    for nstep,step in enumerate(myrun.steps()):
        print('step:',nstep)
        for nevt,evt in enumerate(step.events()):
            if ts_st is None: ts_st = evt.timestamp
            cn_intg_events += 1
            andor_img = andor.raw.value(evt)
            atmopal_img = atmopal.raw.image(evt)
            if atmopal_img is not None:
                if sum_atmopal is None:
                    sum_atmopal = atmopal_img[0,:]
                else:
                    sum_atmopal += atmopal_img[0,:]
            # also need to check for events missing due to damage
            # (or compare against expected number of events)
            ndrop_inhibit += timing.raw.inhibitCounts(evt)
            smd.event(evt, mydata=nevt) # high rate data saved to h5
            # need to check Matt's new timing-system data on every
            # event to make sure we haven't missed normalization
            # data due to deadtime
            norm+=nevt # fake normalization
            if andor_img is not None:
                cn_andor_events += 1
                #print('andor data on evt:',nevt,'ndrop_inhibit:',ndrop_inhibit)
                print(f'BD{rank-1}: #andor_events: {cn_andor_events} #intg_event:{cn_intg_events} st: {ts_st} en:{evt.timestamp}')
                # check that the high-rate readout group (2) didn't
                # miss any events due to deadtime
                if ndrop_inhibit[2]!=0: print('*** data lost due to deadtime')
                # need to prefix the name with "unaligned_" so
                # the low-rate andor dataset doesn't get padded
                # to align with the high rate datasets
                smd.event(evt, mydata=nevt,
                          unaligned_andor_norm=(andor_img/norm),
                          sum_atmopal=sum_atmopal)
                norm=0
                ndrop_inhibit=0
                sum_atmopal = None
                cn_intg_events = 0
                ts_st = None
    smd.done()
|
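The core pattern in run_andor.py is accumulate-and-reset: every fast event adds to a normalization term, and whenever the slow (integrating) Andor detector produces data, the accumulated value is shipped with that event and the accumulators are cleared. A pure-Python sketch of just that pattern (no psana required; the fixed every-5-events readout cadence below is an assumption for illustration, since in reality the Andor reads out whenever andor.raw.value(evt) is not None):

```python
# Sketch of the accumulate-and-reset normalization pattern: fast events
# accumulate a per-window value; on each slow-detector readout the window
# total is emitted and the accumulator reset.
def process(events, andor_period=5):
    out = []
    norm = 0
    for nevt, fast_value in enumerate(events):
        norm += fast_value                  # accumulate between andor reads
        if (nevt + 1) % andor_period == 0:  # slow detector has data (assumed cadence)
            out.append(norm)                # ship accumulated normalization
            norm = 0                        # reset for the next window
    return out

print(process([1]*10))  # [5, 5]
```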
And an sbatch script:
Code Block |
---|
language | bash |
---|
title | submit_run_andor.sh |
---|
|
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --account=<your account here>
#SBATCH --job-name=run_andor
#SBATCH --nodes=1
#SBATCH --ntasks=5
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
##SBATCH --exclusive
#SBATCH -t 00:05:00
t_start=`date +%s`
exp=$1
runnum=$2
mpirun -n 5 python run_andor.py $exp $runnum
t_end=`date +%s`
echo PSJobCompleted TotalElapsed $((t_end-t_start)) |
After creating the above two scripts, you can submit the job with:
Code Block |
---|
|
sbatch submit_run_andor.sh rixc00121 121 |
You should see the psplot window(s) pop up automatically:
Image Added
To view the list of psplots:
Code Block |
---|
|
In [1]: ls()
ID SLURM_JOB_ID EXP RUN NODE PORT STATUS
1 43195784 rixc00121 121 sdfmilan005.sdf.slac.stanford.edu 12323 PLOTTED |
If you close the plot window, the process is automatically removed from the list:
Code Block |
---|
|
In [2]: ls()
ID SLURM_JOB_ID EXP RUN NODE PORT STATUS |
You can submit your analysis job again (new runs with increasing run numbers are always monitored). Old jobs (a previously submitted run number from the same node and the same psplot port) will NOT be shown automatically; you can reactivate them using the show() command.
Code Block |
---|
|
(ps-4.6.3) sbatch submit_run_andor.sh rixc00121 121
Submitted batch job 43195858
(ps-4.6.3) sbatch submit_run_andor.sh rixc00121 122
Submitted batch job 43195859 |
From the psplot_live interactive session, you can list the plots:
Code Block |
---|
|
In [2]: ls()
ID SLURM_JOB_ID EXP RUN NODE PORT STATUS |