You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 7 Next »

You can run this script with MPI: PS_SRV_NODES=2; mpirun -n 6 python example.py

from psana import DataSource
import numpy as np

# called back on each SRV node, for every smd.event() call below
def test_callback(data_dict):
    print(data_dict)
ds = DataSource(exp='xpptut13', run=1, dir='.tmp')
# batch_size here specifies how often the dictionary of information
# is sent to the SRV nodes
smd = ds.smalldata(filename='my.h5', batch_size=5, callbacks=[test_callback])
run = next(ds.runs())

# necessary (instead of "None") since some ranks may not receive events
# and the smd.sum() below could fail
arrsum = np.zeros((2), dtype=np.int)
for i,evt in enumerate(run.events()):
    myones = np.ones_like(arrsum)
    smd.event(evt, myfloat=2.0, arrint=myones)
    arrsum += myones

if smd.summary:
    smd.sum(arrsum)
    smd.save_summary({'summary_array' : arrsum}, summary_int=1)
smd.done()

 

Performance Measurements

Done by TJ Lane and cpo on Oct. 31, 2019

1M XTC Events from "xtcwriter -n 1000000 -e 0" on the test stand ffb (/ffb01/cpo/)
10,000 batch size (both for smalldata and DataSource)
10,000 cache size
"n" is the total number of cores given to mpirun on drp-tst-dev003 (a 20 core node)

Conclusions

  • small h5 production is a 10% perturbation on the event-loop time, and we can scale our way out if performance is an issue.
  • hdf5's VDS (done by "smd.join_files()") seems to behave well with 500 files each with 1M events, which is good because that is the one thing we can't scale.

Measure time for run.events() and smd.join_files()

=== time (sec) ==
 loop_time  join_time
 ---------  --------- 

SRV=0 n=16 [no smalldata]
10.3  --
11.7  --
10.6  -- 
10.3  -- 
10.3  --
10.4  --

SRV=1 n=17
11.7 .0063
11.5 .0066
11.6 .0066
11.4      .0067
12.1 .0061
11.6 .0065

SRV=2 n=18
11.5 .0072
11.3 .0076
11.3 .0074

SRV=4 n=20
11.7 .0105
11.4 .0105
11.2 .0103

import sys
import os
import numpy as np
from psana import DataSource
import time
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
xtc_dir = os.path.join(os.environ.get('TEST_XTC_DIR', os.getcwd()),'/ffb01/cpo')
ds = DataSource(exp='xpptut13', run=1, dir=xtc_dir, filter=lambda x : True, batch_size=10000)
run = next(ds.runs())
smd = ds.smalldata(filename='/ffb01/cpo/test.h5', batch_size=10000)
nevt = 0
start = time.time()
for nevt,evt in enumerate(run.events()):
    smd.event(evt, timestamp=evt.timestamp, one=1)
smd.done()
middle = time.time()
tottime = middle-start
maxtime = comm.reduce(tottime,op=MPI.MAX)
if rank==0:
    print('*** proc time:',maxtime)
# note that we commented out the automatic join_files() in the code in order to time it here.
if rank==size-1:
    smd.join_files()
    print('*** join time:',time.time()-middle)

Measure smd.join_files() for many files


join times (seconds):
each file was 1M data points

#files time
 1 0.0065
 2 0.0075
 3 0.0104
100 0.5900
500 1.2000

  • No labels