
    daf              	       ~   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlZd dlZd dlmZ d dlmZ d dlmZmZ d dlmZ 	 d dlZ G d dej0                        Z ej4                  e
j6                  dk(  d	       G d
 dej0                               Z ej:                  e
j6                  dk(  d       G d dej0                               Z G d dej0                        Z ej4                  e
j6                  dk(  d	       G d dej0                               Z  ej:                   e!ed      d       G d dej0                               Z" ej4                  e
j6                  dk(  d	       ej:                   e!ed      d       ejF                          ej:                   e!ed      d       G d dej0                                                    Z$ ej4                  e
j6                  dk(  d	       ej:                   e!ed      xr	  e!ed      d       G d  d!ej0                                      Z% G d" d#ej0                        Z& G d$ d%ej0                        Z' G d& d'ej0                        Z( G d( d)ej0                        Z)d* Z*e+d+k(  r ejX                          yy# e$ r dZY cw xY w),    N)support)	os_helper)assert_python_okspawn_python)threading_helperc                       e Zd Zd Zd Zy)GenericTestsc                    t        t              D ]  }t        t        |      }|dv r!| j                  |t        j                         8|dv r!| j                  |t        j
                         ]|j                  d      r2|j                  d      s!| j                  |t        j                         |j                  d      s| j                  |t        j                         | j                  t        j                  d        t        j                  t        j                  ddd	 t        
      }t        j                  |t        j                         t        j                  t        j                  ddd t        
      }t        j                  |t        j                         t        t        dd       }|Dt        j                  t        j                  ddd t        
      }t        j                  ||       y y )N>   SIG_DFLSIG_IGN>   	SIG_BLOCKSIG_SETMASKSIG_UNBLOCKSIGSIG_CTRL_win32Signalssignalc                     | j                         xr% | j                  d      xr | j                  d       xs | j                  d      S )Nr   r   r   )isupper
startswithnames    +/root/Python-3.12.4/Lib/test/test_signal.py<lambda>z)GenericTests.test_enums.<locals>.<lambda>*   sD    LLN Q/O8O4O0w/     )sourceHandlersc                 
    | dv S )N)r   r    r   s    r   r   z)GenericTests.test_enums.<locals>.<lambda>4   s    T%;; r   Sigmasksc                 
    | dv S )N)r   r   r   r!   r   s    r   r   z)GenericTests.test_enums.<locals>.<lambda>=   s    )T!T r   )dirr   getattrassertIsInstancer   r"   r   r   assertEqualsysplatformenum_old_convert_IntEnum_test_simple_enum)selfr   sigCheckedSignalsCheckedHandlersr"   CheckedSigmaskss          r   
test_enumszGenericTests.test_enums   sz   K 
	8D&$'C--%%c6??;DD%%c6??;'0G%%c6>>:)%%c6>>:  w7
	8 ++i0  	~v~~>,,j(;
 	@6:t4"00LL*hT!O
 ""?H=  r   c                     t        t              D ]Z  }t        t        |      }t        j                  |      s)t        j
                  |      r?| j                  |j                  d       \ y )Nr   )r$   r   r%   inspect	isroutine	isbuiltinr'   
__module__)r.   r   values      r   test_functions_module_attrz'GenericTests.test_functions_module_attrB   sT     K 	=DFD)E  '0A0A%0H  !1!18<	=r   N)__name__r8   __qualname__r3   r:   r!   r   r   r	   r	      s    %>N=r   r	   r   zNot valid on Windowsc                       e Zd Zd Zd Zd Zd Zd Zd Zd Z	d Z
 ej                   eed	      d
      d        Z ej                  ej"                  d       ej&                         d               Zy)
PosixTestsc                      y Nr!   r.   argss     r   trivial_signal_handlerz!PosixTests.trivial_signal_handlerM   s    r   c                 B    t        j                  | j                  |      S r@   )	functoolspartialrC   )r.   arguments     r   create_handler_with_partialz&PosixTests.create_handler_with_partialP   s      !<!<hGGr   c                     | j                  t        t        j                  d       | j                  t        t        j                  d| j                         | j                  t        t        j
                  d       y )Ni  )assertRaises
ValueErrorr   	getsignalrC   	strsignalr.   s    r   ,test_out_of_range_signal_number_raises_errorz7PosixTests.test_out_of_range_signal_number_raises_errorS   sU    *f&6&6=*fmmT55	7 	*f&6&6=r   c                 l    | j                  t        t        j                  t        j                  d        y r@   )rJ   	TypeErrorr   SIGUSR1rN   s    r   0test_setting_signal_handler_to_none_raises_errorz;PosixTests.test_setting_signal_handler_to_none_raises_error[   s!    )V]] ..$	0r   c                    t        j                   t         j                  | j                        }| j                  |t         j                         | j                  t        j                  t         j                        | j                         t        j                   t         j                  |       | j                  t        j                  t         j                        |       y r@   )r   SIGHUPrC   r&   r   r'   rL   )r.   hups     r   test_getsignalzPosixTests.test_getsignal_   s    mmFMM4+F+FGc6??3))&--844	6fmmS)))&--8#>r   c                 T    G d d      } |       }| j                  d|j                         | j                  |      }t        j                  t        j                  |      }| j                  |t        j                         | j                  t        j                  t        j                        |       t        j                  t        j                  |       | j                  t        j                  t        j                        |       | j                  d|j                         y )Nc                   $     e Zd Zd Z fdZ xZS )GPosixTests.test_no_repr_is_called_on_signal_handler.<locals>.MyArgumentc                     d| _         y Nr   )
repr_countrN   s    r   __init__zPPosixTests.test_no_repr_is_called_on_signal_handler.<locals>.MyArgument.__init__k   s	    "#r   c                 J    | xj                   dz  c_         t        | 	         S N   )r]   super__repr__)r.   	__class__s    r   rc   zPPosixTests.test_no_repr_is_called_on_signal_handler.<locals>.MyArgument.__repr__n   s    1$w'))r   )r;   r8   r<   r^   rc   __classcell__)rd   s   @r   
MyArgumentrZ   j   s    $* *r   rf   r   )r'   r]   rH   r   rU   r&   r   rL   )r.   rf   rG   handlerrV   s        r   (test_no_repr_is_called_on_signal_handlerz3PosixTests.test_no_repr_is_called_on_signal_handlerg   s    	* 	* <H//0228<mmFMM73c6??3))&--8'BfmmS)))&--8#>H//0r   c                 6   | j                  dt        j                  t        j                               | j                  dt        j                  t        j                               | j                  dt        j                  t        j
                               y )N	Interrupt
