実践Go言語(Effective Go)の翻訳、12回目です。
前回までの訳は実践Go言語[日本語訳]にまとめてあります。


並列性

通信による共有

並列プログラミングは大きなトピックではありますが、ここではGo言語の仕様の中で特徴的な部分だけを取り上げます。

一般的な環境における並列プログラミングは、共有変数にアクセスするためには適切な実装が厳密に求められるため扱いにくいものでした。Go言語では、共有変数をチャネル上で引き回し、実行中の異なるスレッドからは実際に同時アクセスさせないという今までとは異なる手法を推奨しています。あるタイミングで値にアクセスできるゴルーチンはひとつだけなので、設計上データ競合は起きえません。

共有メモリを使った通信は行わず、代わりに通信を使うことでメモリを共有するようにしてください。

この手法は、過剰となる側面もあります。参照カウンタであれば、たとえば整数型の変数とミューテックスを併用するほうが最適かもしれません。しかし、高水準な手法として、アクセス制御のためにチャネルを使うことは、明解で正確なプログラムを記述することをより容易くします。

このモデルについて考える手立てとして、1台のCPU上で動いている典型的なシングルスレッドプログラムがあると仮定します。これには同期プリミティブは不要です。同様に同期が不要であるインスタンスを新しく起動して、これらスレッド間を通信させます。この通信が同期していれば、それ以外の同期はとりあえず必要ありません。このモデルと一致する例としては、Unixのパイプラインが良く知られています。Go言語におけるこの並列性へのアプローチは、Hoare氏のCommunicating Sequential Processes (CSP)が基になっており、Unixパイプにおけるタイプセーフの汎用化にもこれが見受けられます。

ゴルーチン

ゴルーチンと呼ぶようにした理由は、スレッド、コルーチン、プロセスといった既存の用語では正確に伝わらないためです。ゴルーチンが持つモデルは単純で、その役割は同一アドレス空間内で他のゴルーチン同士を並列実行することです。またゴルーチンは軽量で、スタック空間を割り当てるより若干コストがかかる程度です。開始時のスタックサイズは節約のため小さく取り、必要に応じてヒープ領域を割り当て(自由に)拡張します。

ゴルーチンはOSの複数スレッド上へ多重化されるので、ひとつが例えばI/O待ちなどでブロックされていても、他のゴルーチンの実行は継続します。ゴルーチンの仕組みによってスレッドを作成・管理する複雑さの大部分が軽減されます。

関数またはメソッドの呼び出しを新規ゴルーチンとして実行するには、呼び出しの直前にキーワードgoと記述してください。呼び出しが完了した時点で、すでにゴルーチンは作成されています。(この効能はUnixシェルにてバックグラウンドでコマンドを実行するために使用する「&」表記とほぼ同じです。)

go list.Sort()  // list.Sortを並列実行する(処理の完了は待たない)

関数リテラルを使うと手軽にゴルーチンが実行できます。

func Announce(message string, delay int64) {
    go func() {
        time.Sleep(delay)
        fmt.Println(message)
    }()  // 括弧に注意。関数を呼び出すために必要。
}

Go言語における関数リテラルはクロージャです。このとき関数内から参照されている変数は関数が実行している間、生存しつづけます。

この例では関数が完了したことを通知する手立てがないため、あまり実践的ではありません。完了通知を行うにはチャネルを使います。

チャネル

マップと同じくチャネルは参照型であり、makeを使って割り当てられます。作成時にオプションの整数パラメータを指定すると、チャネルのバッファサイズとして使われます。この値のデフォルトはゼロであり、ゼロのときバッファリングは行われず同期チャネルとなります。

ci := make(chan int)            // 整数型のバッファなしチャネル
cj := make(chan int, 0)         // 整数型のバッファなしチャネル
cs := make(chan *os.File, 100)  // Fileへのポインタ型のバッファありチャネル

チャネルは、値の交換および同期という通信機能を兼ね備えており、2つの計算処理(ゴルーチン)が予期しない状態とならないことを保証します。

チャネルを利用した優れたイディオムはたくさんありますが、手始めにひとつ紹介します。前のセクションではソート処理をバックグラウンドで起動しましたが、これにチャネルを使って起動したゴルーチンのソート完了を待つよう変更できます。

c := make(chan int)  // チャネルの割り当て
// ゴルーチンとしてsortを起動。完了時にチャネルへ通知
go func() {
    list.Sort()
    c <- 1  // 通知を送信。値は何でも良い
}()
doSomethingForAWhile()
<-c   // sortの完了待ち。送られてきた値は破棄

受信側では常に、受信可能なデータが来るまでブロックされます。送信側はチャネルがバッファリングしていないときは、受信側が値を受信するまでブロックされます。チャネルがバッファリングしているときは、送信側がブロックされるのは値がバッファへコピーされる間だけです。このためバッファがいっぱいのときは、受信側で値を取り出すまで待機します。

スループットを制限するようなケースで、バッファありチャネルをセマフォのように使うことができます。下の例では、入って来たリクエストはhandleに渡されます。その中でまず値をチャネルへ送信し、リクエストを処理したのちチャネルから値を受信しています。チャネルのバッファが持つ容量を利用してprocessの同時呼び出し数を制限しています。

var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
    sem <- 1    // アクティブキューの空き待ち
    process(r)  // 時間のかかる処理
    <-sem       // 完了。次のリクエストを処理可能にする
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req)  // handleの終了待ちはしない
    }
}

次も同様のアイデアで、一定数のhandleゴルーチンを起動しておき、その中でリクエストチャネルから全て読み込んでいます。ここではゴルーチンの数でprocessの同時呼び出し数を制限しています。このServe関数自体も、終了指示を受信するためのチャネルを引数として受け取り、ゴルーチンを起動したあと、このチャネルを受信することでブロックしています。

