a
    QIhG                     @  s  U d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	m
Z
mZmZmZ d dlZd dlZd dlmZmZ e	rd dlZd dlmZ d dlmZmZ d dlmZ d dlmZmZmZ d d	lmZ eej ej!f Z"d
e#d< dddddZ$dddddZ%dddddZ&dddddZ'ddddddZ(ej)G dd dZ*ej)G d d! d!Z+ej)G d"d# d#Z,ej)G d$d% d%ej-j.Z/ej)G d&d' d'ej-j0Z1eG d(d) d)Z2eG d*d+ d+ejj3ed,Z4dS )-    )annotationsN)TYPE_CHECKINGAnyNoReturnUnionoverload)NoPublicConstructorfinal)Iterable)AddressFamily
SocketKind)TracebackType)BufferSelf	TypeAlias)AddressFormatr   	IPAddressint)ipreturnc                 C  s4   t | tjrtjjS t | tjr(tjjS tdd S )Nz!Unhandled IPAddress instance type)	
isinstance	ipaddressIPv4AddresstriosocketAF_INETIPv6AddressAF_INET6NotImplementedError)r    r   P/var/www/html/venv_bot_3.9/lib/python3.9/site-packages/trio/testing/_fake_net.py_family_for+   s
    r!   )familyr   c                 C  s8   | t jjkrtdS | t jjkr,tdS tdd S )N0.0.0.0::Unhandled ip address familyr   r   r   r   
ip_addressr   r   r"   r   r   r    _wildcard_ip_for3   s
    

r)   c                 C  s8   | t jjkrtdS | t jjkr,tdS tdd S )N	127.0.0.1::1r%   r&   r(   r   r   r    _localhost_ip_for<   s
    

r,   r   )coder   c                 C  s   t | t| d S N)OSErrorosstrerror)r-   r   r   r    	_fake_errD   s    r2   bytesIterable[Buffer])databuffersr   c              	   C  s|   d}|D ]n}| ||t |j  }t | }||d t|< W d    n1 sP0    Y  |t|7 }|t| kr qxq|S )Nr   )
memoryviewnbyteslen)r5   r6   writtenbufZ
next_pieceZmbufr   r   r    _scatterH   s    
.r<   c                   @  s@   e Zd ZU ded< ded< ddddZedd d	d
dZdS )UDPEndpointr   r   r   port+tuple[str, int] | tuple[str, int, int, int]r   c                 C  s(   | j j| jf}t| j tjr$|d7 }|S )N)r   r   )r   
compressedr>   r   r   r   )selfsockaddrr   r   r    as_python_sockaddrY   s    zUDPEndpoint.as_python_sockaddr)rC   r   c                 C  s"   |d d \}}| t ||dS )N   )r   r>   )r   r'   )clsrC   r   r>   r   r   r    from_python_sockaddrb   s    z UDPEndpoint.from_python_sockaddrN)__name__
__module____qualname____annotations__rD   classmethodrG   r   r   r   r    r=   T   s
   
	r=   c                   @  s   e Zd ZU ded< dS )
UDPBindingr=   localN)rH   rI   rJ   rK   r   r   r   r    rM   k   s   
rM   c                   @  sF   e Zd ZU ded< ded< ejdd dZded< dd d	d
dZdS )	UDPPacketr=   sourcedestinationc                 C  s   |   S r.   )hex)pr   r   r    <lambda>u       zUDPPacket.<lambda>)reprr3   payload)rW   r   c                 C  s   t | j| j|dS )NrP   rQ   rW   )rO   rQ   rP   )rB   rW   r   r   r    replyx   s
    zUDPPacket.replyN)rH   rI   rJ   rK   attrsfieldrW   rY   r   r   r   r    rO   q   s   
