Introduction
A general introduction to LCLS data analysis (psana1, ami2, jupyter, slurm) can be viewed in this 53-minute video here. The video covers psana1 (i.e. LCLS1), but the user interface is quite similar to psana2 (for LCLS2), even though the internals are significantly improved in psana2:
JupyterHub
psana2 is also available in JupyterHub here in the kernel named "LCLS-II py3": https://pswww.slac.stanford.edu/jupyterhub/hub/home
Environment
To obtain the environment to run psana2, execute the following:
# For S3DF users (sdfiana nodes)
source /sdf/group/lcls/ds/ana/sw/conda2/manage/bin/psconda.sh

# For PCDS users (psana nodes)
source /cds/sw/ds/ana/conda2/manage/bin/psconda.sh
Note that LCLS-II psana is not compatible with LCLS-I psana, so environments must activate one or the other, but not both. Since LCLS-I and LCLS-II live in different conda installations it is necessary to start a fresh session when switching between the two.
Public Practice Data
Publicly accessible practice data is located in S3DF in the directory /sdf/data/lcls/ds/prj/public01/xtc. Use of this data requires the additional "dir=/sdf/data/lcls/ds/prj/public01/xtc" keyword argument to the DataSource object (see the snippet after the table).
Experiment | Run | Comment |
---|---|---|
tmoc00118 | 222 | Generic TMO dark data |
tmoacr019 | 4,5,6 | xtcav dark,lasing-off,lasing-on (cpo thinks, not certain) |
rixx43518 | 34 | A DAQ "fly scan" of motor (see ami#FlyScan:MeanVs.ScanValue) |
rixx43518 | 45 | A DAQ "step scan" of two motors |
rixl1013320 | 63 | Rix stepping delay scan of both Vitara delay and ATM delay stage (lxt_ttc scan) at a single mono energy |
rixl1013320 | 93 | Rix continuous mono scan with laser on/off data at a single time delay |
rixx1003821 | 55 | An infinite sequence with two slow andors running at different rates |
rixx1003821 | 68 | A finite burst sequence with one andor |
uedcom103 | 7 | epix10ka data |
ueddaq02 | 569 | epix10ka data |
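For example, a minimal sketch opening one of the runs above (the tmoc00118 dark run), with the extra "dir" keyword argument:

from psana import DataSource

# the "dir" keyword argument points at the public practice data
ds = DataSource(exp='tmoc00118', run=222,
                dir='/sdf/data/lcls/ds/prj/public01/xtc')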
Detector Names
Use this command to see non-epics detector names (see "Detector Interface" example below):
(ps-4.1.0) psanagpu101:lcls2$ detnames exp=tmoc00118,run=222,dir=/sdf/data/lcls/ds/prj/public01/xtc
---------------------
Name      | Data Type
---------------------
epicsinfo | epicsinfo
timing    | raw
hsd       | raw
gmdstr0   | raw
etc.
From within python the above information can be obtained as a dictionary using "run.detinfo".
Use the same command with the "-e" option to see epics detector names (see "Detector Interface" example below). These are slowly varying variables (like temperatures) that are not strongly time correlated with the regular per-shot detector data:
(ps-4.5.10) psanagpu101:~$ detnames -e exp=tmoc00118,run=222,dir=/sdf/data/lcls/ds/prj/public01/xtc | more
-----------------------------------------------------------------
Detector Name   | Epics Name
-----------------------------------------------------------------
StaleFlags      |
Keithley_Sum    | EM2K0:XGMD:HPS:KeithleySum
IM2K4_XrayPower | IM2K4:PPM:SPM:VOLT_RBV
IM3K4_XrayPower | IM3K4:PPM:SPM:VOLT_RBV
IM4K4_XrayPower | IM4K4:PPM:SPM:VOLT_RBV
etc.
Note that either of these names ("Detector Name" or "Epics Name") can be used with the Detector Interface described below. From within python the above information can be obtained as a dictionary using "run.epicsinfo".
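For example, a short sketch (using the same practice run) that prints both dictionaries:

from psana import DataSource

ds = DataSource(exp='tmoc00118', run=222,
                dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
# detector names and their data interfaces, as shown by "detnames"
print(myrun.detinfo)
# epics variable names, as shown by "detnames -e"
print(myrun.epicsinfo)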
Using the Detector Interface
Standard (per-shot) detectors and the slower epics variables can be accessed as shown here using the names discovered with the commands above.
NOTE: the slow epics variables are polled in the DAQ at 1Hz, so for the first one second of a run they will typically return "None".
NOTE: You can use tab-completion in ipython (see https://ipython.org/ipython-doc/3/interactive/tutorial.html) or the jupyter notebook to explore what you can do with the various detector objects.
NOTE: epics variables use a different syntax than other detectors (see example here):
from psana import DataSource

ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
                max_events=100)
myrun = next(ds.runs())
opal = myrun.Detector('tmo_atmopal')
epics_det = myrun.Detector('IM2K4_XrayPower')
for evt in myrun.events():
    img = opal.raw.image(evt)
    epics_val = epics_det(evt) # epics variables have a different syntax
    # check for missing data
    if img is None:
        print('no image')
    else:
        print(img.shape)
    if epics_val is None:
        print('no epics value')
    else:
        print(epics_val)
Example Script Producing Small HDF5 File
You can run this script with MPI: mpirun -n 6 python example.py
It also works on one core with: python example.py (useful for debugging). See MPI rank/task diagram here to understand what different mpi ranks are doing.
This mechanism by default produces "aligned" datasets where missing values are padded (with NaNs for floats, and -99999 for integers). To create an unaligned dataset (without padding) prefix the name of the variable with "unaligned_".
NOTE: In addition to the hdf5 you specify as your output file ("my.h5" below) you will see other h5 files like "my_part0.h5", one for each of the cores specified in PS_SRV_NODES. The reason for this is that each of those SRV cores writes its own my_partN.h5 file: for LCLS2 it will be important for performance to write many files. The "my.h5" file is actually quite small, and uses a new HDF5 feature called a "Virtual DataSet" (VDS) to join together the various my_partN.h5 files. Also note that events in my.h5 will not be in time order. If you copy the .h5 somewhere else, you need to copy all of them.
NOTE: In python, if you want to exit a loop early you often use a "break" statement. When running psana-python with mpi parallelization, however, not all cores will see this statement, and the result will be that your job will hang at the end. To avoid this use the "max_events" keyword argument to DataSource (see example below). An example of the failure mode is if an EB core communicates with only 1 BD core and that BD core exits, then the EB core gets stuck and doesn't exit (see MPITaskStructureToSupportScaling). MPI requires all cores to exit gracefully for a job to exit. The max_events kwarg allows the SMD0 core to shut everything down.
NOTE: The call below to smd.sum() (or min()/max()) only accepts numpy arrays or None. This requirement allows cores that don't see any events to not cause errors when summing across cores. All cores must call .sum() or the process will hang (a property of the underlying mpi "Reduce" call).
NOTE: The smd.event() and smd.save_summary() calls (for persisting data to hdf5 or passing it to an SRV callback) can be passed either a series of kwargs or a dictionary. The dictionary can optionally be hierarchical (e.g. d['mykey1']['mykey2']=value) and those keys will be reflected in the hdf5 dataset structure. This allows users to organize hdf5 data with a structure that they prefer.
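A minimal sketch of these two call forms (the helper function and the names photonEnergy, spectrum, roi are hypothetical; "smd" and "evt" are as in the example script):

import numpy as np

# hypothetical helper called from inside the event loop,
# with "smd" from ds.smalldata() and "evt" the current event
def save_event(smd, evt, photonEnergy, spectrum):
    # kwargs form: flat names; the "unaligned_" prefix means the dataset is not padded
    smd.event(evt, photonEnergy=photonEnergy, unaligned_andor_spectrum=spectrum)
    # dictionary form: nested keys are reflected in the hdf5 dataset structure
    smd.event(evt, {'roi': {'sum': float(np.sum(spectrum)), 'max': float(np.max(spectrum))}})

The full example script: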
from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES']='1'

ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
                max_events=10)
# batch_size is optional. specifies how often the dictionary of small
# user data is gathered. if you write out large data (NOT RECOMMENDED) it needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')
    runsum = np.zeros((3),dtype=float) # beware of datatypes when summing: can overflow
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        # important: always check for missing data
        if img is None or photonEnergy is None: continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += img[0,:3] # local sum on one mpi core

    # optional summary data for whole run
    if smd.summary:
        tot_runsum = smd.sum(runsum) # sum (or max/min) across all mpi cores. Must be numpy array or None.
        # pass either dictionary or kwargs.
        smd.save_summary({'sum_over_run' : tot_runsum}, summary_int=1)

smd.done()
Full-Featured Example: Callbacks, Detector Selection and Variable Length Data
You can run this script with MPI the same way as shown in the previous example: mpirun -n 6 python example.py
NOTE: if you don't run with at least 6 cores, the destination selection code below will not function correctly. Also, the destination selection code currently only works with 1 EB core (defined by the PS_SMD_NODES environment variable, which defaults to 1).
NOTE: variable length data names must have a "var_" prefix
These additional DataSource arguments are used in this example:
- detectors=['detname1', 'detname2',]
List of detectors to be read from the disk. If you only need a few detectors for analysis, list their names here. The reading process will be faster since unused detector data is not read.
- smd_callback=smd_callback
You can use this callback to 1) "filter" events: deciding which events to read the large event data for (e.g. camera images) using smaller event data (e.g. diode values) and 2) set the "destination" (mpi rank id) to send this event to. The typical LCLS use case for the destination is the construction of an average-image-vs-pump-probe-time "cube". NOTE: filtering events can decrease performance, since it causes more random access reads from disk. Whether filtering benefits your application or not must be determined on a case-by-case basis.
- small_xtc=['detname1', 'detname2']
List of detectors to be used in smd_callback() for filtering.
from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)

# Event filtering and destination callback (runs on EB cores)
# Use this function to decide if you want to fetch large data for this event
# and/or direct an event to process on a particular 'rank'
# (this rank number should be between 1 and total no. of ranks
# (6 for "mpirun -n 6") minus 3 since 3 ranks are reserved for SMD0, EB, SRV
# processes). If a non-epics detector is needed in this routine, make sure to
# add the detector name in the small_xtc kwarg for DataSource (see below).
# All epics and scan detectors are available automatically.
def smd_callback(run):
    opal = run.Detector('tmo_opal1')
    epics_det = run.Detector('IM2K4_XrayPower')
    n_bd_nodes = 3 # for mpirun -n 6, 3 ranks are reserved so there are 3 BigData ranks left
    for i_evt, evt in enumerate(run.events()):
        img = opal.raw.image(evt)
        epics_val = epics_det(evt)
        dest = (evt.timestamp % n_bd_nodes) + 1
        if epics_val is not None:
            # Set the destination (rank no.) where this event should be sent to
            evt.set_destination(dest)
            yield evt

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES']='1'

ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
                max_events = 40,
                detectors = ['epicsinfo', 'tmo_opal1', 'ebeam'], # only reads these detectors (faster)
                smd_callback = smd_callback, # event-filtering/destination callback (see notes above)
                small_xtc = ['tmo_opal1'],   # detectors to be used in smalldata callback
               )
# batch_size is optional. specifies how often the dictionary of small
# user data is gathered. if you write out large data (NOT RECOMMENDED) it needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

# used for variable-length data example
cnt = 0
modulus = 4

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')
    runsum = np.zeros((3),dtype=float) # beware of datatypes when summing: can overflow
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        if img is None or photonEnergy is None: continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += img[0,:3] # local sum on one mpi core

        # an example of variable-length data (must have "var_" name prefix)
        if cnt % modulus:
            x = np.arange(cnt%modulus)
            y = [[x+1, (x+1)**2] for x in range(cnt%modulus)]
            smd.event(evt, {'var_test' : { 'x': x, 'y': y }})
        else:
            # Note, this works either way, either not sending anything or
            # sending 0-length data. It should be noted if there is *no*
            # data in the entire run, the var_array is *not* written to the
            # output!
            pass
            #smd.event(evt, {'var_test' : { 'x': [], 'y': [] }})
        cnt += 1

    # optional summary data for whole run
    if smd.summary:
        tot_runsum = smd.sum(runsum) # sum (or max/min) across all mpi cores. Must be numpy array or None.
        # pass either dictionary or kwargs
        smd.save_summary({'sum_over_run' : tot_runsum}, summary_int=1)

smd.done()
The variable-length data can be read using a script similar to this. The variable-length data is stored as two datasets: one is a long array of appended values and the other is a set of per-event lengths. Historically this approach has been more compute-efficient than the native hdf5 "ragged" arrays, which often require python looping. When reading, the user must keep track of a running offset into the long array using the per-event lengths (the offset is called "idx" here):
import h5py

f = h5py.File("mysmallh5.h5", "r")
n1 = f['timestamp']
n2 = f['var_test_len']
n3 = f['var_test/x']
n4 = f['var_test/y']
print(len(n1), len(n2))

idx = 0
for i in range(len(n1)):
    print("%d --> %d" % (n1[i], n2[i]))
    for j in range(n2[i]):
        print("   %s | %s" % (n3[idx+j], n4[idx+j]))
    print("")
    idx += n2[i]
Excluding Detectors
In addition to including only specified detectors with the "detectors=[]" kwarg (see previous section), you can exclude detectors when creating a new datasource with the "xdetectors=[]" argument.
# Create a datasource and tell it to exclude a detector
from psana import DataSource

ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
                xdetectors = ['hsd'], # all other detectors will be available
                max_events = 10)
run = next(ds.runs())

# Create these detectors normally
opal = run.Detector('tmo_opal1')
ebeam = run.Detector('ebeam')
for i, evt in enumerate(run.events()):
    img = opal.raw.image(evt)
    photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
    print(f'got evt={i} ts={evt.timestamp} img={img.shape} {photonEnergy=}')
Note that any other detectors in the same stream as an excluded detector will also be excluded. You'll see a warning message like this one:
Warning: Stream-8 has one or more detectors matched with the excluded set. All detectors in this stream will be excluded.
If you try to access a detector that is part of the excluded list, you'll see a DetectorNameError message like the one below:
Traceback (most recent call last):
  File "/cds/home/m/monarin/lcls2/psana/psana/tests/dummy.py", line 12, in <module>
    ebeam = run.Detector('ebeam')
  File "/cds/home/m/monarin/lcls2/psana/psana/psexp/run.py", line 119, in Detector
    raise DetectorNameError(err_msg)
psana.psexp.run.DetectorNameError: No available detector class matched with ebeam. If this is a new detector/version, make sure to add new class in detector folder.
Accessing Event Codes
LCLS1-style event codes can be accessed using the "timing" detector:
from psana import DataSource

ds = DataSource(exp='tmoc00118',run=222,dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
timing = myrun.Detector('timing')
for nevt,evt in enumerate(myrun.events()):
    allcodes = timing.raw.eventcodes(evt)
    # event code 15 fires at 1Hz, and this exp/run had 10Hz triggers
    print('event code 15 present:',allcodes[15])
    if nevt>20: break
Running in Parallel
Examples of slurm scripts to submit are here: Submitting SLURM Batch Jobs. Instructions for submitting batch jobs at S3DF to run in parallel are here: https://confluence.slac.stanford.edu/display/PCDS/Running+at+S3DF.
Analyzing Scans
To get a list of the scan variables, use the following command:
(ps-4.3.2) psanagpu102:lcls2$ detnames -s exp=rixx43518,run=45,dir=/sdf/data/lcls/ds/prj/public01/xtc
--------------------------
Name           | Data Type
--------------------------
motor1         | raw
motor2         | raw
step_value     | raw
step_docstring | raw
--------------------------
(ps-4.3.2) psanagpu102:lcls2$
From within python the above information can be obtained as a dictionary using "run.scaninfo".
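For example, a short sketch that prints the dictionary (the exact key/value layout may vary between psana2 versions):

from psana import DataSource

ds = DataSource(exp='rixx43518', run=45,
                dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
# scan variables for this run, as shown by "detnames -s"
print(myrun.scaninfo)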
Note that "step_value"/"step_docstring" are convenience values defined by the DAQ to be a single value/string that is supposed to represent what has happened on a step, for use in plots/printout, since it's not easy to plot, for example, vs. multiple motor positions in a complex scan.
Code similar to this can be used to access the above scan variables:
from psana import DataSource

ds = DataSource(exp='rixx43518',run=45,dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
motor1 = myrun.Detector('motor1')
motor2 = myrun.Detector('motor2')
step_value = myrun.Detector('step_value')
step_docstring = myrun.Detector('step_docstring')
for step in myrun.steps():
    print(motor1(step),motor2(step),step_value(step),step_docstring(step))
    for evt in step.events():
        pass
Running From Shared Memory
psana2 scripts can be run in real-time on shared memory. There is some extra complexity compared to writing python code for the real-time GUI ami, for example:
- you have to create plots and handle when they update
- you have to worry about data getting out of time order
- you have to handle missing data
- you have to reset plots on run boundaries (if that's the behavior you want)
- you have to write code to handle the data gathered from multiple mpi cores (not required if you want to run on 1 core)
- be aware that the shared memory python scripts "steal" event statistics from other shared memory instances (e.g. ami)
but you get to take advantage of the power and flexibility of python. Look at the DAQ .cnf file (e.g. /cds/group/pcds/dist/pds/rix/scripts/rix.cnf) to see the name of the node running the shared memory server ("monReqServer"). To access those nodes you need to be on a special permissions list (email pcds-ana-l@slac.stanford.edu to request). You can find the name of the shared memory (the hutch name is typically used) either by looking in the .cnf file (the "-P" option to the monReqServer executable) or by running a command like this:
drp-neh-cmp003:~$ ls /dev/shm/
PdsMonitorSharedMemory_tmo
drp-neh-cmp003:~$
For this output, you would use "DataSource(shmem='tmo')".
Note that one can develop the shared memory scripts (including real-time plots) using offline data, then change the DataSource line to run them in real-time.
Typically psmon is used for publishing results to realtime plots in the callback, publishing updates every "N" events. See this link for psmon examples: Visualization Tools.
When running multi-core with mpi one has to use the small data "callbacks" kwarg to receive the data gathered from all nodes. An example multi-core script is below (also works on 1 core). A pure 1-core script is simpler (no data "gathering" needed: can just be a loop over events with plot updates every N events). For multi-core, this can be run with "mpirun -n 3 python <scriptname>.py" and the two plots can be viewed on the same node with "psplot -a 1 OPALSUMS OPALIMG" (see Visualization Tools for "psplot" options). It can also be run in the usual offline manner by changing the DataSource line. It is written in a way to keep running even when the DAQ is restarted:
import os
import numpy as np
from psana import DataSource
from psmon import publish
from psmon.plots import XYPlot,Image
from collections import deque
from mpi4py import MPI

numworkers = MPI.COMM_WORLD.Get_size()-1
if numworkers==0: numworkers=1 # the single core case (no mpi)
os.environ['PS_SRV_NODES']='1' # one mpi core gathers/plots data

mydeque=deque(maxlen=25)
lastimg=None
numevents=0
numendrun=0

def my_smalldata(data_dict): # one core gathers all data from mpi workers
    global numevents,lastimg,numendrun,mydeque
    if 'endrun' in data_dict:
        numendrun+=1
        if numendrun==numworkers:
            print('Received endrun from all workers. Resetting data.')
            numendrun=0
            numevents=0
            mydeque=deque(maxlen=25)
        return
    numevents += 1
    if 'opal' in data_dict:
        lastimg = data_dict['opal']
    mydeque.append(data_dict['opalsum'])
    if numevents%100==0: # update plots around 1Hz
        print('event:',numevents)
        myxyplot = XYPlot(numevents, "Last 25 Sums", np.arange(len(mydeque)), np.array(mydeque), formats='o')
        publish.send("OPALSUMS", myxyplot)
        if lastimg is not None: # opal image is not sent all the time
            myimgplot = Image(numevents, "Opal Image", lastimg)
            publish.send("OPALIMG", myimgplot)

while 1: # mpi worker processes
    ds = DataSource(shmem='rix')
    smd = ds.smalldata(batch_size=5, callbacks=[my_smalldata])
    for myrun in ds.runs():
        opal = myrun.Detector('atmopal')
        for nevt,evt in enumerate(myrun.events()):
            mydict={}
            image = opal.raw.image(evt)
            if image is None: continue
            # do as much work as possible in the workers
            # don't send large data all the time, if possible
            if nevt%10==0: mydict['opal']=image
            mydict['opalsum']=np.sum(image)
            smd.event(evt,mydict)
        smd.event(evt,{'endrun':1}) # tells gatherer to reset plots
When running multi-node mpi there are also some complexities propagating the environment to remote nodes: the way to address that is described in this link.
Running in LIVE mode
Here's a sample python script showing how you can configure the DataSource to run in live mode:
# Use an environment variable to specify how many attempts
# the datasource should make waiting for file reading (1 second wait each).
# In this example, we set it to 30 (wait up to 30 seconds).
import os
os.environ['PS_SMD_MAX_RETRIES'] = '30'

# Create a datasource with the live flag
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
                live = True,
                max_events = 10)

# Loop over your run and events as usual.
# You'll see a "wait for an event..." message in case
# the file system writing is slower than your analysis.
run = next(ds.runs())
for i, evt in enumerate(run.events()):
    print(f'got evt={i} ts={evt.timestamp}')
Note that this script is also available in the tests folder of the lcls2 repository. You can run the script with:
(ps-4.3.2) psanagpu109 tests $ mpirun -n 3 python livemode.py
Jump to events
Psana2 can jump to particular events when given a list of timestamps. In the example below, we only want to process these four timestamps. Note that the timestamp array must have dtype np.uint64.
from psana import DataSource
import numpy as np

timestamps = np.array([4194783241933859761,4194783249723600225,4194783254218190609,4194783258712780993], dtype=np.uint64)
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
                timestamps=timestamps)
myrun = next(ds.runs())
opal = myrun.Detector('tmo_atmopal')
for nevt, evt in enumerate(myrun.events()):
    img = opal.raw.image(evt)
    print(nevt, evt.timestamp, img.shape)
Here is the output
(ps-4.5.5) monarin@psanagpu111 (master *) psana2 $ python test_jump.py
0 4194783241933859761 (1024, 1024)
1 4194783249723600225 (1024, 1024)
2 4194783254218190609 (1024, 1024)
3 4194783258712780993 (1024, 1024)
Event Times In Local Time Zone
NOTE: the method evt.datetime().timestamp() can also be used to generate more natural timestamps (e.g. for use in plots).
import datetime as dt
from psana import DataSource

ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
for nevt,evt in enumerate(myrun.events()):
    if nevt>3: break
    t = evt.datetime()
    localt = t.replace(tzinfo=dt.timezone.utc).astimezone(tz=None)
    print(localt.strftime('%H:%M:%S'))
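A minimal sketch using the method mentioned in the note above to produce floating-point epoch timestamps (e.g. for plot axes):

from psana import DataSource

ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
for nevt, evt in enumerate(myrun.events()):
    if nevt > 3: break
    # floating-point seconds, convenient for plot axes
    print(evt.datetime().timestamp())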
Running with Integrating Detector
Here's a sample python script showing how you can analyze a run with an integrating detector (in this example, 'andor'). For more detail about the technical design, visit this page. Some parameter guidance for integrating-detector analysis:
- One key variable when running in integrating-detector mode is PS_SMD_N_EVENTS (an environment variable), which is how many events SMD0 will send to each EB core. When running integrating detectors the "units" of this variable change from "high-rate events" to "low-rate integrating-detector events". PS_SMD_N_EVENTS should typically be set small (since one integrating-detector event contains many high-rate events).
- The batch_size kwarg to DataSource is ignored for the integrating-detector case (only PS_SMD_N_EVENTS controls the batch size)
- The max_events kwarg to DataSource also changes to count integrating-detector events
- Note that the batch_size kwarg to the DataSource.smalldata() call does NOT count integrating-detector events.
- Use evt.EndOfBatch() to check if the last integrating event in the current batch has been reached.
- For a shifted integrating window, you can specify the amount of time, intg_delta_t (a kwarg to DataSource), as a positive integer in ns to allow the integrating window to shift (see the sketch after the note below).
NOTE: part of psana ("SMD0") has buffers that need to be large enough to accommodate at least one event from the integrating detector. If PS_SMD_N_EVENTS is set too large and this is not possible then an error message is printed and psana exits.
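A minimal sketch of a DataSource configured with a shifted integrating window (the 1000 ns value is an arbitrary illustration, not a recommendation):

import os
os.environ['PS_SMD_N_EVENTS'] = '1' # keep SMD0->EB batches small in integrating mode

from psana import DataSource
# intg_delta_t shifts the integrating window by the given number of ns
ds = DataSource(exp='xpptut15', run=1, dir='/sdf/data/lcls/ds/prj/public01/xtc/intg_det',
                intg_det='andor',
                intg_delta_t=1000) # hypothetical 1000 ns shift

The full example script: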
# The PS_SMD_N_EVENTS should be set to a small number (e.g. 1)
# since all other events which are part of this intg. event will be sent
# in the same batch.
import os
import numpy as np
os.environ['PS_SMD_N_EVENTS'] = '1'

from psana import DataSource
ds = DataSource(exp='xpptut15', run=1, dir='/sdf/data/lcls/ds/prj/public01/xtc/intg_det',
                intg_det='andor')
run = next(ds.runs())
hsd = run.Detector('hsd')
andor = run.Detector('andor')

# Test calculating sum of the hsd for each integrating event.
sum_hsd = 0
for i_evt, evt in enumerate(run.events()):
    hsd_calib = hsd.raw.calib(evt)
    andor_calib = andor.raw.calib(evt)
    # Keep summing the value of the other detector (hsd in this case)
    sum_hsd += np.sum(hsd_calib[:])/np.prod(hsd_calib.shape)
    # When an integrating event is found, print out and reset the sum variable
    if evt.EndOfBatch():
        val_andor = np.sum(andor_calib[:])/np.prod(andor_calib.shape)
        print(f'i_evt: {i_evt} andor: {val_andor} sum_hsd:{sum_hsd}')
        sum_hsd = 0
RIX Example: Deadtime and Integrating Detectors
Sometimes data for a particular shot cannot be read out because DAQ data buffers are unavailable. This is called "deadtime". Deadtime can make it impossible to reliably use high-rate shots to perform operations like normalizations for lower-rate integrating detectors. Matt Weaver is adding counters to the timing system so the DAQ can count how many shots were dropped due to deadtime, allowing experimenters to decide whether there is sufficient data for a normalization-style analysis.
This example demonstrates (conceptually!) how to do the normalization, and view the results in a psmon plot (use the command "psplot ANDOR" on the same node). Run this script either with "python andor.py" or "mpirun -n 5 python andor.py":
from psana import DataSource
from psmon import publish
from psmon.plots import Image,XYPlot
import os
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# The PS_SMD_N_EVENTS should be set to a small number (e.g. 1)
# since all other events which are part of this intg. event will be sent
# in the same batch.
os.environ['PS_SMD_N_EVENTS'] = '1'
os.environ['PS_SRV_NODES']='1'

def my_smalldata(data_dict):
    if 'unaligned_andor_norm' in data_dict:
        andor_norm = data_dict['unaligned_andor_norm'][0]
        myplot = XYPlot(0,"Andor (normalized)",range(len(andor_norm)),andor_norm)
        publish.send('ANDOR',myplot)

ds = DataSource(exp='rixx1003821',run=46,dir='/sdf/data/lcls/ds/prj/public01/xtc',
                intg_det='andor_vls',batch_size=1,psmon_publish=publish)

for myrun in ds.runs():
    andor = myrun.Detector('andor_vls')
    timing = myrun.Detector('timing')
    smd = ds.smalldata(filename='mysmallh5.h5',batch_size=1000, callbacks=[my_smalldata])
    norm = 0
    ndrop_inhibit = 0
    for nstep,step in enumerate(myrun.steps()):
        print('step:',nstep)
        for nevt,evt in enumerate(step.events()):
            andor_img = andor.raw.value(evt)
            # also need to check for events missing due to damage
            # (or compare against expected number of events)
            ndrop_inhibit += timing.raw.inhibitCounts(evt)
            smd.event(evt, mydata=nevt) # high rate data saved to h5
            # need to check Matt's new timing-system data on every
            # event to make sure we haven't missed normalization
            # data due to deadtime
            norm+=nevt # fake normalization
            if evt.EndOfBatch():
                print('andor data on evt:',nevt,'ndrop_inhibit:',ndrop_inhibit)
                # check that the high-rate readout group (2) didn't
                # miss any events due to deadtime
                if ndrop_inhibit[2]!=0: print('*** data lost due to deadtime')
                # need to prefix the name with "unaligned_" so
                # the low-rate andor dataset doesn't get padded
                # to align with the high rate datasets
                if andor_img is not None:
                    smd.event(evt, mydata=nevt, unaligned_andor_norm=(andor_img/norm))
                norm=0
                ndrop_inhibit=0

smd.done()
AMI and Integrating Detectors
AMI is also affected by the DAQ deadtime issue described above but, in addition, events can also be thrown away going into the AMI shared memory (this feature is what allows AMI analysis to stay real-time). This can cause additional issues with AMI integrating-detector normalization analyses that try to use high-rate shots to do normalizations for low-rate integrating detectors, since not all high-rate shots are guaranteed to be available. Also, unlike psana, AMI doesn't currently have the idea of shots that "belong together" which will make an AMI normalization-style analysis of an integrating detector impossible.
Multiple Integrating Detectors
- If your experiment has multiple integrating detectors, their readout rates must be multiples of each other, and their readouts must be "in phase"
- The integrating detector name given to the DataSource object must be the slowest one (see the sketch after this list)
- If you use a delta_t offset, the delta_t must be identical for all integrating detectors
- ISSUE: in the delta_t case there is currently no end-of-batch marker for the faster of multiple integrating detectors (since only the slow detector is given to the datasource). This isn't an issue without delta_t because the detector's presence/absence is an end-of-batch indicator. Ideally one would like multiple end-of-batch indicators with several delta-t's? Mona thinks this is possible. Mona will try to build this into the interface even if it doesn't work yet. Could kludge a rough faster end-of-batch with event counting?
- Ideally to support multiple out-of-phase integrating detectors psana would have to send events to multiple nodes. This feels like it would be enormously complex on top of all the other requirements that psana has to satisfy.
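A minimal sketch of the constraint in the second bullet above, with hypothetical detector names ('andor_slow' standing in for the slower of two in-phase integrating detectors whose rates are multiples of each other):

from psana import DataSource

# hypothetical names: 'andor_slow' is the slower of the two integrating detectors
ds = DataSource(exp='rixx1003821', run=55, dir='/sdf/data/lcls/ds/prj/public01/xtc',
                intg_det='andor_slow') # must be the slowest integrating detector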
MPI Task Structure To Support Scaling
psana can scale to allow for high-rate analysis. For example, many hdf5 files of small user-defined data (described above in Example Script Producing Small HDF5 File) can be written, one per "SRV" node in the diagram below. The total number of SRV nodes is defined by the environment variable PS_SRV_NODES (defaults to 0). These hdf5 files are joined by psana into what appears to be one file using the hdf5 "virtual dataset" feature. Similarly, multiple nodes can be used for filtering ("EB" nodes in the diagram below) and multiple nodes can be used to process big data in the main psana event loop ("BD" nodes in the diagram below). The one piece that cannot currently be scaled to multiple nodes is the SMD0 (SmallData) task, which reads the timestamps and fseek offsets from each tiny .smd.xtc2 file produced by the DAQ (typically one per detector or per detector segment, although a file can contain more than one segment or detector). This task joins together the relevant data for each shot ("event build") using the timestamp. The SMD0 task is multi-threaded, with one thread for each detector. For highest performance it is important that the SMD0 threads be allocated an entire MPI node.
Running a large job
Below shows how to set up a slurm job script to run a large job. This script uses setup_hosts_openmpi.sh (also provided below) to assign a single node to SMD0 (see diagram above) and distribute all other tasks (EB, BD, & SRV) to the rest of the available nodes. After sourcing setup_hosts_openmpi.sh, you can use $PS_N_RANKS and $PS_HOST_FILE in your mpirun command.
Note on the number of ranks available: on S3DF there are 120 cores per compute node. The example below asks for 3 nodes (120 x 3 = 360 ranks in total) but will only have 120 x (3 - 1) + 1 = 241 ranks available, because the first node is reserved entirely for the single SMD0 rank.
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --job-name=run_large_psana2
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
#SBATCH --nodes=3
#SBATCH --exclusive
#SBATCH --time=10:00

# Configure psana2 parallelization
source setup_hosts_openmpi.sh

# Run your job with the hostfile flag
mpirun --hostfile $PS_HOST_FILE python test_mpi.py
Here is the `setup_hosts_openmpi.sh`. You can create this script in your job folder.
############################################################
# First node must be exclusive to smd0
# * For openmpi, slots=1 must be assigned to the first node.
############################################################

# Get the list of hosts by expanding the shorthand node list into a
# line-by-line node list
host_list=$(scontrol show hostnames $SLURM_JOB_NODELIST)
hosts=($host_list)

# Write out the host file, putting rank 0 on the first node.
# Note that if PS_N_TASKS_PER_NODE is set, it limits the no.
# of cores per node.
host_file="slurm_host_${SLURM_JOB_ID}"
for i in "${!hosts[@]}"; do
    if [[ "$i" == "0" ]]; then
        echo ${hosts[$i]} slots=1 > $host_file
    else
        if [[ -z "${PS_N_TASKS_PER_NODE}" ]]; then
            echo ${hosts[$i]} >> $host_file
        else
            echo ${hosts[$i]} slots=${PS_N_TASKS_PER_NODE} >> $host_file
        fi
    fi
done

# Export hostfile for mpirun
export PS_HOST_FILE=$host_file
If your application uses a lot of memory or is I/O limited (~5 GB/s per node), you can limit the no. of cores per node by setting the environment variable PS_N_TASKS_PER_NODE as follows:
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --job-name=run_large_psana2
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
#SBATCH --nodes=2
#SBATCH --exclusive
#SBATCH --time=10:00

# Configure psana2 parallelization
export PS_N_TASKS_PER_NODE=60
source setup_hosts_openmpi.sh

# Run your job with the hostfile flag
mpirun --hostfile $PS_HOST_FILE python test_mpi.py
Performance Tuning Tips
(see MPITaskStructureToSupportScaling to get a sense for where the parameters described here are used)
To get improved performance when running large jobs consider the following options. It is not straightforward to set these parameters optimally for an arbitrary analysis job, so some study is required for specific applications; in some cases we can offer guidelines. A configuration sketch follows the list below.
- understand the CPU usage of your big-data ("BD") processing loop to make sure the bottleneck isn't "user code". this can typically be done by running on 1 core.
- increase the environment variable PS_EB_NODES to be larger than its default of 1. For many analyses, a number that is 1/16 of the number of big data cores has been good. This variable, along with the PS_SRV_NODES variable described next determines how many cores are used for which task in psana (see MPITaskStructureToSupportScaling).
- if you're writing a large amount of hdf5 data increase the environment variable PS_SRV_NODES to have more cores writing hdf5 files. It is difficult here to provide guidance on the number since it depends on the application
- Batch size parameters:
- Set environment variable PS_SMD_N_EVENTS larger to increase the number of events that get sent in a "batch" when transmitting data from SMD0 cores through to EB cores. Defaults to 10000.
- Set DataSource kwarg batch_size to decrease the number of events sent from EB to BD cores (this kwarg is ignored in the integrating-detector case). Defaults to 1000.
- When setting up the smalldata, increase the number of events that get sent in a batch when transmitting data from BD cores to SRV cores by setting the batch_size kwarg in the DataSource.smalldata() call. Defaults to 1000.
- filtering events using small data on the "EB" cores can in some cases reduce performance because the "BD" cores need to do more disk random access (see MPITaskStructureToSupportScaling). The performance impact (negative/positive) of filtering has to be evaluated for each case.
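A minimal sketch pulling these knobs together (the specific values are illustrative assumptions, not recommendations):

import os
# task layout (see MPITaskStructureToSupportScaling)
os.environ['PS_EB_NODES'] = '4'         # e.g. roughly 1/16 of the number of BD cores
os.environ['PS_SRV_NODES'] = '2'        # more SRV cores if writing lots of hdf5
os.environ['PS_SMD_N_EVENTS'] = '20000' # SMD0 -> EB batch size (default 10000)

from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
                batch_size=500) # EB -> BD batch size (default 1000)
# BD -> SRV batch size (default 1000)
smd = ds.smalldata(filename='my.h5', batch_size=2000)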
psana also has some grafana monitoring built in that, with expert help, can be used to identify bottlenecks in an analysis. Contact pcds-ana-l@slac.stanford.edu for guidance.
Sorting Small HDF5 File
The small hdf5 file is likely not in time order due to parallelism in psana2. If your output h5 is large (> 1 billion records), you can use the timestamp_sort_h5 tool by submitting the following job:
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --job-name=timestamp_sort_h5
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
#SBATCH --nodes=1
#SBATCH --exclusive
#SBATCH --time=10:00

timestamp_sort_h5 /sdf/data/lcls/drpsrcf/ffb/users/monarin/h5/mylargeh5.h5 /sdf/data/lcls/drpsrcf/ffb/users/monarin/h5/output/result.h5
Note that the first required argument is the unsorted hdf5 file and the second is the desired output file. There are other optional arguments, which can be viewed by running timestamp_sort_h5 --help.
Why Is My Detector Object "None" On Some MPI Ranks?
In psana2 all mpi ranks execute the same code, but not all ranks can create a Detector object since there are “hidden” MPI helper-ranks shown in this diagram: MPITaskStructureToSupportScaling. For those helper-ranks the Detector object will be None. Those helper-ranks won’t enter psana2 loops over runs/steps/events, so as long as you only use a Detector object inside loops your code will run correctly without any special checks. However, if you use the Detector object outside those loops you must check that the Detector object is not None before calling any methods.
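A minimal sketch of that check, assuming the usual next(ds.runs()) pattern from the earlier examples:

from psana import DataSource

ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc')
myrun = next(ds.runs())
opal = myrun.Detector('tmo_atmopal') # None on the hidden MPI helper-ranks

# outside the loops: guard any direct use of the Detector object
if opal is not None:
    print('this rank has a real Detector object')

for evt in myrun.events(): # helper-ranks never enter this loop
    img = opal.raw.image(evt) # safe here without an extra check on 'opal'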
Historical background: we went back and forth about how to manage the MPI helper-ranks. The alternative would have been to use callbacks instead of run/step/event loops to more effectively hide the helper-ranks from user code, but callbacks would have been user-unfriendly in a different way: writing loops is a more natural coding approach for many users. We felt the loop approach (with more fragile Detector objects that can be None) was the lesser of two evils.
Running psplot_live
From a rix-daq node, source the psana2 environment, then run:
(ps-4.6.3) rix-daq:scripts> psplot_live ANDOR
Python 3.9.16 | packaged by conda-forge | (main, Feb 1 2023, 21:39:03)
Type 'copyright', 'credits' or 'license' for more information
IPython 8.14.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]:
The above command starts psplot_live, which listens to your analysis jobs (with plotting) and provides an interactive session. You can use the interactive session to list, kill, and reactivate plots. Note that to monitor more than one plot, use a space to separate each plot name (e.g. psplot_live ANDOR ATMOPAL).
Below is an example analysis script (monitoring two plots: ANDOR and ATMOPAL) and a job-submission script that communicate directly with psplot_live. Note that if you are converting a python script that works with psplot (non-live), the main difference is that you have to pass psmon_publish=publish as an additional DataSource argument. There may be other differences that need to be changed; please let us know in this case.
from psana import DataSource
from psmon import publish
from psmon.plots import Image,XYPlot
import os, sys, time
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

os.environ['PS_SRV_NODES']='1'
os.environ['PS_SMD_N_EVENTS']='1'

# passing exp and runnum
exp=sys.argv[1]
runnum=int(sys.argv[2])

mount_dir = '/sdf/data/lcls/drpsrcf/ffb'
#mount_dir = '/cds/data/drpsrcf'
xtc_dir = os.path.join(mount_dir, exp[:3], exp, 'xtc')

ds = DataSource(exp=exp,run=runnum,dir=xtc_dir,intg_det='andor_vls',
                batch_size=1, psmon_publish=publish,
                detectors=['timing','andor_vls','atmopal'],
                max_events=0, live=True)

def my_smalldata(data_dict):
    if 'unaligned_andor_norm' in data_dict:
        andor_norm = data_dict['unaligned_andor_norm'][0]
        myplot = XYPlot(0,f"Andor (normalized) run:{runnum}",range(len(andor_norm)),andor_norm)
        publish.send('ANDOR',myplot)
    if 'sum_atmopal' in data_dict:
        atmopal_sum = data_dict['sum_atmopal']
        myplot = XYPlot(0,f"Atmopal (sum) run:{runnum}",range(len(atmopal_sum)),atmopal_sum)
        publish.send('ATMOPAL', myplot)

for myrun in ds.runs():
    andor = myrun.Detector('andor_vls')
    atmopal = myrun.Detector('atmopal')
    timing = myrun.Detector('timing')
    smd = ds.smalldata(filename='mysmallh5.h5',batch_size=5, callbacks=[my_smalldata])
    norm = 0
    ndrop_inhibit = 0
    sum_atmopal = None
    cn_andor_events = 0
    cn_intg_events = 0
    ts_st = None
    for nstep,step in enumerate(myrun.steps()):
        print('step:',nstep)
        for nevt,evt in enumerate(step.events()):
            if ts_st is None: ts_st = evt.timestamp
            cn_intg_events += 1
            andor_img = andor.raw.value(evt)
            atmopal_img = atmopal.raw.image(evt)
            if atmopal_img is not None:
                if sum_atmopal is None:
                    sum_atmopal = atmopal_img[0,:]
                else:
                    sum_atmopal += atmopal_img[0,:]
            # also need to check for events missing due to damage
            # (or compare against expected number of events)
            ndrop_inhibit += timing.raw.inhibitCounts(evt)
            smd.event(evt, mydata=nevt) # high rate data saved to h5
            # need to check Matt's new timing-system data on every
            # event to make sure we haven't missed normalization
            # data due to deadtime
            norm+=nevt # fake normalization
            if andor_img is not None:
                cn_andor_events += 1
                #print('andor data on evt:',nevt,'ndrop_inhibit:',ndrop_inhibit)
                print(f'BD{rank-1}: #andor_events: {cn_andor_events} #intg_event:{cn_intg_events} st: {ts_st} en:{evt.timestamp}')
                # check that the high-rate readout group (2) didn't
                # miss any events due to deadtime
                if ndrop_inhibit[2]!=0: print('*** data lost due to deadtime')
                # need to prefix the name with "unaligned_" so
                # the low-rate andor dataset doesn't get padded
                # to align with the high rate datasets
                smd.event(evt, mydata=nevt, unaligned_andor_norm=(andor_img/norm),
                          sum_atmopal=sum_atmopal)
                norm=0
                ndrop_inhibit=0
                sum_atmopal = None
                cn_intg_events = 0
                ts_st = None

smd.done()
And an sbatch script:
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --account=<your account here>
#SBATCH --job-name=run_andor
#SBATCH --nodes=1
#SBATCH --ntasks=5
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
##SBATCH --exclusive
#SBATCH -t 00:05:00

t_start=`date +%s`

exp=$1
runnum=$2

mpirun -n 5 python run_andor.py $exp $runnum

t_end=`date +%s`
echo PSJobCompleted TotalElapsed $((t_end-t_start))
After creating the above two scripts, you can submit the job with:
sbatch submit_run_andor.sh rixc00121 121
You should see the psplot(s) pop up automatically.
To view the list of psplots:
In [1]: ls()
ID  SLURM_JOB_ID  EXP        RUN  NODE                               PORT   STATUS
1   43195784      rixc00121  121  sdfmilan005.sdf.slac.stanford.edu  12323  PLOTTED
If you close the plot window, the process is automatically removed from the list:
In [2]: ls()
ID  SLURM_JOB_ID  EXP  RUN  NODE  PORT  STATUS
You can submit your analysis job again (increasing run numbers are always monitored). Old jobs (a previously submitted run number from the same node and the same psplot port) will NOT be shown automatically (STATUS: RECEIVED). You can reactivate them using the show(ID) command.
In [2]: ls()
ID  SLURM_JOB_ID  EXP        RUN  NODE                              PORT   STATUS
1   3275205       rixc00121  121  sdfiana001.sdf.slac.stanford.edu  12323  RECEIVED

In [3]: show(1)
Main received {'msgtype': 3} from db-zmq-server
To kill a plot, use kill(ID) or kill_all() to kill all plots.
In [5]: kill(1)

In [6]: ls()
ID  SLURM_JOB_ID  EXP  RUN  NODE  PORT  STATUS
Psana2 Grafana Bottleneck Identifier
You can view a summary of psana2 processing rate bottlenecks for your analysis job via this Grafana page: psana2. In general it can be non-trivial to use these plots to identify bottlenecks, so reach out to data-systems team members if you need guidance.
Instructions and tips:
- Add monitor=True as a DataSource keyword argument and submit your job (see the sketch after this list)
- The default display is set to the latest slurm jobid (submitted by all users). You can enter your slurm jobid and hit enter to update the display.
- Psana2 also allows a custom jobid. Add prom_jobid="Your JobID" as a DataSource keyword argument to set your own jobid.
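A minimal sketch of these two keyword arguments together (the jobid string is an arbitrary example):

from psana import DataSource

ds = DataSource(exp='tmoc00118', run=222, dir='/sdf/data/lcls/ds/prj/public01/xtc',
                monitor=True,             # send processing-rate metrics to the psana2 Grafana page
                prom_jobid='my_analysis') # optional custom jobid shown on the Grafana page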
Some explanations about the plots
- Total Processing Rate for EB (EventBuilder) and BD (BigData) groups is summed over all cores assigned for each group.
- BD Total Processing Rate refers to only bigdata-fetching rate and BD Analysis Rate refers to only user-code analysis rate.
- Four different Wait Time plots:
- SMD0 Wait Time refers to idle time on SMD0 core because it has to wait for EB cores. Large values observed here mean you may need to increase no. of EB cores (PS_EB_NODES environment variable: default is 1)
- EB-SMD0 Wait Time refers to the time each EB core has to wait for SMD0. Large values mean SMD0 is slow. You may need to run with an advanced setup where SMD0 is on a separate node (see instructions here)
- EB-BD Wait Time refers to the time each EB core has to wait for its own client BD cores. Large values mean you may need to increase the no. of BD cores.
- BD Wait Time refers to the time each BD core has to wait for its EB core. Large values mean EB is slow and you may need to increase the no. of EB cores.
Some example plots from a TMO analysis of data relevant for the MRCO end station are shown below. Here one can see the analysis running at 60kHz (30M events in 7.5 minutes) on 256 cores, with the bottleneck being the number of BD cores, since the "EB-BD Wait Time" plot shows large numbers (EB cores waiting as much as 1-2 seconds for a BD core to become available). You can see the user's slurm job id has been entered in the upper left hand corner of the grafana page. This job performance data is currently retained for some days, but not indefinitely.
Zooming in on the rate plot:
The full set of plots: