7.2. Context Manager

Context managers are a Python construct that allow you to manage resources, such as files or network connections, in a safe and efficient way. They ensure that resources are properly acquired and released, even in the face of errors or exceptions.

In Python, context managers are implemented using the with statement. The with statement provides a way to wrap a block of code with methods defined by a context manager. When the block of code is entered, the __enter__() method of the context manager is called, and when the block of code is exited, the __exit__() method is called.

For example, the following code uses the with statement to open a file and read its contents:


... with open('/tmp/myfile.txt', mode='r') as f:
...    contents = f.read()

In this example, the open() function returns a context manager that manages the file resource. When the with block is entered, the __enter__() method of the context manager is called, which opens the file. When the with block is exited, the __exit__() method is called, which closes the file.

Context managers are a powerful tool in Python, and are used extensively in libraries and frameworks to manage resources and ensure safe and efficient code execution.

7.2.1. Problem

class User:
    def __init__(self, username, password):
        self.username = username
        self.password = password
        self._authenticated = False

    def login(self):
        self._authenticated = True
        print('User logged-in')

    def logout(self):
        self._authenticated = False
        print('User logged-out')

    def set_password(self, new_password):
        if not self._authenticated:
            raise PermissionError('Not authenticated')
        self.password = new_password
        print('Password updated')

Usage:

alice = User(username='alice', password='secret')

alice.login()
User logged-in

alice.set_password('qwerty')
Password updated

alice.logout()
User logged-out

With block:

with User(username='alice', password='secret') as alice:
    alice.set_password('qwerty')

Traceback (most recent call last):
TypeError: 'User' object does not support the context manager protocol

7.2.2. Solution

class User:
    def __init__(self, username, password):
        self.username = username
        self.password = password
        self._authenticated = False

    def login(self):
        self._authenticated = True
        print('User logged-in')

    def logout(self):
        self._authenticated = False
        print('User logged-out')

    def set_password(self, new_password):
        if not self._authenticated:
            raise PermissionError('Not authenticated')
        self.password = new_password
        print('Password updated')

    def __enter__(self):
        self.login()
        return self

    # def __exit__(self, exc_type, exc_value, exc_traceback):
    def __exit__(self, *args):
        self.logout()

Usage:

with User(username='alice', password='secret') as alice:
    alice.set_password('qwerty')

User logged-in
Password updated
User logged-out

7.2.3. Example

class User:
    def __init__(self, firstname, lastname):
        self.firstname = firstname
        self.lastname = lastname

    def __enter__(self):
        print('Entering the block')
        return self

    def __exit__(self, *args):
        print('Exiting the block')

    def say_hello(self):
        print(f'Hello {self.firstname} {self.lastname}')

Now we can use context manager:

with User('Mark', 'Watney') as mark:
    mark.say_hello()

Entering the block
Hello Mark Watney
Exiting the block

Is equivalent to:

mark = User('Mark', 'Watney')

mark = mark.__enter__()
Entering the block

mark.say_hello()
Hello Mark Watney

mark.__exit__()
Exiting the block

7.2.4. Contex Manager

We need to import time() function to get current timestamp (number of seconds from 1970-01-01 00:00:00 UTC):

from time import time

Define our context manager:

class Timeit:
    def __enter__(self):
        self.start = time()

    def __exit__(self, *args):
        self.stop = time()
        duration = self.stop - self.start
        print(f'Duration: {duration:.4f} seconds')

... with Timeit():
...     result = [x**x for x in range(0, 10_000)]
...
Duration: 5.9882 seconds

7.2.5. Context Decorator Class

  • Inherit from contextlib.ContextDecorator

  • Class become context manager decorator

  • Mind the brackets in decorator @Timeit()

We need to import time() function to get current timestamp (number of seconds from 1970-01-01 00:00:00 UTC). Moreover, this time we need also contextlib.ContextDecorator for our class to inherit from:

from time import time
from contextlib import ContextDecorator

Define our context manager:

class Timeit(ContextDecorator):
    def __enter__(self):
        self.start = time()

    def __exit__(self, *args):
        self.stop = time()
        duration = self.stop - self.start
        print(f'Duration: {duration:.4f} seconds')

Define the function which will be automatically wrapped by context manager. Mind the brackets in @Timeit():

