Actors¶
Manager¶
- class Manager(logdir: Union[pathlib.Path, str], port: int = 9000, publish_logs_via_zmq: bool = False, enable_api: bool = True, **kwargs)¶
- async async_collect(unzip: bool = True) bool¶
- async async_commit(graph: chimerapy.engine.graph.Graph, mapping: Dict[str, List[str]], context: Literal['multiprocessing', 'threading'] = 'multiprocessing', send_packages: Optional[List[Dict[str, Any]]] = None) bool¶
Commit a Graph to the cluster.
Committing refers to how the graph itself (with its nodes and edges) and the mapping are distributed to the cluster. The whole routine has two steps: peer creation and peer-to-peer connection setup.
In peer creation, the Manager messages each Worker with the Nodes it needs to execute. The Workers configure the Nodes by giving them network information. The Nodes are then started and report back to the Workers.
With the successful creation of the Nodes, the Manager requests the IP addresses and port numbers of the Nodes' servers to create an address table for all the Nodes. This table is then used to inform each Node where its in-bound and out-bound Nodes are located, thereby establishing the edges between Nodes.
- Parameters
graph (cp.Graph) – The graph to deploy within the cluster.
mapping (Dict[str, List[str]]) – Mapping from cp.Worker to cp.Nodes through a dictionary. The keys are the names of the workers, while each value is a list of the nodes' names.
send_packages (Optional[List[Dict[str, Any]]]) – An optional feature for transferring a local package (typically a development package not found via PyPI or Anaconda). Provide a list of packages, each configured via a dictionary with the following key-value pairs: name: str and path: pathlib.Path.
- Returns
Success in cluster’s setup
- Return type
bool
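As a rough illustration of the argument shapes described above, the sketch below builds a mapping and a send_packages list from plain Python containers. The worker names, node names, package name, and the helper check_mapping are hypothetical, and the rule that every node is assigned to exactly one worker is an assumption made for this example; only the dictionary layouts come from the parameter descriptions.

```python
from pathlib import Path
from typing import Any, Dict, List


def check_mapping(mapping: Dict[str, List[str]], node_names: List[str]) -> List[str]:
    """Return node names that look misassigned (hypothetical helper).

    A name is reported if it is unknown, unassigned, or assigned more than once.
    This is not part of ChimeraPy-Engine; it only illustrates the mapping shape.
    """
    counts = {name: 0 for name in node_names}
    unknown = set()
    for assigned in mapping.values():
        for node_name in assigned:
            if node_name in counts:
                counts[node_name] += 1
            else:
                unknown.add(node_name)
    misassigned = {name for name, count in counts.items() if count != 1}
    return sorted(misassigned | unknown)


# Keys are worker names, values are lists of node names (all made up here).
mapping: Dict[str, List[str]] = {
    "worker-a": ["camera", "microphone"],
    "worker-b": ["recorder"],
}

# One dictionary per local package, using the documented "name" and "path" keys.
send_packages: List[Dict[str, Any]] = [
    {"name": "my_dev_package", "path": Path("./my_dev_package")},
]

problems = check_mapping(mapping, ["camera", "microphone", "recorder"])
```

An empty problems list means every listed node appears under exactly one worker.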
- async async_gather() Dict¶
- async async_record() bool¶
- async async_request_registered_method(node_id: str, method_name: str, params: Dict[str, Any] = {}) Dict[str, Any]¶
- async async_reset(keep_workers: bool = True)¶
- async async_shutdown() bool¶
- async async_start() bool¶
- async async_stop() bool¶
- async async_zeroconf(enable: bool = True) bool¶
- collect(unzip: bool = True) concurrent.futures._base.Future[bool]¶
Collect data from the Workers
First, we wait until all the Nodes have finished saving their data. Then, the Manager requests the Nodes' data from the Workers.
- Parameters
unzip (bool) – Whether the .zip archives should be extracted.
- Returns
Future of success in collecting data from the Workers
- Return type
Future[bool]
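The non-async methods return a concurrent.futures.Future, so the caller decides when to block on the result. A minimal sketch of that pattern follows, with a ThreadPoolExecutor standing in for the Manager; fake_collect, wait_for_success, and the timeout values are assumptions made for illustration.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def fake_collect(unzip: bool = True) -> bool:
    """Stand-in for a long-running collection; not the real Manager.collect."""
    time.sleep(0.05)
    return True


def wait_for_success(future: Future, timeout: float) -> bool:
    """Block until the Future resolves; treat a timeout as a failure."""
    try:
        return bool(future.result(timeout=timeout))
    except FutureTimeoutError:
        return False


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(fake_collect, unzip=True)
    collected = wait_for_success(future, timeout=5.0)
```

Passing a timeout to result() keeps a stalled cluster from blocking the calling script indefinitely.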
- commit_graph(graph: chimerapy.engine.graph.Graph, mapping: Dict[str, List[str]], context: Literal['multiprocessing', 'threading'] = 'multiprocessing', send_packages: Optional[List[Dict[str, Any]]] = None) concurrent.futures._base.Future[bool]¶
Commit a Graph to the cluster.
Committing refers to how the graph itself (with its nodes and edges) and the mapping are distributed to the cluster. The whole routine has two steps: peer creation and peer-to-peer connection setup.
In peer creation, the Manager messages each Worker with the Nodes it needs to execute. The Workers configure the Nodes by giving them network information. The Nodes are then started and report back to the Workers.
With the successful creation of the Nodes, the Manager requests the IP addresses and port numbers of the Nodes' servers to create an address table for all the Nodes. This table is then used to inform each Node where its in-bound and out-bound Nodes are located, thereby establishing the edges between Nodes.
- Parameters
graph (cp.Graph) – The graph to deploy within the cluster.
mapping (Dict[str, List[str]]) – Mapping from cp.Worker to cp.Nodes through a dictionary. The keys are the names of the workers, while each value is a list of the nodes' names.
send_packages (Optional[List[Dict[str, Any]]]) – An optional feature for transferring a local package (typically a development package not found via PyPI or Anaconda). Provide a list of packages, each configured via a dictionary with the following key-value pairs: name: str and path: pathlib.Path.
- Returns
Future of success in cluster’s setup
- Return type
Future[bool]
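The second step, peer-to-peer connection setup, can be pictured as a lookup against an address table. The sketch below is a simplified model of that idea and not the library's implementation; the node names, addresses, and the helper build_neighbor_table are hypothetical.

```python
from typing import Dict, List, Tuple

Address = Tuple[str, int]  # (ip, port)


def build_neighbor_table(
    addresses: Dict[str, Address],
    edges: List[Tuple[str, str]],
) -> Dict[str, Dict[str, Dict[str, Address]]]:
    """For each node, collect the addresses of its in-bound and out-bound peers."""
    table: Dict[str, Dict[str, Dict[str, Address]]] = {
        name: {"in_bound": {}, "out_bound": {}} for name in addresses
    }
    for src, dst in edges:
        # An edge src -> dst makes dst an out-bound peer of src,
        # and src an in-bound peer of dst.
        table[src]["out_bound"][dst] = addresses[dst]
        table[dst]["in_bound"][src] = addresses[src]
    return table


# Address table as it might look after every node has reported its server.
addresses = {
    "camera": ("10.0.0.2", 5001),
    "detector": ("10.0.0.3", 5002),
    "recorder": ("10.0.0.3", 5003),
}
edges = [("camera", "detector"), ("detector", "recorder")]

table = build_neighbor_table(addresses, edges)
```

Each entry of table is the information a single node would need in order to locate its neighbors.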
- gather() concurrent.futures._base.Future[Dict]¶
- property host: str¶
- property logdir: pathlib.Path¶
- property port: int¶
- record() concurrent.futures._base.Future[bool]¶
Start recording data across the cluster.
- request_registered_method(node_id: str, method_name: str, params: Dict[str, Any] = {}) concurrent.futures._base.Future[Dict[str, Any]]¶
- reset(keep_workers: bool = True) concurrent.futures._base.Future[bool]¶
- shutdown(blocking: bool = True) Union[bool, concurrent.futures._base.Future[bool]]¶
Properly shut down the ChimeraPy-Engine cluster.
Through this method, the Manager broadcasts a shutdown message to all Workers, which then stop their processes and threads safely.
- start() concurrent.futures._base.Future[bool]¶
Start the execution of the cluster.
Before starting, make sure that you have performed the following steps:
- Create Nodes
- Create DAG with Nodes and their edges
- Connect Workers (must be before committing Graph)
- Register, map, and commit Graph
- Returns
Future of the success of starting the cluster
- Return type
Future[bool]
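The ordering above can be sketched as a single helper. It calls only methods listed on this page (connect, commit_graph, start, stop, collect, shutdown) on objects passed in by the caller, so it is written without imports from the library. The function name run_cluster, the timeouts, and the fixed run duration are assumptions for illustration.

```python
import time
from typing import Any, Dict, List


def run_cluster(manager: Any, workers: List[Any], graph: Any,
                mapping: Dict[str, List[str]], duration: float = 5.0) -> bool:
    """Connect, commit, start, stop, collect, then shut down (illustrative only).

    manager, workers, and graph are expected to be Manager, Worker, and Graph
    instances created by the caller.
    """
    # Workers must be connected before the Graph is committed.
    for worker in workers:
        worker.connect(host=manager.host, port=manager.port)

    try:
        if not manager.commit_graph(graph, mapping).result(timeout=60):
            return False
        if not manager.start().result(timeout=30):
            return False
        time.sleep(duration)  # let the pipeline run for a while
        stopped = manager.stop().result(timeout=30)
        collected = manager.collect().result(timeout=120)
        return bool(stopped and collected)
    finally:
        # stop() alone does not release processes, threads, and queues.
        manager.shutdown()
```

Placing shutdown in a finally block means the cluster is released even when committing or starting fails.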
- step() concurrent.futures._base.Future[bool]¶
Cluster step execution for offline operation.
The step function is for careful but slow operation of the cluster. For online execution, start and stop are the methods to be used.
- Returns
Future of the success of step function broadcasting
- Return type
Future[bool]
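For offline operation, one way to use step is to wait for each broadcast to finish before issuing the next. The helper below is a sketch under that assumption; step_cluster, its parameters, and the per-step timeout are hypothetical, and manager is any object whose step() returns a Future[bool] as documented above.

```python
from typing import Any


def step_cluster(manager: Any, n_steps: int, timeout: float = 10.0) -> int:
    """Broadcast up to n_steps steps and return how many succeeded."""
    completed = 0
    for _ in range(n_steps):
        # Wait for this step's broadcast to resolve before the next one.
        if not manager.step().result(timeout=timeout):
            break
        completed += 1
    return completed
```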
- stop() concurrent.futures._base.Future[bool]¶
Stop the execution of the cluster.
Do not forget that you still need to execute shutdown to properly shut down processes, threads, and queues.
- Returns
Future of the success of stopping the cluster
- Return type
Future[bool]
- property workers: Dict[str, chimerapy.engine.states.WorkerState]¶
- zeroconf(enable: bool = True, timeout: Union[int, float] = 5) bool¶
Worker¶
- class Worker(name: str, port: int = 0, delete_temp: bool = False, id: Optional[str] = None)¶
- async async_collect() bool¶
- async async_connect(host: Optional[str] = None, port: Optional[int] = None, method: Optional[Literal['ip', 'zeroconf']] = 'ip', timeout: Union[int, float] = 45) bool¶
Connect Worker to Manager.
This establishes server-client connections between Worker and Manager. To ensure that the connections are closed correctly, either the Manager or the Worker should shut down before your program stops, to avoid processes and threads that do not shut down.
- Parameters
method (Literal['ip', 'zeroconf']) – The approach to connecting to the Manager.
host (str) – The Manager's IP address.
port (int) – The Manager's port number.
timeout (Union[int, float]) – Set timeout for the connection.
- Returns
Success in connecting to the Manager
- Return type
bool
- async async_create_node(node_config: Union[chimerapy.engine.node.node_config.NodeConfig, Dict]) bool¶
- async async_deregister() bool¶
- async async_destroy_node(node_id: str) bool¶
- async async_gather() Dict¶
- async async_record_nodes() bool¶
- async async_request_registered_method(node_id: str, method_name: str, params: Dict = {}) Dict[str, Any]¶
- async async_shutdown() bool¶
- async async_start_nodes() bool¶
- async async_step() bool¶
- async async_stop_nodes() bool¶
- collect() concurrent.futures._base.Future[bool]¶
- connect(host: Optional[str] = None, port: Optional[int] = None, method: Optional[Literal['ip', 'zeroconf']] = 'ip', timeout: Union[int, float] = 10.0, blocking: bool = True) Union[bool, concurrent.futures._base.Future[bool]]¶
Connect Worker to Manager.
This establishes server-client connections between Worker and Manager. To ensure that the connections are closed correctly, either the Manager or the Worker should shut down before your program stops, to avoid processes and threads that do not shut down.
- Parameters
host (str) – The Manager's IP address.
port (int) – The Manager's port number.
timeout (Union[int, float]) – Set timeout for the connection.
blocking (bool) – Make the connection call blocking.
- Returns
Success in connecting to the Manager
- Return type
Future[bool]
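Because a blocking connect reports success as a bool, a caller can wrap it in a small retry loop when the Manager may not be up yet. This is a sketch, not library behavior: connect_with_retry, the attempt count, and the delay are hypothetical, and it assumes a failed attempt returns False rather than raising.

```python
import time
from typing import Any


def connect_with_retry(worker: Any, host: str, port: int,
                       attempts: int = 3, delay: float = 1.0) -> bool:
    """Call worker.connect up to `attempts` times, pausing between tries."""
    for attempt in range(attempts):
        if worker.connect(host=host, port=port, timeout=10.0):
            return True
        if attempt < attempts - 1:
            time.sleep(delay)  # give the Manager time to come up
    return False
```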
- create_node(node_config: chimerapy.engine.node.node_config.NodeConfig) concurrent.futures._base.Future[bool]¶
- deregister() concurrent.futures._base.Future[bool]¶
- destroy_node(node_id: str) concurrent.futures._base.Future[bool]¶
- gather() concurrent.futures._base.Future[Dict]¶
- property id: str¶
- idle()¶
- property ip: str¶
- property name: str¶
- property nodes: Dict[str, chimerapy.engine.states.NodeState]¶
- property port: int¶
- record_nodes() concurrent.futures._base.Future[bool]¶
- request_registered_method(node_id: str, method_name: str, params: Dict = {}) concurrent.futures._base.Future[Tuple[bool, Any]]¶
- services: chimerapy.engine.worker.worker_services_group.WorkerServicesGroup¶
- shutdown(blocking: bool = True) Union[concurrent.futures._base.Future[bool], bool]¶
Shut down the Worker safely.
The Worker needs to shut down its server, client, and Nodes in a safe manner, such as by setting flag variables and clearing out queues.
- Parameters
msg (Dict) – Leave empty; required to work when the Manager sends a shutdown message to the Worker.
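The general pattern of "setting flag variables and clearing out queues" can be sketched with the standard library. SketchService below is a hypothetical stand-in and does not reflect how the Worker is implemented internally.

```python
import queue
import threading


class SketchService:
    """Background loop that stops when a flag is cleared (illustrative only)."""

    def __init__(self) -> None:
        self.running = threading.Event()
        self.running.set()
        self.inbox: "queue.Queue[dict]" = queue.Queue()
        self.thread = threading.Thread(target=self._loop, daemon=True)
        self.thread.start()

    def _loop(self) -> None:
        # Poll with a short timeout so the flag is re-checked regularly.
        while self.running.is_set():
            try:
                self.inbox.get(timeout=0.05)
            except queue.Empty:
                continue

    def shutdown(self) -> bool:
        self.running.clear()           # 1. signal the loop to exit
        self.thread.join(timeout=2.0)  # 2. wait for it to finish
        while True:                    # 3. drop anything left in the queue
            try:
                self.inbox.get_nowait()
            except queue.Empty:
                break
        return not self.thread.is_alive()


service = SketchService()
service.inbox.put({"signal": "example"})
stopped = service.shutdown()
```

Clearing the flag before joining lets the loop exit on its own instead of being killed in the middle of an operation.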
- start_nodes() concurrent.futures._base.Future[bool]¶
- step() concurrent.futures._base.Future[bool]¶
- stop_nodes() concurrent.futures._base.Future[bool]¶
Node¶
- class Node(name: str, debug_port: Optional[int] = None, logdir: Optional[Union[str, pathlib.Path]] = None, id: Optional[str] = None)¶
- context: Literal['main', 'multiprocessing', 'threading']¶
- get_logger() logging.Logger¶
- property id: str¶
- main()¶
Method that users may override.
This method can also be overridden, though it is recommended to do so carefully. If overridden, the handling of inputs will have to be implemented as well.
The input information is accessible through the self.in_bound_data and self.new_data_available attributes.
- property name: str¶
- registered_methods: Dict[str, chimerapy.engine.node.registered_method.RegisteredMethod] = {}¶
- run(blocking: bool = True, running: Optional[multiprocess.context.BaseContext.Value] = None)¶
The actual method that is executed in the new process.
When working with multiprocessing.Process, keep in mind that creating a new process can yield unexpected behavior if done carelessly. It is recommended that one reads the multiprocessing documentation to understand the implications.
- property running: bool¶
- save_audio(name: str, data: numpy.ndarray, channels: int, format: int, rate: int)¶
Record audio data.
- Parameters
name (str) – Name of the audio data (.wav extension will be suffixed).
data (np.ndarray) – Audio data as a numpy array.
channels (int) – Number of channels.
format (int) – Format of the audio data.
rate (int) – Sampling rate of the audio data.
Notes
It is the implementation’s responsibility to properly format the data
- save_image(name: str, data: numpy.ndarray)¶
- save_tabular(name: str, data: Union[pandas.core.frame.DataFrame, Dict[str, Any], pandas.core.series.Series])¶
- save_video(name: str, data: numpy.ndarray, fps: int)¶
- services: chimerapy.engine.service.ServiceGroup¶
- setup()¶
User-defined method for Node setup.
In this method, the setup logic of the Node is executed. This would include opening files, creating connections to sensors, and calibrating sensors.
- shutdown(msg: Dict = {})¶
- step(data_chunks: Dict[str, chimerapy.engine.networking.data_chunk.DataChunk] = {}) Union[chimerapy.engine.networking.data_chunk.DataChunk, Any]¶
User-defined method.
This method contains the logic that is executed within the Node's while loop. For data sources (no inputs), the step method will execute as fast as possible; therefore, it is important to add time.sleep to specify the sampling rate.
For a Node that has inputs, this method is executed when new data is received.
- Parameters
data_chunks (Optional[Dict[str, DataChunk]]) – For source nodes, this parameter should not be considered (as they don't have inputs). For step and sink nodes, data_chunks must be included to avoid an error. The variable is a dictionary, where the key is the in-bound Node's name and the value is the output of the in-bound Node's step function.
- teardown()¶
User-defined method.
This method provides a convenient way to shut down services, such as closing files, signaling sensors to stop, and making any last-minute corrections to the data.
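The setup, step, and teardown hooks described above can be pictured with a small stand-in class. SketchNode and CounterSource are hypothetical and do not subclass the real Node; the bounded max_steps loop replaces the Node's while loop so that the example terminates, and the time.sleep call shows how a source sets its sampling rate.

```python
import time
from typing import Any, List


class SketchNode:
    """Mimics the setup -> step loop -> teardown order; not chimerapy's Node."""

    def setup(self) -> None:
        pass

    def step(self) -> Any:
        raise NotImplementedError

    def teardown(self) -> None:
        pass

    def run(self, max_steps: int) -> List[Any]:
        outputs: List[Any] = []
        self.setup()
        try:
            for _ in range(max_steps):
                outputs.append(self.step())
        finally:
            # teardown runs even if a step raises.
            self.teardown()
        return outputs


class CounterSource(SketchNode):
    """Source with no inputs; sleeps in step to fix its sampling rate."""

    def __init__(self, rate_hz: float) -> None:
        self.period = 1.0 / rate_hz
        self.count = 0
        self.events: List[str] = []

    def setup(self) -> None:
        self.events.append("setup")  # e.g. open files or sensors here

    def step(self) -> int:
        time.sleep(self.period)  # without this, step would spin at full speed
        self.count += 1
        return self.count

    def teardown(self) -> None:
        self.events.append("teardown")  # e.g. close files or sensors here


source = CounterSource(rate_hz=100.0)
outputs = source.run(max_steps=3)
```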
Graph¶
- class Graph(g: networkx.classes.digraph.DiGraph = <networkx.classes.digraph.DiGraph object>)¶
- add_edge(src: chimerapy.engine.node.node.Node, dst: chimerapy.engine.node.node.Node, follow: bool = False)¶
- add_edges_from(list_of_edges: Sequence[Sequence[chimerapy.engine.node.node.Node]])¶
- add_node(node: chimerapy.engine.node.node.Node)¶
- add_nodes_from(nodes: Sequence[chimerapy.engine.node.node.Node])¶
- get_id_by_name(node_name: str)¶
- get_layers_and_pos()¶
- has_node_by_id(node_id: str)¶
- is_valid()¶
Checks if Graph is a true DAG.
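What "a true DAG" means can be shown with a small acyclicity check. The sketch below uses the standard library's graphlib (Python 3.9+) rather than networkx, and is_dag is a hypothetical helper that operates on node names, not on Graph objects.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, Iterable, List, Set, Tuple


def is_dag(nodes: Iterable[str], edges: List[Tuple[str, str]]) -> bool:
    """Return True if the directed graph has no cycles."""
    # TopologicalSorter expects a mapping of node -> set of predecessors.
    predecessors: Dict[str, Set[str]] = {name: set() for name in nodes}
    for src, dst in edges:
        predecessors.setdefault(src, set())
        predecessors.setdefault(dst, set()).add(src)
    try:
        TopologicalSorter(predecessors).prepare()  # raises on a cycle
    except CycleError:
        return False
    return True


pipeline_ok = is_dag(["camera", "detector", "recorder"],
                     [("camera", "detector"), ("detector", "recorder")])
pipeline_cyclic = is_dag(["camera", "detector", "recorder"],
                         [("camera", "detector"), ("detector", "recorder"),
                          ("recorder", "camera")])
```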
- plot(font_size: int = 30, node_size: int = 5000)¶
Plot the Graph to visualize the data pipeline.
This visualization tool uses matplotlib and networkx to show the Nodes and their edges.
- Parameters
font_size (int) – Font size
node_size (int) – Node size