Logger¶
maro.utils.logger¶
-
class
maro.utils.logger.CliLogger(name)[source]¶ Bases:
objectAn internal logger for CLI logging.
It maintains a singleton logger in a CLI command lifecycle. The logger is inited at the very beginning, and use different logging formats based on the
--debugargument.-
debug(message: str) → None[source]¶ logger.debug()with passive init.- Parameters
message (str) – logged message.
-
debug_green(message: str) → None[source]¶ logger.debug()with color green and passive init.- Parameters
message (str) – logged message.
-
debug_yellow(message: str) → None[source]¶ logger.debug()with color yellow and passive init.- Parameters
message (str) – logged message.
-
error(message: str) → None[source]¶ logger.error()with passive init.- Parameters
message (str) – logged message.
-
error_red(message: str) → None[source]¶ logger.error()with color red and passive init.- Parameters
message (str) – logged message.
-
info(message: str) → None[source]¶ logger.info()with passive init.- Parameters
message (str) – logged message.
-
info_green(message: str) → None[source]¶ logger.info()with color green and passive init.- Parameters
message (str) – logged message.
-
passive_init() → None[source]¶ Init a new
CliLoggerif current logger is not matched with the parameters.
-
-
class
maro.utils.logger.DummyLogger[source]¶ Bases:
objectA dummy Logger, which is used when disabling logs.
-
class
maro.utils.logger.InternalLogger(component_name: str, tag: str = 'maro_internal', format_: maro.utils.logger.LogFormat = <LogFormat.internal: 3>, dump_folder: str = None, dump_mode: str = 'a', extension_name: str = 'log', auto_timestamp: bool = False)[source]¶ Bases:
maro.utils.logger.LoggerAn internal logger uses for recording the internal system’s log.
-
class
maro.utils.logger.LogFormat(value)[source]¶ Bases:
enum.EnumThe Enum class of the log format.
Example
LogFormat.full: full time | host | user | pid | tag | level | msgLogFormat.simple: simple time | tag | level | msgLogFormat.cli_debug: simple time | level | msgLogFormat.cli_info(file): simple time | level | msgLogFormat.cli_info(stdout): msg
-
cli_debug= 4¶
-
cli_info= 5¶
-
full= 1¶
-
internal= 3¶
-
none= 6¶
-
simple= 2¶
-
class
maro.utils.logger.Logger(tag: str, format_: maro.utils.logger.LogFormat = <LogFormat.simple: 2>, dump_folder: str = '', dump_mode: str = 'w', extension_name: str = 'log', auto_timestamp: bool = False, stdout_level='INFO')[source]¶ Bases:
objectA simple wrapper for logging.
The Logger hosts a file handler and a stdout handler. The file handler is set to
DEBUGlevel and will dump all the logging info to the givendump_folder. The logging level of the stdout handler is decided by thestdout_level, and can be redirected by setting the environment variableLOG_LEVEL. SupportedLOG_LEVELincludes:DEBUG,INFO,WARN,ERROR,CRITICAL,PROCESS.Example
$ export LOG_LEVEL=INFO- Parameters
tag (str) – Log tag for stream and file output.
format_ (LogFormat) – Predefined formatter. Defaults to
LogFormat.full.dump_folder (str) – Log dumped folder. Defaults to the current folder. The dumped log level is
logging.DEBUG. The full path of the dumped log file is dump_folder/tag.log.dump_mode (str) – Write log file mode. Defaults to
w. Useato append log.extension_name (str) – Final dumped file extension name. Defaults to log.
auto_timestamp (bool) – Add a timestamp to the dumped log file name or not. E.g: tag.1574953673.137387.log.
stdout_level (str) – the logging level of the stdout handler. Defaults to
DEBUG.
Exceptions¶
maro.utils.exception.backends_exception¶
-
exception
maro.utils.exception.backends_exception.BackendsAccessDeletedNodeException[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionException when try to access a deleted node.
-
exception
maro.utils.exception.backends_exception.BackendsAppendToNonListAttributeException[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionException when append value to a non list attribute.
-
exception
maro.utils.exception.backends_exception.BackendsArrayAttributeAccessException[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionException when access attribute that slot number greater than 1.
This exception is caused when using invalid slice interface to access slots.
-
exception
maro.utils.exception.backends_exception.BackendsClearNonListAttributeException[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionException when try to clear a non list attribute.
-
exception
maro.utils.exception.backends_exception.BackendsGetItemInvalidException[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionException if the parameters is invalid when getting item from backend.
Usually this exception is caused by invalid node or attribute index.
-
exception
maro.utils.exception.backends_exception.BackendsInsertNonListAttributeException[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionException when try to insert a value to non list attribute.
-
exception
maro.utils.exception.backends_exception.BackendsInvalidAttributeException[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionException when try to access a not exist attribute type.
-
exception
maro.utils.exception.backends_exception.BackendsInvalidNodeException[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionException when try to access a not exist node type.
-
exception
maro.utils.exception.backends_exception.BackendsRemoveFromNonListAttributeException[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionException when try to from a value to non list attribute.
-
exception
maro.utils.exception.backends_exception.BackendsResizeNonListAttributeException[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionException when try to resize a non list attribute.
-
exception
maro.utils.exception.backends_exception.BackendsSetItemInvalidException[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionException if the parameter is invalid when setting item from backend.
Usually this exception is caused by invalid node or attribute index.
maro.utils.exception.base_exception¶
-
exception
maro.utils.exception.base_exception.MAROException(error_code: int = 1000, msg: str = None)[source]¶ Bases:
ExceptionThe base exception class for MARO.
- Parameters
error_code (int) – the predefined MARO error code. You can find the detailed definition in: maro.utils.exception.error_code.py.
msg (str) – Description of the error. Defaults to None, which will show the base error information.
maro.utils.exception.cli_exception¶
-
exception
maro.utils.exception.cli_exception.BadRequestError(message: str = None)[source]¶ Bases:
maro.utils.exception.cli_exception.UserFaultBad request from client.
-
exception
maro.utils.exception.cli_exception.CliError(message: str = None, error_code: int = 3000)[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionBase class for all MARO CLI errors.
-
exception
maro.utils.exception.cli_exception.CliInternalError(message: str = None)[source]¶ Bases:
maro.utils.exception.cli_exception.ClientErrorMARO CLI internal error.
-
exception
maro.utils.exception.cli_exception.ClientError(message: str = None, error_code: int = 3000)[source]¶ Bases:
maro.utils.exception.cli_exception.CliErrorMARO CLI should be responsible for the errors. ErrorCode with 31XX.
-
exception
maro.utils.exception.cli_exception.ClusterInternalError(message: str = None)[source]¶ Bases:
maro.utils.exception.cli_exception.ServiceErrorMARO Cluster internal error.
-
exception
maro.utils.exception.cli_exception.CommandError(cli_command: str, message: str = None)[source]¶ Bases:
maro.utils.exception.cli_exception.CliErrorFailed execution error of CLI command.
-
exception
maro.utils.exception.cli_exception.CommandExecutionError(message: str = None, command: str = None)[source]¶ Bases:
maro.utils.exception.cli_exception.ServiceErrorSubprocess execution error.
-
exception
maro.utils.exception.cli_exception.CommandNotFoundError(message: str = None, usage: str = '')[source]¶ Bases:
maro.utils.exception.cli_exception.UserFaultCommand is misspelled or not recognized by MARO CLI.
-
exception
maro.utils.exception.cli_exception.DeploymentError(message: str = None)[source]¶ Bases:
maro.utils.exception.cli_exception.UserFaultMARO deployment fails.
-
exception
maro.utils.exception.cli_exception.FileOperationError(message: str = None)[source]¶ Bases:
maro.utils.exception.cli_exception.UserFaultFor file or directory operation related errors.
-
exception
maro.utils.exception.cli_exception.InvalidDeploymentTemplateError(message: str = None)[source]¶ Bases:
maro.utils.exception.cli_exception.UserFaultMARO deployment template validation fails.
-
exception
maro.utils.exception.cli_exception.ProcessInternalError(message: str = None)[source]¶ Bases:
maro.utils.exception.cli_exception.UserFaultErrors in MARO CLI process mode.
-
exception
maro.utils.exception.cli_exception.ServiceError(message: str = None, error_code: int = 3000)[source]¶ Bases:
maro.utils.exception.cli_exception.CliErrorMARO Services should be responsible for the errors. ErrorCode with 32XX.
-
exception
maro.utils.exception.cli_exception.UserFault(message: str = None, error_code: int = 3000)[source]¶ Bases:
maro.utils.exception.cli_exception.CliErrorUsers should be responsible for the errors. ErrorCode with 30XX.
maro.utils.exception.communication_exception¶
-
exception
maro.utils.exception.communication_exception.ConditionalEventSyntaxError(msg: str = None)[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionThe syntax error of a conditional event.
-
exception
maro.utils.exception.communication_exception.DriverReceiveError(msg: str = None)[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionFailure to receive message in the driver.
-
exception
maro.utils.exception.communication_exception.DriverSendError(msg: str = None)[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionFailure to send message in the driver.
-
exception
maro.utils.exception.communication_exception.DriverTypeError(msg: str = None)[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionThe unrecognized driver type, occurs in the proxy.
-
exception
maro.utils.exception.communication_exception.InformationUncompletedError(msg: str = None)[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionNo enough information from the Redis.
-
exception
maro.utils.exception.communication_exception.MessageSessionTypeError(msg: str = None)[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionThe unrecognized session type, occurs in the
SessionMessage.
-
exception
maro.utils.exception.communication_exception.PeersConnectionError(msg: str = None)[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionPeers connection error, occurs in the driver.
-
exception
maro.utils.exception.communication_exception.PeersDisconnectionError(msg: str = None)[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionPeers disconnection error, occurs in the driver.
-
exception
maro.utils.exception.communication_exception.PeersMissError(msg: str = None)[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionProxy do not have any expected peers.
-
exception
maro.utils.exception.communication_exception.PeersRejoinTimeout(msg: str = None)[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionFailure to get enough peers during the max waiting time.
-
exception
maro.utils.exception.communication_exception.PendingToSend(msg: str = None)[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionTemporary failure to send message, try to rejoin.
-
exception
maro.utils.exception.communication_exception.RedisConnectionError(msg: str = None)[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionFailure to connect to redis, occurs in the proxy.
-
exception
maro.utils.exception.communication_exception.SocketTypeError(msg: str = None)[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionThe unrecognized socket type, occurs in the driver.
maro.utils.exception.data_lib_exeption¶
-
exception
maro.utils.exception.data_lib_exeption.CimGeneratorInvalidParkingDuration[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionException when parking duration is less than 0 in CIM topology file.
-
exception
maro.utils.exception.data_lib_exeption.MetaTimestampNotExist[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionException that there is no timestamp specified in meta file, as this is required field.
maro.utils.exception.error_code¶
maro.utils.exception.rl_toolkit_exception¶
-
exception
maro.utils.exception.rl_toolkit_exception.MissingOptimizer(msg: str = None)[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionRaised when the optimizers are missing when calling CoreModel’s step() method.
-
exception
maro.utils.exception.rl_toolkit_exception.StoreMisalignment(msg: str = None)[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionRaised when a
putoperation on aSimpleStorewould cause the underlying lists to have different sizes.
-
exception
maro.utils.exception.rl_toolkit_exception.UnrecognizedTask(msg: str = None)[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionRaised when a CoreModel has task names that are not unrecognized by an algorithm.
maro.utils.exception.simulator_exception¶
-
exception
maro.utils.exception.simulator_exception.BusinessEngineNotFoundError[source]¶ Bases:
maro.utils.exception.base_exception.MAROExceptionException then the simulator cannot find specified business engine module.
Communication¶
maro.communication.driver.abs_driver¶
-
class
maro.communication.driver.abs_driver.AbsDriver[source]¶ Bases:
abc.ABCAbstract class of the communication driver.
-
abstract property
address¶ The socket’s address. Based on the real socket driver, it usually be a
Dict.- Type
Dict
-
abstract property
maro.communication.driver.driver_type¶
maro.communication.driver.zmq_driver¶
-
class
maro.communication.driver.zmq_driver.ZmqDriver(component_type: str, protocol: str = 'tcp', send_timeout: int = -1, receive_timeout: int = -1, logger=<maro.utils.logger.DummyLogger object>)[source]¶ Bases:
maro.communication.driver.abs_driver.AbsDriverThe communication driver based on
ZMQ.- Parameters
component_type (str) – Component type in the current group.
protocol (str) – The underlying transport-layer protocol for transferring messages. Defaults to tcp.
send_timeout (int) – The timeout in milliseconds for sending message. If -1, no timeout (infinite). Defaults to -1.
receive_timeout (int) – The timeout in milliseconds for receiving message. If -1, no timeout (infinite). Defaults to -1.
logger – The logger instance or DummyLogger. Defaults to DummyLogger().
-
property
address¶ - Returns
The sockets’ address Dict of
zmq.PULLsocket andzmq.SUBsocket. The key of dict is the socket’s type, while the value of dict is socket’s ip address, which forms by protocol+ip+port.- Return type
Dict[int, str]
Example
Dict{zmq.PULL: “tcp://0.0.0.0:1234”, zmq.SUB: “tcp://0.0.0.0:1235”}
-
broadcast(topic: str, message: maro.communication.message.Message)[source]¶ Broadcast message.
- Parameters
topic (str) – The topic of broadcast.
message (class) – Message to be sent.
-
connect(peers_address_dict: Dict[str, Dict[str, str]])[source]¶ Build a connection with all peers in peers socket address.
Set up the unicast sender which is
zmq.PUSHsocket and the broadcast sender which iszmq.PUBsocket.- Parameters
peers_address_dict (Dict[str, Dict[str, str]]) – Peers’ socket address dict. The key of dict is the peer’s name, while the value of dict is the peer’s socket connection address. E.g. Dict{‘peer1’, Dict[zmq.PULL, ‘tcp://0.0.0.0:1234’]}.
-
disconnect(peers_address_dict: Dict[str, Dict[str, str]])[source]¶ Disconnect with all peers in peers socket address.
Disconnect and delete the unicast sender which is
zmq.PUSHsocket for the peers in dict.- Parameters
peers_address_dict (Dict[str, Dict[str, str]]) – Peers’ socket address dict. The key of dict is the peer’s name, while the value of dict is the peer’s socket connection address. E.g. Dict{‘peer1’, Dict[zmq.PULL, ‘tcp://0.0.0.0:1234’]}.
-
receive(is_continuous: bool = True, timeout: int = None)[source]¶ Receive message from
zmq.POLLER.- Parameters
is_continuous (bool) – Continuously receive message or not. Defaults to True.
- Yields
recv_message (Message) – The received message from the poller.
-
send(message: maro.communication.message.Message)[source]¶ Send message.
- Parameters
message (class) – Message to be sent.
maro.communication.dist_decorator¶
maro.communication.message¶
-
class
maro.communication.message.Message(tag: Union[str, enum.Enum], source: str, destination: str, payload=None)[source]¶ Bases:
objectGeneral Message for hosting payload between receiver and sender.
- Parameters
tag (str|Enum) – Message tag, which is customized by the user, for specific application logic.
source (str) – The sender of message.
destination (str) – The receiver of message.
payload (object) – Message payload, such as model parameters, experiences, etc. Defaults to None.
session_id (str) – Message belonged session id, it will be generated automatically by default, you can use it to group message based on your application logic.
-
class
maro.communication.message.NotificationSessionStage(value)[source]¶ Bases:
enum.EnumNotification session stages.
REQUEST: Notification session stage 1.RECEIVE: Notification session stage 2.
-
RECEIVE= 'notification_receive'¶
-
REQUEST= 'notification_request'¶
-
class
maro.communication.message.SessionMessage(tag: Union[str, enum.Enum], source: str, destination: str, payload=None, session_type: maro.communication.message.SessionType = <SessionType.TASK: 'task'>, session_stage=None)[source]¶ Bases:
maro.communication.message.MessageThe session message class.
It is used by a specific session, which will contain session stage to support more complex application logic.
- Parameters
session_type (Enum) – It indicates the current session type.
session_stage (Enum) – It indicates the current session stage.
maro.communication.proxy¶
-
class
maro.communication.proxy.Proxy(group_name: str, component_type: str, expected_peers: dict, driver_type: maro.communication.driver.driver_type.DriverType = <DriverType.ZMQ: 'zmq_driver'>, driver_parameters: dict = None, redis_address: Tuple = ('localhost', 6379), max_retries: int = 10, retry_interval_base_value: float = 0.1, log_enable: bool = True, enable_rejoin: bool = False, minimal_peers: Union[int, dict] = 1, peers_catch_lifetime: int = 10, enable_message_cache_for_rejoin: bool = False, max_length_for_message_cache: int = 1024, timeout_for_minimal_peer_number: int = 300, is_remove_failed_container: bool = False, max_rejoin_times: int = 5)[source]¶ Bases:
objectThe communication module is responsible for receiving and sending messages.
There are three ways of sending messages:
send,scatter, andbroadcast. Also, there are two ways to receive messages from other peers:receiveandreceive_by_id.- Parameters
group_name (str) – Identifier for the group of all distributed components.
component_type (str) – Component’s type in the current group.
expected_peers (Dict) – Dict of peers’ information which contains peer type and expected number. E.g. Dict[‘learner’: 1, ‘actor’: 2]
driver_type (Enum) – A type of communication driver class uses to communicate with other components. Defaults to
DriverType.ZMQ.driver_parameters (Dict) – The arguments for communication driver class initial. Defaults to None.
redis_address (Tuple) – Hostname and port of the Redis server. Defaults to (“localhost”, 6379).
max_retries (int) – Maximum number of retries before raising an exception. Defaults to 5.
retry_interval_base_value (float) – The time interval between attempts. Defaults to 0.1.
log_enable (bool) – Open internal logger or not. Defaults to True.
enable_rejoin (bool) – Allow peers rejoin or not. Defaults to False, and must use with maro cli.
minimal_peers Union[int, dict] – The minimal number of peers for each peer type.
peers_catch_lifetime (int) – The lifetime for onboard peers’ information.
enable_message_cache_for_rejoin (bool) – Enable message cache for failed peers or not. Default to False.
max_length_for_message_cache (int) – The maximal number of cached messages.
timeout_for_minimal_peer_number (int) – The timeout of waiting enough alive peers.
is_remove_failed_container (bool) – Enable remove failed containers automatically or not. Default to False.
max_rejoin_times (int) – The maximal retry times for one peer rejoins.
-
broadcast(component_type: str, tag: Union[str, enum.Enum], session_type: maro.communication.message.SessionType, payload=None, timeout: int = None) → List[maro.communication.message.Message][source]¶ Broadcast message to all peers, and return all replied messages.
- Parameters
component_type (str) – Broadcast to all peers in this type.
tag (str|Enum) – Message’s tag.
session_type (Enum) – Message’s session type.
payload (object) – The true data. Defaults to None.
- Returns
List of replied messages.
- Return type
List[Message]
-
property
component_type¶ Component’s type in the current group.
- Type
str
-
forward(message: Union[maro.communication.message.SessionMessage, maro.communication.message.Message], destination: str, tag: Union[str, enum.Enum] = None, payload=None) → List[str][source]¶ Forward a received message.
- Parameters
message (Message) – The message need to forward.
destination (str) – The receiver of message.
tag (str|Enum) – New message tag, if None, keeps the original message’s tag. Defaults to None.
payload (object) – Message payload, if None, keeps the original message’s payload. Defaults to None.
- Returns
Message belonged session id.
- Return type
List[str]
-
get_peer_type(peer_name: str) → str[source]¶ Get peer type from given peer name.
- Parameters
peer_name (str) – The component name of a peer, which form by peer_type and UUID.
- Returns
The component type of a peer in current group.
- Return type
str
-
property
group_name¶ Identifier for the group of all communication components.
- Type
str
-
ibroadcast(component_type: str, tag: Union[str, enum.Enum], session_type: maro.communication.message.SessionType, payload=None) → List[str][source]¶ Broadcast message to all subscribers, and return list of message’s session id.
- Parameters
component_type (str) – Broadcast to all peers in this type.
tag (str|Enum) – Message’s tag.
session_type (Enum) – Message’s session type.
payload (object) – The true data. Defaults to None.
- Returns
List of message’s session id which related to the replied message.
- Return type
List[str]
-
iscatter(tag: Union[str, enum.Enum], session_type: maro.communication.message.SessionType, destination_payload_list: list) → List[str][source]¶ Scatters a list of data to peers, and return list of message id.
- Parameters
tag (str|Enum) – Message’s tag.
session_type (Enum) – Message’s session type.
destination_payload_list ([Tuple(str, object)]) – The destination-payload list. The first item of the tuple in list is the message’s destination, and the second item of the tuple in list is the message’s payload.
- Returns
List of message’s session id.
- Return type
List[str]
-
isend(message: maro.communication.message.Message) → Optional[List[str]][source]¶ Send a message to a remote peer.
- Parameters
message – Message to be sent.
- Returns
- The list of message’s session id;
If enable rejoin, it will return None when sending message to the failed peers. If enable rejoin and message cache, it may return list of session id which from the pending messages.
- Return type
Union[List[str], None]
-
property
name¶ Unique identifier in the current group.
- Type
str
-
property
peers_name¶ The
Dictof all connected peers’ names, stored by peer type.- Type
Dict
-
receive(is_continuous: bool = True, timeout: int = None)[source]¶ Receive messages from communication driver.
- Parameters
is_continuous (bool) – Continuously receive message or not. Defaults to True.
-
receive_by_id(targets: List[str], timeout: int = None) → List[maro.communication.message.Message][source]¶ Receive target messages from communication driver.
- Parameters
targets List[str] – List of
session_id. E.g. [‘0_learner0_actor0’, ‘1_learner1_actor1’, …].- Returns
List of received messages.
- Return type
List[Message]
-
reply(message: Union[maro.communication.message.SessionMessage, maro.communication.message.Message], tag: Union[str, enum.Enum] = None, payload=None, ack_reply: bool = False) → List[str][source]¶ Reply a received message.
- Parameters
message (Message) – The message need to reply.
tag (str|Enum) – New message tag, if None, keeps the original message’s tag. Defaults to None.
payload (object) – New message payload, if None, keeps the original message’s payload. Defaults to None.
ack_reply (bool) – If True, it is acknowledge reply. Defaults to False.
- Returns
Message belonged session id.
- Return type
List[str]
-
scatter(tag: Union[str, enum.Enum], session_type: maro.communication.message.SessionType, destination_payload_list: list, timeout: int = - 1) → List[maro.communication.message.Message][source]¶ Scatters a list of data to peers, and return replied messages.
- Parameters
tag (str|Enum) – Message’s tag.
session_type (Enum) – Message’s session type.
destination_payload_list ([Tuple(str, object)]) – The destination-payload list. The first item of the tuple in list is the message destination, and the second item of the tuple in list is the message payload.
- Returns
List of replied message.
- Return type
List[Message]
-
send(message: maro.communication.message.Message, timeout: int = None) → Optional[List[maro.communication.message.Message]][source]¶ Send a message to a remote peer.
- Parameters
message – Message to be sent.
- Returns
- The list of received message;
If enable rejoin, it will return None when sending message to the failed peers. If enable rejoin and message cache, it may return list of messages which from the pending messages.
- Return type
Union[List[Message], None]
maro.communication.registry_table¶
-
class
maro.communication.registry_table.ConditionalEvent(event: Union[str, Tuple], peers_name: Union[property, dict])[source]¶ Bases:
objectThe description of the messages’ combination.
The conditional event can be composed of any number of unit conditional events and end with an Operation. For unit conditional event, It must be three parts and divided by
::The first part of unit event represent the message’s source. E.g.
learneror*.The second part of unit event represent the message’s type. E.g.
experienceor*.The third part of unit event represent how much messages needed. E.g.
1or90%.
Note
Do not use special symbol in the unit event, such as
,,(,).- Parameters
event (str|Tuple) – The description of the requisite messages’ combination. E.g. “actor:rollout:1” or (“learner:rollout:1”, “learner:update:1”, “AND”).
peers_name (dict|property) – The property function which returns the newest peer’s name dict from proxy, or the Dict with the key (peer type) and the value (peer name list).
-
get_qualified_message() → List[maro.communication.message.Message][source]¶ Self-inspection of conditional event, if satisfied, pop qualified messages.
- Returns
List of qualified messages.
- Return type
List[Message]
-
push_message(message: maro.communication.message.Message)[source]¶ Push message to all satisfied unit conditional events.
- Parameters
message (Message) – Received message.
-
class
maro.communication.registry_table.Operation(value)[source]¶ Bases:
enum.EnumThe Enum class of the logic operations.
-
AND= 'AND'¶
-
OR= 'OR'¶
-
-
class
maro.communication.registry_table.RegisterTable(peers_name: Union[property, dict])[source]¶ Bases:
objectThe RegisterTable is responsible for matching
conditional eventsand user-definedmessage handlers.- Parameters
peers_name (dict|property) – The property function which returns the newest peer’s name dict from proxy, or the Dict with the key (peer type) and the value (peers name list).
-
get() → List[Tuple[callable, List[maro.communication.message.Message]]][source]¶ If any
conditional eventhas been satisfied, return the requisite message list and the correlational handler function.- Returns
The list of triggered handler functions and messages. E.g. [(handle_function_1, [messages]), (handle_function_2, [messages])]
- Return type
List[Tuple[callable, List[Message]]]
-
push(message: maro.communication.message.Message, auto_trigger: bool = True)[source]¶ Push a newly received message into the corresponding unit event cache. If some conditional event is satisfied and
auto_triggeris true, the set of messages forming the satisfied conditional event will be processed by the corresponding handler functions.- Parameters
message (Message) – Received message.
auto_trigger (bool) – If true, the set of messages forming the satisfied conditional event will be processed by the corresponding handler functions.
-
register_event_handler(event: Union[str, tuple], handler_fn: callable)[source]¶ Register conditional event in the RegisterTable, and create a dict which match
message handlerandconditional event.- Parameters
event (str|Tuple) – The description of the requisite messages’ combination,
handler_fn (callable) – The user-define function which usually uses to handle incoming messages.
-
class
maro.communication.registry_table.SuffixTree(value=None, nodes=None)[source]¶ Bases:
objectThe suffix tree uses to decompose the conditional event.
For the unit conditional event, the
SuffixTree.valueis the unit conditional event; For the conditional event, theSuffixTree.valueis one of theOperation, and theSuffixTree.nodesis the list of the unit conditional event.- Parameters
value (Operation|str) – Event operation:
Operation.ANDorOperation.OR, or the unit conditional event.nodes List[SuffixTree] – List of the SuffixTrees.
Others¶
maro.utils.utils¶
-
class
maro.utils.utils.DottableDict(*args, **kwargs)[source]¶ Bases:
dictA wrapper to dictionary to make possible to key as property.
-
maro.utils.utils.check_deployment_status() → bool[source]¶ Check if re-deployment is needed.
- Returns
If need to re-deploy then True, else False.
- Return type
bool
-
maro.utils.utils.clone(obj: any) → any[source]¶ Clone an object with pickle dumps and loads.
- Parameters
obj (any) – The object to clone.
- Returns
The clone of the object.
- Return type
any
-
maro.utils.utils.convert_dottable(natural_dict: dict) → maro.utils.utils.DottableDict[source]¶ Convert a dictionary to DottableDict.
- Parameters
natural_dict (dict) – Dictionary to convert to DottableDict.
- Returns
Dottable object.
- Return type