本文介绍的Semaphore实现基于synchronized,wait()和notify/notifyAll(),这是java并发包之前的典型实现方式.在eclipse的源码中可以找到不少这样的案例,下文中也会把eclipse中的几个实现类作为案例以分析之.
注,这里介绍的信号量实现是基于java语言机制,用于实现多线程间的同步操作,所以对S,P(S),V(S)等概念的介绍将结合本文内容,做合适的调整,读者可阅读操作系统相关书籍的信号量章节获取标准定义.
*本文内容
---信号量简介
---典型案例
*Semaphore概述
---通常把一个非负整数称为Semaphore,表示为S.
S可以理解为可用的资源数量.这里不涉及进程问题,所以就假定S>=0.
---S实现的同步机制表示为PV原语操作
P(S):若S=0,线程进入等待队列;否则,—S;
V(S):++S,唤醒处于等待中的线程.
(注,P是荷兰语的Passeren,相当于英文的pass, V是荷兰语的Verhoog,相当于英文中的incremnet).
*案例
1)典型实现
这段程序源自ibm的一本并发书籍,实现了计数信号量{S|S∈{0,N}}和二元信号量(S={0,1})
public abstract class Semaphore { private int value = 0; public Semaphore() { } public Semaphore(int initial) { if (initial >= 0) value = initial; else throw new IllegalArgumentException("initial < 0"); } public final synchronized void P() throws InterruptedException { while (value == 0) wait(); value--; } protected final synchronized void Vc() { value++; notifyAll(); } protected final synchronized void Vb() { value++; notifyAll(); if (value > 1) value = 1; } public abstract void V(); public String toString() { return ".value=" + value; }}public final class BinarySemaphore extends Semaphore { public BinarySemaphore() { super(); } public BinarySemaphore(int initial) { super(initial); if (initial > 1) throw new IllegalArgumentException("initial > 1"); } public final synchronized void V() { super.Vb(); }}public final class CountingSemaphore extends Semaphore { public CountingSemaphore() { super(); } public CountingSemaphore(int initial) { super(initial); } public final synchronized void V() { super.Vc(); }}
2)实现读写锁
eclipse使用它,解决日志操作相关类在map,数组中的同步问题.
/******************************************************************************* * Copyright (c) 2008, 2011 IBM Corporation and others * All rights reserved. This program and the accompanying materials are made * available under the terms of the Eclipse Public License v1.0 which * accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html ******************************************************************************/package org.eclipse.equinox.log.internal;public class BasicReadWriteLock { private int currentReaders = 0; private int writersWaiting = 0; private boolean writing = false; public synchronized void readLock() { while (writing || writersWaiting != 0) { try { wait(); } catch (InterruptedException e) { // reset interrupted state but keep waiting Thread.currentThread().interrupt(); } } currentReaders++; } public synchronized void readUnlock() { currentReaders--; notifyAll(); } public synchronized void writeLock() { writersWaiting++; while (writing || currentReaders != 0) { try { wait(); } catch (InterruptedException e) { // reset interrupted state but keep waiting Thread.currentThread().interrupt(); } } writersWaiting--; writing = true; } public synchronized void writeUnlock() { writing = false; notifyAll(); }}
3)延迟信号量
这个信号量的亮点在acquire(long delay).
/******************************************************************************* * Copyright (c) 2003, 2006 IBM Corporation and others. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * IBM Corporation - initial API and implementation *******************************************************************************/package org.eclipse.core.runtime.internal.adaptor;/** * Internal class. */public class Semaphore { protected long notifications; public Semaphore(int count) { notifications = count; } /** * Attempts to acquire this semaphore. Returns only when the semaphore has been acquired. */ public synchronized void acquire() { while (true) { if (notifications > 0) { notifications--; return; } try { wait(); } catch (InterruptedException e) { //Ignore } } } /** * Attempts to acquire this semaphore. Returns true if it was successfully acquired, * and false otherwise. */ public synchronized boolean acquire(long delay) { //若传入负数,用于判断是否资源已被占 long start = System.currentTimeMillis(); long timeLeft = delay; while (true) { if (notifications > 0) { notifications--; return true; } if (timeLeft <= 0) //在延迟后不再继续尝试获取锁 return false; try { wait(timeLeft); } catch (InterruptedException e) { //Ignore } timeLeft = start + delay - System.currentTimeMillis(); } } public synchronized void release() { notifications++; notifyAll(); } // for debug only public String toString() { return "Semaphore(" + notifications + ")"; //$NON-NLS-1$ //$NON-NLS-2$ }
*总结
---通过java的对象锁,wait/notify机制模拟的信号量,可以呈现多种形态以应对各种的互斥需求.
---本文给出的例子,具有普遍的适用性.在实践中,咱们可以根据需求定制各种信号量实现.
---jdk1.5提供了Semaphore的另一种实现机制.