広告

Java : Semaphore (セマフォ) - API使用例

Semaphore (Java SE 19 & JDK 19) の使用例まとめです。
だいたいのメソッドを網羅済みです。
API仕様のおともにどうぞ。


概要

計数セマフォです。 概念的に、セマフォはパーミットのセットを維持します。 各acquire()は、パーミットが利用可能になるまで必要に応じてブロックし、その後パーミットを取得します。 各release()はパーミットを追加し、場合によってはブロックしている取得側を解放します。

クラス構成

Semaphore は、あるリソースに対して同時にアクセスできるスレッド数を制限するためによく使われます。
使い方は、acquiretryAcquire メソッドで パーミット(ロック) を取得して、release メソッドで パーミット(ロック) を解放します。

ただし、Semaphore が本当に必要かどうかはよく検討したほうがよいでしょう。
パーミットの管理をきちんとしないと、デッドロックなどのスレッド特有の難解な不具合が発生する可能性があります。

Executors.newFixedThreadPool などで、そもそも使うスレッド数を制限したほうがスマートになるかもしれません。

下記のコードは、Semaphore を使って、5つのスレッドに対して同時アクセス数を3つに制限する例です。

final var semaphore = new Semaphore(3);
System.out.println("permits = " + semaphore.availablePermits());

System.out.println("-- start --");
try (final var executor = Executors.newFixedThreadPool(5)) {
    for (int i = 1; i <= 5; i++) {
        final var id = i;
        TimeUnit.MILLISECONDS.sleep(100);

        executor.submit(() -> {
            try {
                System.out.println(id + " : try acquire ...");
                semaphore.acquire();
                try {
                    System.out.println(id + " : OK! : permits = " + semaphore.availablePermits());

                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(id + " : sleep completed (release!)");
                } finally {
                    semaphore.release();
                }
            } catch (InterruptedException e) {
                System.out.println("InterruptedException!");
            }
        });
    }
}

System.out.println("-- end --");
System.out.println("permits = " + semaphore.availablePermits());

// 結果
// ↓
//permits = 3
//-- start --
//1 : try acquire ...
//1 : OK! : permits = 2
//2 : try acquire ...
//2 : OK! : permits = 1
//3 : try acquire ...
//3 : OK! : permits = 0
//4 : try acquire ...
//5 : try acquire ...
//1 : sleep completed (release!)
//4 : OK! : permits = 0
//2 : sleep completed (release!)
//5 : OK! : permits = 0
//3 : sleep completed (release!)
//4 : sleep completed (release!)
//5 : sleep completed (release!)
//-- end --
//permits = 3

関連記事 :


コンストラクタ

Semaphore (int permits)

指定された数のパーミットと不公平な公平性設定を使用して、Semaphoreを作成します。

このメソッドの使用例は、acquire() にまとめて記載しました。
そちらのAPI使用例をご参照ください。

Semaphore (int permits, boolean fair)

指定された数のパーミットと指定された公平性設定を使用して、Semaphoreを作成します。

permits パラメータについては acquire() の使用例をご参照ください。

void testFair(boolean fair) throws InterruptedException {
    final var semaphore = new Semaphore(5, fair);
    System.out.println("isFair = " + semaphore.isFair());
    System.out.println("permits = " + semaphore.availablePermits());

    try (final var executor = Executors.newFixedThreadPool(3)) {
        class Task implements Runnable {
            private final String name;
            private final int numOfPermits;

            Task(String name, int numOfPermits) {
                this.name = name;
                this.numOfPermits = numOfPermits;
            }

            @Override
            public void run() {
                System.out.printf("%s : try acquire ... (%d)%n", name, numOfPermits);
                semaphore.acquireUninterruptibly(numOfPermits);
                try {
                    System.out.printf("%s : OK! (%d)%n", name, numOfPermits);

                    TimeUnit.SECONDS.sleep(1);
                    System.out.printf("%s : release! (%d)%n", name, numOfPermits);
                } catch (InterruptedException e) {
                    System.out.println("InterruptedException!");
                } finally {
                    semaphore.release(numOfPermits);
                }
            }
        }

        executor.submit(new Task("A", 4));
        TimeUnit.MILLISECONDS.sleep(100);

        executor.submit(new Task("B", 5));
        TimeUnit.MILLISECONDS.sleep(100);

        executor.submit(new Task("C", 1));
    }

    System.out.println("permits = " + semaphore.availablePermits());
}
testFair(true);

