4.3. Protocol Context Manager

Context managers are a Python construct that allow you to manage resources, such as files or network connections, in a safe and efficient way. They ensure that resources are properly acquired and released, even in the face of errors or exceptions.

In Python, context managers are implemented using the with statement. The with statement provides a way to wrap a block of code with methods defined by a context manager. When the block of code is entered, the __enter__() method of the context manager is called, and when the block of code is exited, the __exit__() method is called.

For example, the following code uses the with statement to open a file and read its contents:

>>> 
... with open('/tmp/myfile.txt', mode='r') as f:
...    contents = f.read()

In this example, the open() function returns a context manager that manages the file resource. When the with block is entered, the __enter__() method of the context manager is called, which opens the file. When the with block is exited, the __exit__() method is called, which closes the file.

Context managers are a powerful tool in Python, and are used extensively in libraries and frameworks to manage resources and ensure safe and efficient code execution.

>>> class ContextManager:
...     def __enter__(self):
...         return self
...
...     def __exit__(self, *args):
...         return None
>>>
>>>
>>> with ContextManager() as cm:
...     print('Do something with: cm')
Do something with: cm

SetUp:

>>> from typing import Protocol

Use case:

>>> with open('/tmp/myfile.txt') as file:
...     content = file.read()

Equivalent code (naive):

>>> file = open('/tmp/myfile.txt')
>>> content = file.read()
>>> file.close()

Equivalent code (less naive):

>>> file = open('/tmp/myfile.txt').__enter__()
>>> content = file.read()
>>> file.__exit__()

Equivalent code:

>>> try:
...     file = open('/tmp/myfile.txt').__enter__()
...     content = file.read()
... finally:
...     file.__exit__()

This is because:

>>> 
... class ContextManger(Protocol):
...     def __enter__(self): ...
...     def __exit__(self, exc_type, exc_val, exc_tb): ...
...
...
... with ContextManger as obj:
...     ...

4.3.1. Typing

  • contextlib.AbstractContextManager

  • contextlib.AbstractAsyncContextManager

4.3.2. Example

>>> class User:
...     def __init__(self, firstname, lastname):
...         self.firstname = firstname
...         self.lastname = lastname
...
...     def __enter__(self):
...         print('Entering the block')
...         return self
...
...     def __exit__(self, *args):
...         print('Exiting the block')
...
...     def say_hello(self):
...         print(f'Hello {self.firstname} {self.lastname}')

Now we can use context manager:

>>> with User('Mark', 'Watney') as mark:
...     mark.say_hello()
...
Entering the block
Hello Mark Watney
Exiting the block

Is equivalent to:

>>> mark = User('Mark', 'Watney')
>>>
>>> mark = mark.__enter__()
Entering the block
>>>
>>> mark.say_hello()
Hello Mark Watney
>>>
>>> mark.__exit__()
Exiting the block

4.3.3. Contex Manager

We need to import time() function to get current timestamp (number of seconds from 1970-01-01 00:00:00 UTC):

>>> from time import time

Define our context manager:

>>> class Timeit:
...     def __enter__(self):
...         self.start = time()
...
...     def __exit__(self, *args):
...         self.stop = time()
...         duration = self.stop - self.start
...         print(f'Duration: {duration:.4f} seconds')
>>> 
... with Timeit():
...     result = [x**x for x in range(0, 10_000)]
...
Duration: 5.9882 seconds

4.3.4. Context Decorator Class

  • Inherit from contextlib.ContextDecorator

  • Class become context manager decorator

  • Mind the brackets in decorator @Timeit()

We need to import time() function to get current timestamp (number of seconds from 1970-01-01 00:00:00 UTC). Moreover, this time we need also contextlib.ContextDecorator for our class to inherit from:

>>> from time import time
>>> from contextlib import ContextDecorator

Define our context manager:

>>> class Timeit(ContextDecorator):
...     def __enter__(self):
...         self.start = time()
...
...     def __exit__(self, *args):
...         self.stop = time()
...         duration = self.stop - self.start
...         print(f'Duration: {duration:.4f} seconds')