TerminatedHangup)assertInr   rM   SIGINTSIGTERMrU   rN   s    r   test_strsignalzPosixTests.test_strsignal}   sY    k6#3#3FMM#BClF$4$4V^^$DEh 0 0 ?@r   c                     t         j                  j                  t              }t         j                  j	                  |d      }t        |       y )Nzsignalinterproctester.py)ospathdirname__file__joinr   )r.   rt   scripts      r   test_interprocess_signalz#PosixTests.test_interprocess_signal   s1    ''//(+g'AB r   valid_signalszrequires signal.valid_signalsc                    t        j                         }| j                  |t               | j	                  t         j
                  j                  |       | j	                  t         j
                  j                  |       | j                  d|       | j                  t         j                  |       | j                  t        |      t         j                         t        t               D ]v  }|j                  d      s|dv r| j                  |      5  t        t         |      }| j!                  |d       | j                  |t         j                         d d d        x y # 1 sw Y   xY w)Nr   r   >   r   r   r   )r   ry   r&   setrm   r   rn   SIGALRMassertNotInNSIG
assertLesslenr$   r   subTestr%   assertGreaterEqual)r.   sr   signums       r   test_valid_signalszPosixTests.test_valid_signals   s   
   "a%fnn++Q/fnn,,a0Aa(A, K 		5D??5)--4( 5 .''245 5		55 5s   AE##E,	sys.executable required.c                     t        j                  t        j                  ddgt         j                        }| j                  d|j                         | j                  |j                  t        j                          y)z+KeyboardInterrupt triggers exit via SIGINT.-czaimport os, signal, time
os.kill(os.getpid(), signal.SIGINT)
for _ in range(999): time.sleep(0.01)stderr   KeyboardInterruptN)
subprocessrunr(   
executablePIPErm   r   r'   
returncoder   rn   )r.   processs     r   !test_keyboard_interrupt_exit_codez,PosixTests.test_keyboard_interrupt_exit_code   s_     ..9: "( 	*GNN;++fmm^<r   N)r;   r8   r<   rC   rH   rO   rS   rW   rh   rp   rx   unittest
skipUnlesshasattrr   r   r(   r   r   requires_subprocessr   r!   r   r   r>   r>   K   s    H>0?1,A!
 X('5	5, X)CD W  "	= # E	=r   r>   zWindows specificc                       e Zd Zd Zd Z ej                  ej                  d       e	j                         d               Zy)WindowsSignalTestsc                    t        j                         }| j                  |t               | j	                  t        |      d       | j                  t         j                  j                  |       | j                  d|       | j                  t         j                  |       | j                  t        |      t         j                         y )N   r   )r   ry   r&   r{   r   r   rm   r   rn   r}   r~   r   r.   r   s     r   r   z%WindowsSignalTests.test_valid_signals   s      "a%A*fnn++Q/Aa(A,r   c                    d }t               }t        j                  t        j                  t        j                  t        j
                  t        j                  t        j                  t        j                  fD ]S  }t        j                  |      t        j                  |t        j                  ||             |j                  |       U | j                  |       | j                  t              5  t        j                  d|       d d d        | j                  t              5  t        j                  d|       d d d        y # 1 sw Y   >xY w# 1 sw Y   y xY w)Nc                      y r@   r!   )xys     r   r   z3WindowsSignalTests.test_issue9324.<locals>.<lambda>   s    r      )r{   r   SIGABRTSIGBREAKSIGFPESIGILLrn   SIGSEGVro   rL   add
assertTruerJ   rK   )r.   rg   checkedr/   s       r   test_issue9324z!WindowsSignalTests.test_issue9324   s    #%NNFOOV]]MM6==&..NN$ 	!C
 $0c6==g#>?C 	! 	 z* 	'MM"g&	' z* 	&MM!W%	& 	&	' 	'	& 	&s   5E	)E	EEr   c                     t        j                  t        j                  ddgt         j                        }| j                  d|j                         d}| j                  |j                  |       y)z?KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT.r   zraise KeyboardInterruptr   r   l   :   N)	r   r   r(   r   r   rm   r   r'   r   )r.   r   STATUS_CONTROL_C_EXITs      r   r   z4WindowsSignalTests.test_keyboard_interrupt_exit_code   sZ     ..'@A!( 	*GNN; *++-BCr   N)r;   r8   r<   r   r   r   r   r(   r   r   r   r   r!   r   r   r   r      sJ    -&* X)CD W  "D # EDr   r   c                   *   e Zd Zd Zd Z ej                  ej                  d      d        Z	 ej                  ej                  d       ej                   eed      d      d               Z ej                  ej                  d       ej                  ej                  d      d	               Z ej                  ej"                  d
