Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

We do not assume we know the size of the image array returned by the Detector image function, thus we do not create the dataset to hold the cspad projections until the first projection is formed.

For the large cspad data, we use horizontal projection, we create the chunksize after seeing the first image we will project. We target a chunk size of 12. Given the dimensions of the 16MB, but cap the number of horizontal projections in a chunk at 2048. Since the horizontal projection is small, we end up with a chunksize of 2048. When saving full images, you may find that targeting a 16MB chunk may put to few images in a chunk. In that case targeting larger chunk sizes, say 100MB, may be more optimal. 

In addition, we demonstrate how to compress the horizontal projection dataset. Note In addition, we demonstrate how to compress the horizontal projection dataset. Note - compressing significantly slows down saving the data.

Code Block
languagepython
import numpy as np
import math
import psana
import h5py
 
LASER_ON_EVR_CODE = 92 
NUM_EVENTS_TO_WRITE=20
 = 10
 
ds = psana.DataSource('exp=xpptut15:run=54:smd')
 
chunkSizeSmallData = 2048
h5out = h5py.File("saved_data_for_xppdaq12xpptut15_run54_cspad_hproj.h5", 'w')
eventDataGroup = h5out.create_group('EventData')
evtSecDs = eventDataGroup.create_dataset('event_seconds',
                          (0,), dtype='i4', chunks=True(chunkSizeSmallData,), maxshape=(None,))
evtNanoDs = eventDataGroup.create_dataset('event_nanoseconds',
  (0,),                      dtype='i4', chunks=(chunkSizeSmallData,), maxshape=(None,))
evtFiducialsDs = eventDataGroup.create_dataset('event_fiducials',
  (0,), dtype='i4', chunks=True(chunkSizeSmallData,), maxshape=(None,))
ebeamL3Ds = eventDataGroup.create_dataset('ebeam_L3',
  (0,),                        (0,), dtype='f4', chunks=Truedtype='f4', chunks=(chunkSizeSmallData,), maxshape=(None,))
laserOnDs = eventDataGroup.create_dataset('laser_on',
                          (0,), dtype='i1', chunks=True(chunkSizeSmallData,), maxshape=(None,))
cspadHprojDs = None # will create once we see first cspad and know dimensions
 
cspad = psana.Detector('cspad', ds.env())


nextDsIdx = -1
for evtIdx, evt in enumerate(ds.events()):
  if evtIdx >= NUM_EVENTS:
    break
  eventId = evt.get(psana.EventId)
    evr = evt.get(psana.EvrData.DataV4, psana.Source('evr0'))
    ebeam = evt.get(psana.Bld.BldDataEBeamV7, psana.Source('EBeam'))

    # use new detector xface for cspad image
    cspadImage = cspad.image(evt)
 
    # check for event that has everything we want to write
    if (eventIdcspadImage is None) or (evreventId is None) or \
       (ebeamevr is None) or (cspadImageebeam is None):
        continue

    nextDsIdx += 1

    cspadHprojhproj = np.sum(cspadImage, 0)
    if cspadHprojDs is None:
    mbPerEvent = (len(hproj)  cspadHprojDs* hproj.dtype.itemsize)/float(1<<20)
    targetChunkMb = eventDataGroup.create_dataset('cspadHproj',16.0
    chunkSize = min(2048, max(1,int(math.floor(targetChunkMb/mbPerEvent))))
    cspadHprojDs = eventDataGroup.create_dataset('cspad_hproj',
                  (0, len(cspadHproj)), 
                               chunks=True, 
(0,len(hproj)), dtype=cspadImage.dtype,
                                                 maxshapechunks=(NonechunkSize, len(cspadHprojhproj)),
                                 compression='gzip',
                maxshape=(None,len(hproj)),
               compression_opts=1)

        evtSecDs.resize((nextDsIdx+1,))
    evtNanoDs.resize((nextDsIdx+1,))
    ebeamL3Ds.resize((nextDsIdx+1,))
    laserOnDs.resize((nextDsIdx+1,))
    cspadHprojDs.resize((nextDsIdx+1, len(cspadHproj)))

         evtSecDs[nextDsIdx] compression= eventId.time()[0]
'gzip',
     evtNanoDs[nextDsIdx] = eventId.time()[1]
    ebeamL3Ds[nextDsIdx] = ebeam.ebeamL3Energy()
    laserOnDs[nextDsIdx] = evr.present(LASER_ON_EVR_CODE)
            cspadHprojDs[nextDsIdx,:] = cspadHproj[:]

    if nextDsIdx >= NUM_EVENTS_TO_WRITE: 
        break

summaryGroup = h5out.create_group('Summary')
summaryGroup.create_dataset('l3average', data = np.average(ebeamL3Ds[:]))
h5out.close()
        

...

compression_opts=1)
  nextDsIdx += 1
 
  evtSecDs.resize((nextDsIdx+1,))
  evtNanoDs.resize((nextDsIdx+1,))
  evtFiducialsDs.resize((nextDsIdx+1,))
  ebeamL3Ds.resize((nextDsIdx+1,))
  laserOnDs.resize((nextDsIdx+1,))
  cspadHprojDs.resize((nextDsIdx+1,len(hproj)))
  evtSecDs[nextDsIdx] = eventId.time()[0]
  evtNanoDs[nextDsIdx] = eventId.time()[1]
  evtFiducialsDs[nextDsIdx] = eventId.fiducials()
  ebeamL3Ds[nextDsIdx] = ebeam.ebeamL3Energy()
  laserOnDs[nextDsIdx] = evr.present(LASER_ON_EVR_CODE)
  cspadHprojDs[nextDsIdx,:] = hproj
 
