Fengyun Liu

Distributed Algorithms

Note: This is a summary of the course I take at EPFL(course page).

Distributed computing and storage systems are very popular today in industry. Before taking the Distributed Algorithm courses at EPFL, I’ve learned here and there things about vector clocks, quorums, replication, CAP theorem, two-phase commit and so on. These scattered pieces of knowledge can’t provide a firm theorectical foundation for me to understand the core principles behind various distributed systems.

An introductory course should provide basic concepts & techniques that enable learners to think, reason and do research in the field. It should emphasize the most basic building blocks & principles, instead of specific artefacts built from these basic building blocks. From this perspective, I think the content of this course is excellent.

General Problems of Distributed Systems

If we think abstractly, all distributed systems face and only face three basic problems:

  • Channels: communication between two processes
  • Broadcast: communication between one process and multiple processes
  • Consensus: multiple processes to reach a common decision

It turns out that any difficult problem of distributed systems is some complex combination of the three basic problems above. To some extent, distributed computing is all about communication and protocols. Why the problems above are nontrivial? This is due to following facts:

  • the communication medium may be asynchronous(non-deterministic delay)
  • any process may crash and stop, or crash and recover, or be attacked and do harmful things to the system(Byzantine process)
  • crashed processes may not be perfectly detected

Models of Distributed Systems

Depending on what assumptions we make about the distributed environment, following are widely used models for the study of distributed systems:

  • fail-stop: crash-stop process + perfect links + perfect failure detector
  • fail-noisy: crash-stop process + perfect links + eventually perfect failure detector
  • fail-silent: crash-stop process + perfect links
  • fail-recovery: crash-recovery process + stubborn links + eventual leader detector
  • fail-arbitrary: Byzantine process + authenticated perfect link

Under different models and assumptions, there are often different solutions to the problems above. And each solution to a problem has to specify clearly which model it assumes.

In theretical studies, it’s also common to assume that a majority of processes are correct or at most f of N processes may fail in addition to any of the computation models above. Additional assumptions may enable simpler solutions to some specific problems, though the domain of application becomes smaller.

Define the Problems

The three general problems can be refined by adding different properties to get variants of the problems.

Channels

For channels, we can restrict that the communication should be reliable, or in addition it should be secure. The concept reliable and secure can be formally defined in terms of liveness and safety properties. We can also restrict that the messages must be received in the same order as they are sent. Following are useful abstractions for channels:

  • Perfect links guarantee delivery and no duplication, which can be built on top of noisy and finitely lossy fair-loss links.
  • Authenticated links guarantee secure communication(confidentiality & integrity & authenticity).
  • Ordered links guarantee messages are delivered in the same order as they are sent.

Broadcast

Regarding broadcast, i.e. the communication between one process and multiple processes, what we care is communication in the direction 1->N, as the communication in the direction N->1 is just a superposition of channels. Why the communication 1->N is not a superposition of channels? That is because we can stipulate different properties on the communication. For example, there could be following different properties:

  • Best-effort broadcast is just a superposition of channels.
  • Reliable broadcast stipulates that all correct processes deliver the same set of messages.
  • Uniform broadcast requires further that no messages can be delivered by a faulty processe but are not delivered by a correct process.

Additional restrictions can be added to broadcast by specifying the order of delivered messages. For example, there could be following different properties:

  • FIFO broadcast stipulates that each process should deliver messages in the same order as they are broadcast by a process.
  • Causal broadcast stipulates that each process should deliver messages according to causal order(see definition below).
  • Total-order broadcast stipulates that all processes deliver all messages in the same order.

There are other forms of broadcast and new ones could be invented if a new property of broadcast can be defined.

Consensus

The third basic problem of distributed systems is consensus, i.e. to reach common decisions among multiple processes. This problem is easy if we have a special leader process, but the special process forms a single point of failure, which makes the whole system less fault-tolerant.

Consensus is the panacea in distrubted systems. A large number of difficult problems can be solved by using this basic construct.

Failure Detectors

Perfect failure detector

Perfect failure detector can be implemented in synchronous systems using heartbeat. It is impossible to implement perfect failure detect in asynchronous systems, as it’s impossible to differentiate delay from a failure.

Module:
  Name: PerfectFailureDetector, instance P.

Events:
  Indication: ⟨ P, Crash | p ⟩: Detects that process p has crashed.

Properties:
  1. Strong completeness: Eventually, every process that crashes is
     permanently detected by every correct process.

  2. Strong accuracy: If a process p is detected by any process, then
     p has crashed.

Eventually perfect failure detector

Eventually perfect failure detector can be implemented in a partially synchronous environment using heartbeats. Partially synchronous means there are periods during which the system is synchronous, although there is no bound on the period during which it is asynchronous. Most pratical systems are partially synchronous.

Module:
  Name: EventuallyPerfectFailureDetector, instance ♢P.

Events:
  Indication: ⟨ ♢P , Suspect | p ⟩: Notifies that process p is suspected to have crashed.
  Indication: ⟨ ♢P, Restore | p ⟩: Notifies that process p is not suspected anymore.

Properties:
  1. Strong completeness: Eventually, every process that crashes is
     permanently suspected by every correct process.

  2. Eventual strong accuracy: Eventually, no correct process is
     suspected by any correct process.

Channels

Specification:

Module:
  Name: FairLossPointToPointLinks, instance fll.

Events:
  Request: ⟨ fll, Send | q, m ⟩: Requests to send message m to process q.
  Indication: ⟨ fll, Deliver | p, m ⟩: Delivers message m sent by process p.

Properties:
  1. Fair-loss - If a correct process p infinitely often sends a message
     m to a correct process q, then q delivers m an infinite number of
     times.

  2. Finite duplication: If a correct process p sends a message m a
     finite number of times to process q, then m cannot be delivered an
     infinite number of times by q.

  3. No creation: If some process q delivers a message m with sender p,
     then m was previously sent to q by process p.

Specification:

Module:
  Name: StubbornPointToPointLinks, instance sl.

Events:
  Request: ⟨ sl, Send | q, m ⟩: Requests to send message m to process q.
  Indication: ⟨ sl, Deliver | p, m ⟩: Delivers message m sent by process p.

Properties:
  1. Stubborn delivery: If a correct process p sends a message m once to a
     correct process q, then q delivers m an infinite number of times.

  2. No creation: If some process q delivers a message m with sender p,
     then m was previously sent to q by process p.

Implementation:

Implements:
  StubbornPointToPointLinks, instance sl.

Uses:
  FairLossPointToPointLinks, instance fll.

upon event ⟨ sl, Init ⟩ do
  sent := ∅;
  starttimer (∆);

upon event ⟨ Timeout ⟩ do
  forall (q, m) ∈ sent do
    trigger ⟨ fll, Send | q, m ⟩;
    starttimer (∆);

upon event ⟨sl, Send |q, m⟩ do
  trigger ⟨ fll, Send | q, m ⟩;
  sent := sent ∪ {(q, m)};

upon event ⟨ fll, Deliver | p, m ⟩ do
  trigger ⟨ sl, Deliver | p, m ⟩;

Specification:

Module:
  Name: PerfectPointToPointLinks, instance pl.

Events:
  Request: ⟨ pl, Send | q, m ⟩: Requests to send message m to process q.
  Indication: ⟨ pl, Deliver | p, m ⟩: Delivers message m sent by process p.

Properties:
  1. Reliable delivery: If a correct process p sends a message m to a
     correct process q, then q eventually delivers m.

  2. No duplication: No message is delivered by a process more than once.

  3. No creation: If some process q delivers a message m with sender p,
     then m was previously sent to q by process p.

Implementation:

Implements:
  PerfectPointToPointLinks, instance pl.

Uses:
  StubbornPointToPointLinks, instance sl.

upon event ⟨ pl, Init ⟩ do
  delivered := ∅;

upon event ⟨ pl, Send |q, m ⟩ do
  trigger ⟨ sl, Send | q, m ⟩;

upon event ⟨ sl, Deliver | p, m ⟩ do
  if m ∉ delivered then
    delivered := delivered ∪ {m};
    trigger ⟨ pl, Deliver | p, m ⟩;

Specification:

Module:
  Name: OrderedPerfectLinks, instance opl.

