作者都是各自领域经过审查的专家,并撰写他们有经验的主题. 我们所有的内容都经过同行评审,并由同一领域的Toptal专家验证.
Marko Dvečko
验证专家 in Engineering

Marko有12年以上的工作经验. 他拥有四个Salesforce.他的兴趣是数学和函数式编程.

Read More

PREVIOUSLY AT

CloudSense
Share

什么是并发编程? 简单地说,就是你同时在做不止一件事. 不要和平行搞混了, 并发或并发编程是指在重叠的时间段内运行多个操作序列. 在编程领域,并发是一个相当复杂的主题. 处理线程和锁等结构以及避免竞争条件和死锁等问题可能相当麻烦, 使并发程序难以编写. 通过并发, 程序可以设计为在特定组合中一起工作的独立进程. Such a structure may or may not be made parallel; however, 在程序中实现这样的结构提供了许多优点.

并行编程概论

在本文中, 我们将研究一些不同的并发模型, 如何在各种编程语言中实现它们 为并发性而设计.

共享可变状态模型

让我们看一个简单的示例,其中有一个计数器和两个增加计数器的线程. 这个程序不应该太复杂. 我们有一个对象,它包含一个计数器,该计数器随着方法increase而增加, 然后用get方法和两个增加它的线程来检索它.

//
// Counting.java
//
公共类计数{
    public static void main(String[] args)抛出InterruptedException {
	类计数器{
	    Int counter = 0;
	    public void increment() { counter++; }
	    public int get() { return counter; }
	}
	
        final Counter Counter = new Counter();
        
        类CountingThread扩展线程{
            公共无效运行(){
                for (int x = 0; x < 500000; x++) {
                    counter.increment();
                }
            }
        }

        CountingThread t1 = new CountingThread();
        CountingThread t2 = new CountingThread();
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(计数器.get());
    }
}

这个简单的程序并不像乍一看那么简单. 当我多次运行这个程序时,我得到了不同的结果. 在我的笔记本电脑上,执行三次之后有三个值.

java Counting
553706
java Counting
547818
java Counting
613014

这是什么原因呢 不可预知的行为? 程序在一个地方增加计数器,在使用命令计数器++的方法中增加. 如果我们看一下命令字节码,我们会看到它由几个部分组成:

  1. 从内存中读取计数器值
  2. 本地增值
  3. 将计数器值存储在内存中

现在我们可以想象在这个序列中会出现什么问题. 如果我们有两个线程独立地增加计数器,那么我们可以有这样的场景:

  1. 计数器值为115
  2. 第一个线程从内存中读取计数器的值(115)
  3. 第一个线程增加本地计数器值(116)
  4. 第二个线程从内存中读取计数器的值(115)
  5. 第二个线程增加本地计数器值(116)
  6. 第二个线程将本地计数器值保存到内存中(116)
  7. 第一个线程将本地计数器值保存到内存中(116)
  8. 计数器的值为116

在这个场景中, 两个线程交织在一起,因此计数器值增加1, 但是计数器值应该增加2,因为每个线程增加1. 不同的线程相互纠缠影响程序的结果. 程序不可预见性的原因是程序对线程的缠绕没有控制,只有操作系统. 每次执行程序时,线程会以不同的方式缠绕在一起. 通过这种方式,我们为程序引入了意外的不可预测性(非决定论).

为了修正这种偶然的不可预测性(非决定论), 程序必须能够控制线程的缠绕. 当一个线程在方法增加中,另一个线程不能在同一个方法中,直到第一个线程出来. 通过这种方式,我们序列化了对方法increase的访问.

//
/ / CountingFixed.java
//
公共类计数固定{
    public static main(String[] args)抛出InterruptedException {
        类计数器{
            Int counter = 0;
            public synchronized void increase() { counter++; }
            public synchronized int get() { return counter; }
        }
        final Counter Counter = new Counter();
        
        类CountingThread扩展线程{
            公共无效运行(){
                for (int i = 0; i < 500000; i++) {
                    counter.increment();
                }
            }
        }

        thread1 = new CountingThread();
        thread2 = new CountingThread();
        thread1.start(); thread2.start();
        thread1.join(); thread2.join();
        System.out.println(计数器.get());
    }
}

另一个解决方案是使用一个可以自动增加的计数器, 意思是操作不能分割成多个操作. 通过这种方式,我们不需要有需要同步的并发代码块. Java中有原子数据类型.util.concurrent.原子名称空间,我们将使用AtomicInteger.

//
/ / CountingBetter.java
//
import java.util.concurrent.atomic.AtomicInteger;