Define the function which will be automatically wrapped by context manager. Mind the brackets in @Timeit():

>>> @Timeit()
... def run():
...     result = [x**x for x in range(0, 10_000)]

Calling function will result in executing context manager:

>>> run()  
Duration: 5.9302 seconds

4.3.5. Context Decorator Function

  • Split function for parts before and after yield

  • Code before yield becomes __enter__()

  • Code after yield becomes __exit__()

We need to import time() function to get current timestamp (number of seconds from 1970-01-01 00:00:00 UTC):

>>> from time import time
>>> from contextlib import contextmanager

Define our context manager. Mind that Python will split our function for parts before and after yield. Code before yield becomes __enter__() and code after yield becomes __exit__():

>>> @contextmanager
... def timeit():
...     start = time()
...     yield
...     end = time()
...     duration = stop - start
...     print(f'Duration: {duration:.4f} seconds')

Now we can use our function as a context manager:

>>> with timeit():  
...     result = [x**x for x in range(0, 10_000)]
...
Duration 4.0250 seconds

4.3.6. Many Context Managers

>>> def convert(json_string) -> str:
...     ...
>>> 
... with open('/tmp/myfile.json', mode='r') as infile, \
...      open('/tmp/myfile.csv', mode='w') as outfile:
...     json_string = infile.read()
...     csv_string = convert(json_string)
...     outfile.write(csv_string)
>>> 
... with (CtxManager() as example):
...     ...
>>> 
... with (
...     CtxManager1(),
...     CtxManager2()
... ):
...     ...
>>> 
... with (CtxManager1() as example,
...       CtxManager2()):
...     ...
>>> 
... with (CtxManager1(),
...       CtxManager2() as example):
...     ...
>>> 
... with (
...     CtxManager1() as example1,
...     CtxManager2() as example2
... ):
...     ...
>>> 
... with (
...     CtxManager1() as example1,
...     CtxManager2() as example2,
...     CtxManager3() as example3,
... ):
...     ...

4.3.7. Use Case - 0x01

>>> from contextlib import contextmanager
>>>
>>>
>>> @contextmanager
... def html_tag(name):
...     print(f'<{name}>')
...     yield
...     print(f'</{name}>')
>>>
>>>
>>> with html_tag('p'):
...     print('We choose to go to the Moon.')
...
<p>
We choose to go to the Moon.
</p>

4.3.8. Use Case - 0x02

  • Files

SetUp:

>>> from pathlib import Path
>>>
>>> Path('/tmp/myfile.txt').touch()

Open/Close:

>>> f = open('/tmp/myfile.txt')
>>>
>>> try:
...     content = f.read()
... finally:
...     f.close()

Context Manager:

>>> with open('/tmp/myfile.txt') as f:
...     content = f.read()

Story about file allocation table:

$ uptime
11:29  up 39 days,  2:33, 2 users, load averages: 2.97 4.23 4.41

$ lsof |wc -l
   12710
uint32_max = 4_294_967_295
char* file[uint32_max];

file_alloc[0] = '/tmp/myfile1.txt'
file_alloc[1] = '/tmp/myfile2.txt'
file_alloc[2] = '/tmp/myfile3.txt'
...
file_alloc[4_294_967_294] = '/tmp/myfile4294967294.txt'
file_alloc[4_294_967_295] = '/tmp/myfile4294967295.txt'
file_alloc[4_294_967_296] -> KernelPanic

4.3.9. Use Case - 0x03

>>> class File:
...     def __init__(self, filename):
...         self.filename = filename
...
...     def read(self):
...         pass
...
...     def __enter__(self):
...         return self
...
...     def __exit__(self, exc_type, exc_val, exc_tb):
...         pass
>>>
>>>
>>> with File('/tmp/myfile.txt') as file:
...     content = file.read()

4.3.10. Use Case - 0x04

  • Database

