
Historical background: we went back and forth about how to manage the MPI helper-ranks.  The alternative would have been to use callbacks instead of run/step/event loops to more effectively hide the helper-ranks from user code, but callbacks would have been user-unfriendly in a different way: writing loops is a more natural coding approach for many users.  We felt the loop approach (with more fragile Detector objects that can be None) was the lesser of two evils.


Running psplot_live

From any drp node (e.g. the rix-daq node), source the psana2 environment, then run:

Code Block
languagebash
titlepsplot_live
(ps-4.6.3) rix-daq:scripts> psplot_live ANDOR

This assumes that an analysis script has been submitted (an example is given below). The command starts an interactive IPython session:

Code Block
languagebash
titlepsplot_live
Python 3.9.16 | packaged by conda-forge | (main, Feb  1 2023, 21:39:03) 
Type 'copyright', 'credits' or 'license' for more information
IPython 8.14.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: 

The command above starts psplot_live, which listens for your analysis jobs (with plotting) and provides an interactive session. You can use the interactive session to list, kill, and reactivate plots. To monitor more than one plot, separate the plot names with spaces (e.g. psplot_live ANDOR ATMOPAL), as shown below.

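For example, the same invocation monitoring both plots produced by the analysis script below would be:

Code Block
languagebash
titlepsplot_live
(ps-4.6.3) rix-daq:scripts> psplot_live ANDOR ATMOPAL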

Below is an example of an analysis script (monitoring two plots: ANDOR and ATMOPAL) and a job-submission script that communicate directly with psplot_live. If you are converting a python script that works with psplot (non-live), the main difference is on line 25 of the script below, where you have to pass psmon_publish=publish as an additional DataSource argument. There may be other differences that need to be changed; please let us know in this case.

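If you only want the minimal change relative to a psplot (non-live) script, here is a sketch that isolates it; the experiment, run number, and xtc path are placeholders borrowed from the full example:

Code Block
languagepy
titlepsplot_live conversion (sketch)
from psana import DataSource
from psmon import publish

# placeholders taken from the full example below
exp, runnum = 'rixc00121', 121
xtc_dir = '/sdf/data/lcls/drpsrcf/ffb/rix/rixc00121/xtc'

# psplot (non-live):
#   ds = DataSource(exp=exp, run=runnum, dir=xtc_dir)
# psplot_live: additionally hand the psmon publish module to the DataSource
ds = DataSource(exp=exp, run=runnum, dir=xtc_dir,
                psmon_publish=publish)

The full analysis and submission scripts follow.
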
Code Block
languagepy
titlerun_andor.py
linenumberstrue
from psana import DataSource
from psmon import publish
from psmon.plots import Image,XYPlot
import os, sys, time
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
 

os.environ['PS_SRV_NODES']='1'
os.environ['PS_SMD_N_EVENTS']='1'


# passing exp and runnum
exp=sys.argv[1]
runnum=int(sys.argv[2])


mount_dir = '/sdf/data/lcls/drpsrcf/ffb'
#mount_dir = '/cds/data/drpsrcf'
xtc_dir = os.path.join(mount_dir, exp[:3], exp, 'xtc')
ds = DataSource(exp=exp,run=runnum,dir=xtc_dir,intg_det='andor_vls',
        batch_size=1, 
        psmon_publish=publish,
        detectors=['timing','andor_vls','atmopal'],
        max_events=0,
        live=True)


def my_smalldata(data_dict):
    if 'unaligned_andor_norm' in data_dict:
        andor_norm = data_dict['unaligned_andor_norm'][0]
        myplot = XYPlot(0,f"Andor (normalized) run:{runnum}",range(len(andor_norm)),andor_norm)
        publish.send('ANDOR',myplot)
    if 'sum_atmopal' in data_dict:
        atmopal_sum = data_dict['sum_atmopal']
        myplot = XYPlot(0,f"Atmopal (sum) run:{runnum}",range(len(atmopal_sum)), atmopal_sum)
        publish.send('ATMOPAL', myplot)
 
for myrun in ds.runs():
    andor = myrun.Detector('andor_vls')
    atmopal = myrun.Detector('atmopal')
    timing = myrun.Detector('timing')
    smd = ds.smalldata(filename='mysmallh5.h5',batch_size=5, callbacks=[my_smalldata])
    norm = 0
    ndrop_inhibit = 0
    sum_atmopal = None
    cn_andor_events = 0
    cn_intg_events = 0
    ts_st = None
    for nstep,step in enumerate(myrun.steps()):
        print('step:',nstep)
        for nevt,evt in enumerate(step.events()):
            if ts_st is None: ts_st = evt.timestamp
            cn_intg_events += 1
            andor_img = andor.raw.value(evt)
            atmopal_img = atmopal.raw.image(evt)
            if atmopal_img is not None:
                if sum_atmopal is None:
                    sum_atmopal = atmopal_img[0,:]
                else:
                    sum_atmopal += atmopal_img[0,:]
            # also need to check for events missing due to damage
            # (or compare against expected number of events)
            ndrop_inhibit += timing.raw.inhibitCounts(evt)
            smd.event(evt, mydata=nevt) # high rate data saved to h5
            # need to check Matt's new timing-system data on every
            # event to make sure we haven't missed normalization
            # data due to deadtime
            norm+=nevt # fake normalization
            if andor_img is not None:
                cn_andor_events += 1
                #print('andor data on evt:',nevt,'ndrop_inhibit:',ndrop_inhibit)
                print(f'BD{rank-1}: #andor_events: {cn_andor_events} #intg_event:{cn_intg_events} st: {ts_st} en:{evt.timestamp}')
                # check that the high-read readout group (2) didn't
                # miss any events due to deadtime
                if ndrop_inhibit[2]!=0: print('*** data lost due to deadtime')
                # need to prefix the name with "unaligned_" so
                # the low-rate andor dataset doesn't get padded
                # to align with the high rate datasets
                smd.event(evt, mydata=nevt,
                          unaligned_andor_norm=(andor_img/norm),
                          sum_atmopal=sum_atmopal)
                norm=0
                ndrop_inhibit=0
                sum_atmopal = None
                cn_intg_events = 0
                ts_st = None
    smd.done()

And an sbatch script:

Code Block
languagebash
titlesubmit_run_andor.sh
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --account=<your account here>
#SBATCH --job-name=run_andor
#SBATCH --nodes=1
#SBATCH --ntasks=5
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
##SBATCH --exclusive
#SBATCH -t 00:05:00

t_start=`date +%s`

exp=$1
runnum=$2
mpirun -n 5 python run_andor.py $exp $runnum

t_end=`date +%s`
echo PSJobCompleted TotalElapsed $((t_end-t_start))  

After creating the above two scripts, you can submit the job with:

Code Block
languagebash
titlesbatch
sbatch submit_run_andor.sh rixc00121 121

You should see the psplot window(s) pop up automatically.


To view the list of psplots:

Code Block
languagebash
titlepsplot_live
In [1]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS
1     43195784     rixc00121  121   sdfmilan005.sdf.slac.stanford.edu   12323 PLOTTED

If you close the plot window (or it exits with an error), the process is automatically removed from the list:

Code Block
languagebash
titlepsplot_live
In [2]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS

You can submit your analysis job again (new, increasing run numbers are always monitored). Old jobs (a previously submitted run number from the same node and the same psplot port) will NOT be shown automatically; you can reactivate them with the show() command.

Code Block
languagebash
titlesbatch
(ps-4.6.3) sbatch submit_run_andor.sh rixc00121 121
Submitted batch job 43195858
(ps-4.6.3) sbatch submit_run_andor.sh rixc00121 122
Submitted batch job 43195859

From the psplot_live interactive session, you can list the plots:

Code Block
languagebash
titlepsplot_live
In [2]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS
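
If a listed plot came from a previously submitted run (same node and same psplot port) and is therefore not shown automatically, reactivate it with show() from the same session. The exact argument is not spelled out on this page; the line below assumes it takes the ID column reported by ls():

Code Block
languagebash
titlepsplot_live
In [3]: show(1)   # assumption: 1 is the ID reported by ls() for the plot to reactivate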