You can run this script with MPI. The PS_SRV_NODES environment variable sets how many of the MPI ranks are dedicated to writing the HDF5 file (the "server" processes); the remaining ranks process events:

export PS_SRV_NODES=2; mpirun -n 6 python myscriptexample.py
```python
from psana import DataSource
import numpy as np

def test_callback(data_dict):
    # called back for every event's collected data
    print(data_dict)

ds = DataSource(exp='xpptut13', run=1, dir='.tmp',
                filter=lambda x: True, batch_size=2)
smd = ds.smalldata(filename='my.h5',  # the HDF5 file to put results in
                   batch_size=5,
                   callbacks=[test_callback])

run = next(ds.runs())
arrsum = None
for i, evt in enumerate(run.events()):
    # do some processing ...
    myones = np.ones(2, dtype=int)  # np.int is removed in modern NumPy
    # save per-event data
    smd.event(evt, myfloat=2.0, arrint=myones)
    # nested dictionaries can also be used to structure the output, e.g.
    # smd.event(evt, {'group': {'field': np.arange(5)}})
    if arrsum is None:
        arrsum = myones
    else:
        arrsum += myones

# you can also save "summary data", typically at a lower frequency
if smd.summary:
    # add up the array across the "big data" cores and save it to the h5 file
    smd.sum(arrsum)  # assumes that all cores have received events
    smd.save_summary({'summary_array': arrsum}, summary_int=1)

# finish up any last communication
smd.done()
```
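Since each callback receives the collected data as a plain dictionary, callbacks are a convenient hook for live monitoring. Below is a hypothetical sketch (the RunningMean class and its running-mean logic are illustrative, not part of the smalldata API); it assumes the dictionary carries the keyword names used in the smd.event(...) calls above:

```python
import numpy as np

class RunningMean:
    """Illustrative callback: track a running mean of one saved quantity."""
    def __init__(self, key):
        self.key = key      # name of the smd.event(...) keyword to watch
        self.total = 0.0
        self.count = 0

    def __call__(self, data_dict):
        # data_dict holds the collected values passed to the callback
        values = np.atleast_1d(data_dict.get(self.key, []))
        self.total += values.sum()
        self.count += values.size
        if self.count:
            print(f'{self.key} mean so far: {self.total / self.count:.3f}')

# pass an instance in the callbacks list, e.g.
# smd = ds.smalldata(filename='my.h5', callbacks=[RunningMean('myfloat')])
```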
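After the job finishes, the per-event values end up as datasets in my.h5. As a quick sanity check you can read the file back with h5py; the dataset names below simply mirror the keyword names used above, while the exact layout (extra timestamp datasets, grouping) depends on the smalldata version, so treat this as a sketch:

```python
import h5py

with h5py.File('my.h5', 'r') as f:
    f.visit(print)              # list every group and dataset in the file
    myfloat = f['myfloat'][:]   # one entry per saved event
    arrint = f['arrint'][:]     # expected shape: (n_events, 2)
    print(myfloat.shape, arrint.sum(axis=0))
```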
Performance Measurements
...