Actors

Manager

class Manager(logdir: Union[pathlib.Path, str], port: int = 9000, publish_logs_via_zmq: bool = False, enable_api: bool = True, **kwargs)
async async_collect(unzip: bool = True) bool
async async_commit(graph: chimerapy.engine.graph.Graph, mapping: Dict[str, List[str]], context: Literal['multiprocessing', 'threading'] = 'multiprocessing', send_packages: Optional[List[Dict[str, Any]]] = None) bool

Commit the Graph to the cluster.

Committing refers to how the graph itself (with its nodes and edges) and the mapping are distributed to the cluster. The routine has two steps: peer creation and peer-to-peer connection setup.

In peer creation, the Manager messages each Worker with the Nodes it needs to execute. The Workers configure the Nodes by giving them network information. The Nodes are then started and report back to the Workers.

Once the Nodes have been created successfully, the Manager requests the IP addresses and port numbers of the Nodes’ servers to build an address table for all the Nodes. This table is then used to inform each Node where its in-bound and out-bound Nodes are located, thereby establishing the edges between Nodes.

Parameters
  • graph (cp.Graph) – The graph to deploy within the cluster.

  • mapping (Dict[str, List[str]]) – Mapping from cp.Worker to cp.Nodes through a dictionary. The keys are the names of the workers, and each value is a list of the nodes’ names.

  • send_packages (Optional[List[Dict[str, Any]]]) – An optional feature for transferring local packages (typically development packages not available via PyPI or Anaconda). Provide a list of packages, each configured as a dictionary with the following key-value pairs: name:str and path:pathlib.Path.

Returns

Success in cluster’s setup

Return type

bool
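As a rough illustration of the argument shapes described above, the following sketch builds a `mapping` and a `send_packages` list and checks them against a set of node names before committing. The helper `check_commit_args`, the worker and node names, and the package path are all hypothetical and not part of ChimeraPy-Engine; only the dictionary layouts come from the parameter descriptions. The sketch also assumes that each node is assigned to exactly one worker.

```python
import pathlib
from typing import Any, Dict, List, Set


def check_commit_args(
    node_names: Set[str],
    mapping: Dict[str, List[str]],
    send_packages: List[Dict[str, Any]],
) -> List[str]:
    """Hypothetical pre-flight check; returns a list of problems found."""
    problems: List[str] = []

    # Assumption of this sketch: every node is assigned to exactly one worker.
    assigned = [name for names in mapping.values() for name in names]
    for name in sorted(node_names - set(assigned)):
        problems.append(f"node '{name}' is not mapped to any worker")
    for name in sorted(set(assigned) - node_names):
        problems.append(f"mapped node '{name}' is not in the graph")
    for name in sorted({n for n in assigned if assigned.count(n) > 1}):
        problems.append(f"node '{name}' is mapped more than once")

    # Each package entry needs a 'name' (str) and a 'path' (pathlib.Path).
    for i, pkg in enumerate(send_packages):
        if not isinstance(pkg.get("name"), str):
            problems.append(f"package {i} is missing a string 'name'")
        if not isinstance(pkg.get("path"), pathlib.Path):
            problems.append(f"package {i} is missing a pathlib.Path 'path'")

    return problems


# Worker name -> names of the nodes that worker should run (made-up names).
mapping = {"local": ["camera"], "remote": ["detector", "writer"]}

# One local development package to ship to the workers (made-up path).
send_packages = [{"name": "my_pkg", "path": pathlib.Path("./my_pkg")}]

problems = check_commit_args(
    {"camera", "detector", "writer"}, mapping, send_packages
)
print(problems or "arguments look consistent")
```

Running a check like this before the commit call catches a misspelled node name locally instead of during cluster setup.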

async async_gather() Dict
async async_record() bool
async async_request_registered_method(node_id: str, method_name: str, params: Dict[str, Any] = {}) Dict[str, Any]
async async_reset(keep_workers: bool = True)
async async_shutdown() bool
async async_start() bool
async async_stop() bool
async async_zeroconf(enable: bool = True) bool
collect(unzip: bool = True) concurrent.futures._base.Future[bool]

Collect data from the Workers

First, the Manager waits until all the Nodes have finished saving their data. Then the Manager requests the Nodes’ data from the Workers.

Parameters

unzip (bool) – Whether the .zip archives should be extracted.

Returns

Future of success in collecting data from Workers

Return type

Future[bool]
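The non-async methods in this reference return a `concurrent.futures.Future`, so the caller decides when to block. The sketch below shows that pattern with a stand-in function; `fake_collect` is hypothetical and only imitates a call that eventually returns `True`, and the thread pool is used here purely to produce a Future without a running cluster.

```python
import concurrent.futures
import time


def fake_collect(unzip: bool = True) -> bool:
    """Stand-in for a slow transfer; not part of ChimeraPy-Engine."""
    time.sleep(0.05)
    return True


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(fake_collect, unzip=True)

    # The caller is free to do other work here while the transfer runs.

    try:
        # Block until the result is ready, but give up after 5 seconds.
        success = future.result(timeout=5)
    except concurrent.futures.TimeoutError:
        success = False

print("collected:", success)
```

Passing a `timeout` to `result` keeps a stalled transfer from blocking the calling program indefinitely.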

commit_graph(graph: chimerapy.engine.graph.Graph, mapping: Dict[str, List[str]], context: Literal['multiprocessing', 'threading'] = 'multiprocessing', send_packages: Optional[List[Dict[str, Any]]] = None) concurrent.futures._base.Future[bool]

Commit the Graph to the cluster.

Committing refers to how the graph itself (with its nodes and edges) and the mapping are distributed to the cluster. The routine has two steps: peer creation and peer-to-peer connection setup.

In peer creation, the Manager messages each Worker with the Nodes it needs to execute. The Workers configure the Nodes by giving them network information. The Nodes are then started and report back to the Workers.

Once the Nodes have been created successfully, the Manager requests the IP addresses and port numbers of the Nodes’ servers to build an address table for all the Nodes. This table is then used to inform each Node where its in-bound and out-bound Nodes are located, thereby establishing the edges between Nodes.

Parameters
  • graph (cp.Graph) – The graph to deploy within the cluster.

  • mapping (Dict[str, List[str]]) – Mapping from cp.Worker to cp.Nodes through a dictionary. The keys are the names of the workers, and each value is a list of the nodes’ names.

  • send_packages (Optional[List[Dict[str, Any]]]) – An optional feature for transferring local packages (typically development packages not available via PyPI or Anaconda). Provide a list of packages, each configured as a dictionary with the following key-value pairs: name:str and path:pathlib.Path.

Returns

Future of success in cluster’s setup

Return type

Future[bool]
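The second step of committing, in which an address table is used to tell each Node where its neighbours are, can be pictured with plain dictionaries. This is only a sketch of the idea: the function `neighbour_addresses`, the node names, and the addresses are hypothetical, and the actual message format used by ChimeraPy-Engine is not shown here.

```python
from typing import Dict, List, Tuple

Address = Tuple[str, int]  # (ip, port)


def neighbour_addresses(
    address_table: Dict[str, Address],
    edges: List[Tuple[str, str]],
) -> Dict[str, Dict[str, Dict[str, Address]]]:
    """For each node, look up the addresses of its in-bound and out-bound nodes."""
    info: Dict[str, Dict[str, Dict[str, Address]]] = {
        name: {"in_bound": {}, "out_bound": {}} for name in address_table
    }
    for src, dst in edges:
        # dst receives from src, so src is in-bound for dst ...
        info[dst]["in_bound"][src] = address_table[src]
        # ... and dst is out-bound for src.
        info[src]["out_bound"][dst] = address_table[dst]
    return info


# Addresses as they might be reported after peer creation (made-up values).
address_table = {
    "camera": ("10.0.0.2", 5001),
    "detector": ("10.0.0.3", 5002),
    "writer": ("10.0.0.3", 5003),
}
edges = [("camera", "detector"), ("detector", "writer")]

for node, neighbours in neighbour_addresses(address_table, edges).items():
    print(node, neighbours)
```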

gather() concurrent.futures._base.Future[Dict]
property host: str
property logdir: pathlib.Path
property port: int
record() concurrent.futures._base.Future[bool]

Start recording the data collected by the cluster.

request_registered_method(node_id: str, method_name: str, params: Dict[str, Any] = {}) concurrent.futures._base.Future[Dict[str, Any]]
reset(keep_workers: bool = True) concurrent.futures._base.Future[bool]
shutdown(blocking: bool = True) Union[bool, concurrent.futures._base.Future[bool]]

Properly shut down the ChimeraPy-Engine cluster.

Through this method, the Manager broadcasts a shutdown message to all Workers, which then stop their processes and threads safely.

start() concurrent.futures._base.Future[bool]

Start the execution of the cluster.

Before starting, make sure that you have performed the following steps:

  • Create Nodes

  • Create DAG with Nodes and their edges

  • Connect Workers (must be before committing Graph)

  • Register, map, and commit Graph

Returns

Future of the success of starting the cluster

Return type

Future[bool]
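The checklist above, together with the `stop`, `collect`, and `shutdown` entries, suggests a call order for a recording session. The sketch below writes that order down against any object exposing the documented method names. Both `run_session` and `StubManager` are hypothetical; the stub exists only so the sketch runs without a cluster, and the sketch assumes the Workers are already connected before it is called.

```python
import time
from concurrent.futures import Future
from typing import Any, Dict, List


def run_session(
    manager: Any,
    graph: Any,
    mapping: Dict[str, List[str]],
    seconds: float,
    timeout: float = 30.0,
) -> bool:
    """Commit, start, stop, collect, and always shut down at the end."""
    try:
        if not manager.commit_graph(graph, mapping).result(timeout=timeout):
            return False
        if not manager.start().result(timeout=timeout):
            return False
        time.sleep(seconds)  # let the cluster run for a while
        stopped = manager.stop().result(timeout=timeout)
        collected = manager.collect().result(timeout=timeout)
        return stopped and collected
    finally:
        # stop() alone does not release processes, threads, and queues.
        manager.shutdown()


class StubManager:
    """Hypothetical stand-in that records calls and returns finished Futures."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def _done(self, name: str) -> "Future[bool]":
        self.calls.append(name)
        future: "Future[bool]" = Future()
        future.set_result(True)
        return future

    def commit_graph(self, graph: Any, mapping: Dict[str, List[str]]) -> "Future[bool]":
        return self._done("commit_graph")

    def start(self) -> "Future[bool]":
        return self._done("start")

    def stop(self) -> "Future[bool]":
        return self._done("stop")

    def collect(self, unzip: bool = True) -> "Future[bool]":
        return self._done("collect")

    def shutdown(self, blocking: bool = True) -> bool:
        self.calls.append("shutdown")
        return True


stub = StubManager()
ok = run_session(stub, graph=None, mapping={"local": ["camera"]}, seconds=0.01)
print(ok, stub.calls)
```

Placing `shutdown` in a `finally` block means the cluster is released even if one of the earlier steps raises or times out.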

step() concurrent.futures._base.Future[bool]

Cluster step execution for offline operation.

The step function is for careful but slow operation of the cluster. For online execution, start and stop are the methods to be used.

Returns

Future of the success of step function broadcasting

Return type

Future[bool]

stop() concurrent.futures._base.Future[bool]

Stop the execution of the cluster.

Do not forget that you still need to call shutdown to properly shut down processes, threads, and queues.

Returns

Future of the success of stopping the cluster

Return type

Future[bool]

property workers: Dict[str, chimerapy.engine.states.WorkerState]
zeroconf(enable: bool = True, timeout: Union[int, float] = 5) bool

Worker

class Worker(name: str, port: int = 0, delete_temp: bool = False, id: Optional[str] = None)
async async_collect() bool
async async_connect(host: Optional[str] = None, port: Optional[int] = None, method: Optional[Literal['ip', 'zeroconf']] = 'ip', timeout: Union[int, float] = 45) bool

Connect Worker to Manager.

This establishes server-client connections between the Worker and the Manager. To ensure that the connections are closed correctly, either the Manager or the Worker should shut down before your program stops; otherwise, processes and threads may be left running.

Parameters
  • method (Literal['ip', 'zeroconf']) – The approach to connecting to Manager

  • host (str) – The Manager’s IP address.

  • port (int) – The Manager’s port number

  • timeout (Union[int, float]) – Set timeout for the connection.

Returns

Success in connecting to the Manager

Return type

bool

async async_create_node(node_config: Union[chimerapy.engine.node.node_config.NodeConfig, Dict]) bool
async async_deregister() bool
async async_destroy_node(node_id: str) bool
async async_gather() Dict
async async_record_nodes() bool
async async_request_registered_method(node_id: str, method_name: str, params: Dict = {}) Dict[str, Any]
async async_shutdown() bool
async async_start_nodes() bool
async async_step() bool
async async_stop_nodes() bool
collect() concurrent.futures._base.Future[bool]
connect(host: Optional[str] = None, port: Optional[int] = None, method: Optional[Literal['ip', 'zeroconf']] = 'ip', timeout: Union[int, float] = 10.0, blocking: bool = True) Union[bool, concurrent.futures._base.Future[bool]]

Connect Worker to Manager.

This establishes server-client connections between the Worker and the Manager. To ensure that the connections are closed correctly, either the Manager or the Worker should shut down before your program stops; otherwise, processes and threads may be left running.

Parameters
  • host (str) – The Manager’s IP address.

  • port (int) – The Manager’s port number

  • timeout (Union[int, float]) – Set timeout for the connection.

  • blocking (bool) – Make the connection call blocking.

Returns

Success in connecting to the Manager

Return type

Future[bool]
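One way to make sure a Worker always shuts down before the program exits is to wrap the connection in a context manager. The sketch below is an assumption-laden illustration, not library code: `connected` and `StubWorker` are hypothetical, the host and port are made up, and it assumes that `connect` with the default `blocking=True` returns a plain `bool`.

```python
import contextlib
from typing import Any, Iterator, List, Optional


@contextlib.contextmanager
def connected(
    worker: Any, host: str, port: int, timeout: float = 10.0
) -> Iterator[Any]:
    """Connect a worker and guarantee shutdown() runs when the block exits."""
    try:
        # Assumption: with blocking=True (the default), connect() returns a bool.
        if not worker.connect(host=host, port=port, method="ip", timeout=timeout):
            raise ConnectionError(f"could not reach Manager at {host}:{port}")
        yield worker
    finally:
        worker.shutdown()


class StubWorker:
    """Hypothetical stand-in that only records which methods were called."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def connect(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        method: str = "ip",
        timeout: float = 10.0,
        blocking: bool = True,
    ) -> bool:
        self.calls.append("connect")
        return True

    def shutdown(self, blocking: bool = True) -> bool:
        self.calls.append("shutdown")
        return True


stub = StubWorker()
with connected(stub, host="127.0.0.1", port=9000) as worker:
    worker.calls.append("work")  # placeholder for the real workload

print(stub.calls)
```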

create_node(node_config: chimerapy.engine.node.node_config.NodeConfig) concurrent.futures._base.Future[bool]
deregister() concurrent.futures._base.Future[bool]
destroy_node(node_id: str) concurrent.futures._base.Future[bool]
gather() concurrent.futures._base.Future[Dict]
property id: str
idle()
property ip: str
property name: str
property nodes: Dict[str, chimerapy.engine.states.NodeState]
property port: int
record_nodes() concurrent.futures._base.Future[bool]
request_registered_method(node_id: str, method_name: str, params: Dict = {}) concurrent.futures._base.Future[Tuple[bool, Any]]
services: chimerapy.engine.worker.worker_services_group.WorkerServicesGroup
shutdown(blocking: bool = True) Union[concurrent.futures._base.Future[bool], bool]

Shutdown Worker safely.

The Worker needs to shutdown its server, client and Nodes in a safe manner, such as setting flag variables and clearing out queues.

Parameters

msg (Dict) – Leave empty, required to work when Manager sends shutdown message to Worker.

start_nodes() concurrent.futures._base.Future[bool]
step() concurrent.futures._base.Future[bool]
stop_nodes() concurrent.futures._base.Future[bool]

Node

class Node(name: str, debug_port: Optional[int] = None, logdir: Optional[Union[str, pathlib.Path]] = None, id: Optional[str] = None)
context: Literal['main', 'multiprocessing', 'threading']
get_logger() logging.Logger
property id: str
main()

Method that the user may override.

This method can also be overridden, though it is recommended to do so carefully. If overridden, the handling of inputs will have to be implemented as well.

This information is available through the self.in_bound_data and self.new_data_available attributes.

property name: str
registered_methods: Dict[str, chimerapy.engine.node.registered_method.RegisteredMethod] = {}
run(blocking: bool = True, running: Optional[multiprocess.context.BaseContext.Value] = None)

The actual method that is executed in the new process.

When working with multiprocessing.Process, keep in mind that creating a new process can yield unexpected behavior if done carelessly. It is recommended to read the multiprocessing documentation to understand the implications.

property running: bool
save_audio(name: str, data: numpy.ndarray, channels: int, format: int, rate: int)

Record audio data.

Parameters
  • name (str) – Name of the audio data (.wav extension will be suffixed).

  • data (np.ndarray) – Audio data as a numpy array.

  • channels (int) – Number of channels.

  • format (int) – Format of the audio data.

  • rate (int) – Sampling rate of the audio data.

Notes

It is the implementation’s responsibility to properly format the data.

save_image(name: str, data: numpy.ndarray)
save_tabular(name: str, data: Union[pandas.core.frame.DataFrame, Dict[str, Any], pandas.core.series.Series])
save_video(name: str, data: numpy.ndarray, fps: int)
services: chimerapy.engine.service.ServiceGroup
setup()

User-defined method for Node setup.

In this method, the setup logic of the Node is executed. This would include opening files, creating connections to sensors, and calibrating sensors.

shutdown(msg: Dict = {})
step(data_chunks: Dict[str, chimerapy.engine.networking.data_chunk.DataChunk] = {}) Union[chimerapy.engine.networking.data_chunk.DataChunk, Any]

User-defined method.

This method contains the logic that is executed within the Node’s while loop. For data sources (no inputs), the step method will execute as fast as possible; therefore, it is important to add time.sleep to set the sampling rate.

For a Node that has inputs, the step method is executed when new data is received.

Parameters

data_chunks (Optional[Dict[str, DataChunk]]) – For source nodes, this parameter should not be considered (as they don’t have inputs). For step and sink nodes, data_chunks must be included to avoid an error. The variable is a dictionary, where each key is an in-bound Node’s name and each value is the output of that in-bound Node’s step function.
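The sampling-rate advice for source Nodes can be illustrated without the library. In the sketch below, `SourceSketch` is a hypothetical plain class whose `step` mimics a source Node’s step method; in real use the method would live on a Node subclass, and `read_sensor` stands in for whatever produces the data.

```python
import time
from typing import List


class SourceSketch:
    """Plain stand-in for a source Node; not a ChimeraPy-Engine class."""

    def __init__(self, rate_hz: float) -> None:
        self.period = 1.0 / rate_hz
        self.count = 0

    def read_sensor(self) -> int:
        # Placeholder for reading a camera frame, a sensor value, etc.
        self.count += 1
        return self.count

    def step(self) -> int:
        # Without this sleep the surrounding loop would spin as fast as
        # possible; sleeping one period caps the rate at roughly rate_hz.
        time.sleep(self.period)
        return self.read_sensor()


source = SourceSketch(rate_hz=100)
began = time.monotonic()
samples: List[int] = [source.step() for _ in range(5)]
elapsed = time.monotonic() - began
print(samples, f"{elapsed:.3f}s")
```

Because the sleep does not account for the time spent reading the sensor, the effective rate is slightly below `rate_hz`; a fixed-rate loop would need to subtract the work time from each sleep.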

teardown()

User-defined method.

This method provides a convenient way to shut down services, such as closing files, signaling sensors to stop, and making any last-minute corrections to the data.
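The split between setup, step, and teardown can be sketched with a small file-writing example. Everything here is hypothetical: `FileSinkSketch` is a plain class rather than a Node subclass, and `drive` reflects an assumed call order (setup once, step repeatedly, teardown at the end) rather than the library’s actual run loop.

```python
import os
import tempfile
from typing import Iterable, Optional, TextIO


class FileSinkSketch:
    """Hypothetical stand-in showing the setup/step/teardown split."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.handle: Optional[TextIO] = None

    def setup(self) -> None:
        # Acquire resources once, before any step runs.
        self.handle = open(self.path, "w", encoding="utf-8")

    def step(self, value: int) -> None:
        # Per-iteration work: here, append one line per value.
        assert self.handle is not None, "setup() must run before step()"
        self.handle.write(f"{value}\n")

    def teardown(self) -> None:
        # Release resources so buffered data reaches the disk.
        if self.handle is not None:
            self.handle.close()
            self.handle = None


def drive(node: FileSinkSketch, values: Iterable[int]) -> None:
    """Assumed call order: setup once, step per value, teardown always."""
    node.setup()
    try:
        for value in values:
            node.step(value)
    finally:
        node.teardown()


path = os.path.join(tempfile.mkdtemp(), "values.txt")
drive(FileSinkSketch(path), [1, 2, 3])
with open(path, encoding="utf-8") as f:
    lines = f.read().split()
print(lines)
```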

Graph

class Graph(g: networkx.classes.digraph.DiGraph = <networkx.classes.digraph.DiGraph object>)
add_edge(src: chimerapy.engine.node.node.Node, dst: chimerapy.engine.node.node.Node, follow: bool = False)
add_edges_from(list_of_edges: Sequence[Sequence[chimerapy.engine.node.node.Node]])
add_node(node: chimerapy.engine.node.node.Node)
add_nodes_from(nodes: Sequence[chimerapy.engine.node.node.Node])
get_id_by_name(node_name: str)
get_layers_and_pos()
has_node_by_id(node_id: str)
is_valid()

Checks if Graph is a true DAG.
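How `is_valid` performs its check is not stated here. As a point of reference, one standard way to test whether a directed graph is acyclic is Kahn’s algorithm, sketched below with the standard library only. The function `is_dag` and the node labels are hypothetical and independent of the Graph class.

```python
from collections import deque
from typing import Dict, Hashable, Iterable, List, Tuple


def is_dag(
    nodes: Iterable[Hashable],
    edges: Iterable[Tuple[Hashable, Hashable]],
) -> bool:
    """Return True if the directed graph has no cycle (Kahn's algorithm)."""
    successors: Dict[Hashable, List[Hashable]] = {n: [] for n in nodes}
    in_degree: Dict[Hashable, int] = {n: 0 for n in successors}
    for src, dst in edges:
        successors[src].append(dst)
        in_degree[dst] += 1

    # Start from nodes with no incoming edges and peel them off one by one.
    ready = deque(n for n, degree in in_degree.items() if degree == 0)
    visited = 0
    while ready:
        node = ready.popleft()
        visited += 1
        for nxt in successors[node]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                ready.append(nxt)

    # Nodes on a cycle never reach in-degree zero, so they are never visited.
    return visited == len(successors)


print(is_dag(["a", "b", "c"], [("a", "b"), ("b", "c")]))              # True
print(is_dag(["a", "b", "c"], [("a", "b"), ("b", "c"), ("c", "a")]))  # False
```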

plot(font_size: int = 30, node_size: int = 5000)

Plot the Graph to visualize the data pipeline.

This visualization tool uses matplotlib and networkx to show the Nodes and their edges.

Parameters
  • font_size (int) – Font size

  • node_size (int) – Node size