Events:
  Request: ⟨ opl, Send | q, m ⟩: Requests to send message m to process q.
  Indication: ⟨ opl, Deliver | p, m ⟩: Delivers message m sent by process p.

Properties:
  1. Reliable delivery: If a correct process p sends a message m to a
     correct process q, then q eventually delivers m.

  2. No duplication: No message is delivered by a process more than once.

  3. No creation: If some process q delivers a message m with sender p,
     then m was previously sent to q by process p.

  4. Ordered delivery: If a correct process p sends messages m1 before m2
     to a correct process q, then q will deliver m1 before m2.

Implementation:

Implements:
  OrderedPerfectPointToPointLinks, instance opl.

Uses:
  PerfectPointToPointLinks, instance pl.

upon event ⟨ opl, Init ⟩ do
  foreach p ∈ Π do
    receive_ts[p] := 1;
    send_ts[p] := 1;

  pending := ∅;

upon event ⟨opl, Send | q, m⟩ do
  trigger ⟨ pl, Send | q, [m, send_ts[q]] ⟩;
  send_ts[q] := send_ts[q] + 1;

upon event ⟨ pl, Deliver | p, [m, ts] ⟩ do
  pending := pending ∪ {(m, ts, p)};

upon (m, ts, p) ∈ pending and ts = receive_ts[p] do
  trigger ⟨ opl, Deliver | p, m ⟩;
  receive_ts[p] := receive_ts[p] + 1;
  pending := pending \ {(m, ts, p)}

This abstraction is useful for the fail-arbitrary model, to prevent Byzantine processes from doing bad things to the system.

Specification

Module:
  Name: AuthPerfectPointToPointLinks, instance al.

Events:
  Request: ⟨ al, Send | q, m ⟩: Requests to send message m to process q.
  Indication: ⟨ al, Deliver | p, m ⟩: Delivers message m sent by process p.

Properties:
  1. Reliable delivery: If a correct process sends a message m to a
     correct process q, then q eventually delivers m.

  2. No duplication: No message is delivered by a correct process more
     than once.

  3. Authenticity: If some correct process q delivers a message m with
     sender p and process p is correct, then m was previously sent to q
     by p.

Implementation:

Implements:
  AuthPerfectPointToPointLinks, instance al.

Uses:
  StubbornPointToPointLinks, instance sl.

upon event ⟨ al, Init ⟩ do
  delivered := ∅;

upon event ⟨ al, Send | q, m ⟩ do
  a := authenticate(self, q, m);
  trigger ⟨ sl, Send | q, [m, a] ⟩;

upon event ⟨ sl, Deliver | p, [m, a] ⟩ do
  if verifyauth(self, p, m, a) and m ∉ delivered then
     delivered := delivered ∪ {m};
     trigger ⟨ al, Deliver | p, m ⟩;

Broadcast with deliveredness properties

Best-effort broadcast

Specification:

Module:
  Name: BestEffortBroadcast, instance beb.

Events:
  Request: ⟨ beb, Broadcast | m ⟩: Broadcasts a message m to all processes.
  Indication: ⟨ beb, Deliver | p, m ⟩: Delivers a message m broadcast by process p.

Properties:
  1. Validity: If a correct process broadcasts a message m, then every
     correct process eventually delivers m.

  2. No duplication: No message is delivered more than once.

  3. No creation: If a process delivers a message m with sender s, then
     m was previously broadcast by process s.

Implementation:

Implements:
  BestEffortBroadcast, instance beb.

Uses:
  PerfectPointToPointLinks, instance pl.

upon event ⟨ beb, Broadcast | m ⟩ do
  forall q ∈ Π do
    trigger ⟨ pl, Send | q, m ⟩;

upon event ⟨ pl, Deliver | p, m ⟩ do
   trigger ⟨ beb, Deliver | p, m ⟩

Problem: If the sender crashes, some correct process may receive the message, some may not, thus creates inconsistency.

Regular reliable broadcast

Specification:

Module:
  Name: ReliableBroadcast, instance rb.

Events:
  Request: ⟨ rb, Broadcast | m ⟩: Broadcasts a message m to all processes.
  Indication: ⟨ rb, Deliver | p, m ⟩: Delivers a message m broadcast by process p.

Properties:
  1. Validity: If a correct process p broadcasts a message m, then p
     eventually delivers m.

  2. No duplication: No message is delivered more than once.

  3. No creation: If a process delivers a message m with sender s, then m
     was previously broadcast by process s.

  4. Agreement: If a message m is delivered by some correct process, then
     m is eventually delivered by every correct process.

Implementation for Fail-Stop model - Lazy Reliable Broadcast

Idea: If a process crashes, correct processes broadcast all messages from the crashed process.

Implements:
  ReliableBroadcast, instance rb.

Uses:
  BestEffortBroadcast, instance beb;
  PerfectFailureDetector, instance P.

upon event ⟨ rb, Init ⟩ do
  correct := Π;

  foreach p ∈ Π do
    from[p] := [∅];

upon event ⟨ rb, Broadcast | m ⟩ do
  trigger ⟨ beb, Broadcast | [DATA, self, m] ⟩;

upon event ⟨ beb, Deliver | p, [DATA, s, m] ⟩ do
  if m ∉ from[s] then
     trigger ⟨ rb, Deliver | s, m ⟩;
     from[s] := from[s] ∪ {m};

     if s ∉ correct then
        trigger ⟨ beb, Broadcast | [DATA, s, m] ⟩;

upon event ⟨P, Crash |p ⟩ do
  correct := correct \ {p};

  forall m ∈ from[p] do
    trigger ⟨ beb, Broadcast | [DATA, p, m] ⟩;

Implementation for Fail-Silent model - Eager Reliable Broadcast

Idea: If a correct process deliver a message, it also broadcast the message.

Implements:
  ReliableBroadcast, instance rb.

Uses:
  BestEffortBroadcast, instance beb.

upon event ⟨ rb, Init ⟩ do
  delivered := ∅;

upon event ⟨ rb, Broadcast | m ⟩ do
  trigger ⟨ beb, Broadcast | [DATA, self, m] ⟩;

upon event ⟨ beb, Deliver | p, [DATA, s, m] ⟩ do
  if m ∉ delivered then
     delivered := delivered ∪ {m};
     trigger ⟨ rb, Deliver | s, m ⟩;
     trigger ⟨ beb, Broadcast | [DATA, s, m] ⟩;

Problem: A faulty process may deliver a message which is not delivered by correct processes.

Uniform reliable broadcast

Specification

Module:
  Name: UniformReliableBroadcast, instance urb.

Events:
  Request: ⟨ urb, Broadcast | m ⟩: Broadcasts a message m to all processes.
  Indication: ⟨ urb, Deliver | p, m ⟩: Delivers a message m broadcast by process p.

Properties:
  1. Validity: If a correct process p broadcasts a message m, then p
     eventually delivers m.

  2. No duplication: No message is delivered more than once.

  3. No creation: If a process delivers a message m with sender s, then m
     was previously broadcast by process s.

  4. Uniform agreement: If a message m is delivered by some process
     (whether correct or faulty), then m is eventually delivered by every
     correct process.

Implementation for Fail-Stop model - All-Ack Uniform Reliable Broadcast

Idea: only deliver a message if all correct processes have acknowledged the reception of the message.

Implements:
  UniformReliableBroadcast, instance urb.

Uses:
  BestEffortBroadcast, instance beb.
  PerfectFailureDetector, instance P.

upon event ⟨ urb, Init ⟩ do
  delivered := ∅;
  pending := ∅;
  correct := Π;

  forall m do ack[m] := ∅;

upon event ⟨ urb, Broadcast | m ⟩ do
  pending := pending ∪ {(self, m)};
  trigger ⟨ beb, Broadcast | [DATA, self, m] ⟩;

upon event ⟨ beb, Deliver | p, [DATA, s, m] ⟩ do
  ack[m] := ack[m] ∪ {p};
  if (s, m) ∉ pending then
     pending := pending ∪ {(s, m)};
     trigger ⟨ beb, Broadcast | [DATA, s, m] ⟩;

upon event ⟨ P, Crash | p ⟩ do
  correct := correct \ {p};

