Introduction
A general introduction to LCLS data analysis (psana1, ami2, jupyter, slurm) can be viewed in the 53-minute video here. It covers psana1 (i.e. LCLS1), but the user interface is quite similar to psana2 (for LCLS2), even though the internals are significantly improved in psana2:
JupyterHub
psana2 is also available in JupyterHub here in the kernel named "LCLS-II py3": https://pswww.slac.stanford.edu/jupyterhub/hub/home
Environment
To obtain the environment to run psana2, execute the following:
source /cds/sw/ds/ana/conda2/manage/bin/psconda.sh
Note that LCLS-II psana is not compatible with LCLS-I psana, so environments must activate one or the other, but not both. Since LCLS-I and LCLS-II live in different conda installations it is necessary to start a fresh session when switching between the two.
Public Practice Data
Publicly accessible practice data is located in the directory /cds/data/psdm/prj/public01/xtc. Use of this data requires the additional "dir=" keyword to the DataSource object.
Experiment | Run | Comment |
---|---|---|
tmoc00118 | 222 | Generic TMO dark data |
rixx43518 | 34 | A DAQ "fly scan" of motor (see ami#FlyScan:MeanVs.ScanValue) |
rixx43518 | 45 | A DAQ "step scan" of two motors |
rixl1013320 | 63 | Rix stepping delay scan of both Vitara delay and ATM delay stage (lxt_ttc scan) at a single mono energy |
rixl1013320 | 93 | Rix continuous mono scan with laser on/off data at a single time delay |
rixx1003821 | 55 | An infinite sequence with two slow andors running at different rates |
rixx1003821 | 68 | A finite burst sequence with one andor |
Detector Names
Use this command to see non-epics detector names (see "Detector Interface" example below):
```
(ps-4.1.0) psanagpu101:lcls2$ detnames exp=tmoc00118,run=222,dir=/cds/data/psdm/prj/public01/xtc
---------------------
Name      | Data Type
---------------------
epicsinfo | epicsinfo
timing    | raw
hsd       | raw
gmdstr0   | raw
etc.
```
From within python the above information can be obtained as a dictionary using "run.detinfo".
Use the same command with the "-e" option to see epics detector names (see "Detector Interface" example below). These are slowly varying variables (like temperatures) that are not strongly time correlated with the regular per-shot detector data:
```
(ps-4.5.10) psanagpu101:~$ detnames -e exp=tmoc00118,run=222,dir=/cds/data/psdm/prj/public01/xtc | more
-----------------------------------------------------------------
Detector Name   | Epics Name
-----------------------------------------------------------------
StaleFlags      |
Keithley_Sum    | EM2K0:XGMD:HPS:KeithleySum
IM2K4_XrayPower | IM2K4:PPM:SPM:VOLT_RBV
IM3K4_XrayPower | IM3K4:PPM:SPM:VOLT_RBV
IM4K4_XrayPower | IM4K4:PPM:SPM:VOLT_RBV
etc.
```
Note that either of these names ("Detector Name" or "Epics Name") can be used with the Detector Interface described below. From within python the above information can be obtained as a dictionary using "run.epicsinfo".
Using the Detector Interface
Standard (per-shot) detectors and the slower epics variables can be accessed as shown here using the names discovered with the commands above.

NOTE: the slow epics variables are polled in the DAQ at 1Hz, so for the first one second of a run they will typically return "None".

NOTE: you can use tab-completion in ipython (see https://ipython.org/ipython-doc/3/interactive/tutorial.html) or the jupyter notebook to explore what you can do with the various detector objects.

NOTE: epics variables use a different syntax than other detectors (see example here):
```python
from psana import DataSource

ds = DataSource(exp='tmoc00118', run=222,
                dir='/cds/data/psdm/prj/public01/xtc', max_events=100)
myrun = next(ds.runs())
opal = myrun.Detector('tmo_atmopal')
epics_det = myrun.Detector('IM2K4_XrayPower')
for evt in myrun.events():
    img = opal.raw.image(evt)
    epics_val = epics_det(evt)  # epics variables have a different syntax
    # check for missing data
    if img is None:
        print('no image')
    else:
        print(img.shape)
    if epics_val is None:
        print('no epics value')
    else:
        print(epics_val)
```
Example Script Producing Small HDF5 File
You can run this script with MPI: mpirun -n 6 python example.py
It also works on one core with: python example.py (useful for debugging). See MPI rank/task diagram here to understand what different mpi ranks are doing.
This mechanism by default produces "aligned" datasets where missing values are padded (with NaN's for floats, and -99999 for integers). To create an unaligned dataset (without padding) prefix the name of the variable with "unaligned_".
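As a sketch of what this padding convention means for analysis code, here is a plain-numpy illustration (not psana itself; the pad values follow the description above):

```python
import numpy as np

# Pad values used for aligned datasets (per the description above)
FLOAT_PAD = np.nan   # floats are padded with NaN
INT_PAD = -99999     # integers are padded with -99999

# Suppose 5 events were recorded but the detector missed events 1 and 3:
energy = np.array([1.2, FLOAT_PAD, 1.4, FLOAT_PAD, 1.5])
counts = np.array([10, INT_PAD, 12, INT_PAD, 15])

# Mask out the padded entries before computing statistics:
good_energy = energy[~np.isnan(energy)]
good_counts = counts[counts != INT_PAD]
print(good_energy.mean())  # mean over real events only
print(good_counts.sum())
```

The benefit of aligned datasets is that row N of every dataset corresponds to the same event; the cost is that you must filter out the pad values as shown.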
NOTE: in addition to the hdf5 you specify as your output file ("my.h5" below) you will see other h5 files like "my_part0.h5", one for each of the cores specified in PS_SRV_NODES. The reason for this is that each of those cores writes its own my_partN.h5 file: for LCLS2 it will be important for performance to write many files. The "my.h5" file is actually quite small, and uses a new HDF5 feature called a "Virtual DataSet" (VDS) to join together the various my_partN.h5 files. Also note that events in my.h5 will not be in time order. If you copy the .h5 somewhere else, you need to copy all of them.
NOTE: In python, if you want to exit early you often use a "break" statement. When running psana-python with mpi parallelization, however, not all cores will see this statement, and the result will be that your job will hang at the end. To avoid this use the "max_events" keyword argument to DataSource (see example below).
NOTE: The call below to smd.sum() (or min()/max()) only accepts numpy arrays or None. This requirement allows cores that don't see any events to not cause errors when summing across cores. All cores must call .sum() or the process will hang (a property of the underlying mpi "Reduce" call).
```python
from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES'] = '1'

ds = DataSource(exp='tmoc00118', run=222,
                dir='/cds/data/psdm/prj/public01/xtc', max_events=10)
# batch_size is optional. specifies how often the dictionary of small
# user data is gathered. if you write out large data (NOT RECOMMENDED)
# it needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])
for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')
    runsum = np.zeros((3), dtype=float)  # beware of datatypes when summing: can overflow
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        # important: always check for missing data
        if img is None or photonEnergy is None:
            continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += img[0, :3]  # local sum on one mpi core
    # optional summary data for whole run
    if smd.summary:
        # sum (or max/min) across all mpi cores. Must be numpy array or None.
        tot_runsum = smd.sum(runsum)
        # pass either dictionary or kwargs
        smd.save_summary({'sum_over_run': tot_runsum}, summary_int=1)
smd.done()
```
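To see how the my_partN.h5 files are joined together, here is a minimal standalone sketch of the HDF5 "Virtual DataSet" mechanism using h5py (the file and dataset names here are made up for illustration; psana constructs the real VDS for you):

```python
import h5py
import numpy as np
import tempfile, os

tmpdir = tempfile.mkdtemp()

# Pretend two SRV cores each wrote their own part file:
for i in range(2):
    with h5py.File(os.path.join(tmpdir, f'my_part{i}.h5'), 'w') as f:
        f['evtsum'] = np.arange(5) + 10 * i

# Join them into one virtual dataset, as psana does for "my.h5":
layout = h5py.VirtualLayout(shape=(10,), dtype='i8')
for i in range(2):
    src = h5py.VirtualSource(os.path.join(tmpdir, f'my_part{i}.h5'),
                             'evtsum', shape=(5,))
    layout[i * 5:(i + 1) * 5] = src
with h5py.File(os.path.join(tmpdir, 'my.h5'), 'w') as f:
    f.create_virtual_dataset('evtsum', layout)

# Reading "my.h5" transparently reads both part files:
with h5py.File(os.path.join(tmpdir, 'my.h5'), 'r') as f:
    print(f['evtsum'][:])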
Full-Featured Example with Callbacks and Detector Selection
You can run this script with MPI the same way as shown in the previous example: mpirun -n 6 python example.py
The following additional DataSource arguments are used in this example:
- detectors=['detname1', 'detname2',]
List of detectors to be read from the disk. If you only need a few detectors for analysis, list their names here. The reading process will be faster since unused detector data is not read.
- smd_callback=smd_callback
You can use this callback to 1) decide for which events the large event data should be read (by yielding the event) and 2) set the destination (rank id) that each event is sent to.
- small_xtc=['detname1', 'detname2']
List of detectors to be used in smd_callback()
```python
from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)

# Use this function to decide if you want to fetch large data for this event
# and/or direct an event to process on a particular 'rank'
# (this rank number should be between 1 and total no. of ranks - 3
# since 3 ranks are reserved). If a detector is needed here, make sure
# to list it in the small_xtc argument for DataSource (see below).
# All epics and scan detectors are available automatically.
def smd_callback(run):
    opal = run.Detector('tmo_opal1')
    epics_det = run.Detector('IM2K4_XrayPower')
    n_bd_nodes = 3  # for mpirun -n 6, 3 ranks are reserved so there are 3 bd ranks left
    for i_evt, evt in enumerate(run.events()):
        img = opal.raw.image(evt)
        epics_val = epics_det(evt)
        dest = (evt.timestamp % n_bd_nodes) + 1
        if epics_val is not None:
            # Set the destination (rank no.) where this event should be sent to
            evt.set_destination(dest)
            yield evt

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES'] = '1'

ds = DataSource(exp='tmoc00118', run=222,
                dir='/cds/data/psdm/prj/public01/xtc',
                max_events=10,
                detectors=['epicsinfo', 'tmo_opal1', 'ebeam'],  # only reads these detectors (faster)
                smd_callback=smd_callback,  # smalldata callback (see notes above)
                small_xtc=['tmo_opal1'],    # detectors to be used in smalldata callback
                )

# batch_size is optional. specifies how often the dictionary of small
# user data is gathered. if you write out large data (NOT RECOMMENDED)
# it needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])
for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')
    runsum = np.zeros((3), dtype=float)  # beware of datatypes when summing: can overflow
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        if img is None or photonEnergy is None:
            continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += img[0, :3]  # local sum on one mpi core
    # optional summary data for whole run
    if smd.summary:
        # sum (or max/min) across all mpi cores. Must be numpy array or None.
        tot_runsum = smd.sum(runsum)
        # pass either dictionary or kwargs
        smd.save_summary({'sum_over_run': tot_runsum}, summary_int=1)
smd.done()
```
Excluding Detectors
In addition to only including specified detectors with the "detectors=[]" kwarg (see previous section), you can exclude detectors when creating a new datasource with the "xdetectors=[]" argument.
```python
# Create a datasource and tell it to exclude detectors
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222,
                dir='/cds/data/psdm/prj/public01/xtc',
                xdetectors=['hsd'],  # all other detectors will be available
                max_events=10)
run = next(ds.runs())
# Create these detectors normally
opal = run.Detector('tmo_opal1')
ebeam = run.Detector('ebeam')
for i, evt in enumerate(run.events()):
    img = opal.raw.image(evt)
    photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
    print(f'got evt={i} ts={evt.timestamp} img={img.shape} {photonEnergy=}')
```
Note that any other detectors in the same stream as an excluded detector will also be excluded. You'll see a warning message like this one:
Warning: Stream-8 has one or more detectors matched with the excluded set. All detectors in this stream will be excluded.
If you try to access a detector that is part of the excluded list, you'll see a DetectorNameError like the one below:
```
Traceback (most recent call last):
  File "/cds/home/m/monarin/lcls2/psana/psana/tests/dummy.py", line 12, in <module>
    ebeam = run.Detector('ebeam')
  File "/cds/home/m/monarin/lcls2/psana/psana/psexp/run.py", line 119, in Detector
    raise DetectorNameError(err_msg)
psana.psexp.run.DetectorNameError: No available detector class matched with ebeam. If this is a new detector/version, make sure to add new class in detector folder.
```
Accessing Event Codes
LCLS1-style event codes can be accessed using the "timing" detector:
```python
from psana import DataSource

ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc')
myrun = next(ds.runs())
timing = myrun.Detector('timing')
for nevt, evt in enumerate(myrun.events()):
    allcodes = timing.raw.eventcodes(evt)
    # event code 15 fires at 1Hz, and this exp/run had 10Hz triggers
    print('event code 15 present:', allcodes[15])
    if nevt > 20:
        break
```
Running in Parallel
Instructions for submitting batch jobs to run in parallel are here: Batch Nodes And Queues
Analyzing Scans
To get a list of the scan variables, use the following command:
```
(ps-4.3.2) psanagpu102:lcls2$ detnames -s exp=rixx43518,run=45,dir=/cds/data/psdm/prj/public01/xtc
--------------------------
Name           | Data Type
--------------------------
motor1         | raw
motor2         | raw
step_value     | raw
step_docstring | raw
--------------------------
(ps-4.3.2) psanagpu102:lcls2$
```
From within python the above information can be obtained as a dictionary using "run.scaninfo".
Note that "step_value" and "step_docstring" are convenience values defined by the DAQ: a single value/string intended to represent what happened on a step, for use in plots and printouts, since it is not easy to plot against, for example, multiple motor positions in a complex scan.
Code similar to this can be used to access the above scan variables:
```python
from psana import DataSource

ds = DataSource(exp='rixx43518', run=45, dir='/cds/data/psdm/prj/public01/xtc')
myrun = next(ds.runs())
motor1 = myrun.Detector('motor1')
motor2 = myrun.Detector('motor2')
step_value = myrun.Detector('step_value')
step_docstring = myrun.Detector('step_docstring')
for step in myrun.steps():
    print(motor1(step), motor2(step), step_value(step), step_docstring(step))
    for evt in step.events():
        pass
```
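Since step_docstring is a single string, it can be convenient to parse it programmatically. A hypothetical sketch (the exact string format depends on the DAQ configuration; the JSON layout used here is an assumption for illustration only):

```python
import json

# Hypothetical docstring as the DAQ might record it for one step:
docstring = '{"detname": "scan", "scantype": "scan", "step": 2}'

info = json.loads(docstring)
print(info['step'])  # -> 2
```

If your experiment's docstring is not JSON, inspect a few values with the loop above before deciding how to parse it.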
Running From Shared Memory
psana2 scripts can be run in real-time on shared memory. There is some extra complexity compared to writing python code for the real-time GUI ami, for example:
- you have to create plots and handle when they update
- you have to worry about data getting out of time order
- you have to handle missing data
- you have to reset plots on run boundaries (if that's the behavior you want)
- you have to write code to handle the data gathered from multiple mpi cores (not required if you want to run on 1 core)
- be aware that the shared memory python scripts "steal" event statistics from other shared memory instances (e.g. ami)
but you get to take advantage of the power/flexibility of python. Look at the DAQ .cnf file (e.g. /cds/group/pcds/dist/pds/rix/scripts/rix.cnf) to see which node is running the shared memory server ("monReqServer"). To access those nodes you need to be on a special permissions list (email pcds-ana-l@slac.stanford.edu to request). You can find the name of the shared memory (the hutch name is typically used) either by looking in the .cnf file (the "-P" option to the monReqServer executable) or by doing a command like this:
```
drp-neh-cmp003:~$ ls /dev/shm/
PdsMonitorSharedMemory_tmo
drp-neh-cmp003:~$
```
For this output, you would use "DataSource(shmem='tmo')".
Note that one can develop the shared memory scripts (including real-time plots) using offline data, then change the DataSource line to run them in real-time.
Typically psmon is used for publishing results to realtime plots in the callback, publishing updates every "N" events. See this link for psmon examples: Visualization Tools.
When running multi-core with mpi one has to use the small data "callbacks" kwarg to receive the data gathered from all nodes. An example multi-core script is below (also works on 1 core). A pure 1-core script is simpler (no data "gathering" needed: can just be a loop over events with plot updates every N events). For multi-core, this can be run with "mpirun -n 3 python <scriptname>.py" and the two plots can be viewed on the same node with "psplot -a 1 OPALSUMS OPALIMG" (see Visualization Tools for "psplot" options). It can also be run in the usual offline manner by changing the DataSource line. It is written in a way to keep running even when the DAQ is restarted:
```python
import os
import numpy as np
from psana import DataSource
from psmon import publish
from psmon.plots import XYPlot, Image
from collections import deque
from mpi4py import MPI

numworkers = MPI.COMM_WORLD.Get_size() - 1
if numworkers == 0:
    numworkers = 1  # the single core case (no mpi)
os.environ['PS_SRV_NODES'] = '1'  # one mpi core gathers/plots data

mydeque = deque(maxlen=25)
lastimg = None
numevents = 0
numendrun = 0

def my_smalldata(data_dict):
    # one core gathers all data from mpi workers
    global numevents, lastimg, numendrun, mydeque
    if 'endrun' in data_dict:
        numendrun += 1
        if numendrun == numworkers:
            print('Received endrun from all workers. Resetting data.')
            numendrun = 0
            numevents = 0
            mydeque = deque(maxlen=25)
        return
    numevents += 1
    if 'opal' in data_dict:
        lastimg = data_dict['opal']
    mydeque.append(data_dict['opalsum'])
    if numevents % 100 == 0:  # update plots around 1Hz
        print('event:', numevents)
        myxyplot = XYPlot(numevents, "Last 25 Sums",
                          np.arange(len(mydeque)), np.array(mydeque), formats='o')
        publish.send("OPALSUMS", myxyplot)
        if lastimg is not None:  # opal image is not sent all the time
            myimgplot = Image(numevents, "Opal Image", lastimg)
            publish.send("OPALIMG", myimgplot)

while 1:  # mpi worker processes; keep running when the DAQ is restarted
    ds = DataSource(shmem='rix')
    smd = ds.smalldata(batch_size=5, callbacks=[my_smalldata])
    for myrun in ds.runs():
        opal = myrun.Detector('atmopal')
        for nevt, evt in enumerate(myrun.events()):
            mydict = {}
            image = opal.raw.image(evt)
            if image is None:
                continue
            # do as much work as possible in the workers
            # don't send large data all the time, if possible
            if nevt % 10 == 0:
                mydict['opal'] = image
            mydict['opalsum'] = np.sum(image)
            smd.event(evt, mydict)
        smd.event(evt, {'endrun': 1})  # tells gatherer to reset plots
```
When running multi-node mpi there are also some complexities propagating the environment to remote nodes: the way to address that is described in this link.
Running in LIVE mode
Here's a sample python script showing how to configure the DataSource to run in live mode:
```python
# Use an environment variable to specify how many attempts
# the datasource should make while waiting for file reading (1 second wait each).
# In this example, we set it to 30 (wait up to 30 seconds).
import os
os.environ['PS_SMD_MAX_RETRIES'] = '30'

# Create a datasource with the live flag
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222,
                dir='/cds/data/psdm/prj/public01/xtc',
                live=True, max_events=10)

# Loop over your run and events as usual.
# You'll see a "wait for an event..." message in case
# the file system writing is slower than your analysis.
run = next(ds.runs())
for i, evt in enumerate(run.events()):
    print(f'got evt={i} ts={evt.timestamp}')
```
Note that this script is also available in the tests folder of the lcls2 repository. You can run the script by:
(ps-4.3.2) psanagpu109 tests $ mpirun -n 3 python livemode.py
Jump to events
Psana2 can jump to particular events when given a list of timestamps. In the example below, we only want to process these four timestamps. Note that the timestamp array must have dtype np.uint64.
```python
from psana import DataSource
import numpy as np

timestamps = np.array([4194783241933859761, 4194783249723600225,
                       4194783254218190609, 4194783258712780993], dtype=np.uint64)
ds = DataSource(exp='tmoc00118', run=222,
                dir='/cds/data/psdm/prj/public01/xtc', timestamps=timestamps)
myrun = next(ds.runs())
opal = myrun.Detector('tmo_atmopal')
for nevt, evt in enumerate(myrun.events()):
    img = opal.raw.image(evt)
    print(nevt, evt.timestamp, img.shape)
```
Here is the output:
```
(ps-4.5.5) monarin@psanagpu111 (master *) psana2 $ python test_jump.py
0 4194783241933859761 (1024, 1024)
1 4194783249723600225 (1024, 1024)
2 4194783254218190609 (1024, 1024)
3 4194783258712780993 (1024, 1024)
```
Event Times In Local Time Zone
```python
import datetime as dt
from psana import DataSource

ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc')
myrun = next(ds.runs())
for nevt, evt in enumerate(myrun.events()):
    if nevt > 3:
        break
    t = evt.datetime()
    localt = t.replace(tzinfo=dt.timezone.utc).astimezone(tz=None)
    print(localt.strftime('%H:%M:%S'))
```
Running with Integrating Detector
Here's a sample python script showing how to analyze a run with an integrating detector (in this example, 'andor'). For more detail about the technical design, visit this page.
There are two key variables when running in integrating-detector mode: PS_SMD_N_EVENTS (an environment variable), which is how many events SMD0 will send to each EB core, and the batch_size kwarg, which controls how many events the EB cores send to the BD cores. When running integrating detectors, the "units" of these two variables change from "high-rate events" to "low-rate integrating-detector events". batch_size should be set to less than PS_SMD_N_EVENTS. The max_events kwarg to DataSource also changes to count integrating-detector events. Unfortunately, the batch_size kwarg to the DataSource.smalldata() call does NOT count integrating-detector events. We would like to simplify this in the future.
NOTE: part of psana ("SMD0") has buffers that need to be large enough to accommodate at least one event from the integrating detector. If PS_SMD_N_EVENTS is set too large and this is not possible then an error message is printed and psana exits.
```python
# PS_SMD_N_EVENTS should be set to a small number (e.g. 1)
# since all other events which are part of this intg. event will be sent
# in the same batch.
import os
os.environ['PS_SMD_N_EVENTS'] = '1'
batch_size = 1

import numpy as np
from psana import DataSource

ds = DataSource(exp='xpptut15', run=1,
                dir='/cds/data/psdm/prj/public01/xtc/intg_det',
                intg_det='andor', batch_size=batch_size)
run = next(ds.runs())
hsd = run.Detector('hsd')
andor = run.Detector('andor')

# Test calculating the sum of the hsd for each integrating event.
sum_hsd = 0
for i_evt, evt in enumerate(run.events()):
    hsd_calib = hsd.raw.calib(evt)
    andor_calib = andor.raw.calib(evt)
    # Keep summing the value of the other detector (hsd in this case)
    sum_hsd += np.sum(hsd_calib[:]) / np.prod(hsd_calib.shape)
    # When an integrating event is found, print out and reset the sum variable
    if andor_calib is not None:
        val_andor = np.sum(andor_calib[:]) / np.prod(andor_calib.shape)
        print(f'i_evt: {i_evt} andor: {val_andor} sum_hsd:{sum_hsd}')
        sum_hsd = 0
```
RIX Example: Deadtime and Integrating Detectors
Sometimes data for a particular shot cannot be read out because DAQ data buffers are unavailable. This is called "deadtime". Deadtime can make it impossible to reliably use high-rate shots to perform operations like normalizations for lower-rate integrating detectors. Matt Weaver is adding counters to the timing system so the DAQ can count how many shots had their data dropped due to deadtime, allowing experimenters to decide whether there is sufficient data for a normalization-style analysis.
This example demonstrates (conceptually!) how to do the normalization and view the results in a psmon plot (use the command "psplot ANDOR" on the same node). Run this script either with "python andor.py" or "mpirun -n 5 python andor.py":
```python
from psana import DataSource
from psmon import publish
from psmon.plots import Image, XYPlot
import os
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# PS_SMD_N_EVENTS should be set to a small number (e.g. 1)
# since all other events which are part of this intg. event will be sent
# in the same batch.
os.environ['PS_SMD_N_EVENTS'] = '1'
os.environ['PS_SRV_NODES'] = '1'

if rank == 4:  # hack for now to eliminate use of publish.local below
    publish.init()
# we will remove this for batch processing and use "psplot" instead
# publish.local = True

def my_smalldata(data_dict):
    if 'unaligned_andor_norm' in data_dict:
        andor_norm = data_dict['unaligned_andor_norm'][0]
        myplot = XYPlot(0, "Andor (normalized)", range(len(andor_norm)), andor_norm)
        publish.send('ANDOR', myplot)

ds = DataSource(exp='rixx1003821', run=46,
                dir='/cds/data/psdm/prj/public01/xtc',
                intg_det='andor_vls', batch_size=1)
for myrun in ds.runs():
    andor = myrun.Detector('andor_vls')
    timing = myrun.Detector('timing')
    smd = ds.smalldata(filename='mysmallh5.h5', batch_size=1000,
                       callbacks=[my_smalldata])
    norm = 0
    ndrop_inhibit = 0
    for nstep, step in enumerate(myrun.steps()):
        print('step:', nstep)
        for nevt, evt in enumerate(step.events()):
            andor_img = andor.raw.value(evt)
            # also need to check for events missing due to damage
            # (or compare against expected number of events)
            ndrop_inhibit += timing.raw.inhibitCounts(evt)
            smd.event(evt, mydata=nevt)  # high rate data saved to h5
            # need to check Matt's new timing-system data on every
            # event to make sure we haven't missed normalization
            # data due to deadtime
            norm += nevt  # fake normalization
            if andor_img is not None:
                print('andor data on evt:', nevt, 'ndrop_inhibit:', ndrop_inhibit)
                # check that the high-rate readout group (2) didn't
                # miss any events due to deadtime
                if ndrop_inhibit[2] != 0:
                    print('*** data lost due to deadtime')
                # need to prefix the name with "unaligned_" so
                # the low-rate andor dataset doesn't get padded
                # to align with the high rate datasets
                smd.event(evt, mydata=nevt,
                          unaligned_andor_norm=(andor_img/norm))
                norm = 0
                ndrop_inhibit = 0
```
AMI and Integrating Detectors
AMI is also affected by the DAQ deadtime issue described above but, in addition, events can also be thrown away going into the AMI shared memory (this feature is what allows AMI analysis to stay real-time). This can cause additional issues with AMI integrating-detector normalization analyses that try to use high-rate shots to do normalizations for low-rate integrating detectors, since not all high-rate shots are guaranteed to be available. Also, unlike psana, AMI doesn't currently have the idea of shots that "belong together" which will make an AMI normalization-style analysis of an integrating detector impossible.
MPI Task Structure To Support Scaling
psana can scale to allow for high rate analysis. For example, many hdf5 files of small user-defined data (described above in Example Script Producing Small HDF5 File) can be written, one per "SRV" node in the diagram below. The total number of SRV nodes is defined by the environment variable PS_SRV_NODES (defaults to 0). These many hdf5 files are joined by psana into what appears to be one file using the hdf5 "virtual dataset" feature. Similarly, multiple nodes can be used for filtering ("EB" nodes in the diagram below) and multiple nodes can be used to process big data in the main psana event loop ("BD" nodes in the diagram below). The one piece that cannot currently be scaled to multiple nodes is the SMD0 (SMallData) task, which reads the timestamps and fseek offsets from each tiny .smd.xtc2 file produced by the DAQ (typically one per detector or detector segment, although a file can contain more than one segment or detector). This task joins together the relevant data for each shot ("event build") using the timestamp. The SMD0 task is multi-threaded, with one thread for each detector. For highest performance it is important that all SMD0 threads be allocated an entire MPI node.
Running a large job
Below shows how to set up a slurm job script to run a large job. This script uses setup_hosts_openmpi.sh (also provided below) to assign a single node to SMD0 (see diagram above) and distribute all other tasks (EB, BD, & SRV) across the rest of the available nodes. After sourcing setup_hosts_openmpi.sh, you can use $PS_N_RANKS and $PS_HOST_FILE in your mpirun command.
Note on the no. of ranks available: on S3DF, there are 120 cores per compute node. The example below asks for 3 nodes (120 x 3 = 360 ranks in total) but will only have 120 x (3 - 1) + 1 = 241 ranks available, because all 120 cores on the first node are reserved for SMD0.
```bash
#!/bin/bash
#SBATCH --partition=milano
#SBATCH --job-name=run_large_psana2
#SBATCH --output=output-%j.txt
#SBATCH --error=output-%j.txt
#SBATCH --nodes=3
#SBATCH --exclusive
#SBATCH --time=10:00

# Configure psana2 parallelization
source setup_hosts_openmpi.sh

# Run your job
mpirun -np $PS_N_RANKS --hostfile $PS_HOST_FILE python test_mpi.py
```
Here is the `setup_hosts_openmpi.sh`. You can create this script in your job folder.
```bash
############################################################
# First node must be exclusive to smd0
# * For openmpi, slots=1 must be assigned to the first node.
############################################################

# Get the list of hosts by expanding the shorthand node list into a
# line-by-line node list
host_list=$(scontrol show hostnames $SLURM_JOB_NODELIST)
hosts=($host_list)

# Write out to the host file, putting rank 0 on the first node
host_file="slurm_host_${SLURM_JOB_ID}"
for i in "${!hosts[@]}"; do
  if [[ "$i" == "0" ]]; then
    echo ${hosts[$i]} slots=1 > $host_file
  else
    echo ${hosts[$i]} >> $host_file
  fi
done

# Export hostfile for mpirun
export PS_HOST_FILE=$host_file

# Calculate no. of ranks available in the job
export PS_N_RANKS=$(( SLURM_CPUS_ON_NODE * ( SLURM_JOB_NUM_NODES - 1 ) + 1 ))
```
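The PS_N_RANKS arithmetic in the script above can be written out explicitly in plain python (assuming, as above, 120 cores per S3DF node):

```python
cores_per_node = 120   # S3DF milano nodes
nodes = 3              # SBATCH --nodes=3

requested = cores_per_node * nodes            # 360 ranks requested
# the whole first node is reserved for SMD0, which counts as 1 rank:
available = cores_per_node * (nodes - 1) + 1  # 241 ranks available
print(requested, available)
```

This mirrors the `SLURM_CPUS_ON_NODE * ( SLURM_JOB_NUM_NODES - 1 ) + 1` expression computed by the shell script.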