12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
7377837793780378137823783378437853786378737883789379037913792379337943795379637973798379938003801380238033804380538063807380838093810381138123813381438153816381738183819382038213822382338243825382638273828382938303831383238333834383538363837383838393840 |
|
/**
* The thread module provides support for thread creation and management.
*
 * If AtomicSuspendCount is used (for speed reasons) all signals are sent together.
 * When debugging, gdb funnels all signals through one single handler, and if the
 * signals arrive quickly enough they will be coalesced into a single signal
 * (discarding the second); thus it is possible to lose signals, which blocks
 * the program. When debugging it is therefore better to use the slower
 * SuspendOneAtTime version.
*
* Copyright: Copyright (C) 2005-2006 Sean Kelly, Fawzi. All rights reserved.
* License: BSD style: $(LICENSE)
* Authors: Sean Kelly, Fawzi Mohamed
*/
module tango.core.Thread;
public import core.thread;
import Time = tango.core.Time;
extern(C)
{
    /// C-callable wrapper: yields the remainder of the calling thread's
    /// time slice to the scheduler.
    void thread_yield()
    {
        Thread.yield();
    }

    /// C-callable wrapper: suspends the calling thread for at least
    /// `period` seconds (fractional values allowed).
    void thread_sleep(double period)
    {
        auto duration = Time.seconds(period);
        Thread.sleep(duration);
    }
}
/+
import tango.core.sync.Atomic;
debug(Thread)
import tango.stdc.stdio : printf;
// this should be true for most architectures
version = StackGrowsDown;
version(darwin){
version=AtomicSuspendCount;
}
version(linux){
version=AtomicSuspendCount;
}
public
{
// import tango.core.TimeSpan;
}
private
{
import tango.core.Exception;
// Monitor enter/exit hooks provided by the D runtime.
extern (C) void _d_monitorenter(Object);
extern (C) void _d_monitorexit(Object);
//
// exposed by compiler runtime
//
extern (C) void* rt_stackBottom();
extern (C) void* rt_stackTop();
// Returns the bottom (origin) of the calling thread's stack, as reported
// by the compiler runtime.
void* getStackBottom()
{
return rt_stackBottom();
}
// Returns the current top of the calling thread's stack. On x86 with
// inline asm the raw ESP is read directly (naked: no prologue, so ESP is
// the caller's stack pointer); otherwise the runtime hook is used.
void* getStackTop()
{
version( D_InlineAsm_X86 )
{
asm
{
naked;
mov EAX, ESP;
ret;
}
}
else
{
return rt_stackTop();
}
}
version(D_InlineAsm_X86){
// Reads the current value of the EBX register (callee-saved; used so it
// can be captured for conservative GC scanning).
uint getEBX(){
uint retVal;
asm{
mov retVal,EBX;
}
return retVal;
}
}
}
////////////////////////////////////////////////////////////////////////////////
// Thread Entry Point and Signal Handlers
////////////////////////////////////////////////////////////////////////////////
version( Win32 )
{
private
{
import tango.stdc.stdint : uintptr_t; // for _beginthreadex decl below
import tango.sys.win32.UserGdi;
enum DWORD TLS_OUT_OF_INDEXES = 0xFFFFFFFF;
//
// avoid multiple imports via tango.sys.windows.process
//
extern (Windows) alias uint function(void*) btex_fptr;
extern (C) uintptr_t _beginthreadex(void*, uint, btex_fptr, void*, uint, uint*);
//
// entry point for Windows threads
//
/// Entry point for Windows threads: `arg` is the Thread object passed to
/// _beginthreadex by start(). Registers the thread's stack with the runtime,
/// runs the user function/delegate, and records any unhandled exception so
/// join() can rethrow it.
extern (Windows) uint thread_entryPoint( void* arg )
{
    Thread obj = cast(Thread) arg;
    assert( obj );
    scope( exit ) Thread.remove( obj );
    assert( obj.m_curr is &obj.m_main );
    // Record the stack bounds before any GC activity can occur.
    obj.m_main.bstack = getStackBottom();
    obj.m_main.tstack = obj.m_main.bstack;
    Thread.add( &obj.m_main );
    Thread.setThis( obj );
    // NOTE: No GC allocations may occur until the stack pointers have
    //       been set and Thread.getThis returns a valid reference to
    //       this thread object (this latter condition is not strictly
    //       necessary on Win32 but it should be followed for the sake
    //       of consistency).
    // TODO: Consider putting an auto exception object here (using
    //       alloca) for OutOfMemoryError plus something to track
    //       whether an exception is in-flight?
    try
    {
        obj.run();
    }
    catch( Throwable o )
    {
        // Was `catch( Object o )`: only Throwable may be thrown/rethrown in
        // D2, and the Posix entry point already catches Throwable — made
        // consistent so join() can rethrow m_unhandled on both platforms.
        obj.m_unhandled = o;
    }
    return 0;
}
//
// copy of the same-named function in phobos.std.thread--it uses the
// Windows naming convention to be consistent with GetCurrentThreadId
//
// Returns a real (duplicated) handle for the calling thread.
// GetCurrentThread() only yields a pseudo-handle that always refers to
// "the current thread", so it must be duplicated to obtain a handle that
// other threads can use to refer to this one.
HANDLE GetCurrentThreadHandle()
{
enum uint DUPLICATE_SAME_ACCESS = 0x00000002;
HANDLE curr = GetCurrentThread(),
proc = GetCurrentProcess(),
hndl;
// NOTE(review): the DuplicateHandle return value is not checked here —
// on failure `hndl` would be returned uninitialized/invalid.
DuplicateHandle( proc, curr, proc, &hndl, 0, TRUE, DUPLICATE_SAME_ACCESS );
return hndl;
}
}
}
else version( Posix )
{
private
{
import tango.stdc.posix.semaphore;
import tango.stdc.posix.pthread;
import tango.stdc.posix.signal;
import tango.stdc.posix.time;
import tango.stdc.errno;
extern (C) int getErrno();
version( GNU )
{
import gcc.builtins;
}
//
// entry point for POSIX threads
//
// Entry point for POSIX threads: `arg` is the Thread object passed to
// pthread_create by start(). Registers the thread's stack with the runtime,
// installs a cancellation cleanup handler, runs the user function/delegate,
// and records any unhandled Throwable so join() can rethrow it.
extern (C) void* thread_entryPoint( void* arg )
{
Thread obj = cast(Thread) arg;
assert( obj );
scope( exit )
{
// NOTE: isRunning should be set to false after the thread is
// removed or a double-removal could occur between this
// function and thread_suspendAll.
Thread.remove( obj );
obj.m_isRunning = false;
}
// Runs if the thread is cancelled/terminated abnormally (registered via
// pthread_cleanup below) instead of the scope(exit) above.
static extern (C) void thread_cleanupHandler( void* arg )
{
Thread obj = cast(Thread) arg;
assert( obj );
// NOTE: If the thread terminated abnormally, just set it as
// not running and let thread_suspendAll remove it from
// the thread list. This is safer and is consistent
// with the Windows thread code.
obj.m_isRunning = false;
}
// NOTE: Using void to skip the initialization here relies on
// knowledge of how pthread_cleanup is implemented. It may
// not be appropriate for all platforms. However, it does
// avoid the need to link the pthread module. If any
// implementation actually requires default initialization
// then pthread_cleanup should be restructured to maintain
// the current lack of a link dependency.
version( linux )
{
pthread_cleanup cleanup = void;
cleanup.push( &thread_cleanupHandler, cast(void*) obj );
}
else version( darwin )
{
pthread_cleanup cleanup = void;
cleanup.push( &thread_cleanupHandler, cast(void*) obj );
}
else version( solaris )
{
pthread_cleanup cleanup = void;
cleanup.push( &thread_cleanupHandler, cast(void*) obj );
}
else
{
pthread_cleanup_push( &thread_cleanupHandler, cast(void*) obj );
}
// NOTE: For some reason this does not always work for threads.
//obj.m_main.bstack = getStackBottom();
// Fall back to deriving the stack bottom from the frame pointer (x86)
// or from the address of a stack local, depending on growth direction.
version( D_InlineAsm_X86 )
{
static void* getBasePtr()
{
asm
{
naked;
mov EAX, EBP;
ret;
}
}
obj.m_main.bstack = getBasePtr();
}
else version( StackGrowsDown )
obj.m_main.bstack = &obj + 1;
else
obj.m_main.bstack = &obj;
obj.m_main.tstack = obj.m_main.bstack;
assert( obj.m_curr == &obj.m_main );
Thread.add( &obj.m_main );
Thread.setThis( obj );
// NOTE: No GC allocations may occur until the stack pointers have
// been set and Thread.getThis returns a valid reference to
// this thread object (this latter condition is not strictly
// necessary on Win32 but it should be followed for the sake
// of consistency).
// TODO: Consider putting an auto exception object here (using
// alloca) forOutOfMemoryError plus something to track
// whether an exception is in-flight?
try
{
obj.run();
}
catch( Throwable o )
{
// Stored (not propagated) so join() can rethrow or return it.
obj.m_unhandled = o;
}
return null;
}
//
// used to track the number of suspended threads
//
// Tracks how many threads have acknowledged a suspend request: either an
// atomic counter incremented/decremented in the signal handlers, or a POSIX
// semaphore posted once per acknowledging thread (the slower, gdb-friendly
// SuspendOneAtTime mode described in the module header).
version(AtomicSuspendCount){
int suspendCount;
} else {
sem_t suspendCount;
}
// SIGUSR1 handler: spills all callee-visible registers onto the stack (so a
// conservative GC scan of the stack sees every live pointer), records the
// current stack top, signals the suspending thread that this thread has
// stopped, then blocks in sigsuspend() until SIGUSR2 (the resume signal)
// arrives.
extern (C) void thread_suspendHandler( int sig )
in
{
assert( sig == SIGUSR1 );
}
body
{
// Dump the register state onto this handler's stack frame so the GC can
// scan it. The exact mechanism is compiler/architecture dependent.
version( LDC)
{
version(X86)
{
uint eax,ecx,edx,ebx,ebp,esi,edi;
asm
{
mov eax[EBP], EAX ;
mov ecx[EBP], ECX ;
mov edx[EBP], EDX ;
mov ebx[EBP], EBX ;
mov ebp[EBP], EBP ;
mov esi[EBP], ESI ;
mov edi[EBP], EDI ;
}
}
else version (X86_64)
{
ulong rax,rbx,rcx,rdx,rbp,rsi,rdi,rsp,r8,r9,r10,r11,r12,r13,r14,r15;
asm
{
movq rax[RBP], RAX ;
movq rbx[RBP], RBX ;
movq rcx[RBP], RCX ;
movq rdx[RBP], RDX ;
movq rbp[RBP], RBP ;
movq rsi[RBP], RSI ;
movq rdi[RBP], RDI ;
movq rsp[RBP], RSP ;
movq r8 [RBP], R8 ;
movq r9 [RBP], R9 ;
movq r10[RBP], R10 ;
movq r11[RBP], R11 ;
movq r12[RBP], R12 ;
movq r13[RBP], R13 ;
movq r14[RBP], R14 ;
movq r15[RBP], R15 ;
}
}
else
{
static assert( false, "Architecture not supported." );
}
}
else version( D_InlineAsm_X86 )
{
asm
{
pushad;
}
}
else version( GNU )
{
__builtin_unwind_init();
}
else version ( D_InlineAsm_X86_64 )
{
asm
{
// Not sure what goes here, pushad is invalid in 64 bit code
push RAX ;
push RBX ;
push RCX ;
push RDX ;
push RSI ;
push RDI ;
push RBP ;
push R8 ;
push R9 ;
push R10 ;
push R11 ;
push R12 ;
push R13 ;
push R14 ;
push R15 ;
push RAX ; // 16 byte align the stack
}
}
else
{
static assert( false, "Architecture not supported." );
}
// NOTE: Since registers are being pushed and popped from the stack,
// any other stack data used by this function should be gone
// before the stack cleanup code is called below.
{
Thread obj = Thread.getThis();
// NOTE: The thread reference returned by getThis is set within
// the thread startup code, so it is possible that this
// handler may be called before the reference is set. In
// this case it is safe to simply suspend and not worry
// about the stack pointers as the thread will not have
// any references to GC-managed data.
if( obj && !obj.m_lock )
{
obj.m_curr.tstack = getStackTop();
}
// Build a mask that blocks everything except SIGUSR2, so the only
// signal that can wake the sigsuspend() below is the resume signal.
sigset_t sigres = void;
int status;
status = sigfillset( &sigres );
assert( status == 0 );
status = sigdelset( &sigres, SIGUSR2 );
assert( status == 0 );
// Acknowledge the suspension to thread_suspendAll: either bump the
// atomic counter or post the semaphore, depending on the mode.
version (AtomicSuspendCount){
auto oldV=flagAdd(suspendCount,1);
} else {
status = sem_post( &suspendCount );
assert( status == 0 );
}
// here one could do some work (like scan the current stack in this thread...)
sigsuspend( &sigres );
// Resumed: reset tstack to the stack bottom sentinel (the thread is
// running again, so the recorded top is no longer meaningful).
if( obj && !obj.m_lock )
{
obj.m_curr.tstack = obj.m_curr.bstack;
}
}
// Undo the register dump performed on entry.
version( LDC)
{
// nothing to pop
}
else version( D_InlineAsm_X86 )
{
asm
{
popad;
}
}
else version( GNU )
{
// registers will be popped automatically
}
else version ( D_InlineAsm_X86_64 )
{
asm
{
// Not sure what goes here, popad is invalid in 64 bit code
pop RAX ; // 16 byte align the stack
pop R15 ;
pop R14 ;
pop R13 ;
pop R12 ;
pop R11 ;
pop R10 ;
pop R9 ;
pop R8 ;
pop RBP ;
pop RDI ;
pop RSI ;
pop RDX ;
pop RCX ;
pop RBX ;
pop RAX ;
}
}
else
{
static assert( false, "Architecture not supported." );
}
}
/// SIGUSR2 handler: acknowledges that this thread has resumed, either by
/// atomically decrementing the suspend counter or by posting the semaphore
/// for thread_resumeAll to wait on.
extern (C) void thread_resumeHandler( int sig )
in
{
    assert( sig == SIGUSR2 );
}
body
{
    version (AtomicSuspendCount)
    {
        auto oldV = flagAdd( suspendCount, -1 );
    }
    else
    {
        // NOTE(review): the assert was previously placed after the version
        // block and executed unconditionally, but `status` was only ever
        // assigned in this branch (elsewhere it stayed default-initialized
        // to 0, making the check vacuous). Scoped it to the call it checks.
        int status = sem_post( &suspendCount );
        assert( status == 0 );
    }
}
}
// Signature of a user-installable abort handler.
alias void function(int) sHandler;
// Currently installed abort handler; null means "terminate the process".
sHandler _thread_abortHandler=null;
// Dispatches the abort signal to the installed handler, or kills the
// process if none is installed.
// NOTE(review): `exit` is not imported in the visible portion of this file
// (presumably tango.stdc.stdlib elsewhere) — verify it is in scope.
extern (C) void thread_abortHandler( int sig ){
if (_thread_abortHandler!is null){
_thread_abortHandler(sig);
} else {
exit(-1);
}
}
// Installs `f` as the process-wide abort handler (no synchronization; last
// writer wins).
extern (C) void setthread_abortHandler(sHandler f){
_thread_abortHandler=f;
}
}
else
{
// NOTE: This is the only place threading versions are checked. If a new
// version is added, the module code will need to be searched for
// places where version-specific code may be required. This can be
// easily accomlished by searching for 'Windows' or 'Posix'.
static assert( false, "Unknown threading implementation." );
}
////////////////////////////////////////////////////////////////////////////////
// Thread
////////////////////////////////////////////////////////////////////////////////
/**
* This class encapsulates all threading functionality for the D
* programming language. As thread manipulation is a required facility
* for garbage collection, all user threads should derive from this
* class, and instances of this class should never be explicitly deleted.
* A new thread may be created using either derivation or composition, as
* in the following example.
*
* Example:
* -----------------------------------------------------------------------------
* class DerivedThread : Thread
* {
* this()
* {
* super( &run );
* }
*
* private :
* void run()
* {
* printf( "Derived thread running.\n" );
* }
* }
*
* void threadFunc()
* {
* printf( "Composed thread running.\n" );
* }
*
* // create instances of each type
* Thread derived = new DerivedThread();
* Thread composed = new Thread( &threadFunc );
*
* // start both threads
* derived.start();
* composed.start();
* -----------------------------------------------------------------------------
*/
class Thread
{
////////////////////////////////////////////////////////////////////////////
// Initialization
////////////////////////////////////////////////////////////////////////////
/**
* Initializes a thread object which is associated with a static
* D function.
*
* Params:
* fn = The thread function.
* sz = The stack size for this thread.
*
* In:
* fn must not be null.
*/
this( void function() fn, size_t sz = 0 )
in
{
    assert( fn );
}
body
{
    // Record the entry point and requested stack size; nothing runs until
    // start() is called.
    m_call = Call.FN;
    m_fn   = fn;
    m_sz   = sz;
    m_curr = &m_main;
}
/**
* Initializes a thread object which is associated with a dynamic
* D function.
*
* Params:
* dg = The thread function.
* sz = The stack size for this thread.
*
* In:
* dg must not be null.
*/
this( void delegate() dg, size_t sz = 0 )
in
{
    assert( dg );
}
body
{
    // Record the entry point and requested stack size; nothing runs until
    // start() is called.
    m_call = Call.DG;
    m_dg   = dg;
    m_sz   = sz;
    m_curr = &m_main;
}
/**
* Cleans up any remaining resources used by this object.
*/
~this()
{
// m_addr at its init value means the thread was never started (or was
// already joined/cleaned up) — nothing to release.
if( m_addr == m_addr.init )
{
return;
}
version( Win32 )
{
m_addr = m_addr.init;
CloseHandle( m_hndl );
m_hndl = m_hndl.init;
}
else version( Posix )
{
// Detach so the OS reclaims the thread's resources; join() clears
// m_addr, so a joined thread is never detached here.
pthread_detach( m_addr );
m_addr = m_addr.init;
}
}
////////////////////////////////////////////////////////////////////////////
// General Actions
////////////////////////////////////////////////////////////////////////////
/**
* Starts the thread and invokes the function or delegate passed upon
* construction.
*
* In:
* This routine may only be called once per thread instance.
*
* Throws:
* ThreadException if the thread fails to start.
*/
final void start()
in
{
assert( !next && !prev );
}
body
{
// Prepare a joinable pthread attribute with the requested stack size
// before taking the global lock (Win32 needs no preparation).
version( Win32 ) {} else
version( Posix )
{
pthread_attr_t attr;
if( pthread_attr_init( &attr ) )
throw new ThreadException( "Error initializing thread attributes" );
if( m_sz && pthread_attr_setstacksize( &attr, m_sz ) )
throw new ThreadException( "Error initializing thread stack size" );
if( pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE ) )
throw new ThreadException( "Error setting thread joinable" );
}
// NOTE: This operation needs to be synchronized to avoid a race
// condition with the GC. Without this lock, the thread
// could start and allocate memory before being added to
// the global thread list, preventing it from being scanned
// and causing memory to be collected that is still in use.
synchronized( slock )
{
volatile multiThreadedFlag = true;
version( Win32 )
{
m_hndl = cast(HANDLE) _beginthreadex( null, m_sz, &thread_entryPoint, cast(void*) this, 0, &m_addr );
if( cast(size_t) m_hndl == 0 )
throw new ThreadException( "Error creating thread" );
}
else version( Posix )
{
// Set m_isRunning before pthread_create: the new thread's exit
// path (and isRunning callers) may observe it immediately.
m_isRunning = true;
scope( failure ) m_isRunning = false;
if( pthread_create( &m_addr, &attr, &thread_entryPoint, cast(void*) this ) != 0 )
throw new ThreadException( "Error creating thread" );
}
add( this );
}
}
/**
* Waits for this thread to complete. If the thread terminated as the
* result of an unhandled exception, this exception will be rethrown.
*
* Params:
* rethrow = Rethrow any unhandled exception which may have caused this
* thread to terminate.
*
* Throws:
* ThreadException if the operation fails.
* Any exception not handled by the joined thread.
*
* Returns:
* Any exception not handled by this thread if rethrow = false, null
* otherwise.
*/
final Object join( bool rethrow = true )
{
// A thread that never started or already finished has nothing to join.
if(!isRunning())
return null;
version( Win32 )
{
if( WaitForSingleObject( m_hndl, INFINITE ) != WAIT_OBJECT_0 )
throw new ThreadException( "Unable to join thread" );
// NOTE: m_addr must be cleared before m_hndl is closed to avoid
// a race condition with isRunning. The operation is labeled
// volatile to prevent compiler reordering.
volatile m_addr = m_addr.init;
CloseHandle( m_hndl );
m_hndl = m_hndl.init;
}
else version( Posix )
{
if( pthread_join( m_addr, null ) != 0 )
throw new ThreadException( "Unable to join thread" );
// NOTE: pthread_join acts as a substitute for pthread_detach,
// which is normally called by the dtor. Setting m_addr
// to zero ensures that pthread_detach will not be called
// on object destruction.
volatile m_addr = m_addr.init;
}
// Propagate any exception the joined thread died with: rethrow it or
// hand it back to the caller, per the `rethrow` flag.
if( m_unhandled )
{
if( rethrow )
throw m_unhandled;
return m_unhandled;
}
return null;
}
////////////////////////////////////////////////////////////////////////////
// General Properties
////////////////////////////////////////////////////////////////////////////
/**
* Gets the user-readable label for this thread.
*
* Returns:
* The name of this thread.
*/
const final const(char[]) name()
{
    // Lock against a concurrent name(val) writer.
    synchronized( this )
        return m_name;
}
/**
* Sets the user-readable label for this thread.
*
* Params:
* val = The new name of this thread.
*/
final void name( const(char[]) val )
{
    synchronized( this )
        m_name = val.dup; // private copy: later mutation by the caller can't leak in
}
/**
* Gets the daemon status for this thread. While the runtime will wait for
* all normal threads to complete before tearing down the process, daemon
* threads are effectively ignored and thus will not prevent the process
* from terminating. In effect, daemon threads will be terminated
* automatically by the OS when the process exits.
*
* Returns:
* true if this is a daemon thread.
*/
const final bool isDaemon()
{
    // Lock against a concurrent isDaemon(val) writer.
    synchronized( this )
        return m_isDaemon;
}
/**
* Sets the daemon status for this thread. While the runtime will wait for
* all normal threads to complete before tearing down the process, daemon
* threads are effectively ignored and thus will not prevent the process
* from terminating. In effect, daemon threads will be terminated
* automatically by the OS when the process exits.
*
* Params:
* val = The new daemon status for this thread.
*/
final void isDaemon( bool val )
{
    synchronized( this )
        m_isDaemon = val;
}
/**
* Tests whether this thread is running.
*
* Returns:
* true if the thread is running, false if not.
*/
const final bool isRunning()
{
// Never started, or already joined/destroyed.
if( m_addr == m_addr.init )
{
return false;
}
version( Win32 )
{
// Ask the OS: a live thread reports STILL_ACTIVE as its exit code.
uint ecode = 0;
GetExitCodeThread( m_hndl, &ecode );
return ecode == STILL_ACTIVE;
}
else version( Posix )
{
// NOTE: It should be safe to access this value without
// memory barriers because word-tearing and such
// really isn't an issue for boolean values.
return m_isRunning;
}
}
////////////////////////////////////////////////////////////////////////////
// Thread Priority Actions
////////////////////////////////////////////////////////////////////////////
/**
 * The minimum scheduling priority that may be set for a thread. On
 * systems where multiple scheduling policies are defined, this value
 * represents the minimum valid priority for the scheduling policy of
 * the process.
 */