k(  d       ej                  ej                  d       ej                   eed      d      d                      Zy)WakeupFDTestsc                 <   | j                  t              5  t        j                  t        j                         d d d        | j                  t              5  t        j                  t        j                  d       d d d        y # 1 sw Y   LxY w# 1 sw Y   y xY w)N)r   F)rJ   rQ   r   set_wakeup_fdrn   rN   s    r   test_invalid_callzWakeupFDTests.test_invalid_call   su    y) 	7  6	7 y) 	7  6	7 	7		7 	7	7 	7s   %B%BBBc                     t        j                         }| j                  t        t        ft
        j                  |       y r@   )r   make_bad_fdrJ   rK   OSErrorr   r   )r.   fds     r   test_invalid_fdzWakeupFDTests.test_invalid_fd   s0    ""$:w/ ..	4r   zneeds working sockets.c                     t        j                          }|j                         }|j                          | j                  t        t
        ft        j                  |       y r@   )socketfilenocloserJ   rK   r   r   r   )r.   sockr   s      r   test_invalid_socketz!WakeupFDTests.test_invalid_socket   sA    }}[[]

:w/ ..	4r   zEmscripten cannot fstat pipes.piperequires os.pipe()c                    t        j                         \  }}| j                  t         j                  |       | j                  t         j                  |       t        j                         \  }}| j                  t         j                  |       | j                  t         j                  |       t	        t         d      r,t        j
                  |d       t        j
                  |d       t        j                  |       | j                  t        j                  |      |       | j                  t        j                  d      |       | j                  t        j                  d      d       y )Nset_blockingFr   )	rr   r   
addCleanupr   r   r   r   r   r'   )r.   r1w1r2w2s        r   test_set_wakeup_fd_resultz'WakeupFDTests.test_set_wakeup_fd_result   s     B"%"%B"%"%2~&OOB&OOB&R --b126--b126--b126r   c                 L   t        j                          }| j                  |j                         |j                  d       |j	                         }t        j                          }| j                  |j                         |j                  d       |j	                         }t        j                  |       | j                  t        j                  |      |       | j                  t        j                  d      |       | j                  t        j                  d      d       y )NFr   )r   r   r   setblockingr   r   r   r'   )r.   sock1fd1sock2fd2s        r    test_set_wakeup_fd_socket_resultz.WakeupFDTests.test_set_wakeup_fd_socket_result  s     $% lln$% llnS!--c2C8--b137--b126r   r   ztests specific to POSIXc                 ,   t        j                         \  }}| j                  t         j                  |       | j                  t         j                  |       t        j                  |d       | j                  t              5 }t        j                  |       d d d        | j                  t        j                        d|z         t        j                  |d       t        j                  |       t        j                  d       y # 1 sw Y   rxY w)NTz&the fd %s must be in non-blocking modeFr   )rr   r   r   r   r   rJ   rK   r   r   r'   str	exception)r.   rfdwfdcms       r   test_set_wakeup_fd_blockingz)WakeupFDTests.test_set_wakeup_fd_blocking&  s     779S#&#& 	T"z* 	&b  %	&R\\*ACG	I 	U#S!R 	& 	&s   D

DN)r;   r8   r<   r   r   r   r   r   has_socket_supportr   skipIfis_emscriptenr   rr   r   r   r(   r)   r   r!   r   r   r   r      s1   74
 X335MN4 O4 X__W**,LMXV,.BC7 D N7" X__W**,LMX335MN7 O N7$ X__S\\W,.GHX__W**,LMXV,.BC! D N I!r   r   c                   
   e Zd Z ej                  edu d      ddd       Z ej                  edu d       ej                   ee	d      d      d               Z
