Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagepy
from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    """Print the gathered small-data dictionary handed to this callback.

    Args:
        data_dict: the dictionary of gathered small data passed in by the
            caller; it is printed as-is and not modified.
    """
    print(data_dict)

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES'] = '1'

ds = DataSource(
    exp='tmoc00118',
    run=222,
    dir='/cds/data/psdm/prj/public01/xtc',
    max_events=10,
)

# batch_size is optional: it specifies how often the dictionary of small
# user data is gathered. If you write out large data (NOT RECOMMENDED) it
# needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')

    # Float accumulator: beware of datatypes when summing, they can overflow.
    runsum = np.zeros(3, dtype=float)
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        # important: always check for missing data
        if img is None or photonEnergy is None:
            continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        # local running sum; the scalar evtsum is broadcast into every
        # element of the 3-element runsum array
        runsum += evtsum

    # optional summary data for whole run
    if smd.summary:
        tot_runsum = smd.sum(runsum)  # sum (or max/min) across all mpi cores
        # pass either dictionary or kwargs
        smd.save_summary({'sum_over_run': tot_runsum}, summary_int=1)
    smd.done()

...

Code Block
languagepy
from psana import DataSource
import numpy as np
import os

# OPTIONAL callback with "gathered" small data from all cores.
# usually used for creating realtime plots when analyzing from
# DAQ shared memory. Called back on each SRV node.
def my_smalldata(data_dict):
    """Print the gathered small-data dictionary handed to this callback.

    Args:
        data_dict: the dictionary of gathered small data passed in by the
            caller; it is printed as-is and not modified.
    """
    print(data_dict)

# Use this function to decide whether to keep or discard an event.
# If a detector is needed here, make sure to list that detector
# in the small_xtc argument of DataSource (see below).
def filter_fn(evt):
    """Decide whether to keep ``evt``; this example keeps every event.

    The run, its step and the 'tmo_opal1' image are fetched only to show
    what can be reached from an event inside the filter callback; none of
    them influences the result here.

    Args:
        evt: the event under consideration.

    Returns:
        Always True, i.e. no event is discarded.
    """
    current_run = evt.run()
    _current_step = current_run.step(evt)
    opal_det = current_run.Detector('tmo_opal1')
    _opal_img = opal_det.raw.image(evt)
    return True

# Use this function to direct an event to be processed on a
# particular 'rank'. This function should return a rank number
# between 1 and (total no. of ranks - 3); 3 ranks are reserved.
def destination(evt):
    """Pick the rank number that should process ``evt``.

    The event timestamp is folded onto the available bd ranks, giving a
    value from 1 to 3 inclusive.

    Note that run, step, and det can be accessed from ``evt`` the same
    way as shown in filter_fn.

    Args:
        evt: the event to route; only ``evt.timestamp`` is read.

    Returns:
        The destination rank number, ``(evt.timestamp % 3) + 1``.
    """
    # For `mpirun -n 6`, 3 ranks are reserved so there are 3 bd ranks left.
    bd_rank_count = 3
    return 1 + evt.timestamp % bd_rank_count

# sets the number of h5 files to write. 1 is sufficient for 120Hz operation
# optional: only needed if you are saving h5.
os.environ['PS_SRV_NODES'] = '1'

ds = DataSource(
    exp='tmoc00118',
    run=222,
    dir='/cds/data/psdm/prj/public01/xtc',
    max_events=10,
    detectors=['tmo_opal1', 'ebeam'],  # only reads these detectors (faster)
    filter=filter_fn,                  # filter_fn returns True/False
    small_xtc=['tmo_opal1'],           # detectors to be used in filter callback
    destination=destination,           # returns rank no. (send this evt to this rank)
)

# batch_size is optional: it specifies how often the dictionary of small
# user data is gathered. If you write out large data (NOT RECOMMENDED) it
# needs to be set small.
smd = ds.smalldata(filename='mysmallh5.h5', batch_size=5, callbacks=[my_smalldata])

for run in ds.runs():
    opal = run.Detector('tmo_opal1')
    ebeam = run.Detector('ebeam')

    # Float accumulator: beware of datatypes when summing, they can overflow.
    runsum = np.zeros(3, dtype=float)
    for evt in run.events():
        img = opal.raw.image(evt)
        photonEnergy = ebeam.raw.ebeamPhotonEnergy(evt)
        # important: always check for missing data
        if img is None or photonEnergy is None:
            continue
        evtsum = np.sum(img)
        # pass either dictionary or kwargs
        smd.event(evt, evtsum=evtsum, photonEnergy=photonEnergy)
        # local running sum; the scalar evtsum is broadcast into every
        # element of the 3-element runsum array
        runsum += evtsum

    # optional summary data for whole run
    if smd.summary:
        tot_runsum = smd.sum(runsum)  # sum (or max/min) across all mpi cores
        # pass either dictionary or kwargs
        smd.save_summary({'sum_over_run': tot_runsum}, summary_int=1)
    smd.done()

...