static __gshared const int PRIORITY_MIN;

/**
 * The maximum scheduling priority that may be set for a thread. On
 * systems where multiple scheduling policies are defined, this value
 * represents the maximum valid priority for the scheduling policy of
 * the process.
 */
static __gshared const int PRIORITY_MAX;
/**
 * Gets the scheduling priority for the associated thread.
 *
 * Returns:
 *  The scheduling priority of this thread.
 *
 * Throws:
 *  ThreadException on Posix if the priority cannot be queried.
 */
const final int priority()
{
    version( Win32 )
    {
        return GetThreadPriority( m_hndl );
    }
    else version( Posix )
    {
        int policy;
        sched_param param;
        // FIX: "&param" restored — the original text contained the mojibake
        // "¶m" (an HTML-entity corruption of "&param"), which does not
        // compile.
        if( pthread_getschedparam( m_addr, &policy, &param ) )
            throw new ThreadException( "Unable to get thread priority" );
        return param.sched_priority;
    }
}
/**
 * Sets the scheduling priority for the associated thread.
 *
 * Params:
 *  val = The new scheduling priority of this thread.
 *
 * Throws:
 *  ThreadException if the priority cannot be set.
 */
final void priority( int val )
{
    version( Win32 )
    {
        if( !SetThreadPriority( m_hndl, val ) )
            throw new ThreadException( "Unable to set thread priority" );
    }
    else version( Posix )
    {
        // NOTE: pthread_setschedprio is not implemented on linux, so use
        //       the more complicated get/set sequence below.
        //if( pthread_setschedprio( m_addr, val ) )
        //    throw new ThreadException( "Unable to set thread priority" );

        int policy;
        sched_param param;
        // FIX: "&param" restored — the original text contained the mojibake
        // "¶m" (an HTML-entity corruption of "&param"), which does not
        // compile.
        if( pthread_getschedparam( m_addr, &policy, &param ) )
            throw new ThreadException( "Unable to set thread priority" );
        param.sched_priority = val;
        if( pthread_setschedparam( m_addr, policy, &param ) )
            throw new ThreadException( "Unable to set thread priority" );
    }
}
////////////////////////////////////////////////////////////////////////////
// Actions on Calling Thread
////////////////////////////////////////////////////////////////////////////
/**
 * Suspends the calling thread for at least the supplied time, up to a
 * maximum of (uint.max - 1) milliseconds.
 *
 * Params:
 *  period = The minimum duration the calling thread should be suspended,
 *           in seconds. Sub-second durations are specified as fractional
 *           values.
 *
 * In:
 *  period must be less than (uint.max - 1) milliseconds.
 *
 * Example:
 * -------------------------------------------------------------------------
 * Thread.sleep( 0.05 ); // sleep for 50 milliseconds
 * Thread.sleep( 5 );    // sleep for 5 seconds
 * -------------------------------------------------------------------------
 */
