You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 18 Next »

Background

Before describing how to accumulate events in AMI, it is important to understand the architecture of AMI, which consists of workers, local collectors, a global collector, a manager, and a client processes that run on different compute nodes.

The client assembles a graph which is pushed to the manager that then distributes it to the workers and collectors for execution. Nodes in the graph are "colored" worker, localCollector, and globalCollector. Each type of process is responsible for executing a subgraph of nodes of the appropriate color (ie workers execute nodes with the color worker, ...). Events in AMI are distributed to workers by Psana. AMI provides different types of graph nodes for accumulating events across this distributed architecture. Nodes which accumulate events are called global operations and GUI graph nodes are expanded into 3 underlying graph nodes of the same type but with different colors.

AMI processes

Average

(Average0D,Average1D, Average2D boxes)

Will average event data either over an infinite time ("infinite" checked in configuration) or a settable finite number of events (generated internally using the PickN pattern).  For the finite case be careful not to set N too large as it can consume a lot of memory.  The PickN produces an array of numbers/1D-arrays/2D-arrays which are, respectively, 1D/2D/3D arrays (with "time" being the added dimension).  AMI allows you to average over any of those axes, but users will typically want to average over axis=0 ("time"), unless you want to see an average projection (axis=1 or 2) vs. time.

PickN

Use-case summary: Use PickN when you need to gather a precise number of events from multiple workers.  It is unusual for users to use this pattern: it is typically only used internally by AMI.  The ami graph output will update only when N events have been collected.  Note that there is some arithmetic rounding depending on the number of events requested and the number of workers running parallel.

PickN accumulates N events into a list. The GUI node is expanded into 3 underlying PickN graph nodes each with a color of worker, localCollector, and globalCollector. The underlying graph nodes have a different N.

Workers collect: N // num_workers

localCollectors collect: N // num_local_collectors

