Page History
...
Code Block |
---|
"""Worked example: summing ``delta_t`` with an AMI ``Accumulator`` node.

The same one-node graph is built several times and called with different
``color`` values ('worker', 'localCollector', 'globalCollector'); the output
of one stage is fed as the input of the next.  The trailing comments on the
calls show what ``reduction`` prints (``res`` followed by the ``rest`` tuple).
"""
from ami.graphkit_wrapper import Graph
import ami.graph_nodes as gn


def make_graph():
    """Build and compile a graph containing a single ``Accumulator`` node.

    The node reads ``delta_t`` and writes ``accumulated_delta_t``.

    Returns:
        The compiled ``Graph``; it is called below as
        ``graph(inputs_dict, color=...)``.
    """

    def reduction(res, *rest):
        """Add the first new value to the running result ``res``."""
        # Printed so the accumulator state is visible at every step.
        print(res, rest)
        return res + rest[0]

    def res_factory():
        """Return the initial value of the accumulator."""
        return 0

    graph = Graph(name='graph')
    # NOTE(review): the keyword was spelled ``res_factor`` in the original
    # example; ``res_factory`` is presumably the name Accumulator expects --
    # confirm against ami/graph_nodes.py.
    graph.add(gn.Accumulator(name='accumulated',
                             inputs=['delta_t'],
                             outputs=['accumulated_delta_t'],
                             reduction=reduction,
                             res_factory=res_factory))
    graph.compile(num_workers=2, num_local_collectors=2)
    return graph


# First worker: accumulates 1..5.
graph = make_graph()
graph({'delta_t': 1}, color='worker')  # 0 (1,)
graph({'delta_t': 2}, color='worker')  # 1 (2, )
graph({'delta_t': 3}, color='worker')  # 3 (3, )
graph({'delta_t': 4}, color='worker')  # 6 (4, )
worker1 = graph({'delta_t': 5}, color='worker')  # 10 (5, )
print("worker1 done")

# Second worker: a fresh graph accumulates 6..10.
graph2 = make_graph()
for i in range(6, 11):
    worker2 = graph2({'delta_t': i}, color='worker')
print("worker2 done")

# Each worker result is passed to its own local collector.
graph3 = make_graph()
localCollector1 = graph3(worker1, color='localCollector')  # 0 (15, )
print("localCollector1 done")

graph4 = make_graph()
localCollector2 = graph4(worker2, color='localCollector')  # 0 (40, )
print("localCollector2 done")

# A single global collector receives both local results in turn.
graph5 = make_graph()
globalCollector = graph5(localCollector1, color='globalCollector')  # 0 (15, )
print("globalCollector")
globalCollector = graph5(localCollector2, color='globalCollector')  # 15 (40, )
print("globalCollector")
print(globalCollector)  # {'accumulated_delta_t': 55}
For more complicated examples, look at the to_operation methods of Binning2D and Average1D in the Numpy section of AMI:
https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Numpy.py#L118-L149
https://github.com/slac-lcls/ami/blob/master/ami/flowchart/library/Numpy.py#L506-L528
ReduceByKey
Overview
Content Tools