static void sleep( double period )
in
{
    // NOTE: The fractional value added to period is to correct fp error.
    assert( period * 1000 + 0.1 < uint.max - 1 );
}
body
{
    version( Win32 )
    {
        // Win32 Sleep takes milliseconds; the +0.1 compensates for
        // floating-point rounding before truncation.
        Sleep( cast(uint)( period * 1000 + 0.1 ) );
    }
    else version( Posix )
    {
        timespec tin  = void;
        timespec tout = void;

        // Nudge the period up slightly so fp error cannot round the
        // requested duration down.
        period += 0.000_000_000_1;
        if( tin.tv_sec.max < period )
        {
            // Requested interval exceeds what timespec can represent;
            // clamp to the maximum.
            tin.tv_sec = tin.tv_sec.max;
            tin.tv_nsec = 0;
        }
        else
        {
            // Split the period into whole seconds and nanoseconds.
            tin.tv_sec = cast(typeof(tin.tv_sec)) period;
            tin.tv_nsec = cast(typeof(tin.tv_nsec)) ((period % 1.0) * 1_000_000_000);
        }

        while( true )
        {
            if( !nanosleep( &tin, &tout ) )
                return;
            // EINTR means a signal interrupted the sleep; retry with the
            // remaining time that nanosleep reported in tout.
            if( getErrno() != EINTR )
                throw new ThreadException( "Unable to sleep for specified duration" );
            tin = tout;
        }
    }
}
/+
/**
* Suspends the calling thread for at least the supplied time, up to a
* maximum of (uint.max - 1) milliseconds.
*
* Params:
* period = The minimum duration the calling thread should be suspended.
*
* In:
* period must be less than (uint.max - 1) milliseconds.
*
* Example:
* -------------------------------------------------------------------------
* Thread.sleep( TimeSpan.milliseconds( 50 ) ); // sleep for 50 milliseconds
* Thread.sleep( TimeSpan.seconds( 5 ) ); // sleep for 5 seconds
* -------------------------------------------------------------------------
*/
static void sleep( TimeSpan period )
in
{
assert( period.milliseconds < uint.max - 1 );
}
body
{
version( Win32 )
{
Sleep( cast(uint)( period.milliseconds ) );
}
else version( Posix )
{
timespec tin = void;
timespec tout = void;
if( tin.tv_sec.max < period.seconds )
{
tin.tv_sec = tin.tv_sec.max;
tin.tv_nsec = 0;
}
else
{
tin.tv_sec = cast(typeof(tin.tv_sec)) period.seconds;
tin.tv_nsec = cast(typeof(tin.tv_nsec)) period.nanoseconds % 1_000_000_000;
}
while( true )
{
if( !nanosleep( &tin, &tout ) )
return;
if( getErrno() != EINTR )
throw new ThreadException( "Unable to sleep for specified duration" );
tin = tout;
}
}
}
/**
* Suspends the calling thread for at least the supplied time, up to a
* maximum of (uint.max - 1) milliseconds.
*
* Params:
* period = The minimum duration the calling thread should be suspended,
* in seconds. Sub-second durations are specified as fractional
* values. Please note that because period is a floating-point
* number, some accuracy may be lost for certain intervals. For
* this reason, the TimeSpan overload is preferred in instances
* where an exact interval is required.
*
* In:
* period must be less than (uint.max - 1) milliseconds.
*
* Example:
* -------------------------------------------------------------------------
* Thread.sleep( 0.05 ); // sleep for 50 milliseconds
* Thread.sleep( 5 ); // sleep for 5 seconds
* -------------------------------------------------------------------------
*/
static void sleep( double period )
{
sleep( TimeSpan.interval( period ) );
}
+/
/**
 * Forces a context switch to occur away from the calling thread.
 */
static void yield()
{
    version( Win32 )
    {
        // NOTE: Sleep(1) is necessary because Sleep(0) does not give
        //       lower priority threads any timeslice, so looping on
        //       Sleep(0) could be resource-intensive in some cases.
        Sleep( 1 );
    }
    else version( Posix )
    {
        sched_yield();
    }
}
////////////////////////////////////////////////////////////////////////////
// Thread Accessors
////////////////////////////////////////////////////////////////////////////
/**
 * Provides a reference to the calling thread.
 *
 * Returns:
 *  The thread object representing the calling thread. The result of
 *  deleting this object is undefined.
 */
static Thread getThis()
{
    // NOTE: This function may not be called until thread_init has
    //       completed. See thread_suspendAll for more information
    //       on why this might occur.
    version( Win32 )
    {
        // The Thread reference is stored directly in the TLS slot
        // (see setThis).
        return cast(Thread) TlsGetValue( sm_this );
    }
    else version( Posix )
    {
        return cast(Thread) pthread_getspecific( sm_this );
    }
}
/**
 * Provides a list of all threads currently being tracked by the system.
 *
 * Returns:
 *  An array containing references to all threads currently being
 *  tracked by the system. The result of deleting any contained
 *  objects is undefined.
 */
static Thread[] getAll()
{
    Thread[] buf;
    // sm_tlen is read outside the lock, so the buffer is allocated
    // speculatively and its size re-checked under the lock. If more
    // threads were registered in the meantime, retry with a fresh,
    // larger buffer; allocating outside the lock avoids holding it
    // during GC allocation.
    while(1){
        if (buf) delete buf;
        buf = new Thread[sm_tlen];
        synchronized( slock )
        {
            size_t pos = 0;
            if (buf.length<sm_tlen) {
                // Raced with thread creation: buffer too small, retry.
                continue;
            } else {
                // Trim in case threads exited since the allocation.
                buf.length=sm_tlen;
            }
            foreach( Thread t; Thread )
            {
                buf[pos++] = t;
            }
            return buf;
        }
    }
}
/**
 * Operates on all threads currently being tracked by the system. The
 * result of deleting any Thread object is undefined.
 *
 * Params:
 *  dg = The supplied code as a delegate.
 *
 * Returns:
 *  Zero if all elements are visited, nonzero if not.
 */
static int opApply(scope int delegate( ref Thread ) dg )
{
    synchronized( slock )
    {
        // Walk the intrusive list, stopping as soon as the delegate
        // requests it by returning nonzero.
        Thread t = sm_tbeg;
        while( t !is null )
        {
            int res = dg( t );
            if( res != 0 )
                return res;
            t = t.next;
        }
        return 0;
    }
}
////////////////////////////////////////////////////////////////////////////
// Local Storage Actions
////////////////////////////////////////////////////////////////////////////
/**
* Indicates the number of local storage pointers available at program
* startup. It is recommended that this number be at least 64.
*/
static const uint LOCAL_MAX = 64;
/**
 * Reserves a local storage pointer for use and initializes this location
 * to null for all running threads.
 *
 * Returns:
 *  A key representing the array offset of this memory location.
 *
 * Throws:
 *  ThreadException if all LOCAL_MAX slots are already reserved.
 */
static uint createLocal()
{
    synchronized( slock )
    {
        // Linear scan for the first unreserved slot.
        foreach( uint key, ref bool set; sm_local )
        {
            if( !set )
            {
                // Scrub any stale value left by a previous owner of this
                // slot from every registered thread before handing it out.
                //foreach( Thread t; sm_tbeg ) Bug in GDC 0.24 SVN (r139)
                for( Thread t = sm_tbeg; t; t = t.next )
                {
                    t.m_local[key] = null;
                }
                set = true;
                return key;
            }
        }
        throw new ThreadException( "No more local storage slots available" );
    }
}
/**
 * Marks the supplied key as available and sets the associated location
 * to null for all running threads. It is assumed that any key passed
 * to this function is valid. The result of calling this function for
 * a key which is still in use is undefined.
 *
 * Params:
 *  key = The key to delete.
 */
static void deleteLocal( uint key )
{
    synchronized( slock )
    {
        // Free the slot, then scrub the stale value from every thread.
        sm_local[key] = false;
        // foreach( Thread t; sm_tbeg ) Bug in GDC 0.24 SVN (r139)
        for( Thread t = sm_tbeg; t !is null; t = t.next )
            t.m_local[key] = null;
    }
}
/**
 * Loads the value stored at key within a thread-local static array. It is
 * assumed that any key passed to this function is valid.
 *
 * Params:
 *  key = The location which holds the desired data.
 *
 * Returns:
 *  The data associated with the supplied key.
 */
static void* getLocal( uint key )
{
    Thread self = getThis();
    return self.m_local[key];
}
/**
 * Stores the supplied value at key within a thread-local static array. It
 * is assumed that any key passed to this function is valid.
 *
 * Params:
 *  key = The location to store the supplied data.
 *  val = The data to store.
 *
 * Returns:
 *  A copy of the data which has just been stored.
 */
static void* setLocal( uint key, void* val )
{
    Thread self = getThis();
    self.m_local[key] = val;
    return val;
}
////////////////////////////////////////////////////////////////////////////
// Static Initalizer
////////////////////////////////////////////////////////////////////////////
/**
 * This initializer is used to set thread constants. All functional
 * initialization occurs within thread_init().
 */
