API Reference
DataChunk
- class DataChunk
- __init__()
- add(name: str, value: Any, content_type: Literal['image', 'other'] = 'other')
Add a new record to the DataChunk instance.
The most important parameter is content_type, because it affects the execution speed and real-time capability of a pipeline. Currently there are only two options, image and other, which correspond to the two provided serialization methods. When sending an image (a numpy array), use the image option; for anything else, use the other option until further notice.
- Parameters
name (str) – The name of the record.
value (Any) – The contents to be stored with the name key.
content_type (Literal["image", "other"]) – Specifies the content type to improve serialization and compression efficiency.
- get(name: str) → Dict[str, Any]
Extract the record with the given name.
- Parameters
name (str) – The requested key name.
- Returns
Returns a record, stored as a dictionary, with the following keys: value, content-type, and ownership. Usually you will only need value.
- Return type
Dict[str, Any]
- update(name: str, record: Dict[str, Any])
Overwrite a record with a new one; the previous metadata is deleted.
- Parameters
name (str) – The name of the record.
record (Dict[str, Any]) – The new record that overwrites the pre-existing one.
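To make the record layout described for get and update concrete, here is a simplified stand-in in plain Python. SketchDataChunk is hypothetical and is not ChimeraPy-Engine's implementation; it only mirrors the documented behavior: add stores a record with value, content-type, and ownership keys, get returns that whole record, and update replaces it wholesale. The empty-list ownership placeholder is an assumption, since its contents are not described here.

```python
from typing import Any, Dict, Literal


class SketchDataChunk:
    """Hypothetical stand-in that mimics the documented DataChunk record layout."""

    def __init__(self) -> None:
        self._records: Dict[str, Dict[str, Any]] = {}

    def add(
        self,
        name: str,
        value: Any,
        content_type: Literal["image", "other"] = "other",
    ) -> None:
        # Store the value together with its metadata; "ownership" is a placeholder.
        self._records[name] = {
            "value": value,
            "content-type": content_type,
            "ownership": [],
        }

    def get(self, name: str) -> Dict[str, Any]:
        # Return the whole record; most callers only need record["value"].
        return self._records[name]

    def update(self, name: str, record: Dict[str, Any]) -> None:
        # Overwrite the record entirely, discarding the previous metadata.
        self._records[name] = record


chunk = SketchDataChunk()
# A nested list stands in for a numpy image array here.
chunk.add("frame", [[0, 255], [255, 0]], content_type="image")
chunk.add("label", "calibration")
print(chunk.get("frame")["content-type"])  # image
print(chunk.get("label")["value"])  # calibration

chunk.update("label", {"value": "recording"})
print(chunk.get("label"))  # {'value': 'recording'}
```

Note how update drops the content-type and ownership keys: the new record replaces the old one rather than being merged into it.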
Node
- class Node(name: str, debug_port: Optional[int] = None, logdir: Optional[Union[str, pathlib.Path]] = None, id: Optional[str] = None)
- __init__(name: str, debug_port: Optional[int] = None, logdir: Optional[Union[str, pathlib.Path]] = None, id: Optional[str] = None)
Create a basic unit of computation in ChimeraPy-Engine.
A node has three main methods that can be overridden to add the desired behavior: setup, step, and teardown. You do not need to implement all of them. The step method is executed within a while loop whenever new inputs are available (if inputs are specified in the graph).
If the step method is too restrictive, main (which contains the while loop) can be overridden instead.
- Parameters
name (str) – The name that will later be used to refer to the Node.
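The lifecycle described above can be sketched with a small stand-in. SketchNode, CounterSource, and run_source are hypothetical names, not part of ChimeraPy-Engine, and run_source is a heavily simplified assumption of what main does for a source node (one without inputs): call setup once, call step repeatedly, then call teardown. The time.sleep call paces the source, as recommended for step.

```python
import time
from typing import Any, List


class SketchNode:
    """Hypothetical stand-in for a Node with overridable lifecycle hooks."""

    def __init__(self, name: str) -> None:
        self.name = name

    def setup(self) -> None:
        """Optional hook: runs once before the loop."""

    def step(self) -> Any:
        """Hook called on every loop iteration."""
        return None

    def teardown(self) -> None:
        """Optional hook: runs once after the loop."""


class CounterSource(SketchNode):
    """A source node: it has no inputs, so it paces itself with time.sleep."""

    def setup(self) -> None:
        self.count = 0  # e.g. open a file or connect to a sensor here

    def step(self) -> int:
        time.sleep(0.01)  # roughly a 100 Hz sampling rate
        self.count += 1
        return self.count

    def teardown(self) -> None:
        self.closed = True  # e.g. close files or stop sensors here


def run_source(node: SketchNode, n_steps: int) -> List[Any]:
    # Simplified stand-in for the while loop in main(): a real Node keeps
    # looping until the cluster stops it, not for a fixed number of steps.
    node.setup()
    outputs: List[Any] = []
    try:
        while len(outputs) < n_steps:
            outputs.append(node.step())
    finally:
        node.teardown()
    return outputs


source = CounterSource("counter")
outputs = run_source(source, 3)
print(outputs)  # [1, 2, 3]
```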
- setup()
User-defined method for Node setup.
In this method, the setup logic of the Node is executed. This includes opening files, creating connections to sensors, and calibrating sensors.
- main()
User-overridable method.
This method can also be overridden, though it is recommended to do so carefully. If it is overridden, the handling of inputs must be implemented as well.
This information is available through the self.in_bound_data and self.new_data_available attributes.
- step(data_chunks: Dict[str, chimerapy.engine.networking.data_chunk.DataChunk] = {}) → Union[chimerapy.engine.networking.data_chunk.DataChunk, Any]
User-defined method.
This method contains the logic that is executed within the Node's while loop. For data sources (no inputs), the step method executes as fast as possible; therefore, it is important to add time.sleep to set the sampling rate.
For a Node that has inputs, step is executed when new data is received.
- Parameters
data_chunks (Optional[Dict[str, DataChunk]]) – For source nodes, this parameter should not be considered (as they don't have inputs). For step and sink nodes, the data_chunks parameter must be included to avoid an error. The variable is a dictionary, where the key is the in-bound Node's name and the value is the output of the in-bound Node's step function.
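For a Node with inputs, step receives data_chunks keyed by the in-bound Node's name. The sketch below shows what the body of a consumer's step method might do, written as a free function for brevity. It assumes each upstream Node stored its result under a record named "reading" and reads it with get(name)["value"], as documented for DataChunk.get; StubChunk, fusion_step, and the Node names are hypothetical.

```python
from typing import Any, Dict


class StubChunk:
    """Hypothetical stand-in exposing a DataChunk-style get()."""

    def __init__(self, **values: Any) -> None:
        self._records = {
            name: {"value": value, "content-type": "other"}
            for name, value in values.items()
        }

    def get(self, name: str) -> Dict[str, Any]:
        return self._records[name]


def fusion_step(data_chunks: Dict[str, StubChunk]) -> Dict[str, Any]:
    # Keys are in-bound Node names; values are those Nodes' step outputs.
    # Assumes every upstream Node stored its result under "reading".
    return {
        upstream: chunk.get("reading")["value"]
        for upstream, chunk in data_chunks.items()
    }


inputs = {
    "thermometer": StubChunk(reading=21.5),
    "hygrometer": StubChunk(reading=0.4),
}
print(fusion_step(inputs))  # {'thermometer': 21.5, 'hygrometer': 0.4}
```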
- teardown()
User-defined method.
This method provides a convenient way to shut down services, such as closing files, signaling sensors to stop, and making any last-minute corrections to the data.
Graph
- class Graph(g: networkx.classes.digraph.DiGraph = <networkx.classes.digraph.DiGraph object>)
- is_valid()
Checks whether the Graph is a true DAG (directed acyclic graph).
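A true DAG is a directed graph with no cycles. As an illustration of that property only (this does not claim to be how is_valid is implemented; Graph wraps a networkx DiGraph, and networkx itself offers networkx.is_directed_acyclic_graph), here is a dependency-free check using Kahn's topological-sort algorithm over a hypothetical edge list of node names.

```python
from collections import deque
from typing import Dict, Iterable, List, Tuple


def is_dag(nodes: Iterable[str], edges: Iterable[Tuple[str, str]]) -> bool:
    """Return True if the directed graph has no cycles (Kahn's algorithm)."""
    in_degree: Dict[str, int] = {n: 0 for n in nodes}
    successors: Dict[str, List[str]] = {n: [] for n in in_degree}
    for src, dst in edges:
        successors.setdefault(src, []).append(dst)
        successors.setdefault(dst, [])
        in_degree.setdefault(src, 0)
        in_degree[dst] = in_degree.get(dst, 0) + 1

    # Repeatedly remove nodes that have no remaining incoming edges.
    ready = deque(n for n, d in in_degree.items() if d == 0)
    visited = 0
    while ready:
        node = ready.popleft()
        visited += 1
        for nxt in successors[node]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                ready.append(nxt)
    # Any node never visited lies on, or downstream of, a cycle.
    return visited == len(in_degree)


print(is_dag(["cam", "proc", "save"], [("cam", "proc"), ("proc", "save")]))  # True
print(is_dag(["a", "b"], [("a", "b"), ("b", "a")]))  # False
```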
- plot(font_size: int = 30, node_size: int = 5000)
Plot the Graph to visualize the data pipeline.
This visualization tool uses matplotlib and networkx to show the Nodes and their edges.
- Parameters
font_size (int) – Font size
node_size (int) – Node size
Worker
- class Worker(name: str, port: int = 0, delete_temp: bool = False, id: Optional[str] = None)
- __init__(name: str, port: int = 0, delete_temp: bool = False, id: Optional[str] = None)
Create a local Worker.
To execute Nodes on the main computer that also houses the Manager, a Worker is required there as well. Therefore, it is common to create a Worker and a Manager on the same computer.
To create a Worker on another machine, use the following command in that machine's terminal:
$ cp-worker --ip <manager's IP> --port <manager's port> --name <name> --id <id>
- Parameters
name (str) – The name of the Worker, used as a reference.
port (int) – The port of the Worker's HTTP server. Select 0 for a random port, mostly when running multiple Worker instances on the same computer.
delete_temp (bool) – Whether the Worker should delete its temporary files after the session is over.
id (Optional[str]) – A predefined ID for the Worker.
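Choosing port 0 leaves the choice of port to the operating system, which is why it avoids collisions when several Workers share a machine. The standard-library snippet below shows that mechanism in isolation; it does not use ChimeraPy-Engine, and it assumes the Worker's HTTP server binds its socket in the usual way.

```python
import socket

# Bind two sockets to port 0 at the same time; the OS assigns each a free port.
sock_a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock_b = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    sock_a.bind(("127.0.0.1", 0))
    sock_b.bind(("127.0.0.1", 0))
    port_a = sock_a.getsockname()[1]
    port_b = sock_b.getsockname()[1]
finally:
    sock_a.close()
    sock_b.close()

print(port_a != port_b)  # True
```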
- connect(host: Optional[str] = None, port: Optional[int] = None, method: Optional[Literal['ip', 'zeroconf']] = 'ip', timeout: Union[int, float] = 10.0, blocking: bool = True) → Union[bool, concurrent.futures._base.Future[bool]]
Connect the Worker to the Manager.
This establishes server-client connections between the Worker and the Manager. To ensure that the connections are closed correctly, shut down either the Manager or the Worker before stopping your program; otherwise, processes and threads may be left running.
- Parameters
host (str) – The Manager's IP address.
port (int) – The Manager's port number.
timeout (Union[int, float]) – Timeout for the connection.
blocking (bool) – Make the connection call blocking.
- Returns
Success in connecting to the Manager
- Return type
Union[bool, Future[bool]]
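Because connect is annotated as returning either bool or Future[bool], calling code may need to handle both. The sketch below assumes, without confirmation from this reference, that blocking=False yields a concurrent.futures.Future (the same class the signature spells concurrent.futures._base.Future). fake_connect is a hypothetical stand-in that performs no networking, and wait_for_result is a hypothetical helper that normalizes either form to a plain bool.

```python
from concurrent.futures import Future
from typing import Union


def fake_connect(blocking: bool = True) -> Union[bool, "Future[bool]"]:
    # Hypothetical stand-in: no networking, the "connection" always succeeds.
    future: "Future[bool]" = Future()
    future.set_result(True)
    return future.result() if blocking else future


def wait_for_result(result: Union[bool, "Future[bool]"], timeout: float = 10.0) -> bool:
    # Wait on a Future for up to `timeout` seconds; pass plain booleans through.
    if isinstance(result, Future):
        return result.result(timeout=timeout)
    return result


print(wait_for_result(fake_connect(blocking=True)))  # True
print(wait_for_result(fake_connect(blocking=False)))  # True
```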
- shutdown(blocking: bool = True) → Union[concurrent.futures._base.Future[bool], bool]
Shut down the Worker safely.
The Worker needs to shut down its server, client, and Nodes in a safe manner, such as by setting flag variables and clearing out queues.
- Parameters
msg (Dict) – Leave empty; required so that this works when the Manager sends a shutdown message to the Worker.
Manager
- class Manager(logdir: Union[pathlib.Path, str], port: int = 9000, publish_logs_via_zmq: bool = False, enable_api: bool = True, **kwargs)
- __init__(logdir: Union[pathlib.Path, str], port: int = 9000, publish_logs_via_zmq: bool = False, enable_api: bool = True, **kwargs)
Create the Manager, the controller of the cluster.
The Manager directs the cluster: it adds new computers, assigns roles, starts and stops data collection, and shuts down the system.
- Parameters
port (int) – Preferred port; a different one might be used depending on availability.
max_num_of_workers (int) – Maximum number of allowed Workers
publish_logs_via_zmq (bool, optional) – Whether to publish logs via ZMQ. Defaults to False.
enable_api (bool) – Enable front-end API entry points to control the cluster. Defaults to True.
**kwargs – Additional keyword arguments. Currently, this is used to configure the ZMQ log handler.
- commit_graph(graph: chimerapy.engine.graph.Graph, mapping: Dict[str, List[str]], context: Literal['multiprocessing', 'threading'] = 'multiprocessing', send_packages: Optional[List[Dict[str, Any]]] = None) → concurrent.futures._base.Future[bool]
Commit the Graph to the cluster.
Committing refers to how the graph itself (with its nodes and edges) and the mapping are distributed to the cluster. The whole routine has two steps: peer creation and peer-to-peer connection setup.
In peer creation, the Manager messages each Worker with the Nodes it needs to execute. The Workers configure the Nodes by giving them network information. The Nodes are then started and report back to the Workers.
After the Nodes are successfully created, the Manager requests the IP addresses and port numbers of the Nodes' servers to create an address table for all the Nodes. This table is then used to inform each Node where its in-bound and out-bound Nodes are located, thereby establishing the edges between Nodes.
- Parameters
graph (cp.Graph) – The graph to deploy within the cluster.
mapping (Dict[str, List[str]]) – Mapping from cp.Worker to cp.Node through a dictionary. The keys are the names of the Workers, while the values are lists of the Nodes' names.
send_packages (Optional[List[Dict[str, Any]]]) – An optional feature for transferring a local package (typically a development package not available via PyPI or Anaconda). Provide a list of packages, each configured via a dictionary with the following key-value pairs: name: str and path: pathlib.Path.
- Returns
Future of success in cluster’s setup
- Return type
Future[bool]
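The mapping format and the address-table step described for commit_graph can be illustrated in plain Python. Everything here is hypothetical: the Worker and Node names, the (ip, port) addresses, the package name, and the helpers unassigned_nodes and neighbor_addresses are invented for the sketch. The code only mirrors the prose (each Node learns where its in-bound and out-bound Nodes live); it is not ChimeraPy-Engine's commit logic.

```python
from pathlib import Path
from typing import Dict, List, Tuple

Address = Tuple[str, int]

# Worker name -> names of the Nodes that Worker should execute (invented names).
mapping: Dict[str, List[str]] = {
    "local": ["camera", "processor"],
    "remote": ["recorder"],
}
edges: List[Tuple[str, str]] = [("camera", "processor"), ("processor", "recorder")]

# Addresses the Nodes' servers might report back (invented values).
address_table: Dict[str, Address] = {
    "camera": ("192.168.1.10", 5001),
    "processor": ("192.168.1.10", 5002),
    "recorder": ("192.168.1.20", 5003),
}

# send_packages entries use the documented keys: name (str) and path (pathlib.Path).
send_packages = [{"name": "my_dev_pkg", "path": Path("./my_dev_pkg")}]


def unassigned_nodes(mapping: Dict[str, List[str]], edges: List[Tuple[str, str]]) -> List[str]:
    # Nodes that appear in the graph's edges but are not given to any Worker.
    assigned = {node for nodes in mapping.values() for node in nodes}
    in_graph = {node for edge in edges for node in edge}
    return sorted(in_graph - assigned)


def neighbor_addresses(
    edges: List[Tuple[str, str]], table: Dict[str, Address]
) -> Dict[str, Dict[str, List[Address]]]:
    # For every Node, collect where its in-bound and out-bound peers live.
    peers: Dict[str, Dict[str, List[Address]]] = {
        name: {"in_bound": [], "out_bound": []} for name in table
    }
    for src, dst in edges:
        peers[src]["out_bound"].append(table[dst])
        peers[dst]["in_bound"].append(table[src])
    return peers


print(unassigned_nodes(mapping, edges))  # []
print(neighbor_addresses(edges, address_table)["processor"])
# {'in_bound': [('192.168.1.10', 5001)], 'out_bound': [('192.168.1.20', 5003)]}
```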
- step() → concurrent.futures._base.Future[bool]
Cluster step execution for offline operation.
The step method is for careful but slow operation of the cluster. For online execution, use start and stop instead.
- Returns
Future of the success of step function broadcasting
- Return type
Future[bool]
- start() → concurrent.futures._base.Future[bool]
Start the execution of the cluster.
Before starting, make sure that you have performed the following steps:
1. Create Nodes
2. Create a DAG with Nodes and their edges
3. Connect Workers (must be done before committing the Graph)
4. Register, map, and commit the Graph
- Returns
Future of the success of starting the cluster
- Return type
Future[bool]
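The checklist for start, together with the notes on stop and shutdown, implies an ordering for a session. The hypothetical SessionOrder class below only records and enforces that ordering (connect Workers, commit the Graph, start, stop, shut down); it does not talk to a cluster, folds "register, map, and commit" into a single commit_graph action, and treating stop as requiring an earlier start is an assumption.

```python
from typing import Dict, List


class SessionOrder:
    """Hypothetical guard that enforces the call order described in this reference."""

    # Each action maps to an action that must already have happened.
    REQUIRES: Dict[str, str] = {
        "commit_graph": "connect_workers",
        "start": "commit_graph",
        "stop": "start",
    }

    def __init__(self) -> None:
        self.history: List[str] = []

    def do(self, action: str) -> None:
        needed = self.REQUIRES.get(action)
        if needed is not None and needed not in self.history:
            raise RuntimeError(f"{action!r} requires {needed!r} first")
        self.history.append(action)


session = SessionOrder()
for action in ["connect_workers", "commit_graph", "start", "stop", "shutdown"]:
    session.do(action)

try:
    SessionOrder().do("start")
except RuntimeError as err:
    print(err)  # 'start' requires 'commit_graph' first
```

shutdown has no prerequisite in this sketch, reflecting that it should always be called to release processes, threads, and queues.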
- stop() → concurrent.futures._base.Future[bool]
Stop the execution of the cluster.
Do not forget that you still need to call shutdown to properly shut down processes, threads, and queues.
- Returns
Future of the success of stopping the cluster
- Return type
Future[bool]
- shutdown(blocking: bool = True) → Union[bool, concurrent.futures._base.Future[bool]]
Properly shut down the ChimeraPy-Engine cluster.
Through this method, the Manager broadcasts a shutdown message to all Workers, which then stop their processes and threads safely.