|
/**
 * The Atomic module provides basic support for so-called lock-free
 * concurrent programming.
 * The current design replaces the previous Atomic module by Sean and is
 * partly inspired by the LLVM atomic operations and by Sean's version.
 *
 * If no atomic ops are available, an (inefficient) fallback solution is provided.
 * For classes, atomic access means atomic access to their *address*, not their content.
 *
 * If you want unique counters or flags to communicate in multithreading settings,
 * look at tango.core.sync.Counter, which provides them in a better way and copes
 * better with the absence of atomic ops.
 *
 * Copyright: Copyright (C) 2008-2010 the blip developer group
 * License: BSD style: $(LICENSE)
 * Author: Fawzi Mohamed
 */
module tango.core.sync.Atomic;
version( LDC )
{
import ldc.intrinsics;
}
private {
// from tango.core.traits:
/**
* Evaluates to true if T is a signed or unsigned integer type.
*/
template isIntegerType( T )
{
const bool isIntegerType = isSignedIntegerType!(T) ||
isUnsignedIntegerType!(T);
}
/**
 * Evaluates to true if T is a pointer or class type.
 */
template isPointerOrClass(T)
{
const isPointerOrClass = is(T==class);
}
template isPointerOrClass(T : T*)
{
const isPointerOrClass = true;
}
/**
* Evaluates to true if T is a signed integer type.
*/
template isSignedIntegerType( T )
{
const bool isSignedIntegerType = is( T == byte ) ||
is( T == short ) ||
is( T == int ) ||
is( T == long )/+||
is( T == cent )+/;
}
/**
* Evaluates to true if T is an unsigned integer type.
*/
template isUnsignedIntegerType( T )
{
const bool isUnsignedIntegerType = is( T == ubyte ) ||
is( T == ushort ) ||
is( T == uint ) ||
is( T == ulong )/+||
is( T == ucent )+/;
}
/// substitutes classes with void*
template ClassPtr(T){
static if (is(T==class)){
alias void* ClassPtr;
} else {
alias T ClassPtr;
}
}
}
extern(C) void thread_yield();
// NOTE: Strictly speaking, the x86 supports atomic operations on
// unaligned values. However, this is far slower than the
// common case, so such behavior should be prohibited.
template atomicValueIsProperlyAligned( T )
{
bool atomicValueIsProperlyAligned( size_t addr )
{
return addr % ClassPtr!(T).sizeof == 0;
}
}
/*
 * A barrier disallows certain kinds of intermixing and out-of-order execution:
 * it ensures that all operations of one kind are executed before the operations
 * of the other kind. Which kinds of mixing are disallowed depends on the
 * template arguments.
 * These are global barriers: the whole memory is synchronized (devices excluded if device is false).
 *
 * The barrier actually enforced might be stronger than the requested one.
 *
 * if ll is true, loads before the barrier are not allowed to mix with loads after the barrier
 * if ls is true, loads before the barrier are not allowed to mix with stores after the barrier
 * if sl is true, stores before the barrier are not allowed to mix with loads after the barrier
 * if ss is true, stores before the barrier are not allowed to mix with stores after the barrier
 * if device is true, uncached and device memory is also synchronized
 *
 * Barriers are typically paired.
 *
 * For example, if you want to ensure that all writes
 * are done before setting a flag that communicates that an object is initialized, you would
 * need memoryBarrier(false,false,false,true) before setting the flag.
 * To read that flag before reading the rest of the object you would need a
 * memoryBarrier(true,false,false,false) after having read the flag.
 *
 * I believe that these two barriers are called release and acquire respectively, but you find
 * several incompatible definitions around (some obviously wrong), so some care might be in order.
 * To be safer, memoryBarrier(false,true,false,true) might be used for the release side, and
 * memoryBarrier(true,false,true,false) for the acquire side, which are slightly stronger.
 *
 * These barriers are also called write barrier and read barrier respectively
 * (see the example after the barrier utility functions below).
 *
 * A full memory fence is (true,true,true,true) and ensures that stores and loads before the
 * barrier are done before stores and loads after it.
 * Keep in mind that even with a full barrier you normally still need two of them (one on each
 * side), otherwise the other thread may reorder its loads (for example) and still see things
 * in the wrong order.
 */
/* llvm_memory_barrier gone?
version( LDC )
{
void memoryBarrier(bool ll, bool ls, bool sl,bool ss,bool device=false)(){
// XXX: this is overly conservative
llvm_memory_fence();
}
} else */version(D_InlineAsm_X86){
void memoryBarrier(bool ll, bool ls, bool sl,bool ss,bool device=false)(){
static if (device) {
if (ls || sl || ll || ss){
// cpuid should serialize even more than mfence
asm {
push EBX;
mov EAX, 0; // model, stepping
cpuid;
pop EBX;
}
}
} else static if (ls || sl || (ll && ss)){ // use a serializing operation like cpuid, or simply cmpxchg, instead?
asm {
mfence;
}
// this is supposedly faster and correct, but let's play it safe and use the specific instruction
// push rax
// xchg rax
// pop rax
} else static if (ll){
asm {
lfence;
}
} else static if( ss ){
asm {
sfence;
}
}
}
} else version(D_InlineAsm_X86_64){
void memoryBarrier(bool ll, bool ls, bool sl,bool ss,bool device=false)(){
static if (device) {
if (ls || sl || ll || ss){
// cpuid should serialize even more than mfence
asm {
push RBX;
mov RAX, 0; // model, stepping
cpuid;
pop RBX;
}
}
} else static if (ls || sl || (ll && ss)){ // use a serializing operation like cpuid, or simply cmpxchg, instead?
asm {
mfence;
}
// this is supposedly faster and correct, but let's play it safe and use the specific instruction
// push rax
// xchg rax
// pop rax
} else static if (ll){
asm {
lfence;
}
} else static if( ss ){
asm {
sfence;
}
}
}
} else {
pragma(msg,"WARNING: no atomic operations on this architecture");
pragma(msg,"WARNING: this is *slow* you probably want to change this!");
int dummy;
// acquires a lock... probably you will want to skip this
void memoryBarrier(bool ll, bool ls, bool sl,bool ss,bool device=false)(){
synchronized { dummy=1; }
}
enum{LockVersion=true}
}
static if (!is(typeof(LockVersion))) {
enum{LockVersion=false}
}
// use stricter fences
enum{strictFences=false}
/// Utility function for a write barrier (disallows store-store reordering).
void writeBarrier(){
memoryBarrier!(false,false,strictFences,true)();
}
/// Utility function for a read barrier (disallows load-load reordering).
void readBarrier(){
memoryBarrier!(true,strictFences,false,false)();
}
/// Utility function for a full barrier (disallows reordering).
void fullBarrier(){
memoryBarrier!(true,true,true,true)();
}
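/*
 * Example (illustrative sketch, not part of the original module): the release/acquire
 * pairing described above, using the utility barriers. The names `data`, `ready`,
 * `producer` and `consumer` are hypothetical.
 * ---
 * int  data;
 * bool ready;
 *
 * void producer(){
 *     data = 42;       // initialize the payload
 *     writeBarrier();  // all stores above are done before the flag store
 *     ready = true;    // publish
 * }
 *
 * void consumer(){
 *     if (ready){          // read the flag first
 *         readBarrier();   // loads below cannot be moved before the flag load
 *         assert(data == 42);
 *     }
 * }
 * ---
 */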
/*
 * Atomic swap:
 * stores newval into val and returns the old value in one atomic operation.
 * Barriers are not implied, just atomicity!
 */
version(LDC){
T atomicSwap( T )( ref T val, T newval )
{
T oldval = void;
static if (isPointerOrClass!(T))
{
oldval = cast(T)llvm_atomic_swap!(size_t)(cast(shared(size_t)*)&val, cast(size_t)newval);
}
else static if (is(T == bool))
{
        oldval = llvm_atomic_swap!(ubyte)(cast(shared(ubyte)*)&val, newval?1:0) != 0;
}
else
{
oldval = llvm_atomic_swap!(T)(cast(shared)&val, newval);
}
return oldval;
}
} else version(D_InlineAsm_X86) {
T atomicSwap( T )( ref T val, T newval )
in {
// NOTE: 32 bit x86 systems support 8 byte CAS, which only requires
// 4 byte alignment, so use size_t as the align type here.
static if( T.sizeof > size_t.sizeof )
assert( atomicValueIsProperlyAligned!(size_t)( cast(size_t) &val ) );
else
assert( atomicValueIsProperlyAligned!(T)( cast(size_t) &val ) );
} body {
T*posVal=&val;
static if( T.sizeof == byte.sizeof ) {
asm {
mov AL, newval;
mov ECX, posVal;
lock; // lock always needed to make this op atomic
xchg [ECX], AL;
}
}
else static if( T.sizeof == short.sizeof ) {
asm {
mov AX, newval;
mov ECX, posVal;
lock; // lock always needed to make this op atomic
xchg [ECX], AX;
}
}
else static if( T.sizeof == int.sizeof ) {
asm {
mov EAX, newval;
mov ECX, posVal;
lock; // lock always needed to make this op atomic
xchg [ECX], EAX;
}
}
else static if( T.sizeof == long.sizeof ) {
// 8 Byte swap on 32-Bit Processor, use CAS?
static assert( false, "Invalid template type specified, 8bytes in 32 bit mode: "~T.stringof );
}
else
{
static assert( false, "Invalid template type specified: "~T.stringof );
}
}
} else version (D_InlineAsm_X86_64){
T atomicSwap( T )( ref T val, T newval )
in {
assert( atomicValueIsProperlyAligned!(T)( cast(size_t) &val ) );
} body {
T*posVal=&val;
static if( T.sizeof == byte.sizeof ) {
asm {
mov AL, newval;
mov RCX, posVal;
lock; // lock always needed to make this op atomic
xchg [RCX], AL;
}
}
else static if( T.sizeof == short.sizeof ) {
asm {
mov AX, newval;
mov RCX, posVal;
lock; // lock always needed to make this op atomic
xchg [RCX], AX;
}
}
else static if( T.sizeof == int.sizeof ) {
asm {
mov EAX, newval;
mov RCX, posVal;
lock; // lock always needed to make this op atomic
xchg [RCX], EAX;
}
}
else static if( T.sizeof == long.sizeof ) {
asm {
mov RAX, newval;
mov RCX, posVal;
lock; // lock always needed to make this op atomic
xchg [RCX], RAX;
}
}
else
{
static assert( false, "Invalid template type specified: "~T.stringof );
}
}
} else {
T atomicSwap( T )( ref T val, T newval )
in {
assert( atomicValueIsProperlyAligned!(T)( cast(size_t) &val ) );
} body {
T oldVal;
synchronized(typeid(T)){
oldVal=val;
val=newval;
}
return oldVal;
}
}
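/*
 * Example (illustrative sketch): atomicSwap is enough to build a simple test-and-set
 * spinlock; since no barriers are implied, they are added explicitly. The names
 * `lockFlag`, `lock` and `unlock` are hypothetical, and a real mutex is normally
 * preferable.
 * ---
 * bool lockFlag;
 *
 * void lock(){
 *     // spin until we are the one that changed the flag from false to true
 *     while (atomicSwap(lockFlag, true)){
 *         thread_yield();
 *     }
 *     readBarrier();   // acquire side: later loads stay after the lock
 * }
 *
 * void unlock(){
 *     writeBarrier();  // release side: earlier stores complete first
 *     atomicSwap(lockFlag, false);
 * }
 * ---
 */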
//---------------------
// internal conversion template
private T aCasT(T,V)(ref T val, T newval, T equalTo){
union UVConv{V v; T t;}
union UVPtrConv{V *v; T *t;}
UVConv vNew,vOld,vAtt;
UVPtrConv valPtr;
vNew.t=newval;
vOld.t=equalTo;
valPtr.t=&val;
vAtt.v=atomicCAS(*valPtr.v,vNew.v,vOld.v);
return vAtt.t;
}
/// internal reduction to a CAS on an unsigned integer of the same size
private T aCas(T)(ref T val, T newval, T equalTo){
static if (T.sizeof==1){
return aCasT!(T,ubyte)(val,newval,equalTo);
} else static if (T.sizeof==2){
return aCasT!(T,ushort)(val,newval,equalTo);
} else static if (T.sizeof==4){
return aCasT!(T,uint)(val,newval,equalTo);
} else static if (T.sizeof==8){ // unclear if it is always supported...
return aCasT!(T,ulong)(val,newval,equalTo);
} else {
static assert(0,"invalid type "~T.stringof);
}
}
/*
* Atomic compare & exchange (can be used to implement everything else)
* stores newval into val if val==equalTo in one atomic operation.
* Barriers are not implied, just atomicity!
* Returns the value that is checked against equalTo (i.e. an exchange was performed
* if result==equalTo, otherwise one can use the result as the current value).
*/
version(LDC){
T atomicCAS( T )( ref T val, T newval, T equalTo )
{
T oldval = void;
static if (isPointerOrClass!(T))
{
oldval = cast(T)cast(void*)llvm_atomic_cmp_swap!(size_t)(cast(shared(size_t)*)cast(void*)&val, cast(size_t)cast(void*)equalTo, cast(size_t)cast(void*)newval);
}
    else static if (is(T == bool)) // correct also if bool has a different size?
    {
        oldval = aCas(val,newval,equalTo); // assuming true is *always* 1 and not just any nonzero value...
}
else static if (isIntegerType!(T))
{
oldval = llvm_atomic_cmp_swap!(T)(cast(shared)&val, equalTo, newval);
} else {
oldval = aCas(val,newval,equalTo);
}
return oldval;
}
} else version(D_InlineAsm_X86) {
version(darwin){
extern(C) ubyte OSAtomicCompareAndSwap64(long oldValue, long newValue,
long *theValue); // assumes that in C sizeof(_Bool)==1 (as given in osx IA-32 ABI)
}
T atomicCAS( T )( ref T val, T newval, T equalTo )
in {
// NOTE: 32 bit x86 systems support 8 byte CAS, which only requires
// 4 byte alignment, so use size_t as the align type here.
static if( ClassPtr!(T).sizeof > size_t.sizeof )
assert( atomicValueIsProperlyAligned!(size_t)( cast(size_t) &val ) );
else
assert( atomicValueIsProperlyAligned!(ClassPtr!(T))( cast(size_t) &val ) );
} body {
T*posVal=&val;
static if( T.sizeof == byte.sizeof ) {
asm {
mov DL, newval;
mov AL, equalTo;
mov ECX, posVal;
lock; // lock always needed to make this op atomic
cmpxchg [ECX], DL;
}
}
else static if( T.sizeof == short.sizeof ) {
asm {
mov DX, newval;
mov AX, equalTo;
mov ECX, posVal;
lock; // lock always needed to make this op atomic
cmpxchg [ECX], DX;
}
}
else static if( ClassPtr!(T).sizeof == int.sizeof ) {
asm {
mov EDX, newval;
mov EAX, equalTo;
mov ECX, posVal;
lock; // lock always needed to make this op atomic
cmpxchg [ECX], EDX;
}
}
else static if( T.sizeof == long.sizeof ) {
// 8 Byte StoreIf on 32-Bit Processor
version(darwin){
union UVConv{long v; T t;}
union UVPtrConv{long *v; T *t;}
UVConv vEqual,vNew;
UVPtrConv valPtr;
vEqual.t=equalTo;
vNew.t=newval;
valPtr.t=&val;
while(1){
if(OSAtomicCompareAndSwap64(vEqual.v, vNew.v, valPtr.v)!=0)
{
return equalTo;
} else {
{
T res=val;
if (res!is equalTo) return res;
}
}
}
} else {
T res;
asm
{
push EDI;
push EBX;
lea EDI, newval;
mov EBX, [EDI];
mov ECX, 4[EDI];
lea EDI, equalTo;
mov EAX, [EDI];
mov EDX, 4[EDI];
mov EDI, val;
lock; // lock always needed to make this op atomic
cmpxchg8b [EDI];
lea EDI, res;
mov [EDI], EAX;
mov 4[EDI], EDX;
pop EBX;
pop EDI;
}
return res;
}
}
else
{
static assert( false, "Invalid template type specified: "~T.stringof );
}
}
} else version (D_InlineAsm_X86_64){
T atomicCAS( T )( ref T val, T newval, T equalTo )
in {
assert( atomicValueIsProperlyAligned!(T)( cast(size_t) &val ) );
} body {
T*posVal=&val;
static if( T.sizeof == byte.sizeof ) {
asm {
mov DL, newval;
mov AL, equalTo;
mov RCX, posVal;
lock; // lock always needed to make this op atomic
cmpxchg [RCX], DL;
}
}
else static if( T.sizeof == short.sizeof ) {
asm {
mov DX, newval;
mov AX, equalTo;
mov RCX, posVal;
lock; // lock always needed to make this op atomic
cmpxchg [RCX], DX;
}
}
else static if( ClassPtr!(T).sizeof == int.sizeof ) {
asm {
mov EDX, newval;
mov EAX, equalTo;
mov RCX, posVal;
lock; // lock always needed to make this op atomic
cmpxchg [RCX], EDX;
}
}
else static if( ClassPtr!(T).sizeof == long.sizeof ) {
asm {
mov RDX, newval;
mov RAX, equalTo;
mov RCX, posVal;
lock; // lock always needed to make this op atomic
cmpxchg [RCX], RDX;
}
}
else
{
static assert( false, "Invalid template type specified: "~T.stringof );
}
}
} else {
T atomicCAS( T )( ref T val, T newval, T equalTo )
in {
assert( atomicValueIsProperlyAligned!(T)( cast(size_t) &val ) );
} body {
T oldval;
synchronized(typeid(T)){
oldval=val;
if(oldval==equalTo) {
val=newval;
}
}
return oldval;
}
}
/// Compare-and-swap convenience wrapper that returns true if the exchange was performed.
bool atomicCASB(T)( ref T val, T newval, T equalTo ){
    return (equalTo is atomicCAS(val,newval,equalTo));
}
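/*
 * Example (illustrative sketch): the usual compare-and-swap retry loop, here storing
 * the maximum of the current value and a candidate. The name `atomicMax` is
 * hypothetical; atomicCASB could be used instead when only success/failure matters.
 * ---
 * int atomicMax(ref int val, int candidate){
 *     int current = val;
 *     while (candidate > current){
 *         int seen = atomicCAS(val, candidate, current);
 *         if (seen == current) break;  // the exchange was performed
 *         current = seen;              // val changed under us, retry
 *     }
 *     return current;
 * }
 * ---
 */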
/*
* Loads a value from memory.
*
* At the moment it is assumed that all aligned memory accesses are atomic
* in the sense that all bits are consistent with some store.
*
* Remove this? I know no actual architecture where this would be different.
*/
T atomicLoad(T)(ref T val)
in {
assert( atomicValueIsProperlyAligned!(T)( cast(size_t) &val ) );
static assert(ClassPtr!(T).sizeof<=size_t.sizeof,"invalid size for "~T.stringof);
} body {
T res=val;
return res;
}
/*
 * Stores a value to memory.
*
* At the moment it is assumed that all aligned memory accesses are atomic
* in the sense that a load either sees the complete store or the previous value.
*
* Remove this? I know no actual architecture where this would be different.
*/
void atomicStore(T)(ref T val, T newVal)
in {
assert( atomicValueIsProperlyAligned!(T)( cast(size_t) &val ), "invalid alignment" );
static assert(ClassPtr!(T).sizeof<=size_t.sizeof,"invalid size for "~T.stringof);
} body {
val=newVal;
}
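/*
 * Example (illustrative sketch): atomicLoad/atomicStore only guarantee that the access
 * is not torn; ordering still needs explicit barriers. The names `sharedPtr`, `publish`
 * and `consume` are hypothetical.
 * ---
 * void* sharedPtr;
 *
 * void publish(void* p){
 *     writeBarrier();              // earlier stores complete before the pointer store
 *     atomicStore(sharedPtr, p);
 * }
 *
 * void* consume(){
 *     auto p = atomicLoad(sharedPtr);
 *     readBarrier();               // later loads cannot move before the pointer load
 *     return p;
 * }
 * ---
 */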
/*
 * Atomically adds the given increment to val and returns the previous value.
 *
 * Some architectures might allow only increments/decrements by 1.
 * No barriers implied, only atomicity!
 */
version(LDC){
T atomicAdd(T)(ref T val, T incV){
static if (isPointerOrClass!(T))
{
            return cast(T)llvm_atomic_load_add!(size_t)(cast(shared(size_t)*)&val, cast(size_t)incV);
}
else static if (isIntegerType!(T))
{
static assert( isIntegerType!(T), "invalid type "~T.stringof );
return llvm_atomic_load_add!(T)(cast(shared)&val, cast(T)incV);
} else {
return atomicOp(val,delegate T(T a){ return a+incV; });
}
}
} else version (D_InlineAsm_X86){
T atomicAdd(T,U=T)(ref T val, U incV_){
T incV=cast(T)incV_;
static if (isIntegerType!(T)||isPointerOrClass!(T)){
T* posVal=&val;
T res;
static if (T.sizeof==1){
asm {
mov DL, incV;
mov ECX, posVal;
lock;
xadd byte ptr [ECX],DL;
mov byte ptr res[EBP],DL;
}
} else static if (T.sizeof==2){
asm {
mov DX, incV;
mov ECX, posVal;
lock;
xadd short ptr [ECX],DX;
mov short ptr res[EBP],DX;
}
} else static if (T.sizeof==4){
asm
{
mov EDX, incV;
mov ECX, posVal;
lock;
xadd int ptr [ECX],EDX;
mov int ptr res[EBP],EDX;
}
} else static if (T.sizeof==8){
return atomicOp(val,delegate (T x){ return x+incV; });
} else {
static assert(0,"Unsupported type size");
}
return res;
} else {
return atomicOp(val,delegate T(T a){ return a+incV; });
}
}
} else version (D_InlineAsm_X86_64){
T atomicAdd(T,U=T)(ref T val, U incV_){
T incV=cast(T)incV_;
static if (isIntegerType!(T)||isPointerOrClass!(T)){
T* posVal=&val;
T res;
static if (T.sizeof==1){
asm {
mov DL, incV;
mov RCX, posVal;
lock;
xadd byte ptr [RCX],DL;
                    mov byte ptr res[RBP],DL;
}
} else static if (T.sizeof==2){
asm {
mov DX, incV;
mov RCX, posVal;
lock;
xadd short ptr [RCX],DX;
                    mov short ptr res[RBP],DX;
}
} else static if (T.sizeof==4){
asm
{
mov EDX, incV;
mov RCX, posVal;
lock;
xadd int ptr [RCX],EDX;
                    mov int ptr res[RBP],EDX;
}
} else static if (T.sizeof==8){
asm
{
mov RAX, val;
mov RDX, incV;
lock; // lock always needed to make this op atomic
xadd qword ptr [RAX],RDX;
                    mov res[RBP],RDX;
}
} else {
static assert(0,"Unsupported type size for type:"~T.stringof);
}
return res;
} else {
return atomicOp(val,delegate T(T a){ return a+incV; });
}
}
} else {
static if (LockVersion){
T atomicAdd(T,U=T)(ref T val, U incV_){
T incV=cast(T)incV_;
static assert( isIntegerType!(T)||isPointerOrClass!(T),"invalid type: "~T.stringof );
synchronized(typeid(T)){
T oldV=val;
val+=incV;
return oldV;
}
}
} else {
T atomicAdd(T,U=T)(ref T val, U incV_){
T incV=cast(T)incV_;
static assert( isIntegerType!(T)||isPointerOrClass!(T),"invalid type: "~T.stringof );
synchronized(typeid(T)){
            T oldV,newV,nextVal;
            nextVal=val;
            do{
                oldV=nextVal;
                newV=oldV+incV;
                nextVal=atomicCAS!(T)(val,newV,oldV);
            } while(nextVal!=oldV);
return oldV;
}
}
}
}
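/*
 * Example (illustrative sketch): because atomicAdd returns the value *before* the
 * addition, dropping a reference count to zero can be detected from the returned
 * value. The names `refCount` and `release` are hypothetical.
 * ---
 * int refCount = 1;
 *
 * void release(){
 *     if (atomicAdd(refCount, -1) == 1){
 *         // we removed the last reference; cleanup would go here
 *     }
 * }
 * ---
 */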
/*
 * Applies a pure function atomically.
 * The function should be pure, as it might be called several times to ensure atomicity.
 * It should also be quick to compute, otherwise contention is likely, and no "fair"
 * scheduling is applied between a fast function (more likely to succeed) and slower
 * ones (i.e. do not use this in case of high contention).
 */
T atomicOp(T)(ref T val, T delegate(T) f){
T oldV,newV,nextV;
int i=0;
nextV=val;
do {
oldV=nextV;
newV=f(oldV);
nextV=aCas!(T)(val,newV,oldV);
if (nextV is oldV || newV is oldV) return oldV;
} while(++i<200);
while (true){
thread_yield();
oldV=val;
newV=f(oldV);
nextV=aCas!(T)(val,newV,oldV);
if (nextV is oldV || newV is oldV) return oldV;
}
}
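/*
 * Example (illustrative sketch): atomicOp retries a side effect free delegate until it
 * can be applied atomically, here setting a bit in a mask. The names `flags`, `BitReady`
 * and `setReady` are hypothetical.
 * ---
 * enum { BitReady = 1u }
 * uint flags;
 *
 * uint setReady(){
 *     // returns the value of flags before the bit was set
 *     return atomicOp(flags, delegate uint(uint v){ return v | BitReady; });
 * }
 * ---
 */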
/*
 * Reads a flag (ensuring that accesses after the read cannot be moved before it).
 */
T flagGet(T)(ref T flag){
T res;
res=flag;
memoryBarrier!(true,false,strictFences,false)();
return res;
}
/*
 * Sets a flag (ensuring that all pending writes are executed before this).
 * The original value is returned.
 */
T flagSet(T)(ref T flag,T newVal){
memoryBarrier!(false,strictFences,false,true)();
return atomicSwap(flag,newVal);
}
/*
 * Atomically applies the given operation to a flag (ensuring that all pending writes
 * are executed before this). The original value is returned.
 */
T flagOp(T)(ref T flag,T delegate(T) op){
memoryBarrier!(false,strictFences,false,true)();
return atomicOp(flag,op);
}
/*
 * Atomically adds incV to a flag (ensuring that all pending writes are executed before this).
 * The original value is returned.
 */
T flagAdd(T)(ref T flag,T incV=cast(T)1){
static if (!LockVersion)
memoryBarrier!(false,strictFences,false,true)();
return atomicAdd(flag,incV);
}
/*
 * Returns the value of val and increments it in one atomic operation.
 * Useful for counters and for generating unique values (fast);
 * no barriers are implied.
 */
T nextValue(T)(ref T val){
return atomicAdd(val,cast(T)1);
}
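/*
 * Example (illustrative sketch): nextValue for unique ids, and flagSet/flagGet for a
 * simple completion handshake. The names `idCounter`, `result`, `done`, `newId`,
 * `worker` and `waiter` are hypothetical.
 * ---
 * size_t idCounter;
 * int    result;
 * bool   done;
 *
 * size_t newId(){
 *     return nextValue(idCounter);  // every caller gets a distinct value
 * }
 *
 * void worker(){
 *     result = 42;          // plain store
 *     flagSet(done, true);  // write barrier, then atomic swap of the flag
 * }
 *
 * void waiter(){
 *     while (!flagGet(done)) thread_yield();  // read barrier after each flag load
 *     assert(result == 42);
 * }
 * ---
 */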