JupyterHub
psana2 is also available in JupyterHub, in the kernel named "LCLS-II py3": https://pswww.slac.stanford.edu/jupyterhub/hub/home
Environment
To obtain the environment to run psana2, execute the following:
```
source /cds/sw/ds/ana/conda2/manage/bin/psconda.sh
```
Note that LCLS-II psana is not compatible with LCLS-I psana, so environments must activate one or the other, but not both.
Public Practice Data
Publicly accessible practice data is located in the directory /cds/data/psdm/prj/public01/xtc. Use of this data requires the additional "dir=" keyword to the DataSource object.
| Experiment | Run | Comment |
|---|---|---|
| tmoc00118 | 222 | Generic TMO dark data |
| rixx43518 | 34 | A DAQ "fly scan" of a motor (see ami#FlyScan:MeanVs.ScanValue) |
| rixx43518 | 45 | A DAQ "step scan" of two motors |
Detector Names
Use this command to see non-epics detector names (see "Detector Interface" example below):
```
(ps-4.1.0) psanagpu101:lcls2$ detnames exp=tmoc00118,run=222,dir=/cds/data/psdm/prj/public01/xtc
---------------------
Name      | Data Type
---------------------
epicsinfo | epicsinfo
timing    | raw
hsd       | raw
gmdstr0   | raw
etc.
```
From within python the above information can be obtained as a dictionary using "run.detinfo".
Use the same command with the "-e" option to see epics detector names (see "Detector Interface" example below). These are slowly varying variables (like temperatures) that are not strongly time correlated with the regular per-shot detector data:
```
(ps-4.3.2) psanagpu102:lcls2$ detnames -e exp=tmoc00118,run=222,dir=/cds/data/psdm/prj/public01/xtc | more
-------------------------------------
Name            | Data Type
-------------------------------------
StaleFlags      | raw
Keithley_Sum    | raw
IM2K4_XrayPower | raw
IM3K4_XrayPower | raw
etc.
```
From within python the above information can be obtained as a dictionary using "run.epicsinfo".
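Both dictionaries are easy to inspect from a short python session; a minimal sketch using the public practice data above:

```python
from psana import DataSource

# print the per-shot detector names/types and the epics variable names
ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc')
myrun = next(ds.runs())
print(myrun.detinfo)    # same information as "detnames"
print(myrun.epicsinfo)  # same information as "detnames -e"
```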
Using the Detector Interface
Standard (per-shot) detectors and the slower epics variables can be accessed as shown here using the names discovered with the commands above. NOTE: the slow epics variables are polled in the DAQ at 1Hz, so for the first one second of a run they will typically return "None". You can use tab-completion in ipython (see https://ipython.org/ipython-doc/3/interactive/tutorial.html) or the jupyter notebook to explore what you can do with the various detector objects:
```python
from psana import DataSource

ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc',
                max_events=100)
myrun = next(ds.runs())
opal = myrun.Detector('tmo_atmopal')
epics_det = myrun.Detector('IM2K4_XrayPower')
for evt in myrun.events():
    img = opal.raw.image(evt)
    epics_val = epics_det(evt)
    # check for missing data
    if img is None:
        print('no image')
    else:
        print(img.shape)
    if epics_val is None:
        print('no epics value')
    else:
        print(epics_val)
```
Example Script Producing Small HDF5 File
You can run this script with MPI: mpirun -n 6 python example.py
It also works on one core with: python example.py (useful for debugging). See the MPI rank/task diagram in the "MPI Task Structure To Support Scaling" section below to understand what the different mpi ranks are doing.
This mechanism by default produces "aligned" datasets where missing values are padded (with NaN's for floats, and -99999 for integers). To create an unaligned dataset (without padding), prefix the name of the variable with "unaligned_".
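For example, inside the event loop of the script below one could write one padded and one unpadded variable like this (a sketch only: the hit selection and the "hitsum" name are hypothetical, and smd/run/opal are as defined in the full example):

```python
# sketch of an event-loop fragment (hypothetical variable names)
for evt in run.events():
    img = opal.raw.image(evt)
    if img is None:
        continue
    evtsum = np.sum(img)
    mydata = {'evtsum': evtsum}              # aligned: padded with NaN on events where missing
    if evtsum > threshold:                   # hypothetical hit selection
        mydata['unaligned_hitsum'] = evtsum  # unaligned: written without padding
    smd.event(evt, mydata)                   # pass either dictionary or kwargs
```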
NOTE: in addition to the hdf5 you specify as your output file ("my.h5" below) you will see other h5 files like "my_part0.h5", one for each of the cores specified in PS_SRV_NODES. The reason for this is that each of those cores writes its own my_partN.h5 file: for LCLS2 it will be important for performance to write many files. The "my.h5" file is actually quite small, and uses a new HDF5 feature called a "Virtual DataSet" (VDS) to join together the various my_partN.h5 files. Also note that events in my.h5 will not be in time order. If you copy the .h5 somewhere else, you need to copy all of them.
NOTE: In python, if you want to exit early you often use a "break" statement. When running psana-python with mpi parallelization, however, not all cores will see this statement, and the result will be that your job will hang at the end. To avoid this use the "max_events" keyword argument to DataSource (see example below).
NOTE: The call below to smd.sum() (or min()/max()) only accepts numpy arrays or None. This requirement allows cores that don't see any events to not cause errors when summing across cores.
```python
from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES'] = '1'

ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc',
                max_events=10)

# batch_size is optional. specifies how often the dictionary of small
# user data is gathered. if you write out large data (NOT RECOMMENDED)
# it needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')
    runsum = np.zeros((3), dtype=float)  # beware of datatypes when summing: can overflow
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        # important: always check for missing data
        if img is None or photonEnergy is None:
            continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += img[0, :3]  # local sum on one mpi core

    # optional summary data for whole run
    if smd.summary:
        # sum (or max/min) across all mpi cores. Must be numpy array or None.
        tot_runsum = smd.sum(runsum)
        # pass either dictionary or kwargs
        smd.save_summary({'sum_over_run': tot_runsum}, summary_int=1)

smd.done()
```
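After the job finishes, the output can be read back like a normal hdf5 file; the virtual datasets stitch the my_partN.h5 pieces together transparently, so keep those files next to the main file. Here is a minimal sketch using h5py, where the dataset names are assumed to match the keywords passed to smd.event() above:

```python
import h5py
import numpy as np

# Read the small-data file written by the script above. The mysmallh5_part*.h5
# files must remain in the same directory for the virtual datasets to resolve.
with h5py.File('mysmallh5.h5', 'r') as f:
    print(list(f.keys()))        # inspect what was written
    evtsum = f['evtsum'][:]      # assumed dataset names matching the smd.event() keywords
    photonEnergy = f['photonEnergy'][:]
    print(evtsum.shape, np.nanmean(photonEnergy))
```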
Full-Featured Example with Callbacks and Detector Selection
You can run this script with MPI the same way as shown in the previous example: mpirun -n 6 python example.py
The following additional DataSource arguments are used in this example:
- detectors=['detname1', 'detname2',]
List of detectors to be read from the disk. If you only need a few detectors for analysis, list their names here. The reading process will be faster since unused detector data is not read.
- filter=filter_fn
You can write a filter_fn(evt) callback which returns True or False to include or exclude the event from getting read from disk.
- small_xtc=['detname1', 'detname2']
List of detectors to be used in filter_fn()
- destination=destination
You can write a destination(evt) callback which returns the id of the rank that you want to send this event to.
```python
from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)

# Use this function to decide to keep/discard this event.
# If a detector is needed here, make sure to list it in the
# small_xtc argument of DataSource (see below).
def filter_fn(evt):
    run = evt.run()
    step = run.step(evt)
    opal = run.Detector('tmo_opal1')
    img = opal.raw.image(evt)
    return True

# Use this function to direct an event to process on a particular 'rank'.
# This function should return a rank number between 1 and
# (total no. of ranks - 3) (3 ranks are reserved).
def destination(evt):
    # Note that run, step, and det can be accessed
    # the same way as shown in filter_fn
    n_bd_nodes = 3  # for mpirun -n 6, 3 ranks are reserved so there are 3 bd ranks left
    dest = (evt.timestamp % n_bd_nodes) + 1
    return dest

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES'] = '1'

ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc',
                max_events=10,
                detectors=['tmo_opal1', 'ebeam'],  # only reads these detectors (faster)
                filter=filter_fn,                  # filter_fn returns True/False
                small_xtc=['tmo_opal1'],           # detectors to be used in filter callback
                destination=destination)           # returns rank no. (send this evt to this rank)

# batch_size is optional. specifies how often the dictionary of small
# user data is gathered. if you write out large data (NOT RECOMMENDED)
# it needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')
    runsum = np.zeros((3), dtype=float)  # beware of datatypes when summing: can overflow
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        if img is None or photonEnergy is None:
            continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += img[0, :3]  # local sum on one mpi core

    # optional summary data for whole run
    if smd.summary:
        # sum (or max/min) across all mpi cores. Must be numpy array or None.
        tot_runsum = smd.sum(runsum)
        # pass either dictionary or kwargs
        smd.save_summary({'sum_over_run': tot_runsum}, summary_int=1)

smd.done()
```
Accessing Event Codes
LCLS1-style event codes can be accessed using the "timing" detector:
```python
from psana import DataSource

ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc')
myrun = next(ds.runs())
timing = myrun.Detector('timing')
for nevt, evt in enumerate(myrun.events()):
    allcodes = timing.raw.eventcodes(evt)
    # event code 15 fires at 1Hz, and this exp/run had 10Hz triggers
    print('event code 15 present:', allcodes[15])
    if nevt > 20:
        break
```
Running in Parallel
Instructions for submitting batch jobs to run in parallel are here: Batch Nodes And Queues
Analyzing Scans
To get a list of the scan variables, use the following command:
```
(ps-4.3.2) psanagpu102:lcls2$ detnames -s exp=rixx43518,run=45,dir=/cds/data/psdm/prj/public01/xtc
--------------------------
Name           | Data Type
--------------------------
motor1         | raw
motor2         | raw
step_value     | raw
step_docstring | raw
--------------------------
(ps-4.3.2) psanagpu102:lcls2$
```
From within python the above information can be obtained as a dictionary using "run.scaninfo".
Note that "step_value"/"step_docstring" are convenience values defined by the DAQ to be a single value/string that is supposed to represent what has happened on a step, for use in plots/printout, since it's not easy to plot, for example, vs. multiple motor positions in a complex scan.
Code similar to this can be used to access the above scan variables:
```python
from psana import DataSource

ds = DataSource(exp='rixx43518', run=45, dir='/cds/data/psdm/prj/public01/xtc')
myrun = next(ds.runs())
motor1 = myrun.Detector('motor1')
motor2 = myrun.Detector('motor2')
step_value = myrun.Detector('step_value')
step_docstring = myrun.Detector('step_docstring')
for step in myrun.steps():
    print(motor1(step), motor2(step), step_value(step), step_docstring(step))
    for evt in step.events():
        pass
```
Running From Shared Memory
psana2 scripts can be run in real-time on shared memory. There is some extra complexity compared to writing python code for the real-time GUI ami, for example:
- you have to create plots and handle when they update
- you have to worry about data getting out of time order
- you have to handle missing data
- you have to reset plots on run boundaries (if that's the behavior you want)
- you have to write code to handle the data gathered from multiple mpi cores (not required if you want to run on 1 core)
- be aware that the shared memory python scripts "steal" event statistics from other shared memory instances (e.g. ami)
but you get to take advantage of the power/flexibility of python. Look at the DAQ .cnf file (e.g. /cds/group/pcds/dist/pds/rix/scripts/rix.cnf) to see the name of the node running the shared memory server ("monReqServer"). To access those nodes you need to be on a special permissions list (email pcds-ana-l@slac.stanford.edu to request). You can find the name of the shared memory (the hutch name is typically used) either by looking in the .cnf file (the "-P" option to the monReqServer executable) or by doing a command like this:
```
drp-neh-cmp003:~$ ls /dev/shm/
PdsMonitorSharedMemory_tmo
drp-neh-cmp003:~$
```
For this output, you would use "DataSource(shmem='tmo')".
Note that one can develop the shared memory scripts (including real-time plots) using offline data, then change the DataSource line to run them in real-time.
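For example, only the DataSource line changes between the two modes; a minimal sketch using the public practice data and the shared memory name from above:

```python
from psana import DataSource

# offline development, using recorded data:
ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc')

# real-time, on the node running the shared memory server:
# ds = DataSource(shmem='tmo')

for myrun in ds.runs():
    for evt in myrun.events():
        pass  # identical analysis code in both cases
```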
Typically psmon is used for publishing results to realtime plots in the callback, publishing updates every "N" events. See this link for psmon examples: Visualization Tools.
When running multi-core with mpi one has to use the small data "callbacks" kwarg to receive the data gathered from all nodes. An example multi-core script is below (also works on 1 core). A pure 1-core script is simpler (no data "gathering" needed: can just be a loop over events with plot updates every N events). For multi-core, this can be run with "mpirun -n 3 python <scriptname>.py" and the two plots can be viewed on the same node with "psplot -a 1 OPALSUMS OPALIMG" (see Visualization Tools for "psplot" options). It can also be run in the usual offline manner by changing the DataSource line. It is written in a way to keep running even when the DAQ is restarted:
```python
import os
import numpy as np
from psana import DataSource
from psmon import publish
from psmon.plots import XYPlot, Image
from collections import deque
from mpi4py import MPI

numworkers = MPI.COMM_WORLD.Get_size() - 1
if numworkers == 0: numworkers = 1  # the single core case (no mpi)
os.environ['PS_SRV_NODES'] = '1'    # one mpi core gathers/plots data

mydeque = deque(maxlen=25)
lastimg = None
numevents = 0
numendrun = 0

def my_smalldata(data_dict):
    # one core gathers all data from mpi workers
    global numevents, lastimg, numendrun, mydeque
    if 'endrun' in data_dict:
        numendrun += 1
        if numendrun == numworkers:
            print('Received endrun from all workers. Resetting data.')
            numendrun = 0
            numevents = 0
            mydeque = deque(maxlen=25)
        return
    numevents += 1
    if 'opal' in data_dict:
        lastimg = data_dict['opal']
    mydeque.append(data_dict['opalsum'])
    if numevents % 100 == 0:  # update plots around 1Hz
        print('event:', numevents)
        myxyplot = XYPlot(numevents, "Last 25 Sums", np.arange(len(mydeque)),
                          np.array(mydeque), formats='o')
        publish.send("OPALSUMS", myxyplot)
        if lastimg is not None:  # opal image is not sent all the time
            myimgplot = Image(numevents, "Opal Image", lastimg)
            publish.send("OPALIMG", myimgplot)

while 1:  # mpi worker processes
    ds = DataSource(shmem='rix')
    smd = ds.smalldata(batch_size=5, callbacks=[my_smalldata])
    for myrun in ds.runs():
        opal = myrun.Detector('atmopal')
        for nevt, evt in enumerate(myrun.events()):
            mydict = {}
            image = opal.raw.image(evt)
            if image is None:
                continue
            # do as much work as possible in the workers
            # don't send large data all the time, if possible
            if nevt % 10 == 0:
                mydict['opal'] = image
            mydict['opalsum'] = np.sum(image)
            smd.event(evt, mydict)
        smd.event(evt, {'endrun': 1})  # tells gatherer to reset plots
```
When running multi-node mpi there are also some complexities propagating the environment to remote nodes: the way to address that is described in this link.
Running in LIVE mode
Here is a sample python script showing how to configure the DataSource to run in live mode:
```python
# Use an environment variable to specify how many attempts (1 second wait each)
# the datasource should make while waiting for file reading.
# In this example, we set it to 30 (wait up to 30 seconds).
import os
os.environ['PS_SMD_MAX_RETRIES'] = '30'

# Create a datasource with the live flag
from psana import DataSource
ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc',
                live=True, max_events=10)

# Loop over your run and events as usual.
# You'll see a "wait for an event..." message if
# the file system writing is slower than your analysis.
run = next(ds.runs())
for i, evt in enumerate(run.events()):
    print(f'got evt={i} ts={evt.timestamp}')
```
Note that this script is also available in the tests folder of the lcls2 repository. You can run the script with:
```
(ps-4.3.2) psanagpu109 tests $ mpirun -n 3 python livemode.py
```
Jump to events
Psana2 can jump to particular events when given a list of timestamps. In the example below, we only want to process four specific timestamps. Note that the timestamp array must have dtype np.uint64.
```python
from psana import DataSource
import numpy as np

timestamps = np.array([4194783241933859761, 4194783249723600225,
                       4194783254218190609, 4194783258712780993], dtype=np.uint64)
ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc',
                timestamps=timestamps)
myrun = next(ds.runs())
opal = myrun.Detector('tmo_atmopal')
for nevt, evt in enumerate(myrun.events()):
    img = opal.raw.image(evt)
    print(nevt, evt.timestamp, img.shape)
```
Here is the output
```
(ps-4.5.5) monarin@psanagpu111 (master *) psana2 $ python test_jump.py
0 4194783241933859761 (1024, 1024)
1 4194783249723600225 (1024, 1024)
2 4194783254218190609 (1024, 1024)
3 4194783258712780993 (1024, 1024)
```
MPI Task Structure To Support Scaling
psana can scale to allow for high rate analysis. For example, many hdf5 files of small user-defined data (described above in Example Script Producing Small HDF5 File) can be written, one per "SRV" node in the diagram below. The total number of SRV nodes is defined by the environment variable PS_SRV_NODES (defaults to 0). These many hdf5 files are joined by psana into what appears to be one file using the hdf5 "virtual dataset" feature. Similarly, multiple nodes can be used for filtering ("EB" nodes in the diagram below) and multiple nodes can be used to process big data in the main psana event loop ("BD" nodes in the diagram below). The one piece that cannot currently be scaled to multiple nodes is the SMD0 (SmallData) task, which reads the timestamps and fseek offsets from each tiny .smd.xtc2 file produced by the DAQ (typically one per detector or per detector segment, although a file can contain more than one segment or detector). This task joins together the relevant data for each shot ("event build") using the timestamp. The SMD0 task is multi-threaded, with one thread for each detector. For highest performance it is important that all SMD0 threads be allocated an entire MPI node.
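As an illustration of the resulting rank bookkeeping, here is a sketch only (not psana API); it assumes one SMD0 rank and one EB rank, consistent with the "mpirun -n 6" comment in the destination() example above:

```python
import os

# sketch of how ranks are divided for "mpirun -n 6 python example.py"
os.environ['PS_SRV_NODES'] = '1'           # SRV ranks that write the hdf5 _part files

total_ranks = 6
n_smd0 = 1                                 # reads timestamps/offsets from the .smd.xtc2 files
n_eb = 1                                   # event-building/filtering rank (assumed default)
n_srv = int(os.environ['PS_SRV_NODES'])    # hdf5 writers
n_bd = total_ranks - n_smd0 - n_eb - n_srv # big-data ranks running the event loop
print(f'{n_bd} BD ranks run the event loop')  # -> 3, matching the destination() example
```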