function candeliver(m) returns Boolean is
  return (correct ⊆ ack[m]);

upon exists (s, m) ∈ pending such that candeliver(m) and m ∉ delivered do
  delivered := delivered ∪ {m};
  trigger ⟨ urb, Deliver | s, m ⟩;

Implementation for Fail-Silent model - Majority-Ack Uniform Reliable Broadcast

Idea: assume a majority of processes are correct

Implements:
  UniformReliableBroadcast, instance urb.

Uses:
  BestEffortBroadcast, instance beb.

upon event ⟨ urb, Init ⟩ do
  delivered := ∅;
  pending := ∅;

  forall m do ack[m] := ∅;

upon event ⟨ urb, Broadcast | m ⟩ do
  pending := pending ∪ {(self, m)};
  trigger ⟨ beb, Broadcast | [DATA, self, m] ⟩;

upon event ⟨ beb, Deliver | p, [DATA, s, m] ⟩ do
  ack[m] := ack[m] ∪ {p};
  if (s, m) ∉ pending then
     pending := pending ∪ {(s, m)};
     trigger ⟨ beb, Broadcast | [DATA, s, m] ⟩;

function candeliver(m) returns Boolean is 
  return #(ack[m]) > N/2;

upon exists (s, m) ∈ pending such that candeliver(m) and m ∉ delivered do
  delivered := delivered ∪ {m};
  trigger ⟨ urb, Deliver | s, m ⟩;

Terminating Reliable Broadcast

Terminating reliable broadcast is a kind of broadcast restricts what if the sender crashes, a special value should be delivered by all processes(correct or not). In terminating reliable broadcast, only a well-know process sends, other processes are receivers, and at most one message can be sent.

It’s important to note that uniform broadcast doesn’t solve the problem of terminating reliable broadcast, because in an asynchronous system, if the sender fails, the receiver can’t decide whether to wait or not. If it decides to wait, it might be the case that no process has received any message from the sender, thus it will wait forever. If it decides to terminate, sometime later another receiver may broadcast the value it has received from the sender before it crashes.

Specification:

Module:
  Name: UniformTerminatingReliableBroadcast, instance utrb, with sender s.

Events:
  Request: ⟨ utrb, Broadcast | m ⟩: Broadcasts a message m to all processes. Executed only by process s.
  Indication: ⟨ utrb, Deliver | p, m ⟩: Delivers a message m broadcast by process p or the symbol △.

Properties:
  1. Validity: If a correct process p broadcasts a message m, then p
     eventually delivers m.

  2. Termination: No process delivers more than one message.

  3. Integrity: If a correct process delivers some message m, then m was
     either previously broadcast by process s or it holds m = △.

  4. Uniform Agreement: If any process delivers a message m, then every
     correct process eventually delivers m.

The implementation of termination reliable broadcast depends on the primitive uniform consensus which is discussed later in this article.

Implements:
  UniformTerminatingReliableBroadcast, instance utrb, with sender s.

Uses:
  BestEffortBroadcast, instance beb;
  UniformConsensus, instance uc;
  PerfectFailureDetector, instance P.

upon event ⟨ utrb, Init ⟩ do
  proposal := ⊥;

upon event ⟨ utrb, Broadcast | m ⟩ do      // only process s
  trigger ⟨ beb, Broadcast | m ⟩;

upon event ⟨ beb, Deliver | s, m ⟩ do
  if proposal = ⊥ then
    proposal := m;
    trigger ⟨ uc, Propose | proposal ⟩;

uponevent ⟨ P,Crash | p ⟩ do
  if p = s ∧ proposal = ⊥ then
    proposal := △;
    trigger ⟨ uc, Propose | proposal ⟩;

upon event ⟨ uc, Decide | decided ⟩ do
  trigger ⟨ utrb, Deliver | s, decided ⟩;

Broadcast with ordering properties

FIFO-order reliable broadcast

Specification

Module:
  Name: FIFOReliableBroadcast, instance frb.

Events:
  Request: ⟨ frb, Broadcast | m ⟩: Broadcasts a message m to all processes.
  Indication: ⟨ frb, Deliver | p, m ⟩: Delivers a message m broadcast by process p.

Properties:
  1. Validity: If a correct process p broadcasts a message m, then p
     eventually delivers m.

  2. No duplication: No message is delivered more than once.

  3. No creation: If a process delivers a message m with sender s, then m
     was previously broadcast by process s.

  4. Agreement: If a message m is delivered by some correct process, then
     m is eventually delivered by every correct process.

  5. FIFO delivery: If some process broadcasts message m1 before it
     broadcasts message m2, then no correct process delivers m2 unless it
     has already delivered m1.

Implementation for Fail-Silent model: Broadcast with Sequence Number

Implements:
  FIFOReliableBroadcast, instance frb.

Uses:
  ReliableBroadcast, instance rb.

upon event ⟨ frb, Init ⟩ do
  lsn := 0;
  pending := ∅;

  foreach p ∈ Π do
    next[p] := 1;

upon event ⟨ frb, Broadcast | m ⟩ do
  lsn := lsn + 1;
  trigger ⟨ rb, Broadcast | [DATA, self, m, lsn] ⟩;

upon event ⟨ rb, Deliver | p, [DATA, s, m, sn] ⟩ do
  pending := pending ∪ {(s, m, sn)};

  while exists (s, m′, sn′) ∈ pending such that sn′ = next[s] do
    next[s] := next[s] + 1;
    pending := pending \ {(s, m′ , sn′)};
    trigger ⟨ frb, Deliver | s, m′ ⟩;

Causal-order broadcast

Causal order can be defined recursively. We denote m1 → m2(meaning m1 was “sent before” m2) if and only if any of the following conditions holds:

  • some process p broadcasts m1 before it broadcasts m2;
  • some process p delivers m1 and subsequently broadcasts m2; or
  • there exists some message m′ such that m1 → m′ and m′ → m2.

The causal order property can be added to regular reliable broadcast or uniform reliable broadcast to form causal-order regular reliable broadcast or causal-order uniform reliable broadcast respectively. Here we only concern us with the former case.

Specification:

Module:
  Name: CausalOrderReliableBroadcast, instance crb.

Events:
  Request: ⟨ crb, Broadcast | m ⟩: Broadcasts a message m to all processes.
  Indication: ⟨ crb, Deliver | p, m ⟩: Delivers a message m broadcast by process p.

Properties:
  1. Validity: If a correct process p broadcasts a message m, then p
     eventually delivers m.

  2. No duplication: No message is delivered more than once.

  3. No creation: If a process delivers a message m with sender s, then m
     was previously broadcast by process s.

  4. Agreement: If a message m is delivered by some correct process, then
     m is eventually delivered by every correct process.

  5. Causal delivery: For any message m1 that potentially caused a
     message m2, i.e., m1 → m2, no process delivers m2 unless it has
     already delivered m1.

Implementation 1 for Fail-Silent model: No-Waiting Causal Broadcast

Idea: send message with its full history

Implements:
  CausalOrderReliableBroadcast, instance crb.

Uses:
  ReliableBroadcast, instance rb.

upon event ⟨ crb, Init ⟩ do
  delivered := ∅;
  past := [];

upon event ⟨ crb, Broadcast | m ⟩ do
  trigger ⟨ rb, Broadcast | [DATA, past, m] ⟩;
  append(past, (self, m));

upon event ⟨ rb, Deliver | p, [DATA, mpast, m] ⟩ do
  if m ∉ delivered then
    forall (s, n) ∈ mpast do                // by the order in the list
      if n ∉ delivered then
         trigger ⟨ crb, Deliver | s, n ⟩;
         delivered := delivered ∪ {n};

         if (s, n) ∉ past then
            append(past, (s, n));

         trigger ⟨ crb, Deliver | p, m ⟩;
         delivered := delivered ∪ {m};

         if (p, m) ∉ past then
            append(past, (p, m));

Problem: history too long

Idea: if all processes have delivered the message, it doesn’t make sense for the message to be in the history any more.

Solution: With a perfect failure detector, when a process rb-delivers a message m, the process rb-broadcasts an ACK message to all processes; when an ACK message for message m has been rb-delivered from all correct processes, then m is purged from past. The implementation is omitted here.

Implementation 2 for Fail-Silent model: Waiting Causal Broadcast