// 結果
// ↓
//isFair = true
//permits = 5
//A : try acquire ... (4)
//A : OK! (4)
//B : try acquire ... (5)
//C : try acquire ... (1)
//A : release! (4)
//B : OK! (5)
//B : release! (5)
//C : OK! (1)
//C : release! (1)
//permits = 5

testFair(false);

// 結果
// ↓
//isFair = false
//permits = 5
//A : try acquire ... (4)
//A : OK! (4)
//B : try acquire ... (5)
//C : try acquire ... (1)
//C : OK! (1)
//A : release! (4)
//C : release! (1)
//B : OK! (5)
//B : release! (5)
//permits = 5

メソッド

void acquire ()

このセマフォからパーミットを取得します。パーミットが利用可能になるか、またはスレッドが割り込みされるまでブロックします。

final var semaphore = new Semaphore(3);
System.out.println("permits = " + semaphore.availablePermits());

System.out.println("-- start --");
try (final var executor = Executors.newFixedThreadPool(5)) {
    for (int i = 1; i <= 5; i++) {
        final var id = i;
        TimeUnit.MILLISECONDS.sleep(100);

        executor.submit(() -> {
            try {
                System.out.println(id + " : try acquire ...");
                semaphore.acquire();
                try {
                    System.out.println(id + " : OK! : permits = " + semaphore.availablePermits());

                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(id + " : sleep completed (release!)");
                } finally {
                    semaphore.release();
                }
            } catch (InterruptedException e) {
                System.out.println("InterruptedException!");
            }
        });
    }
}

System.out.println("-- end --");
System.out.println("permits = " + semaphore.availablePermits());

// 結果
// ↓
//permits = 3
//-- start --
//1 : try acquire ...
//1 : OK! : permits = 2
//2 : try acquire ...
//2 : OK! : permits = 1
//3 : try acquire ...
//3 : OK! : permits = 0
//4 : try acquire ...
//5 : try acquire ...
//1 : sleep completed (release!)
//4 : OK! : permits = 0
//2 : sleep completed (release!)
//5 : OK! : permits = 0
//3 : sleep completed (release!)
//4 : sleep completed (release!)
//5 : sleep completed (release!)
//-- end --
//permits = 3

void acquire (int permits)

このセマフォから指定された数のパーミットを取得します。すべてのパーミットが利用可能になるか、またはスレッドが割り込みされるまでブロックします。

final var semaphore = new Semaphore(7);
try (final var executor = Executors.newFixedThreadPool(5)) {
    for (int i = 1; i <= 5; i++) {
        final var id = i;
        TimeUnit.MILLISECONDS.sleep(100);

        executor.submit(() -> {
            try {
                System.out.println(id + " : try acquire ... : permits = "
                        + semaphore.availablePermits());
                semaphore.acquire(2);
                try {
                    System.out.println(id + " : OK! : permits = "
                            + semaphore.availablePermits());

                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(id + " : sleep completed (release!)");
                } finally {
                    semaphore.release(2);
                }
            } catch (InterruptedException e) {
                System.out.println("InterruptedException!");
            }
        });
    }
}

System.out.println("-- all tasks end --");
System.out.println("available : " + semaphore.availablePermits());

// 結果
// ↓
//1 : try acquire ... : permits = 7
//1 : OK! : permits = 5
//2 : try acquire ... : permits = 5
//2 : OK! : permits = 3
//3 : try acquire ... : permits = 3
//3 : OK! : permits = 1
//4 : try acquire ... : permits = 1
//5 : try acquire ... : permits = 1
//1 : sleep completed (release!)
//4 : OK! : permits = 1
//2 : sleep completed (release!)
//5 : OK! : permits = 1
//3 : sleep completed (release!)
//4 : sleep completed (release!)
//5 : sleep completed (release!)
//-- all tasks end --
//available : 7

