Page History
...
List of detectors to be read from the disk. If you only need a few detectors for analysis, list their names here. The reading process will be faster since unused detector data is not read.
- filtersmd_callback= filtersmd_fncallback
You can write a filter_fn(evt) callback which returns True or False use this callback to 1) decide which which event to read (True) or not-read (False) yield evt) the large event data and 2) set the destination (rank id) to send this event to.
- small_xtc=['detname1', 'detname2']
List of detectors to be used in filter_fn()
...
You can write a destination(evt) callback with returns rank id that you want to send this event to.
Code Block | ||
---|---|---|
| ||
from psana import DataSource import numpy as np import os # OPTIONAL callback with "gathered" small data from all cores. # usually used for creating realtime plots when analyzing from # DAQ shared memory. Called back on each SRV node. def my_smalldata(data_dict): print(data_dict) # Use this function to decide to keep/discardif you want to fetch large data for this event # If this detector is needed, make sure to define this # detector in as_smds argument for DataSource (see below) def filter_fn(evt): run = evt.run() step = run.step(evt)and/or direct an event to process on a particular 'rank' # (this rank number should be between 1 and total no. of ranks - 3 # since 3 ranks are reserved). If this detector is needed, make sure # to define this detector in as_smds argument for DataSource (see below). # All epics and scan detectors are available automatically. def smd_callback(run): opal = run.Detector('tmo_opal1') imgepics_det = opalrun.raw.image(evt) Detector('IM2K4_XrayPower') n_bd_nodes return= True3 # True:for readmpirun large-data, False: do not read large-data # Use this function to direct an event to process on a # particular 'rank'. This function should returns a rank # number between 1 and total no. of ranks - 3 (3 ranks are reserved). def destination(evt): # Note that run, step, and det can be accessed # the same way as shown in filter_fn n_bd_nodes = 3 # for mpirun -n 6, 3 ranks are reserved so there are 3 bd ranks left dest = (evt.timestamp % n_bd_nodes) + 1 return destn 6, 3 ranks are reserved so there are 3 bd ranks left for i_evt, evt in enumerate(run.events()): img = opal.raw.image(evt) epics_val = epics_det(evt) dest = (evt.timestamp % n_bd_nodes) + 1 if epics_val is not None: # Set the destination (rank no.) where this event should be sent to evt.set_destination(dest) yield evt # sets the number of h5 files to write. 1 is sufficient for 120Hz operation # optional: only needed if you are saving h5. os.environ['PS_SRV_NODES']='1' ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc', max_events = 10, detectors detectors = [= ['epicsinfo', 'tmo_opal1', 'ebeam'], # only reads these detectors (faster) filter smd_callback= smd_callback, = filter_fn, # filter_fn returns True (read large data)# orsmalldata Falsecallback (dosee notnotes readabove) small_xtc = ['tmo_opal1'], # detectors to be used in filter callback # detectors to destinationbe =used destination)in smalldata callback # returns rank no. (send this evt to this rank) # batch_size is optional. specifies how often the dictionary of small # user data is gathered. if you write out large data (NOT RECOMMENDED) it needs to be set small. smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata]) for run in ds.runs(): opal = run.Detector('tmo_opal1') ebeam = run.Detector('ebeam') runsum = np.zeros((3),dtype=float) # beware of datatypes when summing: can overflow for evt in run.events(): img = opal.raw.image(evt) photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt) if img is None or photonEnergy is None: continue evtsum = np.sum(img) # pass either dictionary or kwargs smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy) runsum += img[0,:3] # local sum on one mpi core # optional summary data for whole run if smd.summary: tot_runsum = smd.sum(runsum) # sum (or max/min) across all mpi cores. Must be numpy array or None. # pass either dictionary or kwargs smd.save_summary({'sum_over_run' : tot_runsum}, summary_int=1) smd.done() |
...
Overview
Content Tools