Introduction

A general introduction to LCLS data analysis (psana1, ami2, jupyter, slurm) can be viewed in the 53-minute video below.  It covers psana1 (i.e. LCLS1), but the user interface is quite similar to psana2 (for LCLS2), even though the internals are significantly improved for psana2:

video1502222460.mp4

JupyterHub

psana2 is also available in JupyterHub, in the kernel named "LCLS-II py3": https://pswww.slac.stanford.edu/jupyterhub/hub/home

Environment

To obtain the environment to run psana2, execute the following:

# For S3DF users (sdfiana nodes)
source /sdf/group/lcls/ds/ana/sw/conda2/manage/bin/psconda.sh

# For PCDS users (psana nodes)
source /cds/sw/ds/ana/conda2/manage/bin/psconda.sh

Note that LCLS-II psana is not compatible with LCLS-I psana, so environments must activate one or the other, but not both.  Since LCLS-I and LCLS-II live in different conda installations it is necessary to start a fresh session when switching between the two.

Public Practice Data

Publicly accessible practice data is located in S3DF in the directory /sdf/data/lcls/ds/prj/public01/xtc.  Use of this data requires the additional "dir=/sdf/data/lcls/ds/prj/public01/xtc" keyword argument to the DataSource object.
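For example, a minimal sketch of opening one of the practice runs listed below:

from psana import DataSource

# the extra dir= keyword points psana at the public practice data
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc')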

Experiment  | Run | Comment
------------+-----+--------------------------------------------------------------
tmoc00118   | 222 | Generic TMO dark data
rixx43518   | 34  | A DAQ "fly scan" of motor (see ami#FlyScan:MeanVs.ScanValue)
rixx43518   | 45  | A DAQ "step scan" of two motors
rixl1013320 | 63  | Rix stepping delay scan of both Vitara delay and ATM delay stage (lxt_ttc scan) at a single mono energy
rixl1013320 | 93  | Rix continuous mono scan with laser on/off data at a single time delay
rixx1003821 | 55  | An infinite sequence with two slow andors running at different rates
rixx1003821 | 68  | A finite burst sequence with one andor
uedcom103   | 7   | epix10ka data
ueddaq02    | 569 | epix10ka data


Detector Names

Use this command to see non-epics detector names (see "Detector Interface" example below):

(ps-4.1.0) psanagpu101:lcls2$ detnames exp=tmoc00118,run=222,dir=/sdf/data/lcls/ds/prj/public01/xtc
---------------------
Name      | Data Type
---------------------
epicsinfo | epicsinfo
timing    | raw      
hsd       | raw      
gmdstr0   | raw    
etc.  

From within python the above information can be obtained as a dictionary using "run.detinfo".

Use the same command with the "-e" option to see epics detector names (see "Detector Interface" example below).  These are slowly varying variables (like temperatures) that are not strongly time correlated with the regular per-shot detector data:

(ps-4.5.10) psanagpu101:~$ detnames -e exp=tmoc00118,run=222,dir=/sdf/data/lcls/ds/prj/public01/xtc | more
-----------------------------------------------------------------
Detector Name             | Epics Name                           
-----------------------------------------------------------------
StaleFlags                |                                      
Keithley_Sum              | EM2K0:XGMD:HPS:KeithleySum           
IM2K4_XrayPower           | IM2K4:PPM:SPM:VOLT_RBV               
IM3K4_XrayPower           | IM3K4:PPM:SPM:VOLT_RBV               
IM4K4_XrayPower           | IM4K4:PPM:SPM:VOLT_RBV               
etc.     

Note that either of these names ("Detector Name" or "Epics Name") can be used with the Detector Interface described below.  From within python the above information can be obtained as a dictionary using "run.epicsinfo".
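A minimal sketch of both lookups from python:

from psana import DataSource

ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
print(myrun.detinfo)    # per-shot detectors and their data interfaces
print(myrun.epicsinfo)  # slowly varying epics variables and their PV names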

Using the Detector Interface

Standard (per-shot) detectors and the slower epics variables can be accessed as shown below, using the names discovered with the commands above.

NOTE: the slow epics variables are polled in the DAQ at 1Hz, so for the first second of a run they will typically return "None".

NOTE: You can use tab-completion in ipython (see https://ipython.org/ipython-doc/3/interactive/tutorial.html) or the jupyter notebook to explore what you can do with the various detector objects.

NOTE: epics variables use a different syntax than other detectors (see the example below):

from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc', max_events=100)
myrun = next(ds.runs())
opal = myrun.Detector('tmo_atmopal')
epics_det = myrun.Detector('IM2K4_XrayPower')
for evt in myrun.events():
    img = opal.raw.image(evt)
    epics_val = epics_det(evt) # epics variables have a different syntax
    # check for missing data                                                    
    if img is None:
        print('no image')
    else:
        print(img.shape)
    if epics_val is None:
        print('no epics value')
    else:
        print(epics_val)


Example Script Producing Small HDF5 File

You can run this script with MPI: mpirun -n 6 python example.py

It also works on one core with: python example.py (useful for debugging).  See the MPI rank/task diagram in the "MPI Task Structure To Support Scaling" section below to understand what the different mpi ranks are doing.

This mechanism by default produces "aligned" datasets where missing values are padded (with NaN's for floats and -99999 for integers).  To create an unaligned dataset (without padding), prefix the name of the variable with "unaligned_".
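For example, an illustrative fragment (meant for use inside an event loop like the one in the script below; the variable names here are made up):

# aligned (default): events where the value is missing get padded with NaN / -99999
smd.event(evt, photonEnergy=photonEnergy)
# unaligned: written only for events where the value exists, no padding
smd.event(evt, unaligned_andorSum=andor_sum)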

NOTE: In addition to the hdf5 you specify as your output file ("my.h5" below) you will see other h5 files like "my_part0.h5", one for each of the cores specified in PS_SRV_NODES.  The reason for this is that each of those SRV cores writes its own my_partN.h5 file: for LCLS2 it will be important for performance to write many files.  The "my.h5" file is actually quite small, and uses a new HDF5 feature called a "Virtual DataSet" (VDS) to join together the various my_partN.h5 files.  Also note that events in my.h5 will not be in time order.  If you copy the .h5 somewhere else, you need to copy all of them.
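A hedged sketch of reading the joined file with h5py ("timestamp" is the dataset used in the reading example later on this page; other dataset names depend on what you saved):

import h5py

# the master file's virtual datasets stitch the my_partN.h5 pieces together,
# so reading it looks like reading one ordinary hdf5 file
with h5py.File('my.h5', 'r') as f:
    print(list(f.keys()))
    ts = f['timestamp'][:]   # note: not necessarily in time order
    print(len(ts))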

NOTE: In python you would often use a "break" statement to exit early.  When running psana-python with mpi parallelization, however, not all cores will see this statement, and the result is that your job will hang at the end.  To avoid this, use the "max_events" keyword argument to DataSource (see example below).  An example of the failure mode: if an EB core communicates with only 1 BD core and that BD core exits, the EB core gets stuck and never exits (see MPITaskStructureToSupportScaling).  MPI requires all cores to exit gracefully for a job to exit.  The max_events kwarg allows the SMD0 core to shut everything down.

NOTE: The call below to smd.sum() (or min()/max()) only accepts numpy arrays or None.  This requirement allows cores that don't see any events to not cause errors when summing across cores.  All cores must call .sum() or the process will hang (a property of the underlying mpi "Reduce" call).

NOTE: The smd.event() and smd.save_summary() calls (for persisting data to hdf5 or passing it to an SRV callback) can be passed either a series of kwargs or a dictionary.  The dictionary can optionally be hierarchical (e.g. d['mykey1']['mykey2']=value) and those keys will be reflected in the hdf5 dataset structure.  This allows users to organize hdf5 data with a structure that they prefer.

from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES']='1'

ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc', max_events=10)
# batch_size is optional. specifies how often the dictionary of small
# user data is gathered. if you write out large data (NOT RECOMMENDED) it needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')

    runsum  = np.zeros((3),dtype=float) # beware of datatypes when summing: can overflow
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        # important: always check for missing data
        if img is None or photonEnergy is None: continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += img[0,:3] # local sum on one mpi core
 
    # optional summary data for whole run
    if smd.summary:
        tot_runsum = smd.sum(runsum) # sum (or max/min) across all mpi cores. Must be numpy array or None.
        # pass either dictionary or kwargs.
        smd.save_summary({'sum_over_run' : tot_runsum}, summary_int=1)
    smd.done()

Full-Featured Example: Callbacks, Detector Selection and Variable Length Data

You can run this script with MPI the same way as shown in the previous example: mpirun -n 6 python example.py

NOTE: if you don't run with at least 6 cores, the destination selection code below will not function correctly.  Also, the destination selection code currently only works with 1 EB core (defined by the PS_SMD_NODES environment variable, which defaults to 1).

NOTE: variable length data names must have a "var_" prefix

These additional arguments for DataSource were added to this example:

  • detectors=['detname1', 'detname2',]

           List of detectors to be read from the disk. If you only need a few detectors for analysis, list their names here. The reading process will be faster since unused detector data is not read.

  • smd_callback=smd_callback

           You can use this callback to 1) decide for which events the large event data should be read (by yielding the evt) and 2) set the destination (rank id) to which the event is sent.

  • small_xtc=['detname1', 'detname2']

           List of detectors to be used in smd_callback()

from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)  

# Event filtering and destination callback (runs on EB cores)
# Use this function to decide if you want to fetch large data for this event  
# and/or direct an event to process on a particular 'rank' 
# (this rank number should be between 1 and total no. of ranks
# (6 for "mpirun -n 6") minus 3 since 3 ranks are reserved for SMD0, EB, SRV
# processes). If a non-epics detector is needed in this routine, make sure to
# add the detector name in small_xtc kwarg for DataSource (see below).
# All epics and scan detectors are available automatically.
def smd_callback(run):
    opal = run.Detector('tmo_opal1')
    epics_det = run.Detector('IM2K4_XrayPower')

    n_bd_nodes = 3 # for mpirun -n 6, 3 ranks are reserved so there are 3 BigData ranks left

    for i_evt, evt in enumerate(run.events()):
        img = opal.raw.image(evt)
        epics_val = epics_det(evt)
        dest = (evt.timestamp % n_bd_nodes) + 1

        if epics_val is not None:
            # Set the destination (rank no.) where this event should be sent to
            evt.set_destination(dest)
            yield evt

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES']='1'

ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
        max_events  = 40,
        detectors   = ['epicsinfo', 'tmo_opal1', 'ebeam'],  # only reads these detectors (faster)
        smd_callback= smd_callback,                         # event-filtering/destination callback (see notes above)
        small_xtc   = ['tmo_opal1'],                        # detectors to be used in smalldata callback
        )

# batch_size is optional. specifies how often the dictionary of small
# user data is gathered.  if you write out large data (NOT RECOMMENDED) it needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

# used for variable-length data example
cnt = 0 
modulus = 4

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')

    runsum  = np.zeros((3),dtype=float) # beware of datatypes when summing: can overflow
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        if img is None or photonEnergy is None: continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += img[0,:3] # local sum on one mpi core
 
        # an example of variable-length data (must have "var_" name prefix)
        if cnt % modulus:
            x = np.arange(cnt%modulus)
            y = [[i+1, (i+1)**2] for i in range(cnt%modulus)]
            smd.event(evt, {'var_test' : { 'x': x, 'y': y }})
        else:
            # Note, this works either way, either not sending anything or
            # sending 0-length data.  It should be noted if there is *no*
            # data in the entire run, the var_array is *not* written to the
            # output!
            pass
            #smd.event(evt, {'var_test' : { 'x': [], 'y': [] }})
        cnt += 1

    # optional summary data for whole run
    if smd.summary:
        tot_runsum = smd.sum(runsum) # sum (or max/min) across all mpi cores. Must be numpy array or None.
        # pass either dictionary or kwargs
        smd.save_summary({'sum_over_run' : tot_runsum}, summary_int=1)
    smd.done()

The variable-length data can be read using a script similar to this.  The variable-length data is stored as two datasets: one is a long array of appended values and the other is a set of per-event lengths.  Historically this approach has been more compute-efficient than the native hdf5 "ragged" arrays, which often require python looping.  When reading, the user must keep track of the running offset into the long array (called "idx" here):

import h5py
f = h5py.File("mysmallh5.h5", "r")
n1 = f['timestamp']
n2 = f['var_test_len']
n3 = f['var_test/x']
n4 = f['var_test/y']
print(len(n1), len(n2))
idx = 0
for i in range(len(n1)):
    print("%d --> %d" % (n1[i], n2[i]))
    for j in range(n2[i]):
        print( "    %s | %s" % (n3[idx+j], n4[idx+j]))
    print("")
    idx += n2[i]

Excluding Detectors

In addition to only including specified detectors with the "detectors=[]" kwarg (see previous section), you can exclude detectors when creating a new datasource with the "xdetectors=[]" argument.

# Create a datasource and tell it to exclude two detectors
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
        xdetectors  = ['hsd'],      # all other detectors will be available
        max_events  = 10)
 
 
run = next(ds.runs())

# Create these detectors normally 
opal = run.Detector('tmo_opal1')
ebeam = run.Detector('ebeam')
for i, evt in enumerate(run.events()):
    img = opal.raw.image(evt)
    photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
    print(f'got evt={i} ts={evt.timestamp} img={img.shape} {photonEnergy=}')

Note that other detectors in the same stream as an excluded detector will also be excluded. You'll see a warning message like this one:

Warning: Stream-8 has one or more detectors matched with the excluded set. All detectors in this stream will be excluded.

If you try to access a detector that is part of the excluded list, you'll see a DetectorNameError like the one below:

Traceback (most recent call last):
  File "/cds/home/m/monarin/lcls2/psana/psana/tests/dummy.py", line 12, in <module>
    ebeam = run.Detector('ebeam')
  File "/cds/home/m/monarin/lcls2/psana/psana/psexp/run.py", line 119, in Detector
    raise DetectorNameError(err_msg)
psana.psexp.run.DetectorNameError: No available detector class matched with ebeam. If this is a new detector/version, make sure to add new class in detector folder.

Accessing Event Codes

LCLS1-style event codes can be accessed using the "timing" detector:

from psana import DataSource
ds = DataSource(exp='tmoc00118',run=222,dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
timing = myrun.Detector('timing')
for nevt,evt in enumerate(myrun.events()):
    allcodes = timing.raw.eventcodes(evt)
    # event code 15 fires at 1Hz, and this exp/run had 10Hz triggers            
    print('event code 15 present:',allcodes[15])
    if nevt>20: break

Running in Parallel

Examples of slurm scripts to submit are here: Submitting SLURM Batch Jobs.  Instructions for submitting batch jobs at S3DF to run in parallel are here: https://confluence.slac.stanford.edu/display/PCDS/Running+at+S3DF.

Analyzing Scans

To get a list of the scan variables, use the following command:

(ps-4.3.2) psanagpu102:lcls2$ detnames -s exp=rixx43518,run=45,dir=/sdf/data/lcls/ds/prj/public01/xtc
--------------------------
Name           | Data Type
--------------------------
motor1         | raw      
motor2         | raw      
step_value     | raw      
step_docstring | raw      
--------------------------
(ps-4.3.2) psanagpu102:lcls2$  

From within python the above information can be obtained as a dictionary using "run.scaninfo".
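A minimal sketch of that lookup:

from psana import DataSource

ds = DataSource(exp='rixx43518', run=45, dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
print(myrun.scaninfo)   # dictionary describing the scan variables for this run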

Note that "step_value"/"step_docstring" are convenience values defined by the DAQ to be a single value/string that is supposed to represent what has happened on a step, for use in plots/printout, since it's not easy to plot, for example, vs. multiple motor positions in a complex scan.

Code similar to this can be used to access the above scan variables:

from psana import DataSource
ds = DataSource(exp='rixx43518',run=45,dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
motor1 = myrun.Detector('motor1')
motor2 = myrun.Detector('motor2')
step_value = myrun.Detector('step_value')
step_docstring = myrun.Detector('step_docstring')
for step in myrun.steps():
    print(motor1(step),motor2(step),step_value(step),step_docstring(step))
    for evt in step.events():
        pass

Running From Shared Memory

psana2 scripts can be run in real-time on shared memory.  There is some extra complexity compared to writing python code for the real-time GUI ami, for example:

  • you have to create plots and handle when they update
  • you have to worry about data getting out of time order
  • you have to handle missing data
  • you have to reset plots on run boundaries (if that's the behavior you want)
  • you have to write code to handle the data gathered from multiple mpi cores (not required if you want to run on 1 core)
  • be aware that the shared memory python scripts "steal" event statistics from other shared memory instances (e.g. ami)

but you get to take advantage of the power/flexibility of python.  Look at the DAQ .cnf file (e.g. /cds/group/pcds/dist/pds/rix/scripts/rix.cnf) to see the name of the node running the shared memory server ("monReqServer").  To access those nodes you need to be on a special permissions list (email pcds-ana-l@slac.stanford.edu to request).  You can find the name of the shared memory segment (the hutch name is typically used) either by looking in the .cnf file (the "-P" option to the monReqServer executable) or by running a command like this:

drp-neh-cmp003:~$ ls /dev/shm/
PdsMonitorSharedMemory_tmo
drp-neh-cmp003:~$

For this output, you would use "DataSource(shmem='tmo')".

Note that one can develop the shared memory scripts (including real-time plots) using offline data, then change the DataSource line to run them in real-time.
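For example, a hedged sketch of the single line that changes (the shmem name is the 'tmo' example from above; the offline parameters are the practice data used elsewhere on this page):

from psana import DataSource

# offline development against recorded data:
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc')

# real-time from shared memory on the monitoring node (swap in when ready):
# ds = DataSource(shmem='tmo')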

Typically psmon is used for publishing results to realtime plots in the callback, publishing updates every "N" events.  See this link for psmon examples: Visualization Tools.

When running multi-core with mpi one has to use the small data "callbacks" kwarg to receive the data gathered from all nodes.  An example multi-core script is below (also works on 1 core).  A pure 1-core script is simpler (no data "gathering" needed: can just be a loop over events with plot updates every N events).  For multi-core, this can be run with "mpirun -n 3 python <scriptname>.py" and the two plots can be viewed on the same node with "psplot -a 1 OPALSUMS OPALIMG" (see Visualization Tools for "psplot" options).  It can also be run in the usual offline manner by changing the DataSource line.  It is written in a way to keep running even when the DAQ is restarted:

import os
import numpy as np
from psana import DataSource
from psmon import publish
from psmon.plots import XYPlot,Image
from collections import deque

from mpi4py import MPI
numworkers = MPI.COMM_WORLD.Get_size()-1
if numworkers==0: numworkers=1 # the single core case (no mpi)

os.environ['PS_SRV_NODES']='1' # one mpi core gathers/plots data

mydeque=deque(maxlen=25)
lastimg=None
numevents=0
numendrun=0

def my_smalldata(data_dict): # one core gathers all data from mpi workers
    global numevents,lastimg,numendrun,mydeque
    if 'endrun' in data_dict:
        numendrun+=1
        if numendrun==numworkers:
            print('Received endrun from all workers. Resetting data.')
            numendrun=0
            numevents=0
            mydeque=deque(maxlen=25)
        return
    numevents += 1
    if 'opal' in data_dict:
        lastimg = data_dict['opal']
    mydeque.append(data_dict['opalsum'])
    if numevents%100==0: # update plots around 1Hz
        print('event:',numevents)
        myxyplot = XYPlot(numevents, "Last 25 Sums", np.arange(len(mydeque)), np.array(mydeque), formats='o')
        publish.send("OPALSUMS", myxyplot)
        if lastimg is not None: # opal image is not sent all the time
            myimgplot = Image(numevents, "Opal Image", lastimg)
            publish.send("OPALIMG", myimgplot)

while 1: # mpi worker processes
    ds = DataSource(shmem='rix')
    smd = ds.smalldata(batch_size=5, callbacks=[my_smalldata])
    for myrun in ds.runs():
        opal = myrun.Detector('atmopal')
        for nevt,evt in enumerate(myrun.events()):
            mydict={}
            image = opal.raw.image(evt)
            if image is None: continue
            # do as much work as possible in the workers
            # don't send large data all the time, if possible
            if nevt%10==0: mydict['opal']=image
            mydict['opalsum']=np.sum(image)
            smd.event(evt,mydict)
        smd.event(evt,{'endrun':1}) # tells gatherer to reset plots

When running multi-node mpi there are also some complexities propagating the environment to remote nodes: the way to address that is described in this link.  

Running in LIVE mode

Here's a sample python script showing how you can configure the datasource to run in live mode:

livemode.py
# Use this environment variable to specify how many attempts the datasource
# makes while waiting for new data in a file (1 second wait per attempt).
# In this example, we set it to 30 (wait up to 30 seconds).
import os
os.environ['PS_SMD_MAX_RETRIES'] = '30'


# Create a datasource with live flag
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc', 
        live        = True,
        max_events  = 10)


# Looping over your run and events as usual
# You'll see "wait for an event..." message in case
# The file system writing is slower than your analysis
run = next(ds.runs())
for i, evt in enumerate(run.events()):
    print(f'got evt={i} ts={evt.timestamp}')

Note that this script is also available in the tests folder of the lcls2 repository. You can run the script with:

(ps-4.3.2) psanagpu109 tests $ mpirun -n 3 python livemode.py

Jump to events

Psana2 can jump to particular events when given a list of timestamps. In the example below, we only want to process these four timestamps. Note that the timestamp array must have dtype np.uint64.

test_jump.py
from psana import DataSource
import numpy as np
timestamps = np.array([4194783241933859761,4194783249723600225,4194783254218190609,4194783258712780993], dtype=np.uint64)
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
        timestamps=timestamps)
myrun = next(ds.runs())
opal = myrun.Detector('tmo_atmopal')
for nevt, evt in enumerate(myrun.events()):
    img = opal.raw.image(evt)
    print(nevt, evt.timestamp, img.shape)

Here is the output:

(ps-4.5.5) monarin@psanagpu111 (master *) psana2 $ python test_jump.py 
0 4194783241933859761 (1024, 1024)
1 4194783249723600225 (1024, 1024)
2 4194783254218190609 (1024, 1024)
3 4194783258712780993 (1024, 1024)

Event Times In Local Time Zone

NOTE: the method evt.datetime().timestamp() can also be used to generate more natural timestamps (e.g. for use in plots).

import datetime as dt
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
for nevt,evt in enumerate(myrun.events()):
    if nevt>3: break
    t = evt.datetime()
    localt = t.replace(tzinfo=dt.timezone.utc).astimezone(tz=None)    
    print(localt.strftime('%H:%M:%S'))
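A minimal sketch using the evt.datetime().timestamp() form mentioned in the note above (e.g. for a plot axis):

from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc', max_events=4)
myrun = next(ds.runs())
# floating-point seconds, convenient as an x-axis in plots
times = [evt.datetime().timestamp() for evt in myrun.events()]
print(times)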

Running with Integrating Detector

Here's a sample python script showing how you can analyze a run with an integrating detector (in this example, 'andor'). For more detail about the technical design, visit this page.

There are two key variables when running in integrating-detector mode: PS_SMD_N_EVENTS (an environment variable), which is how many events SMD0 will send to each EB core, and the batch_size kwarg, which controls how many events the EB cores send to the BD cores.  When running integrating detectors the "units" of these two variables change from "high-rate events" to "low-rate integrating-detector events".  batch_size should be set to less than PS_SMD_N_EVENTS.  The max_events kwarg to DataSource also changes to count integrating-detector events.  Unfortunately, the batch_size kwarg to the DataSource.smalldata() call does NOT count integrating-detector events.  We would like to simplify this in the future.

NOTE: part of psana ("SMD0") has buffers that need to be large enough to accommodate at least one event from the integrating detector.  If PS_SMD_N_EVENTS is set too large and this is not possible then an error message is printed and psana exits.

intg_det.py
# The PS_SMD_N_EVENTS should be set to a small number (e.g. 1)
# since all other events which are part of this intg. event will be sent
# in the same batch.

import os
import numpy as np
os.environ['PS_SMD_N_EVENTS'] = '1'
batch_size = 1

from psana import DataSource
ds = DataSource(exp='xpptut15', run=1, dir='/sdf/data/lcls/ds/prj/public01/xtc/intg_det',
        intg_det='andor',         
        batch_size=batch_size)
run = next(ds.runs())
hsd = run.Detector('hsd')
andor = run.Detector('andor')

# Test calculating sum of the hsd for each integrating event.
sum_hsd = 0
for i_evt, evt in enumerate(run.events()):
    hsd_calib = hsd.raw.calib(evt)
    andor_calib = andor.raw.calib(evt)

    # Keep summing the value of the other detector (hsd in this case)
    sum_hsd += np.sum(hsd_calib[:])/np.prod(hsd_calib.shape)
    
    # When an integrating event is found, print out and reset the sum variable
    if andor_calib is not None:
        val_andor = np.sum(andor_calib[:])/np.prod(andor_calib.shape)
        print(f'i_evt: {i_evt} andor: {val_andor} sum_hsd:{sum_hsd}')
        sum_hsd = 0

RIX Example: Deadtime and Integrating Detectors

Sometimes data for a particular shot cannot be read out because DAQ data buffers are unavailable.  This is called "deadtime".  Deadtime can make it impossible to reliably use high-rate shots to perform operations like normalization for lower-rate integrating detectors.  Matt Weaver is adding counters to the timing system so the DAQ can count how many shots had their data dropped due to deadtime, allowing experimenters to decide whether there is sufficient data for a normalization-style analysis.

This example demonstrates (conceptually!) how to do the normalization and view the results in a psmon plot (use the command "psplot ANDOR" on the same node).  Run this script either with "python andor.py" or "mpirun -n 5 python andor.py":

from psana import DataSource
from psmon import publish
from psmon.plots import Image,XYPlot
import os
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# The PS_SMD_N_EVENTS should be set to a small number (e.g. 1)
# since all other events which are part of this intg. event will be sent
# in the same batch.
os.environ['PS_SMD_N_EVENTS'] = '1'
os.environ['PS_SRV_NODES']='1'

if rank==4: # hack for now to eliminate use of publish.local below
    publish.init()

# we will remove this for batch processing and use "psplot" instead
# publish.local = True

def my_smalldata(data_dict):
    if 'unaligned_andor_norm' in data_dict:
        andor_norm = data_dict['unaligned_andor_norm'][0]
        myplot = XYPlot(0,"Andor (normalized)",range(len(andor_norm)),andor_norm)
        publish.send('ANDOR',myplot)

ds = DataSource(exp='rixx1003821',run=46,dir='/sdf/data/lcls/ds/prj/public01/xtc',intg_det='andor_vls',batch_size=1)
for myrun in ds.runs():
    andor = myrun.Detector('andor_vls')
    timing = myrun.Detector('timing')
    smd = ds.smalldata(filename='mysmallh5.h5',batch_size=1000, callbacks=[my_smalldata])
    norm = 0
    ndrop_inhibit = 0
    for nstep,step in enumerate(myrun.steps()):
        print('step:',nstep)
        for nevt,evt in enumerate(step.events()):
            andor_img = andor.raw.value(evt)
            # also need to check for events missing due to damage
            # (or compare against expected number of events)
            ndrop_inhibit += timing.raw.inhibitCounts(evt)
            smd.event(evt, mydata=nevt) # high rate data saved to h5
            # need to check Matt's new timing-system data on every
            # event to make sure we haven't missed normalization
            # data due to deadtime
            norm+=nevt # fake normalization
            if andor_img is not None:
                print('andor data on evt:',nevt,'ndrop_inhibit:',ndrop_inhibit)
                # check that the high-read readout group (2) didn't
                # miss any events due to deadtime
                if ndrop_inhibit[2]!=0: print('*** data lost due to deadtime')
                # need to prefix the name with "unaligned_" so
                # the low-rate andor dataset doesn't get padded
                # to align with the high rate datasets
                smd.event(evt, mydata=nevt,
                          unaligned_andor_norm=(andor_img/norm))
                norm=0
                ndrop_inhibit=0

AMI and Integrating Detectors

AMI is also affected by the DAQ deadtime issue described above but, in addition, events can also be thrown away going into the AMI shared memory (this feature is what allows AMI analysis to stay real-time).  This can cause additional issues with AMI integrating-detector normalization analyses that try to use high-rate shots to do normalizations for low-rate integrating detectors, since not all high-rate shots are guaranteed to be available.  Also, unlike psana, AMI doesn't currently have the idea of shots that "belong together" which will make an AMI normalization-style analysis of an integrating detector impossible.

MPI Task Structure To Support Scaling

psana can scale to allow for high rate analysis.  For example, many hdf5 files of small user-defined data (described above in Example Script Producing Small HDF5 File) can be written, one per "SRV" node in the diagram below.  The total number of SRV nodes is defined by the environment variable PS_SRV_NODES (defaults to 0).  These hdf5 files are joined by psana into what appears to be one file using the hdf5 "virtual dataset" feature.  Similarly, multiple nodes can be used for filtering ("EB" nodes in the diagram below) and multiple nodes can be used to process big data in the main psana event loop ("BD" nodes in the diagram below).  The one piece that cannot (currently) be scaled to multiple nodes is the SMD0 (SmallData) task, which reads the timestamps and fseek offsets from each tiny .smd.xtc2 file produced by the DAQ (typically one per detector or one per detector segment, although a file can contain more than one segment or detector).  This task joins together the relevant data for each shot ("event build") using the timestamp.  The SMD0 task is multi-threaded, with one thread for each detector.  For highest performance it is important that all SMD0 threads be allocated an entire MPI node.

Running a large job

Below shows how to set up a slurm job script to run a large job. This script uses setup_hosts_openmpi.sh (also provided below) to assign a single node to SMD0 (see diagram above) and distribute all other tasks (EB, BD, & SRV) to the rest of the available nodes. After sourcing setup_hosts_openmpi.sh, you can use $PS_N_RANKS and $PS_HOST_FILE in your mpirun command.


Note on the number of ranks available: on S3DF, there are 120 cores per compute node. The example below asks for 3 nodes (120 x 3 = 360 ranks in total) but will only have 120 x (3 - 1) + 1 = 241 ranks available, because the first node is fully reserved for SMD0.

submit_large_psana2.sh
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --job-name=run_large_psana2
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
#SBATCH --nodes=3
#SBATCH --exclusive
#SBATCH --time=10:00

# Configure psana2 parallelization
source setup_hosts_openmpi.sh

# Run your job
mpirun -np $PS_N_RANKS --hostfile $PS_HOST_FILE python test_mpi.py

Here is the `setup_hosts_openmpi.sh`. You can create this script in your job folder. 

setup_hosts_openmpi.sh
############################################################
# First node must be exclusive to smd0
# * For openmpi, slots=1 must be assigned to the first node.
############################################################

# Get the list of hosts by expanding the shorthand node list into a
# line-by-line node list
host_list=$(scontrol show hostnames $SLURM_JOB_NODELIST)
hosts=($host_list)

# Write out to host file by putting rank 0 on the first node
host_file="slurm_host_${SLURM_JOB_ID}"
for i in "${!hosts[@]}"; do
    if [[ "$i" == "0" ]]; then
        echo ${hosts[$i]} slots=1 > $host_file
    else
        echo ${hosts[$i]} >> $host_file
    fi
done

# Export hostfile for mpirun  
export PS_HOST_FILE=$host_file

# Calculate no. of ranks available in the job
export PS_N_RANKS=$(( SLURM_CPUS_ON_NODE * ( SLURM_JOB_NUM_NODES - 1 ) + 1 ))

Performance Tuning Tips

(see MPITaskStructureToSupportScaling to get a sense for where the parameters described here are used)

To get improved performance when running large jobs consider the following options.  It is not straightforward to set these parameters optimally for an arbitrary analysis job so some study is required for specific applications.  In some cases we can offer guidelines.

  • understand the CPU usage of your big-data ("BD") processing loop to make sure the bottleneck isn't "user code".  this can typically be done by running on 1 core.
  • increase the environment variable PS_EB_NODES to be larger than its default of 1.  For many analyses, a number that is 1/16 of the number of big data cores has been good.  This variable, along with the PS_SRV_NODES variable described next determines how many cores are used for which task in psana (see MPITaskStructureToSupportScaling).
  • if you're writing a large amount of hdf5 data increase the environment variable PS_SRV_NODES to have more cores writing hdf5 files.  It is difficult to provide guidance on this number since it depends on the application.
  • set environment variable PS_SMD_N_EVENTS larger to increase the number of events that get sent in a "batch" when transmitting data from SMD0 cores through to BD cores
  • when setting up the smalldata, increase the number of events that get sent in a "batch" when transmitting data from BD cores to SRV cores by setting the batch_size kwarg in the DataSource.smalldata() call.
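Putting those knobs together, a hedged sketch (the specific values below are placeholders showing where each setting goes, not recommendations):

import os

# more event-builder cores (default 1); roughly 1/16 of the big-data cores has worked well
os.environ['PS_EB_NODES'] = '4'
# more hdf5-writing cores if a lot of small data is being saved
os.environ['PS_SRV_NODES'] = '4'
# larger batches sent from SMD0 through to the BD cores
os.environ['PS_SMD_N_EVENTS'] = '10000'

from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc')
# larger batches sent from the BD cores to the SRV cores
smd = ds.smalldata(filename='my.h5', batch_size=1000)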

psana also has some grafana monitoring built in that, with expert help, can be used to identify bottlenecks in an analysis.  Contact pcds-ana-l@slac.stanford.edu for guidance.

Sorting Small HDF5 File

The small hdf5 file is likely unsorted due to parallelism in psana2. If your output h5 is large (> 1 billion records), you can sort it with the timestamp_sort_h5 tool by submitting the following job:

submit_large_psana2.sh
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --job-name=timestamp_sort_h5
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
#SBATCH --nodes=1
#SBATCH --exclusive
#SBATCH --time=10:00

timestamp_sort_h5 /sdf/data/lcls/drpsrcf/ffb/users/monarin/h5/mylargeh5.h5 /sdf/data/lcls/drpsrcf/ffb/users/monarin/h5/output/result.h5

Note that the first required argument is the unsorted hdf5 file and the second is the desired output file. There are other optional arguments, which can be seen by running timestamp_sort_h5 --help.

Why Is My Detector Object "None" On Some MPI Ranks?

In psana2 all mpi ranks execute the same code, but not all ranks can create a Detector object since there are “hidden” MPI helper-ranks shown in this diagram: MPITaskStructureToSupportScaling.  For those helper-ranks the Detector object will be None.  Those helper-ranks won’t enter psana2 loops over runs/steps/events, so as long as you only use a Detector object inside loops your code will run correctly without any special checks.  However, if you use the Detector object outside those loops you must check that the Detector object is not None before calling any methods.
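A minimal sketch of the check described above (using the practice data from earlier on this page):

from psana import DataSource

ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
opal = myrun.Detector('tmo_atmopal')

# outside the loops: on the hidden helper ranks the Detector object can be None,
# so guard any use of it here
if opal is not None:
    print('opal detector available on this rank')

# inside the event loop no extra guard on the Detector object is needed,
# because the helper ranks never enter the loop
for evt in myrun.events():
    img = opal.raw.image(evt)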

Historical background: we went back and forth about how to manage the MPI helper-ranks.  The alternative would have been to use callbacks instead of run/step/event loops to more effectively hide the helper-ranks from user code, but callbacks would have been user-unfriendly in a different way: writing loops is a more natural coding approach for many users.  We felt the loop approach (with more fragile Detector objects that can be None) was the lesser of two evils.


Running psplot_live

From the rix-daq node, source the psana2 environment and then run:

(ps-4.6.3) rix-daq:scripts> psplot_live ANDOR
Python 3.9.16 | packaged by conda-forge | (main, Feb  1 2023, 21:39:03) 
Type 'copyright', 'credits' or 'license' for more information
IPython 8.14.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: 

The above command starts psplot_live, which listens to your analysis jobs (with plotting) and provides an interactive session. You can use the interactive session to list, kill, and reactivate plots. Note that to monitor more than one plot you can separate the plot names with spaces (e.g. psplot_live ANDOR ATMOPAL).


Below is an example analysis (monitoring two plots: ANDOR and ATMOPAL) and a job submission script that communicate directly with psplot_live. Note that if you are converting a python script that works with psplot (not live), the main difference is that you must pass psmon_publish=publish as an additional DataSource argument (see the DataSource call in run_andor.py below). There may be other differences that need to be changed; please let us know in that case.

run_andor.py
from psana import DataSource
from psmon import publish
from psmon.plots import Image,XYPlot
import os, sys, time
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
 

os.environ['PS_SRV_NODES']='1'
os.environ['PS_SMD_N_EVENTS']='1'


# passing exp and runnum
exp=sys.argv[1]
runnum=int(sys.argv[2])


mount_dir = '/sdf/data/lcls/drpsrcf/ffb'
#mount_dir = '/cds/data/drpsrcf'
xtc_dir = os.path.join(mount_dir, exp[:3], exp, 'xtc')
ds = DataSource(exp=exp,run=runnum,dir=xtc_dir,intg_det='andor_vls',
        batch_size=1, 
        psmon_publish=publish,
        detectors=['timing','andor_vls','atmopal'],
        max_events=0,
        live=True)


def my_smalldata(data_dict):
    if 'unaligned_andor_norm' in data_dict:
        andor_norm = data_dict['unaligned_andor_norm'][0]
        myplot = XYPlot(0,f"Andor (normalized) run:{runnum}",range(len(andor_norm)),andor_norm)
        publish.send('ANDOR',myplot)
    if 'sum_atmopal' in data_dict:
        atmopal_sum = data_dict['sum_atmopal']
        myplot = XYPlot(0,f"Atmopal (sum) run:{runnum}",range(len(atmopal_sum)), atmopal_sum)
        publish.send('ATMOPAL', myplot)
 
for myrun in ds.runs():
    andor = myrun.Detector('andor_vls')
    atmopal = myrun.Detector('atmopal')
    timing = myrun.Detector('timing')
    smd = ds.smalldata(filename='mysmallh5.h5',batch_size=5, callbacks=[my_smalldata])
    norm = 0
    ndrop_inhibit = 0
    sum_atmopal = None
    cn_andor_events = 0
    cn_intg_events = 0
    ts_st = None
    for nstep,step in enumerate(myrun.steps()):
        print('step:',nstep)
        for nevt,evt in enumerate(step.events()):
            if ts_st is None: ts_st = evt.timestamp
            cn_intg_events += 1
            andor_img = andor.raw.value(evt)
            atmopal_img = atmopal.raw.image(evt)
            if atmopal_img is not None:
                if sum_atmopal is None:
                    sum_atmopal = atmopal_img[0,:]
                else:
                    sum_atmopal += atmopal_img[0,:]
            # also need to check for events missing due to damage
            # (or compare against expected number of events)
            ndrop_inhibit += timing.raw.inhibitCounts(evt)
            smd.event(evt, mydata=nevt) # high rate data saved to h5
            # need to check Matt's new timing-system data on every
            # event to make sure we haven't missed normalization
            # data due to deadtime
            norm+=nevt # fake normalization
            if andor_img is not None:
                cn_andor_events += 1
                #print('andor data on evt:',nevt,'ndrop_inhibit:',ndrop_inhibit)
                print(f'BD{rank-1}: #andor_events: {cn_andor_events} #intg_event:{cn_intg_events} st: {ts_st} en:{evt.timestamp}')
                # check that the high-read readout group (2) didn't
                # miss any events due to deadtime
                if ndrop_inhibit[2]!=0: print('*** data lost due to deadtime')
                # need to prefix the name with "unaligned_" so
                # the low-rate andor dataset doesn't get padded
                # to align with the high rate datasets
                smd.event(evt, mydata=nevt,
                          unaligned_andor_norm=(andor_img/norm),
                          sum_atmopal=sum_atmopal)
                norm=0
                ndrop_inhibit=0
                sum_atmopal = None
                cn_intg_events = 0
                ts_st = None
    smd.done()

And an sbatch script:

submit_run_andor.sh
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --account=<your account here>
#SBATCH --job-name=run_andor
#SBATCH --nodes=1
#SBATCH --ntasks=5
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
##SBATCH --exclusive
#SBATCH -t 00:05:00

t_start=`date +%s`

exp=$1
runnum=$2
mpirun -n 5 python run_andor.py $exp $runnum

t_end=`date +%s`
echo PSJobCompleted TotalElapsed $((t_end-t_start))  

After creating the above two scripts, you can submit the job with:

sbatch submit_run_andor.sh rixc00121 121

You should be able to see the psplot(s) pop up automatically.

To view the list of psplots, use ls():

In [1]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS    
1     43195784     rixc00121  121   sdfmilan005.sdf.slac.stanford.edu   12323 PLOTTED

If you close the plot window (or it errors out), the process is automatically removed from the list:

In [2]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS

You can submit your analysis job again (increasing run numbers are always monitored). Old jobs (a previously submitted run number from the same node and the same psplot port) will NOT be shown automatically (STATUS: RECEIVED). You can reactivate them using the show(ID) command.

In [2]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS    
1     3275205      rixc00121  121   sdfiana001.sdf.slac.stanford.edu    12323 RECEIVED  

In [3]: show(1)
Main received {'msgtype': 3} from db-zmq-server

To kill a plot, use kill(ID); use kill_all() to kill all plots.

In [5]: kill(1)

In [6]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS 