Page History
...
Code Block | ||
---|---|---|
| ||
from psana import DataSource
import numpy as np
import os

# OPTIONAL callback invoked with the "gathered" small data from all
# cores; typically used for realtime plots when analyzing from DAQ
# shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)

# Number of h5 files to write; 1 is sufficient for 120Hz operation.
# Optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES'] = '1'

ds = DataSource(exp='tmoc00118', run=222,
                dir='/cds/data/psdm/prj/public01/xtc', max_events=10)

# batch_size is optional: it specifies how often the dictionary of
# small user data is gathered.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5,
                   callbacks=[my_smalldata])

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')
    runsum = 0
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        if img is None or photonEnergy is None:
            continue
        evtsum = np.sum(img)
        # pass either a dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += evtsum  # beware of datatypes when summing: can overflow
    # optional summary data for the whole run
    if smd.summary:
        smd.sum(runsum)  # sum across all mpi cores
        # pass either a dictionary or kwargs
        smd.save_summary({'sum_over_run': runsum}, summary_int=1)

smd.done()
Full Featured Example with Callbacks and Detector Selection
You can run this script with MPI the same way as previous example: mpirun -n 6 python example.py
Four more arguments for DataSource were added to this example
- detectors=['detname1', 'detname2',]
List of detectors to be read from the disk. If you only need a few detectors, list their name here. The reading process will be faster since we don't need to read other detectors.
- filter= filter_fn
You can write a filter_fn(evt) callback which returns True or False to include or exclude the event from getting read from the disk.
- small_xtc=['detname1', 'detname2']
List of detectors to be used in filter_fn()
- destination=destination
You can write a destination(evt) callback which returns the rank id that you want to send this event to.
Code Block | ||
---|---|---|
| ||
from psana import DataSource
import numpy as np
import os
# OPTIONAL callback invoked with the "gathered" small data collected
# from all cores. Usually used for creating realtime plots when
# analyzing from DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    """Demo callback: just print the gathered small-data dictionary."""
    print(data_dict)
# Event-selection callback: return True to keep this event, False to
# drop it before it is read from disk. Any detector accessed here must
# be listed in the small_xtc argument of DataSource (see below).
def filter_fn(evt):
    """Demo filter: shows run/step/detector access, keeps every event."""
    this_run = evt.run()
    this_step = this_run.step(evt)           # step info is reachable here
    opal = this_run.Detector('tmo_opal1')    # detector listed in small_xtc
    img = opal.raw.image(evt)                # detector data is reachable here
    return True
# Destination callback: choose which big-data (bd) rank processes this
# event. run, step, and detector info can be accessed the same way as
# shown in filter_fn. The formula below yields a value in the range
# 1..n_bd_nodes.
def destination(evt):
    # For mpirun -n 6, three ranks are reserved for other roles, which
    # leaves three bd ranks.
    n_bd_nodes = 3
    # Round-robin events over the bd nodes using the event timestamp.
    return (evt.timestamp % n_bd_nodes) + 1
# Number of h5 files to write; 1 is sufficient for 120Hz operation.
# Optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES'] = '1'

ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc',
                max_events=10,
                detectors=['tmo_opal1', 'ebeam'],  # only read these detectors (faster)
                filter=filter_fn,                  # returns True/False per event
                small_xtc=['tmo_opal1'],           # detectors usable inside filter_fn
                destination=destination)           # picks the bd rank for each event

# batch_size is optional: it specifies how often the dictionary of
# small user data is gathered.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')
    runsum = 0
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        if img is None or photonEnergy is None:
            continue
        evtsum = np.sum(img)
        # pass either a dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += evtsum  # beware of datatypes when summing: can overflow
    # optional summary data for the whole run
    if smd.summary:
        smd.sum(runsum)  # sum across all mpi cores
        # pass either a dictionary or kwargs
        smd.save_summary({'sum_over_run': runsum}, summary_int=1)

smd.done()
Running in Parallel
Instructions for submitting batch jobs to run in parallel are here: Batch System Analysis Jobs
...