a
    QIh                     @  s  d dl mZ d dlZd dlZd dlmZ d dlZd dlZG dd deZ	ddddd	Z
d
dddZddddZddd
dddZdd
dddZddd
dddZddd
dddZddd
dddZddd
ddd Zddd
dd!d"Zddd
dd#d$Zd
dd%d&ZdS )'    )annotationsN)Protocolc                   @  s   e Zd ZddddddZdS )RawInput strpromptreturnc                 C  s   d S N )selfr   r   r   O/var/www/html/venv_bot_3.9/lib/python3.9/site-packages/trio/_tests/test_repl.py__call__       zRawInput.__call__N)r   )__name__
__module____qualname__r   r   r   r   r   r      s   r   z	list[str])cmdsr	   c                   s(   t |  g dddd fdd}|S )z
    Pass in a list of strings.
    Returns a callable that returns each string, each time its called
    When there are not more strings to return, raise EOFError
    r   r   r   c                   s2    |  z
t W S  ty,   td Y n0 d S r
   )appendnextStopIterationEOFError)r   Z	cmds_iterZpromptsr   r   _raw_helper   s
    

z$build_raw_input.<locals>._raw_helper)r   )iter)r   r   r   r   r   build_raw_input   s    r   None)r	   c                  C  sL   t dg} |  dksJ tt |   W d   n1 s>0    Y  dS )z"Quick test of our helper function.Zcmd1N)r   pytestraisesr   )	raw_inputr   r   r   test_build_raw_input#   s    
r    zdict[str, object]c                   C  s   dt iS )N__builtins__)r!   r   r   r   r   build_locals.   s    r"   zpytest.CaptureFixture[str]zpytest.MonkeyPatch)capsysmonkeypatchr	   c                   s`   t jjt d}tg d}||d| t j|I dH  |  \}}| g dks\J dS )z
    Run some basic commands through the interpreter while capturing stdout.
    Ensure that the interpreted prints the expected results.
    Zrepl_locals)zx = 1zprint(f'{x=}')'hello'zdef func():z  print(x + 1)r   zfunc()zasync def afunc():z
  return 4r   zawait afunc()
import sysz"sys.stdout.write('hello stdout\n')r   N)zx=1r&   24zhello stdoutZ13)	trio_replTrioInteractiveConsoler"   r   setattrrun_repl
readouterr
splitlinesr#   r$   consoler   outZ_errr   r   r   test_basic_interaction2   s    r4   )r$   r	   c                   sh   t jjt d}tdg}| |d| tt" t j	|I d H  W d    n1 sZ0    Y  d S )Nr%   zraise SystemExitr   )
r*   r+   r,   r"   r   r-   r   r   
SystemExitr.   )r$   r2   r   r   r   r   "test_system_exits_quit_interpreterW   s    r6   c                   sp   t jjt d}tg d}||d| t j|I d H  |  \}}d|v sTJ d|vs`J d|v slJ d S )Nr%   )z"import signal, trio, trio.lowlevelzasync def f():zh  trio.lowlevel.spawn_system_task(    trio.to_thread.run_sync,    signal.raise_signal, signal.SIGINT,  )z  await trio.sleep_forever()z  print('should not see this')r   z	await f()z print('AFTER KeyboardInterrupt')r   KeyboardInterruptZshouldzAFTER KeyboardInterruptr*   r+   r,   r"   r   r-   r.   r/   r#   r$   r2   r   r3   errr   r   r   test_KI_interruptsc   s    r;   c                   sX   t jjt d}tg d}||d| t j|I d H  |  \}}d|v sTJ d S )Nr%   )r'   if sys.version_info < (3, 11):/  from exceptiongroup import BaseExceptionGroupr   z<raise BaseExceptionGroup('', [RuntimeError(), SystemExit()])!print('AFTER BaseExceptionGroup')r   AFTER BaseExceptionGroupr8   r1   r   r   r   test_system_exits_in_exc_group   s    
r@   c                   sX   t jjt d}tg d}||d| t j|I d H  |  \}}d|v sTJ d S )Nr%   )r'   r<   r=   r   zraise BaseExceptionGroup(z?  '', [BaseExceptionGroup('', [RuntimeError(), SystemExit()])])r>   r   r?   r8   r1   r   r   r   %test_system_exits_in_nested_exc_group   s    rA   c                   sp   t jjt d}tddg}||d| t j|I d H  |  \}}d|vsTJ d|vs`J d|v slJ d S )Nr%   zraise BaseExceptionprint('AFTER BaseException')r   _threads.py_repl.pyAFTER BaseExceptionr8   r9   r   r   r   test_base_exception_captured   s    rF   c                   sX   t jjt d}tddg}||d| t j|I d H  |  \}}d|v sTJ d S )Nr%   z&raise ExceptionGroup('', [KeyError()])zprint('AFTER ExceptionGroup')r   zAFTER ExceptionGroupr8   r1   r   r   r   test_exc_group_captured   s    rG   c                   sp   t jjt d}tg d}||d| t j|I d H  |  \}}d|vsTJ d|vs`J d|v slJ d S )Nr%   )z-async def async_func_raises_base_exception():z  raise BaseExceptionr   z(await async_func_raises_base_exception()rB   r   rC   rD   rE   r8   r9   r   r   r   *test_base_exception_capture_from_coroutine   s    rH   c                  C  s(   t jtjddgdd} | jdks$J dS )zL
    Basic smoke test when running via the package __main__ entrypoint.
    z-mr*   s   exit())inputr   N)
subprocessrunsys
executable
returncode)replr   r   r   test_main_entrypoint   s    rP   )
__future__r   rJ   rL   typingr   r   Z
trio._replr*   r   r   r    r"   r4   r6   r;   r@   rA   rF   rG   rH   rP   r   r   r   r   <module>   s$   %