java手动实现CAS队列

2021-02-26

手动实现CAS队列

  • Atomic*FieldUpdater

    使用场景

  • 读多写少
  • 节省内存开销
  • sun.misc.Unsafe

    jdk直接操作内存

package consumer.queue.cas;

import consumer.queue.SQueue;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import static consumer.util.UnsafeAccess.UNSAFE;

/**
 * {@link SQueue}实现,基于CAS无锁操作.用于测试目的.
 *
 * @author skywalker
 */
class CASQueue<T> implements SQueue<T> {

	private final T[] array;
	private final long[] pad1 = new long[8];
	private volatile long readIndex = 0;
	private final long[] pad2 = new long[8];
	private volatile long writeIndex = 0;
	private final long[] pad3 = new long[8];
	private final int capacity;
	private final long mask;
	private final int arrayBaseOffset;
	private final int arrayPointerOffset;
	private final AtomicLongFieldUpdater<CASQueue> readIndexUpdater =
			AtomicLongFieldUpdater.newUpdater(CASQueue.class, "readIndex");
	private final AtomicLongFieldUpdater<CASQueue> writeIndexUpdater =
			AtomicLongFieldUpdater.newUpdater(CASQueue.class, "writeIndex");

	public CASQueue(int capacity) {
		if (capacity < 1) {
			throw new IllegalArgumentException("The param capacity must be bigger than 0.");
		}
		if (!isPowerOfTwo(capacity)) {
			throw new IllegalArgumentException("The param capacity must be power of 2.");
		}
		this.array = (T[]) new Object[capacity];
		this.capacity = capacity;
		this.mask = capacity - 1;
		this.arrayBaseOffset = UNSAFE.arrayBaseOffset(Object[].class);
		int scale = UNSAFE.arrayIndexScale(Object[].class);
		if (scale == 4) {
			this.arrayPointerOffset = 2;
		} else if (scale == 8) {
			this.arrayPointerOffset = 3;
		} else {
			throw new IllegalStateException("Unkown pointer size: " + scale);
		}
	}

	/**
	 * 判断给定的数字是否是2的整次幂.
	 */
	private boolean isPowerOfTwo(int n) {
		return ((n & (n - 1)) == 0);
	}

	@Override
	public boolean offer(T element) {
		long index = 0;
		do {
			index = writeIndex;
			if ((index - readIndex) == capacity) {
				return false;
			}
		} while (!writeIndexUpdater.compareAndSet(this, index, index + 1));
		long offset = calOffset(index);
		UNSAFE.putOrderedObject(array, offset, element);
		return true;
	}

	@Override
	public T poll() {
		long index = 0;
		do {
			index = readIndex;
			if (index >= writeIndex) {
				return null;
			}
		} while (!readIndexUpdater.compareAndSet(this, index, index + 1));
		long offset = calOffset(index);
		T result = (T) UNSAFE.getObjectVolatile(array, offset);
		if (result == null) {
			//生产者尚未完成element保存
			do {
				result = (T) UNSAFE.getObjectVolatile(array, offset);
			} while (result == null);
		}
		return result;
	}

	/**
	 * 计算给定的索引在数组中的偏移.
	 */
	private long calOffset(long index) {
		return (arrayBaseOffset + ((index & mask) << arrayPointerOffset));
	}

	@Override
	public int size() {
		return (int) (writeIndex - readIndex);
	}

	@Override
	public boolean isEmpty() {
		return (readIndex >= writeIndex);
	}

	@Override
	public String toString() {
		return "CASQueue{" +
				"array=" + Arrays.toString(array) +
				", readIndex=" + readIndex +
				", writeIndex=" + writeIndex +
				", capacity=" + capacity +
				'}';
	}

}

标签: java

留言请用 Github Issues

聊天请在 telegram

授权协议 (CC) BY-NC-SA | Powered by Jekyll & fzheng.me | 订阅 RSS | 邮箱 paulvsjames@gmail.com