Python 基础

Starter

  • 官网
  • Python Standard Library
  • Python Package Index
  • 安装 python
    • mac: brew install python3
    • windows: download and install from python website
  • 包管理工具 pip:
    • pip/pip3 [cmd] [opts]
    • eg: pip instal xxx,pip install --upgrade xxx,pip install xxx==version,pip uninstall xxx,pip list

开发环境 (IDE)

  1. python自带的开发环境

     # 交互式
     $ python
     Python 3.7.2 (tags/v3.7.2:9a3ffc0492, Dec 23 2018, 23:09:28) [MSC v.1916 64 bit (AMD64)] on win32
     Type "help", "copyright", "credits" or "license" for more information.
     >>> 100+200
     300
     >>> exit()
    
     # 直接运行
     $ python hello.py
     100+200=300
    
     # 调试:
     # pdb调试器, 以参数`-m pdb`启动,输入命令`n`单步执行代码,`p 变量名`查看变量,`q`  结束调试退出程序
     D:\Space\python> python -m pdb hello.py
     > d:\space\python\hello.py(2)<module>()
     -> print('--- Print --- ')
     (Pdb) n
     --- Print ---
     > d:\space\python\hello.py(3)<module>()
     -> print('100+200='+str(100+200))
     (Pdb) n
     100+200=300
     > d:\space\python\hello.py(4)<module>()
     -> print('100+200=',100+200)
     (Pdb) n
     100+200= 300
     > d:\space\python\hello.py(7)<module>()
     -> print('--- Inport --- ')
     (Pdb) n
     --- Inport ---
     > d:\space\python\hello.py(8)<module>()
     -> name=input('input name:')
     (Pdb) n
     input name:Tom
     > d:\space\python\hello.py(9)<module>()
     -> print('Hello '+name)
     (Pdb) p name
     'Tom'
     (Pdb) n
     Hello Tom
     > d:\space\python\hello.py(12)<module>()
     -> print('--- Function --- ')
     (Pdb) q
    
     D:\Space\python>
    
  2. Anaconda

    • 开源免费 Download
    • 一个集成各类Python工具的集成平台
    • 包括 conda包管理工具, 某版本Python, 一批第三方库,编程工具Spyder,交互式编程工具IPython,网页交互式编程工具Jupyter Notebook等
    • ipython

      • ?: 变量前或后增加?将显示一些通用信息,包括函数对应的源代码
      • %
        • %run: 用于运行.py程序 (注意在一个空的命名空间执行%)
        • %magic: 显示所有魔术命令
        • %hist: IPython命令的输入历史
        • %pdb: 异常发生后是否自动进入调试器
        • %reset: 重置,即删除当前命名空间中的全部变量或名称
        • %who: 显示Ipython当前命名空间中已经定义的变量
        • %time: 给出代码的执行时间
        • %timeit: 多次执行代码,计算综合平均执行时间
    • conda 包管理和环境管理工具

      • 包管理与pip类似,管理Python第三方库
      • 环境管理能够允许用户使用不同版本Python,并能灵活切换
      • 常用命令:

        • 获取conda版本 conda ‐‐version
        • 升级conda conda update conda
        • 下载/卸载包 conda install/uninstall xxx
        • 列出安装的包 conda list
        • 搜索包 conda search xxx eg: conda search numpy>=1.12
        • 创建/启用/退出env conda create/activate/deactivate xxx

            # conda activate/deactivate envName
            $ conda activate base
            (base) $ ipython3
            Python 3.7.3 (default, Mar 27 2019, 16:54:48)
            Type 'copyright', 'credits' or 'license' for more information
            IPython 7.4.0 -- An enhanced Interactive Python. Type '?' for help.
          
            In [1]: 100+200
            Out[1]: 300
          
            In [2]: exit()
            (base) $
          
  3. PyCharm

    • Community版本免费 Download
    • 类似 Eclipse,有调试、语法高亮、Project管理、代码跳转、智能提示、自动完成、单元测试、版本控制等功能
    • Preference -> Project Interpreter -> 配置 environment (若安装)

基础

关键字

>>> import keyword
>>> help(keyword)
>>> keyword.kwlist
['False', 'None', 'True', 'and', 'as', 'assert', 'async', 'await', 'break', 'class', 'continue', 'def', 'del', 'elif', 'else', 'except', 'finally', 'for', 'from', 'global', 'if', 'import', 'in', 'is', 'lambda', 'nonlocal', 'not', 'or', 'pass', 'raise', 'return', 'try', 'while', 'with', 'yield']

注释

  • 单行注释
      # Test
      print("Hello World")
    
  • 多行注释
      '''
      This is test program
      Author: CJ
      Date: 2019/03/10
      '''
    
  • 中文支持(for Syntax Error:Non-ASCII Character ...)
      # -*- coding:utf-8 -*-
      print('你好')
    
    • 注:# -*- coding:utf-8 -*- 一般放文件首行

输出

# 1. "",'','''
print('123')
print(12)

print("It's a.")
print('It\'s a.')

print("Hello 'Tom'.")    # Hello 'Tom'.
print('Hello "Tom".')    # Hello "Tom".

# 2. ''',\n

print('sdf\nsdfd')
print('''sdf
sdfd
''')

print('''Hello Tom.
Miss you
How are you?''')
#Hello Tom.
#Miss you
#How are you?

print("Hello Tom.\nMiss you\nHow are you?")
#Hello Tom.
#Miss you
#How are you?

# 3. a b
print("Hello","Tom") # Hello Tom

# 4. %
print("Hello %s" % "Tom")    # Hello Tom
print("a=%d" % 123.5)        # a=123
print("Hello %s,a=%d" % ("Tom",123.5)) # Hello Tom,a=123

x=[1,2,3]
print("Hello %s" % x)        # Hello [1, 2, 3]

x="Hello %s,a=%d"
print(x % ("Tom",123.5))    # Hello Tom,a=123

x="Hello %s,a=%d" % ("Tom",123.5)
type(x)                        # str
print(x)                    # Hello Tom,a=123

# 5. str,int
x=123
y='123'
print(x)
print(y)
print(str(x)+y)

格式化输出,常用的格式符号:

  • 字符
    • %c: 字符
    • %s: 字符串,通过str()字符串转换格式化,eg:
      • %10s: 右对齐,占位符10位
      • %-10s: 左对齐,占位符10位
      • %.2s: 截取2位字符串
      • %10.2s: 10位占位符,截取两位字符串
  • 整数
    • %d: dec 十进制整数
    • %o: oct 八进制整数
    • %x: hex 十六进制整数
      >>> a=123
      >>> print("a=%x" % a)
      a=7b
      >>> print("a=%X" % a)
      a=7B
      
  • 小数

    • %f: 浮点实数(默认保留小数点后面六位有效数字)
      • %.3f: 保留3位小数位
    • %e: 指数形式输出(默认保留小数点后面六位有效数字)   * %.3e: 保留3位小数位,使用科学计数法
    • %g: %f+%e(默认保证六位有效数字)   * %.3g: 保留3位有效数字,使用小数或科学计数法

      >>> a=123.5
      
      # f%
      >>> print("a1=%f,a2=%.3f" % (a,a))
      a1=123.500000,a2=123.500
      
      # e%
      >>> print("a=%e,a=%.3e" % (a,a))
      a1=1.235000e+02,a2=1.235e+02
      
      # g%
      >>> print("a=%g" % a)
      a=123.5
      >>> print("a=%.2g" % a)
      a=1.2e+02
      >>> print("a=%.3g" % a)
      a=124
      >>> print("a=%.5g" % a)
      a=123.5
      >>> print("a=%g" % 123.56789)
      a=123.568
      

输入

input(prompt=None)

注意:input获取的数据,都以字符串的方式进行保存

>>> a=input()
Tom
>>> print(a)
Tom

>>> a=input("Enter your name:")
Enter your name:Tom
>>> print(a)
Tom

>>> a=input("Enter your num:")
Enter you num: 123
>>> print(a)
123
>>> type(a)
<class 'str'>
name=input('name:')        # 输入值会被强制性地转换为字符串类型
print('Hi '+name);

age=int(input('age:'))
print('get age:'+str(age))

运算符

  1. 算术运算符

    运算符 描述 实例
    + print(10+20) => 30
    - print(10-20) => -10
    * print(a_b) => 200 ; print('##'_3) => ######
    / print(10/20) => 0.5
    // 取整除 print(10//20) => 0 ; print(20/10) => 2
    % 取余 print(20%10) => 0 ; print(20/3) => 2
    ** print(2**3) => 8
  2. 比较运算符: ==,!=,<>,>,<,>=,<=

  3. 逻辑运算符:and,or,not

判断语句

keyword: if,elif,else

# if-else
a=100
if a>100:
    print('a>100')
else
    print('a<=100')

# if-elif(else)
if a>100:
    print('a>50')
elif a==100:
    print('a==100')
elif a<100 and a>50:
    print('a (50,100)')
else:
    print('a<=50')

循环语句

keyword: for,while,break,continue

# while
i=0
while i<100:
    if i%2==0
        print(i,"偶数")
    else
        print(i,"奇数")
    i+=1
# for in
# sample1:
for i in 'abcdefg123456':
    if i=='d':
        print('find d before 5')
        continue
    if i=='5':
        break
    print(i)

# sample2:
for i in range(10):
    if i == 11:
        print('找到结果')
        break
else:
    print('未找到结果')

异常处理

  • Python的错误也是class,所有的错误类型都继承自BaseException,可使用继承自定义异常类
  • 如果错误没有被捕获,它就会一直往上抛,最后被Python解释器捕获,打印一个错误信息,然后程序退出
  • 捕获异常:
    • try...except...
    • try...finally...
    • try...except...else...
    • try...except...finally...
    • try...except...else...finally...
    • 说明:
      • 捕获到except中匹配的异常,则执行except下的语句,反之则执行else中的语句
      • 无论是否捕获到异常,都会执行finally中的语句
  • 手动抛出异常:raise 异常实例 (注:在except中可直接用raise重新抛出捕获的异常)
  • 借助Python内置的logging模块,可以容易地记录错误信息(输出或记录到文件),方便事后排查

Sample:

# 1. 捕获某个异常
def testCatchOneException():                
    try:
        print('Begin')
        open('test.json','r')        # 若test.json不存在,则会产生 IOError 异常
        print('End')
    except IOError:                 # 用变量记录可使用 `except IOError as ex:`
        print('Catch IOError')
    else:
        print('Not catch')
    finally:
        print('Done')

# 2. 捕获多个异常
def testCatchMultiException():                 
    try:
        print('Begin')
        open('test.json','r')         # 若test.json不存在,则会产生 IOError 异常
        print('then')
        print(num)                     # 若num未定义,则会会产生 NameError 异常
        print('End')
    except IOError or NameError:     # catch IOError or NameError
        print('Catch Error')
    except:                            # catch other all exception (under BaseException)
        print('Catch Error')    
    else:
        print('Not catch')
    finally:
        print('Done')

# 3. 用变量保存捕获到的异常
def testRecordException():
    try:
        print('Begin')
        open('test.json','r')
        print('Then')
        print(num)
        print('End')
    except (IOError,NameError) as ex:     # 等同:`except IOError or NameError as ex:`
        print('Catch error:',type(ex),ex)

# 4. 用raise抛出异常
def testRaiseException1():
    try:
        print('Begin')
        raise ValueError('Hello',1,2)    # 抛出异常(异常实例,构造可传入任意参数,无限制)
        print('End')
    except:                             # 捕获异常
        print('Catch Error')             # 记录一下
        raise                             # 再抛出
    finally:
        print('Done')

def testRaiseException2():
    try:
        print('Begin')
        open('test.json','r')
        print('then')
        print(num)
        print('End')
    except (IOError,NameError) as ex:
        print('Catch error:',type(ex),ex)
        raise         # 抛出发生的IOError或NameError异常实例,
                    # 也可抛出另一个异常实例,如:`raise ValueError(msg)`
    finally:
        print('Done')

# 5. 使用logging模块,记录错误信息
import logging
def testLogException():
    try:
        print('Begin')
        raise ValueError('Hello')
        print('End')
    except Exception as ex:
        print("Catch error:",type(ex),ex)
        logging.exception(ex)
    finally:
        print("Done")

>>> testLogException()
Begin
Catch error: <class 'ValueError'> Hello
ERROR:root:Hello
Traceback (most recent call last):
  File "<ipython-input-34-817d16b0d2f0>", line 4, in testLogException
    raise ValueError('Hello')
ValueError: Hello
Done

Python Build-in Exceptions

Refer Doc

BaseException
 +-- SystemExit
 +-- KeyboardInterrupt
 +-- GeneratorExit
 +-- Exception
      +-- StopIteration
      +-- StopAsyncIteration
      +-- ArithmeticError
      |    +-- FloatingPointError
      |    +-- OverflowError
      |    +-- ZeroDivisionError
      +-- AssertionError
      +-- AttributeError
      +-- BufferError
      +-- EOFError
      +-- ImportError
      |    +-- ModuleNotFoundError
      +-- LookupError
      |    +-- IndexError
      |    +-- KeyError
      +-- MemoryError
      +-- NameError
      |    +-- UnboundLocalError
      +-- OSError
      |    +-- BlockingIOError
      |    +-- ChildProcessError
      |    +-- ConnectionError
      |    |    +-- BrokenPipeError
      |    |    +-- ConnectionAbortedError
      |    |    +-- ConnectionRefusedError
      |    |    +-- ConnectionResetError
      |    +-- FileExistsError
      |    +-- FileNotFoundError
      |    +-- InterruptedError
      |    +-- IsADirectoryError
      |    +-- NotADirectoryError
      |    +-- PermissionError
      |    +-- ProcessLookupError
      |    +-- TimeoutError
      +-- ReferenceError
      +-- RuntimeError
      |    +-- NotImplementedError
      |    +-- RecursionError
      +-- SyntaxError
      |    +-- IndentationError
      |         +-- TabError
      +-- SystemError
      +-- TypeError
      +-- ValueError
      |    +-- UnicodeError
      |         +-- UnicodeDecodeError
      |         +-- UnicodeEncodeError
      |         +-- UnicodeTranslateError
      +-- Warning
           +-- DeprecationWarning
           +-- PendingDeprecationWarning
           +-- RuntimeWarning
           +-- SyntaxWarning
           +-- UserWarning
           +-- FutureWarning
           +-- ImportWarning
           +-- UnicodeWarning
           +-- BytesWarning
           +-- ResourceWarning

给程序传参

$ vi argv-demo.py
import sys
print("Hello argv:",sys.argv)

$ python3 argv-demo.py abc 123 456
Hello argv: ['argv-demo.py', 'abc', '123', '456']

测试

  1. 单元测试 unittest

    • 导入包:import unittest
    • 编写测试类(继承自unittest.TestCase):class Xxx(unittest.TestCase)
    • 编写测试函数(以test开头): def testXxx()
    • 添加测试切面函数(自动在每一个测试方法前后执行):setUp(),tearDown()
    • 运行:
      • method1: 添加:if __name__ == '__main__':unittest.main(),执行:python unittest-demo.py
      • method2: 直接执行python -m unittest unittest-demo.py
  2. 文档测试 doctest

    • 导入包:import doctest
    • doctest 模块可以直接提取py开始的注释和函数开始的注释'''...'''
    • 运行:if __name__=='__main__': doctest.testmod()

Sample1: 单元测试 unittest

import unittest

class MyTest(unittest.TestCase):
    def setUp(self):
        print('setUp...')

    def tearDown(self):
        print('tearDown...')

    def test_dict(self):
        print('test 1')
        d={'a':1,'b':2}
        self.assertEqual(d['a'],1)
        self.assertEqual(d['b'],2)
        self.assertTrue(isinstance(d, dict))

    def testdict2(self):
        print('test 2')
        d={'a':1,'b':2}
        with self.assertRaises(KeyError):   # 期待抛出指定类型的Error
            value = d['c']

if __name__ == '__main__':
    unittest.main()
$ python unittest-demo.py
setUp...
test 1
tearDown...
.setUp...
test 2
tearDown...
.
----------------------------------------------------------------------
Ran 2 tests in 0.000s

OK

Sample2:文档测试 doctest

'''
>>> hello([4,5,6])
hello: [4,5,6]
'''
def hello(x):
    '''
    doc test demo -- helo

    >>> hello('Tom')
    hello: Tom
    >>> hello(1/0)
    Traceback (most recent call last):
    ...
    ZeroDivisionError: division by zero
    >>> hello([1,2,3])
    hello: [1,2,3]
    '''
    print('hello:',x)


if __name__=='__main__':
    import doctest
    doctest.testmod()
$ python doctest-demo.py
**********************************************************************
File "doctest-demo.py", line 2, in __main__
Failed example:
    hello([4,5,6])
Expected:
    hello: [4,5,6]
Got:
    hello: [4, 5, 6]
**********************************************************************
File "doctest-demo.py", line 15, in __main__.hello
Failed example:
    hello([1,2,3])
Expected:
    hello: [1,2,3]
Got:
    hello: [1, 2, 3]
**********************************************************************
2 items had failures:
   1 of   1 in __main__
   1 of   3 in __main__.hello
***Test Failed*** 2 failures.

基础数据类型

不可变对象 & 可变对象

  1. 不可变对象:即无法修改这个对象的值,每次对变量的修改,实际上是创建一个新的对象 (=> 可做key)

    • 整数 int eg: a=123
    • 浮点数 float eg: a=123.0
    • 布尔 bool eg: a=True,a=False,a=bool(1),bool('')
    • 字符串 str eg: a="123abc"
    • 元祖 tuple eg: a=(1,23.4,'ef')
    • NoneType eg: a=None
  2. 可变对象(=> 不可做key)

    • 列表 list eg: a=[1,23.4,'ef']
    • 字典 dict eg: a={'a':1,1:'d',(1,2):'123'}
    • 集合 set eg: a={1,2,'ef',12.4}
  3. 对象比较

    • id(obj) 查看对象的唯一标识
    • == 比较两个对象内容是否相等
    • is 比较两个对象是否相等(是否是同一块地址空间)

Sample:

# 1. 不可变对象:int
>>> a=555
>>> b=a

>>> id(a),id(b)                # a和b相等,指向同一个地址空间
(4566055600,4566055600)

>>> b=789                     # b指向新开辟的另一个地址空间,a与b不同了
>>> id(a),id(b)
(4324742368,4566055696)

# 2. 可变对象:list
>>> a=[1,2,3]
>>> b=a

>>> id(a),id(b)             # a和b相等,指向同一个地址空间
(4333307784,4333307784)

>>> b.append(4)             # b和a依旧相等,指向同一块地址空间
>>> id(a),id(b)
(4333307784,4333307784)

>>> import copy
>>> c=copy.deepcopy(a)         # 拷贝内容(开辟了一块新空间存储被拷贝对象的所有内容),相当于执行了c=[1,2,3]
>>> id(a),id(c)
(4333307784,4397232200)
>>> a==c,a is c             # a与c内容相同,但指向不同地址空间
(True,False)