>>> import sqlite3
>>>
>>>
>>> DATABASE = ':memory:'
>>>
>>> SQL_CREATE_TABLE = """
...     CREATE TABLE IF NOT EXISTS users (
...         id INTEGER PRIMARY KEY AUTOINCREMENT,
...         firstname TEXT NOT NULL,
...         lastname TEXT NOT NULL,
...         age INTEGER
...     )
... """
>>>
>>> SQL_INSERT = """
...     INSERT INTO users VALUES(NULL, :firstname, :lastname, :age)
... """
>>>
>>> SQL_SELECT = """
...     SELECT * FROM users
... """
>>>
>>> DATA = [
...     {'firstname': 'Mark', 'lastname': 'Watney', 'age': 42},
...     {'firstname': 'Melissa', 'lastname': 'Lewis', 'age': 41},
...     {'firstname': 'Rick', 'lastname': 'Martinez', 'age': 40},
...     {'firstname': 'Alex', 'lastname': 'Vogel', 'age': 42},
...     {'firstname': 'Beth', 'lastname': 'Johanssen', 'age': 29},
...     {'firstname': 'Chris', 'lastname': 'Beck', 'age': 36},
... ]
>>>
>>>
>>> with sqlite3.connect(DATABASE) as db:  
...     db.execute(SQL_CREATE_TABLE)
...     db.executemany(SQL_INSERT, DATA)
...     db.row_factory = sqlite3.Row
...
...     for row in db.execute(SQL_SELECT):
...         print(dict(row))
...
<sqlite3.Cursor object at 0x...>
<sqlite3.Cursor object at 0x...>
{'id': 1, 'firstname': 'Mark', 'lastname': 'Watney', 'age': 42}
{'id': 2, 'firstname': 'Melissa', 'lastname': 'Lewis', 'age': 41}
{'id': 3, 'firstname': 'Rick', 'lastname': 'Martinez', 'age': 40}
{'id': 4, 'firstname': 'Alex', 'lastname': 'Vogel', 'age': 42}
{'id': 5, 'firstname': 'Beth', 'lastname': 'Johanssen', 'age': 29}
{'id': 6, 'firstname': 'Chris', 'lastname': 'Beck', 'age': 36}

4.3.11. Use Case - 0x05

  • Lock

Without context manager:

>>> from threading import Lock
>>>
>>>
>>> lock = Lock()
>>> lock.acquire()
True
>>>
>>> try:
...     print('Critical section 1')
...     print('Critical section 2')
... finally:
...     lock.release()
...
Critical section 1
Critical section 2

With context manager:

>>> from threading import Lock
>>>
>>>
>>> mylock = Lock()
>>>
>>> with mylock:
...     print('Critical section 1')
...     print('Critical section 2')
...
Critical section 1
Critical section 2

4.3.12. Use Case - 0x06

SetUp:

>>> from threading import Lock

Define decorator to automatically use context manager with lock:

>>> def lock(mylock: Lock):
...     def decorator(func):
...         def wrapper(*args, **kwargs):
...             with mylock:
...                 return func(*args, **kwargs)
...         return wrapper
...     return decorator

Usage:

>>> mylock = Lock()
>>>
>>> @lock(mylock)
... def write(file, content):
...     print(f'Writing "{content}" to {file}')
>>>
>>>
>>> write(file='/tmp/myfile.txt', content='hello')
Writing "hello" to /tmp/myfile.txt

4.3.13. Use Case - 0x07

  • Microbenchmark

SetUp:

>>> from time import time

Define Context Manager to measure start, stop times and calculate duration:

>>> class Timeit:
...     def __enter__(self):
...         self.start = time()
...
...     def __exit__(self, *args):
...         self.stop = time()
...         duration = self.stop - self.start
...         print(f'Duration: {duration:.4f} seconds')

Let's define some constants for tests:

>>> firstname = 'Mark'
>>> lastname = 'Watney'
>>> repetitions = 10_000_000