static this()
{
    version( Win32 )
    {
        PRIORITY_MIN = -15;
        PRIORITY_MAX = 15;
    }
    else version( Posix )
    {
        int policy;
        sched_param param;
        pthread_t self = pthread_self();

        // FIX: "&param" restored — the original text contained the mojibake
        // "¶m" (an HTML-entity corruption of "&param"), which does not
        // compile.
        int status = pthread_getschedparam( self, &policy, &param );
        assert( status == 0 );

        // Priority bounds depend on the scheduling policy of the calling
        // (main) thread's process.
        PRIORITY_MIN = sched_get_priority_min( policy );
        assert( PRIORITY_MIN != -1 );

        PRIORITY_MAX = sched_get_priority_max( policy );
        assert( PRIORITY_MAX != -1 );
    }
}
private:
//
// Initializes a thread object which has no associated executable function.
// This is used for the main thread initialized in thread_init().
//
this()
{
    // No entry point: run() will take the default branch and do nothing.
    m_call = Call.NO;
    // The top-most context for this thread is its embedded m_main.
    m_curr = &m_main;
}
//
// Thread entry point. Invokes the function or delegate passed on
// construction (if any).
//
final void run()
{
    // Dispatch on the kind of entry point supplied at construction;
    // Call.NO (the main/attached thread) has nothing to invoke.
    if( m_call == Call.FN )
        m_fn();
    else if( m_call == Call.DG )
        m_dg();
}
private:
//
// The type of routine passed on thread construction.
//
enum Call
{
    NO,  // no entry point (used for the main/attached thread)
    FN,  // free-function entry point (union member m_fn is valid)
    DG   // delegate entry point (union member m_dg is valid)
}
//
// Standard types
//
version( Win32 )
{
    alias uint TLSKey;      // slot index returned by TlsAlloc
    alias uint ThreadAddr;  // Win32 thread id (GetCurrentThreadId)
}
else version( Posix )
{
    alias pthread_key_t TLSKey;
    alias pthread_t ThreadAddr;
}
//
// Local storage
//
static __gshared bool[LOCAL_MAX] sm_local;  // which local-storage slots are reserved (see createLocal)
static __gshared TLSKey sm_this;            // OS TLS key holding the current Thread reference
void*[LOCAL_MAX] m_local;                   // this thread's local-storage values
//
// Standard thread data
//
version( Win32 )
{
    HANDLE m_hndl;  // OS thread handle (Win32 only)
}
public ThreadAddr m_addr;      // OS-level thread identifier (id / pthread_t)
Call m_call;                   // selects which union member below is valid
const(char)[] m_name;          // user-readable label (see name())
union
{
    void function() m_fn;
    void delegate() m_dg;
}
size_t m_sz;                   // presumably the requested stack size — set outside this chunk; verify
version( Posix )
{
    bool m_isRunning;          // true while the thread runs (see isRunning; set in thread_attachThis)
}
bool m_isDaemon;               // see isDaemon()
public Throwable m_unhandled;  // NOTE(review): appears to hold an exception escaping the entry point; confirm where it is set
private:
////////////////////////////////////////////////////////////////////////////
// Storage of Active Thread
////////////////////////////////////////////////////////////////////////////
//
// Sets a thread-local reference to the current thread object, later
// retrieved by getThis().
//
static void setThis( Thread t )
{
    version( Win32 )
    {
        TlsSetValue( sm_this, cast(void*) t );
    }
    else version( Posix )
    {
        pthread_setspecific( sm_this, cast(void*) t );
    }
}
private:
////////////////////////////////////////////////////////////////////////////
// Thread Context and GC Scanning Support
////////////////////////////////////////////////////////////////////////////
//
// Pushes a new stack context onto this thread's context chain (e.g. when a
// fiber begins executing on this thread). The previous top is recorded in
// c.within so popContext can restore it.
//
final void pushContext( Context* c )
in
{
    assert( !c.within );
}
body
{
    c.within = m_curr;
    m_curr = c;
}
//
// Pops the current stack context, restoring the enclosing context that
// pushContext recorded in c.within.
//
final void popContext()
in
{
    assert( m_curr && m_curr.within );
}
body
{
    Context* c = m_curr;
    m_curr = c.within;
    // Clear the link so the context can be pushed again later.
    c.within = null;
}
//
// Returns the currently-active (innermost) stack context for this thread.
//
public final Context* topContext()
in
{
    assert( m_curr );
}
body
{
    return m_curr;
}
//
// Describes one scannable stack range. A thread's main stack has exactly
// one Context (m_main); each Fiber contributes its own (see the GC
// scanning notes below).
//
public static struct Context
{
    void* bstack,       // stack base (bottom) of this range
          tstack;       // stack top, recorded while the owner is suspended
    Context* within;    // enclosing context on the owning thread (see pushContext)
    Context* next,      // links for the global context list (sm_cbeg)
             prev;
}
Context m_main;
Context* m_curr;
bool m_lock;
version( Win32 )
{
uint[8] m_reg; // edi,esi,ebp,esp,ebx,edx,ecx,eax
}
private:
////////////////////////////////////////////////////////////////////////////
// GC Scanning Support
////////////////////////////////////////////////////////////////////////////
// NOTE: The GC scanning process works like so:
//
// 1. Suspend all threads.
// 2. Scan the stacks of all suspended threads for roots.
// 3. Resume all threads.
//
// Step 1 and 3 require a list of all threads in the system, while
// step 2 requires a list of all thread stacks (each represented by
// a Context struct). Traditionally, there was one stack per thread
// and the Context structs were not necessary. However, Fibers have
// changed things so that each thread has its own 'main' stack plus
// an arbitrary number of nested stacks (normally referenced via
// m_curr). Also, there may be 'free-floating' stacks in the system,
// which are Fibers that are not currently executing on any specific
// thread but are still being processed and still contain valid
// roots.
//
// To support all of this, the Context struct has been created to
// represent a stack range, and a global list of Context structs has
// been added to enable scanning of these stack ranges. The lifetime
// (and presence in the Context list) of a thread's 'main' stack will
// be equivalent to the thread's lifetime. So the Context will be
// added to the list on thread entry, and removed from the list on
// thread exit (which is essentially the same as the presence of a
// Thread object in its own global list). The lifetime of a Fiber's
// context, however, will be tied to the lifetime of the Fiber object
// itself, and Fibers are expected to add/remove their Context struct
// on construction/deletion.
//
// All use of the global lists should synchronize on this lock.
//
static Object slock()
{
    // The Thread classinfo object doubles as the process-wide monitor
    // guarding the global thread and context lists.
    return Thread.classinfo;
}
static __gshared Context* sm_cbeg;  // head of the global context list
static __gshared size_t sm_clen;    // number of contexts in that list
static __gshared Thread sm_tbeg;    // head of the global thread list
static __gshared size_t sm_tlen;    // number of threads in that list
//
// Used for ordering threads in the global thread list.
//
Thread prev;
Thread next;
////////////////////////////////////////////////////////////////////////////
// Global Context List Operations
////////////////////////////////////////////////////////////////////////////
//
// Add a context to the global context list.
//
static void add( Context* c )
in
{
    assert( c );
    assert( !c.next && !c.prev );
}
body
{
    synchronized( slock )
    {
        // Prepend to the doubly-linked list headed by sm_cbeg.
        if( sm_cbeg )
        {
            c.next = sm_cbeg;
            sm_cbeg.prev = c;
        }
        sm_cbeg = c;
        ++sm_clen;
    }
}
//
// Remove a context from the global context list.
//
static void remove( Context* c )
in
{
    assert( c );
    assert( c.next || c.prev );
}
body
{
    synchronized( slock )
    {
        // Unlink from the doubly-linked list, updating the head if needed.
        if( c.prev )
            c.prev.next = c.next;
        if( c.next )
            c.next.prev = c.prev;
        if( sm_cbeg == c )
            sm_cbeg = c.next;
        --sm_clen;
    }
    // NOTE: Don't null out c.next or c.prev because opApply currently
    //       follows c.next after removing a node. This could be easily
    //       addressed by simply returning the next node from this function,
    //       however, a context should never be re-added to the list anyway
    //       and having next and prev be non-null is a good way to
    //       ensure that.
}
////////////////////////////////////////////////////////////////////////////
// Global Thread List Operations
////////////////////////////////////////////////////////////////////////////
//
// Add a thread to the global thread list.
//
static void add( Thread t )
in
{
    assert( t );
    assert( !t.next && !t.prev );
    assert( t.isRunning );
}
body
{
    synchronized( slock )
    {
        // Prepend to the doubly-linked list headed by sm_tbeg.
        if( sm_tbeg )
        {
            t.next = sm_tbeg;
            sm_tbeg.prev = t;
        }
        sm_tbeg = t;
        ++sm_tlen;
    }
}
//
// Remove a thread from the global thread list.
//
static void remove( Thread t )
in
{
    assert( t );
    assert( t.next || t.prev );
}
body
{
    synchronized( slock )
    {
        // NOTE: When a thread is removed from the global thread list its
        //       main context is invalid and should be removed as well.
        //       It is possible that t.m_curr could reference more
        //       than just the main context if the thread exited abnormally
        //       (if it was terminated), but we must assume that the user
        //       retains a reference to them and that they may be re-used
        //       elsewhere. Therefore, it is the responsibility of any
        //       object that creates contexts to clean them up properly
        //       when it is done with them.
        remove( &t.m_main );
        // Unlink from the doubly-linked list, updating the head if needed.
        if( t.prev )
            t.prev.next = t.next;
        if( t.next )
            t.next.prev = t.prev;
        if( sm_tbeg == t )
            sm_tbeg = t.next;
        --sm_tlen;
    }
    // NOTE: Don't null out t.next or t.prev because opApply currently
    //       follows t.next after removing a node. This could be easily
    //       addressed by simply returning the next node from this function,
    //       however, a thread should never be re-added to the list anyway
    //       and having next and prev be non-null is a good way to
    //       ensure that.
}
}
////////////////////////////////////////////////////////////////////////////////
// GC Support Routines
////////////////////////////////////////////////////////////////////////////////
/**
 * Initializes the thread module. This function must be called by the
 * garbage collector on startup and before any other thread routines
 * are called.
 */
extern (C) void thread_init()
{
    // NOTE: If thread_init itself performs any allocations then the thread
    //       routines reserved for garbage collector use may be called while
    //       thread_init is being processed. However, since no memory should
    //       exist to be scanned at this point, it is sufficient for these
    //       functions to detect the condition and return immediately.

    version( Win32 )
    {
        Thread.sm_this = TlsAlloc();
        assert( Thread.sm_this != TLS_OUT_OF_INDEXES );
        Fiber.sm_this = TlsAlloc();
        // FIX: this assert previously re-checked Thread.sm_this (copy/paste
        // error), leaving the Fiber TLS allocation unverified.
        assert( Fiber.sm_this != TLS_OUT_OF_INDEXES );
    }
    else version( Posix )
    {
        int         status;
        sigaction_t sigusr1 = void;
        sigaction_t sigusr2 = void;
        sigaction_t sigabrt = void;

        // This is a quick way to zero-initialize the structs without using
        // memset or creating a link dependency on their static initializer.
        (cast(byte*) &sigusr1)[0 .. sigaction_t.sizeof] = 0;
        (cast(byte*) &sigusr2)[0 .. sigaction_t.sizeof] = 0;
        (cast(byte*) &sigabrt)[0 .. sigaction_t.sizeof] = 0;

        // NOTE: SA_RESTART indicates that system calls should restart if they
        //       are interrupted by a signal, but this is not available on all
        //       Posix systems, even those that support multithreading.
        static if( is( typeof( SA_RESTART ) ) )
            sigusr1.sa_flags = SA_RESTART;
        else
            sigusr1.sa_flags = 0;
        sigusr1.sa_handler = &thread_suspendHandler;
        // NOTE: We want to ignore all signals while in this handler, so fill
        //       sa_mask to indicate this (SIGABRT stays deliverable so the
        //       process can still abort inside the handler).
        status = sigfillset( &sigusr1.sa_mask );
        assert( status == 0 );
        status = sigdelset( &sigusr1.sa_mask , SIGABRT);
        assert( status == 0 );

        // NOTE: Since SIGUSR2 should only be issued for threads within the
        //       suspend handler, we don't want this signal to trigger a
        //       restart.
        sigusr2.sa_flags = 0;
        sigusr2.sa_handler = &thread_resumeHandler;
        // NOTE: We want to ignore all signals while in this handler, so fill
        //       sa_mask to indicate this.
        status = sigfillset( &sigusr2.sa_mask );
        assert( status == 0 );
        status = sigdelset( &sigusr2.sa_mask , SIGABRT);
        assert( status == 0 );

        status = sigaction( SIGUSR1, &sigusr1, null );
        assert( status == 0 );
        status = sigaction( SIGUSR2, &sigusr2, null );
        assert( status == 0 );

        // NOTE: SA_RESTART indicates that system calls should restart if they
        //       are interrupted by a signal, but this is not available on all
        //       Posix systems, even those that support multithreading.
        static if( is( typeof( SA_RESTART ) ) )
            sigabrt.sa_flags = SA_RESTART;
        else
            sigabrt.sa_flags = 0;
        sigabrt.sa_handler = &thread_abortHandler;
        // NOTE: We want to ignore all signals while in this handler, so fill
        //       sa_mask to indicate this.
        status = sigfillset( &sigabrt.sa_mask );
        assert( status == 0 );
        status = sigaction( SIGABRT, &sigabrt, null );
        assert( status == 0 );

        version(AtomicSuspendCount){
            suspendCount=0;
        } else {
            status = sem_init( &suspendCount, 0, 0 );
            // FIX: this assert was previously placed after the version
            // block, where it tested a stale status value in the
            // AtomicSuspendCount configuration.
            assert( status == 0 );
        }

        status = pthread_key_create( &Thread.sm_this, null );
        assert( status == 0 );
        status = pthread_key_create( &Fiber.sm_this, null );
        assert( status == 0 );
    }
    // Register the main thread itself with the module.
    thread_attachThis();
}
/**
 * Registers the calling thread for use with Tango. If this routine is called
 * for a thread which is already registered, the result is undefined.
 */
extern (C) void thread_attachThis()
{
    // NOTE: D version blocks do not introduce a new declaration scope, so
    //       thisThread/thisContext remain visible for the add() calls at
    //       the end of the function.
    version( Win32 )
    {
        Thread thisThread = new Thread();
        Thread.Context* thisContext = &thisThread.m_main;
        assert( thisContext == thisThread.m_curr );

        thisThread.m_addr = GetCurrentThreadId();
        thisThread.m_hndl = GetCurrentThreadHandle();
        // The attached thread's whole current stack becomes its main
        // scannable context.
        thisContext.bstack = getStackBottom();
        thisContext.tstack = thisContext.bstack;

        // Attached threads are daemons so they do not keep the process
        // alive in thread_joinAll.
        thisThread.m_isDaemon = true;

        Thread.setThis( thisThread );
    }
    else version( Posix )
    {
        Thread thisThread = new Thread();
        Thread.Context* thisContext = thisThread.m_curr;
        assert( thisContext == &thisThread.m_main );

        thisThread.m_addr = pthread_self();
        thisContext.bstack = getStackBottom();
        thisContext.tstack = thisContext.bstack;

        thisThread.m_isRunning = true;
        thisThread.m_isDaemon = true;

        Thread.setThis( thisThread );
    }

    // Publish the thread and its main context in the global lists.
    Thread.add( thisThread );
    Thread.add( thisContext );
}
/**
 * Deregisters the calling thread from use with Tango. If this routine is
 * called for a thread which is already registered, the result is undefined.
 */
extern (C) void thread_detachThis()
{
    // Remove the calling thread (and, via remove, its main context) from
    // the global lists.
    Thread current = Thread.getThis();
    Thread.remove( current );
}
/**
 * Joins all non-daemon threads that are currently running. This is done by
 * performing successive scans through the thread list until a scan consists
 * of only daemon threads.
 */
extern (C) void thread_joinAll()
{
    // Each pass joins the first non-daemon thread it finds. Joining may
    // allow further threads to be started, so keep rescanning until a
    // pass sees only daemon threads.
    for(;;)
    {
        Thread pending = null;

        foreach( t; Thread )
        {
            if( !t.isDaemon )
            {
                pending = t;
                break;
            }
        }
        if( pending is null )
            return;
        pending.join();
    }
}
/**
 * Performs intermediate shutdown of the thread module.
 */
static ~this()
{
    // NOTE: The functionality related to garbage collection must be minimally
    //       operable after this dtor completes. Therefore, only minimal
    //       cleanup may occur.
    // Drop already-finished threads from the global list so their dead
    // stacks are no longer scanned.
    for( Thread t = Thread.sm_tbeg; t; t = t.next )
    {
        if( !t.isRunning )
            Thread.remove( t );
    }
}
// Used for needLock below
private bool multiThreadedFlag = false;
/**
 * This function is used to determine whether the process is
 * multi-threaded. Optimizations may only be performed on this
 * value if the programmer can guarantee that no path from the
 * enclosed code will start a thread.
 *
 * Returns:
 *  True if Thread.start() has been called in this process.
 */
extern (C) bool thread_needLock()
{
    return multiThreadedFlag;
}
// Used for suspendAll/resumeAll below
private uint suspendDepth = 0;
/**
 * Suspend all threads but the calling thread for "stop the world" garbage
 * collection runs. This function may be called multiple times, and must
 * be followed by a matching number of calls to thread_resumeAll before
 * processing is resumed.
 *
 * Throws:
 *  ThreadException if the suspend operation fails for a running thread.
 */
