Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

You can run this script with MPI (PS_SRV_NODES sets the number of SmallData server/file-writer ranks): PS_SRV_NODES=2 mpirun -n 6 python myscriptexample.py

Code Block (language: python)
# Example: write per-event and end-of-run summary data to HDF5 using
# psana's SmallData interface. Intended to be launched under MPI, e.g.:
#   PS_SRV_NODES=2 mpirun -n 6 python myscriptexample.py
from psana import DataSource
import numpy as np


def test_callback(data_dict):
    """Callback invoked on every smd.event(...) call with the collected data."""
    print(data_dict)


# Open the data source; `filter` selects events and `batch_size` controls
# how many events are grouped per read.
ds = DataSource(exp='xpptut13', run=1, dir='.tmp',
                filter=lambda x: True, batch_size=2)

# Create the SmallData handle: per-event results go into my.h5, and each
# batch of collected data is also passed to every callback in the list.
smd = ds.smalldata(filename='my.h5', batch_size=5, callbacks=[test_callback])

run = next(ds.runs())
arrsum = None  # running per-core sum of the per-event arrays

for i, evt in enumerate(run.events()):

    # ... do some processing ...
    # NOTE: the original example used dtype=np.int, which was removed in
    # NumPy 1.24; the builtin int is the documented replacement.
    myones = np.ones(2, dtype=int)

    # save per-event data, keyed by the event object
    smd.event(evt, myfloat=2.0, arrint=myones)

    # accumulate a running sum for the end-of-run summary
    if arrsum is None:
        arrsum = myones
    else:
        arrsum += myones

# you can also save "summary data", typically at a lower frequency
if smd.summary:
    # add up the array across the "big data" cores before saving it;
    # assumes that all cores have received events (arrsum is not None)
    smd.sum(arrsum)
    smd.save_summary({'summary_array': arrsum}, summary_int=1)

# finish up any last communication and close the HDF5 file
smd.done()

 

Performance Measurements

...