summaryGroup = h5out.create_group('Summary')
summaryGroup.create_dataset('l3average', data = np.average(ebeamL3Ds[:]))
h5out.close()
        

Parallel Writing with Big Data

NOTE: this approach is not currently supported at LCLS since maintenance of the mpi-aware hdf5 package is time-consuming and the same parallel-writing can often be achieved using the newer "virtual dataset" feature of hdf5.

Next we use the mpi file driver of the Hdf5 library to distribute our processing of the cspad. A common practice with MPI is to use reduce or gather for rank local results at the end of the script. In this example, we

...

  • With the mpi hdf5 file driver, you must collectively
    • create the file
    • create the datastes
    • resize the datasets
  • Individual ranks may write to different elements of the dataset
    • the datastes
    • resize the datasets
  • Individual ranks may write to different elements of the dataset
  • To collectively increase the size of the datasets, all ranks read all of the small data events and collectively resize every 100 events or so.
    • This means that every 100 events, all ranks are doing the same thing. In between these resizes, they may be able to do parallel processing.
  • The extend of parallel processing performed is limited by the filesystem. Each rank is independently writing to different points in the Hdf5 file, however they filesystem may serialize these operations. Setting the lustre stripe size on the output file ahead of time can help - but in short, getting good performance out of parallel disk output can be involved.To collectively increase the size of the datasets, all ranks read the small data and resize when all items are present
  • To detect that the cspad is present without reading the big data behind it, we inspect the EventKeys
    • This is the trickiest part - it involves checking that there is an event key whose source has the alias 'cspad'
  • The NUM_EVENTS _TO_WRITE can be adjusted to have the script run faster (it is presently set to more than the number of events in the run)can be adjusted to have the script stop earlier.
  • Since datasets must be created collectively, it is much more awkward (but not impossible) to not create the cspad hproj dataset until the first event is read, so we assume the size of 1691 from previous work

...

  • event is read, but it is not to hard to figure out the size ahead of time.


Code Block
languagepython
import numpy as np
from mpi4py import MPI
import math
import psana
import MPI
import psana
import h5py
 h5py
import time
startTime = time.time()
ds = psana.DataSource('exp=xpptut15:run=54:smd')
 
def hasCspad(evt, aliasMap):
    for key in evt.keys():
        if 'cspad' == aliasMap.alias(key.src()):
            return True
    return if 'cspad' == aliasMap.alias(key.src()):
            return True
    return False

LASER_ON_EVR_CODE = 92 
CSPAD_HPROJ_LEN = 1691
NUM_EVENTS_TO_WRITE=1300

ds = psana.DataSource('exp=xpptut15:run=54:smd')

False
 
LASER_ON_EVR_CODE = 92
CSPAD_HPROJ_LEN = 1738
NUM_EVENTS = 120  # set to non-zero to stop early
RESIZE_GROW = 100  # how many events between collective resizes
currentDsetSize = 0
numResizes=0
 
chunkSizeSmallData = 2048
h5out = h5py.File("my_saved_data_for_xppdaq12xpptut15_run54_cspad_hproj.h5", 'w',
                   driver='mpio', comm=MPI.COMM_WORLD)
eventDataGroup = h5out.create_group('EventData')
evtSecDs = eventDataGroup.create_dataset('event_seconds',
                          (0currentDsetSize,), dtype='i4', chunks=True(chunkSizeSmallData,), maxshape=(None,))
evtNanoDs = eventDataGroup.create_dataset('event_nanoseconds',
                        ,
  (0currentDsetSize,), dtype='i4', chunks=True(chunkSizeSmallData,), maxshape=(None,))