extern (C) void thread_suspendAll()
{
    // Number of threads signalled so far; compared against the global
    // suspendCount (incremented by the signal handler) to detect when all
    // signalled threads have actually stopped.
    int suspendedCount=0;

    /**
     * Suspend the specified thread and load stack and register information for
     * use by thread_scanAll. If the supplied thread is the calling thread,
     * stack and register information will be loaded but the thread will not
     * be suspended. If the suspend operation fails and the thread is not
     * running then it will be removed from the global thread list, otherwise
     * an exception will be thrown.
     *
     * Params:
     *  t = The thread to suspend.
     *
     * Throws:
     *  ThreadException if the suspend operation fails for a running thread.
     */
    void suspend( Thread t )
    {
        version( Win32 )
        {
            if( t.m_addr != GetCurrentThreadId() && SuspendThread( t.m_hndl ) == 0xFFFFFFFF )
            {
                if( !t.isRunning )
                {
                    // Thread already exited; drop it instead of failing.
                    Thread.remove( t );
                    return;
                }
                throw new ThreadException( "Unable to suspend thread" );
            }

            CONTEXT context = void;
            context.ContextFlags = CONTEXT_INTEGER | CONTEXT_CONTROL;

            if( !GetThreadContext( t.m_hndl, &context ) )
                throw new ThreadException( "Unable to load thread context" );
            // Record the suspended stack top unless the thread manages its
            // own tstack (m_lock set).
            if( !t.m_lock )
                t.m_curr.tstack = cast(void*) context.Esp;
            // edi,esi,ebp,esp,ebx,edx,ecx,eax
            t.m_reg[0] = context.Edi;
            t.m_reg[1] = context.Esi;
            t.m_reg[2] = context.Ebp;
            t.m_reg[3] = context.Esp;
            t.m_reg[4] = context.Ebx;
            t.m_reg[5] = context.Edx;
            t.m_reg[6] = context.Ecx;
            t.m_reg[7] = context.Eax;
        }
        else version( Posix )
        {
            if( t.m_addr != pthread_self() )
            {
                // Ask the target thread to suspend itself via its SIGUSR1
                // handler (installed as thread_suspendHandler).
                if( pthread_kill( t.m_addr, SIGUSR1 ) != 0 )
                {
                    if( !t.isRunning )
                    {
                        Thread.remove( t );
                        return;
                    }
                    throw new ThreadException( "Unable to suspend thread" );
                }
                version (AtomicSuspendCount){
                    ++suspendedCount;
                    version(AtomicSuspendCount){
                        version(SuspendOneAtTime){ // when debugging suspending all threads at once might give "lost" signals
                            int icycle=0;
                            suspendLoop: while (flagGet(suspendCount)!=suspendedCount){
                                for (size_t i=1000;i!=0;--i){
                                    if (flagGet(suspendCount)==suspendedCount) break suspendLoop;
                                    if (++icycle==100_000){
                                        debug(Thread)
                                            printf("waited %d cycles for thread suspension, suspendCount=%d, should be %d\nAtomic ops do not work?\nContinuing wait...\n",icycle,suspendCount,suspendedCount);
                                    }
                                    Thread.yield();
                                }
                                Thread.sleep(0.0001);
                            }
                        }
                    }
                } else {
                    sem_wait( &suspendCount );
                    // shouldn't the return be checked and maybe a loop added for further interrupts
                    // as in Semaphore.d ?
                }
            }
            else if( !t.m_lock )
            {
                // Calling thread: just record its current stack top.
                t.m_curr.tstack = getStackTop();
            }
        }
    }

    // NOTE: We've got an odd chicken & egg problem here, because while the GC
    //       is required to call thread_init before calling any other thread
    //       routines, thread_init may allocate memory which could in turn
    //       trigger a collection. Thus, thread_suspendAll, thread_scanAll,
    //       and thread_resumeAll must be callable before thread_init completes,
    //       with the assumption that no other GC memory has yet been allocated
    //       by the system, and thus there is no risk of losing data if the
    //       global thread list is empty. The check of Thread.sm_tbeg
    //       below is done to ensure thread_init has completed, and therefore
    //       that calling Thread.getThis will not result in an error. For the
    //       short time when Thread.sm_tbeg is null, there is no reason
    //       not to simply call the multithreaded code below, with the
    //       expectation that the foreach loop will never be entered.
    if( !multiThreadedFlag && Thread.sm_tbeg )
    {
        // Single-threaded fast path: only the caller's stack needs capture.
        if( ++suspendDepth == 1 ) {
            suspend( Thread.getThis() );
        }
        return;
    }

    // The monitor acquired here is released by thread_resumeAll (via its
    // scope(exit) _d_monitorexit), keeping the world locked in between.
    _d_monitorenter(Thread.slock);
    {
        if( ++suspendDepth > 1 )
            return;

        // NOTE: I'd really prefer not to check isRunning within this loop but
        //       not doing so could be problematic if threads are terminated
        //       abnormally and a new thread is created with the same thread
        //       address before the next GC run. This situation might cause
        //       the same thread to be suspended twice, which would likely
        //       cause the second suspend to fail, the garbage collection to
        //       abort, and Bad Things to occur.
        for( Thread t = Thread.sm_tbeg; t; t = t.next )
        {
            if( t.isRunning ){
                suspend( t );
            } else
                Thread.remove( t );
        }

        version( Posix )
        {
            // Wait until every signalled thread has acknowledged suspension
            // by incrementing suspendCount in its signal handler.
            version(AtomicSuspendCount){
                int icycle=0;
                suspendLoop2: while (flagGet(suspendCount)!=suspendedCount){
                    for (size_t i=1000;i!=0;--i){
                        if (flagGet(suspendCount)==suspendedCount) break suspendLoop2;
                        if (++icycle==1000_000){
                            debug(Thread)
                                printf("waited %d cycles for thread suspension, suspendCount=%d, should be %d\nAtomic ops do not work?\nContinuing wait...\n",icycle,suspendCount,suspendedCount);
                        }
                        Thread.yield();
                    }
                    Thread.sleep(0.0001);
                }
            }
        }
    }
}
/**
 * Resume all threads but the calling thread for "stop the world" garbage
 * collection runs. This function must be called once for each preceding
 * call to thread_suspendAll before the threads are actually resumed.
 *
 * In:
 *  This routine must be preceded by a call to thread_suspendAll.
 *
 * Throws:
 *  ThreadException if the resume operation fails for a running thread.
 */
extern (C) void thread_resumeAll()
in
{
    assert( suspendDepth > 0 );
}
body
{
    // Snapshot of how many threads are currently suspended; decremented as
    // each one is signalled to resume (SuspendOneAtTime debugging mode).
    version(AtomicSuspendCount) version(SuspendOneAtTime) auto suspendedCount=flagGet(suspendCount);

    /**
     * Resume the specified thread and unload stack and register information.
     * If the supplied thread is the calling thread, stack and register
     * information will be unloaded but the thread will not be resumed. If
     * the resume operation fails and the thread is not running then it will
     * be removed from the global thread list, otherwise an exception will be
     * thrown.
     *
     * Params:
     *  t = The thread to resume.
     *
     * Throws:
     *  ThreadException if the resume fails for a running thread.
     */
    void resume( Thread t )
    {
        version( Win32 )
        {
            if( t.m_addr != GetCurrentThreadId() && ResumeThread( t.m_hndl ) == 0xFFFFFFFF )
            {
                if( !t.isRunning )
                {
                    Thread.remove( t );
                    return;
                }
                throw new ThreadException( "Unable to resume thread" );
            }

            // Reset the recorded stack top and clear the captured registers
            // now that the thread will manage its own stack again.
            if( !t.m_lock )
                t.m_curr.tstack = t.m_curr.bstack;
            t.m_reg[0 .. $] = 0;
        }
        else version( Posix )
        {
            if( t.m_addr != pthread_self() )
            {
                // Ask the suspended thread to resume via its SIGUSR2 handler
                // (installed as thread_resumeHandler).
                if( pthread_kill( t.m_addr, SIGUSR2 ) != 0 )
                {
                    if( !t.isRunning )
                    {
                        Thread.remove( t );
                        return;
                    }
                    throw new ThreadException( "Unable to resume thread" );
                }
                version (AtomicSuspendCount){
                    version(SuspendOneAtTime){ // when debugging suspending all threads at once might give "lost" signals
                        --suspendedCount;
                        int icycle=0;
                        recoverLoop: while(flagGet(suspendCount)>suspendedCount){
                            for (size_t i=1000;i!=0;--i){
                                if (flagGet(suspendCount)==suspendedCount) break recoverLoop;
                                if (++icycle==100_000){
                                    debug(Thread)
                                        printf("waited %d cycles for thread recover, suspendCount=%d, should be %d\nAtomic ops do not work?\nContinuing wait...\n",icycle,suspendCount,suspendedCount);
                                }
                                Thread.yield();
                            }
                            Thread.sleep(0.0001);
                        }
                    }
                } else {
                    sem_wait( &suspendCount );
                    // shouldn't the return be checked and maybe a loop added for further interrupts
                    // as in Semaphore.d ?
                }
            }
            else if( !t.m_lock )
            {
                t.m_curr.tstack = t.m_curr.bstack;
            }
        }
    }

    // NOTE: See thread_suspendAll for the logic behind this.
    if( !multiThreadedFlag && Thread.sm_tbeg )
    {
        if( --suspendDepth == 0 )
            resume( Thread.getThis() );
        return;
    }

    {
        // Releases the monitor acquired by thread_suspendAll's
        // _d_monitorenter, on every exit path.
        scope(exit) _d_monitorexit(Thread.slock);
        if( --suspendDepth > 0 )
            return;
        {
            for( Thread t = Thread.sm_tbeg; t; t = t.next )
            {
                resume( t );
            }
            version(AtomicSuspendCount){
                // Wait until every resumed thread has decremented
                // suspendCount back to zero in its resume handler.
                int icycle=0;
                recoverLoop2: while(flagGet(suspendCount)>0){
                    for (size_t i=1000;i!=0;--i){
                        Thread.yield();
                        if (flagGet(suspendCount)==0) break recoverLoop2;
                        if (++icycle==100_000){
                            debug(Thread)
                                printf("waited %d cycles for thread recovery, suspendCount=%d, should be %d\nAtomic ops do not work?\nContinuing wait...\n",icycle,suspendCount,0);
                        }
                    }
                    Thread.sleep(0.0001);
                }
            }
        }
    }
}
private alias scope void delegate( void*, void* ) scanAllThreadsFn;
/**
 * The main entry point for garbage collection. The supplied delegate
 * will be passed ranges representing both stack and register values.
 *
 * Params:
 *  scan        = The scanner function. It should scan from p1 through p2 - 1.
 *  curStackTop = An optional pointer to the top of the calling thread's stack.
 *
 * In:
 *  This routine must be preceded by a call to thread_suspendAll.
 */
extern (C) void thread_scanAll( scanAllThreadsFn scan, void* curStackTop = null )
in
{
    assert( suspendDepth > 0 );
}
body
{
    Thread thisThread = null;
    void* oldStackTop = null;

    if( curStackTop && Thread.sm_tbeg )
    {
        // Temporarily substitute the caller-supplied stack top for the
        // calling thread so its live frame is included in the scan.
        thisThread = Thread.getThis();
        if( thisThread && (!thisThread.m_lock) )
        {
            oldStackTop = thisThread.m_curr.tstack;
            thisThread.m_curr.tstack = curStackTop;
        }
    }

    scope( exit )
    {
        // Restore the saved stack top on every exit path.
        if( curStackTop && Thread.sm_tbeg )
        {
            if( thisThread && (!thisThread.m_lock) )
            {
                thisThread.m_curr.tstack = oldStackTop;
            }
        }
    }

    // NOTE: Synchronizing on Thread.slock is not needed because this
    //       function may only be called after all other threads have
    //       been suspended from within the same lock.
    for( Thread.Context* c = Thread.sm_cbeg; c; c = c.next )
    {
        version( StackGrowsDown )
        {
            // NOTE: We can't index past the bottom of the stack
            //       so don't do the "+1" for StackGrowsDown.
            if( c.tstack && c.tstack < c.bstack )
                scan( c.tstack, c.bstack );
        }
        else
        {
            if( c.bstack && c.bstack < c.tstack )
                scan( c.bstack, c.tstack + 1 );
        }
    }
    version( Win32 )
    {
        // The register values captured by thread_suspendAll form an
        // additional root range for each thread.
        for( Thread t = Thread.sm_tbeg; t; t = t.next )
        {
            scan( &t.m_reg[0], &t.m_reg[0] + t.m_reg.length );
        }
    }
}
////////////////////////////////////////////////////////////////////////////////
// Thread Local
////////////////////////////////////////////////////////////////////////////////
/**
 * This class encapsulates the operations required to initialize, access, and
 * destroy thread local data.
 */
class ThreadLocal( T )
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////

    /**
     * Sets up a thread-local slot whose value is def for every thread that
     * has not explicitly stored one.
     *
     * Params:
     *  def = The default value to return if no value has been explicitly set.
     */
    this( T def = T.init )
    {
        m_def = def;
        m_key = Thread.createLocal();
    }

    ~this()
    {
        // Release the underlying thread-level TLS key.
        Thread.deleteLocal( m_key );
    }

    ////////////////////////////////////////////////////////////////////////////
    // Accessors
    ////////////////////////////////////////////////////////////////////////////

    /**
     * Gets the value last set by the calling thread, or def if no such value
     * has been set.
     *
     * Returns:
     *  The stored value or def if no value is stored.
     */
    T val()
    {
        Wrap* stored = cast(Wrap*) Thread.getLocal( m_key );
        if( stored is null )
            return m_def;
        return stored.val;
    }

    /**
     * Copies newval to a location specific to the calling thread, and returns
     * newval.
     *
     * Params:
     *  newval = The value to set.
     *
     * Returns:
     *  The value passed to this function.
     */
    T val( T newval )
    {
        Wrap* stored = cast(Wrap*) Thread.getLocal( m_key );
        if( stored is null )
        {
            // First store on this thread: allocate the per-thread wrapper.
            stored = new Wrap;
            Thread.setLocal( m_key, stored );
        }
        stored.val = newval;
        return newval;
    }