@Timeit()
def run():
    result = [x**x for x in range(0, 10_000)]

Calling function will result in executing context manager:

run()
Duration: 5.9302 seconds

7.2.6. Context Decorator Function

  • Split function for parts before and after yield

  • Code before yield becomes __enter__()

  • Code after yield becomes __exit__()

We need to import time() function to get current timestamp (number of seconds from 1970-01-01 00:00:00 UTC):

from time import time
from contextlib import contextmanager

Define our context manager. Mind that Python will split our function for parts before and after yield. Code before yield becomes __enter__() and code after yield becomes __exit__():

@contextmanager
def timeit():
    start = time()
    yield
    end = time()
    duration = stop - start
    print(f'Duration: {duration:.4f} seconds')

Now we can use our function as a context manager:

with timeit():
    result = [x**x for x in range(0, 10_000)]

Duration 4.0250 seconds

7.2.7. Many Context Managers

def convert(json_string) -> str:
    ...

... with open('/tmp/myfile.json', mode='r') as infile, \
...      open('/tmp/myfile.csv', mode='w') as outfile:
...     data = infile.read()
...     result = convert(data)
...     outfile.write(result)

... with (CtxManager() as example):
...     ...

... with (
...     CtxManager1(),
...     CtxManager2()
... ):
...     ...

... with (CtxManager1() as example,
...       CtxManager2()):
...     ...

... with (CtxManager1(),
...       CtxManager2() as example):
...     ...

... with (
...     CtxManager1() as example1,
...     CtxManager2() as example2
... ):
...     ...

... with (
...     CtxManager1() as example1,
...     CtxManager2() as example2,
...     CtxManager3() as example3,
... ):
...     ...

7.2.8. Use Case - 1

from contextlib import contextmanager


@contextmanager
def html_tag(name):
    print(f'<{name}>')
    yield
    print(f'</{name}>')


with html_tag('p'):
    print('We choose to go to the Moon.')

<p>
We choose to go to the Moon.
</p>

7.2.9. Use Case - 2

  • Files

SetUp:

from pathlib import Path

Path('/tmp/myfile.txt').touch()

Open/Close:

f = open('/tmp/myfile.txt')

try:
    content = f.read()
finally:
    f.close()

Context Manager:

with open('/tmp/myfile.txt') as f:
    content = f.read()

Story about file allocation table:

$ uptime
11:29  up 39 days,  2:33, 2 users, load averages: 2.97 4.23 4.41

$ lsof |wc -l
   12710
uint32_max = 4_294_967_295
char* file[uint32_max];

file_alloc[0] = '/tmp/myfile1.txt'
file_alloc[1] = '/tmp/myfile2.txt'
file_alloc[2] = '/tmp/myfile3.txt'
...
file_alloc[4_294_967_294] = '/tmp/myfile4294967294.txt'
file_alloc[4_294_967_295] = '/tmp/myfile4294967295.txt'
file_alloc[4_294_967_296] -> KernelPanic

7.2.10. Use Case - 3

class File:
    def __init__(self, filename):
        self.filename = filename

    def read(self):
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass


with File('/tmp/myfile.txt') as file:
    content = file.read()

7.2.11. Use Case - 4

  • Database

import sqlite3


DATABASE = ':memory:'

SQL_CREATE_TABLE = """
    CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        firstname TEXT NOT NULL,
        lastname TEXT NOT NULL,
        age INTEGER
    )
"""

SQL_INSERT = """
    INSERT INTO users VALUES(NULL, :firstname, :lastname, :age)
"""

SQL_SELECT = """
    SELECT * FROM users
"""

DATA = [
    {'firstname': 'Mark', 'lastname': 'Watney', 'age': 42},
    {'firstname': 'Melissa', 'lastname': 'Lewis', 'age': 41},
    {'firstname': 'Rick', 'lastname': 'Martinez', 'age': 40},
    {'firstname': 'Alex', 'lastname': 'Vogel', 'age': 42},
    {'firstname': 'Beth', 'lastname': 'Johanssen', 'age': 29},
    {'firstname': 'Chris', 'lastname': 'Beck', 'age': 36},
]


