
Introduction

A general introduction to LCLS data analysis (psana1, ami2, jupyter, slurm) can be viewed in this 53-minute video here.  It covers psana1 (i.e. LCLS1), but the user interface is quite similar to psana2 (for LCLS2), even though the internals are significantly improved in psana2:

(Video attachment: video1502222460.mp4)

JupyterHub

psana2 is also available in JupyterHub here in the kernel named "LCLS-II py3": https://pswww.slac.stanford.edu/jupyterhub/hub/home

Environment

To obtain the environment to run psana2, execute the following:

source /cds/sw/ds/ana/conda2/manage/bin/psconda.sh

Note that LCLS-II psana is not compatible with LCLS-I psana, so environments must activate one or the other, but not both.  Since LCLS-I and LCLS-II live in different conda installations it is necessary to start a fresh session when switching between the two.

Public Practice Data

Publicly accessible practice data is located in the directory /cds/data/psdm/prj/public01/xtc.  Use of this data requires the additional "dir=" keyword to the DataSource object.

---------------------------------------------------------------
Experiment  | Run | Comment
---------------------------------------------------------------
tmoc00118   | 222 | Generic TMO dark data
rixx43518   | 34  | A DAQ "fly scan" of a motor (see ami#FlyScan:MeanVs.ScanValue)
rixx43518   | 45  | A DAQ "step scan" of two motors
rixl1013320 | 63  | Rix stepping delay scan of both Vitara delay and ATM delay stage (lxt_ttc scan) at a single mono energy
rixl1013320 | 93  | Rix continuous mono scan with laser on/off data at a single time delay
rixx1003821 | 55  | An infinite sequence with two slow andors running at different rates
rixx1003821 | 68  | A finite burst sequence with one andor
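
For example, a DataSource pointed at the first dataset in the table looks like this (a minimal sketch; the other rows work the same way):

from psana import DataSource
# the dir= keyword points psana at the public practice data directory
ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc')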


Detector Names

Use this command to see non-epics detector names (see "Detector Interface" example below):

(ps-4.1.0) psanagpu101:lcls2$ detnames exp=tmoc00118,run=222,dir=/cds/data/psdm/prj/public01/xtc
---------------------
Name      | Data Type
---------------------
epicsinfo | epicsinfo
timing    | raw      
hsd       | raw      
gmdstr0   | raw    
etc.  

From within python the above information can be obtained as a dictionary using "run.detinfo".

Use the same command with the "-e" option to see epics detector names (see "Detector Interface" example below).  These are slowly varying variables (like temperatures) that are not strongly time correlated with the regular per-shot detector data:

(ps-4.5.10) psanagpu101:~$ detnames -e exp=tmoc00118,run=222,dir=/cds/data/psdm/prj/public01/xtc | more
-----------------------------------------------------------------
Detector Name             | Epics Name                           
-----------------------------------------------------------------
StaleFlags                |                                      
Keithley_Sum              | EM2K0:XGMD:HPS:KeithleySum           
IM2K4_XrayPower           | IM2K4:PPM:SPM:VOLT_RBV               
IM3K4_XrayPower           | IM3K4:PPM:SPM:VOLT_RBV               
IM4K4_XrayPower           | IM4K4:PPM:SPM:VOLT_RBV               
etc.     

Note that either of these names ("Detector Name" or "Epics Name") can be used with the Detector Interface described below.  From within python the above information can be obtained as a dictionary using "run.epicsinfo".
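
A minimal sketch of both lookups from within python, using the public practice data:

from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc')
myrun = next(ds.runs())
print(myrun.detinfo)    # per-shot detector names and data types
print(myrun.epicsinfo)  # slow epics variable names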

Using the Detector Interface

Standard (per-shot) detectors and the slower epics variables can be accessed as shown here, using the names discovered with the commands above.

NOTE: the slow epics variables are polled in the DAQ at 1Hz, so for roughly the first second of a run they will typically return "None".

NOTE: you can use tab-completion in ipython (see https://ipython.org/ipython-doc/3/interactive/tutorial.html) or the jupyter notebook to explore what you can do with the various detector objects.

NOTE: epics variables use a different syntax than other detectors (see example here):

from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc',
                max_events=100)
myrun = next(ds.runs())
opal = myrun.Detector('tmo_atmopal')
epics_det = myrun.Detector('IM2K4_XrayPower')
for evt in myrun.events():
    img = opal.raw.image(evt)
    epics_val = epics_det(evt) # epics variables have a different syntax
    # check for missing data                                                    
    if img is None:
        print('no image')
    else:
        print(img.shape)
    if epics_val is None:
        print('no epics value')
    else:
        print(epics_val)


Example Script Producing Small HDF5 File

You can run this script with MPI: mpirun -n 6 python example.py

It also works on one core with: python example.py (useful for debugging).  See the MPI rank/task diagram here to understand what the different mpi ranks are doing.

This mechanism by default produces "aligned" datasets where missing values are padded (with NaN's for floats, and -99999 for integers).  To create an unaligned dataset (without padding) prefix the name of the variable with "unaligned_".

NOTE: In addition to the hdf5 you specify as your output file ("my.h5" below) you will see other h5 files like "my_part0.h5", one for each of the cores specified in PS_SRV_NODES.  The reason for this is that each of those SRV cores writes its own my_partN.h5 file: for LCLS2 it will be important for performance to write many files.  The "my.h5" file is actually quite small, and uses a new HDF5 feature called a "Virtual DataSet" (VDS) to join together the various my_partN.h5 files.  Also note that events in my.h5 will not be in time order.  If you copy the .h5 somewhere else, you need to copy all of them.

NOTE: In python, if you want to exit early you often use a "break" statement.  When running psana-python with mpi parallelization, however, not all cores will see this statement, and the result is that your job will hang at the end.  To avoid this use the "max_events" keyword argument to DataSource (see example below).  An example of the failure mode: if an EB core communicates with only 1 BD core and that BD core exits, the EB core gets stuck and doesn't exit (see MPITaskStructureToSupportScaling).  MPI requires all cores to exit gracefully for a job to exit.  The max_events kwarg allows the SMD0 core to shut everything down.

NOTE: The call below to smd.sum() (or min()/max()) only accepts numpy arrays or None.  This requirement allows cores that don't see any events to not cause errors when summing across cores.  All cores must call .sum() or the process will hang (a property of the underlying mpi "Reduce" call).

NOTE: The smd.event() and smd.save_summary() calls (for persisting data to hdf5 or passing it to an SRV callback) can be passed either a series of kwargs or a dictionary.  The dictionary can optionally be hierarchical (e.g. d['mykey1']['mykey2']=value) and those keys will be reflected in the hdf5 dataset structure.  This allows users to organize hdf5 data with a structure they prefer.
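
For instance, the hierarchical form might look like this (a minimal runnable sketch; 'laser', 'energy', 'delay' and the constant values are purely illustrative):

from psana import DataSource
import os
os.environ['PS_SRV_NODES'] = '1'
ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc', max_events=5)
smd = ds.smalldata(filename='hier.h5', batch_size=5)
myrun = next(ds.runs())
for evt in myrun.events():
    # hierarchical dict: the keys become hdf5 groups/datasets laser/energy, laser/delay
    smd.event(evt, {'laser': {'energy': 0.5, 'delay': 1.2}})
smd.done()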

from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES']='1'

ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc', max_events=10)
# batch_size is optional. specifies how often the dictionary of small
# user data is gathered. if you write out large data (NOT RECOMMENDED) it needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')

    runsum  = np.zeros((3),dtype=float) # beware of datatypes when summing: can overflow
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        # important: always check for missing data
        if img is None or photonEnergy is None: continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += img[0,:3] # local sum on one mpi core
 
    # optional summary data for whole run
    if smd.summary:
        tot_runsum = smd.sum(runsum) # sum (or max/min) across all mpi cores. Must be numpy array or None.
        # pass either dictionary or kwargs.
        smd.save_summary({'sum_over_run' : tot_runsum}, summary_int=1)
    smd.done()
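
The resulting master file can be read back with h5py.  Only the master file needs to be opened (h5py follows the virtual dataset into the my_partN.h5 pieces, which must stay alongside it, as noted above).  A minimal sketch, assuming the dataset names match the kwargs passed to smd.event() in the example:

import h5py
with h5py.File('mysmallh5.h5', 'r') as f:
    print(list(f.keys()))        # e.g. 'evtsum', 'photonEnergy', plus summary data
    print(f['photonEnergy'][:])  # remember: events are not in time order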

Full-Featured Example with Callbacks and Detector Selection

You can run this script with MPI the same way as shown in the previous example: mpirun -n 6 python example.py

These additional arguments for DataSource were added in this example:

  • detectors=['detname1', 'detname2',]

           List of detectors to be read from the disk. If you only need a few detectors for analysis, list their names here. The reading process will be faster since unused detector data is not read.

  • smd_callback=smd_callback

           You can use this callback to 1) decide for which events to read the large event data (yield evt) and 2) set the destination (rank id) to which each event is sent.

  • small_xtc=['detname1', 'detname2']

           List of detectors to be used in smd_callback()

from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)  

# Use this function to decide if you want to fetch large data for this event
# and/or direct an event to process on a particular 'rank'
# (this rank number should be between 1 and total no. of ranks - 3,
# since 3 ranks are reserved). If a detector is needed in this callback,
# make sure to list it in the small_xtc argument to DataSource (see below).
# All epics and scan detectors are available automatically.
def smd_callback(run):
    opal = run.Detector('tmo_opal1')
    epics_det = run.Detector('IM2K4_XrayPower')

    n_bd_nodes = 3 # for mpirun -n 6, 3 ranks are reserved so there are 3 bd ranks left

    for i_evt, evt in enumerate(run.events()):
        img = opal.raw.image(evt)
        epics_val = epics_det(evt)
        dest = (evt.timestamp % n_bd_nodes) + 1

        if epics_val is not None:
            # Set the destination (rank no.) where this event should be sent to
            evt.set_destination(dest)
            yield evt

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES']='1'

ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc',
        max_events  = 10,
        detectors   = ['epicsinfo', 'tmo_opal1', 'ebeam'],  # only reads these detectors (faster)
        smd_callback= smd_callback,                         # smalldata callback (see notes above)
        small_xtc   = ['tmo_opal1'],                        # detectors to be used in smalldata callback
        )

# batch_size is optional. specifies how often the dictionary of small
# user data is gathered.  if you write out large data (NOT RECOMMENDED) it needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')

    runsum  = np.zeros((3),dtype=float) # beware of datatypes when summing: can overflow
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        if img is None or photonEnergy is None: continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += img[0,:3] # local sum on one mpi core
 
    # optional summary data for whole run
    if smd.summary:
        tot_runsum = smd.sum(runsum) # sum (or max/min) across all mpi cores. Must be numpy array or None.
        # pass either dictionary or kwargs
        smd.save_summary({'sum_over_run' : tot_runsum}, summary_int=1)
    smd.done()

Excluding Detectors

In addition to including only specified detectors with the "detectors=[]" kwarg (see previous section), you can exclude detectors when creating a new datasource with the "xdetectors=[]" argument.

# Create a datasource and tell it to exclude two detectors
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc',
        xdetectors  = ['hsd'],      # all other detectors will be available
        max_events  = 10)
 
 
run = next(ds.runs())

# Create these detectors normally 
opal = run.Detector('tmo_opal1')
ebeam = run.Detector('ebeam')
for i, evt in enumerate(run.events()):
    img = opal.raw.image(evt)
    photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
    print(f'got evt={i} ts={evt.timestamp} img={img.shape} {photonEnergy=}')

Note that any other detectors in the same stream as an excluded detector will also be excluded. You'll see a warning message like this one:

Warning: Stream-8 has one or more detectors matched with the excluded set. All detectors in this stream will be excluded.

If you try to access a detector that is part of the excluded list, you'll see a DetectorNameError like the one below:

Traceback (most recent call last):
  File "/cds/home/m/monarin/lcls2/psana/psana/tests/dummy.py", line 12, in <module>
    ebeam = run.Detector('ebeam')
  File "/cds/home/m/monarin/lcls2/psana/psana/psexp/run.py", line 119, in Detector
    raise DetectorNameError(err_msg)
psana.psexp.run.DetectorNameError: No available detector class matched with ebeam. If this is a new detector/version, make sure to add new class in detector folder.

Accessing Event Codes

LCLS1-style event codes can be accessed using the "timing" detector:

from psana import DataSource
ds = DataSource(exp='tmoc00118',run=222,dir='/cds/data/psdm/prj/public01/xtc')
myrun = next(ds.runs())
timing = myrun.Detector('timing')
for nevt,evt in enumerate(myrun.events()):
    allcodes = timing.raw.eventcodes(evt)
    # event code 15 fires at 1Hz, and this exp/run had 10Hz triggers            
    print('event code 15 present:',allcodes[15])
    if nevt>20: break

Running in Parallel

Examples of slurm scripts to submit are here:  Submitting SLURM Batch Jobs.  Instructions for submitting batch jobs at S3DF to run in parallel are here: https://confluence.slac.stanford.edu/display/PCDS/Running+at+S3DF.

Analyzing Scans

To get a list of the scan variables, use the following command:

(ps-4.3.2) psanagpu102:lcls2$ detnames -s exp=rixx43518,run=45,dir=/cds/data/psdm/prj/public01/xtc
--------------------------
Name           | Data Type
--------------------------
motor1         | raw      
motor2         | raw      
step_value     | raw      
step_docstring | raw      
--------------------------
(ps-4.3.2) psanagpu102:lcls2$  

From within python the above information can be obtained as a dictionary using "run.scaninfo".
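
As with "run.detinfo" and "run.epicsinfo" above, a minimal sketch:

from psana import DataSource
ds = DataSource(exp='rixx43518', run=45, dir='/cds/data/psdm/prj/public01/xtc')
myrun = next(ds.runs())
print(myrun.scaninfo)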

Note that "step_value"/"step_docstring" are convenience values defined by the DAQ to be a single value/string that is supposed to represent what has happened on a step, for use in plots/printout, since it's not easy to plot, for example, vs. multiple motor positions in a complex scan.

Code similar to this can be used to access the above scan variables:

from psana import DataSource
ds = DataSource(exp='rixx43518',run=45,dir='/cds/data/psdm/prj/public01/xtc')
myrun = next(ds.runs())
motor1 = myrun.Detector('motor1')
motor2 = myrun.Detector('motor2')
step_value = myrun.Detector('step_value')
step_docstring = myrun.Detector('step_docstring')
for step in myrun.steps():
    print(motor1(step),motor2(step),step_value(step),step_docstring(step))
    for evt in step.events():
        pass
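
A common pattern is to process the events belonging to each step; a minimal sketch that just counts them (replace the count with a real per-event quantity in practice):

from psana import DataSource
ds = DataSource(exp='rixx43518', run=45, dir='/cds/data/psdm/prj/public01/xtc')
myrun = next(ds.runs())
motor1 = myrun.Detector('motor1')
for nstep, step in enumerate(myrun.steps()):
    nevents = sum(1 for evt in step.events())  # count the events in this step
    print(f'step {nstep}: motor1={motor1(step)} nevents={nevents}')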

Running From Shared Memory

psana2 scripts can be run in real-time on shared memory.  There is some extra complexity compared to writing python code for the real-time GUI ami, for example:

  • you have to create plots and handle when they update
  • you have to worry about data getting out of time order
  • you have to handle missing data
  • you have to reset plots on run boundaries (if that's the behavior you want)
  • you have to write code to handle the data gathered from multiple mpi cores (not required if you want to run on 1 core)
  • be aware that the shared memory python scripts "steal" event statistics from other shared memory instances (e.g. ami)

but you get to take advantage of the power/flexibility of python.  Look at the DAQ .cnf file (e.g. /cds/group/pcds/dist/pds/rix/scripts/rix.cnf) to see the name of the node running the shared memory server ("monReqServer").  To access those nodes you need to be on a special permissions list (email pcds-ana-l@slac.stanford.edu to request).  You can find the name of the shared memory (the hutch name is typically used) either by looking in the .cnf file (the "-P" option to the monReqServer executable) or by running a command like this:

drp-neh-cmp003:~$ ls /dev/shm/
PdsMonitorSharedMemory_tmo
drp-neh-cmp003:~$

For this output, you would use "DataSource(shmem='tmo')".

Note that one can develop the shared memory scripts (including real-time plots) using offline data, then change the DataSource line to run them in real-time.

Typically psmon is used for publishing results to realtime plots in the callback, publishing updates every "N" events.  See this link for psmon examples: Visualization Tools.

When running multi-core with mpi one has to use the small data "callbacks" kwarg to receive the data gathered from all nodes.  An example multi-core script is below (also works on 1 core).  A pure 1-core script is simpler (no data "gathering" needed: can just be a loop over events with plot updates every N events).  For multi-core, this can be run with "mpirun -n 3 python <scriptname>.py" and the two plots can be viewed on the same node with "psplot -a 1 OPALSUMS OPALIMG" (see Visualization Tools for "psplot" options).  It can also be run in the usual offline manner by changing the DataSource line.  It is written in a way to keep running even when the DAQ is restarted:

import os
import numpy as np
from psana import DataSource
from psmon import publish
from psmon.plots import XYPlot,Image
from collections import deque

from mpi4py import MPI
numworkers = MPI.COMM_WORLD.Get_size()-1
if numworkers==0: numworkers=1 # the single core case (no mpi)

os.environ['PS_SRV_NODES']='1' # one mpi core gathers/plots data

mydeque=deque(maxlen=25)
lastimg=None
numevents=0
numendrun=0

def my_smalldata(data_dict): # one core gathers all data from mpi workers
    global numevents,lastimg,numendrun,mydeque
    if 'endrun' in data_dict:
        numendrun+=1
        if numendrun==numworkers:
            print('Received endrun from all workers. Resetting data.')
            numendrun=0
            numevents=0
            mydeque=deque(maxlen=25)
        return
    numevents += 1
    if 'opal' in data_dict:
        lastimg = data_dict['opal']
    mydeque.append(data_dict['opalsum'])
    if numevents%100==0: # update plots around 1Hz
        print('event:',numevents)
        myxyplot = XYPlot(numevents, "Last 25 Sums", np.arange(len(mydeque)), np.array(mydeque), formats='o')
        publish.send("OPALSUMS", myxyplot)
        if lastimg is not None: # opal image is not sent all the time
            myimgplot = Image(numevents, "Opal Image", lastimg)
            publish.send("OPALIMG", myimgplot)

while 1: # mpi worker processes
    ds = DataSource(shmem='rix')
    smd = ds.smalldata(batch_size=5, callbacks=[my_smalldata])
    for myrun in ds.runs():
        opal = myrun.Detector('atmopal')
        for nevt,evt in enumerate(myrun.events()):
            mydict={}
            image = opal.raw.image(evt)
            if image is None: continue
            # do as much work as possible in the workers
            # don't send large data all the time, if possible
            if nevt%10==0: mydict['opal']=image
            mydict['opalsum']=np.sum(image)
            smd.event(evt,mydict)
        smd.event(evt,{'endrun':1}) # tells gatherer to reset plots

When running multi-node mpi there are also some complexities propagating the environment to remote nodes: the way to address that is described in this link.  

Running in LIVE mode

Here's a sample python script showing how you can configure the DataSource to run in live mode:

livemode.py
# Use an environment variable to specify how many attempts
# the datasource should make while waiting for file reading (1 second wait per attempt).
# In this example, we set it to 30 (wait up to 30 seconds).
import os
os.environ['PS_SMD_MAX_RETRIES'] = '30'


# Create a datasource with live flag
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc', 
        live        = True,
        max_events  = 10)


# Looping over your run and events as usual
# You'll see "wait for an event..." message in case
# The file system writing is slower than your analysis
run = next(ds.runs())
for i, evt in enumerate(run.events()):
    print(f'got evt={i} ts={evt.timestamp}')

Note that this script is also available in the tests folder of the lcls2 repository. You can run the script with:

(ps-4.3.2) psanagpu109 tests $ mpirun -n 3 python livemode.py

Jump to events

Psana2 can jump to particular events when given a list of timestamps. In the example below, we only want to process these four timestamps. Note that the timestamp array must have dtype np.uint64.

test_jump.py
from psana import DataSource
import numpy as np
timestamps = np.array([4194783241933859761,4194783249723600225,4194783254218190609,4194783258712780993], dtype=np.uint64)
ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc',
        timestamps=timestamps)
myrun = next(ds.runs())
opal = myrun.Detector('tmo_atmopal')
for nevt, evt in enumerate(myrun.events()):
    img = opal.raw.image(evt)
    print(nevt, evt.timestamp, img.shape)

Here is the output:

(ps-4.5.5) monarin@psanagpu111 (master *) psana2 $ python test_jump.py 
0 4194783241933859761 (1024, 1024)
1 4194783249723600225 (1024, 1024)
2 4194783254218190609 (1024, 1024)
3 4194783258712780993 (1024, 1024)
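
One way to obtain such a list is simply to record evt.timestamp (a uint64) during an earlier pass over the data; a minimal sketch:

from psana import DataSource
import numpy as np
ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc', max_events=10)
myrun = next(ds.runs())
# apply whatever event selection you like here; this sketch keeps every event
timestamps = np.array([evt.timestamp for evt in myrun.events()], dtype=np.uint64)
print(timestamps)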

Event Times In Local Time Zone

NOTE: the method evt.datetime().timestamp() can also be used to generate more natural timestamps (e.g. for use in plots).

import datetime as dt
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc')
myrun = next(ds.runs())
for nevt,evt in enumerate(myrun.events()):
    if nevt>3: break
    t = evt.datetime()
    localt = t.replace(tzinfo=dt.timezone.utc).astimezone(tz=None)    
    print(localt.strftime('%H:%M:%S'))
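
For plot axes, the float returned by evt.datetime().timestamp() (seconds since the unix epoch) is often more convenient than a formatted string; a minimal sketch:

from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc', max_events=4)
myrun = next(ds.runs())
# float seconds since the epoch, suitable as a plot x-axis
times = [evt.datetime().timestamp() for evt in myrun.events()]
print(times)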

Running with Integrating Detector

Here's a sample python script showing how you can analyze a run with an integrating detector (in this example, 'andor'). For more detail about the technical design, visit this page.

There are two key variables when running in integrating-detector mode: PS_SMD_N_EVENTS (an environment variable), which is how many events SMD0 will send to each EB core, and the batch_size kwarg, which controls how many events the EB cores send to the BD cores.  When running integrating detectors, the "units" of these two variables change from "high-rate events" to "low-rate integrating-detector events".  batch_size should be set to less than PS_SMD_N_EVENTS.  The max_events kwarg to DataSource also changes to count integrating-detector events.  Unfortunately, the batch_size kwarg to the DataSource.smalldata() call does NOT count integrating-detector events.  We would like to simplify this in the future.

NOTE: part of psana ("SMD0") has buffers that need to be large enough to accommodate at least one event from the integrating detector.  If PS_SMD_N_EVENTS is set too large and this is not possible then an error message is printed and psana exits.

intg_det.py
# The PS_SMD_N_EVENTS should be set to a small number (e.g. 1)
# since all other events which are part of this intg. event will be sent
# in the same batch.

import os
import numpy as np
os.environ['PS_SMD_N_EVENTS'] = '1'
batch_size = 1

from psana import DataSource
ds = DataSource(exp='xpptut15', run=1, dir='/cds/data/psdm/prj/public01/xtc/intg_det',
        intg_det='andor',         
        batch_size=batch_size)
run = next(ds.runs())
hsd = run.Detector('hsd')
andor = run.Detector('andor')

# Test calculating sum of the hsd for each integrating event.
sum_hsd = 0
for i_evt, evt in enumerate(run.events()):
    hsd_calib = hsd.raw.calib(evt)
    andor_calib = andor.raw.calib(evt)

    # Keep summing the value of the other detector (hsd in this case)
    sum_hsd += np.sum(hsd_calib[:])/np.prod(hsd_calib.shape)
    
    # When an integrating event is found, print out and reset the sum variable
    if andor_calib is not None:
        val_andor = np.sum(andor_calib[:])/np.prod(andor_calib.shape)
        print(f'i_evt: {i_evt} andor: {val_andor} sum_hsd:{sum_hsd}')
        sum_hsd = 0

RIX Example: Deadtime and Integrating Detectors

Sometimes data for a particular shot cannot be read out because DAQ data buffers are unavailable.  This is called "deadtime".  Deadtime can make it impossible to reliably use high-rate shots to perform operations like normalization for lower-rate integrating detectors.  Matt Weaver is adding counters to the timing system so the DAQ can count how many shots have had their data dropped due to deadtime, allowing experimenters to decide whether there is sufficient data for a normalization-style analysis.

This example demonstrates (conceptually!) how to do the normalization and view the results in a psmon plot (use the command "psplot ANDOR" on the same node).  Run this script either with "python andor.py" or "mpirun -n 5 python andor.py":

from psana import DataSource
from psmon import publish
from psmon.plots import Image,XYPlot
import os
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# The PS_SMD_N_EVENTS should be set to a small number (e.g. 1)
# since all other events which are part of this intg. event will be sent
# in the same batch.
os.environ['PS_SMD_N_EVENTS'] = '1'
os.environ['PS_SRV_NODES']='1'

if rank==4: # hack for now to eliminate use of publish.local below
    publish.init()

# we will remove this for batch processing and use "psplot" instead
# publish.local = True

def my_smalldata(data_dict):
    if 'unaligned_andor_norm' in data_dict:
        andor_norm = data_dict['unaligned_andor_norm'][0]
        myplot = XYPlot(0,"Andor (normalized)",range(len(andor_norm)),andor_norm)
        publish.send('ANDOR',myplot)

ds = DataSource(exp='rixx1003821',run=46,dir='/cds/data/psdm/prj/public01/xtc',intg_det='andor_vls',batch_size=1)
for myrun in ds.runs():
    andor = myrun.Detector('andor_vls')
    timing = myrun.Detector('timing')
    smd = ds.smalldata(filename='mysmallh5.h5',batch_size=1000, callbacks=[my_smalldata])
    norm = 0
    ndrop_inhibit = 0
    for nstep,step in enumerate(myrun.steps()):
        print('step:',nstep)
        for nevt,evt in enumerate(step.events()):
            andor_img = andor.raw.value(evt)
            # also need to check for events missing due to damage
            # (or compare against expected number of events)
            ndrop_inhibit += timing.raw.inhibitCounts(evt)
            smd.event(evt, mydata=nevt) # high rate data saved to h5
            # need to check Matt's new timing-system data on every
            # event to make sure we haven't missed normalization
            # data due to deadtime
            norm+=nevt # fake normalization
            if andor_img is not None:
                print('andor data on evt:',nevt,'ndrop_inhibit:',ndrop_inhibit)
                # check that the high-rate readout group (2) didn't
                # miss any events due to deadtime
                if ndrop_inhibit[2]!=0: print('*** data lost due to deadtime')
                # need to prefix the name with "unaligned_" so
                # the low-rate andor dataset doesn't get padded
                # to align with the high rate datasets
                smd.event(evt, mydata=nevt,
                          unaligned_andor_norm=(andor_img/norm))
                norm=0
                ndrop_inhibit=0

AMI and Integrating Detectors

AMI is also affected by the DAQ deadtime issue described above but, in addition, events can also be thrown away going into the AMI shared memory (this feature is what allows AMI analysis to stay real-time).  This can cause additional issues with AMI integrating-detector normalization analyses that try to use high-rate shots to normalize low-rate integrating detectors, since not all high-rate shots are guaranteed to be available.  Also, unlike psana, AMI doesn't currently have the notion of shots that "belong together", which makes an AMI normalization-style analysis of an integrating detector impossible.

MPI Task Structure To Support Scaling

psana can scale to allow for high-rate analysis.  For example, many hdf5 files of small user-defined data (described above in Example Script Producing Small HDF5 File) can be written, one per "SRV" node in the diagram below.  The total number of SRV nodes is defined by the environment variable PS_SRV_NODES (defaults to 0).  These hdf5 files are joined by psana into what appears to be one file using the hdf5 "virtual dataset" feature.  Similarly, multiple nodes can be used for filtering ("EB" nodes in the diagram below) and multiple nodes can be used to process big data in the main psana event loop ("BD" nodes in the diagram below).  The one piece that cannot currently be scaled to multiple nodes is the SMD0 (SMallData) task, which reads the timestamps and fseek offsets from each tiny .smd.xtc2 file produced by the DAQ (typically one per detector or one per detector segment, although a file can contain more than one segment or detector).  This task joins together the relevant data for each shot ("event build") using the timestamp.  The SMD0 task is multi-threaded, with one thread for each detector.  For highest performance it is important that the SMD0 threads be allocated an entire MPI node.

Running a large job

Below shows how to set up a slurm job script to run a large job. This script uses setup_hosts_openmpi.sh (also provided below) to assign a single node to SMD0 (see diagram above) and distribute all other tasks (EB, BD, & SRV) to the rest of the available nodes. After sourcing setup_hosts_openmpi.sh, you can use $PS_N_RANKS and $PS_HOST_FILE in your mpirun command.


Note on the number of ranks available: on S3DF, there are 120 cores per compute node. The example below asks for 3 nodes (120 x 3 = 360 cores in total) but will only have 120 x (3 - 1) + 1 = 241 ranks available. This is because the first node is reserved entirely for SMD0.

submit_large_psana2.sh
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --job-name=run_large_psana2
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
#SBATCH --nodes=3
#SBATCH --exclusive
#SBATCH --time=10:00

# Configure psana2 parallelization
source setup_hosts_openmpi.sh

# Run your job
mpirun -np $PS_N_RANKS --hostfile $PS_HOST_FILE python test_mpi.py

Here is setup_hosts_openmpi.sh. You can create this script in your job folder.

setup_hosts_openmpi.sh
############################################################
# First node must be exclusive to smd0
# * For openmpi, slots=1 must be assigned to the first node.
############################################################

# Get the list of hosts by expanding the shorthand node list
# into a line-by-line node list
host_list=$(scontrol show hostnames $SLURM_JOB_NODELIST)
hosts=($host_list)

# Write out to host file by putting rank 0 on the first node
host_file="slurm_host_${SLURM_JOB_ID}"
for i in "${!hosts[@]}"; do
    if [[ "$i" == "0" ]]; then
        echo ${hosts[$i]} slots=1 > $host_file
    else
        echo ${hosts[$i]} >> $host_file
    fi
done

# Export hostfile for mpirun  
export PS_HOST_FILE=$host_file

# Calculate no. of ranks available in the job
export PS_N_RANKS=$(( SLURM_CPUS_ON_NODE * ( SLURM_JOB_NUM_NODES - 1 ) + 1 ))

Performance Tuning Tips

To get improved performance when running large jobs, consider the following options.  It is not straightforward to set these parameters optimally for an arbitrary analysis job, so some study is required for the particular application; a consolidated sketch of these knobs follows the list.

  • increase the environment variable PS_SMD_NODES to be larger than its default of 1.  For many analyses, a number that is 1/16 of the number of big data cores has been good
  • if you're writing a large amount of hdf5 data increase the environment variable PS_SRV_NODES to have more cores writing hdf5 files.  It is difficult here to provide guidance on the number since it depends on the application
  • set environment variable PS_SMD_N_EVENTS larger to increase the number of events that get sent in a "batch" when transmitting data from SMD0 cores through to BD cores
  • when setting up the smalldata, increase the number of events that get sent in a "batch" when transmitting data from BD cores to SRV cores by setting the batch_size kwarg in the DataSource.smalldata() call.
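
A consolidated sketch of the knobs above (the values are illustrative starting points only, not recommendations; the exp/run point at the public practice data from earlier sections):

import os
# set the environment variables before the DataSource is created
os.environ['PS_SMD_NODES'] = '4'        # more EB cores (default 1); ~1/16 of the BD cores has worked well
os.environ['PS_SRV_NODES'] = '2'        # more cores writing hdf5 files
os.environ['PS_SMD_N_EVENTS'] = '1000'  # events per batch sent from SMD0 through to the BD cores

from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc')
# batch_size here controls how many events are batched from BD cores to SRV cores
smd = ds.smalldata(filename='my.h5', batch_size=1000)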