void acquireUninterruptibly ()

このセマフォからパーミットを取得します。パーミットが利用可能になるまでブロックします。

InterruptedException が発生しない、ということ以外については acquire() の使用例をご参照ください。

final var semaphore = new Semaphore(0);

try (final var executor = Executors.newSingleThreadExecutor()) {
    try {
        final var future = executor.submit(() -> {
            System.out.println("try acquire ... : permits = " + semaphore.availablePermits());
            semaphore.acquireUninterruptibly();
            System.out.println("acquire OK!");
            System.out.println("isInterrupted : " + Thread.currentThread().isInterrupted());
        });

        TimeUnit.SECONDS.sleep(1);

        final var ret = future.cancel(true);
        System.out.println("cancel = " + ret);
        System.out.println("release!");
    } finally {
        semaphore.release();
    }
}

System.out.println("------");
System.out.println("permits = " + semaphore.availablePermits());

// 結果
// ↓
//try acquire ... : permits = 0
//cancel = true
//release!
//acquire OK!
//isInterrupted : true
//------
//permits = 0
final var semaphore = new Semaphore(0);

try (final var executor = Executors.newSingleThreadExecutor()) {
    final var future = executor.submit(() -> {
        try {
            System.out.println("try acquire ... : permits = " + semaphore.availablePermits());
            semaphore.acquire();
            System.out.println("acquire OK!");
        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
        }
    });

    TimeUnit.SECONDS.sleep(1);

    final var ret = future.cancel(true);
    System.out.println("cancel = " + ret);
}

System.out.println("------");
System.out.println("permits = " + semaphore.availablePermits());

// 結果
// ↓
//try acquire ... : permits = 0
//InterruptedException!
//cancel = true
//------
//permits = 0

void acquireUninterruptibly (int permits)

このセマフォから指定された数のパーミットを取得します。すべてのパーミットが利用可能になるまでブロックします。

InterruptedException が発生しない、ということ以外については acquire(int permits) の使用例をご参照ください。

final var semaphore = new Semaphore(0);

try (final var executor = Executors.newSingleThreadExecutor()) {
    try {
        final var future = executor.submit(() -> {
            System.out.println("try acquire ... : permits = " + semaphore.availablePermits());
            semaphore.acquireUninterruptibly(2);
            System.out.println("acquire OK!");
            System.out.println("isInterrupted : " + Thread.currentThread().isInterrupted());
        });

        TimeUnit.SECONDS.sleep(1);

        final var ret = future.cancel(true);
        System.out.println("cancel = " + ret);
        System.out.println("release!");
    } finally {
        semaphore.release(2);
    }
}

System.out.println("------");
System.out.println("permits = " + semaphore.availablePermits());

// 結果
// ↓
//try acquire ... : permits = 0
//cancel = true
//release!
//acquire OK!
//isInterrupted : true
//------
//permits = 0
final var semaphore = new Semaphore(0);

try (final var executor = Executors.newSingleThreadExecutor()) {
    final var future = executor.submit(() -> {
        try {
            System.out.println("try acquire ... : permits = " + semaphore.availablePermits());
            semaphore.acquire(2);
            System.out.println("acquire OK!");
        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
        }
    });

    TimeUnit.SECONDS.sleep(1);

    final var ret = future.cancel(true);
    System.out.println("cancel = " + ret);
}

System.out.println("------");
System.out.println("permits = " + semaphore.availablePermits());

// 結果
// ↓
//try acquire ... : permits = 0
//InterruptedException!
//cancel = true
//------
//permits = 0

int availablePermits ()

このセマフォで現在利用可能なパーミットの数を返します。

