If you have xtc files (with associated smd files), you can run this script with MPI: PS_SRV_NODES=2 mpirun -n 6 python example.py

It also works on one core with: python example.py.  See MPI rank/task diagram here.

By default this mechanism produces "aligned" datasets, where missing values are padded (with NaN for floats and -99999 for integers).  To create an unaligned dataset (without padding), prefix the name of the variable with "unaligned_".

from psana import DataSource
import numpy as np

# called back on each SRV node, for every smd.event() call below
def test_callback(data_dict):
    print(data_dict)
ds = DataSource(exp='xpptut13', run=1, dir='.tmp')
# batch_size here specifies how often the dictionary of information
# is sent to the SRV nodes
smd = ds.smalldata(filename='my.h5', batch_size=5, callbacks=[test_callback])
run = next(ds.runs())

# necessary (instead of "None") since some ranks may not receive events
# and the smd.sum() below could fail
# use the builtin int: the np.int alias was removed in NumPy 1.24
arrsum = np.zeros(2, dtype=int)
for i,evt in enumerate(run.events()):
    myones = np.ones_like(arrsum)
    smd.event(evt, myfloat=2.0, arrint=myones)
    arrsum += myones

if smd.summary:
    smd.sum(arrsum)
    smd.save_summary({'summary_array' : arrsum}, summary_int=1)
smd.done()
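
To make the aligned padding described above concrete, here is a minimal NumPy sketch. It is not psana's implementation: the helper `align` and the constant `INT_FILL` are hypothetical names introduced only for illustration, and the only facts taken from this page are the two padding values (NaN for floats, -99999 for integers).

```python
import numpy as np

INT_FILL = -99999  # integer padding value quoted in the text above


def align(values_by_event, n_events, dtype):
    """Return a length-n_events array; events with no value are padded."""
    dtype = np.dtype(dtype)
    # floats are padded with NaN, integers with the sentinel value
    fill = np.nan if np.issubdtype(dtype, np.floating) else INT_FILL
    out = np.full(n_events, fill, dtype=dtype)
    for idx, val in values_by_event.items():
        out[idx] = val
    return out


# events 1 and 3 have no float value; events 0 and 2 have no integer value
floats = align({0: 2.0, 2: 2.0}, n_events=4, dtype=np.float64)
ints = align({1: 7}, n_events=3, dtype=np.int64)

# a reader of the file can mask the padding out before doing arithmetic
valid_floats = floats[~np.isnan(floats)]
valid_ints = ints[ints != INT_FILL]
```

In the script above, passing the keyword as unaligned_myfloat=2.0 instead of myfloat=2.0 would, per the naming rule described earlier, store that variable without any padding.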

 

Performance Measurements

Done by TJ Lane and cpo on Oct. 31, 2019

1M XTC Events from "xtcwriter -n 1000000 -e 0" on the test stand ffb (/ffb01/cpo/)
10,000 batch size (both for smalldata and DataSource)
10,000 cache size
"n" is the total number of cores given to mpirun on drp-tst-dev003 (a 20 core node)

Conclusions

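  • Small h5 production is roughly a 10% perturbation on the event-loop time (mean loop time of about 10.6 s without smalldata versus about 11.65 s with one SRV node in the measurements below), and we can scale our way out if performance is an issue.
  • hdf5's VDS (done by "smd.join_files()") seems to behave well with 500 files each with 1M events (about 1.2 s to join), which is good because that is the one thing we can't scale.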

Measure time for run.events() and smd.join_files()

time (sec)

loop_time  join_time
---------  ---------

SRV=0 n=16 [no smalldata]
10.3       --
11.7       --
10.6       --
10.3       --
10.3       --
10.4       --

SRV=1 n=17
11.7       0.0063
11.5       0.0066
11.6       0.0066
11.4       0.0067
12.1       0.0061
11.6       0.0065

SRV=2 n=18
11.5       0.0072
11.3       0.0076
11.3       0.0074

SRV=4 n=20
11.7       0.0105
11.4       0.0105
11.2       0.0103

import sys
import os
import numpy as np
from psana import DataSource
import time
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
# os.path.join discards earlier components when a later one is absolute,
# so the original join with TEST_XTC_DIR always resolved to this path
xtc_dir = '/ffb01/cpo'
ds = DataSource(exp='xpptut13', run=1, dir=xtc_dir, filter=lambda x : True, batch_size=10000)
run = next(ds.runs())
smd = ds.smalldata(filename='/ffb01/cpo/test.h5', batch_size=10000)
nevt = 0
start = time.time()
for nevt,evt in enumerate(run.events()):
    smd.event(evt, timestamp=evt.timestamp, one=1)
smd.done()
middle = time.time()
tottime = middle-start
maxtime = comm.reduce(tottime,op=MPI.MAX)
if rank==0:
    print('*** proc time:',maxtime)
# note that we commented out the automatic join_files() in the code in order to time it here.
if rank==size-1:
    smd.join_files()
    print('*** join time:',time.time()-middle)
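
The "10% perturbation" conclusion can be checked directly from the loop_time measurements listed in this section. The sketch below only re-does that arithmetic; the numbers are copied from the table, and the variable names are ours.

```python
from statistics import mean

# loop_time measurements (seconds) copied from the table, keyed by SRV count
loop_times = {
    0: [10.3, 11.7, 10.6, 10.3, 10.3, 10.4],  # no smalldata
    1: [11.7, 11.5, 11.6, 11.4, 12.1, 11.6],
    2: [11.5, 11.3, 11.3],
    4: [11.7, 11.4, 11.2],
}

baseline = mean(loop_times[0])

# fractional slowdown of the event loop relative to the no-smalldata runs
overhead = {
    srv: mean(times) / baseline - 1.0
    for srv, times in loop_times.items()
    if srv > 0
}

for srv, frac in overhead.items():
    print(f"SRV={srv}: mean loop {mean(loop_times[srv]):.2f} s, overhead {frac:.1%}")
```

With these samples the overhead comes out between roughly 7% and 10%, depending on the SRV count. The sample sizes are small (three to six runs each), so the differences between SRV counts should not be over-interpreted.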

Measure smd.join_files() for many files


join times (seconds); each file held 1M data points

#files  time
     1  0.0065
     2  0.0075
     3  0.0104
   100  0.5900
   500  1.2000
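
One way to read the join times above is as a cost per file. The short sketch below does that division using only the numbers from the table; the variable names are ours.

```python
# join times (seconds) copied from the table, keyed by number of files
join_times = {1: 0.0065, 2: 0.0075, 3: 0.0104, 100: 0.5900, 500: 1.2000}

# cost per joined file, in milliseconds
per_file_ms = {n: 1000.0 * t / n for n, t in join_times.items()}

for n, ms in per_file_ms.items():
    print(f"{n:>4} files: {join_times[n]:.4f} s total, {ms:.2f} ms per file")
```

The per-file cost stays at a few milliseconds across the whole range, which is consistent with the conclusion that the join step behaves well at 500 files.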


MPI Task Structure

To allow for scaling, many hdf5 files are written, one per "SRV" node.  The total number of SRV nodes is set by the environment variable PS_SRV_NODES (defaults to 0).  psana then joins these hdf5 files into what appears to be a single file, using the hdf5 "virtual dataset" (VDS) feature.
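
For readers unfamiliar with virtual datasets, the following is a minimal, generic h5py sketch of the idea. It is not the code behind smd.join_files(): the helper names (`write_part`, `join_parts`, `demo`), the file names, and the simple end-to-end concatenation are all assumptions made for illustration. It assumes h5py 2.9 or later built against HDF5 1.10 or later.

```python
import os
import tempfile

import h5py
import numpy as np


def write_part(path, values, name="timestamp"):
    """Write one stand-in "per-SRV" file holding a single 1-D dataset."""
    with h5py.File(path, "w") as f:
        f.create_dataset(name, data=values)


def join_parts(joined_path, part_paths, name="timestamp"):
    """Expose the same-named dataset from every part as one virtual dataset."""
    lengths = []
    dtype = None
    for path in part_paths:
        with h5py.File(path, "r") as f:
            lengths.append(f[name].shape[0])
            dtype = f[name].dtype

    # the layout describes the joined shape; no data is copied
    layout = h5py.VirtualLayout(shape=(sum(lengths),), dtype=dtype)
    offset = 0
    for path, n in zip(part_paths, lengths):
        layout[offset:offset + n] = h5py.VirtualSource(path, name, shape=(n,))
        offset += n

    with h5py.File(joined_path, "w", libver="latest") as f:
        f.create_virtual_dataset(name, layout)


def demo():
    """Write three small part files, join them, and read the result back."""
    tmp = tempfile.mkdtemp()
    parts = []
    for i in range(3):
        path = os.path.join(tmp, f"part{i}.h5")
        write_part(path, np.arange(i * 4, (i + 1) * 4, dtype=np.int64))
        parts.append(path)
    joined = os.path.join(tmp, "joined.h5")
    join_parts(joined, parts)
    with h5py.File(joined, "r") as f:
        return f["timestamp"][:]
```

Calling demo() reads the joined file as if it were a single dataset, while the data itself stays in the three part files. Because the join only writes the mapping, it is cheap compared with copying the data.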