d	 Zd
 Zd Z ej                   eed      d      d        Zy)WakeupSignalTestsNneed _testcapiTorderedc                p    dj                  t        t        t        |            ||      }t	        d|       y )Na  if 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        r   )formattuplemapintr   )r.   	test_bodyr   signalscodes        r   check_wakeupzWakeupSignalTests.check_wakeup=  s7     @ F5S'*+Wi@A 	D 	t$r   r   r   c                 n   d}t        j                         \  }}	 t        j                  |d       | j                  d       t        j
                  |       t        j
                  |       t        d|       y # t        $ r Y Bw xY w# t        j
                  |       t        j
                  |       w xY w)Na&  if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                signal.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
           xz9OS doesn't report write() error on the read end of a piper   )rr   r   writeskipTestr   r   r   )r.   r   rws       r   test_wakeup_write_errorz)WakeupSignalTests.test_wakeup_write_errord  s    !D wwy1	HHQ MMUVHHQKHHQKt$  		
 HHQKHHQKs(   A9 B 9	BB BB ,B4c                 D    | j                  dt        j                         y )Na  def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        r   r   r|   rN   s    r   test_wakeup_fd_earlyz&WakeupSignalTests.test_wakeup_fd_early  s     > ^^?	r   c                 D    | j                  dt        j                         y )Na`  def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        r   rN   s    r   test_wakeup_fd_duringz'WakeupSignalTests.test_wakeup_fd_during  s     6 ^^7	r   c                 b    | j                  dt        j                  t        j                         y )Nzdef test():
            signal.signal(signal.SIGUSR1, handler)
            signal.raise_signal(signal.SIGUSR1)
            signal.raise_signal(signal.SIGALRM)
        )r   r   rR   r|   rN   s    r   test_signumzWakeupSignalTests.test_signum  s$      ^^V^^		-r   pthread_sigmaskneed signal.pthread_sigmask()c                 f    | j                  dt        j                  t        j                  d       y )Na  def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            signal.raise_signal(signum1)
            signal.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        Fr   )r   r   rR   SIGUSR2rN   s    r   test_pendingzWakeupSignalTests.test_pending  s-     	  nnfnne 	 	=r   )r;   r8   r<   r   r   	_testcapir   r   r   rr   r   r   r   r   r   r   r!   r   r   r   r   ;  s    X__Y$&(898< $% :$%L X__Y$&(89XV,.BC1% D :1%f D<- X):;8:=:=r   r   
socketpairzneed socket.socketpairc                       e Zd Z ej                  edu d      d        Z ej                  edu d      d        Z ej                  edu d      d        Zy)WakeupSocketSignalTestsNr   c                      d}t        d|       y )Na  if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        signal.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        r   r   r.   r   s     r   test_socketz#WakeupSocketSignalTests.test_socket  s    > 	t$r   c                 p    t         j                  dk(  rd}nd}dj                  |      }t        d|       y )Nntsendr   a+  if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        read.setblocking(False)
        write.setblocking(False)

        signal.set_wakeup_fd(write.fileno())

        # Close sockets: send() will fail
        read.close()
        write.close()

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if ('Exception ignored when trying to {action} to the signal wakeup fd'
            not in err):
            raise AssertionError(err)
        actionr   rr   r   r   r   r.   r  r   s      r   test_send_errorz'WakeupSocketSignalTests.test_send_error  s@     77d?FF!B F&F!C 	D 	t$r   c                 p    t         j                  dk(  rd}nd}dj                  |      }t        d|       y )Nr  r  r   aJ  if 1:
        import errno
        import signal
        import socket
        import sys
        import time
        import _testcapi
        from test.support import captured_stderr

        signum = signal.SIGINT

        # This handler will be called, but we intentionally won't read from
        # the wakeup fd.
        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()

        # Fill the socketpair buffer
        if sys.platform == 'win32':
            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
            # the full socketpair buffer, so use a timeout of 50 ms instead.
            write.settimeout(0.050)
        else:
            write.setblocking(False)

        written = 0
        if sys.platform == "vxworks":
            CHUNK_SIZES = (1,)
        else:
            # Start with large chunk size to reduce the
            # number of send needed to fill the buffer.
            CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
        for chunk_size in CHUNK_SIZES:
            chunk = b"x" * chunk_size
            try:
                while True:
                    write.send(chunk)
                    written += chunk_size
            except (BlockingIOError, TimeoutError):
                pass

        print(f"%s bytes written into the socketpair" % written, flush=True)

        write.setblocking(False)
        try:
            write.send(b"x")
        except BlockingIOError:
            # The socketpair buffer seems full
            pass
        else:
            raise AssertionError("%s bytes failed to fill the socketpair "
                                 "buffer" % written)

        # By default, we get a warning when a signal arrives
        msg = ('Exception ignored when trying to {action} '
               'to the signal wakeup fd')
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("first set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        # And also if warn_on_full_buffer=True
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
                                 "test failed, stderr: %r" % err)

        # But not if warn_on_full_buffer=False
        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if err != "":
            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
                                 "test failed, stderr: %r" % err)

        # And then check the default again, to make sure warn_on_full_buffer
        # settings don't leak across calls.
        signal.set_wakeup_fd(write.fileno())

        with captured_stderr() as err:
            signal.raise_signal(signum)

        err = err.getvalue()
        if msg not in err:
            raise AssertionError("second set_wakeup_fd() test failed, "
                                 "stderr: %r" % err)

        r  r   r  r  s      r   test_warn_on_full_bufferz0WakeupSocketSignalTests.test_warn_on_full_bufferD  sA     77d?FFgN F&F!O 	P 	t$r   )	r;   r8   r<   r   r   r   r   r  r
  r!   r   r   r   r     s{     X__Y$&(89!% :!%F X__Y$&(89(% :(%T X__Y$&(89n% :n%r   r   siginterruptzneeds signal.siginterrupt()r   r   c                   f    e Zd Zej                  fdZd Zd Z ej                  d      d        Z	y)SiginterruptTestc                 r   d|d}t        d|      5 }	 |j                  j                         }|j                  |      \  }}||z   }|j	                         }|dvrt        d|d|      |dk(  cd	d	d	       S # t        j                  $ r |j                          Y d	d	d	       y
w xY w# 1 sw Y   y	xY w)zPerform a read during which a signal will arrive.  Return True if the
        read is interrupted by the signal and raises an exception.  Return False
        if it returns normally.
        zif 1:
            import errno
            import os
            import signal
            import sys

            interrupt = aq  
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        r   timeout)      Child error (exit code ): r  NF)	r   stdoutreadlinecommunicatewait	Exceptionr   TimeoutExpiredkill)	r.   	interruptr  r   r   
first_liner  r   exitcodes	            r   readpipe_interruptedz%SiginterruptTest.readpipe_interrupted  s    T G#H $% 	''$^^446
!(!4!4W!4!E
 $f,"<<>6)#'/%9 : : A	' 	' ,, 	' 	'	' 	's.   B-/A<.B-<#B*B-)B**B--B6c                 H    | j                  d       }| j                  |       y r@   r  r   r.   interrupteds     r   test_without_siginterruptz*SiginterruptTest.test_without_siginterrupt        //5$r   c                 H    | j                  d      }| j                  |       y NTr!  r"  s     r   test_siginterrupt_onz%SiginterruptTest.test_siginterrupt_on  r%  r   walltimec                 L    | j                  dd      }| j                  |       y )NFr  r  )r  assertFalser"  s     r   test_siginterrupt_offz&SiginterruptTest.test_siginterrupt_off  s'    
 //q/A%r   N)
r;   r8   r<   r   SHORT_TIMEOUTr  r$  r(  requires_resourcer,  r!   r   r   r  r    s>     7>6K6K :'x%% Wz*& +&r   r  	getitimer	setitimerz/needs signal.getitimer() and signal.setitimer()c                       e Zd Zd Zd Zd Zd Zd Zd Zd Z	 e
j                  ej                  dv d	      d
        Zd Zd Zy)
ItimerTestc                     d| _         d| _        d | _        t        j                  t        j                  | j
                        | _        y )NFr   )hndl_called
hndl_countitimerr   r|   sig_alrm	old_alarmrN   s    r   setUpzItimerTest.setUp  s2     v~~t}}Er   c                     t        j                   t         j                  | j                         | j                  !t        j                  | j                  d       y y r\   )r   r|   r8  r6  r0  rN   s    r   tearDownzItimerTest.tearDown  s;    fnndnn5;;"T[[!, #r   c                     d| _         y r'  )r4  rA   s     r   r7  zItimerTest.sig_alrm  s
    r   c                     d| _         | j                  dkD  rt        j                  d      | j                  dk(  r$t        j                  t        j
                  d       | xj                  dz  c_        y )NTr  z.setitimer didn't disable ITIMER_VIRTUAL timer.r   ra   )r4  r5  r   ItimerErrorr0  ITIMER_VIRTUALrA   s     r   
sig_vtalrmzItimerTest.sig_vtalrm"  s^    ??Q$$ &  __!V22A61r   c                 Z    d| _         t        j                  t        j                  d       y )NTr   )r4  r   r0  ITIMER_PROFrA   s     r   sig_profzItimerTest.sig_prof/  s     ++Q/r   c                 d    | j                  t        j                  t        j                  dd       y )Nr   r   )rJ   r   r>  r0  ITIMER_REALrN   s    r   test_itimer_exczItimerTest.test_itimer_exc3  s'     	&,,f.>.>AFr   c                     t         j                  | _        t        j                  | j                  d       t        j                          | j                  | j                  d       y )Ng      ?T)r   rE  r6  r0  pauser'   r4  rN   s    r   test_itimer_realzItimerTest.test_itimer_real<  sB    ((c*))40r   )netbsd5zDitimer not reliable (does not mix well with threading) on some BSDs.c                    t         j                  | _        t        j                   t         j                  | j                         t        j
                  | j                  dd       t        j                  t        j                        D ]3  }t        ddd      }t        j                  | j                        dk(  s3 n | j                  t        j                  | j                        d       | j                  | j                  d       y )Ng333333?皙?90  2	 铖         rQ  T)r   r?  r6  	SIGVTALRMr@  r0  r   
busy_retryLONG_TIMEOUTpowr/  r'   r4  r.   _s     r   test_itimer_virtualzItimerTest.test_itimer_virtualC  s     ++f&&8c3/##G$8$89 	AE5(+A,
:	 	))$++6
C))40r   c                    t         j                  | _        t        j                   t         j                  | j                         t        j
                  | j                  dd       t        j                  t        j                        D ]3  }t        ddd      }t        j                  | j                        dk(  s3 n | j                  t        j                  | j                        d       | j                  | j                  d       y )NrL  rM  rN  rO  rP  T)r   rB  r6  SIGPROFrC  r0  r   rS  rT  rU  r/  r'   r4  rV  s     r   test_itimer_profzItimerTest.test_itimer_profV  s    ((fnndmm4c3/##G$8$89 	AE5(+A,
:	 	))$++6
C))40r   c                     t         j                  | _        t        j                  | j                  d       t	        j
                  d       | j                  | j                  d       y )Nư>ra   T)r   rE  r6  r0  timesleepr'   r4  rN   s    r   test_setitimer_tinyzItimerTest.test_setitimer_tinyg  sF     ((d+

1))40r   N)r;   r8   r<   r9  r;  r7  r@  rC  rF  rI  r   r   r(   r)   rX  r[  r`  r!   r   r   r2  r2    s`    F- 0H1 X__S\\\1NP1P1"1"1r   r2  c                      e Zd ZdZ ej
                   eed      d      d        Z ej
                   eed      d       ej
                   eed      d      d               Z	 ej
                   eed      d	       e
j                         d
               Z ej
                   eed      d      d        Z ej
                   eed      d      d        Z ej
                   eed      d      d        Z ej
                   eed      d      d        Z ej
                   eed      d      d        Z ej
                   eed      d      d        Z ej
                   eed      d      d        Z ej
                   eed      d       ej
                   eed      d       e
j                         d                      Z ej
                   eed      d      d        Z ej
                   eed      d      d        Z ej
                   eed      d       e
j                         d               Z ej
                   eed      d	       e
j                         d               Zy)PendingSignalsTestsz[
    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
    functions.
    
sigpendingzneed signal.sigpending()c                 \    | j                  t        j                         t                      y r@   )r'   r   rc  r{   rN   s    r   test_sigpending_emptyz)PendingSignalsTests.test_sigpending_emptyv  s     	**,ce4r   r   r   c                      d}t        d|       y )Na  if 1:
            import os
            import signal

            def handler(signum, frame):
                1/0

            signum = signal.SIGUSR1
            signal.signal(signum, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            os.kill(os.getpid(), signum)
            pending = signal.sigpending()
            for sig in pending:
                assert isinstance(sig, signal.Signals), repr(pending)
            if pending != {signum}:
                raise Exception('%s != {%s}' % (pending, signum))
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        r   r   r   s     r   test_sigpendingz#PendingSignalsTests.test_sigpending{  s    
0 	t$r   pthread_killzneed signal.pthread_kill()c                      d}t        d|       y )Na  if 1:
            import signal
            import threading
            import sys

            signum = signal.SIGUSR1

            def handler(signum, frame):
                1/0

            signal.signal(signum, handler)

            tid = threading.get_ident()
            try:
                signal.pthread_kill(tid, signum)
            except ZeroDivisionError:
                pass
            else:
                raise Exception("ZeroDivisionError not raised")
        r   r   r   s     r   test_pthread_killz%PendingSignalsTests.test_pthread_kill  s    ( 	t$r   c                 J    d|j                         d|d}t        d|       y)zo
        test: body of the "def test(signum):" function.
        blocked: number of the blocked signal
        zif 1:
        import signal
        import sys
        from signal import Signals

        def handler(signum, frame):
            1/0

        z

        blocked = a  
        signum = signal.SIGALRM

        # child: block and wait the signal
        try:
            signal.signal(signum, handler)
            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])

            # Do the tests
            test(signum)

            # The handler must not be called on unblock
            try:
                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
            except ZeroDivisionError:
                print("the signal handler has been called",
                      file=sys.stderr)
                sys.exit(1)
        except BaseException as err:
            print("error: {}".format(err), file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
        r   N)stripr   )r.   blockedtestr   s       r   wait_helperzPendingSignalsTests.wait_helper  s%    N zz|WA &J 	t$r   sigwaitzneed signal.sigwait()c                 D    | j                  t        j                  d       y )Na   
        def test(signum):
            signal.alarm(1)
            received = signal.sigwait([signum])
            assert isinstance(received, signal.Signals), received
            if received != signum:
                raise Exception('received %s, not %s' % (received, signum))
        ro  r   r|   rN   s    r   test_sigwaitz PendingSignalsTests.test_sigwait  s     	 * 	r   sigwaitinfozneed signal.sigwaitinfo()c                 D    | j                  t        j                  d       y )Nz
        def test(signum):
            signal.alarm(1)
            info = signal.sigwaitinfo([signum])
            if info.si_signo != signum:
                raise Exception("info.si_signo != %s" % signum)
        rr  rN   s    r   test_sigwaitinfoz$PendingSignalsTests.test_sigwaitinfo       	 * 	r   sigtimedwaitzneed signal.sigtimedwait()c                 D    | j                  t        j                  d       y )Nz
        def test(signum):
            signal.alarm(1)
            info = signal.sigtimedwait([signum], 10.1000)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        rr  rN   s    r   test_sigtimedwaitz%PendingSignalsTests.test_sigtimedwait  rw  r   c                 D    | j                  t        j                  d       y )Nz
        def test(signum):
            import os
            os.kill(os.getpid(), signum)
            info = signal.sigtimedwait([signum], 0)
            if info.si_signo != signum:
                raise Exception('info.si_signo != %s' % signum)
        rr  rN   s    r   test_sigtimedwait_pollz*PendingSignalsTests.test_sigtimedwait_poll  s     	 * 	r   c                 D    | j                  t        j                  d       y )Nz
        def test(signum):
            received = signal.sigtimedwait([signum], 1.0)
            if received is not None:
                raise Exception("received=%r" % (received,))
        rr  rN   s    r   test_sigtimedwait_timeoutz-PendingSignalsTests.test_sigtimedwait_timeout  s     	 * 	r   c                 r    t         j                  }| j                  t        t         j                  |gd       y )Ng      )r   r|   rJ   rK   rx  )r.   r   s     r   "test_sigtimedwait_negative_timeoutz6PendingSignalsTests.test_sigtimedwait_negative_timeout  s)     *f&9&9F8TJr   c                     t        dd       y )Nr   a  if True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        r   rN   s    r   test_sigwait_threadz'PendingSignalsTests.test_sigwait_thread!  s     	   	r   c                 ,   | j                  t        t        j                         | j                  t        t        j                  d       | j                  t        t        j                  ddd       | j                  t        t        j                  dg        | j                  t
              5  t        j                  t        j                  t        j                  g       d d d        | j                  t
              5  t        j                  t        j                  dg       d d d        | j                  t
              5  t        j                  t        j                  ddz  g       d d d        y # 1 sw Y   xY w# 1 sw Y   \xY w# 1 sw Y   y xY w)Nra   r  r  i  r   i  )rJ   rQ   r   r   r   rK   r   r~   rN   s    r   test_pthread_sigmask_argumentsz2PendingSignalsTests.test_pthread_sigmask_argumentsD  s/    	)V%;%;<)V%;%;Q?)V%;%;Q1E'6#9#94Dz* 	D""6#3#3fkk]C	Dz* 	:""6#3#3aS9	:z* 	@""6#3#3agY?	@ 	@		D 	D	: 	:	@ 	@s$   ,4E2=&E> )F
2E;>F
Fc                    t        j                  t         j                  t        j                               }| j	                  t         j                  t         j
                  |       t        j                  t         j                  t        j                               }| j                  |t        j                                y r@   )r   r   r   ry   r   r   r   assertLessEqualr   s     r   "test_pthread_sigmask_valid_signalsz6PendingSignalsTests.test_pthread_sigmask_valid_signalsR  s{     ""6#3#3V5I5I5KL..0B0BAF""6#5#5v7K7K7MNQ 4 4 67r   c                      d}t        d|       y )Na-	  if 1:
        import signal
        import os; import threading

        def handler(signum, frame):
            1/0

        def kill(signum):
            os.kill(os.getpid(), signum)

        def check_mask(mask):
            for sig in mask:
                assert isinstance(sig, signal.Signals), repr(sig)

        def read_sigmask():
            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
            check_mask(sigmask)
            return sigmask

        signum = signal.SIGUSR1

        # Install our signal handler
        old_handler = signal.signal(signum, handler)

        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        check_mask(old_mask)
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Block and then raise SIGUSR1. The signal is blocked: the signal
        # handler is not called, and the signal is now pending
        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        check_mask(mask)
        kill(signum)

        # Check the new mask
        blocked = read_sigmask()
        check_mask(blocked)
        if signum not in blocked:
            raise Exception("%s not in %s" % (signum, blocked))
        if old_mask ^ blocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))

        # Unblock SIGUSR1
        try:
            # unblock the pending signal calls immediately the signal handler
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Check the new mask
        unblocked = read_sigmask()
        if signum in unblocked:
            raise Exception("%s in %s" % (signum, unblocked))
        if blocked ^ unblocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
        if old_mask != unblocked:
            raise Exception("%s != %s" % (old_mask, unblocked))
        r   r   r   s     r   test_pthread_sigmaskz(PendingSignalsTests.test_pthread_sigmask[  s    GP 	t$r   c                     d}t        d|      5 }|j                         \  }}|j                         }|dk7  rt        d|d|      	 d d d        y # 1 sw Y   y xY w)Na7  if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        r   r  r  r  )r   r  r  r  )r.   r   r   r  r   r  s         r   test_pthread_kill_main_threadz1PendingSignalsTests.test_pthread_kill_main_thread  sl     $% 	4$002NFF||~H1}!)6!3 4 4 	4 	4 	4s   :AAN)r;   r8   r<   __doc__r   r   r   r   re  rg  r   requires_working_threadingrj  ro  rs  rv  rz  r|  r~  r  r  r  r  r  r  r!   r   r   rb  rb  q  sJ    X635555 X):;8:X635%5:%6 X8570002% 37%. X):;8:*%:*%X X3022 X7466 X8577 X857	7	 X8577 X857K7K X302X):;8:0002 3:2
< X):;8:
@:
@ X):;8:8:8 X):;8:0002I% 3:I%V X85700024 374r   rb  c                   P   e Zd ZdZd Zd Zd Z ej                   e	e
d      d      d        Z ej                   e	e
d      d      d        Z ej                  ej                  d	k(  d
       ej                   e	e
d      d       ej"                         d                      Zy)
StressTestz
    Stress signal delivery, especially when a signal arrives in
    the middle of recomputing the signal state or executing
    previously tripped signal handlers.
    c                 r    t        j                   ||      }| j                  t         j                   ||       y r@   )r   r   )r.   r   rg   old_handlers       r   setsigzStressTest.setsig  s&    mmFG4v{;r   c                    dg dfd	}| j                  t        j                  t        j                  d       | j	                  t        j
                  |        |        t              k  r$t        j                  d       t              k  r$t        t              dz
        D cg c]  }|dz      |   z
   }}t        j                  |      }t        j                  rt        d|fz         |S c c}w )N   c                     t              k  rHj                  t        j                                t	        j
                  t        j                  d       y y )Nr]  )r   appendr^  perf_counterr   r0  rE  )r   frameNtimess     r   rg   z5StressTest.measure_itimer_resolution.<locals>.handler  s@    5zA~T..01   !3!3T: r   r   gMbP?ra   z,detected median itimer() resolution: %.6f s.NN)r   r   r0  rE  r  r|   r   r^  r_  range
statisticsmedianr   verboseprint)r.   rg   i	durationsmedr  r  s        @@r   measure_itimer_resolutionz$StressTest.measure_itimer_resolution  s    	; 	((&*<*<a@FNNG,	%j1nJJt %j1n 5:#e*q.4IJqU1Q3Z%(*J	J	*??@C6IJ
	 Ks   /C<c                 f    | j                         }|dk  ry|dk  ry| j                  d|fz         y )Ng-C6?i'  g{Gz?d   z^detected itimer resolution (%.3f s.) too high (> 10 ms.) on this platform (or system too busy))r  r   )r.   resos     r   decide_itimer_countzStressTest.decide_itimer_count  sB     --/4<T\MM M!G$ %r   r0  ztest needs setitimer()c                    | j                         }g d }dfd	}| j                  t        j                  |       | j                  t        j                  |       | j                  t        j
                  |       d}t        j                         t        j                  z   }||k  r=t        j                  t        j                         t        j                         |dz  }t              |k  rRt        j                         |k  r;t        j                  d       t              |k  rt        j                         |k  r;t        j                  t        j                         t        j                         |dz  }t              |k  rRt        j                         |k  r;t        j                  d       t              |k  rt        j                         |k  r;||k  r=| j                  t              |d       y)	z;
        This test uses dependent signal handlers.
        c                 |    t        j                  t         j                  dt        j                         dz  z          y )Nr]  h㈵>)r   r0  rE  random)r   r  s     r   first_handlerz@StressTest.test_stress_delivery_dependent.<locals>.first_handler  s*     V//$8N1NOr   Nc                 (    j                  |        y r@   r  r   r  sigss     r   second_handlerzAStressTest.test_stress_delivery_dependent.<locals>.second_handler      KKr   r   ra   r  Some signals were lostr  )r  r  r   rZ  rR   r|   r^  	monotonicr   r-  rr   r  getpidr   r_  r'   )r.   r  r  r  expected_sigsdeadliner  s         @r   test_stress_delivery_dependentz)StressTest.test_stress_delivery_dependent  sl    $$&	P	  	FNNM2FNNM2FNNN3>>#g&;&;;aGGBIIK0QMd)m+0@80K

4  d)m+0@80K GGBIIK0QMd)m+0@80K

4  d)m+0@80K a 	TA'?@r   c                 d   | j                         }g fd}| j                  t        j                  |       | j                  t        j                  |       d}||k  rt        j
                  t        j                  dt        j                         dz  z          t        j                  t        j                         t        j                         |dz  }t        j                  t        j                        D ]  }t              |k\  s n ||k  r| j                  t              |d       y)z>
        This test uses simultaneous signal handlers.
        c                 (    j                  |        y r@   r  r  s     r   rg   z=StressTest.test_stress_delivery_simultaneous.<locals>.handler,  r  r   r   r]  r  r  r  N)r  r  r   rR   r|   r0  rE  r  rr   r  r  r   sleeping_retryr-  r   r'   )r.   r  rg   r  rW  r  s        @r   !test_stress_delivery_simultaneousz,StressTest.test_stress_delivery_simultaneous#  s     $$&	  	FNNG,FNNG,a V//$8N1NOGGBIIK0QM++G,A,AB t9- a 	TA'?@r   darwinz&crashes due to system bug (FB13453490)rR   ztest needs SIGUSR1c                   	
 t         j                  d
d	d	fd
fd}	
fd}t        j                         }| j                  t         j                   |       t        j                  |      }	 d}t        j                         5 }|j                           |        d|j                          |j                  `| j                  |j                  j                  t               | j                  dd	d
t        |j                  j                               d}d d d        |s| j                  	d       | j!                  	
       d|j                          y # 1 sw Y   BxY w# d|j                          w xY w)Nr   Fc                     dz  y r`   r!   )r   r  num_received_signalss     r   custom_handlerzAStressTest.test_stress_modifying_handlers.<locals>.custom_handlerN  s     A% r   c                  F     st        j                         dz   sy y r`   )r   raise_signal)do_stopnum_sent_signalsr   s   r   set_interruptszAStressTest.test_stress_modifying_handlers.<locals>.set_interruptsR  s$    ##F+ A%  r   c                      dk  sdk  rJt        d      D ]/  } t        j                  fD ]  }t        j                  |        1 dk  rCdk  rIy y )Nr  ra   i N  )r  r   r   )r  rg   r  r  r  r   s     r   cycle_handlerszAStressTest.test_stress_modifying_handlers.<locals>.cycle_handlersX  s^    "S(,@1,Du 7A$2FNN#C 7fg677 #S(,@1,Dr   )targetTzSignal dz ignored due to race condition)r   rR   r   	threadingThreadr   catch_unraisable_exceptionstartrv   