类CountingBetter {
    public static void main(String[] args)抛出InterruptedException {
        最终AtomicInteger计数器=新的AtomicInteger(0);

        类CountingThread扩展线程{
            公共视频运行(){
                for (int i = 0; i < 500000; i++) {
                    counter.incrementAndGet ();
                }
            }
        }
        thread1 = new CountingThread();
        thread2 = new CoutningThread();
        thread1.start(); thread2.start();
        thread1.join(); thread2.join();
        System.out.println(计数器.get());
    }
}

原子整数具有我们需要的操作,因此可以使用它来代替Counter类. 有趣的是,atomicinteger的所有方法都不使用锁, 这样就不会有死锁的可能, 哪些有助于程序的设计.

Using synchronized 关键字同步方法应该解决所有问题,对吧? 假设我们有两个账户,它们可以存款、取款和转账到另一个账户. 如果同时我们想把钱从一个账户转到另一个账户,反之亦然,会发生什么? 让我们来看一个例子.

//
// Deadlock.java
//
公共类死锁{
    public static void main(String[] args)抛出InterruptedException {
        class Account {
            Int balance = 100;
            公共帐户(int balance) {this.Balance =余额; }
            public synchronized void deposit(int amount) { 余额+=金额; }
            公共同步布尔撤回(int amount) {
                if (balance >= amount) {
                    Balance -=金额;
                    return true;
                }
                return false;
            }
            公共同步布尔转账(帐户目的地,int金额){
                if (balance >= amount) {
                    Balance -=金额;
                    同步(目的地){
                        destination.余额+=金额;
                    };
                    return true;
                }
                return false;
            }
            public int getBalance() { return balance; }
        }

        最终账户bob =新账户(200000);
        最终账户joe =新账户(300000);

        类FirstTransfer扩展线程{
            公共无效运行(){
                for (int i = 0; i < 100000; i++) {
                    bob.转让(乔,2);
                }
            }
        }
        类SecondTransfer扩展线程{
            公共无效运行(){
                for (int i = 0; i < 100000; i++) {
                    joe.转让(bob, 1);
                }
            }
        }

        FirstTransfer thread1 = new FirstTransfer();
        thread2 = new SecondTransfer();
        thread1.start(); thread2.start();
        thread1.join(); thread2.join();
        System.out.println("Bob's balance: " + Bob.getBalance());
        System.out.println(“乔的余额:”+乔.getBalance());
    }
}

当我在笔记本电脑上运行这个程序时,它通常会卡住. 为什么会发生这种情况?? 如果我们仔细观察, 我们可以看到,当我们转账时,我们正在进入同步的转账方法,并锁定源账户上所有同步方法的访问权限, 然后锁定目标帐户,该帐户锁定对其上所有同步方法的访问.

想象一下下面的场景:

  1. 第一个线程调用从Bob的帐户转移到Joe的帐户
  2. 第二个线程调用从Joe的帐户转移到Bob的帐户
  3. 第二个线程从Joe的帐户中减少金额
  4. 第二个线程将存款金额存入Bob的账户,但等待第一个线程完成转账.
  5. 第一个线程从Bob的帐户中减少金额
  6. 第一个线程将存款金额存入Joe的帐户,但等待第二个线程完成转账.

在这个场景中,一个线程等待另一个线程完成传输,反之亦然. 他们被困在一起,计划无法继续. This is called deadlock. 为了避免死锁,必须以相同的顺序锁定帐户. 为了修复这个程序,我们将给每个账户一个唯一的号码,这样我们就可以在转账时以相同的顺序锁定账户.

//
/ / DeadlockFixed.java
//
import java.util.concurrent.atomic.AtomicInteger;

公共类DeadlockFixed {
    public static void main(String[] args)抛出InterruptedException {
        最终AtomicInteger计数器=新的AtomicInteger(0);
        class Account {
            Int balance = 100;
            int order;
            公共帐户(int balance) {
                this.Balance =余额;
                this.order = counter.getAndIncrement ();
            }
            public synchronized void deposit(int amount) { 余额+=金额; }
            公共同步布尔撤回(int amount) {
                if (balance >= amount) {
                    Balance -=金额;
                    return true;
                }
                return false;
            }
            public boolean transfer(帐户目的地,int amount) {
                Account first;
                Account second;
                if (this.order < destination.order) {
                    first = this;
                    Second = destination;
                }
                else {
                    First =目的地;
                    second = this;
                }
                同步(第一个){
                    同步(秒){
                        if (balance >= amount) {
                            Balance -=金额;
                            destination.余额+=金额;
                            return true;
                        }
                        return false;
                    }
                }
            }
            public synchronized int getBalance() { return balance; }
        }

        最终账户bob =新账户(200000);
        最终账户joe =新账户(300000);

        类FirstTransfer扩展线程{
            公共无效运行(){
                for (int i = 0; i < 100000; i++) {
                    bob.转让(乔,2);
                }
            }
        }
        类SecondTransfer扩展线程{
            公共无效运行(){
                for (int i = 0; i < 100000; i++) {
                    joe.转让(bob, 1);
                }
            }
        }

        FirstTransfer thread1 = new FirstTransfer();
        thread2 = new SecondTransfer();
        thread1.start(); thread2.start();
        thread1.join(); thread2.join();
        System.out.println("Bob's balance: " + Bob.getBalance());
        System.out.println(“乔的余额:”+乔.getBalance());
    }
}

