RError.com

RError.com Logo RError.com Logo

RError.com Navigation

  • 主页

Mobile menu

Close
  • 主页
  • 系统&网络
    • 热门问题
    • 最新问题
    • 标签
  • Ubuntu
    • 热门问题
    • 最新问题
    • 标签
  • 帮助
主页 / 问题 / 684398
Accepted
Mr Lucky Tomas
Mr Lucky Tomas
Asked:2020-06-27 21:39:39 +0000 UTC2020-06-27 21:39:39 +0000 UTC 2020-06-27 21:39:39 +0000 UTC

在套接字python中接收消息

  • 772

有一个服务器,它接收和发送消息,我想出了发送,通过一个循环到每个,但是在一个循环中接收是行不通的,因为它会接收当前在循环中的那个,其余的将在队列中,这不是您需要的。现在接收消息的代码是什么:

def Reciver():
    global conn
    while 1:
        for i in set(conn):
            try:
                data = i.recv(1024)
                if data:
                    print(data.decode())
            except:
                pass

conn - 设置连接。您需要在屏幕上显示每条消息。

python
  • 1 1 个回答
  • 10 Views

1 个回答

  • Voted
  1. Best Answer
    Sergey Gornostaev
    2020-06-30T00:09:01Z2020-06-30T00:09:01Z

    socket上的操作阻塞线程执行的问题可以通过三种方式解决:

    1. 为每个阻塞操作分配一个单独的执行线程,它可以在不损害其他线程的情况下阻塞它;
    2. 将套接字切换到非阻塞模式并循环轮询它们的状态;
    3. 在异步模式下使用套接字。

    下面将讨论所有三个选项。

    多线程服务器

    也许是最常用的方法。可以说是经典。对于线程之间的交互,我们将使用queue.Queue。首先,队列的逻辑正好适合我们。其次,队列是线程安全的。

    # -*- coding: utf-8 -*-
    import socket
    import threading
    import queue
    
    # Определяем константу содержащую имя ОС
    # для учёта особенностей данной операционной системы
    import platform
    OS_NAME = platform.system()
    
    # Константы
    HOST = 'localhost'
    PORT = 1080
    
    # Единственная глобальная переменная
    # доступная всем потокам
    run = True
    
    def shutdown_socket(s):
        # В Linux'ах просто закрыть заблокированный сокет будет мало,
        # он так и не выйдет из состояния блокировки. Нужно передать
        # ему команду на завершение. Но в Windows наоборот, команда
        # на завершение вызовет зависание, если сокет был заблокирован
        # вызовом accept(), а простое закрытие сработает.
        if OS_NAME == 'Linux':
            s.shutdown(socket.SHUT_RDWR)
        s.close()    
    
    def reciver(client, q):
        while run:
            try:
                # Здесь поток блокируется до тех пор
                # пока не будут считаны все имеющиеся
                # в сокете данные
                data = client.recv(1024)
                if data: # Если есть данные
                    # Отправляем в очередь сообщений кортеж
                    # содержащий сокет отправителя
                    # и принятые данные
                    q.put((client, data))
                    print('{} отправил: {}'.format(client.getpeername(), data.decode()))
            except:
                break # В случае ошибки выходим из цикла
        client.close() # И закрываем клиентский сокет
    
    
    def sender(q, connections):
        while run:
            closed_sockets = set()
            try:
                # Получаем из очереди сообщений
                # сокет отправителя и принятые данные
                sender, message = q.get(timeout=1)
            except queue.Empty:
                pass # Игнорируем отсутствие сообщений в очереди
            else: # Если же сообщения есть
                for s in set(connections): # Обходим все сокеты
                    if s != sender: # Кроме сокета отправителя
                        try:
                            s.send(message) # И отправляем им принятое сообщение
                        except:
                            closed_sockets.add(s)
                if closed_sockets:
                    with threading.Lock():
                        connections -= closed_sockets
                    print("Подключено:", len(connections))
                q.task_done() # Сообщаем, что сообщение обработано
    
    
    def accepter(server, connections, q):
        while run:
            try:
                # Здесь поток блокируется до тех пор, пока кто-то не подключится к серверу
                client, addr = server.accept()
            except OSError as e:
                # Если отловлена не ожидаемая ошибка закрытия серверного сокета, а какая-то другая
                if (OS_NAME == 'Windows' and e.errno != 10038) or (OS_NAME == 'Linux' and e.errno != 22):
                    raise # то возбуждаем её повторно
            else: # Если кто-то подключился и создан новый клиентский сокет
                # Устанавливаем ему таймаут, чтобы считать его сбойным, 
                # если в этот сокет не будут ничего писать более 5 минут
                client.settimeout(60 * 5)
                with threading.Lock():
                    connections.add(client)
                # Запускаем новый поток, выполняющий функцию receiver
                # для только что полученного сокета
                threading.Thread(target=reciver, args=(client, q)).start()
                print("Подключено:", len(connections))
    
    
    if __name__ == '__main__':
        print('Запуск...')
    
        # Очередь сообщений, через которую будут общаться потоки
        q = queue.Queue()
        # Множество соединений
        connections = set()
    
        server = socket.socket()
        server.bind((HOST, PORT))
        server.listen()
    
        print(u'Сервер запущен на {}\n'.format(server.getsockname()))
    
        # Поток получающий сообщения из очереди
        # и отправляющий их всем сокетам в множестве connections
        threading.Thread(target=sender, args=(q, connections)).start()
        # Поток принимающий новые соединения
        threading.Thread(target=accepter, args=(server, connections, q)).start()
    
        while True:
            command = input()
            if command == 'exit': # Если в консоли введена команда exit
                run = False # отменяем выполнение циклов во всех потоках
                break # и выходим из этого цикла
        for s in connections:
            shutdown_socket(s)
        shutdown_socket(server)
    

    除了对示例进行必要的代码更改外,我还删除了不必要的全局变量并进行了样式更改。

    非阻塞服务器

    很古老,但并没有失去它的关联方法。它是古人在单任务操作系统中工作时使用的。要轮询套接字的状态,我们将使用同名模块中的select函数。它不是同类中最快的,但它适用于所有操作系统。

    # -*- coding: utf-8 -*-
    import select
    import socket
    import queue
    
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setblocking(0) # Неблокирующийся сокет
    server.bind(('localhost', 1080))
    server.listen()
    
    sockets = [server]
    message_queues = {}
    
    def close_connection(con):
        sockets.remove(con)
        if con in message_queues:
            del message_queues[con]
        con.close()
    
    
    # Пока есть хоть один сокет
    while sockets:
        # Опрашиваем сокеты на готовность к чтению, записи, ошибки.
        # С таймаутом в 1 секунду для того, чтобы программа реагировала
        # на другие события.
        readable, writable, exceptional = select.select(sockets, sockets, sockets, 1)
    
        for s in readable: # Для каждого сокета готового к чтению
            if s is server: # Если это сокет принимающий соединения
                connection, client_address = s.accept()
                connection.setblocking(0) # Этот клиентский сокет тоже будет неблокируемым
                sockets.append(connection) # Добавляем клиентский сокет в список сокетов
                message_queues[connection] = queue.Queue() # Создаём очередь сообщений для сокета
            else:
                try:
                    data = s.recv(1024) # Читаем без блокировки
                except:
                    close_connection(s) # В случае ошибки закрываем этот сокет и удаляем
                else: # Если ошибка не произошла
                    if data: # И данные получены
                        for c in message_queues: # Обходим все очереди сообщений
                            if c != s: # Кроме очереди текущего сокета
                                message_queues[c].put(data) # Отправляем данные в каждую очередь
                    else:
                        # Если данных нет в сокете готовом для чтения
                        # значит он в состоянии закрытия на клиентской
                        # стороне. Закрываем его на стороне сервера.
                        close_connection(s)
    
        for s in writable: # Для каждого сокета готового к записи
            try:
                next_msg = message_queues[s].get_nowait() # Получаем сообщение из очереди
            except queue.Empty:
                pass # Игнорируем пустые очереди
            except KeyError:
                pass # Игнорируем очереди удалённые до того, как до них дошла очередь обработки
            else:
                s.send(next_msg) # Отправляем без блокировки
    
        for s in exceptional: # Для каждого сбойного сокета
            close_connection(s) # Закрываем сбойный сокет
    

    异步服务器

    比较新的方式。Asyncio 为使用套接字提供了三个抽象级别——套接字操作、流和协议的异步包装器。名称可能有些混乱,asyncio 线程与执行线程无关,协议与网络协议不同。Ceteris paribus,你应该总是选择最高级别的抽象,所以我们实现协议。

    # -*- coding: utf-8 -*-
    import asyncio
    
    clients = []
    
    class SimpleChatClientProtocol(asyncio.Protocol):
        def connection_made(self, transport):
            self.transport = transport
            self.peername = transport.get_extra_info("peername")
            print('Подключился: {}'.format(self.peername))
            clients.append(self)
    
        def data_received(self, data):
            print('{} отправил: {}'.format(self.peername, data.decode()))
            for client in clients:
                if client is not self:
                    client.transport.write(data)
    
        def connection_lost(self, ex):
            print('Отключился: {}'.format(self.peername))
            clients.remove(self)
    
    
    # Цикл событий невозможно прервать, если в нём
    # не происходят события. Чтобы избежать этого
    # регистрируем в цикле фунцию, которая будет 
    # вызываться раз в секунду.
    def wakeup():
        loop = asyncio.get_event_loop()
        loop.call_later(1, wakeup)
    
    
    if __name__ == '__main__':
        print('Запуск...')
    
        # Получаем цикл событий
        loop = asyncio.get_event_loop()
        # Регистрируем "отлипатель"
        loop.call_later(1, wakeup)
        # Создаём асинхронную сопрограмму-протокол
        coro = loop.create_server(SimpleChatClientProtocol, host='localhost', port=1080)
        # Регистрируем её в цикле событий на выполнение
        server = loop.run_until_complete(coro)
    
        for socket in server.sockets:
            print('Сервер запущен на {}'.format(socket.getsockname()))
        print('Выход по Ctrl+C\n')
    
        try:
            loop.run_forever() # Запускаем бесконечный цикл событий
        except KeyboardInterrupt: # Программа прервана нажатием Ctrl+C
            pass
        finally:
            server.close() # Закрываем протокол
            loop.run_until_complete(server.wait_closed()) # Асинхронно ожидаем окончания закрытия
        loop.close() # Закрываем цикл событий
    

    协议代码简单明了,我什至没有找到任何相关的注释。

    上面的每个示例都可以通过telnet并在您的客户端代码的帮助下进行。

    • 8