unraisabler&   	exc_valuer   rm   r   assertGreaterr  )r.   r  r  r  tignoredr   r  r  r  r  r   s          @@@@@r   test_stress_modifying_handlersz)StressTest.test_stress_modifying_handlersC  sO     	&	&	7 mmFN;v{;N3	G335 #	 ==, ))"--*A*A7KMM!&+IJBMM3346 #G#$ ""#7;  !57GHGFFH1# #. GFFHs%   >E7 BE+*.E7 +E40E7 7FN)r;   r8   r<   r  r  r  r  r   r   r   r   r  r  r   r(   r)   r   r  r  r!   r   r   r  r    s    <0% X513*A3*AX X513A3A< X__S\\X-/WXX3-/00026 3/ Y6r   r  c                   j    e Zd Zd Z ej
                  ej                  dk7  d      d        Zd Z	d Z
y)RaiseSignalTestc                     | j                  t              5  t        j                  t        j                         d d d        y # 1 sw Y   y xY wr@   )rJ   KeyboardInterruptr   r  rn   rN   s    r   test_sigintzRaiseSignalTest.test_sigint  s8    01 	/.	/ 	/ 	/s   $AAr   zWindows specific testc                     	 d}t        j                  |       | j                  d       y # t        $ r)}|j                  t        j
                  k(  rn Y d }~y d }~ww xY w)Nra   z#OSError (Invalid argument) expected)r   r  failr   errnoEINVAL)r.   rU   es      r   test_invalid_argumentz%RaiseSignalTest.test_invalid_argument  sR    	F'II;< 	ww%,,& 	s   (+ 	AAAc                 "   dfd}t        j                   t         j                  |      }| j                  t         j                   t         j                  |       t        j                  t         j                         | j	                         y )NFc                 
    dy r'  r!   )abis_oks     r   rg   z-RaiseSignalTest.test_handler.<locals>.handler  s    Er   )r   rn   r   r  r   )r.   rg   