Microbenchmark for concatenation using f-string:

>>> with Timeit():  
...     for _ in range(repetitions):
...         f'{firstname}{lastname}'
Duration: 1.3408 seconds

Microbenchmark for concatenation using add (+) operator:

>>> with Timeit():  
...     for _ in range(repetitions):
...         firstname + lastname
Duration: 1.2745 seconds

Microbenchmark for concatenation using modulo (%) operator:

>>> with Timeit():  
...     for _ in range(repetitions):
...         '%s%s' % (firstname, lastname)
Duration: 2.1886 seconds

Microbenchmark for concatenation using modulo (%) operator:

>>> with Timeit():  
...     for _ in range(repetitions):
...         '%(fname)s%(lname)s' % {'fname': firstname, 'lname': lastname}
Duration: 4.1019 seconds

Microbenchmark for concatenation using str.format() method:

>>> with Timeit():  
...     for _ in range(repetitions):
...         '{}{}'.format(firstname, lastname)
Duration: 2.6623 seconds
>>> with Timeit():  
...     for _ in range(repetitions):
...         '{0}{1}'.format(firstname, lastname)
Duration: 2.7617 seconds

Microbenchmark for concatenation using str.format() method:

>>> with Timeit():  
...     for _ in range(repetitions):
...         '{fname}{lname}'.format(fname=firstname, lname=lastname)
Duration: 5.3505 seconds

4.3.14. Use Case - 0x08

>>> from unittest import IsolatedAsyncioTestCase
>>> from httpx import AsyncClient, Response, HTTPStatusError
>>> from http import HTTPStatus
>>>
>>>
>>> BASE_URL = 'https://python3.info'
>>>
>>> async def request(method: str = 'GET',
...             path: str = '/',
...             data: dict | None = None,
...             headers: dict | None = None,
...             ) -> Response:
...     async with AsyncClient(base_url=BASE_URL) as ac:
...         return await ac.request(method=method, url=path, data=data, headers=headers)
...
...
>>> class WebsiteTest(IsolatedAsyncioTestCase):
...     async def test_index(self):
...         resp = await request('GET', '/index.html')
...         self.assertEqual(resp.status_code, HTTPStatus.OK)
...         self.assertIn('Python - from None to AI', resp.text)
...         self.assertIn('Matt Harasymczuk', resp.text)
...         self.assertIn('Creative Commons Attribution-ShareAlike 4.0 International License', resp.text)
...
...     async def test_license(self):
...         resp = await request('GET', '/LICENSE.html')
...         self.assertEqual(resp.status_code, HTTPStatus.OK)
...         self.assertIn('Matt Harasymczuk', resp.text)
...         self.assertIn('matt@astronaut.center', resp.text)
...         self.assertIn('last update: ', resp.text)
...         self.assertIn('Creative Commons Attribution-ShareAlike 4.0 International Public License', resp.text)
...
...     async def test_login(self):
...         resp = await request('POST', '/login.html', data={'username':'mwatney', 'password': 'Ares3'})
...         self.assertEqual(resp.status_code, HTTPStatus.FORBIDDEN)
...         with self.assertRaises(HTTPStatusError):
...             resp.raise_for_status()
...
...     async def test_install(self):
...         resp = await request('GET', '/install.html')
...         self.assertEqual(resp.status_code, HTTPStatus.OK)
...         with self.subTest('Python'):
...             self.assertNotIn('3.8', resp.text)
...             self.assertNotIn('3.9', resp.text)
...             self.assertNotIn('3.10', resp.text)
...             self.assertIn('3.11', resp.text)
...             self.assertIn('3.12', resp.text)
...         with self.subTest('PyCharm'):
...             self.assertNotIn('2021.1', resp.text)
...             self.assertNotIn('2021.2', resp.text)
...             self.assertNotIn('2021.3', resp.text)
...             self.assertNotIn('2022.1', resp.text)
...             self.assertNotIn('2022.2', resp.text)
...             self.assertNotIn('2022.3', resp.text)
...             self.assertNotIn('2023.1', html.text)
...             self.assertNotIn('2023.1', html.text)
...             self.assertIn('2023.3', html.text)
...         with self.subTest('Git'):
...             self.assertIn('2.40 lub nowszy', resp.text)

