You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Single Core, Big Data

The above example will run very fast because we never read the cspad detector data, or the Camera::Frame data that is present in the events. When reading the cspad and doing things like calibrating, etc, things will run much more slowly. Below we use the Detector interface to also write:

  • A horizontal projection of the calibrated cspad image

for each event. This will run much more slowly, so there is a parameter to stop the script after some number of events.

We do not assume we know the size of the image array returned by the Detector image function, thus we do not create the dataset to hold the cspad projections until the first projection is formed.

For the large cspad data, we use a chunk size of 12. Given the dimensions of the 

In addition, we demonstrate how to compress the horizontal projection dataset. Note - compressing significantly slows down saving the data.

import numpy as np
import psana
import h5py

LASER_ON_EVR_CODE = 92 
NUM_EVENTS_TO_WRITE=20

ds = psana.DataSource('exp=xpptut15:run=54:smd')

h5out = h5py.File("saved_data_for_xppdaq12_run54_cspad.h5", 'w')
eventDataGroup = h5out.create_group('EventData')
evtSecDs = eventDataGroup.create_dataset('event_seconds',
                          (0,), dtype='i4', chunks=True, maxshape=(None,))
evtNanoDs = eventDataGroup.create_dataset('event_nanoseconds',
                          (0,), dtype='i4', chunks=True, maxshape=(None,))
ebeamL3Ds = eventDataGroup.create_dataset('ebeam_L3',
                          (0,), dtype='f4', chunks=True, maxshape=(None,))
laserOnDs = eventDataGroup.create_dataset('laser_on',
                          (0,), dtype='i1', chunks=True, maxshape=(None,))
cspadHprojDs = None # will create once we see first cspad and know dimensions

cspad = psana.Detector('cspad', ds.env())


nextDsIdx = -1
for evt in ds.events():
    eventId = evt.get(psana.EventId)
    evr = evt.get(psana.EvrData.DataV4, psana.Source('evr0'))
    ebeam = evt.get(psana.Bld.BldDataEBeamV7, psana.Source('EBeam'))

    # use new detector xface for cspad image
    cspadImage = cspad.image(evt)

    # check for event that has everything we want to write
    if (eventId is None) or (evr is None) or \
       (ebeam is None) or (cspadImage is None):
        continue

    nextDsIdx += 1

    cspadHproj = np.sum(cspadImage, 0)
    if cspadHprojDs is None:
        cspadHprojDs = eventDataGroup.create_dataset('cspadHproj',
                               (0, len(cspadHproj)), 
                               chunks=True, 
                               maxshape=(None, len(cspadHproj)),
                               compression='gzip',
                               compression_opts=1)

    evtSecDs.resize((nextDsIdx+1,))
    evtNanoDs.resize((nextDsIdx+1,))
    ebeamL3Ds.resize((nextDsIdx+1,))
    laserOnDs.resize((nextDsIdx+1,))
    cspadHprojDs.resize((nextDsIdx+1, len(cspadHproj)))

    evtSecDs[nextDsIdx] = eventId.time()[0]
    evtNanoDs[nextDsIdx] = eventId.time()[1]
    ebeamL3Ds[nextDsIdx] = ebeam.ebeamL3Energy()
    laserOnDs[nextDsIdx] = evr.present(LASER_ON_EVR_CODE)
    cspadHprojDs[nextDsIdx,:] = cspadHproj[:]

    if nextDsIdx >= NUM_EVENTS_TO_WRITE: 
        break

summaryGroup = h5out.create_group('Summary')
summaryGroup.create_dataset('l3average', data = np.average(ebeamL3Ds[:]))
h5out.close()
        

Parallel Writing with Big Data

Next we use the mpi file driver of the Hdf5 library to distribute our processing of the cspad. A common practice with MPI is to use reduce or gather for rank local results at the end of the script. In this example, we

  • Have each rank locally keep track of a histogram of calibrated cspad values.
  • Reduce all histograms at the end and write this as summary information

Some other details about the script

  • With the mpi hdf5 file driver, you must collectively
    • create the file
    • create the datastes
    • resize the datasets
  • Individual ranks may write to different elements of the dataset
  • To collectively increase the size of the datasets, all ranks read the small data and resize when all items are present
  • To detect that the cspad is present without reading the big data behind it, we inspect the EventKeys
    • This is the trickiest part - it involves checking that there is an event key whose source has the alias 'cspad'
  • The NUM_EVENTS_TO_WRITE can be adjusted to have the script run faster (it is presently set to more than the number of events in the run)
  • Since datasets must be created collectively, it is much more awkward (but not impossible) to not create the cspad hproj dataset until the first event is read, so we assume the size of 1691 from previous work