old_signalr  s      @r   test_handlerzRaiseSignalTest.test_handler  sY    	 ]]6==':
v}}jAFMM*r   c                 L    d}t        d|      \  }}}| j                  d|       y )Nzif 1:
        import _thread
        class Foo():
            def __del__(self):
                _thread.interrupt_main()

        x = Foo()
        r   s/   OSError: Signal 2 ignored due to race condition)r   rm   )r.   r   rcouterrs        r   test__thread_interrupt_mainz+RaiseSignalTest.test__thread_interrupt_main  s,     (d3CH#Nr   N)r;   r8   r<   r  r   r   r(   r)   r  r  r  r!   r   r   r  r    s>    / X__S\\W,.EF	 G		Or   r  c                   L    e Zd Z ej                   eed      d      d        Zy)PidfdSignalTestpidfd_send_signalzpidfd support not built inc                    | j                  t              5 }t        j                  dt        j                         d d d        j
                  j                  t        j                  k(  r| j                  d       n8|j
                  j                  t        j                  k(  r| j                  d       | j                  |j
                  j                  t        j                         t        j                  dt        j                          t        j                        }| j!                  t        j"                  |       | j%                  t&        d      5  t        j                  |t        j                  t)               d       d d d        | j                  t*              5  t        j                  |t        j                         d d d        y # 1 sw Y   xY w# 1 sw Y   YxY w# 1 sw Y   y xY w)Nr   zkernel does not support pidfdsz"Not enough privileges to use pidfsz/proc/z^siginfo must be None$)rJ   r   r   r  rn   r   r  ENOSYSr   EPERMr'   EBADFrr   openr  O_DIRECTORYr   r   assertRaisesRegexrQ   objectr  )r.   r   my_pidfds      r   test_pidfd_send_signalz&PidfdSignalTest.test_pidfd_send_signal  sU   
 w' 	72$$Q6	7<<-MM:;\\5;;.MM>?++U[[977VBIIK=12>>B(+##I/GH 	K$$Xv}}fhJ	K01 	>$$Xv}}=	> 	>	7 	7	K 	K	> 	>s#   %G/G#%G*GG'*G3N)r;   r8   r<   r   r   r   r   r  r!   r   r   r  r    s/    X+,$>	>r   r  c                  ,    t        j                          y r@   )r   reap_childrenr!   r   r   tearDownModuler    s    r   __main__)-r*   r  rE   r5   rr   r  r   r   r  r   r(   r  r^  r   rn  r   test.supportr   test.support.script_helperr   r   r   r   ImportErrorTestCaser	   r   r)   r>   r   r   r   r   r   r   r   r  r2  rb  r  r  r  r  r;   mainr!   r   r   <module>r     s       	      
     " E )
/=8$$ /=d (*@Aa="" a= Ba=P S\\W,.@A-D** -D B-D`S!H%% S!l (*@As=)) s= Bs=l WV\24LM@%h// @% N@%F (*@AWV^46STWR(*>?R&x(( R& @  U BR&j (*@AWV[1Rgfk6RJL\1"" \1L B\1~P4(++ P4f
y"" yx)Oh'' )OZ>h'' >* zHMMO a-  Is   J1 1J<;J<