rO   c                   @  s*   e Zd ZU ded< dddddddZdS )	FakeSocketFactoryFakeNetfake_netr   
FakeSocket)r"   type_protor   c                 C  s   t | j|||S r.   )r_   _creater^   )rB   r"   r`   ra   r   r   r    r      s    zFakeSocketFactory.socketN)rH   rI   rJ   rK   r   r   r   r   r    r\      s   
r\   c                	   @  sD   e Zd ZU ded< dddddddddd	d
ZddddddZdS )FakeHostnameResolverr]   r^   r   zbytes | Nonezbytes | str | int | Noner   zqlist[tuple[AddressFamily, SocketKind, int, str, tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes]]])hostr>   r"   typera   flagsr   c                   s   t dd S NzFakeNet doesn't do fake DNS yetr   )rB   rd   r>   r"   re   ra   rf   r   r   r    getaddrinfo   s    z FakeHostnameResolver.getaddrinfor?   ztuple[str, str])rC   rf   r   c                   s   t dd S rg   rh   )rB   rC   rf   r   r   r    getnameinfo   s    z FakeHostnameResolver.getnameinfoN)r   r   r   r   )rH   rI   rJ   rK   ri   rj   r   r   r   r    rc      s   
    rc   c                   @  sZ   e Zd ZddddZdddddd	Zddd
dZdddddZdddddZdS )r]   Noner@   c                 C  s@   t d | _t d | _ttdd| _i | _	d | _
d S )Nz	1.0.0.0/8z1::/16iP  i  )r   IPv4NetworkhostsZ_auto_ipv4_iterIPv6NetworkZ_auto_ipv6_iteriterrange_auto_port_iter_boundroute_packetrB   r   r   r    __init__   s
    zFakeNet.__init__rM   r_   )bindingr   r   c                 C  s"   || j v rttj || j |< d S r.   )rr   r2   errno
EADDRINUSE)rB   rv   r   r   r   r    _bind   s    

zFakeNet._bindc                 C  s$   t jt|  t jt|  d S r.   )r   r   Zset_custom_socket_factoryr\   Zset_custom_hostname_resolverrc   rt   r   r   r    enable   s    zFakeNet.enablerO   packetr   c                 C  s$   | j d u r| | n
|  | d S r.   )rs   deliver_packetrB   r|   r   r   r    send_packet   s    
zFakeNet.send_packetc                 C  s,   t |jd}|| jv r(| j| | n d S )NrN   )rM   rQ   rr   _deliver_packet)rB   r|   rv   r   r   r    r}      s    
zFakeNet.deliver_packetN)rH   rI   rJ   ru   ry   rz   r   r}   r   r   r   r    r]      s
   
r]   c                   @  s<  e Zd ZddddddddZedd	d
dZedd	ddZedd	ddZdd	ddZdd	ddZ	ddddddZ
dddddZddddd Zdd!d"d#d$Zd~d(d)dd*dd+d,d-Zejd.kseseejd/reZdd(ddd0d1d2d3Zejd.kses"eejd/r"eZd4d	d5d6Zd4d	d7d8Zedddd9d:d;Zedddd<d=d>d;Zdddd?d@d=dAd;ZedddBddCdDdEZeddddddFdGdEZddddHd?ddFdIdEZdJd	dKdLZdMdNdOddPdQdRZddSdddTdUdVZedSdWddXdYdZZedSdd[dd\d]dZZd^dd_d`dZZdddd<dadbdcZ ddSddddddedfZ!ddddgdadhdiZ"ddSdddjdddkdlZ#dddddmdndodpZ$ejd.kseseejd/re$Z%dd	dqdrZ&dd	dsdtZ'dd	dudvZ(dddwdxdyZ)ejd.ks(es8eejdzr8dd<d{d|d}Z*d'S )r_   r]   r   r   r   rk   )r^   r"   re   ra   r   c                 C  s   || _ |stjj}|stjj}|tjjtjjfvr@td| |tjjkrZtd| || _|| _	|| _
d| _tjt td\| _| _d | _d S )Nz%FakeNet doesn't (yet) support family=z#FakeNet doesn't (yet) support type=Finf)	_fake_netr   r   r   SOCK_STREAMr   r   
SOCK_DGRAM_family_type_proto_closedZopen_memory_channelrO   float_packet_sender_packet_receiver_binding)rB   r^   r"   re   ra   r   r   r    ru      s&    zFakeSocket.__init__r@   c                 C  s   | j S r.   )r   rt   r   r   r    re      s    zFakeSocket.typec                 C  s   | j S r.   )r   rt   r   r   r    r"      s    zFakeSocket.familyc                 C  s   | j S r.   )r   rt   r   r   r    ra      s    zFakeSocket.protoc                 C  s   | j rttj d S r.   )r   r2   rw   EBADFrt   r   r   r    _check_closed   s    zFakeSocket._check_closedc                 C  s4   | j r
d S d| _ | jd ur&| jj| j= | j  d S )NT)r   r   r   rr   r   closert   r   r   r    r      s    
zFakeSocket.closeobjectboolztuple[str, int])addressrN   r   c                  s$   t jj| j| j| j|d|dI d H S )NF)r   Zipv6_v6onlyrN   )r   _socket_resolve_address_nocpre   r"   ra   )rB   r   rN   r   r   r    r     s    z FakeSocket._resolve_address_nocprO   r{   c                 C  s<   t tj | j| W d    n1 s.0    Y  d S r.   )
contextlibsuppressr   ZBrokenResourceErrorr   Zsend_nowaitr~   r   r   r    r     s    zFakeSocket._deliver_packet)addrr   c                   s   |    | jd urttj tj I d H  | j|ddI d H ^}}}|g ksVJ dt	
|}t|| jksrJ |t	
dkrt	
d}n|t	
dkrt	
d}|dkrt| jj}tt||d}| j||  || _d S )	NTr   zTODO: handle other values?r#   r*   r$   r+   r   )r   r   r2   rw   EINVALr   lowlevel
checkpointr   r   r'   r!   r"   nextr   rq   rM   r=   ry   )rB   r   ip_strr>   _r   rv   r   r   r    bind  s"    