private:
    //
    // Heap wrapper around the stored datum. Its presence distinguishes
    // "never set on this thread" (null pointer, so the default is returned)
    // from a stored value, and it flattens the differences between data that
    // is smaller and larger than (void*).sizeof. The obvious tradeoff is an
    // extra per-thread allocation for each ThreadLocal value as compared to
    // calling the Thread routines directly.
    //
    struct Wrap
    {
        T val;
    }

    T    m_def; // default returned before the first store on a thread
    uint m_key; // key for the underlying Thread-level TLS slot
}
////////////////////////////////////////////////////////////////////////////////
// Thread Group
////////////////////////////////////////////////////////////////////////////////
/**
 * This class is intended to simplify certain common programming techniques.
 */
class ThreadGroup
{
    /**
     * Creates and starts a new Thread object that executes fn and adds it to
     * the list of tracked threads.
     *
     * Params:
     *  fn = The thread function.
     *
     * Returns:
     *  A reference to the newly created thread.
     */
    final Thread create( void function() fn )
    {
        Thread t = new Thread( fn );
        t.start();
        track( t );
        return t;
    }

    /**
     * Creates and starts a new Thread object that executes dg and adds it to
     * the list of tracked threads.
     *
     * Params:
     *  dg = The thread function.
     *
     * Returns:
     *  A reference to the newly created thread.
     */
    final Thread create( void delegate() dg )
    {
        Thread t = new Thread( dg );
        t.start();
        track( t );
        return t;
    }

    /**
     * Add t to the list of tracked threads if it is not already being tracked.
     *
     * Params:
     *  t = The thread to add.
     *
     * In:
     *  t must not be null.
     */
    final void add( Thread t )
    in
    {
        assert( t );
    }
    body
    {
        track( t );
    }

    /**
     * Removes t from the list of tracked threads. No operation will be
     * performed if t is not currently being tracked by this object.
     *
     * Params:
     *  t = The thread to remove.
     *
     * In:
     *  t must not be null.
     */
    final void remove( Thread t )
    in
    {
        assert( t );
    }
    body
    {
        synchronized( this )
        {
            m_all.remove( t );
        }
    }

    /**
     * Operates on all threads currently tracked by this object.
     */
    final int opApply( scope int delegate( ref Thread ) dg )
    {
        synchronized( this )
        {
            // NOTE: This loop relies on the knowledge that m_all uses the
            //       Thread object for both the key and the mapped value.
            foreach( Thread member; m_all.keys )
            {
                int stop = dg( member );
                if( stop )
                    return stop;
            }
            return 0;
        }
    }

    /**
     * Iteratively joins all tracked threads. This function will block add,
     * remove, and opApply until it completes.
     *
     * Params:
     *  rethrow = Rethrow any unhandled exception which may have caused the
     *            current thread to terminate.
     *
     * Throws:
     *  Any exception not handled by the joined threads.
     */
    final void joinAll( bool rethrow = true )
    {
        synchronized( this )
        {
            // NOTE: This loop relies on the knowledge that m_all uses the
            //       Thread object for both the key and the mapped value.
            foreach( Thread member; m_all.keys )
            {
                member.join( rethrow );
            }
        }
    }

private:
    // Records t in the tracking set under the group lock. Shared by both
    // create overloads and add.
    final void track( Thread t )
    {
        synchronized( this )
        {
            m_all[t] = t;
        }
    }

    Thread[Thread] m_all; // tracked threads, keyed by themselves
}
////////////////////////////////////////////////////////////////////////////////
// Fiber Platform Detection and Memory Allocation
////////////////////////////////////////////////////////////////////////////////
// Selects the fiber context-switch implementation for the current platform
// by defining exactly one of the AsmX86_Win32 / AsmX86_Posix /
// AsmX86_64_Posix / AsmPPC_Posix version identifiers, falling back to the
// POSIX ucontext API when no hand-written assembly path applies.
private
{
    version( D_InlineAsm_X86 )
    {
        version( X86_64 )
        {
            // Shouldn't an x64 compiler be setting D_InlineAsm_X86_64 instead?
        }
        else
        {
            version( Win32 )
                version = AsmX86_Win32;
            else version( Posix )
                version = AsmX86_Posix;
        }
    }
    else version( D_InlineAsm_X86_64 )
    {
        version( Posix )
            version = AsmX86_64_Posix;
    }
    else version( PPC )
    {
        version( Posix )
            version = AsmPPC_Posix;
    }

    version( Posix )
    {
        import tango.stdc.posix.unistd;   // for sysconf
        import tango.stdc.posix.sys.mman; // for mmap
        import tango.stdc.posix.stdlib;   // for malloc, valloc, free

        version( AsmX86_Win32 ) {} else
        version( AsmX86_Posix ) {} else
        version( AsmX86_64_Posix ) {} else
        version( AsmPPC_Posix ) {} else
        {
            // NOTE: The ucontext implementation requires architecture specific
            //       data definitions to operate so testing for it must be done
            //       by checking for the existence of ucontext_t rather than by
            //       a version identifier. Please note that this is considered
            //       an obsolescent feature according to the POSIX spec, so a
            //       custom solution is still preferred.
            import tango.stdc.posix.ucontext;
            static assert( is( ucontext_t ), "Unknown fiber implementation");
        }
    }

    // System page size in bytes; assigned once by the module constructor
    // below and used to size and align fiber stacks.
    const size_t PAGESIZE;
}
// Module constructor: determines PAGESIZE from the OS when a query API is
// available (GetSystemInfo on Windows, sysconf(_SC_PAGESIZE) on POSIX),
// otherwise falls back to a per-architecture compile-time constant.
static this()
{
    static if( is( typeof( GetSystemInfo ) ) )
    {
        SYSTEM_INFO info;
        GetSystemInfo( &info );
        PAGESIZE = info.dwPageSize;
        assert( PAGESIZE < int.max );
    }
    else static if( is( typeof( sysconf ) ) &&
                    is( typeof( _SC_PAGESIZE ) ) )
    {
        PAGESIZE = cast(size_t) sysconf( _SC_PAGESIZE );
        assert( PAGESIZE < int.max );
    }
    else
    {
        version( PPC )
            PAGESIZE = 8192;
        else
            PAGESIZE = 4096;
    }
}
////////////////////////////////////////////////////////////////////////////////
// Fiber Entry Point and Context Switch
////////////////////////////////////////////////////////////////////////////////
private
{
    // Common entry point for every fiber: runs the fiber's function/delegate,
    // captures any escaping Throwable into m_unhandled, marks the fiber
    // terminated, and switches back to the caller. Execution first arrives
    // here via the return address planted on the fresh stack by
    // Fiber.initStack (or via makecontext on the ucontext path).
    extern (C) void fiber_entryPoint()
    {
        Fiber obj = Fiber.getThis();
        assert( obj );
        assert( Thread.getThis().m_curr is obj.m_ctxt );
        // The switch into this fiber is now complete; release the GC
        // suspend-coordination lock set by switchIn.
        volatile Thread.getThis().m_lock = false;
        obj.m_ctxt.tstack = obj.m_ctxt.bstack;
        obj.m_state = Fiber.State.EXEC;

        try
        {
            obj.run();
        }
        catch( Throwable o )
        {
            // Preserve the unhandled exception so call() can rethrow or
            // return it in the caller's context.
            obj.m_unhandled = o;
        }

        static if( is( ucontext_t ) )
          obj.m_ucur = &obj.m_utxt;

        obj.m_state = Fiber.State.TERM;
        obj.switchOut();
    }

    // NOTE: If AsmPPC_Posix is defined then the context switch routine will
    //       be defined externally until GDC supports inline PPC ASM.
    version( AsmPPC_Posix )
        extern (C) void fiber_switchContext( void** oldp, void* newp );
    else
        extern (C) void fiber_switchContext( void** oldp, void* newp )
        {
            // NOTE: The data pushed and popped in this routine must match the
            //       default stack created by Fiber.initStack or the initial
            //       switch into a new context will fail.
            version( AsmX86_Win32 )
            {
                asm
                {
                    naked;

                    // save current stack state
                    push EBP;
                    mov  EBP, ESP;
                    push EAX;
                    push dword ptr FS:[0];
                    push dword ptr FS:[4];
                    push dword ptr FS:[8];
                    push EBX;
                    push ESI;
                    push EDI;

                    // store oldp again with more accurate address
                    mov EAX, dword ptr 8[EBP];
                    mov [EAX], ESP;
                    // load newp to begin context switch
                    mov ESP, dword ptr 12[EBP];

                    // load saved state from new stack
                    pop EDI;
                    pop ESI;
                    pop EBX;
                    pop dword ptr FS:[8];
                    pop dword ptr FS:[4];
                    pop dword ptr FS:[0];
                    pop EAX;
                    pop EBP;

                    // 'return' to complete switch
                    ret;
                }
            }
            else version( AsmX86_Posix )
            {
                asm
                {
                    naked;

                    // save current stack state
                    push EBP;
                    mov  EBP, ESP;
                    push EAX;
                    push EBX;
                    push ECX;
                    push ESI;
                    push EDI;

                    // store oldp again with more accurate address
                    mov EAX, dword ptr 8[EBP];
                    mov [EAX], ESP;
                    // load newp to begin context switch
                    mov ESP, dword ptr 12[EBP];

                    // load saved state from new stack
                    pop EDI;
                    pop ESI;
                    pop ECX;
                    pop EBX;
                    pop EAX;
                    pop EBP;

                    // 'return' to complete switch
                    ret;
                }
            }
            else version( AsmX86_64_Posix )
            {
                // DMD/GDC accept plain push/pop mnemonics for 64-bit
                // registers; other compilers require the explicit
                // pushq/popq forms, hence the two otherwise identical
                // bodies below.
                version( DigitalMars ) const dmdgdc = true;
                else version (GNU) const dmdgdc = true;
                else const dmdgdc = false;

                static if (dmdgdc == true) asm
                {
                    naked;

                    // save current stack state
                    push RBP;
                    mov RBP, RSP;
                    push RBX;
                    push R12;
                    push R13;
                    push R14;
                    push R15;
                    sub RSP, 4;
                    stmxcsr [RSP];
                    sub RSP, 4;
                    //version(SynchroFloatExcept){
                    fstcw [RSP];
                    fwait;
                    //} else {
                    //  fnstcw [RSP];
                    //  fnclex;
                    //}

                    // store oldp again with more accurate address
                    mov [RDI], RSP;
                    // load newp to begin context switch
                    mov RSP, RSI;

                    // load saved state from new stack
                    fldcw [RSP];
                    add RSP, 4;
                    ldmxcsr [RSP];
                    add RSP, 4;
                    pop R15;
                    pop R14;
                    pop R13;
                    pop R12;
                    pop RBX;
                    pop RBP;

                    // 'return' to complete switch
                    ret;
                }
                else asm
                {
                    naked;

                    // save current stack state
                    pushq RBP;
                    mov RBP, RSP;
                    pushq RBX;
                    pushq R12;
                    pushq R13;
                    pushq R14;
                    pushq R15;
                    sub RSP, 4;
                    stmxcsr [RSP];
                    sub RSP, 4;
                    //version(SynchroFloatExcept){
                    fstcw [RSP];
                    fwait;
                    //} else {
                    //  fnstcw [RSP];
                    //  fnclex;
                    //}

                    // store oldp again with more accurate address
                    mov [RDI], RSP;
                    // load newp to begin context switch
                    mov RSP, RSI;

                    // load saved state from new stack
                    fldcw [RSP];
                    add RSP, 4;
                    ldmxcsr [RSP];
                    add RSP, 4;
                    popq R15;
                    popq R14;
                    popq R13;
                    popq R12;
                    popq RBX;
                    popq RBP;

                    // 'return' to complete switch
                    ret;
                }
            }
            else static if( is( ucontext_t ) )
            {
                // Portable fallback: the top of each fiber stack holds a
                // pointer to its ucontext_t (see Fiber.initStack), so the
                // switch is delegated to swapcontext.
                Fiber   cfib = Fiber.getThis();
                void*   ucur = cfib.m_ucur;

                *oldp = &ucur;
                swapcontext( **(cast(ucontext_t***) oldp),
                              *(cast(ucontext_t**) newp) );
            }
        }
}
////////////////////////////////////////////////////////////////////////////////
// Fiber
////////////////////////////////////////////////////////////////////////////////
// Formats addr as a fixed-width, zero-padded, uppercase hexadecimal string
// (size_t.sizeof*2 digits) into the caller-supplied buffer.
//
// Params:
//  addr = The value (typically a pointer) to format.
//  buf  = Destination buffer; must hold at least size_t.sizeof*2 chars.
//
// Returns:
//  A slice of buf containing the formatted digits.
//
// Used by Fiber.clear() to build diagnostics without relying on GC-heavy
// formatting machinery.
private char[] ptrToStr(size_t addr,char[]buf){
    // static immutable: a single shared table. (A manifest `enum` array
    // initialized with .dup would re-allocate the 16-char array at every
    // reference, i.e. once per loop iteration below.)
    static immutable char[16] digits="0123456789ABCDEF";
    enum{ nDigits=size_t.sizeof*2 }
    if (nDigits>buf.length) assert(0);
    char[] res=buf[0..nDigits];
    size_t addrAtt=addr;
    // Fill from least significant nibble backwards so digits land in
    // most-significant-first order.
    for (int i=nDigits;i!=0;--i){
        res[i-1]=digits[addrAtt&0xF];
        addrAtt>>=4;
    }
    return res;
}
/**
 * This class provides a cooperative concurrency mechanism integrated with the
 * threading and garbage collection functionality. Calling a fiber may be
 * considered a blocking operation that returns when the fiber yields (via
 * Fiber.yield()). Execution occurs within the context of the calling thread
 * so synchronization is not necessary to guarantee memory visibility so long
 * as the same thread calls the fiber each time. Please note that there is no
 * requirement that a fiber be bound to one specific thread. Rather, fibers
 * may be freely passed between threads so long as they are not currently
 * executing. Like threads, a new fiber thread may be created using either
 * derivation or composition, as in the following example.
 *
 * Example:
 * ----------------------------------------------------------------------
 * class DerivedFiber : Fiber
 * {
 *     this()
 *     {
 *         super( &run );
 *     }
 *
 * private :
 *     void run()
 *     {
 *         printf( "Derived fiber running.\n" );
 *     }
 * }
 *
 * void fiberFunc()
 * {
 *     printf( "Composed fiber running.\n" );
 *     Fiber.yield();
 *     printf( "Composed fiber running.\n" );
 * }
 *
 * // create instances of each type
 * Fiber derived = new DerivedFiber();
 * Fiber composed = new Fiber( &fiberFunc );
 *
 * // call both fibers once
 * derived.call();
 * composed.call();
 * printf( "Execution returned to calling context.\n" );
 * composed.call();
 *
 * // since each fiber has run to completion, each should have state TERM
 * assert( derived.state == Fiber.State.TERM );
 * assert( composed.state == Fiber.State.TERM );
 * ----------------------------------------------------------------------
 *
 * Authors: Based on a design by Mikola Lysenko.
 */
