You can run this script with MPI (the example below uses 2 servers and 4 clients, matching the 6 MPI ranks): mpirun -n 6 python myscript.py
import numpy as np
from smalldata import SmallData

# we assume you have some raw data to process that you can iterate over
raw_data = np.random.randn(100, 10)

# you can make custom callback functions that are called back on every
# smd.event(...) call with the collected data as an argument
callbacks = [print]

smd = SmallData(servers=2,           # the number of file writers
                clients=4,           # the number of data processors
                filename='my.h5',    # the HDF5 file to put results in
                callbacks=callbacks)

for i, data in enumerate(raw_data):

    # do some processing ...

    # save per-event data
    # here "i" is a timestamp or other unique ID
    smd.event(i, mydata=np.sum(data))

    # or use nested dictionaries to structure the output
    smd.event(i, {'group': {'field': np.arange(5)}})

    # you can also save "summary data", typically at a lower frequency
    if smd.summary:
        n_workers = smd.sum(1)  # reduce across all cores
        smd.save_summary(n_workers=n_workers)

# finish up any last communication
smd.done()
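As a rough illustration of the callbacks hook used above, here is a minimal sketch of a user-defined callback. The name my_monitor is made up for this example, and the assumption that the callback receives the collected data as a dictionary-like batch is inferred from the comment above (callbacks=[print] simply prints whatever object is handed back); check the smalldata documentation for the exact payload format.

# hypothetical callback: assumes the collected data arrives as a dict-like batch
def my_monitor(batch):
    # report which fields were gathered in this batch of events
    print('received fields:', list(batch.keys()))

callbacks = [my_monitor]  # then pass to SmallData(..., callbacks=callbacks) as above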
Performance Measurements
Done by TJ Lane and cpo on Oct. 31, 2019
- 1M XTC events from "xtcwriter -n 1000000 -e 0" on the test stand FFB (/ffb01/cpo/)
- batch size of 10,000 (for both smalldata and DataSource)
- cache size of 10,000
- "n" is the total number of cores given to mpirun on drp-tst-dev003 (a 20-core node)
Conclusions
- small h5 production adds roughly 10% to the event-loop time, and we can scale our way out if performance becomes an issue.
- hdf5's VDS (created by "smd.join_files()") seems to behave well with 500 files of 1M events each, which is good because the join is the one step we cannot scale.
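To make the VDS point concrete, here is a minimal sketch of reading the joined file back with h5py. The filename my.h5 and the field names mydata and group/field are taken from the example script above; that join_files() exposes them under exactly these HDF5 paths is an assumption, so adjust the names to whatever your file actually contains.

# minimal sketch (assumed layout): per-event fields become datasets in the joined file
import h5py

with h5py.File('my.h5', 'r') as f:
    mydata = f['mydata'][:]       # one entry per smd.event(...) call
    field = f['group/field'][:]   # nested dictionaries become HDF5 groups
    print(mydata.shape, field.shape)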
Measure time for run.events() and smd.join_files()
=== time (sec) ===
loop_time   join_time
---------   ---------
SRV=0 n=16 [no smalldata]
10.3 --
11.7 --
10.6 --
10.3 --
10.3 --
10.4 --
SRV=1 n=17
11.7 .0063
11.5 .0066
11.6 .0066
11.4 .0067
12.1 .0061
11.6 .0065
SRV=2 n=18
11.5 .0072
11.3 .0076
11.3 .0074
SRV=4 n=20
11.7 .0105
11.4 .0105
11.2 .0103
import sys
import os
import numpy as np
from psana import DataSource
import time
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

xtc_dir = os.path.join(os.environ.get('TEST_XTC_DIR', os.getcwd()), '/ffb01/cpo')
ds = DataSource(exp='xpptut13', run=1, dir=xtc_dir,
                filter=lambda x: True, batch_size=10000)
run = next(ds.runs())
smd = ds.smalldata(filename='/ffb01/cpo/test.h5', batch_size=10000)

nevt = 0
start = time.time()
for nevt, evt in enumerate(run.events()):
    smd.event(evt, timestamp=evt.timestamp, one=1)
smd.done()

middle = time.time()
tottime = middle - start
maxtime = comm.reduce(tottime, op=MPI.MAX)
if rank == 0:
    print('*** proc time:', maxtime)

# note that we commented out the automatic join_files() in the code
# in order to time it here.
if rank == size - 1:
    smd.join_files()
    print('*** join time:', time.time() - middle)
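For reference, each row of the timing table above corresponds to one mpirun launch of this script with "n" total cores. A hedged example for the SRV=1, n=17 row, assuming the script is saved as benchmark.py (a name chosen here) and that the number of smalldata server cores is selected with psana2's PS_SRV_NODES environment variable (our assumption about the configuration): PS_SRV_NODES=1 mpirun -n 17 python benchmark.py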
Measure smd.join_files() for many files
Join times (seconds); each file contained 1M data points:

#files   time (s)
------   --------
1        0.0065
2        0.0075
3        0.0104
100      0.5900
500      1.2000
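Since the join is the one step that cannot be scaled out (see Conclusions above), it can be handy to confirm how many source files a joined dataset actually maps. Below is a minimal h5py sketch using the joined file and the timestamp field from the benchmark script above; the exact dataset name inside the joined file is an assumption.

import h5py

with h5py.File('/ffb01/cpo/test.h5', 'r') as f:
    dset = f['timestamp']
    if dset.is_virtual:
        # each mapping points at a region of one source file; count distinct files
        files = {m.file_name for m in dset.virtual_sources()}
        print('joined from', len(files), 'source files')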