Introduction
Below we use h5py to save data while processing events of experiment data. Documentation on h5py is available at: http://docs.h5py.org/en/latest/
Single Core - Just Small Data
First we read through a short run and save the following for each event:
- a field ebeam
- wether or not a particular evr code is present
- The seconds and nanoseconds that uniquely identify the event
We further save some summary information at the end, just a average of the ebeam values. To compute the average, we read the values back from the h5 file where we have stored them.
We do not assume we know how many events we will write before hand. Thus we use hdf5 chunked storage and resize the datasets as need be.
We fictionally assume that there is a EVR code of 92 that indicates that the laser is on.
Before writing code like this, it is helpful to examine the data with EventKeys to see what is inside it, for example, doing
psana -m EventKeys -n 3 exp=xppdaq12:run=54:smd
Will show you the data types and sources present in the run.
import numpy as np import psana import h5py LASER_ON_EVR_CODE = 92 ds = psana.DataSource('exp=xppdaq12:run=54:smd') h5out = h5py.File("saved_data_for_xppdaq12_run54_no_cspad.h5", 'w') eventDataGroup = h5out.create_group('EventData') evtSecDs = eventDataGroup.create_dataset('event_seconds', (0,), dtype='i4', chunks=True, maxshape=(None,)) evtNanoDs = eventDataGroup.create_dataset('event_nanoseconds', (0,), dtype='i4', chunks=True, maxshape=(None,)) ebeamL3Ds = eventDataGroup.create_dataset('ebeam_L3', (0,), dtype='f4', chunks=True, maxshape=(None,)) laserOnDs = eventDataGroup.create_dataset('laser_on', (0,), dtype='i1', chunks=True, maxshape=(None,)) nextDsIdx = -1 for evt in ds.events(): eventId = evt.get(psana.EventId) evr = evt.get(psana.EvrData.DataV4, psana.Source('evr0')) ebeam = evt.get(psana.Bld.BldDataEBeamV7, psana.Source('EBeam')) # check for event that has everything we want to write if (eventId is None) or (evr is None) or (ebeam is None): continue nextDsIdx += 1 evtSecDs.resize((nextDsIdx+1,)) evtNanoDs.resize((nextDsIdx+1,)) ebeamL3Ds.resize((nextDsIdx+1,)) laserOnDs.resize((nextDsIdx+1,)) evtSecDs[nextDsIdx] = eventId.time()[0] evtNanoDs[nextDsIdx] = eventId.time()[1] ebeamL3Ds[nextDsIdx] = ebeam.ebeamL3Energy() laserOnDs[nextDsIdx] = evr.present(LASER_ON_EVR_CODE) summaryGroup = h5out.create_group('Summary') summaryGroup.create_dataset('l3average', data = np.average(ebeamL3Ds[:])) h5out.close()
good tools for inspecting the contents of an h5 file are h5dump and h5ls. Here we use h5ls to take a look at the datasets in the output file:
psana1611:~/rel/smallData $ h5ls -r saved_data_for_xppdaq12_run54_no_cspad.h5 / Group /EventData Group /EventData/ebeam_L3 Dataset {1219/Inf} /EventData/event_nanoseconds Dataset {1219/Inf} /EventData/event_seconds Dataset {1219/Inf} /EventData/laser_on Dataset {1219/Inf} /Summary Group /Summary/l3average Dataset {SCALAR}
Single Core, Big Data
The above example will run very fast because we never read the cspad detector data, or the Camera::Frame data that is present in the events. When reading the cspad, things, and doing things like calibrating, etc, things will run much more slowly. Below we use the Detector interface to also write:
- A horizontal projection of the calibrated cspad image
for each event. This will run much more slowly, so there is a parameter to stop the script after some number of events.
We not assume we know the size of the image array returned by the Detector image function, thus we do not create the dataset to hold the cspad projections until the first projection is formed.
import numpy as np import psana import h5py LASER_ON_EVR_CODE = 92 NUM_EVENTS_TO_WRITE=20 ds = psana.DataSource('exp=xppdaq12:run=54:smd') h5out = h5py.File("saved_data_for_xppdaq12_run54_cspad.h5", 'w') eventDataGroup = h5out.create_group('EventData') evtSecDs = eventDataGroup.create_dataset('event_seconds', (0,), dtype='i4', chunks=True, maxshape=(None,)) evtNanoDs = eventDataGroup.create_dataset('event_nanoseconds', (0,), dtype='i4', chunks=True, maxshape=(None,)) ebeamL3Ds = eventDataGroup.create_dataset('ebeam_L3', (0,), dtype='f4', chunks=True, maxshape=(None,)) laserOnDs = eventDataGroup.create_dataset('laser_on', (0,), dtype='i1', chunks=True, maxshape=(None,)) cspadHprojDs = None # will create once we see first cspad and know dimensions cspad = psana.Detector('cspad', ds.env()) nextDsIdx = -1 for evt in ds.events(): eventId = evt.get(psana.EventId) evr = evt.get(psana.EvrData.DataV4, psana.Source('evr0')) ebeam = evt.get(psana.Bld.BldDataEBeamV7, psana.Source('EBeam')) # use new detector xface for cspad image cspadImage = cspad.image(evt) # check for event that has everything we want to write if (eventId is None) or (evr is None) or \ (ebeam is None) or (cspadImage is None): continue nextDsIdx += 1 cspadHproj = np.sum(cspadImage, 0) if cspadHprojDs is None: cspadHprojDs = eventDataGroup.create_dataset('cspadHproj', (0, len(cspadHproj)), chunks=True, maxshape=(None, len(cspadHproj)), compression='gzip', compression_opts=1) evtSecDs.resize((nextDsIdx+1,)) evtNanoDs.resize((nextDsIdx+1,)) ebeamL3Ds.resize((nextDsIdx+1,)) laserOnDs.resize((nextDsIdx+1,)) cspadHprojDs.resize((nextDsIdx+1, len(cspadHproj))) evtSecDs[nextDsIdx] = eventId.time()[0] evtNanoDs[nextDsIdx] = eventId.time()[1] ebeamL3Ds[nextDsIdx] = ebeam.ebeamL3Energy() laserOnDs[nextDsIdx] = evr.present(LASER_ON_EVR_CODE) cspadHprojDs[nextDsIdx,:] = cspadHproj[:] if nextDsIdx >= NUM_EVENTS_TO_WRITE: break summaryGroup = h5out.create_group('Summary') summaryGroup.create_dataset('l3average', data = np.average(ebeamL3Ds[:])) h5out.close()
Parallel Writing with Big Data
Next we use the mpi file driver of the Hdf5 library to distibute our processing of the cspad. A common practice with MPI is to use reduce or gather for rank local results at the end of the script. In this example, we
- Have each rank locally keep track of a historgram of calibrated cspad values.
- Reduce all histograms at the end and write this as summary information
Some other details about the script
- With the mpi hdf5 file driver, you must collectively
- create the file
- create the datastes
- resize the datasets
- Individual ranks may write to different elements of the dataset
- To collectively increase the size of the datasets, all ranks read the small data and resize when all items are present
- To detect that the cspad is present without reading the big data behind it, we inspect the EventKeys
- This is the trickiest part - it involves checking that there is an event key whose source has the alias 'cspad'
- The NUM_EVENTS_TO_WRITE can be adjusted to have the script run faster (it is presently set to more than the number of events in the run)
- Since datasets must be created collectively, it is much more awkward (but not impossible) to not create the cspad hproj dataset until the first event is read, so we assume the size of 1691 from previous work
import numpy as np from mpi4py import MPI import psana import h5py def hasCspad(evt, aliasMap): for key in evt.keys(): if 'cspad' == aliasMap.alias(key.src()): return True return False LASER_ON_EVR_CODE = 92 CSPAD_HPROJ_LEN = 1691 NUM_EVENTS_TO_WRITE=1300 ds = psana.DataSource('exp=xppdaq12:run=54:smd') h5out = h5py.File("my_saved_data_for_xppdaq12_run54.h5", 'w', driver='mpio', comm=MPI.COMM_WORLD) eventDataGroup = h5out.create_group('EventData') evtSecDs = eventDataGroup.create_dataset('event_seconds', (0,), dtype='i4', chunks=True, maxshape=(None,)) evtNanoDs = eventDataGroup.create_dataset('event_nanoseconds', (0,), dtype='i4', chunks=True, maxshape=(None,)) ebeamL3Ds = eventDataGroup.create_dataset('ebeam_L3', (0,), dtype='f4', chunks=True, maxshape=(None,)) laserOnDs = eventDataGroup.create_dataset('laser_on', (0,), dtype='i1', chunks=True, maxshape=(None,)) cspadHprojDs = eventDataGroup.create_dataset('cspadHproj', (0,CSPAD_HPROJ_LEN), dtype='f4', chunks=True, maxshape=(None, CSPAD_HPROJ_LEN)) cspad = psana.Detector('cspad', ds.env()) histogram_bin_edges = [-500, -400, -200, -150, -100, -50, -25, 0, 5, 10, 15, 20, 25, 50, 100, 200, 500] local_cspad_histogram = np.zeros((len(histogram_bin_edges)-1,), np.int64) nextDsIdx = -1 for evt in ds.events(): eventId = evt.get(psana.EventId) evr = evt.get(psana.EvrData.DataV4, psana.Source('evr0')) ebeam = evt.get(psana.Bld.BldDataEBeamV7, psana.Source('EBeam')) # check for event that has everything we want to write if (eventId is None) or (evr is None) or \ (ebeam is None) or (not hasCspad(evt, ds.env().aliasMap())): continue nextDsIdx += 1 if nextDsIdx >= NUM_EVENTS_TO_WRITE: break # collectively resize chunked datasets evtSecDs.resize((nextDsIdx+1,)) evtNanoDs.resize((nextDsIdx+1,)) ebeamL3Ds.resize((nextDsIdx+1,)) laserOnDs.resize((nextDsIdx+1,)) cspadHprojDs.resize((nextDsIdx+1, CSPAD_HPROJ_LEN)) # only process this ranks events if nextDsIdx % MPI.COMM_WORLD.Get_size() != MPI.COMM_WORLD.Get_rank(): continue # use detector xface for cspad image cspadImage = cspad.image(evt) local_cspad_histogram += np.histogram(cspad.calib(evt), histogram_bin_edges)[0] cspadHproj = np.sum(cspadImage, 0) evtSecDs[nextDsIdx] = eventId.time()[0] evtNanoDs[nextDsIdx] = eventId.time()[1] ebeamL3Ds[nextDsIdx] = ebeam.ebeamL3Energy() laserOnDs[nextDsIdx] = evr.present(LASER_ON_EVR_CODE) cspadHprojDs[nextDsIdx,:] = cspadHproj[:] summaryGroup = h5out.create_group('Summary') summaryGroup.create_dataset('cspad_histogram_bins', data = histogram_bin_edges) finalHistDs = summaryGroup.create_dataset('cspad_histogram_values', (len(local_cspad_histogram),), dtype='i8') if MPI.COMM_WORLD.Get_rank()==0: finalHistogram = np.zeros_like(local_cspad_histogram) MPI.COMM_WORLD.Reduce(sendbuf=[local_cspad_histogram, MPI.INT64_T], recvbuf=[finalHistogram, MPI.INT64_T], op=MPI.SUM, root=0) finalHistDs[:] = local_cspad_histogram[:] else: MPI.COMM_WORLD.Reduce(sendbuf=[local_cspad_histogram, MPI.INT64_T], recvbuf=[None, MPI.INT64_T], op=MPI.SUM, root=0) h5out.close()
This script could be launched by doing
bsub -q psanaq -a mympi -n 16 python scriptcode.py