#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright 2013-2015 clowwindy
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function, \
with_statement
import socket
import struct
import logging
import hashlib
import hmac
ONETIMEAUTH_BYTES = 10
ONETIMEAUTH_CHUNK_BYTES = 12
ONETIMEAUTH_CHUNK_DATA_LEN = 2
def sha1_hmac(secret, data):
return hmac.new(secret, data, hashlib.sha1).digest()
def onetimeauth_verify(_hash, data, key):
return _hash == sha1_hmac(key, data)[:ONETIMEAUTH_BYTES]
def onetimeauth_gen(data, key):
return sha1_hmac(key, data)[:ONETIMEAUTH_BYTES]
def compat_ord(s):
if type(s) == int:
return s
return _ord(s)
def compat_chr(d):
if bytes == str:
return _chr(d)
return bytes([d])
_ord = ord
_chr = chr
ord = compat_ord
chr = compat_chr
#首先,它检查当前 Python 解释器是否支持字节串(bytes)。在 Python 2 和 Python 3 中,字节串的类型和处理方式略有不同,因此这个函数首先进行了一些适配工作。
def to_bytes(s):
if bytes != str:
if type(s) == str:
return s.encode('utf-8')
return s
#正常就不相等 bytes和str
def to_str(s):
if bytes != str:
if type(s) == bytes:
return s.decode('utf-8')
return s
#二进制转字节串
def inet_ntop(family, ipstr):#返回类似于b'127.0.0.1'
if family == socket.AF_INET:
return to_bytes(socket.inet_ntoa(ipstr))#(bytes->str)->bytes
elif family == socket.AF_INET6:
import re
v6addr = ':'.join(('%02X%02X' % (ord(i), ord(j))).lstrip('0')#lstrip是去掉开头的0 每个字节转为转为16进制数字
for i, j in zip(ipstr[::2], ipstr[1::2]))#zip返回迭代器,后面是每两个取出一个字节 从0和从1开始取
v6addr = re.sub('::+', '::', v6addr, count=1)
return to_bytes(v6addr)
def inet_pton(family, addr):#返回类似于b'\x7f\x00\x00\x01' input:str
addr = to_str(addr)
if family == socket.AF_INET:
#返回二进制字节串
return socket.inet_aton(addr)
elif family == socket.AF_INET6:
if '.' in addr: # a v4 addr
#最后一个:后面的地址 addr.rindex返回的是索引 127.0.0.1
v4addr = addr[addr.rindex(':') + 1:]
#返回一个包含了这个地址二进制表示的 4 字节的字节串 b'\x7f\x00\x00\x01' 字节串则是4个部分 可以逐步运行
v4addr = socket.inet_aton(v4addr)
#转为ascii 逐字节处理 返回类似于['7F', '00', '00', '01']
v4addr = map(lambda x: ('%02X' % ord(x)), v4addr)
#
v4addr.insert(2, ':')
newaddr = addr[:addr.rindex(':') + 1] + ''.join(v4addr)
#处理一次没有嵌套得了
return inet_pton(family, newaddr)
dbyts = [0] * 8 # 8 groups
#::1 结果是‘’ ‘’ ‘1’
grps = addr.split(':')
for i, v in enumerate(grps): #index, value
if v:
dbyts[i] = int(v, 16)
else:
for j, w in enumerate(grps[::-1]):#[None:None:-1]。这表示从序列的开头到末尾,以步长为 -1 的方式进行切片,实现反转序列的效果。
if w:
#int(w, 16) 是将十六进制字符串 w 转换为整数。这里的 16 表示使用十六进制解析 w 出来是整数了 双冒号的处理
dbyts[7 - j] = int(w, 16)
else:
break
#退出了外面的循环
break
#用ascii编码存了地址 在字节串中 0也有 把缩写也打开了
return b''.join((chr(i // 256) + chr(i % 256)) for i in dbyts)
else:
raise RuntimeError("What family?")
'''
这个 is_ip 函数用于检测一个给定的地址是否是有效的 IP 地址(IPv4 或 IPv6)。
它的工作原理是:
首先,它遍历了一个地址族的列表 (socket.AF_INET, socket.AF_INET6),分别表示 IPv4 和 IPv6 地址族。
对于每个地址族,它尝试使用 inet_pton() 函数将传入的地址解析为该地址族对应的格式。
如果能够成功解析,且不抛出 TypeError, ValueError, OSError, IOError 异常,则说明传入的地址是有效的 IPv4 或 IPv6 地址,函数会返回相应的地址族常量 (socket.AF_INET 或 socket.AF_INET6)。
如果无法成功解析,或者抛出了上述异常,就继续尝试下一个地址族。
如果对所有地址族的尝试都失败了,函数最终返回 False,表示传入的地址不是有效的 IPv4 或 IPv6 地址。
这个函数是一个通用的地址检查函数,用于判断传入的地址是否符合 IPv4 或 IPv6 地址的格式。
'''
#返回是v4还算v6 return socket.AF_INET6之类的
def is_ip(address):
for family in (socket.AF_INET, socket.AF_INET6):
try:
if type(address) != str:
address = address.decode('utf8')
inet_pton(family, address)
return family
#不是ip地址则报异常 换一个 最后不是就是false
except (TypeError, ValueError, OSError, IOError):
pass
return False
def patch_socket():#返回二进制字节串 ip地址的 用上面的函数做补丁
if not hasattr(socket, 'inet_pton'):#str->bytes 字符串点分十进制 到 二进制标识
socket.inet_pton = inet_pton
if not hasattr(socket, 'inet_ntop'):#bytes->bytes 二进制表示到点分十进制
socket.inet_ntop = inet_ntop
patch_socket()
ADDRTYPE_IPV4 = 0x01
ADDRTYPE_IPV6 = 0x04
ADDRTYPE_HOST = 0x03
ADDRTYPE_AUTH = 0x10
ADDRTYPE_MASK = 0xF
def pack_addr(address): #打包地址 如果错了 可能是主机 跳过 ipv4返回的是b'\x01和32位的串' ipv6是b'\x04和128位的串' 地址是b'\x03+长度+源地址'
address_str = to_str(address)
for family in (socket.AF_INET, socket.AF_INET6):
try:
r = socket.inet_pton(family, address_str)
if family == socket.AF_INET6:
return b'\x04' + r
else:
return b'\x01' + r
except (TypeError, ValueError, OSError, IOError):
pass
if len(address) > 255:
address = address[:255] # TODO
return b'\x03' + chr(len(address)) + address#chr返回字符
'''
+----+-----+-------+------+----------+----------+
|拿走 | 拿走 | 拿走 | ATYP | DST.ADDR | DST.PORT |
+----+-----+-------+------+----------+----------+
|VER | CMD | RSV | ATYP | DST.ADDR | DST.PORT |
+----+-----+-------+------+----------+----------+
| 1 | 1 | X'00' | 1 | Variable | 2 |
+----+-----+-------+------+----------+----------+
'''
def parse_header(data):
addrtype = ord(data[0])#返回Unicode 码点
dest_addr = None
dest_port = None
header_length = 0
if addrtype & ADDRTYPE_MASK == ADDRTYPE_IPV4:
if len(data) >= 7:
dest_addr = socket.inet_ntoa(data[1:5])#转为字符串
dest_port = struct.unpack('>H', data[5:7])[0]#索引5/6的端口号
header_length = 7
else:
logging.warn('header is too short')
elif addrtype & ADDRTYPE_MASK == ADDRTYPE_HOST:
if len(data) > 2:
addrlen = ord(data[1])#取出地址长度
if len(data) >= 4 + addrlen:
dest_addr = data[2:2 + addrlen]
dest_port = struct.unpack('>H', data[2 + addrlen:4 +
addrlen])[0]
header_length = 4 + addrlen
else:
logging.warn('header is too short')
else:
logging.warn('header is too short')
elif addrtype & ADDRTYPE_MASK == ADDRTYPE_IPV6:
if len(data) >= 19:
dest_addr = socket.inet_ntop(socket.AF_INET6, data[1:17])
dest_port = struct.unpack('>H', data[17:19])[0]
header_length = 19
else:
logging.warn('header is too short')
else:
logging.warn('unsupported addrtype %d, maybe wrong password or '
'encryption method' % addrtype)
if dest_addr is None:
return None
return addrtype, to_bytes(dest_addr), dest_port, header_length
#shell.check_config会调用
class IPNetwork(object):#基类是object
ADDRLENGTH = {socket.AF_INET: 32, socket.AF_INET6: 128, False: 0}
def __init__(self, addrs): #shell.check_config中IPNetwork IPNetwork(config.get('forbidden_ip', '127.0.0.0/8,::1/128')) 会调用 传入的参数地址 每一个以元组(网络号,后边0的个数)进入列表
self._network_list_v4 = []
self._network_list_v6 = []
if type(addrs) == str:
addrs = addrs.split(',')
#list() 函数将迭代器转换为列表 每个列表成员都执行了函数
list(map(self.add_network, addrs))
def add_network(self, addr):#将二进制的网络字节序的加入列表
if addr is "":
return
block = addr.split('/')
#返回的是那一族ip socket.AF_INET或者socket.AF_INET6
addr_family = is_ip(block[0])
addr_len = IPNetwork.ADDRLENGTH[addr_family]
if addr_family is socket.AF_INET:#inet_aton 字节串 ;! 表示使用网络字节序(big-endian),即最高有效位在前的字节序 实际没有点分十进制 例如127.0.0.1 二进制转换2130706433 保存为无符号整数
# 小端序解析出来0001000000000000000001111111 是16777343 最后一个字节000000001是高位(前面的位置) struct.pack 解析的顺序 只是解析字节串的顺序
ip, = struct.unpack("!I", socket.inet_aton(block[0]))
elif addr_family is socket.AF_INET6:
hi, lo = struct.unpack("!QQ", inet_pton(addr_family, block[0])) #拆分为两个 64 位无符号整数。 高位high 低位low Q 表示无符号长长整型(64 位无符号整数) I 表示无符号整型(32 位无符号整数
ip = (hi >= 1 #右移一位
prefix_size += 1
logging.warn("You did't specify CIDR routing prefix size for %s, "
"implicit treated as %s/%d" % (addr, addr, addr_len))
elif block[1].isdigit() and int(block[1]) >= prefix_size
else:
raise Exception("Not a valid CIDR notation: %s" % addr) #异常推出了
if addr_family is socket.AF_INET:
self._network_list_v4.append((ip, prefix_size)) #列表的append 添加元组(网络号和后面的0的后缀数(注意不是前缀))
#存入的缩短了的ip 去掉了主机号的 类似于0.0.0.01000000 存的是 (0.0.0.01,1)
else:
self._network_list_v6.append((ip, prefix_size))
#__contains__ 方法允许自定义类 MyClass 实现了包含性检查。当使用 in 关键字检查某个元素是否存在于 MyClass 实例时,实际上会调用 __contains__ 方法来执行检查操作。
def __contains__(self, addr):#检查地址在IPNetwork._network_list_v4列表中吗
addr_family = is_ip(addr)
if addr_family is socket.AF_INET:
ip, = struct.unpack("!I", socket.inet_aton(addr))
return any(map(lambda n_ps: n_ps[0] == ip >> n_ps[1],#主要查看ip在cidr段中不,否则不用这么麻烦
self._network_list_v4)) #对self._network_list_v4每一个操作,n_ps[0] == (ip >> n_ps[1]):将ip右移列表中各个掩码位,看是否相等,map返回迭代器,any查看使得否有true相等的。
elif addr_family is socket.AF_INET6:
hi, lo = struct.unpack("!QQ", inet_pton(addr_family, addr))#Q 表示一个无符号长长整型数 64位 ipv6 128位正好两个
ip = (hi > n_ps[1],
self._network_list_v6))
else:
return False
def test_inet_conv():
ipv4 = b'8.8.4.4'
b = inet_pton(socket.AF_INET, ipv4)
assert inet_ntop(socket.AF_INET, b) == ipv4
ipv6 = b'2404:6800:4005:805::1011'
b = inet_pton(socket.AF_INET6, ipv6)
assert inet_ntop(socket.AF_INET6, b) == ipv6
def test_parse_header():
assert parse_header(b'\x03\x0ewww.google.com\x00\x50') == \
(3, b'www.google.com', 80, 18)
assert parse_header(b'\x01\x08\x08\x08\x08\x00\x35') == \
(1, b'8.8.8.8', 53, 7)
assert parse_header((b'\x04$\x04h\x00@\x05\x08\x05\x00\x00\x00\x00\x00'
b'\x00\x10\x11\x00\x50')) == \
(4, b'2404:6800:4005:805::1011', 80, 19)
def test_pack_header():
assert pack_addr(b'8.8.8.8') == b'\x01\x08\x08\x08\x08'
assert pack_addr(b'2404:6800:4005:805::1011') == \
b'\x04$\x04h\x00@\x05\x08\x05\x00\x00\x00\x00\x00\x00\x10\x11'
assert pack_addr(b'www.google.com') == b'\x03\x0ewww.google.com'
def test_ip_network():
ip_network = IPNetwork('127.0.0.0/24,::ff:1/112,::1,192.168.1.1,192.0.2.0')
assert '127.0.0.1' in ip_network
assert '127.0.1.1' not in ip_network
assert ':ff:ffff' in ip_network
assert '::ffff:1' not in ip_network
assert '::1' in ip_network
assert '::2' not in ip_network
assert '192.168.1.1' in ip_network
assert '192.168.1.2' not in ip_network
assert '192.0.2.1' in ip_network
assert '192.0.3.1' in ip_network # 192.0.2.0 is treated as 192.0.2.0/23
assert 'www.google.com' not in ip_network
if __name__ == '__main__':
test_inet_conv()
test_parse_header()
test_pack_header()
test_ip_network()