Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

psana2 scripts can be run in real-time on shared memory.  There is some extra complexity compared to writing python code for the real-time GUI ami, for example:

  • you have to create plots and handle when when they update
  • you have to worry about data getting out of time order
  • you have to handle missing data
  • you have to reset plots on run boundaries (if that's the behavior you want)
  • you have to write code to handle the data gathered from multiple mpi cores (not required if you want to run on 1 core)
  • be aware that the shared memory python scripts "steal" event statistics from other shared memory instances (e.g. ami)

...

When running multi-core with mpi one has to use the small data "callbacks" kwarg to receive the data gathered from all nodes.  An example multi-core script is below (also works on 1 core).  A pure 1-core script is simpler (no data "gathering" needed: can just be a loop over events with plot updates every N events).  For multi-core, this can be run with "mpirun -n 3 python <scriptname>.py" and the two plots can be viewed on the same node with "psplot -a 1 OPALSUMS OPALIMG" (see Visualization Tools for "psplot" options).  It can also be run in the usual offline manner by changing the DataSource line.  It is written in a way to keep running even when the DAQ is restarted:

Code Block
languagepy
import os
import numpy as np
from psana import DataSource
from psmon import publish
from psmon.plots import XYPlot,Image
from collections import deque

from mpi4py import MPI
numworkers = MPI.COMM_WORLD.Get_size()-1
if numworkers==0: numworkers=1 # the single core case (no mpi)

os.environ['PS_SRV_NODES']='1' # one mpi core gathers/plots data

mydeque=deque(maxlen=25)
lastimg=None
numevents=0
numendrun=0

def my_smalldata(data_dict): # one core gathers all data from mpi workers
    global numevents,lastimg,numendrun,mydeque
    if 'endrun' in data_dict:
        numendrun+=1
        if numendrun==numworkers:
            print('Received endrun from all workers. Resetting data.')
            numendrun=0
            numevents=0
            mydeque=deque(maxlen=25)
        return
    numevents += 1
    if 'opal' in data_dict:
        lastimg = data_dict['opal']
    mydeque.append(data_dict['opalsum'])
    if numevents%100==0: # update plots around 1Hz
        print('event:',numevents)
        myxyplot = XYPlot(numevents, "Last 25 Sums", np.arange(len(mydeque)), np
.array(mydeque), formats='o')
        publish.send("OPALSUMS", myxyplot)
        if lastimg is not None: # opal image is not sent all the time
            myimgplot = Image(numevents, "Opal Image", lastimg)
            publish.send("OPALIMG", myimgplot)

while 1: # mpi worker processes
    ds = DataSource(shmem='rix')
    smd = ds.smalldata(batch_size=5, callbacks=[my_smalldata])
    for myrun in ds.runs():
        opal = myrun.Detector('atmopal')
        for nevt,evt in enumerate(myrun.events()):
            mydict={}
            image = opal.raw.image(evt)
            if image is None: continue
            # do as much work as possible in the workers
            # don't send large data all the time, if possible
            if nevt%10==0: mydict['opal']=image
            mydict['opalsum']=np.sum(image)
            smd.event(evt,mydict)
        smd.event(evt,{'endrun':1}) # tells gatherer to reset plots

...