由于这种错误的不可预测性, 这种情况时有发生, 但并非总是如此,而且它们很难繁殖. 如果程序的行为不可预测, 它通常是由并发性引起的,并发性引入了偶然的非确定性. 为了避免偶然的非决定论,我们应该提前设计程序来考虑所有的交织.

一个具有意外非确定性的程序的例子.

//
/ / NonDeteminism.java
//
NonDeterminism {
    public static void main(String[] args)抛出InterruptedException {
        容器类{
            public String value = "空";
        }
        final Container = new Container();

        类FastThread扩展线程{
            公共无效运行(){
                container.value = "Fast";
            }
        }

        类SlowThread扩展线程{
            公共无效运行(){
                try {
                    Thread.sleep(50);
                }
                catch(异常e) {}
                container.value = "慢";
            }
        }
        
        FastThread = new FastThread();
        SlowThread = new SlowThread();
        fast.start(); slow.start();
        fast.join(); slow.join();
        System.out.println(容器.value);
    }
}

这个程序有偶然的非决定论. 将显示在容器中输入的最后一个值.

java非确定性
Slow

较慢的线程将稍后输入该值,并且将打印此值(慢). 但事实并非如此. 如果计算机同时执行另一个需要大量CPU资源的程序怎么办? 我们不能保证它一定是最后进入value的较慢的线程,因为它是由操作系统控制的, 不是程序. 我们可能会遇到程序在一台计算机上运行而在另一台计算机上表现不同的情况. 这种并发计算错误很难发现,而且会给开发人员带来麻烦. 由于所有这些原因,这种并发模型很难做好.

Functional Way

Parallelism

让我们看看函数式语言使用的另一个模型. 例如,我们将使用 Clojure,这可以用这个工具来解释 Leiningen. Clojure是一种非常有趣的语言,对并发性有很好的支持. 以前的并发模型采用共享可变状态. 我们使用的类也可能有一个隐藏状态,它会在我们不知道的情况下发生变异, 因为从他们的API中看不出来. 正如我们所看到的,如果我们不小心,这个模型可能会导致意外的非确定性和死锁. 函数式语言的数据类型不会发生变化,因此可以安全地共享,而不会有数据类型发生变化的风险. 函数具有属性和其他数据类型. 函数可以在程序执行期间创建,并作为参数传递给另一个函数,或者作为函数调用的结果返回.

并发编程的基本原语是未来的和有希望的. Future在另一个线程中执行一个代码块,并返回一个对象,该对象将在执行该块时输入未来值.

;
; future.clj
;
(let [a (future
          (打印“开始A”)
          (线程睡眠/ 1000)
          (打印“Finished A”)
          (+ 1 2))
      b (future
          (打印“start B”)
          (线程睡眠/ 2000)
          (印刷“成品B”)
          (+ 3 4))]
  (印刷“等待期货”)
  (+ @a @b))

当我执行这个脚本时,输出是:

Started A
Started B
等待未来
Finished A
Finished B
10

在这个例子中,我们有两个独立执行的future块. 程序仅在从尚未可用的future对象中读取值时阻塞. 在本例中,等待对未来区块的两个结果进行求和. 行为是可预测的(确定性的),并且总是给出相同的结果,因为没有共享的可变状态.

另一个用于并发的原语是承诺. Promise是一个容器,一个值只能放进去一次. 当读取promise时,线程将等待,直到promise的值被填满.

;
; promise.clj
;
(def result (promise))
(future (println " result is: " @result))
(线程睡眠/ 2000)
(提交结果42)

在本例中, future 会不会等到打印结果,只要承诺不保存值. 两秒钟后,在promise中会存储值42,以便在以后的线程中打印. Using promises 会导致死锁而不是未来,所以在使用promise的时候要小心.

;
; promise-deadlock.clj
;
(def promise-result (promise))
(def future-result
  (future
    (println "The result is: " + @promise-result)
    13))
(println "Future -result is: " @future-result)
(提交结果42)

