Distributed Algorithms
27 Dec 2014

Note: This is a summary of the course I took at EPFL (course page).
Distributed computing and storage systems are very popular in industry today. Before taking the Distributed Algorithms course at EPFL, I had picked up bits and pieces here and there about vector clocks, quorums, replication, the CAP theorem, two-phase commit and so on. These scattered pieces of knowledge couldn't provide a firm theoretical foundation for understanding the core principles behind various distributed systems.
An introductory course should provide basic concepts & techniques that enable learners to think, reason and do research in the field. It should emphasize the most basic building blocks & principles, instead of specific artefacts built from these basic building blocks. From this perspective, I think the content of this course is excellent.
- General Problems of Distributed Systems
- Models of Distributed Systems
- Define the Problems
- Failure Detectors
- Channels
- Broadcast with deliveredness properties
- Broadcast with ordering properties
- Consensus
- Non-blocking atomic commit
- Group Membership
- View Synchronous Broadcast
- Registers
- Reference
General Problems of Distributed Systems
If we think abstractly, all distributed systems face exactly three basic problems:
- Channels: communication between two processes
- Broadcast: communication between one process and multiple processes
- Consensus: multiple processes to reach a common decision
It turns out that any difficult problem in distributed systems is some complex combination of the three basic problems above. To some extent, distributed computing is all about communication and protocols. Why are the problems above nontrivial? This is due to the following facts:
- the communication medium may be asynchronous (non-deterministic delay)
- any process may crash and stop, crash and recover, or be attacked and do harmful things to the system (Byzantine process)
- crashed processes may not be perfectly detected
Models of Distributed Systems
Depending on what assumptions we make about the distributed environment, the following are widely used models for the study of distributed systems:
- fail-stop: crash-stop process + perfect links + perfect failure detector
- fail-noisy: crash-stop process + perfect links + eventually perfect failure detector
- fail-silent: crash-stop process + perfect links
- fail-recovery: crash-recovery process + stubborn links + eventual leader detector
- fail-arbitrary: Byzantine process + authenticated perfect link
Under different models and assumptions, there are often different solutions to the same problem, and each solution has to state clearly which model it assumes.
In theoretical studies, it's also common to assume that a majority of processes are correct, or that at most f of N processes may fail, in addition to any of the computation models above. Additional assumptions may enable simpler solutions to some specific problems, though the domain of application becomes smaller.
Define the Problems
The three general problems can be refined by adding different properties to get variants of the problems.
Channels
For channels, we can require that the communication be reliable, or in addition that it be secure. The concepts reliable and secure can be formally defined in terms of liveness and safety properties. We can also require that messages be received in the same order as they are sent. The following are useful abstractions for channels:
- Perfect links guarantee delivery and no duplication; they can be built on top of noisy and finitely lossy fair-loss links.
- Authenticated links guarantee secure communication (confidentiality, integrity and authenticity).
- Ordered links guarantee that messages are delivered in the same order as they are sent.
Broadcast
Regarding broadcast, i.e. the communication between one process and multiple processes, what we care about is communication in the direction 1→N, as communication in the direction N→1 is just a superposition of channels. Why is communication in the direction 1→N not a superposition of channels as well? Because we can stipulate different properties on the communication. For example, there could be the following different properties:
- Best-effort broadcast is just a superposition of channels.
- Reliable broadcast stipulates that all correct processes deliver the same set of messages.
- Uniform broadcast requires further that no message be delivered by a faulty process unless it is also delivered by every correct process.
Additional restrictions can be added to broadcast by specifying the order of delivered messages. For example, there could be the following different properties:
- FIFO broadcast stipulates that each process should deliver messages in the same order as they are broadcast by a process.
- Causal broadcast stipulates that each process should deliver messages according to causal order(see definition below).
- Total-order broadcast stipulates that all processes deliver all messages in the same order.
There are other forms of broadcast and new ones could be invented if a new property of broadcast can be defined.
Consensus
The third basic problem of distributed systems is consensus, i.e. to reach common decisions among multiple processes. This problem is easy if we have a special leader process, but the special process forms a single point of failure, which makes the whole system less fault-tolerant.
Consensus is the panacea of distributed systems: a large number of difficult problems can be solved with this basic construct.
Failure Detectors
Perfect failure detector
A perfect failure detector can be implemented in synchronous systems using heartbeats. It is impossible to implement a perfect failure detector in asynchronous systems, as it's impossible to distinguish a delayed message from a failure.
Module:
  Name: PerfectFailureDetector, instance P.
  Events:
    Indication: ⟨ P, Crash | p ⟩: Detects that process p has crashed.
  Properties:
    1. Strong completeness: Eventually, every process that crashes is permanently detected by every correct process.
    2. Strong accuracy: If a process p is detected by any process, then p has crashed.
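To make the heartbeat idea concrete, here is a minimal Python sketch of the timeout logic, assuming a synchronous system where every heartbeat arrives within a known bound DELTA; the class and method names are illustrative, not part of the course pseudocode.

# Minimal sketch of heartbeat-based perfect failure detection.
# Assumes a synchronous system: any heartbeat is received within DELTA
# seconds, so silence for a full period can only mean a crash.
DELTA = 2.0

class PerfectFailureDetector:
    def __init__(self, processes):
        self.processes = set(processes)
        self.alive = set(processes)   # processes heard from this period
        self.detected = set()         # processes declared crashed

    def on_heartbeat(self, p):
        self.alive.add(p)

    def on_timeout(self):
        """Invoked every DELTA seconds by a timer."""
        for p in self.processes - self.alive:
            self.detected.add(p)      # Crash indication: permanent
        self.alive = set()            # start a new period
        return self.detected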
Eventually perfect failure detector
An eventually perfect failure detector can be implemented in a partially synchronous environment using heartbeats. Partially synchronous means there are periods during which the system is synchronous, although there is no bound on the period during which it is asynchronous. Most practical systems are partially synchronous.
Module:
  Name: EventuallyPerfectFailureDetector, instance ♢P.
  Events:
    Indication: ⟨ ♢P, Suspect | p ⟩: Notifies that process p is suspected to have crashed.
    Indication: ⟨ ♢P, Restore | p ⟩: Notifies that process p is not suspected anymore.
  Properties:
    1. Strong completeness: Eventually, every process that crashes is permanently suspected by every correct process.
    2. Eventual strong accuracy: Eventually, no correct process is suspected by any correct process.
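A sketch of the increasing-timeout variant for partially synchronous systems (Python, illustrative names): on a false suspicion the period is doubled, so it eventually exceeds the unknown synchrony bound and suspicions become accurate from then on.

# Sketch of an eventually perfect failure detector: suspicions may be
# wrong, but each mistake doubles the timeout, so mistakes eventually stop.
class EventuallyPerfectFailureDetector:
    def __init__(self, processes, delay=1.0):
        self.processes = set(processes)
        self.delay = delay            # current timeout period
        self.alive = set(processes)
        self.suspected = set()

    def on_heartbeat(self, p):
        self.alive.add(p)

    def on_timeout(self):
        """Invoked every self.delay seconds by a timer."""
        if self.alive & self.suspected:
            self.delay *= 2           # we suspected a live process: back off
        for p in self.processes:
            if p not in self.alive and p not in self.suspected:
                self.suspected.add(p)       # ⟨ Suspect | p ⟩
            elif p in self.alive and p in self.suspected:
                self.suspected.remove(p)    # ⟨ Restore | p ⟩
        self.alive = set()
        return self.suspected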
Channels
Fair-loss links
Specification:
Module:
  Name: FairLossPointToPointLinks, instance fll.
  Events:
    Request: ⟨ fll, Send | q, m ⟩: Requests to send message m to process q.
    Indication: ⟨ fll, Deliver | p, m ⟩: Delivers message m sent by process p.
  Properties:
    1. Fair-loss: If a correct process p infinitely often sends a message m to a correct process q, then q delivers m an infinite number of times.
    2. Finite duplication: If a correct process p sends a message m a finite number of times to process q, then m cannot be delivered an infinite number of times by q.
    3. No creation: If some process q delivers a message m with sender p, then m was previously sent to q by process p.
Stubborn links
Specification:
Module:
  Name: StubbornPointToPointLinks, instance sl.
  Events:
    Request: ⟨ sl, Send | q, m ⟩: Requests to send message m to process q.
    Indication: ⟨ sl, Deliver | p, m ⟩: Delivers message m sent by process p.
  Properties:
    1. Stubborn delivery: If a correct process p sends a message m once to a correct process q, then q delivers m an infinite number of times.
    2. No creation: If some process q delivers a message m with sender p, then m was previously sent to q by process p.
Implementation:
Implements: StubbornPointToPointLinks, instance sl.
Uses: FairLossPointToPointLinks, instance fll.

upon event ⟨ sl, Init ⟩ do
  sent := ∅;
  starttimer(∆);

upon event ⟨ Timeout ⟩ do
  forall (q, m) ∈ sent do
    trigger ⟨ fll, Send | q, m ⟩;
  starttimer(∆);

upon event ⟨ sl, Send | q, m ⟩ do
  trigger ⟨ fll, Send | q, m ⟩;
  sent := sent ∪ {(q, m)};

upon event ⟨ fll, Deliver | p, m ⟩ do
  trigger ⟨ sl, Deliver | p, m ⟩;
Perfect links
Specification:
Module:
  Name: PerfectPointToPointLinks, instance pl.
  Events:
    Request: ⟨ pl, Send | q, m ⟩: Requests to send message m to process q.
    Indication: ⟨ pl, Deliver | p, m ⟩: Delivers message m sent by process p.
  Properties:
    1. Reliable delivery: If a correct process p sends a message m to a correct process q, then q eventually delivers m.
    2. No duplication: No message is delivered by a process more than once.
    3. No creation: If some process q delivers a message m with sender p, then m was previously sent to q by process p.
Implementation:
Implements: PerfectPointToPointLinks, instance pl.
Uses: StubbornPointToPointLinks, instance sl.

upon event ⟨ pl, Init ⟩ do
  delivered := ∅;

upon event ⟨ pl, Send | q, m ⟩ do
  trigger ⟨ sl, Send | q, m ⟩;

upon event ⟨ sl, Deliver | p, m ⟩ do
  if m ∉ delivered then
    delivered := delivered ∪ {m};
    trigger ⟨ pl, Deliver | p, m ⟩;
Ordered perfect links
Specification:
Module:
  Name: OrderedPerfectLinks, instance opl.
  Events:
    Request: ⟨ opl, Send | q, m ⟩: Requests to send message m to process q.
    Indication: ⟨ opl, Deliver | p, m ⟩: Delivers message m sent by process p.
  Properties:
    1. Reliable delivery: If a correct process p sends a message m to a correct process q, then q eventually delivers m.
    2. No duplication: No message is delivered by a process more than once.
    3. No creation: If some process q delivers a message m with sender p, then m was previously sent to q by process p.
    4. Ordered delivery: If a correct process p sends message m1 before message m2 to a correct process q, then q will deliver m1 before m2.
Implementation:
Implements: OrderedPerfectPointToPointLinks, instance opl.
Uses: PerfectPointToPointLinks, instance pl.

upon event ⟨ opl, Init ⟩ do
  forall p ∈ Π do
    receive_ts[p] := 1;
    send_ts[p] := 1;
  pending := ∅;

upon event ⟨ opl, Send | q, m ⟩ do
  trigger ⟨ pl, Send | q, [m, send_ts[q]] ⟩;
  send_ts[q] := send_ts[q] + 1;

upon event ⟨ pl, Deliver | p, [m, ts] ⟩ do
  pending := pending ∪ {(m, ts, p)};

upon exists (m, ts, p) ∈ pending such that ts = receive_ts[p] do
  trigger ⟨ opl, Deliver | p, m ⟩;
  receive_ts[p] := receive_ts[p] + 1;
  pending := pending \ {(m, ts, p)};
Authenticated Perfect Links
This abstraction is useful for the fail-arbitrary model, to prevent Byzantine processes from doing bad things to the system.
Specification
Module:
  Name: AuthPerfectPointToPointLinks, instance al.
  Events:
    Request: ⟨ al, Send | q, m ⟩: Requests to send message m to process q.
    Indication: ⟨ al, Deliver | p, m ⟩: Delivers message m sent by process p.
  Properties:
    1. Reliable delivery: If a correct process sends a message m to a correct process q, then q eventually delivers m.
    2. No duplication: No message is delivered by a correct process more than once.
    3. Authenticity: If some correct process q delivers a message m with sender p and process p is correct, then m was previously sent to q by p.
Implementation:
Implements: AuthPerfectPointToPointLinks, instance al.
Uses: StubbornPointToPointLinks, instance sl.

upon event ⟨ al, Init ⟩ do
  delivered := ∅;

upon event ⟨ al, Send | q, m ⟩ do
  a := authenticate(self, q, m);
  trigger ⟨ sl, Send | q, [m, a] ⟩;

upon event ⟨ sl, Deliver | p, [m, a] ⟩ do
  if verifyauth(self, p, m, a) and m ∉ delivered then
    delivered := delivered ∪ {m};
    trigger ⟨ al, Deliver | p, m ⟩;
Broadcast with deliveredness properties
Best-effort broadcast
Specification:
Module:
  Name: BestEffortBroadcast, instance beb.
  Events:
    Request: ⟨ beb, Broadcast | m ⟩: Broadcasts a message m to all processes.
    Indication: ⟨ beb, Deliver | p, m ⟩: Delivers a message m broadcast by process p.
  Properties:
    1. Validity: If a correct process broadcasts a message m, then every correct process eventually delivers m.
    2. No duplication: No message is delivered more than once.
    3. No creation: If a process delivers a message m with sender s, then m was previously broadcast by process s.
Implementation:
Implements: BestEffortBroadcast, instance beb.
Uses: PerfectPointToPointLinks, instance pl.

upon event ⟨ beb, Broadcast | m ⟩ do
  forall q ∈ Π do
    trigger ⟨ pl, Send | q, m ⟩;

upon event ⟨ pl, Deliver | p, m ⟩ do
  trigger ⟨ beb, Deliver | p, m ⟩;
Problem: If the sender crashes, some correct processes may receive the message while others may not, which creates inconsistency.
Regular reliable broadcast
Specification:
Module:
  Name: ReliableBroadcast, instance rb.
  Events:
    Request: ⟨ rb, Broadcast | m ⟩: Broadcasts a message m to all processes.
    Indication: ⟨ rb, Deliver | p, m ⟩: Delivers a message m broadcast by process p.
  Properties:
    1. Validity: If a correct process p broadcasts a message m, then p eventually delivers m.
    2. No duplication: No message is delivered more than once.
    3. No creation: If a process delivers a message m with sender s, then m was previously broadcast by process s.
    4. Agreement: If a message m is delivered by some correct process, then m is eventually delivered by every correct process.
Implementation for Fail-Stop model - Lazy Reliable Broadcast
Idea: If a process crashes, correct processes broadcast all messages from the crashed process.
Implements: ReliableBroadcast, instance rb.
Uses: BestEffortBroadcast, instance beb; PerfectFailureDetector, instance P.

upon event ⟨ rb, Init ⟩ do
  correct := Π;
  forall p ∈ Π do from[p] := ∅;

upon event ⟨ rb, Broadcast | m ⟩ do
  trigger ⟨ beb, Broadcast | [DATA, self, m] ⟩;

upon event ⟨ beb, Deliver | p, [DATA, s, m] ⟩ do
  if m ∉ from[s] then
    trigger ⟨ rb, Deliver | s, m ⟩;
    from[s] := from[s] ∪ {m};
    if s ∉ correct then
      trigger ⟨ beb, Broadcast | [DATA, s, m] ⟩;

upon event ⟨ P, Crash | p ⟩ do
  correct := correct \ {p};
  forall m ∈ from[p] do
    trigger ⟨ beb, Broadcast | [DATA, p, m] ⟩;
Implementation for Fail-Silent model - Eager Reliable Broadcast
Idea: If a correct process delivers a message, it also re-broadcasts the message.
Implements: ReliableBroadcast, instance rb.
Uses: BestEffortBroadcast, instance beb.

upon event ⟨ rb, Init ⟩ do
  delivered := ∅;

upon event ⟨ rb, Broadcast | m ⟩ do
  trigger ⟨ beb, Broadcast | [DATA, self, m] ⟩;

upon event ⟨ beb, Deliver | p, [DATA, s, m] ⟩ do
  if m ∉ delivered then
    delivered := delivered ∪ {m};
    trigger ⟨ rb, Deliver | s, m ⟩;
    trigger ⟨ beb, Broadcast | [DATA, s, m] ⟩;
Problem: A faulty process may deliver a message that is never delivered by the correct processes.
Uniform reliable broadcast
Specification
Module:
  Name: UniformReliableBroadcast, instance urb.
  Events:
    Request: ⟨ urb, Broadcast | m ⟩: Broadcasts a message m to all processes.
    Indication: ⟨ urb, Deliver | p, m ⟩: Delivers a message m broadcast by process p.
  Properties:
    1. Validity: If a correct process p broadcasts a message m, then p eventually delivers m.
    2. No duplication: No message is delivered more than once.
    3. No creation: If a process delivers a message m with sender s, then m was previously broadcast by process s.
    4. Uniform agreement: If a message m is delivered by some process (whether correct or faulty), then m is eventually delivered by every correct process.
Implementation for Fail-Stop model - All-Ack Uniform Reliable Broadcast
Idea: only deliver a message if all correct processes have acknowledged the reception of the message.
Implements: UniformReliableBroadcast, instance urb.
Uses: BestEffortBroadcast, instance beb; PerfectFailureDetector, instance P.

upon event ⟨ urb, Init ⟩ do
  delivered := ∅;
  pending := ∅;
  correct := Π;
  forall m do ack[m] := ∅;

upon event ⟨ urb, Broadcast | m ⟩ do
  pending := pending ∪ {(self, m)};
  trigger ⟨ beb, Broadcast | [DATA, self, m] ⟩;

upon event ⟨ beb, Deliver | p, [DATA, s, m] ⟩ do
  ack[m] := ack[m] ∪ {p};
  if (s, m) ∉ pending then
    pending := pending ∪ {(s, m)};
    trigger ⟨ beb, Broadcast | [DATA, s, m] ⟩;

upon event ⟨ P, Crash | p ⟩ do
  correct := correct \ {p};

function candeliver(m) returns Boolean is
  return (correct ⊆ ack[m]);

upon exists (s, m) ∈ pending such that candeliver(m) and m ∉ delivered do
  delivered := delivered ∪ {m};
  trigger ⟨ urb, Deliver | s, m ⟩;
Implementation for Fail-Silent model - Majority-Ack Uniform Reliable Broadcast
Idea: assume a majority of processes are correct
Implements: UniformReliableBroadcast, instance urb.
Uses: BestEffortBroadcast, instance beb.

upon event ⟨ urb, Init ⟩ do
  delivered := ∅;
  pending := ∅;
  forall m do ack[m] := ∅;

upon event ⟨ urb, Broadcast | m ⟩ do
  pending := pending ∪ {(self, m)};
  trigger ⟨ beb, Broadcast | [DATA, self, m] ⟩;

upon event ⟨ beb, Deliver | p, [DATA, s, m] ⟩ do
  ack[m] := ack[m] ∪ {p};
  if (s, m) ∉ pending then
    pending := pending ∪ {(s, m)};
    trigger ⟨ beb, Broadcast | [DATA, s, m] ⟩;

function candeliver(m) returns Boolean is
  return #(ack[m]) > N/2;

upon exists (s, m) ∈ pending such that candeliver(m) and m ∉ delivered do
  delivered := delivered ∪ {m};
  trigger ⟨ urb, Deliver | s, m ⟩;
Terminating Reliable Broadcast
Terminating reliable broadcast is a form of broadcast which stipulates that if the sender crashes, a special value must be delivered by all processes (correct or not). In terminating reliable broadcast, only one well-known process sends, the other processes are receivers, and at most one message can be sent.
It's important to note that uniform broadcast doesn't solve the problem of terminating reliable broadcast, because in an asynchronous system, if the sender fails, a receiver can't decide whether to wait or not. If it decides to wait, it might be the case that no process has received any message from the sender, so it will wait forever. If it decides to terminate, some time later another receiver may broadcast the value it received from the sender before the sender crashed.
Specification:
Module:
  Name: UniformTerminatingReliableBroadcast, instance utrb, with sender s.
  Events:
    Request: ⟨ utrb, Broadcast | m ⟩: Broadcasts a message m to all processes. Executed only by process s.
    Indication: ⟨ utrb, Deliver | p, m ⟩: Delivers a message m broadcast by process p, or the symbol △.
  Properties:
    1. Validity: If a correct process p broadcasts a message m, then p eventually delivers m.
    2. Termination: No process delivers more than one message.
    3. Integrity: If a correct process delivers some message m, then m was either previously broadcast by process s or it holds m = △.
    4. Uniform Agreement: If any process delivers a message m, then every correct process eventually delivers m.
The implementation of terminating reliable broadcast depends on the uniform consensus primitive, which is discussed later in this article.
Implements: UniformTerminatingReliableBroadcast, instance utrb, with sender s.
Uses: BestEffortBroadcast, instance beb; UniformConsensus, instance uc; PerfectFailureDetector, instance P.

upon event ⟨ utrb, Init ⟩ do
  proposal := ⊥;

upon event ⟨ utrb, Broadcast | m ⟩ do  // only process s
  trigger ⟨ beb, Broadcast | m ⟩;

upon event ⟨ beb, Deliver | s, m ⟩ do
  if proposal = ⊥ then
    proposal := m;
    trigger ⟨ uc, Propose | proposal ⟩;

upon event ⟨ P, Crash | p ⟩ do
  if p = s ∧ proposal = ⊥ then
    proposal := △;
    trigger ⟨ uc, Propose | proposal ⟩;

upon event ⟨ uc, Decide | decided ⟩ do
  trigger ⟨ utrb, Deliver | s, decided ⟩;
Broadcast with ordering properties
FIFO-order reliable broadcast
Specification
Module:
  Name: FIFOReliableBroadcast, instance frb.
  Events:
    Request: ⟨ frb, Broadcast | m ⟩: Broadcasts a message m to all processes.
    Indication: ⟨ frb, Deliver | p, m ⟩: Delivers a message m broadcast by process p.
  Properties:
    1. Validity: If a correct process p broadcasts a message m, then p eventually delivers m.
    2. No duplication: No message is delivered more than once.
    3. No creation: If a process delivers a message m with sender s, then m was previously broadcast by process s.
    4. Agreement: If a message m is delivered by some correct process, then m is eventually delivered by every correct process.
    5. FIFO delivery: If some process broadcasts message m1 before it broadcasts message m2, then no correct process delivers m2 unless it has already delivered m1.
Implementation for Fail-Silent model: Broadcast with Sequence Number
Implements: FIFOReliableBroadcast, instance frb.
Uses: ReliableBroadcast, instance rb.

upon event ⟨ frb, Init ⟩ do
  lsn := 0;
  pending := ∅;
  forall p ∈ Π do next[p] := 1;

upon event ⟨ frb, Broadcast | m ⟩ do
  lsn := lsn + 1;
  trigger ⟨ rb, Broadcast | [DATA, self, m, lsn] ⟩;

upon event ⟨ rb, Deliver | p, [DATA, s, m, sn] ⟩ do
  pending := pending ∪ {(s, m, sn)};
  while exists (s, m′, sn′) ∈ pending such that sn′ = next[s] do
    next[s] := next[s] + 1;
    pending := pending \ {(s, m′, sn′)};
    trigger ⟨ frb, Deliver | s, m′ ⟩;
Causal-order broadcast
Causal order can be defined recursively. We denote m1 → m2 (meaning m1 was “sent before” m2) if and only if any of the following conditions holds:
- some process p broadcasts m1 before it broadcasts m2;
- some process p delivers m1 and subsequently broadcasts m2; or
- there exists some message m′ such that m1 → m′ and m′ → m2.
The causal order property can be added to regular reliable broadcast or to uniform reliable broadcast, yielding causal-order regular reliable broadcast and causal-order uniform reliable broadcast respectively. Here we concern ourselves only with the former.
Specification:
Module:
  Name: CausalOrderReliableBroadcast, instance crb.
  Events:
    Request: ⟨ crb, Broadcast | m ⟩: Broadcasts a message m to all processes.
    Indication: ⟨ crb, Deliver | p, m ⟩: Delivers a message m broadcast by process p.
  Properties:
    1. Validity: If a correct process p broadcasts a message m, then p eventually delivers m.
    2. No duplication: No message is delivered more than once.
    3. No creation: If a process delivers a message m with sender s, then m was previously broadcast by process s.
    4. Agreement: If a message m is delivered by some correct process, then m is eventually delivered by every correct process.
    5. Causal delivery: For any message m1 that potentially caused a message m2, i.e., m1 → m2, no process delivers m2 unless it has already delivered m1.
Implementation 1 for Fail-Silent model: No-Waiting Causal Broadcast
Idea: send each message together with its full causal history.
Implements: CausalOrderReliableBroadcast, instance crb.
Uses: ReliableBroadcast, instance rb.

upon event ⟨ crb, Init ⟩ do
  delivered := ∅;
  past := [];

upon event ⟨ crb, Broadcast | m ⟩ do
  trigger ⟨ rb, Broadcast | [DATA, past, m] ⟩;
  append(past, (self, m));

upon event ⟨ rb, Deliver | p, [DATA, mpast, m] ⟩ do
  if m ∉ delivered then
    forall (s, n) ∈ mpast do  // in the order of the list
      if n ∉ delivered then
        trigger ⟨ crb, Deliver | s, n ⟩;
        delivered := delivered ∪ {n};
        if (s, n) ∉ past then
          append(past, (s, n));
    trigger ⟨ crb, Deliver | p, m ⟩;
    delivered := delivered ∪ {m};
    if (p, m) ∉ past then
      append(past, (p, m));
Problem: the history grows indefinitely.
Idea: once all processes have delivered a message, there is no reason to keep it in the history any more.
Solution: With a perfect failure detector, when a process rb-delivers a message m, it rb-broadcasts an ACK message to all processes; when an ACK for m has been rb-delivered from all correct processes, m is purged from past. The full implementation is omitted here; a small sketch of the purging rule follows.
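A hedged Python sketch of just the purging rule, assuming acks[m] collects the processes from which an ACK for m has been rb-delivered; the helper name and data shapes are made up for illustration.

# Sketch of garbage-collecting the causal past: a message may leave the
# history once every currently correct process has acknowledged it.
def purge_past(past, acks, correct):
    """past: list of (sender, message); acks: message -> set of ackers."""
    return [(s, m) for (s, m) in past
            if not correct <= acks.get(m, set())]

past = [("p1", "a"), ("p2", "b")]
acks = {"a": {"p1", "p2", "p3"}, "b": {"p2"}}
print(purge_past(past, acks, correct={"p1", "p2", "p3"}))  # [('p2', 'b')]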
Implementation 2 for Fail-Silent model: Waiting Causal Broadcast
Idea: use vector clocks. We can think of a vector clock as a partial-order timestamp.
In the following, if we look at just one column of the vector clock, it looks very similar to the FIFO algorithm; it obviously guarantees FIFO delivery of messages with respect to a single process. The difference from FIFO is that each message is associated not only with its sender's local clock but also with every other process' clock, hence a vector clock. The algorithm maintains two vector clocks: the send timestamp W and the delivery timestamp V (which remembers the next expected messages). From an abstract point of view, the general form of all timestamp-ordered algorithms looks the same.
Implements: CausalOrderReliableBroadcast, instance crb.
Uses: ReliableBroadcast, instance rb.

upon event ⟨ crb, Init ⟩ do
  forall p ∈ Π do V[p] := 0;
  lsn := 0;
  pending := ∅;

upon event ⟨ crb, Broadcast | m ⟩ do
  W := V;
  W[self] := lsn;
  lsn := lsn + 1;
  trigger ⟨ rb, Broadcast | [DATA, W, m] ⟩;

upon event ⟨ rb, Deliver | p, [DATA, W, m] ⟩ do
  pending := pending ∪ {(p, W, m)};
  while exists (p′, W′, m′) ∈ pending such that W′ ≤ V do
    pending := pending \ {(p′, W′, m′)};
    V[p′] := V[p′] + 1;
    trigger ⟨ crb, Deliver | p′, m′ ⟩;
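The heart of the algorithm is the test W′ ≤ V. A tiny Python illustration of that comparison, with dict-based vector clocks and illustrative process names:

# A message stamped W is deliverable once the local vector V dominates it:
# we have already delivered everything the sender had seen when broadcasting.
def can_deliver(W, V):
    return all(W[p] <= V[p] for p in W)

V = {"p1": 2, "p2": 0, "p3": 1}  # messages delivered so far, per sender
print(can_deliver({"p1": 1, "p2": 0, "p3": 0}, V))  # True: causal past seen
print(can_deliver({"p1": 1, "p2": 1, "p3": 0}, V))  # False: wait for p2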
Total-order broadcast
The total-order property can be added to regular reliable broadcast or to uniform reliable broadcast, yielding total-order regular reliable broadcast and total-order uniform reliable broadcast respectively. Here we concern ourselves only with the former.
The most common real-world usage of total-order broadcast is to implement replicated services for fault tolerance.
Specification
Module:
  Name: TotalOrderBroadcast, instance tob.
  Events:
    Request: ⟨ tob, Broadcast | m ⟩: Broadcasts a message m to all processes.
    Indication: ⟨ tob, Deliver | p, m ⟩: Delivers a message m broadcast by process p.
  Properties:
    1. Validity: If a correct process p broadcasts a message m, then p eventually delivers m.
    2. No duplication: No message is delivered more than once.
    3. No creation: If a process delivers a message m with sender s, then m was previously broadcast by process s.
    4. Agreement: If a message m is delivered by some correct process, then m is eventually delivered by every correct process.
    5. Total order: Let m1 and m2 be any two messages and suppose p and q are any two correct processes that deliver m1 and m2. If p delivers m1 before m2, then q delivers m1 before m2.
The implementation of total-order broadcast depends on the consensus primitive (see below); conversely, total-order broadcast can be used to implement consensus.
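To see the converse direction, here is a minimal Python sketch, with an assumed tob object exposing broadcast and an on_deliver callback registration (this interface is illustrative): deciding on the first tob-delivered proposal solves consensus, because every correct process delivers the same first message.

# Consensus from total-order broadcast: tob-broadcast your proposal and
# decide the first value delivered; the total order makes the decision
# identical at all correct processes.
class ConsensusFromTOB:
    def __init__(self, tob, on_decide):
        self.decided = False
        self.on_decide = on_decide
        self.tob = tob
        tob.on_deliver(self._deliver)   # assumed registration API

    def propose(self, value):
        self.tob.broadcast(value)

    def _deliver(self, sender, value):
        if not self.decided:            # first message in the total order
            self.decided = True
            self.on_decide(value)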
Implementation for Fail-Silent model: Consensus-Based Total-Order Broadcast
Implements: TotalOrderBroadcast, instance tob.
Uses: ReliableBroadcast, instance rb; Consensus (multiple instances).

upon event ⟨ tob, Init ⟩ do
  unordered := ∅;
  delivered := ∅;
  round := 1;
  wait := FALSE;

upon event ⟨ tob, Broadcast | m ⟩ do
  trigger ⟨ rb, Broadcast | m ⟩;

upon event ⟨ rb, Deliver | p, m ⟩ do
  if m ∉ delivered then
    unordered := unordered ∪ {(p, m)};

upon unordered ≠ ∅ and wait = FALSE do
  wait := TRUE;
  Initialize a new instance c.round of consensus;
  trigger ⟨ c.round, Propose | unordered ⟩;

upon event ⟨ c.r, Decide | decided ⟩ such that r = round do
  forall (s, m) ∈ sort(decided) do
    trigger ⟨ tob, Deliver | s, m ⟩;
  delivered := delivered ∪ decided;
  unordered := unordered \ decided;
  round := round + 1;
  wait := FALSE;
Consensus
Solving consensus is the key to solving many problems in distributed computing (e.g. various consistency problems).
Fischer, Lynch, and Paterson (1985) proved that no deterministic algorithm implements consensus in a fail-silent model, even if only one process fails. An implementation has to assume either a perfect failure detector or the fail-noisy model.
Regular Consensus
Specification
Module:
  Name: Consensus, instance c.
  Events:
    Request: ⟨ c, Propose | v ⟩: Proposes value v for consensus.
    Indication: ⟨ c, Decide | v ⟩: Outputs a decided value v of consensus.
  Properties:
    1. Termination: Every correct process eventually decides some value.
    2. Validity: If a process decides v, then v was proposed by some process.
    3. Integrity: No process decides twice.
    4. Agreement: No two correct processes decide differently.
Implementation 1 for Fail-Stop model: Flooding Consensus
Idea: Each process collects the proposed values from all processes, and then decides on the smallest proposed value.
Implements: Consensus, instance c.
Uses: BestEffortBroadcast, instance beb; PerfectFailureDetector, instance P.

upon event ⟨ c, Init ⟩ do
  correct := Π;
  round := 1;
  decision := ⊥;
  receivedfrom := [∅];
  proposals := [∅];
  receivedfrom[0] := Π;

upon event ⟨ P, Crash | p ⟩ do
  correct := correct \ {p};

upon event ⟨ c, Propose | v ⟩ do
  proposals[1] := proposals[1] ∪ {v};
  trigger ⟨ beb, Broadcast | [PROPOSAL, 1, proposals[1]] ⟩;

upon event ⟨ beb, Deliver | p, [PROPOSAL, r, ps] ⟩ do
  receivedfrom[r] := receivedfrom[r] ∪ {p};
  proposals[r] := proposals[r] ∪ ps;

upon correct ⊆ receivedfrom[round] ∧ decision = ⊥ do
  if receivedfrom[round] = receivedfrom[round − 1] then
    decision := min(proposals[round]);
    trigger ⟨ beb, Broadcast | [DECIDED, decision] ⟩;
    trigger ⟨ c, Decide | decision ⟩;
  else
    round := round + 1;
    trigger ⟨ beb, Broadcast | [PROPOSAL, round, proposals[round − 1]] ⟩;

upon event ⟨ beb, Deliver | p, [DECIDED, v] ⟩ such that p ∈ correct ∧ decision = ⊥ do
  decision := v;
  trigger ⟨ beb, Broadcast | [DECIDED, decision] ⟩;
  trigger ⟨ c, Decide | decision ⟩;
Why does the algorithm proceed in rounds and exchange the collected proposals? That's the only way to ensure that all correct processes collect all possible proposals in the presence of crash-stop processes.
Note that a process executes at most N rounds; the worst case is that one process crashes in each round.
Implementation 2 for Fail-Stop model: Hierarchical Consensus
Idea: The correct process with the smallest rank in the hierarchy imposes its value (which may be the value decided by previously crashed processes) on all other processes.
Implements: Consensus, instance c.
Uses: BestEffortBroadcast, instance beb; PerfectFailureDetector, instance P.

upon event ⟨ c, Init ⟩ do
  suspected := ∅;
  round := 1;
  proposal := ⊥;
  proposer := 0;
  delivered := [FALSE];
  broadcast := FALSE;

upon event ⟨ P, Crash | p ⟩ do
  suspected := suspected ∪ {p};

upon event ⟨ c, Propose | v ⟩ such that proposal = ⊥ do
  proposal := v;

upon round = self ∧ proposal ≠ ⊥ ∧ broadcast = FALSE do
  broadcast := TRUE;
  trigger ⟨ beb, Broadcast | [DECIDED, proposal] ⟩;
  trigger ⟨ c, Decide | proposal ⟩;

upon round ∈ suspected ∨ delivered[round] = TRUE do
  round := round + 1;

upon event ⟨ beb, Deliver | p, [DECIDED, v] ⟩ do
  if p < self ∧ p > proposer then
    proposal := v;
    proposer := p;
  delivered[p] := TRUE;
Uniform Consensus
Uniform consensus ensures that no two processes decide different values, whether they are correct or not.
Module:
  Name: UniformConsensus, instance uc.
  Events:
    Request: ⟨ uc, Propose | v ⟩: Proposes value v for consensus.
    Indication: ⟨ uc, Decide | v ⟩: Outputs a decided value v of consensus.
  Properties:
    1. Termination: Every correct process eventually decides some value.
    2. Validity: If a process decides v, then v was proposed by some process.
    3. Integrity: No process decides twice.
    4. Uniform agreement: No two processes decide differently.
Implementation 1 for Fail-Stop model: Flooding Uniform Consensus
Idea: prevent faulty processes from deciding too early: all processes decide only after N rounds!
Implements: UniformConsensus, instance uc.
Uses: BestEffortBroadcast, instance beb; PerfectFailureDetector, instance P.

upon event ⟨ uc, Init ⟩ do
  correct := Π;
  round := 1;
  decision := ⊥;
  proposalset := ∅;
  receivedfrom := ∅;

upon event ⟨ P, Crash | p ⟩ do
  correct := correct \ {p};

upon event ⟨ uc, Propose | v ⟩ do
  proposalset := proposalset ∪ {v};
  trigger ⟨ beb, Broadcast | [PROPOSAL, 1, proposalset] ⟩;

upon event ⟨ beb, Deliver | p, [PROPOSAL, r, ps] ⟩ such that r = round do
  receivedfrom := receivedfrom ∪ {p};
  proposalset := proposalset ∪ ps;

upon correct ⊆ receivedfrom ∧ decision = ⊥ do
  if round = N then
    decision := min(proposalset);
    trigger ⟨ uc, Decide | decision ⟩;
  else
    round := round + 1;
    receivedfrom := ∅;  // reset for the next round
    trigger ⟨ beb, Broadcast | [PROPOSAL, round, proposalset] ⟩;
Implementation 2 for Fail-Stop model: Hierarchical Uniform Consensus
Idea: in each round, make sure the value to be decided is received by all processes. This relies on the perfect failure detector and ACK messages.
Note: there's a simpler but slower solution. To avoid deciding too early, all processes decide at round N, just like the uniform version of the flooding consensus. This is a simple modification of the regular hierarchical consensus algorithm and is therefore omitted here.
Implements: UniformConsensus, instance uc.
Uses: PerfectPointToPointLinks, instance pl; BestEffortBroadcast, instance beb; ReliableBroadcast, instance rb; PerfectFailureDetector, instance P.

upon event ⟨ uc, Init ⟩ do
  suspected := ∅;
  ackranks := ∅;
  round := 1;
  proposal := ⊥;
  decision := ⊥;
  proposed := [⊥];

upon event ⟨ P, Crash | p ⟩ do
  suspected := suspected ∪ {p};

upon event ⟨ uc, Propose | v ⟩ such that proposal = ⊥ do
  proposal := v;

upon round = self ∧ proposal ≠ ⊥ ∧ decision = ⊥ do
  trigger ⟨ beb, Broadcast | [PROPOSAL, proposal] ⟩;

upon event ⟨ beb, Deliver | p, [PROPOSAL, v] ⟩ do
  proposed[p] := v;
  if p ≥ round then
    trigger ⟨ pl, Send | p, [ACK] ⟩;

upon round ∈ suspected do
  if proposed[round] ≠ ⊥ then
    proposal := proposed[round];
  round := round + 1;

upon event ⟨ pl, Deliver | q, [ACK] ⟩ do
  ackranks := ackranks ∪ {q};

upon suspected ∪ ackranks = {1, …, N} do
  trigger ⟨ rb, Broadcast | [DECIDED, proposal] ⟩;

upon event ⟨ rb, Deliver | p, [DECIDED, v] ⟩ such that decision = ⊥ do
  decision := v;
  trigger ⟨ uc, Decide | decision ⟩;
A round consists of two communication steps: within the same round, the leader broadcasts a PROPOSAL message to all processes, trying to impose its value, and then expects to obtain an acknowledgment from all correct processes.
Uniform Consensus in the Fail-Noisy Model
Any fail-noisy algorithm that solves consensus also solves uniform consensus. The implementation assumes:
- a correct majority
- a ♢P failure detector
The algorithm is also round-based: processes move incrementally from one round to the next. Process pi is the leader in every round k such that k mod N = i. In such a round, pi tries to decide.
To decide, pi executes the following steps:
- pi selects the latest adopted value among a majority.
- pi imposes that value at a majority: any process in that majority adopts that value; pi fails if it is suspected (processes that suspect pi inform pi and move to the next round; pi does so as well).
- pi decides and reliably broadcasts the decision to all other processes.
The core of the algorithm ensures that subsequent rounds respect the uniform agreement property of consensus: if a process might already have decided in the current round, the decision value becomes fixed, and a process that decides in a later round must decide the fixed value.
The assumption of a majority of correct processes is important in two respects:
- ensure the value decided in previous rounds is passed to later rounds;
- enable the leader process to decide within a round in a fail-silent model.
This algorithm was first introduced by Lamport under the name Paxos. The full implementation is rather tedious and is omitted here, but a condensed sketch of the round structure follows.
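The following is only a condensed, simulated Python sketch of the two phases of a round (single-decree, everything in one address space); real Paxos adds the ♢P-driven retry loop, message passing, and crash handling. All names are illustrative.

# Phase 1 reads the latest adopted value from a majority; phase 2 imposes
# it (or the proposer's own value) on a majority.
class Acceptor:
    def __init__(self):
        self.promised = 0            # highest round promised so far
        self.accepted = (0, None)    # (round, value) last adopted

    def prepare(self, rnd):
        if rnd > self.promised:
            self.promised = rnd
            return self.accepted     # report the latest adopted value
        return None                  # NACK: a higher round is in progress

    def accept(self, rnd, value):
        if rnd >= self.promised:
            self.promised = rnd
            self.accepted = (rnd, value)
            return True
        return False

def propose(acceptors, rnd, my_value):
    majority = len(acceptors) // 2 + 1
    promises = [p for p in (a.prepare(rnd) for a in acceptors) if p is not None]
    if len(promises) < majority:
        return None                              # abandon; retry with a higher rnd
    latest = max(promises, key=lambda p: p[0])   # highest accepted round wins
    value = latest[1] if latest[1] is not None else my_value
    acks = sum(a.accept(rnd, value) for a in acceptors)
    return value if acks >= majority else None

acceptors = [Acceptor() for _ in range(3)]
print(propose(acceptors, 1, "A"))   # 'A' is decided
print(propose(acceptors, 2, "B"))   # still 'A': the decided value is stable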
Non-blocking atomic commit
In this section, we’ll see how the primitives are used in distributed databases to achieve non-blocking atomic commit.
The processes, each representing a data manager, agree on the outcome of a transaction, which is either to commit or to abort the transaction. Commit can only be decided if all processes are able to commit.
Note that a well-known algorithm in this context is two-phase commit, which has a serious flaw: it blocks if the coordinator crashes. This turns out to be the same problem as in terminating reliable broadcast: if the coordinator crashes, the other processes can't decide whether to wait or not, and uniform broadcast doesn't help, as explained in the case of terminating reliable broadcast. If such a case happens, not only does the whole system become unavailable, but the data at different data managers may also become inconsistent.
Specification:
Module:
  Name: NonBlockingAtomicCommit, instance nbac.
  Events:
    Request: ⟨ nbac, Propose | v ⟩: Proposes value v = COMMIT or v = ABORT for the commit.
    Indication: ⟨ nbac, Decide | v ⟩: Outputs the decided value for the commit.
  Properties:
    1. Termination: Every correct process eventually decides some value.
    2. Abort-Validity: A process can only decide ABORT if some process proposes ABORT or a process crashes.
    3. Commit-Validity: A process can only decide COMMIT if no process proposes ABORT.
    4. Integrity: No process decides twice.
    5. Uniform Agreement: No two processes decide differently.
Implementation in Fail-Stop model:
Implements: NonBlockingAtomicCommit, instance nbac.
Uses: BestEffortBroadcast, instance beb; UniformConsensus, instance uc; PerfectFailureDetector, instance P.

upon event ⟨ nbac, Init ⟩ do
  voted := ∅;
  proposed := FALSE;

upon event ⟨ P, Crash | p ⟩ do
  if proposed = FALSE then
    trigger ⟨ uc, Propose | ABORT ⟩;
    proposed := TRUE;

upon event ⟨ nbac, Propose | v ⟩ do
  trigger ⟨ beb, Broadcast | v ⟩;

upon event ⟨ beb, Deliver | p, v ⟩ do
  if v = ABORT ∧ proposed = FALSE then
    trigger ⟨ uc, Propose | ABORT ⟩;
    proposed := TRUE;
  else
    voted := voted ∪ {p};
    if voted = Π ∧ proposed = FALSE then
      trigger ⟨ uc, Propose | COMMIT ⟩;
      proposed := TRUE;

upon event ⟨ uc, Decide | decided ⟩ do
  trigger ⟨ nbac, Decide | decided ⟩;
Group Membership
Group membership is also a recurring problem in distributed systems, which can be solved using the three primitives.
Specification:
Module:
  Name: GroupMembership, instance gm.
  Events:
    Indication: ⟨ gm, View | V ⟩: Installs a new view V = (id, M) with view identifier id and membership M.
  Properties:
    1. Monotonicity: If a process p installs a view V = (id, M) and subsequently installs a view V′ = (id′, M′), then id < id′ and M ⊇ M′.
    2. Uniform Agreement: If some process installs a view V = (id, M) and another process installs some view V′ = (id, M′), then M = M′.
    3. Completeness: If a process p crashes, then eventually every correct process installs a view (id, M) such that p ∉ M.
    4. Accuracy: If some process installs a view (id, M) with q ∉ M for some process q ∈ Π, then q has crashed.
Implementation in Fail-Stop model:
Implements: GroupMembership, instance gm.
Uses: UniformConsensus (multiple instances); PerfectFailureDetector, instance P.

upon event ⟨ gm, Init ⟩ do
  (id, M) := (0, Π);
  correct := Π;
  wait := FALSE;
  trigger ⟨ gm, View | (id, M) ⟩;

upon event ⟨ P, Crash | p ⟩ do
  correct := correct \ {p};

upon correct ⊊ M ∧ wait = FALSE do
  id := id + 1;
  wait := TRUE;
  Initialize a new instance uc.id of uniform consensus;
  trigger ⟨ uc.id, Propose | correct ⟩;

upon event ⟨ uc.i, Decide | M′ ⟩ such that i = id do
  M := M′;
  wait := FALSE;
  trigger ⟨ gm, View | (id, M) ⟩;
View Synchronous Broadcast
View synchronous broadcast is an abstraction that results from the combination of group membership and reliable broadcast or uniform broadcast. It ensures that the delivery of messages is coordinated with the installation of views.
Specification
The specification for reliable view synchronous broadcast is listed below; the uniform version just replaces the reliable broadcast properties with the uniform ones.
Module:
  Name: ViewSynchronousCommunication, instance vs.
  Events:
    Request: ⟨ vs, Broadcast | m ⟩: Broadcasts a message m to all processes.
    Indication: ⟨ vs, Deliver | p, m ⟩: Delivers a message m broadcast by process p.
    Indication: ⟨ vs, View | V ⟩: Installs a new view V = (id, M) with view identifier id and membership M.
    Indication: ⟨ vs, Block ⟩: Requests that no new messages are broadcast temporarily until the next view is installed.
    Request: ⟨ vs, BlockOk ⟩: Confirms that no new messages will be broadcast until the next view is installed.
  Properties:
    1. View Inclusion: If some process delivers a message m from process p in view V, then m was broadcast by p in view V.
    2–5. Same as properties 1–4 in regular reliable broadcast.
    6–9. Same as properties 1–4 in group membership.
Implementation
The idea behind all algorithms in this category is to share all messages delivered in the current view before installing a new view; the first implementation uses terminating reliable broadcast for this. The algorithms also have to make sure that no new messages are broadcast during this unifying period, so the special events Block and BlockOk are used: the upper layer must not broadcast any message in the period between BlockOk and the next View.
Implementation based on Terminating Reliable Broadcast:
Implements: ViewSynchronousCommunication, instance vs.
Uses: TerminatingReliableBroadcast (multiple instances); GroupMembership, instance gm; BestEffortBroadcast, instance beb.

upon event ⟨ vs, Init ⟩ do
  view := (0, Π);
  nextView := ⊥;
  pending := delivered := deliveredPair := trbDone := ∅;
  flushing := blocked := FALSE;

upon event ⟨ vs, Broadcast | m ⟩ such that blocked = FALSE do
  delivered := delivered ∪ {m};
  deliveredPair := deliveredPair ∪ {(self, m)};
  trigger ⟨ vs, Deliver | self, m ⟩;
  trigger ⟨ beb, Broadcast | [DATA, view.id, m] ⟩;

upon event ⟨ beb, Deliver | src, [DATA, vid, m] ⟩ do
  if view.id = vid ∧ m ∉ delivered ∧ blocked = FALSE then
    delivered := delivered ∪ {m};
    deliveredPair := deliveredPair ∪ {(src, m)};
    trigger ⟨ vs, Deliver | src, m ⟩;

upon event ⟨ gm, View | V ⟩ do
  addToTail(pending, V);

upon pending ≠ ∅ ∧ flushing = FALSE do
  nextView := removeFromHead(pending);
  flushing := TRUE;
  trigger ⟨ vs, Block ⟩;

upon event ⟨ vs, BlockOk ⟩ do
  blocked := TRUE;
  trbDone := ∅;
  forall p ∈ view.M do
    Initialize a new instance trb.vid.p of uniform terminating reliable broadcast with sender p, where vid = view.id;
    if p = self then
      trigger ⟨ trb.vid.p, Broadcast | deliveredPair ⟩;

upon event ⟨ trb.vid.p, Deliver | p, pairs ⟩ such that vid = view.id do
  trbDone := trbDone ∪ {p};
  if pairs ≠ △ then
    forall (src, m) ∈ pairs such that m ∉ delivered do
      delivered := delivered ∪ {m};
      trigger ⟨ vs, Deliver | src, m ⟩;

upon trbDone = view.M ∧ blocked = TRUE do
  view := nextView;
  flushing := blocked := FALSE;
  delivered := ∅;
  deliveredPair := ∅;
  trigger ⟨ vs, View | view ⟩;
Implementation based on Uniform Consensus:
Implements: ViewSynchronousCommunication, instance vs.
Uses: UniformConsensus, instance uc; PerfectFailureDetector, instance P; BestEffortBroadcast, instance beb.

upon event ⟨ vs, Init ⟩ do
  view := (0, Π);
  correct := Π;
  flushing := blocked := FALSE;
  delivered := dset := ∅;
  trigger ⟨ vs, View | view ⟩;

upon event ⟨ vs, Broadcast | m ⟩ such that blocked = FALSE do
  delivered := delivered ∪ {m};
  trigger ⟨ vs, Deliver | self, m ⟩;
  trigger ⟨ beb, Broadcast | [DATA, view.id, m] ⟩;

upon event ⟨ beb, Deliver | src, [DATA, vid, m] ⟩ do
  if view.id = vid ∧ m ∉ delivered ∧ blocked = FALSE then
    delivered := delivered ∪ {m};
    trigger ⟨ vs, Deliver | src, m ⟩;

upon event ⟨ P, Crash | p ⟩ do
  correct := correct \ {p};
  if flushing = FALSE then
    flushing := TRUE;
    trigger ⟨ vs, Block ⟩;

upon event ⟨ vs, BlockOk ⟩ do
  blocked := TRUE;
  trigger ⟨ beb, Broadcast | [DSET, view.id, delivered] ⟩;

upon event ⟨ beb, Deliver | src, [DSET, vid, del] ⟩ do
  dset := dset ∪ {(src, del)};
  if forall p ∈ correct there exists (p, mset) ∈ dset then
    trigger ⟨ uc, Propose | view.id + 1, correct, dset ⟩;

upon event ⟨ uc, Decide | id, memb, vsdset ⟩ do
  forall (p, mset) ∈ vsdset such that p ∈ memb do
    forall (src, m) ∈ mset such that m ∉ delivered do
      delivered := delivered ∪ {m};
      trigger ⟨ vs, Deliver | src, m ⟩;
  view := (id, memb);
  flushing := blocked := FALSE;
  dset := delivered := ∅;
  trigger ⟨ vs, View | view ⟩;
Implementation of Uniform View Synchronous Broadcast
It’s important to note that simple usage of uniform broadcast doesn’t solve the problem, as uniform broadcast only guarantees uniform delivery, not uniform delivery within view. It’s possible that following scenario would happen:
- process 1 uniform-delivers m in view v(i), thus vs-delivers it, then crashes
- process 2 and process 3 install view v(i+1)
- process 2 and process 3 uniform-deliver m, but ignore it due to the view-identifier mismatch
Here, in view v(i), the faulty process 1 vs-delivers m, but m is never vs-delivered by process 2 or process 3.
The idea is to use ACK messages to inform every other process that a message has been received, similar to the fail-stop implementation of uniform broadcast. But here we have to make sure that such ACK messages are only valid within a specific view.
Implements: UniformViewSynchronousCommunication, instance uvs.
Uses: UniformConsensus (multiple instances); BestEffortBroadcast, instance beb; PerfectFailureDetector, instance P.

upon event ⟨ uvs, Init ⟩ do
  (vid, M) := (0, Π);
  correct := Π;
  flushing := FALSE; blocked := FALSE; wait := FALSE;
  pending := ∅; delivered := ∅; seen := [⊥];
  forall m do ack[m] := ∅;
  trigger ⟨ uvs, View | (vid, M) ⟩;

upon event ⟨ uvs, Broadcast | m ⟩ such that blocked = FALSE do
  pending := pending ∪ {(self, m)};
  trigger ⟨ beb, Broadcast | [DATA, vid, self, m] ⟩;

upon event ⟨ beb, Deliver | p, [DATA, id, s, m] ⟩ do
  if id = vid ∧ blocked = FALSE then
    ack[m] := ack[m] ∪ {p};
    if (s, m) ∉ pending then
      pending := pending ∪ {(s, m)};
      trigger ⟨ beb, Broadcast | [DATA, vid, s, m] ⟩;

upon exists (s, m) ∈ pending such that M ⊆ ack[m] ∧ m ∉ delivered do
  delivered := delivered ∪ {m};
  trigger ⟨ uvs, Deliver | s, m ⟩;

upon event ⟨ P, Crash | p ⟩ do
  correct := correct \ {p};

upon correct ⊊ M ∧ flushing = FALSE do
  flushing := TRUE;
  trigger ⟨ uvs, Block ⟩;

upon event ⟨ uvs, BlockOk ⟩ do
  blocked := TRUE;
  trigger ⟨ beb, Broadcast | [PENDING, vid, pending] ⟩;

upon event ⟨ beb, Deliver | p, [PENDING, id, pd] ⟩ such that id = vid do
  seen[p] := pd;

upon (forall p ∈ correct : seen[p] ≠ ⊥) ∧ wait = FALSE do
  wait := TRUE;
  vid := vid + 1;
  Initialize a new instance uc.vid of uniform consensus;
  trigger ⟨ uc.vid, Propose | (correct, seen) ⟩;

upon event ⟨ uc.id, Decide | (M′, S) ⟩ such that id = vid do
  forall p ∈ M′ such that S[p] ≠ ⊥ do
    forall (s, m) ∈ S[p] such that m ∉ delivered do
      delivered := delivered ∪ {m};
      trigger ⟨ uvs, Deliver | s, m ⟩;
  flushing := FALSE; blocked := FALSE; wait := FALSE;
  pending := ∅; seen := [⊥];
  forall m do ack[m] := ∅;
  M := M′;
  trigger ⟨ uvs, View | (vid, M) ⟩;
Registers
The register is the basic construct for modeling various distributed storage systems. Though consensus could easily solve the problems in this section, it is avoided because more lightweight solutions are available.
Registers store values and can be accessed through two operations, read and write. Registers can be classified by the number of readers and writers:
- (1, 1) means one writer and one reader
- (1, N) means one writer and multiple readers
- (N, N) means multiple writers and multiple readers
Registers can also be classified by the behaviors of read and write operations:
- Safe registers: in the face of concurrent writes, a read may return any value
- Regular registers: a read concurrent with writes can only return the last value written or a concurrently written value
- Atomic registers: the register behaves as if all read and write operations were linearized
(1, N) Regular Registers
Specification:
Module:
  Name: (1, N)-RegularRegister, instance onrr.
  Events:
    Request: ⟨ onrr, Read ⟩: Invokes a read operation on the register.
    Request: ⟨ onrr, Write | v ⟩: Invokes a write operation with value v on the register.
    Indication: ⟨ onrr, ReadReturn | v ⟩: Completes a read operation on the register with return value v.
    Indication: ⟨ onrr, WriteReturn ⟩: Completes a write operation on the register.
  Properties:
    1. Termination: If a correct process invokes an operation, then the operation eventually completes.
    2. Validity: A read that is not concurrent with a write returns the last value written; a read that is concurrent with a write returns the last value written or the value concurrently written.
Implementation for Fail-Stop model: Read-One Write-All (1, N) Regular Register
Implements: (1, N)-RegularRegister, instance onrr.
Uses: BestEffortBroadcast, instance beb; PerfectPointToPointLinks, instance pl; PerfectFailureDetector, instance P.

upon event ⟨ onrr, Init ⟩ do
  val := ⊥;
  correct := Π;
  writeset := ∅;

upon event ⟨ P, Crash | p ⟩ do
  correct := correct \ {p};

upon event ⟨ onrr, Read ⟩ do
  trigger ⟨ onrr, ReadReturn | val ⟩;

upon event ⟨ onrr, Write | v ⟩ do
  trigger ⟨ beb, Broadcast | [WRITE, v] ⟩;

upon event ⟨ beb, Deliver | q, [WRITE, v] ⟩ do
  val := v;
  trigger ⟨ pl, Send | q, ACK ⟩;

upon event ⟨ pl, Deliver | p, ACK ⟩ do
  writeset := writeset ∪ {p};

upon correct ⊆ writeset do
  writeset := ∅;
  trigger ⟨ onrr, WriteReturn ⟩;
Implementation for Fail-Silent model: Majority Voting Regular Register
The idea is to assume a majority of correct processes; each WRITE and READ waits for a majority of replies, so that any two majorities intersect in at least one process.
Implements: (1, N)-RegularRegister, instance onrr.
Uses: BestEffortBroadcast, instance beb; PerfectPointToPointLinks, instance pl.

upon event ⟨ onrr, Init ⟩ do
  (ts, val) := (0, ⊥);
  wts := 0;
  acks := 0;
  rid := 0;
  readlist := [⊥];

upon event ⟨ onrr, Write | v ⟩ do
  wts := wts + 1;
  acks := 0;
  trigger ⟨ beb, Broadcast | [WRITE, wts, v] ⟩;

upon event ⟨ beb, Deliver | p, [WRITE, ts′, v′] ⟩ do
  if ts′ > ts then
    (ts, val) := (ts′, v′);
  trigger ⟨ pl, Send | p, [ACK, ts′] ⟩;

upon event ⟨ pl, Deliver | q, [ACK, ts′] ⟩ such that ts′ = wts do
  acks := acks + 1;
  if acks > N/2 then
    acks := 0;
    trigger ⟨ onrr, WriteReturn ⟩;

upon event ⟨ onrr, Read ⟩ do
  rid := rid + 1;
  readlist := [⊥];
  trigger ⟨ beb, Broadcast | [READ, rid] ⟩;

upon event ⟨ beb, Deliver | p, [READ, r] ⟩ do
  trigger ⟨ pl, Send | p, [VALUE, r, ts, val] ⟩;

upon event ⟨ pl, Deliver | q, [VALUE, r, ts′, v′] ⟩ such that r = rid do
  readlist[q] := (ts′, v′);
  if #(readlist) > N/2 then
    v := highestval(readlist);
    readlist := [⊥];
    trigger ⟨ onrr, ReadReturn | v ⟩;
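Why do two majorities always intersect? A quick Python sanity check of the quorum argument for small N (illustrative):

from itertools import combinations

# Any two majorities of the same N processes share at least one process,
# which is why a majority read always sees the last complete write.
for N in range(1, 8):
    majorities = [set(c) for k in range(N // 2 + 1, N + 1)
                  for c in combinations(range(N), k)]
    assert all(a & b for a in majorities for b in majorities)
print("every pair of majorities intersects")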
(1, N) Atomic Registers
Specification:
Module:
  Name: (1, N)-AtomicRegister, instance onar.
  Events:
    Request: ⟨ onar, Read ⟩: Invokes a read operation on the register.
    Request: ⟨ onar, Write | v ⟩: Invokes a write operation with value v on the register.
    Indication: ⟨ onar, ReadReturn | v ⟩: Completes a read operation on the register with return value v.
    Indication: ⟨ onar, WriteReturn ⟩: Completes a write operation on the register.
  Properties:
    1. Termination: If a correct process invokes an operation, then the operation eventually completes.
    2. Validity: A read that is not concurrent with a write returns the last value written; a read that is concurrent with a write returns the last value written or the value concurrently written.
    3. Ordering: If a read returns a value v and a subsequent read returns a value w, then the write of w does not precede the write of v.
Implementation for Fail-Stop model: Read-Impose Write-All
The changes with respect to the Read-One Write-All (1, N) regular register algorithm are as follows:
- The reader now also writes the value to all processes before returning it
- A timestamp is introduced
This guarantees that a later READ R1 can't return an older value than an earlier READ R2, because R2 has imposed its value on all processes before completing.
Implements: (1, N)-AtomicRegister, instance onar.
Uses: BestEffortBroadcast, instance beb; PerfectPointToPointLinks, instance pl; PerfectFailureDetector, instance P.

upon event ⟨ onar, Init ⟩ do
  (ts, val) := (0, ⊥);
  correct := Π;
  writeset := ∅;
  readval := ⊥;
  reading := FALSE;

upon event ⟨ P, Crash | p ⟩ do
  correct := correct \ {p};

upon event ⟨ onar, Read ⟩ do
  reading := TRUE;
  readval := val;
  trigger ⟨ beb, Broadcast | [WRITE, ts, val] ⟩;

upon event ⟨ onar, Write | v ⟩ do
  trigger ⟨ beb, Broadcast | [WRITE, ts + 1, v] ⟩;

upon event ⟨ beb, Deliver | p, [WRITE, ts′, v′] ⟩ do
  if ts′ > ts then
    (ts, val) := (ts′, v′);
  trigger ⟨ pl, Send | p, [ACK] ⟩;

upon event ⟨ pl, Deliver | p, [ACK] ⟩ do
  writeset := writeset ∪ {p};

upon correct ⊆ writeset do
  writeset := ∅;
  if reading = TRUE then
    reading := FALSE;
    trigger ⟨ onar, ReadReturn | readval ⟩;
  else
    trigger ⟨ onar, WriteReturn ⟩;
Implementation for Fail-Silent model: Read-Impose Write-Majority
The algorithm assumes a majority of correct processes. The writer simply makes sure a majority adopts its value. A reader selects the value with the largest timestamp from a majority, then imposes this value, making sure a majority adopts it before completing the read operation.
Implements: (1, N)-AtomicRegister, instance onar.
Uses: BestEffortBroadcast, instance beb; PerfectPointToPointLinks, instance pl.

upon event ⟨ onar, Init ⟩ do
  (ts, val) := (0, ⊥);
  wts := 0;
  acks := 0;
  rid := 0;
  readlist := [⊥];
  readval := ⊥;
  reading := FALSE;

upon event ⟨ onar, Read ⟩ do
  rid := rid + 1;
  acks := 0;
  readlist := [⊥];
  reading := TRUE;
  trigger ⟨ beb, Broadcast | [READ, rid] ⟩;

upon event ⟨ beb, Deliver | p, [READ, r] ⟩ do
  trigger ⟨ pl, Send | p, [VALUE, r, ts, val] ⟩;

upon event ⟨ pl, Deliver | q, [VALUE, r, ts′, v′] ⟩ such that r = rid do
  readlist[q] := (ts′, v′);
  if #(readlist) > N/2 then
    (maxts, readval) := highest(readlist);
    readlist := [⊥];
    trigger ⟨ beb, Broadcast | [WRITE, rid, maxts, readval] ⟩;

upon event ⟨ onar, Write | v ⟩ do
  rid := rid + 1;
  wts := wts + 1;
  acks := 0;
  trigger ⟨ beb, Broadcast | [WRITE, rid, wts, v] ⟩;

upon event ⟨ beb, Deliver | p, [WRITE, r, ts′, v′] ⟩ do
  if ts′ > ts then
    (ts, val) := (ts′, v′);
  trigger ⟨ pl, Send | p, [ACK, r] ⟩;

upon event ⟨ pl, Deliver | q, [ACK, r] ⟩ such that r = rid do
  acks := acks + 1;
  if acks > N/2 then
    acks := 0;
    if reading = TRUE then
      reading := FALSE;
      trigger ⟨ onar, ReadReturn | readval ⟩;
    else
      trigger ⟨ onar, WriteReturn ⟩;
(N, N) Atomic Registers
Specification:
Module:
  Name: (N, N)-AtomicRegister, instance nnar.
  Events:
    Request: ⟨ nnar, Read ⟩: Invokes a read operation on the register.
    Request: ⟨ nnar, Write | v ⟩: Invokes a write operation with value v on the register.
    Indication: ⟨ nnar, ReadReturn | v ⟩: Completes a read operation on the register with return value v.
    Indication: ⟨ nnar, WriteReturn ⟩: Completes a write operation on the register.
  Properties:
    1. Termination: If a correct process invokes an operation, then the operation eventually completes.
    2. Atomicity: Every read operation returns the value that was written most recently in a hypothetical execution, where every failed operation appears to be complete or does not appear to have been invoked at all, and every complete operation appears to have been executed at some instant between its invocation and its completion.
Implementation for Fail-Stop model: Read-Impose Write-Consult-All
Question: why doesn't the Read-Impose Write-All (1, N) atomic register algorithm work here? Because two concurrent writes may create the same timestamp, so the same timestamp may be associated with different values, creating READ scenarios that are impossible to linearize. The algorithm has to use the process rank to impose an order when two timestamps are equal; total order is the essence of atomicity.
Implements: (N, N)-AtomicRegister, instance nnar.
Uses: BestEffortBroadcast, instance beb; PerfectPointToPointLinks, instance pl; PerfectFailureDetector, instance P.

upon event ⟨ nnar, Init ⟩ do
  (ts, wr, val) := (0, 0, ⊥);
  correct := Π;
  writeset := ∅;
  readval := ⊥;
  reading := FALSE;

upon event ⟨ P, Crash | p ⟩ do
  correct := correct \ {p};

upon event ⟨ nnar, Read ⟩ do
  reading := TRUE;
  readval := val;
  trigger ⟨ beb, Broadcast | [WRITE, ts, wr, val] ⟩;

upon event ⟨ nnar, Write | v ⟩ do
  trigger ⟨ beb, Broadcast | [WRITE, ts + 1, rank(self), v] ⟩;

upon event ⟨ beb, Deliver | p, [WRITE, ts′, wr′, v′] ⟩ do
  if (ts′, wr′) is larger than (ts, wr) then
    (ts, wr, val) := (ts′, wr′, v′);
  trigger ⟨ pl, Send | p, [ACK] ⟩;

upon event ⟨ pl, Deliver | p, [ACK] ⟩ do
  writeset := writeset ∪ {p};

upon correct ⊆ writeset do
  writeset := ∅;
  if reading = TRUE then
    reading := FALSE;
    trigger ⟨ nnar, ReadReturn | readval ⟩;
  else
    trigger ⟨ nnar, WriteReturn ⟩;
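The "(ts′, wr′) is larger than (ts, wr)" test is plain lexicographic comparison of (timestamp, rank) pairs, which Python tuples give for free; a minimal illustration:

# Two concurrent writers may pick the same timestamp; the writer rank
# breaks the tie, giving one total order over all written values.
w1 = (3, 1)   # (timestamp, rank) from the writer with rank 1
w2 = (3, 2)   # same timestamp, written concurrently by rank 2
assert w2 > w1 and max(w1, w2) == w2   # (ts', wr') "is larger than" (ts, wr)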
Implementation for Fail-Silent model: Read-Impose Write-Consult-Majority
The algorithm assumes a majority of correct processes. Compared to the Read-Impose Write-Majority algorithm, the reader is unchanged: it selects the value with the largest timestamp (now augmented with the process rank) from a majority, imposes this value, and makes sure a majority adopts it before completing the read operation. The writer now also reads from a majority to obtain the latest timestamp before making sure a majority adopts its value.
Implements: (N, N)-AtomicRegister, instance nnar.
Uses: BestEffortBroadcast, instance beb; PerfectPointToPointLinks, instance pl.

upon event ⟨ nnar, Init ⟩ do
  (ts, wr, val) := (0, 0, ⊥);
  acks := 0;
  writeval := ⊥;
  rid := 0;
  readlist := [⊥];
  readval := ⊥;
  reading := FALSE;

upon event ⟨ nnar, Read ⟩ do
  rid := rid + 1;
  acks := 0;
  readlist := [⊥];
  reading := TRUE;
  trigger ⟨ beb, Broadcast | [READ, rid] ⟩;

upon event ⟨ beb, Deliver | p, [READ, r] ⟩ do
  trigger ⟨ pl, Send | p, [VALUE, r, ts, wr, val] ⟩;

upon event ⟨ pl, Deliver | q, [VALUE, r, ts′, wr′, v′] ⟩ such that r = rid do
  readlist[q] := (ts′, wr′, v′);
  if #(readlist) > N/2 then
    (maxts, rr, readval) := highest(readlist);
    readlist := [⊥];
    if reading = TRUE then
      trigger ⟨ beb, Broadcast | [WRITE, rid, maxts, rr, readval] ⟩;
    else
      trigger ⟨ beb, Broadcast | [WRITE, rid, maxts + 1, rank(self), writeval] ⟩;

upon event ⟨ nnar, Write | v ⟩ do
  rid := rid + 1;
  writeval := v;
  acks := 0;
  readlist := [⊥];
  trigger ⟨ beb, Broadcast | [READ, rid] ⟩;

upon event ⟨ beb, Deliver | p, [WRITE, r, ts′, wr′, v′] ⟩ do
  if (ts′, wr′) is larger than (ts, wr) then
    (ts, wr, val) := (ts′, wr′, v′);
  trigger ⟨ pl, Send | p, [ACK, r] ⟩;

upon event ⟨ pl, Deliver | q, [ACK, r] ⟩ such that r = rid do
  acks := acks + 1;
  if acks > N/2 then
    acks := 0;
    if reading = TRUE then
      reading := FALSE;
      trigger ⟨ nnar, ReadReturn | readval ⟩;
    else
      trigger ⟨ nnar, WriteReturn ⟩;
To argue that the algorithms satisfy the atomicity property, we can follow the steps below:
- Construct a total-order relationship to linearize any execution history of the algorithm.
- Argue that the linearization preserves all precedence relationships between operations in the execution history.
Reference
- Rachid Guerraoui and Luis Rodrigues - Introduction to Reliable Distributed Programming amazon.de
- Distributed Algorithm Course Page at EPFL