Since this script collectively resizes each dataset on each event, it is not performing parallel processing. The next step is to resize in blocks greater than the world size less frequently. When the resize occurs, it is like having an MPI_Barrier. We want different ranks to process and write to their slots in the dataset in parallel.

import numpy as np
from mpi4py import MPI
import psana
import h5py

def hasCspad(evt, aliasMap):
    for key in evt.keys():
        if 'cspad' == aliasMap.alias(key.src()):
            return True
    return False

LASER_ON_EVR_CODE = 92 
CSPAD_HPROJ_LEN = 1691
NUM_EVENTS_TO_WRITE=1300

ds = psana.DataSource('exp=xpptut15:run=54:smd')

h5out = h5py.File("my_saved_data_for_xppdaq12_run54.h5", 'w',
                   driver='mpio', comm=MPI.COMM_WORLD)
eventDataGroup = h5out.create_group('EventData')
evtSecDs = eventDataGroup.create_dataset('event_seconds',
                          (0,), dtype='i4', chunks=True, maxshape=(None,))
evtNanoDs = eventDataGroup.create_dataset('event_nanoseconds',
                          (0,), dtype='i4', chunks=True, maxshape=(None,))
ebeamL3Ds = eventDataGroup.create_dataset('ebeam_L3',
                          (0,), dtype='f4', chunks=True, maxshape=(None,))
laserOnDs = eventDataGroup.create_dataset('laser_on',
                          (0,), dtype='i1', chunks=True, maxshape=(None,))
cspadHprojDs = eventDataGroup.create_dataset('cspadHproj',
                          (0,CSPAD_HPROJ_LEN), dtype='f4', 
                          chunks=True, maxshape=(None, CSPAD_HPROJ_LEN))

cspad = psana.Detector('cspad', ds.env())

histogram_bin_edges = [-500, -400, -200, -150, -100, -50, -25, 0, 5, 10, 15, 20, 25, 50, 100, 200, 500]
local_cspad_histogram = np.zeros((len(histogram_bin_edges)-1,), np.int64)


nextDsIdx = -1
for evt in ds.events():
    eventId = evt.get(psana.EventId)
    evr = evt.get(psana.EvrData.DataV4, psana.Source('evr0'))
    ebeam = evt.get(psana.Bld.BldDataEBeamV7, psana.Source('EBeam'))

    # check for event that has everything we want to write
    if (eventId is None) or (evr is None) or \
       (ebeam is None) or (not hasCspad(evt, ds.env().aliasMap())):
        continue

    nextDsIdx += 1

    if nextDsIdx >= NUM_EVENTS_TO_WRITE: 
        break

    # collectively resize chunked datasets
    evtSecDs.resize((nextDsIdx+1,))
    evtNanoDs.resize((nextDsIdx+1,))
    ebeamL3Ds.resize((nextDsIdx+1,))
    laserOnDs.resize((nextDsIdx+1,))
    cspadHprojDs.resize((nextDsIdx+1, CSPAD_HPROJ_LEN))

    # only process this ranks events
    if nextDsIdx % MPI.COMM_WORLD.Get_size() != MPI.COMM_WORLD.Get_rank():
        continue

    # use detector xface for cspad image
    cspadImage = cspad.image(evt)
    local_cspad_histogram += np.histogram(cspad.calib(evt), 
                                 histogram_bin_edges)[0]
    cspadHproj = np.sum(cspadImage, 0)

    evtSecDs[nextDsIdx] = eventId.time()[0]
    evtNanoDs[nextDsIdx] = eventId.time()[1]
    ebeamL3Ds[nextDsIdx] = ebeam.ebeamL3Energy()
    laserOnDs[nextDsIdx] = evr.present(LASER_ON_EVR_CODE)
    cspadHprojDs[nextDsIdx,:] = cspadHproj[:]


summaryGroup = h5out.create_group('Summary')
summaryGroup.create_dataset('cspad_histogram_bins', 
                                data = histogram_bin_edges)
finalHistDs = summaryGroup.create_dataset('cspad_histogram_values', 
                             (len(local_cspad_histogram),), dtype='i8')

if MPI.COMM_WORLD.Get_rank()==0:
    finalHistogram = np.zeros_like(local_cspad_histogram)
    MPI.COMM_WORLD.Reduce(sendbuf=[local_cspad_histogram, MPI.INT64_T],
                          recvbuf=[finalHistogram, MPI.INT64_T],
                          op=MPI.SUM, root=0)
    finalHistDs[:] = local_cspad_histogram[:]
else:
    MPI.COMM_WORLD.Reduce(sendbuf=[local_cspad_histogram, MPI.INT64_T],
                          recvbuf=[None, MPI.INT64_T],
                          op=MPI.SUM, root=0)
    
h5out.close()
        

This script could be launched by doing

bsub -q psanaq -a mympi -n 16 python scriptcode.py
  • No labels