在这个例子中,我们使用了未来的结果和承诺的结果. 设置和读取值的顺序是:主线程等待来自未来线程的值,而未来线程等待来自主线程的值. 这种行为将是可预测的(确定性的),并将在每次程序执行时播放,这使得查找和删除错误更容易.

使用future允许程序继续练习,直到它需要future执行的结果. 这将导致更快的程序执行. 如果将来有多个处理器, 您可以并行执行具有可预测(确定性)行为的程序(每次都给出相同的结果)。. 这样我们就能更好地利用计算机的能力.

;
; fibonacci.clj
;
(defn斐波纳契(一个)
  (if (<= a 2)
    1
    (+ (fibonacci (- a 1)) (fibonacci (- a 2))))))

(println“开始串行计算”)
(time (println "结果是:" (+ (fibonacci 36) (fibonacci 36)))) .
(println“开始并行计算”)

(defn parallel-fibonacci []
  (defresult -1 (future (fibonacci 36)))
  (defresult -2 (future (fibonacci 36)))
  (+ @result-1 @result-2))
(time (println "结果是:" (parallel-fibonacci))))

在这个例子中,你可以看到如何使用future可以更好地利用计算机的速度. 我们有两个斐波那契数相加. 我们可以看到程序计算了两次结果, 第一次顺序地在单个线程中执行, 第二次是在两个线程中并行执行. 因为我的笔记本电脑有多核处理器, 并行执行的速度是顺序计算的两倍.

在我的笔记本电脑上执行这个脚本的结果:

开始串行计算
结果为:29860704
运行时间:2568.816524 msecs"
开始并行计算
结果为:29860704
“运行时间:1216.991448 msecs"

Concurrency

在Clojure编程语言中支持并发性和不可预测性, 我们必须使用可变的数据类型,以便其他线程可以看到更改. 最简单的变量数据类型是原子. Atom 是一个容器,总是有价值,可以由另一个值取代. 该值可以通过输入一个新值或调用一个函数来替换,该函数接受旧值并返回更常用的新值. 有趣的是,atom是在没有锁定的情况下实现的,并且在线程中使用它是安全的, 这意味着不可能陷入僵局. 在内部,atom使用java.util.concurrent.AtomicReference图书馆. 让我们看一个用atom实现的反例.

;
; atom-counter.clj
;
(defcounter(原子0))
(defattempts (atom 0))

(defn counter-increases []
  [cnt 500000]
    (swap! 计数器(fn[计数器])
                     (swap! attempts inc) ; side effect DO NOT DO THIS
                     (公司柜台)))))

(def - first-future (future (counter-increase)))
(def - second-future (future (counter-increase)))
; Wait for futures to complete
@first-future
@second-future
; Print value of the counter
(println "The counter is: " @counter)
(println“尝试次数:”@attempts)

脚本在我的笔记本电脑上执行的结果:

计数器为:1000000
尝试次数:1680212

在本例中,我们使用了一个包含计数器值的原子. 计数器随着swap的增加而增加! counter inc). Swap函数是这样工作的: 1. 取下计数器的值并保存 2. 对于这个值,调用计算新值的给定函数 3. 为了保存新值,它使用原子操作来检查旧值是否已更改 3a. 如果该值没有更改,则输入一个新值 3b. 如果其间有修改,则请执行步骤1 我们看到,如果在此期间改变了值,则可以再次调用该函数. 该值只能从另一个线程更改. Therefore, 重要的是,计算新值的函数没有副作用,因此如果它被调用多次也没关系. atom的一个限制是它将更改同步到一个值.

;
; atom-acocunts.clj
;
(def Bob (atom 200000))
(def Joe (atom 300000))
(定义不一致(原子0))

(定义transfer[来源目的地金额]
  (if (not= (+ @bob @joe) 500000! 不一致(aapl . o:行情)))
  (swap! 来源-金额)
  (swap! 目的地+金额))

(定义first-transfer []
  [cnt 100000]
    (转Bob Joe 2))

(定义second-transfer []
  [cnt 100000]
    (调动乔·鲍勃))

(def - first-future (future (first-transfer)))
(def - second-future (future (second-transfer)))
@first-future
@second-future
(println "Bob has in account: " @bob)
(println“Joe has in account:”@joe)
(println "传输时不一致:" @不一致)

当我执行这个脚本时,我得到:

鲍勃的账户是100000
乔的账户有40万
传输时不一致:36525

在这个例子中,我们可以看到如何改变更多的原子. 在某一点上,可能会发生不一致. 有时两个账户的金额不一样. 如果我们必须协调多个值的变化,有两种解决方案:

  1. 在一个原子中放置更多的值
  2. 使用引用和软件事务性内存,我们将在后面看到
