...

By default this mechanism produces "aligned" datasets where missing values are padded (with NaNs for floats and -99999 for integers). To create an unaligned dataset (without padding), prefix the name of the variable with "unaligned_".
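For example (a sketch meant to sit inside the event loop of the example below; "hit_positions" is a hypothetical per-event array whose length varies from event to event):

Code Block
languagepy
# sketch: "photonEnergy" becomes an aligned dataset (padded when the value is
# missing for an event), while the "unaligned_" prefix stores "hit_positions"
# only for the events where it is present (no padding).
smd.event(evt,
          photonEnergy=photonEnergy,
          unaligned_hit_positions=hit_positions)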

NOTE: In addition to the hdf5 file you specify as your output ("my.h5" below) you will see other h5 files like "my_part0.h5", one for each of the SRV cores specified by PS_SRV_NODES. Each of those SRV cores writes its own my_partN.h5 file: for LCLS2 it is important for performance to write many files. The "my.h5" file itself is quite small and uses a newer HDF5 feature called a "Virtual DataSet" (VDS) to join together the various my_partN.h5 files. Also note that events in my.h5 will not be in time order. If you copy my.h5 somewhere else, you must copy all of the my_partN.h5 files along with it.
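For example, the joined file can be read back with h5py like any ordinary hdf5 file, provided the my_partN.h5 files sit next to it (a sketch; the dataset names assume the evtsum/photonEnergy example below):

Code Block
languagepy
# sketch: reading the VDS file produced by the example below.  h5py resolves
# the virtual dataset transparently, as long as the part files (e.g.
# mysmallh5_part0.h5) are in the same directory as mysmallh5.h5.
import h5py

with h5py.File('mysmallh5.h5', 'r') as f:
    print(list(f.keys()))        # e.g. ['evtsum', 'photonEnergy', ...]
    evtsum = f['evtsum'][:]      # note: events are not in time order
    print(evtsum.shape)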

...

NOTE: The call below to smd.sum() (or min()/max()) only accepts numpy arrays or None. This requirement allows cores that did not see any events to participate without causing errors when summing across cores. All cores must call .sum() or the process will hang (a property of the underlying MPI "Reduce" call).
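A sketch of that pattern (a variant of the run-summary section of the example below; "saw_events" is a hypothetical flag tracking whether this core processed any events):

Code Block
languagepy
# sketch: every core must reach the smd.sum() call.  A core that processed no
# events can still participate by passing None instead of a numpy array.
partial = runsum if saw_events else None
if smd.summary:
    tot_runsum = smd.sum(partial)   # collective call: skipping it hangs the job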

NOTE: The smd.event() call (for persisting data to hdf5 or passing it to an SRV callback) can be passed either a set of kwargs or a dictionary. The dictionary can optionally be hierarchical (e.g. d['mykey1']['mykey2'] = value) and those keys will be reflected in the hdf5 dataset structure. This allows users to organize data with a structure they prefer.
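For instance (a sketch with hypothetical key names, meant to replace the smd.event() call in the example below), a nested dictionary maps onto nested groups in the hdf5 file:

Code Block
languagepy
# sketch: a hierarchical dictionary passed to smd.event().  The nesting is
# reflected in the hdf5 layout, e.g. as /laser/energy and /laser/delay
# ("laser", "energy" and "delay" are hypothetical names for illustration).
d = {'laser': {'energy': photonEnergy, 'delay': 1.2e-12},
     'evtsum': evtsum}
smd.event(evt, d)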

Code Block
languagepy
from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES']='1'

ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc', max_events=10)
# batch_size is optional. specifies how often the dictionary of small
# user data is gathered. if you write out large data (NOT RECOMMENDED) it needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')

    runsum  = np.zeros((3),dtype=float) # beware of datatypes when summing: can overflow
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        # important: always check for missing data
        if img is None or photonEnergy is None: continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs. dictionaries can be hierarchical to add user-defined structure
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += img[0,:3] # local sum on one mpi core
 
    # optional summary data for whole run
    if smd.summary:
        tot_runsum = smd.sum(runsum) # sum (or max/min) across all mpi cores. Must be numpy array or None.
        # pass either dictionary or kwargs. dictionaries can be hierarchical to add user-defined structure
        smd.save_summary({'sum_over_run' : tot_runsum}, summary_int=1)
    smd.done()

...

Code Block
languagepy
from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    print(data_dict)  

# Use this function to decide if you want to fetch large data for this event
# and/or direct an event to a particular 'rank' for processing
# (this rank number should be between 1 and total no. of ranks - 3,
# since 3 ranks are reserved). If a detector is needed inside this callback,
# make sure to list it in the small_xtc argument of DataSource (see below).
# All epics and scan detectors are available automatically.
def smd_callback(run):
    opal = run.Detector('tmo_opal1')
    epics_det = run.Detector('IM2K4_XrayPower')

    n_bd_nodes = 3 # for mpirun -n 6, 3 ranks are reserved so there are 3 bd ranks left

    for i_evt, evt in enumerate(run.events()):
        img = opal.raw.image(evt)
        epics_val = epics_det(evt)
        dest = (evt.timestamp % n_bd_nodes) + 1

        if epics_val is not None:
            # Set the destination (rank no.) where this event should be sent to
            evt.set_destination(dest)
            yield evt

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES']='1'

ds = DataSource(exp='tmoc00118', run=222, dir='/cds/data/psdm/prj/public01/xtc',
        max_events  = 10,
        detectors   = ['epicsinfo', 'tmo_opal1', 'ebeam'],  # only reads these detectors (faster)
        smd_callback= smd_callback,                         # smalldata callback (see notes above)
        small_xtc   = ['tmo_opal1'],                        # detectors to be used in smalldata callback
        )

# batch_size is optional. specifies how often the dictionary of small
# user data is gathered.  if you write out large data (NOT RECOMMENDED) it needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')

    runsum  = np.zeros((3),dtype=float) # beware of datatypes when summing: can overflow
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        if img is None or photonEnergy is None: continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs. dictionaries can be hierarchical to add user-defined structure  
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        runsum += img[0,:3] # local sum on one mpi core
 
    # optional summary data for whole run
    if smd.summary:
        tot_runsum = smd.sum(runsum) # sum (or max/min) across all mpi cores. Must be numpy array or None.
        # pass either dictionary or kwargs. dictionaries can be hierarchical to add user-defined structure 
        smd.save_summary({'sum_over_run' : tot_runsum}, summary_int=1)
    smd.done()

...