垃圾回收机制

  1. intern机制:Python为了优化速度,对一些对象使用了对象池,避免为频繁申请和销毁内存空间

     # 小整数:共用对象
     >>> a=123
     >>> b=123
     >>> id(a),id(b),id(123)             # 相同
     (4557612256, 4557612256, 4557612256)
    
     # 大整数:不共用对象
     >>> a=789
     >>> b=789
     >>> id(a),id(b),id(789)             # 不相同
     (4566055600, 4566055760, 4561935984)
    
     # 一个字符:共用对象
     >>> a='A'
     >>> b='A'
     >>> id(a),id(b),id('A')              # 相同
     (4562778576, 4562778576,4562778576)
    
     # 一个单词:共用对象
     >>> a="Hello"
     >>> b="Hello"
     >>> id(a),id(b),id("Hello")         # 相同
     (4566204968, 4566204968,4566204968)
    
     # 非单词字符串:不共用对象
     >>> a="Hello World"
     >>> b="Hello World"
     >>> id(a),id(b),id("Hello World")   # 不相同
     (4564866736, 4565787824, 4565787888)
    
    • 小整数[-5,257): 共用对象,常驻内存
    • 单个字符: 共用对象,常驻内存
    • 单个单词: 共用对象,不常驻内存,引用计数为0时销毁
    • 注:
      • 非单个但单词的字符串(如含有空格等),不会使用intern机制(即不共用对象),引用计数为0则销毁
      • 数值类型和字符串类型在Python中都是不可变类型,即无法修改这个对象的值,每次对变量的修改,实际上是创建一个新的对象
  2. 引用计数

     # 查看一个对象的引用计数
     >>> import sys
     >>> a="Hello World"
     >>> sys.getrefcount(a)  # 比正常计数大1,因为调用函数的时候传入a,这会让a的引用计数+1
     2
     >>> b=a
     >>> sys.getrefcount(a)
     3 
     >>> del b
     >>> sys.getrefcount(a)
     2
    
    • 一旦引用计数为0则销毁以释放内存,具有实时性,处理回收内存的时间分摊到了平时(不用像其他机制等到特定时机)
    • 循环引用所占用的内存永远无法被回收
    • 维护消耗资源:占空间且处理相对较慢
  3. 标记-清除(判别出垃圾清除)

    • 标记出有指针引用的对象,剩下未被标记的对象则为垃圾,可进行清除
  4. 分代收集 Generational GC(可处理循环引用释放问题)

    • 使用链表来持续追踪活跃对象
    • 内部使用多代(个)链表:Generation Zero & Generation One & Generation Two(新创建的对象放入Generation Zero链表)
    • 周期性地检查链表,根据规则减掉上面互相引用的对象的引用计数,清除引用计数为0的对象,剩下的活跃对象则移动到下一代链表
    • GC阈值:一旦被分配对象与被释放对象的计数值之差达到阈值,就启动检查,释放“浮动的垃圾”,活跃对象移动到下一代链表
  5. gc模块: 提供设置垃圾回收的API

    • gc.isenabled()
    • gc.disable()
    • gc.set_debug(flags) 设置gc的debug日志,一般设置为gc.DEBUG_LEAK
    • gc.collect(generation=2):
      • 0: 只检查0代的对象;
      • 1:检查0,1代对象;
      • 2:检查0,1,2代对象;
      • 返回收集的垃圾数
    • gc.get_count():
      • 返回一个长度为3的元组,代表各代自动执行垃圾回收的计数器
      • eg: (488,0,0) 488是指距离上一次0代垃圾检查,Python分配内存的数目减去释放内存的数目
    • gc.get_threshold():
      • 返回一个长度为3的元组,代表各代自动执行垃圾回收的阈值
      • 每一次计数器增加,gc模块会检查增加后的计数是否达到阀值的数目,如果是,就会执行对应的代数的垃圾检查,然后重置计数器
      • eg:(700,10,10)
    • gc.set_threshold(threshold0[, threshold1[, threshold2])
      • 设置各代阈值
    • eg: 阈值:(700,10,10), 计数器:
      • (699,3,0)增到(700,3,0)时 => gc模块执行gc.collect(0)检查0代对象的垃圾,重置计数器为(0,4,0)
      • (699,9,0)增到(700,9,0)时 => gc模块执行gc.collect(1)检查1,2代对象的垃圾,重置计数器为(0,0,1)
      • (699,9,9)增到(700,9,9)时 => gc模块执行gc.collect(2)检查0,1,2,3代对象的垃圾,重置计数器为(0,0,0)
  6. 垃圾回收触发点:

    • 调用gc.collect()
    • gc模块的计数器达到阀值时
    • 程序退出时
  7. 总结:

    • Python中使用intern机制共用一些对象以优化速度
    • 垃圾回收采用引用计数为主,标记-清除+分代收集为辅的策略
    • gc模块提供垃圾回收的API

类型转换

  1. 整数:int(x [,base ])

    • int(20.4) => 20
    • int('20.4') => Error
    • int('89') => 89
    • int("0xc",base=16) => 12
  2. 浮点数: float(x)

    • float("123"),flaot(123) => 123.0
    • float("123.5"),float(123.5) => 123.5
    • float("12e+2"),float(12e+2) => 1200.0
  3. 字符串: str(obj) 参数可为任意对象

    • str(123.5),str("123.5") => "123.5"
    • str([12,123.5,"se"]) => "[12, 123.5, 'se']"
    • str({'a':1,'b':2}) => "{'a': 1, 'b': 2}"
  4. 元组: tuple(s) 参数可以是元组,列表,字典(只取keys),集合,字符串

    • tuple((1,2,3)) => (1,2,3)
    • tuple([1,2,3]) => (1,2,3)
    • tuple({'a':1,'b':2}) => ('a', 'b')
    • tuple({1, 23.0, 'ef'}) => (1, 23.0, 'ef')
    • tuple('123') => ('1', '2', '3')
  5. 列表:list(s) 参数可为元组,字典,列表,字典(只取keys),集合,字符串

    • list([1,2,3]) => [1,2,3]
    • list((1,2,3)) => [1,2,3]
    • list({'a':1,'b':2}) => ['a', 'b']
    • list('123') => ['1','2','3']
    • list(set([1,2,3])) => [1,2,3]
  6. 集合:set(s) 参数可以是元组,列表,字典(只取keys),集合,字符串

    • set((1,2,3)) => {1,2,3}
    • set({1,2,3}) => {1,2,3}
    • set([1,2,2,3,1,2]) => {1,2,3}
    • set({'a':1,'b':2}) => {'a', 'b'}
    • set('123') => {'2', '3', '1'}
  7. 其他:

    • 整数转换某进制(字符串表示)
      • hex(x) : 把一个整数转换为十六进制字符串 eg: hex(12) => '0xc'
      • oct(x) : 把一个整数转换为八进制字符串 eg: oct(12) => '0o14'
    • 执行一个字符串表达式,返回计算的结果: eval(str)
      • eval("12+23") => 35
    • 利用enumerate将list中的item转换为tuple
        >>> seasons = ['Spring', 'Summer', 'Fall', 'Winter']
        >>> list(enumerate(seasons))
        [(0, 'Spring'), (1, 'Summer'), (2, 'Fall'), (3, 'Winter')]
      

字符编码

  1. ASCII & Unicode & UTF-8编码

    • ASCII : 1个字节
    • Unicode : 2个字节(偏僻的字符要4个字节)
    • UTF-8 : 可变长编码,能节省空间(把一个Unicode字符根据不同的数字大小编码成1-6个字节,英文字母1个字节,汉字通常是3个字节,偏僻的字符要4~6个字节)
    • eg:

      字符 ASCII Unicode UTF-8
      A 01000001 00000000 01000001 01000001
      / 01001110 00101101 11100100 10111000 10101101
  2. 工作方式:

    • 在计算机内存中,统一使用Unicode编码,当需要保存到硬盘或者需要传输的时候,就转换为UTF-8编码
    • python中str在内存中以Unicode表示,一个字符对应若干个字节,传输或者保存到磁盘,则变为以字节为单位的bytes
  3. 获取字符编码的整数表示ord(x)

    • ord('a') => 97
    • ord('A') => 65
    • ord('你') => 20320
    • ord('中') => 20013
  4. 把编码转换为对应的字符chr(x)

    • chr(65) => 'A'
    • chr(20013) => '中'
  5. encode/decode

    • encode: str -> bytes
    • decode: str <- bytes

Sample:

# 1. encode/decode
a='abc'
b=a.encode()        
print(b)            # b'abc'
print(b.decode())    # abc

a='你好'
b=a.encode()
print(b)            # b'\xe4\xbd\xa0\xe5\xa5\xbd'
print(b.decode())    # 你好

# 2. len
#     - len(str)         返回字符数
#     - len(bytes)     返回字节数
a='abc'
print(len(a))    # 3

a=b'abc'
print(len(a))    # 3

a='你好'
print(len(a))                    # 2
print(a.encode('utf-8'))         # b'\xe4\xbd\xa0\xe5\xa5\xbd'
print(len(a.encode('utf-8')))    # 6

整数:进制转换/位运算

  1. 进制

    • 十进制(逢十进一): 0,1,2,3,4,5,6,7,8,9,10,11,12,...
    • 二进制(逢二进一): 0,1,10,11,100,...
    • 八进制(逢八进一): 0,1,2,3,4,5,6,7,10,11,12,...
    • 十六进制(逢十六进一): 0,1,2,...8,9,10,a,b,c,d,e,f,10,11,12,...
  2. 计算机存储:

    • 以二进制的补码形式存储,人们看到的数字是原码转化来的,而原码是通过补码得到的
    • 即:补码(机器存储) -> 原码 -> 转换对应进制 -> 最后人们看到的数
    • 符号位:第一位为符号位,表正数或负数
    • 原码:用来转换对应进制
    • 反码:符号位不变,其他位数取反
    • 补码: 用来做数据的存储运算(运算包括符号位). 补码提出的根源是让计算机底层用加法实现减法操作,即减去一个数=加上一个负数
    • 进制转换时,需要先把内存中存储的补码拿出来变成原码,再进行转换输出
    • 原码,补码转换规则:

        正数: 5
        补码存储        -> 原码(=补码)       -> 人们看到的数(一般为十进制)
        0,000,0101      0,000,0101          5
      
        负数:-5
        补码存储        -> 原码(=补码取反+1) -> 人们看到的数(一般为十进制)
        1,111,1011       1,000,0101         -5
      
      • 正数:原码 <=> 补码 (原码=反码=补码,即:无需转换)
      • 负数:原码 <=> 取反+1 <=> 补码
    • 运算:用补码运算

        负数:-5
        补码存储        -> 原码(=补码取反+1) -> 人们看到的数(一般为十进制)
        1,111,1011       1,000,0101         -5
      
        运算:
        -5 << 1:
        1,111,0110       1,000,1010         -10
      
        -5 >> 1
        1,111,1011       1,000,00101        -3
      
        -5 + 1
        1,111,1100       1,000,0100         -4
      
      • +,-: 用加法加上正负数, eg: 1-1 = 1+(-1) = (1的补码) + (-1的补码) = 0
      • *,/: 按位左右移<<,>>, eg: 5*2 = 0101<<1 = 1010 = 10 , 5*3=5*2+5=(0101<<1)+(0101)=1111=15
    • 注意:
      • 运算时用补码,包括符号位
      • 原补码转换(取反)时,不包括符号位
      • 进制间转换时用原码,不包括符号位
  3. 进制转换

     # 1. 十进制 (type:int) => `bin(num)`/`oct(num)`/`hex(num)` => 二进制(0b),八进制(0o),十六进制(0x) (type:str)
     >>> bin(18)
     '0b10010'
     >>> oct(18)
     '0o22'
     >>> hex(18)
     '0x12'
    
     # 2. 二进制(0b),八进制(0o),十六进制(0x) (type: str) => `int(strNum,base=10)` => 十进制 (type: int)
     >>> int('0b10010',2)
     >>> int('0o22',8)
     >>> int('0x12',16)
    
     # 3. 注:bin(x) 返回的是简写版`符号+原码`
     >>> bin(5)
     '0b101'
     >>> bin(-5)
     '-0b101'
    
  4. 位运算: &,|,^,~,<<,>> 直接操作二进制,按位运算,省内存,效率高

     # 1. << : 左移 = x*(2^n)
     >>> 0b0101<<1
     10
     >>> 5<<1        # = 5*2^1
     10
    
     >>> 0b0101<<2
     20
     >>> 5<<2        # = 5*2^2
     20
    
     # 2. >> : 右移 = x//(2^n)
     >>> 5>>1        # = 5//(2^1)
     2
     >>> 5>>2        # = 5//(2^2)
     1
     >>> -5>>1       # = -5//2
     -3
    
     # 3. &:按位与(都为1则为1)
     >>> 0b0101 & 0b1111
     5
     >>> 5&15
     5
    
     # 4. |:按位或(有1则为1)
     >>> 0b0101 | 0b1111
     15
     >>> 5|15
     15
    
     # 5. ^:按位异或(不同则为1)
     >>> 0b0101 ^ 0b1111     # => 0b1010
     10
     >>> 5^15
     10
    
     # 6. ~:按位翻转,结果为:`-(x+1)` (注:包括符号位翻转,所以不是反码)
     # eg:
     # 5 补码(正数,与原码相同):       0000,0101
     # 运算~5(按位全部取反):          1111,1010
     # 转换为原码(除符号位,取反+1):    1000,0110   => -6
     >>> ~0b0101
     -6
     >>> ~5
     -6
    
     # 5取反+1=-5; -5取反+1=5
     >>> ~5+1
     -5
     >>> ~5
     -6
     >>> ~-5+1
     5
     >>> bin(5)
     '0b101'
     >>> bin(~5)
     '-0b110'
     >>> bin(-5)
     '-0b101'
    

str,tuple,list,set,dict

常用方法总结:

    • str/tuple: +,*n -- 不可改,生成新的
    • list: +[...],*n,.append(x),.extend(iterable),.insert(index,x)
    • set: .add(key),.update(iterable)
    • dict: xxx[key]=value,.update({key:value,...}),.setdefault(key,default=None)
    • str/tuple: 不可改
    • list: del xxx[index],remove(item),pop(index=-1)
    • set: .remove(key),.discard(key),pop()
    • dict: del xxx[key],.pop(key),.popitem()
    • str/tuple: /
    • list: xxx[index]=value
    • set: /
    • dict: xxx[key]=value,.update({key:value,...}),.setdefault(key,default=None)
    • str/tuple: [index],[start:end:step],.index(x,start,end),.count(x)
    • list: [index],[start:end:step],.index(x,start,end),.count(x)
    • set: /
    • dict: xxx[key],.get(key,default=None),keys(),values(),items()
  • 公共方法
    • in/not in 存在与否
    • .len(x),.max(x),.min(item)
    • for item in s,for index,item in enumerate(s) 遍历 (对于dict,会遍历keys)

str

"....":

  • find/rfind/index/rindex/count(str, start=0, end=len(mystr)) 注:index方法若是找不到会抛出一个异常
  • replace(str1,str2,count=-1)
  • split(sep=None, maxsplit=-1),splitlines(),join(iterable)
  • capitalize/title() 字符串的第一个/每个单词首字母大写
  • lower/upper() 转换为小写/大写
  • startswith/endswith(prefix[, start[, end]]) 返回 True/False
  • ljust/rjust/center(width, fillchar=' ') 原字符串左/右/居中对齐,并使用空格填充至长度 width 的新字符串
  • lstrip/rstrip/strip(chars=None) 去除字符串左/右/两边的空白字符
  • rpartition/partition(seq) 从右/左边开始以seq分割成三部分,seq前,seq和seq后
  • isalpha/isdigit/isalnum/isspace 返回True/False

Sample:

>>> a="Hello Tom! How are you? Are you ok?"

# find/index/count
>>> a.find('you')
19
>>> a.index('you')
19
>>> a.count('you')
2

# title,lower
>>> a.title()
'Hello Tom! How Are You? Are You Ok?'
>>> a.lower()
'hello tom! how are you? are you ok?'
>>> a.strip('?')
'Hello Tom! How are you? Are you ok'

# split/partition
>>> a.split('you')
['Hello Tom! How are ', '? Are ', ' ok?']
>>> a.partition('you')
('Hello Tom! How are ', 'you', '? Are you ok?')

# startswith
>>> a.startswith("Hello")
True
>>> a.startswith("You")
False

# isalpha
>>> a.isa
a.isalnum(  a.isalpha(  a.isascii(
>>> a.isalpha()
False

# 下标索引
>>> a[0]
'H'
# 切片
>>> a[0:3]
'Hel'

# in,not in
>>> a="123"
>>> '1' in a
True
>>> 'b' in a
False

# index,count
>>> a.index('2')
1
>>> a.count('2')
1

tuple

(item1,item2,...):

  • 元素可以是不同类型的,可重复值,可嵌套 ( 与列表类似,不同之处在于元组创建后就不能修改 )
  • 访问元素:xxx[index],index从0开始
  • 修改元素:不能修改元素,会抛出异常
  • 查找元素:index(item,start=0, stop=9223372036854775807) 返回index,左闭右开 [start,end),找不到抛出异常
  • 查找某元素个数: count(item)
  • 可由元组,列表,字典(只取keys),集合,字符串创建元组: tuple(s)
  • 注:
    • tuple所谓的“不变”是指tuple的每个元素指向不变
    • 只有1个元素的tuple定义时须加一个逗号,,来消除歧义,eg:a=(1,)

Sample:

# tuple:元素可以是不同类型的,可重复值
>>> a=(123,12.4,'Hello',('a','b'),[1,2,3],'Hello')

# len
>>> len(a)
6

# 访问元素:xxx[index],index从0开始
>>> a[0]
123

# 修改元素:不能修改元素,会抛出异常
>>> a[1]='abc'
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: 'tuple' object does not support item assignment

# 注:tuple所谓的“不变”是指tuple的每个元素指向不变
>>> a[4]
[1, 2, 3]
>>> a[4][1]="a"        # 变的不是tuple的元素,而是list的元素
>>> a
(123, 12.4, 'Hello', ('a', 'b'), [1, 'a', 3], 'Hello')

# 查找元素:index(item,start,end) 返回index,左闭右开 [start,end),找不到抛出异常
>>> a.index('Hello',2,6)
2
>>> a.index('Hello',3,6)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ValueError: tuple.index(x): x not in tuple

# 查找元素个数: count(item)
>>> a.count('Hello')
2
>>> a.count([1,2,3])
1

# tuple(s): 由元组,列表,字典(只取keys),集合,字符串创建元组
>>> tuple((1,2,3))
(1,2,3)
>>> tuple([1,2,3])
(1,2,3)
>>> tuple({'a':1,'b':2})
('a', 'b')
>>> tuple({1, 23.0, 'ef'})
(1, 23.0, 'ef')
>>> tuple('123')
('1', '2', '3')

# 注:只有1个元素的tuple定义时必须加一个逗号,,来消除歧义
>>> a=(1)
>>> type(a)
<class 'int'>
>>> a=(1,)
>>> type(a)
<class 'tuple'>
>>>

list

[item1,item2,...]:

  • 元素可以是不同类型的,可重复值,可嵌套 ( 与元组类似,不同之处在于元组是不可变的,不能修改 )
  • 访问/修改元素:
    • xxx[index]
    • xxx[index]=value
    • 注:index从0开始
  • 查找:
    • index(item,start=0, stop=9223372036854775807) 返回index,左闭右开 [start,end),找不到抛出异常
    • 查找某元素个数: count(item)
    • 是否存在某元素:item in xxx, item not in xxx
      • 添加元素:
      • append(item)
      • extend([item1,item2,...])
      • insert(index,item)
      • 删除元素:
      • del xxx[index]
      • pop(index=-1)
      • remove(item) 不能存在要移除元素,则会抛出异常
      • 排序:
      • sort(reverse=False)
      • reverse()
  • 可由元组,列表,字典(只取keys),集合,字符串创建list: list(s)

Sample:

# 
a=[123,12.4,'Hello',('a','b'),[1,2,3],'Hello']

# len
>>> len(a)
6

# 访问元素: xxx[index] index从0开始
>>> a[0]
123
>>> a[2]
'Hello'
>>> a[2][0]
'H'
>>> a[3]
('a', 'b')
>>> a[3][0]
'a'
>>> a[4]
[1, 2, 3]
>>> a[4][1]
2

# 修改元素: xxx[index]=value
>>> a[2]="Tom"
>>> a
[123, 12.4, 'Tom', ('a', 'b'), [1, 2, 3], 'Hello']

# 查找元素:index,count
>>> a.index('Tom')
2
>>> a.count('Hello')
1

# 是否存在某元素:in,not in
>>> 'Tom' in a
True
>>> 2 in a
False
>>> ('a','b') in a
True
>>> 'Hi' not in a
True

# 添加元素: append(x),extend([...]),insert(index,x)
a = [5, 4, 2, 3]
>>> a.append(1)            # [5, 4, 2, 3, 1]
>>> a.extend([2,8])        # [5, 4, 2, 3, 1, 2, 8]
>>> a.insert(0,7)        # [7, 5, 4, 2, 3, 1, 2, 8]
>>> a.insert(-1,'Tom')    # [7, 5, 4, 2, 3, 1, 2, 'Tom', 8]

# 删除元素: del xxxx[index] , pop(index=-1), remove(item)
>>> a=[]
>>> del a[0]
>>> a
[5, 4, 2, 3, 1, 2, 'Tom', 8]

>>> a.pop()
8
>>> a
[5, 4, 2, 3, 1, 2, 'Tom']
>>> a.pop(4)
1
>>> a
[5, 4, 2, 3, 2, 'Tom']

>>> a.remove('Tom')
>>> a
[5, 4, 2, 3, 2]
>>> remove(1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'remove' is not defined

# 排序: sort reverse
>>> a.sort()
>>> a
[2, 2, 3, 4, 5]
>>> a.sort(reverse=True)
>>> a
[5, 4, 3, 2, 2]

>>> a.reverse()
>>> a
[2, 2, 3, 4, 5]

# list(s): 可由元组,列表,字典(只取keys),集合,字符串创建
>>> list([1,2,3])
[1,2,3]
>>> list((1,2,3))
[1,2,3]
>>> list({'a':1,'b':2})
['a', 'b']
>>> list('123')
['1','2','3']
>>> list(set([1,2,3]))
[1,2,3]

set

{key1,key2,...}:

  • 无不可重复元素,可以是不同类型的,可嵌套
  • 注:不可以放入可变对象,因为无法判断两个可变对象是否相等,也就无法保证set内部“不会有重复元素”
  • 添加元素:
    • add(key)
    • update(iterable)
  • 删除元素:
    • remove(key) 找不到会报错
    • discard(key) 找不到不会报错
    • pop()
  • 判断是否存在某元素:
    • item in s1
    • item not in s1
  • 交,并,差,补集:
    • 交集: s1&s2,s1.intersection(s2),s1.intersection_update(s2)--取交集并更新自己
    • 并集: s1|s2,s1.union(s2)
    • 差集: s1-s2,s1.differenc(s2),s1.difference_update(s2) (注:s1-s2!=s2-s1)
    • 补集: s1^s2,s1.symmetric_difference(s2), s1.symmetric_difference_update(s2)
  • 关系判断:相交不相交, 包含不包含
    • 是否不相交 s1.isdisjoint(s2)
    • 是否包含某集合 s1.issuperset(s2),s1>=s2
    • 是否被某集合包含 s1.issubset(s2),s1<=s2
  • 不变集合: frozenset(s)

Sample:

>>> s1={1,5,9,3,9}
>>> len(s1)
4
>>> s1
{1, 3, 5, 9}

# 注:不可以放入可变对象,因为无法判断两个可变对象是否相等,也就无法保证set内部“不会有重复元素”
>>> s2={1,2,(1,2)}
>>> s2
{(1, 2), 1, 2}

>>> s2={1,2,[1,2]}
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'list'

# 添加元素 add(key),update(iterable)
>>> s1.add(2)
>>> s1
{1, 2, 3, 5, 9}
>>> s1.add('Tom')
>>> s1
{1, 2, 3, 5, 9, 'Tom'} 
>>> s1.add(9)
>>> s1
{1, 2, 3, 5, 9}

>>> s1.update({7,10})
>>> s1
{1, 2, 3, 5, 7, 9, 10}
>>> s1.update([1,4])
>>> s1
{1, 2, 3, 4, 5, 7, 9, 10}
>>> s1.update('ef')
>>> s1
{1, 2, 3, 4, 5, 7, 9, 10, 'f', 'e'}
>>> s1.update(9)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: 'int' object is not iterable

# 删除元素 remove(key),discard(key)--不会报错,pop()
>>> s1.remove('Tom')
>>> s1
{1, 2, 3, 5, 9}
>>> s1.remove('a')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
KeyError: 'a' 

>>> s1.discard('a')

>>> s1.pop()
1
>>> s1
{2, 3, 5, 9, 'Tom'}

# 判断是否存在某元素: in,not in
>>> s1
{2, 3, 5, 9, 'Tom'}
>>> 5 in s1
True
>>> 7 not in s1
True
>>> 1 in s1
False

# 交,并,差,补集
>>> s1={1,5,6,9}
>>> s2={3,5,9,10}

>>> s1&s2     # 交集 s1.intersection(s2), s1.intersection_update(s2) -- 取交集并更新自己
{9, 5}

>>> s1|s2      # 并集 s1.union(s2)
{1, 3, 5, 6, 9, 10}

>>> s1-s2     # 差集 differenc,difference_update
{1, 6}
>>> s2-s1
{10, 3}

>>> s1^s2    # 补集 symmetric_difference, symmetric_difference_update
{1, 3, 6, 10}

# 关系判断:相交不相交,包含不包含
>>> s1.isdisjoint(s2)     # 是否不相交
False
>>> {1,3}.isdisjoint({2,5})
True

>>> s1.issuperset(s2)  # 是否包含某集合(a>=b)
False
>>> {1,3,5}.issuperset({3,5})
True 
>>> {1,3,5} >= {3,5}

>>> s1.issubset(s2)    # 是否被包含 (a<=b)
False
>>> {3,5}.issubset({1,3,5})
True
>>> {3,5}<={1,3,5}
True

# frozenset 不变集合
>>> a=frozenset({1,3,5})
>>> a
frozenset({1, 3, 5})
>>> a.add(4)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'frozenset' object has no attribute 'add'

# set(s): 可由元组,列表,字典(只取keys),集合,字符串创建
>>> set((1,2,3))
{1,2,3}

>>> set({1,2,3})
{1,2,3}

>>> set([1,2,2,3,1,2])
{1,2,3}

>>> set({'a':1,'b':2})
{'a', 'b'}

>>> set('123')
{'2', '3', '1'}

dict

{key1:value1,key2:value2,...}:

  • key:value 键值对,key需为不可改变类型,如int,float,str,tuple
  • 访问元素:
    • xxx[key]
    • .get(key,default=None)
  • 更新/添加元素:
    • xxx[key]=value
    • .update({key:value,...})
    • .setdefault(key,default=None)
  • 删除元素:
    • del xxx[key]
    • .pop(key)
    • .popitem()
    • .clear()
  • 获取对象
    • keys() 返回一个包含所有KEY的列表
    • values() 返回一个包含所有value的列表
    • items() 返回一个元组列表 [(key,value),...]
  • 判断是非存在某key:
    • key in d
    • key not in d

Sample:

>>> d={'a':1,'b':2}
>>> len(d)
2

# 访问元素: xxxx[key],xxx.get(key,default=None)
>>> d['a']
1
>>> d['b']
2
>>> d.get('a')
1
>>> d.get('c','N/A')
'N/A'

# 更新/插入元素: xxx[key]=value,.update({...}),.setdefault(key,default=None)
>>> d['a']='123'
>>> d['c']=789
>>> d['e']=(1,2)
>>> d
{'a': '123', 'b': 2, 'c': 789, 'e': (1, 2)}

>>> d.update({'b':'Hello','f':'Tom'})
>>> d
{'a': '123', 'b': 'Hello', 'c': 789, 'e': (1, 2), 'f': 'Tom'}

>>> d.setdefault('a',99)
'123'
>>> d
{'a': '123', 'b': 'Hello', 'c': 789, 'e': (1, 2), 'f': 'Tom'}
>>> d.setdefault('g',99)
99
>>> d
{'a': '123', 'b': 'Hello', 'c': 789, 'e': (1, 2), 'f': 'Tom', 'g': 99}

# 删除元素: del xxx[key],pop(key),popitem(),clear()
>>> del d['e']
>>> d
{'a': '123', 'b': 'Hello', 'c': 789, 'f': 'Tom', 'g': 99}
>>> d.pop('a')
'123'
>>> d
{'b': 'Hello', 'c': 789, 'f': 'Tom', 'g': 99}

>>> d.popitem()
('g', 99)
>>> d
{'b': 'Hello', 'c': 789, 'f': 'Tom'}

# keys
>>> d.keys()
dict_keys(['a', 'b', 'c'])
# values
dict_values(['123', 2, 789])
# items
>>> d.items()
dict_items([('a', '123'), ('b', 2), ('c', 789)])

# 判断key是否存在:key in d, key not in d
>>> 'c' in d
True
>>> 'g' in d
False
>>> 'a' not in d
True
>>> d.setdefault('k')
>>> d
{'b': 'Hello', 'c': 789, 'f': 'Tom', 'k': None}
>>> 'k' in d
True

函数

def test1():
    pass

def test2(step=1)
    print("step=%s" % step)

def test3()
    return [1,2,3]
>>> test1()
>>> test2(5)
step=5
>>> test3()
[1,2,3]

作用域

  • 局部变量:
    • 在函数内部定义的变量,只能在本函数中用
    • 可使用
  • 全局变量:
    • 在函数外定义的变量
    • 使用:可以在所有的函数中使用
    • 修改:在函数中不能修改全局变量的指向
      • 可变类型的全局变量 => 其指向的数据直接可以修改
      • 不可类型的全局变量 => 使用global声明后可修改
  • 注:
    • 可使用globals()locals()列出当前作用域中所有全局/局部变量
    • 函数中同名变量,局部变量优先全局变量
    • 实际上,Python使用LEGB顺序规则来查找一个符号对应的对象
        Locals(局部) -> Enclosing function(嵌套函数)-> Globals(全局) -> Builtins(Python启动时会自动载入)
      

Sample1:查看全局/局部变量

import math
NumA=10
NumB=20
def testScope(*arg):
    a=1
    b='Hello'
    print("locals:",locals())         # print 局部变量 (注:函数参数也属于locals)
    print("globals:",globals())        # print 全局变量 (注:包括python内建模块和手动import的)

>>> testScope('Tom')
locals: {'arg': ('Tom',), 'a': 1, 'b': 'Hello'}
globals: {'__name__': '__main__', '__doc__': None, '__package__': None, '__loader__': <class '_frozen_importlib.BuiltinImporter'>, '__spec__': None, '__annotations__': {}, '__builtins__': <module 'builtins' (built-in)>, 'NumA': 10, 'NumB': 20, 'testLocals': <function testLocals at 0x10c7acf28>, 'testScope': <function testScope at 0x10c780950>, 'math': <module 'math' from '/anaconda3/lib/python3.7/lib-dynload/math.cpython-37m-darwin.so'>}

Sample2: 全局/局部变量优先级

Num=10

# 1. 使用全局变量
def testGlobalVar(step=1):
    x=Num+step                # 使用全局变量
    print("x=%s" % x)

>>> testVar(3)
x=13

# 2. 使用局部变量
def testLocalVar(step=1):
    Num=20                    # 同名变量,局部变量优先级高于全局变量
    x=Num+step
    print("x=%s,Num=%s" % (x,Num))

>>> testVar4(3)
x=23,Num=20
>>> Num
13

Sample3: 修改全局变量

# 1. 不可变类型的全局变量
Num=10                        # 不可变类型
def testVar(step=1):
    x=Num+step
    Num=x
    print("x=%s,Num=%s" % (x,Num))

>>> testVar(3)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 2, in testVar2
UnboundLocalError: local variable 'Num' referenced before assignment

def testVar(step=1):
    global Num                     # 使用global声明后可修改该全局变量
    x=Num+step
    Num=x
    print("x=%s,Num=%s" % (x,Num))

>>> testVar(3)
x=13,Num=13
>>> Num                         # 全局变量Num已修改
13

# 2. 可变类型的全局变量
A=[1,2,3]
def testVar(step=1):
    A.append(step)
    print(A)

>>> testVar(4)
[1, 2, 3, 4]
>>> A                            # 全局变量A已修改

函数参数

  • 参数
    • 引用传参: 引用传递,不是值传递
    • 不可变类型: 函数中运算不会影响到变量自身
    • 可变类型: 函数中运算可能会影响到变量自身
  • 缺省参数(默认参数)
    • 一定要位于参数列表的最后面
    • (可在可变参数*前面或后面,但一定要在关键字参数**前面,不然会有歧义报错)
  • 不定长参数:
    • 用于处理比当初声明时更多的参数
    • *变量:
      • 表示可变参数,tuple类型
        • 如: *args ,args用tuple存放未命名的变量参数
        • 可直接传入,也可先组装list或tuple,再通过*传入,如:func(*(1, 2, 3))
    • **变量:
      • 表示关键字参数,dict类型,可以限制传入的参数名,同时提供默认值
      • 如: **kwargs,kwargs用dict存放命名的参数(即key=value的参数)
      • 可直接传入,也可先组装dict,再通过**传入,如:func(**{'a': 1, 'b': 2})
      • *变量时,需在*变量后面(即最后面)

Sample1:传入不可变/可变参数

# 1. 传入不可变参数
def testArg(a,b):
    # a=a+b
    a+=b
    print("a=%s" % a)

>>> a,b=3,5
>>> testArg(a,b)
a=8
>>> a                     # a 未变
3

# 2. 传入可变参数
def testArg(a,b):
    a.append(b)
    print('a=%s' % a)

>>> a,b=[1,2,3],4
>>> testArg(a,b)
a=[1, 2, 3, 4]
>>> a                  # a 变了
[1, 2, 3, 4]

Sample2: 缺省参数(默认参数)

# 1. 单个缺省参数
def testArg(a,b=1):
    print('a=%s,b=%s,a+b=%s' % (a,b,a+b))

>>> testArg(3,5)
a=3,b=5,a+b=8
>>> testArg(3)
a=3,b=1,a+b=4

# 2.多个缺省参数(位于参数列表的最后面)
def testArg(a,b=1,c=None,d='abc'):
    print('a=%s,b=%s,c=%s,d=%s' % (a,b,c,d))

>>> testArg(1,2,3,4)
a=1,b=2,c=3,d=4
>>> testArg(1,2,3)
a=1,b=2,c=3,d=abc
>>> testArg(1,2)
a=1,b=2,c=None,d=abc
>>> testArg(1)
a=1,b=1,c=None,d=abc
>>> testArg(1,d=5,c=4)         # 指定传入参数
a=1,b=1,c=4,d=5
>>> testArg(1,[2,3])        # 传入可变参数
a=1,b=[2, 3],c=None,d=abc

Sample3: 不定长参数

# 1. *arg,**kwargs
def testArg(a,b,*args,**kwargs):
    print("a=%s,b=%s,args=%s,kwargs=%s" % (a,b,args,kwargs))

>>> testArg(1,2)
a=1,b=2,args=(),kwargs={}

>>> testArg(1,2,3,4,x='001',y='002')    # 直接传入
a=1,b=2,args=(3, 4),kwargs={'x': '001', 'y': '002'}

>>> m=(7,8,9)
>>> n={'k':'005','z':'006'}

>>> testArg(1,2,m,n)
a=1,b=2,args=((7, 8, 9), {'k': '005', 'z': '006'}),kwargs={}

>>> testArg(1,2,*m,**n)                    # 组装后传入
a=1,b=2,args=(7, 8, 9),kwargs={'k': '005', 'z': '006'}

>>> testArg(1,2,3,4,*m,**n,x='001',y='002')
a=1,b=2,args=(3, 4, 7, 8, 9),kwargs={'k': '005', 'z': '006', 'x': '001', 'y': '002'}

>>> testArg(1,2,3,4,*m,**n,x='001',z='002')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: testArg() got multiple values for keyword argument 'z'

# 2. + default arg
def testArg(a,b,c='abc',*args,**kwargs):    # 或 def testArg(a,b,*args,c='abc',**kwargs):
    print("a=%s,b=%s,c=%s,args=%s,kwargs=%s" % (a,b,c,args,kwargs))

>>> testArg(1,2)
a=1,b=2,c=abc,args=(),kwargs={}
>>> testArg(1,2,3,x='001',y='002')
a=1,b=2,c=3,args=(),kwargs={'x': '001', 'y': '002'}
>>> testArg(1,2,3,4,x='001',y='002')
a=1,b=2,c=3,args=(4,),kwargs={'x': '001', 'y': '002'}
>>> testArg(1,2,x='001',y='002',c=4)
a=1,b=2,c=4,args=(),kwargs={'x': '001', 'y': '002'}

>>> testArg(1,2,5,c=4,x='001',y='002')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: testArg() got multiple values for argument 'c'

函数返回值

  • 无返回值
  • 返回一个变量,eg: return x
  • 返回多个变量,eg: return x,y 本质是返回了一个元组
  • 注意:变量也可以代表函数,所以也可返回函数

Sample:

# 1. 无返回值
def testReturn(a,b):
    print("a+b=%s" % (a+b))

>>> testReturn(1,3)
a+b=4

# 2. 返回一个变量
def testReturn(a,b):
    return a+b

>>> testReturn(1,3)
4
>>> testReturn([1,2],[3,4])
[1, 2, 3, 4]

# 3. 返回多个变量
def testReturn(a,b):
    return a,b,a+b

>>> testReturn(2,3)
(2, 3, 5)

>>> a,b,c=testReturn(2,3)
>>> a,b,c
(2, 3, 5)
>>> a
2
>>> b
3
>>> c
5

>>> a,b,c=testReturn([1,2],[3,4])
>>> a,b,c
([1, 2], [3, 4], [1, 2, 3, 4])

# 4. 返回函数
def testReturn(a,b):
    def test_in(c):
        return a+b+c
    return test_in

>>> f=testReturn(3,5)
>>> f
>>> f(1)
9
>>> f(2)
10

高级特性:闭包

  • 返回内部函数,内部函数使用了外部包裹函数作用域里变量(非全局变量),则称内部函数为闭包
  • 优点:
    • 可提高代码可复用性
    • 似优化了变量,原来需要类对象完成的工作,闭包也可以完成
  • 缺点:
    • 由于闭包引用了外部函数的局部变量,则外部函数的局部变量没有及时释放,消耗内存

Sample1:闭包

def test(a):
    def test_in(b):
        print("a=%s,b=%s,a+b=%s" % (a,b,a+b))
        return a+b
    return test_in

>>> f=test(3)
>>> f
<function test.<locals>.test_in at 0x107af51e0>

>>> f(5)
a=3,b=5,a+b=8
8
>>> f(8)
a=3,b=8,a+b=11
11

Sample2:返回一个计数器

def counter(start=0):
    global total
    total=start
    def incr():
        total+=1                # total相对于incr()为外部变量,且为不可变类型,修改会报错
        return total
    return incr

>>> ct1=counter()
>>> a,b,c=ct1(),ct1(),ct1()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in incr
UnboundLocalError: local variable 'total' referenced before assignment

# solution1: 使用可变型变量
def counter(start=0):
    total=[start]
    def incr():
        total[0] += 1
        return total[0]
    return incr

# solution2: 使用nonlocal声明
def counter(start=0):
    total=start
    def incr():
        nonlocal total
        total+=1
        return total
    return incr

# 或者:
def counter(start=0):
    def incr():
        nonlocal start
        start+=1
        return start
    return incr


# test:
>>> ct1=counter()
>>> ct2=counter(5)
>>> a,b,c=ct1(),ct1(),ct1()
>>> a,b,c
>>> (1,2,3)
>>> x,y,z=ct2(),ct2(),ct2()
>>> x,y,z
(6, 7, 8)

高级特性: 列表生成式

List Comprehensions: [] 列表生成式即用来创建list的生成式

# 
>>> list(range(1, 11))
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

>>> [x * x for x in range(1, 11)]
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

>>> [x * x for x in range(1, 11) if x % 2 == 0]
[4, 16, 36, 64, 100]

>>> d = {'x': 'A', 'y': 'B', 'z': 'C' }
>>> [k+'='+v for k, v in d.items()]
['y=B', 'x=A', 'z=C']

高级特性:生成器(generator)

  • 生成器: 一个不断产生值的函数,边计算边产出值
  • 生成过程: 每次迭代从上一次返回时的函数体中的位置开始,所使用的参数都是第一次所保留下的,不是新创建的(生成器“记住”了数据状态和流程中的位置)
  • 相比一次列出所有内容的优势: 节约内存,响应迅速,使用灵活
  • 创建生成器:
    • 使用包含yield语句的函数, 生成器每调用一次,会在yield位置产生一个值, 直到函数执行结束
    • 简单的生成器,可以通过将一个列表生成式的 [ ] 改成 ( )直接生成
  • 获取生成器生成的元素:
    • 通过next(g)函数获得生成器的下一个返回值
      • 没有更多的元素时,抛出 StopIteration 的异常
      • 生成器函数的返回值包含在StopIteration的value中,可通过捕获StopIteration错误获取返回值
    • 通过for in 循环
      • 没有更多的元素时,不会抛出 StopIteration 的异常
      • 拿不到生成器函数的返回值(return语句的返回值)
    • 注:生成器中元素都取出后就无用了

Sample1:

# 1. 普通函数: 一次列出所有(如果n=100M,1000M,...会很慢)
def double(n):
    return [i*2 for i in range(1,n)]

# 2. 生成器函数: 用多少生成多少,效率高(更节约内存和快捷)
def gen(n):
    for i in range(1,n):
        yield i*2

# 3. 创建生成器,下面g1与g2效果相同
g1 = gen(5)
g2 = (x*2 for x in range(1,5))

>>> g1
<generator object <genexpr> at 0x1114b1e58>
>>> g2

# 4. 获取生成器生成的元素
# Method1: 使用`next(g)`函数
>>> next(g1)
2
>>> next(g1)
4
>>> next(g1)
8
>>> next(g1)
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-51-e734f8aca5ac> in <module>
----> 1 next(g)

StopIteration:

# Method2: 使用`for in g`循环遍历
for i in g2:
...:     print(i," ",end="")
...:
2 4 6 8

Sample2:

# 1. 定义生成器函数
def odd():
    print('step 1')
    yield 1
    print('step 2')
    yield(3)
    print('step 3')
    yield(5)
    return "no more"

# 2. 定义遍历生成器函数
def getAll_by_forin(g):
    for i in g:
        print(i)

def getAll_by_next(g):
    while True:
        try:
            print('x=%s' % next(g))
        except StopIteration as e:
            print('Generator return value:', e.value)
            break
    print('finish!')

# test1: 使用 for in 循环
>>> g1=odd()
>>> getAll_by_forin(g1)
step 1
1
step 2
3
step 3
5

# test2: 使用 next(g) 
>>> g2=odd()
>>> getAll_by_next(g1)
tep 1
x=1
step 2
x=3
step 3
x=5
Generator return value: no more
finish!

高级特性:迭代器(Iterator)

  • 迭代器(Iterator)对象:
    • 可以被next()函数调用并不断返回下一个值的对象(直到没有数据时抛出StopIteration错误)
    • Iterator对象表示的是一个惰性计算(即只有在需要获取下一个数据时它才会计算)的序列(即数据流,可以无限大,如全体自然数)
    • 如:生成器generator对象
  • 可迭代对象(Iterable):
    • 可以直接作用于for in循环的对象
    • 如: list、tuple、dict、set、str,generator
  • 迭代器Iterator对象一定是可迭代对象Iterable(可以用for in循环),反之不一定
  • 可使用iter()函数将Iterable对象转换为Iterator对象
  • 可以使用isinstance()判断一个对象类型,如:判断是否是Iterable对象,是否是Iterator对象

Sample:

import collections

# 1. list,tuple,dict,set,str等是`Iterable`对象但不是`Iterator`对象
>>> l=[1,2,3]
>>> isinstance(l,Iterable)
True
>>> isinstance(l,Iterator)
False
>>> next(l)
next(l)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-79-cdc8a39da60d> in <module>
----> 1 next(l)

TypeError: 'list' object is not an iterator

>>> for i in l:
...:     print(i)
...:
1
2
3

# 2. 使用`iter(x)`函数将`Iterable`对象转换为`Iterator`对象
>>> a=iter(l)
>>> isinstance(a,Iterable)
True
>>> isinstance(a,Iterator)
True
>>> next(a)
1
>>> next(a)
2

# 3. generator对象是`Iterator`对象
>>> g=(x*2 for x in range(1,5))
>>> isinstance(l,Iterable)
True
>>> isinstance(l,Iterator)
True
>>> next(g)
2
>>> next(g)
4

高级特性:装饰器(decorator)

  • 增强函数功能(例如在函数调用前后自动打印日志),但不修改函数原本定义
  • 本质上,decorator就是一个返回函数的高阶函数

Sample:

  1. 原始函数特性

     def sum(a,b):
         print("a+b=%s" % (a+b))
    
     # 查看函数名字`__name__`
     >>> dir(sum)
     ['__annotations__', '__call__', '__class__', '__closure__', '__code__', '__defaults__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__get__', '__getattribute__', '__globals__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__kwdefaults__', '__le__', '__lt__', '__module__', '__name__', '__ne__', '__new__', '__qualname__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__']
     >>> sum.__name__
     'sum'
     >>> f=sum
     >>> f.__name__
     'sum'
    
     >>> sum(3,5)
     a+b=8
     >>> f(3,5)
     a+b=8
    
  2. 定义使用一个无参decorator

     # 1. 定义decorator:log
     def log(func):
         def func_wrapper(*args, **kw):
             print('call %s(%s,%s):' % (func.__name__,args,kw))
             return func(*args, **kw)
         return func_wrapper
    
     # 2. 在原函数上加上decorator: 使用`@`
     # 把`@log`放到`sum()`函数的定义处,相当于执行了:`sum = log(sum)`
     @log
     def sum(a,b):
         print("a+b=%s" % (a+b))
    
     # 3. test:
     >>> sum(3,5)
     call sum((3, 5),{}):
     a+b=8
     >>> sum.__name__
     'func_wrapper'
    
  3. 定义使用一个有参decorator

     # 1. 定义decorator:log (decorator本身传入参数,需返回一个decorator的高阶函数)
     def log(text):
         def log_wrapper(func):
             def func_wrapper(*args, **kw):
                 print('%s %s(%s,%s):' % (text,func.__name__,args,kw))
                 return func(*args, **kw)
             return func_wrapper
         return log_wrapper
    
     # 2. 使用decorator
     # 相当于执行了: `sum = log('execute:')(sum)`
     @log('execute:')
     def sum(a,b):
         print("a+b=%s" % (a+b))
    
     # 3. test:
     >>> sum(3,5)
     execute: sum((3, 5),{}):
     a+b=8
     >>> sum.__name__
     'func_wrapper'
    
  4. Issue: 被装饰后的函数其实已经是另外一个函数了(函数名等函数属性会发生改变),eg: 装饰后的函数__name__等不准确了

     # 需要把原始函数的`__name__`等属性复制到`func_wraper()`函数
     # 可使用functools.wraps(function)装饰器来消除这样的副作用
    
     import functools
    
     def log(text):
         def log_wrapper(func):
             @functools.wraps(func)     # 添加functools.wraps(function)装饰器
             def func_wrapper(*args, **kw):
                 print('%s %s(%s,%s):' % (text,func.__name__,args,kw))
                 return func(*args, **kw)
             return func_wrapper
         return log_wrapper
    
     @log('execute:')
     def sum(a,b):
         print("a+b=%s" % (a+b))
    
     # test:
     >>> sum(3,5)
     execute: sum((3, 5),{}):
     a+b=8
     >>> sum.__name__
     'sum'
    

匿名函数

  • lambda函数:lambda [arg1 [,arg2,.....argn]]:expression
    • 冒号前面的表示函数参数,可以无参
    • 只能有一个表达式,不用写return,返回值就是该表达式的结果
  • 应用: 函数作为参数,返回值传递

Sample:

# 1. lambda函数
>>> sum=lambda a,b:a+b         # 相当于 def sum(a,b): return a+b
>>> sum
<function <lambda> at 0x10787c158>
>>> sum(3,5)
8

# 2. 应用: 作为参数
>>> a=[3,1,5]
>>> a.sort()
>>> a
[1, 3, 5]

>>> a=[
... {'name':'Tom','age':18}
... ,{'name':'Lucy','age':22}
... ,{'name':'Andy','age':20}
... ]
>>> a.sort()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: '<' not supported between instances of 'dict' and 'dict'

>>> k=lambda item:item['name']
>>> a.sort(key=k)
>>> a
[{'name': 'Andy', 'age': 20}, {'name': 'Lucy', 'age': 22}, {'name': 'Tom', 'age': 18}]

>>> a.sort(key=lambda x:x['age'])
>>> a
[{'name': 'Tom', 'age': 18}, {'name': 'Andy', 'age': 20}, {'name': 'Lucy', 'age': 22}]

内建函数

Build-in Function

  • python解释器启动后默认加载
  • 查看:dir(__builtins__)
  • 使用:help(function)
  • 常用的内建函数

    • range(stop), range(start stop[, step]) -> integer list (默认start=0,不包括stop,使用了惰性计算)
        >>> a = range(5)
        >>> a 
        range(0, 5)
        >>> list(a)
        [0, 1, 2, 3, 4]
        >>> [x*2 for x in a]
        [0, 2, 4, 6, 8]
      
    • map(func, *iterables) 根据提供的函数对指定序列做映射(使用了惰性计算)

        >>> a=[1,2,3]
        >>> b=map(lambda x:x*x,a)
        >>> list(b)
        [1, 4, 9]
      
        >>> c=map(lambda x,y:(x,y),[1,2,3],['a,','b','c','d'])
        >>> list(c)
        [(1, 'a,'), (2, 'b'), (3, 'c')]
      
        >>> d=map(str,[1,2,3,'a'])
        >>> list(d)
        ['1', '2', '3', 'a']
      
        >>> t=map(str,(1,2,3,'a'))
        >>> tuple(t)
        ('1', '2', '3', 'a')
      
    • filter(function or None, iterable) 从一个序列中筛出符合条件的元素(使用了惰性计算,所以只有在取filter()结果的时候,才会真正筛选并每次返回下一个筛出的元素)

        >>> a=filter(lambda x:x%2,[1,2,3,4])    # 参数 function 返回1/0 => 布尔值True/False
        >>> list(a)
        [1, 3]
      
        >>> b=filter(lambda x:x%2,{1,2,3,4})
        >>> set(b)
        {1, 3}
      
    • sorted(iterable, cmp=None, key=None, reverse=False) -> new sorted object

        >>> a=sorted([1,5,4,2,9])
        >>> a
        [1, 2, 4, 5, 9]
      
        >>> a=[{'name':'Tom','age':18},{'name':'Lucy','age':22},{'name':'Andy','age':20}]
        >>> result=sorted(a,key=lambda x:x['age'])
        >>> result
        [{'name': 'Tom', 'age': 18}, {'name': 'Andy', 'age': 20}, {'name': 'Lucy', 'age': 22}]
      
  • functools 工具函数包

      >>> import functools
      >>> dir(functools)
      ['RLock', 'WRAPPER_ASSIGNMENTS', 'WRAPPER_UPDATES', '_CacheInfo', '_HashedSeq', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', '_c3_merge', '_c3_mro', '_compose_mro', '_convert', '_find_impl', '_ge_from_gt', '_ge_from_le', '_ge_from_lt', '_gt_from_ge', '_gt_from_le', '_gt_from_lt', '_le_from_ge', '_le_from_gt', '_le_from_lt', '_lru_cache_wrapper', '_lt_from_ge', '_lt_from_gt', '_lt_from_le', '_make_key', 'cmp_to_key', 'get_cache_token', 'lru_cache', 'namedtuple', 'partial', 'partialmethod', 'recursive_repr', 'reduce', 'singledispatch', 'total_ordering', 'update_wrapper', 'wraps']
    
    • reduce(function, sequence[, initial]) -> value

        >>> a=functools.reduce(lambda x, y: x+y, [1,2,3,4])
        >>> a
        10
      
        >>> a=functools.reduce(lambda x, y: x+y, [1,2,3,4],10)
        >>> a
        20
      
    • partial(func, *args, **keywords) 偏函数:简化func,设置默认值 -> new function 这样调用这个新函数会更简单

        >>> int2 = functools.partial(int, base=2)
        >>> int2('1000000')            # 等同:int('1000000',base=2)
        64
      
        >>> def showarg(*args, **kw):
        ...     print("args=%s,kw=%s" % (args,kw))
        ...
        >>> f=functools.partial(showarg, 1,2,3,a=3,b='linux')
        >>> f()
        args=(1, 2, 3),kw={'a': 3, 'b': 'linux'}
        >>> f(4,5,c='hello')
        args=(1, 2, 3, 4, 5),kw={'a': 3, 'b': 'linux', 'c': 'hello'}
      
    • wraps(func)装饰器: 消除被装饰函数的函数名等函数属性发生改变的副作用

        def note(func):
            "note function"
            @functools.wraps(func)
            def func_wrapper(*args,**kw):
                "wrapper function"
                print('call %s(%s,%s):' % (func.__name__,args,kw))
                return func(*args,*kw)
            return func_wrapper
      
        @note
        def test(a,b):
            'test function'
            print('a=%s,b=%s' % (a,b))
      
        >>> test(3,5)
        call test((3, 5),{}):
        a=3,b=5
        >>> test.__name__
        'test'
        >>> test.__doc__
        'test function'
      

类与实例

class Cat:            # 定义类
    pass

>>> c=Cat()            # 创建实例

# 1. 类对象
>>> Cat             
__main__.Cat
>>> type(Cat)
type

# 2. 实例对象
>>> c                 
<__main__.Cat at 0x104f875f8>
>>> type(c)
__main__.Cat
  1. 类对象与实例对象

    • 类对象(class) : 模版
    • 实例对象(instance) : 根据类创建的对象
    • 获取对象信息:
      • type(x) : 获取对象类型
      • isinstance(x,class_or_tuple) : 判断class的类型
      • dir(x) : 查看一个对象的所有属性和方法(包括私有)
  2. 特性:

    • 类的三大特性:封装,继承,多态 (注:python类不仅支持单继承,还支持多继承)
    • python是动态语言,可动态给类/对象增删改属性和方法
      • 增/改:类/实例对象.xxx=...
      • 删除:del 类/实例对象.xxx,delattr(类/实例对象,name)
  3. 访问限制

    • 公有: 默认
    • 保护:以_开头,自己内部和子类可访问,不能用from module import *导入(即不可跨包)
    • 私有: 以__开头,只有自己内部可以访问,外部(包括子类,实例对象等)不能访问
    • 注:
      • 一般将属性私有化,以防止外部随意访问修改
      • 可通过编写getXxx/setXxx方法或@property控制对外开放接口
  4. 访问

    • 属性/方法: 类.xxx,实例.xxx
    • 方法调用:类.xxx(...),实例.xxx(...)
    • python也提供了内置函数判断获取对象属性或方法(无法判断或调用私有属性/方法)
      • getattr(obj,name)
      • setattr(obj,name,value)
      • hasattr(obj,name)

类成员:属性

class Cat:
    name='cls_cat'                # 类属性
    def __init__(self):
        self.age=1                # 实例属性

c=Cat()

>>> Cat.name,c.name
('cls_cat', 'cls_cat')

>>> c.name='self_cat'            # 添加同名实例属性
>>> Cat.name,c.name             # 实例属性会屏蔽掉同名的类属性
('cls_cat', 'self_cat')

>>> del c.name                     # 删除实例属性
>>> Cat.name,c.name
('cls_cat', 'cls_cat')

>>> c.age
1
  • 类属性:
    • 类所有,所有实例共享一个属性,在内存中只存在一个副本
    • 在类外访问:可通过类和实例对象访问公有类属性 类名.xxx,实例名.xxx
    • 在类外添加:类名.xxx=...
  • 实例属性:
    • 实例所有,各个实例各自维护,互不影响
    • 在类外访问:通过实例对象访问 实例名.xxx
    • 在类外添加:实例名.xxx=...
  • 注: 使用实例名.xxx访问属性时,先找实例属性,没有再找类属性,所以:
    • 类属性和实例属性同名,用实例名.xxx访问时,实例属性会屏蔽掉同名的类属性
    • 在类外修改类属性,必须通过类对象(如果通过实例对象,会产生一个同名的实例属性)
  • 高级
    • 属性私有化: 一般将属性私有化,以防止外部随意访问修改
      • 法一:编写getXxx/setXxx方法
      • 法二:使用@property标示为属性函数(即将一个方法变成属性调用)
    • 限制动态添加实例属性:
      • 定义class的时候,定义一个特殊的__slots__变量,来限制该class实例能添加的属性
      • tuple定义允许绑定的属性名称: __slots__=(...)
      • 注: __slots__仅对当前类实例起作用,对继承的子类无用,除非在子类中也定义,这样子类实例允许定义的属性就是自身的__slots__加上父类的__slots__

Sample1: 属性私有化

  • 法一:编写getXxx/setXxx方法开放端口

      class Cat:
          def __init__(self):
              self.__age=0
    
          def getAge(self):
              return self.__age
    
          def setAge(self,value):
              if not isinstance(value, int):
                  raise ValueError('age must be an integer!')
              if value < 0 or value > 100:
                  raise ValueError('age must between 0 ~ 100!')
              self.__age = value
    
      c=Cat()
    
      >>> c.age
      --------------------------------------------------------------------------
      AttributeError                            Traceback (most recent call last)
      <ipython-input-257-4dd7fde137b3> in <module>
      ----> 1 c.age
    
      AttributeError: 'Cat' object has no attribute 'age'
    
      >>> c.setAge(3)
      >>> c.getAge()
      3
    
  • 法二:使用@property标示为属性函数

      class Cat:
          def __init__(self):
              self.__age=0
    
          @property                     # getter
          def age(self):
              return self.__age
    
          @age.setter                 # setter
          def age(self,value):
              if not isinstance(value, int):
                  raise ValueError('age must be an integer!')
              if value < 0 or value > 100:
                  raise ValueError('age must between 0 ~ 100!')
              self.__age = value
    
      c=Cat()
    
      >>> c.age=5
      >>> c.age
      5
    

Sample2:限制动态添加实例属性

class Cat:
    __slots__=('name','age')     # 用tuple定义允许绑定的属性名称

class ColorfulCat(Cat):
    pass

class PureCat(Cat):
    __slots__=('color')

# 1. test 父:Cat
>>> c=Cat()                      # 父类实例对象 -- 只允许属性:name,age
>>> c.name='Tom'
>>> c.age=3
>>> c.country='China'
c.country="China"
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-269-64570eaae7be> in <module>
----> 1 c.country="China"

AttributeError: 'Cat' object has no attribute 'country'

# 2. test 子:ColorfulCat
>>> e=ColorfulCat()                # 子类实例对象--无限制
>>> e.name='Tom'
>>> e.name
'Tom'
>>> e.country='China'    
>>> e.country
'China'

# 3. test 子:PureCat
>>> p=PureCat()                 # 子类实例对象--允许属性:父(name,age)+ 子(color)
>>> p.color='White'
>>> p.name='Tom'
>>> p.age=1
>>> p.country='China'
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-281-3661a6a00d34> in <module>
----> 1 p.country='China'

AttributeError: 'PureCat' object has no attribute 'country'

类成员:方法

class Cat:
    name='cls_cat'

    def eat(self):                # 实例方法
        print('%s is eating' % self.name)

    @classmethod
    def from_setting(cls):        # 类方法
        cls.name='cls_cat_fresh'
        # 可添加类方法(第一个参数代表类):
        @classmethod
        def say_cls(x,y):
            return "%s say %s" % (x.name,y)
        cls.say_cls=say_cls
        # 可添加实例方法(第一个参数代表实例):
        cls.say_self=lambda x,y:"%s say %s" % (x.name,y)

    @staticmethod                 # 静态方法
    def play():
        print("%s is playing" % Cat.name)

c=Cat()

>>> Cat.name,c.name
('cls_cat', 'cls_cat')

# 1. 调用类方法
>>> Cat.from_setting()          # or use: c.from_setting()
>>> Cat.name,c.name
('cls_cat_fresh', 'cls_cat_fresh')

# 2. 调用静态方法
>>> Cat.play()                 # or use: c.play()
cls_cat_fresh is playing

# 3. 调用实例方法
>>> c.eat()                 # can't use: Cat.eat()
cls_cat_fresh is eating
>>> c.name='self_cat'
>>> c.eat()
self_cat is eating

# 4. 调用from_setting()后添加的方法
>>> c.say_self('Hello') 
'self_cat say Hello'
>>> c.say_cls('Hello')
'cls_cat_fresh say Hello'
  • 类方法:
    • 需用修饰器@classmethod标识,第一个参数(习惯用变量名cls)必须是类对象(类外调用时,不用传递该参数,Python解释器会自己把类对象传入)
    • 方法中:可修改类定义
    • 在类外访问:可通过实例和类访问 类名.xxx(...),实例名.xxx(...)
  • 实例方法:
    • 不用修饰,第一参数(习惯用变量名self)必须是实例对象本身(类外调用时,不用传递该参数,Python解释器会自己把实例对象传入)
    • 方法中:可修改实例属性(eg: self.xxx)
    • 在类外访问:通过实例访问实例名.xxx(...) (不可使用类访问)
  • 静态方法:
    • 需用修饰器@staticmethod标识,无参数限制
    • 方法中:可通过类调用类属性,类方法,静态方法 类名.xxx,类名.xxx(...)
    • 在类外调用:可通过实例和类访问 类名.xxx(...),实例名.xxx(...)

继承与多态

  • 继承:

    • 子类创建过程:
      • 子类__new__,无则调用父类 __new__
      • 子类__init__,无则父类 __init__
    • 父类使用了有参__init__,则子类创建时也要有参
    • 子类无法访问父类的私有属性和方法
    • 重写: 子类会覆盖父类中同名的方法
      • 子类中调用父类方法super().xxx,父类.xxx
      • 类.__mro__ 可以查看类搜索方法时的先后顺序
  • 单继承

      class Animal:
          def eat(self):
              print('eating...')
    
      class Cat(Animal):
          pass
    
  • 多继承(MixIn)

      class Runnable:
          def run(self):
              print('runing...')
    
      class Cat(Animal,Runnable):
          pass
    
  • 多态:自动调用实际类型的方法

Sample1:继承

class Animal:
    def __init__(self,name):
        self.name=name
        print("Animal %s init." % name)

    def eat(self):
        print('Animal is eating...')

class Cat(Animal):

    def eat(self):
        print("Cat is eating...")

    def play(self):
        print("Cat is playing...")

    def father(self):
        print("call father eat func:")
        super().eat()
        print("call self eat func:")
        self.eat()

>>> c=Cat("Tom")
Animal Tom init.

>>> c.eat()
Cat is eating...

>>> c.play()
Cat is playing... 

>>> c.father()
call father eat func:
Animal is eating...
call self eat func:
Cat is eating...

>>> Cat.__mro__
(__main__.Cat, __main__.Animal, object)

>>> isinstance(c,Cat)
True
>>> isinstance(c,Animal)
True

Sample2:多态

class Animal:
    def eat():
        print('Animal is eating...')

class Cat(Animal):
    def eat():
        print("Cat is eating...")

class Dog(Animal):
    def eat():
        print("Dog is eating...")

def do_eat(obj):
    obj.eat()

>>> do_eat(Animal())
Animal is eating...

>>> do_eat(Cat())
Cat is eating...

>>> do_eat(Dog())
Dog is eating...

高阶:元类metaclass

  • 类拥有创建实例对象的能力,而类本身也是对象,可动态地创建(Python是动态语言,函数和类,不是编译时定义的,而是运行时动态创建的)
  • 元类即用来创建类的“东西”,实现方式:
    • 直接使用type元类
    • 自定义一个元类:创建一个函数或类,使用class关键字或type(...)创建类
  • type(object_or_name, bases, dict): Python内建的可以用来创建所有类的元类
    • type(object) -> the object's type 查看某个对象的类型
    • type(name, bases, dict) -> a new type 创建一个新的类型
      • name: 类名,str
      • bases: 父类名,tuple
      • dict: 类属性,成员方法,dict
  • metaclass/__metaclass__: 类定义时指定元类

      # metaclass元类查找顺序:Foo -> Bar -> Module -> type
    
      # 1. python2写法:
      class Foo(Bar):        
          __metaclass__=xxx
          pass
    
      # 2. python3写法:
      class Foo(Bar,metaclass=xxx):
          pass
    
    • metaclass=函数/类,参数同type:name, bases, dict
    • 应用:创建类时动态修改类定义(元类:拦截类的创建 -> 修改类 -> 返回修改之后的类)
      • 例如:ORM框架,一个类对应一个表,类/实例对象操作对应数据库表/记录操作
    • Python会使用当前类中metaclass指定的元类创造类对象,没有则找父类的,父类没有则去模块层次中找,还找不到则用内置的type来创建这个类对象

Sample1: 使用元类type创建类

def eat(self):
    print('Cat is eating...')

>>> cls=type('Cat',(),{'name':'Cat','age':1,'eat':eat})
>>> cls.name,cls.age
('Cat',1)

>>> c=cls()
>>> c.name='Tom'
>>> cls.name,c.name
 ('Cat', 'Tom')

>>> c.eat()
Cat is eating...

Sample2: 自定义一个函数作为元类来创建类

# 自定义一个函数,使用`class`关键字创建类(也可使用`type`创建类)
def choose_animal(name):
    if name=='Cat':
        class Cat:
            def eat(self):
                print("Cat is eating...")
        return Cat
    else:
        class Animal:
            def eat(self):
                print('Animal is eating...')
        return Animal

>>> cls=choose_animal('Cat')    # __main__.choose_animal.<locals>.Cat
>>> c=cls()                     # <__main__.choose_animal.<locals>.Cat at 0x1050e7ac8>
>>> c.eat()
Cat is eating...

Sample3:通过__metaclass__属性指定元类,动态改变类定义

# 1. 自定义一个元类
# 使用函数
def animalMetaCls(clsName,bases,attrs):
    print('call animalMetaCls',clsName)
    if clsName=='Cat':
        def eat(self):
            print('Cat is eating...')
        attrs['eat']=eat
        attrs['name']='Cat'
    return type(clsName,bases,attrs)

# 或使用类
class animalMetaCls:
    def __new__(cls,clsName,bases,attrs):
        print('call animalMetaCls',clsName)
        if clsName=='Cat':
            def eat(self):
                print('Cat is eating...')
            attrs['eat']=eat
            attrs['name']='Cat'
        return type(clsName,bases,attrs)

# 2. 通过metaclass指定元类
>>> class Cat(metaclass=animalMetaCls):
...     pass
...
call animalMetaCls Cat                 # 创建类时,就会触发metaclass

# 3. verify:
>>> c=Cat()
>>> c.eat()
Cat is eating...
>>> Cat.name
'Cat'

高阶:枚举类Enum

Refer Doc

  • 可以把一组相关常量定义在一个class中,class不可变,成员可以等值比较
  • Enum(value, names=None, *, module=None, qualname=None, type=None, start=1)
    • value: What the new Enum class will record as its name.
    • names: The Enum members. This can be a whitespace or comma separated string (values will start at 1 unless otherwise specified)
      • 'RED GREEN BLUE' | 'RED,GREEN,BLUE' | 'RED, GREEN, BLUE'
      • ['RED', 'GREEN', 'BLUE']
      • [('CYAN', 4), ('MAGENTA', 5), ('YELLOW', 6)]
      • {'CHARTREUSE': 7, 'SEA_GREEN': 11, 'ROSEMARY': 42}
    • module: name of module where new Enum class can be found.
    • qualname: where in module new Enum class can be found.
    • type: type to mix in to new Enum class
    • start: number to start counting at if only names are passed i
  • 注:Enum的value属性是自动赋给成员的int常量,默认从1开始计数

Sample:

from enum import Enum,auto

# 1.创建枚举类对象

# 法一:直接使用`Enum(变量统称名,(变量1,变量2....))`
Month = Enum('Month', ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'))

# 法二:继承自Enum类
@unique                     # optional: @unique装饰器--检查保证没有重复值
class Month(Enum):
    Jan=1
    Feb=auto()
    Mar=auto()
    Apr=auto()
    May=auto()
    Jun=auto()
    Jul=auto()
    Aug=auto()
    Sep=auto()
    Oct=auto()
    Nov=auto()
    Dec=auto()

# 2. test:
>>> Month
<enum 'Month'>
>>> type(Month)
enum.EnumMeta

>>> Month.Feb                         # 枚举
<Month.Feb: 2>
>>> type(Month.Feb)
<enum 'Month'>

>>> Month.Feb.name                     # 获取枚举名
'Feb'
>>> Month.Feb.value                 # 获取枚举名
2
>>> Month['Feb']                     # 获取枚举名为'Feb'的枚举,返回枚举类型
<Month.Feb: 2> 
>>> Month(2)                         # 获取枚举值为2的枚举,返回枚举类型
<Month.Feb: 2>

>>> Month.Feb==Month.Feb             # 枚举等值比较(注:无法直接进行大小比较)
True
>>> Month.Feb!=Month.Jan
True
>>> Month.Feb is Month.Jan
False

>>> for item in Month:                 # 遍历枚举,列出所有枚举成员
...     print(item)
Month.Jan
Month.Feb
Month.Mar
Month.Apr
Month.May
Month.Jun
Month.Jul
Month.Aug
Month.Sep
Month.Oct
Month.Nov
Month.Dec

>>> list(Month)
[<Month.Jan: 1>,
 <Month.Feb: 2>,
 <Month.Mar: 3>,
 <Month.Apr: 4>,
 <Month.May: 5>,
 <Month.Jun: 6>,
 <Month.Jul: 7>,
 <Month.Aug: 8>,
 <Month.Sep: 9>,
 <Month.Oct: 10>,
 <Month.Nov: 11>,
 <Month.Dec: 12>]

>>> Month.__members__                     # __members__
mappingproxy({'Jan': <Month.Jan: 1>,
              'Feb': <Month.Feb: 2>,
              'Mar': <Month.Mar: 3>,
              'Apr': <Month.Apr: 4>,
              'May': <Month.May: 5>,
              'Jun': <Month.Jun: 6>,
              'Jul': <Month.Jul: 7>,
              'Aug': <Month.Aug: 8>,
              'Sep': <Month.Sep: 9>,
              'Oct': <Month.Oct: 10>,
              'Nov': <Month.Nov: 11>,
              'Dec': <Month.Dec: 12>})

>>> Month.__members__.keys()
Out[35]: odict_keys(['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'])

>>> Month.__members__.values()
odict_values([<Month.Jan: 1>, <Month.Feb: 2>, <Month.Mar: 3>, <Month.Apr: 4>, <Month.May: 5>, <Month.Jun: 6>, <Month.Jul: 7>, <Month.Aug: 8>, <Month.Sep: 9>, <Month.Oct: 10>, <Month.Nov: 11>, <Month.Dec: 12>])

>>> Month.__members__.items()
Out[5]: odict_items([('Jan', <Month.Jan: 1>), ('Feb', <Month.Feb: 2>), ('Mar', <Month.Mar: 3>), ('Apr', <Month.Apr: 4>), ('May', <Month.May: 5>), ('Jun', <Month.Jun: 6>), ('Jul', <Month.Jul: 7>), ('Aug', <Month.Aug: 8>), ('Sep', <Month.Sep: 9>), ('Oct', <Month.Oct: 10>), ('Nov', <Month.Nov: 11>), ('Dec', <Month.Dec: 12>)])

>>> for name, member in Month.__members__.items():
...    print(name, '=>', member, ',', member.value)
...
Jan => Month.Jan , 1
Feb => Month.Feb , 2
Mar => Month.Mar , 3
Apr => Month.Apr , 4
May => Month.May , 5

高阶:单例模式

Singleton 确保某一个类只有一个实例

class Cat:
    __instance=None
    __first_init = True         # optional:用来保证只初始化一次

    def __new__(cls,name,age):
        if not cls.__instance:
            cls.__instance=object.__new__(cls)
        return cls.__instance

    def __init__(self,name,age):
        if self.__first_init:
            print('init')
            self.name=name
            self.age=age
            Cat.__first_init=False

>>> c1=Cat('Tom',2)
init
>>> c2=Cat('Andy',3)
>>> c1==c2
True
>>> id(c1),id(c2)
(4559954776, 4559954776)
>>> c1.name,c2.name
('Tom', 'Tom')

实际Enum就是个单例:

class Cat(Enum):
    Instance=1

>>> c1=Cat.Instance
>>> c2=Cat.Instance
>>> c1==c2
True
>>> id(c1),id(c2)
(4560045560, 4560045560)
>>> c1.name,c2.name
('Instance', 'Instance')
>>> c2.age=3
>>> c1.age,c2.age
(3,3)

高阶:内置类属性

可通过dir(obj)查看某对象的所有方法和属性,eg:


class Cat:
    def eat(self):
        print('eating...')

>>> dir(Cat)
['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__']

一些常用的内置类属性和方法说明:

  • 方法:
    • __new__ 创建实例时触发调用
    • __init__ 创建实例后触发调用
    • __str__ 返回用户看到的字符串,若没实现,使用repr结果,print时调用
    • __repr__ 返回程序开发者看到的字符串(为调试服务的),直接输出时调用
    • __getattribute__ 属性访问拦截器,触发访问实例属性时
  • 属性:
    • __class__ 实例的类,触发方式:实例.__class__
    • __doc__ 类文档,子类不继承,触发方式: help(类或实例)
    • __module__ 类定义所在的模块
    • __dict__ 对象的属性字典
class Cat:
    age=1

    def __new__(cls,name):                 # 要有一个参数cls,代表要实例化的类,此参数在实例化时由Python解释器自动提供
        print('new Cat %s' % name)
        return object.__new__(cls)      # 返回实例化出来的实例

    def __init__(self,name):             # 参数self,就是__new__返回的实例
        self.name=name
        print('%s init.' % name)     

    def __del__(self):
        print('%s del.' % self.name)

    def __str__(self):
        return "Cat [name=%s]" % self.name

    def __getattribute__(self,attr): # 注意:不要这个方法中调用self.xxxx,会进入死循环
        print('get attr: %s' % attr)
        return object.__getattribute__(self,attr) 


>>> c=Cat('Tom')         # 触发调用 __new__ => __init__
new Cat Tom
Tom init.

>>> c                     # 触发调用__repr__
<__main__.Cat at 0x104b83208>

>>> print(c)             # 触发调用__str__
Cat [name=Tom]

>>> c.name                 # 触发__getattribute__
get attr: name
'tom'

>>> c.__class__
__main__.Cat

>>> c.__dict__
{'name': 'Tom'}

>>> del c                 # 触发调用__del__
Tom del.

高阶:定制类

定制类:通过在类中定义python内建的特殊方法__xxx__,可为类实现一些特殊的操作

一些常见的python内建的特殊方法__xxx__

  • 基础方法:
    • __new__(self[,arg1,…]),__init__(self[,arg1,…]),__del__(self)
    • __str__(self) 可打印的字符串输出;内建 str()print() 函数
    • __repr__(self) 运行时的字符串输出;内建 repr() 函数及 ' ' 操作符
    • __len__(self) 长度;内建 len()
    • __nonezero__(self) 为实例定义 False 值;内建 bool() 函数
    • __call__(self,*args) 用于可调用的实例;可以用来替代闭包的实现
  • 值比较:
    • __cmp__(self,obj) 对象比较;内建 cmp()
    • __lt__(self,obj),__le__(self,obj) 小于 & 小于等于;内建< & <=
    • __gt__(self,obj),__ge__(self,obj) 大于 & 大于等于;内建 > & >=
    • __eq__(self,obj),__ne__(self,obj) 等于 & 不等于;内建 = & !=
  • 类的属性:
    • __getattr/setattr/delattr__(self,attr) 获取/设置/删除属性;注:__getattr__内建 getattr(),仅在属性没有找到时调用
    • __getattribute__(self,attr) 获取属性;内建 getattr();总是被调用
    • __get/set/delete__(self,attr) (描述符)获取/设置/删除属性
  • 数值类型:
    • __complex__(self, com) 内建 complex()
    • __int__(self) 内建 int()
    • __float__(self) 内建 float()
    • __index__(self) 在有必要时,压缩可选的数值类型为整型(比如用于切片索引时等)
    • __neg/pos/abs/invert__(self) 一元负/一元正/绝对值/按位求反(内建 ~ 操作符)
    • __add/sub/mul/dev/mod/pow__(self,obj) : +,-,*,/,%,**
    • __lshift/rshift/and/or/xor__(self,obj) : <<,>>,&,|,^
  • 序列类型:
    • __len__(self) 序列中的项目数
    • __getitem__(self, ind),__setitem__(self, ind,val),__delitem__(self, ind) 获取/设置/删除元素
    • __getslice__(self, ind1,ind2),__setslice__(self, i1, i2,val),__delslice__(self, ind1,ind2) 获取/设置/删除切片元素
    • __contains__(self, val) 含有成员;内建 in 关键字
    • __*add__(self,obj) 串联;+ 操作符
    • __*mul__(self,obj) 重复;*操作符
    • __iter__(self) 生成迭代器;内建 iter() 函数
  • 映射类型:
    • __len__(self) 类中的项目数
    • __hash__(self) 散列(hash)函数值
    • __getitem__(self,key),__setitem__(self,key,val),__delitem__(self,key) 获取/设置/删除某个值
    • __missing__(self,key) 给定键若不存在,则返回一个默认值

Sample1:使用__call__实现:为可调用对象

  • 使用了__call__后,对实例进行直接调用就好比对一个函数进行调用一样, 除了用instance.method(...)调用实例方法,可以直接对实例进行调用instance(...)
  • 能被调用的对象就是一个Callable对象通过callable()函数判断一个对象是否是“可调用”对象
class Cat:
    def __init__(self,name):
        self.name='Cat'

    def __call__(self):
        print("I am %s." % self.name)

    def eat(self):
        print('%s is eating...' % self.name)

>>> c=Cat('Tom')
>>> c()                 # 可直接对实例进行调用
I am Cat.
>>> callable(c)
True
>>> c.eat()
Tom is eating...

Sample2:使用__getattr__实现:动态返回一个属性值

__getattr__ 特性:

  • 获取属性,只有在没有找到属性的情况下,才调用
  • 实现若不写返回值则表示返回None,不会再抛出AttributeError
class Cat:
    def __init__(self,name):
        self.name='Cat'
    def __getattr__(self,attr):
        if attr=='age':
            return 0
        elif attr=='color':
            return lambda:'Red'
        #raise AttributeError('This object has no attribute \'%s\'' % attr)
        return 'N/A'

>>> c=Cat('Tom')
>>> c.age
0
>>> c.color()
'Red'
>>> c.sex
'N/A'
>>> c.country
'N/A'

应用:链式调用组建path

class PathChain:
    def __init__(self,path=''):
        self.path=path

    def __getattr__(self,attr):
        return PathChain('%s/%s' % (self.path,attr))

    def __str__(self):
        return self.path
    __repr__=__str__

    def __call__(self,path):
        return PathChain('%s/%s' % (self.path,path))

>>> PathChain().users.roles
/users/roles
>>> PathChain().users('Tom').roles
/users/Tom/roles

Sample3:使用__iter__&__next__实现迭代

__iter__返回一个迭代对象,for循环不断调用该迭代对象的__next__方法拿到循环的下一个值,直到遇到StopIteration错误时退出循环

# f(n)=f(n-2)+f(n-1),f(0)=0,f(1)=1
# a=f(n-2),b=f(n-1)
# n: 0,1,2,3,4,5,6,7 ,8
# v: 0,1,1,2,3,5,8,13,21
class Fib:
    def __init__(self,n):    # n>=1
        self.a,self.b=0,1
        self.i,self.n=0,n

    def __iter__(self):
        return self

    def __next__(self):
        if self.i==0:
            self.i+=1
            return self.a
        self.a,self.b=self.b,self.a+self.b
        self.i+=1
        if self.i > self.n:
            raise StopIteration()
        return self.a

>>> for i in Fib(9):
...     print(i) 
0
1
1
2
3
5
8
13
21

>>> list(Fib(9))
[0, 1, 1, 2, 3, 5, 8, 13, 21]

>>> f=Fib(5)
>>> next(f)
0
>>> next(f)
1
>>> next(f)
1
>>> next(f)
2
>>> next(f)
3
>>> next(f)
StopIteration                             Traceback (most recent call last)
<ipython-input-34-aff1dd02a623> in <module>
----> 1 next(f)
...

Sample4:使用__getitem__实现按照下标/切片取出元素

class Fib(object):
    def __getitem__(self, n):
        if isinstance(n, int): # n是索引
            a, b = 0, 1
            for x in range(n):
                a, b = b, a + b
            return a
        if isinstance(n, slice): # n是切片
            a, b = 0, 1
            start,stop = n.start or 0, n.stop
            result = []
            for x in range(stop):
                if x>=start:
                    result.append(a)
                a, b = b, a + b
            return result

>>> f=Fib()
>>> f[0]
>>> f[8]
21
>>> f[0:9]                             # [start,end)
[0, 1, 1, 2, 3, 5, 8, 13,21]
>>> f[3:8]
[2, 3, 5, 8, 13]

模块与包

导入

  1. import ... [as ...]

     import math,time
     math.sqrt(2)
     time.sleep(3)
    
     import math as mt,time as tt
     mt.sqrt(2)
     tt.sleep(3)
    
  2. from ... import ...

     from math import sqrt,pow
     sqrt(2)
     pow(2,3)
    
     from math import *     # 注:无法使用as重命名
     sqrt(2)
     pow(2,3)
    
     from math import sqrt as st,pow as pw
     st(2)
     pw(2,3)
    
  3. 模块定位顺序(存储在system模块的sys.path变量中):

     >>> import sys
     >>> sys.path
     ['', '/anaconda3/lib/python37.zip', '/anaconda3/lib/python3.7', '/anaconda3/lib/python3.7/lib-dynload', '/anaconda3/lib/python3.7/site-packages', '/anaconda3/lib/python3.7/site-packages/aeosa']
    
    • 从上面列出的目录里依次查找要导入的模块文件
    • 搜索顺序:
      • 当前目录(上面'' 即表示当前目录)
      • shell变量PYTHONPATH下的每个目录
      • python安装路径
    • 添加搜索目录:
      • 法一:直接修改sys.path,动态添加,运行结束后就失效. eg: sys.path.insert(0, xxx) 可以确保先搜索xxx这个路径
      • 法二:设置环境变量PYTHONPATH
  4. 一个程序进程下,模块被导入后,import.../from...import... 并不能重新导入模块,重新导入需先import importlibimportlib.reload(moduleName)

     # 1. initial:
     $ mkdir First
     $ vi First/hello.py
     def say():
         print('say hello')
    
     # 2. run:
     $ python3
     >>> import First.hello
     >>> First.hello.say()
     say hello
    
     # 3. update module function:
     $ vi First/hello.py
     def say():
         print('say hello again')
    
     # 4. back to previous process,re-import and test:
     >>> import First.hello
     >>> First.hello.say() 
     say hello                     # no change!
    
     # 5. use importlib reload the module
     >>> import importlib
     >>> importlib.reload(First.hello)
     >>> First.hello.say()
     say hello again             # changed!
    

自定义库

  1. 库:包/模块 => 根目录:文件夹/文件 (命名空间隔离的执行者)

    • 包:文件夹,树状结构,可放入多个层级包/模块
    • 模块:文件,一个python文件本身就可以作为一个Module导入,此时模块名=文件名
  2. 包初始化文件:__init__.py => 代表可执行包(导入包时会执行它对应的__init__.py文件)

    • 暴露当前包对外的API,以控制这个包的导入行为
    • 同module一样,导入时,此文件里的语句就会被执行:
      • module => module.py(like: module/init.py)
      • package => package/init.py
    • 无此文件时,无法配置包的特殊变量__all__,则import *时无内容开放
    • 注:只能控制此文件中的变量 & 当前目录下的模块或包的开放(不包括子目录下内容的控制)
  3. 特殊变量:

    • __name__
      • 直接运行此.py时,此属性值='__main__'
      • 以模块形式被导入时,此属性值='文件名'
      • 可根据此变量的结果判断是直接执行的python脚本还是被引入执行的,从而能够有选择性的执行测试代码
    • 私有化变量:_xxx,__xxx
      • 只对import *方式的导入有制约效果,对import xxx的导入无效
    • __all__
      • 可设置__all__=['..','..',...],列出可对外公开的内容(即外面可导入使用的内容)
        • package/__init__.py中,控制此package(一级)目录下开放的模块 & 子包 & package包本身init文件中定义的变量,即控制package/*内容的开放
        • module.py中,控制此module下开放的变量(此module.py可看做是module的__init__.py),即控制module/*内容的开放
      • 只对import *方式的导入有用,对import xxx的导入无效
      • 配置的优先级高于私有化变量,即配置后,私有化变量_xxx,__xxx的制约效果无效了,只受__all__约束
  4. 导入:

    • 导入时使用.表路径层级,eg:p1/p2/hello.py
      • 导入包:import p1,import p1.p2
      • 导入模块: import p1.p2.hello,from p1.p2 import hello
      • 导入模块中的变量:from p1.p2.hello import v1,v2,...
      • 导入包下一级开放内容:from p1 import *
      • 导入模块中开放内容:from p1.p2.hello import *
      • 可混合导入包和模块
      • 注:不可以from p1 import p2.hello
    • 导入时就会依次执行对应的.py文件
      • import path:
        • step: 沿路径搜索,并沿途执行初始化(即执行path经过的包__init__.py& 模块module.py),导入所有变量
        • 止于此path,会执行path/__init__.py or path.py,__all__配置不会起作用(因为并未导入path/*.py
      • from path import *:
        • step1: from path => 沿路径搜索,并沿途执行初始化(即执行path经过的包__init__.py & 模块module.py),但不会导入这些变量
        • step2: import * => 执行path/*下直属py文件并导入变量
          • path/__init__.py先执行,会根据其中定义的__all__控制可导入变量;
          • 若path代表的是module,则module.py即相当于它的__init__.py
        • 止于path/*,会执行path/*.py,根据path/__init__.py中配置的__all__导入变量
    • 总结:
      • import部分导入变量,from部分不导入变量;
      • path:代表到此文件(包或模块),*:代表path/*.py(受path/__init__.py__all__限制)
    • eg: import path
      • 导入包:import First.sub => import first initial vars + sub initial vars
      • 导入包:from First import sub => import sub initial vars
      • 导入模块:from First import hello => import hello initial vars
      • 导入模块:import First.hello => import first initial vars + hello initial vars
      • 导入变量:from First.hello import A,_B,__C,Cat => import A,_B,__C,Cat
    • eg: from path import *
      • 导入包:from First import * => depends on the __all__ in first initial py: ['hello','MCount','sub'] => first opened initial vars: MCount + hello initial vars: hello.* + sub initial vars sub.*
      • 导入包:from First.sub import * => depends on the __all__ in first initial py: ['aa'] => aa initial vars
      • 导入变量:from First.hello import * => depends on the __all__ in hello.py: ['A','__C','Cat'] => hello opened initial vars:A,__C,Cat
      • 注:无法使用from First import sub.aafrom First import sub.*

Sample:

First
├── __init__.py             # __all__=['hello','MCount','sub']
│                              # MCount=2,MNum=3
│
├── hello.py                 # __all__=['A','__C','Cat']
│                              # A=[1,2,3],_B=['a','b','c'],__C=('Hello','Tom')
│                              # def say():..., class Cat: ...
├── world.py
│ 
├── sub
│   ├── __init__.py         # __all__=['aa'],sub_var=1
│   ├── aa.py                 # colorA='Red'
│   └── bb.py                 # colorB='Green'
  1. 单独测试某个模块的功能(使用__name__区分状态)

     $ cat First/hello.py
     print('hello.py init')
     __all__=['A','__C','Cat']
    
     A=[1,2,3]
     _B=['a','b','c']
     __C=('Hello','Tom')
    
     def say():
         print('say hello again')
    
     class Cat:
         def eat(self):
             print('Cat is eating...')
    
     if __name__ == '__main__':
         # do test:
         say()
         c=Cat()
         c.eat()
    
     # 单独测试验证hello.py功能:
     $ python3 First/hello.py
     hello.py init
     A=[1, 2, 3],_B=['a', 'b', 'c'],__C=('Hello', 'Tom')
     say hello again
     Cat is eating...
    
  2. 使用import xxx => 不受xxx__all__控制

    • xxx最终为包 import First,import First.sub
        >>> import First             # 会执行First的__init__.py,导入其中所有变量
        First package init.
        >>> dir()
        ['First', '__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__']
        >>> dir(First)
        ['MCount', 'MNum', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__']
      
        >>> import First.sub         # 会执行First的__init__.py & sub的__init__.py,导入其中所有变量
        First package init.
        sub package init.
        >>> dir()
        ['First', '__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__']
        >>> dir(First)
        ['MCount', 'MNum', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', 'sub']
        >>> dir(First.sub)
        ['__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', 'sub_var']
      
    • xxx最终为模块 import First.hello,from First import hello
        >>> import First.hello         # 会执行 First的__init__.py & hello.py,导入其中所有变量
        First package init.
        hello.py init
        >>> dir()
        ['First', '__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__']
        >>> dir(First)
        ['MCount', 'MNum', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', 'hello']
        >>> dir(First.hello)
        ['A', 'Cat', '_B', '__C', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'say']
      
        >>> from First import hello # 执行hello.py,导入其中所有变量
        First package init.
        hello.py init
        >>> dir()
        ['__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__', 'hello']
        >>> dir(hello)
        ['A', 'Cat', '_B', '__C', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'say']
      
    • xxx最终为变量 from First.hello import A,_B,__C,say,Cat
        >>> from First.hello import A,_B,__C,say,Cat
        First package init.
        hello.py init
        >>> dir()
        ['A', 'Cat', '_B', '__C', '__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__', 'say']
      
  3. 使用from xxx import * => 受xxx/__init__.py__all__的控制

    • xxx为包 from First import *from First.sub import *
        >>> from First import *         # = from First import hello,sub
        First package init.
        hello.py init
        sub package init.
        >>> dir()                         # 受First/__init__.py 中`__all__`控制
        ['MCount', '__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__', 'hello', 'sub']
        >>> dir(hello)                     # 不受hello.py 中`__all__`控制
        ['A', 'Cat', '_B', '__C', '__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'say']
        >>> dir(sub)                # 不受sub/__init__.py 中`__all__`控制
        ['__all__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', 'sub_var']
      
        >>> from First.sub import *     
        First package init.
        sub package init.
        sub/aa.py init.
        >>> dir()                         # 受sub/__init__.py 中`__all__`控制
        ['__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__', 'aa']
      
    • xxx为模块 from First.hello import *
        >>> from First.hello import *     # = from First.hello import A,Cat,__C
        First package init.
        hello.py init
        >>> dir()                         # 受hello.py 中`__all__`控制
        ['A', 'Cat', '__C', '__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__']
      

发布/安装包

  • 发布:
    • 编辑setup.py文件
    • 构建模块 python setup.py build
    • 生成发布压缩包 python setup.py sdist
  • 安装:
    • 解压获得的压缩包
    • 进入文件夹,执行python setup.py install (可加参数指定安装路径:--prefix=安装路径)
  • 使用:from ... import ...import package.module 导入

Sample:

Second
├── setup.py
├── subA
│   ├── __init__.py
│   ├── aa.py
│   └── bb.py
└── subB
    ├── __init__.py
    ├── cc.py
    └── dd.py

发布:

# 1. setup.py:
$ cd Second
$ vi setup.py
from distutils.core import setup
setup(name="SecondPkg", 
    version="1.0", 
    description="Second module", 
    author="dongGe", 
    py_modules=['subA.aa', 'subA.bb', 'subB.cc','subB.dd']) 
    # py_modules 需指明所需包含的py文件

# 2. build: 构建 (发布的包下一定要有`__init__.py`文件,不然会build失败)
$ python setup.py build
running build
running build_py
copying subA/__init__.py -> build/lib/subA
copying subB/__init__.py -> build/lib/subB

# 3. 查看(实际就是copy了一份到`build/lib`下)
$ tree
.
├── build
│   └── lib
│       ├── subA
│       │   ├── __init__.py
│       │   ├── aa.py
│       │   └── bb.py
│       └── subB
│           ├── __init__.py
│           ├── cc.py
│           └── dd.py
├── setup.py
├── subA
│   ├── __init__.py
│   ├── aa.py
│   └── bb.py
└── subB
    ├── __init__.py
    ├── cc.py
    └── dd.py

# 4. sdist: 生成发布压缩包 (产生`MANIFEST`文件,`dist/SecondPkg-1.0.tar.gz`压缩包)
$ python setup.py sdist
$ tree
.
├── MANIFEST
├── build
│   └── lib
│       ├── subA
│       │   ├── __init__.py
│       │   ├── aa.py
│       │   └── bb.py
│       └── subB
│           ├── __init__.py
│           ├── cc.py
│           └── dd.py
├── dist
│   └── SecondPkg-1.0.tar.gz
├── setup.py
├── subA
│   ├── __init__.py
│   ├── aa.py
│   └── bb.py
└── subB
    ├── __init__.py
    ├── cc.py
    └── dd.py

安装:

# 1. 解压
$ tar -xvf SecondPkg-1.0.tar.gz

# 2. 进入文件夹
$ cd SecondPkg-1.0
$ tree
.
├── PKG-INFO
├── setup.py
├── subA
│   ├── __init__.py
│   ├── aa.py
│   └── bb.py
└── subB
    ├── __init__.py
    ├── cc.py
    └── dd.py

# 3. 安装
$ python setup.py install
...
creating /anaconda3/lib/python3.7/site-packages/subA
...
creating /anaconda3/lib/python3.7/site-packages/subB
...
running install_egg_info
Writing /anaconda3/lib/python3.7/site-packages/SecondPkg-1.0-py3.7.egg-info

# 4. 查看安装的包
$ pip list | grep Second
SecondPkg                          1.0

# 5. 使用
$ cd ~/space
$ python3

# 导入包:
>>> from subB import *             # 注意:不要使用 `import subA,subB` -- 不会导入包下的任何模块
subB init.
This subB/cc.py
This subB/dd.py
>>> dir()
['__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__', 'cc', 'dd']

# 导入模块:
>>> from subA.aa import *  # 或 `from subA import aa` 或 `import subA.aa`
subA init.
This subA/aa.py

常用标准库

Python的一个组成部分,随着Python解释器,一起安装在电脑中的

标准库 说明
builtins 内建函数默认加载
os 操作系统接口
sys Python自身的运行环境
functools 常用的工具
json 编码和解码JSON对象
logging 记录日志,调试
multiprocessing 多进程
threading 多线程
copy 拷贝
time 时间
datetime 日期和时间
calendar 日历
hashlib 加密算法
random 生成随机数
re 字符串正则匹配
socket 标准的 BSD Sockets API
shutil 文件和目录管理
glob 基于文件通配符搜索
collections 一个集合模块,提供了许多有用的集合类

Sample:datetime

>>> from datetime import datetime

>>> datetime.now()                         # 1. 获取当前datetime
datetime.datetime(2019, 3, 17, 11, 58, 22, 874984)
>>> type(datetime.now())
<class 'datetime.datetime'>

>>> dt=datetime(2018, 5, 19, 13, 10)     # 2. 指定某个日期和时间的datetime
>>> print(dt)
2018-05-19 13:10:00

>>> ts=dt.timestamp()                     # 3.1: datetime -> timestamp
>>> ts
1526706600.0
>>> datetime.fromtimestamp(ts)             # 3.2: timestamp -> datetime 本地时间
datetime.datetime(2018, 5, 19, 13, 10)
>>> datetime.utcfromtimestamp(ts)         # 3.2: timestamp -> datetime UTC时间
datetime.datetime(2018, 5, 19, 5, 10)

>>> dt.strftime('%Y-%m-%d %H:%M:%S')     # 4.1: datetime -> str
'2018-05-19 13:10:00'
>>> str(dt)
'2018-05-19 13:10:00'
>>> datetime.strptime('2018-05-19 13:10:00','%Y-%m-%d %H:%M:%S')     # 4.2: str -> datetime
datetime.datetime(2018, 5, 19, 13, 10)

>>> from datetime import timedelta         # 5. datetime +/-
>>> dt+timedelta(hours=10)
datetime.datetime(2018, 5, 19, 23, 10)
>>> dt-timedelta(days=2, hours=12)
datetime.datetime(2018, 5, 17, 1, 10)

>>> from datetime import timezone                     # 6. 时区转换
>>> dt_utc=dt.replace(tzinfo=timezone.utc)            # 6.1 强制设置为utc时区,作为基准时间
>>> dt_utc
datetime.datetime(2018, 5, 19, 13, 10, tzinfo=datetime.timezone.utc)
>>> print(dt_utc)
2018-05-19 13:10:00+00:00
>>> dt_utc.astimezone(timezone(timedelta(hours=8))) # 转换北京时区(UTC+8)
datetime.datetime(2018, 5, 19, 21, 10, tzinfo=datetime.timezone(datetime.timedelta(seconds=28800)))
>>> dt_utc.astimezone(timezone(timedelta(hours=9))) # 转换东京时区(UTC+9)
datetime.datetime(2018, 5, 19, 22, 10, tzinfo=datetime.timezone(datetime.timedelta(seconds=32400)))

>>> dt_bj=dt_utc.astimezone(timezone(timedelta(hours=8)))
>>> dt_bj.astimezone(timezone(timedelta(hours=9)))    # 北京时间转换为东京时间(UTC+9)
datetime.datetime(2018, 5, 19, 22, 10, tzinfo=datetime.timezone(datetime.timedelta(seconds=32400)))
  • datetime表示的时间需要时区信息才能确定一个特定的时间,默认视为本地时区时间
  • 时区转换: 确定一个datetime的时区(可强制设置时区,作为基准时间),利用带时区的datetime,通过astimezone()方法,可以转换到任意时区
  • 存储datetime,最佳方法是将其转换为timestamp再存储,因为timestamp的值与时区完全无关

Sample:Collections

  • namedtuple : tuple子类,可用属性而不是索引来引用tuple的元素
  • deque : 双向列表(list是线性存储,数据量大时,插入和删除效率低),注:不是list子类
  • defaultdict : dict子类,key不存在时,不会报错而是返回一个默认值
  • OrderedDict : dict子类,Key会按照插入的顺序排列 (应用:可用来实现一个FIFO(先进先出)的dict,当容量超出限制时,先删除最早添加的Key)
  • ChainMap : 把一组dict串起来并组成一个逻辑上的dict,查找时按顺序在内部的dict中依次查找(应用:可用来实现参数的优先级查找),注:不是dict子类
  • Counter: dict子类,可实现一个简单的计数器
# 1. namedtuple
>>> from collections import namedtuple
>>> Point = namedtuple('Point', ['x', 'y'])
>>> p = Point(1, 2)
>>> p
Point(x=1, y=2)
>>> p.x,p.y                         # 可用属性访问
(1, 2)
>>> isinstance(p, Point),isinstance(p, tuple)
(True, True)

# 2. deque
>>> from collections import deque
>>> q = deque(['a', 'b', 'c'])
>>> q.append('x')                     # append(),pop()
>>> q.appendleft('y')                 # appendleft(),popleft()
>>> q
deque(['y', 'a', 'b', 'c', 'x'])
>>> isinstance(q,list)
False

# 3. defaultdict
>>> from collections import defaultdict
>>> d = defaultdict(lambda: 'N/A')
>>> d['a'] = 123
>>> d['a']
123
>>> d['b']                         # 访问存在key,返回设置的默认值,不会报错
'N/A'
>>> isinstance(d,dict)
True

# 4. ChainMap
>>> from collections import ChainMap
>>> m=ChainMap({'a':1,'b':2},{'b':3,'c':4})
>>> m['a'],m['b'],m['c']
(1, 2, 4)
>>> isinstance(m,dict)
False

# 5. Counter
>>> from collections import Counter
>>> c = Counter()
>>> for i in 'programming':     # 统计字符出现的个数,效果同:c={},c[i]=c.get(i,0)+1
...  c[i] += 1
...
>>> c
Counter({'r': 2, 'g': 2, 'm': 2, 'p': 1, 'o': 1, 'a': 1, 'i': 1, 'n': 1})
>>> isinstance(c,dict)
True

Sample:hashlib

>>> import hashlib
>>> m=hashlib.md5()
>>> m.update(b'Hello')
>>> m.hexdigest()
'8b1a9953c4611296a827abf8c47804d7'

常用扩展库

扩展库 说明
requests 使用的是urllib3,继承了urllib2的所有特性
urllib 基于http的高层库
scrapy 爬虫
beautifulsoup4 HTML/XML的解析器
celery 分布式任务调度模块
redis 缓存
Pillow(PIL) 图像处理
xlsxwriter 仅写excle功能,支持xlsx
xlwt 仅写excle功能,支持xls ,2013或更早版office
xlrd 仅读excle功能
elasticsearch 全文搜索引擎
pymysql 数据库连接库
mongoengine/pymongo mongodbpython接口
matplotlib 画图
numpy/scipy 科学计算
django/tornado/flask web框架
xmltodict xml转dict
SimpleHTTPServer 简单地HTTP Server,不使用Web框架
gevent 基于协程的Python网络库
fabric 系统管理
pandas 数据处理库
scikit-learn 机器学习库

Sample:SimpleHTTPServer

  1. 运行一个静态(目录)服务器
     $ cd Second
     $ python -m http.server 8000
     Serving HTTP on 0.0.0.0 port 8000 (http://0.0.0.0:8000/) ...
    
  2. visit http://localhost:8000 就可以看到列出了当前目录的页面,可方便的查看文件
     $ curl http://localhost:8000
     <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd">
     <html>
     <head>
     <meta http-equiv="Content-Type" content="text/html; charset=utf-8">
     <title>Directory listing for /</title>
     </head>
     <body>
     <h1>Directory listing for /</h1>
     <hr>
     <ul>
     <li><a href="build/">build/</a></li>
     <li><a href="dist/">dist/</a></li>
     <li><a href="MANIFEST">MANIFEST</a></li>
     <li><a href="setup.py">setup.py</a></li>
     <li><a href="subA/">subA/</a></li>
     <li><a href="subB/">subB/</a></li>
     </ul>
     <hr>
     </body>
     </html>
    

文件IO

读写文件

  • 打开文件:f=open(filename,mode='r',encoding=None)
  • 关闭文件:f.close()
  • with语句包裹打开文件,不管是否异常,自动关闭文件,更保险(或使用try...finally
  • 读:
    • content=f.read(128) => str
    • content=f.readline() => str
    • content=f.readlines() => list
  • 写:
    • f.write('...') => int: length
    • f.writelines(...)
    • f.flush() 缓冲区内容刷新到文件中
  • 查看当前位置:position = f.tell()
  • 定位到某个位置:f.seek(offset, from)
    • offset: 偏移量
    • from: 0 文件开头/ 1 当前位置 / 2 文件末尾
    • eg: f.seek(2, 1): 从头开始,偏移2个字节; f.seek(-3,2): 离文件末尾,3字节处

Sample:

# 写
f = open('test.txt', 'w')      # 追加可用a
f.write('hello world, i am here!')
f.close()

# 读
f = open('test.txt', 'r')
content = f.read(128) # content = f.readline(),contentlst = f.readlines()
position = f.tell()
f.close()

# 使用with语句,会自动关闭
with open('test.txt', 'w') as f
    f.write('Hello world')

操作文件/目录

  • import os,import shutil(可以看做是os模块的补充,有更多方便的操作命令)
  • 系统相关:
    • os.name 查看操作系统类型(posix:Linux、Unix或Mac OS X;nt:Windows)
    • os.uname() 获取详细的系统信息(Windows上不提供)
    • os.environ 获取环境变量(dict,os.environ.get('PATH'))
  • 路径相关:os.path
    • 查看绝对路径 os.path.abspath(path)
    • 合成路径 os.path.join(p1,p2,...)
    • 拆分路径 os.path.split(path)
    • 文件扩展名 os.path.splitext(path)
    • 是否是文件 os.path.isfile(x)
    • 是否是文件夹 os.path.isdir(x) - 操作文件/文件夹:
    • 获取当前路径 os.getcwd()
    • 改变当前路径 os.chdir(path)
    • 列出文件/文件夹(ls) os.listdir(path)
    • 创建文件夹 os.mkdir(dir)
    • 删除空文件夹 os.rmdir(dir)
    • 重命名/移动文件/文件夹(mvos.rename(src,dist)
    • 删除文件 os.remove(file)
    • 拷贝文件:shutil.copyfile(src,dist)

多任务

  1. 同步(Sync) & 异步(Async):消息通知机制

    • 同步:主动等
    • 异步:被动听
  2. 阻塞(Blocking) & 非阻塞(UnBlocking):等待消息通知时的调用者状态

    • 阻塞:等待 (one thread do one task)
    • 非阻塞:继续做其他事 (one thread do many tasks,高吞吐量)
  3. 并发(Concurrent) & 并行 (Parallel):多任务处理能力

    • 并发:交替运行
    • 并行:同时运行
  4. 进程(Process) & 线程(Thread):多任务处理手段

    • 进程:独立地址空间,进程间独立运行(eg: 多核CPU,并行运行)
    • 线程:进程内,多线程共享同一段地址空间,但有自己独立的堆栈和局部变量(eg: 单核CPU多线程,并发运行)
  5. 组合(举例 Task:烧水 & 看电视)

    • 阻塞 & 同步
      • 烧水 -> 等水开(主动看)-> 看电视 :效率低
    • 阻塞 & 异步
      • 烧水 -> 水壶响(被动听)-> 看电视:效率低
    • 非阻塞 & 同步
      • 烧水 & 看电视 & 时不时看下水(主动看): 来回切换,效率低
    • 非阻塞 & 异步
      • 烧水 & 看电视 & 水壶响了再去关(被动听):减少切换,效率高

进程 Process

  1. os.fork(): 只在Unix/Linux/Mac上运行,windows不可以

     $ cat fork-demo.py
     import os,time
    
     num=1
     ret = os.fork()
     if ret==0:      # sub-process do:
         num=num-1   # should be 0
         print("sub-process:%s,pid:%s,ppid:%s num:%s" % (ret,os.getpid(),os.getppid(),num))
     else:           # main-process do:
         num=num+1   # should be 2
         print("main-process:%s,pid:%s,ppid:%s num:%s" % (ret,os.getpid(),os.getppid(),num))
    
     time.sleep(3)
    
     print('End')
    
     $ python fork-demo.py
     main-process:30929,pid:30928,ppid:6855 num:2
     sub-process:0,pid:30929,ppid:30928 num:0
     End
     End
    
     $ ps
        PID TTY           TIME CMD
      6855 ttys001    0:00.03 -bash
     30928 ttys001    0:00.02 python fork-demo.py
     30929 ttys001    0:00.00 python fork-demo.py
     28023 ttys002    0:00.02 -bash
    
    • 分出一个子进程,后面的代码会分别在原有进程和新创建进程运行
    • 返回0: 代表当前是新创建的子进程
    • 返回非0: 代表当前是原本的父进程(返回的数字=子进程pid)
    • 即子进程永远返回0,父进程返回子进程的ID
    • 每个进程中所有数据(包括全局变量)都各有拥有一份,互不影响
    • 父子进程互补影响,父进程结束不会导致子进程结束
    • 1次fork->2个进程,2次fork->4个进程,...
  2. multiprocessing.Process: 通用

     $ cat process-demo.py
     from multiprocessing import Process
     import os,time,random
    
     # 法一
     def task(name):
         print("子进程(%s) task %s start" % (os.getpid(),name))
         time.sleep(random.random()*2) 
         print("子进程(%s) task %s end" % (os.getpid(),name))
    
     # 法二
     class MyProcess(Process):
         def __init__(self,name):
             Process.__init__(self)
             self.name=name
    
         def run(self):
             print("子进程(%s) task %s start" % (os.getpid(),self.name))
             time.sleep(random.random()*2) 
             print("子进程(%s) task %s end" % (os.getpid(),self.name))
    
     if __name__=='__main__':
         print('父进程(%s) 开始' % os.getpid())
    
         # p = Process(target=task, args=('test',))   # 创建子进程,法一:创建一个Process实例
         p = MyProcess('test')                       # 创建子进程,法二:创建一个自定义的继承自Process类的实例
    
         p.start()   # 启动子进程
         p.join()    # 父进程阻塞等待子进程结束后再继续往下运行,非必需
         print('子进程(%s) 结束' % p.name)
         print('父进程(%s) 结束' % os.getpid())
    
     $ python process-demo.py
     父进程(39487) 开始
     子进程(39488) task test start
     子进程(39488) task test end
     子进程(test) 结束
     父进程(39487) 结束
    
    • 创建
      • 法一:创建一个Process实例
      • 法二:自定义一个类,继承Process类,重写run方法
    • 启动:p.start() 内部调用run方法
    • 等待结束:p.join([timeout])
    • 终止:p.terminate()
    • 判断是否还在执行:is_alive()
    • 常用属性:
      • name:当前进程实例别名,默认为Process-N,N为从1开始递增的整数
      • pid:当前进程实例的PID值
    • 注:
      • 父进程结束不影响子进程
      • 子进程无法访问父进程创建的全局变量
      • 变量可通过参数传递
  3. multiprocessing.Pool: 进程池

     from multiprocessing import Pool
     import os,time,random
    
     def task(name):
         print("子进程(%s): task %s => start" % (os.getpid(),name))
         t=random.random()*2
         time.sleep(t) 
         print("子进程(%s): task %s => end(sleep:%.2f)" % (os.getpid(),name,t))
         return os.getpid(),name
    
     def task_callback(arg):         # 父进程中执行,以元组方式获取task return值
         print("父进程(%s): 获取子进程(%s) task %s => callback" % (os.getpid(),arg[0],arg[1]))
    
     if __name__=='__main__':
         print('父进程(%s): => 开始' % os.getpid())
    
         p=Pool(3)
         for i in range(0,5):
             # p.apply(task,(i,))      # 阻塞方式发配任务(等一个任务完成后,再发布下一个)
             p.apply_async(task,(i,),callback=task_callback)  # 非阻塞方式发配任务(一次性发布3个,等空出进程后,再发布下一)
    
         p.close()   # 关闭进程池,是以不再接收新的请求
         p.join()    # 父进程阻塞等待所有子进程执行完成,必须放在close语句之后(不加join的话可能子进程还未完成就整个退出了)
         print('所有子进程结束')
         print('父进程(%s): => 结束' % os.getpid())
    
     # 阻塞方式结果:
     $ python pool-demo.py
     父进程(44350): => 开始
     子进程(44351): task 0 => start
     子进程(44351): task 0 => end(sleep:1.30)
     子进程(44352): task 1 => start
     子进程(44352): task 1 => end(sleep:0.27)
     子进程(44353): task 2 => start
     子进程(44353): task 2 => end(sleep:0.53)
     子进程(44351): task 3 => start
     子进程(44351): task 3 => end(sleep:1.68)
     子进程(44352): task 4 => start
     子进程(44352): task 4 => end(sleep:0.53)
     所有子进程结束
     父进程(44350): => 结束
    
     # 非阻塞方式结果:
     $ python pool-demo.py
     父进程(44306): => 开始
     子进程(44307): task 0 => start
     子进程(44308): task 1 => start
     子进程(44309): task 2 => start
     子进程(44309): task 2 => end(sleep:0.89)
     子进程(44309): task 3 => start
     父进程(44306): 获取子进程(44309) task 2 => callback
     子进程(44307): task 0 => end(sleep:1.14)
     子进程(44307): task 4 => start
     父进程(44306): 获取子进程(44307) task 0 => callback
     子进程(44308): task 1 => end(sleep:1.50)
     父进程(44306): 获取子进程(44308) task 1 => callback
     子进程(44309): task 3 => end(sleep:0.92)
     父进程(44306): 获取子进程(44309) task 3 => callback
     子进程(44307): task 4 => end(sleep:1.36)
     父进程(44306): 获取子进程(44307) task 4 => callback
     所有子进程结束
     父进程(44306): => 结束
    
    • 初始化Pool时,可以指定一个最大进程数
    • 若进程数达到指定的最大值,直到池中有进程任务结束,下一个任务请求才会发布给空闲进程执行
    • apply(func, args=(), kwds={}) 阻塞方式发布任务给一个进程,必须等待上一个进程任务结束才能执行下一个
    • apply_async(func, args=(), kwds={}, callback=None, error_callback=None) 非阻塞方式,有空闲进程即可发布执行
    • close() 关闭进程池,使其不再接受新的任务
    • terminate() 不管任务是否完成,立即终止
    • join() 主进程阻塞等待子进程们结束,必须在closeterminate之后使用
    • 注:
      • 父进程结束会影响子进程
      • 变量可通过参数传递
  4. Queue: 消息列队

     $ cat process-queue.py
     from multiprocessing import Process, Queue
     from multiprocessing import Manager,Pool
     import os, time, random
    
     def produce_task(q):
         while True:
             if not q.full():
                 t=random.random()
                 q.put(t)
                 print('produce:%.2f' % t)
                 time.sleep(t)
             else:
                 print('stop produce!')
                 break
    
     def consume_task(q):
         while True:
             if not q.empty():
                 t=q.get()
                 print('consume:%.2f' % t)
                 time.sleep(t)
             else:
                 print('stop consume!')
                 break
    
     def process_test():
         q=Queue(5)
         p_produce=Process(target=produce_task,args=(q,))
         p_consume=Process(target=consume_task,args=(q,))
         p_produce.start()
         p_consume.start()
         p_produce.join()
         p_consume.join()
         print("Done!")
    
     def pool_test():
         q=Manager().Queue(5)
         p=Pool(2)
         p.apply_async(produce_task,(q,))
         p.apply_async(consume_task,(q,))
         p.close()
         p.join()
         print("Done!")
    
     if __name__=='__main__':
         # process_test()
         pool_test()
    
     $ python process-queue.py
     produce:0.04
     consume:0.04
     produce:0.99
     consume:0.99
     stop consume!
     produce:0.07
     produce:0.13
     produce:0.96
     produce:0.86
     produce:0.27
     stop produce!
     Done!
    
    • 进程间通信,可使用multiprocessing.Queue()
    • 进程池进程通讯,可使用multiprocessing.Manager().Queue()
    • get(block=True,timeout=None) 从队列中弹出一个消息(无消息则阻塞或抛出Queue.Empty异常)
    • get_nowait() = get(False)
    • put(item,block=True,timeout=None) 将一个消息写入队列(队列满了则阻塞或抛出Queue.Full异常)
    • put_nowait(item) = put(item,False)
    • qsize() 当前队列包含的消息数量 => 效果不好,可能会报NotImplementedError
    • empty() 判断当前队列是否为空
    • full() 判断当前队列是否满了

线程 Thread

threading模块

  • Python的thread模块较底层,threading模块对thread做了一些包装,更方便
  • Python解释器(CPython)在解析多线程时,会使用GIL全局排他锁(Global Interpreter Lock),不管几核CPU,同一时刻只有一个线程能使用到CPU,从而导致多线程无法利用多核,解决方法有
    • 用多进程multiprocess代替多线程
    • 使用其他解析器(GIL只是CPython的产物)
    • 使用c语言实现多线程(利用ctypes模块直接调用任意的C动态库的导出函数)
  • 使用:
    • 创建线程:
      • 法一:创建一个Thread实例 Thread(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
      • 法二:自定义一个类,继承Thread类,重写run方法
    • start() 启动
    • join() 主线程阻塞等待子线程结束后再继续,非必需
    • is_alive() 判断线程是否活着
    • name 线程名字,默认Thread-1,Thread-2,...
  • 线程同步
    • 多个线程有序协同工作
    • 场景:
      • 多个线程共同争夺修改某个共享数据,如全局变量,一个进程内的所有线程共享全局变量(各线程可修改,线程非安全)
      • 生产-消费者,速度不匹配,需要一个缓冲协同
    • 实现方式:
      • 互斥锁 Mutex
        • 每次只有一个线程可以上锁,其他需要获得这个锁的线程变为blocked状态
        • 线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行running状态
        • 锁:相当于一块独占空间,一次只能进一个人(线程)
        • 创建锁: mutex=threading.Lock()
        • 锁住:mutex.acquire(blocking=True, timeout=-1) => 锁状态:locked,锁住的那个线程:running,其他需要这个锁的线程:blocked
        • 开锁:mutex.release() => 锁状态:unlocked,需要这个锁的线程开始争夺这个锁,获这个锁的线程状态从blocked变成runing
        • 死锁:不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁 => 需尽量避免,如添加超时时间等
      • 队列 Queue
        • from queue import Queue
        • q=Queue(maxsize=0)
        • .get(block=True,timeout=None),.put(item,block=True,timeout=None)
        • .qsize(),.full(),.empty(),.join()
  • 注:
    • 主进程,一个主线程运行
    • 主线程结束,不会导致子线程结束
    • 线程间传递数据:
      • 全局变量
      • 传参
    • 线程自己的数据:
      • 局部变量
      • ThreadLocal
        • 定义一个全局变量 l=threading.local()
        • 每个Thread对它的读写互不影响 l.xxx,l.xxx=xxx
        • 类似一个dict
        • 虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰
        • 解决了参数在一个线程中各个函数之间互相传递的问题

Sample1:Thread

from threading import Thread
import threading
import os,time,random

def task(name):
    print("%s do task %s start" % (threading.current_thread().name,name))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print("%s do task %s end (%.2f)" % (threading.current_thread().name,name,end-start))


class MyThread(Thread):
    def __init__(self,taskName):
        Thread.__init__(self)
        self.taskName=taskName

    def run(self):
        print("%s do task %s start" % (self.name,self.taskName)) # name属性中保存的是当前线程的名字
        start = time.time()
        time.sleep(random.random() * 3)
        end = time.time()
        print("%s do task %s end (%.2f)" % (self.name,self.taskName,end-start))


def thread_inst_test():
    t=Thread(target=task,args=('test',))
    t.start()

def thread_class_test():
    t=MyThread('test')
    t.start()


if __name__=='__main__':
    print("%s thread start" % threading.current_thread().name)

    # thread_inst_test()

    thread_class_test()

    print("%s thread end" % threading.current_thread().name)
$ python thread-demo.py
MainThread thread start
Thread-1 do task test start
MainThread thread end
Thread-1 do task test end (0.47)

Sample2: Multiple tasks

# test1: 异步,多线程交错打印
# result sample:
# Thread-1 print 1
# Thread-2 print 1
# Thread-1 print 2
# Thread-1 print 3
# Thread-1 done!
# Thread-2 print 2
# Thread-2 print 3
# Thread-2 done!
def test_multi_tasks_async():
    def async_task():
        for i in range(1,4):
            print("%s print %d " % (threading.current_thread().name,i))
            time.sleep(random.random())
        print("%s done!" % threading.current_thread().name)

    t1=Thread(target=async_task)
    t2=Thread(target=async_task)
    t1.start()
    t2.start()

# test2: 同步,多线程顺序打印
# result sample:
# Thread-1 print 1
# Thread-1 print 2
# Thread-1 print 3
# Thread-1 done!
# Thread-2 print 1
# Thread-2 print 2
# Thread-2 print 3
# Thread-2 done!
def test_multi_tasks_sync():
    mutex=threading.Lock()

    def sync_task():
        if mutex.acquire():
            for i in range(1,4):
                print("%s print %d " % (threading.current_thread().name,i))
                time.sleep(random.random())
            print("%s done!" % threading.current_thread().name)
            mutex.release()

    t1=Thread(target=sync_task)
    t2=Thread(target=sync_task)
    t1.start()
    t2.start()

Sample3:Mutex

# test: 操作共享资源,不加锁,结果不正确,shoud be 0
# result sample:
# Done Thread-1 do add,num = 208671
# Done Thread-2 do sub,num = -102681
def test_multi_tasks_global_val():
    num=0
    def add_task():
        global num
        for i in range(0,1000000):
            num+=1
        print("Done %s do add,num = %d " % (threading.current_thread().name,num))

    def sub_task():
        global num
        for i in range(0,1000000):
            num-=1
        print("Done %s do sub,num = %d " % (threading.current_thread().name,num))

    t1=Thread(target=add_task)
    t2=Thread(target=sub_task)
    t1.start()
    t2.start()

# test: 操作共享资源,加锁,得正确结果,be 0
# result sample:
# Done Thread-2 do sub,num = -36334
# Done Thread-1 do add,num = 0
def test_multi_tasks_global_val_mutex():
    lock=threading.Lock()
    num=0
    def add_task():
        nonlocal num
        for i in range(0,1000000):
            lock.acquire()
            num+=1
            lock.release()
        print("Done %s do add,num = %d " % (threading.current_thread().name,num))

    def sub_task():
        nonlocal num
        for i in range(0,1000000):
            lock.acquire()
            num-=1
            lock.release()
        print("Done %s do sub,num = %d " % (threading.current_thread().name,num))

    t1=Thread(target=add_task)
    t2=Thread(target=sub_task)
    t1.start()
    t2.start()

Sample4:Queue

Queue
# result sample:
# Consumer [0] Begin
# Consumer [0] handle item: 5.77
# Consumer [1] Begin
# Consumer [2] Begin
# Waiting for all threads done...
# Consumer [2] handle item: 4.51
# Consumer [1] handle item: 3.87
# Consumer [0] handle item: 1.61
# Consumer [1] handle item: 6.42
# Consumer [2] handle item: 0.17
# Consumer [1] handle item: 9.36
# Consumer [0] handle item: 4.88
# Consumer [1] handle item: 8.63
# Consumer [1] handle item: 0.27
# Consumer [2] Done
# Consumer [0] Done
# Consumer [1] Done
# All threads done.

def test_thread_queue():
    import queue

    def queue_consumer(index,q):
        print("Consumer [%s] Begin" % index)
        while not q.empty():
            item = q.get()
            if not item:
                time.sleep(1)
                continue
            do_task(index,item)
        print('Consumer [%s] Done' % index)

    def do_task(index,item):
        print("Consumer [%s] handle item: %s" % (index,item))
        time.sleep(random.random())

    q=queue.Queue()
    [q.put("%.2f" % (random.random()*10)) for i in range(10)]

    t_list=[]
    for i in range(3):
        t=threading.Thread(target=queue_consumer,args=(i,q,))
        t.start()
        t_list.append(t)

    print('Waiting for all threads done...')
    for t in t_list:
        t.join()
    print('All threads done.')

Sample5:ThreadLocal

ThreadLocal
# result sample:
# Done Thread-1 task: step=0
# Done Thread-5 task: step=40
# Done Thread-2 task: step=10
# Done Thread-3 task: step=20
# Done Thread-4 task: step=30

def test_thread_dict():
    thread_dict = {}

    def task(step):
        thread_dict[threading.current_thread()]=step
        do_task()

    def do_task():
        currentThread=threading.current_thread()
        step=thread_dict[currentThread]
        time.sleep(random.random())
        print("Done %s task: step=%s" % (currentThread.name,step))

    for i in range(5):
        t=Thread(target=task,args=(i*10,))
        t.start()


def test_thread_local():

    #thread_dict = {}
    thread_local = threading.local()

    def task(step):
        #thread_dict[threading.current_thread()]=step
        thread_local.step=step
        do_task()

    def do_task():
        currentThread=threading.current_thread()
        #step=thread_dict[currentThread]
        step=thread_local.step
        time.sleep(random.random())
        print("Done %s task: step=%s" % (currentThread.name,step))

    for i in range(5):
        t=Thread(target=task,args=(i*10,))
        t.start()

协程 Coroutine

Async programming allows you to write concurrent code that runs in a single thread. The first advantage compared to multiple threads is that you decide where the scheduler will switch from one task to another, which means that sharing data between tasks it's safer and easier. With asynchronous programming, you allow your code to handle other tasks while waiting for these other resources to respond.

  • async IO,被动通知,单线程多任务(非阻塞),执行效率高
  • 与多线程对比优势:
    • 没有线程切换的开销
    • 共享资源无需加锁也不会发生冲突(判断状态即可)

使用Python标准库:asyncio (Python 3.4引入) 编程模型: 一个消息循环,从asyncio模块中获取一个EventLoop的引用,把需要执行的协程扔到EventLoop中执行

  1. Coroutine 协同任务

     # method1: use `@asyncio.coroutine`
     @asyncio.coroutine
     def my_task(args):
         print('start')
         yield from asyncio.sleep(3)  # 用`yield from`调用另一个`coroutine`实现异步操作
         print('end')
    
     # method2: use `async`
     async def my_task(args):
         print('start')
         await asyncio.sleep(3)     # 用`await`调用另一个`coroutine`实现异步操作
         print('end')
    
     # return a coroutine object
     my_coroutine = my_task(args)
    
    • 一个用@asyncio.coroutineasync标注的function
    • xxx(args),不会执行方法体中的语句,只是返回一个coroutine object
    • async&await是Python 3.5引入的语法糖,async代替@asyncio.coroutine,await代替yield from(Note: await keyword can only be used inside an async def function
  2. EventLoop 事件循环(事件池)

     # 1. create loop: 从`asyncio`模块中获取一个`EventLoop`的引用
     loop = asyncio.new_event_loop()
    
     # 2. add coroutine to the loop,return a future object
     future = loop.create_task(my_coroutine)
    
     # 3. execute all coroutine added to the loop concurrently
     # method1:
     loop.run_until_complete(my_coroutine)
     # method2:
     loop.run_until_complete(future)
    
     loop.close()
    
    • 组织协调执行加入的协同任务
    • execute our asyncronous code and decide how to switch between async functions
  3. Running Tasks Concurrently(一个事件循环中执行多个task , 并发执行)

    • asyncio.wait(fs, *, loop=None, timeout=None, return_when='ALL_COMPLETED')
      • 等待任务完成,支持在完成第一个任务后或在指定的超时之后停止等待
      • return: two sets of Future: (done, pending)
        • done : 已完成的协程Task集合 {asyncio.Task,asyncio.Task,...}
        • pending : 超时未完成的协程Task集合
        • asyncio.Task : extends Future, could use .result() get task return value
      • usage:
        • done, pending = await asyncio.wait(coros_or_futures) -- in async func
        • done, pending = loop.run_until_complete(async.wait(coros_or_futures)) -- in normal func
    • asyncio.gather
      • 等待并按给定的顺序返回完成的任务结果
      • return:已完成的协程Task的结果值列表 [taskReturnValue1,taskReturnValue2,...]
      • usage:
        • results=await asyncio.gather(*coros_or_futures)
        • results=loop.run_until_complete(async.gather(*coros_or_futures))
        • 还可对任务进行高级别分组
            group1 = asyncio.gather(*[task1,task2])
            group2 = asyncio.gather(*[task3,task4])
            all_groups = asyncio.gather(group1, group2)
            results = loop.run_until_complete(all_groups)
          
    • 总结 asyncio.wait vs. asyncio.gather :
      • 都是用于获取结果的,且都不阻塞,直接返回一个生成器对象可用于 yield from / await
      • 都是两种用法
        • results = asyncio.run_until_completed(asyncio.wait/gather) 执行所有完成之后获取
        • results = await asyncio.wait/gather 在一个协程内获取结果
      • 但返回结果不一样
        • asyncio.wait 返回 tuple: (doneSet,pendingSet)
        • asyncio.gather 返回 list: [taskReturnValue1,taskReturnValue2,...]
  4. Task/Future(注: class Task(Future)):

     # method1: add a coroutine to the loop and returns a `Task` which is a subtype of `Future`
     future = loop.create_task(my_coroutine)
    
     # method2: adds a coroutine to the default loop
     future = asyncio.ensure_future(my_coroutine)
    
     # could add call back for the future object
     future.add_done_callback(result_handler())
    
    • works as a placeholder for the output of an asynchronous function
    • and it gives us information about the function state,
    • A future is created when we add a corutine to an event loop
  5. Run Result:

     # method1: use the `yield from` or `await`
     result=await my_coroutine
    
     # method2: add it to an event loop
     future=loop.create_task(my_coroutine)
     result=future.result()
    
    • Get the output of an async function from a coroutine
  6. Exception handling:

     # method1: try catch the exception
     try:    
         future.result() # this will re-raise the exception raised during the coroutine execution
     except Exception:
         pass
    
     # method2: get the exception information
     future.exception()
    
    • method1: try...except... 捕获异常
    • method2: 调用future.exception()异常信息
    • Note:unhandled exception raised inside a coroutine doesn't break the program,it will be stored inside the future and get Task exception was never retrieved error before program exit.

Sample1:@asyncio.coroutine & yield from vs. async & await

  1. @asyncio.coroutine & yield from

     import asyncio
     import random
    
     @asyncio.coroutine
     def hello(name):
         r=random.random()*3
         print('Hello %s, sleep %s' % (name,r))
         yield from asyncio.sleep(r)
         print('Hello %s wake up!' % name)
    
     if __name__ == '__main__':
         loop = asyncio.get_event_loop()                 # 获取EventLoop
         #tasks = [hello("Tom"), hello("Lucy")]
         t1=loop.create_task(hello("Tom"))
         t2=loop.create_task(hello("Lucy"))
         tasks=[t1,t2]                                   # t1,t2 is future
         loop.run_until_complete(asyncio.wait(tasks))    # 执行
         loop.close()
    
  2. async & await

     import asyncio
     import random
    
     async def hello(name):                                # 使用`async` 代替`@asyncio.coroutine`
         r=random.random()*3
         print('Hello %s, sleep %s' % (name,r))
         await asyncio.sleep(r)                            # 使用`await` 代替`yield from`
         print('Hello %s wake up!' % name)
    
     if __name__ == '__main__':
         loop = asyncio.get_event_loop()
         # tasks = [hello("Tom"), hello("Lucy")]
         t1=loop.create_task(hello("Tom"))
         t2=loop.create_task(hello("Lucy"))
         tasks=[t1,t2]
         loop.run_until_complete(asyncio.wait(tasks))
         loop.close()
    

Sample2: coroutine vs. future, asyncio.wait vs. asyncio.gather

  1. coroutine

     import asyncio
     import time,random
    
     async def do_async_task(index,item):
         print('[start]\t %s:\t %s' % (index,item))
         start=time.time()
         await asyncio.sleep(random.random())
         end=time.time()
         result ='[end]\t %s:\t %s (spent:%.2f)' % (index,item,(end-start))
         return result
    
     if __name__=='__main__':
         loop = asyncio.get_event_loop()
    
         task_list=[]
         for i in range(10):
             item="%.2f" % (random.random()*10)
             t=do_async_task(i,item)                 # t is coroutine
             task_list.append(t)
    
         print('Waiting for all sub-proc done...')
    
         # method1: use asyncio.wait
         done,pending=loop.run_until_complete(asyncio.wait(task_list))
         print(done,pending)
         for r in done:
             print("get result:",r.result())
    
         # method2: use asyncio.gather
         # results=loop.run_until_complete(asyncio.gather(*task_list))
         # print(results)
         # for r in result:
         #     print("get result:",r)
    
         loop.close()
         print('All sub-procs done.')
    
         # for task in task_list:
         #     # task:
         #     # type: coroutine
         #     # sample: <coroutine object do_async_task at 0x10f24c8c8>
         #     print(task)
    
  2. future

     import asyncio
     import time,random
    
     async def do_async_task(index,item):
         print('[start]\t %s:\t %s' % (index,item))
         start=time.time()
         await asyncio.sleep(random.random())
         end=time.time()
         result ='[end]\t %s:\t %s (spent:%.2f)' % (index,item,(end-start))
         return result
    
     def handle_result(future):
         print("callback:",future.result())
    
     if __name__=='__main__':
         loop = asyncio.get_event_loop()
         task_list=[]
         for i in range(10):
             item="%.2f" % (random.random()*10)
             t=do_async_task(i,item)                 # t is coroutine
             f=loop.create_task(t)                   # f is future
             f.add_done_callback(handle_result)
             task_list.append(f)
    
         print('Waiting for all sub-proc done...')
    
         # method1: use asyncio.wait
         done,pending=loop.run_until_complete(asyncio.wait(task_list))
         print(done,pending)
         for r in done:
             print("get result:",r.result())
    
         # method2: use asyncio.gather
         # results=loop.run_until_complete(asyncio.gather(*task_list))
         # print(results)
         # for r in result:
         #     print("get result:",r)
    
         loop.close()
         print('All sub-procs done.')
    
         # for task in task_list: 
         #     # task:
         #     # type: _asyncio.Task,subtype of `Future`
         #     # sample: <Task finished 
         #     # coro=<do_async_task() done, defined at asyncio-demo.py:5> 
         #     # result='[end]\t 7:\t... (spent:0.36)'>
         #     print("get task result:",task.result())
    
  3. Result sample:

     Waiting for all sub-proc done...
     [start]  0:  6.28
     [start]  1:  6.02
     [start]  2:  5.86
     [start]  3:  1.32
     [start]  4:  8.29
     [start]  5:  6.30
     [start]  6:  2.07
     [start]  7:  3.41
     [start]  8:  8.47
     [start]  9:  3.76
     callback: [end]  4:  8.29 (spent:0.11)
     callback: [end]  0:  6.28 (spent:0.18)
     callback: [end]  3:  1.32 (spent:0.20)
     callback: [end]  6:  2.07 (spent:0.31)
     callback: [end]  1:  6.02 (spent:0.51)
     callback: [end]  9:  3.76 (spent:0.51)
     callback: [end]  7:  3.41 (spent:0.75)
     callback: [end]  8:  8.47 (spent:0.78)
     callback: [end]  5:  6.30 (spent:0.79)
     callback: [end]  2:  5.86 (spent:1.00)
     get result: [end]    5:  6.30 (spent:0.79)
     get result: [end]    2:  5.86 (spent:1.00)
     get result: [end]    8:  8.47 (spent:0.78)
     get result: [end]    6:  2.07 (spent:0.31)
     get result: [end]    3:  1.32 (spent:0.20)
     get result: [end]    0:  6.28 (spent:0.18)
     get result: [end]    9:  3.76 (spent:0.51)
     get result: [end]    7:  3.41 (spent:0.75)
     get result: [end]    4:  8.29 (spent:0.11)
     get result: [end]    1:  6.02 (spent:0.51)
     All sub-procs done.
    

Sample3:queue

async def do_async_task(index,item):
    print('do task %s: Consumer[%s] %s' % (item[0],index,item[1]))
    start=time.time()
    await asyncio.sleep(random.random())
    end=time.time()
    result ='do task %s: Consumer[%s] %s (spent:%.2f)' % (item[0],index,item[1],(end-start))
    return result

def handle_result(future):
    print(future.result())

async def async_queue_consumer(index,q):
    print("Consumer[%s] Begin" % index)
    while not q.empty():
        item = await q.get()
        if not item:
            await asyncio.sleep(1)
            continue
        task=asyncio.create_task(do_async_task(index,item))
        task.add_done_callback(handle_result)
        await asyncio.gather(task)
    print('Consumer[%s] Done' % index)

if __name__=='__main__':   
    print("--- Test: asyncio queue ---")
    q=asyncio.Queue()
    [q.put_nowait((i,"%.2f" % (random.random()*10))) for i in range(10)]

    tasks = [async_queue_consumer(i,q) for i in range(3)]
    loop = asyncio.get_event_loop()

    print('Waiting for all sub-proc done...')
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    print('All sub-procs done.')

Result sample:

--- Test: asyncio queue ---
Waiting for all sub-proc done...
Consumer[1] Begin
Consumer[0] Begin
Consumer[2] Begin
do task 0: Consumer[1] 5.87
do task 1: Consumer[0] 7.46
do task 2: Consumer[2] 7.20
do task 1: Consumer[0] 7.46 (spent:0.05)
do task 3: Consumer[0] 7.28
do task 2: Consumer[2] 7.20 (spent:0.61)
do task 4: Consumer[2] 7.07
do task 0: Consumer[1] 5.87 (spent:0.72)
do task 5: Consumer[1] 9.09
do task 3: Consumer[0] 7.28 (spent:0.67)
do task 6: Consumer[0] 9.65
do task 4: Consumer[2] 7.07 (spent:0.33)
do task 7: Consumer[2] 6.64
do task 6: Consumer[0] 9.65 (spent:0.89)
do task 8: Consumer[0] 3.21
do task 7: Consumer[2] 6.64 (spent:0.74)
do task 9: Consumer[2] 1.78
do task 5: Consumer[1] 9.09 (spent:0.98)
Consumer[1] Done
do task 9: Consumer[2] 1.78 (spent:0.13)
Consumer[2] Done
do task 8: Consumer[0] 3.21 (spent:0.62)
Consumer[0] Done
All sub-procs done.

访问数据库

SQLite

  • SQLite是一种嵌入式数据库,它的数据库就是一个文件
  • Python就内置了SQLite3,可直接使用
  • 操作:建立连接Connection,打开游标,通过Cursor执行SQL语句,获得执行结果,关闭游标,提交事物,关闭连接
    • Cursor对象通过execute方法执行SQL语句(可使用?占位符)
    • 执行insert/update/delete后,可通过cursor.rowcount获取影响的行数
    • 执行select后,可通过cursor.featchall()获取结果集列表,每个元素都是一个tuple,对应一行记录

Sample:

import sqlite3
import os

# delte test.db
db_file = os.path.join(os.path.dirname(__file__), 'test.db')
if os.path.isfile(db_file):
    os.remove(db_file)


conn = sqlite3.connect('test.db')   # 建立连接(数据库文件不存在时,会创建)
cursor = conn.cursor()              # 创建并打开一个Cursor

try:
    # 执行SQL语句: create
    cursor.execute('create table user(id varchar(20) primary key, name varchar(20), score int)')

    # 执行SQL语句: insert
    cursor.execute(r"insert into user values ('A-001', 'Adam', 95)")
    print(cursor.rowcount)

    cursor.execute(r"insert into user values ('A-002', 'Bart', 62)")
    print(cursor.rowcount)

    cursor.execute(r"insert into user values ('A-003', 'Lisa', 78)")
    print(cursor.rowcount)

    # 提交事务
    conn.commit()

    # 执行SQL语句: select
    cursor.execute('select * from user where id=?', ('A-001',))
    records = cursor.fetchall()
    print(records)

finally:
    cursor.close()                  # 关闭Cursor
    conn.close()                    # 关闭连接

MySQL

pip install mysql-connector-python or pip install pymysql

#import mysql.connector as pymysql
import pymysql

conn=pymysql.connect(host="localhost",port=3316,user='root',password='123456',database='demo1')
cursor=conn.cursor()

try:
    # 执行SQL语句: drop
    cursor.execute('drop table if exists user')

    # 执行SQL语句: create
    cursor.execute('create table user(id varchar(20) primary key, name varchar(20), score int)')

    # 执行SQL语句: insert
    cursor.execute(r"insert into user values ('A-001', 'Adam', 95)")
    print(cursor.rowcount)

    cursor.execute(r"insert into user values ('A-002', 'Bart', 62)")
    print(cursor.rowcount)

    cursor.execute(r"insert into user values ('A-003', 'Lisa', 78)")
    print(cursor.rowcount)

    # 提交事务
    conn.commit()

    # 执行SQL语句: select
    # 注:这里占位符是'%',不是'?'
    cursor.execute('select * from user where id=%s', ('A-001',))
    records = cursor.fetchall()
    print(records)

finally:
    cursor.close()
    conn.close()

check result:

mysql> use demo1
Database changed
mysql> select * from user;
+-------+------+-------+
| id    | name | score |
+-------+------+-------+
| A-001 | Adam |    95 |
| A-002 | Bart |    62 |
| A-003 | Lisa |    78 |
+-------+------+-------+
3 rows in set (0.00 sec)

Redis

pip install redis

  1. 连接Redis数据库

     import redis
    
     # 1. 连接Redis数据库
     # method1: 直接连接
     # client=redis.Redis(host='127.0.0.1',port=6379,password='123456')
    
     # method2: 连接池连接(预先创建多个连接, 进行redis操作时, 直接获取已经创建的连接进行操作, 完成后,不会释放, 用于后续的其他redis操作)
     pool = redis.ConnectionPool(host='localhost', port=6379,password='123456')
     client = redis.Redis(connection_pool=pool)
    
     # check exist keys:
     # results=client.keys('*')
     # print("list keys * :",results)
    
  2. 基础数据操作: 基本redis的命令名与redis模块中的函数名一致

    • String: key -> value

        # 设置:
        # set(key, value, ex=None, px=None, nx=False, xx=False)
        # setnx(key, value)             只有key不存在时设置
        # setex(key, value, time)        设置过期时间(秒)
        # mset({key:value,...})          批量设置值
        #
        # 获取:
        # get(key)
        # mget(key1,key2,...)
        # strlen(key)                   key对应值的字节长度
        #
        # 值追加内容:
        # append(key,addValue)
        def test_string():
            print("Test String:")
            result=client.set('a',1)
            print(result)                   # True
      
            result=client.get('a')
            print(result)                   # b'1'
      
            result=client.append('a',23)
            print(result)                   # 3
      
            result=client.get('a')
            print(result)                   # b'123'
      
    • Hash: 一个name对应一个dic字典 n1 -> k1:v1,k2,v2,n2 -> kx:vx,..

        # hset(name, key, value)
        # hget(name, key)
        # hgetall(name)
        # hmset(name, mapping)
        # hmget(name, keys, *args)
        #
        # hlen(name)
        # hkeys(name)
        # hvals(name)
        #
        # hexists(name, key)
        # hdel(name, *keys)
        # hincrby(name, key, amount=1)
        def test_hash():
            print("Test Hash:")
      
            result = client.hset('stu-001','name','Tom')
            print(result)                       # 1
            result = client.hmset('stu-001',{'age':19,'gender':'male'})
            print(result)                       # True
            result = client.hgetall('stu-001')
            print(result)                       # {b'name': b'Tom', b'age': b'19', b'gender': b'male'}
      
            result = client.hmset('stu-002',{'name':'Lucy','age':16,'gender':'Female'})
            print(result)                       # True
            result = client.hgetall('stu-002')
            print(result)                       # {b'name': b'Lucy', b'age': b'16', b'gender': b'Female'}
      
            result = client.hlen('stu-001')
            print(result)                       # 3
            result = client.hkeys('stu-001')
            print(result)                       # [b'name', b'age', b'gender']
            result = client.hvals('stu-001')
            print(result)                       # [b'Tom', b'19', b'male']
      
            result = client.hincrby('stu-001','age',amount=5)
            print(result)                       # 24
            result = client.hgetall('stu-001')
            print(result)                       # {b'name': b'Tom', b'age': b'24', b'gender': b'male'}
      
    • List: 一个name对应一个列表存储,n1 -> [v1,v2,...],n2 -> [...]

       # lpush(name, *values),rpush(name, *values) 
       # lpushx(name, *values),rpushx(name, *values)  name存在时,才能加入
       # linsert(name, where, refvalue, value)        name对应的列表的refValue值的前或后where插入一个value
       #
       # llen(name)                        name列表长度
       # lindex(name, index)               索引获取元素
       # lrange(name, start, end)          分片获取元素
       #
       # lset(name, index, value)          索引位置重新赋值
       # lpop(name)                        移除列表的左侧第一个元素,返回值则是第一个元素
       # lrem(name, value, num=0)          删除name对应的list中的指定值
       # ltrim(name, start, end)           移除列表内没有在该索引之内的值
      
       def test_list():
           print("Test List:")
      
           result = client.lpush('group1','stu-A','stu-B','stu-C','stu-D','stu-E')
           print(result)                      # 5
           result = client.lpush('group2','child-A','child-B')
           print(result)                       # 3
      
           result = client.llen('group1')
           print(result)                       # 5
           result = client.lrange('group1',2,5)
           print(result)                       # [b'stu-C', b'stu-B', b'stu-A']
      
    • Set: 不允许有重复的集合,n1 -> {v1,v2,...},n2 -> {...}

        # sadd(name, *values)           添加元素
        # srem(name,*values)            删除元素
        # smembers(name)                列出集合所有元素
        # scard(name)                   获取元素个数
        # sismember(name, value)        测试素是否在
      
        # spop(name)                    从集合中随机弹出一个元素
        # smove(src, dst, value)        将一个元素从集合1移到集合2
      
        # sdiff/sdiffstore(keys, *args) 差集(以前一个集合为标准)
        # sinter(keys, *args)           交集
        # sunion(keys, *args)           并集
      
        def test_set():
            print("Test Set:")
      
            result = client.sadd('stdGrp1','stu-A','stu-B','stu-B','stu-C')
            print(result)                       # 3
            result = client.smembers('stdGrp1')
            print(result)                       # {b'stu-B', b'stu-C', b'stu-A'}
            result = client.scard('stdGrp1')
            print(result)                       # 3
      
            result = client.sadd('stdGrp2',*('stu-A','stu-C','stu-D'))
            print(result)                       # 3
      
            result = client.sdiff('stdGrp1','stdGrp2')
            print(result)                       # {b'stu-B'}
            result = client.sinter('stdGrp1','stdGrp2')
            print(result)                       # {b'stu-C', b'stu-A'}
            result = client.sunion('stdGrp1','stdGrp2')
            print(result)                       # {b'stu-B', b'stu-C', b'stu-A', b'stu-D'}
      
    • ZSet: 有序集合(set的升级), 每次添加修改元素后会自动重新排序,n1 -> {(value1,score1),(value2,score2),...}

        # 每一个元素有两个值,值value和分数score(专门用来做排序)
        # 元素rank(表index,即下标索引)
        # lexicographical: 相同的分值时,有序集合的元素会根据成员的值逐字节对比
      
        # zadd(name, mapping, nx=False, xx=False, ch=False, incr=False) 添加
      
        # zrem(name, values)                                            删除
        # zremrangebyrank(name, start, end)
        # zremrangebyscore(name, minscore, maxscore)
      
        # zscan(name, cursor=0, match=None, count=None, score_cast_func=float) 查看
        # zscan_iter(name, match=None, count=None,score_cast_func=float)
      
        # zrange/zrevrange(name, start, end, desc=False, withscores=False, score_cast_func=float) 获取rank范围内的元素
        # zrangebyscore(name,minscore,maxscore,...)   获取score范围内的元素
      
        # zrank/zrevrank(name, value)                  获取元素rank
        # zscore(name, value)                          获取元素score
      
        # zcard(name)                         所有元素个数
        # zcount(name, minscore, maxscore)    指定score范围内的元素数量
      
        # zincrby(name, value, amount)                自增
        # zinterstore(dest, keys, aggregate=None)     交集(相同值不同分数,则按照 aggregate=SUM/MIN/MAX 进行操作)
        # zunionstore(dest, keys, aggregate=None)     并集
      
        def test_zset():
            print("Test ZSet:")
      
            # 创建
            result = client.zadd('car1',{'car-A':10,'car-B':20,'car-C':30,'car-D':40}) 
            print(result)                    # 4
      
            # 列出
            result = client.zscan('car1')
            print(result)                   # (0, [(b'car-A', 10.0), (b'car-B', 20.0), (b'car-C', 30.0), (b'car-D', 40.0)])
      
            # 切片,获取元素列表
            result = client.zrange('car1',1,3)
            print(result)                   # [b'car-B', b'car-C', b'car-D']
            result = client.zrangebyscore('car1',15,35)
            print(result)                   # [b'car-B', b'car-C']
      
            # 统计数量
            result = client.zcard('car1')
            print(result)                   # 4
            result = client.zcount('car1',15,35)
            print(result)                   # 2
      
            # 获取元素属性:rank,score
            result = client.zrank('car1','car-C')
            print(result)                   # 2
            result = client.zscore('car1','car-C')
            print(result)                   # 30.0
      
            # 交集
            result = client.zadd('car2',{'car-B':25,'car-D':45,'car-E':55})
            print(result)                   # 3
      
            result = client.zinterstore('car-inter',('car1','car2'))
            print(result)                   # 2
            result = client.zscan('car-inter')
            print(result)                   # (0, [(b'car-B', 45.0), (b'car-D', 85.0)])
      
            result = client.zinterstore('car-inter',('car1','car2'),aggregate='MAX')
            print(result)                   # 2
            result = client.zscan('car-inter')
            print(result)                   # (0, [(b'car-B', 25.0), (b'car-D', 45.0)])
      
  3. 其他常用操作

     # flushdb(asynchronous=False)
     # flushall(asynchronous=False)
     # delete( *names)
     # exists( name)
     # keys( pattern='*')
     # expire(name ,time)
     # rename( src, dst)
     # move( name, db)
     # randomkey()
     # type(name)
    
     def test_others():
         result = client.keys()
         print(result)
         # [b'car-inter', b'car1', b'car2', b'car', b'stdGrp1', b'stdGrp2', b'stu-001', b'a', b'top:dupefilter', b'stu-002', b'top:items', b'group1', b'tt']
    
         result = client.delete('car')
         print(result)       # 1
    
         result = client.exists('car')
         print(result)       # 0
    
         result = client.type('car-inter')
         print(result)       # b'zset'
    
  4. 管道: 批量提交命令,还可用来实现事务transation

     # pipeline(transaction=True,shard_hint=None) 默认情况下一次pipline是原子性操作
     # pipe.watch(name) -- 乐观锁,watch的对象不可改
     # pipe.unwatch()
    
     def test_pipeline():
         import time
    
         client.set('cnt',10)
         result=client.get('cnt')
         print('initial cnt:',result)
    
         pipe=client.pipeline(transaction=True)
         pipe.watch('cnt')               # 加锁
         try:
             pipe.multi()
    
             cnt=int(client.get('cnt'))
             pipe.set('a', 1)
             pipe.set('cnt',cnt+1)
             pipe.set('b',2)
    
             print('sleep...')
             time.sleep(5)               # 此时,若另一个客户端修改了cnt,则这段操作提交(执行execute)时会报错
    
             print('execute...')
             pipe.execute()
         except redis.exceptions.WatchError as ex:
             print("pipe fail:",ex)
         else:
             print("pipe success")
         finally:
             print("finally: a=%s,cnt=%s,b=%s" % (client.get('a'),client.get('cnt'),client.get('b')))
             pipe.unwatch()              # 解锁
    
  5. 发布/订阅 -- 不推荐使用

     # 发布: 
     #   publish(channel,msg)     
     #   => redis client execute : `publish channel msg` 
     # 订阅: 
     #   pubsub().subscribe(channel).parse_response(block,timeout)  
     #   => redis client execute : `subscribe channel` (取消使用命令punsubscribe/unsubscribe)
     def test_publish():
         result=client.publish("channel-1","Hello!")
         print("result:",result)     # 0
    
         while True:
             msg = "This is %.2f" % (random.random()*10)
             print('sending...',msg)
    
             result=client.publish("channel-1",msg)
             if result==1:
                 print('send success')
             else:
                 print('send fail')
    
             isCondinue=input("continue?(y/n)")
             if isCondinue=='n':
                 break;
    
         print('Done!')
    
     def test_subscribe():
         subscribeObj = client.pubsub()
         result=subscribeObj.subscribe("channel-1")
         print(result)           # None
    
         while True:
             print('receiving...')
             msg=subscribeObj.parse_response(block=False,timeout=60)
             # 第一次收到:[b'subscribe', b'channel-1', 1]
             # 当另一个客户端向此channel发布消息时(eg: `publish channel-1 "Hello World"`)
             # 这里会收到: [b'message', b'channel-1', b'Hello World']
             print("receive msg:",msg)   
             isCondinue=input("continue?(y/n)")
             if isCondinue=='n':
                 break;
         print("Done!")
    
     if __name__=='__main__':
         # test_publish()
         # test_subscribe()
    

MongoDB

pip install pymongo

import pymongo
import json
from datetime import datetime

# 1. 建立连接
mongoConnStr="mongodb://cj:123456@localhost:27017/?authSource=admin"
client=pymongo.MongoClient(mongoConnStr)    
print("list dbs:",client.list_database_names())                         # 列出dbs

db=client['mg_test']
print("list collections of mg_test db:",db.list_collection_names())     # 列出collections(类似表)

collection=db['movies']
print("get collection:movies count:",collection.estimated_document_count())

# 2. clear:
# ret=collection.delete_many({})           
# print(ret.deleted_count,ret.acknowledged,ret.raw_result)

# 3. update_one -- update or insert record
contents=json.load(open('test.json','r'))           # load: file -> string -> python obj

print('update or insert records:')
for record in contents:
    id=record.pop('id')
    t=datetime.now()
    print("to store record...id=%s,title=%s" % (id,record['title']))

    record['last_update_time']=t
    ret=collection.update_one({'_id':id}
            ,{'$setOnInsert':{'create_time':t},'$set':record}
            ,upsert=True)

    print(ret.matched_count,ret.modified_count,ret.upserted_id,ret.acknowledged,ret.raw_result)

# 4. find -- list records
print('list stored records:')

results=collection.find({},{'_id':1,'title':1,'rate':1})
for result in results:
    print(result)
# results sample:
# {'_id': '27109879', 'rate': '6.5', 'title': '硬核'}
# {'_id': '26707088', 'rate': '7.1', 'title': '奎迪:英雄再起'}
# {'_id': '30334122', 'rate': '6.1', 'title': '芳龄十六'}
# {'_id': '1945750', 'rate': '7.7', 'title': '污垢'}
# {'_id': '26611891', 'rate': '6.8', 'title': '欢乐满人间2'}

print('list rate>=7 records:')
results=collection.find({'rate':{'$gte':"7"}},{'_id':1,'title':1,'rate':1}).limit(5)
for record in results:
    print(record)

# results sample:
# {'_id': '26707088', 'rate': '7.1', 'title': '奎迪:英雄再起'}
# {'_id': '1945750', 'rate': '7.7', 'title': '污垢'}

# 5. aggregate -- summary records
print('list summary by rate level')
# $cond:{if:{$gte:['$rating',8]},then:1,else:0} 
# $addFields:{'rate_number':{$convert:{input:"$rate",to:"int"}}} 
# use $project also could add fields
results=collection.aggregate([
    {'$addFields':{
        'rate_number':{'$convert':{'input':"$rate",'to':"double"}}
        ,'rate_level':{'$cond':[
            {'$lt':['$rate','7.5']}
            ,{'$cond':[{'$gte':['$rate','6.5']},'Middle','Low']}
            ,'High'
        ]}
    }}
    # ,{'$project':{
    #     '_id':1
    #     ,'rate':1
    #     ,'title':1
    #     # ,'rate_level':{'$cond':[
    #     #     {'$lt':['$rate','7.5']}
    #     #     ,{'$cond':[{'$gte':['$rate','6.5']},'Middle','Low']}
    #     #     ,'High'
    #     # ]}
    # }}
    ,{'$group':{
        '_id':"$rate_level"
        ,'count':{'$sum':1}
        ,'avg_rate':{'$avg':'$rate_number'}
        #,'rate_list':{'$push':'$rate_number'}
        ,'rate_list':{'$push':{'$concat':['$title',':','$rate']}}
    }}
    ,{'$sort':{'count':-1}}
    #,{'$limit':10}
])
for record in results:
    print(record)

# results sample:
# {'_id': 'Middle', 'count': 3, 'avg_rate': 6.8, 'rate_list': ['硬核:6.5', '奎迪:英雄再起:7.1', '欢乐满人间2:6.8']}
# {'_id': 'Low', 'count': 1, 'avg_rate': 6.1, 'rate_list': ['芳龄十六:6.1']}
# {'_id': 'High', 'count': 1, 'avg_rate': 7.7, 'rate_list': ['污垢:7.7']}


print("Done!")

Reference