;
; atom-accounts-fixed.clj
;
(def accounts (atom {:bob 200000,:joe 300000}))
(定义不一致(原子0))

(定义transfer[来源目的地金额]
  (let [deleef -accounts @accounts]
    (if (not= (+ (get deref-accounts:bob) (get deref-accounts:joe)) 500000)
      (swap! 不一致(aapl . o:行情)))
    (swap! accounts
           (fn [accs]
             (update (update access源-量)目的+量)))))

(定义first-transfer []
  [cnt 100000]
    (transfer:bob:joe))


(定义second-transfer []
  [cnt 100000]
    (transfer:joe:bob))

(def - first-future (future (first-transfer)))
(def - second-future (future (second-transfer)))
@first-future
@second-future
(println "Bob has in account: "(获取@accounts: Bob))
(println“Joe has in account:”(获取@accounts: Joe))
(println "传输时不一致:" @不一致)

当我在我的电脑上运行这个脚本时,我得到:

鲍勃的账户是100000
乔的账户有40万
传输时不一致性:0

在本例中,已经解决了协调问题,因此我们可以使用映射赋予更多的价值. 当我们从账户转账时, 我们同时更改了所有的账户,这样就永远不会发生钱的总和不一样的情况.

下一个变量数据类型是agent. Agent的行为类似于原子,只是改变值的函数在不同的线程中执行, 所以改变需要一些时间才能显现出来. Therefore, 在读取代理的值时,有必要调用一个函数,该函数等待所有更改代理值的函数执行完毕. 与原子函数不同,函数只调用一次值,因此可能有副作用. 此类型还可以同步一个值,并且不会死锁.

;
; agent-counter.clj
;
(defcounter(代理0))
(defattempts (atom 0))

(defn counter-increases []
  [cnt 500000]
    (发送计数器(fn [counter])
                    (swap! attempts inc)
                    (公司柜台)))))

(def - first-future (future (counter-increase)))
(def - second-future (future (counter-increase)))
; wait for futures to complete
@first-future
@second-future
; wait for counter to be finished with updating
(await counter)
; print the value of the counter
(println "The counter is: " @counter)
(println“尝试次数:”@attempts)

当我在笔记本电脑上运行这个脚本时,我得到:

计数器为:1000000
尝试次数:1000000

这个示例与使用原子的计数器的实现相同. 唯一的区别是,这里我们在使用await读取最终值之前等待所有代理更改完成.

最后一个变量数据类型是引用. 与原子不同,引用可以同步对多个值的更改. 对引用的每个操作都应该在使用dosync的事务中. 这种改变数据的方式被称为软件事务性内存或缩写为STM. 让我们看一个在账户中转账的例子.

;
;  stm-accounts.clj
;
(def Bob (ref 200000))
(def Joe (ref 300000))
(定义不一致(原子0))
(defattempts (atom 0))
(deftransfers(代理0))