4.3.15. Use Case - 0x09

In the most general sense, the Session establishes all conversations with the database and represents a 'holding zone' for all the objects which you've loaded or associated with it during its lifespan. It provides the interface where SELECT and other queries are made that will return and modify ORM-mapped objects. The ORM objects themselves are maintained inside the Session, inside a structure called the identity map - a data structure that maintains unique copies of each object, where 'unique' means 'only one object with a particular primary key'.

>>> 
... from sqlalchemy import create_engine
... from sqlalchemy.orm import Session
...
... engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/")

Create session and add objects:

>>> 
... with Session(engine) as session:
...     session.add(some_object)
...     session.add(some_other_object)
...     session.commit()

The long-form sequence of operations illustrated above can be achieved more succinctly by making use of the SessionTransaction object returned by the Session.begin() method, which provides a context manager interface for the same sequence of operations:

>>> 
... with Session(engine) as session:
...     with session.begin():
...         session.add(some_object)
...         session.add(some_other_object)

Create session and add objects. Inner context calls session.commit(), if there were no exceptions. Outer context calls session.close()

There could be several context managers entered at the same time:

>>> 
... with Session(engine) as session, session.begin():
...     session.add(some_object)
...     session.add(some_other_object)

Create session and add objects. Inner context calls session.commit(), if there were no exceptions. Outer context calls session.close()

4.3.16. References

4.3.17. Assignments

Code 4.50. Solution
"""
* Assignment: Protocol ContextManager File
* Complexity: easy
* Lines of code: 14 lines
* Time: 5 min

English:
    1. Define class `File` with parameter: `filename: str`
    2. `File` must implement Context Manager protocol
    3. `File` buffers lines added using `File.append(text: str)` method
    4. On `with` block exit, `File` class:
        a. Creates file (if not exists)
        b. Opens file
        c. Writes buffer to file
        d. Clears buffer
        e. Closes file
    5. Run doctests - all must succeed

Polish:
    1. Stwórz klasę `File` z parametrem: `filename: str`
    2. `File` ma implementować protokół Context Manager
    3. `File` buforuje linie dodawane za pomocą metody `File.append(text: str)`
    4. Na wyjściu z bloku `with`, klasa `File`:
        a. Tworzy plik (jeżeli nie istnieje)
        b. Otwiera plik
        c. Zapisuje bufor do pliku
        d. Czyści bufor
        e. Zamyka plik
    5. Uruchom doctesty - wszystkie muszą się powieść

Hints:
    * Append newline character (`\n`) before adding to buffer

Tests:
    >>> import sys; sys.tracebacklimit = 0
    >>> from os import remove
    >>> from inspect import isclass, ismethod

    >>> assert isclass(File)
    >>> assert hasattr(File, 'append')
    >>> assert hasattr(File, '__enter__')
    >>> assert hasattr(File, '__exit__')
    >>> assert ismethod(File(None).append)
    >>> assert ismethod(File(None).__enter__)
    >>> assert ismethod(File(None).__exit__)

    >>> with File('_temporary.txt') as file:
    ...    file.append('One')
    ...    file.append('Two')

    >>> open('_temporary.txt').read()
    'One\\nTwo\\n'

    >>> remove('_temporary.txt')
"""

class File:
    ...