相关问题

Sidebar

Stats

  • 问题 10021
  • Answers 30001
  • 最佳答案 8000
  • 用户 6900
  • 常问
  • 回答
  • Marko Smith

    Python 3.6 - 安装 MySQL (Windows)

    • 1 个回答
  • Marko Smith

    C++ 编写程序“计算单个岛屿”。填充一个二维数组 12x12 0 和 1

    • 2 个回答
  • Marko Smith

    返回指针的函数

    • 1 个回答
  • Marko Smith

    我使用 django 管理面板添加图像,但它没有显示

    • 1 个回答
  • Marko Smith

    这些条目是什么意思,它们的完整等效项是什么样的

    • 2 个回答
  • Marko Smith

    浏览器仍然缓存文件数据

    • 1 个回答
  • Marko Smith

    在 Excel VBA 中激活工作表的问题

    • 3 个回答
  • Marko Smith

    为什么内置类型中包含复数而小数不包含?

    • 2 个回答
  • Marko Smith

    获得唯一途径

    • 3 个回答
  • Marko Smith

    告诉我一个像幻灯片一样创建滚动的库

    • 1 个回答
  • Martin Hope
    Air 究竟是什么标识了网站访问者? 2020-11-03 15:49:20 +0000 UTC
  • Martin Hope
    Алексей Шиманский 如何以及通过什么方式来查找 Javascript 代码中的错误? 2020-08-03 00:21:37 +0000 UTC
  • Martin Hope
    Qwertiy 号码显示 9223372036854775807 2020-07-11 18:16:49 +0000 UTC
  • Martin Hope
    user216109 如何为黑客设下陷阱,或充分击退攻击? 2020-05-10 02:22:52 +0000 UTC
  • Martin Hope
    Qwertiy 并变成3个无穷大 2020-11-06 07:15:57 +0000 UTC
  • Martin Hope
    koks_rs 什么是样板代码? 2020-10-27 15:43:19 +0000 UTC
  • Martin Hope
    user207618 Codegolf——组合选择算法的实现 2020-10-23 18:46:29 +0000 UTC
  • Martin Hope
    Sirop4ik 向 git 提交发布的正确方法是什么? 2020-10-05 00:02:00 +0000 UTC
  • Martin Hope
    faoxis 为什么在这么多示例中函数都称为 foo? 2020-08-15 04:42:49 +0000 UTC
  • Martin Hope
    Pavel Mayorov 如何从事件或回调函数中返回值?或者至少等他们完成。 2020-08-11 16:49:28 +0000 UTC

热门标签

javascript python java php c# c++ html android jquery mysql

Explore

  • 主页
  • 问题
    • 热门问题
    • 最新问题
  • 标签
  • 帮助

Footer

RError.com

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

帮助

© 2023 RError.com All Rights Reserve   沪ICP备12040472号-5