This document is part of the online Horus Documentation, under Horus Utilities.
In a complex distributed system, many protocols implement the support that the various applications require. Rather than building one ``super''-protocol that provides every feature that may ever be needed, it is preferable for performance and maintenance reasons to implement and maintain protocols independently. Applications should be able to choose the protocols they require and combine them in some desirable way. Although many distributed systems are built in a layered fashion, they are usually configured at system-building time rather than by the application program itself. To be able to configure protocol stacks after a system has been booted, a generic interface between the layers needs to exist that allows the layers to be bound together in an arbitrary fashion.
The Horus architecture allows protocol modules to be stacked at run-time in any order. The interface between the layers, called the Uniform Group Interface, is strong enough to support a wide variety of known protocols and their semantics in a convenient way. The architecture introduces objects for communication endpoints and for groups of endpoints.
Horus explicitly separates the notions of a group and a view of a group. A group is an addressing abstraction. A view is a list of endpoints. The notion of joining a group should be understood in light of this difference. An endpoint that joins a group is willing to receive messages addressed to the group, but is not guaranteed to receive all messages that are sent to the group during its membership. Joining often also indicates a desire to send to the group. Again, messages sent to the group are not guaranteed to be delivered to all (or even any) members of the group.
Delivery guarantees are usually specified in terms of views: messages sent to a view are often guaranteed to be delivered to all (non-failing) endpoints in the view. Weaker and stronger guarantees are possible, and depend on which protocol modules the particular endpoints have stacked. We do not present any guarantees here, but just an architecture for a system that can implement a variety of guarantees.
include/horus/xxx_layer.h
src/hutil/xxx_layer.c

#include "muts.h"
#include "muts/xxx_layer.h"
When Horus initializes, each protocol layer has to register itself, specifying a unique ASCII name for the layer and a dispatch table of all the entry functions in the layer. For this it uses the xxx_config structure:
struct xxx_config {
    char *name;
    xxx_ack_f *ack;
    xxx_address_f *address;
    xxx_cast_f *cast;
    xxx_contact_f *contact;
    xxx_destroy_f *destroy;
    xxx_done_f *done;
    xxx_endpoint_f *endpoint;
    xxx_entity_f *entity;
    xxx_flush_f *flush;
    xxx_flush_ok_f *flush_ok;
    xxx_join_denied_f *join_denied;
    xxx_join_f *join;
    xxx_join_granted_f *join_granted;
    xxx_leave_f *leave;
    xxx_merge_f *merge;
    xxx_pt2pt_group_f *pt2pt_group;
    xxx_send_f *send;
    xxx_view_f *view;
    xxx_focus_f *focus;
    xxx_stable_f *stable;
    xxx_dump_f *dump;
};
In this structure, name is the ASCII name of the layer. The rest are pointers to the various downcalls that layers can provide to upper layers or applications. (Note that the xxx_pt2pt_group function pointer is only included for compatibility reasons, but its use is no longer supported.) At initialization time, the layer calls xxx_config() to pass the structure to the Horus system (most Horus routines return a MUTS error_t indication of success or error):
error_t xxx_config( struct xxx_config *config );
For example, the fragmentation/reassembly layer FRAG contains the following initialization code:
static struct xxx_config frag_config = {
    "FRAG",
    (xxx_ack_f *) frag_ack,
    (xxx_address_f *) frag_address,
    (xxx_cast_f *) frag_cast,
    (xxx_contact_f *) frag_contact,
    (xxx_destroy_f *) frag_destroy,
    (xxx_done_f *) frag_done,
    (xxx_endpoint_f *) frag_endpoint,
    (xxx_entity_f *) frag_entity,
    (xxx_flush_f *) frag_flush,
    (xxx_flush_ok_f *) frag_flush_ok,
    (xxx_join_denied_f *) frag_join_denied,
    (xxx_join_f *) frag_join,
    (xxx_join_granted_f *) frag_join_granted,
    (xxx_leave_f *) frag_leave,
    (xxx_merge_f *) frag_merge,
    (xxx_pt2pt_group_f *) 0,
    (xxx_send_f *) frag_send,
    (xxx_view_f *) frag_view,
    (xxx_focus_f *) frag_focus,
    (xxx_stable_f *) 0,
    (xxx_dump_f *) frag_dump
};

error_t frag_interface_init(void)
{
    e_enter(frag_interface_init);
    return e_pass(xxx_config(&frag_config));
}
The frag_interface_init routine can be included in the MUTS conf table, which is a list of functions that are invoked at initialization time.
Endpoints are objects of type xxx_endpoint_id. They are generated by the function xxx_endpoint():
error_t xxx_endpoint(
    xxx_endpoint_id below,
    char *stack,
    unsigned int options,
    thread upcall,
    void *env,
    OUT xxx_endpoint_id *endpoint
);
Here, stack is a null-terminated ASCII string that specifies the stack of protocols that this endpoint will use for communication. Below specifies a previously generated endpoint on which this protocol will be stacked. Below is null in the common case in which an endpoint is generated from scratch. Options is a bit mask of flags that influence the detailed behavior of xxx_endpoint(). Upcall is a descriptor of how events specific to the endpoint should be delivered. Env is passed to the upcall. The result of xxx_endpoint() is returned, by reference, in endpoint.
The stack string specified to xxx_endpoint() has a syntactic structure. It consists of a list of layer descriptors separated by colons. Each layer descriptor is an alphanumeric name, optionally followed by arguments enclosed in parentheses. The arguments are separated by commas (each comma may optionally be followed by spaces). Each argument is of the form name=value, where name is an alphanumeric string. For example, this is a valid stack string: ``MBRSHIP:FRAG(size=1024):NAK(window=16, sweep=1000):COM''.
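For illustration, creating an endpoint over this stack might look roughly as follows. This is a minimal sketch, in which my_upcall is assumed to be a thread descriptor declared with the MUTS t_declare() facility, and my_env its environment pointer:

/* Build a fresh protocol stack (no endpoint below us).
 * my_upcall and my_env are hypothetical names. */
static error_t create_endpoint(OUT xxx_endpoint_id *epp)
{
    e_enter(create_endpoint);
    return e_pass(xxx_endpoint((xxx_endpoint_id) 0,
        "MBRSHIP:FRAG(size=1024):NAK(window=16, sweep=1000):COM",
        0, my_upcall, my_env, epp));
}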
The effect of calling xxx_endpoint() with this stack is that the endpoint entry function of the MBRSHIP layer will be invoked with the remaining string ``FRAG(size=1024):NAK(window=16, sweep=1000):COM''. The endpoint entry function has the same signature as xxx_endpoint(). Typically, this function will allocate a record to return to the invoker, and will then call xxx_endpoint() itself with the remaining stack. The xxx_endpoint_id that is returned by this call is saved in the record. As this progresses recursively, a linked list of endpoint records is formed that resembles the protocol stack.
In addition, each entry routine will call xxx_get_args() to get the arguments optionally provided to each layer through the stack specification:
error_t xxx_get_args( OUT xxx_args_t *args, IN OUT char **stackp );
This routine takes a pointer to the stack specification string (pointed to by stackp), and extracts the arguments to this layer, which it saves in args. The arguments can then be inspected using xxx_getstr():
char *xxx_getstr( struct mem_chan *mc, xxx_args_t args, char *name );
This will return the ASCII value for the given name if specified, allocated from the mc memory channel. If no such argument is specified, NULL is returned.
xxx_rel_args() is invoked to release the information that was kept to hold the arguments:
void xxx_rel_args( xxx_args_t args );
The endpoint structure that is returned is of type xxx_endpoint_id, which is defined as follows:
struct xxx_ident {
    struct xxx_config *config;
    enum { XXX_ENDPOINT, XXX_GROUP } type;
    char *name;
};
typedef struct xxx_ident *xxx_id;
typedef xxx_id xxx_endpoint_id;
Here config points to the layer's configuration table (so future generic xxx operations on the endpoint can dispatch to the right function), and type is the type of the identifier, which in this case has to be XXX_ENDPOINT. Optionally, for debugging purposes, the endpoint may be given an ASCII name.
Most layers will want to include additional information in the endpoint. This is why layers typically define an endpoint as a structure that starts out with the struct xxx_ident structure, followed by layer-specific information. For example, the fragmentation layer uses the following endpoint definition:
struct frag_endpoint {
    struct xxx_ident xxx;
    xxx_endpoint_id below;
    int frag_size;
};
typedef struct frag_endpoint *frag_endpoint_id;
Here below points to the endpoint of the layer below this layer, and frag_size is the fragmentation size used by this endpoint. The frag_endpoint() entry in the FRAG layer allocates and initializes this structure and returns it to the invoker:
error_t frag_endpoint(
    xxx_endpoint_id below,
    char *stack,
    unsigned int options,
    thread upcall,
    void *env,
    frag_endpoint_id *epp
)
{
    frag_endpoint_id ep;
    xxx_args_t args;
    char *mem;
    error_t err;

    e_enter(frag_endpoint);

    /* Get the arguments. */
    err = xxx_get_args(&args, &stack);
    if (!e_isok(err))
        return e_pass(err);

    /* Allocate an endpoint structure. */
    ep = mc_alloc(frag_memory, sizeof(*ep));
    ep->xxx.config = &frag_config;
    ep->xxx.type = XXX_ENDPOINT;

    /* Get the 'size' argument.  If it isn't there, assign a default. */
    mem = xxx_getstr(frag_memory, args, "size");
    if (mem != 0) {
        ep->frag_size = atoi(mem);
        mc_free(frag_memory, mem);
    }
    else
        ep->frag_size = FRAG_SIZE;
    xxx_rel_args(args);

    /* Call the layer below. */
    err = xxx_endpoint(below, stack, options, upcall, env, &ep->below);
    if (!e_isok(err)) {
        mc_free(frag_memory, ep);
        return e_pass(err);
    }
    *epp = ep;
    return e_ok();
}
In MUTS, addresses are called entity identifiers. MUTS defines the type eid_t to contain addresses for various protocols. Horus, for software-engineering reasons, has defined its own address type: xxx_entity_t. Although currently an xxx_entity_t is a structure containing nothing but an eid_t, in the future Horus may associate extra information with entity identifiers.
Given an endpoint handle of type xxx_endpoint_id, it is possible to retrieve the corresponding (local) address using the function xxx_entity():
error_t xxx_entity( xxx_endpoint_id ep, OUT xxx_entity_t *address );
An endpoint object is allowed to join one or more groups. For each of the groups that it wishes to join, it has to allocate a group structure, which is termed, perhaps unfortunately, a group object. And perhaps even more unfortunately, the routine to allocate the group object is called xxx_join(). It is similar to the xxx_endpoint() function:
error_t xxx_join(
    xxx_endpoint_id ep,
    eid_t *address,
    unsigned char *key,
    eid_t *contact,
    unsigned int options,
    thread upcall,
    void *env,
    xxx_message_t *msg,
    char *name,
    OUT xxx_group_id *group
);
In this function, ep specifies which endpoint is allocating the group object. Address is the MUTS group entity identifier that serves as the address of the group. It is allocated by the MUTS mt_neweid function. Key is an optional group key. For now we assume that NULL is specified. Contact is the MUTS entity identifier for the group contact. Again, contact is optional, and we will for now assume that NULL is specified. Msg is an optional message that is transferred to the contact. It is ignored if contact is NULL. Name is an ASCII string used for debugging. Options, upcall, and env are as in xxx_endpoint(), and more about this will follow later. A group object identifier is returned.
The group object identifier type is defined in the same way as the endpoint type:
typedef xxx_id xxx_group_id;
That is, technically, there is no difference between an endpoint type and a group object type. However, the type field in a group object identifier is set to XXX_GROUP. As with endpoints, layers can add their own information to group objects. For example, the FRAG layer defines its group object as follows:
struct frag_group {
    struct xxx_ident xxx;
    lock_t lock;
    frag_endpoint_id endpoint;
    xxx_group_id below;
    evcnt_t evcnt;
    thread upcall;
    void *env;
};
The frag_join operation is implemented as follows (for brevity, error handling has been omitted):
error_t frag_join(
    frag_endpoint_id ep,
    eid_t *group_address,
    unsigned char *public_key,
    eid_t *group_contact,
    unsigned int options,
    thread upcall,
    void *env,
    xxx_message_t *msg,
    char *name,
    frag_group_id *pgroup
)
{
    frag_group_id group;
    error_t err;

    e_enter(frag_join);

    group = mc_alloc(frag_memory, sizeof(*group));
    sys_zero(group, sizeof(*group));
    group->xxx.config = &frag_config;
    group->xxx.type = XXX_GROUP;
    l_init("frag_group", &group->lock);
    ec_init("frag_group", &group->evcnt);
    group->endpoint = ep;
    group->upcall = upcall;
    group->env = env;

    l_lock(&group->lock);
    err = xxx_join(ep->below, group_address, public_key, group_contact,
                   options, frag_group_event_thread, group, msg, name,
                   &group->below);
    l_unlock(&group->lock);

    /* Return the group object to the invoker. */
    *pgroup = group;
    return e_pass(err);
}
Comparing this function with the frag_endpoint() function, it can be seen that they are quite similar. In fact, two of the basic differences (using a lock, and specifying a local upcall function) are due to simplifications in the presentation of frag_endpoint(). The only real difference is that frag_join does not receive an ASCII protocol stack specification, and therefore does not need to parse one. Instead, a group object basically inherits the protocol stack from the endpoint.
As with endpoint objects, there is a function to retrieve the address of a group: xxx_address(). This function can also be used to retrieve the key of a secure group:
error_t xxx_address(
    xxx_group_id group,
    OUT eid_t *address,
    OUT unsigned char *key
);
Either the address or key pointer can be null if that information is not necessary. The buffer pointed to by key has to be large enough to hold the key. It is assumed that, since the key was provided when xxx_join() was invoked, its size is known. Note that this function is strictly for convenience: the data returned should be the same as that passed to xxx_join() before.
Both Horus endpoint and group objects have upcalls associated with them. These upcalls are basically functions that are invoked in response to certain events, which we will define below. The upcalls are defined by the MUTS t_declare() function. Upcalls can currently be functions, queues, or threads. For now, we will assume that each upcall results in a thread invocation. As defined by MUTS, a thread invokes a procedure (specified in t_declare()) with two void* arguments. The procedure returns void. The first argument passed to this procedure is the env argument that was previously specified by the invoker of xxx_endpoint() or xxx_join(). It is typically a pointer to the endpoint or group structure, respectively, that the layer allocated, so that the upcall has access to this structure. The second argument points to a struct xxx_event structure that describes the event. This structure is defined as follows:
struct xxx_event {
    struct xxx_event *next;
    enum xxx_event_type type;
    int event_number;
    int nmembers;
    int my_rank;
    int origin;
    unsigned int flags;
    xxx_entity_t *view;
    xxx_entity_t *joiners;
    int njoiners;
    xxx_join_id join_ident;
    int join_granted;
    xxx_entity_t *problem_list;
    int problem_size;
    xxx_message_t *msg;
    struct xxx_ack_matrix *mcast_acks;
    struct xxx_ack_matrix *pt2pt_acks;
    struct xxx_ack_matrix *current_time;
    struct xxx_ack_matrix *msg_time;
};
Here, next is a pointer that can be used by the upcall in any way it would like, and is useful for building lists of event structures. Type specifies the type of the event, which is currently one of:
enum xxx_event_type {
    XXX_JOIN_REQUEST, XXX_JOIN_FAILED, XXX_JOIN_DENIED, XXX_JOIN_READY,
    XXX_FLUSH, XXX_FLUSH_OK, XXX_VIEW, XXX_CAST, XXX_SEND, XXX_LEAVE,
    XXX_LOST_MESSAGE, XXX_STABLE, XXX_PROBLEM, XXX_SYSTEM_ERROR, XXX_EXIT
};
Each event passed to an upcall is numbered, starting at 0, in the field event_number. Currently, the remaining fields only apply to group objects. Nmembers specifies the number of members in the view of this endpoint. The members are ranked 0, ..., nmembers - 1. The rank of this endpoint in the view is my_rank. The rank of the member causing this event is in origin. Flags is a bitmap of boolean values. For example, the XXX_BYTE_SWAPPED bit specifies that the origin machine of the event uses a different byte order from this machine. The other fields are defined on a per-event-type basis. Details about each event will be described later.
Many upcalls have a message associated with them. This message is passed up as a MUTS message in the msg field. In case the message was not sent by the same endpoint (i.e., origin != my_rank), the message should be separately acknowledged using the xxx_ack() downcall. Only then can the layer below be sure that this endpoint has safely received the message. Often xxx_ack() is invoked immediately, but some applications may prefer to take other actions (like logging the message) first. The signature of xxx_ack() is as follows:
error_t xxx_ack( xxx_id xxx, xxx_message_t *msg );
(Currently no events containing messages are ever delivered to endpoint objects, so the first argument is always an xxx_group_id.)
When an upcall is finished with an event, it has to invoke the routine xxx_done(), which is defined as follows:
error_t xxx_done( xxx_id xxx, struct xxx_event *ev );
Here, xxx is the group or endpoint to which the upcall applied, and ev the pointer to the event descriptor that was passed to the upcall. xxx_done() invokes the appropriate layer function (frag_done() in our example), which is responsible for cleaning up the data structures that were allocated by the layer for the purpose of the event.
The last event ever passed to an endpoint or group upcall is the XXX_EXIT event. It signals that the corresponding xxx_endpoint_id or xxx_group_id handle should no longer be used. In fact, when the corresponding xxx_done() is invoked on the event, the handle becomes invalid.
For example, the FRAG layer uses an upcall thread like this one:
static void frag_event(void *env, void *param)
{
    frag_group_id group = env;
    struct xxx_event *ev = param;
    int cleanup = 0;

    e_enter(frag_event);

    /* Synchronize the event, and get the group lock. */
    ec_sync(&group->evcnt, ev->event_number);
    l_lock(&group->lock);

    /* Acknowledge the message, if any. */
    if (ev->msg != 0 && ev->origin != ev->my_rank)
        (void) xxx_ack(group, ev->msg);

    switch (ev->type) {
    case XXX_EXIT:
        cleanup = 1;
        /* FALL THROUGH */
    default:
        frag_deliver(group, ev);
    }
    l_unlock(&group->lock);
    ec_inc(&group->evcnt);

    /* Release the event structure. */
    (void) xxx_done(group->below, ev);
    if (cleanup)
        frag_cleanup(group);
    e_exit();
}
(NOTE: some layers add information to events by defining a new event structure that starts out with a struct xxx_event field, in the same way endpoint and group objects are built. When the corresponding xxx_done() routine of the layer is invoked, it can access this information.)
XXX_VIEW upcall
Typically the first event to be delivered to a group object is the XXX_VIEW upcall. In the event structure, view is an array of size nmembers containing the addresses of the members that are in the view of the upcall receiver. (In other upcalls, this pointer is usually set to NULL to minimize overhead.) The msg field points to a message that is associated with each view. The contents of the message are layer and application dependent, and will become clear when the xxx_view() downcall is described.
The view list may be stolen from the event structure by clearing the field in the structure. For example, XXX_VIEW upcalls often execute the following code:
if (group->view != 0)
    mc_free(group->mc, group->view);
group->view = ev->view;
ev->view = 0;
The view array, like everything else in Horus, was allocated through memory channels (using mc_alloc()), and should be freed using mc_free().
Right after invoking xxx_join(), the initial view consists of only the endpoint. Depending on the layer, this view may or may not be delivered back to the endpoint using an XXX_VIEW upcall. Every view is guaranteed to contain the address of the endpoint, and its index in the view array is indicated by my_rank.
All events other than the XXX_VIEW and XXX_EXIT events are delivered to the current view. The current view is the last view that was delivered, or the initial view if no XXX_VIEW upcall has been delivered yet.
Downcalls, such as sending messages, are also done to the current view. However, since upcalls are delivered asynchronously, it may not always be clear what the current view is (after all, an XXX_VIEW upcall may be scheduled, but not yet delivered). Therefore neighboring layers go through a process called flushing. Implementing flushing is optional, and some layers do not. As a result, these layers do not guarantee that messages are delivered in the view in which they were sent.
For the remainder, we will assume that layers do wish to synchronize view changes and provide a flush interface. We will talk about the lower layer that wishes to install a new view, and the upper layer that receives the view. (Typically, the lower layer installs a view at the request of the upper layer, but this will be discussed later.)
The lower layer first delivers an XXX_FLUSH upcall to the upper layer, notifying it that the lower layer wishes to change the view. This upcall has a message associated with it, just like XXX_VIEW upcalls. The upper layer acknowledges the upcall with the xxx_flush_ok() downcall. Until this is done, the downcalls issued by the upper layer are to the old view. The signature of xxx_flush_ok() is as follows:
error_t xxx_flush_ok( xxx_group_id group );
This will invoke the corresponding entry in the lower layer, and brings the endpoint into a state where the lower layer typically disallows sending messages, or sometimes buffers newly sent messages for transmission in the next view. This state is terminated when the next XXX_VIEW upcall arrives. Before this happens, it is possible to receive other upcalls that pertain to the old view.
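For example, an application that needs no special action on a flush can simply acknowledge it from its group upcall. This is a minimal sketch, in which group is the xxx_group_id returned by xxx_join():

/* Invoked from the group upcall when an XXX_FLUSH event arrives. */
static void handle_flush(xxx_group_id group)
{
    e_enter(handle_flush);

    /* Acknowledge the flush; from here on, the endpoint waits for
     * the next XXX_VIEW upcall before resuming normal sending. */
    (void) xxx_flush_ok(group);
    e_exit();
}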
The view is usually (but not necessarily) flushed at the request of the upper layer. When the upper layer wishes to flush the view (in response to join requests or failures), it can do so by invoking xxx_flush():
error_t xxx_flush(
    xxx_group_id group,
    xxx_entity_t *remove_members,
    int n,
    unsigned int options,
    xxx_message_t *msg
);
As with XXX_FLUSH upcalls, this brings the endpoint into a state where it is usually not allowed to send messages, or where it is not clear to which view the messages will be delivered (until the next XXX_VIEW event is delivered). In the downcall, it is possible to request that a set of members be removed from the next view, using the remove_members list of size n. It is also possible to specify a message that is delivered in the eventual XXX_FLUSH upcall generated by this downcall. This upcall is typically (but not necessarily) delivered to all other members of the old view, but not to the member that issued the xxx_flush() downcall. We call this member the coordinator of the flush.
Eventually, each xxx_flush() downcall results in exactly one XXX_FLUSH_OK upcall to the coordinator, indicating the termination of the flush process. As described later, layers typically only deliver an XXX_FLUSH_OK upcall after having received a response from each member that is not being removed from the view. Therefore, to terminate a flush, it may be necessary to invoke the xxx_flush() downcall again. The corresponding XXX_FLUSH_OK upcalls are delivered in the order of the corresponding xxx_flush() downcalls.
Layers need not support the xxx_flush() downcall. If it is invoked anyway, the downcall will return a MUTS error indication.
Installing new views
Often, a new view installation is requested by the upper layer to the lower layer. This is done by the xxx_view() downcall:
error_t xxx_view(
    xxx_group_id group,
    xxx_entity_t *add_members,
    int n,
    unsigned int options,
    xxx_message_t *msg
);
Note that this function is almost identical to its xxx_flush() counterpart. Instead of specifying a list of members to remove from the view, a list of members to add to the view is specified. It is allowed to include in this list addresses of members that are already in the view. The msg argument is passed to the corresponding XXX_VIEW upcall.
xxx_view() may only be invoked by flushed members. An endpoint becomes flushed after it has invoked xxx_flush() and this operation has completed. In many cases, endpoints start out initially in a flushed state.
It should be clear by now that members effectively run a state machine, with the transitions triggered by various upcalls and downcalls. Not all operations are possible in all states. It is therefore useful to make this state machine explicit. A somewhat simplified representation is depicted below:
Here, the large arrow labeled xxx_flush() indicates that this transition is possible from any state. The NORMAL state is the normal running state in which message send primitives are allowed. After an XXX_FLUSH event arrives, an xxx_flush_ok() downcall brings the endpoint into the WAITING_FOR_VIEW state, where it is waiting for a view upcall. (Technically, there should be a FLUSH_RECEIVED state in between the NORMAL and WAITING_FOR_VIEW states, but for simplicity this has been omitted.) An XXX_VIEW upcall brings the endpoint back into the NORMAL state.
In any state the member can invoke xxx_flush(), bringing it into the FLUSHING state (if the layer supports xxx_flush()). More than one xxx_flush() may be issued, keeping the state at FLUSHING. The last XXX_FLUSH_OK upcall brings the endpoint into the FLUSHED state. From there, an xxx_view() downcall brings the endpoint back into the NORMAL state. Alternatively, the endpoint can invoke xxx_merge() to go into the WAITING_FOR_VIEW state. This downcall will be described below.
At layers where group membership is an issue, it is undesirable to have multiple views of a group. Yet, due to network partitions, different endpoints may have different views. Also, when a new endpoint tries to join a group, it initially has only itself in its view. Two members that can communicate, but that have different views of a group, will want to merge their views. In Horus, this can be done by bringing one member (the coordinator) into the FLUSHED state, and all the other members involved into the WAITING_FOR_VIEW state. The coordinator then specifies, in an xxx_view() downcall, the members that are not already in its view. At a layer that implements group membership, this will result in an XXX_VIEW upcall at all members (including the coordinator), completing the merge.
Bringing all members but one into the WAITING_FOR_VIEW state involves one more step. Consider two views, V_1 and V_2. Only a member of V_1 can flush V_1. This member is the coordinator C_1 of V_1. Similarly, only the coordinator C_2 can flush V_2. After both flushes complete, all members but two are in the WAITING_FOR_VIEW state. One of the coordinators has to decide to become the subordinate endpoint, while the other will install the view.
The downcall xxx_merge() brings a coordinator into the WAITING_FOR_VIEW state, and, as a side effect, can send a message to the other coordinator signaling that it (and the other members of its view) are ready to receive a view. The signature of xxx_merge() is the following:
error_t xxx_merge(
    xxx_group_id group,
    eid_t *contact,
    unsigned int options,
    xxx_message_t *msg
);
Optionally, contact may be null, in which case the msg may be omitted. Otherwise the message is forwarded to the given contact endpoint, where it results in an XXX_JOIN_REQUEST upcall. The view of the invoker is piggybacked on the message. The event structure has a field joiners containing the view of the member requesting the join, and a field njoiners containing the size of this view. As with views, the joiners list may be stolen by overwriting the field with a null pointer. If the contact does not respond within a certain time, an XXX_JOIN_FAILED event may result at the upcall of the invoker of xxx_merge(). This brings its state back to FLUSHED.
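As an illustration, a coordinator in the FLUSHED state might offer its view to another coordinator as follows. This is a minimal sketch, in which contact_eid is assumed to have been obtained out of band (for example, through a name service) and msg is a message built beforehand:

/* Offer our (flushed) view to the other coordinator for merging. */
static error_t offer_merge(xxx_group_id group, eid_t *contact_eid,
                           xxx_message_t *msg)
{
    e_enter(offer_merge);

    /* On success the endpoint enters the WAITING_FOR_VIEW state,
     * until an XXX_VIEW or XXX_JOIN_FAILED event arrives. */
    return e_pass(xxx_merge(group, contact_eid, 0, msg));
}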
The XXX_JOIN_REQUEST event also contains a join_granted field. If zero, the upcall is supposed to invoke either xxx_join_granted() or xxx_join_denied(), depending on whether it accepts the request or not. Both are invoked with the join_ident field of the event structure. If the join_granted field is non-zero, neither of these routines should be invoked. The signatures are as follows:
error_t xxx_join_granted(
    xxx_group_id group,
    xxx_join_id join_ident,
    void *grant_ident
);

error_t xxx_join_denied(
    xxx_group_id group,
    xxx_join_id join_ident,
    unsigned int options,
    xxx_message_t *msg
);
xxx_join_denied() should send a notification back to the invoker of xxx_join() explaining the refusal. This notification is delivered as an XXX_JOIN_DENIED event.
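For example, a contact might decide on join requests in its upcall along the following lines. This is a minimal sketch, in which accept_joiner() is a hypothetical application predicate and deny_msg a message built beforehand:

/* Invoked from the group upcall for an XXX_JOIN_REQUEST event
 * whose join_granted field is zero. */
static void handle_join_request(xxx_group_id group, struct xxx_event *ev)
{
    if (accept_joiner(ev->joiners, ev->njoiners))    /* hypothetical */
        (void) xxx_join_granted(group, ev->join_ident, 0);
    else
        (void) xxx_join_denied(group, ev->join_ident, 0, deny_msg);
}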
When a contact (and a message) is specified to xxx_join(), it will automatically invoke xxx_merge(). Thus xxx_join() without a contact brings the endpoint into the FLUSHED state, whereas the state is initialized to WAITING_FOR_VIEW if a contact is specified.
If an endpoint wants to be able to serve as a contact for the merge operation, it will have to obtain and publish its address. To obtain its contact address, the endpoint invokes xxx_contact():
error_t xxx_contact( xxx_group_id group, OUT eid_t *contact );
Note that this routine does not return the address of the contact, but the address of the local endpoint, for use with xxx_merge() or xxx_join(). Often this address is the local endpoint address. However, there are other cases. For example, in secure operation, the contact address contains different information.
Publishing the contact address is up to the application. For example, it can do so via a name or directory service, or it can write the address to a shared file.
Usually, only one member in the view serves as contact. When the contact receives a join request, it buffers the request while flushing the group. Other join requests that arrive while flushing are also buffered, but no new flushes need be started. When the XXX_FLUSH_OK event arrives, the contact invokes xxx_view() with its current queue of join requests and truncates the queue.
Typically, the contact is the member that is ranked 0. When an endpoint receives an XXX_VIEW event with my_rank = 0, it invokes xxx_contact() and publishes the result somewhere. It only needs to do so if it did not previously have rank 0. This same contact should also be the coordinator for the flush protocol in response to failures, so that no two members attempt to flush the view concurrently.
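In code, this convention might look as follows. This is a minimal sketch, in which publish_contact() stands for whatever publication mechanism the application uses:

static int had_rank_zero;    /* did we have rank 0 in the previous view? */

/* Invoked from the group upcall when an XXX_VIEW event arrives. */
static void handle_view(xxx_group_id group, struct xxx_event *ev)
{
    eid_t contact;

    if (ev->my_rank == 0 && !had_rank_zero &&
            e_isok(xxx_contact(group, &contact)))
        publish_contact(&contact);    /* hypothetical */
    had_rank_zero = (ev->my_rank == 0);
}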
The Horus interface has provisions for access control, allowing group members to deny access to endpoints that wish to join (or merge in their views). This is done on a per-view basis. Currently, the only way to join a secure view is through xxx_join() (not xxx_merge()). When joining the group, the endpoint specifies the XXX_SECURE option, a key, and a contact of the view it wishes to join.
The XXX_JOIN_REQUEST upcall is invoked with an event structure containing a zero join_granted field, meaning that xxx_join_granted() or xxx_join_denied() has to be invoked. The joiners list is currently guaranteed to be of size one, and to contain an authenticated endpoint. The xxx_join_granted() downcall eventually results in an XXX_JOIN_READY event. Before this event occurs, the joiner should not be included in a new view. However, the old view can be flushed while waiting for this event. During this time, the security code runs a protocol with the joiner to transfer some necessary state to the joiner, allowing it to receive the next view.
In addition to access control, the XXX_SECURE option results in views where each packet between members is cryptographically signed and checked on receipt. Also, if the XXX_ENCRYPT option is specified in message send operations, the data is encrypted during transmission.
Many fault-tolerant algorithms need accurate information about failures. Unfortunately, due to a well-known theoretical result, it is often not possible to distinguish crashed processes from slow processes. Since Horus cannot circumvent this impossibility either, it allows the application to decide which problems constitute failures.
When layers detect problems, they deliver an XXX_PROBLEM event. Such problems include broken connections, crashed processes, and timeouts. The problem event structure contains a list of member addresses, problem_list, of size problem_size. As with views, this list may be stolen by overwriting the problem_list field with a null pointer.
Based on the algorithm the application is trying to implement, the application decides which of these members can be removed from the membership. The members can be removed using xxx_flush().
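For example, a coordinator might react to problem reports, and to join requests buffered in the meantime, by flushing the view and then installing a new one. This is a minimal sketch, in which pending_joiners and npending are hypothetical application state holding the buffered joiners:

static xxx_entity_t *pending_joiners;   /* buffered joiners (hypothetical) */
static int npending;

/* Invoked from the coordinator's group upcall on XXX_PROBLEM. */
static void handle_problem(xxx_group_id group, struct xxx_event *ev)
{
    /* Request removal of the suspected members from the next
     * view; this starts a flush. */
    (void) xxx_flush(group, ev->problem_list, ev->problem_size,
                     0, (xxx_message_t *) 0);
}

/* Invoked from the coordinator's group upcall on XXX_FLUSH_OK. */
static void handle_flush_ok(xxx_group_id group)
{
    /* The flush terminated; install a new view that includes the
     * joiners buffered while flushing. */
    (void) xxx_view(group, pending_joiners, npending,
                    0, (xxx_message_t *) 0);
    npending = 0;
}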
A member of a view may send a message to all members using xxx_cast():
error_t xxx_cast( xxx_group_id group, unsigned int options, xxx_message_t *msg );
This message is delivered as an XXX_CAST upcall. Different layers will provide different semantics and ordering. Not all layers deliver locally. Obviously, a layer that provides total ordering has to deliver locally, but a layer that provides only FIFO or causal ordering does not need to do so. Some of these layers recognize the XXX_LOCAL option, forcing a local delivery.
The semantics of xxx_cast() are typically only provided while in the NORMAL state. Most layers guarantee that the message is then delivered in the view in which it was sent. However, while flushing, or while waiting for a view to be installed, layers are allowed to deny message send operations, or to delay delivery until the next view.
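For instance, multicasting might look as follows, assuming msg is an xxx_message_t built elsewhere with the MUTS message routines:

/* Multicast a message to the current view (NORMAL state only). */
static error_t announce(xxx_group_id group, xxx_message_t *msg)
{
    e_enter(announce);
    return e_pass(xxx_cast(group, 0, msg));
}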
There is also an interface to send to a subset of the view membership:
error_t xxx_send(
    xxx_group_id group,
    int rank_list[],
    int n,
    unsigned int options,
    xxx_message_t *msg
);
With this interface, a list of member ranks (of size n) is specified, to which the message should be delivered. The semantics of this interface are again up to the layer. Many layers only accept lists of size one, while other layers do not implement this interface at all (and return an error). Layers may also implement special options where the rank_list is not a list of ranks, but instead a list of int-sized words encoding the destination set of the view in some other way.
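A point-to-point send to the member ranked 0 might then look like this, under the same assumptions as the xxx_cast() sketch above:

/* Send a message to the member ranked 0 only. */
static error_t send_to_rank0(xxx_group_id group, xxx_message_t *msg)
{
    int dest[1];

    e_enter(send_to_rank0);
    dest[0] = 0;
    return e_pass(xxx_send(group, dest, 1, 0, msg));
}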
Some layers can lose messages under certain circumstances. For example, in a layer that implements FIFO transmission purely based on negative acknowledgements, a slow receiver may not be able to retrieve an old message. Under such conditions, the layer may deliver an XXX_LOST_MESSAGE event to notify the application, without terminating the membership. Applications that cannot deal with this can treat this event as an XXX_PROBLEM event.
The Horus interface includes functions to create, update, and inspect two-dimensional matrices of integers. Several of these matrices are used to maintain message stability information and causal time information. Other uses are feasible. A matrix contains a reference counter and a lock to allow for sharing between layers.
The matrix is of type struct xxx_ack_matrix, and can be allocated using xxx_matrix_alloc():
error_t xxx_matrix_alloc(
    struct mem_chan *mem,
    int nrows,
    int ncols,
    OUT struct xxx_ack_matrix **pmat
);
To retrieve information out of the matrix, you specify a row x and a column y, and the number of entries n you wish to extract. By default, the information is extracted a row at a time, but by specifying XXX_COLUMN this can be overridden. Overflowing from one row into the next is handled naturally (and similarly for columns). Rows and columns are both numbered from zero. The result is returned in an array of 32-bit unsigned integers:
error_t xxx_get_acks(
    struct xxx_ack_matrix *mat,
    int x,
    int y,
    int n,
    OUT uint32_t *out
);
To release the matrix, use xxx_matrix_release():
error_t xxx_matrix_release( struct xxx_ack_matrix *mat );
The interface is otherwise very incomplete, and often requires an understanding of the contents of struct xxx_ack_matrix. Luckily, most applications only need the interfaces specified above.
Optionally, layers can provide information about which endpoints acknowledged (using xxx_ack()) which messages. For this purpose, acknowledgement matrices are passed along with event upcalls. This allows processes to inspect the ``stability'' of messages they sent or received.
For xxx_cast messages, the information is maintained in the mcast_acks matrix in the event upcall. This matrix is of size nmembers x nmembers. Entry (x, y) contains the number of messages from x acknowledged by y. On the diagonal, entry (x, x) contains the number of messages that were sent by x. The minimum of a column y is then the number of messages that are fully acknowledged.
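For instance, a member could estimate how many multicasts from the sender ranked s are stable by taking the minimum over the acknowledgement counts for s. This is a minimal sketch, in which MAX_MEMBERS is a hypothetical bound on the view size, and the extraction direction follows the (x, y) convention described above:

/* Return how many multicasts from sender s are fully acknowledged. */
static uint32_t stable_count(struct xxx_event *ev, int s)
{
    uint32_t acks[MAX_MEMBERS], stable;
    int i;

    if (!e_isok(xxx_get_acks(ev->mcast_acks, s, 0, ev->nmembers, acks)))
        return 0;
    stable = acks[0];
    for (i = 1; i < ev->nmembers; i++)
        if (acks[i] < stable)
            stable = acks[i];
    return stable;
}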
For xxx_send messages, less information is provided. The matrix pt2pt_acks is a vector of size 1 x nmembers. Entry i contains the number of acknowledged messages sent using xxx_send(). It depends on the particular layer whether these messages were acknowledged by all members in the view, or whether only a partial stability is indicated.
The fields current_time and msg_time are matrices of size nmembers x nmembers. Their use depends on the layer that provides them (they usually keep information about causal time). Any of these matrix fields may be null, in which case the layer does not provide the corresponding information.
Sometimes, the application (or a higher-level layer) has more information about message stability. Using the xxx_stable() downcall it can pass this information to the layer below:
error_t xxx_stable(
    xxx_group_id group,
    unsigned int options,
    struct xxx_ack_matrix *am
);
Currently there has not been much use of xxx_stable(), and the exact semantics of the call are not thoroughly defined.
Layers and applications sometimes need to be able to interact with a particular lower-level layer. For example, an application that uses the TOTAL layer, which implements total order through a token, may want to explicitly manipulate the token, or may want to change the token-passing policy used by the layer. For this, the TOTAL layer provides functions that operate on the endpoint and group object handles that it returns to the layer directly on top of it. Because the use of Horus is not limited to object-oriented languages (and, in fact, most layers are written in C rather than C++), we cannot assume that these functions are inherited by the layers that are stacked on top of the TOTAL layer.
Therefore, Horus provides a function that can return endpoint and group object handles from anywhere in the protocol stack. This function, called xxx_focus(), can search through the stack recursively (like most other Horus layer functions) until it finds the relevant layer. Its signature is as follows:
error_t xxx_focus(
    xxx_id in,
    char *ident,
    OUT xxx_id *out
);
The function takes a top-level group or endpoint handle, and returns a lower-level handle based on the specified ident. Each layer is supposed to implement this function, recursively invoking it unless it recognizes the given ident. This is perhaps best clarified by an example. Below is the implementation of the focus function of the TOTAL layer:
error_t to_focus(
    to_group_id group,
    char *ident,
    xxx_group_id *pg
)
{
    e_enter(to_focus);

    /* Intercept if a TOTAL or TOKEN focus is specified. */
    if (!strcmp(ident, "TOTAL") || !strcmp(ident, "TOKEN")) {
        *pg = (xxx_group_id) group;
        return e_ok();
    }

    /* Otherwise pass it on. */
    return e_pass(xxx_focus(group->below, ident, pg));
}
When invoked with the string TOTAL or the string TOKEN, it returns the group pointer. Otherwise, it recursively invokes xxx_focus() to find another layer that may recognize the given ident. When the bottom of the protocol stack is hit, xxx_focus() returns an error.
Note that the example is somewhat simplified. It really should check that a group object was passed to the focus function (by testing the type field of the xxx_ident structure). In case an endpoint object is specified, similar code should be executed for endpoints instead.
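An application might use xxx_focus() along these lines. This is a minimal sketch, in which total_request_token() is a hypothetical TOTAL-specific downcall operating on the returned handle:

/* Locate the TOTAL layer's group object and manipulate its token. */
static error_t request_token(xxx_group_id group)
{
    xxx_group_id total_group;
    error_t err;

    e_enter(request_token);
    err = xxx_focus((xxx_id) group, "TOTAL", (xxx_id *) &total_group);
    if (!e_isok(err))
        return e_pass(err);
    return e_pass(total_request_token(total_group));    /* hypothetical */
}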
For debugging, it is useful to see the state of the various protocols in the stack. For this, Horus provides the xxx_dump() function. Like xxx_focus(), it can travel recursively down a stack of endpoint or group objects. Each layer prints the information that it wants to supply. The prototype of xxx_dump() is:
error_t xxx_dump( xxx_id xxx, unsigned int options, int cmd );
Only if the XXX_DUMP_RECUR option is specified should xxx_dump() be invoked recursively. The cmd argument is currently unused, but may later specify which classes of information should be printed. Here is, for example, the implementation of frag_dump():
error_t frag_dump(xxx_id xxx, unsigned int options, int cmd)
{
    frag_endpoint_id ep;
    frag_group_id grp;

    e_enter(frag_dump);

    switch (xxx->type) {
    case XXX_ENDPOINT:
        ep = (frag_endpoint_id) xxx;
        pprintf("FRAGMENT endpoint\n");
        pprintf("\tfrag_size = %d\n", ep->frag_size);
        if (options & XXX_DUMP_RECUR)
            return e_pass(xxx_dump(ep->below, options, cmd));
        break;
    case XXX_GROUP:
        grp = (frag_group_id) xxx;
        pprintf("FRAGMENT group\n");
        pprintf("\tmy_rank = %d\n", grp->my_rank);
        pprintf("\tnmembers = %d\n", grp->nmembers);
        if (options & XXX_DUMP_RECUR)
            return e_pass(xxx_dump(grp->below, options, cmd));
    }
    return e_ok();
}
As one of the target uses for Horus is fault-tolerant applications, the use of panic() (or sys_panic(), as available in MUTS) is avoided. Rather, Horus defines an XXX_SYSTEM_ERROR upcall, which allows applications to save state or take other useful actions before restarting to recover from an internal error. Alternatively, the MUTS sys_on_exit() functionality may be used, which allows functions to be specified that are invoked before the process terminates.
Leaving the group behaves much like a crash. An endpoint that wants to leave invokes:
error_t xxx_leave( xxx_group_id group, unsigned int options, xxx_message_t *msg );
This is often much like xxx_cast(), but instead results in an XXX_LEAVE upcall (which is usually delivered with the same semantics as XXX_CAST). However, some layers do not send a message in response to an xxx_leave() at all.
xxx_leave() results in an XXX_EXIT upcall. After xxx_done() is invoked, the group pointer becomes invalid, allowing the layer to release all its resources. Before this upcall, many layers will not allow the same endpoint, or even a different endpoint within the same process, to join the same group.
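For example, group cleanup might proceed along the following lines. This is a minimal sketch from the application's point of view, in which cleanup_group_state() is a hypothetical routine releasing the application's own per-group state (reached here through the upcall's env argument):

/* Initiate departure from the group.  The layer delivers an
 * XXX_LEAVE upcall to the remaining members, and eventually an
 * XXX_EXIT event locally. */
static error_t depart(xxx_group_id group)
{
    e_enter(depart);
    return e_pass(xxx_leave(group, 0, (xxx_message_t *) 0));
}

/* Invoked from the group upcall when the final XXX_EXIT arrives.
 * After xxx_done(), the group handle is invalid. */
static void handle_exit(xxx_group_id group, struct xxx_event *ev, void *env)
{
    (void) xxx_done(group, ev);
    cleanup_group_state(env);    /* hypothetical application cleanup */
}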
To release the data structures associated with an endpoint object, use xxx_destroy():
error_t xxx_destroy( xxx_endpoint_id endpoint, unsigned int options, xxx_message_t *msg );
Some layers may require that the endpoint not be joined to any group. In other layers, the endpoint is automatically removed from the membership of its groups. Eventually, xxx_destroy() has to lead to an XXX_EXIT event at the endpoint upcall, which is guaranteed to be the last event passed to the upcall. Currently, it is up to the individual layers how the msg argument is used.
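Endpoint teardown might then look as follows; a minimal sketch, assuming a layer that requires the endpoint to have left all groups first:

/* Release an endpoint that no longer needs to communicate.  Its
 * upcall receives a final XXX_EXIT event, after which the handle
 * may no longer be used. */
static error_t teardown(xxx_endpoint_id ep)
{
    e_enter(teardown);
    return e_pass(xxx_destroy(ep, 0, (xxx_message_t *) 0));
}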