| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763777377837793780378137823783378437853786378737883789379037913792379337943795379637973798379938003801380238033804380538063807380838093810381138123813381438153816381738183819382038213822382338243825382638273828382938303831383238333834383538363837383838393840 | /** * The thread module provides support for thread creation and management. * * If AtomicSuspendCount is used for speed reasons all signals are sent together. * When debugging gdb funnels all signals through one single handler, and if * the signals arrive quickly enough they will be coalesced in a single signal, * (discarding the second) thus it is possible to loose signals, which blocks * the program. Thus when debugging it is better to use the slower SuspendOneAtTime * version. * * Copyright: Copyright (C) 2005-2006 Sean Kelly, Fawzi. All rights reserved. * License: BSD style: $(LICENSE) * Authors: Sean Kelly, Fawzi Mohamed */ module tango.core.Thread; public import core.thread; import Time = tango.core.Time; extern(C) { void thread_yield() { Thread.yield(); } void thread_sleep(double period) { Thread.sleep(Time.seconds(period)); } } /+ import tango.core.sync.Atomic; debug(Thread) import tango.stdc.stdio : printf; // this should be true for most architectures version = StackGrowsDown; version(darwin){ version=AtomicSuspendCount; } version(linux){ version=AtomicSuspendCount; } public { // import tango.core.TimeSpan; } private { import tango.core.Exception; extern (C) void _d_monitorenter(Object); extern (C) void _d_monitorexit(Object); // // exposed by compiler runtime // extern (C) void* rt_stackBottom(); extern (C) void* rt_stackTop(); void* getStackBottom() { return rt_stackBottom(); } void* getStackTop() { version( D_InlineAsm_X86 ) { asm { naked; mov EAX, ESP; ret; } } else { return rt_stackTop(); } } version(D_InlineAsm_X86){ uint getEBX(){ uint retVal; asm{ mov retVal,EBX; } return retVal; } } } //////////////////////////////////////////////////////////////////////////////// // Thread Entry Point and Signal Handlers //////////////////////////////////////////////////////////////////////////////// version( Win32 ) { private { import tango.stdc.stdint : uintptr_t; // for _beginthreadex decl below import tango.sys.win32.UserGdi; enum DWORD TLS_OUT_OF_INDEXES = 0xFFFFFFFF; // // avoid multiple imports via tango.sys.windows.process // extern (Windows) alias uint function(void*) btex_fptr; extern (C) uintptr_t _beginthreadex(void*, uint, btex_fptr, void*, uint, uint*); // // entry point for Windows threads // extern (Windows) uint thread_entryPoint( void* arg ) { Thread obj = cast(Thread) arg; assert( obj ); scope( exit ) Thread.remove( obj ); assert( obj.m_curr is &obj.m_main ); obj.m_main.bstack = getStackBottom(); obj.m_main.tstack = obj.m_main.bstack; Thread.add( &obj.m_main ); Thread.setThis( obj ); // NOTE: No GC allocations may occur until the stack pointers have // been set and Thread.getThis returns a valid reference to // this thread object (this latter condition is not strictly // necessary on Win32 but it should be followed for the sake // of consistency). // TODO: Consider putting an auto exception object here (using // alloca) forOutOfMemoryError plus something to track // whether an exception is in-flight? try { obj.run(); } catch( Object o ) { obj.m_unhandled = o; } return 0; } // // copy of the same-named function in phobos.std.thread--it uses the // Windows naming convention to be consistent with GetCurrentThreadId // HANDLE GetCurrentThreadHandle() { enum uint DUPLICATE_SAME_ACCESS = 0x00000002; HANDLE curr = GetCurrentThread(), proc = GetCurrentProcess(), hndl; DuplicateHandle( proc, curr, proc, &hndl, 0, TRUE, DUPLICATE_SAME_ACCESS ); return hndl; } } } else version( Posix ) { private { import tango.stdc.posix.semaphore; import tango.stdc.posix.pthread; import tango.stdc.posix.signal; import tango.stdc.posix.time; import tango.stdc.errno; extern (C) int getErrno(); version( GNU ) { import gcc.builtins; } // // entry point for POSIX threads // extern (C) void* thread_entryPoint( void* arg ) { Thread obj = cast(Thread) arg; assert( obj ); scope( exit ) { // NOTE: isRunning should be set to false after the thread is // removed or a double-removal could occur between this // function and thread_suspendAll. Thread.remove( obj ); obj.m_isRunning = false; } static extern (C) void thread_cleanupHandler( void* arg ) { Thread obj = cast(Thread) arg; assert( obj ); // NOTE: If the thread terminated abnormally, just set it as // not running and let thread_suspendAll remove it from // the thread list. This is safer and is consistent // with the Windows thread code. obj.m_isRunning = false; } // NOTE: Using void to skip the initialization here relies on // knowledge of how pthread_cleanup is implemented. It may // not be appropriate for all platforms. However, it does // avoid the need to link the pthread module. If any // implementation actually requires default initialization // then pthread_cleanup should be restructured to maintain // the current lack of a link dependency. version( linux ) { pthread_cleanup cleanup = void; cleanup.push( &thread_cleanupHandler, cast(void*) obj ); } else version( darwin ) { pthread_cleanup cleanup = void; cleanup.push( &thread_cleanupHandler, cast(void*) obj ); } else version( solaris ) { pthread_cleanup cleanup = void; cleanup.push( &thread_cleanupHandler, cast(void*) obj ); } else { pthread_cleanup_push( &thread_cleanupHandler, cast(void*) obj ); } // NOTE: For some reason this does not always work for threads. //obj.m_main.bstack = getStackBottom(); version( D_InlineAsm_X86 ) { static void* getBasePtr() { asm { naked; mov EAX, EBP; ret; } } obj.m_main.bstack = getBasePtr(); } else version( StackGrowsDown ) obj.m_main.bstack = &obj + 1; else obj.m_main.bstack = &obj; obj.m_main.tstack = obj.m_main.bstack; assert( obj.m_curr == &obj.m_main ); Thread.add( &obj.m_main ); Thread.setThis( obj ); // NOTE: No GC allocations may occur until the stack pointers have // been set and Thread.getThis returns a valid reference to // this thread object (this latter condition is not strictly // necessary on Win32 but it should be followed for the sake // of consistency). // TODO: Consider putting an auto exception object here (using // alloca) forOutOfMemoryError plus something to track // whether an exception is in-flight? try { obj.run(); } catch( Throwable o ) { obj.m_unhandled = o; } return null; } // // used to track the number of suspended threads // version(AtomicSuspendCount){ int suspendCount; } else { sem_t suspendCount; } extern (C) void thread_suspendHandler( int sig ) in { assert( sig == SIGUSR1 ); } body { version( LDC) { version(X86) { uint eax,ecx,edx,ebx,ebp,esi,edi; asm { mov eax[EBP], EAX ; mov ecx[EBP], ECX ; mov edx[EBP], EDX ; mov ebx[EBP], EBX ; mov ebp[EBP], EBP ; mov esi[EBP], ESI ; mov edi[EBP], EDI ; } } else version (X86_64) { ulong rax,rbx,rcx,rdx,rbp,rsi,rdi,rsp,r8,r9,r10,r11,r12,r13,r14,r15; asm { movq rax[RBP], RAX ; movq rbx[RBP], RBX ; movq rcx[RBP], RCX ; movq rdx[RBP], RDX ; movq rbp[RBP], RBP ; movq rsi[RBP], RSI ; movq rdi[RBP], RDI ; movq rsp[RBP], RSP ; movq r8 [RBP], R8 ; movq r9 [RBP], R9 ; movq r10[RBP], R10 ; movq r11[RBP], R11 ; movq r12[RBP], R12 ; movq r13[RBP], R13 ; movq r14[RBP], R14 ; movq r15[RBP], R15 ; } } else { static assert( false, "Architecture not supported." ); } } else version( D_InlineAsm_X86 ) { asm { pushad; } } else version( GNU ) { __builtin_unwind_init(); } else version ( D_InlineAsm_X86_64 ) { asm { // Not sure what goes here, pushad is invalid in 64 bit code push RAX ; push RBX ; push RCX ; push RDX ; push RSI ; push RDI ; push RBP ; push R8 ; push R9 ; push R10 ; push R11 ; push R12 ; push R13 ; push R14 ; push R15 ; push RAX ; // 16 byte align the stack } } else { static assert( false, "Architecture not supported." ); } // NOTE: Since registers are being pushed and popped from the stack, // any other stack data used by this function should be gone // before the stack cleanup code is called below. { Thread obj = Thread.getThis(); // NOTE: The thread reference returned by getThis is set within // the thread startup code, so it is possible that this // handler may be called before the reference is set. In // this case it is safe to simply suspend and not worry // about the stack pointers as the thread will not have // any references to GC-managed data. if( obj && !obj.m_lock ) { obj.m_curr.tstack = getStackTop(); } sigset_t sigres = void; int status; status = sigfillset( &sigres ); assert( status == 0 ); status = sigdelset( &sigres, SIGUSR2 ); assert( status == 0 ); version (AtomicSuspendCount){ auto oldV=flagAdd(suspendCount,1); } else { status = sem_post( &suspendCount ); assert( status == 0 ); } // here one could do some work (like scan the current stack in this thread...) sigsuspend( &sigres ); if( obj && !obj.m_lock ) { obj.m_curr.tstack = obj.m_curr.bstack; } } version( LDC) { // nothing to pop } else version( D_InlineAsm_X86 ) { asm { popad; } } else version( GNU ) { // registers will be popped automatically } else version ( D_InlineAsm_X86_64 ) { asm { // Not sure what goes here, popad is invalid in 64 bit code pop RAX ; // 16 byte align the stack pop R15 ; pop R14 ; pop R13 ; pop R12 ; pop R11 ; pop R10 ; pop R9 ; pop R8 ; pop RBP ; pop RDI ; pop RSI ; pop RDX ; pop RCX ; pop RBX ; pop RAX ; } } else { static assert( false, "Architecture not supported." ); } } extern (C) void thread_resumeHandler( int sig ) in { assert( sig == SIGUSR2 ); } body { int status; version (AtomicSuspendCount){ auto oldV=flagAdd(suspendCount,-1); } else { status = sem_post( &suspendCount ); } assert( status == 0 ); } } alias void function(int) sHandler; sHandler _thread_abortHandler=null; extern (C) void thread_abortHandler( int sig ){ if (_thread_abortHandler!is null){ _thread_abortHandler(sig); } else { exit(-1); } } extern (C) void setthread_abortHandler(sHandler f){ _thread_abortHandler=f; } } else { // NOTE: This is the only place threading versions are checked. If a new // version is added, the module code will need to be searched for // places where version-specific code may be required. This can be // easily accomlished by searching for 'Windows' or 'Posix'. static assert( false, "Unknown threading implementation." ); } //////////////////////////////////////////////////////////////////////////////// // Thread //////////////////////////////////////////////////////////////////////////////// /** * This class encapsulates all threading functionality for the D * programming language. As thread manipulation is a required facility * for garbage collection, all user threads should derive from this * class, and instances of this class should never be explicitly deleted. * A new thread may be created using either derivation or composition, as * in the following example. * * Example: * ----------------------------------------------------------------------------- * class DerivedThread : Thread * { * this() * { * super( &run ); * } * * private : * void run() * { * printf( "Derived thread running.\n" ); * } * } * * void threadFunc() * { * printf( "Composed thread running.\n" ); * } * * // create instances of each type * Thread derived = new DerivedThread(); * Thread composed = new Thread( &threadFunc ); * * // start both threads * derived.start(); * composed.start(); * ----------------------------------------------------------------------------- */ class Thread { //////////////////////////////////////////////////////////////////////////// // Initialization //////////////////////////////////////////////////////////////////////////// /** * Initializes a thread object which is associated with a static * D function. * * Params: * fn = The thread function. * sz = The stack size for this thread. * * In: * fn must not be null. */ this( void function() fn, size_t sz = 0 ) in { assert( fn ); } body { m_fn = fn; m_sz = sz; m_call = Call.FN; m_curr = &m_main; } /** * Initializes a thread object which is associated with a dynamic * D function. * * Params: * dg = The thread function. * sz = The stack size for this thread. * * In: * dg must not be null. */ this( void delegate() dg, size_t sz = 0 ) in { assert( dg ); } body { m_dg = dg; m_sz = sz; m_call = Call.DG; m_curr = &m_main; } /** * Cleans up any remaining resources used by this object. */ ~this() { if( m_addr == m_addr.init ) { return; } version( Win32 ) { m_addr = m_addr.init; CloseHandle( m_hndl ); m_hndl = m_hndl.init; } else version( Posix ) { pthread_detach( m_addr ); m_addr = m_addr.init; } } //////////////////////////////////////////////////////////////////////////// // General Actions //////////////////////////////////////////////////////////////////////////// /** * Starts the thread and invokes the function or delegate passed upon * construction. * * In: * This routine may only be called once per thread instance. * * Throws: * ThreadException if the thread fails to start. */ final void start() in { assert( !next && !prev ); } body { version( Win32 ) {} else version( Posix ) { pthread_attr_t attr; if( pthread_attr_init( &attr ) ) throw new ThreadException( "Error initializing thread attributes" ); if( m_sz && pthread_attr_setstacksize( &attr, m_sz ) ) throw new ThreadException( "Error initializing thread stack size" ); if( pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE ) ) throw new ThreadException( "Error setting thread joinable" ); } // NOTE: This operation needs to be synchronized to avoid a race // condition with the GC. Without this lock, the thread // could start and allocate memory before being added to // the global thread list, preventing it from being scanned // and causing memory to be collected that is still in use. synchronized( slock ) { volatile multiThreadedFlag = true; version( Win32 ) { m_hndl = cast(HANDLE) _beginthreadex( null, m_sz, &thread_entryPoint, cast(void*) this, 0, &m_addr ); if( cast(size_t) m_hndl == 0 ) throw new ThreadException( "Error creating thread" ); } else version( Posix ) { m_isRunning = true; scope( failure ) m_isRunning = false; if( pthread_create( &m_addr, &attr, &thread_entryPoint, cast(void*) this ) != 0 ) throw new ThreadException( "Error creating thread" ); } add( this ); } } /** * Waits for this thread to complete. If the thread terminated as the * result of an unhandled exception, this exception will be rethrown. * * Params: * rethrow = Rethrow any unhandled exception which may have caused this * thread to terminate. * * Throws: * ThreadException if the operation fails. * Any exception not handled by the joined thread. * * Returns: * Any exception not handled by this thread if rethrow = false, null * otherwise. */ final Object join( bool rethrow = true ) { if(!isRunning()) return null; version( Win32 ) { if( WaitForSingleObject( m_hndl, INFINITE ) != WAIT_OBJECT_0 ) throw new ThreadException( "Unable to join thread" ); // NOTE: m_addr must be cleared before m_hndl is closed to avoid // a race condition with isRunning. The operation is labeled // volatile to prevent compiler reordering. volatile m_addr = m_addr.init; CloseHandle( m_hndl ); m_hndl = m_hndl.init; } else version( Posix ) { if( pthread_join( m_addr, null ) != 0 ) throw new ThreadException( "Unable to join thread" ); // NOTE: pthread_join acts as a substitute for pthread_detach, // which is normally called by the dtor. Setting m_addr // to zero ensures that pthread_detach will not be called // on object destruction. volatile m_addr = m_addr.init; } if( m_unhandled ) { if( rethrow ) throw m_unhandled; return m_unhandled; } return null; } //////////////////////////////////////////////////////////////////////////// // General Properties //////////////////////////////////////////////////////////////////////////// /** * Gets the user-readable label for this thread. * * Returns: * The name of this thread. */ const final const(char[]) name() { synchronized( this ) { return m_name; } } /** * Sets the user-readable label for this thread. * * Params: * val = The new name of this thread. */ final void name( const(char[]) val ) { synchronized( this ) { m_name = val.dup; } } /** * Gets the daemon status for this thread. While the runtime will wait for * all normal threads to complete before tearing down the process, daemon * threads are effectively ignored and thus will not prevent the process * from terminating. In effect, daemon threads will be terminated * automatically by the OS when the process exits. * * Returns: * true if this is a daemon thread. */ const final bool isDaemon() { synchronized( this ) { return m_isDaemon; } } /** * Sets the daemon status for this thread. While the runtime will wait for * all normal threads to complete before tearing down the process, daemon * threads are effectively ignored and thus will not prevent the process * from terminating. In effect, daemon threads will be terminated * automatically by the OS when the process exits. * * Params: * val = The new daemon status for this thread. */ final void isDaemon( bool val ) { synchronized( this ) { m_isDaemon = val; } } /** * Tests whether this thread is running. * * Returns: * true if the thread is running, false if not. */ const final bool isRunning() { if( m_addr == m_addr.init ) { return false; } version( Win32 ) { uint ecode = 0; GetExitCodeThread( m_hndl, &ecode ); return ecode == STILL_ACTIVE; } else version( Posix ) { // NOTE: It should be safe to access this value without // memory barriers because word-tearing and such // really isn't an issue for boolean values. return m_isRunning; } } //////////////////////////////////////////////////////////////////////////// // Thread Priority Actions //////////////////////////////////////////////////////////////////////////// /** * The minimum scheduling priority that may be set for a thread. On * systems where multiple scheduling policies are defined, this value * represents the minimum valid priority for the scheduling policy of * the process. */ static __gshared const int PRIORITY_MIN; /** * The maximum scheduling priority that may be set for a thread. On * systems where multiple scheduling policies are defined, this value * represents the minimum valid priority for the scheduling policy of * the process. */ static __gshared const int PRIORITY_MAX; /** * Gets the scheduling priority for the associated thread. * * Returns: * The scheduling priority of this thread. */ const final int priority() { version( Win32 ) { return GetThreadPriority( m_hndl ); } else version( Posix ) { int policy; sched_param param; if( pthread_getschedparam( m_addr, &policy, ¶m ) ) throw new ThreadException( "Unable to get thread priority" ); return param.sched_priority; } } /** * Sets the scheduling priority for the associated thread. * * Params: * val = The new scheduling priority of this thread. */ final void priority( int val ) { version( Win32 ) { if( !SetThreadPriority( m_hndl, val ) ) throw new ThreadException( "Unable to set thread priority" ); } else version( Posix ) { // NOTE: pthread_setschedprio is not implemented on linux, so use // the more complicated get/set sequence below. //if( pthread_setschedprio( m_addr, val ) ) // throw new ThreadException( "Unable to set thread priority" ); int policy; sched_param param; if( pthread_getschedparam( m_addr, &policy, ¶m ) ) throw new ThreadException( "Unable to set thread priority" ); param.sched_priority = val; if( pthread_setschedparam( m_addr, policy, ¶m ) ) throw new ThreadException( "Unable to set thread priority" ); } } //////////////////////////////////////////////////////////////////////////// // Actions on Calling Thread //////////////////////////////////////////////////////////////////////////// /** * Suspends the calling thread for at least the supplied time, up to a * maximum of (uint.max - 1) milliseconds. * * Params: * period = The minimum duration the calling thread should be suspended, * in seconds. Sub-second durations are specified as fractional * values. * * In: * period must be less than (uint.max - 1) milliseconds. * * Example: * ------------------------------------------------------------------------- * Thread.sleep( 0.05 ); // sleep for 50 milliseconds * Thread.sleep( 5 ); // sleep for 5 seconds * ------------------------------------------------------------------------- */ static void sleep( double period ) in { // NOTE: The fractional value added to period is to correct fp error. assert( period * 1000 + 0.1 < uint.max - 1 ); } body { version( Win32 ) { Sleep( cast(uint)( period * 1000 + 0.1 ) ); } else version( Posix ) { timespec tin = void; timespec tout = void; period += 0.000_000_000_1; if( tin.tv_sec.max < period ) { tin.tv_sec = tin.tv_sec.max; tin.tv_nsec = 0; } else { tin.tv_sec = cast(typeof(tin.tv_sec)) period; tin.tv_nsec = cast(typeof(tin.tv_nsec)) ((period % 1.0) * 1_000_000_000); } while( true ) { if( !nanosleep( &tin, &tout ) ) return; if( getErrno() != EINTR ) throw new ThreadException( "Unable to sleep for specified duration" ); tin = tout; } } } /+ /** * Suspends the calling thread for at least the supplied time, up to a * maximum of (uint.max - 1) milliseconds. * * Params: * period = The minimum duration the calling thread should be suspended. * * In: * period must be less than (uint.max - 1) milliseconds. * * Example: * ------------------------------------------------------------------------- * Thread.sleep( TimeSpan.milliseconds( 50 ) ); // sleep for 50 milliseconds * Thread.sleep( TimeSpan.seconds( 5 ) ); // sleep for 5 seconds * ------------------------------------------------------------------------- */ static void sleep( TimeSpan period ) in { assert( period.milliseconds < uint.max - 1 ); } body { version( Win32 ) { Sleep( cast(uint)( period.milliseconds ) ); } else version( Posix ) { timespec tin = void; timespec tout = void; if( tin.tv_sec.max < period.seconds ) { tin.tv_sec = tin.tv_sec.max; tin.tv_nsec = 0; } else { tin.tv_sec = cast(typeof(tin.tv_sec)) period.seconds; tin.tv_nsec = cast(typeof(tin.tv_nsec)) period.nanoseconds % 1_000_000_000; } while( true ) { if( !nanosleep( &tin, &tout ) ) return; if( getErrno() != EINTR ) throw new ThreadException( "Unable to sleep for specified duration" ); tin = tout; } } } /** * Suspends the calling thread for at least the supplied time, up to a * maximum of (uint.max - 1) milliseconds. * * Params: * period = The minimum duration the calling thread should be suspended, * in seconds. Sub-second durations are specified as fractional * values. Please note that because period is a floating-point * number, some accuracy may be lost for certain intervals. For * this reason, the TimeSpan overload is preferred in instances * where an exact interval is required. * * In: * period must be less than (uint.max - 1) milliseconds. * * Example: * ------------------------------------------------------------------------- * Thread.sleep( 0.05 ); // sleep for 50 milliseconds * Thread.sleep( 5 ); // sleep for 5 seconds * ------------------------------------------------------------------------- */ static void sleep( double period ) { sleep( TimeSpan.interval( period ) ); } +/ /** * Forces a context switch to occur away from the calling thread. */ static void yield() { version( Win32 ) { // NOTE: Sleep(1) is necessary because Sleep(0) does not give // lower priority threads any timeslice, so looping on // Sleep(0) could be resource-intensive in some cases. Sleep( 1 ); } else version( Posix ) { sched_yield(); } } //////////////////////////////////////////////////////////////////////////// // Thread Accessors //////////////////////////////////////////////////////////////////////////// /** * Provides a reference to the calling thread. * * Returns: * The thread object representing the calling thread. The result of * deleting this object is undefined. */ static Thread getThis() { // NOTE: This function may not be called until thread_init has // completed. See thread_suspendAll for more information // on why this might occur. version( Win32 ) { return cast(Thread) TlsGetValue( sm_this ); } else version( Posix ) { return cast(Thread) pthread_getspecific( sm_this ); } } /** * Provides a list of all threads currently being tracked by the system. * * Returns: * An array containing references to all threads currently being * tracked by the system. The result of deleting any contained * objects is undefined. */ static Thread[] getAll() { Thread[] buf; while(1){ if (buf) delete buf; buf = new Thread[sm_tlen]; synchronized( slock ) { size_t pos = 0; if (buf.length<sm_tlen) { continue; } else { buf.length=sm_tlen; } foreach( Thread t; Thread ) { buf[pos++] = t; } return buf; } } } /** * Operates on all threads currently being tracked by the system. The * result of deleting any Thread object is undefined. * * Params: * dg = The supplied code as a delegate. * * Returns: * Zero if all elemented are visited, nonzero if not. */ static int opApply(scope int delegate( ref Thread ) dg ) { synchronized( slock ) { int ret = 0; for( Thread t = sm_tbeg; t; t = t.next ) { ret = dg( t ); if( ret ) break; } return ret; } } //////////////////////////////////////////////////////////////////////////// // Local Storage Actions //////////////////////////////////////////////////////////////////////////// /** * Indicates the number of local storage pointers available at program * startup. It is recommended that this number be at least 64. */ static const uint LOCAL_MAX = 64; /** * Reserves a local storage pointer for use and initializes this location * to null for all running threads. * * Returns: * A key representing the array offset of this memory location. */ static uint createLocal() { synchronized( slock ) { foreach( uint key, ref bool set; sm_local ) { if( !set ) { //foreach( Thread t; sm_tbeg ) Bug in GDC 0.24 SVN (r139) for( Thread t = sm_tbeg; t; t = t.next ) { t.m_local[key] = null; } set = true; return key; } } throw new ThreadException( "No more local storage slots available" ); } } /** * Marks the supplied key as available and sets the associated location * to null for all running threads. It is assumed that any key passed * to this function is valid. The result of calling this function for * a key which is still in use is undefined. * * Params: * key = The key to delete. */ static void deleteLocal( uint key ) { synchronized( slock ) { sm_local[key] = false; // foreach( Thread t; sm_tbeg ) Bug in GDC 0.24 SVN (r139) for( Thread t = sm_tbeg; t; t = t.next ) { t.m_local[key] = null; } } } /** * Loads the value stored at key within a thread-local static array. It is * assumed that any key passed to this function is valid. * * Params: * key = The location which holds the desired data. * * Returns: * The data associated with the supplied key. */ static void* getLocal( uint key ) { return getThis().m_local[key]; } /** * Stores the supplied value at key within a thread-local static array. It * is assumed that any key passed to this function is valid. * * Params: * key = The location to store the supplied data. * val = The data to store. * * Returns: * A copy of the data which has just been stored. */ static void* setLocal( uint key, void* val ) { return getThis().m_local[key] = val; } //////////////////////////////////////////////////////////////////////////// // Static Initalizer //////////////////////////////////////////////////////////////////////////// /** * This initializer is used to set thread constants. All functional * initialization occurs within thread_init(). */ static this() { version( Win32 ) { PRIORITY_MIN = -15; PRIORITY_MAX = 15; } else version( Posix ) { int policy; sched_param param; pthread_t self = pthread_self(); int status = pthread_getschedparam( self, &policy, ¶m ); assert( status == 0 ); PRIORITY_MIN = sched_get_priority_min( policy ); assert( PRIORITY_MIN != -1 ); PRIORITY_MAX = sched_get_priority_max( policy ); assert( PRIORITY_MAX != -1 ); } } private: // // Initializes a thread object which has no associated executable function. // This is used for the main thread initialized in thread_init(). // this() { m_call = Call.NO; m_curr = &m_main; } // // Thread entry point. Invokes the function or delegate passed on // construction (if any). // final void run() { switch( m_call ) { case Call.FN: m_fn(); break; case Call.DG: m_dg(); break; default: break; } } private: // // The type of routine passed on thread construction. // enum Call { NO, FN, DG } // // Standard types // version( Win32 ) { alias uint TLSKey; alias uint ThreadAddr; } else version( Posix ) { alias pthread_key_t TLSKey; alias pthread_t ThreadAddr; } // // Local storage // static __gshared bool[LOCAL_MAX] sm_local; static __gshared TLSKey sm_this; void*[LOCAL_MAX] m_local; // // Standard thread data // version( Win32 ) { HANDLE m_hndl; } public ThreadAddr m_addr; Call m_call; const(char)[] m_name; union { void function() m_fn; void delegate() m_dg; } size_t m_sz; version( Posix ) { bool m_isRunning; } bool m_isDaemon; public Throwable m_unhandled; private: //////////////////////////////////////////////////////////////////////////// // Storage of Active Thread //////////////////////////////////////////////////////////////////////////// // // Sets a thread-local reference to the current thread object. // static void setThis( Thread t ) { version( Win32 ) { TlsSetValue( sm_this, cast(void*) t ); } else version( Posix ) { pthread_setspecific( sm_this, cast(void*) t ); } } private: //////////////////////////////////////////////////////////////////////////// // Thread Context and GC Scanning Support //////////////////////////////////////////////////////////////////////////// final void pushContext( Context* c ) in { assert( !c.within ); } body { c.within = m_curr; m_curr = c; } final void popContext() in { assert( m_curr && m_curr.within ); } body { Context* c = m_curr; m_curr = c.within; c.within = null; } public final Context* topContext() in { assert( m_curr ); } body { return m_curr; } public static struct Context { void* bstack, tstack; Context* within; Context* next, prev; } Context m_main; Context* m_curr; bool m_lock; version( Win32 ) { uint[8] m_reg; // edi,esi,ebp,esp,ebx,edx,ecx,eax } private: //////////////////////////////////////////////////////////////////////////// // GC Scanning Support //////////////////////////////////////////////////////////////////////////// // NOTE: The GC scanning process works like so: // // 1. Suspend all threads. // 2. Scan the stacks of all suspended threads for roots. // 3. Resume all threads. // // Step 1 and 3 require a list of all threads in the system, while // step 2 requires a list of all thread stacks (each represented by // a Context struct). Traditionally, there was one stack per thread // and the Context structs were not necessary. However, Fibers have // changed things so that each thread has its own 'main' stack plus // an arbitrary number of nested stacks (normally referenced via // m_curr). Also, there may be 'free-floating' stacks in the system, // which are Fibers that are not currently executing on any specific // thread but are still being processed and still contain valid // roots. // // To support all of this, the Context struct has been created to // represent a stack range, and a global list of Context structs has // been added to enable scanning of these stack ranges. The lifetime // (and presence in the Context list) of a thread's 'main' stack will // be equivalent to the thread's lifetime. So the Ccontext will be // added to the list on thread entry, and removed from the list on // thread exit (which is essentially the same as the presence of a // Thread object in its own global list). The lifetime of a Fiber's // context, however, will be tied to the lifetime of the Fiber object // itself, and Fibers are expected to add/remove their Context struct // on construction/deletion. // // All use of the global lists should synchronize on this lock. // static Object slock() { return Thread.classinfo; } static __gshared Context* sm_cbeg; static __gshared size_t sm_clen; static __gshared Thread sm_tbeg; static __gshared size_t sm_tlen; // // Used for ordering threads in the global thread list. // Thread prev; Thread next; //////////////////////////////////////////////////////////////////////////// // Global Context List Operations //////////////////////////////////////////////////////////////////////////// // // Add a context to the global context list. // static void add( Context* c ) in { assert( c ); assert( !c.next && !c.prev ); } body { synchronized( slock ) { if( sm_cbeg ) { c.next = sm_cbeg; sm_cbeg.prev = c; } sm_cbeg = c; ++sm_clen; } } // // Remove a context from the global context list. // static void remove( Context* c ) in { assert( c ); assert( c.next || c.prev ); } body { synchronized( slock ) { if( c.prev ) c.prev.next = c.next; if( c.next ) c.next.prev = c.prev; if( sm_cbeg == c ) sm_cbeg = c.next; --sm_clen; } // NOTE: Don't null out c.next or c.prev because opApply currently // follows c.next after removing a node. This could be easily // addressed by simply returning the next node from this function, // however, a context should never be re-added to the list anyway // and having next and prev be non-null is a good way to // ensure that. } //////////////////////////////////////////////////////////////////////////// // Global Thread List Operations //////////////////////////////////////////////////////////////////////////// // // Add a thread to the global thread list. // static void add( Thread t ) in { assert( t ); assert( !t.next && !t.prev ); assert( t.isRunning ); } body { synchronized( slock ) { if( sm_tbeg ) { t.next = sm_tbeg; sm_tbeg.prev = t; } sm_tbeg = t; ++sm_tlen; } } // // Remove a thread from the global thread list. // static void remove( Thread t ) in { assert( t ); assert( t.next || t.prev ); } body { synchronized( slock ) { // NOTE: When a thread is removed from the global thread list its // main context is invalid and should be removed as well. // It is possible that t.m_curr could reference more // than just the main context if the thread exited abnormally // (if it was terminated), but we must assume that the user // retains a reference to them and that they may be re-used // elsewhere. Therefore, it is the responsibility of any // object that creates contexts to clean them up properly // when it is done with them. remove( &t.m_main ); if( t.prev ) t.prev.next = t.next; if( t.next ) t.next.prev = t.prev; if( sm_tbeg == t ) sm_tbeg = t.next; --sm_tlen; } // NOTE: Don't null out t.next or t.prev because opApply currently // follows t.next after removing a node. This could be easily // addressed by simply returning the next node from this function, // however, a thread should never be re-added to the list anyway // and having next and prev be non-null is a good way to // ensure that. } } //////////////////////////////////////////////////////////////////////////////// // GC Support Routines //////////////////////////////////////////////////////////////////////////////// /** * Initializes the thread module. This function must be called by the * garbage collector on startup and before any other thread routines * are called. */ extern (C) void thread_init() { // NOTE: If thread_init itself performs any allocations then the thread // routines reserved for garbage collector use may be called while // thread_init is being processed. However, since no memory should // exist to be scanned at this point, it is sufficient for these // functions to detect the condition and return immediately. version( Win32 ) { Thread.sm_this = TlsAlloc(); assert( Thread.sm_this != TLS_OUT_OF_INDEXES ); Fiber.sm_this = TlsAlloc(); assert( Thread.sm_this != TLS_OUT_OF_INDEXES ); } else version( Posix ) { int status; sigaction_t sigusr1 = void; sigaction_t sigusr2 = void; sigaction_t sigabrt = void; // This is a quick way to zero-initialize the structs without using // memset or creating a link dependency on their static initializer. (cast(byte*) &sigusr1)[0 .. sigaction_t.sizeof] = 0; (cast(byte*) &sigusr2)[0 .. sigaction_t.sizeof] = 0; (cast(byte*) &sigabrt)[0 .. sigaction_t.sizeof] = 0; // NOTE: SA_RESTART indicates that system calls should restart if they // are interrupted by a signal, but this is not available on all // Posix systems, even those that support multithreading. static if( is( typeof( SA_RESTART ) ) ) sigusr1.sa_flags = SA_RESTART; else sigusr1.sa_flags = 0; sigusr1.sa_handler = &thread_suspendHandler; // NOTE: We want to ignore all signals while in this handler, so fill // sa_mask to indicate this. status = sigfillset( &sigusr1.sa_mask ); assert( status == 0 ); status = sigdelset( &sigusr1.sa_mask , SIGABRT); assert( status == 0 ); // NOTE: Since SIGUSR2 should only be issued for threads within the // suspend handler, we don't want this signal to trigger a // restart. sigusr2.sa_flags = 0; sigusr2.sa_handler = &thread_resumeHandler; // NOTE: We want to ignore all signals while in this handler, so fill // sa_mask to indicate this. status = sigfillset( &sigusr2.sa_mask ); assert( status == 0 ); status = sigdelset( &sigusr2.sa_mask , SIGABRT); assert( status == 0 ); status = sigaction( SIGUSR1, &sigusr1, null ); assert( status == 0 ); status = sigaction( SIGUSR2, &sigusr2, null ); assert( status == 0 ); // NOTE: SA_RESTART indicates that system calls should restart if they // are interrupted by a signal, but this is not available on all // Posix systems, even those that support multithreading. static if( is( typeof( SA_RESTART ) ) ) sigabrt.sa_flags = SA_RESTART; else sigabrt.sa_flags = 0; sigabrt.sa_handler = &thread_abortHandler; // NOTE: We want to ignore all signals while in this handler, so fill // sa_mask to indicate this. status = sigfillset( &sigabrt.sa_mask ); assert( status == 0 ); status = sigaction( SIGABRT, &sigabrt, null ); assert( status == 0 ); version(AtomicSuspendCount){ suspendCount=0; } else { status = sem_init( &suspendCount, 0, 0 ); } assert( status == 0 ); status = pthread_key_create( &Thread.sm_this, null ); assert( status == 0 ); status = pthread_key_create( &Fiber.sm_this, null ); assert( status == 0 ); } thread_attachThis(); } /** * Registers the calling thread for use with Tango. If this routine is called * for a thread which is already registered, the result is undefined. */ extern (C) void thread_attachThis() { version( Win32 ) { Thread thisThread = new Thread(); Thread.Context* thisContext = &thisThread.m_main; assert( thisContext == thisThread.m_curr ); thisThread.m_addr = GetCurrentThreadId(); thisThread.m_hndl = GetCurrentThreadHandle(); thisContext.bstack = getStackBottom(); thisContext.tstack = thisContext.bstack; thisThread.m_isDaemon = true; Thread.setThis( thisThread ); } else version( Posix ) { Thread thisThread = new Thread(); Thread.Context* thisContext = thisThread.m_curr; assert( thisContext == &thisThread.m_main ); thisThread.m_addr = pthread_self(); thisContext.bstack = getStackBottom(); thisContext.tstack = thisContext.bstack; thisThread.m_isRunning = true; thisThread.m_isDaemon = true; Thread.setThis( thisThread ); } Thread.add( thisThread ); Thread.add( thisContext ); } /** * Deregisters the calling thread from use with Tango. If this routine is * called for a thread which is already registered, the result is undefined. */ extern (C) void thread_detachThis() { Thread.remove( Thread.getThis() ); } /** * Joins all non-daemon threads that are currently running. This is done by * performing successive scans through the thread list until a scan consists * of only daemon threads. */ extern (C) void thread_joinAll() { while( true ) { Thread nonDaemon = null; foreach( t; Thread ) { if( !t.isDaemon ) { nonDaemon = t; break; } } if( nonDaemon is null ) return; nonDaemon.join(); } } /** * Performs intermediate shutdown of the thread module. */ static ~this() { // NOTE: The functionality related to garbage collection must be minimally // operable after this dtor completes. Therefore, only minimal // cleanup may occur. for( Thread t = Thread.sm_tbeg; t; t = t.next ) { if( !t.isRunning ) Thread.remove( t ); } } // Used for needLock below private bool multiThreadedFlag = false; /** * This function is used to determine whether the the process is * multi-threaded. Optimizations may only be performed on this * value if the programmer can guarantee that no path from the * enclosed code will start a thread. * * Returns: * True if Thread.start() has been called in this process. */ extern (C) bool thread_needLock() { return multiThreadedFlag; } // Used for suspendAll/resumeAll below private uint suspendDepth = 0; /** * Suspend all threads but the calling thread for "stop the world" garbage * collection runs. This function may be called multiple times, and must * be followed by a matching number of calls to thread_resumeAll before * processing is resumed. * * Throws: * ThreadException if the suspend operation fails for a running thread. */ extern (C) void thread_suspendAll() { int suspendedCount=0; /** * Suspend the specified thread and load stack and register information for * use by thread_scanAll. If the supplied thread is the calling thread, * stack and register information will be loaded but the thread will not * be suspended. If the suspend operation fails and the thread is not * running then it will be removed from the global thread list, otherwise * an exception will be thrown. * * Params: * t = The thread to suspend. * * Throws: * ThreadException if the suspend operation fails for a running thread. */ void suspend( Thread t ) { version( Win32 ) { if( t.m_addr != GetCurrentThreadId() && SuspendThread( t.m_hndl ) == 0xFFFFFFFF ) { if( !t.isRunning ) { Thread.remove( t ); return; } throw new ThreadException( "Unable to suspend thread" ); } CONTEXT context = void; context.ContextFlags = CONTEXT_INTEGER | CONTEXT_CONTROL; if( !GetThreadContext( t.m_hndl, &context ) ) throw new ThreadException( "Unable to load thread context" ); if( !t.m_lock ) t.m_curr.tstack = cast(void*) context.Esp; // edi,esi,ebp,esp,ebx,edx,ecx,eax t.m_reg[0] = context.Edi; t.m_reg[1] = context.Esi; t.m_reg[2] = context.Ebp; t.m_reg[3] = context.Esp; t.m_reg[4] = context.Ebx; t.m_reg[5] = context.Edx; t.m_reg[6] = context.Ecx; t.m_reg[7] = context.Eax; } else version( Posix ) { if( t.m_addr != pthread_self() ) { if( pthread_kill( t.m_addr, SIGUSR1 ) != 0 ) { if( !t.isRunning ) { Thread.remove( t ); return; } throw new ThreadException( "Unable to suspend thread" ); } version (AtomicSuspendCount){ ++suspendedCount; version(AtomicSuspendCount){ version(SuspendOneAtTime){ // when debugging suspending all threads at once might give "lost" signals int icycle=0; suspendLoop: while (flagGet(suspendCount)!=suspendedCount){ for (size_t i=1000;i!=0;--i){ if (flagGet(suspendCount)==suspendedCount) break suspendLoop; if (++icycle==100_000){ debug(Thread) printf("waited %d cycles for thread suspension, suspendCount=%d, should be %d\nAtomic ops do not work?\nContinuing wait...\n",icycle,suspendCount,suspendedCount); } Thread.yield(); } Thread.sleep(0.0001); } } } } else { sem_wait( &suspendCount ); // shouldn't the return be checked and maybe a loop added for further interrupts // as in Semaphore.d ? } } else if( !t.m_lock ) { t.m_curr.tstack = getStackTop(); } } } // NOTE: We've got an odd chicken & egg problem here, because while the GC // is required to call thread_init before calling any other thread // routines, thread_init may allocate memory which could in turn // trigger a collection. Thus, thread_suspendAll, thread_scanAll, // and thread_resumeAll must be callable before thread_init completes, // with the assumption that no other GC memory has yet been allocated // by the system, and thus there is no risk of losing data if the // global thread list is empty. The check of Thread.sm_tbeg // below is done to ensure thread_init has completed, and therefore // that calling Thread.getThis will not result in an error. For the // short time when Thread.sm_tbeg is null, there is no reason // not to simply call the multithreaded code below, with the // expectation that the foreach loop will never be entered. if( !multiThreadedFlag && Thread.sm_tbeg ) { if( ++suspendDepth == 1 ) { suspend( Thread.getThis() ); } return; } _d_monitorenter(Thread.slock); { if( ++suspendDepth > 1 ) return; // NOTE: I'd really prefer not to check isRunning within this loop but // not doing so could be problematic if threads are termianted // abnormally and a new thread is created with the same thread // address before the next GC run. This situation might cause // the same thread to be suspended twice, which would likely // cause the second suspend to fail, the garbage collection to // abort, and Bad Things to occur. for( Thread t = Thread.sm_tbeg; t; t = t.next ) { if( t.isRunning ){ suspend( t ); } else Thread.remove( t ); } version( Posix ) { version(AtomicSuspendCount){ int icycle=0; suspendLoop2: while (flagGet(suspendCount)!=suspendedCount){ for (size_t i=1000;i!=0;--i){ if (flagGet(suspendCount)==suspendedCount) break suspendLoop2; if (++icycle==1000_000){ debug(Thread) printf("waited %d cycles for thread suspension, suspendCount=%d, should be %d\nAtomic ops do not work?\nContinuing wait...\n",icycle,suspendCount,suspendedCount); } Thread.yield(); } Thread.sleep(0.0001); } } } } } /** * Resume all threads but the calling thread for "stop the world" garbage * collection runs. This function must be called once for each preceding * call to thread_suspendAll before the threads are actually resumed. * * In: * This routine must be preceded by a call to thread_suspendAll. * * Throws: * ThreadException if the resume operation fails for a running thread. */ extern (C) void thread_resumeAll() in { assert( suspendDepth > 0 ); } body { version(AtomicSuspendCount) version(SuspendOneAtTime) auto suspendedCount=flagGet(suspendCount); /** * Resume the specified thread and unload stack and register information. * If the supplied thread is the calling thread, stack and register * information will be unloaded but the thread will not be resumed. If * the resume operation fails and the thread is not running then it will * be removed from the global thread list, otherwise an exception will be * thrown. * * Params: * t = The thread to resume. * * Throws: * ThreadException if the resume fails for a running thread. */ void resume( Thread t ) { version( Win32 ) { if( t.m_addr != GetCurrentThreadId() && ResumeThread( t.m_hndl ) == 0xFFFFFFFF ) { if( !t.isRunning ) { Thread.remove( t ); return; } throw new ThreadException( "Unable to resume thread" ); } if( !t.m_lock ) t.m_curr.tstack = t.m_curr.bstack; t.m_reg[0 .. $] = 0; } else version( Posix ) { if( t.m_addr != pthread_self() ) { if( pthread_kill( t.m_addr, SIGUSR2 ) != 0 ) { if( !t.isRunning ) { Thread.remove( t ); return; } throw new ThreadException( "Unable to resume thread" ); } version (AtomicSuspendCount){ version(SuspendOneAtTime){ // when debugging suspending all threads at once might give "lost" signals --suspendedCount; int icycle=0; recoverLoop: while(flagGet(suspendCount)>suspendedCount){ for (size_t i=1000;i!=0;--i){ if (flagGet(suspendCount)==suspendedCount) break recoverLoop; if (++icycle==100_000){ debug(Thread) printf("waited %d cycles for thread recover, suspendCount=%d, should be %d\nAtomic ops do not work?\nContinuing wait...\n",icycle,suspendCount,suspendedCount); } Thread.yield(); } Thread.sleep(0.0001); } } } else { sem_wait( &suspendCount ); // shouldn't the return be checked and maybe a loop added for further interrupts // as in Semaphore.d ? } } else if( !t.m_lock ) { t.m_curr.tstack = t.m_curr.bstack; } } } // NOTE: See thread_suspendAll for the logic behind this. if( !multiThreadedFlag && Thread.sm_tbeg ) { if( --suspendDepth == 0 ) resume( Thread.getThis() ); return; } { scope(exit) _d_monitorexit(Thread.slock); if( --suspendDepth > 0 ) return; { for( Thread t = Thread.sm_tbeg; t; t = t.next ) { resume( t ); } version(AtomicSuspendCount){ int icycle=0; recoverLoop2: while(flagGet(suspendCount)>0){ for (size_t i=1000;i!=0;--i){ Thread.yield(); if (flagGet(suspendCount)==0) break recoverLoop2; if (++icycle==100_000){ debug(Thread) printf("waited %d cycles for thread recovery, suspendCount=%d, should be %d\nAtomic ops do not work?\nContinuing wait...\n",icycle,suspendCount,0); } } Thread.sleep(0.0001); } } } } } private alias scope void delegate( void*, void* ) scanAllThreadsFn; /** * The main entry point for garbage collection. The supplied delegate * will be passed ranges representing both stack and register values. * * Params: * scan = The scanner function. It should scan from p1 through p2 - 1. * curStackTop = An optional pointer to the top of the calling thread's stack. * * In: * This routine must be preceded by a call to thread_suspendAll. */ extern (C) void thread_scanAll( scanAllThreadsFn scan, void* curStackTop = null ) in { assert( suspendDepth > 0 ); } body { Thread thisThread = null; void* oldStackTop = null; if( curStackTop && Thread.sm_tbeg ) { thisThread = Thread.getThis(); if( thisThread && (!thisThread.m_lock) ) { oldStackTop = thisThread.m_curr.tstack; thisThread.m_curr.tstack = curStackTop; } } scope( exit ) { if( curStackTop && Thread.sm_tbeg ) { if( thisThread && (!thisThread.m_lock) ) { thisThread.m_curr.tstack = oldStackTop; } } } // NOTE: Synchronizing on Thread.slock is not needed because this // function may only be called after all other threads have // been suspended from within the same lock. for( Thread.Context* c = Thread.sm_cbeg; c; c = c.next ) { version( StackGrowsDown ) { // NOTE: We can't index past the bottom of the stack // so don't do the "+1" for StackGrowsDown. if( c.tstack && c.tstack < c.bstack ) scan( c.tstack, c.bstack ); } else { if( c.bstack && c.bstack < c.tstack ) scan( c.bstack, c.tstack + 1 ); } } version( Win32 ) { for( Thread t = Thread.sm_tbeg; t; t = t.next ) { scan( &t.m_reg[0], &t.m_reg[0] + t.m_reg.length ); } } } //////////////////////////////////////////////////////////////////////////////// // Thread Local //////////////////////////////////////////////////////////////////////////////// /** * This class encapsulates the operations required to initialize, access, and * destroy thread local data. */ class ThreadLocal( T ) { //////////////////////////////////////////////////////////////////////////// // Initialization //////////////////////////////////////////////////////////////////////////// /** * Initializes thread local storage for the indicated value which will be * initialized to def for all threads. * * Params: * def = The default value to return if no value has been explicitly set. */ this( T def = T.init ) { m_def = def; m_key = Thread.createLocal(); } ~this() { Thread.deleteLocal( m_key ); } //////////////////////////////////////////////////////////////////////////// // Accessors //////////////////////////////////////////////////////////////////////////// /** * Gets the value last set by the calling thread, or def if no such value * has been set. * * Returns: * The stored value or def if no value is stored. */ T val() { Wrap* wrap = cast(Wrap*) Thread.getLocal( m_key ); return wrap ? wrap.val : m_def; } /** * Copies newval to a location specific to the calling thread, and returns * newval. * * Params: * newval = The value to set. * * Returns: * The value passed to this function. */ T val( T newval ) { Wrap* wrap = cast(Wrap*) Thread.getLocal( m_key ); if( wrap is null ) { wrap = new Wrap; Thread.setLocal( m_key, wrap ); } wrap.val = newval; return newval; } private: // // A wrapper for the stored data. This is needed for determining whether // set has ever been called for this thread (and therefore whether the // default value should be returned) and also to flatten the differences // between data that is smaller and larger than (void*).sizeof. The // obvious tradeoff here is an extra per-thread allocation for each // ThreadLocal value as compared to calling the Thread routines directly. // struct Wrap { T val; } T m_def; uint m_key; } //////////////////////////////////////////////////////////////////////////////// // Thread Group //////////////////////////////////////////////////////////////////////////////// /** * This class is intended to simplify certain common programming techniques. */ class ThreadGroup { /** * Creates and starts a new Thread object that executes fn and adds it to * the list of tracked threads. * * Params: * fn = The thread function. * * Returns: * A reference to the newly created thread. */ final Thread create( void function() fn ) { Thread t = new Thread( fn ); t.start(); synchronized( this ) { m_all[t] = t; } return t; } /** * Creates and starts a new Thread object that executes dg and adds it to * the list of tracked threads. * * Params: * dg = The thread function. * * Returns: * A reference to the newly created thread. */ final Thread create( void delegate() dg ) { Thread t = new Thread( dg ); t.start(); synchronized( this ) { m_all[t] = t; } return t; } /** * Add t to the list of tracked threads if it is not already being tracked. * * Params: * t = The thread to add. * * In: * t must not be null. */ final void add( Thread t ) in { assert( t ); } body { synchronized( this ) { m_all[t] = t; } } /** * Removes t from the list of tracked threads. No operation will be * performed if t is not currently being tracked by this object. * * Params: * t = The thread to remove. * * In: * t must not be null. */ final void remove( Thread t ) in { assert( t ); } body { synchronized( this ) { m_all.remove( t ); } } /** * Operates on all threads currently tracked by this object. */ final int opApply( scope int delegate( ref Thread ) dg ) { synchronized( this ) { int ret = 0; // NOTE: This loop relies on the knowledge that m_all uses the // Thread object for both the key and the mapped value. foreach( Thread t; m_all.keys ) { ret = dg( t ); if( ret ) break; } return ret; } } /** * Iteratively joins all tracked threads. This function will block add, * remove, and opApply until it completes. * * Params: * rethrow = Rethrow any unhandled exception which may have caused the * current thread to terminate. * * Throws: * Any exception not handled by the joined threads. */ final void joinAll( bool rethrow = true ) { synchronized( this ) { // NOTE: This loop relies on the knowledge that m_all uses the // Thread object for both the key and the mapped value. foreach( Thread t; m_all.keys ) { t.join( rethrow ); } } } private: Thread[Thread] m_all; } //////////////////////////////////////////////////////////////////////////////// // Fiber Platform Detection and Memory Allocation //////////////////////////////////////////////////////////////////////////////// private { version( D_InlineAsm_X86 ) { version( X86_64 ) { // Shouldn't an x64 compiler be setting D_InlineAsm_X86_64 instead? } else { version( Win32 ) version = AsmX86_Win32; else version( Posix ) version = AsmX86_Posix; } } else version( D_InlineAsm_X86_64 ) { version( Posix ) version = AsmX86_64_Posix; } else version( PPC ) { version( Posix ) version = AsmPPC_Posix; } version( Posix ) { import tango.stdc.posix.unistd; // for sysconf import tango.stdc.posix.sys.mman; // for mmap import tango.stdc.posix.stdlib; // for malloc, valloc, free version( AsmX86_Win32 ) {} else version( AsmX86_Posix ) {} else version( AsmX86_64_Posix ) {} else version( AsmPPC_Posix ) {} else { // NOTE: The ucontext implementation requires architecture specific // data definitions to operate so testing for it must be done // by checking for the existence of ucontext_t rather than by // a version identifier. Please note that this is considered // an obsolescent feature according to the POSIX spec, so a // custom solution is still preferred. import tango.stdc.posix.ucontext; static assert( is( ucontext_t ), "Unknown fiber implementation"); } } const size_t PAGESIZE; } static this() { static if( is( typeof( GetSystemInfo ) ) ) { SYSTEM_INFO info; GetSystemInfo( &info ); PAGESIZE = info.dwPageSize; assert( PAGESIZE < int.max ); } else static if( is( typeof( sysconf ) ) && is( typeof( _SC_PAGESIZE ) ) ) { PAGESIZE = cast(size_t) sysconf( _SC_PAGESIZE ); assert( PAGESIZE < int.max ); } else { version( PPC ) PAGESIZE = 8192; else PAGESIZE = 4096; } } //////////////////////////////////////////////////////////////////////////////// // Fiber Entry Point and Context Switch //////////////////////////////////////////////////////////////////////////////// private { extern (C) void fiber_entryPoint() { Fiber obj = Fiber.getThis(); assert( obj ); assert( Thread.getThis().m_curr is obj.m_ctxt ); volatile Thread.getThis().m_lock = false; obj.m_ctxt.tstack = obj.m_ctxt.bstack; obj.m_state = Fiber.State.EXEC; try { obj.run(); } catch( Throwable o ) { obj.m_unhandled = o; } static if( is( ucontext_t ) ) obj.m_ucur = &obj.m_utxt; obj.m_state = Fiber.State.TERM; obj.switchOut(); } // NOTE: If AsmPPC_Posix is defined then the context switch routine will // be defined externally until GDC supports inline PPC ASM. version( AsmPPC_Posix ) extern (C) void fiber_switchContext( void** oldp, void* newp ); else extern (C) void fiber_switchContext( void** oldp, void* newp ) { // NOTE: The data pushed and popped in this routine must match the // default stack created by Fiber.initStack or the initial // switch into a new context will fail. version( AsmX86_Win32 ) { asm { naked; // save current stack state push EBP; mov EBP, ESP; push EAX; push dword ptr FS:[0]; push dword ptr FS:[4]; push dword ptr FS:[8]; push EBX; push ESI; push EDI; // store oldp again with more accurate address mov EAX, dword ptr 8[EBP]; mov [EAX], ESP; // load newp to begin context switch mov ESP, dword ptr 12[EBP]; // load saved state from new stack pop EDI; pop ESI; pop EBX; pop dword ptr FS:[8]; pop dword ptr FS:[4]; pop dword ptr FS:[0]; pop EAX; pop EBP; // 'return' to complete switch ret; } } else version( AsmX86_Posix ) { asm { naked; // save current stack state push EBP; mov EBP, ESP; push EAX; push EBX; push ECX; push ESI; push EDI; // store oldp again with more accurate address mov EAX, dword ptr 8[EBP]; mov [EAX], ESP; // load newp to begin context switch mov ESP, dword ptr 12[EBP]; // load saved state from new stack pop EDI; pop ESI; pop ECX; pop EBX; pop EAX; pop EBP; // 'return' to complete switch ret; } } else version( AsmX86_64_Posix ) { version( DigitalMars ) const dmdgdc = true; else version (GNU) const dmdgdc = true; else const dmdgdc = false; static if (dmdgdc == true) asm { naked; // save current stack state push RBP; mov RBP, RSP; push RBX; push R12; push R13; push R14; push R15; sub RSP, 4; stmxcsr [RSP]; sub RSP, 4; //version(SynchroFloatExcept){ fstcw [RSP]; fwait; //} else { // fnstcw [RSP]; // fnclex; //} // store oldp again with more accurate address mov [RDI], RSP; // load newp to begin context switch mov RSP, RSI; // load saved state from new stack fldcw [RSP]; add RSP, 4; ldmxcsr [RSP]; add RSP, 4; pop R15; pop R14; pop R13; pop R12; pop RBX; pop RBP; // 'return' to complete switch ret; } else asm { naked; // save current stack state pushq RBP; mov RBP, RSP; pushq RBX; pushq R12; pushq R13; pushq R14; pushq R15; sub RSP, 4; stmxcsr [RSP]; sub RSP, 4; //version(SynchroFloatExcept){ fstcw [RSP]; fwait; //} else { // fnstcw [RSP]; // fnclex; //} // store oldp again with more accurate address mov [RDI], RSP; // load newp to begin context switch mov RSP, RSI; // load saved state from new stack fldcw [RSP]; add RSP, 4; ldmxcsr [RSP]; add RSP, 4; popq R15; popq R14; popq R13; popq R12; popq RBX; popq RBP; // 'return' to complete switch ret; } } else static if( is( ucontext_t ) ) { Fiber cfib = Fiber.getThis(); void* ucur = cfib.m_ucur; *oldp = &ucur; swapcontext( **(cast(ucontext_t***) oldp), *(cast(ucontext_t**) newp) ); } } } //////////////////////////////////////////////////////////////////////////////// // Fiber //////////////////////////////////////////////////////////////////////////////// private char[] ptrToStr(size_t addr,char[]buf){ enum char[] digits="0123456789ABCDEF".dup; enum{ nDigits=size_t.sizeof*2 } if (nDigits>buf.length) assert(0); char[] res=buf[0..nDigits]; size_t addrAtt=addr; for (int i=nDigits;i!=0;--i){ res[i-1]=digits[addrAtt&0xF]; addrAtt>>=4; } return res; } /** * This class provides a cooperative concurrency mechanism integrated with the * threading and garbage collection functionality. Calling a fiber may be * considered a blocking operation that returns when the fiber yields (via * Fiber.yield()). Execution occurs within the context of the calling thread * so synchronization is not necessary to guarantee memory visibility so long * as the same thread calls the fiber each time. Please note that there is no * requirement that a fiber be bound to one specific thread. Rather, fibers * may be freely passed between threads so long as they are not currently * executing. Like threads, a new fiber thread may be created using either * derivation or composition, as in the following example. * * Example: * ---------------------------------------------------------------------- * class DerivedFiber : Fiber * { * this() * { * super( &run ); * } * * private : * void run() * { * printf( "Derived fiber running.\n" ); * } * } * * void fiberFunc() * { * printf( "Composed fiber running.\n" ); * Fiber.yield(); * printf( "Composed fiber running.\n" ); * } * * // create instances of each type * Fiber derived = new DerivedFiber(); * Fiber composed = new Fiber( &fiberFunc ); * * // call both fibers once * derived.call(); * composed.call(); * printf( "Execution returned to calling context.\n" ); * composed.call(); * * // since each fiber has run to completion, each should have state TERM * assert( derived.state == Fiber.State.TERM ); * assert( composed.state == Fiber.State.TERM ); * ---------------------------------------------------------------------- * * Authors: Based on a design by Mikola Lysenko. */ class Fiber { static class Scheduler { alias void* Handle; enum Type {Read=1, Write=2, Accept=3, Connect=4, Transfer=5} void pause (uint ms) {} void ready (Fiber fiber) {} void open (Handle fd, char[] name) {} void close (Handle fd, char[] name) {} void await (Handle fd, Type t, uint timeout) {} void spawn (char[] name, void delegate() dg, size_t stack=8192) {} } struct Event // scheduler support { uint idx; // support for timer removal Fiber next; // linked list of elapsed fibers void* data; // data to exchange ulong clock; // request timeout duration Scheduler.Handle handle; // IO request handle Scheduler scheduler; // associated scheduler (may be null) } /+ final override int opCmp (Object o) { throw new Exception ("Invalid opCmp in Fiber"); auto other = cast(Fiber) cast(void*) o; if (other) { auto x = cast(long) event.clock - cast(long) other.event.clock; return (x < 0 ? -1 : x is 0 ? 0 : 1); } return 1; } +/ final static Scheduler scheduler () { return getThis.event.scheduler; } //////////////////////////////////////////////////////////////////////////// // Initialization //////////////////////////////////////////////////////////////////////////// /** * Initializes an empty fiber object * * (useful to reset it) */ this(size_t sz){ m_dg = null; m_fn = null; m_call = Call.NO; m_state = State.TERM; m_unhandled = null; allocStack( sz ); } /** * Initializes a fiber object which is associated with a static * D function. * * Params: * fn = The thread function. * sz = The stack size for this fiber. * * In: * fn must not be null. */ this( void function() fn, size_t sz = PAGESIZE) in { assert( fn ); } body { m_fn = fn; m_call = Call.FN; m_state = State.HOLD; allocStack( sz ); initStack(); } /** * Initializes a fiber object which is associated with a dynamic * D function. * * Params: * dg = The thread function. * sz = The stack size for this fiber. * * In: * dg must not be null. */ this( void delegate() dg, size_t sz = PAGESIZE, Scheduler s = null ) in { assert( dg ); } body { event.scheduler = s; m_dg = dg; m_call = Call.DG; m_state = State.HOLD; allocStack(sz); initStack(); } /** * Cleans up any remaining resources used by this object. */ ~this() { // NOTE: A live reference to this object will exist on its associated // stack from the first time its call() method has been called // until its execution completes with State.TERM. Thus, the only // times this dtor should be called are either if the fiber has // terminated (and therefore has no active stack) or if the user // explicitly deletes this object. The latter case is an error // but is not easily tested for, since State.HOLD may imply that // the fiber was just created but has never been run. There is // not a compelling case to create a State.INIT just to offer a // means of ensuring the user isn't violating this object's // contract, so for now this requirement will be enforced by // documentation only. freeStack(); } //////////////////////////////////////////////////////////////////////////// // General Actions //////////////////////////////////////////////////////////////////////////// /** * Transfers execution to this fiber object. The calling context will be * suspended until the fiber calls Fiber.yield() or until it terminates * via an unhandled exception. * * Params: * rethrow = Rethrow any unhandled exception which may have caused this * fiber to terminate. * * In: * This fiber must be in state HOLD. * * Throws: * Any exception not handled by the joined thread. * * Returns: * Any exception not handled by this fiber if rethrow = false, null * otherwise. */ final Object call( bool rethrow = true ) in { assert( m_state == State.HOLD ); } body { Fiber cur = getThis(); static if( is( ucontext_t ) ) m_ucur = cur ? &cur.m_utxt : &Fiber.sm_utxt; setThis( this ); this.switchIn(); setThis( cur ); static if( is( ucontext_t ) ) m_ucur = null; // NOTE: If the fiber has terminated then the stack pointers must be // reset. This ensures that the stack for this fiber is not // scanned if the fiber has terminated. This is necessary to // prevent any references lingering on the stack from delaying // the collection of otherwise dead objects. The most notable // being the current object, which is referenced at the top of // fiber_entryPoint. if( m_state == State.TERM ) { m_ctxt.tstack = m_ctxt.bstack; } if( m_unhandled ) { Throwable obj = m_unhandled; m_unhandled = null; if( rethrow ) throw obj; return obj; } return null; } /** * Resets this fiber so that it may be re-used with the same function. * This routine may only be * called for fibers that have terminated, as doing otherwise could result * in scope-dependent functionality that is not executed. Stack-based * classes, for example, may not be cleaned up properly if a fiber is reset * before it has terminated. * * In: * This fiber must be in state TERM, and have a valid function/delegate. */ final void reset() in { assert( m_call != Call.NO ); assert( m_state == State.TERM ); assert( m_ctxt.tstack == m_ctxt.bstack ); } body { m_state = State.HOLD; initStack(); m_unhandled = null; } /** * Reinitializes a fiber object which is associated with a static * D function. * * Params: * fn = The thread function. * * In: * This fiber must be in state TERM. * fn must not be null. */ final void reset( void function() fn ) in { assert( fn ); assert( m_state == State.TERM ); assert( m_ctxt.tstack == m_ctxt.bstack ); } body { m_fn = fn; m_call = Call.FN; m_state = State.HOLD; initStack(); m_unhandled = null; } /** * Reinitializes a fiber object which is associated with a dynamic * D function. * * Params: * dg = The thread function. * * In: * This fiber must be in state TERM. * dg must not be null. */ final void reset( void delegate() dg ) in { assert( dg ); assert( m_state == State.TERM ); assert( m_ctxt.tstack == m_ctxt.bstack ); } body { m_dg = dg; m_call = Call.DG; m_state = State.HOLD; initStack(); m_unhandled = null; } /** * Clears the fiber from all references to a previous call (unhandled exceptions, delegate) * * In: * This fiber must be in state TERM. */ final void clear() in { assert( m_state == State.TERM ); assert( m_ctxt.tstack == m_ctxt.bstack ); } body { if (m_state != State.TERM){ char[20] buf; throw new Exception("Fiber@"~ptrToStr(cast(size_t)cast(void*)this,buf).idup~" in unexpected state "~ptrToStr(m_state,buf).idup,__FILE__,__LINE__); } if (m_ctxt.tstack != m_ctxt.bstack){ char[20] buf; throw new Exception("Fiber@"~ptrToStr(cast(size_t)cast(void*)this,buf).idup~" bstack="~ptrToStr(cast(size_t)cast(void*)m_ctxt.bstack,buf).idup~" != tstack="~ptrToStr(cast(size_t)cast(void*)m_ctxt.tstack,buf).idup,__FILE__,__LINE__); } m_dg = null; m_fn = null; m_call = Call.NO; m_state = State.TERM; m_unhandled = null; } //////////////////////////////////////////////////////////////////////////// // General Properties //////////////////////////////////////////////////////////////////////////// /** * A fiber may occupy one of three states: HOLD, EXEC, and TERM. The HOLD * state applies to any fiber that is suspended and ready to be called. * The EXEC state will be set for any fiber that is currently executing. * And the TERM state is set when a fiber terminates. Once a fiber * terminates, it must be reset before it may be called again. */ enum State { HOLD, /// EXEC, /// TERM /// } /** * Gets the current state of this fiber. * * Returns: * The state of this fiber as an enumerated value. */ const final State state() { return m_state; } const size_t stackSize(){ return m_size; } //////////////////////////////////////////////////////////////////////////// // Actions on Calling Fiber //////////////////////////////////////////////////////////////////////////// /** * Forces a context switch to occur away from the calling fiber. */ final void cede () { assert( m_state == State.EXEC ); static if( is( ucontext_t ) ) m_ucur = &m_utxt; m_state = State.HOLD; switchOut(); m_state = State.EXEC; } /** * Forces a context switch to occur away from the calling fiber. */ static void yield() { Fiber cur = getThis; assert( cur, "Fiber.yield() called with no active fiber" ); if (cur.event.scheduler) cur.event.scheduler.pause (0); else cur.cede; } /** * Forces a context switch to occur away from the calling fiber and then * throws obj in the calling fiber. * * Params: * obj = The object to throw. * * In: * obj must not be null. */ static void yieldAndThrow( Throwable obj ) in { assert( obj ); } body { Fiber cur = getThis(); assert( cur, "Fiber.yield(obj) called with no active fiber" ); cur.m_unhandled = obj; if (cur.event.scheduler) cur.event.scheduler.pause (0); else cur.cede; } //////////////////////////////////////////////////////////////////////////// // Fiber Accessors //////////////////////////////////////////////////////////////////////////// /** * Provides a reference to the calling fiber or null if no fiber is * currently active. * * Returns: * The fiber object representing the calling fiber or null if no fiber * is currently active. The result of deleting this object is undefined. */ static Fiber getThis() { version( Win32 ) { return cast(Fiber) TlsGetValue( sm_this ); } else version( Posix ) { return cast(Fiber) pthread_getspecific( sm_this ); } } //////////////////////////////////////////////////////////////////////////// // Static Initialization //////////////////////////////////////////////////////////////////////////// static this() { version( Win32 ) { sm_this = TlsAlloc(); assert( sm_this != TLS_OUT_OF_INDEXES ); } else version( Posix ) { int status; status = pthread_key_create( &sm_this, null ); assert( status == 0 ); static if( is( ucontext_t ) ) { status = getcontext( &sm_utxt ); assert( status == 0 ); } } } private: // // Initializes a fiber object which has no associated executable function. // this() { m_call = Call.NO; } // // Fiber entry point. Invokes the function or delegate passed on // construction (if any). // final void run() { switch( m_call ) { case Call.FN: m_fn(); break; case Call.DG: m_dg(); break; default: break; } } private: // // The type of routine passed on fiber construction. // enum Call { NO, FN, DG } // // Standard fiber data // Call m_call; union { void function() m_fn; void delegate() m_dg; } bool m_isRunning; Throwable m_unhandled; State m_state; char[] m_name; public: Event event; private: //////////////////////////////////////////////////////////////////////////// // Stack Management //////////////////////////////////////////////////////////////////////////// // // Allocate a new stack for this fiber. // final void allocStack( size_t sz ) in { assert( !m_pmem && !m_ctxt ); } body { // adjust alloc size to a multiple of PAGESIZE sz += PAGESIZE - 1; sz -= sz % PAGESIZE; // NOTE: This instance of Thread.Context is dynamic so Fiber objects // can be collected by the GC so long as no user level references // to the object exist. If m_ctxt were not dynamic then its // presence in the global context list would be enough to keep // this object alive indefinitely. An alternative to allocating // room for this struct explicitly would be to mash it into the // base of the stack being allocated below. However, doing so // requires too much special logic to be worthwhile. m_ctxt = new Thread.Context; static if( is( typeof( VirtualAlloc ) ) ) { // reserve memory for stack m_pmem = VirtualAlloc( null, sz + PAGESIZE, MEM_RESERVE, PAGE_NOACCESS ); if( !m_pmem ) { throw new FiberException( "Unable to reserve memory for stack" ); } version( StackGrowsDown ) { void* stack = m_pmem + PAGESIZE; void* guard = m_pmem; void* pbase = stack + sz; } else { void* stack = m_pmem; void* guard = m_pmem + sz; void* pbase = stack; } // allocate reserved stack segment stack = VirtualAlloc( stack, sz, MEM_COMMIT, PAGE_READWRITE ); if( !stack ) { throw new FiberException( "Unable to allocate memory for stack" ); } // allocate reserved guard page guard = VirtualAlloc( guard, PAGESIZE, MEM_COMMIT, PAGE_READWRITE | PAGE_GUARD ); if( !guard ) { throw new FiberException( "Unable to create guard page for stack" ); } m_ctxt.bstack = pbase; m_ctxt.tstack = pbase; m_size = sz; } else { static if( is( typeof( mmap ) ) ) { m_pmem = mmap( null, sz, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0 ); if( m_pmem == MAP_FAILED ) m_pmem = null; } else static if( is( typeof( valloc ) ) ) { m_pmem = valloc( sz ); } else static if( is( typeof( malloc ) ) ) { m_pmem = malloc( sz ); } else { m_pmem = null; } if( !m_pmem ) { throw new FiberException( "Unable to allocate memory for stack" ); } version( StackGrowsDown ) { m_ctxt.bstack = m_pmem + sz; m_ctxt.tstack = m_pmem + sz; } else { m_ctxt.bstack = m_pmem; m_ctxt.tstack = m_pmem; } m_size = sz; } Thread.add( m_ctxt ); } // // Free this fiber's stack. // final void freeStack() in { assert( m_pmem && m_ctxt ); } body { // NOTE: Since this routine is only ever expected to be called from // the dtor, pointers to freed data are not set to null. // NOTE: m_ctxt is guaranteed to be alive because it is held in the // global context list. Thread.remove( m_ctxt ); static if( is( typeof( VirtualAlloc ) ) ) { VirtualFree( m_pmem, 0, MEM_RELEASE ); } else static if( is( typeof( mmap ) ) ) { munmap( m_pmem, m_size ); } else static if( is( typeof( valloc ) ) ) { free( m_pmem ); } else static if( is( typeof( malloc ) ) ) { free( m_pmem ); } delete m_ctxt; } // // Initialize the allocated stack. // final void initStack() in { assert( m_ctxt.tstack && m_ctxt.tstack == m_ctxt.bstack ); assert( cast(size_t) m_ctxt.bstack % (void*).sizeof == 0 ); } body { void* pstack = m_ctxt.tstack; scope( exit ) m_ctxt.tstack = pstack; void push( size_t val ) { version( StackGrowsDown ) { pstack -= size_t.sizeof; *(cast(size_t*) pstack) = val; } else { pstack += size_t.sizeof; *(cast(size_t*) pstack) = val; } } // NOTE: On OS X the stack must be 16-byte aligned according to the // IA-32 call spec. version( darwin ) { pstack = cast(void*)(cast(uint)(pstack) - (cast(uint)(pstack) & 0x0F)); } version( AsmX86_Win32 ) { push( cast(size_t) &fiber_entryPoint ); // EIP push( 0xFFFFFFFF ); // EBP push( 0x00000000 ); // EAX push( 0xFFFFFFFF ); // FS:[0] version( StackGrowsDown ) { push( cast(size_t) m_ctxt.bstack ); // FS:[4] push( cast(size_t) m_ctxt.bstack - m_size ); // FS:[8] } else { push( cast(size_t) m_ctxt.bstack ); // FS:[4] push( cast(size_t) m_ctxt.bstack + m_size ); // FS:[8] } push( 0x00000000 ); // EBX push( 0x00000000 ); // ESI push( 0x00000000 ); // EDI } else version( AsmX86_Posix ) { push( 0x00000000 ); // strange pre EIP push( cast(size_t) &fiber_entryPoint ); // EIP push( (cast(size_t)pstack)+8 ); // EBP push( 0x00000000 ); // EAX push( getEBX() ); // EBX used for PIC code push( 0x00000000 ); // ECX just to have it aligned... push( 0x00000000 ); // ESI push( 0x00000000 ); // EDI } else version( AsmX86_64_Posix ) { push( 0x00000000 ); // strange pre EIP push( cast(size_t) &fiber_entryPoint ); // RIP push( (cast(size_t)pstack)+8 ); // RBP push( 0x00000000_00000000 ); // RBX push( 0x00000000_00000000 ); // R12 push( 0x00000000_00000000 ); // R13 push( 0x00000000_00000000 ); // R14 push( 0x00000000_00000000 ); // R15 push( 0x00001f80_0000037f ); // MXCSR (32 bits), unused (16 bits) , x87 control (16 bits) } else version( AsmPPC_Posix ) { version( StackGrowsDown ) { pstack -= int.sizeof * 5; } else { pstack += int.sizeof * 5; } push( cast(size_t) &fiber_entryPoint ); // link register push( 0x00000000 ); // control register push( 0x00000000 ); // old stack pointer // GPR values version( StackGrowsDown ) { pstack -= int.sizeof * 20; } else { pstack += int.sizeof * 20; } assert( (cast(uint) pstack & 0x0f) == 0 ); } else static if( is( ucontext_t ) ) { getcontext( &m_utxt ); // patch from #1707 - thanks to jerdfelt //m_utxt.uc_stack.ss_sp = m_ctxt.bstack; m_utxt.uc_stack.ss_sp = m_pmem; m_utxt.uc_stack.ss_size = m_size; makecontext( &m_utxt, &fiber_entryPoint, 0 ); // NOTE: If ucontext is being used then the top of the stack will // be a pointer to the ucontext_t struct for that fiber. push( cast(size_t) &m_utxt ); } } public Thread.Context* m_ctxt; public size_t m_size; void* m_pmem; static if( is( ucontext_t ) ) { // NOTE: The static ucontext instance is used to represent the context // of the main application thread. static __gshared ucontext_t sm_utxt = void; ucontext_t m_utxt = void; ucontext_t* m_ucur = null; } private: //////////////////////////////////////////////////////////////////////////// // Storage of Active Fiber //////////////////////////////////////////////////////////////////////////// // // Sets a thread-local reference to the current fiber object. // static void setThis( Fiber f ) { version( Win32 ) { TlsSetValue( sm_this, cast(void*) f ); } else version( Posix ) { pthread_setspecific( sm_this, cast(void*) f ); } } static __gshared Thread.TLSKey sm_this; private: //////////////////////////////////////////////////////////////////////////// // Context Switching //////////////////////////////////////////////////////////////////////////// // // Switches into the stack held by this fiber. // final void switchIn() { Thread tobj = Thread.getThis(); void** oldp = &tobj.m_curr.tstack; void* newp = m_ctxt.tstack; // NOTE: The order of operations here is very important. The current // stack top must be stored before m_lock is set, and pushContext // must not be called until after m_lock is set. This process // is intended to prevent a race condition with the suspend // mechanism used for garbage collection. If it is not followed, // a badly timed collection could cause the GC to scan from the // bottom of one stack to the top of another, or to miss scanning // a stack that still contains valid data. The old stack pointer // oldp will be set again before the context switch to guarantee // that it points to exactly the correct stack location so the // successive pop operations will succeed. *oldp = getStackTop(); volatile tobj.m_lock = true; tobj.pushContext( m_ctxt ); fiber_switchContext( oldp, newp ); // NOTE: As above, these operations must be performed in a strict order // to prevent Bad Things from happening. tobj.popContext(); volatile tobj.m_lock = false; tobj.m_curr.tstack = tobj.m_curr.bstack; } // // Switches out of the current stack and into the enclosing stack. // final void switchOut() { Thread tobj = Thread.getThis(); void** oldp = &m_ctxt.tstack; void* newp = tobj.m_curr.within.tstack; // NOTE: The order of operations here is very important. The current // stack top must be stored before m_lock is set, and pushContext // must not be called until after m_lock is set. This process // is intended to prevent a race condition with the suspend // mechanism used for garbage collection. If it is not followed, // a badly timed collection could cause the GC to scan from the // bottom of one stack to the top of another, or to miss scanning // a stack that still contains valid data. The old stack pointer // oldp will be set again before the context switch to guarantee // that it points to exactly the correct stack location so the // successive pop operations will succeed. *oldp = getStackTop(); volatile tobj.m_lock = true; fiber_switchContext( oldp, newp ); // NOTE: As above, these operations must be performed in a strict order // to prevent Bad Things from happening. tobj=Thread.getThis(); volatile tobj.m_lock = false; tobj.m_curr.tstack = tobj.m_curr.bstack; } } } +/ |