Background
When setting up AMI graphs, one of the more complex issues is gathering results from the multiple nodes/cores that receive the events. This gather is done at a "heartbeat" rate (typically 1-10 Hz): AMI uses the event timestamps to determine whether enough time has passed for a gather operation to happen.
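A minimal sketch of the idea follows. It assumes (hypothetically) that a heartbeat is identified by integer-dividing an event timestamp in seconds by a fixed period, and that timestamps arrive in increasing order. The names heartbeat_id and gather_points are illustrative only, not AMI functions.

```python
from typing import Iterable, List


def heartbeat_id(timestamp: float, period: float = 1.0) -> int:
    """Map an event timestamp (seconds) to the index of the heartbeat it falls in.

    Assumption: heartbeats are fixed-length windows of `period` seconds.
    """
    return int(timestamp // period)


def gather_points(timestamps: Iterable[float], period: float = 1.0) -> List[int]:
    """Return the indices of events that start a new heartbeat.

    At each of these events the results of the previous heartbeat would be
    gathered. Assumes timestamps arrive in increasing order.
    """
    triggers = []
    current = None
    for i, ts in enumerate(timestamps):
        hb = heartbeat_id(ts, period)
        if current is not None and hb > current:
            triggers.append(i)
        current = hb
    return triggers


# Hypothetical events arriving at 10 Hz for 2.5 s, with a 1 s (1 Hz) heartbeat.
timestamps = [k / 10 for k in range(25)]
print(gather_points(timestamps))  # [10, 20]
```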
...
Before describing how to accumulate events in AMI, it is important to understand the architecture of AMI, which consists of workers, local collectors, a global collector, a manager, and a client, all running as separate processes on different compute nodes.
NOTE: Any node which follows a global operation will only run on the global collector and so does not currently scale to multiple cores. To avoid performance issues, try to keep these operations lightweight.
The client assembles a graph and pushes it to the manager, which then distributes it to the workers and collectors for execution. Nodes in the graph are "colored" worker, localCollector, or globalCollector. Each type of process is responsible for executing the subgraph of nodes of its own color (i.e. workers execute nodes with the color worker, and similarly for the local collector and global collector). Events in AMI are distributed to workers by Psana. AMI provides different types of graph nodes for accumulating events across this distributed architecture. Nodes which accumulate events are called global operations; each such GUI graph node is expanded into 3 underlying graph nodes of the same type but with different colors.
...
Global Operations
...
Commonly Used By Users
Average0D, Average1D, Average2D
...
These global operations are typically used internally by AMI:
- Accumulator, Pick1, PickN, ReduceByKey, RollingBuffer
Average0D, Average1D, Average2D
Averages event data either over an infinite time ("infinite" checked in the configuration) or over a settable finite number of events (generated internally using the SumN pattern). In the finite case, be careful not to set N too large, as it can consume a lot of memory. When events are collected with PickN, the result is an array of numbers/1D-arrays/2D-arrays, i.e. a 1D/2D/3D array respectively (with "time" being the added dimension). AMI allows you to average over any of those axes, but users will typically want to average over axis=0 ("time"), unless they want to see an average projection (axis=1 or 2) vs. time.
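The axis convention can be illustrated with NumPy. The sketch below stacks N hypothetical camera images along a new leading "time" axis, as a PickN-style collection would, then averages over different axes. It also shows that a running sum divided by the count gives the same time average without keeping every image, which is the memory trade-off mentioned above. Shapes and pixel values are made up for illustration.

```python
import numpy as np

n_events, rows, cols = 5, 4, 3
# Hypothetical per-event 2D images; every pixel of event k has the value k.
images = [np.full((rows, cols), float(k)) for k in range(n_events)]

# Collecting N images adds a leading "time" axis: shape (N, rows, cols).
stack = np.stack(images)

avg_image = stack.mean(axis=0)     # average image over time, shape (rows, cols)
proj_vs_time = stack.mean(axis=1)  # per-event projection over rows, shape (N, cols)

# A running sum plus a count gives the same time average without storing every image.
running_sum = np.zeros((rows, cols))
for img in images:
    running_sum += img

print(np.allclose(running_sum / n_events, avg_image))     # True
print(stack.shape, avg_image.shape, proj_vs_time.shape)  # (5, 4, 3) (4, 3) (5, 3)
```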
This pattern is similar to an MPI "reduce" operation.
Binning, Binning2D
Used to generate 1D and 2D summed histograms from per-event x (1D) or x/y (2D) coordinates. This is an infinite sum, i.e. there is currently no support for a time window. Internally uses the Accumulator pattern (see below). See the numpy.histogram documentation for a description of the "density" parameter. This pattern is similar to an MPI "reduce" operation.
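Histogram counts over fixed bin edges are additive, so per-event histograms can be summed in any grouping (per worker, per collector) and still equal the histogram of all the data; this is what lets an infinite sum distribute cleanly. The NumPy sketch below demonstrates that property with hypothetical data and bin edges; it is not AMI's Binning implementation. Note that with density=True, numpy.histogram normalizes each result, so normalized histograms cannot simply be added in the same way.

```python
import numpy as np

edges = np.linspace(0.0, 10.0, 11)  # fixed bin edges shared by every event
rng = np.random.default_rng(0)
# Hypothetical per-event x values: 6 events with 20 samples each.
events = [rng.uniform(0.0, 10.0, size=20) for _ in range(6)]


def summed_counts(chunk):
    """Sum the per-event histogram counts for a group of events."""
    total = np.zeros(len(edges) - 1, dtype=np.int64)
    for x in chunk:
        counts, _ = np.histogram(x, bins=edges)
        total += counts
    return total


# Two "workers" each sum their own events, then a collector adds the partial sums.
combined = summed_counts(events[:3]) + summed_counts(events[3:])

# Identical to histogramming all the data in one go.
reference, _ = np.histogram(np.concatenate(events), bins=edges)
print(np.array_equal(combined, reference))  # True
```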
...
MeanVsScan, MeanWaveformVsScan
Useful for either step-scans or fly-scans. Computes the mean of a value or waveform as a function of step number (for a step-scan) or of the fly-scanned value (for a fly-scan). Internally uses the ReduceByKey pattern (see below).
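One way to build a mean per scan step on top of a key/value reduction is to reduce (sum, count) pairs per key and divide only at the end. Because sums and counts add, partial results from different workers can be merged in any order. The plain-Python sketch below, with hypothetical names and data, illustrates that approach; it is not the actual MeanVsScan code.

```python
from typing import Dict, Iterable, Tuple

# Scan step number -> (running sum, count). Hypothetical layout for illustration.
Partial = Dict[int, Tuple[float, int]]


def accumulate(events: Iterable[Tuple[int, float]]) -> Partial:
    """Reduce (step, value) pairs into a per-step (sum, count)."""
    res: Partial = {}
    for step, value in events:
        s, n = res.get(step, (0.0, 0))
        res[step] = (s + value, n + 1)
    return res


def merge(a: Partial, b: Partial) -> Partial:
    """Combine two partial results key by key, as a collector would."""
    out = dict(a)
    for step, (s, n) in b.items():
        s0, n0 = out.get(step, (0.0, 0))
        out[step] = (s0 + s, n0 + n)
    return out


def means(res: Partial) -> Dict[int, float]:
    """Divide sums by counts only once all partial results are merged."""
    return {step: s / n for step, (s, n) in sorted(res.items())}


worker1 = accumulate([(1, 2.0), (1, 4.0), (2, 10.0)])
worker2 = accumulate([(1, 6.0), (2, 20.0), (3, 5.0)])
print(means(merge(worker1, worker2)))  # {1: 4.0, 2: 15.0, 3: 5.0}
```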
SumN
Sums N events using a pattern very similar to PickN. See the PickN section below for some subtleties related to integer arithmetic. This pattern is similar to an MPI "reduce" operation.
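Assuming SumN splits the work the same way PickN does (each worker handles N // num_workers events), the global sum covers (N // num_workers) * num_workers events rather than exactly N. The plain-Python sketch below reproduces that arithmetic with hypothetical event values; sum_n is an illustrative name, not part of AMI.

```python
def sum_n(values_per_worker, n):
    """Sum roughly n events spread over workers, PickN-style (hypothetical helper).

    Each worker sums only its first n // num_workers values; the collectors
    then add the per-worker partial sums together.
    Returns (total, number_of_events_actually_summed).
    """
    num_workers = len(values_per_worker)
    per_worker = n // num_workers
    partials = [sum(values[:per_worker]) for values in values_per_worker]
    return sum(partials), per_worker * num_workers


# 4 hypothetical workers, each with plenty of events of value 1.0.
workers = [[1.0] * 10 for _ in range(4)]
total, used = sum_n(workers, 25)
print(total, used)  # 24.0 24
```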
Global Operations Typically Used Internally By AMI
PickN
Use-case summary: Use PickN when you need to gather a precise number of events from multiple workers. It is unusual for users to use this pattern directly: it is typically only used internally by AMI. The AMI graph output updates only when N events have been collected. Note that there is some integer rounding depending on the number of events requested and the number of workers running in parallel. This pattern and RollingBuffer (below) are similar.
Be careful not to set N too large if the items being selected are also large (e.g. camera images), since that can use up a lot of memory in the worker processes and in the local/global collector processes.
PickN accumulates N events into a list. The GUI node is expanded into 3 underlying PickN graph nodes each with a color of worker, localCollector, and globalCollector. The underlying graph nodes have a different N.
- Workers collect: N // num_workers
- localCollectors collect: N // num_local_collectors
- globalCollector collects: (N // num_workers) * num_workers
For example, with 4 workers and 2 localCollectors, if we wanted to collect about N = 25 events, each worker would collect 25 // 4 = 6 events, each localCollector would collect 12 "events" from the workers (i.e. 2 lists of length 6 each), and the globalCollector would collect (25 // 4) * 4 = 24 "events" from the localCollectors (i.e. 2 lists of length 12 each).
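This arithmetic can be checked with a short plain-Python sketch. It computes the per-tier sizes from the formulas above and then mimics the gather by concatenating lists, under the simplifying (hypothetical) assumption that workers are divided evenly between local collectors. The helper name pickn_counts is illustrative, not an AMI API.

```python
from itertools import chain


def pickn_counts(n, num_workers, num_local_collectors):
    """Per-tier target sizes for PickN, using the formulas in this section."""
    return (n // num_workers,
            n // num_local_collectors,
            (n // num_workers) * num_workers)


n, num_workers, num_local = 25, 4, 2
per_worker, per_local, per_global = pickn_counts(n, num_workers, num_local)

# Each worker picks per_worker hypothetical events.
worker_lists = [[f"w{w}e{e}" for e in range(per_worker)] for w in range(num_workers)]

# Assumption: workers are split evenly between local collectors (2 each here).
workers_per_local = num_workers // num_local
local_lists = [
    list(chain.from_iterable(worker_lists[i * workers_per_local:(i + 1) * workers_per_local]))
    for i in range(num_local)
]

# The global collector concatenates the local collectors' lists.
global_list = list(chain.from_iterable(local_lists))

print(per_worker, per_local, per_global)                    # 6 12 24
print([len(lst) for lst in local_lists], len(global_list))  # [12, 12] 24
```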
...
This pattern is similar to an MPI "gather" operation (gather a list of items).
RollingBuffer
Use-case summary: similar to PickN. Use RollingBuffer like a circular buffer (or first-in-first-out buffer) that keeps a finite number of events. Unlike PickN the ami graph output updates every event.
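Within a single process, the first-in-first-out behavior can be sketched with Python's collections.deque and its maxlen argument. This is only an analogy: AMI's RollingBuffer also has to combine buffers coming from several workers and collectors, which this sketch does not model.

```python
from collections import deque

N = 5                     # hypothetical buffer size
buffer = deque(maxlen=N)  # the oldest entry is dropped automatically once N is reached
sizes = []
for event in range(1, 9):
    buffer.append(event)
    sizes.append(len(buffer))  # a (possibly partial) buffer is available after every event

print(list(buffer))  # [4, 5, 6, 7, 8]
print(sizes)         # [1, 2, 3, 4, 5, 5, 5, 5]
```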
...
globalCollector: [6, 7, 8, 9, 10, 2, 3, 4, 5, 11]
...
ReduceByKey
ReduceByKey takes a key, a value, and a reduction function, and reduces into a dictionary self.res on each event in the following way: if key is already in self.res, then self.res[key] = reduction(self.res[key], value); otherwise self.res[key] = value.
Similar to Accumulator (described below), it is independent of the number of workers and local collectors and resets at the end of a heartbeat on workers and local collectors. To illustrate, consider the following example with two workers and two local collectors:
```python
from ami.graphkit_wrapper import Graph
import ami.graph_nodes as gn


def reduction(res, *rest):
    print(res, rest)
    return res + rest[0]


graph = Graph(name='graph')
graph.add(gn.ReduceByKey(name='reduced', inputs=['key', 'value'], outputs=['accumulated'],
                         reduction=reduction))
graph.compile(num_workers=2, num_local_collectors=2)

# Worker: key 1 is seen twice (3 + 13), key 2 once.
graph({'key': 1, 'value': 3}, color='worker')
graph({'key': 2, 'value': 7}, color='worker')
worker = graph({'key': 1, 'value': 13}, color='worker')  # 3 (13,)
print("worker:", worker)
# worker: {'accumulated_worker': {1: 16, 2: 7}}

# Local collector: reduces the worker dictionaries key by key.
graph(worker, color='localCollector')
graph(worker, color='localCollector')
localCollector = graph(worker, color='localCollector')
print("localCollector:", localCollector)
# localCollector: {'accumulated_localCollector': {1: 48, 2: 21}}

# Global collector: reduces the local collector dictionaries.
graph(localCollector, color='globalCollector')
globalCollector = graph(localCollector, color='globalCollector')
print("globalCollector:", globalCollector)
# globalCollector: {'accumulated': {1: 96, 2: 42}}
```
For more complicated examples look at MeanVsScan and MeanWaveformVsScan in the Operators section of AMI:
https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Operators.py#L102-L120
https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Operators.py#L149-L184
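The per-key rule at the start of this section can also be written without AMI. The plain-Python class below is a hypothetical stand-in for the ReduceByKey node. In particular, its merge method assumes that a collector applies the same per-key rule to every entry of a dictionary it receives; that is consistent with the numbers in the AMI example, but it is not taken from AMI's source.

```python
class ReduceByKeySketch:
    """Plain-Python sketch of the ReduceByKey rule (hypothetical, not AMI code)."""

    def __init__(self, reduction):
        self.reduction = reduction
        self.res = {}

    def __call__(self, key, value):
        # Reduce into an existing entry, or start a new one.
        if key in self.res:
            self.res[key] = self.reduction(self.res[key], value)
        else:
            self.res[key] = value
        return self.res

    def merge(self, partial):
        """Assumed collector behavior: apply the per-key rule to every entry."""
        for key, value in partial.items():
            self(key, value)
        return self.res

    def reset(self):
        """Called at the end of a heartbeat on workers and local collectors."""
        self.res = {}


def add(res, value):
    return res + value


worker = ReduceByKeySketch(add)
worker(1, 3)
worker(2, 7)
worker(1, 13)
print(worker.res)  # {1: 16, 2: 7}

collector = ReduceByKeySketch(add)
collector.merge(dict(worker.res))
collector.merge(dict(worker.res))
print(collector.res)  # {1: 32, 2: 14}
```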
Accumulator
An accumulator takes two functions as arguments: a reduction, and a res_factory that is called on reset. It accumulates into self.res on each event in the following way: self.res = reduction(self.res, *args). At the end of a heartbeat, workers and local collectors reset self.res by calling self.res_factory.
The result of the accumulator is independent of the number of workers and local collectors. To illustrate, consider the following example with two workers and two local collectors, where each process is simulated by its own graph:
```python
from ami.graphkit_wrapper import Graph
import ami.graph_nodes as gn


def make_graph():
    def reduction(res, *rest):
        print(res, rest)
        return res + rest[0]

    def res_factory():
        return 0

    graph = Graph(name='graph')
    graph.add(gn.Accumulator(name='accumulated', inputs=['delta_t'], outputs=['accumulated_delta_t'],
                             reduction=reduction, res_factory=res_factory))
    graph.compile(num_workers=2, num_local_collectors=2)
    return graph


# Worker 1 sees delta_t = 1..5 (trailing comments show what reduction() prints).
graph = make_graph()
graph({'delta_t': 1}, color='worker')  # 0 (1,)
graph({'delta_t': 2}, color='worker')  # 1 (2,)
graph({'delta_t': 3}, color='worker')  # 3 (3,)
graph({'delta_t': 4}, color='worker')  # 6 (4,)
worker1 = graph({'delta_t': 5}, color='worker')  # 10 (5,)
print("worker1 done")

# Worker 2 sees delta_t = 6..10.
graph2 = make_graph()
for i in range(6, 11):
    worker2 = graph2({'delta_t': i}, color='worker')
print("worker2 done")

# Each local collector receives the result of one worker.
graph3 = make_graph()
localCollector1 = graph3(worker1, color='localCollector')  # 0 (15,)
print("localCollector1 done")

graph4 = make_graph()
localCollector2 = graph4(worker2, color='localCollector')  # 0 (40,)
print("localCollector2 done")

# The global collector combines both local collectors: 15 + 40 = 55.
graph5 = make_graph()
globalCollector = graph5(localCollector1, color='globalCollector')  # 0 (15,)
print("globalCollector")
globalCollector = graph5(localCollector2, color='globalCollector')  # 15 (40,)
print("globalCollector")
print(globalCollector)  # {'accumulated_delta_t': 55}
```
For more complicated examples, look at the to_operation method of the Binning2D and Average1D nodes in the Numpy section of AMI:
https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Numpy.py#L118-L149
https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Numpy.py#L506-L528
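The claim that the result does not depend on the number of workers can be checked with a plain-Python sketch of the same self.res = reduction(self.res, *args) rule. AccumulatorSketch and total_via_tree are hypothetical names; the sketch assumes round-robin event distribution and collapses the local and global collectors into a single collector. The independence holds for reductions that are associative and commutative, such as addition; it would not hold for an arbitrary reduction.

```python
class AccumulatorSketch:
    """Plain-Python sketch of an accumulator with a reset factory (not AMI code)."""

    def __init__(self, reduction, res_factory):
        self.reduction = reduction
        self.res_factory = res_factory
        self.res = res_factory()

    def __call__(self, *args):
        self.res = self.reduction(self.res, *args)
        return self.res

    def reset(self):
        # Start the next heartbeat from a fresh value.
        self.res = self.res_factory()


def reduction(res, *rest):
    return res + rest[0]


def total_via_tree(values, num_workers):
    """Split values round-robin over workers, then combine the worker results."""
    workers = [AccumulatorSketch(reduction, lambda: 0) for _ in range(num_workers)]
    for i, v in enumerate(values):
        workers[i % num_workers](v)
    collector = AccumulatorSketch(reduction, lambda: 0)
    for w in workers:
        collector(w.res)
        w.reset()  # workers begin the next heartbeat from res_factory()
    return collector.res


values = list(range(1, 11))
print([total_via_tree(values, n) for n in (1, 2, 3, 5)])  # [55, 55, 55, 55]
```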
Accumulator Example
Example includes:
- ReduceByKey
- Accumulator
- RollingBuffer
- PickN
...