Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The accumulator is independent of the number of workers and local collectors, but consider the following two worker, two local collector example.

Code Block
languagepy
linenumberstrue
from ami.graphkit_wrapper import Graph
import ami.graph_nodes as gn


def make_graph():                                                                                                      
    def reduction(res, *rest):                                                                                         
        print(res, rest)                                                                                               
        return res+rest[0]                                                                                             
                                                                                                                       
    def res_factory():                                                                                                 
        return 0                                                                                                       
                                                                                                                       
    graph = Graph(name='graph')                                                                                        
    graph.add(gn.Accumulator(name='accumulated', inputs=['delta_t'], outputs=['accumulated_delta_t'],                  
                             reduction=reduction, res_factory=res_factory))                                             
    graph.compile(num_workers=2, num_local_collectors=2)                                                               
    return graph                                                                                                       


graph = make_graph()
graph({'delta_t': 1}, color='worker')
# 0 (1,)
graph({'delta_t': 2}, color='worker')
# 1 (2, )
graph({'delta_t': 3}, color='worker')
# 3 (3, )
graph({'delta_t': 4}, color='worker')
# 6 (4, )
worker1 = graph({'delta_t': 5}, color='worker')
# 10 (5, )
print("worker1 done")

graph2 = make_graph()
for i in range(6, 11):                                                                                                 
    worker2 = graph2({'delta_t': i}, color='worker')                                                                   
print("worker2 done")

graph3 = make_graph()
localCollector1 = graph3(worker1, color='localCollector')
# 0 (15, )
print("localCollector1 done")

graph4 = make_graph()
localCollector2 = graph4(worker2, color='localCollector')
# 0 (40, )
print("localCollector2 done")

graph5 = make_graph()
globalCollector = graph5(localCollector1, color='globalCollector')
# 0 (15, )
print("globalCollector")

globalCollector = graph5(localCollector2, color='globalCollector')
# 15 (40, )
print("globalCollector")
print(globalCollector)
# {'accumulated_delta_t': 55}

...

Similar to Accumulator, it is independent of the number of workers and local collectors and resets at the end of a heartbeat on workers and local collectors. To illustrate consider the following example:

Code Block
languagepy
linenumberstrue
from ami.graphkit_wrapper import Graph
import ami.graph_nodes as gn


def reduction(res, *rest):                                                                                             
    print(res, rest)                                                                                                   
    return res+rest[0]                                                                                                 

def res_factory():                                                                                                     
    return 0                                                                                                           

graph = Graph(name='graph')
graph.add(gn.ReduceByKey(name='reduced', inputs=['key', 'value'], outputs=['accumulated'],                             
                         reduction=reduction))
graph.compile(num_workers=2, num_local_collectors=2)

graph({'key': 1, 'value': 3}, color='worker') 
graph({'key': 2, 'value': 7}, color='worker')
worker = graph({'key': 1, 'value': 13}, color='worker') # 3 (13, )
print("worker:", worker)  # worker: {'accumulated_worker': {1: 16, 2: 7}}
graph(worker, color='localCollector')  
graph(worker, color='localCollector')  
localCollector = graph(worker, color='localCollector') 
print("localCollector:", localCollector)  # localCollector: {'accumulated_localCollector': {1: 48, 2: 21}}
graph(localCollector, color='globalCollector')
globalCollector = graph(localCollector, color='globalCollector')
print("globalCollector:", globalCollector)  # globalCollector: {'accumulated': {1: 96, 2: 42}}

...