with sqlite3.connect(DATABASE) as db:
    db.execute(SQL_CREATE_TABLE)
    db.executemany(SQL_INSERT, DATA)
    db.row_factory = sqlite3.Row

    for row in db.execute(SQL_SELECT):
        print(dict(row))

<sqlite3.Cursor object at 0x...>
<sqlite3.Cursor object at 0x...>
{'id': 1, 'firstname': 'Mark', 'lastname': 'Watney', 'age': 42}
{'id': 2, 'firstname': 'Melissa', 'lastname': 'Lewis', 'age': 41}
{'id': 3, 'firstname': 'Rick', 'lastname': 'Martinez', 'age': 40}
{'id': 4, 'firstname': 'Alex', 'lastname': 'Vogel', 'age': 42}
{'id': 5, 'firstname': 'Beth', 'lastname': 'Johanssen', 'age': 29}
{'id': 6, 'firstname': 'Chris', 'lastname': 'Beck', 'age': 36}

7.2.12. Use Case - 5

  • Lock

Without context manager:

from threading import Lock


lock = Lock()
lock.acquire()
True

try:
    print('Critical section 1')
    print('Critical section 2')
finally:
    lock.release()

Critical section 1
Critical section 2

With context manager:

from threading import Lock


mylock = Lock()

with mylock:
    print('Critical section 1')
    print('Critical section 2')

Critical section 1
Critical section 2

7.2.13. Use Case - 6

SetUp:

from threading import Lock

Define decorator to automatically use context manager with lock:

def lock(mylock: Lock):
    def decorator(func):
        def wrapper(*args, **kwargs):
            with mylock:
                return func(*args, **kwargs)
        return wrapper
    return decorator

Usage:

mylock = Lock()

@lock(mylock)
def write(file, content):
    print(f'Writing "{content}" to {file}')


write(file='/tmp/myfile.txt', content='hello')
Writing "hello" to /tmp/myfile.txt

7.2.14. Use Case - 7

  • Microbenchmark

SetUp:

from time import time

Define Context Manager to measure start, stop times and calculate duration:

class Timeit:
    def __enter__(self):
        self.start = time()

    def __exit__(self, *args):
        self.stop = time()
        duration = self.stop - self.start
        print(f'Duration: {duration:.4f} seconds')

Let's define some constants for tests:

firstname = 'Mark'
lastname = 'Watney'
repetitions = 10_000_000

Microbenchmark for concatenation using f-string:

with Timeit():
    for _ in range(repetitions):
        f'{firstname}{lastname}'
Duration: 1.3408 seconds

Microbenchmark for concatenation using add (+) operator:

with Timeit():
    for _ in range(repetitions):
        firstname + lastname
Duration: 1.2745 seconds

Microbenchmark for concatenation using modulo (%) operator:

with Timeit():
    for _ in range(repetitions):
        '%s%s' % (firstname, lastname)
Duration: 2.1886 seconds

Microbenchmark for concatenation using modulo (%) operator:

with Timeit():
    for _ in range(repetitions):
        '%(fname)s%(lname)s' % {'fname': firstname, 'lname': lastname}
Duration: 4.1019 seconds

Microbenchmark for concatenation using str.format() method:

with Timeit():
    for _ in range(repetitions):
        '{}{}'.format(firstname, lastname)
Duration: 2.6623 seconds
with Timeit():
    for _ in range(repetitions):
        '{0}{1}'.format(firstname, lastname)
Duration: 2.7617 seconds

Microbenchmark for concatenation using str.format() method:

with Timeit():
    for _ in range(repetitions):
        '{fname}{lname}'.format(fname=firstname, lname=lastname)
Duration: 5.3505 seconds

7.2.15. Use Case - 8

from unittest import IsolatedAsyncioTestCase
from httpx import AsyncClient, Response, HTTPStatusError
from http import HTTPStatus


BASE_URL = 'https://python3.info'

async def request(method: str = 'GET',
            path: str = '/',
            data: dict | None = None,
            headers: dict | None = None,
            ) -> Response:
    async with AsyncClient(base_url=BASE_URL) as ac:
        return await ac.request(method=method, url=path, data=data, headers=headers)