func handle(queue chan *Request) {
    for r := range queue {
        process(r)
    }
}

func Serve(clientRequests chan *clientRequests, quit chan bool) {
    // Start handlers
    for i := 0; i < MaxOutstanding; i++ {
        go handle(clientRequests)
    }
    <-quit  // 終了指示待ち
}

チャネル内のチャネル

Go言語の特徴のなかで重要なもののひとつは、チャネルがメモリを割り当てることができ、かつ他の値と同じように引き回すことができる「優れた値」であることです。この機能は一般的に並列、非多重化を安全に実装するために使われます。

前セクションの例におけるhandleはリクエストを処理するための理想的なハンドラではありましたが、そこで扱っていたRequest型は未定義でした。もしその型に応答用チャネルが含まれていれば、各クライアント側でそれぞれ処理結果を受信する経路を用意することができます。下はRequest型の定義の概略です。

type Request struct {
    args        []int
    f           func([]int) int
    resultChan  chan int
}

リクエストオブジェクト内の処理結果受信用チャネルと同様に、クライアント側で関数とその引数も準備します。

func sum(a []int) (s int) {
    for _, v := range a {
        s += v
    }
    return
}

request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// リクエスト送信
clientRequests <- request
// 応答待ち
fmt.Printf("answer: %d\n", <-request.resultChan)

サーバ側で変更するのは、ハンドラ関数だけです。

func handle(queue chan *Request) {
    for req := range queue {
        req.resultChan <- req.f(req.args)
    }
}

これを現実的なコードにするには、やるべきことが多々残されていることは明らかですが、このコードはあくまで帯域制限、平行処理、非ブロック型RPC機構を実装したフレームワークであり、相互排他は考慮していません。

並列化

これらの考え方のもう一つの応用は、複数のCPUコアすべてを使い計算を並列処理することです。計算処理を個々に分割できるのであれば、各計算の完了を伝えるチャネルを用意することで並列化可能です。

下のサンプルのように、アイテムを格納したVector上で実行される高コストの演算処理があり、またその各アイテムにて演算される値同士が、お互い関連性を持たないと仮定します。

type Vector []float64

// v[i], v[i+1] ... v[n-1]までを演算処理に適用
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
    for ; i < n; i++ {
        v[i] += u.Op(v[i])
    }
    c <- 1    // この部分が終わったことを伝達
}

データをCPUコア数分割し、ループでコア毎に処理をひとつ起動します。演算処理が完了する順番は不定ですが、それは問題ではありません。単に全ゴルーチンを起動後、チャネルから受信した完了通知数をカウントするだけです。

const NCPU = 4  // CPUコア数

func (v Vector) DoAll(u Vector) {
    c := make(chan int, NCPU)  // 任意だが、バッファリングしたほうが賢明
    for i := 0; i < NCPU; i++ {
        go v.DoSome(i*len(v)/NCPU, (i+1)*len(v)/NCPU, u, c)
    }
    // チャネルを空に
    for i := 0; i < NCPU; i++ {
        <-c    // 処理の完了をひとつ待つ
    }
    // すべて完了
}

gc6g等)の現実装のデフォルトでは、このコードは各CPUコアへ並列化はされません。ユーザ処理に対してはコアをひとつしか使用しないようになっているためです。システムコールで任意の数のゴルーチンをブロックすることは可能ですが、デフォルトで実行されるユーザレベルコードは常にひとつだけです。これはもっと賢くあるべきで、また今後賢くしていく予定はありますが、そうなるまでCPUを並列処理させたいときはコードを同時実行したいゴルーチン数をランタイムに指定してください。この指定方法は2つあり、環境変数GOMAXPROCSに使用するコア数(デフォルト1)を設定してからジョブを実行するか、もしくはruntimeパッケージをインポートし、runtime.GOMAXPROCS(NCPU)を呼び出してください。また、スケジューリングとランタイムが改善されたときは、この設定は不要となる予定です。

溢れるバッファ

並列プログラミング向け機能を利用して、より実装が簡単な非並列処理を記述することもできます。下はRPCパッケージから取り出した例です。クライアント側のゴルーチンではループ内でどこかのソース(おそらくネットワーク)からデータを受信します。バッファの割り当て・開放を減らすために使わなくなったバッファはリストに格納しておきます。これを実現するためにバッファありチャネルを使い、このチャネルが空のときは新たにバッファを割り当てます。受信したメッセージをバッファに格納すると、そのバッファはserverChan経由でサーバへ送信されます。

var freeList = make(chan *Buffer, 100)
var serverChan = make(chan *Buffer)

func client() {
    for {
        b, ok := <-freeList  // バッファがあれば取得
        if !ok {              // なければ新たに割り当てる
            b = new(Buffer)
        }
        load(b)              // 次のメッセージをネットから読み込む
        serverChan <- b      // サーバへ送信
    }
}

サーバ側のループではクライアントからメッセージを受信し処理を行ったあと、バッファリストにバッファを返却します。

func server() {
    for {
        b := <-serverChan    // 作業待ち
        process(b)
        _ = freeList <- b    // 空きがあればバッファを再利用
    }
}

クライアントの非ブロック受信では、利用可能であればバッファリストからバッファを取得します。利用可能なバッファがないときは新しくバッファを割り当てます。サーバのfreeListへの非ブロック送信は、バッファリストがいっぱいでなければbfreeListに戻します。バッファが溢れたときはガーベジコレクタによって回収されます。(送信演算子をブランク識別子へ代入することで非ブロック送信となりますが、操作が成功したかどうかは無視されます。) この実装では、ある境界線で溢れてしまうバケツのような空きバッファリストを作成し、冗長なコードを記述する代わりにチャネルのバッファとガーベジコレクタ任せにしています。