ebeamL3DsevtFiducialsDs = eventDataGroup.create_dataset('ebeamevent_L3fiducials',
  (currentDsetSize,), dtype='i4', chunks=(chunkSizeSmallData,),                      (0maxshape=(None,))
ebeamL3Ds = eventDataGroup.create_dataset('ebeam_L3',
  (currentDsetSize,), dtype='f4', chunks=True(chunkSizeSmallData,), maxshape=(None,))
laserOnDs = eventDataGroup.create_dataset('laser_on',
                          (0currentDsetSize,), dtype='i1', chunks=True(chunkSizeSmallData,), maxshape=(None,))
cspadHprojDs = eventDataGroup.create_dataset('cspadHproj',
                          (0currentDsetSize,CSPAD_HPROJ_LEN), dtype='f4', 
                          chunks=True=(2000,CSPAD_HPROJ_LEN), maxshape=(None, CSPAD_HPROJ_LEN))
 
cspad = psana.Detector('cspad', ds.env())
 
histogram_bin_edges = [-500, -400, -200, -150, -100, -50, -25, 0, 5, 10, 15, 20, 25, 50, 100, 200, 500]
local_cspad_histogram = np.zeros((len(histogram_bin_edges)-1,), np.int64)
timeResize = 0.0
timeWriting = 0.0
 
nextDsIdx = -1
for evtIdx, evt in enumerate(ds.events()):
  if NUM_EVENTS > 0 and evtIdx >= NUM_EVENTS:
    break
  eventId = evt.get(psana.EventId)
    evr = evt.get(psana.EvrData.DataV4, psana.Source('evr0'))
    ebeam = evt.get(psana.Bld.BldDataEBeamV7, psana.Source('EBeam'))
 
    # check for event that has everything we want to write
    if (eventId is None) or (evr is None) or (ebeam is None) or \
     (not hasCspad(evt, ds.env().aliasMap())):
    continue
  nextDsIdx += 1
 (ebeam is None) or (not hasCspad(evt, ds.env().aliasMap())): 
  if (nextDsIdx >= currentDsetSize):
    t0 = time.time()
    # collectively resize all the chunked continue
datasets
    currentDsetSize += RESIZE_GROW
    nextDsIdxnumResizes += 1

    if# nextDsIdx >= NUM_EVENTS_TO_WRITE: 
        break
this MPI_Barrier is not necessary, but helps to illustrate the collective operation
    # collectively resize chunked datasetsMPI.COMM_WORLD.Barrier()
    evtSecDs.resize((nextDsIdx+RESIZE_GROW,))
    evtSecDsevtNanoDs.resize((nextDsIdx+1RESIZE_GROW,))
    evtNanoDsevtFiducialsDs.resize((nextDsIdx+1RESIZE_GROW,))
    ebeamL3Ds.resize((nextDsIdx+1RESIZE_GROW,))
    laserOnDs.resize((nextDsIdx+1RESIZE_GROW,))
    cspadHprojDs.resize((nextDsIdx+1RESIZE_GROW, CSPAD_HPROJ_LEN))

    timeResize += time.time()-t0
  # only process this ranks events
    if nextDsIdx % MPI.COMM_WORLD.Get_size() != MPI.COMM_WORLD.Get_rank():
    continue
  t0  continue

  = time.time()
  # useexpensive detectorper xface for cspad image
 rank processing:
   cspadImage = cspad.image(evt)
    local_cspad_histogram += np.histogram(cspad.calib(evt), 
                                 histogram_bin_edges)[0]
    cspadHproj = np.sum(cspadImage, 0)

  
  evtSecDs[nextDsIdx] = eventId.time()[0]
    evtNanoDs[nextDsIdx] = eventId.time()[1]
  evtFiducialsDs[nextDsIdx] = eventId.fiducials()
  ebeamL3Ds[nextDsIdx] = ebeam.ebeamL3Energy()
    laserOnDs[nextDsIdx] = evr.present(LASER_ON_EVR_CODE)
    cspadHprojDs[nextDsIdx,:] = cspadHproj[:]

 = cspadHproj
  timeWriting += time.time()-t0
## finished processing, collectively form summary
# datasets, and get final historgram
summaryGroup = h5out.create_group('Summary')
summaryGroup.create_dataset('cspad_histogram_bins', 
                                data = histogram_bin_edges)
finalHistDs = summaryGroup.create_dataset('cspad_histogram_values', 
                             (len(local_cspad_histogram),), dtype='i8')
 
if MPI.COMM_WORLD.Get_rank()==0:
    finalHistogram = np.zeros_like(local_cspad_histogram)
    MPI.COMM_WORLD.Reduce(sendbuf=[local_cspad_histogram, MPI.INT64_T],
                          recvbuf=[finalHistogram, MPI.INT64_T],
                          op=MPI.SUM, root=0)
    finalHistDs[:] = local_cspad_histogram[:]
else:
    MPI.COMM_WORLD.Reduce(sendbuf=[local_cspad_histogram, MPI.INT64_T],
                          recvbuf=[None, MPI.INT64_T],
                          op=MPI.SUM, root=0)
     
h5out.close()
totalTime = time.time()-startTime
hz = evtIdx/totalTime
print(f"rnk={MPI.COMM_WORLD.Get_rank():3d}  recvbuf=[None, MPI.INT64_T],
                          op=MPI.SUM, root=0)
    
h5out.close()
        

evts={evtIdx:4d} time={totalTime:.2f} resizes={numResizes:d} time_per_resize={timeResizes/float(numResizes):.2f} writing={timeWriting:.2f} Hz={hz:.2f}"  
#(MPI.COMM_WORLD.Get_rank(), evtIdx, totalTime, numResizes, timeResize/float(numResizes), timeWriting, hz)
 

This script could be launched by doing

...