Starter
- 官网
- Python Standard Library
- Python Package Index
- 安装
python
:- mac:
brew install python3
- windows: download and install from python website
- mac:
- 包管理工具
pip
:pip/pip3 [cmd] [opts]
- eg:
pip instal xxx
,pip install --upgrade xxx
,pip install xxx==version
,pip uninstall xxx
,pip list
开发环境 (IDE)
python自带的开发环境
# 交互式 $ python Python 3.7.2 (tags/v3.7.2:9a3ffc0492, Dec 23 2018, 23:09:28) [MSC v.1916 64 bit (AMD64)] on win32 Type "help", "copyright", "credits" or "license" for more information. >>> 100+200 300 >>> exit()
# 直接运行 $ python hello.py 100+200=300
# 调试: # pdb调试器, 以参数`-m pdb`启动,输入命令`n`单步执行代码,`p 变量名`查看变量,`q` 结束调试退出程序 D:\Space\python> python -m pdb hello.py > d:\space\python\hello.py(2)<module>() -> print('--- Print --- ') (Pdb) n --- Print --- > d:\space\python\hello.py(3)<module>() -> print('100+200='+str(100+200)) (Pdb) n 100+200=300 > d:\space\python\hello.py(4)<module>() -> print('100+200=',100+200) (Pdb) n 100+200= 300 > d:\space\python\hello.py(7)<module>() -> print('--- Inport --- ') (Pdb) n --- Inport --- > d:\space\python\hello.py(8)<module>() -> name=input('input name:') (Pdb) n input name:Tom > d:\space\python\hello.py(9)<module>() -> print('Hello '+name) (Pdb) p name 'Tom' (Pdb) n Hello Tom > d:\space\python\hello.py(12)<module>() -> print('--- Function --- ') (Pdb) q D:\Space\python>
Anaconda
- 开源免费 Download
- 一个集成各类Python工具的集成平台
- 包括 conda包管理工具, 某版本Python, 一批第三方库,编程工具Spyder,交互式编程工具IPython,网页交互式编程工具Jupyter Notebook等
ipython
?
: 变量前或后增加?
将显示一些通用信息,包括函数对应的源代码%
%run
: 用于运行.py
程序 (注意在一个空的命名空间执行%
)%magic
: 显示所有魔术命令%hist
: IPython命令的输入历史%pdb
: 异常发生后是否自动进入调试器%reset
: 重置,即删除当前命名空间中的全部变量或名称%who
: 显示Ipython当前命名空间中已经定义的变量%time
: 给出代码的执行时间%timeit
: 多次执行代码,计算综合平均执行时间
conda
包管理和环境管理工具- 包管理与pip类似,管理Python第三方库
- 环境管理能够允许用户使用不同版本Python,并能灵活切换
常用命令:
- 获取conda版本
conda ‐‐version
- 升级conda
conda update conda
- 下载/卸载包
conda install/uninstall xxx
- 列出安装的包
conda list
- 搜索包
conda search xxx
eg:conda search numpy>=1.12
创建/启用/退出env
conda create/activate/deactivate xxx
# conda activate/deactivate envName $ conda activate base (base) $ ipython3 Python 3.7.3 (default, Mar 27 2019, 16:54:48) Type 'copyright', 'credits' or 'license' for more information IPython 7.4.0 -- An enhanced Interactive Python. Type '?' for help. In [1]: 100+200 Out[1]: 300 In [2]: exit() (base) $
- 获取conda版本
PyCharm
- Community版本免费 Download
- 类似 Eclipse,有调试、语法高亮、Project管理、代码跳转、智能提示、自动完成、单元测试、版本控制等功能
Preference -> Project Interpreter -> 配置 environment
(若安装)
基础
关键字
>>> import keyword
>>> help(keyword)
>>> keyword.kwlist
['False', 'None', 'True', 'and', 'as', 'assert', 'async', 'await', 'break', 'class', 'continue', 'def', 'del', 'elif', 'else', 'except', 'finally', 'for', 'from', 'global', 'if', 'import', 'in', 'is', 'lambda', 'nonlocal', 'not', 'or', 'pass', 'raise', 'return', 'try', 'while', 'with', 'yield']
注释
- 单行注释
# Test print("Hello World")
- 多行注释
''' This is test program Author: CJ Date: 2019/03/10 '''
- 中文支持(for
Syntax Error:Non-ASCII Character ...
)# -*- coding:utf-8 -*- print('你好')
- 注:
# -*- coding:utf-8 -*-
一般放文件首行
- 注:
输出
# 1. "",'','''
print('123')
print(12)
print("It's a.")
print('It\'s a.')
print("Hello 'Tom'.") # Hello 'Tom'.
print('Hello "Tom".') # Hello "Tom".
# 2. ''',\n
print('sdf\nsdfd')
print('''sdf
sdfd
''')
print('''Hello Tom.
Miss you
How are you?''')
#Hello Tom.
#Miss you
#How are you?
print("Hello Tom.\nMiss you\nHow are you?")
#Hello Tom.
#Miss you
#How are you?
# 3. a b
print("Hello","Tom") # Hello Tom
# 4. %
print("Hello %s" % "Tom") # Hello Tom
print("a=%d" % 123.5) # a=123
print("Hello %s,a=%d" % ("Tom",123.5)) # Hello Tom,a=123
x=[1,2,3]
print("Hello %s" % x) # Hello [1, 2, 3]
x="Hello %s,a=%d"
print(x % ("Tom",123.5)) # Hello Tom,a=123
x="Hello %s,a=%d" % ("Tom",123.5)
type(x) # str
print(x) # Hello Tom,a=123
# 5. str,int
x=123
y='123'
print(x)
print(y)
print(str(x)+y)
格式化输出,常用的格式符号:
- 字符
%c
: 字符%s
: 字符串,通过str()
字符串转换格式化,eg:%10s
: 右对齐,占位符10位%-10s
: 左对齐,占位符10位%.2s
: 截取2位字符串%10.2s
: 10位占位符,截取两位字符串
- 整数
%d
: dec 十进制整数%o
: oct 八进制整数%x
: hex 十六进制整数>>> a=123 >>> print("a=%x" % a) a=7b >>> print("a=%X" % a) a=7B
小数
%f
: 浮点实数(默认保留小数点后面六位有效数字)%.3f
: 保留3位小数位
%e
: 指数形式输出(默认保留小数点后面六位有效数字) *%.3e
: 保留3位小数位,使用科学计数法%g
:%f+%e
(默认保证六位有效数字) *%.3g
: 保留3位有效数字,使用小数或科学计数法>>> a=123.5 # f% >>> print("a1=%f,a2=%.3f" % (a,a)) a1=123.500000,a2=123.500 # e% >>> print("a=%e,a=%.3e" % (a,a)) a1=1.235000e+02,a2=1.235e+02 # g% >>> print("a=%g" % a) a=123.5 >>> print("a=%.2g" % a) a=1.2e+02 >>> print("a=%.3g" % a) a=124 >>> print("a=%.5g" % a) a=123.5 >>> print("a=%g" % 123.56789) a=123.568
输入
input(prompt=None)
注意:input获取的数据,都以字符串的方式进行保存
>>> a=input()
Tom
>>> print(a)
Tom
>>> a=input("Enter your name:")
Enter your name:Tom
>>> print(a)
Tom
>>> a=input("Enter your num:")
Enter you num: 123
>>> print(a)
123
>>> type(a)
<class 'str'>
name=input('name:') # 输入值会被强制性地转换为字符串类型
print('Hi '+name);
age=int(input('age:'))
print('get age:'+str(age))
运算符
算术运算符
运算符 描述 实例 +
加 print(10+20) => 30 -
减 print(10-20) => -10 *
乘 print(a_b) => 200 ; print('##'_3) => ###### /
除 print(10/20) => 0.5 //
取整除 print(10//20) => 0 ; print(20/10) => 2 %
取余 print(20%10) => 0 ; print(20/3) => 2 **
幂 print(2**3) => 8 比较运算符:
==
,!=
,<>
,>
,<
,>=
,<=
逻辑运算符:
and
,or
,not
判断语句
keyword: if
,elif
,else
# if-else
a=100
if a>100:
print('a>100')
else
print('a<=100')
# if-elif(else)
if a>100:
print('a>50')
elif a==100:
print('a==100')
elif a<100 and a>50:
print('a (50,100)')
else:
print('a<=50')
循环语句
keyword: for
,while
,break
,continue
# while
i=0
while i<100:
if i%2==0
print(i,"偶数")
else
print(i,"奇数")
i+=1
# for in
# sample1:
for i in 'abcdefg123456':
if i=='d':
print('find d before 5')
continue
if i=='5':
break
print(i)
# sample2:
for i in range(10):
if i == 11:
print('找到结果')
break
else:
print('未找到结果')
异常处理
- Python的错误也是class,所有的错误类型都继承自
BaseException
,可使用继承自定义异常类 - 如果错误没有被捕获,它就会一直往上抛,最后被Python解释器捕获,打印一个错误信息,然后程序退出
- 捕获异常:
try...except...
try...finally...
try...except...else...
try...except...finally...
try...except...else...finally...
- 说明:
- 捕获到
except
中匹配的异常,则执行except
下的语句,反之则执行else
中的语句 - 无论是否捕获到异常,都会执行
finally
中的语句
- 捕获到
- 手动抛出异常:
raise 异常实例
(注:在except
中可直接用raise
重新抛出捕获的异常) - 借助Python内置的
logging
模块,可以容易地记录错误信息(输出或记录到文件),方便事后排查
Sample:
# 1. 捕获某个异常
def testCatchOneException():
try:
print('Begin')
open('test.json','r') # 若test.json不存在,则会产生 IOError 异常
print('End')
except IOError: # 用变量记录可使用 `except IOError as ex:`
print('Catch IOError')
else:
print('Not catch')
finally:
print('Done')
# 2. 捕获多个异常
def testCatchMultiException():
try:
print('Begin')
open('test.json','r') # 若test.json不存在,则会产生 IOError 异常
print('then')
print(num) # 若num未定义,则会会产生 NameError 异常
print('End')
except IOError or NameError: # catch IOError or NameError
print('Catch Error')
except: # catch other all exception (under BaseException)
print('Catch Error')
else:
print('Not catch')
finally:
print('Done')
# 3. 用变量保存捕获到的异常
def testRecordException():
try:
print('Begin')
open('test.json','r')
print('Then')
print(num)
print('End')
except (IOError,NameError) as ex: # 等同:`except IOError or NameError as ex:`
print('Catch error:',type(ex),ex)
# 4. 用raise抛出异常
def testRaiseException1():
try:
print('Begin')
raise ValueError('Hello',1,2) # 抛出异常(异常实例,构造可传入任意参数,无限制)
print('End')
except: # 捕获异常
print('Catch Error') # 记录一下
raise # 再抛出
finally:
print('Done')
def testRaiseException2():
try:
print('Begin')
open('test.json','r')
print('then')
print(num)
print('End')
except (IOError,NameError) as ex:
print('Catch error:',type(ex),ex)
raise # 抛出发生的IOError或NameError异常实例,
# 也可抛出另一个异常实例,如:`raise ValueError(msg)`
finally:
print('Done')
# 5. 使用logging模块,记录错误信息
import logging
def testLogException():
try:
print('Begin')
raise ValueError('Hello')
print('End')
except Exception as ex:
print("Catch error:",type(ex),ex)
logging.exception(ex)
finally:
print("Done")
>>> testLogException()
Begin
Catch error: <class 'ValueError'> Hello
ERROR:root:Hello
Traceback (most recent call last):
File "<ipython-input-34-817d16b0d2f0>", line 4, in testLogException
raise ValueError('Hello')
ValueError: Hello
Done
Python Build-in Exceptions
Refer Doc
BaseException
+-- SystemExit
+-- KeyboardInterrupt
+-- GeneratorExit
+-- Exception
+-- StopIteration
+-- StopAsyncIteration
+-- ArithmeticError
| +-- FloatingPointError
| +-- OverflowError
| +-- ZeroDivisionError
+-- AssertionError
+-- AttributeError
+-- BufferError
+-- EOFError
+-- ImportError
| +-- ModuleNotFoundError
+-- LookupError
| +-- IndexError
| +-- KeyError
+-- MemoryError
+-- NameError
| +-- UnboundLocalError
+-- OSError
| +-- BlockingIOError
| +-- ChildProcessError
| +-- ConnectionError
| | +-- BrokenPipeError
| | +-- ConnectionAbortedError
| | +-- ConnectionRefusedError
| | +-- ConnectionResetError
| +-- FileExistsError
| +-- FileNotFoundError
| +-- InterruptedError
| +-- IsADirectoryError
| +-- NotADirectoryError
| +-- PermissionError
| +-- ProcessLookupError
| +-- TimeoutError
+-- ReferenceError
+-- RuntimeError
| +-- NotImplementedError
| +-- RecursionError
+-- SyntaxError
| +-- IndentationError
| +-- TabError
+-- SystemError
+-- TypeError
+-- ValueError
| +-- UnicodeError
| +-- UnicodeDecodeError
| +-- UnicodeEncodeError
| +-- UnicodeTranslateError
+-- Warning
+-- DeprecationWarning
+-- PendingDeprecationWarning
+-- RuntimeWarning
+-- SyntaxWarning
+-- UserWarning
+-- FutureWarning
+-- ImportWarning
+-- UnicodeWarning
+-- BytesWarning
+-- ResourceWarning
给程序传参
$ vi argv-demo.py
import sys
print("Hello argv:",sys.argv)
$ python3 argv-demo.py abc 123 456
Hello argv: ['argv-demo.py', 'abc', '123', '456']
测试
单元测试
unittest
- 导入包:
import unittest
- 编写测试类(继承自
unittest.TestCase
):class Xxx(unittest.TestCase)
- 编写测试函数(以test开头):
def testXxx()
- 添加测试切面函数(自动在每一个测试方法前后执行):
setUp()
,tearDown()
- 运行:
- method1: 添加:
if __name__ == '__main__':unittest.main()
,执行:python unittest-demo.py
- method2: 直接执行
python -m unittest unittest-demo.py
- method1: 添加:
- 导入包:
文档测试
doctest
- 导入包:
import doctest
- doctest 模块可以直接提取py开始的注释和函数开始的注释
'''...'''
- 运行:
if __name__=='__main__': doctest.testmod()
- 导入包:
Sample1: 单元测试 unittest
import unittest
class MyTest(unittest.TestCase):
def setUp(self):
print('setUp...')
def tearDown(self):
print('tearDown...')
def test_dict(self):
print('test 1')
d={'a':1,'b':2}
self.assertEqual(d['a'],1)
self.assertEqual(d['b'],2)
self.assertTrue(isinstance(d, dict))
def testdict2(self):
print('test 2')
d={'a':1,'b':2}
with self.assertRaises(KeyError): # 期待抛出指定类型的Error
value = d['c']
if __name__ == '__main__':
unittest.main()
$ python unittest-demo.py
setUp...
test 1
tearDown...
.setUp...
test 2
tearDown...
.
----------------------------------------------------------------------
Ran 2 tests in 0.000s
OK
Sample2:文档测试 doctest
'''
>>> hello([4,5,6])
hello: [4,5,6]
'''
def hello(x):
'''
doc test demo -- helo
>>> hello('Tom')
hello: Tom
>>> hello(1/0)
Traceback (most recent call last):
...
ZeroDivisionError: division by zero
>>> hello([1,2,3])
hello: [1,2,3]
'''
print('hello:',x)
if __name__=='__main__':
import doctest
doctest.testmod()
$ python doctest-demo.py
**********************************************************************
File "doctest-demo.py", line 2, in __main__
Failed example:
hello([4,5,6])
Expected:
hello: [4,5,6]
Got:
hello: [4, 5, 6]
**********************************************************************
File "doctest-demo.py", line 15, in __main__.hello
Failed example:
hello([1,2,3])
Expected:
hello: [1,2,3]
Got:
hello: [1, 2, 3]
**********************************************************************
2 items had failures:
1 of 1 in __main__
1 of 3 in __main__.hello
***Test Failed*** 2 failures.
基础数据类型
不可变对象 & 可变对象
不可变对象:即无法修改这个对象的值,每次对变量的修改,实际上是创建一个新的对象 (=> 可做key)
- 整数
int
eg:a=123
- 浮点数
float
eg:a=123.0
- 布尔
bool
eg:a=True
,a=False
,a=bool(1)
,bool('')
- 字符串
str
eg:a="123abc"
- 元祖
tuple
eg:a=(1,23.4,'ef')
- 空
NoneType
eg:a=None
- 整数
可变对象(=> 不可做key)
- 列表
list
eg:a=[1,23.4,'ef']
- 字典
dict
eg:a={'a':1,1:'d',(1,2):'123'}
- 集合
set
eg:a={1,2,'ef',12.4}
- 列表
对象比较
id(obj)
查看对象的唯一标识==
比较两个对象内容是否相等is
比较两个对象是否相等(是否是同一块地址空间)
Sample:
# 1. 不可变对象:int
>>> a=555
>>> b=a
>>> id(a),id(b) # a和b相等,指向同一个地址空间
(4566055600,4566055600)
>>> b=789 # b指向新开辟的另一个地址空间,a与b不同了
>>> id(a),id(b)
(4324742368,4566055696)
# 2. 可变对象:list
>>> a=[1,2,3]
>>> b=a
>>> id(a),id(b) # a和b相等,指向同一个地址空间
(4333307784,4333307784)
>>> b.append(4) # b和a依旧相等,指向同一块地址空间
>>> id(a),id(b)
(4333307784,4333307784)
>>> import copy
>>> c=copy.deepcopy(a) # 拷贝内容(开辟了一块新空间存储被拷贝对象的所有内容),相当于执行了c=[1,2,3]
>>> id(a),id(c)
(4333307784,4397232200)
>>> a==c,a is c # a与c内容相同,但指向不同地址空间
(True,False)
垃圾回收机制
intern机制:Python为了优化速度,对一些对象使用了对象池,避免为频繁申请和销毁内存空间
# 小整数:共用对象 >>> a=123 >>> b=123 >>> id(a),id(b),id(123) # 相同 (4557612256, 4557612256, 4557612256) # 大整数:不共用对象 >>> a=789 >>> b=789 >>> id(a),id(b),id(789) # 不相同 (4566055600, 4566055760, 4561935984) # 一个字符:共用对象 >>> a='A' >>> b='A' >>> id(a),id(b),id('A') # 相同 (4562778576, 4562778576,4562778576) # 一个单词:共用对象 >>> a="Hello" >>> b="Hello" >>> id(a),id(b),id("Hello") # 相同 (4566204968, 4566204968,4566204968) # 非单词字符串:不共用对象 >>> a="Hello World" >>> b="Hello World" >>> id(a),id(b),id("Hello World") # 不相同 (4564866736, 4565787824, 4565787888)
小整数[-5,257)
: 共用对象,常驻内存单个字符
: 共用对象,常驻内存单个单词
: 共用对象,不常驻内存,引用计数为0时销毁- 注:
- 非单个但单词的字符串(如含有空格等),不会使用intern机制(即不共用对象),引用计数为0则销毁
- 数值类型和字符串类型在Python中都是不可变类型,即无法修改这个对象的值,每次对变量的修改,实际上是创建一个新的对象
引用计数
# 查看一个对象的引用计数 >>> import sys >>> a="Hello World" >>> sys.getrefcount(a) # 比正常计数大1,因为调用函数的时候传入a,这会让a的引用计数+1 2 >>> b=a >>> sys.getrefcount(a) 3 >>> del b >>> sys.getrefcount(a) 2
- 一旦引用计数为0则销毁以释放内存,具有实时性,处理回收内存的时间分摊到了平时(不用像其他机制等到特定时机)
- 循环引用所占用的内存永远无法被回收
- 维护消耗资源:占空间且处理相对较慢
标记-清除(判别出垃圾清除)
- 标记出有指针引用的对象,剩下未被标记的对象则为垃圾,可进行清除
分代收集 Generational GC(可处理循环引用释放问题)
- 使用链表来持续追踪活跃对象
- 内部使用多代(个)链表:Generation Zero & Generation One & Generation Two(新创建的对象放入Generation Zero链表)
- 周期性地检查链表,根据规则减掉上面互相引用的对象的引用计数,清除引用计数为0的对象,剩下的活跃对象则移动到下一代链表
- GC阈值:一旦被分配对象与被释放对象的计数值之差达到阈值,就启动检查,释放“浮动的垃圾”,活跃对象移动到下一代链表
gc
模块: 提供设置垃圾回收的APIgc.isenabled()
gc.disable()
gc.set_debug(flags)
设置gc的debug日志,一般设置为gc.DEBUG_LEAK
gc.collect(generation=2)
:- 0: 只检查0代的对象;
- 1:检查0,1代对象;
- 2:检查0,1,2代对象;
- 返回收集的垃圾数
gc.get_count()
:- 返回一个长度为3的元组,代表各代自动执行垃圾回收的计数器
- eg: (488,0,0) 488是指距离上一次0代垃圾检查,Python分配内存的数目减去释放内存的数目
gc.get_threshold()
:- 返回一个长度为3的元组,代表各代自动执行垃圾回收的阈值
- 每一次计数器增加,gc模块会检查增加后的计数是否达到阀值的数目,如果是,就会执行对应的代数的垃圾检查,然后重置计数器
- eg:(700,10,10)
gc.set_threshold(threshold0[, threshold1[, threshold2])
- 设置各代阈值
- eg: 阈值:(700,10,10), 计数器:
- (699,3,0)增到(700,3,0)时 => gc模块执行
gc.collect(0)
检查0代对象的垃圾,重置计数器为(0,4,0)
- (699,9,0)增到(700,9,0)时 => gc模块执行
gc.collect(1)
检查1,2代对象的垃圾,重置计数器为(0,0,1)
- (699,9,9)增到(700,9,9)时 => gc模块执行
gc.collect(2)
检查0,1,2,3代对象的垃圾,重置计数器为(0,0,0)
- (699,3,0)增到(700,3,0)时 => gc模块执行
垃圾回收触发点:
- 调用
gc.collect()
gc
模块的计数器达到阀值时- 程序退出时
- 调用
总结:
- Python中使用
intern机制
共用一些对象以优化速度 - 垃圾回收采用
引用计数
为主,标记-清除
+分代收集
为辅的策略 gc
模块提供垃圾回收的API
- Python中使用
类型转换
整数:
int(x [,base ])
int(20.4)
=> 20int('20.4')
=> Errorint('89')
=> 89int("0xc",base=16)
=> 12
浮点数:
float(x)
float("123")
,flaot(123)
=> 123.0float("123.5")
,float(123.5)
=> 123.5float("12e+2")
,float(12e+2)
=> 1200.0
字符串:
str(obj)
参数可为任意对象str(123.5)
,str("123.5")
=> "123.5"str([12,123.5,"se"])
=> "[12, 123.5, 'se']"str({'a':1,'b':2})
=> "{'a': 1, 'b': 2}"
元组:
tuple(s)
参数可以是元组,列表,字典(只取keys),集合,字符串tuple((1,2,3))
=> (1,2,3)tuple([1,2,3])
=> (1,2,3)tuple({'a':1,'b':2})
=> ('a', 'b')tuple({1, 23.0, 'ef'})
=> (1, 23.0, 'ef')tuple('123')
=> ('1', '2', '3')
列表:
list(s)
参数可为元组,字典,列表,字典(只取keys),集合,字符串list([1,2,3])
=> [1,2,3]list((1,2,3))
=> [1,2,3]list({'a':1,'b':2})
=> ['a', 'b']list('123')
=> ['1','2','3']list(set([1,2,3]))
=> [1,2,3]
集合:
set(s)
参数可以是元组,列表,字典(只取keys),集合,字符串set((1,2,3))
=> {1,2,3}set({1,2,3})
=> {1,2,3}set([1,2,2,3,1,2])
=> {1,2,3}set({'a':1,'b':2})
=> {'a', 'b'}set('123')
=> {'2', '3', '1'}
其他:
- 整数转换某进制(字符串表示)
hex(x)
: 把一个整数转换为十六进制字符串 eg:hex(12)
=> '0xc'oct(x)
: 把一个整数转换为八进制字符串 eg:oct(12)
=> '0o14'
- 执行一个字符串表达式,返回计算的结果:
eval(str)
eval("12+23")
=> 35
- 利用
enumerate
将list中的item转换为tuple>>> seasons = ['Spring', 'Summer', 'Fall', 'Winter'] >>> list(enumerate(seasons)) [(0, 'Spring'), (1, 'Summer'), (2, 'Fall'), (3, 'Winter')]
- 整数转换某进制(字符串表示)
字符编码
ASCII
&Unicode
&UTF-8
编码ASCII
: 1个字节Unicode
: 2个字节(偏僻的字符要4个字节)UTF-8
: 可变长编码,能节省空间(把一个Unicode字符根据不同的数字大小编码成1-6个字节,英文字母1个字节,汉字通常是3个字节,偏僻的字符要4~6个字节)eg:
字符 ASCII Unicode UTF-8 A 01000001 00000000 01000001 01000001 中 / 01001110 00101101 11100100 10111000 10101101
工作方式:
- 在计算机内存中,统一使用
Unicode
编码,当需要保存到硬盘或者需要传输的时候,就转换为UTF-8
编码 - python中
str
在内存中以Unicode
表示,一个字符对应若干个字节,传输或者保存到磁盘,则变为以字节为单位的bytes
- 在计算机内存中,统一使用
获取字符编码的整数表示
ord(x)
ord('a')
=> 97ord('A')
=> 65ord('你')
=> 20320ord('中')
=> 20013
把编码转换为对应的字符
chr(x)
chr(65)
=> 'A'chr(20013)
=> '中'
encode/decode
- encode: str -> bytes
- decode: str <- bytes
Sample:
# 1. encode/decode
a='abc'
b=a.encode()
print(b) # b'abc'
print(b.decode()) # abc
a='你好'
b=a.encode()
print(b) # b'\xe4\xbd\xa0\xe5\xa5\xbd'
print(b.decode()) # 你好
# 2. len
# - len(str) 返回字符数
# - len(bytes) 返回字节数
a='abc'
print(len(a)) # 3
a=b'abc'
print(len(a)) # 3
a='你好'
print(len(a)) # 2
print(a.encode('utf-8')) # b'\xe4\xbd\xa0\xe5\xa5\xbd'
print(len(a.encode('utf-8'))) # 6
整数:进制转换/位运算
进制
- 十进制(逢十进一): 0,1,2,3,4,5,6,7,8,9,10,11,12,...
- 二进制(逢二进一): 0,1,10,11,100,...
- 八进制(逢八进一): 0,1,2,3,4,5,6,7,10,11,12,...
- 十六进制(逢十六进一): 0,1,2,...8,9,10,a,b,c,d,e,f,10,11,12,...
计算机存储:
- 以二进制的补码形式存储,人们看到的数字是原码转化来的,而原码是通过补码得到的
- 即:
补码(机器存储) -> 原码 -> 转换对应进制 -> 最后人们看到的数
- 符号位:第一位为符号位,表正数或负数
- 原码:用来转换对应进制
- 反码:符号位不变,其他位数取反
- 补码: 用来做数据的存储运算(运算包括符号位). 补码提出的根源是让计算机底层用加法实现减法操作,即减去一个数=加上一个负数
- 进制转换时,需要先把内存中存储的补码拿出来变成原码,再进行转换输出
原码,补码转换规则:
正数: 5 补码存储 -> 原码(=补码) -> 人们看到的数(一般为十进制) 0,000,0101 0,000,0101 5 负数:-5 补码存储 -> 原码(=补码取反+1) -> 人们看到的数(一般为十进制) 1,111,1011 1,000,0101 -5
- 正数:原码 <=> 补码 (原码=反码=补码,即:无需转换)
- 负数:原码 <=> 取反+1 <=> 补码
运算:用补码运算
负数:-5 补码存储 -> 原码(=补码取反+1) -> 人们看到的数(一般为十进制) 1,111,1011 1,000,0101 -5 运算: -5 << 1: 1,111,0110 1,000,1010 -10 -5 >> 1 1,111,1011 1,000,00101 -3 -5 + 1 1,111,1100 1,000,0100 -4
+,-
: 用加法加上正负数, eg:1-1 = 1+(-1) = (1的补码) + (-1的补码) = 0
*,/
: 按位左右移<<,>>
, eg:5*2 = 0101<<1 = 1010 = 10
,5*3=5*2+5=(0101<<1)+(0101)=1111=15
- 注意:
- 运算时用补码,包括符号位
- 原补码转换(取反)时,不包括符号位
- 进制间转换时用原码,不包括符号位
进制转换
# 1. 十进制 (type:int) => `bin(num)`/`oct(num)`/`hex(num)` => 二进制(0b),八进制(0o),十六进制(0x) (type:str) >>> bin(18) '0b10010' >>> oct(18) '0o22' >>> hex(18) '0x12' # 2. 二进制(0b),八进制(0o),十六进制(0x) (type: str) => `int(strNum,base=10)` => 十进制 (type: int) >>> int('0b10010',2) >>> int('0o22',8) >>> int('0x12',16) # 3. 注:bin(x) 返回的是简写版`符号+原码` >>> bin(5) '0b101' >>> bin(-5) '-0b101'
位运算:
&,|,^,~,<<,>>
直接操作二进制,按位运算,省内存,效率高# 1. << : 左移 = x*(2^n) >>> 0b0101<<1 10 >>> 5<<1 # = 5*2^1 10 >>> 0b0101<<2 20 >>> 5<<2 # = 5*2^2 20 # 2. >> : 右移 = x//(2^n) >>> 5>>1 # = 5//(2^1) 2 >>> 5>>2 # = 5//(2^2) 1 >>> -5>>1 # = -5//2 -3 # 3. &:按位与(都为1则为1) >>> 0b0101 & 0b1111 5 >>> 5&15 5 # 4. |:按位或(有1则为1) >>> 0b0101 | 0b1111 15 >>> 5|15 15 # 5. ^:按位异或(不同则为1) >>> 0b0101 ^ 0b1111 # => 0b1010 10 >>> 5^15 10 # 6. ~:按位翻转,结果为:`-(x+1)` (注:包括符号位翻转,所以不是反码) # eg: # 5 补码(正数,与原码相同): 0000,0101 # 运算~5(按位全部取反): 1111,1010 # 转换为原码(除符号位,取反+1): 1000,0110 => -6 >>> ~0b0101 -6 >>> ~5 -6 # 5取反+1=-5; -5取反+1=5 >>> ~5+1 -5 >>> ~5 -6 >>> ~-5+1 5 >>> bin(5) '0b101' >>> bin(~5) '-0b110' >>> bin(-5) '-0b101'
str,tuple,list,set,dict
常用方法总结:
- 增
- str/tuple:
+
,*n
-- 不可改,生成新的 - list:
+[...]
,*n
,.append(x)
,.extend(iterable)
,.insert(index,x)
- set:
.add(key)
,.update(iterable)
- dict:
xxx[key]=value
,.update({key:value,...})
,.setdefault(key,default=None)
- str/tuple:
- 删
- str/tuple: 不可改
- list:
del xxx[index]
,remove(item)
,pop(index=-1)
- set:
.remove(key)
,.discard(key)
,pop()
- dict:
del xxx[key]
,.pop(key)
,.popitem()
- 改
- str/tuple: /
- list:
xxx[index]=value
- set: /
- dict:
xxx[key]=value
,.update({key:value,...})
,.setdefault(key,default=None)
- 查
- str/tuple:
[index]
,[start:end:step]
,.index(x,start,end)
,.count(x)
- list:
[index]
,[start:end:step]
,.index(x,start,end)
,.count(x)
- set: /
- dict:
xxx[key]
,.get(key,default=None)
,keys()
,values()
,items()
- str/tuple:
- 公共方法
in/not in
存在与否.len(x)
,.max(x)
,.min(item)
for item in s
,for index,item in enumerate(s)
遍历 (对于dict,会遍历keys)
str
"...."
:
find/rfind/index/rindex/count(str, start=0, end=len(mystr))
注:index方法若是找不到会抛出一个异常replace(str1,str2,count=-1)
split(sep=None, maxsplit=-1)
,splitlines()
,join(iterable)
capitalize/title()
字符串的第一个/每个单词首字母大写lower/upper()
转换为小写/大写startswith/endswith(prefix[, start[, end]])
返回 True/Falseljust/rjust/center(width, fillchar=' ')
原字符串左/右/居中对齐,并使用空格填充至长度 width 的新字符串lstrip/rstrip/strip(chars=None)
去除字符串左/右/两边的空白字符rpartition/partition(seq)
从右/左边开始以seq分割成三部分,seq前,seq和seq后isalpha/isdigit/isalnum/isspace
返回True/False
Sample:
>>> a="Hello Tom! How are you? Are you ok?"
# find/index/count
>>> a.find('you')
19
>>> a.index('you')
19
>>> a.count('you')
2
# title,lower
>>> a.title()
'Hello Tom! How Are You? Are You Ok?'
>>> a.lower()
'hello tom! how are you? are you ok?'
>>> a.strip('?')
'Hello Tom! How are you? Are you ok'
# split/partition
>>> a.split('you')
['Hello Tom! How are ', '? Are ', ' ok?']
>>> a.partition('you')
('Hello Tom! How are ', 'you', '? Are you ok?')
# startswith
>>> a.startswith("Hello")
True
>>> a.startswith("You")
False
# isalpha
>>> a.isa
a.isalnum( a.isalpha( a.isascii(
>>> a.isalpha()
False
# 下标索引
>>> a[0]
'H'
# 切片
>>> a[0:3]
'Hel'
# in,not in
>>> a="123"
>>> '1' in a
True
>>> 'b' in a
False
# index,count
>>> a.index('2')
1
>>> a.count('2')
1
tuple
(item1,item2,...)
:
- 元素可以是不同类型的,可重复值,可嵌套 ( 与列表类似,不同之处在于元组创建后就不能修改 )
- 访问元素:
xxx[index]
,index从0开始 - 修改元素:不能修改元素,会抛出异常
- 查找元素:
index(item,start=0, stop=9223372036854775807)
返回index,左闭右开 [start,end),找不到抛出异常 - 查找某元素个数:
count(item)
- 可由元组,列表,字典(只取keys),集合,字符串创建元组:
tuple(s)
- 注:
- tuple所谓的“不变”是指tuple的每个元素指向不变
- 只有1个元素的tuple定义时须加一个逗号
,
,来消除歧义,eg:a=(1,)
Sample:
# tuple:元素可以是不同类型的,可重复值
>>> a=(123,12.4,'Hello',('a','b'),[1,2,3],'Hello')
# len
>>> len(a)
6
# 访问元素:xxx[index],index从0开始
>>> a[0]
123
# 修改元素:不能修改元素,会抛出异常
>>> a[1]='abc'
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: 'tuple' object does not support item assignment
# 注:tuple所谓的“不变”是指tuple的每个元素指向不变
>>> a[4]
[1, 2, 3]
>>> a[4][1]="a" # 变的不是tuple的元素,而是list的元素
>>> a
(123, 12.4, 'Hello', ('a', 'b'), [1, 'a', 3], 'Hello')
# 查找元素:index(item,start,end) 返回index,左闭右开 [start,end),找不到抛出异常
>>> a.index('Hello',2,6)
2
>>> a.index('Hello',3,6)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ValueError: tuple.index(x): x not in tuple
# 查找元素个数: count(item)
>>> a.count('Hello')
2
>>> a.count([1,2,3])
1
# tuple(s): 由元组,列表,字典(只取keys),集合,字符串创建元组
>>> tuple((1,2,3))
(1,2,3)
>>> tuple([1,2,3])
(1,2,3)
>>> tuple({'a':1,'b':2})
('a', 'b')
>>> tuple({1, 23.0, 'ef'})
(1, 23.0, 'ef')
>>> tuple('123')
('1', '2', '3')
# 注:只有1个元素的tuple定义时必须加一个逗号,,来消除歧义
>>> a=(1)
>>> type(a)
<class 'int'>
>>> a=(1,)
>>> type(a)
<class 'tuple'>
>>>
list
[item1,item2,...]
:
- 元素可以是不同类型的,可重复值,可嵌套 ( 与元组类似,不同之处在于元组是不可变的,不能修改 )
- 访问/修改元素:
xxx[index]
xxx[index]=value
- 注:index从0开始
- 查找:
index(item,start=0, stop=9223372036854775807)
返回index,左闭右开 [start,end),找不到抛出异常- 查找某元素个数:
count(item)
- 是否存在某元素:
item in xxx
,item not in xxx
- 添加元素:
append(item)
extend([item1,item2,...])
insert(index,item)
- 删除元素:
del xxx[index]
pop(index=-1)
remove(item)
不能存在要移除元素,则会抛出异常- 排序:
sort(reverse=False)
reverse()
- 可由元组,列表,字典(只取keys),集合,字符串创建list:
list(s)
Sample:
#
a=[123,12.4,'Hello',('a','b'),[1,2,3],'Hello']
# len
>>> len(a)
6
# 访问元素: xxx[index] index从0开始
>>> a[0]
123
>>> a[2]
'Hello'
>>> a[2][0]
'H'
>>> a[3]
('a', 'b')
>>> a[3][0]
'a'
>>> a[4]
[1, 2, 3]
>>> a[4][1]
2
# 修改元素: xxx[index]=value
>>> a[2]="Tom"
>>> a
[123, 12.4, 'Tom', ('a', 'b'), [1, 2, 3], 'Hello']
# 查找元素:index,count
>>> a.index('Tom')
2
>>> a.count('Hello')
1
# 是否存在某元素:in,not in
>>> 'Tom' in a
True
>>> 2 in a
False
>>> ('a','b') in a
True
>>> 'Hi' not in a
True
# 添加元素: append(x),extend([...]),insert(index,x)
a = [5, 4, 2, 3]
>>> a.append(1) # [5, 4, 2, 3, 1]
>>> a.extend([2,8]) # [5, 4, 2, 3, 1, 2, 8]
>>> a.insert(0,7) # [7, 5, 4, 2, 3, 1, 2, 8]
>>> a.insert(-1,'Tom') # [7, 5, 4, 2, 3, 1, 2, 'Tom', 8]
# 删除元素: del xxxx[index] , pop(index=-1), remove(item)
>>> a=[]
>>> del a[0]
>>> a
[5, 4, 2, 3, 1, 2, 'Tom', 8]
>>> a.pop()
8
>>> a
[5, 4, 2, 3, 1, 2, 'Tom']
>>> a.pop(4)
1
>>> a
[5, 4, 2, 3, 2, 'Tom']
>>> a.remove('Tom')
>>> a
[5, 4, 2, 3, 2]
>>> remove(1)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
NameError: name 'remove' is not defined
# 排序: sort reverse
>>> a.sort()
>>> a
[2, 2, 3, 4, 5]
>>> a.sort(reverse=True)
>>> a
[5, 4, 3, 2, 2]
>>> a.reverse()
>>> a
[2, 2, 3, 4, 5]
# list(s): 可由元组,列表,字典(只取keys),集合,字符串创建
>>> list([1,2,3])
[1,2,3]
>>> list((1,2,3))
[1,2,3]
>>> list({'a':1,'b':2})
['a', 'b']
>>> list('123')
['1','2','3']
>>> list(set([1,2,3]))
[1,2,3]
set
{key1,key2,...}
:
- 无不可重复元素,可以是不同类型的,可嵌套
- 注:不可以放入可变对象,因为无法判断两个可变对象是否相等,也就无法保证set内部“不会有重复元素”
- 添加元素:
add(key)
update(iterable)
- 删除元素:
remove(key)
找不到会报错discard(key)
找不到不会报错pop()
- 判断是否存在某元素:
item in s1
item not in s1
- 交,并,差,补集:
- 交集:
s1&s2
,s1.intersection(s2)
,s1.intersection_update(s2)
--取交集并更新自己 - 并集:
s1|s2
,s1.union(s2)
- 差集:
s1-s2
,s1.differenc(s2)
,s1.difference_update(s2)
(注:s1-s2!=s2-s1) - 补集:
s1^s2
,s1.symmetric_difference(s2)
,s1.symmetric_difference_update(s2)
- 交集:
- 关系判断:相交不相交, 包含不包含
- 是否不相交
s1.isdisjoint(s2)
- 是否包含某集合
s1.issuperset(s2)
,s1>=s2
- 是否被某集合包含
s1.issubset(s2)
,s1<=s2
- 是否不相交
- 不变集合:
frozenset(s)
Sample:
>>> s1={1,5,9,3,9}
>>> len(s1)
4
>>> s1
{1, 3, 5, 9}
# 注:不可以放入可变对象,因为无法判断两个可变对象是否相等,也就无法保证set内部“不会有重复元素”
>>> s2={1,2,(1,2)}
>>> s2
{(1, 2), 1, 2}
>>> s2={1,2,[1,2]}
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'list'
# 添加元素 add(key),update(iterable)
>>> s1.add(2)
>>> s1
{1, 2, 3, 5, 9}
>>> s1.add('Tom')
>>> s1
{1, 2, 3, 5, 9, 'Tom'}
>>> s1.add(9)
>>> s1
{1, 2, 3, 5, 9}
>>> s1.update({7,10})
>>> s1
{1, 2, 3, 5, 7, 9, 10}
>>> s1.update([1,4])
>>> s1
{1, 2, 3, 4, 5, 7, 9, 10}
>>> s1.update('ef')
>>> s1
{1, 2, 3, 4, 5, 7, 9, 10, 'f', 'e'}
>>> s1.update(9)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: 'int' object is not iterable
# 删除元素 remove(key),discard(key)--不会报错,pop()
>>> s1.remove('Tom')
>>> s1
{1, 2, 3, 5, 9}
>>> s1.remove('a')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
KeyError: 'a'
>>> s1.discard('a')
>>> s1.pop()
1
>>> s1
{2, 3, 5, 9, 'Tom'}
# 判断是否存在某元素: in,not in
>>> s1
{2, 3, 5, 9, 'Tom'}
>>> 5 in s1
True
>>> 7 not in s1
True
>>> 1 in s1
False
# 交,并,差,补集
>>> s1={1,5,6,9}
>>> s2={3,5,9,10}
>>> s1&s2 # 交集 s1.intersection(s2), s1.intersection_update(s2) -- 取交集并更新自己
{9, 5}
>>> s1|s2 # 并集 s1.union(s2)
{1, 3, 5, 6, 9, 10}
>>> s1-s2 # 差集 differenc,difference_update
{1, 6}
>>> s2-s1
{10, 3}
>>> s1^s2 # 补集 symmetric_difference, symmetric_difference_update
{1, 3, 6, 10}
# 关系判断:相交不相交,包含不包含
>>> s1.isdisjoint(s2) # 是否不相交
False
>>> {1,3}.isdisjoint({2,5})
True
>>> s1.issuperset(s2) # 是否包含某集合(a>=b)
False
>>> {1,3,5}.issuperset({3,5})
True
>>> {1,3,5} >= {3,5}
>>> s1.issubset(s2) # 是否被包含 (a<=b)
False
>>> {3,5}.issubset({1,3,5})
True
>>> {3,5}<={1,3,5}
True
# frozenset 不变集合
>>> a=frozenset({1,3,5})
>>> a
frozenset({1, 3, 5})
>>> a.add(4)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: 'frozenset' object has no attribute 'add'
# set(s): 可由元组,列表,字典(只取keys),集合,字符串创建
>>> set((1,2,3))
{1,2,3}
>>> set({1,2,3})
{1,2,3}
>>> set([1,2,2,3,1,2])
{1,2,3}
>>> set({'a':1,'b':2})
{'a', 'b'}
>>> set('123')
{'2', '3', '1'}
dict
{key1:value1,key2:value2,...}
:
key:value
键值对,key需为不可改变类型,如int,float,str,tuple- 访问元素:
xxx[key]
.get(key,default=None)
- 更新/添加元素:
xxx[key]=value
.update({key:value,...})
.setdefault(key,default=None)
- 删除元素:
del xxx[key]
.pop(key)
.popitem()
.clear()
- 获取对象
keys()
返回一个包含所有KEY的列表values()
返回一个包含所有value的列表items()
返回一个元组列表[(key,value),...]
- 判断是非存在某key:
key in d
key not in d
Sample:
>>> d={'a':1,'b':2}
>>> len(d)
2
# 访问元素: xxxx[key],xxx.get(key,default=None)
>>> d['a']
1
>>> d['b']
2
>>> d.get('a')
1
>>> d.get('c','N/A')
'N/A'
# 更新/插入元素: xxx[key]=value,.update({...}),.setdefault(key,default=None)
>>> d['a']='123'
>>> d['c']=789
>>> d['e']=(1,2)
>>> d
{'a': '123', 'b': 2, 'c': 789, 'e': (1, 2)}
>>> d.update({'b':'Hello','f':'Tom'})
>>> d
{'a': '123', 'b': 'Hello', 'c': 789, 'e': (1, 2), 'f': 'Tom'}
>>> d.setdefault('a',99)
'123'
>>> d
{'a': '123', 'b': 'Hello', 'c': 789, 'e': (1, 2), 'f': 'Tom'}
>>> d.setdefault('g',99)
99
>>> d
{'a': '123', 'b': 'Hello', 'c': 789, 'e': (1, 2), 'f': 'Tom', 'g': 99}
# 删除元素: del xxx[key],pop(key),popitem(),clear()
>>> del d['e']
>>> d
{'a': '123', 'b': 'Hello', 'c': 789, 'f': 'Tom', 'g': 99}
>>> d.pop('a')
'123'
>>> d
{'b': 'Hello', 'c': 789, 'f': 'Tom', 'g': 99}
>>> d.popitem()
('g', 99)
>>> d
{'b': 'Hello', 'c': 789, 'f': 'Tom'}
# keys
>>> d.keys()
dict_keys(['a', 'b', 'c'])
# values
dict_values(['123', 2, 789])
# items
>>> d.items()
dict_items([('a', '123'), ('b', 2), ('c', 789)])
# 判断key是否存在:key in d, key not in d
>>> 'c' in d
True
>>> 'g' in d
False
>>> 'a' not in d
True
>>> d.setdefault('k')
>>> d
{'b': 'Hello', 'c': 789, 'f': 'Tom', 'k': None}
>>> 'k' in d
True
函数
def test1():
pass
def test2(step=1)
print("step=%s" % step)
def test3()
return [1,2,3]
>>> test1()
>>> test2(5)
step=5
>>> test3()
[1,2,3]
作用域
- 局部变量:
- 在函数内部定义的变量,只能在本函数中用
- 可使用
- 全局变量:
- 在函数外定义的变量
- 使用:可以在所有的函数中使用
- 修改:在函数中不能修改全局变量的指向
- 可变类型的全局变量 => 其指向的数据直接可以修改
- 不可类型的全局变量 => 使用global声明后可修改
- 注:
- 可使用
globals()
、locals()
列出当前作用域中所有全局/局部变量 - 函数中同名变量,局部变量优先全局变量
- 实际上,Python使用
LEGB顺序
规则来查找一个符号对应的对象Locals(局部) -> Enclosing function(嵌套函数)-> Globals(全局) -> Builtins(Python启动时会自动载入)
- 可使用
Sample1:查看全局/局部变量
import math
NumA=10
NumB=20
def testScope(*arg):
a=1
b='Hello'
print("locals:",locals()) # print 局部变量 (注:函数参数也属于locals)
print("globals:",globals()) # print 全局变量 (注:包括python内建模块和手动import的)
>>> testScope('Tom')
locals: {'arg': ('Tom',), 'a': 1, 'b': 'Hello'}
globals: {'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': <class '_frozen_importlib.BuiltinImporter'>, '__spec__': None, '__annotations__': {}, '__builtins__': <module 'builtins' (built-in)>, 'NumA': 10, 'NumB': 20, 'testLocals': <function testLocals at 0x10c7acf28>, 'testScope': <function testScope at 0x10c780950>, 'math': <module 'math' from '/anaconda3/lib/python3.7/lib-dynload/math.cpython-37m-darwin.so'>}
Sample2: 全局/局部变量优先级
Num=10
# 1. 使用全局变量
def testGlobalVar(step=1):
x=Num+step # 使用全局变量
print("x=%s" % x)
>>> testVar(3)
x=13
# 2. 使用局部变量
def testLocalVar(step=1):
Num=20 # 同名变量,局部变量优先级高于全局变量
x=Num+step
print("x=%s,Num=%s" % (x,Num))
>>> testVar4(3)
x=23,Num=20
>>> Num
13
Sample3: 修改全局变量
# 1. 不可变类型的全局变量
Num=10 # 不可变类型
def testVar(step=1):
x=Num+step
Num=x
print("x=%s,Num=%s" % (x,Num))
>>> testVar(3)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 2, in testVar2
UnboundLocalError: local variable 'Num' referenced before assignment
def testVar(step=1):
global Num # 使用global声明后可修改该全局变量
x=Num+step
Num=x
print("x=%s,Num=%s" % (x,Num))
>>> testVar(3)
x=13,Num=13
>>> Num # 全局变量Num已修改
13
# 2. 可变类型的全局变量
A=[1,2,3]
def testVar(step=1):
A.append(step)
print(A)
>>> testVar(4)
[1, 2, 3, 4]
>>> A # 全局变量A已修改
函数参数
- 参数
- 引用传参: 引用传递,不是值传递
- 不可变类型: 函数中运算不会影响到变量自身
- 可变类型: 函数中运算可能会影响到变量自身
- 缺省参数(默认参数)
- 一定要位于参数列表的最后面
- (可在可变参数
*
前面或后面,但一定要在关键字参数**
前面,不然会有歧义报错)
- 不定长参数:
- 用于处理比当初声明时更多的参数
*
变量:- 表示可变参数,tuple类型
- 如:
*args
,args用tuple存放未命名的变量参数 - 可直接传入,也可先组装list或tuple,再通过
*
传入,如:func(*(1, 2, 3))
- 如:
- 表示可变参数,tuple类型
**
变量:- 表示关键字参数,dict类型,可以限制传入的参数名,同时提供默认值
- 如:
**kwargs
,kwargs用dict存放命名的参数(即key=value的参数) - 可直接传入,也可先组装dict,再通过
**
传入,如:func(**{'a': 1, 'b': 2})
- 有
*
变量时,需在*
变量后面(即最后面)
Sample1:传入不可变/可变参数
# 1. 传入不可变参数
def testArg(a,b):
# a=a+b
a+=b
print("a=%s" % a)
>>> a,b=3,5
>>> testArg(a,b)
a=8
>>> a # a 未变
3
# 2. 传入可变参数
def testArg(a,b):
a.append(b)
print('a=%s' % a)
>>> a,b=[1,2,3],4
>>> testArg(a,b)
a=[1, 2, 3, 4]
>>> a # a 变了
[1, 2, 3, 4]
Sample2: 缺省参数(默认参数)
# 1. 单个缺省参数
def testArg(a,b=1):
print('a=%s,b=%s,a+b=%s' % (a,b,a+b))
>>> testArg(3,5)
a=3,b=5,a+b=8
>>> testArg(3)
a=3,b=1,a+b=4
# 2.多个缺省参数(位于参数列表的最后面)
def testArg(a,b=1,c=None,d='abc'):
print('a=%s,b=%s,c=%s,d=%s' % (a,b,c,d))
>>> testArg(1,2,3,4)
a=1,b=2,c=3,d=4
>>> testArg(1,2,3)
a=1,b=2,c=3,d=abc
>>> testArg(1,2)
a=1,b=2,c=None,d=abc
>>> testArg(1)
a=1,b=1,c=None,d=abc
>>> testArg(1,d=5,c=4) # 指定传入参数
a=1,b=1,c=4,d=5
>>> testArg(1,[2,3]) # 传入可变参数
a=1,b=[2, 3],c=None,d=abc
Sample3: 不定长参数
# 1. *arg,**kwargs
def testArg(a,b,*args,**kwargs):
print("a=%s,b=%s,args=%s,kwargs=%s" % (a,b,args,kwargs))
>>> testArg(1,2)
a=1,b=2,args=(),kwargs={}
>>> testArg(1,2,3,4,x='001',y='002') # 直接传入
a=1,b=2,args=(3, 4),kwargs={'x': '001', 'y': '002'}
>>> m=(7,8,9)
>>> n={'k':'005','z':'006'}
>>> testArg(1,2,m,n)
a=1,b=2,args=((7, 8, 9), {'k': '005', 'z': '006'}),kwargs={}
>>> testArg(1,2,*m,**n) # 组装后传入
a=1,b=2,args=(7, 8, 9),kwargs={'k': '005', 'z': '006'}
>>> testArg(1,2,3,4,*m,**n,x='001',y='002')
a=1,b=2,args=(3, 4, 7, 8, 9),kwargs={'k': '005', 'z': '006', 'x': '001', 'y': '002'}
>>> testArg(1,2,3,4,*m,**n,x='001',z='002')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: testArg() got multiple values for keyword argument 'z'
# 2. + default arg
def testArg(a,b,c='abc',*args,**kwargs): # 或 def testArg(a,b,*args,c='abc',**kwargs):
print("a=%s,b=%s,c=%s,args=%s,kwargs=%s" % (a,b,c,args,kwargs))
>>> testArg(1,2)
a=1,b=2,c=abc,args=(),kwargs={}
>>> testArg(1,2,3,x='001',y='002')
a=1,b=2,c=3,args=(),kwargs={'x': '001', 'y': '002'}
>>> testArg(1,2,3,4,x='001',y='002')
a=1,b=2,c=3,args=(4,),kwargs={'x': '001', 'y': '002'}
>>> testArg(1,2,x='001',y='002',c=4)
a=1,b=2,c=4,args=(),kwargs={'x': '001', 'y': '002'}
>>> testArg(1,2,5,c=4,x='001',y='002')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: testArg() got multiple values for argument 'c'
函数返回值
- 无返回值
- 返回一个变量,eg:
return x
- 返回多个变量,eg:
return x,y
本质是返回了一个元组 - 注意:变量也可以代表函数,所以也可返回函数
Sample:
# 1. 无返回值
def testReturn(a,b):
print("a+b=%s" % (a+b))
>>> testReturn(1,3)
a+b=4
# 2. 返回一个变量
def testReturn(a,b):
return a+b
>>> testReturn(1,3)
4
>>> testReturn([1,2],[3,4])
[1, 2, 3, 4]
# 3. 返回多个变量
def testReturn(a,b):
return a,b,a+b
>>> testReturn(2,3)
(2, 3, 5)
>>> a,b,c=testReturn(2,3)
>>> a,b,c
(2, 3, 5)
>>> a
2
>>> b
3
>>> c
5
>>> a,b,c=testReturn([1,2],[3,4])
>>> a,b,c
([1, 2], [3, 4], [1, 2, 3, 4])
# 4. 返回函数
def testReturn(a,b):
def test_in(c):
return a+b+c
return test_in
>>> f=testReturn(3,5)
>>> f
>>> f(1)
9
>>> f(2)
10
高级特性:闭包
- 返回内部函数,内部函数使用了外部包裹函数作用域里变量(非全局变量),则称内部函数为闭包
- 优点:
- 可提高代码可复用性
- 似优化了变量,原来需要类对象完成的工作,闭包也可以完成
- 缺点:
- 由于闭包引用了外部函数的局部变量,则外部函数的局部变量没有及时释放,消耗内存
Sample1:闭包
def test(a):
def test_in(b):
print("a=%s,b=%s,a+b=%s" % (a,b,a+b))
return a+b
return test_in
>>> f=test(3)
>>> f
<function test.<locals>.test_in at 0x107af51e0>
>>> f(5)
a=3,b=5,a+b=8
8
>>> f(8)
a=3,b=8,a+b=11
11
Sample2:返回一个计数器
def counter(start=0):
global total
total=start
def incr():
total+=1 # total相对于incr()为外部变量,且为不可变类型,修改会报错
return total
return incr
>>> ct1=counter()
>>> a,b,c=ct1(),ct1(),ct1()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 4, in incr
UnboundLocalError: local variable 'total' referenced before assignment
# solution1: 使用可变型变量
def counter(start=0):
total=[start]
def incr():
total[0] += 1
return total[0]
return incr
# solution2: 使用nonlocal声明
def counter(start=0):
total=start
def incr():
nonlocal total
total+=1
return total
return incr
# 或者:
def counter(start=0):
def incr():
nonlocal start
start+=1
return start
return incr
# test:
>>> ct1=counter()
>>> ct2=counter(5)
>>> a,b,c=ct1(),ct1(),ct1()
>>> a,b,c
>>> (1,2,3)
>>> x,y,z=ct2(),ct2(),ct2()
>>> x,y,z
(6, 7, 8)
高级特性: 列表生成式
List Comprehensions: []
列表生成式即用来创建list的生成式
#
>>> list(range(1, 11))
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> [x * x for x in range(1, 11)]
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
>>> [x * x for x in range(1, 11) if x % 2 == 0]
[4, 16, 36, 64, 100]
>>> d = {'x': 'A', 'y': 'B', 'z': 'C' }
>>> [k+'='+v for k, v in d.items()]
['y=B', 'x=A', 'z=C']
高级特性:生成器(generator)
- 生成器: 一个不断产生值的函数,边计算边产出值
- 生成过程: 每次迭代从上一次返回时的函数体中的位置开始,所使用的参数都是第一次所保留下的,不是新创建的(生成器“记住”了数据状态和流程中的位置)
- 相比一次列出所有内容的优势: 节约内存,响应迅速,使用灵活
- 创建生成器:
- 使用包含
yield
语句的函数, 生成器每调用一次,会在yield位置产生一个值, 直到函数执行结束 - 简单的生成器,可以通过将一个列表生成式的
[ ]
改成( )
直接生成
- 使用包含
- 获取生成器生成的元素:
- 通过
next(g)
函数获得生成器的下一个返回值- 没有更多的元素时,抛出
StopIteration
的异常 - 生成器函数的返回值包含在StopIteration的value中,可通过捕获StopIteration错误获取返回值
- 没有更多的元素时,抛出
- 通过
for in
循环- 没有更多的元素时,不会抛出
StopIteration
的异常 - 拿不到生成器函数的返回值(return语句的返回值)
- 没有更多的元素时,不会抛出
- 注:生成器中元素都取出后就无用了
- 通过
Sample1:
# 1. 普通函数: 一次列出所有(如果n=100M,1000M,...会很慢)
def double(n):
return [i*2 for i in range(1,n)]
# 2. 生成器函数: 用多少生成多少,效率高(更节约内存和快捷)
def gen(n):
for i in range(1,n):
yield i*2
# 3. 创建生成器,下面g1与g2效果相同
g1 = gen(5)
g2 = (x*2 for x in range(1,5))
>>> g1
<generator object <genexpr> at 0x1114b1e58>
>>> g2
# 4. 获取生成器生成的元素
# Method1: 使用`next(g)`函数
>>> next(g1)
2
>>> next(g1)
4
>>> next(g1)
8
>>> next(g1)
---------------------------------------------------------------------------
StopIteration Traceback (most recent call last)
<ipython-input-51-e734f8aca5ac> in <module>
----> 1 next(g)
StopIteration:
# Method2: 使用`for in g`循环遍历
for i in g2:
...: print(i," ",end="")
...:
2 4 6 8
Sample2:
# 1. 定义生成器函数
def odd():
print('step 1')
yield 1
print('step 2')
yield(3)
print('step 3')
yield(5)
return "no more"
# 2. 定义遍历生成器函数
def getAll_by_forin(g):
for i in g:
print(i)
def getAll_by_next(g):
while True:
try:
print('x=%s' % next(g))
except StopIteration as e:
print('Generator return value:', e.value)
break
print('finish!')
# test1: 使用 for in 循环
>>> g1=odd()
>>> getAll_by_forin(g1)
step 1
1
step 2
3
step 3
5
# test2: 使用 next(g)
>>> g2=odd()
>>> getAll_by_next(g1)
tep 1
x=1
step 2
x=3
step 3
x=5
Generator return value: no more
finish!
高级特性:迭代器(Iterator)
- 迭代器(Iterator)对象:
- 可以被
next()
函数调用并不断返回下一个值的对象(直到没有数据时抛出StopIteration
错误) Iterator
对象表示的是一个惰性计算(即只有在需要获取下一个数据时它才会计算)的序列(即数据流,可以无限大,如全体自然数)- 如:生成器generator对象
- 可以被
- 可迭代对象(Iterable):
- 可以直接作用于
for in
循环的对象 - 如: list、tuple、dict、set、str,generator
- 可以直接作用于
- 迭代器
Iterator
对象一定是可迭代对象Iterable
(可以用for in
循环),反之不一定 - 可使用
iter()
函数将Iterable
对象转换为Iterator
对象 - 可以使用
isinstance()
判断一个对象类型,如:判断是否是Iterable
对象,是否是Iterator
对象
Sample:
import collections
# 1. list,tuple,dict,set,str等是`Iterable`对象但不是`Iterator`对象
>>> l=[1,2,3]
>>> isinstance(l,Iterable)
True
>>> isinstance(l,Iterator)
False
>>> next(l)
next(l)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-79-cdc8a39da60d> in <module>
----> 1 next(l)
TypeError: 'list' object is not an iterator
>>> for i in l:
...: print(i)
...:
1
2
3
# 2. 使用`iter(x)`函数将`Iterable`对象转换为`Iterator`对象
>>> a=iter(l)
>>> isinstance(a,Iterable)
True
>>> isinstance(a,Iterator)
True
>>> next(a)
1
>>> next(a)
2
# 3. generator对象是`Iterator`对象
>>> g=(x*2 for x in range(1,5))
>>> isinstance(l,Iterable)
True
>>> isinstance(l,Iterator)
True
>>> next(g)
2
>>> next(g)
4
高级特性:装饰器(decorator)
- 增强函数功能(例如在函数调用前后自动打印日志),但不修改函数原本定义
- 本质上,decorator就是一个返回函数的高阶函数
Sample:
原始函数特性
def sum(a,b): print("a+b=%s" % (a+b)) # 查看函数名字`__name__` >>> dir(sum) ['__annotations__', '__call__', '__class__', '__closure__', '__code__', '__defaults__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__get__', '__getattribute__', '__globals__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__kwdefaults__', '__le__', '__lt__', '__module__', '__name__', '__ne__', '__new__', '__qualname__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__'] >>> sum.__name__ 'sum' >>> f=sum >>> f.__name__ 'sum' >>> sum(3,5) a+b=8 >>> f(3,5) a+b=8
定义使用一个无参decorator
# 1. 定义decorator:log def log(func): def func_wrapper(*args, **kw): print('call %s(%s,%s):' % (func.__name__,args,kw)) return func(*args, **kw) return func_wrapper # 2. 在原函数上加上decorator: 使用`@` # 把`@log`放到`sum()`函数的定义处,相当于执行了:`sum = log(sum)` @log def sum(a,b): print("a+b=%s" % (a+b)) # 3. test: >>> sum(3,5) call sum((3, 5),{}): a+b=8 >>> sum.__name__ 'func_wrapper'
定义使用一个有参decorator
# 1. 定义decorator:log (decorator本身传入参数,需返回一个decorator的高阶函数) def log(text): def log_wrapper(func): def func_wrapper(*args, **kw): print('%s %s(%s,%s):' % (text,func.__name__,args,kw)) return func(*args, **kw) return func_wrapper return log_wrapper # 2. 使用decorator # 相当于执行了: `sum = log('execute:')(sum)` @log('execute:') def sum(a,b): print("a+b=%s" % (a+b)) # 3. test: >>> sum(3,5) execute: sum((3, 5),{}): a+b=8 >>> sum.__name__ 'func_wrapper'
Issue: 被装饰后的函数其实已经是另外一个函数了(函数名等函数属性会发生改变),eg: 装饰后的函数
__name__
等不准确了# 需要把原始函数的`__name__`等属性复制到`func_wraper()`函数 # 可使用functools.wraps(function)装饰器来消除这样的副作用 import functools def log(text): def log_wrapper(func): @functools.wraps(func) # 添加functools.wraps(function)装饰器 def func_wrapper(*args, **kw): print('%s %s(%s,%s):' % (text,func.__name__,args,kw)) return func(*args, **kw) return func_wrapper return log_wrapper @log('execute:') def sum(a,b): print("a+b=%s" % (a+b)) # test: >>> sum(3,5) execute: sum((3, 5),{}): a+b=8 >>> sum.__name__ 'sum'
匿名函数
- lambda函数:
lambda [arg1 [,arg2,.....argn]]:expression
冒号
前面的表示函数参数,可以无参- 只能有一个
表达式
,不用写return
,返回值就是该表达式的结果
- 应用: 函数作为参数,返回值传递
Sample:
# 1. lambda函数
>>> sum=lambda a,b:a+b # 相当于 def sum(a,b): return a+b
>>> sum
<function <lambda> at 0x10787c158>
>>> sum(3,5)
8
# 2. 应用: 作为参数
>>> a=[3,1,5]
>>> a.sort()
>>> a
[1, 3, 5]
>>> a=[
... {'name':'Tom','age':18}
... ,{'name':'Lucy','age':22}
... ,{'name':'Andy','age':20}
... ]
>>> a.sort()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: '<' not supported between instances of 'dict' and 'dict'
>>> k=lambda item:item['name']
>>> a.sort(key=k)
>>> a
[{'name': 'Andy', 'age': 20}, {'name': 'Lucy', 'age': 22}, {'name': 'Tom', 'age': 18}]
>>> a.sort(key=lambda x:x['age'])
>>> a
[{'name': 'Tom', 'age': 18}, {'name': 'Andy', 'age': 20}, {'name': 'Lucy', 'age': 22}]
内建函数
Build-in Function
- python解释器启动后默认加载
- 查看:
dir(__builtins__)
- 使用:
help(function)
常用的内建函数
range(stop)
,range(start stop[, step])
-> integer list (默认start=0,不包括stop,使用了惰性计算)>>> a = range(5) >>> a range(0, 5) >>> list(a) [0, 1, 2, 3, 4] >>> [x*2 for x in a] [0, 2, 4, 6, 8]
map(func, *iterables)
根据提供的函数对指定序列做映射(使用了惰性计算)>>> a=[1,2,3] >>> b=map(lambda x:x*x,a) >>> list(b) [1, 4, 9] >>> c=map(lambda x,y:(x,y),[1,2,3],['a,','b','c','d']) >>> list(c) [(1, 'a,'), (2, 'b'), (3, 'c')] >>> d=map(str,[1,2,3,'a']) >>> list(d) ['1', '2', '3', 'a'] >>> t=map(str,(1,2,3,'a')) >>> tuple(t) ('1', '2', '3', 'a')
filter(function or None, iterable)
从一个序列中筛出符合条件的元素(使用了惰性计算,所以只有在取filter()
结果的时候,才会真正筛选并每次返回下一个筛出的元素)>>> a=filter(lambda x:x%2,[1,2,3,4]) # 参数 function 返回1/0 => 布尔值True/False >>> list(a) [1, 3] >>> b=filter(lambda x:x%2,{1,2,3,4}) >>> set(b) {1, 3}
sorted(iterable, cmp=None, key=None, reverse=False)
-> new sorted object>>> a=sorted([1,5,4,2,9]) >>> a [1, 2, 4, 5, 9] >>> a=[{'name':'Tom','age':18},{'name':'Lucy','age':22},{'name':'Andy','age':20}] >>> result=sorted(a,key=lambda x:x['age']) >>> result [{'name': 'Tom', 'age': 18}, {'name': 'Andy', 'age': 20}, {'name': 'Lucy', 'age': 22}]
functools
工具函数包>>> import functools >>> dir(functools) ['RLock', 'WRAPPER_ASSIGNMENTS', 'WRAPPER_UPDATES', '_CacheInfo', '_HashedSeq', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', '_c3_merge', '_c3_mro', '_compose_mro', '_convert', '_find_impl', '_ge_from_gt', '_ge_from_le', '_ge_from_lt', '_gt_from_ge', '_gt_from_le', '_gt_from_lt', '_le_from_ge', '_le_from_gt', '_le_from_lt', '_lru_cache_wrapper', '_lt_from_ge', '_lt_from_gt', '_lt_from_le', '_make_key', 'cmp_to_key', 'get_cache_token', 'lru_cache', 'namedtuple', 'partial', 'partialmethod', 'recursive_repr', 'reduce', 'singledispatch', 'total_ordering', 'update_wrapper', 'wraps']
reduce(function, sequence[, initial])
-> value>>> a=functools.reduce(lambda x, y: x+y, [1,2,3,4]) >>> a 10 >>> a=functools.reduce(lambda x, y: x+y, [1,2,3,4],10) >>> a 20
partial(func, *args, **keywords)
偏函数:简化func,设置默认值 -> new function 这样调用这个新函数会更简单>>> int2 = functools.partial(int, base=2) >>> int2('1000000') # 等同:int('1000000',base=2) 64 >>> def showarg(*args, **kw): ... print("args=%s,kw=%s" % (args,kw)) ... >>> f=functools.partial(showarg, 1,2,3,a=3,b='linux') >>> f() args=(1, 2, 3),kw={'a': 3, 'b': 'linux'} >>> f(4,5,c='hello') args=(1, 2, 3, 4, 5),kw={'a': 3, 'b': 'linux', 'c': 'hello'}
wraps(func)
装饰器: 消除被装饰函数的函数名等函数属性发生改变的副作用def note(func): "note function" @functools.wraps(func) def func_wrapper(*args,**kw): "wrapper function" print('call %s(%s,%s):' % (func.__name__,args,kw)) return func(*args,*kw) return func_wrapper @note def test(a,b): 'test function' print('a=%s,b=%s' % (a,b)) >>> test(3,5) call test((3, 5),{}): a=3,b=5 >>> test.__name__ 'test' >>> test.__doc__ 'test function'
类与实例
class Cat: # 定义类
pass
>>> c=Cat() # 创建实例
# 1. 类对象
>>> Cat
__main__.Cat
>>> type(Cat)
type
# 2. 实例对象
>>> c
<__main__.Cat at 0x104f875f8>
>>> type(c)
__main__.Cat
类对象与实例对象
- 类对象(class) : 模版
- 实例对象(instance) : 根据类创建的对象
- 获取对象信息:
type(x)
: 获取对象类型isinstance(x,class_or_tuple)
: 判断class的类型dir(x)
: 查看一个对象的所有属性和方法(包括私有)
特性:
- 类的三大特性:封装,继承,多态 (注:python类不仅支持单继承,还支持多继承)
- python是动态语言,可动态给类/对象增删改属性和方法
- 增/改:
类/实例对象.xxx=...
- 删除:
del 类/实例对象.xxx
,delattr(类/实例对象,name)
- 增/改:
访问限制
- 公有: 默认
- 保护:以
_
开头,自己内部和子类可访问,不能用from module import *
导入(即不可跨包) - 私有: 以
__
开头,只有自己内部可以访问,外部(包括子类,实例对象等)不能访问 - 注:
- 一般将属性私有化,以防止外部随意访问修改
- 可通过编写
getXxx/setXxx
方法或@property
控制对外开放接口
访问
- 属性/方法:
类.xxx
,实例.xxx
- 方法调用:
类.xxx(...)
,实例.xxx(...)
- python也提供了内置函数判断获取对象属性或方法(无法判断或调用私有属性/方法)
getattr(obj,name)
setattr(obj,name,value)
hasattr(obj,name)
- 属性/方法:
类成员:属性
class Cat:
name='cls_cat' # 类属性
def __init__(self):
self.age=1 # 实例属性
c=Cat()
>>> Cat.name,c.name
('cls_cat', 'cls_cat')
>>> c.name='self_cat' # 添加同名实例属性
>>> Cat.name,c.name # 实例属性会屏蔽掉同名的类属性
('cls_cat', 'self_cat')
>>> del c.name # 删除实例属性
>>> Cat.name,c.name
('cls_cat', 'cls_cat')
>>> c.age
1
- 类属性:
- 类所有,所有实例共享一个属性,在内存中只存在一个副本
- 在类外访问:可通过类和实例对象访问公有类属性
类名.xxx
,实例名.xxx
- 在类外添加:
类名.xxx=...
- 实例属性:
- 实例所有,各个实例各自维护,互不影响
- 在类外访问:通过实例对象访问
实例名.xxx
- 在类外添加:
实例名.xxx=...
- 注: 使用
实例名.xxx
访问属性时,先找实例属性,没有再找类属性,所以:- 类属性和实例属性同名,用
实例名.xxx
访问时,实例属性会屏蔽掉同名的类属性 - 在类外修改类属性,必须通过类对象(如果通过实例对象,会产生一个同名的实例属性)
- 类属性和实例属性同名,用
- 高级
- 属性私有化: 一般将属性私有化,以防止外部随意访问修改
- 法一:编写
getXxx/setXxx
方法 - 法二:使用
@property
标示为属性函数(即将一个方法变成属性调用)
- 法一:编写
- 限制动态添加实例属性:
- 定义class的时候,定义一个特殊的
__slots__
变量,来限制该class实例能添加的属性 - 用
tuple
定义允许绑定的属性名称:__slots__=(...)
- 注:
__slots__
仅对当前类实例起作用,对继承的子类无用,除非在子类中也定义,这样子类实例允许定义的属性就是自身的__slots__
加上父类的__slots__
- 定义class的时候,定义一个特殊的
- 属性私有化: 一般将属性私有化,以防止外部随意访问修改
Sample1: 属性私有化
法一:编写
getXxx/setXxx
方法开放端口class Cat: def __init__(self): self.__age=0 def getAge(self): return self.__age def setAge(self,value): if not isinstance(value, int): raise ValueError('age must be an integer!') if value < 0 or value > 100: raise ValueError('age must between 0 ~ 100!') self.__age = value c=Cat() >>> c.age -------------------------------------------------------------------------- AttributeError Traceback (most recent call last) <ipython-input-257-4dd7fde137b3> in <module> ----> 1 c.age AttributeError: 'Cat' object has no attribute 'age' >>> c.setAge(3) >>> c.getAge() 3
法二:使用
@property
标示为属性函数class Cat: def __init__(self): self.__age=0 @property # getter def age(self): return self.__age @age.setter # setter def age(self,value): if not isinstance(value, int): raise ValueError('age must be an integer!') if value < 0 or value > 100: raise ValueError('age must between 0 ~ 100!') self.__age = value c=Cat() >>> c.age=5 >>> c.age 5
Sample2:限制动态添加实例属性
class Cat:
__slots__=('name','age') # 用tuple定义允许绑定的属性名称
class ColorfulCat(Cat):
pass
class PureCat(Cat):
__slots__=('color')
# 1. test 父:Cat
>>> c=Cat() # 父类实例对象 -- 只允许属性:name,age
>>> c.name='Tom'
>>> c.age=3
>>> c.country='China'
c.country="China"
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-269-64570eaae7be> in <module>
----> 1 c.country="China"
AttributeError: 'Cat' object has no attribute 'country'
# 2. test 子:ColorfulCat
>>> e=ColorfulCat() # 子类实例对象--无限制
>>> e.name='Tom'
>>> e.name
'Tom'
>>> e.country='China'
>>> e.country
'China'
# 3. test 子:PureCat
>>> p=PureCat() # 子类实例对象--允许属性:父(name,age)+ 子(color)
>>> p.color='White'
>>> p.name='Tom'
>>> p.age=1
>>> p.country='China'
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-281-3661a6a00d34> in <module>
----> 1 p.country='China'
AttributeError: 'PureCat' object has no attribute 'country'
类成员:方法
class Cat:
name='cls_cat'
def eat(self): # 实例方法
print('%s is eating' % self.name)
@classmethod
def from_setting(cls): # 类方法
cls.name='cls_cat_fresh'
# 可添加类方法(第一个参数代表类):
@classmethod
def say_cls(x,y):
return "%s say %s" % (x.name,y)
cls.say_cls=say_cls
# 可添加实例方法(第一个参数代表实例):
cls.say_self=lambda x,y:"%s say %s" % (x.name,y)
@staticmethod # 静态方法
def play():
print("%s is playing" % Cat.name)
c=Cat()
>>> Cat.name,c.name
('cls_cat', 'cls_cat')
# 1. 调用类方法
>>> Cat.from_setting() # or use: c.from_setting()
>>> Cat.name,c.name
('cls_cat_fresh', 'cls_cat_fresh')
# 2. 调用静态方法
>>> Cat.play() # or use: c.play()
cls_cat_fresh is playing
# 3. 调用实例方法
>>> c.eat() # can't use: Cat.eat()
cls_cat_fresh is eating
>>> c.name='self_cat'
>>> c.eat()
self_cat is eating
# 4. 调用from_setting()后添加的方法
>>> c.say_self('Hello')
'self_cat say Hello'
>>> c.say_cls('Hello')
'cls_cat_fresh say Hello'
- 类方法:
- 需用修饰器
@classmethod
标识,第一个参数(习惯用变量名cls
)必须是类对象(类外调用时,不用传递该参数,Python解释器会自己把类对象传入) - 方法中:可修改类定义
- 在类外访问:可通过实例和类访问
类名.xxx(...)
,实例名.xxx(...)
- 需用修饰器
- 实例方法:
- 不用修饰,第一参数(习惯用变量名
self
)必须是实例对象本身(类外调用时,不用传递该参数,Python解释器会自己把实例对象传入) - 方法中:可修改实例属性(eg:
self.xxx
) - 在类外访问:通过实例访问
实例名.xxx(...)
(不可使用类访问)
- 不用修饰,第一参数(习惯用变量名
- 静态方法:
- 需用修饰器
@staticmethod
标识,无参数限制 - 方法中:可通过类调用类属性,类方法,静态方法
类名.xxx
,类名.xxx(...)
- 在类外调用:可通过实例和类访问
类名.xxx(...)
,实例名.xxx(...)
- 需用修饰器
继承与多态
继承:
- 子类创建过程:
- 子类
__new__
,无则调用父类__new__
- 子类
__init__
,无则父类__init__
- 子类
- 父类使用了有参
__init__
,则子类创建时也要有参 - 子类无法访问父类的私有属性和方法
- 重写: 子类会覆盖父类中同名的方法
- 子类中调用父类方法
super().xxx
,父类.xxx
类.__mro__
可以查看类搜索方法时的先后顺序
- 子类中调用父类方法
- 子类创建过程:
单继承
class Animal: def eat(self): print('eating...') class Cat(Animal): pass
多继承(MixIn)
class Runnable: def run(self): print('runing...') class Cat(Animal,Runnable): pass
多态:自动调用实际类型的方法
Sample1:继承
class Animal:
def __init__(self,name):
self.name=name
print("Animal %s init." % name)
def eat(self):
print('Animal is eating...')
class Cat(Animal):
def eat(self):
print("Cat is eating...")
def play(self):
print("Cat is playing...")
def father(self):
print("call father eat func:")
super().eat()
print("call self eat func:")
self.eat()
>>> c=Cat("Tom")
Animal Tom init.
>>> c.eat()
Cat is eating...
>>> c.play()
Cat is playing...
>>> c.father()
call father eat func:
Animal is eating...
call self eat func:
Cat is eating...
>>> Cat.__mro__
(__main__.Cat, __main__.Animal, object)
>>> isinstance(c,Cat)
True
>>> isinstance(c,Animal)
True
Sample2:多态
class Animal:
def eat():
print('Animal is eating...')
class Cat(Animal):
def eat():
print("Cat is eating...")
class Dog(Animal):
def eat():
print("Dog is eating...")
def do_eat(obj):
obj.eat()
>>> do_eat(Animal())
Animal is eating...
>>> do_eat(Cat())
Cat is eating...
>>> do_eat(Dog())
Dog is eating...
高阶:元类metaclass
- 类拥有创建实例对象的能力,而类本身也是对象,可动态地创建(Python是动态语言,函数和类,不是编译时定义的,而是运行时动态创建的)
- 元类即用来创建类的“东西”,实现方式:
- 直接使用
type
元类 - 自定义一个元类:创建一个函数或类,使用
class
关键字或type(...)
创建类
- 直接使用
type(object_or_name, bases, dict)
: Python内建的可以用来创建所有类的元类- type(object) -> the object's type 查看某个对象的类型
- type(name, bases, dict) -> a new type 创建一个新的类型
- name: 类名,str
- bases: 父类名,tuple
- dict: 类属性,成员方法,dict
metaclass
/__metaclass__
: 类定义时指定元类# metaclass元类查找顺序:Foo -> Bar -> Module -> type # 1. python2写法: class Foo(Bar): __metaclass__=xxx pass # 2. python3写法: class Foo(Bar,metaclass=xxx): pass
metaclass=函数/类
,参数同type
:name, bases, dict- 应用:创建类时动态修改类定义(元类:拦截类的创建 -> 修改类 -> 返回修改之后的类)
- 例如:ORM框架,一个类对应一个表,类/实例对象操作对应数据库表/记录操作
- Python会使用当前类中
metaclass
指定的元类创造类对象,没有则找父类的,父类没有则去模块层次中找,还找不到则用内置的type
来创建这个类对象
Sample1: 使用元类type
创建类
def eat(self):
print('Cat is eating...')
>>> cls=type('Cat',(),{'name':'Cat','age':1,'eat':eat})
>>> cls.name,cls.age
('Cat',1)
>>> c=cls()
>>> c.name='Tom'
>>> cls.name,c.name
('Cat', 'Tom')
>>> c.eat()
Cat is eating...
Sample2: 自定义一个函数作为元类来创建类
# 自定义一个函数,使用`class`关键字创建类(也可使用`type`创建类)
def choose_animal(name):
if name=='Cat':
class Cat:
def eat(self):
print("Cat is eating...")
return Cat
else:
class Animal:
def eat(self):
print('Animal is eating...')
return Animal
>>> cls=choose_animal('Cat') # __main__.choose_animal.<locals>.Cat
>>> c=cls() # <__main__.choose_animal.<locals>.Cat at 0x1050e7ac8>
>>> c.eat()
Cat is eating...
Sample3:通过__metaclass__
属性指定元类,动态改变类定义
# 1. 自定义一个元类
# 使用函数
def animalMetaCls(clsName,bases,attrs):
print('call animalMetaCls',clsName)
if clsName=='Cat':
def eat(self):
print('Cat is eating...')
attrs['eat']=eat
attrs['name']='Cat'
return type(clsName,bases,attrs)
# 或使用类
class animalMetaCls:
def __new__(cls,clsName,bases,attrs):
print('call animalMetaCls',clsName)
if clsName=='Cat':
def eat(self):
print('Cat is eating...')
attrs['eat']=eat
attrs['name']='Cat'
return type(clsName,bases,attrs)
# 2. 通过metaclass指定元类
>>> class Cat(metaclass=animalMetaCls):
... pass
...
call animalMetaCls Cat # 创建类时,就会触发metaclass
# 3. verify:
>>> c=Cat()
>>> c.eat()
Cat is eating...
>>> Cat.name
'Cat'
高阶:枚举类Enum
Refer Doc
- 可以把一组相关常量定义在一个class中,class不可变,成员可以等值比较
Enum(value, names=None, *, module=None, qualname=None, type=None, start=1)
- value: What the new Enum class will record as its name.
- names: The Enum members. This can be a whitespace or comma separated string (values will start at 1 unless otherwise specified)
'RED GREEN BLUE' | 'RED,GREEN,BLUE' | 'RED, GREEN, BLUE'
['RED', 'GREEN', 'BLUE']
[('CYAN', 4), ('MAGENTA', 5), ('YELLOW', 6)]
{'CHARTREUSE': 7, 'SEA_GREEN': 11, 'ROSEMARY': 42}
- module: name of module where new Enum class can be found.
- qualname: where in module new Enum class can be found.
- type: type to mix in to new Enum class
- start: number to start counting at if only names are passed i
- 注:Enum的value属性是自动赋给成员的int常量,默认从1开始计数
Sample:
from enum import Enum,auto
# 1.创建枚举类对象
# 法一:直接使用`Enum(变量统称名,(变量1,变量2....))`
Month = Enum('Month', ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'))
# 法二:继承自Enum类
@unique # optional: @unique装饰器--检查保证没有重复值
class Month(Enum):
Jan=1
Feb=auto()
Mar=auto()
Apr=auto()
May=auto()
Jun=auto()
Jul=auto()
Aug=auto()
Sep=auto()
Oct=auto()
Nov=auto()
Dec=auto()
# 2. test:
>>> Month
<enum 'Month'>
>>> type(Month)
enum.EnumMeta
>>> Month.Feb # 枚举
<Month.Feb: 2>
>>> type(Month.Feb)
<enum 'Month'>
>>> Month.Feb.name # 获取枚举名
'Feb'
>>> Month.Feb.value # 获取枚举名
2
>>> Month['Feb'] # 获取枚举名为'Feb'的枚举,返回枚举类型
<Month.Feb: 2>
>>> Month(2) # 获取枚举值为2的枚举,返回枚举类型
<Month.Feb: 2>
>>> Month.Feb==Month.Feb # 枚举等值比较(注:无法直接进行大小比较)
True
>>> Month.Feb!=Month.Jan
True
>>> Month.Feb is Month.Jan
False
>>> for item in Month: # 遍历枚举,列出所有枚举成员
... print(item)
Month.Jan
Month.Feb
Month.Mar
Month.Apr
Month.May
Month.Jun
Month.Jul
Month.Aug
Month.Sep
Month.Oct
Month.Nov
Month.Dec
>>> list(Month)
[<Month.Jan: 1>,
<Month.Feb: 2>,
<Month.Mar: 3>,
<Month.Apr: 4>,
<Month.May: 5>,
<Month.Jun: 6>,
<Month.Jul: 7>,
<Month.Aug: 8>,
<Month.Sep: 9>,
<Month.Oct: 10>,
<Month.Nov: 11>,
<Month.Dec: 12>]
>>> Month.__members__ # __members__
mappingproxy({'Jan': <Month.Jan: 1>,
'Feb': <Month.Feb: 2>,
'Mar': <Month.Mar: 3>,
'Apr': <Month.Apr: 4>,
'May': <Month.May: 5>,
'Jun': <Month.Jun: 6>,
'Jul': <Month.Jul: 7>,
'Aug': <Month.Aug: 8>,
'Sep': <Month.Sep: 9>,
'Oct': <Month.Oct: 10>,
'Nov': <Month.Nov: 11>,
'Dec': <Month.Dec: 12>})
>>> Month.__members__.keys()
Out[35]: odict_keys(['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'])
>>> Month.__members__.values()
odict_values([<Month.Jan: 1>, <Month.Feb: 2>, <Month.Mar: 3>, <Month.Apr: 4>, <Month.May: 5>, <Month.Jun: 6>, <Month.Jul: 7>, <Month.Aug: 8>, <Month.Sep: 9>, <Month.Oct: 10>, <Month.Nov: 11>, <Month.Dec: 12>])
>>> Month.__members__.items()
Out[5]: odict_items([('Jan', <Month.Jan: 1>), ('Feb', <Month.Feb: 2>), ('Mar', <Month.Mar: 3>), ('Apr', <Month.Apr: 4>), ('May', <Month.May: 5>), ('Jun', <Month.Jun: 6>), ('Jul', <Month.Jul: 7>), ('Aug', <Month.Aug: 8>), ('Sep', <Month.Sep: 9>), ('Oct', <Month.Oct: 10>), ('Nov', <Month.Nov: 11>), ('Dec', <Month.Dec: 12>)])
>>> for name, member in Month.__members__.items():
... print(name, '=>', member, ',', member.value)
...
Jan => Month.Jan , 1
Feb => Month.Feb , 2
Mar => Month.Mar , 3
Apr => Month.Apr , 4
May => Month.May , 5
高阶:单例模式
Singleton 确保某一个类只有一个实例
class Cat:
__instance=None
__first_init = True # optional:用来保证只初始化一次
def __new__(cls,name,age):
if not cls.__instance:
cls.__instance=object.__new__(cls)
return cls.__instance
def __init__(self,name,age):
if self.__first_init:
print('init')
self.name=name
self.age=age
Cat.__first_init=False
>>> c1=Cat('Tom',2)
init
>>> c2=Cat('Andy',3)
>>> c1==c2
True
>>> id(c1),id(c2)
(4559954776, 4559954776)
>>> c1.name,c2.name
('Tom', 'Tom')
实际Enum
就是个单例:
class Cat(Enum):
Instance=1
>>> c1=Cat.Instance
>>> c2=Cat.Instance
>>> c1==c2
True
>>> id(c1),id(c2)
(4560045560, 4560045560)
>>> c1.name,c2.name
('Instance', 'Instance')
>>> c2.age=3
>>> c1.age,c2.age
(3,3)
高阶:内置类属性
可通过dir(obj)
查看某对象的所有方法和属性,eg:
class Cat:
def eat(self):
print('eating...')
>>> dir(Cat)
['__class__',
'__delattr__',
'__dict__',
'__dir__',
'__doc__',
'__eq__',
'__format__',
'__ge__',
'__getattribute__',
'__gt__',
'__hash__',
'__init__',
'__init_subclass__',
'__le__',
'__lt__',
'__module__',
'__ne__',
'__new__',
'__reduce__',
'__reduce_ex__',
'__repr__',
'__setattr__',
'__sizeof__',
'__str__',
'__subclasshook__',
'__weakref__']
一些常用的内置类属性和方法说明:
- 方法:
__new__
创建实例时触发调用__init__
创建实例后触发调用__str__
返回用户看到的字符串,若没实现,使用repr结果,print时调用__repr__
返回程序开发者看到的字符串(为调试服务的),直接输出时调用__getattribute__
属性访问拦截器,触发访问实例属性时
- 属性:
__class__
实例的类,触发方式:实例.__class__
__doc__
类文档,子类不继承,触发方式:help(类或实例)
__module__
类定义所在的模块__dict__
对象的属性字典
class Cat:
age=1
def __new__(cls,name): # 要有一个参数cls,代表要实例化的类,此参数在实例化时由Python解释器自动提供
print('new Cat %s' % name)
return object.__new__(cls) # 返回实例化出来的实例
def __init__(self,name): # 参数self,就是__new__返回的实例
self.name=name
print('%s init.' % name)
def __del__(self):
print('%s del.' % self.name)
def __str__(self):
return "Cat [name=%s]" % self.name
def __getattribute__(self,attr): # 注意:不要这个方法中调用self.xxxx,会进入死循环
print('get attr: %s' % attr)
return object.__getattribute__(self,attr)
>>> c=Cat('Tom') # 触发调用 __new__ => __init__
new Cat Tom
Tom init.
>>> c # 触发调用__repr__
<__main__.Cat at 0x104b83208>
>>> print(c) # 触发调用__str__
Cat [name=Tom]
>>> c.name # 触发__getattribute__
get attr: name
'tom'
>>> c.__class__
__main__.Cat
>>> c.__dict__
{'name': 'Tom'}
>>> del c # 触发调用__del__
Tom del.
高阶:定制类
定制类:通过在类中定义python内建的特殊方法__xxx__
,可为类实现一些特殊的操作
一些常见的python内建的特殊方法__xxx__
:
- 基础方法:
__new__(self[,arg1,…])
,__init__(self[,arg1,…])
,__del__(self)
__str__(self)
可打印的字符串输出;内建str()
及print()
函数__repr__(self)
运行时的字符串输出;内建repr()
函数及' '
操作符__len__(self)
长度;内建len()
__nonezero__(self)
为实例定义 False 值;内建bool()
函数__call__(self,*args)
用于可调用的实例;可以用来替代闭包的实现
- 值比较:
__cmp__(self,obj)
对象比较;内建cmp()
__lt__(self,obj)
,__le__(self,obj)
小于 & 小于等于;内建<
&<=
__gt__(self,obj)
,__ge__(self,obj)
大于 & 大于等于;内建>
&>=
__eq__(self,obj)
,__ne__(self,obj)
等于 & 不等于;内建=
&!=
- 类的属性:
__getattr/setattr/delattr__(self,attr)
获取/设置/删除属性;注:__getattr__
内建getattr()
,仅在属性没有找到时调用__getattribute__(self,attr)
获取属性;内建getattr()
;总是被调用__get/set/delete__(self,attr)
(描述符)获取/设置/删除属性
- 数值类型:
__complex__(self, com)
内建complex()
__int__(self)
内建int()
__float__(self)
内建float()
__index__(self)
在有必要时,压缩可选的数值类型为整型(比如用于切片索引时等)__neg/pos/abs/invert__(self)
一元负/一元正/绝对值/按位求反(内建 ~ 操作符)__add/sub/mul/dev/mod/pow__(self,obj)
:+,-,*,/,%,**
__lshift/rshift/and/or/xor__(self,obj)
:<<,>>,&,|,^
- 序列类型:
__len__(self)
序列中的项目数__getitem__(self, ind)
,__setitem__(self, ind,val)
,__delitem__(self, ind)
获取/设置/删除元素__getslice__(self, ind1,ind2)
,__setslice__(self, i1, i2,val)
,__delslice__(self, ind1,ind2)
获取/设置/删除切片元素__contains__(self, val)
含有成员;内建 in 关键字__*add__(self,obj)
串联;+
操作符__*mul__(self,obj)
重复;*
操作符__iter__(self)
生成迭代器;内建iter()
函数
- 映射类型:
__len__(self)
类中的项目数__hash__(self)
散列(hash)函数值__getitem__(self,key)
,__setitem__(self,key,val)
,__delitem__(self,key)
获取/设置/删除某个值__missing__(self,key)
给定键若不存在,则返回一个默认值
Sample1:使用__call__
实现:为可调用对象
- 使用了
__call__
后,对实例进行直接调用就好比对一个函数进行调用一样, 除了用instance.method(...)
调用实例方法,可以直接对实例进行调用instance(...)
- 能被调用的对象就是一个
Callable
对象通过callable()
函数判断一个对象是否是“可调用”对象
class Cat:
def __init__(self,name):
self.name='Cat'
def __call__(self):
print("I am %s." % self.name)
def eat(self):
print('%s is eating...' % self.name)
>>> c=Cat('Tom')
>>> c() # 可直接对实例进行调用
I am Cat.
>>> callable(c)
True
>>> c.eat()
Tom is eating...
Sample2:使用__getattr__
实现:动态返回一个属性值
__getattr__
特性:
- 获取属性,只有在没有找到属性的情况下,才调用
- 实现若不写返回值则表示返回None,不会再抛出AttributeError
class Cat:
def __init__(self,name):
self.name='Cat'
def __getattr__(self,attr):
if attr=='age':
return 0
elif attr=='color':
return lambda:'Red'
#raise AttributeError('This object has no attribute \'%s\'' % attr)
return 'N/A'
>>> c=Cat('Tom')
>>> c.age
0
>>> c.color()
'Red'
>>> c.sex
'N/A'
>>> c.country
'N/A'
应用:链式调用组建path
class PathChain:
def __init__(self,path=''):
self.path=path
def __getattr__(self,attr):
return PathChain('%s/%s' % (self.path,attr))
def __str__(self):
return self.path
__repr__=__str__
def __call__(self,path):
return PathChain('%s/%s' % (self.path,path))
>>> PathChain().users.roles
/users/roles
>>> PathChain().users('Tom').roles
/users/Tom/roles
Sample3:使用__iter__
&__next__
实现迭代
__iter__
返回一个迭代对象,for循环不断调用该迭代对象的__next__
方法拿到循环的下一个值,直到遇到StopIteration
错误时退出循环
# f(n)=f(n-2)+f(n-1),f(0)=0,f(1)=1
# a=f(n-2),b=f(n-1)
# n: 0,1,2,3,4,5,6,7 ,8
# v: 0,1,1,2,3,5,8,13,21
class Fib:
def __init__(self,n): # n>=1
self.a,self.b=0,1
self.i,self.n=0,n
def __iter__(self):
return self
def __next__(self):
if self.i==0:
self.i+=1
return self.a
self.a,self.b=self.b,self.a+self.b
self.i+=1
if self.i > self.n:
raise StopIteration()
return self.a
>>> for i in Fib(9):
... print(i)
0
1
1
2
3
5
8
13
21
>>> list(Fib(9))
[0, 1, 1, 2, 3, 5, 8, 13, 21]
>>> f=Fib(5)
>>> next(f)
0
>>> next(f)
1
>>> next(f)
1
>>> next(f)
2
>>> next(f)
3
>>> next(f)
StopIteration Traceback (most recent call last)
<ipython-input-34-aff1dd02a623> in <module>
----> 1 next(f)
...
Sample4:使用__getitem__
实现按照下标/切片取出元素
class Fib(object):
def __getitem__(self, n):
if isinstance(n, int): # n是索引
a, b = 0, 1
for x in range(n):
a, b = b, a + b
return a
if isinstance(n, slice): # n是切片
a, b = 0, 1
start,stop = n.start or 0, n.stop
result = []
for x in range(stop):
if x>=start:
result.append(a)
a, b = b, a + b
return result
>>> f=Fib()
>>> f[0]
>>> f[8]
21
>>> f[0:9] # [start,end)
[0, 1, 1, 2, 3, 5, 8, 13,21]
>>> f[3:8]
[2, 3, 5, 8, 13]
模块与包
导入
import ... [as ...]
import math,time math.sqrt(2) time.sleep(3) import math as mt,time as tt mt.sqrt(2) tt.sleep(3)
from ... import ...
from math import sqrt,pow sqrt(2) pow(2,3) from math import * # 注:无法使用as重命名 sqrt(2) pow(2,3) from math import sqrt as st,pow as pw st(2) pw(2,3)
模块定位顺序(存储在
system
模块的sys.path
变量中):>>> import sys >>> sys.path ['', '/anaconda3/lib/python37.zip', '/anaconda3/lib/python3.7', '/anaconda3/lib/python3.7/lib-dynload', '/anaconda3/lib/python3.7/site-packages', '/anaconda3/lib/python3.7/site-packages/aeosa']
- 从上面列出的目录里依次查找要导入的模块文件
- 搜索顺序:
- 当前目录(上面
''
即表示当前目录) - shell变量
PYTHONPATH
下的每个目录 - python安装路径
- 当前目录(上面
- 添加搜索目录:
- 法一:直接修改
sys.path
,动态添加,运行结束后就失效. eg:sys.path.insert(0, xxx)
可以确保先搜索xxx这个路径 - 法二:设置环境变量
PYTHONPATH
- 法一:直接修改
一个程序进程下,模块被导入后,
import...
/from...import...
并不能重新导入模块,重新导入需先import importlib
,importlib.reload(moduleName)
# 1. initial: $ mkdir First $ vi First/hello.py def say(): print('say hello') # 2. run: $ python3 >>> import First.hello >>> First.hello.say() say hello # 3. update module function: $ vi First/hello.py def say(): print('say hello again') # 4. back to previous process,re-import and test: >>> import First.hello >>> First.hello.say() say hello # no change! # 5. use importlib reload the module >>> import importlib >>> importlib.reload(First.hello) >>> First.hello.say() say hello again # changed!
自定义库
库:包/模块 => 根目录:文件夹/文件 (命名空间隔离的执行者)
- 包:文件夹,树状结构,可放入多个层级包/模块
- 模块:文件,一个python文件本身就可以作为一个Module导入,此时模块名=文件名
包初始化文件:
__init__.py
=> 代表可执行包(导入包时会执行它对应的__init__.py
文件)- 暴露当前包对外的API,以控制这个包的导入行为
- 同module一样,导入时,此文件里的语句就会被执行:
- module => module.py(like: module/init.py)
- package => package/init.py
- 无此文件时,无法配置包的特殊变量
__all__
,则import *
时无内容开放 - 注:只能控制此文件中的变量 & 当前目录下的模块或包的开放(不包括子目录下内容的控制)
特殊变量:
__name__
- 直接运行此
.py
时,此属性值='__main__'
- 以模块形式被导入时,此属性值=
'文件名'
- 可根据此变量的结果判断是直接执行的python脚本还是被引入执行的,从而能够有选择性的执行测试代码
- 直接运行此
- 私有化变量:
_xxx
,__xxx
- 只对
import *
方式的导入有制约效果,对import xxx
的导入无效
- 只对
__all__
- 可设置
__all__=['..','..',...]
,列出可对外公开的内容(即外面可导入使用的内容)- 在
package/__init__.py
中,控制此package(一级)目录下开放的模块 & 子包 & package包本身init文件中定义的变量,即控制package/*
内容的开放 - 在
module.py
中,控制此module下开放的变量(此module.py可看做是module的__init__.py
),即控制module/*
内容的开放
- 在
- 只对
import *
方式的导入有用,对import xxx
的导入无效 - 配置的优先级高于私有化变量,即配置后,私有化变量
_xxx
,__xxx
的制约效果无效了,只受__all__
约束
- 可设置
导入:
- 导入时使用
.
表路径层级,eg:p1/p2/hello.py
- 导入包:
import p1
,import p1.p2
- 导入模块:
import p1.p2.hello
,from p1.p2 import hello
- 导入模块中的变量:
from p1.p2.hello import v1,v2,...
- 导入包下一级开放内容:
from p1 import *
- 导入模块中开放内容:
from p1.p2.hello import *
- 可混合导入包和模块
- 注:不可以
from p1 import p2.hello
- 导入包:
- 导入时就会依次执行对应的
.py
文件import path
:- step: 沿路径搜索,并沿途执行初始化(即执行
path
经过的包__init__.py
& 模块module.py
),导入所有变量 - 止于此
path
,会执行path/__init__.py
orpath.py
,__all__
配置不会起作用(因为并未导入path/*.py
)
- step: 沿路径搜索,并沿途执行初始化(即执行
from path import *
:- step1:
from path
=> 沿路径搜索,并沿途执行初始化(即执行path
经过的包__init__.py
& 模块module.py
),但不会导入这些变量 - step2:
import *
=> 执行path/*
下直属py文件并导入变量path/__init__.py
先执行,会根据其中定义的__all__
控制可导入变量;- 若path代表的是module,则
module.py
即相当于它的__init__.py
- 止于
path/*
,会执行path/*.py
,根据path/__init__.py
中配置的__all__
导入变量
- step1:
- 总结:
import
部分导入变量,from
部分不导入变量;path
:代表到此文件(包或模块),*
:代表path/*.py
(受path/__init__.py
中__all__
限制)
- eg:
import path
- 导入包:
import First.sub
=> import first initial vars + sub initial vars - 导入包:
from First import sub
=> import sub initial vars - 导入模块:
from First import hello
=> import hello initial vars - 导入模块:
import First.hello
=> import first initial vars + hello initial vars - 导入变量:
from First.hello import A,_B,__C,Cat
=> import A,_B,__C,Cat
- 导入包:
- eg:
from path import *
- 导入包:
from First import *
=> depends on the__all__
in first initial py:['hello','MCount','sub']
=> first opened initial vars:MCount
+ hello initial vars:hello.*
+ sub initial varssub.*
- 导入包:
from First.sub import *
=> depends on the__all__
in first initial py:['aa']
=> aa initial vars - 导入变量:
from First.hello import *
=> depends on the__all__
inhello.py
:['A','__C','Cat']
=> hello opened initial vars:A
,__C
,Cat
- 注:无法使用
from First import sub.aa
,from First import sub.*
- 导入包:
- 导入时使用
Sample:
First
├── __init__.py # __all__=['hello','MCount','sub']
│ # MCount=2,MNum=3
│
├── hello.py # __all__=['A','__C','Cat']
│ # A=[1,2,3],_B=['a','b','c'],__C=('Hello','Tom')
│ # def say():..., class Cat: ...
├── world.py
│
├── sub
│ ├── __init__.py # __all__=['aa'],sub_var=1
│ ├── aa.py # colorA='Red'
│ └── bb.py # colorB='Green'
单独测试某个模块的功能(使用
__name__
区分状态)$ cat First/hello.py print('hello.py init') __all__=['A','__C','Cat'] A=[1,2,3] _B=['a','b','c'] __C=('Hello','Tom') def say(): print('say hello again') class Cat: def eat(self): print('Cat is eating...') if __name__ == '__main__': # do test: say() c=Cat() c.eat() # 单独测试验证hello.py功能: $ python3 First/hello.py hello.py init A=[1, 2, 3],_B=['a', 'b', 'c'],__C=('Hello', 'Tom') say hello again Cat is eating...
使用
import xxx
=> 不受xxx
的__all__
控制- xxx最终为包
import First
,import First.sub
>>> import First # 会执行First的__init__.py,导入其中所有变量 First package init. >>> dir() ['First', '__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__'] >>> dir(First) ['MCount', 'MNum', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__']
>>> import First.sub # 会执行First的__init__.py & sub的__init__.py,导入其中所有变量 First package init. sub package init. >>> dir() ['First', '__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__'] >>> dir(First) ['MCount', 'MNum', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', 'sub'] >>> dir(First.sub) ['__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', 'sub_var']
- xxx最终为模块
import First.hello
,from First import hello
>>> import First.hello # 会执行 First的__init__.py & hello.py,导入其中所有变量 First package init. hello.py init >>> dir() ['First', '__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__'] >>> dir(First) ['MCount', 'MNum', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', 'hello'] >>> dir(First.hello) ['A', 'Cat', '_B', '__C', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'say']
>>> from First import hello # 执行hello.py,导入其中所有变量 First package init. hello.py init >>> dir() ['__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__', 'hello'] >>> dir(hello) ['A', 'Cat', '_B', '__C', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'say']
- xxx最终为变量
from First.hello import A,_B,__C,say,Cat
>>> from First.hello import A,_B,__C,say,Cat First package init. hello.py init >>> dir() ['A', 'Cat', '_B', '__C', '__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__', 'say']
- xxx最终为包
使用
from xxx import *
=> 受xxx/__init__.py
中__all__
的控制- xxx为包
from First import *
,from First.sub import *
>>> from First import * # = from First import hello,sub First package init. hello.py init sub package init. >>> dir() # 受First/__init__.py 中`__all__`控制 ['MCount', '__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__', 'hello', 'sub'] >>> dir(hello) # 不受hello.py 中`__all__`控制 ['A', 'Cat', '_B', '__C', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'say'] >>> dir(sub) # 不受sub/__init__.py 中`__all__`控制 ['__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', 'sub_var']
>>> from First.sub import * First package init. sub package init. sub/aa.py init. >>> dir() # 受sub/__init__.py 中`__all__`控制 ['__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__', 'aa']
- xxx为模块
from First.hello import *
>>> from First.hello import * # = from First.hello import A,Cat,__C First package init. hello.py init >>> dir() # 受hello.py 中`__all__`控制 ['A', 'Cat', '__C', '__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__']
- xxx为包
发布/安装包
- 发布:
- 编辑
setup.py
文件 - 构建模块
python setup.py build
- 生成发布压缩包
python setup.py sdist
- 编辑
- 安装:
- 解压获得的压缩包
- 进入文件夹,执行
python setup.py install
(可加参数指定安装路径:--prefix=安装路径
)
- 使用:
from ... import ...
,import package.module
导入
Sample:
Second
├── setup.py
├── subA
│ ├── __init__.py
│ ├── aa.py
│ └── bb.py
└── subB
├── __init__.py
├── cc.py
└── dd.py
发布:
# 1. setup.py:
$ cd Second
$ vi setup.py
from distutils.core import setup
setup(name="SecondPkg",
version="1.0",
description="Second module",
author="dongGe",
py_modules=['subA.aa', 'subA.bb', 'subB.cc','subB.dd'])
# py_modules 需指明所需包含的py文件
# 2. build: 构建 (发布的包下一定要有`__init__.py`文件,不然会build失败)
$ python setup.py build
running build
running build_py
copying subA/__init__.py -> build/lib/subA
copying subB/__init__.py -> build/lib/subB
# 3. 查看(实际就是copy了一份到`build/lib`下)
$ tree
.
├── build
│ └── lib
│ ├── subA
│ │ ├── __init__.py
│ │ ├── aa.py
│ │ └── bb.py
│ └── subB
│ ├── __init__.py
│ ├── cc.py
│ └── dd.py
├── setup.py
├── subA
│ ├── __init__.py
│ ├── aa.py
│ └── bb.py
└── subB
├── __init__.py
├── cc.py
└── dd.py
# 4. sdist: 生成发布压缩包 (产生`MANIFEST`文件,`dist/SecondPkg-1.0.tar.gz`压缩包)
$ python setup.py sdist
$ tree
.
├── MANIFEST
├── build
│ └── lib
│ ├── subA
│ │ ├── __init__.py
│ │ ├── aa.py
│ │ └── bb.py
│ └── subB
│ ├── __init__.py
│ ├── cc.py
│ └── dd.py
├── dist
│ └── SecondPkg-1.0.tar.gz
├── setup.py
├── subA
│ ├── __init__.py
│ ├── aa.py
│ └── bb.py
└── subB
├── __init__.py
├── cc.py
└── dd.py
安装:
# 1. 解压
$ tar -xvf SecondPkg-1.0.tar.gz
# 2. 进入文件夹
$ cd SecondPkg-1.0
$ tree
.
├── PKG-INFO
├── setup.py
├── subA
│ ├── __init__.py
│ ├── aa.py
│ └── bb.py
└── subB
├── __init__.py
├── cc.py
└── dd.py
# 3. 安装
$ python setup.py install
...
creating /anaconda3/lib/python3.7/site-packages/subA
...
creating /anaconda3/lib/python3.7/site-packages/subB
...
running install_egg_info
Writing /anaconda3/lib/python3.7/site-packages/SecondPkg-1.0-py3.7.egg-info
# 4. 查看安装的包
$ pip list | grep Second
SecondPkg 1.0
# 5. 使用
$ cd ~/space
$ python3
# 导入包:
>>> from subB import * # 注意:不要使用 `import subA,subB` -- 不会导入包下的任何模块
subB init.
This subB/cc.py
This subB/dd.py
>>> dir()
['__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__', 'cc', 'dd']
# 导入模块:
>>> from subA.aa import * # 或 `from subA import aa` 或 `import subA.aa`
subA init.
This subA/aa.py
常用标准库
Python的一个组成部分,随着Python解释器,一起安装在电脑中的
标准库 | 说明 |
---|---|
builtins | 内建函数默认加载 |
os | 操作系统接口 |
sys | Python自身的运行环境 |
functools | 常用的工具 |
json | 编码和解码JSON对象 |
logging | 记录日志,调试 |
multiprocessing | 多进程 |
threading | 多线程 |
copy | 拷贝 |
time | 时间 |
datetime | 日期和时间 |
calendar | 日历 |
hashlib | 加密算法 |
random | 生成随机数 |
re | 字符串正则匹配 |
socket | 标准的 BSD Sockets API |
shutil | 文件和目录管理 |
glob | 基于文件通配符搜索 |
collections | 一个集合模块,提供了许多有用的集合类 |
Sample:datetime
>>> from datetime import datetime
>>> datetime.now() # 1. 获取当前datetime
datetime.datetime(2019, 3, 17, 11, 58, 22, 874984)
>>> type(datetime.now())
<class 'datetime.datetime'>
>>> dt=datetime(2018, 5, 19, 13, 10) # 2. 指定某个日期和时间的datetime
>>> print(dt)
2018-05-19 13:10:00
>>> ts=dt.timestamp() # 3.1: datetime -> timestamp
>>> ts
1526706600.0
>>> datetime.fromtimestamp(ts) # 3.2: timestamp -> datetime 本地时间
datetime.datetime(2018, 5, 19, 13, 10)
>>> datetime.utcfromtimestamp(ts) # 3.2: timestamp -> datetime UTC时间
datetime.datetime(2018, 5, 19, 5, 10)
>>> dt.strftime('%Y-%m-%d %H:%M:%S') # 4.1: datetime -> str
'2018-05-19 13:10:00'
>>> str(dt)
'2018-05-19 13:10:00'
>>> datetime.strptime('2018-05-19 13:10:00','%Y-%m-%d %H:%M:%S') # 4.2: str -> datetime
datetime.datetime(2018, 5, 19, 13, 10)
>>> from datetime import timedelta # 5. datetime +/-
>>> dt+timedelta(hours=10)
datetime.datetime(2018, 5, 19, 23, 10)
>>> dt-timedelta(days=2, hours=12)
datetime.datetime(2018, 5, 17, 1, 10)
>>> from datetime import timezone # 6. 时区转换
>>> dt_utc=dt.replace(tzinfo=timezone.utc) # 6.1 强制设置为utc时区,作为基准时间
>>> dt_utc
datetime.datetime(2018, 5, 19, 13, 10, tzinfo=datetime.timezone.utc)
>>> print(dt_utc)
2018-05-19 13:10:00+00:00
>>> dt_utc.astimezone(timezone(timedelta(hours=8))) # 转换北京时区(UTC+8)
datetime.datetime(2018, 5, 19, 21, 10, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800)))
>>> dt_utc.astimezone(timezone(timedelta(hours=9))) # 转换东京时区(UTC+9)
datetime.datetime(2018, 5, 19, 22, 10, tzinfo=datetime.timezone(datetime.timedelta(seconds=32400)))
>>> dt_bj=dt_utc.astimezone(timezone(timedelta(hours=8)))
>>> dt_bj.astimezone(timezone(timedelta(hours=9))) # 北京时间转换为东京时间(UTC+9)
datetime.datetime(2018, 5, 19, 22, 10, tzinfo=datetime.timezone(datetime.timedelta(seconds=32400)))
datetime
表示的时间需要时区信息才能确定一个特定的时间,默认视为本地时区时间- 时区转换: 确定一个
datetime
的时区(可强制设置时区,作为基准时间),利用带时区的datetime
,通过astimezone()
方法,可以转换到任意时区 - 存储
datetime
,最佳方法是将其转换为timestamp
再存储,因为timestamp
的值与时区完全无关
Sample:Collections
- namedtuple : tuple子类,可用属性而不是索引来引用tuple的元素
- deque : 双向列表(list是线性存储,数据量大时,插入和删除效率低),注:不是list子类
- defaultdict : dict子类,key不存在时,不会报错而是返回一个默认值
- OrderedDict : dict子类,Key会按照插入的顺序排列 (应用:可用来实现一个FIFO(先进先出)的dict,当容量超出限制时,先删除最早添加的Key)
- ChainMap : 把一组dict串起来并组成一个逻辑上的dict,查找时按顺序在内部的dict中依次查找(应用:可用来实现参数的优先级查找),注:不是dict子类
- Counter: dict子类,可实现一个简单的计数器
# 1. namedtuple
>>> from collections import namedtuple
>>> Point = namedtuple('Point', ['x', 'y'])
>>> p = Point(1, 2)
>>> p
Point(x=1, y=2)
>>> p.x,p.y # 可用属性访问
(1, 2)
>>> isinstance(p, Point),isinstance(p, tuple)
(True, True)
# 2. deque
>>> from collections import deque
>>> q = deque(['a', 'b', 'c'])
>>> q.append('x') # append(),pop()
>>> q.appendleft('y') # appendleft(),popleft()
>>> q
deque(['y', 'a', 'b', 'c', 'x'])
>>> isinstance(q,list)
False
# 3. defaultdict
>>> from collections import defaultdict
>>> d = defaultdict(lambda: 'N/A')
>>> d['a'] = 123
>>> d['a']
123
>>> d['b'] # 访问存在key,返回设置的默认值,不会报错
'N/A'
>>> isinstance(d,dict)
True
# 4. ChainMap
>>> from collections import ChainMap
>>> m=ChainMap({'a':1,'b':2},{'b':3,'c':4})
>>> m['a'],m['b'],m['c']
(1, 2, 4)
>>> isinstance(m,dict)
False
# 5. Counter
>>> from collections import Counter
>>> c = Counter()
>>> for i in 'programming': # 统计字符出现的个数,效果同:c={},c[i]=c.get(i,0)+1
... c[i] += 1
...
>>> c
Counter({'r': 2, 'g': 2, 'm': 2, 'p': 1, 'o': 1, 'a': 1, 'i': 1, 'n': 1})
>>> isinstance(c,dict)
True
Sample:hashlib
>>> import hashlib
>>> m=hashlib.md5()
>>> m.update(b'Hello')
>>> m.hexdigest()
'8b1a9953c4611296a827abf8c47804d7'
常用扩展库
扩展库 | 说明 |
---|---|
requests | 使用的是urllib3,继承了urllib2的所有特性 |
urllib | 基于http的高层库 |
scrapy | 爬虫 |
beautifulsoup4 | HTML/XML的解析器 |
celery | 分布式任务调度模块 |
redis | 缓存 |
Pillow(PIL) | 图像处理 |
xlsxwriter | 仅写excle功能,支持xlsx |
xlwt | 仅写excle功能,支持xls ,2013或更早版office |
xlrd | 仅读excle功能 |
elasticsearch | 全文搜索引擎 |
pymysql | 数据库连接库 |
mongoengine/pymongo | mongodbpython接口 |
matplotlib | 画图 |
numpy/scipy | 科学计算 |
django/tornado/flask | web框架 |
xmltodict | xml转dict |
SimpleHTTPServer | 简单地HTTP Server,不使用Web框架 |
gevent | 基于协程的Python网络库 |
fabric | 系统管理 |
pandas | 数据处理库 |
scikit-learn | 机器学习库 |
Sample:SimpleHTTPServer
- 运行一个静态(目录)服务器
$ cd Second $ python -m http.server 8000 Serving HTTP on 0.0.0.0 port 8000 (http://0.0.0.0:8000/) ...
- visit
http://localhost:8000
就可以看到列出了当前目录的页面,可方便的查看文件$ curl http://localhost:8000 <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8"> <title>Directory listing for /</title> </head> <body> <h1>Directory listing for /</h1> <hr> <ul> <li><a href="build/">build/</a></li> <li><a href="dist/">dist/</a></li> <li><a href="MANIFEST">MANIFEST</a></li> <li><a href="setup.py">setup.py</a></li> <li><a href="subA/">subA/</a></li> <li><a href="subB/">subB/</a></li> </ul> <hr> </body> </html>
文件IO
读写文件
- 打开文件:
f=open(filename,mode='r',encoding=None)
- 关闭文件:
f.close()
- 用
with
语句包裹打开文件,不管是否异常,自动关闭文件,更保险(或使用try...finally
) - 读:
content=f.read(128)
=> strcontent=f.readline()
=> strcontent=f.readlines()
=> list
- 写:
f.write('...')
=> int: lengthf.writelines(...)
f.flush()
缓冲区内容刷新到文件中
- 查看当前位置:
position = f.tell()
- 定位到某个位置:
f.seek(offset, from)
- offset: 偏移量
- from: 0 文件开头/ 1 当前位置 / 2 文件末尾
- eg:
f.seek(2, 1)
: 从头开始,偏移2个字节;f.seek(-3,2)
: 离文件末尾,3字节处
Sample:
# 写
f = open('test.txt', 'w') # 追加可用a
f.write('hello world, i am here!')
f.close()
# 读
f = open('test.txt', 'r')
content = f.read(128) # content = f.readline(),contentlst = f.readlines()
position = f.tell()
f.close()
# 使用with语句,会自动关闭
with open('test.txt', 'w') as f
f.write('Hello world')
操作文件/目录
import os
,import shutil
(可以看做是os模块的补充,有更多方便的操作命令)- 系统相关:
os.name
查看操作系统类型(posix:Linux、Unix或Mac OS X;nt:Windows)os.uname()
获取详细的系统信息(Windows上不提供)os.environ
获取环境变量(dict,os.environ.get('PATH'))
- 路径相关:
os.path
- 查看绝对路径
os.path.abspath(path)
- 合成路径
os.path.join(p1,p2,...)
- 拆分路径
os.path.split(path)
- 文件扩展名
os.path.splitext(path)
- 是否是文件
os.path.isfile(x)
- 是否是文件夹
os.path.isdir(x)
- 操作文件/文件夹: - 获取当前路径
os.getcwd()
- 改变当前路径
os.chdir(path)
- 列出文件/文件夹(
ls
)os.listdir(path)
- 创建文件夹
os.mkdir(dir)
- 删除空文件夹
os.rmdir(dir)
- 重命名/移动文件/文件夹(
mv
)os.rename(src,dist)
- 删除文件
os.remove(file)
- 拷贝文件:
shutil.copyfile(src,dist)
- 查看绝对路径
多任务
同步(Sync) & 异步(Async):消息通知机制
- 同步:主动等
- 异步:被动听
阻塞(Blocking) & 非阻塞(UnBlocking):等待消息通知时的调用者状态
- 阻塞:等待 (one thread do one task)
- 非阻塞:继续做其他事 (one thread do many tasks,高吞吐量)
并发(Concurrent) & 并行 (Parallel):多任务处理能力
- 并发:交替运行
- 并行:同时运行
进程(Process) & 线程(Thread):多任务处理手段
- 进程:独立地址空间,进程间独立运行(eg: 多核CPU,并行运行)
- 线程:进程内,多线程共享同一段地址空间,但有自己独立的堆栈和局部变量(eg: 单核CPU多线程,并发运行)
组合(举例 Task:烧水 & 看电视)
- 阻塞 & 同步
- 烧水 -> 等水开(主动看)-> 看电视 :效率低
- 阻塞 & 异步
- 烧水 -> 水壶响(被动听)-> 看电视:效率低
- 非阻塞 & 同步
- 烧水 & 看电视 & 时不时看下水(主动看): 来回切换,效率低
- 非阻塞 & 异步
- 烧水 & 看电视 & 水壶响了再去关(被动听):减少切换,效率高
- 阻塞 & 同步
进程 Process
os.fork()
: 只在Unix/Linux/Mac上运行,windows不可以$ cat fork-demo.py import os,time num=1 ret = os.fork() if ret==0: # sub-process do: num=num-1 # should be 0 print("sub-process:%s,pid:%s,ppid:%s num:%s" % (ret,os.getpid(),os.getppid(),num)) else: # main-process do: num=num+1 # should be 2 print("main-process:%s,pid:%s,ppid:%s num:%s" % (ret,os.getpid(),os.getppid(),num)) time.sleep(3) print('End') $ python fork-demo.py main-process:30929,pid:30928,ppid:6855 num:2 sub-process:0,pid:30929,ppid:30928 num:0 End End $ ps PID TTY TIME CMD 6855 ttys001 0:00.03 -bash 30928 ttys001 0:00.02 python fork-demo.py 30929 ttys001 0:00.00 python fork-demo.py 28023 ttys002 0:00.02 -bash
- 分出一个子进程,后面的代码会分别在原有进程和新创建进程运行
- 返回0: 代表当前是新创建的子进程
- 返回非0: 代表当前是原本的父进程(返回的数字=子进程pid)
- 即子进程永远返回0,父进程返回子进程的ID
- 每个进程中所有数据(包括全局变量)都各有拥有一份,互不影响
- 父子进程互补影响,父进程结束不会导致子进程结束
- 1次fork->2个进程,2次fork->4个进程,...
multiprocessing.Process
: 通用$ cat process-demo.py from multiprocessing import Process import os,time,random # 法一 def task(name): print("子进程(%s) task %s start" % (os.getpid(),name)) time.sleep(random.random()*2) print("子进程(%s) task %s end" % (os.getpid(),name)) # 法二 class MyProcess(Process): def __init__(self,name): Process.__init__(self) self.name=name def run(self): print("子进程(%s) task %s start" % (os.getpid(),self.name)) time.sleep(random.random()*2) print("子进程(%s) task %s end" % (os.getpid(),self.name)) if __name__=='__main__': print('父进程(%s) 开始' % os.getpid()) # p = Process(target=task, args=('test',)) # 创建子进程,法一:创建一个Process实例 p = MyProcess('test') # 创建子进程,法二:创建一个自定义的继承自Process类的实例 p.start() # 启动子进程 p.join() # 父进程阻塞等待子进程结束后再继续往下运行,非必需 print('子进程(%s) 结束' % p.name) print('父进程(%s) 结束' % os.getpid()) $ python process-demo.py 父进程(39487) 开始 子进程(39488) task test start 子进程(39488) task test end 子进程(test) 结束 父进程(39487) 结束
- 创建
- 法一:创建一个Process实例
- 法二:自定义一个类,继承
Process
类,重写run
方法
- 启动:
p.start()
内部调用run
方法 - 等待结束:
p.join([timeout])
- 终止:
p.terminate()
- 判断是否还在执行:
is_alive()
- 常用属性:
name
:当前进程实例别名,默认为Process-N,N为从1开始递增的整数pid
:当前进程实例的PID值
- 注:
- 父进程结束不影响子进程
- 子进程无法访问父进程创建的全局变量
- 变量可通过参数传递
- 创建
multiprocessing.Pool
: 进程池from multiprocessing import Pool import os,time,random def task(name): print("子进程(%s): task %s => start" % (os.getpid(),name)) t=random.random()*2 time.sleep(t) print("子进程(%s): task %s => end(sleep:%.2f)" % (os.getpid(),name,t)) return os.getpid(),name def task_callback(arg): # 父进程中执行,以元组方式获取task return值 print("父进程(%s): 获取子进程(%s) task %s => callback" % (os.getpid(),arg[0],arg[1])) if __name__=='__main__': print('父进程(%s): => 开始' % os.getpid()) p=Pool(3) for i in range(0,5): # p.apply(task,(i,)) # 阻塞方式发配任务(等一个任务完成后,再发布下一个) p.apply_async(task,(i,),callback=task_callback) # 非阻塞方式发配任务(一次性发布3个,等空出进程后,再发布下一) p.close() # 关闭进程池,是以不再接收新的请求 p.join() # 父进程阻塞等待所有子进程执行完成,必须放在close语句之后(不加join的话可能子进程还未完成就整个退出了) print('所有子进程结束') print('父进程(%s): => 结束' % os.getpid()) # 阻塞方式结果: $ python pool-demo.py 父进程(44350): => 开始 子进程(44351): task 0 => start 子进程(44351): task 0 => end(sleep:1.30) 子进程(44352): task 1 => start 子进程(44352): task 1 => end(sleep:0.27) 子进程(44353): task 2 => start 子进程(44353): task 2 => end(sleep:0.53) 子进程(44351): task 3 => start 子进程(44351): task 3 => end(sleep:1.68) 子进程(44352): task 4 => start 子进程(44352): task 4 => end(sleep:0.53) 所有子进程结束 父进程(44350): => 结束 # 非阻塞方式结果: $ python pool-demo.py 父进程(44306): => 开始 子进程(44307): task 0 => start 子进程(44308): task 1 => start 子进程(44309): task 2 => start 子进程(44309): task 2 => end(sleep:0.89) 子进程(44309): task 3 => start 父进程(44306): 获取子进程(44309) task 2 => callback 子进程(44307): task 0 => end(sleep:1.14) 子进程(44307): task 4 => start 父进程(44306): 获取子进程(44307) task 0 => callback 子进程(44308): task 1 => end(sleep:1.50) 父进程(44306): 获取子进程(44308) task 1 => callback 子进程(44309): task 3 => end(sleep:0.92) 父进程(44306): 获取子进程(44309) task 3 => callback 子进程(44307): task 4 => end(sleep:1.36) 父进程(44306): 获取子进程(44307) task 4 => callback 所有子进程结束 父进程(44306): => 结束
- 初始化Pool时,可以指定一个最大进程数
- 若进程数达到指定的最大值,直到池中有进程任务结束,下一个任务请求才会发布给空闲进程执行
apply(func, args=(), kwds={})
阻塞方式发布任务给一个进程,必须等待上一个进程任务结束才能执行下一个apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
非阻塞方式,有空闲进程即可发布执行close()
关闭进程池,使其不再接受新的任务terminate()
不管任务是否完成,立即终止join()
主进程阻塞等待子进程们结束,必须在close
或terminate
之后使用- 注:
- 父进程结束会影响子进程
- 变量可通过参数传递
Queue
: 消息列队$ cat process-queue.py from multiprocessing import Process, Queue from multiprocessing import Manager,Pool import os, time, random def produce_task(q): while True: if not q.full(): t=random.random() q.put(t) print('produce:%.2f' % t) time.sleep(t) else: print('stop produce!') break def consume_task(q): while True: if not q.empty(): t=q.get() print('consume:%.2f' % t) time.sleep(t) else: print('stop consume!') break def process_test(): q=Queue(5) p_produce=Process(target=produce_task,args=(q,)) p_consume=Process(target=consume_task,args=(q,)) p_produce.start() p_consume.start() p_produce.join() p_consume.join() print("Done!") def pool_test(): q=Manager().Queue(5) p=Pool(2) p.apply_async(produce_task,(q,)) p.apply_async(consume_task,(q,)) p.close() p.join() print("Done!") if __name__=='__main__': # process_test() pool_test() $ python process-queue.py produce:0.04 consume:0.04 produce:0.99 consume:0.99 stop consume! produce:0.07 produce:0.13 produce:0.96 produce:0.86 produce:0.27 stop produce! Done!
- 进程间通信,可使用
multiprocessing.Queue()
- 进程池进程通讯,可使用
multiprocessing.Manager().Queue()
get(block=True,timeout=None)
从队列中弹出一个消息(无消息则阻塞或抛出Queue.Empty异常)get_nowait()
=get(False)
put(item,block=True,timeout=None)
将一个消息写入队列(队列满了则阻塞或抛出Queue.Full异常)put_nowait(item)
=put(item,False)
qsize()
当前队列包含的消息数量 => 效果不好,可能会报NotImplementedErrorempty()
判断当前队列是否为空full()
判断当前队列是否满了
- 进程间通信,可使用
线程 Thread
threading模块
- Python的thread模块较底层,threading模块对thread做了一些包装,更方便
- Python解释器(CPython)在解析多线程时,会使用
GIL
全局排他锁(Global Interpreter Lock
),不管几核CPU,同一时刻只有一个线程能使用到CPU,从而导致多线程无法利用多核,解决方法有- 用多进程
multiprocess
代替多线程 - 使用其他解析器(GIL只是CPython的产物)
- 使用c语言实现多线程(利用
ctypes
模块直接调用任意的C动态库的导出函数)
- 用多进程
- 使用:
- 创建线程:
- 法一:创建一个Thread实例
Thread(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
- 法二:自定义一个类,继承
Thread
类,重写run
方法
- 法一:创建一个Thread实例
start()
启动join()
主线程阻塞等待子线程结束后再继续,非必需is_alive()
判断线程是否活着name
线程名字,默认Thread-1
,Thread-2
,...
- 创建线程:
- 线程同步
- 多个线程有序协同工作
- 场景:
- 多个线程共同争夺修改某个共享数据,如全局变量,一个进程内的所有线程共享全局变量(各线程可修改,线程非安全)
- 生产-消费者,速度不匹配,需要一个缓冲协同
- 实现方式:
- 互斥锁 Mutex
- 每次只有一个线程可以上锁,其他需要获得这个锁的线程变为
blocked
状态 - 线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行
running
状态 - 锁:相当于一块独占空间,一次只能进一个人(线程)
- 创建锁:
mutex=threading.Lock()
- 锁住:
mutex.acquire(blocking=True, timeout=-1)
=> 锁状态:locked
,锁住的那个线程:running
,其他需要这个锁的线程:blocked
- 开锁:
mutex.release()
=> 锁状态:unlocked
,需要这个锁的线程开始争夺这个锁,获这个锁的线程状态从blocked
变成runing
- 死锁:不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁 => 需尽量避免,如添加超时时间等
- 每次只有一个线程可以上锁,其他需要获得这个锁的线程变为
- 队列 Queue
from queue import Queue
q=Queue(maxsize=0)
.get(block=True,timeout=None)
,.put(item,block=True,timeout=None)
.qsize()
,.full()
,.empty()
,.join()
- 互斥锁 Mutex
- 注:
- 主进程,一个主线程运行
- 主线程结束,不会导致子线程结束
- 线程间传递数据:
- 全局变量
- 传参
- 线程自己的数据:
- 局部变量
- ThreadLocal
- 定义一个全局变量
l=threading.local()
- 每个Thread对它的读写互不影响
l.xxx
,l.xxx=xxx
- 类似一个dict
- 虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰
- 解决了参数在一个线程中各个函数之间互相传递的问题
- 定义一个全局变量
Sample1:Thread
from threading import Thread
import threading
import os,time,random
def task(name):
print("%s do task %s start" % (threading.current_thread().name,name))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print("%s do task %s end (%.2f)" % (threading.current_thread().name,name,end-start))
class MyThread(Thread):
def __init__(self,taskName):
Thread.__init__(self)
self.taskName=taskName
def run(self):
print("%s do task %s start" % (self.name,self.taskName)) # name属性中保存的是当前线程的名字
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print("%s do task %s end (%.2f)" % (self.name,self.taskName,end-start))
def thread_inst_test():
t=Thread(target=task,args=('test',))
t.start()
def thread_class_test():
t=MyThread('test')
t.start()
if __name__=='__main__':
print("%s thread start" % threading.current_thread().name)
# thread_inst_test()
thread_class_test()
print("%s thread end" % threading.current_thread().name)
$ python thread-demo.py
MainThread thread start
Thread-1 do task test start
MainThread thread end
Thread-1 do task test end (0.47)
Sample2: Multiple tasks
# test1: 异步,多线程交错打印
# result sample:
# Thread-1 print 1
# Thread-2 print 1
# Thread-1 print 2
# Thread-1 print 3
# Thread-1 done!
# Thread-2 print 2
# Thread-2 print 3
# Thread-2 done!
def test_multi_tasks_async():
def async_task():
for i in range(1,4):
print("%s print %d " % (threading.current_thread().name,i))
time.sleep(random.random())
print("%s done!" % threading.current_thread().name)
t1=Thread(target=async_task)
t2=Thread(target=async_task)
t1.start()
t2.start()
# test2: 同步,多线程顺序打印
# result sample:
# Thread-1 print 1
# Thread-1 print 2
# Thread-1 print 3
# Thread-1 done!
# Thread-2 print 1
# Thread-2 print 2
# Thread-2 print 3
# Thread-2 done!
def test_multi_tasks_sync():
mutex=threading.Lock()
def sync_task():
if mutex.acquire():
for i in range(1,4):
print("%s print %d " % (threading.current_thread().name,i))
time.sleep(random.random())
print("%s done!" % threading.current_thread().name)
mutex.release()
t1=Thread(target=sync_task)
t2=Thread(target=sync_task)
t1.start()
t2.start()
Sample3:Mutex
# test: 操作共享资源,不加锁,结果不正确,shoud be 0
# result sample:
# Done Thread-1 do add,num = 208671
# Done Thread-2 do sub,num = -102681
def test_multi_tasks_global_val():
num=0
def add_task():
global num
for i in range(0,1000000):
num+=1
print("Done %s do add,num = %d " % (threading.current_thread().name,num))
def sub_task():
global num
for i in range(0,1000000):
num-=1
print("Done %s do sub,num = %d " % (threading.current_thread().name,num))
t1=Thread(target=add_task)
t2=Thread(target=sub_task)
t1.start()
t2.start()
# test: 操作共享资源,加锁,得正确结果,be 0
# result sample:
# Done Thread-2 do sub,num = -36334
# Done Thread-1 do add,num = 0
def test_multi_tasks_global_val_mutex():
lock=threading.Lock()
num=0
def add_task():
nonlocal num
for i in range(0,1000000):
lock.acquire()
num+=1
lock.release()
print("Done %s do add,num = %d " % (threading.current_thread().name,num))
def sub_task():
nonlocal num
for i in range(0,1000000):
lock.acquire()
num-=1
lock.release()
print("Done %s do sub,num = %d " % (threading.current_thread().name,num))
t1=Thread(target=add_task)
t2=Thread(target=sub_task)
t1.start()
t2.start()
Sample4:Queue
Queue
# result sample:
# Consumer [0] Begin
# Consumer [0] handle item: 5.77
# Consumer [1] Begin
# Consumer [2] Begin
# Waiting for all threads done...
# Consumer [2] handle item: 4.51
# Consumer [1] handle item: 3.87
# Consumer [0] handle item: 1.61
# Consumer [1] handle item: 6.42
# Consumer [2] handle item: 0.17
# Consumer [1] handle item: 9.36
# Consumer [0] handle item: 4.88
# Consumer [1] handle item: 8.63
# Consumer [1] handle item: 0.27
# Consumer [2] Done
# Consumer [0] Done
# Consumer [1] Done
# All threads done.
def test_thread_queue():
import queue
def queue_consumer(index,q):
print("Consumer [%s] Begin" % index)
while not q.empty():
item = q.get()
if not item:
time.sleep(1)
continue
do_task(index,item)
print('Consumer [%s] Done' % index)
def do_task(index,item):
print("Consumer [%s] handle item: %s" % (index,item))
time.sleep(random.random())
q=queue.Queue()
[q.put("%.2f" % (random.random()*10)) for i in range(10)]
t_list=[]
for i in range(3):
t=threading.Thread(target=queue_consumer,args=(i,q,))
t.start()
t_list.append(t)
print('Waiting for all threads done...')
for t in t_list:
t.join()
print('All threads done.')
Sample5:ThreadLocal
ThreadLocal
# result sample:
# Done Thread-1 task: step=0
# Done Thread-5 task: step=40
# Done Thread-2 task: step=10
# Done Thread-3 task: step=20
# Done Thread-4 task: step=30
def test_thread_dict():
thread_dict = {}
def task(step):
thread_dict[threading.current_thread()]=step
do_task()
def do_task():
currentThread=threading.current_thread()
step=thread_dict[currentThread]
time.sleep(random.random())
print("Done %s task: step=%s" % (currentThread.name,step))
for i in range(5):
t=Thread(target=task,args=(i*10,))
t.start()
def test_thread_local():
#thread_dict = {}
thread_local = threading.local()
def task(step):
#thread_dict[threading.current_thread()]=step
thread_local.step=step
do_task()
def do_task():
currentThread=threading.current_thread()
#step=thread_dict[currentThread]
step=thread_local.step
time.sleep(random.random())
print("Done %s task: step=%s" % (currentThread.name,step))
for i in range(5):
t=Thread(target=task,args=(i*10,))
t.start()
协程 Coroutine
Async programming allows you to write concurrent code that runs in a single thread. The first advantage compared to multiple threads is that you decide where the scheduler will switch from one task to another, which means that sharing data between tasks it's safer and easier. With asynchronous programming, you allow your code to handle other tasks while waiting for these other resources to respond.
- async IO,被动通知,单线程多任务(非阻塞),执行效率高
- 与多线程对比优势:
- 没有线程切换的开销
- 共享资源无需加锁也不会发生冲突(判断状态即可)
使用Python标准库:asyncio
(Python 3.4引入)
编程模型: 一个消息循环,从asyncio
模块中获取一个EventLoop
的引用,把需要执行的协程扔到EventLoop
中执行
Coroutine
协同任务# method1: use `@asyncio.coroutine` @asyncio.coroutine def my_task(args): print('start') yield from asyncio.sleep(3) # 用`yield from`调用另一个`coroutine`实现异步操作 print('end') # method2: use `async` async def my_task(args): print('start') await asyncio.sleep(3) # 用`await`调用另一个`coroutine`实现异步操作 print('end') # return a coroutine object my_coroutine = my_task(args)
- 一个用
@asyncio.coroutine
或async
标注的function xxx(args)
,不会执行方法体中的语句,只是返回一个coroutine objectasync
&await
是Python 3.5引入的语法糖,async
代替@asyncio.coroutine
,await
代替yield from
(Note:await
keyword can only be used inside anasync def function
)
- 一个用
EventLoop
事件循环(事件池)# 1. create loop: 从`asyncio`模块中获取一个`EventLoop`的引用 loop = asyncio.new_event_loop() # 2. add coroutine to the loop,return a future object future = loop.create_task(my_coroutine) # 3. execute all coroutine added to the loop concurrently # method1: loop.run_until_complete(my_coroutine) # method2: loop.run_until_complete(future) loop.close()
- 组织协调执行加入的协同任务
- execute our asyncronous code and decide how to switch between async functions
Running Tasks Concurrently(一个事件循环中执行多个task , 并发执行)
asyncio.wait(fs, *, loop=None, timeout=None, return_when='ALL_COMPLETED')
- 等待任务完成,支持在完成第一个任务后或在指定的超时之后停止等待
- return: two sets of Future:
(done, pending)
done
: 已完成的协程Task集合{asyncio.Task,asyncio.Task,...}
pending
: 超时未完成的协程Task集合asyncio.Task
: extends Future, could use.result()
get task return value
- usage:
done, pending = await asyncio.wait(coros_or_futures)
-- in async funcdone, pending = loop.run_until_complete(async.wait(coros_or_futures))
-- in normal func
asyncio.gather
- 等待并按给定的顺序返回完成的任务结果
- return:已完成的协程Task的结果值列表
[taskReturnValue1,taskReturnValue2,...]
- usage:
results=await asyncio.gather(*coros_or_futures)
results=loop.run_until_complete(async.gather(*coros_or_futures))
- 还可对任务进行高级别分组
group1 = asyncio.gather(*[task1,task2]) group2 = asyncio.gather(*[task3,task4]) all_groups = asyncio.gather(group1, group2) results = loop.run_until_complete(all_groups)
- 总结
asyncio.wait
vs.asyncio.gather
:- 都是用于获取结果的,且都不阻塞,直接返回一个生成器对象可用于
yield from
/await
- 都是两种用法
results = asyncio.run_until_completed(asyncio.wait/gather)
执行所有完成之后获取results = await asyncio.wait/gather
在一个协程内获取结果
- 但返回结果不一样
asyncio.wait
返回 tuple:(doneSet,pendingSet)
asyncio.gather
返回 list:[taskReturnValue1,taskReturnValue2,...]
- 都是用于获取结果的,且都不阻塞,直接返回一个生成器对象可用于
Task
/Future
(注:class Task(Future)
):# method1: add a coroutine to the loop and returns a `Task` which is a subtype of `Future` future = loop.create_task(my_coroutine) # method2: adds a coroutine to the default loop future = asyncio.ensure_future(my_coroutine) # could add call back for the future object future.add_done_callback(result_handler())
- works as a placeholder for the output of an asynchronous function
- and it gives us information about the function state,
- A future is created when we add a corutine to an event loop
Run Result:
# method1: use the `yield from` or `await` result=await my_coroutine # method2: add it to an event loop future=loop.create_task(my_coroutine) result=future.result()
- Get the output of an async function from a coroutine
Exception handling:
# method1: try catch the exception try: future.result() # this will re-raise the exception raised during the coroutine execution except Exception: pass # method2: get the exception information future.exception()
- method1:
try...except...
捕获异常 - method2: 调用
future.exception()
异常信息 - Note:unhandled exception raised inside a coroutine doesn't break the program,it will be stored inside the future and get
Task exception was never retrieved
error before program exit.
- method1:
Sample1:@asyncio.coroutine
& yield from
vs. async
& await
@asyncio.coroutine
&yield from
import asyncio import random @asyncio.coroutine def hello(name): r=random.random()*3 print('Hello %s, sleep %s' % (name,r)) yield from asyncio.sleep(r) print('Hello %s wake up!' % name) if __name__ == '__main__': loop = asyncio.get_event_loop() # 获取EventLoop #tasks = [hello("Tom"), hello("Lucy")] t1=loop.create_task(hello("Tom")) t2=loop.create_task(hello("Lucy")) tasks=[t1,t2] # t1,t2 is future loop.run_until_complete(asyncio.wait(tasks)) # 执行 loop.close()
async
&await
import asyncio import random async def hello(name): # 使用`async` 代替`@asyncio.coroutine` r=random.random()*3 print('Hello %s, sleep %s' % (name,r)) await asyncio.sleep(r) # 使用`await` 代替`yield from` print('Hello %s wake up!' % name) if __name__ == '__main__': loop = asyncio.get_event_loop() # tasks = [hello("Tom"), hello("Lucy")] t1=loop.create_task(hello("Tom")) t2=loop.create_task(hello("Lucy")) tasks=[t1,t2] loop.run_until_complete(asyncio.wait(tasks)) loop.close()
Sample2: coroutine
vs. future
, asyncio.wait
vs. asyncio.gather
coroutine
import asyncio import time,random async def do_async_task(index,item): print('[start]\t %s:\t %s' % (index,item)) start=time.time() await asyncio.sleep(random.random()) end=time.time() result ='[end]\t %s:\t %s (spent:%.2f)' % (index,item,(end-start)) return result if __name__=='__main__': loop = asyncio.get_event_loop() task_list=[] for i in range(10): item="%.2f" % (random.random()*10) t=do_async_task(i,item) # t is coroutine task_list.append(t) print('Waiting for all sub-proc done...') # method1: use asyncio.wait done,pending=loop.run_until_complete(asyncio.wait(task_list)) print(done,pending) for r in done: print("get result:",r.result()) # method2: use asyncio.gather # results=loop.run_until_complete(asyncio.gather(*task_list)) # print(results) # for r in result: # print("get result:",r) loop.close() print('All sub-procs done.') # for task in task_list: # # task: # # type: coroutine # # sample: <coroutine object do_async_task at 0x10f24c8c8> # print(task)
future
import asyncio import time,random async def do_async_task(index,item): print('[start]\t %s:\t %s' % (index,item)) start=time.time() await asyncio.sleep(random.random()) end=time.time() result ='[end]\t %s:\t %s (spent:%.2f)' % (index,item,(end-start)) return result def handle_result(future): print("callback:",future.result()) if __name__=='__main__': loop = asyncio.get_event_loop() task_list=[] for i in range(10): item="%.2f" % (random.random()*10) t=do_async_task(i,item) # t is coroutine f=loop.create_task(t) # f is future f.add_done_callback(handle_result) task_list.append(f) print('Waiting for all sub-proc done...') # method1: use asyncio.wait done,pending=loop.run_until_complete(asyncio.wait(task_list)) print(done,pending) for r in done: print("get result:",r.result()) # method2: use asyncio.gather # results=loop.run_until_complete(asyncio.gather(*task_list)) # print(results) # for r in result: # print("get result:",r) loop.close() print('All sub-procs done.') # for task in task_list: # # task: # # type: _asyncio.Task,subtype of `Future` # # sample: <Task finished # # coro=<do_async_task() done, defined at asyncio-demo.py:5> # # result='[end]\t 7:\t... (spent:0.36)'> # print("get task result:",task.result())
Result sample:
Waiting for all sub-proc done... [start] 0: 6.28 [start] 1: 6.02 [start] 2: 5.86 [start] 3: 1.32 [start] 4: 8.29 [start] 5: 6.30 [start] 6: 2.07 [start] 7: 3.41 [start] 8: 8.47 [start] 9: 3.76 callback: [end] 4: 8.29 (spent:0.11) callback: [end] 0: 6.28 (spent:0.18) callback: [end] 3: 1.32 (spent:0.20) callback: [end] 6: 2.07 (spent:0.31) callback: [end] 1: 6.02 (spent:0.51) callback: [end] 9: 3.76 (spent:0.51) callback: [end] 7: 3.41 (spent:0.75) callback: [end] 8: 8.47 (spent:0.78) callback: [end] 5: 6.30 (spent:0.79) callback: [end] 2: 5.86 (spent:1.00) get result: [end] 5: 6.30 (spent:0.79) get result: [end] 2: 5.86 (spent:1.00) get result: [end] 8: 8.47 (spent:0.78) get result: [end] 6: 2.07 (spent:0.31) get result: [end] 3: 1.32 (spent:0.20) get result: [end] 0: 6.28 (spent:0.18) get result: [end] 9: 3.76 (spent:0.51) get result: [end] 7: 3.41 (spent:0.75) get result: [end] 4: 8.29 (spent:0.11) get result: [end] 1: 6.02 (spent:0.51) All sub-procs done.
Sample3:queue
async def do_async_task(index,item):
print('do task %s: Consumer[%s] %s' % (item[0],index,item[1]))
start=time.time()
await asyncio.sleep(random.random())
end=time.time()
result ='do task %s: Consumer[%s] %s (spent:%.2f)' % (item[0],index,item[1],(end-start))
return result
def handle_result(future):
print(future.result())
async def async_queue_consumer(index,q):
print("Consumer[%s] Begin" % index)
while not q.empty():
item = await q.get()
if not item:
await asyncio.sleep(1)
continue
task=asyncio.create_task(do_async_task(index,item))
task.add_done_callback(handle_result)
await asyncio.gather(task)
print('Consumer[%s] Done' % index)
if __name__=='__main__':
print("--- Test: asyncio queue ---")
q=asyncio.Queue()
[q.put_nowait((i,"%.2f" % (random.random()*10))) for i in range(10)]
tasks = [async_queue_consumer(i,q) for i in range(3)]
loop = asyncio.get_event_loop()
print('Waiting for all sub-proc done...')
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('All sub-procs done.')
Result sample:
--- Test: asyncio queue ---
Waiting for all sub-proc done...
Consumer[1] Begin
Consumer[0] Begin
Consumer[2] Begin
do task 0: Consumer[1] 5.87
do task 1: Consumer[0] 7.46
do task 2: Consumer[2] 7.20
do task 1: Consumer[0] 7.46 (spent:0.05)
do task 3: Consumer[0] 7.28
do task 2: Consumer[2] 7.20 (spent:0.61)
do task 4: Consumer[2] 7.07
do task 0: Consumer[1] 5.87 (spent:0.72)
do task 5: Consumer[1] 9.09
do task 3: Consumer[0] 7.28 (spent:0.67)
do task 6: Consumer[0] 9.65
do task 4: Consumer[2] 7.07 (spent:0.33)
do task 7: Consumer[2] 6.64
do task 6: Consumer[0] 9.65 (spent:0.89)
do task 8: Consumer[0] 3.21
do task 7: Consumer[2] 6.64 (spent:0.74)
do task 9: Consumer[2] 1.78
do task 5: Consumer[1] 9.09 (spent:0.98)
Consumer[1] Done
do task 9: Consumer[2] 1.78 (spent:0.13)
Consumer[2] Done
do task 8: Consumer[0] 3.21 (spent:0.62)
Consumer[0] Done
All sub-procs done.
访问数据库
SQLite
- SQLite是一种嵌入式数据库,它的数据库就是一个文件
- Python就内置了SQLite3,可直接使用
- 操作:建立连接Connection,打开游标,通过Cursor执行SQL语句,获得执行结果,关闭游标,提交事物,关闭连接
Cursor
对象通过execute
方法执行SQL语句(可使用?
占位符)- 执行
insert
/update
/delete
后,可通过cursor.rowcount
获取影响的行数 - 执行
select
后,可通过cursor.featchall()
获取结果集列表,每个元素都是一个tuple,对应一行记录
Sample:
import sqlite3
import os
# delte test.db
db_file = os.path.join(os.path.dirname(__file__), 'test.db')
if os.path.isfile(db_file):
os.remove(db_file)
conn = sqlite3.connect('test.db') # 建立连接(数据库文件不存在时,会创建)
cursor = conn.cursor() # 创建并打开一个Cursor
try:
# 执行SQL语句: create
cursor.execute('create table user(id varchar(20) primary key, name varchar(20), score int)')
# 执行SQL语句: insert
cursor.execute(r"insert into user values ('A-001', 'Adam', 95)")
print(cursor.rowcount)
cursor.execute(r"insert into user values ('A-002', 'Bart', 62)")
print(cursor.rowcount)
cursor.execute(r"insert into user values ('A-003', 'Lisa', 78)")
print(cursor.rowcount)
# 提交事务
conn.commit()
# 执行SQL语句: select
cursor.execute('select * from user where id=?', ('A-001',))
records = cursor.fetchall()
print(records)
finally:
cursor.close() # 关闭Cursor
conn.close() # 关闭连接
MySQL
pip install mysql-connector-python
or pip install pymysql
#import mysql.connector as pymysql
import pymysql
conn=pymysql.connect(host="localhost",port=3316,user='root',password='123456',database='demo1')
cursor=conn.cursor()
try:
# 执行SQL语句: drop
cursor.execute('drop table if exists user')
# 执行SQL语句: create
cursor.execute('create table user(id varchar(20) primary key, name varchar(20), score int)')
# 执行SQL语句: insert
cursor.execute(r"insert into user values ('A-001', 'Adam', 95)")
print(cursor.rowcount)
cursor.execute(r"insert into user values ('A-002', 'Bart', 62)")
print(cursor.rowcount)
cursor.execute(r"insert into user values ('A-003', 'Lisa', 78)")
print(cursor.rowcount)
# 提交事务
conn.commit()
# 执行SQL语句: select
# 注:这里占位符是'%',不是'?'
cursor.execute('select * from user where id=%s', ('A-001',))
records = cursor.fetchall()
print(records)
finally:
cursor.close()
conn.close()
check result:
mysql> use demo1
Database changed
mysql> select * from user;
+-------+------+-------+
| id | name | score |
+-------+------+-------+
| A-001 | Adam | 95 |
| A-002 | Bart | 62 |
| A-003 | Lisa | 78 |
+-------+------+-------+
3 rows in set (0.00 sec)
Redis
pip install redis
连接Redis数据库
import redis # 1. 连接Redis数据库 # method1: 直接连接 # client=redis.Redis(host='127.0.0.1',port=6379,password='123456') # method2: 连接池连接(预先创建多个连接, 进行redis操作时, 直接获取已经创建的连接进行操作, 完成后,不会释放, 用于后续的其他redis操作) pool = redis.ConnectionPool(host='localhost', port=6379,password='123456') client = redis.Redis(connection_pool=pool) # check exist keys: # results=client.keys('*') # print("list keys * :",results)
基础数据操作: 基本redis的命令名与redis模块中的函数名一致
String:
key -> value
# 设置: # set(key, value, ex=None, px=None, nx=False, xx=False) # setnx(key, value) 只有key不存在时设置 # setex(key, value, time) 设置过期时间(秒) # mset({key:value,...}) 批量设置值 # # 获取: # get(key) # mget(key1,key2,...) # strlen(key) key对应值的字节长度 # # 值追加内容: # append(key,addValue) def test_string(): print("Test String:") result=client.set('a',1) print(result) # True result=client.get('a') print(result) # b'1' result=client.append('a',23) print(result) # 3 result=client.get('a') print(result) # b'123'
Hash: 一个name对应一个dic字典
n1 -> k1:v1,k2,v2
,n2 -> kx:vx,..
# hset(name, key, value) # hget(name, key) # hgetall(name) # hmset(name, mapping) # hmget(name, keys, *args) # # hlen(name) # hkeys(name) # hvals(name) # # hexists(name, key) # hdel(name, *keys) # hincrby(name, key, amount=1) def test_hash(): print("Test Hash:") result = client.hset('stu-001','name','Tom') print(result) # 1 result = client.hmset('stu-001',{'age':19,'gender':'male'}) print(result) # True result = client.hgetall('stu-001') print(result) # {b'name': b'Tom', b'age': b'19', b'gender': b'male'} result = client.hmset('stu-002',{'name':'Lucy','age':16,'gender':'Female'}) print(result) # True result = client.hgetall('stu-002') print(result) # {b'name': b'Lucy', b'age': b'16', b'gender': b'Female'} result = client.hlen('stu-001') print(result) # 3 result = client.hkeys('stu-001') print(result) # [b'name', b'age', b'gender'] result = client.hvals('stu-001') print(result) # [b'Tom', b'19', b'male'] result = client.hincrby('stu-001','age',amount=5) print(result) # 24 result = client.hgetall('stu-001') print(result) # {b'name': b'Tom', b'age': b'24', b'gender': b'male'}
List: 一个name对应一个列表存储,
n1 -> [v1,v2,...]
,n2 -> [...]
# lpush(name, *values),rpush(name, *values) # lpushx(name, *values),rpushx(name, *values) name存在时,才能加入 # linsert(name, where, refvalue, value) name对应的列表的refValue值的前或后where插入一个value # # llen(name) name列表长度 # lindex(name, index) 索引获取元素 # lrange(name, start, end) 分片获取元素 # # lset(name, index, value) 索引位置重新赋值 # lpop(name) 移除列表的左侧第一个元素,返回值则是第一个元素 # lrem(name, value, num=0) 删除name对应的list中的指定值 # ltrim(name, start, end) 移除列表内没有在该索引之内的值 def test_list(): print("Test List:") result = client.lpush('group1','stu-A','stu-B','stu-C','stu-D','stu-E') print(result) # 5 result = client.lpush('group2','child-A','child-B') print(result) # 3 result = client.llen('group1') print(result) # 5 result = client.lrange('group1',2,5) print(result) # [b'stu-C', b'stu-B', b'stu-A']
Set: 不允许有重复的集合,
n1 -> {v1,v2,...}
,n2 -> {...}
# sadd(name, *values) 添加元素 # srem(name,*values) 删除元素 # smembers(name) 列出集合所有元素 # scard(name) 获取元素个数 # sismember(name, value) 测试素是否在 # spop(name) 从集合中随机弹出一个元素 # smove(src, dst, value) 将一个元素从集合1移到集合2 # sdiff/sdiffstore(keys, *args) 差集(以前一个集合为标准) # sinter(keys, *args) 交集 # sunion(keys, *args) 并集 def test_set(): print("Test Set:") result = client.sadd('stdGrp1','stu-A','stu-B','stu-B','stu-C') print(result) # 3 result = client.smembers('stdGrp1') print(result) # {b'stu-B', b'stu-C', b'stu-A'} result = client.scard('stdGrp1') print(result) # 3 result = client.sadd('stdGrp2',*('stu-A','stu-C','stu-D')) print(result) # 3 result = client.sdiff('stdGrp1','stdGrp2') print(result) # {b'stu-B'} result = client.sinter('stdGrp1','stdGrp2') print(result) # {b'stu-C', b'stu-A'} result = client.sunion('stdGrp1','stdGrp2') print(result) # {b'stu-B', b'stu-C', b'stu-A', b'stu-D'}
ZSet: 有序集合(set的升级), 每次添加修改元素后会自动重新排序,
n1 -> {(value1,score1),(value2,score2),...}
# 每一个元素有两个值,值value和分数score(专门用来做排序) # 元素rank(表index,即下标索引) # lexicographical: 相同的分值时,有序集合的元素会根据成员的值逐字节对比 # zadd(name, mapping, nx=False, xx=False, ch=False, incr=False) 添加 # zrem(name, values) 删除 # zremrangebyrank(name, start, end) # zremrangebyscore(name, minscore, maxscore) # zscan(name, cursor=0, match=None, count=None, score_cast_func=float) 查看 # zscan_iter(name, match=None, count=None,score_cast_func=float) # zrange/zrevrange(name, start, end, desc=False, withscores=False, score_cast_func=float) 获取rank范围内的元素 # zrangebyscore(name,minscore,maxscore,...) 获取score范围内的元素 # zrank/zrevrank(name, value) 获取元素rank # zscore(name, value) 获取元素score # zcard(name) 所有元素个数 # zcount(name, minscore, maxscore) 指定score范围内的元素数量 # zincrby(name, value, amount) 自增 # zinterstore(dest, keys, aggregate=None) 交集(相同值不同分数,则按照 aggregate=SUM/MIN/MAX 进行操作) # zunionstore(dest, keys, aggregate=None) 并集 def test_zset(): print("Test ZSet:") # 创建 result = client.zadd('car1',{'car-A':10,'car-B':20,'car-C':30,'car-D':40}) print(result) # 4 # 列出 result = client.zscan('car1') print(result) # (0, [(b'car-A', 10.0), (b'car-B', 20.0), (b'car-C', 30.0), (b'car-D', 40.0)]) # 切片,获取元素列表 result = client.zrange('car1',1,3) print(result) # [b'car-B', b'car-C', b'car-D'] result = client.zrangebyscore('car1',15,35) print(result) # [b'car-B', b'car-C'] # 统计数量 result = client.zcard('car1') print(result) # 4 result = client.zcount('car1',15,35) print(result) # 2 # 获取元素属性:rank,score result = client.zrank('car1','car-C') print(result) # 2 result = client.zscore('car1','car-C') print(result) # 30.0 # 交集 result = client.zadd('car2',{'car-B':25,'car-D':45,'car-E':55}) print(result) # 3 result = client.zinterstore('car-inter',('car1','car2')) print(result) # 2 result = client.zscan('car-inter') print(result) # (0, [(b'car-B', 45.0), (b'car-D', 85.0)]) result = client.zinterstore('car-inter',('car1','car2'),aggregate='MAX') print(result) # 2 result = client.zscan('car-inter') print(result) # (0, [(b'car-B', 25.0), (b'car-D', 45.0)])
其他常用操作
# flushdb(asynchronous=False) # flushall(asynchronous=False) # delete( *names) # exists( name) # keys( pattern='*') # expire(name ,time) # rename( src, dst) # move( name, db) # randomkey() # type(name) def test_others(): result = client.keys() print(result) # [b'car-inter', b'car1', b'car2', b'car', b'stdGrp1', b'stdGrp2', b'stu-001', b'a', b'top:dupefilter', b'stu-002', b'top:items', b'group1', b'tt'] result = client.delete('car') print(result) # 1 result = client.exists('car') print(result) # 0 result = client.type('car-inter') print(result) # b'zset'
管道: 批量提交命令,还可用来实现事务transation
# pipeline(transaction=True,shard_hint=None) 默认情况下一次pipline是原子性操作 # pipe.watch(name) -- 乐观锁,watch的对象不可改 # pipe.unwatch() def test_pipeline(): import time client.set('cnt',10) result=client.get('cnt') print('initial cnt:',result) pipe=client.pipeline(transaction=True) pipe.watch('cnt') # 加锁 try: pipe.multi() cnt=int(client.get('cnt')) pipe.set('a', 1) pipe.set('cnt',cnt+1) pipe.set('b',2) print('sleep...') time.sleep(5) # 此时,若另一个客户端修改了cnt,则这段操作提交(执行execute)时会报错 print('execute...') pipe.execute() except redis.exceptions.WatchError as ex: print("pipe fail:",ex) else: print("pipe success") finally: print("finally: a=%s,cnt=%s,b=%s" % (client.get('a'),client.get('cnt'),client.get('b'))) pipe.unwatch() # 解锁
发布/订阅 -- 不推荐使用
# 发布: # publish(channel,msg) # => redis client execute : `publish channel msg` # 订阅: # pubsub().subscribe(channel).parse_response(block,timeout) # => redis client execute : `subscribe channel` (取消使用命令punsubscribe/unsubscribe) def test_publish(): result=client.publish("channel-1","Hello!") print("result:",result) # 0 while True: msg = "This is %.2f" % (random.random()*10) print('sending...',msg) result=client.publish("channel-1",msg) if result==1: print('send success') else: print('send fail') isCondinue=input("continue?(y/n)") if isCondinue=='n': break; print('Done!') def test_subscribe(): subscribeObj = client.pubsub() result=subscribeObj.subscribe("channel-1") print(result) # None while True: print('receiving...') msg=subscribeObj.parse_response(block=False,timeout=60) # 第一次收到:[b'subscribe', b'channel-1', 1] # 当另一个客户端向此channel发布消息时(eg: `publish channel-1 "Hello World"`) # 这里会收到: [b'message', b'channel-1', b'Hello World'] print("receive msg:",msg) isCondinue=input("continue?(y/n)") if isCondinue=='n': break; print("Done!") if __name__=='__main__': # test_publish() # test_subscribe()
MongoDB
pip install pymongo
import pymongo
import json
from datetime import datetime
# 1. 建立连接
mongoConnStr="mongodb://cj:123456@localhost:27017/?authSource=admin"
client=pymongo.MongoClient(mongoConnStr)
print("list dbs:",client.list_database_names()) # 列出dbs
db=client['mg_test']
print("list collections of mg_test db:",db.list_collection_names()) # 列出collections(类似表)
collection=db['movies']
print("get collection:movies count:",collection.estimated_document_count())
# 2. clear:
# ret=collection.delete_many({})
# print(ret.deleted_count,ret.acknowledged,ret.raw_result)
# 3. update_one -- update or insert record
contents=json.load(open('test.json','r')) # load: file -> string -> python obj
print('update or insert records:')
for record in contents:
id=record.pop('id')
t=datetime.now()
print("to store record...id=%s,title=%s" % (id,record['title']))
record['last_update_time']=t
ret=collection.update_one({'_id':id}
,{'$setOnInsert':{'create_time':t},'$set':record}
,upsert=True)
print(ret.matched_count,ret.modified_count,ret.upserted_id,ret.acknowledged,ret.raw_result)
# 4. find -- list records
print('list stored records:')
results=collection.find({},{'_id':1,'title':1,'rate':1})
for result in results:
print(result)
# results sample:
# {'_id': '27109879', 'rate': '6.5', 'title': '硬核'}
# {'_id': '26707088', 'rate': '7.1', 'title': '奎迪:英雄再起'}
# {'_id': '30334122', 'rate': '6.1', 'title': '芳龄十六'}
# {'_id': '1945750', 'rate': '7.7', 'title': '污垢'}
# {'_id': '26611891', 'rate': '6.8', 'title': '欢乐满人间2'}
print('list rate>=7 records:')
results=collection.find({'rate':{'$gte':"7"}},{'_id':1,'title':1,'rate':1}).limit(5)
for record in results:
print(record)
# results sample:
# {'_id': '26707088', 'rate': '7.1', 'title': '奎迪:英雄再起'}
# {'_id': '1945750', 'rate': '7.7', 'title': '污垢'}
# 5. aggregate -- summary records
print('list summary by rate level')
# $cond:{if:{$gte:['$rating',8]},then:1,else:0}
# $addFields:{'rate_number':{$convert:{input:"$rate",to:"int"}}}
# use $project also could add fields
results=collection.aggregate([
{'$addFields':{
'rate_number':{'$convert':{'input':"$rate",'to':"double"}}
,'rate_level':{'$cond':[
{'$lt':['$rate','7.5']}
,{'$cond':[{'$gte':['$rate','6.5']},'Middle','Low']}
,'High'
]}
}}
# ,{'$project':{
# '_id':1
# ,'rate':1
# ,'title':1
# # ,'rate_level':{'$cond':[
# # {'$lt':['$rate','7.5']}
# # ,{'$cond':[{'$gte':['$rate','6.5']},'Middle','Low']}
# # ,'High'
# # ]}
# }}
,{'$group':{
'_id':"$rate_level"
,'count':{'$sum':1}
,'avg_rate':{'$avg':'$rate_number'}
#,'rate_list':{'$push':'$rate_number'}
,'rate_list':{'$push':{'$concat':['$title',':','$rate']}}
}}
,{'$sort':{'count':-1}}
#,{'$limit':10}
])
for record in results:
print(record)
# results sample:
# {'_id': 'Middle', 'count': 3, 'avg_rate': 6.8, 'rate_list': ['硬核:6.5', '奎迪:英雄再起:7.1', '欢乐满人间2:6.8']}
# {'_id': 'Low', 'count': 1, 'avg_rate': 6.1, 'rate_list': ['芳龄十六:6.1']}
# {'_id': 'High', 'count': 1, 'avg_rate': 7.7, 'rate_list': ['污垢:7.7']}
print("Done!")