(定义transfer[来源目的地金额]
  (dosync
   (swap! attempts inc) ; side effect DO NOT DO THIS
   (汇款公司)
   (当(not= (+ @bob @joe) 500000)
     (swap! 不一致(aapl . o:行情))) ; side effect DO NOT DO THIS
   (alter 来源-金额)
   (alter 目的地+金额))

(定义first-transfer []
  [cnt 100000]
    (转Bob Joe 2))

(定义second-transfer []
  [cnt 100000]
    (调动乔·鲍勃))

(def - first-future (future (first-transfer)))
(def - second-future (future (second-transfer)))
@first-future
@second-future
(等待转移)
(println "Bob has in account: " @bob)
(println“Joe has in account:”@joe)
(println "传输时不一致:" @不一致)
(println "Attempts: " @attempts)
(println "Transfers: " @transfers)

当我运行这个脚本时,我得到:

鲍勃的账户是100000
乔的账户有40万
传输时不一致性:0
尝试:330841
传输:200000

有趣的是,尝试次数比交易次数要多. 这是因为STM不使用锁, 所以如果有冲突, (就像两个线程试图改变相同的值)事务将被重新执行. 由于这个原因,事务不应该有副作用. 我们可以看到,在事务中值变化的代理的行为是可预测的. 改变代理值的函数将在有交易的情况下被评估多少次. 原因是代理是事务感知的. 如果事务必须有副作用,它们应该在代理中发挥作用. 通过这种方式,程序将具有可预测的行为. 您可能认为应该始终使用STM, 但经验丰富的程序员通常会使用原子,因为原子比STM更简单、更快. 当然,这是在有可能用这种方式制作程序的情况下. 如果你有副作用,那么就没有其他选择,只能使用STM和药物.

Actor Model

下面的并发模型是 actor model. 这个模型的原理与现实世界相似. 如果我们和很多人达成协议去创造一些东西, 比如一栋建筑, 建筑工地的每个人都有自己的角色. 一群人由主管监督. 如果工人在工作中受伤, 主管将把受伤的人的工作分配给其他可用的人. 如果有必要,他可能会带一个新人去那个地方. 在网站上,我们有更多的人同时做这项工作。, 同时也通过相互交流来实现同步. 如果我们把建筑工地的工作纳入计划, 那么每个人都是一个行动者,有自己的状态,按照自己的过程执行, 谈话将被信息所取代. 基于这种模型的流行编程语言是Erlang. 这种有趣的语言具有不可变的数据类型和函数,它们与其他数据类型具有相同的属性. 函数可以在程序执行期间创建,并作为参数传递给另一个函数,或者作为函数调用的结果返回. 我会在 Elixir 使用Erlang虚拟机的语言, 所以我将有与Erlang相同的编程模型,只是语法不同. 《欧博体育app下载》中最重要的三个基本元素是刷出、发送和接收. Spawn在新进程中执行函数, Send将消息发送到流程,receive接收发送到当前流程的消息.

带有参与者模型的第一个示例将同时进行计数器增加. 用这个模型做一个程序, 有必要使参与者具有计数器的值,并接收消息来设置和检索计数器的值, 让两个参与者同时增加计数器的价值.

#
# Counting.exs
#
defmodule计数
  Def counter(value) do
    receive do
      {:get, sender} ->
        发送发送方,{:计数器,值}
        counter value
      {:set, new_value} -> counter(new_value)
    end
  end

 defcounting (sender, counter, times
    if times > 0 do
      发送计数器,{:get, self}
      receive do
        {:counter, value} -> send counter, {:set, value + 1}
      end
      计数(sender, counter, times - 1)
    else
      发送发送者,{:done, self}
    end
  end
end

counter = spawn fn -> Counting.counter 0 end

IO.显示“开始计数进程”
this = self
counting1 = spawn fn ->
  IO.开始"计数A "
  Counting.计数,计数器,500_000
  IO."数A "结束了
end
counting2 = spawn fn ->
  IO.“计数B开始”
  Counting.计数,计数器,500_000
  IO.“计数B完成”
end

IO.写着“等待计数完成”
receive do
  {:done, ^counting1} -> nil
end
receive do
  {:done, ^counting2} -> nil
end

发送计数器,{:get, self}
receive do
  {:counter, value} -> IO.put "Counter is: #{value}"
end

当我执行这个例子时,我得到:

开始计数过程
A开始计数
等待计数完成
B开始计数
计数A完成
B完成
柜台是:516827

我们可以看到,最后的计数器是516827,而不是我们预期的1000000. 当我下次运行脚本时,我收到了511010. 产生此行为的原因是计数器接收两条消息:检索当前值和设置新值. 增加计数器, 程序需要获取当前值, 将其增加1并设置增加的值. 两个进程使用发送给计数器进程的消息同时读写计数器的值. 计数器接收的消息顺序是不可预测的,程序无法控制它. 我们可以想象这样的场景:

  1. 计数器值为115
  2. 进程A读取计数器的值(115)
  3. 进程B读取计数器的值(115)
  4. 进程B在本地增加值(116)
  5. 进程B将增加的值设置到计数器(116)
  6. 进程A增加计数器的值(116)
  7. 进程A将增加的值设置到计数器(116)
  8. 计数器值为116

如果我们看一下这个场景, 两个进程将计数器增加1, counter最后增加了1,而不是2. 这样的纠缠可以发生不可预测的次数,因此计数器的值是不可预测的. 为了防止这种行为,递增操作必须由一条消息完成.

#
# CountingFixed.exs
#
defmodule计数
  Def counter(value) do
    receive do
      :increase -> counter(value + 1)

      {:get, sender} ->
        发送发送方,{:计数器,值}
        counter value
    end
  end

 defcounting (sender, counter, times
    if times > 0 do
      发送计数器,:增加
      计数(sender, counter, times - 1)
    else
      发送发送者,{:done, self}
    end
  end
end

counter = spawn fn -> Counting.counter 0 end

IO.显示“开始计数进程”
this = self
counting1 = spawn fn ->
  IO.开始"计数A "
  Counting.计数,计数器,500_000
  IO."数A "结束了
end
counting2 = spawn fn ->
  IO.“计数B开始”
  Counting.计数,计数器,500_000
  IO.“计数B完成”
end

IO.写着“等待计数完成”
receive do
  {:done, ^counting1} -> nil
end
receive do
  {:done, ^counting2} -> nil
end

发送计数器,{:get, self}
receive do
  {:counter, value} -> IO.put "Counter is: #{value}"
end

通过运行这个脚本,我得到:

开始计数过程
A开始计数
等待计数完成
B开始计数
计数A完成
B完成
计数器为:1000000

我们可以看到计数器的值是正确的. 可预测(确定性)行为的原因是,计数器的值每增加一条消息,以便增加计数器的消息序列不会影响其最终值. 使用演员模型, 我们必须注意信息是如何相互交织的,并仔细设计信息和对信息的操作,以避免意外的不可预测性(非决定论)。.

使用这种模式,我们如何在两个账户之间转账?

#
# Accounts.exs
#
defmodule帐户
  Def账户(状态)可以
    receive do
      {:transfer, source, destination, amount} ->
        accounts %{state | source => state[source] - amount , destination => state[destination] + amount}
      {:amounts, accounts, sender } ->
        send sender, {:amounts, for account <- accounts do
                        {账户,状态(账户)}
                     end}
        accounts(state)
    end
  end

  Def transfer(发送者,账户,来源,目的地,数量,时间,不一致)做
    if times > 0 do
      发送账户,{:金额,[来源,目的地],self}
      receive do
        {:amounts, amounts} ->
          如果amount[来源]+ amount[目的地] != 500_000 do
            Agent.update(inconsistencies, fn value -> value + 1 end)
          end
      end
      发送账户,{:转账,来源,目的,金额}
      转账(发件人、账户、来源、目的地、金额、次数- 1、不一致)
    else
      发送发送者,{:done, self}
    end
  end
end

accounts = spawn fn -> Accounts.帐户(%{bob: 200_000, joe: 300_000})结束
{:ok,不一致}= Agent.start(fn -> 0 end)
this = self
transfer1 = spawn fn ->
  IO.将“转移A启动”
  Accounts.转账(this, accounts,:bob,:joe, 2,100_000,不一致)
  IO.显示“转移A完成”
end
transfer2 = spawn fn ->
  IO.设置“转移B开始”
  Accounts.转账(this, accounts,:joe,:bob, 1,100_000,不一致)
  IO.显示“转让B完成”
end

IO.显示“等待转账完成”
receive do
  {:done, ^transfer1} -> nil
end
receive do
  {:done, ^transfer2} -> nil
end

发送账户,{:金额,[:bob,:joe], self}
receive do
  {:amounts, amounts} ->
    IO.放入“Bob在账户:#{amount [: Bob]}”
    IO.将“Joe在帐户:#{amount [: Joe]}”
    IO.放置“传输时不一致:#{代理.get(inconsistencies, fn x -> x end)}"
end 

当我运行这个脚本时,我得到:

等待传输完成
转移A开始
转移B开始
转移乙完成
转让A完成
鲍勃的账户是100000
乔的账户有40万
传输时不一致性:0

我们可以看到,资金转移工作没有不一致, 因为我们选择了消息传输来转移资金和消息金额以获得帐户的价值,这使我们可以预测程序的行为. 无论何时我们进行转账,任何时候的总金额都应该是相同的.

参与者模型可能导致锁,从而导致死锁,因此在设计程序时要谨慎使用. 下面的脚本展示了如何模拟锁定和死锁场景.

#
# Deadlock.exs
#
defmodule锁定do
  Def loop(state
    receive do
      {:lock, sender} ->
        case state do
          [] ->
            发送发件人,:锁定
            loop([sender])
          _ ->
              循环(state ++ [sender]) 
        end
      {:unlock, sender} ->
        case state do
          [] ->
            loop(state)
          [^sender | []] ->
            loop([])
          [^sender | [next | tail]] ->
            下一个发送,:锁定
            循环([next | tail])
          _ ->
            loop(state)
        end
    end
  end

  Def lock(pid
    发送pid,{:锁,self}
    receive do
      :locked -> nil # This will block until we receive message
    end
  end

  Def解锁(pid)
    发送pid,{:解锁,self}
  end

  Def locking(first, second, times
    if times > 0 do
      lock(first)
      lock(second)
      unlock(second)
      unlock(first)
      锁定(first, second, times - 1)
    end
  end
end

a_lock = spawn fn -> Lock.loop([]) end
b_lock = spawn fn -> Lock.loop([]) end

this = self
IO.“锁定A, B启动”
spawn fn ->
  Lock.锁定(a_lock, b_lock, 1_000)
  IO.显示“锁定A, B完成”
  发送这个,完成
end
IO.“锁定B, A启动”
spawn fn ->
  Lock.锁定(b_lock, a_lock, 1_000)
  IO.显示“锁定B, A完成”
  发送这个,完成
end

IO.放置“等待锁定完成”
receive do
  :done -> nil
end
receive do
  :done -> nil
End

当我在笔记本电脑上运行这个脚本时,我得到:

锁定A, B开始
锁住B, A开始了
等待锁定完成

从输出中我们可以看到锁住A和B的进程被卡住了. 这是因为第一个进程等待第二个进程释放B,而第二个进程等待第一个进程释放A. 他们互相等待,永远被困住了. 为了避免这种锁定, 顺序应该总是一样的, 或者设计一个不使用锁的程序(这意味着它不等待特定的消息). 下面的清单总是先锁定A,然后锁定B.

#
修复死锁
#
defmodule锁定do
  Def loop(state
    receive do
      {:lock, sender} ->
        case state do
          [] ->
            发送发件人,:锁定
            loop([sender])
          _ ->
              循环(state ++ [sender]) 
        end
      {:unlock, sender} ->
        case state do
          [] ->
            loop(state)
          [^sender | []] ->
            loop([])
          [^sender | [next | tail]] ->
            下一个发送,:锁定
            循环([next | tail])
          _ ->
            loop(state)
        end
    end
  end

  Def lock(pid
    发送pid,{:锁,self}
    receive do
      :locked -> nil # This will block until we receive message
    end
  end

  Def解锁(pid)
    发送pid,{:解锁,self}
  end

  Def locking(first, second, times
    if times > 0 do
      lock(first)
      lock(second)
      unlock(second)
      unlock(first)
      锁定(first, second, times - 1)
    end
  end
end

a_lock = spawn fn -> Lock.loop([]) end
b_lock = spawn fn -> Lock.loop([]) end

this = self
IO.“锁定A, B启动”
spawn fn ->
  Lock.锁定(a_lock, b_lock, 1_000)
  IO.显示“锁定A, B完成”
  发送这个,完成
end
IO.“锁定A, B启动”
spawn fn ->
  Lock.锁定(a_lock, b_lock, 1_000)
  IO.显示“锁定A, B完成”
  发送这个,完成
end

IO.放置“等待锁定完成”
receive do
  :done -> nil
end
receive do
  :done -> nil
End

当我在笔记本电脑上运行这个脚本时,我得到:

锁定A, B开始
锁定A, B开始
等待锁定完成
锁定A, B结束
锁定A, B结束

现在,僵局已不复存在.

软件工程中的并发

作为并发编程的介绍,我们已经介绍了一些并发模型. 我们没有涵盖所有的模型,因为这篇文章太大了. 仅举几个例子, 通道和响应式流是其他一些常用的并发模型. 通道和响应式流与参与者模型有许多相似之处. 它们都能传递信息, 但是许多线程可以从一个通道接收消息, 响应流沿一个方向发送消息,形成一端接收消息,另一端发送消息的有向图.

如果我们不提前考虑,共享可变状态模型很容易出错. 它存在竞争条件和死锁问题. 如果我们可以在不同的并发编程模型之间做出选择, 这将更容易实现和维护,但否则我们必须非常小心我们所做的.

函数式方法更容易推理和实现. 不能有死锁. 此模型的性能可能比共享可变状态模型差, 但是一个有效的程序总是比一个不有效的程序要快.

Actor模型是并发编程的一个很好的选择. 尽管存在竞争条件和死锁的问题, 与共享可变状态模型相比,这种情况较少发生,因为进程之间的唯一通信方式是通过消息. 通过良好的进程间消息设计,可以避免这种情况. 如果出现问题,则是在进程之间通信的消息顺序或含义中,并且您知道从哪里查找.

我希望本文能让您对并发编程是什么以及它如何为您编写的程序提供结构有所了解.

了解基本知识

  • 什么是并发编程?

    并发编程是将多个代码块编排在重叠的时间段内运行.

聘请Toptal这方面的专家.
Hire Now
Marko Dvečko

Marko Dvečko

验证专家 in Engineering

Zagreb, Croatia

2016年1月11日成为会员

作者简介

Marko有12年以上的工作经验. 他拥有四个Salesforce.他的兴趣是数学和函数式编程.

Read More
作者都是各自领域经过审查的专家,并撰写他们有经验的主题. 我们所有的内容都经过同行评审,并由同一领域的Toptal专家验证.

PREVIOUSLY AT

CloudSense

世界级的文章,每周发一次.

订阅意味着同意我们的 privacy policy

世界级的文章,每周发一次.

订阅意味着同意我们的 privacy policy

Toptal开发者

Join the Toptal® community.