zFakeSocket.bindr   )peerr   c                   s   t dd S )Nz0FakeNet does not (yet) support connected socketsrh   )rB   r   r   r   r    connect2  s    zFakeSocket.connectr   r   Nr4   z!Iterable[tuple[int, int, Buffer]]zAddressFormat | None)r6   ancdatarf   r   r   c                   s   |    tj I d H  |d ur4| j|ddI d H }|r@td|rRtd| |d u rdttj t	
|}| jd u r| t| jjdfI d H  d|}| jd usJ t| jj||d}| j| t|S )NFr   &FakeNet doesn't support ancillary dataz"FakeNet send flags must be 0, not r   rU   rX   )r   r   r   r   r   r   r2   rw   ENOTCONNr=   rG   r   r   r)   r"   rA   joinrO   rN   r   r   r9   )rB   r6   r   rf   r   rQ   rW   r|   r   r   r    _sendmsg5  s,    



zFakeSocket._sendmsgwin32sendmsgzZtuple[int, list[tuple[int, int, bytes]], int, tuple[str, int] | tuple[str, int, int, int]])r6   
ancbufsizerf   r   c           	        s   |dkrt d|dkr t d| jd u r2t d|   g }d}| j I d H }|j }t|j|}|t	|jk r|t
jjO }||||fS )Nr   r   z&FakeNet doesn't support any recv flagszThe code will most likely hang if you try to receive on a fakesocket without a binding. If that is not the case, or you explicitly want to test that, remove this warning.)r   r   r   r   ZreceiverP   rD   r<   rW   r9   r   r   	MSG_TRUNC)	rB   r6   r   rf   r   	msg_flagsr|   r   r:   r   r   r    _recvmsg_intoa  s"    