Idea: use vector clock. We can think vector clock as a partial order timestamp.

In the following, if we just look at one column of the vector clock, it looks very similiar to the FIFO algorithm. It’s obvious it guarantees FIFO-delivery of messages with respect to a single process. The difference with FIFO is that each message is not only associated with its own local clock, but also associated with each other process’ clock, thus a vector clock. The algorithm maintains two vector clocks, one is the send timestamp W, one is the delivery timestamp V(to remember the next expected messages). From an abstract point of view, the general form of all timestamp-ordered algorithms looks the same.

Implements:
  CausalOrderReliableBroadcast, instance crb.

Uses:
  ReliableBroadcast, instance rb.

upon event ⟨ crb, Init ⟩ do
  foreach p ∈ Π do
    V[p] := 0;

  lsn := 0;
  pending := ∅;

upon event ⟨ crb, Broadcast | m ⟩ do
  W := V;
  W[self] := lsn;
  lsn := lsn + 1;
  trigger ⟨ rb, Broadcast | [DATA, W, m] ⟩;

upon event ⟨ rb, Deliver | p, [DATA, W, m] ⟩ do
  pending := pending ∪ {(p, W, m)};

  while exists (p′, W′, m′) ∈ pending such that W′ ≤ V do
     pending := pending \ {(p′, W′, m′)};
     V[p′] := V [p′] + 1;
     trigger ⟨ crb, Deliver | p′, m′ ⟩;

Total-order broadcast

The total-order property can be added to regular reliable broadcast or uniform reliable broadcast to form total-order regular reliable broadcast or total-order uniform reliable broadcast respectively. Here we only concern us with the former case.

The most common usage of total-order broadcast in real world is to implement replicated services to achieve fault tolerance.

Specification

Module:
  Name: TotalOrderBroadcast, instance tob.

Events:
  Request: ⟨ tob, Broadcast | m ⟩: Broadcasts a message m to all processes.
  Indication: ⟨ tob, Deliver | p, m ⟩: Delivers a message m broadcast by process p.

Properties:
  1. Validity: If a correct process p broadcasts a message m, then p
    eventually delivers m.

  2. No duplication: No message is delivered more than once.

  3. No creation: If a process delivers a message m with sender s, then
     m was previously broadcast by process s.

  4. Agreement: If a message m is delivered by some correct process, then
     m is eventually delivered by every correct process.

  5. Total order: Let m1 and m2 be any two messages and suppose p and q
     are any two correct processes that deliver m1 and m2. If p delivers
     m1 before m2, then q delivers m1 before m2.

The implementation of total-order broadcast depends on the consensus primitive(see below), and total-order broadcast can be used to implement consensus.

Implementation for Fail-Silent model: Consensus-Based Total-Order Broadcast

Implements:
  TotalOrderBroadcast, instance tob.

Uses:
  ReliableBroadcast, instance rb;
  Consensus (multiple instances).

upon event ⟨ tob, Init ⟩ do
  unordered := ∅;
  delivered := ∅;
  round := 1;
  wait := FALSE;

upon event ⟨ tob, Broadcast | m ⟩ do
  trigger ⟨ rb, Broadcast | m ⟩;

upon event ⟨ rb, Deliver | p, m ⟩ do
  if m ∉ delivered then
     unordered := unordered ∪ {(p, m)};

upon unordered ≠ ∅ and wait = FALSE do
  wait := TRUE;
  Initialize a new instance c.round of consensus;
  trigger ⟨ c.round, Propose | unordered ⟩;

upon event ⟨ c.r, Decide | decided ⟩ such that r = round do
  forall (s, m) ∈ sort(decided) do
    trigger ⟨ tob, Deliver | s, m ⟩;

  delivered := delivered ∪ decided;
  unordered := unordered \ decided;
  round := round + 1;
  wait := FALSE;

Consensus

Solving consensus is the key to solving many problems in distributed computing(e.g. various problems of consistency).

Fischer, Lynch, and Paterson (1985) had proved that no deterministic algorithm implements consensus in a fail-silent model, even if only one process fails. The implementation has to either assume perfect failure detector or fail-noisy model.

Regular Consensus

Specfication

Module:
  Name: Consensus, instance c.

Events:
  Request: ⟨ c, Propose | v ⟩: Proposes value v for consensus.
  Indication: ⟨ c, Decide | v ⟩: Outputs a decided value v of consensus.

Properties:
  1. Termination: Every correct process eventually decides some value.

  2. Validity: If a process decides v, then v was proposed by some process.

  3. Integrity: No process decides twice.

  4. Agreement: No two correct processes decide differently.

Implementation 1 for Fail-Stop model: Flooding Consensus

Idea: Each process collect all proposed values from all processes, and then decides on the smallest proposed value.

Implements:
  Consensus, instance c.

Uses:
  BestEffortBroadcast, instance beb;
  PerfectFailureDetector, instance P.

upon event ⟨ c, Init ⟩ do
  correct := Π;
  round := 1;
  decision := ⊥;
  receivedfrom := [∅];
  proposals := [∅];
  receivedfrom[0] := Π;

upon event ⟨ P,Crash | p ⟩ do
  correct := correct \ {p};

upon event ⟨ c, Propose | v ⟩ do
  proposals[1] := proposals[1] ∪ {v};
  trigger ⟨ beb, Broadcast | [PROPOSAL, 1, proposals[1]] ⟩;

upon event ⟨ beb, Deliver | p, [PROPOSAL, r, ps] ⟩ do
  receivedfrom[r] := receivedfrom[r] ∪ {p};
  proposals[r] := proposals[r] ∪ ps;

upon correct ⊆ receivedfrom[round] ∧ decision = ⊥ do
  if receivedfrom[round] = receivedfrom[round − 1] then
    decision := min(proposals[round]);
    trigger ⟨ beb, Broadcast | [DECIDED, decision] ⟩;
    trigger ⟨ c, Decide | decision ⟩;
  else
    round := round + 1;
    trigger ⟨ beb, Broadcast | [PROPOSAL, round, proposals[round − 1]] ⟩;

upon event ⟨ beb, Deliver | p, [DECIDED, v] ⟩ such that p ∈ correct ∧ decision = ⊥ do
  decision := v;
  trigger ⟨ beb, Broadcast | [DECIDED, decision] ⟩;
  trigger ⟨ c, Decide | decision ⟩;

Why the algorithm proceeds in rounds & exchange collected proposals? That’s the only possible way to ensure that all correct processes can collect all possible proposals with the presence of crash-stop processes.

Note that a process can execute at most N rounds, the worst-case is that at each round a process crashes.

Implementation 2 for Fail-Stop model: Hierarchical Consensus

Idea: The correct process with the smallest identity in the hierarchy imposes its value(may be the value decided by previous faulty processes) on all other processes.

Implements:
  Consensus, instance c.

Uses:
  BestEffortBroadcast, instance beb;
  PerfectFailureDetector, instance P.

upon event ⟨ c, Init ⟩ do
  suspected := ∅;
  round := 1;
  proposal := ⊥;
  proposer := 0;
  delivered := [FALSE];
  broadcast := FALSE;

upon event ⟨ P,Crash | p ⟩ do
  suspected := suspected ∪ {p};

upon event ⟨ c, Propose | v ⟩ such that proposal = ⊥ do
  proposal := v;

upon round = self ∧ proposal ≠ ⊥ ∧ broadcast = FALSE do
  broadcast := TRUE;
  trigger ⟨ beb, Broadcast | [DECIDED, proposal] ⟩;
  trigger ⟨ c, Decide | proposal ⟩;

upon round ∈ suspected ∨ delivered[round] = TRUE do
  round := round + 1;

upon event ⟨ beb, Deliver | p,[DECIDED, v] ⟩ do
  if p < self ∧ p > proposer then
     proposal := v;
     proposer := p;

  delivered[p] := TRUE;

Uniform Consensus

Uniform consensus ensures that no two processes decide different values, whether they are correct or not.

Module:
  Name: UniformConsensus, instance uc.

Events:
  Request: ⟨ uc, Propose | v ⟩: Proposes value v for consensus.
  Indication: ⟨ uc, Decide | v ⟩: Outputs a decided value v of consensus.

