Logger

maro.utils.logger

class maro.utils.logger.CliLogger(name)[source]

Bases: object

An internal logger for CLI logging.

It maintains a singleton logger for the lifecycle of a CLI command. The logger is initialized at the very beginning and uses different logging formats depending on the --debug argument.

debug(message: str) → None[source]

logger.debug() with passive init.

Parameters

message (str) – logged message.

debug_green(message: str) → None[source]

logger.debug() with color green and passive init.

Parameters

message (str) – logged message.

debug_yellow(message: str) → None[source]

logger.debug() with color yellow and passive init.

Parameters

message (str) – logged message.

error(message: str) → None[source]

logger.error() with passive init.

Parameters

message (str) – logged message.

error_red(message: str) → None[source]

logger.error() with color red and passive init.

Parameters

message (str) – logged message.

info(message: str) → None[source]

logger.info() with passive init.

Parameters

message (str) – logged message.

info_green(message: str) → None[source]

logger.info() with color green and passive init.

Parameters

message (str) – logged message.

passive_init() → None[source]

Initialize a new CliLogger if the current logger does not match the parameters.

warning(message: str) → None[source]

logger.warning() with passive init.

Parameters

message (str) – logged message.

warning_yellow(message: str) → None[source]

logger.warning() with color yellow and passive init.

Parameters

message (str) – logged message.

class maro.utils.logger.DummyLogger[source]

Bases: object

A dummy Logger, which is used when disabling logs.

critical(msg, *args)[source]
debug(msg, *args)[source]
error(msg, *args)[source]
info(msg, *args)[source]
warn(msg, *args)[source]
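As a rough illustration of the idea behind a dummy logger, the sketch below defines a no-op object that accepts the same calls as a real logger and discards them. The names NoOpLogger and run_job are hypothetical stand-ins, not part of MARO.

```python
class NoOpLogger:
    """Hypothetical stand-in: same call surface as a logger, no output."""

    def debug(self, msg, *args):
        pass

    def info(self, msg, *args):
        pass

    def warn(self, msg, *args):
        pass

    def error(self, msg, *args):
        pass

    def critical(self, msg, *args):
        pass


def run_job(logger=None):
    # Callers can log unconditionally; with no logger given, calls are dropped.
    logger = logger or NoOpLogger()
    logger.info("job started")
    logger.debug("details: %s", "none")
    return "done"


result = run_job()
```

The benefit of this pattern is that calling code never needs to check whether logging is enabled.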
class maro.utils.logger.InternalLogger(component_name: str, tag: str = 'maro_internal', format_: maro.utils.logger.LogFormat = <LogFormat.internal: 3>, dump_folder: str = None, dump_mode: str = 'a', extension_name: str = 'log', auto_timestamp: bool = False)[source]

Bases: maro.utils.logger.Logger

An internal logger used for recording the internal system’s logs.

class maro.utils.logger.LogFormat(value)[source]

Bases: enum.Enum

The Enum class of the log format.

Example

  • LogFormat.full: full time | host | user | pid | tag | level | msg

  • LogFormat.simple: simple time | tag | level | msg

  • LogFormat.cli_debug : simple time | level | msg

  • LogFormat.cli_info (file): simple time | level | msg

  • LogFormat.cli_info (stdout): msg

cli_debug = 4
cli_info = 5
full = 1
internal = 3
none = 6
simple = 2
class maro.utils.logger.Logger(tag: str, format_: maro.utils.logger.LogFormat = <LogFormat.simple: 2>, dump_folder: str = '', dump_mode: str = 'w', extension_name: str = 'log', auto_timestamp: bool = False, stdout_level='INFO')[source]

Bases: object

A simple wrapper for logging.

The Logger hosts a file handler and a stdout handler. The file handler is set to DEBUG level and will dump all the logging info to the given dump_folder. The logging level of the stdout handler is decided by the stdout_level, and can be redirected by setting the environment variable LOG_LEVEL. Supported LOG_LEVEL includes: DEBUG, INFO, WARN, ERROR, CRITICAL, PROCESS.

Example

$ export LOG_LEVEL=INFO

Parameters
  • tag (str) – Log tag for stream and file output.

  • format_ (LogFormat) – Predefined formatter. Defaults to LogFormat.simple.

  • dump_folder (str) – Log dumped folder. Defaults to the current folder. The dumped log level is logging.DEBUG. The full path of the dumped log file is dump_folder/tag.log.

  • dump_mode (str) – Write log file mode. Defaults to w. Use a to append log.

  • extension_name (str) – Final dumped file extension name. Defaults to log.

  • auto_timestamp (bool) – Add a timestamp to the dumped log file name or not. E.g: tag.1574953673.137387.log.

  • stdout_level (str) – The logging level of the stdout handler. Defaults to INFO.
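The two-handler layout described above can be approximated with the standard logging module. This is a minimal sketch, not MARO's implementation: build_logger and LEVELS are hypothetical names, the format string is only an approximation of the simple format, and the PROCESS level is left out because it is not a standard logging level.

```python
import logging
import os
import sys
import tempfile

# Level names accepted by this sketch; anything else falls back to INFO.
LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARN": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}


def build_logger(tag, dump_folder, stdout_level="INFO"):
    """Hypothetical helper: one DEBUG file handler plus one stdout handler."""
    logger = logging.getLogger(tag)
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    logger.handlers.clear()
    formatter = logging.Formatter("%(asctime)s | %(name)s | %(levelname)s | %(message)s")

    # The file handler records DEBUG and above in dump_folder/tag.log.
    file_handler = logging.FileHandler(os.path.join(dump_folder, f"{tag}.log"), mode="w")
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    # The stdout handler uses stdout_level unless LOG_LEVEL is set.
    level_name = os.environ.get("LOG_LEVEL", stdout_level).upper()
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(LEVELS.get(level_name, logging.INFO))
    stdout_handler.setFormatter(formatter)
    logger.addHandler(stdout_handler)
    return logger


with tempfile.TemporaryDirectory() as folder:
    log = build_logger("demo", folder)
    log.debug("debug message")  # stdout skips this unless LOG_LEVEL=DEBUG
    log.info("info message")
    for handler in list(log.handlers):
        handler.close()
        log.removeHandler(handler)
    with open(os.path.join(folder, "demo.log")) as dump:
        dumped = dump.read()
```

In this sketch the dumped file always contains both messages, while stdout output depends on the effective level.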

critical(msg, *args)[source]
debug(msg, *args)[source]
error(msg, *args)[source]
info(msg, *args)[source]
warn(msg, *args)[source]
maro.utils.logger.msgformat(logfunc)[source]

The decorator used to construct the log msg.

Exceptions

maro.utils.exception.backends_exception

exception maro.utils.exception.backends_exception.BackendsAccessDeletedNodeException[source]

Bases: maro.utils.exception.base_exception.MAROException

Exception raised when trying to access a deleted node.

exception maro.utils.exception.backends_exception.BackendsAppendToNonListAttributeException[source]

Bases: maro.utils.exception.base_exception.MAROException

Exception raised when appending a value to a non-list attribute.

exception maro.utils.exception.backends_exception.BackendsArrayAttributeAccessException[source]

Bases: maro.utils.exception.base_exception.MAROException

Exception raised when accessing an attribute whose slot number is greater than 1.

This exception is caused when using invalid slice interface to access slots.

exception maro.utils.exception.backends_exception.BackendsClearNonListAttributeException[source]

Bases: maro.utils.exception.base_exception.MAROException

Exception raised when trying to clear a non-list attribute.

exception maro.utils.exception.backends_exception.BackendsGetItemInvalidException[source]

Bases: maro.utils.exception.base_exception.MAROException

Exception raised if the parameters are invalid when getting an item from the backend.

Usually this exception is caused by invalid node or attribute index.

exception maro.utils.exception.backends_exception.BackendsInsertNonListAttributeException[source]

Bases: maro.utils.exception.base_exception.MAROException

Exception raised when trying to insert a value into a non-list attribute.

exception maro.utils.exception.backends_exception.BackendsInvalidAttributeException[source]

Bases: maro.utils.exception.base_exception.MAROException

Exception raised when trying to access an attribute type that does not exist.

exception maro.utils.exception.backends_exception.BackendsInvalidNodeException[source]

Bases: maro.utils.exception.base_exception.MAROException

Exception raised when trying to access a node type that does not exist.

exception maro.utils.exception.backends_exception.BackendsRemoveFromNonListAttributeException[source]

Bases: maro.utils.exception.base_exception.MAROException

Exception raised when trying to remove a value from a non-list attribute.

exception maro.utils.exception.backends_exception.BackendsResizeNonListAttributeException[source]

Bases: maro.utils.exception.base_exception.MAROException

Exception raised when trying to resize a non-list attribute.

exception maro.utils.exception.backends_exception.BackendsSetItemInvalidException[source]

Bases: maro.utils.exception.base_exception.MAROException

Exception raised if the parameter is invalid when setting an item in the backend.

Usually this exception is caused by invalid node or attribute index.

maro.utils.exception.base_exception

exception maro.utils.exception.base_exception.MAROException(error_code: int = 1000, msg: str = None)[source]

Bases: Exception

The base exception class for MARO.

Parameters
  • error_code (int) – the predefined MARO error code. You can find the detailed definition in: maro.utils.exception.error_code.py.

  • msg (str) – Description of the error. Defaults to None, which will show the base error information.
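To illustrate how a base exception that carries an error code can be subclassed and caught, here is a minimal sketch. BaseError and DeletedNodeError are hypothetical stand-ins, the code 2000 is arbitrary, and the message layout is an assumption rather than MARO's actual format.

```python
class BaseError(Exception):
    """Hypothetical base exception carrying a numeric error code."""

    def __init__(self, error_code: int = 1000, msg: str = None):
        self.error_code = error_code
        # Fall back to generic text when no description is given.
        self.msg = msg if msg is not None else "base error"
        super().__init__(f"Error code: {error_code}. {self.msg}")


class DeletedNodeError(BaseError):
    """Hypothetical subclass that fixes its own error code."""

    def __init__(self, msg: str = None):
        super().__init__(2000, msg)  # 2000 is an arbitrary illustrative code


try:
    raise DeletedNodeError("node 3 was deleted")
except BaseError as error:
    # Catching the base class also catches every subclass.
    caught = (type(error).__name__, error.error_code, str(error))
```

Catching the base class lets calling code handle all library errors in one place while still inspecting the specific code.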

maro.utils.exception.cli_exception

exception maro.utils.exception.cli_exception.BadRequestError(message: str = None)[source]

Bases: maro.utils.exception.cli_exception.UserFault

Bad request from client.

exception maro.utils.exception.cli_exception.CliError(message: str = None, error_code: int = 3000)[source]

Bases: maro.utils.exception.base_exception.MAROException

Base class for all MARO CLI errors.

get_message() → str[source]

Get the error message of the Exception.

Returns

Error message.

Return type

str

exception maro.utils.exception.cli_exception.CliInternalError(message: str = None)[source]

Bases: maro.utils.exception.cli_exception.ClientError

MARO CLI internal error.

exception maro.utils.exception.cli_exception.ClientError(message: str = None, error_code: int = 3000)[source]

Bases: maro.utils.exception.cli_exception.CliError

MARO CLI should be responsible for the errors. ErrorCode with 31XX.

exception maro.utils.exception.cli_exception.ClusterInternalError(message: str = None)[source]

Bases: maro.utils.exception.cli_exception.ServiceError

MARO Cluster internal error.

exception maro.utils.exception.cli_exception.CommandError(cli_command: str, message: str = None)[source]

Bases: maro.utils.exception.cli_exception.CliError

Failed execution error of CLI command.

exception maro.utils.exception.cli_exception.CommandExecutionError(message: str = None, command: str = None)[source]

Bases: maro.utils.exception.cli_exception.ServiceError

Subprocess execution error.

get_message() → str[source]

Get the error message of the Exception.

Returns

Error message.

Return type

str

exception maro.utils.exception.cli_exception.CommandNotFoundError(message: str = None, usage: str = '')[source]

Bases: maro.utils.exception.cli_exception.UserFault

Command is misspelled or not recognized by MARO CLI.

exception maro.utils.exception.cli_exception.DeploymentError(message: str = None)[source]

Bases: maro.utils.exception.cli_exception.UserFault

MARO deployment fails.

exception maro.utils.exception.cli_exception.FileOperationError(message: str = None)[source]

Bases: maro.utils.exception.cli_exception.UserFault

For file or directory operation related errors.

exception maro.utils.exception.cli_exception.InvalidDeploymentTemplateError(message: str = None)[source]

Bases: maro.utils.exception.cli_exception.UserFault

MARO deployment template validation fails.

exception maro.utils.exception.cli_exception.ProcessInternalError(message: str = None)[source]

Bases: maro.utils.exception.cli_exception.UserFault

Errors in MARO CLI process mode.

exception maro.utils.exception.cli_exception.ServiceError(message: str = None, error_code: int = 3000)[source]

Bases: maro.utils.exception.cli_exception.CliError

MARO Services should be responsible for the errors. ErrorCode with 32XX.

exception maro.utils.exception.cli_exception.UserFault(message: str = None, error_code: int = 3000)[source]

Bases: maro.utils.exception.cli_exception.CliError

Users should be responsible for the errors. ErrorCode with 30XX.

maro.utils.exception.communication_exception

exception maro.utils.exception.communication_exception.ConditionalEventSyntaxError(msg: str = None)[source]

Bases: maro.utils.exception.base_exception.MAROException

The syntax error of a conditional event.

exception maro.utils.exception.communication_exception.DriverReceiveError(msg: str = None)[source]

Bases: maro.utils.exception.base_exception.MAROException

Failure to receive message in the driver.

exception maro.utils.exception.communication_exception.DriverSendError(msg: str = None)[source]

Bases: maro.utils.exception.base_exception.MAROException

Failure to send message in the driver.

exception maro.utils.exception.communication_exception.DriverTypeError(msg: str = None)[source]

Bases: maro.utils.exception.base_exception.MAROException

The unrecognized driver type, occurs in the proxy.

exception maro.utils.exception.communication_exception.InformationUncompletedError(msg: str = None)[source]

Bases: maro.utils.exception.base_exception.MAROException

Not enough information from Redis.

exception maro.utils.exception.communication_exception.MessageSessionTypeError(msg: str = None)[source]

Bases: maro.utils.exception.base_exception.MAROException

The unrecognized session type, occurs in the SessionMessage.

exception maro.utils.exception.communication_exception.PeersConnectionError(msg: str = None)[source]

Bases: maro.utils.exception.base_exception.MAROException

Peers connection error, occurs in the driver.

exception maro.utils.exception.communication_exception.PeersDisconnectionError(msg: str = None)[source]

Bases: maro.utils.exception.base_exception.MAROException

Peers disconnection error, occurs in the driver.

exception maro.utils.exception.communication_exception.PeersMissError(msg: str = None)[source]

Bases: maro.utils.exception.base_exception.MAROException

The proxy does not have any expected peers.

exception maro.utils.exception.communication_exception.PeersRejoinTimeout(msg: str = None)[source]

Bases: maro.utils.exception.base_exception.MAROException

Failure to get enough peers within the maximum waiting time.

exception maro.utils.exception.communication_exception.PendingToSend(msg: str = None)[source]

Bases: maro.utils.exception.base_exception.MAROException

Temporary failure to send message, try to rejoin.

exception maro.utils.exception.communication_exception.RedisConnectionError(msg: str = None)[source]

Bases: maro.utils.exception.base_exception.MAROException

Failure to connect to redis, occurs in the proxy.

exception maro.utils.exception.communication_exception.SocketTypeError(msg: str = None)[source]

Bases: maro.utils.exception.base_exception.MAROException

The unrecognized socket type, occurs in the driver.

maro.utils.exception.data_lib_exeption

exception maro.utils.exception.data_lib_exeption.CimGeneratorInvalidParkingDuration[source]

Bases: maro.utils.exception.base_exception.MAROException

Exception when parking duration is less than 0 in CIM topology file.

exception maro.utils.exception.data_lib_exeption.MetaTimestampNotExist[source]

Bases: maro.utils.exception.base_exception.MAROException

Exception raised when no timestamp is specified in the meta file, as this is a required field.

maro.utils.exception.error_code

maro.utils.exception.rl_toolkit_exception

exception maro.utils.exception.rl_toolkit_exception.MissingOptimizer(msg: str = None)[source]

Bases: maro.utils.exception.base_exception.MAROException

Raised when the optimizers are missing when calling CoreModel’s step() method.

exception maro.utils.exception.rl_toolkit_exception.StoreMisalignment(msg: str = None)[source]

Bases: maro.utils.exception.base_exception.MAROException

Raised when a put operation on a SimpleStore would cause the underlying lists to have different sizes.

exception maro.utils.exception.rl_toolkit_exception.UnrecognizedTask(msg: str = None)[source]

Bases: maro.utils.exception.base_exception.MAROException

Raised when a CoreModel has task names that are not recognized by an algorithm.

maro.utils.exception.simulator_exception

exception maro.utils.exception.simulator_exception.BusinessEngineNotFoundError[source]

Bases: maro.utils.exception.base_exception.MAROException

Exception raised when the simulator cannot find the specified business engine module.

Communication

maro.communication.driver.abs_driver

class maro.communication.driver.abs_driver.AbsDriver[source]

Bases: abc.ABC

Abstract class of the communication driver.

abstract property address

The socket’s address. Depending on the actual socket driver, it is usually a Dict.

Type

Dict

abstract broadcast(component_type, message)[source]

Broadcast send message.

abstract close()[source]

Close all sockets.

abstract connect(peers_address)[source]

Build connections with the other peers given by the peers’ addresses.

Parameters

peers_address – The store of peers’ socket addresses. Depending on the actual socket driver, the peers’ socket address is usually a Dict.

abstract receive()[source]

Receive message.

abstract send(message)[source]

Unicast send message.

maro.communication.driver.driver_type

class maro.communication.driver.driver_type.DriverType(value)[source]

Bases: enum.Enum

Communication driver categories.

  • ZMQ: The communication driver mode based on ZMQ.

ZMQ = 'zmq_driver'

maro.communication.driver.zmq_driver

class maro.communication.driver.zmq_driver.ZmqDriver(component_type: str, protocol: str = 'tcp', send_timeout: int = -1, receive_timeout: int = -1, logger=<maro.utils.logger.DummyLogger object>)[source]

Bases: maro.communication.driver.abs_driver.AbsDriver

The communication driver based on ZMQ.

Parameters
  • component_type (str) – Component type in the current group.

  • protocol (str) – The underlying transport-layer protocol for transferring messages. Defaults to tcp.

  • send_timeout (int) – The timeout in milliseconds for sending message. If -1, no timeout (infinite). Defaults to -1.

  • receive_timeout (int) – The timeout in milliseconds for receiving message. If -1, no timeout (infinite). Defaults to -1.

  • logger – The logger instance or DummyLogger. Defaults to DummyLogger().

property address
Returns

The address Dict of the zmq.PULL socket and the zmq.SUB socket. Each key is the socket’s type and each value is the socket’s address, formed as protocol+ip+port.

Return type

Dict[int, str]

Example

{zmq.PULL: "tcp://0.0.0.0:1234", zmq.SUB: "tcp://0.0.0.0:1235"}

broadcast(topic: str, message: maro.communication.message.Message)[source]

Broadcast message.

Parameters
  • topic (str) – The topic of broadcast.

  • message (class) – Message to be sent.

close()[source]

Close ZMQ context and sockets.

connect(peers_address_dict: Dict[str, Dict[str, str]])[source]

Build a connection with every peer listed in the peers’ socket address dict.

Set up the unicast sender which is zmq.PUSH socket and the broadcast sender which is zmq.PUB socket.

Parameters

peers_address_dict (Dict[str, Dict[str, str]]) – Peers’ socket address dict. Each key is a peer’s name and each value is that peer’s socket connection address. E.g. {'peer1': {zmq.PULL: 'tcp://0.0.0.0:1234'}}.

disconnect(peers_address_dict: Dict[str, Dict[str, str]])[source]

Disconnect from every peer listed in the peers’ socket address dict.

Disconnect and delete the unicast sender, which is a zmq.PUSH socket, for each peer in the dict.

Parameters

peers_address_dict (Dict[str, Dict[str, str]]) – Peers’ socket address dict. Each key is a peer’s name and each value is that peer’s socket connection address. E.g. {'peer1': {zmq.PULL: 'tcp://0.0.0.0:1234'}}.

receive(is_continuous: bool = True, timeout: int = None)[source]

Receive message from zmq.POLLER.

Parameters

is_continuous (bool) – Continuously receive message or not. Defaults to True.

Yields

recv_message (Message) – The received message from the poller.

send(message: maro.communication.message.Message)[source]

Send message.

Parameters

message (class) – Message to be sent.

maro.communication.dist_decorator

maro.communication.dist_decorator.dist(proxy: maro.communication.proxy.Proxy, handler_dict: {<class 'object'>: typing.Callable})[source]

A decorator used to inject a communication module and message handlers to a local class so that it can be run in distributed mode.

maro.communication.message

class maro.communication.message.Message(tag: Union[str, enum.Enum], source: str, destination: str, payload=None)[source]

Bases: object

General Message for hosting payload between receiver and sender.

Parameters
  • tag (str|Enum) – Message tag, which is customized by the user, for specific application logic.

  • source (str) – The sender of message.

  • destination (str) – The receiver of message.

  • payload (object) – Message payload, such as model parameters, experiences, etc. Defaults to None.

  • session_id (str) – The ID of the session the message belongs to. It is generated automatically by default; you can use it to group messages based on your application logic.

forward(destination: str, tag: Union[str, enum.Enum] = None, payload=None)[source]
reply(tag: Union[str, enum.Enum] = None, payload=None)[source]
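The sketch below illustrates one plausible reading of reply and forward using a small stand-in class. Envelope is hypothetical, and the exact behavior (swapping source and destination on reply, keeping the original tag and payload when None is passed, returning a new object) is an assumption for illustration rather than a description of Message itself.

```python
from dataclasses import dataclass, replace
from typing import Any


@dataclass
class Envelope:
    """Hypothetical stand-in for a tagged message between two peers."""

    tag: str
    source: str
    destination: str
    payload: Any = None

    def reply(self, tag=None, payload=None):
        # Assumed behavior: swap the endpoints, keep tag/payload unless overridden.
        return replace(
            self,
            source=self.destination,
            destination=self.source,
            tag=self.tag if tag is None else tag,
            payload=self.payload if payload is None else payload,
        )

    def forward(self, destination, tag=None, payload=None):
        # Assumed behavior: the current receiver becomes the new sender.
        return replace(
            self,
            source=self.destination,
            destination=destination,
            tag=self.tag if tag is None else tag,
            payload=self.payload if payload is None else payload,
        )


request = Envelope(tag="rollout", source="learner", destination="actor", payload={"episode": 1})
answer = request.reply(payload={"reward": 3.5})
relayed = request.forward("monitor")
```

Here answer travels from actor back to learner with a new payload, while relayed carries the original payload on to a third peer.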
class maro.communication.message.NotificationSessionStage(value)[source]

Bases: enum.Enum

Notification session stages.

  • REQUEST: Notification session stage 1.

  • RECEIVE: Notification session stage 2.

RECEIVE = 'notification_receive'
REQUEST = 'notification_request'
class maro.communication.message.SessionMessage(tag: Union[str, enum.Enum], source: str, destination: str, payload=None, session_type: maro.communication.message.SessionType = <SessionType.TASK: 'task'>, session_stage=None)[source]

Bases: maro.communication.message.Message

The session message class.

It is used by a specific session, which will contain session stage to support more complex application logic.

Parameters
  • session_type (Enum) – It indicates the current session type.

  • session_stage (Enum) – It indicates the current session stage.

class maro.communication.message.SessionType(value)[source]

Bases: enum.Enum

Communication session categories.

  • TASK: Task session is used to trigger remote job(s).

  • NOTIFICATION: Notification session is used to sync information to peers.

NOTIFICATION = 'notification'
TASK = 'task'
class maro.communication.message.TaskSessionStage(value)[source]

Bases: enum.Enum

Task session stages.

  • REQUEST: Task session stage 1.

  • RECEIVE: Task session stage 2.

  • COMPLETE: Task session stage 3.

COMPLETE = 'task_complete'
RECEIVE = 'task_receive'
REQUEST = 'task_request'

maro.communication.proxy

class maro.communication.proxy.Proxy(group_name: str, component_type: str, expected_peers: dict, driver_type: maro.communication.driver.driver_type.DriverType = <DriverType.ZMQ: 'zmq_driver'>, driver_parameters: dict = None, redis_address: Tuple = ('localhost', 6379), max_retries: int = 10, retry_interval_base_value: float = 0.1, log_enable: bool = True, enable_rejoin: bool = False, minimal_peers: Union[int, dict] = 1, peers_catch_lifetime: int = 10, enable_message_cache_for_rejoin: bool = False, max_length_for_message_cache: int = 1024, timeout_for_minimal_peer_number: int = 300, is_remove_failed_container: bool = False, max_rejoin_times: int = 5)[source]

Bases: object

The communication module is responsible for receiving and sending messages.

There are three ways of sending messages: send, scatter, and broadcast. Also, there are two ways to receive messages from other peers: receive and receive_by_id.

Parameters
  • group_name (str) – Identifier for the group of all distributed components.

  • component_type (str) – Component’s type in the current group.

  • expected_peers (Dict) – Dict of peer information, mapping each peer type to its expected number. E.g. {'learner': 1, 'actor': 2}.

  • driver_type (Enum) – The type of communication driver class used to communicate with other components. Defaults to DriverType.ZMQ.

  • driver_parameters (Dict) – The arguments for initializing the communication driver class. Defaults to None.

  • redis_address (Tuple) – Hostname and port of the Redis server. Defaults to (“localhost”, 6379).

  • max_retries (int) – Maximum number of retries before raising an exception. Defaults to 10.

  • retry_interval_base_value (float) – The time interval between attempts. Defaults to 0.1.

  • log_enable (bool) – Open internal logger or not. Defaults to True.

  • enable_rejoin (bool) – Whether to allow peers to rejoin. Defaults to False. Must be used with the MARO CLI.

  • minimal_peers (Union[int, dict]) – The minimal number of peers for each peer type.

  • peers_catch_lifetime (int) – The lifetime for onboard peers’ information.

  • enable_message_cache_for_rejoin (bool) – Whether to enable the message cache for failed peers. Defaults to False.

  • max_length_for_message_cache (int) – The maximal number of cached messages.

  • timeout_for_minimal_peer_number (int) – The timeout for waiting for enough alive peers.

  • is_remove_failed_container (bool) – Whether to remove failed containers automatically. Defaults to False.

  • max_rejoin_times (int) – The maximal number of rejoin attempts for one peer.

broadcast(component_type: str, tag: Union[str, enum.Enum], session_type: maro.communication.message.SessionType, payload=None, timeout: int = None) → List[maro.communication.message.Message][source]

Broadcast message to all peers, and return all replied messages.

Parameters
  • component_type (str) – Broadcast to all peers in this type.

  • tag (str|Enum) – Message’s tag.

  • session_type (Enum) – Message’s session type.

  • payload (object) – The actual data to send. Defaults to None.

Returns

List of replied messages.

Return type

List[Message]

property component_type

Component’s type in the current group.

Type

str

forward(message: Union[maro.communication.message.SessionMessage, maro.communication.message.Message], destination: str, tag: Union[str, enum.Enum] = None, payload=None) → List[str][source]

Forward a received message.

Parameters
  • message (Message) – The message to forward.

  • destination (str) – The receiver of message.

  • tag (str|Enum) – New message tag, if None, keeps the original message’s tag. Defaults to None.

  • payload (object) – Message payload, if None, keeps the original message’s payload. Defaults to None.

Returns

The session IDs of the forwarded message.

Return type

List[str]

get_peer_type(peer_name: str) → str[source]

Get peer type from given peer name.

Parameters

peer_name (str) – The component name of a peer, which is formed from peer_type and a UUID.

Returns

The component type of a peer in the current group.

Return type

str

property group_name

Identifier for the group of all communication components.

Type

str

ibroadcast(component_type: str, tag: Union[str, enum.Enum], session_type: maro.communication.message.SessionType, payload=None) → List[str][source]

Broadcast a message to all subscribers and return the list of the messages’ session IDs.

Parameters
  • component_type (str) – Broadcast to all peers in this type.

  • tag (str|Enum) – Message’s tag.

  • session_type (Enum) – Message’s session type.

  • payload (object) – The actual data to send. Defaults to None.

Returns

List of session IDs related to the replied messages.

Return type

List[str]

iscatter(tag: Union[str, enum.Enum], session_type: maro.communication.message.SessionType, destination_payload_list: list) → List[str][source]

Scatter a list of data to peers and return the list of the messages’ session IDs.

Parameters
  • tag (str|Enum) – Message’s tag.

  • session_type (Enum) – Message’s session type.

  • destination_payload_list ([Tuple(str, object)]) – The destination-payload list. The first item of the tuple in list is the message’s destination, and the second item of the tuple in list is the message’s payload.

Returns

List of the messages’ session IDs.

Return type

List[str]

isend(message: maro.communication.message.Message) → Optional[List[str]][source]

Send a message to a remote peer.

Parameters

message – Message to be sent.

Returns

The list of the message’s session IDs.

If rejoin is enabled, it returns None when sending a message to failed peers. If both rejoin and the message cache are enabled, it may return the list of session IDs from the pending messages.

Return type

Union[List[str], None]

property name

Unique identifier in the current group.

Type

str

property peers_name

The Dict of all connected peers’ names, stored by peer type.

Type

Dict

receive(is_continuous: bool = True, timeout: int = None)[source]

Receive messages from communication driver.

Parameters

is_continuous (bool) – Continuously receive message or not. Defaults to True.

receive_by_id(targets: List[str], timeout: int = None) → List[maro.communication.message.Message][source]

Receive target messages from communication driver.

Parameters

targets (List[str]) – List of session_id. E.g. ['0_learner0_actor0', '1_learner1_actor1', …].

Returns

List of received messages.

Return type

List[Message]

reply(message: Union[maro.communication.message.SessionMessage, maro.communication.message.Message], tag: Union[str, enum.Enum] = None, payload=None, ack_reply: bool = False) → List[str][source]

Reply to a received message.

Parameters
  • message (Message) – The message to reply to.

  • tag (str|Enum) – New message tag, if None, keeps the original message’s tag. Defaults to None.

  • payload (object) – New message payload, if None, keeps the original message’s payload. Defaults to None.

  • ack_reply (bool) – If True, the reply is an acknowledgement. Defaults to False.

Returns

The session IDs of the reply message.

Return type

List[str]

scatter(tag: Union[str, enum.Enum], session_type: maro.communication.message.SessionType, destination_payload_list: list, timeout: int = - 1) → List[maro.communication.message.Message][source]

Scatter a list of data to peers and return the replied messages.

Parameters
  • tag (str|Enum) – Message’s tag.

  • session_type (Enum) – Message’s session type.

  • destination_payload_list ([Tuple(str, object)]) – The destination-payload list. The first item of the tuple in list is the message destination, and the second item of the tuple in list is the message payload.

Returns

List of replied messages.

Return type

List[Message]

send(message: maro.communication.message.Message, timeout: int = None) → Optional[List[maro.communication.message.Message]][source]

Send a message to a remote peer.

Parameters

message – Message to be sent.

Returns

The list of received messages.

If rejoin is enabled, it returns None when sending a message to failed peers. If both rejoin and the message cache are enabled, it may return the list of messages from the pending messages.

Return type

Union[List[Message], None]
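The signatures above suggest that the i-prefixed methods return session IDs while their counterparts return the replied messages. The in-memory sketch below illustrates that relationship under loud assumptions: ToyProxy is hypothetical, peers "reply" instantly by echoing the payload, and the session ID format is invented for the example. It does not involve Redis, ZMQ, or the real Proxy.

```python
class ToyProxy:
    """Hypothetical in-memory model of the blocking and non-blocking calls."""

    def __init__(self):
        self._inbox = {}   # session id -> reply waiting to be collected
        self._counter = 0

    def isend(self, destination, payload):
        # Non-blocking style: hand back the session id, not the reply.
        session_id = f"{self._counter}_{destination}"  # invented id format
        self._counter += 1
        # Assumption for the demo: the peer answers at once with an echo.
        self._inbox[session_id] = {"from": destination, "payload": payload}
        return [session_id]

    def receive_by_id(self, targets):
        # Collect the replies that belong to the given session ids.
        return [self._inbox.pop(session_id) for session_id in targets]

    def send(self, destination, payload):
        # Blocking style: send, then wait for the matching reply.
        return self.receive_by_id(self.isend(destination, payload))

    def scatter(self, destination_payload_list):
        # Each tuple is (destination, payload), as in the parameter list above.
        session_ids = []
        for destination, payload in destination_payload_list:
            session_ids += self.isend(destination, payload)
        return self.receive_by_id(session_ids)


proxy = ToyProxy()
replies = proxy.scatter([("actor0", {"batch": 1}), ("actor1", {"batch": 2})])
pending = proxy.isend("actor0", "ping")
late_reply = proxy.receive_by_id(pending)
```

Splitting the send from the collection lets a caller do other work between isend and receive_by_id.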

maro.communication.registry_table

class maro.communication.registry_table.ConditionalEvent(event: Union[str, Tuple], peers_name: Union[property, dict])[source]

Bases: object

The description of the messages’ combination.

The conditional event can be composed of any number of unit conditional events and ends with an Operation. A unit conditional event must consist of three parts separated by “:”:

  • The first part of the unit event represents the message’s source. E.g. learner or *.

  • The second part of the unit event represents the message’s type. E.g. experience or *.

  • The third part of the unit event represents how many messages are needed. E.g. 1 or 90%.

Note

Do not use special symbols in the unit event, such as “,”, “(”, “)”.

Parameters
  • event (str|Tuple) – The description of the requisite messages’ combination. E.g. “actor:rollout:1” or (“learner:rollout:1”, “learner:update:1”, “AND”).

  • peers_name (dict|property) – The property function which returns the newest peer’s name dict from proxy, or the Dict with the key (peer type) and the value (peer name list).
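The three-part unit event format described above can be parsed with a few lines of code. This is only a sketch: UnitEvent and parse_unit_event are hypothetical names, and representing a percentage as a fraction is a choice made for the example, not necessarily how ConditionalEvent stores it.

```python
from typing import NamedTuple, Union


class UnitEvent(NamedTuple):
    source: str                 # message source, or "*" for any
    tag: str                    # message type, or "*" for any
    amount: Union[int, float]   # int = absolute count, float = fraction of peers


def parse_unit_event(text: str) -> UnitEvent:
    """Split 'source:type:amount' into its three parts."""
    parts = text.split(":")
    if len(parts) != 3:
        raise ValueError(f"expected 3 parts separated by ':', got {len(parts)}: {text!r}")
    source, tag, amount = (part.strip() for part in parts)
    if amount.endswith("%"):
        # A percentage such as "90%" becomes the fraction 0.9.
        return UnitEvent(source, tag, float(amount[:-1]) / 100)
    return UnitEvent(source, tag, int(amount))


single = parse_unit_event("actor:rollout:1")
partial = parse_unit_event("*:experience:90%")
```

A string with the wrong number of parts, such as "actor:rollout", raises ValueError in this sketch.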

clear()[source]

Clear all messages from cache.

get_qualified_message() → List[maro.communication.message.Message][source]

Self-inspection of conditional event, if satisfied, pop qualified messages.

Returns

List of qualified messages.

Return type

List[Message]

push_message(message: maro.communication.message.Message)[source]

Push message to all satisfied unit conditional events.

Parameters

message (Message) – Received message.

class maro.communication.registry_table.Operation(value)[source]

Bases: enum.Enum

The Enum class of the logic operations.

AND = 'AND'
OR = 'OR'
class maro.communication.registry_table.RegisterTable(peers_name: Union[property, dict])[source]

Bases: object

The RegisterTable is responsible for matching conditional events and user-defined message handlers.

Parameters

peers_name (dict|property) – The property function which returns the newest peer’s name dict from proxy, or the Dict with the key (peer type) and the value (peers name list).

clear()[source]

Clear all messages from conditional event caches.

get() → List[Tuple[callable, List[maro.communication.message.Message]]][source]

If any conditional event has been satisfied, return the requisite message list and the corresponding handler function.

Returns

The list of triggered handler functions and messages. E.g. [(handle_function_1, [messages]), (handle_function_2, [messages])]

Return type

List[Tuple[callable, List[Message]]]

push(message: maro.communication.message.Message, auto_trigger: bool = True)[source]

Push a newly received message into the corresponding unit event cache. If some conditional event is satisfied and auto_trigger is true, the set of messages forming the satisfied conditional event will be processed by the corresponding handler functions.

Parameters
  • message (Message) – Received message.

  • auto_trigger (bool) – If true, the set of messages forming the satisfied conditional event will be processed by the corresponding handler functions.

register_event_handler(event: Union[str, tuple], handler_fn: callable)[source]

Register a conditional event in the RegisterTable, and create a dict that maps the conditional event to its message handler.

Parameters
  • event (str|Tuple) – The description of the requisite messages’ combination.

  • handler_fn (callable) – The user-defined function, usually used to handle incoming messages.
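To make the register, push, and trigger cycle concrete, here is a heavily simplified in-memory sketch. TinyRegisterTable is hypothetical: it supports only single unit events with an absolute count and exact source matching, and it ignores operations, wildcards, percentages, and the peers_name lookup that the real RegisterTable works with.

```python
from collections import defaultdict


class TinyRegisterTable:
    """Hypothetical, simplified model of matching events to handlers."""

    def __init__(self):
        self._handlers = {}               # (source, tag) -> (required count, handler)
        self._cache = defaultdict(list)   # (source, tag) -> buffered payloads

    def register_event_handler(self, event: str, handler_fn):
        source, tag, count = event.split(":")
        self._handlers[(source, tag)] = (int(count), handler_fn)

    def push(self, source, tag, payload):
        key = (source, tag)
        if key not in self._handlers:
            return  # no event is interested in this message
        self._cache[key].append(payload)
        required, handler_fn = self._handlers[key]
        if len(self._cache[key]) >= required:
            # The event is satisfied: pop the batch and call the handler.
            batch, self._cache[key] = self._cache[key], []
            handler_fn(batch)


handled = []
table = TinyRegisterTable()
table.register_event_handler("actor:rollout:2", handled.append)
table.push("actor", "rollout", "experience-a")   # 1 of 2, nothing happens yet
table.push("actor", "rollout", "experience-b")   # 2 of 2, the handler fires
```

The handler receives the whole batch at once, which mirrors the idea of a handler being called with the set of messages that satisfied the event.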

class maro.communication.registry_table.SuffixTree(value=None, nodes=None)[source]

Bases: object

The suffix tree used to decompose the conditional event.

For a unit conditional event, SuffixTree.value is the unit conditional event itself. For a composite conditional event, SuffixTree.value is one of the Operation members, and SuffixTree.nodes is the list of its unit conditional events.

Parameters
  • value (Operation|str) – Event operation: Operation.AND or Operation.OR, or the unit conditional event.

  • nodes (List[SuffixTree]) – List of the SuffixTrees.
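The decomposition described above can be sketched as a small recursive function. EventNode and decompose are hypothetical names, plain strings stand in for the Operation enum, and support for nested tuples is an assumption based on nodes being a list of trees.

```python
class EventNode:
    """Hypothetical tree node: a unit event string or an operation with children."""

    def __init__(self, value=None, nodes=None):
        self.value = value
        self.nodes = nodes or []


def decompose(event):
    """Turn 'a:b:1' or ('a:b:1', 'c:d:1', 'AND') into a tree of EventNode."""
    if isinstance(event, str):
        # A unit conditional event is a leaf holding the event text.
        return EventNode(value=event)
    *operands, operation = event
    if operation not in ("AND", "OR"):
        raise ValueError(f"event tuple must end with 'AND' or 'OR', got {operation!r}")
    # A composite event holds the operation and one child per operand.
    return EventNode(value=operation, nodes=[decompose(operand) for operand in operands])


leaf = decompose("actor:rollout:1")
tree = decompose(("learner:rollout:1", "learner:update:1", "AND"))
children = [node.value for node in tree.nodes]
```

Placing the operation last in the tuple is what makes the root of the tree an operation and the earlier items its children.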

Others

maro.utils.utils

class maro.utils.utils.DottableDict(*args, **kwargs)[source]

Bases: dict

A dictionary wrapper that makes keys accessible as properties.

maro.utils.utils.check_deployment_status() → bool[source]

Check if re-deployment is needed.

Returns

True if re-deployment is needed, otherwise False.

Return type

bool

maro.utils.utils.clone(obj: any) → any[source]

Clone an object with pickle dumps and loads.

Parameters

obj (any) – The object to clone.

Returns

The clone of the object.

Return type

any
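Cloning through pickle amounts to serializing the object and loading it back, which produces a deep, independent copy for picklable objects. The helper name clone_via_pickle below is hypothetical and used only for illustration.

```python
import pickle


def clone_via_pickle(obj):
    """Return an independent copy by round-tripping through pickle."""
    return pickle.loads(pickle.dumps(obj))


original = {"weights": [1, 2, 3]}
copied = clone_via_pickle(original)
copied["weights"].append(4)  # mutating the copy leaves the original untouched
```

Note that this approach only works for objects that pickle supports; lambdas and open file handles, for example, cannot be cloned this way.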

maro.utils.utils.convert_dottable(natural_dict: dict) → maro.utils.utils.DottableDict[source]

Convert a dictionary to DottableDict.

Parameters

natural_dict (dict) – Dictionary to convert to DottableDict.

Returns

Dottable object.

Return type

DottableDict
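A dictionary with attribute-style access can be sketched in a few lines. AttrDict and to_attr_dict are hypothetical stand-ins for illustration; whether the real conversion recurses into nested dictionaries is an assumption of this sketch.

```python
class AttrDict(dict):
    """Hypothetical dict subclass whose keys are also readable as attributes."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Share storage between item access and attribute access.
        self.__dict__ = self


def to_attr_dict(natural_dict: dict) -> AttrDict:
    """Convert a plain dict, including nested dicts, to AttrDict."""
    result = AttrDict(natural_dict)
    for key, value in natural_dict.items():
        if isinstance(value, dict):
            result[key] = to_attr_dict(value)
    return result


config = to_attr_dict({"env": {"scenario": "cim", "durations": 100}, "seed": 7})
scenario = config.env.scenario   # attribute-style access
durations = config["env"]["durations"]   # item-style access still works
```

Keys that are not valid Python identifiers, or that clash with dict method names, remain reachable only through item access.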

maro.utils.utils.deploy(hide_info: bool = True)[source]

Deploy the meta files and lib files of MARO to ~/.maro.

Parameters

hide_info (bool) – (optional) If True, will not display any information; otherwise info and error logs will be shown.

maro.utils.utils.set_seeds(seed)[source]

Set the seeds of Torch, np.random and random.

Parameters

seed – Value of the seed.
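Seeding several random number generators at once can be sketched as follows. The helper name seed_everything is hypothetical; NumPy and Torch are seeded only if they happen to be installed, which is a convenience of this example rather than a statement about set_seeds.

```python
import random


def seed_everything(seed: int) -> None:
    """Seed Python's random module and, when available, NumPy and Torch."""
    random.seed(seed)
    try:
        import numpy as np
        np.random.seed(seed)
    except ImportError:
        pass  # NumPy is optional in this sketch
    try:
        import torch
        torch.manual_seed(seed)
    except ImportError:
        pass  # Torch is optional in this sketch


seed_everything(42)
first_draw = random.random()
seed_everything(42)
second_draw = random.random()  # same seed, so the same value is drawn again
```

Re-seeding with the same value reproduces the same sequence, which is what makes experiments repeatable.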