class WebsiteTest(IsolatedAsyncioTestCase):
    async def test_index(self):
        resp = await request('GET', '/index.html')
        self.assertEqual(resp.status_code, HTTPStatus.OK)
        self.assertIn('Python - from None to AI', resp.text)
        self.assertIn('Matt Harasymczuk', resp.text)
        self.assertIn('Creative Commons Attribution-ShareAlike 4.0 International License', resp.text)

    async def test_license(self):
        resp = await request('GET', '/LICENSE.html')
        self.assertEqual(resp.status_code, HTTPStatus.OK)
        self.assertIn('Matt Harasymczuk', resp.text)
        self.assertIn('matt@astronaut.center', resp.text)
        self.assertIn('last update: ', resp.text)
        self.assertIn('Creative Commons Attribution-ShareAlike 4.0 International Public License', resp.text)

    async def test_login(self):
        resp = await request('POST', '/login.html', data={'username':'mwatney', 'password': 'Ares3'})
        self.assertEqual(resp.status_code, HTTPStatus.FORBIDDEN)
        with self.assertRaises(HTTPStatusError):
            resp.raise_for_status()

    async def test_install(self):
        resp = await request('GET', '/install.html')
        self.assertEqual(resp.status_code, HTTPStatus.OK)
        with self.subTest('Python'):
            self.assertNotIn('3.8', resp.text)
            self.assertNotIn('3.9', resp.text)
            self.assertNotIn('3.10', resp.text)
            self.assertNotIn('3.11', resp.text)
            self.assertIn('3.12', resp.text)
            self.assertIn('3.13', resp.text)
        with self.subTest('PyCharm'):
            self.assertNotIn('2021.1', resp.text)
            self.assertNotIn('2021.2', resp.text)
            self.assertNotIn('2021.3', resp.text)
            self.assertNotIn('2022.1', resp.text)
            self.assertNotIn('2022.2', resp.text)
            self.assertNotIn('2022.3', resp.text)
            self.assertNotIn('2023.1', html.text)
            self.assertNotIn('2023.1', html.text)
            self.assertNotIn('2023.2', html.text)
            self.assertNotIn('2023.3', html.text)
            self.assertIn('2024.1', html.text)
            self.assertIn('2024.2', html.text)
            self.assertIn('2024.3', html.text)
        with self.subTest('Git'):
            self.assertIn('2.47 lub nowszy', resp.text)

7.2.16. Use Case - 9

In the most general sense, the Session establishes all conversations with the database and represents a 'holding zone' for all the objects which you've loaded or associated with it during its lifespan. It provides the interface where SELECT and other queries are made that will return and modify ORM-mapped objects. The ORM objects themselves are maintained inside the Session, inside a structure called the identity map - a data structure that maintains unique copies of each object, where 'unique' means 'only one object with a particular primary key'.


... from sqlalchemy import create_engine
... from sqlalchemy.orm import Session
...
... engine = create_engine("postgresql+psycopg2://alice:secret@localhost:5432/")

Create session and add objects:


... with Session(engine) as session:
...     session.add(some_object)
...     session.add(some_other_object)
...     session.commit()

The long-form sequence of operations illustrated above can be achieved more succinctly by making use of the SessionTransaction object returned by the Session.begin() method, which provides a context manager interface for the same sequence of operations:


... with Session(engine) as session:
...     with session.begin():
...         session.add(some_object)
...         session.add(some_other_object)

Create session and add objects. Inner context calls session.commit(), if there were no exceptions. Outer context calls session.close()

There could be several context managers entered at the same time:


... with Session(engine) as session, session.begin():
...     session.add(some_object)
...     session.add(some_other_object)

Create session and add objects. Inner context calls session.commit(), if there were no exceptions. Outer context calls session.close()

7.2.17. References

7.2.18. Assignments

# %% About
# - Name: Protocol ContextManager File
# - Difficulty: easy
# - Lines: 14
# - Minutes: 5

# %% License
# - Copyright 2025, Matt Harasymczuk <matt@python3.info>
# - This code can be used only for learning by humans
# - This code cannot be used for teaching others
# - This code cannot be used for teaching LLMs and AI algorithms
# - This code cannot be used in commercial or proprietary products
# - This code cannot be distributed in any form
# - This code cannot be changed in any form outside of training course
# - This code cannot have its license changed
# - If you use this code in your product, you must open-source it under GPLv2
# - Exception can be granted only by the author