Properties:
  1. Termination: Every correct process eventually decides some value.

  2. Validity: If a process decides v, then v was proposed by some process.

  3. Integrity: No process decides twice.

  4. Uniform agreement: No two processes decide differently.

Implementation 1 for Fail-Stop model: Flooding Uniform Consensus

Idea: avoid making decision too early by faulty processes. All processes make decision after N rounds!

Implements:
  UniformConsensus, instance uc.

Uses:
  BestEffortBroadcast, instance beb;
  PerfectFailureDetector, instance P.

upon event ⟨ uc, Init ⟩ do
  correct := Π;
  round := 1;
  decision := ⊥;
  proposalset := ∅;
  receivedfrom := ∅;

upon event ⟨ P,Crash | p ⟩ do
  correct := correct \ {p};

upon event ⟨ uc, Propose | v ⟩ do
  proposalset := proposalset ∪ {v};
  trigger ⟨ beb, Broadcast | [PROPOSAL, 1, proposalset] ⟩;

upon event ⟨ beb, Deliver | p, [PROPOSAL, r, ps] ⟩ such that r = round do
  receivedfrom := receivedfrom ∪ {p};
  proposalset := proposalset ∪ ps;

upon correct ⊆ receivedfrom ∧ decision = ⊥ do
  if round = N then
    decision := min(proposalset);
    trigger ⟨ uc, Decide | decision ⟩;
  else
    round := round + 1;
    receivedfrom := ∅;               // reset for next round
    trigger ⟨ beb, Broadcast | [PROPOSAL, round, proposalset] ⟩;

Implementation 2 for Fail-Stop model: Hierarchical Uniform Consensus

Idea: in each round, make sure the to be decided value is received by all processes. This relies on perfect failure detector and ACK messages.

Note: there’s a simpler but slower solution. To avoid making decisions too early, just like the uniform version of the flooding consensus, all processes make decisions at round N. This is just a simple modification of the regular version of hierarchical consensus algorithm, thus omitted here.

Implements:
  UniformConsensus, instance uc.

Uses:
  PerfectPointToPointLinks, instance pl;
  BestEffortBroadcast, instance beb;
  ReliableBroadcast, instance rb;
  PerfectFailureDetector, instance P.

upon event ⟨ uc, Init ⟩ do
  suspected := ∅;
  ackranks := ∅;
  round := 1;
  proposal := ⊥;
  decision := ⊥;
  proposed := [⊥];

upon event ⟨ P,Crash | p ⟩ do
  suspected := suspected ∪ {p};

upon event ⟨ uc, Propose | v ⟩ such that proposal = ⊥ do
  proposal := v;

upon round = self ∧ proposal ≠ ⊥ ∧ decision = ⊥ do
  trigger ⟨ beb, Broadcast | [PROPOSAL, proposal] ⟩;

upon event ⟨ beb, Deliver | p, [PROPOSAL, v] ⟩ do
  proposed[p] := v;
  if p ≥ round then
     trigger ⟨ pl, Send | p, [ACK] ⟩;

upon round ∈ suspected do
  if proposed[round] ≠ ⊥ then
     proposal := proposed[round];

  round := round + 1;

upon event ⟨ pl, Deliver | q, [ACK] ⟩ do
  ackranks := ackranks ∪ {q};

upon suspected ∪ ackranks = {1, . . . , N } do
  trigger ⟨ rb, Broadcast | [DECIDED, proposal] ⟩;

upon event ⟨ rb, Deliver | p, [DECIDED, v] ⟩ such that decision = ⊥ do
  decision := v;
  trigger ⟨ uc, Decide | decision ⟩;

A round consists of two communication steps: within the same round, the leader broad- casts a PROPOSAL message to all processes, trying to impose its value, and then expects to obtain an acknowledgment from all correct processes.

Uniform Consensus in the Fail-Noisy Model

Any fail-noisy algorithm that solves consensus also solves uniform consensus. The implementation assumes:

  • a correct majority
  • a ♢P failure detector

The algorithm is also round-based: processes move incrementally from one round to the other. Process pi is leader in every round k such that k mod N = i. In such a round, pi tries to decide.

To decide, pi executes following steps:

  1. pi selects the latest adopted value among a majority.
  2. pi imposes that value at a majority: any process in that majority adopts that value – pi fails if it is suspected(processes that suspect pi inform pi and move to the next round; pi does so as well).
  3. pi decides and reliably broadcasts the decision to all other processes.

The core of the algorithm ensures that a subsequent round respects the uniform agreement property of consensus, such that if a process might already have decided in the current round, the decision value becomes fixed. A process that decides in a later round must also decide the fixed value.

The assumption of a majority of correct processes is important in two respects:

  • ensure the value decided in previous rounds is passed to later rounds;
  • enable the leader process to decide within a round in a fail-silent model.

This algorithm is first introduced by Lamport under the name Paxos. The implementation is very tedious, thus it is omitted here.

Non-blocking atomic commit

In this section, we’ll see how the primitives are used in distributed databases to achieve non-blocking atomic commit.

The processes, each representing a data manager, agree on the outcome of a transaction, which is either to commit or to abort the transaction. Commit can only be decided if all processes are able to commit.

Note that a well-known algorithm in this context is two-phase commit, which has a serious bug if the coordinator crashes. This turns out to be the same problem as in terminating reliable broadcast. If the coordinator crashes, other processes can’t decide to wait or not. Unifrom broadcast doesn’t help in such cases as explained in the case of terminating reliable broadcast. If such a case happen, not only the whole system becomes unavailable, the data at different data managers may become inconsistent.

Specification:

Module:
  Name: NonBlockingAtomicCommit, instance nbac.

Events:
  Request: ⟨ nbac, Propose | v ⟩: Proposes value v = COMMIT or v = ABORT for the commit.
  Indication: ⟨ nbac, Decide | v ⟩: Outputs the decided value for the commit.

Properties:
  1. Termination: Every correct process eventually decides some value.

  2. Abort-Validity: A process can only decide ABORT if some process
     proposes ABORT or a process crashes.

  3. Commit-Validity: A process can only decide COMMIT if no process
     proposes ABORT.

  4. Integrity: No process decides twice.

  5. Uniform Agreement: No two processes decide differently.

Implementation in Fail-Stop model:

Implements:
  NonBlockingAtomicCommit, instance nbac.

Uses:
  BestEffortBroadcast, instance beb;
  UniformConsensus, instance uc;
  PerfectFailureDetector, instance P.

upon event ⟨ nbac, Init ⟩ do
  voted := ∅;
  proposed := FALSE;

upon event ⟨ P, Crash | p ⟩ do
  if proposed = FALSE then
     trigger ⟨ uc, Propose | ABORT ⟩;
     proposed := TRUE;

upon event ⟨ nbac, Propose | v ⟩ do
  trigger ⟨ beb, Broadcast | v ⟩;

upon event ⟨ beb, Deliver | p, v ⟩ do
  if v = ABORT ∧ proposed = FALSE then
     trigger ⟨ uc, Propose | ABORT ⟩;
     proposed := TRUE;
  else
     voted := voted ∪ {p};

     if voted = Π ∧ proposed = FALSE do
        trigger ⟨ uc, Propose | COMMIT ⟩;
        proposed := TRUE;

upon event ⟨ uc, Decide | decided ⟩ do
  trigger ⟨ nbac, Decide | decided ⟩;

Group Membership

Group membership is also a recurring problem in distributed systems, which can be solved using the three primitives.

Specification:

Module:
  Name: GroupMembership, instance gm.

Events:
  Indication: ⟨ gm, View | V ⟩: Installs a new view V = (id,M) with view identifier id and membership M.

Properties:
  1. Monotonicity: If a process p installs a view V = (id, M) and
     subsequently installs a view V′ =(id′,M′),then id < id′ and M ⊇ M′.

  2. Uniform Agreement: If some process installs a view V = (id, M) and
     another process installs some view V′ = (id, M′), then M = M′.

  3. Completeness: If a process p crashes, then eventually every correct
     process installs a view (id, M) such that p ∉ M.

  4. Accuracy: If some process installs a view (id, M) with q ∉ M for
     some process q ∈ Π, then q has crashed.

Implementation in Fail-Stop model:

