Page History
...
Code Block |
---|
from ami.graphkit_wrapper import Graph import ami.graph_nodes as gn def make_graph(): def reduction(res, *rest): print(res, rest) return res+rest[0] def res_factory(): return 0 graph = Graph(name='graph') graph.add(gn.Accumulator(name='accumulated', inputs=['delta_t'], outputs=['accumulated_delta_t'], reduction=reduction, res_factorfactory=res_factory)) graph.compile(num_workers=2, num_local_collectors=2) return graph graph = make_graph() graph({'delta_t': 1}, color='worker') # 0 (1,) graph({'delta_t': 2}, color='worker') # 1 (2, ) graph({'delta_t': 3}, color='worker') # 3 (3, ) graph({'delta_t': 4}, color='worker') # 6 (4, ) worker1 = graph({'delta_t': 5}, color='worker') # 10 (5, ) print("worker1 done") graph2 = make_graph() for i in range(6, 11): worker2 = graph2({'delta_t': i}, color='worker') print("worker2 done") graph3 = make_graph() localCollector1 = graph3(worker1, color='localCollector') # 0 (15, ) print("localCollector1 done") graph4 = make_graph() localCollector2 = graph4(worker2, color='localCollector') # 0 (40, ) print("localCollector2 done") graph5 = make_graph() globalCollector = graph5(localCollector1, color='globalCollector') # 0 (15, ) print("globalCollector") globalCollector = graph5(localCollector2, color='globalCollector') # 15 (40, ) print("globalCollector") print(globalCollector) # {'accumulated_delta_t': 55} |
...
https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Numpy.py#L506-L528
ReduceByKey
ReduceByKey takes a key and value and reduction and reduces into a dictionary in the following way: if key in self.res: self.res[key] = reduction(self.res[key], value) else: self.res[key] = value
Similar to Accumulator, it is independent of the number of workers and local collectors and resets at the end of a heartbeat on workers and local collectors. To illustrate consider the following example:
Code Block |
---|
from ami.graphkit_wrapper import Graph
import ami.graph_nodes as gn
def reduction(res, *rest):
print(res, rest)
return res+rest[0]
def res_factory():
return 0
graph = Graph(name='graph')
graph.add(gn.ReduceByKey(name='reduced', inputs=['key', 'value'], outputs=['accumulated'],
reduction=reduction))
graph.compile(num_workers=2, num_local_collectors=2)
graph({'key': 1, 'value': 3}, color='worker')
graph({'key': 2, 'value': 7}, color='worker')
worker = graph({'key': 1, 'value': 13}, color='worker') # 3 (13, )
print("worker:", worker) # worker: {'accumulated_worker': {1: 16, 2: 7}}
graph(worker, color='localCollector')
graph(worker, color='localCollector')
localCollector = graph(worker, color='localCollector')
print("localCollector:", localCollector) # localCollector: {'accumulated_localCollector': {1: 48, 2: 21}}
graph(localCollector, color='globalCollector')
globalCollector = graph(localCollector, color='globalCollector')
print("globalCollector:", globalCollector) # globalCollector: {'accumulated': {1: 96, 2: 42}}
|
For more complicated examples look at MeanVsScan and MeanWaveformVsScan in the Operators section of AMI:
https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Operators.py#L102-L120
https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Operators.py#L149-L184