标签: udp

  • 计算udp校验和(反码求和)

    C语言1:

    #include <stdio.h>
    #include <arpa/inet.h>
    
    unsigned long checksum(char *buf,int nword)
    {
        unsigned long sum;
        unsigned short *data = (unsigned short *) buf;
    
        for(sum=0; nword>0; nword--)
        {
            sum += *data;
            data++;
            sum = (sum>>16) + (sum&0xffff);
        }
        sum = ~checksum & 0xFFFF;
        return htons(sum); //转为大端字节序
    }
    
    void main(int argc, char *argv[])
    {
        char b []  = "\0d\0e";
        unsigned long ret = checksum(b, 2);
        printf("checksum is: 0x%lx\n", ret);
        printf("checksum is: %lu\n", ret);
    
    }
    

    C语言2(github copilot):

    #include <stdio.h>
    #include <stdint.h>
    
    uint16_t udp_checksum(uint8_t *data, size_t len) {
        uint32_t checksum = 0;
    
        // 如果数据长度是奇数,填充一个字节
        if (len % 2 != 0) {
            data[len] = 0;
            len++;
        }
    
        // 将字节数组分成16位的块并相加
        for (size_t i = 0; i < len; i += 2) {
            uint16_t word = (data[i] << 8) + data[i + 1];
            checksum += word;
            // 处理进位
            checksum = (checksum & 0xFFFF) + (checksum >> 16);
        }
    
        // 取反码
        checksum = ~checksum & 0xFFFF;
        return (uint16_t)checksum;
    }
    
    int main() {
        uint8_t data[] = {'a', 'b', 'c', 'd'};
        size_t len = sizeof(data) / sizeof(data[0]);
        printf("Checksum: 0x%04X\n", udp_checksum(data, len));
        return 0;
    }
    

    Python(Github copilot):

    def udp_checksum(data):
        """下面是一个用Python编写的UDP校验和算法:
        ### 步骤:
        1. 定义一个函数 `udp_checksum`,接受一个字节数组作为参数。
        2. 将字节数组分成16位的块。
        3. 将所有16位块相加。
        4. 如果有进位,则将进位加到结果中。
        5. 取结果的反码。
        6. 返回低16位,就是校验和。
        """
        if len(data) % 2 != 0:
            data += b'\x00'  # 如果数据长度是奇数,填充一个字节
    
        checksum = 0
        for i in range(0, len(data), 2):
            # 这个公式是大端的算法,如果是小端可以写作(data[i+1] << 8) + data[i]
            word = (data[i] << 8) + data[i + 1]
            print(word)
            checksum += word
            checksum = (checksum & 0xFFFF) + (checksum >> 16)  # 处理进位
    
        checksum = ~checksum & 0xFFFF  # 取反码
        return checksum
    
    # 示例用法
    data = b'\0d\0e'
    print(hex(udp_checksum(data)))  # 输出校验和
    

    关于接收端验证的方法:

    把整个需要校验的数据计算校验和(反码求和),如果结果为0x0000则结果就是正确的,或者计算原码求和(累加后不取反),结果为0xffff则就是正确的。网上有的说验证结果为0x0000,有的说验证结果为0xffff要了解其中的原因。

    参考:
    https://blog.csdn.net/weixin_28673511/article/details/130314065
    https://zhuanlan.zhihu.com/p/74381973

    Views: 2

  • 在NAT网络中测试UDP会话的超时时间

    创建stuntest.sh脚本,内容如下

    #!/bin/sh
    stunclient --protocol 2 --localport 21 stun.hetao.me
    echo "sleep 1"
    sleep1
    stunclient --protocol 2 --localport 21 stun.hetao.me
    

    然后以不同的时间间隔多次运行这个脚本(可以并行运行),看映射的端口有没有变化,有变化就是超时了
    ./stuntest.sh 235 udp #第一个参数是间隔时间
    test1

    Binding test: success
    Local address: 100.82.167.62:1233
    Mapped address: 117.143.55.240:13837
    sleep 238
    Binding test: success
    Local address: 100.82.167.62:1233
    Mapped address: 117.143.55.240:13837

    test2

    Binding test: success
    Local address: 100.82.167.62:1240
    Mapped address: 117.143.55.240:13853
    sleep 245
    Binding test: success
    Local address: 100.82.167.62:1240
    Mapped address: 117.143.55.240:14115
    经过了238秒,端口号没有变,但是245秒端口号就变了,说明超时时间应该不超过240秒也就是4分钟

    测试用到的stun服务端和客户端是用的这个项目的代码:
    https://github.com/jselbie/stunserver

    也可以用coturn中的turnutils_natdiscovery中的工具来测试
    turnutils_natdiscovery -m -f -t -T 500 stunserver2024.stunprotocol.org
    这样会在开始的时候发一次请求,500秒后再发一次请求

    openwrt修改NAT会话超时(其它系统方法类似)
    vi /etc/sysctl.d/11-nf-conntrack.conf
    修改
    net.netfilter.nf_conntrack_udp_timeout=240
    net.netfilter.nf_conntrack_udp_timeout_stream=300
    生效
    /etc/init.d/sysctl reload
    查看当前NAT会话超时
    sysctl net.netfilter.nf_conntrack_udp_timeout

    Views: 1

  • python实现非塞UDP通信的两种方法

    当使用阻塞式UDP socket时无法执行多任务处理,也无法与用户交互,甚至不能响应ctrl+c中断。为了解决这些问题所以要用非阻塞式udp通信。
    1,多线程法

    import socket
    import signal
    import threading
    import time
    
    def handler(signal_received, frame):
        # Handle any cleanup here
        if signal_received == signal.SIGINT:
            print('SIGINT or CTRL-C detected. Exiting gracefully')
            exit(0)
    
    def task(host, port):
        print("udp server is listen on " + str(host) + ':' + str(port))
    
        sock = socket.socket(socket.AF_INET, # Internet
                            socket.SOCK_DGRAM) # UDP
        sock.bind((UDP_IP, UDP_PORT))
    
        while True:
            data, addr = sock.recvfrom(1024) # buffer size is 1024 bytes
            print("from %s received message: %s" % (addr, data))
    
    if __name__ == '__main__':
        signal.signal(signal.SIGINT, handler=handler) # ctlr + c
    
        UDP_IP = "0.0.0.0"
        UDP_PORT = 5005
    
        t = threading.Thread(target=task, args=(UDP_IP, UDP_PORT))
        t.daemon = True
        t.start()
    
        while True:
            time.sleep(100)
    

    2, 协程法
    官方文档说不要在应用中使用asyncio,而是应该在框架中用

    import asyncio
    
    class EchoServerProtocol:
        def __init__(self, message, on_con_lost):
            self.message = message
            self.on_con_lost = on_con_lost
            self.transport = None
    
        def connection_made(self, transport):
            self.transport = transport
    
        def datagram_received(self, data, addr):
            print('Received %r from %s' % (data.decode(), addr))
            self.transport.sendto(data, addr)
        def error_received(self, exc):
            print('Error received:', exc)
        def connection_lost(self, exc):
            print("Connection closed")
            self.on_con_lost.set_result(True)
    
    async def main():
        print("Starting UDP server")
    
        # Get a reference to the event loop as we plan to use
        # low-level APIs.
        loop = asyncio.get_running_loop()
        on_con_lost = loop.create_future()
        message = 'Hello World!'
        # One protocol instance will be created to serve all
        # client requests.
        transport, protocol = await loop.create_datagram_endpoint(
            lambda: EchoServerProtocol(message, on_con_lost=on_con_lost),
            local_addr=('127.0.0.1', 9999),
            remote_addr=('127.0.0.1', 5005))
    
        try:
            while True:
                await asyncio.sleep(10)  # Serve for 1 hour.
        finally:
            transport.close()
    
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('Got signal: SIGINT, shutting down.')
        exit(0)
    

    Views: 89