Just like agents, our services also communicate through signals. This allows for a highly decoupled and performant architecture and enables service-to-service as well as service-to-agent signal exchange.
Basic structure of every signal consists of 4 key fields:
"signalType": "<signal-type>", // eg. "MESSAGE", "EVENT", "REPLY"
... // other fields depending on the signal type
All these fields are generated and filled in automatically for you, and all are meant to be subclassed, allowing you to add additional fields at any time.
Let's just clarify the last two fields a bit: the
from field contains the id of the entity sending the signal, so either service or agent id.
signalType field is especially interesting to look at: here we specify which type of signal we're sending.
In total, there are 5 different signal types:
Here you can read what differentiates them.
Every signal, no matter the type, is serialized to JSON before being sent to the message broker, and deserialized on the receiving side. Of course, the platform does this process automatically, so you don't have to worry about it.
Input topics and addresses
Each service defines its own input topic on which it listens for messages. You can think of topics like queues with unique addresses.
To make the communication easier, most services define their own clients, which automatically build messages in the correct format and send them to the right topic (e.g.
TelegramAdapterAPI). All agents share a common input topic.
Every service and agent has an address on which it receives signals. Service's default address is its name, while all agents share one address (
No matter what the type of signal is being sent or emitted, the address stays the same.
Messages are signals that are used when there's a specific agent or service that you want to send the signal to.
Examples of messages:
- agent sends an offer to user
- user sends a new metric input to agent
- agent sends a piece of information to another agent
Compared to the
Signal class, the
Message has a couple extra fields: you should especially note the obligatory
to field, which specifies the recipient's id. The remaining two fields are
messageType, which defaults to the name of the message class (e.g.
Offer), and the optional
returnAddress, which specifies the address to which the reply should be sent if it's expected for that
Apart from direct messages (going to a specific service/agent), there are also
We use them when an agent or a service wants to notify its surroundings that something has occurred.
Events are used as broadcast signals, which means that there is no
to field to specify.
Event type has its own event topic, which anyone can listen to.
This means other agents and services can subscribe to a specific event.
Some examples of events:
- a new user is registered
- some threshold is reached
- an ML model was retrained
Events are great for decoupling services, which allows for more flexibility and extensibility. You can learn how to create and subscribe to events here.
Reply signals are special
Message signals that represent responses to previously sent requests (messages).
In addition to all the fields a normal message would have, replies also have
replyTo that should contain the message to which this response relates to.
This field defaults to the
Message signal that's currently being processed.
An example of a reply would be:
- We send a request to the GCalendar adapter asking it which events the user has tomorrow
- GCalendar adapter queries the user's calendar
- GCalendar adapter sends the reply with the result back to the sender
Note that if no address is specified when sending a
replyTo field defaults to
replyTo.returnAddress, which is also an optional field. In case
replyTo.returnAddress is undefined, the address is generated from
ErrorReply is a subtype of the
Reply signal which signalizes that the received
Message couldn't be properly processed.
This signal has some additional fields compared to a
code is an
integer number that represents the error group this error belongs to and is obligatory to fill. The
description is a
string field that provides information about how and where the error occurred, and is also mandatory to fill. The field
help is an optional
string that provides additional info on the error, and a way to fix it.
DataChangeEvent is a special type of event that is sent when an "emmitable" data model is either created, updated or deleted.
Other agents and services can keep their own copy of a database and subscribe to the changes of a specific data object, so they can update the data when needed.
There are several fields on the
DataChangeEvent we need to note. The field which indicates what type of change has occurred and is obligatory to define is
changeType (which can be
Another one is
objectType, the type of object that has changed. This field defaults to the data model class name (e.g.
User). For the deserialization to happen correctly, it's important to keep the naming consistent and formatted correctly.
Also, the field
object represents the version of the newly created or recently updated object along with the last version of the object if it was deleted.
Creating and subscribing to events
To emit an event to other services, you first need to define the event model. However, you need to be careful to define
it somewhere in the service
api/ directory, because other services only have access to that package.
To emit the event, just do:
UserRegistered(userId=userId, registeredAt=get_utc_datetime(), user=user).emit()
To listen to events from other services, simply add a new function and decorate it with
@on_event(<EventCls>), for example:
from another_service.api import UserRegistered
def on_user_registered(self, user_registered: UserRegistered) -> None: