Page History
...
- With the mpi hdf5 file driver, you must collectively
- create the file
- create the datastes
- resize the datasets
- Individual ranks may write to different elements of the dataset
- To collectively increase the size of the datasets, all ranks read the small data and resize when all items are presentall of the small data events and collectively resize every 100 events or so.
- This means that every 100 events, all ranks are doing the same thing. In between these resizes, they may be able to do parallel processing.
- The extend of parallel processing performed is limited by the filesystem. Each rank is independently writing to different points in the Hdf5 file, however they filesystem may serialize these operations. Setting the lustre stripe size on the output file ahead of time can help - but in short, getting good performance out of parallel disk output can be involved.
- To detect that the cspad is present without reading the big data behind it, we inspect the EventKeys
- This is the trickiest part - it involves checking that there is an event key whose source has the alias 'cspad'
- The NUM_EVENTS _TO_WRITE can be adjusted to have the script run faster (it is presently set to more than the number of events in the run)stop earlier.
- Since datasets must be created collectively, it is much more awkward (but not impossible) to not create the cspad hproj dataset until the first event is read, so we assume the size of 1691 from previous workbut it is not to hard to figure out the size ahead of time.
Since this script collectively resizes each dataset on each event, it is not performing parallel processing. The next step is to resize in blocks greater than the world size less frequently. When the resize occurs, it is like having an MPI_Barrier. We want different ranks to process and write to their slots in the dataset in parallel.
Code Block | ||
---|---|---|
| ||
import numpy as np from mpi4py import MPI import math import psana import h5py import time startTime = time.time() ds = psana.DataSource('exp=xpptut15:run=54:smd') def hasCspad(evt, aliasMap): for key in evt.keys(): if 'cspad' == aliasMap.alias(key.src()): return True return False LASER_ON_EVR_CODE = 92 CSPAD_HPROJ_LEN = 16911738 NUM_EVENTS_TO_WRITE=1300 ds = psana.DataSource('exp=xpptut15:run=54:smd') = 120 # set to non-zero to stop early RESIZE_GROW = 100 # how many events between collective resizes currentDsetSize = 0 numResizes=0 chunkSizeSmallData = 2048 h5out = h5py.File("my_saved_data_for_xppdaq12xpptut15_run54_cspad_hproj.h5", 'w', driver='mpio', comm=MPI.COMM_WORLD) eventDataGroup = h5out.create_group('EventData') evtSecDs = eventDataGroup.create_dataset('event_seconds', (currentDsetSize,), dtype='i4', (0chunks=(chunkSizeSmallData,), maxshape=(None,)) evtNanoDs = eventDataGroup.create_dataset('event_nanoseconds', (currentDsetSize,), dtype='i4', chunks=True(chunkSizeSmallData,), maxshape=(None,)) evtNanoDsevtFiducialsDs = eventDataGroup.create_dataset('event_nanosecondsfiducials', (0(currentDsetSize,), dtype='i4', chunks=True(chunkSizeSmallData,), maxshape=(None,)) ebeamL3Ds = eventDataGroup.create_dataset('ebeam_L3', (0(currentDsetSize,), dtype='f4', chunks=True(chunkSizeSmallData,), maxshape=(None,)) laserOnDs = eventDataGroup.create_dataset('laser_on', (0(currentDsetSize,), dtype='i1', chunks=True(chunkSizeSmallData,), maxshape=(None,)) cspadHprojDs = eventDataGroup.create_dataset('cspadHproj', (0currentDsetSize,CSPAD_HPROJ_LEN), dtype='f4', chunks=True(2000,CSPAD_HPROJ_LEN), maxshape=(None, CSPAD_HPROJ_LEN)) cspad = psana.Detector('cspad', ds.env()) histogram_bin_edges = [-500, -400, -200, -150, -100, -50, -25, 0, 5, 10, 15, 20, 25, 50, 100, 200, 500] local_cspad_histogram = np.zeros((len(histogram_bin_edges)-1,), np.int64) timeResize = 0.0 timeWriting = 0.0 nextDsIdx = -1 for evtIdx, evt in enumerate(ds.events()): if NUM_EVENTS > 0 and evtIdx >= NUM_EVENTS: break eventId = evt.get(psana.EventId) evr = evt.get(psana.EvrData.DataV4, psana.Source('evr0')) ebeam = evt.get(psana.Bld.BldDataEBeamV7, psana.Source('EBeam')) # check for event that has everything we want to write if (eventId is None) or (evr is None) or (ebeam is None) or \ (not hasCspad(evt, ds.env().aliasMap())): continue nextDsIdx += 1 if (ebeamnextDsIdx is>= NonecurrentDsetSize): or (not hasCspad(evt, ds.env().aliasMap())): continue t0 = time.time() # collectively resize all the chunked datasets currentDsetSize += RESIZE_GROW nextDsIdxnumResizes += 1 if nextDsIdx >= NUM_EVENTS_TO_WRITE: break # this MPI_Barrier is not necessary, but helps to illustrate the collective operation # collectively resize chunked datasetsMPI.COMM_WORLD.Barrier() evtSecDs.resize((nextDsIdx+RESIZE_GROW,)) evtSecDsevtNanoDs.resize((nextDsIdx+1RESIZE_GROW,)) evtNanoDsevtFiducialsDs.resize((nextDsIdx+1RESIZE_GROW,)) ebeamL3Ds.resize((nextDsIdx+1RESIZE_GROW,)) laserOnDs.resize((nextDsIdx+1RESIZE_GROW,)) cspadHprojDs.resize((nextDsIdx+1RESIZE_GROW, CSPAD_HPROJ_LEN)) timeResize += time.time()-t0 # only process this ranks events if nextDsIdx % MPI.COMM_WORLD.Get_size() != MPI.COMM_WORLD.Get_rank(): continue t0 = continuetime.time() # useexpensive detectorper xface for cspad image rank processing: cspadImage = cspad.image(evt) local_cspad_histogram += np.histogram(cspad.calib(evt), histogram_bin_edges)[0] cspadHproj = np.sum(cspadImage, 0) evtSecDs[nextDsIdx] = eventId.time()[0] evtNanoDs[nextDsIdx] = eventId.time()[1] evtFiducialsDs[nextDsIdx] = eventId.fiducials() ebeamL3Ds[nextDsIdx] = ebeam.ebeamL3Energy() laserOnDs[nextDsIdx] = evr.present(LASER_ON_EVR_CODE) laserOnDscspadHprojDs[nextDsIdx,:] = evr.present(LASER_ON_EVR_CODE) cspadHprojDs[nextDsIdx,:] = cspadHproj[:] cspadHproj timeWriting += time.time()-t0 ## finished processing, collectively form summary # datasets, and get final historgram summaryGroup = h5out.create_group('Summary') summaryGroup.create_dataset('cspad_histogram_bins', data = histogram_bin_edges) finalHistDs = summaryGroup.create_dataset('cspad_histogram_values', (len(local_cspad_histogram),), dtype='i8') if MPI.COMM_WORLD.Get_rank()==0: finalHistogram = np.zeros_like(local_cspad_histogram) MPI.COMM_WORLD.Reduce(sendbuf=[local_cspad_histogram, MPI.INT64_T], recvbuf=[finalHistogram, MPI.INT64_T], op=MPI.SUM, root=0) finalHistDs[:] = local_cspad_histogram[:] else: MPI.COMM_WORLD.Reduce(sendbuf=[local_cspad_histogram, MPI.INT64_T], recvbuf=[None, MPI.INT64_T], T], op=MPI.SUM, root=0) h5out.close() totalTime = time.time()-startTime hz = evtIdx/totalTime print "rnk=%3d evts=%4d time=%.2f resizes=%d time_per_resize=%.2f op=MPI.SUM, root=0)writing=%.2f Hz=%.2f" % \ h5out.close(MPI.COMM_WORLD.Get_rank() , evtIdx, totalTime, numResizes, timeResize/float(numResizes), timeWriting, hz) |
This script could be launched by doing
...
Overview
Content Tools