zFakeSocket._recvmsg_intor?   c                 C  sJ   |    | jd ur| jj S | jtjjkr0dS | jtjjksBJ dS d S )N)r#   r   )r$   r   )	r   r   rN   rD   r"   r   r   r   r   rt   r   r   r    getsockname  s    
zFakeSocket.getsocknamec                 C  sb   |    | jd urTt| jds&J d| jjd urTt| jjtsHJ d| jj S ttj	 d S )NremotezGThis method seems to assume that self._binding has a remote UDPEndpointz,Self._binding.remote should be a UDPEndpoint)
r   r   hasattrr   r   r=   rD   r2   rw   r   rt   r   r   r    getpeername  s"    
zFakeSocket.getpeername)leveloptnamer   c                C  s   d S r.   r   )rB   r   r   r   r   r    
getsockopt  s    zFakeSocket.getsockoptr3   )r   r   buflenr   c                C  s   d S r.   r   rB   r   r   r   r   r   r    r     s    z
int | Nonezint | bytesc                C  s"   |    td| d| dd S )Nz%FakeNet doesn't implement getsockopt(, ))r   r/   r   r   r   r    r     s    zint | Buffer)r   r   valuer   c                C  s   d S r.   r   )rB   r   r   r   r   r   r    
setsockopt  s    zFakeSocket.setsockopt)r   r   r   optlenr   c                C  s   d S r.   r   rB   r   r   r   r   r   r   r    r     s    zint | Buffer | Nonec                C  sF   |    ||ftjjtjjfkr,|s,tdtd| d| dd S )Nz#FakeNet always has IPV6_V6ONLY=Truez%FakeNet doesn't implement setsockopt(r   z, ...))r   r   r   IPPROTO_IPV6IPV6_V6ONLYr   r/   r   r   r   r    r     s    r   c                 C  s   | S r.   r   rt   r   r   r    	__enter__  s    zFakeSocket.__enter__z#builtins.type[BaseException] | NonezBaseException | NonezTracebackType | None)exc_type	exc_value	tracebackr   c                 C  s   |    d S r.   )r   )rB   r   r   r   r   r   r    __exit__  s    zFakeSocket.__exit__r   )r5   rf   r   c                   s   |  ||d I d H S r.   )sendto)rB   r5   rf   r   r   r    send  s    zFakeSocket.sendz!tuple[object, ...] | str | Buffer)_FakeSocket__data_FakeSocket__addressr   c                   s   d S r.   r   )rB   r   r   r   r   r    r     s    zFakeSocket.sendtoz(tuple[object, ...] | str | Buffer | None)r   _FakeSocket__flagsr   r   c                   s   d S r.   r   )rB   r   r   r   r   r   r    r     s    r   )argsr   c                   sR   t |dkr|\}}d}n t |dkr2|\}}}ntd| |gg ||I d H S )NrE   r      zwrong number of arguments)r9   	TypeErrorr   )rB   r   r5   r   rf   r   r   r    r     s    )bufsizerf   r   c                   s   |  ||I d H \}}|S r.   )recvfrom)rB   r   rf   r5   _addressr   r   r    recv
  s    zFakeSocket.recv)r;   r8   rf   r   c                   s   |  |||I d H \}}|S r.   )recvfrom_into)rB   r;   r8   rf   Z	got_bytesr   r   r   r    	recv_into  s    zFakeSocket.recv_intoztuple[bytes, AddressFormat]c                   s"   |  ||I d H \}}}}||fS r.   )_recvmsg)rB   r   rf   r5   _ancdata
_msg_flagsr   r   r   r    r     s    zFakeSocket.recvfromztuple[int, AddressFormat]c                   sD   |dkr|t |jkrtd| |gd|I d H \}}}}||fS )Nr   zpartial recvfrom_into)r7   r8   r   r   )rB   r;   r8   rf   
got_nbytesr   r   r   r   r   r    r     s    zFakeSocket.recvfrom_intoz>tuple[bytes, list[tuple[int, int, bytes]], int, AddressFormat])r   r   rf   r   c           	        s>   t |}| |g||I d H \}}}}t|d | |||fS r.   )	bytearrayr   r3   )	rB   r   r   rf   r;   r   r   r   r   r   r   r    r   )  s    zFakeSocket._recvmsgc                 C  s   t dd S )Nz&can't get fileno() for FakeNet socketsrh   rt   r   r   r    fileno<  s    zFakeSocket.filenoc                 C  s   t dd S )Nzcan't detach() a FakeNet socketrh   rt   r   r   r    detach?  s    zFakeSocket.detachc                 C  s   dS )NFr   rt   r   r   r    get_inheritableB  s    zFakeSocket.get_inheritable)inheritabler   c                 C  s   |rt dd S )Nz&FakeNet can't make inheritable socketsrh   )rB   r   r   r   r    set_inheritableE  s    zFakeSocket.set_inheritableshare)
process_idr   c                 C  s   t dd S )NzFakeNet can't share socketsrh   )rB   r   r   r   r    r   M  s    zFakeSocket.share)r   r   N)r   r   )N)N)r   )r   )r   r   )r   )r   r   )r   r   )+rH   rI   rJ   ru   propertyre   r"   ra   r   r   r   r   r   r   r   sysplatformr   r   r   r   r   recvmsg_intor   r   r   r   r   r   r   r   r   r   r   r   r   r   recvmsgr   r   r   r   r   r   r   r   r    r_      s    	   '

  $
 
      

r_   )	metaclass)5
__future__r   r   rw   r   r0   r   r   typingr   r   r   r   r   rZ   r   Z
trio._utilr   r	   builtinscollections.abcr
   r   r   typesr   Ztyping_extensionsr   r   r   Ztrio._socketr   r   r   r   rK   r!   r)   r,   r2   r<   frozenr=   rM   rO   abcZSocketFactoryr\   ZHostnameResolverrc   r]   
SocketTyper_   r   r   r   r    <module>	   sJ   	#