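Java : Semaphore - API Usage Examples
A collection of usage examples for Semaphore (Java SE 19 & JDK 19).
Most of its methods are covered.
Use it as a companion to the API specification.
Overview
A Semaphore is often used to limit how many threads can access a resource at the same time.
To use it, obtain a permit (which acts like a lock) with acquire or tryAcquire, then give the permit back with release.
That said, consider carefully whether you really need a Semaphore.
If permits are not managed correctly, for example a permit that is acquired but never released, you can run into deadlocks and other hard-to-diagnose threading bugs.
Limiting the number of threads in the first place, for example with Executors.newFixedThreadPool, may be the cleaner solution.
The code below uses a Semaphore to limit five threads to three concurrent accesses. The snippets in this article assume the relevant java.util.concurrent classes (Semaphore, Executors, TimeUnit) are imported; using an ExecutorService in try-with-resources requires Java 19 or later.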
// 3 permits: at most 3 tasks can be inside the guarded section at once
final var semaphore = new Semaphore(3);
System.out.println("permits = " + semaphore.availablePermits());
System.out.println("-- start --");
try (final var executor = Executors.newFixedThreadPool(5)) {
    for (int i = 1; i <= 5; i++) {
        final var id = i;
        TimeUnit.MILLISECONDS.sleep(100);
        executor.submit(() -> {
            try {
                System.out.println(id + " : try acquire ...");
                semaphore.acquire();
                try {
                    System.out.println(id + " : OK! : permits = " + semaphore.availablePermits());
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(id + " : sleep completed (release!)");
                } finally {
                    // Always return the permit, even if the work fails
                    semaphore.release();
                }
            } catch (InterruptedException e) {
                System.out.println("InterruptedException!");
            }
        });
    }
}
System.out.println("-- end --");
System.out.println("permits = " + semaphore.availablePermits());
// Result
// ↓
//permits = 3
//-- start --
//1 : try acquire ...
//1 : OK! : permits = 2
//2 : try acquire ...
//2 : OK! : permits = 1
//3 : try acquire ...
//3 : OK! : permits = 0
//4 : try acquire ...
//5 : try acquire ...
//1 : sleep completed (release!)
//4 : OK! : permits = 0
//2 : sleep completed (release!)
//5 : OK! : permits = 0
//3 : sleep completed (release!)
//4 : sleep completed (release!)
//5 : sleep completed (release!)
//-- end --
//permits = 3
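The overview notes that capping the number of threads may be simpler than a Semaphore. The following is a minimal sketch of that alternative, not part of the original examples: the class name FixedPoolLimit and the method maxConcurrency are hypothetical, and the counters exist only to observe how many tasks run at once. It uses shutdown and awaitTermination rather than try-with-resources so that it also runs on JDKs older than 19.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedPoolLimit {

    // Runs `tasks` tasks on a pool of `poolSize` threads and returns the
    // highest number of tasks observed running at the same time.
    static int maxConcurrency(int poolSize, int tasks) throws InterruptedException {
        final var running = new AtomicInteger();
        final var max = new AtomicInteger();

        final var executor = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> {
                    final int now = running.incrementAndGet();
                    max.accumulateAndGet(now, Math::max);
                    try {
                        TimeUnit.MILLISECONDS.sleep(50);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                });
            }
        } finally {
            // No new tasks; wait for the queued ones to finish
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        }
        return max.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // The pool has only 3 threads, so this never reports more than 3
        System.out.println("max concurrency = " + maxConcurrency(3, 5));
    }
}
```

Here the pool size plays the role of the permit count, so there is nothing to acquire or release. The trade-off is that the limit applies to everything submitted to that pool, not to one specific resource.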
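Constructors
Semaphore (int permits)
The usage example for this constructor is covered under acquire().
Please refer to that example.
Semaphore (int permits, boolean fair)
For the permits parameter, see the acquire() example. With fair = true, waiting threads receive permits in the order they arrived; with fair = false, a later request may be granted ahead of an earlier one, as C is in the second result below.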
void testFair(boolean fair) throws InterruptedException {
    final var semaphore = new Semaphore(5, fair);
    System.out.println("isFair = " + semaphore.isFair());
    System.out.println("permits = " + semaphore.availablePermits());
    try (final var executor = Executors.newFixedThreadPool(3)) {
        class Task implements Runnable {
            private final String name;
            private final int numOfPermits;
            Task(String name, int numOfPermits) {
                this.name = name;
                this.numOfPermits = numOfPermits;
            }
            @Override
            public void run() {
                System.out.printf("%s : try acquire ... (%d)%n", name, numOfPermits);
                semaphore.acquireUninterruptibly(numOfPermits);
                try {
                    System.out.printf("%s : OK! (%d)%n", name, numOfPermits);
                    TimeUnit.SECONDS.sleep(1);
                    System.out.printf("%s : release! (%d)%n", name, numOfPermits);
                } catch (InterruptedException e) {
                    System.out.println("InterruptedException!");
                } finally {
                    semaphore.release(numOfPermits);
                }
            }
        }
        // A takes 4 of the 5 permits; B (needs 5) and C (needs 1) arrive while A holds them
        executor.submit(new Task("A", 4));
        TimeUnit.MILLISECONDS.sleep(100);
        executor.submit(new Task("B", 5));
        TimeUnit.MILLISECONDS.sleep(100);
        executor.submit(new Task("C", 1));
    }
    System.out.println("permits = " + semaphore.availablePermits());
}
testFair(true);
// Result
// ↓
//isFair = true
//permits = 5
//A : try acquire ... (4)
//A : OK! (4)
//B : try acquire ... (5)
//C : try acquire ... (1)
//A : release! (4)
//B : OK! (5)
//B : release! (5)
//C : OK! (1)
//C : release! (1)
//permits = 5
testFair(false);
// Result
// ↓
//isFair = false
//permits = 5
//A : try acquire ... (4)
//A : OK! (4)
//B : try acquire ... (5)
//C : try acquire ... (1)
//C : OK! (1)
//A : release! (4)
//C : release! (1)
//B : OK! (5)
//B : release! (5)
//permits = 5
Methods
void acquire ()
// 3 permits: at most 3 tasks can be inside the guarded section at once
final var semaphore = new Semaphore(3);
System.out.println("permits = " + semaphore.availablePermits());
System.out.println("-- start --");
try (final var executor = Executors.newFixedThreadPool(5)) {
    for (int i = 1; i <= 5; i++) {
        final var id = i;
        TimeUnit.MILLISECONDS.sleep(100);
        executor.submit(() -> {
            try {
                System.out.println(id + " : try acquire ...");
                semaphore.acquire();
                try {
                    System.out.println(id + " : OK! : permits = " + semaphore.availablePermits());
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(id + " : sleep completed (release!)");
                } finally {
                    // Always return the permit, even if the work fails
                    semaphore.release();
                }
            } catch (InterruptedException e) {
                System.out.println("InterruptedException!");
            }
        });
    }
}
System.out.println("-- end --");
System.out.println("permits = " + semaphore.availablePermits());
// Result
// ↓
//permits = 3
//-- start --
//1 : try acquire ...
//1 : OK! : permits = 2
//2 : try acquire ...
//2 : OK! : permits = 1
//3 : try acquire ...
//3 : OK! : permits = 0
//4 : try acquire ...
//5 : try acquire ...
//1 : sleep completed (release!)
//4 : OK! : permits = 0
//2 : sleep completed (release!)
//5 : OK! : permits = 0
//3 : sleep completed (release!)
//4 : sleep completed (release!)
//5 : sleep completed (release!)
//-- end --
//permits = 3
void acquire (int permits)
// 7 permits, 2 per task: three tasks fit (6 permits), the fourth must wait
final var semaphore = new Semaphore(7);
try (final var executor = Executors.newFixedThreadPool(5)) {
    for (int i = 1; i <= 5; i++) {
        final var id = i;
        TimeUnit.MILLISECONDS.sleep(100);
        executor.submit(() -> {
            try {
                System.out.println(id + " : try acquire ... : permits = "
                        + semaphore.availablePermits());
                semaphore.acquire(2);
                try {
                    System.out.println(id + " : OK! : permits = "
                            + semaphore.availablePermits());
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(id + " : sleep completed (release!)");
                } finally {
                    // Release the same number of permits that were acquired
                    semaphore.release(2);
                }
            } catch (InterruptedException e) {
                System.out.println("InterruptedException!");
            }
        });
    }
}
System.out.println("-- all tasks end --");
System.out.println("available : " + semaphore.availablePermits());
// Result
// ↓
//1 : try acquire ... : permits = 7
//1 : OK! : permits = 5
//2 : try acquire ... : permits = 5
//2 : OK! : permits = 3
//3 : try acquire ... : permits = 3
//3 : OK! : permits = 1
//4 : try acquire ... : permits = 1
//5 : try acquire ... : permits = 1
//1 : sleep completed (release!)
//4 : OK! : permits = 1
//2 : sleep completed (release!)
//5 : OK! : permits = 1
//3 : sleep completed (release!)
//4 : sleep completed (release!)
//5 : sleep completed (release!)
//-- all tasks end --
//available : 7
void acquireUninterruptibly ()
Apart from the fact that no InterruptedException is thrown, see the acquire() example.
final var semaphore = new Semaphore(0);
try (final var executor = Executors.newSingleThreadExecutor()) {
    try {
        final var future = executor.submit(() -> {
            System.out.println("try acquire ... : permits = " + semaphore.availablePermits());
            semaphore.acquireUninterruptibly();
            System.out.println("acquire OK!");
            System.out.println("isInterrupted : " + Thread.currentThread().isInterrupted());
        });
        TimeUnit.SECONDS.sleep(1);
        // cancel(true) interrupts the worker, but acquireUninterruptibly() keeps waiting
        final var ret = future.cancel(true);
        System.out.println("cancel = " + ret);
        System.out.println("release!");
    } finally {
        semaphore.release();
    }
}
System.out.println("------");
System.out.println("permits = " + semaphore.availablePermits());
// Result
// ↓
//try acquire ... : permits = 0
//cancel = true
//release!
//acquire OK!
//isInterrupted : true
//------
//permits = 0
// For comparison: the same scenario with acquire(), which is interrupted by cancel(true)
final var semaphore = new Semaphore(0);
try (final var executor = Executors.newSingleThreadExecutor()) {
    final var future = executor.submit(() -> {
        try {
            System.out.println("try acquire ... : permits = " + semaphore.availablePermits());
            semaphore.acquire();
            System.out.println("acquire OK!");
        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
        }
    });
    TimeUnit.SECONDS.sleep(1);
    final var ret = future.cancel(true);
    System.out.println("cancel = " + ret);
}
System.out.println("------");
System.out.println("permits = " + semaphore.availablePermits());
// Result
// ↓
//try acquire ... : permits = 0
//InterruptedException!
//cancel = true
//------
//permits = 0
void acquireUninterruptibly (int permits)
Apart from the fact that no InterruptedException is thrown, see the acquire(int permits) example.
final var semaphore = new Semaphore(0);
try (final var executor = Executors.newSingleThreadExecutor()) {
    try {
        final var future = executor.submit(() -> {
            System.out.println("try acquire ... : permits = " + semaphore.availablePermits());
            semaphore.acquireUninterruptibly(2);
            System.out.println("acquire OK!");
            System.out.println("isInterrupted : " + Thread.currentThread().isInterrupted());
        });
        TimeUnit.SECONDS.sleep(1);
        // cancel(true) interrupts the worker, but acquireUninterruptibly(2) keeps waiting
        final var ret = future.cancel(true);
        System.out.println("cancel = " + ret);
        System.out.println("release!");
    } finally {
        semaphore.release(2);
    }
}
System.out.println("------");
System.out.println("permits = " + semaphore.availablePermits());
// Result
// ↓
//try acquire ... : permits = 0
//cancel = true
//release!
//acquire OK!
//isInterrupted : true
//------
//permits = 0
// For comparison: the same scenario with acquire(2), which is interrupted by cancel(true)
final var semaphore = new Semaphore(0);
try (final var executor = Executors.newSingleThreadExecutor()) {
    final var future = executor.submit(() -> {
        try {
            System.out.println("try acquire ... : permits = " + semaphore.availablePermits());
            semaphore.acquire(2);
            System.out.println("acquire OK!");
        } catch (InterruptedException e) {
            System.out.println("InterruptedException!");
        }
    });
    TimeUnit.SECONDS.sleep(1);
    final var ret = future.cancel(true);
    System.out.println("cancel = " + ret);
}
System.out.println("------");
System.out.println("permits = " + semaphore.availablePermits());
// Result
// ↓
//try acquire ... : permits = 0
//InterruptedException!
//cancel = true
//------
//permits = 0
int availablePermits ()
The usage example for this method is covered under acquire().
Please refer to that example.
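As a smaller, single-threaded illustration of how the count moves, here is a minimal sketch that is not part of the original examples. The class name AvailablePermitsExample and the helper snapshots are hypothetical; only Semaphore calls already used in this article appear.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

class AvailablePermitsExample {

    // Records availablePermits() after each step of a simple
    // acquire/release sequence on a single thread.
    static int[] snapshots() throws InterruptedException {
        final var semaphore = new Semaphore(3);
        final int initial = semaphore.availablePermits();

        semaphore.acquire();
        final int afterOne = semaphore.availablePermits();

        semaphore.acquire(2);
        final int afterThree = semaphore.availablePermits();

        semaphore.release(3);
        final int afterRelease = semaphore.availablePermits();

        return new int[] {initial, afterOne, afterThree, afterRelease};
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(snapshots())); // [3, 2, 0, 3]
    }
}
```

In multi-threaded code the returned value can be stale by the time it is read, so it is mainly useful for logging and debugging, as in the acquire() example.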
int drainPermits ()
final var semaphore = new Semaphore(5);
System.out.println(semaphore.availablePermits()); // 5
final var ret = semaphore.drainPermits();
System.out.println(ret); // 5
System.out.println(semaphore.availablePermits()); // 0
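// A negative initial permit count is allowed; drainPermits() then returns the negative value and resets the count to 0
final var semaphore = new Semaphore(-3);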
System.out.println(semaphore.availablePermits()); // -3
final var ret = semaphore.drainPermits();
System.out.println(ret); // -3
System.out.println(semaphore.availablePermits()); // 0
protected Collection<Thread> getQueuedThreads ()
This method is protected.
Writing your own subclass is rare, so this article does not include a detailed example.
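For readers who do need it, one possible approach is a small subclass that widens the method to public. This is a minimal sketch under stated assumptions: InspectableSemaphore, QueuedThreadsExample, and the thread name "waiter-1" are hypothetical names chosen for illustration. The API describes the returned collection as a best-effort estimate, so treat it as a monitoring aid rather than something to synchronize on.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class QueuedThreadsExample {

    // Hypothetical subclass: exposes the protected method as public
    static class InspectableSemaphore extends Semaphore {
        InspectableSemaphore(int permits) {
            super(permits);
        }

        @Override
        public Collection<Thread> getQueuedThreads() {
            return super.getQueuedThreads();
        }
    }

    // Starts one thread that blocks on the semaphore and returns the
    // names of the threads reported as queued at that moment.
    static List<String> queuedThreadNames() throws InterruptedException {
        final var semaphore = new InspectableSemaphore(0);
        final var waiter = new Thread(() -> semaphore.acquireUninterruptibly(), "waiter-1");
        waiter.start();

        // Poll until the waiter has actually been queued
        while (!semaphore.hasQueuedThreads()) {
            TimeUnit.MILLISECONDS.sleep(10);
        }

        final List<String> names = new ArrayList<>();
        for (final Thread t : semaphore.getQueuedThreads()) {
            names.add(t.getName());
        }

        // Let the waiter finish so the thread does not linger
        semaphore.release();
        waiter.join();
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(queuedThreadNames());
    }
}
```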
final int getQueueLength ()
// 2 permits and 6 tasks: tasks 3 to 6 end up waiting in the queue
final var semaphore = new Semaphore(2);
try (final var executor = Executors.newFixedThreadPool(6)) {
    for (int i = 1; i <= 6; i++) {
        final var id = i;
        TimeUnit.MILLISECONDS.sleep(100);
        executor.submit(() -> {
            try {
                System.out.printf("%d : try acquire ... : queue length = %d (%b)%n",
                        id, semaphore.getQueueLength(), semaphore.hasQueuedThreads());
                semaphore.acquire();
                try {
                    System.out.printf("%d : OK! : queue length = %d%n",
                            id, semaphore.getQueueLength());
                    TimeUnit.SECONDS.sleep(3);
                    System.out.printf("%d : sleep completed (release!)%n", id);
                } finally {
                    semaphore.release();
                }
            } catch (InterruptedException e) {
                System.out.println("InterruptedException!");
            }
        });
    }
}
// Result
// ↓
//1 : try acquire ... : queue length = 0 (false)
//1 : OK! : queue length = 0
//2 : try acquire ... : queue length = 0 (false)
//2 : OK! : queue length = 0
//3 : try acquire ... : queue length = 0 (false)
//4 : try acquire ... : queue length = 1 (true)
//5 : try acquire ... : queue length = 2 (true)
//6 : try acquire ... : queue length = 3 (true)
//1 : sleep completed (release!)
//3 : OK! : queue length = 3
//2 : sleep completed (release!)
//4 : OK! : queue length = 2
//3 : sleep completed (release!)
//5 : OK! : queue length = 1
//4 : sleep completed (release!)
//6 : OK! : queue length = 0
//5 : sleep completed (release!)
//6 : sleep completed (release!)
final boolean hasQueuedThreads ()
The usage example for this method is covered under getQueueLength().
Please refer to that example.
boolean isFair ()
The usage example for this method is covered under Semaphore(int permits, boolean fair).
Please refer to that example.
protected void reducePermits (int reduction)
This method is protected.
Writing your own subclass is rare, so this article does not include a detailed example.
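For readers who do need it, the following minimal sketch shows one way a subclass could expose the method. ResizableSemaphore and ReducePermitsExample are hypothetical names used only for illustration. Unlike acquire, reducePermits does not block, so the count can drop below zero.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

class ReducePermitsExample {

    // Hypothetical subclass: exposes the protected method as public
    static class ResizableSemaphore extends Semaphore {
        ResizableSemaphore(int permits) {
            super(permits);
        }

        @Override
        public void reducePermits(int reduction) {
            super.reducePermits(reduction);
        }
    }

    // Returns availablePermits() initially and after each reduction.
    static int[] demo() {
        final var semaphore = new ResizableSemaphore(5);
        final int initial = semaphore.availablePermits();

        semaphore.reducePermits(2);
        final int afterFirst = semaphore.availablePermits();

        // Reducing by more than is available does not block
        semaphore.reducePermits(4);
        final int afterSecond = semaphore.availablePermits();

        return new int[] {initial, afterFirst, afterSecond};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [5, 3, -1]
    }
}
```

A typical use is shrinking the capacity of a pool while it is in use: the count goes negative and new acquire calls wait until enough permits have been released to bring it back above zero.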
void release ()
The usage example for this method is covered under acquire().
Please refer to that example.
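One detail worth illustrating separately: release() does not check that the caller actually holds a permit, so an unmatched release() raises the count above the value passed to the constructor. This is one reason permits need careful management. A minimal sketch, with the hypothetical class name ReleaseWithoutAcquire:

```java
import java.util.concurrent.Semaphore;

class ReleaseWithoutAcquire {

    // Releases once without a matching acquire and returns the new count.
    static int permitsAfterExtraRelease() {
        final var semaphore = new Semaphore(1);
        // No acquire() was called; release() still adds a permit
        semaphore.release();
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(permitsAfterExtraRelease()); // 2
    }
}
```

Pairing every acquire with a release in a finally block, as the examples in this article do, keeps the count from drifting in either direction.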
void release (int permits)
The usage example for this method is covered under acquire(int permits).
Please refer to that example.
String toString ()
final var semaphore = new Semaphore(5);
final var str1 = semaphore.toString();
System.out.println(str1); // java.util.concurrent.Semaphore@503f91c3[Permits = 5]
semaphore.drainPermits();
final var str2 = semaphore.toString();
System.out.println(str2); // java.util.concurrent.Semaphore@503f91c3[Permits = 0]
boolean tryAcquire ()
final var semaphore = new Semaphore(3);
try (final var executor = Executors.newFixedThreadPool(5)) {
    for (int i = 1; i <= 5; i++) {
        final var id = i;
        TimeUnit.MILLISECONDS.sleep(100);
        executor.submit(() -> {
            // tryAcquire() returns immediately: true with a permit, false without one
            if (semaphore.tryAcquire()) {
                try {
                    System.out.printf("%d : try acquire = true : permits = %d%n",
                            id, semaphore.availablePermits());
                    TimeUnit.SECONDS.sleep(3);
                    System.out.printf("%d : sleep completed (release!)%n", id);
                } catch (InterruptedException e) {
                    System.out.println("InterruptedException!");
                } finally {
                    semaphore.release();
                }
            } else {
                System.out.printf("%d : try acquire = false : permits = %d%n",
                        id, semaphore.availablePermits());
            }
        });
    }
}
System.out.println("-- all tasks end --");
System.out.println("permits : " + semaphore.availablePermits());
// Result
// ↓
//1 : try acquire = true : permits = 2
//2 : try acquire = true : permits = 1
//3 : try acquire = true : permits = 0
//4 : try acquire = false : permits = 0
//5 : try acquire = false : permits = 0
//1 : sleep completed (release!)
//2 : sleep completed (release!)
//3 : sleep completed (release!)
//-- all tasks end --
//permits : 3
boolean tryAcquire (int permits)
final var semaphore = new Semaphore(6);
try (final var executor = Executors.newFixedThreadPool(5)) {
    for (int i = 1; i <= 5; i++) {
        final var id = i;
        TimeUnit.MILLISECONDS.sleep(100);
        executor.submit(() -> {
            // Succeeds only if 2 permits are available right now
            if (semaphore.tryAcquire(2)) {
                try {
                    System.out.printf("%d : try acquire = true : permits = %d%n",
                            id, semaphore.availablePermits());
                    TimeUnit.SECONDS.sleep(3);
                    System.out.printf("%d : sleep completed (release!)%n", id);
                } catch (InterruptedException e) {
                    System.out.println("InterruptedException!");
                } finally {
                    semaphore.release(2);
                }
            } else {
                System.out.printf("%d : try acquire = false : permits = %d%n",
                        id, semaphore.availablePermits());
            }
        });
    }
}
System.out.println("-- all tasks end --");
System.out.println("permits : " + semaphore.availablePermits());
// Result
// ↓
//1 : try acquire = true : permits = 4
//2 : try acquire = true : permits = 2
//3 : try acquire = true : permits = 0
//4 : try acquire = false : permits = 0
//5 : try acquire = false : permits = 0
//1 : sleep completed (release!)
//2 : sleep completed (release!)
//3 : sleep completed (release!)
//-- all tasks end --
//permits : 6
boolean tryAcquire (int permits, long timeout, TimeUnit unit)
For parameters other than timeout and unit, see the tryAcquire(int permits) example.
// Reference time
final var current = System.nanoTime();
// Elapsed time since the reference time, in seconds
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1_000_000_000.0;
final var semaphore = new Semaphore(2);
try (final var executor = Executors.newFixedThreadPool(3)) {
    class Task implements Runnable {
        private final String id;
        private final long timeout;
        Task(String id, long timeout) {
            this.id = id;
            this.timeout = timeout;
        }
        @Override
        public void run() {
            System.out.printf("%s : start : timeout = %d sec. : permits = %d%n",
                    id, timeout, semaphore.availablePermits());
            try {
                // Waits up to `timeout` seconds for 2 permits, then gives up
                if (semaphore.tryAcquire(2, timeout, TimeUnit.SECONDS)) {
                    try {
                        System.out.printf("%s : try acquire = true (%f sec.)%n",
                                id, elapsedTime.getAsDouble());
                        TimeUnit.SECONDS.sleep(3);
                        System.out.printf("%s : sleep completed : release! (%f sec.)%n",
                                id, elapsedTime.getAsDouble());
                    } finally {
                        semaphore.release(2);
                    }
                } else {
                    System.out.printf("%s : try acquire = false : Timeout! (%f sec.)%n",
                            id, elapsedTime.getAsDouble());
                }
            } catch (InterruptedException e) {
                System.out.println("InterruptedException!");
            }
        }
    }
    executor.submit(new Task("A", 1));
    TimeUnit.MILLISECONDS.sleep(100);
    executor.submit(new Task("B", 2));
    TimeUnit.MILLISECONDS.sleep(100);
    executor.submit(new Task("C", 5));
}
System.out.println("-- all tasks end --");
System.out.println("permits : " + semaphore.availablePermits());
// Result
// ↓
//A : start : timeout = 1 sec. : permits = 2
//A : try acquire = true (0.003661 sec.)
//B : start : timeout = 2 sec. : permits = 0
//C : start : timeout = 5 sec. : permits = 0
//B : try acquire = false : Timeout! (2.126545 sec.)
//A : sleep completed : release! (3.017536 sec.)
//C : try acquire = true (3.018140 sec.)
//C : sleep completed : release! (6.027912 sec.)
//-- all tasks end --
//permits : 2
boolean tryAcquire (long timeout, TimeUnit unit)
For everything other than the timeout and unit parameters, see the tryAcquire() example.
// Reference time
final var current = System.nanoTime();
// Elapsed time since the reference time, in seconds
final DoubleSupplier elapsedTime = () -> (System.nanoTime() - current) / 1_000_000_000.0;
final var semaphore = new Semaphore(1);
try (final var executor = Executors.newFixedThreadPool(3)) {
    class Task implements Runnable {
        private final String id;
        private final long timeout;
        Task(String id, long timeout) {
            this.id = id;
            this.timeout = timeout;
        }
        @Override
        public void run() {
            System.out.printf("%s : start : timeout = %d sec. : permits = %d%n",
                    id, timeout, semaphore.availablePermits());
            try {
                // Waits up to `timeout` seconds for 1 permit, then gives up
                if (semaphore.tryAcquire(timeout, TimeUnit.SECONDS)) {
                    try {
                        System.out.printf("%s : try acquire = true (%f sec.)%n",
                                id, elapsedTime.getAsDouble());
                        TimeUnit.SECONDS.sleep(3);
                        System.out.printf("%s : sleep completed : release! (%f sec.)%n",
                                id, elapsedTime.getAsDouble());
                    } finally {
                        semaphore.release();
                    }
                } else {
                    System.out.printf("%s : try acquire = false : Timeout! (%f sec.)%n",
                            id, elapsedTime.getAsDouble());
                }
            } catch (InterruptedException e) {
                System.out.println("InterruptedException!");
            }
        }
    }
    executor.submit(new Task("A", 1));
    TimeUnit.MILLISECONDS.sleep(100);
    executor.submit(new Task("B", 2));
    TimeUnit.MILLISECONDS.sleep(100);
    executor.submit(new Task("C", 5));
}
System.out.println("-- all tasks end --");
System.out.println("permits : " + semaphore.availablePermits());
// Result
// ↓
//A : start : timeout = 1 sec. : permits = 1
//A : try acquire = true (0.003326 sec.)
//B : start : timeout = 2 sec. : permits = 0
//C : start : timeout = 5 sec. : permits = 0
//B : try acquire = false : Timeout! (2.116379 sec.)
//A : sleep completed : release! (3.007164 sec.)
//C : try acquire = true (3.007721 sec.)
//C : sleep completed : release! (6.021869 sec.)
//-- all tasks end --
//permits : 1