このメソッドの使用例は、acquire() にまとめて記載しました。
そちらのAPI使用例をご参照ください。

int drainPermits ()

すぐに利用可能なすべての許可を取得して返します。または、否定的な許可が利用可能な場合は、それらを解放します。

final var semaphore = new Semaphore(5);
System.out.println(semaphore.availablePermits()); // 5

final var ret = semaphore.drainPermits();
System.out.println(ret); // 5

System.out.println(semaphore.availablePermits()); // 0
final var semaphore = new Semaphore(-3);
System.out.println(semaphore.availablePermits()); // -3

final var ret = semaphore.drainPermits();
System.out.println(ret); // -3

System.out.println(semaphore.availablePermits()); // 0

protected Collection<Thread> getQueuedThreads ()

パーミットの取得を待機しているスレッドを含むコレクションを返します。

protectedです。
独自にサブクラスを作ることは少ないと思いますので、コード例は割愛します。

final int getQueueLength ()

パーミットの取得を待機しているスレッドの推定数を返します。

final var semaphore = new Semaphore(2);

try (final var executor = Executors.newFixedThreadPool(6)) {
    for (int i = 1; i <= 6; i++) {
        final var id = i;
        TimeUnit.MILLISECONDS.sleep(100);

        executor.submit(() -> {
            try {
                System.out.printf("%d : try acquire ... : queue length = %d (%b)%n",
                        id, semaphore.getQueueLength(), semaphore.hasQueuedThreads());
                semaphore.acquire();
                try {
                    System.out.printf("%d : OK! : queue length = %d%n",
                            id, semaphore.getQueueLength());

                    TimeUnit.SECONDS.sleep(3);
                    System.out.printf("%d : sleep completed (release!)%n", id);
                } finally {
                    semaphore.release();
                }
            } catch (InterruptedException e) {
                System.out.println("InterruptedException!");
            }
        });
    }
}

// 結果
// ↓
//1 : try acquire ... : queue length = 0 (false)
//1 : OK! : queue length = 0
//2 : try acquire ... : queue length = 0 (false)
//2 : OK! : queue length = 0
//3 : try acquire ... : queue length = 0 (false)
//4 : try acquire ... : queue length = 1 (true)
//5 : try acquire ... : queue length = 2 (true)
//6 : try acquire ... : queue length = 3 (true)
//1 : sleep completed (release!)
//3 : OK! : queue length = 3
//2 : sleep completed (release!)
//4 : OK! : queue length = 2
//3 : sleep completed (release!)
//5 : OK! : queue length = 1
//4 : sleep completed (release!)
//6 : OK! : queue length = 0
//5 : sleep completed (release!)
//6 : sleep completed (release!)

final boolean hasQueuedThreads ()

パーミットの取得を待機中のスレッドが存在するかどうかを照会します。

このメソッドの使用例は、getQueueLength() にまとめて記載しました。
そちらのAPI使用例をご参照ください。

boolean isFair ()

このセマフォで公平性がtrueに設定されている場合はtrueを返します。

このメソッドの使用例は、Semaphore(int permits, boolean fair) にまとめて記載しました。
そちらのAPI使用例をご参照ください。

protected void reducePermits (int reduction)

指定されたreductionの数だけ利用可能なパーミットの数を減らします。

protectedです。
独自にサブクラスを作ることは少ないと思いますので、コード例は割愛します。

void release ()

パーミットを解放し、セマフォに戻します。

このメソッドの使用例は、acquire() にまとめて記載しました。
そちらのAPI使用例をご参照ください。

void release (int permits)

指定された数のパーミットを解放し、セマフォに戻します。

このメソッドの使用例は、acquire(int permits) にまとめて記載しました。
そちらのAPI使用例をご参照ください。

String toString ()

セマフォおよびその状態を識別する文字列を返します。

final var semaphore = new Semaphore(5);

final var str1 = semaphore.toString();
System.out.println(str1); // java.util.concurrent.Semaphore@503f91c3[Permits = 5]

semaphore.drainPermits();

