Page History
...
Typically psmon is used for publishing results to realtime plots in the callback, publishing updates every "N" events. See this link for psmon examples: Visualization Tools.
It is straightforward to write a script that runs on one core. When running multi-core with mpi one has to use the small data "callbacks" kwarg to receive the data gathered from all nodes. An example multi-core script is below (it also works on 1 core). A pure 1-core script is simpler (no data "gathering" needed: it can just be a loop over events with plot updates every N events). For multi-core, this can be run with "mpirun -n 3 python <scriptname>.py" and the two plots can be viewed on the same node with "psplot -a 1 OPALSUMS OPALIMG". It can also be run in the usual offline manner by changing the DataSource line:
Code Block | ||
---|---|---|
| ||
# Live-monitoring example: worker ranks reduce each event to a small
# dictionary, and the small-data callback (run where the data is gathered)
# turns those dictionaries into psmon plots named OPALSUMS and OPALIMG.
import os
import numpy as np
from psana import DataSource
from psmon import publish
from psmon.plots import XYPlot, Image
from collections import deque
from mpi4py import MPI

# Every rank except one produces data; the remaining rank gathers it.
numworkers = MPI.COMM_WORLD.Get_size() - 1
if numworkers == 0:
    numworkers = 1  # the single core case (no mpi)

os.environ['PS_SRV_NODES'] = '1'  # one mpi core gathers/plots data

# State owned by the gathering callback; it persists between callback calls.
mydeque = deque(maxlen=25)  # the most recent 25 image sums
lastimg = None              # most recent full image received (sent sparsely)
numevents = 0               # events gathered since the last reset
numendrun = 0               # end-of-run markers received for the current run


def my_smalldata(data_dict):
    """Gather one small-data dictionary and periodically publish plots.

    One core gathers all data from the mpi workers through this callback.

    Args:
        data_dict: dictionary sent by a worker through ``smd.event``. It is
            either an end-of-run marker (contains ``'endrun'``) or event
            data containing ``'opalsum'`` and, only for some events,
            ``'opal'``.
    """
    global numevents, lastimg, numendrun, mydeque

    if 'endrun' in data_dict:
        numendrun += 1
        # Only reset once every worker has reported the end of the run.
        if numendrun == numworkers:
            print('Received endrun from all workers. Resetting data.')
            numendrun = 0
            numevents = 0
            mydeque = deque(maxlen=25)
        return

    numevents += 1
    if 'opal' in data_dict:
        lastimg = data_dict['opal']
    mydeque.append(data_dict['opalsum'])

    if numevents % 100 == 0:  # update plots around 1Hz
        print('event:', numevents)
        myxyplot = XYPlot(numevents, "Last 25 Sums",
                          np.arange(len(mydeque)), np.array(mydeque),
                          formats='o')
        publish.send("OPALSUMS", myxyplot)
        if lastimg is not None:  # opal image is not sent all the time
            myimgplot = Image(numevents, "Opal Image", lastimg)
            publish.send("OPALIMG", myimgplot)


while True:  # mpi worker processes
    ds = DataSource(shmem='rix')
    smd = ds.smalldata(batch_size=5, callbacks=[my_smalldata])
    for myrun in ds.runs():
        opal = myrun.Detector('atmopal')
        # Remember the last event of THIS run: the end-of-run marker needs an
        # event to attach to, and the bare loop variable would be unbound
        # (or left over from a previous run) if this run yields no events.
        lastevt = None
        for nevt, evt in enumerate(myrun.events()):
            lastevt = evt
            mydict = {}
            image = opal.raw.image(evt)
            if image is None:
                continue
            # do as much work as possible in the workers
            # don't send large data all the time, if possible
            if nevt % 10 == 0:
                mydict['opal'] = image
            mydict['opalsum'] = np.sum(image)
            smd.event(evt, mydict)
        if lastevt is not None:
            # tells gatherer to reset plots
            smd.event(lastevt, {'endrun': 1})
...