class Fiber
{
    // Scheduler integration hooks. Every method here is a no-op stub;
    // subclasses may override them to couple fibers to an IO/event loop.
    static class Scheduler
    {
        alias void* Handle;

        enum Type {Read=1, Write=2, Accept=3, Connect=4, Transfer=5}

        void pause (uint ms) {}

        void ready (Fiber fiber) {}

        void open (Handle fd, char[] name) {}

        void close (Handle fd, char[] name) {}

        void await (Handle fd, Type t, uint timeout) {}

        void spawn (char[] name, void delegate() dg, size_t stack=8192) {}
    }

    struct Event                        // scheduler support
    {
        uint              idx;          // support for timer removal
        Fiber             next;         // linked list of elapsed fibers
        void*             data;         // data to exchange
        ulong             clock;        // request timeout duration
        Scheduler.Handle  handle;       // IO request handle
        Scheduler         scheduler;    // associated scheduler (may be null)
    }

    /+
    final override int opCmp (Object o)
    {
        throw new Exception ("Invalid opCmp in Fiber");

        auto other = cast(Fiber) cast(void*) o;
        if (other)
        {
            auto x = cast(long) event.clock - cast(long) other.event.clock;
            return (x < 0 ? -1 : x is 0 ? 0 : 1);
        }
        return 1;
    }
    +/

    // Returns the scheduler attached to the currently executing fiber
    // (null when the fiber was constructed without one).
    // NOTE(review): assumes a fiber is currently active — getThis() is
    // dereferenced without a null check.
    final static Scheduler scheduler ()
    {
        return getThis.event.scheduler;
    }

    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////

    /**
     * Initializes an empty fiber object
     *
     * (useful to reset it)
     *
     * The fiber starts in state TERM with no associated function or
     * delegate; only the stack is allocated.
     */
    this(size_t sz){
        m_dg = null;
        m_fn = null;
        m_call = Call.NO;
        m_state = State.TERM;
        m_unhandled = null;

        allocStack( sz );
    }

    /**
     * Initializes a fiber object which is associated with a static
     * D function.
     *
     * Params:
     *  fn = The thread function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  fn must not be null.
     */
    this( void function() fn, size_t sz = PAGESIZE)
    in
    {
        assert( fn );
    }
    body
    {
        m_fn    = fn;
        m_call  = Call.FN;
        m_state = State.HOLD;
        allocStack( sz );
        initStack();
    }

    /**
     * Initializes a fiber object which is associated with a dynamic
     * D function.
     *
     * Params:
     *  dg = The thread function.
     *  sz = The stack size for this fiber.
     *  s  = An optional scheduler to associate with this fiber.
     *
     * In:
     *  dg must not be null.
     */
    this( void delegate() dg, size_t sz = PAGESIZE, Scheduler s = null )
    in
    {
        assert( dg );
    }
    body
    {
        event.scheduler = s;

        m_dg    = dg;
        m_call  = Call.DG;
        m_state = State.HOLD;
        allocStack(sz);
        initStack();
    }

    /**
     * Cleans up any remaining resources used by this object.
     */
    ~this()
    {
        // NOTE: A live reference to this object will exist on its associated
        //       stack from the first time its call() method has been called
        //       until its execution completes with State.TERM. Thus, the only
        //       times this dtor should be called are either if the fiber has
        //       terminated (and therefore has no active stack) or if the user
        //       explicitly deletes this object. The latter case is an error
        //       but is not easily tested for, since State.HOLD may imply that
        //       the fiber was just created but has never been run. There is
        //       not a compelling case to create a State.INIT just to offer a
        //       means of ensuring the user isn't violating this object's
        //       contract, so for now this requirement will be enforced by
        //       documentation only.
        freeStack();
    }

    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////

    /**
     * Transfers execution to this fiber object. The calling context will be
     * suspended until the fiber calls Fiber.yield() or until it terminates
     * via an unhandled exception.
     *
     * Params:
     *  rethrow = Rethrow any unhandled exception which may have caused this
     *            fiber to terminate.
     *
     * In:
     *  This fiber must be in state HOLD.
     *
     * Throws:
     *  Any exception not handled by the joined thread.
     *
     * Returns:
     *  Any exception not handled by this fiber if rethrow = false, null
     *  otherwise.
     */
    final Object call( bool rethrow = true )
    in
    {
        assert( m_state == State.HOLD );
    }
    body
    {
        Fiber   cur = getThis();

        static if( is( ucontext_t ) )
          m_ucur = cur ? &cur.m_utxt : &Fiber.sm_utxt;

        // Make this fiber the active one for the duration of the switch,
        // then restore the previous active fiber (supports nested calls).
        setThis( this );
        this.switchIn();
        setThis( cur );

        static if( is( ucontext_t ) )
          m_ucur = null;

        // NOTE: If the fiber has terminated then the stack pointers must be
        //       reset.  This ensures that the stack for this fiber is not
        //       scanned if the fiber has terminated.  This is necessary to
        //       prevent any references lingering on the stack from delaying
        //       the collection of otherwise dead objects.  The most notable
        //       being the current object, which is referenced at the top of
        //       fiber_entryPoint.
        if( m_state == State.TERM )
        {
            m_ctxt.tstack = m_ctxt.bstack;
        }
        if( m_unhandled )
        {
            Throwable obj  = m_unhandled;
            m_unhandled = null;
            if( rethrow )
                throw obj;
            return obj;
        }
        return null;
    }

    /**
     * Resets this fiber so that it may be re-used with the same function.
     * This routine may only be
     * called for fibers that have terminated, as doing otherwise could result
     * in scope-dependent functionality that is not executed.  Stack-based
     * classes, for example, may not be cleaned up properly if a fiber is reset
     * before it has terminated.
     *
     * In:
     *  This fiber must be in state TERM, and have a valid function/delegate.
     */
    final void reset()
    in
    {
        assert( m_call != Call.NO );
        assert( m_state == State.TERM );
        assert( m_ctxt.tstack == m_ctxt.bstack );
    }
    body
    {
        m_state = State.HOLD;
        initStack();
        m_unhandled = null;
    }

    /**
     * Reinitializes a fiber object which is associated with a static
     * D function.
     *
     * Params:
     *  fn = The thread function.
     *
     * In:
     *  This fiber must be in state TERM.
     *  fn must not be null.
     */
    final void reset( void function() fn )
    in
    {
        assert( fn );
        assert( m_state == State.TERM );
        assert( m_ctxt.tstack == m_ctxt.bstack );
    }
    body
    {
        m_fn    = fn;
        m_call  = Call.FN;
        m_state = State.HOLD;
        initStack();
        m_unhandled = null;
    }

    /**
     * Reinitializes a fiber object which is associated with a dynamic
     * D function.
     *
     * Params:
     *  dg = The thread function.
     *
     * In:
     *  This fiber must be in state TERM.
     *  dg must not be null.
     */
    final void reset( void delegate() dg )
    in
    {
        assert( dg );
        assert( m_state == State.TERM );
        assert( m_ctxt.tstack == m_ctxt.bstack );
    }
    body
    {
        m_dg    = dg;
        m_call  = Call.DG;
        m_state = State.HOLD;
        initStack();
        m_unhandled = null;
    }

    /**
     * Clears the fiber from all references to a previous call (unhandled exceptions, delegate)
     *
     * In:
     *  This fiber must be in state TERM.
     */
    final void clear()
    in
    {
        assert( m_state == State.TERM );
        assert( m_ctxt.tstack == m_ctxt.bstack );
    }
    body
    {
        // Runtime re-checks of the in-contract: these throw (rather than
        // assert) so the checks remain active in release builds.
        if (m_state != State.TERM){
            char[20] buf;
            throw new Exception("Fiber@"~ptrToStr(cast(size_t)cast(void*)this,buf).idup~" in unexpected state "~ptrToStr(m_state,buf).idup,__FILE__,__LINE__);
        }
        if (m_ctxt.tstack != m_ctxt.bstack){
            char[20] buf;
            throw new Exception("Fiber@"~ptrToStr(cast(size_t)cast(void*)this,buf).idup~" bstack="~ptrToStr(cast(size_t)cast(void*)m_ctxt.bstack,buf).idup~" != tstack="~ptrToStr(cast(size_t)cast(void*)m_ctxt.tstack,buf).idup,__FILE__,__LINE__);
        }
        m_dg = null;
        m_fn = null;
        m_call = Call.NO;
        m_state = State.TERM;
        m_unhandled = null;
    }

    ////////////////////////////////////////////////////////////////////////////
    // General Properties
    ////////////////////////////////////////////////////////////////////////////

    /**
     * A fiber may occupy one of three states: HOLD, EXEC, and TERM.  The HOLD
     * state applies to any fiber that is suspended and ready to be called.
     * The EXEC state will be set for any fiber that is currently executing.
     * And the TERM state is set when a fiber terminates.  Once a fiber
     * terminates, it must be reset before it may be called again.
     */
    enum State
    {
        HOLD,   ///
        EXEC,   ///
        TERM    ///
    }

    /**
     * Gets the current state of this fiber.
     *
     * Returns:
     *  The state of this fiber as an enumerated value.
     */
    const final State state()
    {
        return m_state;
    }

    // Returns the size in bytes of this fiber's stack (post page-rounding).
    const size_t stackSize(){
        return m_size;
    }

    ////////////////////////////////////////////////////////////////////////////
    // Actions on Calling Fiber
    ////////////////////////////////////////////////////////////////////////////

    /**
     * Forces a context switch to occur away from the calling fiber.
     *
     * Unlike yield(), this bypasses any attached scheduler and switches
     * directly back to the enclosing context.
     */
    final void cede ()
    {
        assert( m_state == State.EXEC );

        static if( is( ucontext_t ) )
          m_ucur = &m_utxt;

        m_state = State.HOLD;
        switchOut();
        m_state = State.EXEC;
    }

    /**
     * Forces a context switch to occur away from the calling fiber.
     *
     * If the calling fiber has a scheduler attached, control is handed to
     * it (pause(0)); otherwise the fiber cedes directly.
     */
    static void yield()
    {
        Fiber cur = getThis;
        assert( cur, "Fiber.yield() called with no active fiber" );

        if (cur.event.scheduler)
            cur.event.scheduler.pause (0);
        else
           cur.cede;
    }

    /**
     * Forces a context switch to occur away from the calling fiber and then
     * throws obj in the calling fiber.
     *
     * Params:
     *  obj = The object to throw.
     *
     * In:
     *  obj must not be null.
     */
    static void yieldAndThrow( Throwable obj )
    in
    {
        assert( obj );
    }
    body
    {
        Fiber cur = getThis();
        assert( cur, "Fiber.yield(obj) called with no active fiber" );

        // The pending exception is rethrown (in this fiber) by call() when
        // control next returns here.
        cur.m_unhandled = obj;
        if (cur.event.scheduler)
            cur.event.scheduler.pause (0);
        else
           cur.cede;
    }

    ////////////////////////////////////////////////////////////////////////////
    // Fiber Accessors
    ////////////////////////////////////////////////////////////////////////////

    /**
     * Provides a reference to the calling fiber or null if no fiber is
     * currently active.
     *
     * Returns:
     *  The fiber object representing the calling fiber or null if no fiber
     *  is currently active.  The result of deleting this object is undefined.
     */
    static Fiber getThis()
    {
        version( Win32 )
        {
            return cast(Fiber) TlsGetValue( sm_this );
        }
        else version( Posix )
        {
            return cast(Fiber) pthread_getspecific( sm_this );
        }
    }

    ////////////////////////////////////////////////////////////////////////////
    // Static Initialization
    ////////////////////////////////////////////////////////////////////////////