final var str2 = semaphore.toString();
System.out.println(str2); // java.util.concurrent.Semaphore@503f91c3[Permits = 0]

boolean tryAcquire ()

パーミットが呼出し時に利用可能な場合に限り、このセマフォからパーミットを取得します。

final var semaphore = new Semaphore(3);
try (final var executor = Executors.newFixedThreadPool(5)) {
    for (int i = 1; i <= 5; i++) {
        final var id = i;
        TimeUnit.MILLISECONDS.sleep(100);

        executor.submit(() -> {
            if (semaphore.tryAcquire()) {
                try {
                    System.out.printf("%d : try acquire = true : permits = %d%n",
                            id, semaphore.availablePermits());

                    TimeUnit.SECONDS.sleep(3);
                    System.out.printf("%d : sleep completed (release!)%n", id);
                } catch (InterruptedException e) {
                    System.out.println("InterruptedException!");
                } finally {
                    semaphore.release();
                }
            } else {
                System.out.printf("%d : try acquire = false : permits = %d%n",
                        id, semaphore.availablePermits());
            }
        });
    }
}

System.out.println("-- all tasks end --");
System.out.println("permits : " + semaphore.availablePermits());

// 結果
// ↓
//1 : try acquire = true : permits = 2
//2 : try acquire = true : permits = 1
//3 : try acquire = true : permits = 0
//4 : try acquire = false : permits = 0
//5 : try acquire = false : permits = 0
//1 : sleep completed (release!)
//2 : sleep completed (release!)
//3 : sleep completed (release!)
//-- all tasks end --
//permits : 3

boolean tryAcquire (int permits)

指定された数のパーミットが呼出し時に利用可能な場合に限り、それらすべてのパーミットを取得します。

final var semaphore = new Semaphore(6);
try (final var executor = Executors.newFixedThreadPool(5)) {
    for (int i = 1; i <= 5; i++) {
        final var id = i;
        TimeUnit.MILLISECONDS.sleep(100);

        executor.submit(() -> {
            if (semaphore.tryAcquire(2)) {
                try {
                    System.out.printf("%d : try acquire = true : permits = %d%n",
                            id, semaphore.availablePermits());

                    TimeUnit.SECONDS.sleep(3);
                    System.out.printf("%d : sleep completed (release!)%n", id);
                } catch (InterruptedException e) {
                    System.out.println("InterruptedException!");
                } finally {
                    semaphore.release(2);
                }
            } else {
                System.out.printf("%d : try acquire = false : permits = %d%n",
                        id, semaphore.availablePermits());
            }
        });
    }
}

System.out.println("-- all tasks end --");
System.out.println("permits : " + semaphore.availablePermits());

// 結果
// ↓
//1 : try acquire = true : permits = 4
//2 : try acquire = true : permits = 2
//3 : try acquire = true : permits = 0
//4 : try acquire = false : permits = 0
//5 : try acquire = false : permits = 0
//1 : sleep completed (release!)
//2 : sleep completed (release!)
//3 : sleep completed (release!)
//-- all tasks end --
//permits : 6

boolean tryAcquire (int permits, long timeout, TimeUnit unit)

指定された待機時間内で指定された数のパーミットが利用可能であり、現在のスレッドで割り込みが発生していない場合に、このセマフォから指定された数のパーミットを取得します。

timeoutunit 以外のパラメータについては tryAcquire(int permits) の使用例をご参照ください。

// 基準となる時刻
final var current = System.nanoTime();

// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

