Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

It is straightforward to write a script that runs on one core.  When running multi-core with mpi one has to use the small data "callbacks" kwarg to receive the data gathered from all nodes, as shown in this line from the example .  An example script is here:  

Code Block
smd = ds.smalldata(batch_size=5, callbacks=[my_smalldata])

It is also necessary to have one core reserved to do the gathering, so you need a line like this:

languagepy
import os
import numpy as np
from psana import DataSource
from psmon import publish
from psmon.plots import XYPlot,Image
from collections import deque

from mpi4py import MPI
numworkers = MPI.COMM_WORLD.Get_size()-1
if numworkers==0: numworkers=1 # the single core case (no mpi)

os.environ['PS_SRV_NODES']='1' # one mpi core gathers/plots data

mydeque=deque(maxlen=25)
lastimg=None
numevents=0
numendrun=0

def my_smalldata(data_dict): # one core gathers all data from mpi workers
    global numevents,lastimg,numendrun,mydeque
    if 'endrun' in data_dict:
        numendrun+=1
        if numendrun==numworkers:
            print('Received endrun from all workers. Resetting data.')
            numendrun=0
            numevents=0
            mydeque=deque(maxlen=25)
        return
    numevents += 1
    if 'opal' in data_dict:
        lastimg = data_dict['opal']
    mydeque.append(data_dict['opalsum'])
    if numevents%100==0: # update plots around 1Hz
        print('event:',numevents)
        myxyplot = XYPlot(numevents, "Last 25 Sums", np.arange(len(mydeque)), np
.array(mydeque), formats='o')
        publish.send("OPALSUMS", myxyplot)
        if lastimg is not None: # opal image is not sent all the time
            myimgplot = Image(numevents, "Opal Image", lastimg)
            publish.send("OPALIMG", myimgplot)

while 1: # mpi worker processes
    ds = DataSource(shmem='rix')
    smd = ds.smalldata(batch_size=5, callbacks=[my_smalldata])
    for myrun in ds.runs():
        opal = myrun.Detector('atmopal')
        for nevt,evt in enumerate(myrun.events()):
            mydict={}
            image = opal.raw.image(evt)
            if image is None: continue
            # do as much work as possible in the workers
            # don't send large data all the time, if possible
            if nevt%10==0: mydict['opal']=image
            mydict['opalsum']=np.sum(image)
            smd.event(evt,mydict)
        smd.event(evt,{'endrun':1}) # tells gatherer to reset plots
Code Block
os.environ['PS_SRV_NODES']='1'


When running multi-node mpi there are also some complexities propagating the environment to remote nodes: the way to address that is described in this link.  

...