Implements:
  GroupMembership, instance gm.

Uses:
  UniformConsensus (multiple instance);
  PerfectFailureDetector, instance P.

upon event ⟨ gm, Init ⟩ do
  (id, M) := (0, Π);
  correct := Π;
  wait := FALSE;
  trigger ⟨ gm, View | (id, M ) ⟩;

upon event ⟨ P,Crash | p ⟩ do
  correct := correct \ {p};

upon correct ⊊ M ∧ wait = FALSE do
  id := id + 1;
  wait := TRUE;
  Initialize a new instance uc.id of uniform consensus;
  trigger ⟨ uc.id, Propose | correct ⟩;

upon event ⟨ uc.i, Decide | M′ ⟩ such that i = id do
  M := M′;
  wait := FALSE;
  trigger ⟨ gm, View | (id, M ) ⟩;

View Synchronous Broadcast

View synchronous broadcast is an abstraction that results from the combination of group membership and reliable broadcast or uniform broadcast. It ensures that the delivery of messages is coordinated with the installation of views.

Specification

The specification for reliable view synchronous broadcast is listed below, the uniform version just needs to replace the properties of reliable broadcast with uniform broadcast.

Module:
  Name: ViewSynchronousCommunication, instance vs.

Events:
  Request: ⟨ vs, Broadcast | m ⟩: Broadcasts a message m to all processes.
  Indication: ⟨ vs, Deliver | p, m ⟩: Delivers a message m broadcast by process p.
  Indication: ⟨ vs, View | V ⟩: Installs a new view V = (id,M) with view identifier id and membership M.
  Indication: ⟨ vs, Block ⟩: Requests that no new messages are broadcast temporarily until the next view is installed.
  Request: ⟨ vs, BlockOk ⟩: Confirms that no new messages will be broadcast until the next view is installed.

Properties:
  1. View Inclusion: If some process delivers a message m from process
     p in view V , then m was broadcast by p in view V.

  2-5. Same as properties 1-4 in regular reliable broadcast.

  6-9. Same as properties 1-4 in group membership.

Implementation

The idea for all algorithms in this category is to share all messages that are delivered in current view before installing a new view. Terminating reliable broadcast is used here. And it also has to make sure that no more messages are sent during this unifying period, thus the special messages Block and BlockOK is used. The upper layer must not send any message within the period between BlockOK and View.

Implementation based on Terminating Reliable Broadcast:

Implements:
  ViewSynchronousCommunication, instance vs.

Uses:
  TerminatingReliableBroadcast, instance trb;
  GroupMembership, instance gm;
  BestEffortBroadcast, instance beb.

upon event < vs, Init > do
  view := (0, Π);
  nextView := ⊥;
  pending := delivered := deliveredPair := trbDone := ∅;
  flushing := blocked := FALSE;

upon event < vs, Broadcast | m > and (blocked = FALSE) do
  delivered := delivered ∪ {m}
  deliveredPair := deliveredPair ∪ {(self, m)}
  trigger < vs, Deliver | self, m >;
  trigger < beb, Broadcast | [Data, view.id, m] >;

upon event < beb, Deliver | src, [Data, vid, m] > do
  if (view.id = vid) and (m ∉ delivered) and (blocked = FALSE) then
    delivered := delivered ∪ { m }
    deliveredPair := deliveredPair ∪ {(src, m)}
    trigger < vs, Deliver | src, m >;

upon event < gm, View | V > do
  addtoTail(pending, V);

upon (pending ≠ ∅) and (flushing = FALSE) do
  nextView := removeFromhead (pending);
  flushing := TRUE;
  trigger < vs, Block >;

upon event < vs, BlockOk > do
  blocked := TRUE;
  trbDone := ∅;

  forall p ∈ view.M do
    Initialize a new instance trb.vid.p of uniform terminating reliable
    broadcast with sender p;

    if p = self then
       trigger ⟨ trb.vid.p, Broadcast | deliveredPair ⟩;

upon event < trb.id.p, Deliver | p, pairs > such that view.id = id do
  trbDone := trbDone ∪ {p};

  if body ≠ △ then
    forall (src, m) ∈ pairs and m ∉ delivered do
      delivered := delivered ∪ {m};
      trigger < vs, Deliver | src, m >;

upon (trbDone = view.M) and (blocked = TRUE) do
  view := nextView;
  flushing := blocked := FALSE;
  delivered := ∅;
  deliveredPair := ∅;
  trigger < vs, View | view >;

Implementation based on Uniform Consensus:

Implements:
 ViewSynchronousCommunication, instance vs.

Uses:
  UniformConsensus, instance uc;
  PerfectFailureDetector, instance P;
  BestEffortBroadcast, instance beb.

upon event < vs, Init > do
  view := (0, Π);
  correct := Π;
  flushing := blocked := FALSE;
  delivered := dset := ∅;

  trigger ⟨ vs, View | view ⟩;

upon event < vs, Broadcast | m > and (blocked = FALSE) do
  delivered := delivered ∪ {m}
  trigger < vs, Deliver | self, m >;
  trigger < beb, Broadcast | [Data,view.id,m] >;

upon event < beb, Deliver | src, [Data,vid,m] > do
  if (view.id = vid) and m ∉ delivered and blocked = FALSE then
     delivered := delivered ∪ {m}
     trigger < vs, Deliver | src, m >;

upon event < P, crash | p > do
  correct := correct \ {p};
  if flushing = FALSE then
     flushing := true;
     trigger < vs, Block >;

upon event < vs, BlockOk > do
  blocked := true;
  trigger < beb, Broadcast | [DSET,view.id,delivered] >;

upon event < beb, Deliver | src, [DSET,vid,del] > do
  dset:= dset ∪ (src,del);
  if forall p ∈ correct, (p, mset) ∈ dset then
     trigger < uc, Propose | view.id+1, correct, dset >;

upon event < uc, Decided | id, memb, vsdset > do
  forall (p,mset) ∈ vsdset: p ∈ memb do
      forall (src,m) ∈ mset: m ∉ delivered do
          delivered := delivered ∪ {m}
          trigger <vsDeliver, src, m>;

  view := (id, memb);
  flushing := blocked := FALSE;
  dset := delivered := ∅;

  trigger < vs, View | view >;

Implementation of Uniform View Synchronous Broadcast

It’s important to note that simple usage of uniform broadcast doesn’t solve the problem, as uniform broadcast only guarantees uniform delivery, not uniform delivery within view. It’s possible that following scenario would happen:

  • process 1 uniform delivers m in view v(i), thus triggers vs-delivery, then crashes
  • process 2 and process 3 install view v(i+1)
  • process 2 and process 3 uniform delivers m, but ignored due to mismatch of view id

In the above we see that in view v(i), the faulty process 1 vs-delivers m, but which is not delivered by process 2 and process 3.

The idea is to use ACK messages to inform each other process that a message has been received, which is similar to the fail-stop implementation of uniform broadcast. But here we’ve to make sure that such ACK messages are only valid within a specific view.

Implements:
  UniformViewSynchronousCommunication, instance uvs.

Uses:
  UniformConsensus (multiple instances);
  BestEffortBroadcast, instance beb;
  PerfectFailureDetector, instance P.

upon event ⟨ uvs, Init ⟩ do
  (vid, M) := (0, Π);
  correct := Π;
  flushing := FALSE;
  blocked := FALSE;
  wait := FALSE;
  pending := ∅;
  delivered := ∅;
  seen := [⊥];

  forall m do
    ack[m] := ∅;

  trigger ⟨ uvs, View | (vid, M ) ⟩;

upon event ⟨ uvs, Broadcast | m ⟩ such that blocked = FALSE do
  pending := pending ∪ {(self, m)};
  trigger ⟨ beb, Broadcast | [DATA, vid, self, m] ⟩;

upon event ⟨ beb, Deliver | p, [DATA, id, s, m] ⟩ do
  if id = vid ∧ blocked = FALSE then
     ack[m] := ack[m] ∪ {p};

     if (s, m) ∉ pending then
        pending := pending ∪ {(s, m)};
        trigger ⟨ beb, Broadcast | [DATA, vid, s, m] ⟩;

