If you have xtc files (with associated smd files) you can run this script with MPI: PS_SRV_NODES=2 mpirun -n 6 python example.py (the variable is set inline, with no semicolon, so that it is exported to the processes mpirun launches).
It also works on one core with: python example.py. With PS_SRV_NODES=2 and 6 total ranks, the ranks split roughly into one Smd0 rank that reads the small data, one EventBuilder rank, two BigData ranks that run the event loop below, and two SRV ranks that receive the smalldata dictionaries and write the HDF5 file.
from psana import DataSource
import numpy as np

# called back on each SRV node, for every smd.event() call below
def test_callback(data_dict):
    print(data_dict)

ds = DataSource(exp='xpptut13', run=1, dir='.tmp')
# batch_size here specifies how often the dictionary of information
# is sent to the SRV nodes
smd = ds.smalldata(filename='my.h5', batch_size=5, callbacks=[test_callback])
run = next(ds.runs())
# initialize to zeros (instead of "None") since some ranks may not receive
# events and the smd.sum() below could fail
arrsum = np.zeros(2, dtype=int)  # np.int was removed from numpy; use plain int
for i, evt in enumerate(run.events()):
    myones = np.ones_like(arrsum)
    smd.event(evt, myfloat=2.0, arrint=myones)
    arrsum += myones
if smd.summary:
    smd.sum(arrsum)
    smd.save_summary({'summary_array': arrsum}, summary_int=1)
smd.done()
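Once the script finishes, the file can be sanity-checked with h5py. This is a minimal read-back sketch; it assumes the per-event keywords passed to smd.event() become top-level datasets of the same name (the exact layout may differ across psana versions):

import h5py

with h5py.File('my.h5', 'r') as f:
    print(list(f.keys()))          # see what was actually written
    print(f['myfloat'][:])         # assumed name: one entry per smd.event() call
    print(f['arrint'][:])
    print(f['summary_array'][:])   # assumed name: written by smd.save_summary()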
Performance Measurements
Done by TJ Lane and cpo on Oct. 31, 2019
- 1M XTC events from "xtcwriter -n 1000000 -e 0" on the test stand ffb (/ffb01/cpo/)
- batch size of 10,000 (both for smalldata and DataSource)
- cache size of 10,000
- "n" is the total number of cores given to mpirun on drp-tst-dev003 (a 20-core node)
Conclusions
- small h5 production is roughly a 10% perturbation on the event-loop time (compare the SRV=0 rows with the SRV>=1 rows below), and we can scale our way out if performance is an issue.
- hdf5's VDS (created by "smd.join_files()") behaves well with 500 files of 1M events each, which is good because the join is the one step we can't scale. A read-back sketch of the joined file follows this list.
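Since the joined file is ordinary HDF5 on the reader side, the VDS behavior can be verified with h5py. A minimal sketch, assuming a joined file named test.h5 containing a timestamp dataset as in the benchmark script below:

import h5py

with h5py.File('test.h5', 'r') as f:
    dset = f['timestamp']      # dataset name from the benchmark's smd.event() call
    print(dset.is_virtual)     # True if the dataset is VDS-backed
    print(dset.shape)          # total events stitched across the part files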
Measure time for run.events() and smd.join_files()
=== time (sec) ===
loop_time   join_time
---------   ---------
SRV=0 n=16 [no smalldata]
  10.3        --
  11.7        --
  10.6        --
  10.3        --
  10.3        --
  10.4        --
SRV=1 n=17
  11.7        0.0063
  11.5        0.0066
  11.6        0.0066
  11.4        0.0067
  12.1        0.0061
  11.6        0.0065
SRV=2 n=18
  11.5        0.0072
  11.3        0.0076
  11.3        0.0074
SRV=4 n=20
  11.7        0.0105
  11.4        0.0105
  11.2        0.0103
import os
import time
from mpi4py import MPI
from psana import DataSource

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# default to the ffb test-stand directory unless TEST_XTC_DIR is set
# (the original os.path.join call discarded TEST_XTC_DIR, since joining
# with an absolute second argument drops the first)
xtc_dir = os.environ.get('TEST_XTC_DIR', '/ffb01/cpo')

ds = DataSource(exp='xpptut13', run=1, dir=xtc_dir,
                filter=lambda x: True, batch_size=10000)
run = next(ds.runs())
smd = ds.smalldata(filename='/ffb01/cpo/test.h5', batch_size=10000)

start = time.time()
for nevt, evt in enumerate(run.events()):
    smd.event(evt, timestamp=evt.timestamp, one=1)
smd.done()
middle = time.time()

# report the slowest rank's event-loop time
tottime = middle - start
maxtime = comm.reduce(tottime, op=MPI.MAX)
if rank == 0:
    print('*** proc time:', maxtime)

# note that we commented out the automatic join_files() in the code
# in order to time it here
if rank == size - 1:
    smd.join_files()
    print('*** join time:', time.time() - middle)
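For reference, the SRV=1 n=17 rows above would presumably have been collected with a launch line like the one below (script name hypothetical), following the same convention as the example at the top of this page:

PS_SRV_NODES=1 mpirun -n 17 python measure_smalldata.py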
Measure smd.join_files() for many files
join times (seconds); each file contained 1M data points

#files   time (s)
     1   0.0065
     2   0.0075
     3   0.0104
   100   0.5900
   500   1.2000