I'm trying to connect to other peers and get data from them using asyncio streams API in a bittorrent client I'm working on. However, I'm having trouble getting data from all the connected peers asynchronously. So far, I can only get data from the first peer, but reading from the rest of peers seems to be blocked. I get no response from chunk = await peer.reader.read() from other peers and the code is blocking other code to execute.
I'm very new to asyncio, any help would be appreciated!
output:
2016-06-20 12:16:26,302 - main.torrent_client - ERROR - [Errno 61] Connect call failed ('74.101.0.24', 6881), {'port': 6881, 'host': '74.101.0.24'}
2016-06-20 12:16:26,397 - main.torrent_client - INFO - connected with peer {'port': 14122, 'host': '80.94.76.7'}
2016-06-20 12:16:26,397 - main.torrent_client - INFO - wating for chunks from {'port': 14122, 'host': '80.94.76.7'}
2016-06-20 12:16:26,454 - main.torrent_client - INFO - connected with peer {'port': 61571, 'host': '83.132.145.93'}
2016-06-20 12:16:26,454 - main.torrent_client - INFO - wating for chunks from {'port': 61571, 'host': '83.132.145.93'}
2016-06-20 12:16:26,460 - main.torrent_client - INFO - connected with peer {'port': 36657, 'host': '79.114.230.255'}
2016-06-20 12:16:26,460 - main.torrent_client - INFO - wating for chunks from {'port': 36657, 'host': '79.114.230.255'}
2016-06-20 12:16:26,464 - main.torrent_client - ERROR - [Errno 61] Connect call failed ('77.235.138.250', 6881), {'port': 6881, 'host': '77.235.138.250'}
2016-06-20 12:16:26,507 - main.torrent_client - INFO - connected with peer {'port': 21426, 'host': '78.176.46.111'}
2016-06-20 12:16:26,508 - main.torrent_client - INFO - wating for chunks from {'port': 21426, 'host': '78.176.46.111'}
2016-06-20 12:16:26,545 - main.torrent_client - INFO - connected with peer {'port': 14474, 'host': '88.244.155.118'}
2016-06-20 12:16:26,545 - main.torrent_client - INFO - wating for chunks from {'port': 14474, 'host': '88.244.155.118'}
2016-06-20 12:16:34,042 - main.torrent_client - ERROR - [Errno 51] Connect call failed ('95.10.195.101', 22665), {'port': 22665, 'host': '95.10.195.101'}
2016-06-20 12:17:41,387 - main.torrent_client - ERROR - [Errno 60] Connect call failed ('93.34.36.131', 13311), {'port': 13311, 'host': '93.34.36.131'}
2016-06-20 12:17:41,387 - main.torrent_client - ERROR - [Errno 60] Connect call failed ('204.237.81.10', 52074), {'port': 52074, 'host': '204.237.81.10'}
2016-06-20 12:18:26,170 - main.torrent_client - DEBUG - read from peer {'port': 14122, 'host': '80.94.76.7'}
2016-06-20 12:18:26,171 - main.torrent_client - INFO - chunk from {'port': 14122, 'host': '80.94.76.7'}: b'x13BitTorrent protocolx00x00x00x00x00x10x00x00#xb0x93x9ezxc8xb9xb0x1cxdaxf0x1fx98x8bx9fxfbxbeDxddxe2-lt0D20-O.xa8qxbdxd9xd2x96x0fxf2t>x00x00x06#x05xffxffxffxffxffxfffxffxffxffxffxffxffxffxffxffxffxffxffxffxffxffxffxffxffxffxffxff'
2016-06-20 12:18:26,174 - main.torrent_client - INFO - Sent INTERESTED message to Peer {'port': 14122, 'host': '80.94.76.7'}
2016-06-20 12:18:26,174 - main.torrent_client - INFO - wating for chunks from {'port': 14122, 'host': '80.94.76.7'}
2016-06-20 12:18:26,174 - main.torrent_client - DEBUG - read from peer {'port': 14122, 'host': '80.94.76.7'}
2016-06-20 12:18:26,174 - main.torrent_client - INFO - no chunk from {'port': 14122, 'host': '80.94.76.7'}
client.py:
async def _connection_handler(self, peer):
    # listen for incoming data from peers
    self.logger.info('connected with peer {}'.format(peer.ip))
    peer.writer.write(self._hand_shake())
    while True:
        self.logger.info('wating for chunks from {}'.format(peer.ip))
        chunk = await peer.reader.read()
        if not chunk:
            self.logger.info('no chunk from {}'.format(peer.ip))
            break
        self.logger.info('chunk from {}: {}'.format(peer.ip, chunk))
        if chunk[1:20].lower() == b'bittorrent protocol':
            info_hash = chunk[28:48]
            if self.torrent.info_hash != info_hash:
                self._close_connection(peer)
            else:
                chunk = chunk[68:]
                peer.writer.write(INTERESTED)
                await peer.writer.drain()
                self.logger.info('Sent INTERESTED message to Peer {}'.format(peer.ip))
        # await self._process_data_from_peer(peer)
async def _connect_to_peer(self, peer):
    try:
        reader, writer = await asyncio.open_connection(
                peer.ip['host'], peer.ip['port'])
        peer.reader = reader
        peer.writer = writer
        await self._connection_handler(peer)
    except ConnectionRefusedError as e:
        self.logger.error('{}, {}'.format(e, peer.ip))
    except (TimeoutError, OSError) as e:
        self.logger.error('{}, {}'.format(e, peer.ip))
        # pp = PeerProtocol(
        #         self.torrent,
        #         self.data_buffer,
        #         self.blocks_requested,
        #         self.pieces_downloaded
        #         )
        # await self.loop.create_connection(
        #         lambda: pp,
        #         peer['host'],
        #         peer['port']
        #     )
async def connect_to_peers(self):
    await asyncio.gather(
            *[self._connect_to_peer(peer) for peer in self.peers],
            )
main.py:
def main():
    filename = '../street-fighter.torrent'
    client = TorrentClient(filename, loop)
    asyncio.ensure_future(client.connect_to_peers())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        loop.close()
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    main()
Aucun commentaire:
Enregistrer un commentaire