
To obtain the environment to run psana2, execute the following:

Code Block
# For S3DF users (sdfiana nodes)
source /sdf/group/lcls/ds/ana/sw/conda2/manage/bin/psconda.sh

# For PCDS users (psana nodes)
source /cds/sw/ds/ana/conda2/manage/bin/psconda.sh

...

Publicly accessible practice data is located in S3DF in the directory /sdf/data/lcls/ds/prj/public01/xtc.  Use of this data requires the additional "dir=/sdf/data/lcls/ds/prj/public01/xtc" keyword argument to the DataSource object.

Experiment  | Run    | Comment
------------|--------|------------------------------------------------------------
tmoc00118   | 222    | Generic TMO dark data
tmoacr01    | 94,5,6 | xtcav dark, lasing-off, lasing-on (cpo thinks, not certain)
rixx43518   | 34     | A DAQ "fly scan" of motor (see ami#FlyScan:MeanVs.ScanValue)
rixx43518   | 45     | A DAQ "step scan" of two motors
rixl1013320 | 63     | Rix stepping delay scan of both Vitara delay and ATM delay stage (lxt_ttc scan) at a single mono energy
rixl1013320 | 93     | Rix continuous mono scan with laser on/off data at a single time delay
rixx1003821 | 55     | An infinite sequence with two slow andors running at different rates
rixx1003821 | 68     | A finite burst sequence with one andor
uedcom103   | 7      | epix10ka data
ueddaq02    | 569    | epix10ka data
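
For example, a minimal sketch (using the first row of the table) that opens the practice data with the extra "dir" keyword and loops over a few events:

Code Block
languagepy
from psana import DataSource
# the extra "dir" keyword points psana at the public practice data
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
for nevt, evt in enumerate(myrun.events()):
    print(evt.timestamp)
    if nevt > 2: break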

...

Code Block
(ps-4.1.0) psanagpu101:lcls2$ detnames exp=tmoc00118,run=222,dir=/sdf/data/lcls/ds/prj/public01/xtc
---------------------
Name      | Data Type
---------------------
epicsinfo | epicsinfo
timing    | raw      
hsd       | raw      
gmdstr0   | raw    
etc.  

...

Code Block
(ps-4.5.10) psanagpu101:~$ detnames -e exp=tmoc00118,run=222,dir=/sdf/data/lcls/ds/prj/public01/xtc | more
-----------------------------------------------------------------
Detector Name             | Epics Name                           
-----------------------------------------------------------------
StaleFlags                |                                      
Keithley_Sum              | EM2K0:XGMD:HPS:KeithleySum           
IM2K4_XrayPower           | IM2K4:PPM:SPM:VOLT_RBV               
IM3K4_XrayPower           | IM3K4:PPM:SPM:VOLT_RBV               
IM4K4_XrayPower           | IM4K4:PPM:SPM:VOLT_RBV               
etc.     

...

Code Block
languagepy
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
                max_events=100)
myrun = next(ds.runs())
opal = myrun.Detector('tmo_atmopal')
epics_det = myrun.Detector('IM2K4_XrayPower')
for evt in myrun.events():
    img = opal.raw.image(evt)
    epics_val = epics_det(evt) # epics variables have a different syntax
    # check for missing data                                                    
    if img is None:
        print('no image')
    else:
        print(img.shape)
    if epics_val is None:
        print('no epics value')
    else:
        print(epics_val)

...

Code Block
languagepy
from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES']='1'

ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc', max_events=10)
# batch_size is optional. specifies how often the dictionary of small
# user data is gathered. if you write out large data (NOT RECOMMENDED) it needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')

    runsum  = np.zeros((3),dtype=float) # beware of datatypes when summing: can overflow
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        # important: always check for missing data
        if img is None or photonEnergy is None: continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += img[0,:3] # local sum on one mpi core
 
    # optional summary data for whole run
    if smd.summary:
        tot_runsum = smd.sum(runsum) # sum (or max/min) across all mpi cores. Must be numpy array or None.
        # pass either dictionary or kwargs.
        smd.save_summary({'sum_over_run' : tot_runsum}, summary_int=1)
    smd.done()
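
The resulting hdf5 file can be inspected with h5py. A minimal sketch, assuming the dataset names follow the smd.event() kwargs above (a "timestamp" dataset is also written, as used in the variable-length reading example later on this page):

Code Block
languagepy
import h5py
with h5py.File('mysmallh5.h5', 'r') as f:
    print(list(f.keys()))        # e.g. ['evtsum', 'photonEnergy', 'timestamp', ...]
    print(f['evtsum'][:5])       # one entry per smd.event() call
    print(f['photonEnergy'][:5])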

Full-Featured Example: Callbacks, Detector Selection and Variable Length Data

You can run this script with MPI the same way as shown in the previous example: mpirun -n 6 python example.py

NOTE: if you don't run with at least 6 cores, the destination selection code below will not function correctly.  Also, the destination selection code currently only works with 1 EB core (defined by the PS_SMD_NODES environment variable, which defaults to 1).

NOTE: variable length data names must have a "var_" prefix

The following additional arguments to DataSource were added for this example:

...

Code Block
languagepy
from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)  

# Event filtering and destination callback (runs on EB cores)
# Use this function to decide if you want to fetch large data for this event  
# and/or direct an event to process on a particular 'rank' 
# (this rank number should be between 1 and total no. of ranks minus 3,
# since for "mpirun -n 6" 3 ranks are reserved for the SMD0, EB, and SRV
# processes). If a non-epics detector is needed in this routine, make sure
# to add the detector name in the small_xtc kwarg for DataSource (see below).
# All epics and scan detectors are available automatically.
def smd_callback(run):
    opal = run.Detector('tmo_opal1')
    epics_det = run.Detector('IM2K4_XrayPower')

    n_bd_nodes = 3 # for mpirun -n 6, 3 ranks are reserved so there are 3 BigData (BD) ranks left

    for i_evt, evt in enumerate(run.events()):
        img = opal.raw.image(evt)
        epics_val = epics_det(evt)
        dest = (evt.timestamp % n_bd_nodes) + 1

        if epics_val is not None:
            # Set the destination (rank no.) where this event should be sent to
            evt.set_destination(dest)
            yield evt

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES']='1'

ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
        max_events  = 1040,
        detectors   = ['epicsinfo', 'tmo_opal1', 'ebeam'],  # only reads these detectors (faster)
        smd_callback= smd_callback,                         # event-filtering/destination callback (see notes above)
        small_xtc   = ['tmo_opal1'],                        # detectors to be used in smalldata callback
        )

# batch_size is optional. specifies how often the dictionary of small
# user data is gathered.  if you write out large data (NOT RECOMMENDED) it needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

# used for variable-length data example
cnt = 0 
modulus = 4

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')

    runsum  = np.zeros((3),dtype=float) # beware of datatypes when summing: can overflow
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        if img is None or photonEnergy is None: continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += img[0,:3] # local sum on one mpi core
 
        # an example of variable-length data
        # (must have "var_" name prefix)
        if cnt % modulus:
            x = np.arange(cnt%modulus)
            y = [[x+1, (x+1)**2] for x in range(cnt%modulus)]
            smd.event(evt, {'var_test' : { 'x': x, 'y': y }})
        else:
            # Note, this works either way, either not sending anything or
            # sending 0-length data.  It should be noted if there is *no*
            # data in the entire run, the var_array is *not* written to the
            # output!
            pass
            #smd.event(evt, {'var_test' : { 'x': [], 'y': [] }})
        cnt += 1

    # optional summary data for whole run
    if smd.summary:
        tot_runsum = smd.sum(runsum) # sum (or max/min) across all mpi cores. Must be numpy array or None.
        # pass either dictionary or kwargs
        smd.save_summary({'sum_over_run' : tot_runsum}, summary_int=1)
    smd.done()


The variable-length data can be read using a script similar to the one below.  It is stored as two datasets: one is a long array of appended values and the other is a set of per-event lengths.  Historically this approach has been more compute-efficient than the native hdf5 "ragged" arrays, which often require python-looping.  When reading, the user must keep track of the running offset into the value array (called "idx" here):

Code Block
languagepy
import h5py
f = h5py.File("mysmallh5.h5", "r")
n1 = f['timestamp']
n2 = f['var_test_len']
n3 = f['var_test/x']
n4 = f['var_test/y']
print(len(n1), len(n2))
idx = 0
for i in range(len(n1)):
    print("%d --> %d" % (n1[i], n2[i]))
    for j in range(n2[i]):
        print( "    %s | %s" % (n3[idx+j], n4[idx+j]))
    print("")
    idx += n2[i]
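
As an alternative to the explicit loop, the same per-event splitting can be done with numpy; a sketch assuming the file written by the example above:

Code Block
languagepy
import h5py
import numpy as np
with h5py.File("mysmallh5.h5", "r") as f:
    lens = f['var_test_len'][:]   # per-event lengths
    xs   = f['var_test/x'][:]     # long array of appended values
# split the appended array back into one piece per event
per_event_x = np.split(xs, np.cumsum(lens)[:-1])
print(len(per_event_x), [a.tolist() for a in per_event_x[:3]])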

Excluding Detectors

In addition to only including specified detectors with the "detectors=[]" kwarg (see previous section), you can exclude detectors when creating a new datasource with the "xdetectors=[]" argument.

Code Block
languagepy
# Create a datasource and tell it to exclude the 'hsd' detector
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
        xdetectors  = ['hsd'],    # all other detectors will be available
        max_events  = 10)


run = next(ds.runs())

# Create these detectors normally
opal = run.Detector('tmo_opal1')
ebeam = run.Detector('ebeam')
for i, evt in enumerate(run.events()):
    img = opal.raw.image(evt)
    photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
    print(f'got evt={i} ts={evt.timestamp} img={img.shape} {photonEnergy=}')


Note that other detectors that are also part of the excluded stream will also be excluded. You'll see a warning message like this one:

Code Block
languagebash
Warning: Stream-8 has one or more detectors matched with the excluded set. All detectors in this stream will be excluded.

If you're trying to access detectors that are part of the excluded list, you'll see a DetectorNameError message like the one below:

Code Block
languagebash
Traceback (most recent call last):
  File "/cds/home/m/monarin/lcls2/psana/psana/tests/dummy.py", line 12, in <module>
    ebeam = run.Detector('ebeam')
  File "/cds/home/m/monarin/lcls2/psana/psana/psexp/run.py", line 119, in Detector
    raise DetectorNameError(err_msg)
psana.psexp.run.DetectorNameError: No available detector class matched with ebeam. If this is a new detector/version, make sure to add new class in detector folder.

Accessing Event Codes

LCLS1-style event codes can be accessed using the "timing" detector:

Code Block
from psana import DataSource
ds = DataSource(exp='tmoc00118',run=222,dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
timing = myrun.Detector('timing')
for nevt,evt in enumerate(myrun.events()):
    allcodes = timing.raw.eventcodes(evt)
    # event code 15 fires at 1Hz, and this exp/run had 10Hz triggers            
    print('event code 15 present:',allcodes[15])
    if nevt>20: break

Running in Parallel

Examples of slurm scripts to submit are here: Submitting SLURM Batch Jobs.  Instructions for submitting batch jobs at S3DF to run in parallel are here: https://confluence.slac.stanford.edu/display/PCDS/Running+at+S3DF.

Analyzing Scans

To get a list of the scan variables, use the following command:

Code Block
(ps-4.3.2) psanagpu102:lcls2$ detnames -s exp=rixx43518,run=45,dir=/sdf/data/lcls/ds/prj/public01/xtc
--------------------------
Name           | Data Type
--------------------------
motor1         | raw      
motor2         | raw      
step_value     | raw      
step_docstring | raw      
--------------------------
(ps-4.3.2) psanagpu102:lcls2$  

From within python the above information can be obtained as a dictionary using "run.scaninfo".
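
A minimal sketch printing that dictionary for the practice scan data used above:

Code Block
languagepy
from psana import DataSource
ds = DataSource(exp='rixx43518', run=45, dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
print(myrun.scaninfo)  # same information as "detnames -s", as a python dictionary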

Note that "step_value"/"step_docstring" are convenience values defined by the DAQ to be a single value/string that is supposed to represent what has happened on a step, for use in plots/printout, since it's not easy to plot, for example, vs. multiple motor positions in a complex scan.

Code similar to this can be used to access the above scan variables:

Code Block
from psana import DataSource
ds = DataSource(exp='rixx43518',run=45,dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
motor1 = myrun.Detector('motor1')
motor2 = myrun.Detector('motor2')
step_value = myrun.Detector('step_value')
step_docstring = myrun.Detector('step_docstring')
for step in myrun.steps():
    print(motor1(step),motor2(step),step_value(step),step_docstring(step))
    for evt in step.events():
        pass

Running From Shared Memory

psana2 scripts can be run in real-time on shared memory.  There is some extra complexity compared to writing python code for the real-time GUI ami, for example:

  • you have to create plots and handle when they update
  • you have to worry about data getting out of time order
  • you have to handle missing data
  • you have to reset plots on run boundaries (if that's the behavior you want)
  • you have to write code to handle the data gathered from multiple mpi cores (not required if you want to run on 1 core)
  • be aware that the shared memory python scripts "steal" event statistics from other shared memory instances (e.g. ami)

but you get to take advantage of the power/flexibility of python.  Look at the DAQ .cnf file (e.g. /cds/group/pcds/dist/pds/rix/scripts/rix.cnf) to see the name of the node running the shared memory server ("monReqServer").  To access those nodes you need to be on a special permissions list (email pcds-ana-l@slac.stanford.edu to request).  You can find the name of the shared memory (the hutch name is typically used) either by looking in the .cnf file (the "-P" option to the monReqServer executable) or by running a command like this:

Code Block
drp-neh-cmp003:~$ ls /dev/shm/
PdsMonitorSharedMemory_tmo
drp-neh-cmp003:~$

For this output, you would use "DataSource(shmem='tmo')".
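
A minimal sketch of the corresponding DataSource (the detector names and any event processing are up to your script, as in the full example below):

Code Block
languagepy
from psana import DataSource
ds = DataSource(shmem='tmo')
for myrun in ds.runs():
    for nevt, evt in enumerate(myrun.events()):
        print(evt.timestamp)    # process events in real time here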

Note that one can develop the shared memory scripts (including real-time plots) using offline data, then change the DataSource line to run them in real-time.

Typically psmon is used for publishing results to realtime plots in the callback, publishing updates every "N" events.  See this link for psmon examples: Visualization Tools.

When running multi-core with mpi one has to use the small data "callbacks" kwarg to receive the data gathered from all nodes.  An example multi-core script is below (also works on 1 core).  A pure 1-core script is simpler (no data "gathering" needed: can just be a loop over events with plot updates every N events).  For multi-core, this can be run with "mpirun -n 3 python <scriptname>.py" and the two plots can be viewed on the same node with "psplot -a 1 OPALSUMS OPALIMG" (see Visualization Tools for "psplot" options).  It can also be run in the usual offline manner by changing the DataSource line.  It is written in a way to keep running even when the DAQ is restarted:

Code Block
languagepy
import os
import numpy as np
from psana import DataSource
from psmon import publish
from psmon.plots import XYPlot,Image
from collections import deque

from mpi4py import MPI
numworkers = MPI.COMM_WORLD.Get_size()-1
if numworkers==0: numworkers=1 # the single core case (no mpi)

os.environ['PS_SRV_NODES']='1' # one mpi core gathers/plots data

mydeque=deque(maxlen=25)
lastimg=None
numevents=0
numendrun=0

def my_smalldata(data_dict): # one core gathers all data from mpi workers
    global numevents,lastimg,numendrun,mydeque
    if 'endrun' in data_dict:
        numendrun+=1
        if numendrun==numworkers:
            print('Received endrun from all workers. Resetting data.')
            numendrun=0
            numevents=0
            mydeque=deque(maxlen=25)
        return
    numevents += 1
    if 'opal' in data_dict:
        lastimg = data_dict['opal']
    mydeque.append(data_dict['opalsum'])
    if numevents%100==0: # update plots around 1Hz
        print('event:',numevents)
        myxyplot = XYPlot(numevents, "Last 25 Sums", np.arange(len(mydeque)), np.array(mydeque), formats='o')
        publish.send("OPALSUMS", myxyplot)
        if lastimg is not None: # opal image is not sent all the time
            myimgplot = Image(numevents, "Opal Image", lastimg)
            publish.send("OPALIMG", myimgplot)

while 1: # mpi worker processes
    ds = DataSource(shmem='rix')
    smd = ds.smalldata(batch_size=5, callbacks=[my_smalldata])
    for myrun in ds.runs():
        opal = myrun.Detector('atmopal')
        
        for nevt,evt in enumerate(myrun.events()):
            mydict={}
            image = opal.raw.image(evt)
            if image is None: continue
            # do as much work as possible in the workers
            # don't send large data all the time, if possible
            if nevt%10==0: mydict['opal']=image
            mydict['opalsum']=np.sum(image)
            smd.event(evt,mydict)
        smd.event(evt,{'endrun':1}) # tells gatherer to reset plots

When running multi-node mpi there are also some complexities propagating the environment to remote nodes: the way to address that is described in this link.  

Running in LIVE mode

Here's a sample python script showing how you can configure the datasource to run in live mode:

Code Block
languagepy
titlelivemode.py
# Use environment variable to specify how many attempts
# the datasource should wait for file reading (1 second wait).
# In this example, we set it to 30 (wait up to 30 seconds).
import os
os.environ['PS_SMD_MAX_RETRIES'] = '30'


# Create a datasource with live flag
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
        live        = True,
        max_events  = 10)


# Looping over your run and events as usual
# You'll see a "wait for an event..." message in case
# the file system writing is slower than your analysis
run = next(ds.runs())
for i, evt in enumerate(run.events()):
    print(f'got evt={i} ts={evt.timestamp}')

Note that this script is also available in the tests folder of the lcls2 repository. You can run it with:

Code Block
(ps-4.3.2) psanagpu109 tests $ mpirun -n 3 python livemode.py

Jump to events

Psana2 can jump to particular events when given a list of timestamps. In the example below, we only want to process these four timestamps. Note that the timestamp array must have dtype np.uint64.

Code Block
languagepy
titletest_jump.py
from psana import DataSource
import numpy as np
timestamps = np.array([4194783241933859761,4194783249723600225,4194783254218190609,4194783258712780993], dtype=np.uint64)
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
        timestamps=timestamps)
myrun = next(ds.runs())
opal = myrun.Detector('tmo_atmopal')
for nevt, evt in enumerate(myrun.events()):
    img = opal.raw.image(evt)
    print(nevt, evt.timestamp, img.shape)

Here is the output:

Code Block
(ps-4.5.5) monarin@psanagpu111 (master *) psana2 $ python test_jump.py 
0 4194783241933859761 (1024, 1024)
1 4194783249723600225 (1024, 1024)
2 4194783254218190609 (1024, 1024)
3 4194783258712780993 (1024, 1024)

Event Times In Local Time Zone

NOTE: the method evt.datetime().timestamp() can also be used to generate more natural timestamps (e.g. for use in plots).

Code Block
languagepy
import datetime as dt
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
for nevt,evt in enumerate(myrun.events()):
    if nevt>3: break
    t = evt.datetime()
    localt = t.replace(tzinfo=dt.timezone.utc).astimezone(tz=None)    
    print(localt.strftime('%H:%M:%S'))
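
And a minimal sketch of the evt.datetime().timestamp() form mentioned in the NOTE above (a plain float of seconds since the unix epoch, convenient for plot axes):

Code Block
languagepy
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
for nevt, evt in enumerate(myrun.events()):
    if nevt > 3: break
    print(evt.datetime().timestamp())  # float seconds since the epoch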

Running with Integrating Detector

Here's a sample python script showing how you can analyze a run with an integrating detector (in this example, 'andor'). For more detail about the technical design, visit this page.

Two key variables when running in integrating-detector mode: PS_SMD_N_EVENTS (an environment variable), which is how many events SMD0 will send to each EB core, and the batch_size kwarg, which controls how many events the EB cores send to the BD cores.  When running integrating detectors, the "units" of these two variables change from "high-rate events" to "low-rate integrating-detector events".  batch_size should be set to less than PS_SMD_N_EVENTS.  The max_events kwarg to DataSource also changes to count integrating-detector events.  Unfortunately, the batch_size kwarg to the DataSource.smalldata() call does NOT count integrating-detector events.  We would like to simplify this in the future.

NOTE: part of psana ("SMD0") has buffers that need to be large enough to accommodate at least one event from the integrating detector.  If PS_SMD_N_EVENTS is set too large and this is not possible then an error message is printed and psana exits.

Code Block
languagepy
titleintg_det.py
# The PS_SMD_N_EVENTS should be set to a small number (e.g. 1)
# since all other events which are part of this intg. event will be sent
# in the same batch.

import os
os.environ['PS_SMD_N_EVENTS'] = '1'
batch_size = 1

from psana import DataSource
import numpy as np  # needed for the sums below

ds = DataSource(exp='xpptut15', run=1, dir='/sdf/data/lcls/ds/prj/public01/xtc/intg_det',
        intg_det='andor',
        batch_size=batch_size)
run = next(ds.runs())
hsd = run.Detector('hsd')
andor = run.Detector('andor')

# Test calculating sum of the hsd for each integrating event.
sum_hsd = 0
for i_evt, evt in enumerate(run.events()):
    hsd_calib = hsd.raw.calib(evt)
    andor_calib = andor.raw.calib(evt)

    # Keep summing the value of the other detector (hsd in this case)
    sum_hsd += np.sum(hsd_calib[:])/np.prod(hsd_calib.shape)

    # When an integrating event is found, print out and reset the sum variable
    if andor_calib is not None:
        val_andor = np.sum(andor_calib[:])/np.prod(andor_calib.shape)
        print(f'i_evt: {i_evt} andor: {val_andor} sum_hsd:{sum_hsd}')
        sum_hsd = 0

RIX Example: Deadtime and Integrating Detectors

Sometimes data for a particular shot cannot be read out because DAQ data buffers are unavailable.  This is called "deadtime".  Deadtime can make it impossible to reliably use high-rate shots to perform operations like normalizations for lower-rate integrating detectors.  Matt Weaver is adding counters to the timing system so the DAQ can count how many shots had their data dropped due to deadtime, allowing experimenters to decide whether there is sufficient data for a normalization-style analysis.

This example demonstrates (conceptually!) how to do the normalization, and view the results in a psmon plot (use the command "psplot ANDOR" on the same node).  Run this script either with "python andor.py" or "mpirun -n 5 python andor.py":

Code Block
languagepy
from psana import DataSource
from psmon import publish
from psmon.plots import Image,XYPlot
import os
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# The PS_SMD_N_EVENTS should be set to a small number (e.g. 1)
# since all other events which are part of this intg. event will be sent
# in the same batch.
os.environ['PS_SMD_N_EVENTS'] = '1'
os.environ['PS_SRV_NODES']='1'

if rank==4: # hack for now to eliminate use of publish.local below
    publish.init()

# we will remove this for batch processing and use "psplot" instead
# publish.local = True

def my_smalldata(data_dict):
    if 'unaligned_andor_norm' in data_dict:
        andor_norm = data_dict['unaligned_andor_norm'][0]
        myplot = XYPlot(0,"Andor (normalized)",range(len(andor_norm)),andor_norm)
        publish.send('ANDOR',myplot)

ds = DataSource(exp='rixx1003821',run=46,dir='/sdf/data/lcls/ds/prj/public01/xtc',intg_det='andor_vls',batch_size=1)
for myrun in ds.runs():
    andor = myrun.Detector('andor_vls')
    timing = myrun.Detector('timing')
    smd = ds.smalldata(filename='mysmallh5.h5',batch_size=1000, callbacks=[my_smalldata])
    norm = 0
    ndrop_inhibit = 0
    for nstep,step in enumerate(myrun.steps()):
        print('step:',nstep)
        for nevt,evt in enumerate(step.events()):
            andor_img = andor.raw.value(evt)
            # also need to check for events missing due to damage
            # (or compare against expected number of events)
            ndrop_inhibit += timing.raw.inhibitCounts(evt)
            smd.event(evt, mydata=nevt) # high rate data saved to h5
            # need to check Matt's new timing-system data on every
            # event to make sure we haven't missed normalization
            # data due to deadtime
            norm+=nevt # fake normalization
            if andor_img is not None:
                print('andor data on evt:',nevt,'ndrop_inhibit:',ndrop_inhibit)
                # check that the high-read readout group (2) didn't
                # miss any events due to deadtime
                if ndrop_inhibit[2]!=0: print('*** data lost due to deadtime')
                # need to prefix the name with "unaligned_" so
                # the low-rate andor dataset doesn't get padded
                # to align with the high rate datasets
                smd.event(evt, mydata=nevt,
                          unaligned_andor_norm=(andor_img/norm))
                norm=0
                ndrop_inhibit=0


AMI and Integrating Detectors

AMI is also affected by the DAQ deadtime issue described above but, in addition, events can also be thrown away going into the AMI shared memory (this feature is what allows AMI analysis to stay real-time).  This can cause additional issues with AMI integrating-detector normalization analyses that try to use high-rate shots to do normalizations for low-rate integrating detectors, since not all high-rate shots are guaranteed to be available.  Also, unlike psana, AMI doesn't currently have the idea of shots that "belong together" which will make an AMI normalization-style analysis of an integrating detector impossible.

MPI Task Structure To Support Scaling

psana can scale to allow for high rate analysis.  For example, many hdf5 files of small user-defined data (described above in Example Script Producing Small HDF5 File) can be written, one per "SRV" node in the diagram below.  The total number of SRV nodes is defined by the environment variable PS_SRV_NODES (defaults to 0).  These hdf5 files are joined by psana into what appears to be one file using the hdf5 "virtual dataset" feature.  Similarly, multiple nodes can be used for filtering ("EB" nodes in the diagram below) and multiple nodes can be used to process big data in the main psana event loop ("BD" nodes in the diagram below).  The one piece that cannot be scaled (currently) to multiple nodes is the SMD0 (SMallData) task, which reads the timestamps and fseek offsets from each tiny .smd.xtc2 file produced by the DAQ (typically one per detector, or one per detector segment, although it can contain more than one segment or detector).  This task joins together the relevant data for each shot ("event build") using the timestamp.  This SMD0 task is multi-threaded, with one thread for each detector.  For highest performance it is important that all SMD0 threads be allocated an entire MPI node.

[Image: diagram of the psana2 MPI task structure (SMD0, EB, BD, and SRV nodes)]

Running a large job

Below shows how to set up a slurm job script to run a large job. This script uses setup_hosts_openmpi.sh (also provided below) to assign a single node to SMD0 (see diagram above) and distribute all other tasks (EB, BD, & SRV) to the rest of the available nodes. After sourcing setup_hosts_openmpi.sh, you can use $PS_N_RANKS and $PS_HOST_FILE in your mpirun command.


Note on the no. of ranks available: on S3DF, there are 120 cores per compute node. The example below asks for 3 nodes (120 x 3 = 360 ranks in total) but will only have 120 x (3 - 1) + 1 = 241 ranks available. This is because the first node is reserved exclusively for SMD0, which contributes just one rank (slots=1 in the host file).

Code Block
languagebash
titlesubmit_large_psana2.sh
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --job-name=run_large_psana2
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
#SBATCH --nodes=3
#SBATCH --exclusive
#SBATCH --time=10:00

# Configure psana2 parallelization
source setup_hosts_openmpi.sh

# Run your job with #ranks <= (#nodes - 1) * 120 + 1 or use $PS_N_RANKS 
mpirun -np $PS_N_RANKS --hostfile $PS_HOST_FILE python test_mpi.py

Here is the `setup_hosts_openmpi.sh`. You can create this script in your job folder. 

Code Block
languagebash
titlesetup_hosts_openmpi.sh
############################################################
# First node must be exclusive to smd0
# * For openmpi, slots=1 must be assigned to the first node.
############################################################

# Get the list of hosts by expanding the shorthand node list into a
# line-by-line node list
host_list=$(scontrol show hostnames $SLURM_JOB_NODELIST)
hosts=($host_list)

# Write out to host file by putting rank 0 on the first node
host_file="slurm_host_${SLURM_JOB_ID}"
for i in "${!hosts[@]}"; do
    if [[ "$i" == "0" ]]; then
        echo ${hosts[$i]} slots=1 > $host_file
    else
        echo ${hosts[$i]} >> $host_file
    fi
done

# Export hostfile for mpirun 
export PS_HOST_FILE=$host_file

# Calculate no. of ranks available in the job. 
export PS_N_RANKS=$(( SLURM_CPUS_ON_NODE * ( SLURM_JOB_NUM_NODES - 1 ) + 1 ))

Performance Tuning Tips

(see MPITaskStructureToSupportScaling to get a sense for where the parameters described here are used)

To get improved performance when running large jobs consider the following options.  It is not straightforward to set these parameters optimally for an arbitrary analysis job so some study is required for specific applications.  In some cases we can offer guidelines.

  • understand the CPU usage of your big-data ("BD") processing loop to make sure the bottleneck isn't "user code".  this can typically be done by running on 1 core.
  • increase the environment variable PS_EB_NODES to be larger than its default of 1.  For many analyses, a number that is 1/16 of the number of big data cores has been good.  This variable, along with the PS_SRV_NODES variable described next determines how many cores are used for which task in psana (see MPITaskStructureToSupportScaling).
  • if you're writing a large amount of hdf5 data increase the environment variable PS_SRV_NODES to have more cores writing hdf5 files.  It is difficult here to provide guidance on the number since it depends on the application
  • set environment variable PS_SMD_N_EVENTS larger to increase the number of events that get sent in a "batch" when transmitting data from SMD0 cores through to BD cores
  • when setting up the smalldata, increase the number of events that get sent in a "batch" when transmitting data from BD cores to SRV cores by setting the batch_size kwarg in the DataSource.smalldata() call (a short sketch of these knobs follows this list).
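
A minimal sketch showing where each of the knobs above is set (the numbers are illustrative only, not recommendations):

Code Block
languagepy
import os
# must be set before the DataSource is created
os.environ['PS_EB_NODES']     = '4'     # no. of event-builder (EB) cores
os.environ['PS_SRV_NODES']    = '2'     # no. of hdf5-writing (SRV) cores
os.environ['PS_SMD_N_EVENTS'] = '1000'  # SMD0 -> EB batch size

from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc')
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=1000)  # BD -> SRV batch size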

psana also has some grafana monitoring built in that, with expert help, can be used to identify bottlenecks in an analysis.  Contact pcds-ana-l@slac.stanford.edu for guidance.

Sorting Small HDF5 File

The small hdf5 file is likely unsorted due to parallelism in psana2. In case your output h5 is large (> 1 billion records), you can use the timestamp_sort_h5 tool by submitting the following job:

Code Block
languagebash
titlesubmit_large_psana2.sh
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --job-name=timestamp_sort_h5
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
#SBATCH --nodes=1
#SBATCH --exclusive
#SBATCH --time=10:00

timestamp_sort_h5 /sdf/data/lcls/drpsrcf/ffb/users/monarin/h5/mylargeh5.h5 /sdf/data/lcls/drpsrcf/ffb/users/monarin/h5/output/result.h5

Note the first required argument is the unsorted hdf5 file and the second is the desired output file. There are other optional arguments, which can be accessed by running timestamp_sort_h5 --help.

Why Is My Detector Object "None" On Some MPI Ranks?

In psana2 all mpi ranks execute the same code, but not all ranks can create a Detector object since there are “hidden” MPI helper-ranks shown in this diagram: MPITaskStructureToSupportScaling.  For those helper-ranks the Detector object will be None.  Those helper-ranks won’t enter psana2 loops over runs/steps/events, so as long as you only use a Detector object inside loops your code will run correctly without any special checks.  However, if you use the Detector object outside those loops you must check that the Detector object is not None before calling any methods.
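
A minimal sketch of that rule (detector name as in the earlier examples):

Code Block
languagepy
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc')
det = None
for myrun in ds.runs():                # helper-ranks never enter this loop
    det = myrun.Detector('tmo_atmopal')
    for evt in myrun.events():
        img = det.raw.image(evt)       # safe: only used inside the loops
# outside the loops, det can still be None on the helper-ranks
if det is not None:
    print('detector object created on this rank')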

Historical background: we went back and forth about how to manage the MPI helper-ranks.  The alternative would have been to use callbacks instead of run/step/event loops to more effectively hide the helper-ranks from user code, but callbacks would have been user-unfriendly in a different way: writing loops is a more natural coding approach for many users.  We felt the loop approach (with more fragile Detector objects that can be None) was the lesser of two evils.


Running psplot_live

From the rix-daq node, source the psana2 environment, then run:

Code Block
languagebash
(ps-4.6.3) rix-daq:scripts> psplot_live ANDOR
(ps-4.6.3) rix-daq:scripts> psplot_live ANDOR
Python 3.9.16 | packaged by conda-forge | (main, Feb  1 2023, 21:39:03) 
Type 'copyright', 'credits' or 'license' for more information
IPython 8.14.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: 

The above command activates psplot_live, which listens to your analysis jobs (with plotting) and provides an interactive session. You can use the interactive session to list, kill, and reactivate plots. Note that to monitor more than one plot, you can use ' ' (space) to separate each plot name (e.g. psplot_live ANDOR ATMOPAL).


Below is an example of an analysis (monitoring two plots: ANDOR and ATMOPAL) and job submission scripts that communicate directly with psplot_live. Note that if you are converting a python script that works with psplot (no live), the main difference is shown on line 25, where you have to set psmon_publish=publish as an additional DataSource argument. There may be other differences that need to be changed. Please let us know in this case.

Code Block
languagepy
titlerun_andor.py
linenumberstrue
from psana import DataSource
from psmon import publish
from psmon.plots import Image,XYPlot
import os, sys, time
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
 

os.environ['PS_SRV_NODES']='1'
os.environ['PS_SMD_N_EVENTS']='1'


# passing exp and runnum
exp=sys.argv[1]
runnum=int(sys.argv[2])


mount_dir = '/sdf/data/lcls/drpsrcf/ffb'
#mount_dir = '/cds/data/drpsrcf'
xtc_dir = os.path.join(mount_dir, exp[:3], exp, 'xtc')
ds = DataSource(exp=exp,run=runnum,dir=xtc_dir,intg_det='andor_vls',
        batch_size=1, 
        psmon_publish=publish,
        detectors=['timing','andor_vls','atmopal'],
        max_events=0,
        live=True)


def my_smalldata(data_dict):
    if 'unaligned_andor_norm' in data_dict:
        andor_norm = data_dict['unaligned_andor_norm'][0]
        myplot = XYPlot(0,f"Andor (normalized) run:{runnum}",range(len(andor_norm)),andor_norm)
        publish.send('ANDOR',myplot)
    if 'sum_atmopal' in data_dict:
        atmopal_sum = data_dict['sum_atmopal']
        myplot = XYPlot(0,f"Atmopal (sum) run:{runnum}",range(len(atmopal_sum)), atmopal_sum)
        publish.send('ATMOPAL', myplot)
 
for myrun in ds.runs():
    andor = myrun.Detector('andor_vls')
    atmopal = myrun.Detector('atmopal')
    timing = myrun.Detector('timing')
    smd = ds.smalldata(filename='mysmallh5.h5',batch_size=5, callbacks=[my_smalldata])
    norm = 0
    ndrop_inhibit = 0
    sum_atmopal = None
    cn_andor_events = 0
    cn_intg_events = 0
    ts_st = None
    for nstep,step in enumerate(myrun.steps()):
        print('step:',nstep)
        for nevt,evt in enumerate(step.events()):
            if ts_st is None: ts_st = evt.timestamp
            cn_intg_events += 1
            andor_img = andor.raw.value(evt)
            atmopal_img = atmopal.raw.image(evt)
            if atmopal_img is not None:
                if sum_atmopal is None:
                    sum_atmopal = atmopal_img[0,:]
                else:
                    sum_atmopal += atmopal_img[0,:]
            # also need to check for events missing due to damage
            # (or compare against expected number of events)
            ndrop_inhibit += timing.raw.inhibitCounts(evt)
            smd.event(evt, mydata=nevt) # high rate data saved to h5
            # need to check Matt's new timing-system data on every
            # event to make sure we haven't missed normalization
            # data due to deadtime
            norm+=nevt # fake normalization
            if andor_img is not None:
                cn_andor_events += 1
                #print('andor data on evt:',nevt,'ndrop_inhibit:',ndrop_inhibit)
                print(f'BD{rank-1}: #andor_events: {cn_andor_events} #intg_event:{cn_intg_events} st: {ts_st} en:{evt.timestamp}')
                # check that the high-read readout group (2) didn't
                # miss any events due to deadtime
                if ndrop_inhibit[2]!=0: print('*** data lost due to deadtime')
                # need to prefix the name with "unaligned_" so
                # the low-rate andor dataset doesn't get padded
                # to align with the high rate datasets
                smd.event(evt, mydata=nevt,
                          unaligned_andor_norm=(andor_img/norm),
                          sum_atmopal=sum_atmopal)
                norm=0
                ndrop_inhibit=0
                sum_atmopal = None
                cn_intg_events = 0
                ts_st = None
    smd.done()

And an sbatch script:

Code Block
languagebash
titlesubmit_run_andor.sh
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --account=<your account here>
#SBATCH --job-name=run_andor
#SBATCH --nodes=1
#SBATCH --ntasks=5
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
##SBATCH --exclusive
#SBATCH -t 00:05:00

t_start=`date +%s`

exp=$1
runnum=$2
mpirun -n 5 python run_andor.py $exp $runnum

t_end=`date +%s`
echo PSJobCompleted TotalElapsed $((t_end-t_start))  

After creating the above two scripts, you can submit the job with:

Code Block
languagebash
titlesbatch
sbatch submit_run_andor.sh rixc00121 121

You should be able to see the psplot(s) pop up automatically.

[Image: psplot windows appearing automatically]

To view the list of psplots:

Code Block
languagebash
titlesbatch
In [1]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS    
1     43195784     rixc00121  121   sdfmilan005.sdf.slac.stanford.edu   12323 PLOTTED

If you close the plot window (or it errors out), the process is automatically removed from the list:

Code Block
languagebash
titlesbatch
In [2]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS

You can submit your analysis job again (increasing run numbers are always monitored). Old jobs (a previously submitted run number from the same node and psplot port) will NOT be shown automatically (STATUS: RECEIVED); you can reactivate them using the show(ID) command.

Code Block
languagebash
titlesbatch
In [2]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS    
1     3275205      rixc00121  121   sdfiana001.sdf.slac.stanford.edu    12323 RECEIVED  

In [3]: show(1)
Main received {'msgtype': 3} from db-zmq-server

To kill a plot, use kill(ID) or kill_all() to kill all plots. 

Code Block
languagebash
titlesbatch
In [5]: kill(1)

In [6]: ls()
ID    SLURM_JOB_ID EXP        RUN   NODE                                PORT  STATUS