Code 4.51. Solution
"""
* Assignment: Protocol ContextManager Buffer
* Complexity: medium
* Lines of code: 15 lines
* Time: 8 min

English:
    1. Define class attribute `BUFFER_LIMIT: int = 100` bytes
    2. File has to be written to disk every X bytes of buffer
    3. Writing and reading takes time,
       how to make buffer save data in the background,
       but it could be still used?
    4. Run doctests - all must succeed

Polish:
    1. Zdefiniuj klasowy atrybut `BUFFER_LIMIT: int = 100` bajtów
    2. Plik na dysku ma być zapisywany co X bajtów bufora
    3. Operacje zapisu i odczytu trwają, jak zrobić,
       aby do bufora podczas zapisu na dysk,
       nadal można było pisać?
    4. Uruchom doctesty - wszystkie muszą się powieść

Hints:
    * `sys.getsizeof(obj)` returns `obj` size in bytes

Tests:
    >>> import sys; sys.tracebacklimit = 0
    >>> from os import remove
    >>> from inspect import isclass, ismethod

    >>> assert isclass(File)
    >>> assert hasattr(File, 'append')
    >>> assert hasattr(File, 'BUFFER_LIMIT')
    >>> assert hasattr(File, '__enter__')
    >>> assert hasattr(File, '__exit__')
    >>> assert ismethod(File(None).append)
    >>> assert ismethod(File(None).__enter__)
    >>> assert ismethod(File(None).__exit__)
    >>> assert File.BUFFER_LIMIT == 100

    >>> with File('_temporary.txt') as file:
    ...    file.append('One')
    ...    file.append('Two')
    ...    file.append('Three')
    ...    file.append('Four')
    ...    file.append('Five')
    ...    file.append('Six')

    >>> open('_temporary.txt').read()
    'One\\nTwo\\nThree\\nFour\\nFive\\nSix\\n'

    >>> remove('_temporary.txt')
"""

from sys import getsizeof
from typing import ClassVar


class File:
    ...


Code 4.52. Solution
"""
* Assignment: Protocol Context Manager AutoSave
* Complexity: hard
* Lines of code: 13 lines
* Time: 13 min

English:
    1. Modify class `File`
    2. Add class configuration attribute `AUTOSAVE_SECONDS: float = 1.0`
    3. Save buffer content to file every `AUTOSAVE_SECONDS` seconds
    4. Writing and reading takes time, how to make buffer save data in the background, but it could be still used?
    5. Run doctests - all must succeed

Polish:
    1. Zmodyfikuj klasę `File`
    2. Dodaj klasowy atrybut konfiguracyjny `AUTOSAVE_SECONDS: float = 1.0`
    3. Zapisuj zawartość bufora do pliku co `AUTOSAVE_SECONDS` sekund
    4. Operacje zapisu i odczytu trwają, jak zrobić, aby do bufora podczas zapisu na dysk, nadal można było pisać?
    5. Uruchom doctesty - wszystkie muszą się powieść

Hint:
    * `from threading import Timer`
    * `timer = Timer(interval, function)`
    * `timer.start()`
    * `timer.cancel()`
    * `ctrl+c` or stop button kills infinite loop

Tests:
    >>> import sys; sys.tracebacklimit = 0
    >>> from os import remove
    >>> from inspect import isclass, ismethod
    >>> from time import sleep

    >>> assert isclass(File)
    >>> assert hasattr(File, 'append')
    >>> assert hasattr(File, 'AUTOSAVE_SECONDS')
    >>> assert hasattr(File, '__enter__')
    >>> assert hasattr(File, '__exit__')
    >>> assert ismethod(File(None).append)
    >>> assert ismethod(File(None).__enter__)
    >>> assert ismethod(File(None).__exit__)
    >>> assert File.AUTOSAVE_SECONDS == 1.0

    >>> with File('_temporary.txt') as file:
    ...     file.append('One')
    ...     file.append('Two')
    ...     sleep(0.5)
    ...     file.append('Three')
    ...     file.append('Four')
    ...     sleep(2.0)
    ...     file.append('Five')
    ...     file.append('Six')

    >>> open('_temporary.txt').read()
    'One\\nTwo\\nThree\\nFour\\nFive\\nSix\\n'

    >>> remove('_temporary.txt')
"""

from threading import Timer


class File:
    ...