    // Allocates the TLS slot used to track the active fiber per thread and,
    // on the ucontext path, captures the main thread's context.
    static this()
    {
        version( Win32 )
        {
            sm_this = TlsAlloc();
            assert( sm_this != TLS_OUT_OF_INDEXES );
        }
        else version( Posix )
        {
            int status;

            status = pthread_key_create( &sm_this, null );
            assert( status == 0 );

            static if( is( ucontext_t ) )
            {
                status = getcontext( &sm_utxt );
                assert( status == 0 );
            }
        }
    }

private:
    //
    // Initializes a fiber object which has no associated executable function.
    //
    this()
    {
        m_call = Call.NO;
    }

    //
    // Fiber entry point.  Invokes the function or delegate passed on
    // construction (if any).
    //
    final void run()
    {
        switch( m_call )
        {
        case Call.FN:
            m_fn();
            break;
        case Call.DG:
            m_dg();
            break;
        default:
            break;
        }
    }

private:
    //
    // The type of routine passed on fiber construction.
    //
    enum Call
    {
        NO,
        FN,
        DG
    }

    //
    // Standard fiber data
    //
    Call                m_call;
    union
    {
        void function() m_fn;
        void delegate() m_dg;
    }
    bool                m_isRunning;
    Throwable           m_unhandled;
    State               m_state;
    char[]              m_name;

public:
    Event               event;

private:
    ////////////////////////////////////////////////////////////////////////////
    // Stack Management
    ////////////////////////////////////////////////////////////////////////////

    //
    // Allocate a new stack for this fiber.  The requested size is rounded up
    // to a multiple of PAGESIZE; on Windows (VirtualAlloc) an extra guard
    // page is placed at the overflow end of the stack.
    //
    final void allocStack( size_t sz )
    in
    {
        assert( !m_pmem && !m_ctxt );
    }
    body
    {
        // adjust alloc size to a multiple of PAGESIZE
        sz += PAGESIZE - 1;
        sz -= sz % PAGESIZE;

        // NOTE: This instance of Thread.Context is dynamic so Fiber objects
        //       can be collected by the GC so long as no user level references
        //       to the object exist.  If m_ctxt were not dynamic then its
        //       presence in the global context list would be enough to keep
        //       this object alive indefinitely.  An alternative to allocating
        //       room for this struct explicitly would be to mash it into the
        //       base of the stack being allocated below.  However, doing so
        //       requires too much special logic to be worthwhile.
        m_ctxt = new Thread.Context;

        static if( is( typeof( VirtualAlloc ) ) )
        {
            // reserve memory for stack
            m_pmem = VirtualAlloc( null,
                                   sz + PAGESIZE,
                                   MEM_RESERVE,
                                   PAGE_NOACCESS );
            if( !m_pmem )
            {
                throw new FiberException( "Unable to reserve memory for stack" );
            }

            version( StackGrowsDown )
            {
                void* stack = m_pmem + PAGESIZE;
                void* guard = m_pmem;
                void* pbase = stack + sz;
            }
            else
            {
                void* stack = m_pmem;
                void* guard = m_pmem + sz;
                void* pbase = stack;
            }

            // allocate reserved stack segment
            stack = VirtualAlloc( stack,
                                  sz,
                                  MEM_COMMIT,
                                  PAGE_READWRITE );
            if( !stack )
            {
                throw new FiberException( "Unable to allocate memory for stack" );
            }

            // allocate reserved guard page
            guard = VirtualAlloc( guard,
                                  PAGESIZE,
                                  MEM_COMMIT,
                                  PAGE_READWRITE | PAGE_GUARD );
            if( !guard )
            {
                throw new FiberException( "Unable to create guard page for stack" );
            }

            m_ctxt.bstack = pbase;
            m_ctxt.tstack = pbase;
            m_size = sz;
        }
        else
        {   // POSIX-style allocation: prefer mmap, then valloc, then malloc.
            static if( is( typeof( mmap ) ) )
            {
                m_pmem = mmap( null,
                               sz,
                               PROT_READ | PROT_WRITE,
                               MAP_PRIVATE | MAP_ANON,
                               -1,
                               0 );
                if( m_pmem == MAP_FAILED )
                    m_pmem = null;
            }
            else static if( is( typeof( valloc ) ) )
            {
                m_pmem = valloc( sz );
            }
            else static if( is( typeof( malloc ) ) )
            {
                m_pmem = malloc( sz );
            }
            else
            {
                m_pmem = null;
            }

            if( !m_pmem )
            {
                throw new FiberException( "Unable to allocate memory for stack" );
            }

            version( StackGrowsDown )
            {
                m_ctxt.bstack = m_pmem + sz;
                m_ctxt.tstack = m_pmem + sz;
            }
            else
            {
                m_ctxt.bstack = m_pmem;
                m_ctxt.tstack = m_pmem;
            }
            m_size = sz;
        }

        // Register the new stack with the thread system so the GC scans it.
        Thread.add( m_ctxt );
    }

    //
    // Free this fiber's stack.
    //
    final void freeStack()
    in
    {
        assert( m_pmem && m_ctxt );
    }
    body
    {
        // NOTE: Since this routine is only ever expected to be called from
        //       the dtor, pointers to freed data are not set to null.

        // NOTE: m_ctxt is guaranteed to be alive because it is held in the
        //       global context list.
        Thread.remove( m_ctxt );

        static if( is( typeof( VirtualAlloc ) ) )
        {
            VirtualFree( m_pmem, 0, MEM_RELEASE );
        }
        else static if( is( typeof( mmap ) ) )
        {
            munmap( m_pmem, m_size );
        }
        else static if( is( typeof( valloc ) ) )
        {
            free( m_pmem );
        }
        else static if( is( typeof( malloc ) ) )
        {
            free( m_pmem );
        }
        delete m_ctxt;
    }

    //
    // Initialize the allocated stack.  Lays out a synthetic register frame
    // that fiber_switchContext will pop on the first switch into this fiber,
    // ending with a 'return' into fiber_entryPoint.  The layout here must
    // mirror the push/pop sequence in fiber_switchContext exactly.
    //
    final void initStack()
    in
    {
        assert( m_ctxt.tstack && m_ctxt.tstack == m_ctxt.bstack );
        assert( cast(size_t) m_ctxt.bstack % (void*).sizeof == 0 );
    }
    body
    {
        void* pstack = m_ctxt.tstack;
        scope( exit )  m_ctxt.tstack = pstack;

        void push( size_t val )
        {
            version( StackGrowsDown )
            {
                pstack -= size_t.sizeof;
                *(cast(size_t*) pstack) = val;
            }
            else
            {
                pstack += size_t.sizeof;
                *(cast(size_t*) pstack) = val;
            }
        }

        // NOTE: On OS X the stack must be 16-byte aligned according to the
        // IA-32 call spec.
        version( darwin )
        {
             pstack = cast(void*)(cast(uint)(pstack) - (cast(uint)(pstack) & 0x0F));
        }

        version( AsmX86_Win32 )
        {
            push( cast(size_t) &fiber_entryPoint );                 // EIP
            push( 0xFFFFFFFF );                                     // EBP
            push( 0x00000000 );                                     // EAX
            push( 0xFFFFFFFF );                                     // FS:[0]
            version( StackGrowsDown )
            {
                push( cast(size_t) m_ctxt.bstack );                 // FS:[4]
                push( cast(size_t) m_ctxt.bstack - m_size );        // FS:[8]
            }
            else
            {
                push( cast(size_t) m_ctxt.bstack );                 // FS:[4]
                push( cast(size_t) m_ctxt.bstack + m_size );        // FS:[8]
            }
            push( 0x00000000 );                                     // EBX
            push( 0x00000000 );                                     // ESI
            push( 0x00000000 );                                     // EDI
        }
        else version( AsmX86_Posix )
        {
            push( 0x00000000 );                                     // strange pre EIP
            push( cast(size_t) &fiber_entryPoint );                 // EIP
            push( (cast(size_t)pstack)+8 );                         // EBP
            push( 0x00000000 );                                     // EAX
            push( getEBX() );                                       // EBX used for PIC code
            push( 0x00000000 );                                     // ECX just to have it aligned...
            push( 0x00000000 );                                     // ESI
            push( 0x00000000 );                                     // EDI
        }
        else version( AsmX86_64_Posix )
        {
            push( 0x00000000 );                                     // strange pre EIP
            push( cast(size_t) &fiber_entryPoint );                 // RIP
            push( (cast(size_t)pstack)+8 );                         // RBP
            push( 0x00000000_00000000 );                            // RBX
            push( 0x00000000_00000000 );                            // R12
            push( 0x00000000_00000000 );                            // R13
            push( 0x00000000_00000000 );                            // R14
            push( 0x00000000_00000000 );                            // R15
            push( 0x00001f80_0000037f );                            // MXCSR (32 bits), unused (16 bits) , x87 control (16 bits)
        }
        else version( AsmPPC_Posix )
        {
            version( StackGrowsDown )
            {
                pstack -= int.sizeof * 5;
            }
            else
            {
                pstack += int.sizeof * 5;
            }

            push( cast(size_t) &fiber_entryPoint );     // link register
            push( 0x00000000 );                         // control register
            push( 0x00000000 );                         // old stack pointer

            // GPR values
            version( StackGrowsDown )
            {
                pstack -= int.sizeof * 20;
            }
            else
            {
                pstack += int.sizeof * 20;
            }

            assert( (cast(uint) pstack & 0x0f) == 0 );
        }
        else static if( is( ucontext_t ) )
        {
            getcontext( &m_utxt );
            // patch from #1707 - thanks to jerdfelt
            //m_utxt.uc_stack.ss_sp   = m_ctxt.bstack;
            m_utxt.uc_stack.ss_sp   = m_pmem;
            m_utxt.uc_stack.ss_size = m_size;
            makecontext( &m_utxt, &fiber_entryPoint, 0 );
            // NOTE: If ucontext is being used then the top of the stack will
            //       be a pointer to the ucontext_t struct for that fiber.
            push( cast(size_t) &m_utxt );
        }
    }

    public Thread.Context* m_ctxt;  // context record registered with Thread
    public size_t   m_size;         // stack size in bytes
    void*           m_pmem;         // raw stack allocation

    static if( is( ucontext_t ) )
    {
        // NOTE: The static ucontext instance is used to represent the context
        //       of the main application thread.
        static __gshared ucontext_t     sm_utxt = void;
        ucontext_t              m_utxt  = void;
        ucontext_t*             m_ucur  = null;
    }

private:
    ////////////////////////////////////////////////////////////////////////////
    // Storage of Active Fiber
    ////////////////////////////////////////////////////////////////////////////

    //
    // Sets a thread-local reference to the current fiber object.
    //
    static void setThis( Fiber f )
    {
        version( Win32 )
        {
            TlsSetValue( sm_this, cast(void*) f );
        }
        else version( Posix )
        {
            pthread_setspecific( sm_this, cast(void*) f );
        }
    }

    // TLS key holding the active fiber for each thread.
    static __gshared Thread.TLSKey    sm_this;

private:
    ////////////////////////////////////////////////////////////////////////////
    // Context Switching
    ////////////////////////////////////////////////////////////////////////////

    //
    // Switches into the stack held by this fiber.
    //
    final void switchIn()
    {
        Thread  tobj = Thread.getThis();
        void**  oldp = &tobj.m_curr.tstack;
        void*   newp = m_ctxt.tstack;

        // NOTE: The order of operations here is very important.  The current
        //       stack top must be stored before m_lock is set, and pushContext
        //       must not be called until after m_lock is set.  This process
        //       is intended to prevent a race condition with the suspend
        //       mechanism used for garbage collection.  If it is not followed,
        //       a badly timed collection could cause the GC to scan from the
        //       bottom of one stack to the top of another, or to miss scanning
        //       a stack that still contains valid data.  The old stack pointer
        //       oldp will be set again before the context switch to guarantee
        //       that it points to exactly the correct stack location so the
        //       successive pop operations will succeed.
        *oldp = getStackTop();
        volatile tobj.m_lock = true;
        tobj.pushContext( m_ctxt );

        fiber_switchContext( oldp, newp );

        // NOTE: As above, these operations must be performed in a strict order
        //       to prevent Bad Things from happening.
        tobj.popContext();
        volatile tobj.m_lock = false;
        tobj.m_curr.tstack = tobj.m_curr.bstack;
    }

    //
    // Switches out of the current stack and into the enclosing stack.
    //
    final void switchOut()
    {
        Thread  tobj = Thread.getThis();
        void**  oldp = &m_ctxt.tstack;
        void*   newp = tobj.m_curr.within.tstack;

        // NOTE: The order of operations here is very important.  The current
        //       stack top must be stored before m_lock is set, and pushContext
        //       must not be called until after m_lock is set.  This process
        //       is intended to prevent a race condition with the suspend
        //       mechanism used for garbage collection.  If it is not followed,
        //       a badly timed collection could cause the GC to scan from the
        //       bottom of one stack to the top of another, or to miss scanning
        //       a stack that still contains valid data.  The old stack pointer
        //       oldp will be set again before the context switch to guarantee
        //       that it points to exactly the correct stack location so the
        //       successive pop operations will succeed.
        *oldp = getStackTop();
        volatile tobj.m_lock = true;

        fiber_switchContext( oldp, newp );

        // NOTE: As above, these operations must be performed in a strict order
        //       to prevent Bad Things from happening.
        // The fiber may resume on a different thread, so re-fetch the
        // current Thread rather than reusing the pre-switch tobj.
        tobj=Thread.getThis();
        volatile tobj.m_lock = false;
        tobj.m_curr.tstack = tobj.m_curr.bstack;
    }
}
}
+/
|