# %% English
# 1. Define class `File` with parameter: `filename: str`
# 2. `File` must implement Context Manager protocol
# 3. `File` buffers lines added using `File.append(text: str)` method
# 4. On `with` block exit, `File` class:
#    - Creates file (if not exists)
#    - Opens file
#    - Writes buffer to file
#    - Clears buffer
#    - Closes file
# 5. Run doctests - all must succeed

# %% Polish
# 1. Stwórz klasę `File` z parametrem: `filename: str`
# 2. `File` ma implementować protokół Context Manager
# 3. `File` buforuje linie dodawane za pomocą metody `File.append(text: str)`
# 4. Na wyjściu z bloku `with`, klasa `File`:
#    - Tworzy plik (jeżeli nie istnieje)
#    - Otwiera plik
#    - Zapisuje bufor do pliku
#    - Czyści bufor
#    - Zamyka plik
# 5. Uruchom doctesty - wszystkie muszą się powieść

# %% Hints
# - Append newline character (`\n`) before adding to buffer

# %% Doctests
"""
>>> import sys; sys.tracebacklimit = 0
>>> assert sys.version_info >= (3, 9), \
'Python 3.9+ required'

>>> from os import remove
>>> from inspect import isclass, ismethod

>>> assert isclass(File)
>>> assert hasattr(File, 'append')
>>> assert hasattr(File, '__enter__')
>>> assert hasattr(File, '__exit__')
>>> assert ismethod(File(None).append)
>>> assert ismethod(File(None).__enter__)
>>> assert ismethod(File(None).__exit__)

>>> with File('_temporary.txt') as file:
...    file.append('One')
...    file.append('Two')

>>> open('_temporary.txt').read()
'One\\nTwo\\n'

>>> remove('_temporary.txt')
"""

# %% Run
# - PyCharm: right-click in the editor and `Run Doctest in ...`
# - PyCharm: keyboard shortcut `Control + Shift + F10`
# - Terminal: `python -m doctest -v myfile.py`

# %% Imports

# %% Types
from typing import Callable, Any
File: type
__init__: Callable[[object, str], None]
__enter__: Callable[[object], object]
__exit__: Callable[[object, Any, Any, Any], None]

# %% Data

# %% Result
class File:
    ...

# %% About
# - Name: Protocol ContextManager Buffer
# - Difficulty: medium
# - Lines: 15
# - Minutes: 8

# %% License
# - Copyright 2025, Matt Harasymczuk <matt@python3.info>
# - This code can be used only for learning by humans
# - This code cannot be used for teaching others
# - This code cannot be used for teaching LLMs and AI algorithms
# - This code cannot be used in commercial or proprietary products
# - This code cannot be distributed in any form
# - This code cannot be changed in any form outside of training course
# - This code cannot have its license changed
# - If you use this code in your product, you must open-source it under GPLv2
# - Exception can be granted only by the author

# %% English
# 1. Define class attribute `BUFFER_LIMIT: int = 100` bytes
# 2. File has to be written to disk every X bytes of buffer
# 3. Writing and reading takes time,
#    how to make buffer save data in the background,
#    but it could be still used?
# 4. Run doctests - all must succeed

# %% Polish
# 1. Zdefiniuj klasowy atrybut `BUFFER_LIMIT: int = 100` bajtów
# 2. Plik na dysku ma być zapisywany co X bajtów bufora
# 3. Operacje zapisu i odczytu trwają, jak zrobić,
#    aby do bufora podczas zapisu na dysk,
#    nadal można było pisać?
# 4. Uruchom doctesty - wszystkie muszą się powieść

# %% Hints
# - `sys.getsizeof(obj)` returns `obj` size in bytes

# %% Doctests
"""
>>> import sys; sys.tracebacklimit = 0
>>> assert sys.version_info >= (3, 9), \
'Python 3.9+ required'

>>> from os import remove
>>> from inspect import isclass, ismethod

>>> assert isclass(File)
>>> assert hasattr(File, 'append')
>>> assert hasattr(File, 'BUFFER_LIMIT')
>>> assert hasattr(File, '__enter__')
>>> assert hasattr(File, '__exit__')
>>> assert ismethod(File(None).append)
>>> assert ismethod(File(None).__enter__)
>>> assert ismethod(File(None).__exit__)
>>> assert File.BUFFER_LIMIT == 100

>>> with File('_temporary.txt') as file:
...    file.append('One')
...    file.append('Two')
...    file.append('Three')
...    file.append('Four')
...    file.append('Five')
...    file.append('Six')

>>> open('_temporary.txt').read()
'One\\nTwo\\nThree\\nFour\\nFive\\nSix\\n'

>>> remove('_temporary.txt')
"""

