...

Code Block
languagepy
from psana import DataSource
import numpy as np

# called back on each SRV node, for every smd.event() call below
def test_callback(data_dict):
    print(data_dict)
ds = DataSource(exp='xpptut13', run=1, dir='.tmp')
# batch_size here specifies how often the dictionary of information
# is sent to the SRV nodes
smd = ds.smalldata(filename='my.h5', batch_size=5, callbacks=[test_callback])
run = next(ds.runs())

# necessary (instead of "None") since some ranks may not receive events
# and the smd.sum() below could fail
arrsum = np.zeros(2, dtype=int)
for i,evt in enumerate(run.events()):
    myones = np.ones_like(arrsum)
    smd.event(evt, myfloat=2.0, arrint=myones)
    arrsum += myones

if smd.summary:
    smd.sum(arrsum)
    smd.save_summary({'summary_array' : arrsum}, summary_int=1)
smd.done()
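
To run the example above in parallel, the script is launched under mpirun with the environment variable PS_SRV_NODES set to the desired number of SRV writers (see "MPI Task Structure" below). The joined file can then be inspected with plain h5py. The sketch below is only an assumption about the file layout: the dataset names are guessed from the keyword arguments passed to smd.event() and smd.save_summary(), not taken from a documented schema.

Code Block
languagepy
# read-back sketch, assuming datasets named after the smd.event() keywords
# ('myfloat', 'arrint') and the summary names ('summary_array', 'summary_int')
import h5py

with h5py.File('my.h5', 'r') as f:
    f.visit(print)                   # list whatever datasets are actually present
    if 'myfloat' in f:
        print(f['myfloat'][:5])      # first few per-event values
    if 'summary_int' in f:
        print(f['summary_int'][()])  # scalar written by save_summary()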

 


Performance Measurements

Done by TJ Lane and cpo on Oct. 31, 2019

  • 1M XTC events from "xtcwriter -n 1000000 -e 0" on the test stand ffb (/ffb01/cpo/)
  • batch_size of 10,000 (for both smalldata and DataSource)
  • cache size of 10,000
  • "n" is the total number of cores given to mpirun on drp-tst-dev003 (a 20-core node)
  • SRV=k below corresponds to PS_SRV_NODES=k; note that n = 16 + SRV in every measurement, so the number of non-SRV cores is held fixed at 16

Conclusions

  • small hdf5 production is roughly a 10% perturbation on the event-loop time (compare the SRV=0 and SRV=1 rows below), and we can scale our way out if performance becomes an issue.
  • hdf5's VDS join (performed by smd.join_files()) behaves well with 500 files of 1M events each, which is good because the join is the one step we can't scale.

Measure time for run.events() and smd.join_files()

=== time (sec) ===
loop_time  join_time
---------  ---------

SRV=0 n=16 [no smalldata]
     10.3         --
     11.7         --
     10.6         --
     10.3         --
     10.3         --
     10.4         --

SRV=1 n=17
     11.7      .0063
     11.5      .0066
     11.6      .0066
     11.4      .0067
     12.1      .0061
     11.6      .0065

SRV=2 n=18
     11.5      .0072
     11.3      .0076
     11.3      .0074

SRV=4 n=20
     11.7      .0105
     11.4      .0105
     11.2      .0103

Code Block
languagepy
import sys
import os
import numpy as np
from psana import DataSource
import time
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
xtc_dir = os.environ.get('TEST_XTC_DIR', '/ffb01/cpo')  # location of the 1M-event xtcwriter files
ds = DataSource(exp='xpptut13', run=1, dir=xtc_dir, filter=lambda x : True, batch_size=10000)
run = next(ds.runs())
smd = ds.smalldata(filename='/ffb01/cpo/test.h5', batch_size=10000)
nevt = 0
start = time.time()
# time the event loop, including the hdf5 writing done on the SRV nodes
for nevt,evt in enumerate(run.events()):
    smd.event(evt, timestamp=evt.timestamp, one=1)
smd.done()
middle = time.time()
tottime = middle-start
maxtime = comm.reduce(tottime,op=MPI.MAX)  # slowest rank determines the wall-clock time
if rank==0:
    print('*** proc time:',maxtime)
# note that we commented out the automatic join_files() in the code in order to time it here.
if rank==size-1:
    smd.join_files()
    print('*** join time:',time.time()-middle)

Measure smd.join_files() for many files


join times (seconds); each file contained 1M data points:

#files    time
     1  0.0065
     2  0.0075
     3  0.0104
   100  0.5900
   500  1.2000


MPI Task Structure

To allow for scaling, multiple hdf5 files are written, one per "SRV" node. The total number of SRV nodes is set by the environment variable PS_SRV_NODES (default: 0). Psana then joins these per-SRV files into what appears to be a single file using the hdf5 "virtual dataset" (VDS) feature; a sketch of the mechanism is shown below.
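
The sketch below illustrates the virtual-dataset idea with plain h5py. It is not psana's implementation (smd.join_files() handles this bookkeeping automatically), and the file names, dataset name, and dtype are hypothetical.

Code Block
languagepy
# illustration only: stitch two hypothetical per-SRV files, each holding a
# 1-D dataset named 'timestamp', into one virtual dataset in a new file
import h5py

srv_files = ['part_srv0.h5', 'part_srv1.h5']   # hypothetical per-SRV output files
lengths = []
for fname in srv_files:
    with h5py.File(fname, 'r') as f:
        lengths.append(f['timestamp'].shape[0])

# lay out one long virtual dataset and map each source file into a slice of it
layout = h5py.VirtualLayout(shape=(sum(lengths),), dtype='u8')
offset = 0
for fname, n in zip(srv_files, lengths):
    layout[offset:offset + n] = h5py.VirtualSource(fname, 'timestamp', shape=(n,))
    offset += n

with h5py.File('joined.h5', 'w') as fjoin:
    fjoin.create_virtual_dataset('timestamp', layout)

# reading joined.h5:/timestamp now spans both source files transparently; the
# mapping work grows with the number of source files, consistent with the
# join-time table above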

...