globalCollector collects: (N // num_workers) * num_workers

In the above example there are 4 workers and 2 localCollectors, if we wanted to collect N = ~25 events then each worker would collect 25 // 4 = 6 events, each localCollector would collect 12 "events" from the workers (ie 2 lists of length 6 each),  and the globalCollector would collect (25 // 4) * 4 = 24 "events" from the localCollectors (ie 2 lists of length 12 each).

The PickN will only update when N events have been collected. There is no guarantee on the order in which events are collected. 

RollingBuffer

Use-case summary: Use RollingBuffer like a circular buffer (or first-in-first-out buffer) that keeps a finite number of events.  The ami graph output updates every event.

RollingBuffer works similarly to the PickN, only it accumulates to a circular buffer, which shifts left after N elements have been accumulated. Unlike the PickN which only returns when N events have collected, the RollingBuffer returns after event.

For example assume N = 10 and we have 2 workers and 2 localCollectors. The workers and local collectors will both collect 5 events each and the global collector will collect 10 events. Once 10 events have been collected all elements in the list will shift left and the latest event will appear in the buffer at index 9.

worker1 buffer: [1, 2, 3, 4, 5]

worker2 buffer: [6, 7, 8, 9, 10]

localCollector1 buffer: [1, 2, 3, 4, 5]

localCollector2 buffer: [6, 7, 8, 9, 10]

globalCollector: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

worker1 receives NEW EVENT 11

worker1 buffer: [2, 3, 4, 5, 11]

worker2 buffer: [6, 7, 8, 9, 10]

localCollector1 buffer: [2, 3, 4, 5, 11]

localCollector2 buffer: [6, 7, 8, 9, 10]

globalCollector: [6, 7, 8, 9, 10, 2, 3, 4, 5, 11]

Accumulator

An accumulator takes two functions as an argument, a reduction and a res_factory that is called on reset. It accumulates into self.res on each event in the following way: self.res = reduction(self.res, *args) at the end of a heartbeat workers and local collectors reset self.res by calling self.res_factory.

The accumulator is independent of the number of workers and local collectors, but consider the following two worker, two local collector example.

from ami.graphkit_wrapper import Graph
import ami.graph_nodes as gn


def make_graph():                                                                                                      
    def reduction(res, *rest):                                                                                         
        print(res, rest)                                                                                               
        return res+rest[0]                                                                                             
                                                                                                                       
    def res_factory():                                                                                                 
        return 0                                                                                                       
                                                                                                                       
    graph = Graph(name='graph')                                                                                        
    graph.add(gn.Accumulator(name='accumulated', inputs=['delta_t'], outputs=['accumulated_delta_t'],                  
                             reduction=reduction, res_factory=res_factory))                                             
    graph.compile(num_workers=2, num_local_collectors=2)                                                               
    return graph                                                                                                       


graph = make_graph()
graph({'delta_t': 1}, color='worker')
# 0 (1,)
graph({'delta_t': 2}, color='worker')
# 1 (2, )
graph({'delta_t': 3}, color='worker')
# 3 (3, )
graph({'delta_t': 4}, color='worker')
# 6 (4, )
worker1 = graph({'delta_t': 5}, color='worker')
# 10 (5, )
print("worker1 done")

graph2 = make_graph()
for i in range(6, 11):                                                                                                 
    worker2 = graph2({'delta_t': i}, color='worker')                                                                   
print("worker2 done")

graph3 = make_graph()
localCollector1 = graph3(worker1, color='localCollector')
# 0 (15, )
print("localCollector1 done")

graph4 = make_graph()
localCollector2 = graph4(worker2, color='localCollector')
# 0 (40, )
print("localCollector2 done")

graph5 = make_graph()
globalCollector = graph5(localCollector1, color='globalCollector')
# 0 (15, )
print("globalCollector")

globalCollector = graph5(localCollector2, color='globalCollector')
# 15 (40, )
print("globalCollector")
print(globalCollector)
# {'accumulated_delta_t': 55}

For more complicated examples look at the to_operation method of the Binning2D and Average1D in the Numpy section of AMI:

https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Numpy.py#L118-L149

https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Numpy.py#L506-L528

ReduceByKey

ReduceByKey takes a key and value and reduction function and reduces into a dictionary in the following way: if key in self.res: self.res[key] = reduction(self.res[key], value) else: self.res[key] = value

Similar to Accumulator, it is independent of the number of workers and local collectors and resets at the end of a heartbeat on workers and local collectors. To illustrate consider the following example:

from ami.graphkit_wrapper import Graph
import ami.graph_nodes as gn


def reduction(res, *rest):                                                                                             
    print(res, rest)                                                                                                   
    return res+rest[0]                                                                                                 

def res_factory():                                                                                                     
    return 0                                                                                                           

graph = Graph(name='graph')
graph.add(gn.ReduceByKey(name='reduced', inputs=['key', 'value'], outputs=['accumulated'],                             
                         reduction=reduction))
graph.compile(num_workers=2, num_local_collectors=2)

graph({'key': 1, 'value': 3}, color='worker') 
graph({'key': 2, 'value': 7}, color='worker')
worker = graph({'key': 1, 'value': 13}, color='worker') # 3 (13, )
print("worker:", worker)  # worker: {'accumulated_worker': {1: 16, 2: 7}}
graph(worker, color='localCollector')  
graph(worker, color='localCollector')  
localCollector = graph(worker, color='localCollector') 
print("localCollector:", localCollector)  # localCollector: {'accumulated_localCollector': {1: 48, 2: 21}}
graph(localCollector, color='globalCollector')
globalCollector = graph(localCollector, color='globalCollector')
print("globalCollector:", globalCollector)  # globalCollector: {'accumulated': {1: 96, 2: 42}}

For more complicated examples look at MeanVsScan and MeanWaveformVsScan in the Operators section of AMI:

https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Operators.py#L102-L120

https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Operators.py#L149-L184

Current Global Operations in AMI

The following operations in AMI use one of the above accumulations and are considered global operations:

  • Average0D, Average1D, Average2D
  • Binning, Binning2D
  • MeanVsScan, MeanWaveformVsScan
  • Accumulator, Pick1, PickN, ReduceByKey, RollingBuffer

Any node which follows a global operation will only run on the global collector.

Accumulator Example

Example includes:

  • ReduceByKey
  • Accumulator
  • RollingBuffer
  • PickN
ami-local -l /reg/g/psdm/tutorials/ami2/random/accumulator.fc random:///reg/g/psdm/tutorials/ami2/random/worker.json




  • No labels