# %% Run
# - PyCharm: right-click in the editor and `Run Doctest in ...`
# - PyCharm: keyboard shortcut `Control + Shift + F10`
# - Terminal: `python -m doctest -v myfile.py`

# %% Imports
from sys import getsizeof

# %% Types
from typing import Callable, Any
File: type
__init__: Callable[[object, str], None]
__enter__: Callable[[object], object]
__exit__: Callable[[object, Any, Any, Any], None]

# %% Data

# %% Result
class File:
    ...

# %% About
# - Name: Protocol Context Manager AutoSave
# - Difficulty: hard
# - Lines: 13
# - Minutes: 13

# %% License
# - Copyright 2025, Matt Harasymczuk <matt@python3.info>
# - This code can be used only for learning by humans
# - This code cannot be used for teaching others
# - This code cannot be used for teaching LLMs and AI algorithms
# - This code cannot be used in commercial or proprietary products
# - This code cannot be distributed in any form
# - This code cannot be changed in any form outside of training course
# - This code cannot have its license changed
# - If you use this code in your product, you must open-source it under GPLv2
# - Exception can be granted only by the author

# %% English
# 1. Modify class `File`
# 2. Add class configuration attribute `AUTOSAVE_SECONDS: float = 1.0`
# 3. Save buffer content to file every `AUTOSAVE_SECONDS` seconds
# 4. Writing and reading takes time, how to make buffer save data in the background, but it could be still used?
# 5. Run doctests - all must succeed

# %% Polish
# 1. Zmodyfikuj klasę `File`
# 2. Dodaj klasowy atrybut konfiguracyjny `AUTOSAVE_SECONDS: float = 1.0`
# 3. Zapisuj zawartość bufora do pliku co `AUTOSAVE_SECONDS` sekund
# 4. Operacje zapisu i odczytu trwają, jak zrobić, aby do bufora podczas zapisu na dysk, nadal można było pisać?
# 5. Uruchom doctesty - wszystkie muszą się powieść

# %% Hints
# - `from threading import Timer`
# - `timer = Timer(interval, function)`
# - `timer.start()`
# - `timer.cancel()`
# - `ctrl+c` or stop button kills infinite loop

# %% Doctests
"""
>>> import sys; sys.tracebacklimit = 0
>>> assert sys.version_info >= (3, 9), \
'Python 3.9+ required'

>>> from os import remove
>>> from inspect import isclass, ismethod
>>> from time import sleep

>>> assert isclass(File)
>>> assert hasattr(File, 'append')
>>> assert hasattr(File, 'AUTOSAVE_SECONDS')
>>> assert hasattr(File, '__enter__')
>>> assert hasattr(File, '__exit__')
>>> assert ismethod(File(None).append)
>>> assert ismethod(File(None).__enter__)
>>> assert ismethod(File(None).__exit__)
>>> assert File.AUTOSAVE_SECONDS == 1.0

>>> with File('_temporary.txt') as file:
...     file.append('One')
...     file.append('Two')
...     sleep(0.5)
...     file.append('Three')
...     file.append('Four')
...     sleep(2.0)
...     file.append('Five')
...     file.append('Six')

>>> open('_temporary.txt').read()
'One\\nTwo\\nThree\\nFour\\nFive\\nSix\\n'

>>> remove('_temporary.txt')
"""

# %% Run
# - PyCharm: right-click in the editor and `Run Doctest in ...`
# - PyCharm: keyboard shortcut `Control + Shift + F10`
# - Terminal: `python -m doctest -v myfile.py`

# %% Imports
from threading import Timer

# %% Types
from typing import Callable, Any
File: type
__init__: Callable[[object, str], None]
__enter__: Callable[[object], object]
__exit__: Callable[[object, Any, Any, Any], None]

# %% Data

# %% Result
class File:
    ...