Page History
...
globalCollector: [6, 7, 8, 9, 10, 2, 3, 4, 5, 11]
...
ReduceByKey
An accumulator takes two functions as an argument, a reduction and a res_factory that is called on reset. It accumulates into self.res on each event ReduceByKey takes a key and value and reduction function and reduces into a dictionary in the following way: if key in self.res: self.res[key] = reduction(self.res[key], *args) at the end of a heartbeat workers and local collectors reset self.res by calling self.res_factory.value) else: self.res[key] = value
Similar to Accumulator, it The accumulator is independent of the number of workers and local collectors , but and resets at the end of a heartbeat on workers and local collectors. To illustrate consider the following two worker, two local collector example.example:
Code Block | ||||
---|---|---|---|---|
| ||||
from ami.graphkit_wrapper import Graph import ami.graph_nodes as gn def make_graph(reduction(res, *rest): def reductionprint(res, *rest): print(res, rest) return res+rest[0] return res+rest[0] def res_factory(): return 0 graph def res_factory(): = Graph(name='graph') graph.add(gn.ReduceByKey(name='reduced', inputs=['key', 'value'], outputs=['accumulated'], reduction=reduction)) graph.compile(num_workers=2, num_local_collectors=2) graph({'key': 1, 'value': 3}, color='worker') graph({'key': 2, 'value': 7}, color='worker') worker = graph({'key': 1, 'value': 13}, color='worker') # 3 (13, ) print("worker:", worker) # worker: {'accumulated_worker': {1: 16, 2: 7}} graph(worker, color='localCollector') graph(worker, color='localCollector') localCollector = graph(worker, color='localCollector') print("localCollector:", localCollector) # localCollector: {'accumulated_localCollector': {1: 48, 2: 21}} graph(localCollector, color='globalCollector') globalCollector = graph(localCollector, color='globalCollector') print("globalCollector:", globalCollector) # globalCollector: {'accumulated': {1: 96, 2: 42}} |
For more complicated examples look at MeanVsScan and MeanWaveformVsScan in the Operators section of AMI:
https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Operators.py#L102-L120
https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Operators.py#L149-L184
Accumulator
An accumulator takes two functions as an argument, a reduction and a res_factory that is called on reset. It accumulates into self.res on each event in the following way: self.res = reduction(self.res, *args) at the end of a heartbeat workers and local collectors reset self.res by calling self.res_factory.
The accumulator is independent of the number of workers and local collectors, but consider the following two worker, two local collector example.
Code Block | ||||
---|---|---|---|---|
| ||||
from ami.graphkit_wrapper import Graph import ami.graph_nodes as gn def make_graph(): return 0 graph = Graph(name='graph') def reduction(res, *rest): graph.add(gn.Accumulator(name='accumulated', inputs=['delta_t'], outputs=['accumulated_delta_t'], print(res, rest) reduction=reduction, res_factory=res_factory)) graph.compile(num_workers=2, num_local_collectors=2) return res+rest[0] return graph graph = make_graph() graph({'delta_t': 1}, color='worker') # 0 (1,) graph({'delta_t': 2}, color='worker') # 1 (2, ) graph({'delta_t': 3}, color='worker') # 3 (3, ) graph({'delta_t': 4}, color='worker') # 6 (4, ) worker1 = graph({'delta_t': 5}, color='worker') # 10 (5, ) print("worker1 done") graph2 = make_graph() for i in range(6, 11): def res_factory(): worker2 = graph2({'delta_t': i}, color='worker') return 0 print("worker2 done") graph3 = make_graph() localCollector1 = graph3(worker1, color='localCollector') # 0 (15, ) print("localCollector1 done") graph4 = make_graph() localCollector2 = graph4(worker2, color='localCollector') # 0 (40, ) print("localCollector2 done") graph5 = make_graph() globalCollector = graph5(localCollector1, color='globalCollector') # 0 (15, ) print("globalCollector") globalCollector = graph5(localCollector2, color='globalCollector') # 15 (40, ) print("globalCollector") print(globalCollector) # {'accumulated_delta_t': 55} |
For more complicated examples look at the to_operation method of the Binning2D and Average1D in the Numpy section of AMI:
https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Numpy.py#L118-L149
https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Numpy.py#L506-L528
ReduceByKey
ReduceByKey takes a key and value and reduction function and reduces into a dictionary in the following way: if key in self.res: self.res[key] = reduction(self.res[key], value) else: self.res[key] = value
Similar to Accumulator, it is independent of the number of workers and local collectors and resets at the end of a heartbeat on workers and local collectors. To illustrate consider the following example:
Code Block | ||||
---|---|---|---|---|
| ||||
from ami.graphkit_wrapper import Graph import ami.graph_nodes as gn def reduction(res, *rest): graph = Graph(name='graph') print(res, rest) graph.add(gn.Accumulator(name='accumulated', inputs=['delta_t'], outputs=['accumulated_delta_t'], return res+rest[0] reduction=reduction, res_factory=res_factory)) graph.compile(num_workers=2, num_local_collectors=2) def res_factory(): return graph return 0 graph = make_graph() graph({'delta_t': 1}, color='worker') # 0 (1,) graph({'delta_t': 2}, color='worker') # 1 (2, ) graph({'delta_t': 3}, color='worker') # 3 (3, ) graph({'delta_t': 4}, color='worker') # 6 (4, ) worker1 = graph({'delta_t': 5}, color='worker') # 10 (5, ) print("worker1 done") graph2 = make_graph() for i in range(6, 11): graph = Graph(name='graph') graph.add(gn.ReduceByKey(name='reduced', inputs=['key', 'value'], outputs=['accumulated'], reduction=reduction)) graph.compile(num_workers=2, num_local_collectors=2) graph({'key': 1, 'value': 3}, color='worker') graph({'key': 2, 'value': 7 worker2 = graph2({'delta_t': i}, color='worker') worker = graph({'key': 1, 'value': 13}, color='worker') # 3 (13, ) print("worker:", worker) # worker: {'accumulated_worker': {1: 16, 2: 7}} graph(worker print("worker2 done") graph3 = make_graph() localCollector1 = graph3(worker1, color='localCollector') # 0 (15, ) print("localCollector1 done") graph4 = make_graph(worker) localCollector2 = graph4(worker2, color='localCollector') localCollector) # 0 (40, ) print("localCollector2 done") graph5 = make_graph() globalCollector = graphgraph5(workerlocalCollector1, color='localCollectorglobalCollector') print("localCollector:", localCollector) # localCollector: {'accumulated_localCollector': {1: 48, 2: 21}} graph(localCollector, color='globalCollector')0 (15, ) print("globalCollector") globalCollector = graphgraph5(localCollectorlocalCollector2, color='globalCollector') # 15 (40, ) print("globalCollector:", ") print(globalCollector) # globalCollector: {'accumulated_delta_t': {1: 96, 2: 42}} 55} |
For more complicated examples look at MeanVsScan and MeanWaveformVsScan the to_operation method of the Binning2D and Average1D in the Operators Numpy section of AMI:
https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/OperatorsNumpy.py#L102py#L118-L120L149
https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/OperatorsNumpy.py#L149py#L506-L184L528
Accumulator Example
Example includes:
- ReduceByKey
- Accumulator
- RollingBuffer
- PickN
...