...

Parallel Writing with Big Data

NOTE: this approach is not currently supported at LCLS, since maintenance of the MPI-aware HDF5 package is time-consuming and the same parallel writing can often be achieved using the newer "virtual dataset" feature of HDF5.
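
For comparison, here is a minimal sketch of the virtual-dataset (VDS) alternative, assuming each rank has already written its projections to its own file. The file names, dataset name, and sizes below are hypothetical, and VDS requires HDF5 1.10+ and h5py 2.9+:

Code Block (python)
import h5py

NUM_RANKS = 4            # hypothetical number of writer ranks
EVENTS_PER_RANK = 100    # hypothetical per-file dataset length
CSPAD_HPROJ_LEN = 1738

# describe how the per-rank source datasets map into one combined dataset
layout = h5py.VirtualLayout(shape=(NUM_RANKS * EVENTS_PER_RANK, CSPAD_HPROJ_LEN),
                            dtype='f4')
for rank in range(NUM_RANKS):
    src = h5py.VirtualSource(f'rank{rank}.h5', 'cspadHproj',
                             shape=(EVENTS_PER_RANK, CSPAD_HPROJ_LEN))
    layout[rank * EVENTS_PER_RANK:(rank + 1) * EVENTS_PER_RANK, :] = src

# the combined file stores only the mapping, not a copy of the data
with h5py.File('combined_hproj.h5', 'w') as f:
    f.create_virtual_dataset('cspadHproj', layout, fillvalue=0)

Each rank writes its own file with ordinary serial HDF5, so no MPI-aware build of the library is needed; the virtual dataset then presents the pieces as one array to downstream readers.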

...

Code Block (python)
import numpy as np
from mpi4py import MPI
import math
import psana
import h5py
import time
startTime = time.time()
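# the ':smd' suffix selects psana small-data mode for fast event iteration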
ds = psana.DataSource('exp=xpptut15:run=54:smd')
 
def hasCspad(evt, aliasMap):
    for key in evt.keys():
        if 'cspad' == aliasMap.alias(key.src()):
            return True
    return False
 
LASER_ON_EVR_CODE = 92
CSPAD_HPROJ_LEN = 1738
NUM_EVENTS = 120  # set to non-zero to stop early
RESIZE_GROW = 100  # how many events between collective resizes
currentDsetSize = 0
numResizes = 0
 
chunkSizeSmallData = 2048
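# open the output file with the MPI-IO driver; all ranks open it collectively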
h5out = h5py.File("saved_data_for_xpptut15_run54_cspad_hproj.h5", 'w',
                  driver='mpio', comm=MPI.COMM_WORLD)
eventDataGroup = h5out.create_group('EventData')
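# in parallel HDF5, creating and resizing chunked datasets are collective
# operations: every rank must make the same calls in the same order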
evtSecDs = eventDataGroup.create_dataset('event_seconds',
  (currentDsetSize,), dtype='i4', chunks=(chunkSizeSmallData,), maxshape=(None,))
evtNanoDs = eventDataGroup.create_dataset('event_nanoseconds',
  (currentDsetSize,), dtype='i4', chunks=(chunkSizeSmallData,), maxshape=(None,))
evtFiducialsDs = eventDataGroup.create_dataset('event_fiducials',
  (currentDsetSize,), dtype='i4', chunks=(chunkSizeSmallData,), maxshape=(None,))
ebeamL3Ds = eventDataGroup.create_dataset('ebeam_L3',
  (currentDsetSize,), dtype='f4', chunks=(chunkSizeSmallData,), maxshape=(None,))
laserOnDs = eventDataGroup.create_dataset('laser_on',
  (currentDsetSize,), dtype='i1', chunks=(chunkSizeSmallData,), maxshape=(None,))
cspadHprojDs = eventDataGroup.create_dataset('cspadHproj',
                          (currentDsetSize,CSPAD_HPROJ_LEN), dtype='f4', 
                          chunks=(2000,CSPAD_HPROJ_LEN), maxshape=(None, CSPAD_HPROJ_LEN))
 
cspad = psana.Detector('cspad', ds.env())
 
histogram_bin_edges = [-500, -400, -200, -150, -100, -50, -25, 0, 5, 10, 15, 20, 25, 50, 100, 200, 500]
local_cspad_histogram = np.zeros((len(histogram_bin_edges)-1,), np.int64)
timeResize = 0.0
timeWriting = 0.0
 
nextDsIdx = -1
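# every rank loops over all events so that the collective resizes below are
# executed by all ranks; the expensive per-event work is divided round-robin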
for evtIdx, evt in enumerate(ds.events()):
  if NUM_EVENTS > 0 and evtIdx >= NUM_EVENTS:
    break
  eventId = evt.get(psana.EventId)
  evr = evt.get(psana.EvrData.DataV4, psana.Source('evr0'))
  ebeam = evt.get(psana.Bld.BldDataEBeamV7, psana.Source('EBeam'))
 
  # check for event that has everything we want to write
  if (eventId is None) or (evr is None) or (ebeam is None) or \
     (not hasCspad(evt, ds.env().aliasMap())):
    continue
  nextDsIdx += 1
  
  if (nextDsIdx >= currentDsetSize):
    t0 = time.time()
    # collectively resize all the chunked datasets
    currentDsetSize += RESIZE_GROW
    numResizes += 1
    # this MPI_Barrier is not necessary, but helps to illustrate the collective operation
    MPI.COMM_WORLD.Barrier()
    evtSecDs.resize((nextDsIdx+RESIZE_GROW,))
    evtNanoDs.resize((nextDsIdx+RESIZE_GROW,))
    evtFiducialsDs.resize((nextDsIdx+RESIZE_GROW,))
    ebeamL3Ds.resize((nextDsIdx+RESIZE_GROW,))
    laserOnDs.resize((nextDsIdx+RESIZE_GROW,))
    cspadHprojDs.resize((nextDsIdx+RESIZE_GROW,CSPAD_HPROJ_LEN))
    timeResize += time.time()-t0
  # only process this rank's events (round-robin decomposition)
  if nextDsIdx % MPI.COMM_WORLD.Get_size() != MPI.COMM_WORLD.Get_rank():
    continue
  t0 = time.time()
  # expensive per-rank processing:
  cspadImage = cspad.image(evt)
  local_cspad_histogram += np.histogram(cspad.calib(evt), 
                               histogram_bin_edges)[0]
  cspadHproj = np.sum(cspadImage, 0)
 
  evtSecDs[nextDsIdx] = eventId.time()[0]
  evtNanoDs[nextDsIdx] = eventId.time()[1]
  evtFiducialsDs[nextDsIdx] = eventId.fiducials()
  ebeamL3Ds[nextDsIdx] = ebeam.ebeamL3Energy()
  laserOnDs[nextDsIdx] = evr.present(LASER_ON_EVR_CODE)
  cspadHprojDs[nextDsIdx,:] = cspadHproj
  timeWriting += time.time()-t0
# finished processing; collectively create the summary
# datasets and reduce to the final histogram
summaryGroup = h5out.create_group('Summary')
summaryGroup.create_dataset('cspad_histogram_bins', 
                                data = histogram_bin_edges)
finalHistDs = summaryGroup.create_dataset('cspad_histogram_values', 
                             (len(local_cspad_histogram),), dtype='i8')
 
# Reduce is collective: every rank contributes its local histogram and the
# summed result lands on rank 0, which writes it to the file
if MPI.COMM_WORLD.Get_rank() == 0:
    finalHistogram = np.zeros_like(local_cspad_histogram)
    MPI.COMM_WORLD.Reduce(sendbuf=[local_cspad_histogram, MPI.INT64_T],
                          recvbuf=[finalHistogram, MPI.INT64_T],
                          op=MPI.SUM, root=0)
    finalHistDs[:] = finalHistogram[:]
else:
    MPI.COMM_WORLD.Reduce(sendbuf=[local_cspad_histogram, MPI.INT64_T],
                          recvbuf=None,
                          op=MPI.SUM, root=0)
     
h5out.close()
totalTime = time.time()-startTime
hz = evtIdx/totalTime
print (f"rnk=%3d{MPI.COMM_WORLD.Get_rank():3d} evts=%4d{evtIdx:4d} time=%{totalTime:.2f} resizes=%d{numResizes:d} time_per_resize=%{timeResizes/float(numResizes):.2f} writing=%{timeWriting:.2f} Hz=%{hz:.2f}" % \
    #(MPI.COMM_WORLD.Get_rank(), evtIdx, totalTime, numResizes, timeResize/float(numResizes), timeWriting, hz)
  

This script could be launched by doing

...