RError.com

Accepted
Miekrif
Asked: 2022-08-10 15:37:03 +0000 UTC

Exception handling or troubleshooting with Python sockets


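I have a socket server written in Python 3 with some basic protection built in, because crawler bots probe every open port. The script works fine, but under a DoS it crashes with an error.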

Error output:

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "SM_803GIT.py", line 94, in <module>
    start_my_server()
  File "SM_803GIT.py", line 79, in start_my_server
    load_secret(secret_token, data)
  File "SM_803GIT.py", line 43, in load_secret
    accept(key)
  File "SM_803GIT.py", line 49, in accept
    try_to_verification(key)
  File "SM_803GIT.py", line 60, in try_to_verification
    next_step()
  File "SM_803GIT.py", line 89, in next_step
    start_my_server()
  File "SM_803GIT.py", line 79, in start_my_server
    load_secret(secret_token, data)
  File "SM_803GIT.py", line 43, in load_secret
    accept(key)
  File "SM_803GIT.py", line 49, in accept
    try_to_verification(key)
  File "SM_803GIT.py", line 60, in try_to_verification
    next_step()
  File "SM_803GIT.py", line 89, in next_step
    start_my_server()
  File "SM_803GIT.py", line 79, in start_my_server
    load_secret(secret_token, data)
  File "SM_803GIT.py", line 43, in load_secret
    accept(key)
  File "SM_803GIT.py", line 49, in accept
    try_to_verification(key)
  File "SM_803GIT.py", line 60, in try_to_verification
    next_step()
  File "SM_803GIT.py", line 89, in next_step
    start_my_server()
  File "SM_803GIT.py", line 79, in start_my_server
    load_secret(secret_token, data)
  File "SM_803GIT.py", line 43, in load_secret
    accept(key)
  File "SM_803GIT.py", line 49, in accept
    try_to_verification(key)
  File "SM_803GIT.py", line 60, in try_to_verification
    next_step()
  File "SM_803GIT.py", line 89, in next_step
    start_my_server()
  File "SM_803GIT.py", line 79, in start_my_server
    load_secret(secret_token, data)
  File "SM_803GIT.py", line 43, in load_secret
    accept(key)
  File "SM_803GIT.py", line 49, in accept
    try_to_verification(key)
  File "SM_803GIT.py", line 60, in try_to_verification
    next_step()
  File "SM_803GIT.py", line 89, in next_step
    start_my_server()
  File "SM_803GIT.py", line 79, in start_my_server
    load_secret(secret_token, data)
  File "SM_803GIT.py", line 43, in load_secret
    accept(key)
  File "SM_803GIT.py", line 49, in accept
    try_to_verification(key)
  File "SM_803GIT.py", line 60, in try_to_verification
    next_step()
  File "SM_803GIT.py", line 89, in next_step
    start_my_server()
  File "SM_803GIT.py", line 79, in start_my_server
    load_secret(secret_token, data)
  File "SM_803GIT.py", line 43, in load_secret
    accept(key)
  File "SM_803GIT.py", line 49, in accept
    try_to_verification(key)
  File "SM_803GIT.py", line 60, in try_to_verification
    next_step()
  File "SM_803GIT.py", line 89, in next_step
    start_my_server()
  File "SM_803GIT.py", line 79, in start_my_server
    load_secret(secret_token, data)
  File "SM_803GIT.py", line 43, in load_secret
    accept(key)
  File "SM_803GIT.py", line 49, in accept
    try_to_verification(key)
  File "SM_803GIT.py", line 60, in try_to_verification
    next_step()
  File "SM_803GIT.py", line 89, in next_step
    start_my_server()
  File "SM_803GIT.py", line 81, in start_my_server
    start_my_server()
  File "SM_803GIT.py", line 76, in start_my_server
    data = client_socket.recv(1048576).decode('utf-8')
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xee in position 4: invalid continuation byte
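
A note on reading this traceback: the final exception is a UnicodeDecodeError, which is a ValueError, while `socket.error` is just another name for OSError. An `except socket.error` clause therefore seems unable to stop it. The traceback also shows each verified request re-entering start_my_server() through next_step(), so the call stack keeps growing. A minimal sketch of the class relationship and of one tolerant way to decode, assuming it is acceptable to replace undecodable bytes (`decode_request` is a hypothetical helper, not part of the script):

```python
import socket

# socket.error has been an alias of OSError since Python 3.3, while
# UnicodeDecodeError derives from ValueError, so `except socket.error`
# does not catch a failed .decode('utf-8').
print(socket.error is OSError)
print(issubclass(UnicodeDecodeError, OSError))
print(issubclass(UnicodeDecodeError, ValueError))


def decode_request(raw: bytes) -> str:
    """Hypothetical helper: decode request bytes without raising.

    Bytes that are not valid UTF-8 become U+FFFD instead of
    aborting the server loop.
    """
    return raw.decode('utf-8', errors='replace')


# Bytes resembling the failing input: 0xee at position 4.
print(repr(decode_request(b'POST\xee /')))
```

Catching UnicodeDecodeError explicitly and rejecting the client is an equally valid choice; replacing bytes only makes sense if the later token check still fails safely on such input.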

How can I handle this situation, or simply skip a flood of requests coming from a single IP? Code sample:

import os
import socket
from pathlib import Path
from dotenv import load_dotenv
import json
import time
import re

#Init .env
env_path = Path('.') / '.env'
load_dotenv(dotenv_path=env_path)

#init
secret_token = os.environ['secret_token']

#time
seconds = time.time()
local_time = time.ctime(seconds)

#HDRS
HDRS = 'HTTP/1.1 200 OK\r\nContent-Type: text/html; charset=utf-8\r\n\r\n'
HDRS_404 = 'HTTP/1.1 404 Not Found\r\nContent-Type: text/html; charset=utf-8\r\n\r\n'

socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket_server.bind(('ip', port))
socket_server.listen(32768)
#create webserver socket
def start_my_server():
    # print('Working...')
    # client_socket, address = socket_server.accept()
    # print('loop', address)
    #
    # data = client_socket.recv(1048576).decode('utf-8')
    def load_secret(secret_token, data):
        key = str(data)
        key = re.findall(f'X-Gitlab-Token:.............', key)
        key = str(key).replace("['X-Gitlab-Token: ",'')
        key = str(key).replace("']", '')
        # print(key, 'load_secret')
        # print(secret_token, 'load_secret')
        if secret_token == key:
            accept(key)
        else:
            fail_verifivcation()
        # return key
    def accept(key):
        print('Find')
        try_to_verification(key)

    def try_to_verification(key):
        print(key, 'key try_to_verification')
        # print(data,'\n', client_socket,'\n', address,'\n', 'CHECK')
        with open(f'data("{local_time}").json', 'w') as output_file:
            json.dump(data, output_file)
        client_socket.send(HDRS.encode('utf-8'))
        client_socket.shutdown(socket.SHUT_WR)
        # socket_server.close()
        # return init_next()
        next_step()
    def fail_verifivcation():
        print('Not find')
        client_socket.shutdown(socket.SHUT_WR)
        addresses = open('ipPOST', 'a')
        addresses.write(str(address) + f'{local_time}\n')
        addresses.close()
        # init_next()
    try:
        while True:
            print('Working...')
            client_socket, address = socket_server.accept()
            print('loop', address)
            addresses = open('ipPOST', 'a')
            addresses.write(str(address) + f'{local_time}\n')
            # addresses.close()
            data = client_socket.recv(1048576).decode('utf-8')
            # content = load_page_from_get_request(data)
            # return data, client_socket, address
            load_secret(secret_token, data)
    except socket.error:
        start_my_server()
    # load_secret(secret_token, data)
    # init_next()
    # return init_next()

def next_step():
    print('next step')
    # return  init_next()
    start_my_server()

print('loop script')
if __name__ == '__main__':
    print('name = main')
    start_my_server()
    # load_secret()
    next_step()
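
On the second half of the question (skipping a flood of requests from one IP), one possible approach is a small per-IP sliding-window counter consulted right after accept(). The sketch below is an illustration under assumptions: the class name `IpThrottle`, the limit of 5 hits and the 10-second window are all hypothetical, and the table of seen IPs is never pruned, so a real deployment would need to cap it or use a firewall rule instead.

```python
import time
from collections import defaultdict, deque


class IpThrottle:
    """Hypothetical limiter: at most `limit` accepted hits per `window` seconds per IP."""

    def __init__(self, limit=5, window=10.0, clock=time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock                 # injectable so the class can be tested
        self.hits = defaultdict(deque)     # ip -> timestamps of accepted hits

    def allow(self, ip):
        now = self.clock()
        recent = self.hits[ip]
        # Forget hits that have fallen out of the window.
        while recent and now - recent[0] >= self.window:
            recent.popleft()
        if len(recent) >= self.limit:
            return False                   # over the limit: caller should drop the client
        recent.append(now)
        return True
```

In the accept loop this could be used as `if not throttle.allow(address[0]): client_socket.close(); continue`, so that a noisy address is dropped before any data is read from it.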
python

1 Answer

  1. Best Answer
    Miekrif
    2022-08-11T19:57:59Z

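    Thanks to @Mike for the help, my code now looks like the listing below.

    Two things are still unsolved: reassembling a request that arrives in several packets, and dropping a connection on timeout.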

        import os
        import socket
        from pathlib import Path
        from dotenv import load_dotenv
        import json
        import time
        import re
        
        # Init .env
        env_path = Path('.') / '.env'
        load_dotenv(dotenv_path=env_path)
        
        # init
        secret_token = os.environ['secret_token']
        
        # time
        seconds = time.time()
        local_time = time.ctime(seconds)
        
        # HDRS
        HDRS = 'HTTP/1.1 200 OK\r\nContent-Type: text/html; charset=utf-8\r\n\r\n'
        HDRS_404 = 'HTTP/1.1 404 Not Found\r\nContent-Type: text/html; charset=utf-8\r\n\r\n'
        
        socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        socket_server.bind((ip, port))
        socket_server.listen(32768)
        socket_server.settimeout(4)
    
        
        # create webserver socket
        def start_my_server():
            def load_secret(secret_token, data):
                key = str(data)
                key = re.findall(f'X-Gitlab-Token:.............', key)
                key = str(key).replace("['X-Gitlab-Token: ", '')
                key = str(key).replace("']", '')
                if secret_token == key:
                    socket_server.settimeout(None)                
                    accept(key)
                else:
                    fail_verifivcation()
                # return key
        
            def accept(key):
                print('Find')
                try_to_verification(key)
        
            def try_to_verification(key):
                print(key, 'key try_to_verification')
                with open(f'data("{local_time}").json', 'w') as output_file:
                    json.dump(data, output_file)
                client_socket.send(HDRS.encode('utf-8'))
                client_socket.shutdown(socket.SHUT_WR)
                next_step()
        
            def fail_verifivcation():
                print('Not find')
                client_socket.shutdown(socket.SHUT_WR)
                # Log the rejected address; the with-block closes the file even on error.
                with open('ipPOST', 'a') as addresses:
                    addresses.write(str(address) + f'{local_time}\n')
                # init_next()
            while True:
                try:
                    print('Working...')
                    client_socket, address = socket_server.accept()
                    print('loop', address)
                    try:
                        data = client_socket.recv(1048576).decode('utf-8')
                        load_secret(secret_token, data)
                    except UnicodeDecodeError:
                        # Payload is not valid UTF-8 (typical for port scanners).
                        fail_verifivcation()
                except socket.error:
                    # accept() timed out or the connection failed: keep listening.
                    pass
        
        
        def next_step():
            print('next step')
            # start_my_server()
        
        
        print('loop script')
        if __name__ == '__main__':
            print('name = main')
            start_my_server()
            # load_secret()
            next_step()
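
For the two open items named in the answer, here is a minimal sketch rather than a definitive fix. As far as the Python socket documentation describes it, a timeout set on the listening socket only limits accept(), and the socket returned by accept() is in blocking mode unless a default timeout is configured. The timeout therefore has to be set on `client_socket` itself. The helper name `read_http_head`, the 4096-byte chunk size and the 64 KiB cap are assumptions, not part of the original code.

```python
import socket

MAX_HEADER_BYTES = 64 * 1024  # assumed cap, not taken from the original code


def read_http_head(conn, timeout=4.0, limit=MAX_HEADER_BYTES):
    """Hypothetical helper: read until the blank line that ends the HTTP headers.

    Returns the raw bytes received so far, or None if the peer was too
    slow, closed early, or sent more than `limit` bytes.
    """
    conn.settimeout(timeout)       # applies to this connection only
    buf = b''
    try:
        while b'\r\n\r\n' not in buf:
            chunk = conn.recv(4096)
            if not chunk:          # peer closed the connection
                return None
            buf += chunk           # join packets as they arrive
            if len(buf) > limit:
                return None
    except socket.timeout:
        return None
    return buf
```

The timeout here applies to each recv() call, not to the request as a whole, so a client that drips one byte at a time could still hold the connection; a total deadline would need an extra clock check. Reading the request body (the JSON payload) would additionally require honouring Content-Length, which this sketch does not attempt.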
    