Environment

To obtain the environment to run psana2, execute the following:

Code Block
source /reg/g/psdm/sw/conda2/manage/bin/psconda.sh

Note that psana2 is not compatible with psana1: a given environment can activate one or the other, but not both.
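
A quick way to confirm which flavor is active after sourcing the script above (a minimal sketch, assuming the source command succeeded):

Code Block
languagepy
# Hedged check that the psana2 environment is active: the import should
# succeed and __file__ should point into the installation selected by
# psconda.sh above.
import psana
print(psana.__file__)
from psana import DataSource   # psana2 exposes DataSource as a class taking keyword arguments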

Example Script

If you have xtc files (with associated smd files), you can run this script with MPI: PS_SRV_NODES=2 mpirun -n 6 python example.py

...

Code Block
languagepy
from psana import DataSource
import numpy as np

# called back on each SRV node, for every smd.event() call below
def test_callback(data_dict):
    print(data_dict)

ds = DataSource(exp='xpptut13', run=1, dir='.tmp')
# batch_size here specifies how often the dictionary of information
# is sent to the SRV nodes
smd = ds.smalldata(filename='my.h5', batch_size=5, callbacks=[test_callback])
run = next(ds.runs())

# necessary (instead of "None") since some ranks may not receive events
# and the smd.sum() below could fail
arrsum = np.zeros(2, dtype=int)
for i,evt in enumerate(run.events()):
    myones = np.ones_like(arrsum)
    smd.event(evt, myfloat=2.0, arrint=myones)
    arrsum += myones

if smd.summary:
    smd.sum(arrsum)
    smd.save_summary({'summary_array' : arrsum}, summary_int=1)
smd.done()
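
After the job completes, the joined my.h5 can be inspected with h5py.  This is a minimal read-back sketch; it assumes the per-event data appear under the keyword names passed to smd.event() above (myfloat, arrint) and the summary data under the names passed to smd.save_summary(); the exact layout of the file may differ.

Code Block
languagepy
import h5py

# Minimal read-back sketch (assumption: dataset names follow the keyword
# arguments used with smd.event() and smd.save_summary() above).
with h5py.File('my.h5', 'r') as f:
    f.visit(print)            # list everything the joined file exposes
    print(f['myfloat'][:])    # one entry per smd.event() call
    print(f['arrint'][:])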

Performance Measurements

Done by TJ Lane and cpo on Oct. 31, 2019

  • 1M XTC events from "xtcwriter -n 1000000 -e 0" on the test stand ffb (/ffb01/cpo/)
  • 10,000 batch size (for both smalldata and DataSource)
  • 10,000 cache size
  • "n" is the total number of cores given to mpirun on drp-tst-dev003 (a 20-core node)

Conclusions

...

Measure time for run.events() and smd.join_files()

...

Code Block
languagepy
import sys
import os
import numpy as np
from psana import DataSource
import time
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
# directory containing the xtc (and smd) files; defaults to the test-stand ffb area
xtc_dir = os.environ.get('TEST_XTC_DIR', '/ffb01/cpo')
ds = DataSource(exp='xpptut13', run=1, dir=xtc_dir, filter=lambda x : True, batch_size=10000)
run = next(ds.runs())
smd = ds.smalldata(filename='/ffb01/cpo/test.h5', batch_size=10000)
nevt = 0
start = time.time()
for nevt,evt in enumerate(run.events()):
    smd.event(evt, timestamp=evt.timestamp, one=1)
smd.done()
middle = time.time()
tottime = middle-start
maxtime = comm.reduce(tottime,op=MPI.MAX)
if rank==0:
    print('*** proc time:',maxtime)
# note that we commented out the automatic join_files() in the code in order to time it here.
if rank==size-1:
    smd.join_files()
    print('*** join time:',time.time()-middle)

Measure smd.join_files() for many files

...


MPI Task Structure

To allow for scaling, many hdf5 files are written, one per "SRV" node.  The total number of SRV nodes is set by the environment variable PS_SRV_NODES (default 0).  These per-SRV hdf5 files are then joined by psana into what appears to be a single file using the hdf5 "virtual dataset" feature.
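
The joining step can be illustrated with plain h5py.  The following is a minimal sketch of the general "virtual dataset" technique, not psana's actual implementation: two part files stand in for the per-SRV files, and a third file exposes their datasets as one contiguous dataset through a VirtualLayout.

Code Block
languagepy
import h5py
import numpy as np

# Two "part" files, standing in for files written by two SRV nodes
# (illustrative names, not psana's real naming scheme).
for i in range(2):
    with h5py.File(f'part{i}.h5', 'w') as f:
        f.create_dataset('x', data=np.arange(5) + 5 * i)

# A virtual layout maps slices of the combined dataset onto the part files.
layout = h5py.VirtualLayout(shape=(10,), dtype='int64')
for i in range(2):
    layout[5 * i : 5 * (i + 1)] = h5py.VirtualSource(f'part{i}.h5', 'x', shape=(5,))

# The "joined" file holds no event data itself; reads pass through
# to the part files.
with h5py.File('joined.h5', 'w') as f:
    f.create_virtual_dataset('x', layout)

with h5py.File('joined.h5', 'r') as f:
    print(f['x'][:])   # [0 1 2 3 4 5 6 7 8 9]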

...