final var semaphore = new Semaphore(2);
try (final var executor = Executors.newFixedThreadPool(3)) {

    class Task implements Runnable {
        private final String id;
        private final long timeout;

        Task(String id, long timeout) {
            this.id = id;
            this.timeout = timeout;
        }

        @Override
        public void run() {
            System.out.printf("%s : start : timeout = %d sec. : permits = %d%n",
                    id, timeout, semaphore.availablePermits());
            try {
                if (semaphore.tryAcquire(2, timeout, TimeUnit.SECONDS)) {
                    try {
                        System.out.printf("%s : try acquire = true (%f sec.)%n",
                                id, elapsedTime.getAsDouble());

                        TimeUnit.SECONDS.sleep(3);
                        System.out.printf("%s : sleep completed : release! (%f sec.)%n",
                                id, elapsedTime.getAsDouble());
                    } finally {
                        semaphore.release(2);
                    }
                } else {
                    System.out.printf("%s : try acquire = false : Timeout! (%f sec.)%n",
                            id, elapsedTime.getAsDouble());
                }
            } catch (InterruptedException e) {
                System.out.println("InterruptedException!");
            }
        }
    }

    executor.submit(new Task("A", 1));
    TimeUnit.MILLISECONDS.sleep(100);

    executor.submit(new Task("B", 2));
    TimeUnit.MILLISECONDS.sleep(100);

    executor.submit(new Task("C", 5));
}

System.out.println("-- all tasks end --");
System.out.println("permits : " + semaphore.availablePermits());

// 結果
// ↓
//A : start : timeout = 1 sec. : permits = 2
//A : try acquire = true (0.003661 sec.)
//B : start : timeout = 2 sec. : permits = 0
//C : start : timeout = 5 sec. : permits = 0
//B : try acquire = false : Timeout! (2.126545 sec.)
//A : sleep completed : release! (3.017536 sec.)
//C : try acquire = true (3.018140 sec.)
//C : sleep completed : release! (6.027912 sec.)
//-- all tasks end --
//permits : 2

boolean tryAcquire (long timeout, TimeUnit unit)

指定された待機時間内でパーミットが利用可能になり、現在のスレッドで割り込みが発生していない場合に、このセマフォからパーミットを取得します。

timeoutunit パラメータ以外については tryAcquire() の使用例をご参照ください。

// 基準となる時刻
final var current = System.nanoTime();

// 基準となる時刻からの差分を秒として取得
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1000000000.0;

final var semaphore = new Semaphore(1);
try (final var executor = Executors.newFixedThreadPool(3)) {

    class Task implements Runnable {
        private final String id;
        private final long timeout;

        Task(String id, long timeout) {
            this.id = id;
            this.timeout = timeout;
        }

        @Override
        public void run() {
            System.out.printf("%s : start : timeout = %d sec. : permits = %d%n",
                    id, timeout, semaphore.availablePermits());
            try {
                if (semaphore.tryAcquire(timeout, TimeUnit.SECONDS)) {
                    try {
                        System.out.printf("%s : try acquire = true (%f sec.)%n",
                                id, elapsedTime.getAsDouble());

                        TimeUnit.SECONDS.sleep(3);
                        System.out.printf("%s : sleep completed : release! (%f sec.)%n",
                                id, elapsedTime.getAsDouble());
                    } finally {
                        semaphore.release();
                    }
                } else {
                    System.out.printf("%s : try acquire = false : Timeout! (%f sec.)%n",
                            id, elapsedTime.getAsDouble());
                }
            } catch (InterruptedException e) {
                System.out.println("InterruptedException!");
            }
        }
    }

    executor.submit(new Task("A", 1));
    TimeUnit.MILLISECONDS.sleep(100);

    executor.submit(new Task("B", 2));
    TimeUnit.MILLISECONDS.sleep(100);

    executor.submit(new Task("C", 5));
}

System.out.println("-- all tasks end --");
System.out.println("permits : " + semaphore.availablePermits());

// 結果
// ↓
//A : start : timeout = 1 sec. : permits = 1
//A : try acquire = true (0.003326 sec.)
//B : start : timeout = 2 sec. : permits = 0
//C : start : timeout = 5 sec. : permits = 0
//B : try acquire = false : Timeout! (2.116379 sec.)
//A : sleep completed : release! (3.007164 sec.)
//C : try acquire = true (3.007721 sec.)
//C : sleep completed : release! (6.021869 sec.)
//-- all tasks end --
//permits : 1

関連記事

ページの先頭へ