upon exists (s, m) ∈ pending such that M ⊆ ack[m] ∧ m ∉ delivered do
  delivered := delivered ∪ {m};
  trigger ⟨ uvs, Deliver | s, m ⟩;

upon event ⟨ P, Crash | p ⟩ do
 correct := correct \ {p};

upon correct ⊊ M ∧ flushing = FALSE do
  flushing := TRUE;
  trigger ⟨ uvs, Block ⟩;

upon event ⟨ uvs, BlockOk ⟩ do
  blocked := TRUE;
  trigger ⟨ beb, Broadcast | [PENDING, vid, pending] ⟩;

upon event ⟨ beb, Deliver | p, [PENDING, id, pd] ⟩ such that id = vid do
  seen[p] := pd;

upon (forall p ∈ correct : seen[p] ≠ ⊥) ∧ wait = FALSE do
  wait := TRUE;
  vid := vid + 1;

  Initialize a new instance uc.vid of uniform consensus;
  trigger ⟨ uc.vid, Propose | (correct, seen) ⟩;

upon event ⟨ uc.id, Decide | (M′, S) ⟩ such that id = vid do
  forall p ∈ M′ such that S[p] ≠ ⊥ do
      forall (s, m) ∈ S[p] such that m ∉ delivered do
          delivered := delivered ∪ {m};
          trigger ⟨ uvs, Deliver | s, m ⟩;

  flushing := FALSE;
  blocked := FALSE;
  wait := FALSE;
  pending := ∅;
  seen := [⊥];
  forall m do ack[m] := ∅;

  M := M′;
  trigger ⟨ uvs, View | (vid, M ) ⟩;

Registers

Register is the basic contruct to model various distributed storage systems. Though consensus can be used to easily solve problems in this section, it’s avoided because more light-weight solutions are available.

Registers store values and can be accessed through two operations, read and write. Registers can be classified by the number of readers and writers:

  • (1, 1) means one writer and one reader
  • (1, N) means one writer and multiple readers
  • (N, N) means multiple writers and multiple readers

Registers can also be classified by the behaviors of read and write operations:

  • Safe registers: read can return any value in face of concurrent writes
  • Regular registers: read can only return last value or the new value in face of concurrent writes
  • Atomic registers: it behaves as if all read and write operations can be linearized

(1, N) Regular Registers

Specification:

Module:
  Name: (1, N)-RegularRegister, instance onrr.

Events:
  Request: ⟨ onrr, Read ⟩: Invokes a read operation on the register.
  Request: ⟨ onrr, Write | v ⟩: Invokes a write operation with value v on the register.
  Indication: ⟨ onrr, ReadReturn | v ⟩: Completes a read operation on the register with return value v.
  Indication: ⟨ onrr, WriteReturn ⟩: Completes a write operation on the register.

Properties:
  1. Termination: If a correct process invokes an operation, then the
     operation eventually completes.

  2. Validity: A read that is not concurrent with a write returns the
     last value written; a read that is concurrent with a write returns
     the last value written or the value concurrently written.

Implementation for Fail-Stop model: Read-One Write-All (1, N) Regular Register

Implements:
  (1, N)-RegularRegister, instance onrr.

Uses:
  BestEffortBroadcast, instance beb;
  PerfectPointToPointLinks, instance pl;
  PerfectFailureDetector, instance P.

upon event ⟨ onrr, Init ⟩ do
  val := ⊥;
  correct := Π;
  writeset := ∅;

upon event ⟨ P, Crash | p ⟩ do
 correct := correct \ {p};

upon event ⟨ onrr, Read ⟩ do
  trigger ⟨ onrr, ReadReturn | val ⟩;

upon event ⟨ onrr, Write | v ⟩ do
  trigger ⟨ beb, Broadcast | [WRITE, v] ⟩;

upon event ⟨ beb, Deliver | q, [WRITE, v] ⟩ do
  val := v;
  trigger ⟨ pl, Send | q, ACK ⟩;

upon event ⟨ pl, Deliver | p, ACK ⟩ do
  writeset := writeset ∪ {p};

upon correct ⊆ writeset do
  writeset := ∅;
  trigger ⟨ onrr, WriteReturn ⟩;

Implementation for Fail-Silent model: Majority Voting Regular Register

The idea is to assume a majority of correct processes, each WRITE and READ waits for a majority of replies to make sure they interset at least one process.

Implements:
  (1, N)-RegularRegister, instance onrr.

Uses:
  BestEffortBroadcast, instance beb;
  PerfectPointToPointLinks, instance pl.

upon event ⟨ onrr, Init ⟩ do
  (ts, val) := (0, ⊥);
  wts := 0;
  acks := 0;
  rid := 0;
  readlist := [⊥];

upon event ⟨ onrr, Write | v ⟩ do
  wts := wts + 1;
  acks := 0;
  trigger ⟨ beb, Broadcast | [WRITE, wts, v] ⟩;

upon event ⟨ beb, Deliver | p, [WRITE, ts′, v′] ⟩ do
  if ts′ > ts then
     (ts, val) := (ts′, v′);

  trigger ⟨ pl, Send | p, [ACK, ts′] ⟩;

upon event ⟨ pl, Deliver | q, [ACK, ts′] ⟩ such that ts′ = wts do
  acks := acks + 1;

  if acks > N/2 then
     acks := 0;
     trigger ⟨ onrr, WriteReturn ⟩;

upon event ⟨ onrr, Read ⟩ do
  rid := rid + 1;
  readlist := [⊥];
  trigger ⟨ beb, Broadcast | [READ, rid] ⟩;

upon event ⟨ beb, Deliver | p, [READ, r] ⟩ do
  trigger ⟨ pl, Send |p, [VALUE, r, ts, val] ⟩;

upon event ⟨ pl, Deliver | q, [VALUE, r, ts′, v′] ⟩ such that r = rid do
  readlist[q] := (ts′ , v′);

  if #(readlist) > N/2 then
     v := highestval(readlist);
     readlist := [⊥];
     trigger ⟨ onrr, ReadReturn | v ⟩;

(1, N) Atomic Registers

Specification:

Module:
  Name: (1, N)-AtomicRegister, instance onar.

Events:
  Request: ⟨ onar, Read ⟩: Invokes a read operation on the register.
  Request: ⟨ onar, Write | v ⟩: Invokes a write operation with value v on the register.
  Indication: ⟨ onar, ReadReturn | v ⟩: Completes a read operation on the register with return value v.
  Indication: ⟨ onar, WriteReturn ⟩: Completes a write operation on the register.

Properties:
  1. Termination: If a correct process invokes an operation, then the
     operation eventually completes.

  2. Validity: A read that is not concurrent with a write returns the
     last value written; a read that is concurrent with a write returns
     the last value written or the value concurrently written.

  3. Ordering: If a read returns a value v and a subsequent read returns
     a value w, then the write of w does not precede the write of v.

Implementation for Fail-Stop model: Read-Impose Write-All

The changes with respect to the Read-One Write-All (1, N) Regular Register algorithm are as follows:

  • The reader now also writes to all processes before return the value
  • Timestamp is introduced

It guarantees that a later READ R1 can’t read an older value than an earlier READ R2, because R2 should have imposed the newer value to all processes.

Implements:
  (1, N)-AtomicRegister, instance onar.

Uses:
  BestEffortBroadcast, instance beb;
  PerfectPointToPointLinks, instance pl;
  PerfectFailureDetector, instance P.

upon event ⟨ onar, Init ⟩ do
  (ts, val) := (0, ⊥);
  correct := Π;
  writeset := ∅;
  readval := ⊥;
  reading := FALSE;

upon event ⟨ P, Crash | p ⟩ do
  correct := correct \ {p};

upon event ⟨ onar, Read ⟩ do
  reading := TRUE;
  readval := val;
  trigger ⟨ beb, Broadcast | [WRITE, ts, val] ⟩;

upon event ⟨ onar, Write | v ⟩ do
  trigger ⟨ beb, Broadcast | [WRITE, ts + 1, v] ⟩;

upon event ⟨ beb, Deliver | p, [WRITE, ts′, v′] ⟩ do
  if ts′ > ts then
     (ts, val) := (ts′, v′);
  trigger ⟨ pl, Send | p, [ACK] ⟩;

