Historical background: we went back and forth about how to manage the MPI helper ranks. The alternative would have been to use callbacks instead of run/step/event loops, which would hide the helper ranks from user code more effectively, but callbacks would have been user-unfriendly in a different way: writing loops is a more natural coding style for most users. We felt the loop approach (with its more fragile Detector objects, which can be None) was the lesser of two evils.


Running psplot_live

From any drp node, run:

Code Block
languagebash
titlepsplot_live
psplot_live ANDOR

This assumes that an analysis script has already been submitted and is publishing a plot named ANDOR via psmon. An example analysis script:

Code Block
languagepy
titlerun_andor.py
from psana import DataSource
from psmon import publish
from psmon.plots import Image,XYPlot
import os, sys, time
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
 

# one rank is dedicated to smalldata/SRV (callback) duties; smd batches of a
# single event keep the live plots updating promptly
os.environ['PS_SRV_NODES']='1'
os.environ['PS_SMD_N_EVENTS']='1'


# passing exp and runnum
exp=sys.argv[1]
runnum=int(sys.argv[2])


mount_dir = '/sdf/data/lcls/drpsrcf/ffb'
#mount_dir = '/cds/data/drpsrcf'
xtc_dir = os.path.join(mount_dir, exp[:3], exp, 'xtc')
# intg_det marks andor_vls as the slow integrating detector; live=True lets
# psana follow xtc2 files that are still being written
ds = DataSource(exp=exp,run=runnum,dir=xtc_dir,intg_det='andor_vls',
        batch_size=1, 
        psmon_publish=publish,
        detectors=['timing','andor_vls','atmopal'],
        max_events=0,
        live=True)


# we will remove this for batch processing and use "psplot" instead
# publish.local = True


# smalldata callback: runs on the SRV rank(s) with a dictionary of the keyword
# data passed to smd.event(); a key is only present when that data was saved,
# hence the membership checks below
def my_smalldata(data_dict):
    if 'unaligned_andor_norm' in data_dict:
        andor_norm = data_dict['unaligned_andor_norm'][0]
        myplot = XYPlot(0,f"Andor (normalized) run:{runnum}",range(len(andor_norm)),andor_norm)
        publish.send('ANDOR',myplot)
    if 'sum_atmopal' in data_dict:
        atmopal_sum = data_dict['sum_atmopal']
        myplot = XYPlot(0,f"Atmopal (sum) run:{runnum}",range(len(atmopal_sum)), atmopal_sum)
        publish.send('ATMOPAL', myplot)
 
for myrun in ds.runs():
    andor = myrun.Detector('andor_vls')
    atmopal = myrun.Detector('atmopal')
    timing = myrun.Detector('timing')
    smd = ds.smalldata(filename='mysmallh5.h5',batch_size=5, callbacks=[my_smalldata])
    norm = 0
    ndrop_inhibit = 0
    sum_atmopal = None
    cn_andor_events = 0
    cn_intg_events = 0
    ts_st = None
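    # loop over scan steps (calib cycles); within each step, loop over its events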
    for nstep,step in enumerate(myrun.steps()):
        print('step:',nstep)
        for nevt,evt in enumerate(step.events()):
            if ts_st is None: ts_st = evt.timestamp
            cn_intg_events += 1
            andor_img = andor.raw.value(evt)
            atmopal_img = atmopal.raw.image(evt)
            if atmopal_img is not None:
                if sum_atmopal is None:
                    # copy so the running sum does not share memory with the image
                    sum_atmopal = atmopal_img[0,:].copy()
                else:
                    sum_atmopal += atmopal_img[0,:]
            # also need to check for events missing due to damage
            # (or compare against expected number of events)
            ndrop_inhibit += timing.raw.inhibitCounts(evt)
            smd.event(evt, mydata=nevt) # high rate data saved to h5
            # need to check Matt's new timing-system data on every
            # event to make sure we haven't missed normalization
            # data due to deadtime
            norm+=nevt # fake normalization
            if andor_img is not None:
                cn_andor_events += 1
                #print('andor data on evt:',nevt,'ndrop_inhibit:',ndrop_inhibit)
                print(f'BD{rank-1}: #andor_events: {cn_andor_events} #intg_event:{cn_intg_events} st: {ts_st} en:{evt.timestamp}')
                # check that the high-read readout group (2) didn't
                # miss any events due to deadtime
                if ndrop_inhibit[2]!=0: print('*** data lost due to deadtime')
                # need to prefix the name with "unaligned_" so
                # the low-rate andor dataset doesn't get padded
                # to align with the high rate datasets
                smd.event(evt, mydata=nevt,
                          unaligned_andor_norm=(andor_img/norm),
                          sum_atmopal=sum_atmopal)
                norm=0
                ndrop_inhibit=0
                sum_atmopal = None
                cn_intg_events = 0
                ts_st = None
    smd.done()
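
For a quick offline check of the HDF5 file written by smalldata, something like the following can be used. This is a minimal sketch: it assumes h5py is available and that the datasets appear under the keyword names passed to smd.event() (the exact layout can differ between psana2 versions); the script name inspect_mysmallh5.py is only illustrative.

Code Block
languagepy
titleinspect_mysmallh5.py
import h5py

# open the file produced by ds.smalldata(filename='mysmallh5.h5', ...)
with h5py.File('mysmallh5.h5', 'r') as f:
    # print every group/dataset name to see the actual layout
    f.visit(print)
    # 'mydata' was saved on every event; 'unaligned_andor_norm' only on
    # events with andor data, so the two datasets can differ in length
    if 'mydata' in f:
        print('aligned events :', f['mydata'].shape)
    if 'unaligned_andor_norm' in f:
        print('andor readouts :', f['unaligned_andor_norm'].shape)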

And an sbatch script:

Code Block
languagebash
titlesubmit_run_andor.sh
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --account=lcls:data
#SBATCH --job-name=run_andor
#SBATCH --nodes=1
#SBATCH --ntasks=5
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
##SBATCH --exclusive
#SBATCH -t 00:05:00

t_start=`date +%s`

exp=$1
runnum=$2
#srun python run_andor.py $exp $runnum ${socket}
mpirun -n 5 python run_andor.py $exp $runnum

t_end=`date +%s`
echo PSJobCompleted TotalElapsed $((t_end-t_start))  

After creating the above two scripts, you can submit the job with:

Code Block
languagebash
titlesbatch
sbatch submit_run_andor.sh rixc00121 121
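
Once the job is submitted, it can be monitored with standard Slurm commands, for example as below. The output file name follows the --output/--error pattern in the sbatch script above, with <jobid> standing in for the job ID printed by sbatch.

Code Block
languagebash
titlecheck_job
# show your queued and running jobs
squeue -u $USER
# follow the combined stdout/stderr of the job
tail -f output-<jobid>.txt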