...
```python
from psana import DataSource
import numpy as np

# called back on each SRV node, for every smd.event() call below
def test_callback(data_dict):
    print(data_dict)

ds = DataSource(exp='xpptut13', run=1, dir='.tmp')
# batch_size here specifies how often the dictionary of information
# is sent to the SRV nodes
smd = ds.smalldata(filename='my.h5', batch_size=5, callbacks=[test_callback])
run = next(ds.runs())

# initialize to zeros (instead of "None") since some ranks may not
# receive events and the smd.sum() below could fail
arrsum = np.zeros((2,), dtype=int)  # plain int: np.int is removed in modern numpy
for i, evt in enumerate(run.events()):
    myones = np.ones_like(arrsum)
    smd.event(evt, myfloat=2.0, arrint=myones)
    arrsum += myones

if smd.summary:
    smd.sum(arrsum)
    smd.save_summary({'summary_array': arrsum}, summary_int=1)
smd.done()
```
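The batching behavior can be illustrated without psana. The sketch below is a standalone mock (it is not psana's implementation, and `run_batched` is a made-up name): event dictionaries are buffered and the callback fires once per `batch_size` events, plus once for any final partial batch when the loop ends.

```python
# Standalone sketch (not psana code): illustrates how smalldata callbacks
# are invoked once per batch_size events, plus once for a final partial batch.
def run_batched(events, batch_size, callback):
    batch = []
    for data_dict in events:
        batch.append(data_dict)
        if len(batch) == batch_size:
            callback(list(batch))  # in psana, an SRV node receives this batch
            batch.clear()
    if batch:                      # flush the partial batch (cf. smd.done())
        callback(list(batch))

calls = []
run_batched([{'myfloat': 2.0, 'i': i} for i in range(12)],
            batch_size=5, callback=calls.append)
print([len(b) for b in calls])  # -> [5, 5, 2]
```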
Performance Measurements
Done by TJ Lane and cpo on Oct. 31, 2019
- 1M XTC events from "xtcwriter -n 1000000 -e 0" on the test-stand FFB (/ffb01/cpo/)
- batch size 10,000 (for both smalldata and DataSource)
- cache size 10,000
- "n" is the total number of cores given to mpirun on drp-tst-dev003 (a 20-core node)
Conclusions
- Small-h5 production adds roughly 10% to the event-loop time, and we can scale our way out if performance becomes an issue.
- HDF5's VDS (done by "smd.join_files()") behaves well with 500 files of 1M events each, which is good because the join is the one step we can't scale.
Measure time for run.events() and smd.join_files()
=== time (sec) ===
loop_time join_time
--------- ---------
SRV=0 n=16 [no smalldata]
10.3 --
11.7 --
10.6 --
10.3 --
10.3 --
10.4 --
SRV=1 n=17
11.7 .0063
11.5 .0066
11.6 .0066
11.4 .0067
12.1 .0061
11.6 .0065
SRV=2 n=18
11.5 .0072
11.3 .0076
11.3 .0074
SRV=4 n=20
11.7 .0105
11.4 .0105
11.2 .0103
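The ~10% figure in the conclusions can be checked directly from the loop times above, comparing the SRV=0 (no smalldata) runs against the SRV=1 runs:

```python
# Event-loop times copied from the table above (seconds)
no_smd = [10.3, 11.7, 10.6, 10.3, 10.3, 10.4]   # SRV=0, n=16, no smalldata
srv1   = [11.7, 11.5, 11.6, 11.4, 12.1, 11.6]   # SRV=1, n=17

mean = lambda xs: sum(xs) / len(xs)
overhead = mean(srv1) / mean(no_smd) - 1.0
print(f'smalldata overhead: {overhead:.1%}')  # -> about 10%
```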
```python
import sys
import os
import time

import numpy as np
from psana import DataSource
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

xtc_dir = os.path.join(os.environ.get('TEST_XTC_DIR', os.getcwd()), '/ffb01/cpo')
ds = DataSource(exp='xpptut13', run=1, dir=xtc_dir,
                filter=lambda x: True, batch_size=10000)
run = next(ds.runs())
smd = ds.smalldata(filename='/ffb01/cpo/test.h5', batch_size=10000)

nevt = 0
start = time.time()
for nevt, evt in enumerate(run.events()):
    smd.event(evt, timestamp=evt.timestamp, one=1)
smd.done()
middle = time.time()

tottime = middle - start
maxtime = comm.reduce(tottime, op=MPI.MAX)
if rank == 0:
    print('*** proc time:', maxtime)

# note that we commented out the automatic join_files() in the code
# in order to time it here
if rank == size - 1:
    smd.join_files()
    print('*** join time:', time.time() - middle)
```
Measure smd.join_files() for many files
join times (seconds):
each file was 1M data points
#files time
1 0.0065
2 0.0075
3 0.0104
100 0.5900
500 1.2000
MPI Task Structure
To allow for scaling, many HDF5 files are written, one per "SRV" node. The total number of SRV nodes is set by the environment variable PS_SRV_NODES (default 0). These HDF5 files are then joined by psana into what appears to be a single file using the HDF5 "virtual dataset" (VDS) feature.
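A minimal sketch of how such a join can work using h5py's virtual-dataset API. This illustrates the HDF5 feature itself, not psana's join_files() implementation; the file names (srv0.h5, srv1.h5, joined.h5) and the dataset name 'x' are made up for the example:

```python
import h5py
import numpy as np

# Write two small "per-SRV" files, each holding a slice of the data
for i in range(2):
    with h5py.File(f'srv{i}.h5', 'w') as f:
        f.create_dataset('x', data=np.arange(3, dtype='int64') + 10 * i)

# Build a virtual layout that concatenates the per-file datasets
layout = h5py.VirtualLayout(shape=(6,), dtype='int64')
for i in range(2):
    layout[3 * i:3 * (i + 1)] = h5py.VirtualSource(f'srv{i}.h5', 'x', shape=(3,))

# The "joined" file stores only references, not a copy of the data
with h5py.File('joined.h5', 'w') as f:
    f.create_virtual_dataset('x', layout)

with h5py.File('joined.h5', 'r') as f:
    print(f['x'][:])  # -> [ 0  1  2 10 11 12]
```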
...