upon event ⟨ pl, Deliver | p, [ACK] ⟩ do
  writeset := writeset ∪ {p};

upon correct ⊆ writeset do
  writeset := ∅;

  if reading = TRUE then
     reading := FALSE;
     trigger ⟨ onar, ReadReturn | readval ⟩;
  else
     trigger ⟨ onar, WriteReturn ⟩;

Implementation for Fail-Silent model: Read-Impose Write-Majority

The algorithm assumes a majority of correct processes. The writer simply makes sure a majority adopts its value. A reader selects the value with the largest timestamp from a majority and imposes this value and makes sure a majority adopts it before completing the read operation.

Implements:
  (1, N)-AtomicRegister, instance onar.

Uses:
  BestEffortBroadcast, instance beb;
  PerfectPointToPointLinks, instance pl.

upon event ⟨ onar, Init ⟩ do
  (ts, val) := (0, ⊥);
  wts := 0;
  acks := 0;
  rid := 0;
  readlist := [⊥];
  readval := ⊥;
  reading := FALSE;

upon event ⟨ onar, Read ⟩ do
  rid := rid + 1;
  acks := 0;
  readlist := [⊥];
  reading := TRUE;

  trigger ⟨ beb, Broadcast | [READ, rid] ⟩;

upon event ⟨ beb, Deliver | p, [READ, r] ⟩ do
  trigger ⟨ pl, Send | p, [VALUE, r, ts, val] ⟩;

upon event ⟨ pl, Deliver | q, [VALUE, r, ts′, v′] ⟩ such that r = rid do
  readlist[q] := (ts′ , v′);

  if #(readlist) > N/2 then
     (maxts, readval) := highest(readlist);
     readlist := [⊥];
     trigger ⟨ beb, Broadcast | [WRITE, rid, maxts, readval] ⟩;

upon event ⟨ onar, Write | v ⟩ do
  rid := rid + 1;
  wts := wts + 1;
  acks := 0;
  trigger ⟨ beb, Broadcast | [WRITE, rid, wts, v] ⟩;

upon event ⟨ beb, Deliver | p, [WRITE, r, ts′, v′] ⟩ do
  if ts′ > ts then
     (ts, val) := (ts′, v′);
  trigger ⟨ pl, Send | p, [ACK, r] ⟩;

upon event ⟨ pl, Deliver | q, [ACK, r] ⟩ such that r = rid do
  acks := acks + 1;

  if acks > N/2 then
     acks := 0;

     if reading = TRUE then
        reading := FALSE;
        trigger ⟨ onar, ReadReturn | readval ⟩;
     else
        trigger ⟨ onar, WriteReturn ⟩;

(N, N) Atomic Registers

Specification:

Module:
  Name: (N, N )-AtomicRegister, instance nnar.

Events:
  Request: ⟨ nnar, Read ⟩: Invokes a read operation on the register.
  Request: ⟨ nnar, Write | v ⟩: Invokes a write operation with value v on the register.
  Indication: ⟨ nnar, ReadReturn | v ⟩: Completes a read operation on the register with return value v.
  Indication: ⟨ nnar, WriteReturn ⟩: Completes a write operation on the register.

Properties:
  1. Termination: If a correct process invokes an operation, then the
     operation eventually completes.

  2. Atomicity: Every read operation returns the value that was written
     most recently in a hypothetical execution, where every failed
     operation appears to be complete or does not appear to have been
     invoked at all, and every complete operation appears to have been
     executed at some instant between its invocation and its completion.

Implementation for Fail-Stop model: Read-Impose Write-Consult-All

Question: why the algorithm Read-Impose Write-All (1, N) Atomic Register doesn’t work? This is because two concurrent writes may create the same timestamp, thus the same timestamp may associate with different values, thus creating READ scenarios impossible to linearize. The alogithm has to use process rank to impose an order if two timestamps are equal – total order is the essence of atomicity.

Implements:
  (N, N)-AtomicRegister, instance nnar.

Uses:
  BestEffortBroadcast, instance beb;
  PerfectPointToPointLinks, instance pl;
  PerfectFailureDetector, instance P.

upon event ⟨ nnar, Init ⟩ do
  (ts, wr, val) := (0, 0, ⊥);
  correct := Π;
  writeset := ∅;
  readval := ⊥;
  reading := FALSE;

upon event ⟨ P, Crash | p ⟩ do
  correct := correct \ {p};

upon event ⟨ nnar, Read ⟩ do
  reading := TRUE;
  readval := val;
  trigger ⟨ beb, Broadcast | [WRITE, ts, wr, val] ⟩;

upon event ⟨ nnar, Write | v ⟩ do
  trigger ⟨ beb, Broadcast | [WRITE, ts + 1, rank(self), v] ⟩;

upon event ⟨ beb, Deliver | p, [WRITE, ts′, wr′, v′] ⟩ do
  if (ts′, wr′) is larger than (ts, wr) then
     (ts, wr, val) := (ts′, wr′, v′);
  trigger ⟨ pl, Send | p, [ACK] ⟩;

upon event ⟨ pl, Deliver | p, [ACK] ⟩ do
  writeset := writeset ∪ {p};

upon correct ⊆ writeset do
  writeset := ∅;
  if reading = TRUE then
     reading := FALSE;
     trigger ⟨ nnar, ReadReturn | readval ⟩;
  else
     trigger ⟨ nnar, WriteReturn ⟩;

Implementation for Fail-Silent model: Read-Impose Write-Consult-Majority

The algorithm assumes a majority of correct processes. Compared to the Read-Impose Write-Majority algorithm, the reader is unchanged, it selects the value with the largest timestamp(now augmented with process rank) from a majority and imposes this value and makes sure a majority adopts it before completing the read operation. The writer also reads from a majority to get the latest timestamp and then make sure a majority adopts its value.

Implements:
  (N, N)-AtomicRegister, instance nnar.

Uses:
  BestEffortBroadcast, instance beb;
  PerfectPointToPointLinks, instance pl.

upon event ⟨ nnar, Init ⟩ do
  (ts, wr, val) := (0, 0, ⊥);
  acks := 0;
  writeval := ⊥;
  rid := 0;
  readlist := [⊥];
  readval := ⊥;
  reading := FALSE;

upon event ⟨ nnar, Read ⟩ do
  rid := rid + 1;
  acks := 0;
  readlist := [⊥];
  reading := TRUE;

  trigger ⟨ beb, Broadcast | [READ, rid] ⟩;

upon event ⟨ beb, Deliver | p, [READ, r] ⟩ do
  trigger ⟨ pl, Send | p,[VALUE, r, ts, wr, val] ⟩;

upon event ⟨ pl, Deliver | q, [VALUE, r, ts′, wr′, v′] ⟩ such that r = rid do
  readlist[q] := (ts′, wr′, v′);

  if #(readlist) > N/2 then
    (maxts, rr, readval) := highest(readlist);
    readlist := [⊥];

    if reading = TRUE then
       trigger ⟨ beb, Broadcast | [WRITE, rid, maxts, rr, readval] ⟩;
    else
       trigger ⟨ beb, Broadcast | [WRITE, rid, maxts + 1, rank(self), writeval] ⟩;

upon event ⟨ nnar, Write | v ⟩ do rid := rid + 1;
  writeval := v;
  acks := 0;
  readlist := [⊥];

  trigger ⟨ beb, Broadcast | [READ, rid] ⟩;

upon event ⟨ beb, Deliver | p, [WRITE, r, ts′, wr′, v′] ⟩ do
  if (ts′, wr′) is larger than (ts, wr) then
     (ts, wr, val) := (ts′, wr′, v′);
  trigger ⟨ pl, Send | p, [ACK, r] ⟩;

upon event ⟨ pl, Deliver | q, [ACK, r] ⟩ such that r = rid do
  acks := acks + 1;

  if acks > N/2 then
     acks := 0;

     if reading = TRUE then
        reading := FALSE;
        trigger ⟨ nnar, ReadReturn | readval ⟩;
     else
        trigger ⟨ nnar, WriteReturn ⟩;

To argue that the algorithms satisfy the atomicity property, we can follow the steps below:

  1. Construct a total-order relationship to linearize any execution history of the algorithm.
  2. Argue that the linearization preserves all precedence relationships between operations in the execution history.

Reference