diff options
Diffstat (limited to 'python')
35 files changed, 0 insertions, 3592 deletions
diff --git a/python/.gitignore b/python/.gitignore deleted file mode 100644 index 1ff9f49..0000000 --- a/python/.gitignore +++ /dev/null @@ -1,12 +0,0 @@ -.coverage -.mypy_cache/ -.ropeproject/ -.pytest_cache/ -packages/ -python_olm.egg-info/ -_libolm* -__pycache__ -*.pyc -.hypothesis/ -.tox/ -include/ diff --git a/python/MANIFEST.in b/python/MANIFEST.in deleted file mode 100644 index 824b377..0000000 --- a/python/MANIFEST.in +++ /dev/null @@ -1,5 +0,0 @@ -include include/olm/olm.h -include include/olm/pk.h -include include/olm/sas.h -include Makefile -include olm_build.py diff --git a/python/Makefile b/python/Makefile deleted file mode 100644 index 6bba9cd..0000000 --- a/python/Makefile +++ /dev/null @@ -1,57 +0,0 @@ -all: olm-python2 olm-python3 - -OLM_HEADERS = ../include/olm/olm.h ../include/olm/inbound_group_session.h \ - ../include/olm/outbound_group_session.h \ - -include/olm/olm.h: $(OLM_HEADERS) - mkdir -p include/olm - $(CPP) -I dummy -I ../include -o include/olm/olm.h ../include/olm/olm.h -# add memset to the header so that we can use it to clear buffers - echo 'void *memset(void *s, int c, size_t n);' >> include/olm/olm.h - -include/olm/pk.h: include/olm/olm.h ../include/olm/pk.h - $(CPP) -I dummy -I ../include -o include/olm/pk.h ../include/olm/pk.h - -include/olm/sas.h: include/olm/olm.h ../include/olm/sas.h - $(CPP) -I dummy -I ../include -o include/olm/sas.h ../include/olm/sas.h - -headers: include/olm/olm.h include/olm/pk.h include/olm/sas.h - -olm-python2: headers - DEVELOP=$(DEVELOP) python2 setup.py build - -olm-python3: headers - DEVELOP=$(DEVELOP) python3 setup.py build - -install: install-python2 install-python3 - -install-python2: olm-python2 - python2 setup.py install --skip-build -O1 --root=$(DESTDIR) - -install-python3: olm-python3 - python3 setup.py install --skip-build -O1 --root=$(DESTDIR) - -test: olm-python2 olm-python3 - rm -rf install-temp - mkdir -p install-temp/2 install-temp/3 - PYTHONPATH=install-temp/2 python2 setup.py install --skip-build --install-lib install-temp/2 --install-script install-temp/bin - PYTHONPATH=install-temp/3 python3 setup.py install --skip-build --install-lib install-temp/3 --install-script install-temp/bin - PYTHONPATH=install-temp/3 python3 -m pytest - PYTHONPATH=install-temp/2 python2 -m pytest - PYTHONPATH=install-temp/3 python3 -m pytest --flake8 --benchmark-disable - PYTHONPATH=install-temp/3 python3 -m pytest --isort --benchmark-disable - PYTHONPATH=install-temp/3 python3 -m pytest --cov --cov-branch --benchmark-disable - rm -rf install-temp - -isort: - isort -y -p olm - -clean: - rm -rf python_olm.egg-info/ dist/ __pycache__/ - rm -rf *.so _libolm.o - rm -rf packages/ - rm -rf build/ - rm -rf install-temp/ - rm -rf include/ - -.PHONY: all olm-python2 olm-python3 install install-python2 install-python3 clean test diff --git a/python/README.md b/python/README.md deleted file mode 100644 index b928185..0000000 --- a/python/README.md +++ /dev/null @@ -1,161 +0,0 @@ -python-olm -========== - -Python bindings for Olm. - -The specification of the Olm cryptographic ratchet which is used for peer to -peer sessions of this library can be found [here][4]. - -The specification of the Megolm cryptographic ratchet which is used for group -sessions of this library can be found [here][5]. - -An example of the implementation of the Olm and Megolm cryptographic protocol -can be found in the Matrix protocol for which the implementation guide can be -found [here][6]. - -The full API reference can be found [here][7]. - -# Accounts - -Accounts create and hold the central identity of the Olm protocol, they consist of a fingerprint and identity -key pair. They also produce one time keys that are used to start peer to peer -encrypted communication channels. - -## Account Creation - -A new account is created with the Account class, it creates a new Olm key pair. -The public parts of the key pair are available using the identity_keys property -of the class. - -```python ->>> alice = Account() ->>> alice.identity_keys -{'curve25519': '2PytGagXercwHjzQETLcMa3JOsaU2qkPIESaqoi59zE', - 'ed25519': 'HHpOuFYdHwoa54GxSttz9YmaTmbuVU3js92UTUjYJgM'} -``` - - -## One Time keys - -One time keys need to be generated before people can start an encrypted peer to -peer channel to an account. - -```python ->>> alice.generate_one_time_keys(1) ->>> alice.one_time_keys -{'curve25519': {'AAAAAQ': 'KiHoW6CIy905UC4V1Frmwr3VW8bTWkBL4uWtWFFllxM'}} -``` - -After the one time keys are published they should be marked as such so they -aren't reused. - -```python ->>> alice.mark_keys_as_published() ->>> alice.one_time_keys -{'curve25519': {}} -``` - -## Pickling - -Accounts should be stored for later reuse, storing an account is done with the -pickle method while the restoring step is done with the from_pickle class -method. - -```python ->>> pickle = alice.pickle() ->>> restored = Account.from_pickle(pickle) -``` - -# Sessions - -Sessions are used to create an encrypted peer to peer communication channel -between two accounts. - -## Session Creation -```python ->>> alice = Account() ->>> bob = Account() ->>> bob.generate_one_time_keys(1) ->>> id_key = bob.identity_keys["curve25519"] ->>> one_time = list(bob.one_time_keys["curve25519"].values())[0] ->>> alice_session = OutboundSession(alice, id_key, one_time) -``` - -## Encryption - -After an outbound session is created an encrypted message can be exchanged: - -```python ->>> message = alice_session.encrypt("It's a secret to everybody") ->>> message.ciphertext -'AwogkL7RoakT9gnjcZMra+y39WXKRmnxBPEaEp6OSueIA0cSIJxGpBoP8YZ+CGweXQ10LujbXMgK88 -xG/JZMQJ5ulK9ZGiC8TYrezNYr3qyIBLlecXr/9wnegvJaSFDmWDVOcf4XfyI/AwogqIZfAklRXGC5b -ZJcZxVxQGgJ8Dz4OQII8k0Dp8msUXwQACIQvagY1dO55Qvnk5PZ2GF+wdKnvj6Zxl2g' ->>> message.message_type -0 -``` - -After the message is transfered, bob can create an InboundSession to decrypt the -message. - -```python ->>> bob_session = InboundSession(bob, message) ->>> bob_session.decrypt(message) -"It's a secret to everybody" -``` - -## Pickling - -Sessions like accounts can be stored for later use the API is the same as for -accounts. - -```python ->>> pickle = session.pickle() ->>> restored = Session.from_pickle(pickle) -``` - -# Group Sessions - -Group Sessions are used to create a one-to-many encrypted communication channel. -The group session key needs to be shared with all participants that should be able -to decrypt the group messages. Another thing to notice is that, since the group -session key is ratcheted every time a message is encrypted, the session key should -be shared before any messages are encrypted. - -## Group Session Creation - -Group sessions aren't bound to an account like peer-to-peer sessions so their -creation is straightforward. - -```python ->>> alice_group = OutboundGroupSession() ->>> bob_inbound_group = InboundGroupSession(alice_group.session_key) -``` - -## Group Encryption - -Group encryption is pretty simple. The important part is to share the session -key with all participants over a secure channel (e.g. peer-to-peer Olm -sessions). - -```python ->>> message = alice_group.encrypt("It's a secret to everybody") ->>> bob_inbound_group.decrypt(message) -("It's a secret to everybody", 0) -``` - -## Pickling - -Pickling works the same way as for peer-to-peer Olm sessions. - -```python ->>> pickle = session.pickle() ->>> restored = InboundGroupSession.from_pickle(pickle) -``` -[1]: https://git.matrix.org/git/olm/about/ -[2]: https://git.matrix.org/git/olm/tree/python?id=f8c61b8f8432d0b0b38d57f513c5048fb42f22ab -[3]: https://cffi.readthedocs.io/en/latest/ -[4]: https://git.matrix.org/git/olm/about/docs/olm.rst -[5]: https://git.matrix.org/git/olm/about/docs/megolm.rst -[6]: https://matrix.org/docs/guides/e2e_implementation.html -[7]: https://poljar.github.io/python-olm/html/index.html diff --git a/python/docs/Makefile b/python/docs/Makefile deleted file mode 100644 index a72f9d9..0000000 --- a/python/docs/Makefile +++ /dev/null @@ -1,20 +0,0 @@ -# Minimal makefile for Sphinx documentation -# - -# You can set these variables from the command line. -SPHINXOPTS = -SPHINXBUILD = sphinx-build -SPHINXPROJ = olm -SOURCEDIR = . -BUILDDIR = . - -# Put it first so that "make" without argument is like "make help". -help: - @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) - -.PHONY: help Makefile - -# Catch-all target: route all unknown targets to Sphinx using the new -# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). -%: Makefile - @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/python/docs/conf.py b/python/docs/conf.py deleted file mode 100644 index ce2a88d..0000000 --- a/python/docs/conf.py +++ /dev/null @@ -1,165 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Configuration file for the Sphinx documentation builder. -# -# This file does only contain a selection of the most common options. For a -# full list see the documentation: -# http://www.sphinx-doc.org/en/master/config - -# -- Path setup -------------------------------------------------------------- - -# If extensions (or modules to document with autodoc) are in another directory, -# add these directories to sys.path here. If the directory is relative to the -# documentation root, use os.path.abspath to make it absolute, like shown here. -# -import os -import sys - -sys.path.insert(0, os.path.abspath('../')) - - -# -- Project information ----------------------------------------------------- - -project = 'python-olm' -copyright = '2018, Damir Jelić' -author = 'Damir Jelić' - -# The short X.Y version -version = '' -# The full version, including alpha/beta/rc tags -release = '2.2' - - -# -- General configuration --------------------------------------------------- - -# If your documentation needs a minimal Sphinx version, state it here. -# -# needs_sphinx = '1.0' - -# Add any Sphinx extension module names here, as strings. They can be -# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom -# ones. -extensions = [ - 'sphinx.ext.autodoc', - 'sphinx.ext.doctest', - 'sphinx.ext.coverage', - 'sphinx.ext.viewcode', - 'sphinx.ext.githubpages', - 'sphinx.ext.napoleon', -] - -# Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] - -# The suffix(es) of source filenames. -# You can specify multiple suffix as a list of string: -# -# source_suffix = ['.rst', '.md'] -source_suffix = '.rst' - -# The master toctree document. -master_doc = 'index' - -# The language for content autogenerated by Sphinx. Refer to documentation -# for a list of supported languages. -# -# This is also used if you do content translation via gettext catalogs. -# Usually you set "language" from the command line for these cases. -language = None - -# List of patterns, relative to source directory, that match files and -# directories to ignore when looking for source files. -# This pattern also affects html_static_path and html_extra_path . -exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store'] - -# The name of the Pygments (syntax highlighting) style to use. -pygments_style = 'sphinx' - - -# -- Options for HTML output ------------------------------------------------- - -# The theme to use for HTML and HTML Help pages. See the documentation for -# a list of builtin themes. -# -html_theme = 'alabaster' - -# Theme options are theme-specific and customize the look and feel of a theme -# further. For a list of options available for each theme, see the -# documentation. -# -# html_theme_options = {} - -# Add any paths that contain custom static files (such as style sheets) here, -# relative to this directory. They are copied after the builtin static files, -# so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] - -# Custom sidebar templates, must be a dictionary that maps document names -# to template names. -# -# The default sidebars (for documents that don't match any pattern) are -# defined by theme itself. Builtin themes are using these templates by -# default: ``['localtoc.html', 'relations.html', 'sourcelink.html', -# 'searchbox.html']``. -# -# html_sidebars = {} - - -# -- Options for HTMLHelp output --------------------------------------------- - -# Output file base name for HTML help builder. -htmlhelp_basename = 'olmdoc' - - -# -- Options for LaTeX output ------------------------------------------------ - -latex_elements = { - # The paper size ('letterpaper' or 'a4paper'). - # - # 'papersize': 'letterpaper', - - # The font size ('10pt', '11pt' or '12pt'). - # - # 'pointsize': '10pt', - - # Additional stuff for the LaTeX preamble. - # - # 'preamble': '', - - # Latex figure (float) alignment - # - # 'figure_align': 'htbp', -} - -# Grouping the document tree into LaTeX files. List of tuples -# (source start file, target name, title, -# author, documentclass [howto, manual, or own class]). -latex_documents = [ - (master_doc, 'olm.tex', 'olm Documentation', - 'Damir Jelić', 'manual'), -] - - -# -- Options for manual page output ------------------------------------------ - -# One entry per manual page. List of tuples -# (source start file, name, description, authors, manual section). -man_pages = [ - (master_doc, 'olm', 'olm Documentation', - [author], 1) -] - - -# -- Options for Texinfo output ---------------------------------------------- - -# Grouping the document tree into Texinfo files. List of tuples -# (source start file, target name, title, author, -# dir menu entry, description, category) -texinfo_documents = [ - (master_doc, 'olm', 'olm Documentation', - author, 'olm', 'One line description of project.', - 'Miscellaneous'), -] - - -# -- Extension configuration ------------------------------------------------- diff --git a/python/docs/index.html b/python/docs/index.html deleted file mode 100644 index 9644bbb..0000000 --- a/python/docs/index.html +++ /dev/null @@ -1 +0,0 @@ -<meta http-equiv="refresh" content="0; url=./html/index.html" /> diff --git a/python/docs/index.rst b/python/docs/index.rst deleted file mode 100644 index 39e6657..0000000 --- a/python/docs/index.rst +++ /dev/null @@ -1,19 +0,0 @@ -.. olm documentation master file, created by - sphinx-quickstart on Sun Jun 17 15:57:08 2018. - -Welcome to olm's documentation! -=============================== - -.. toctree:: - Olm API reference <olm.rst> - :maxdepth: 2 - :caption: Contents: - - - -Indices and tables -================== - -* :ref:`genindex` -* :ref:`modindex` -* :ref:`search` diff --git a/python/docs/make.bat b/python/docs/make.bat deleted file mode 100644 index 1c5b4d8..0000000 --- a/python/docs/make.bat +++ /dev/null @@ -1,36 +0,0 @@ -@ECHO OFF - -pushd %~dp0 - -REM Command file for Sphinx documentation - -if "%SPHINXBUILD%" == "" ( - set SPHINXBUILD=sphinx-build -) -set SOURCEDIR=. -set BUILDDIR=_build -set SPHINXPROJ=olm - -if "%1" == "" goto help - -%SPHINXBUILD% >NUL 2>NUL -if errorlevel 9009 ( - echo. - echo.The 'sphinx-build' command was not found. Make sure you have Sphinx - echo.installed, then set the SPHINXBUILD environment variable to point - echo.to the full path of the 'sphinx-build' executable. Alternatively you - echo.may add the Sphinx directory to PATH. - echo. - echo.If you don't have Sphinx installed, grab it from - echo.http://sphinx-doc.org/ - exit /b 1 -) - -%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% -goto end - -:help -%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% - -:end -popd diff --git a/python/docs/olm.rst b/python/docs/olm.rst deleted file mode 100644 index 9d8edf0..0000000 --- a/python/docs/olm.rst +++ /dev/null @@ -1,34 +0,0 @@ -olm package -=========== - -olm.account module ------------------- - -.. automodule:: olm.account - :members: - :undoc-members: - :show-inheritance: - -olm.group\_session module -------------------------- - -.. automodule:: olm.group_session - :members: - :undoc-members: - :show-inheritance: - -olm.session module ------------------- - -.. automodule:: olm.session - :members: - :undoc-members: - :show-inheritance: - -olm.utility module ------------------- - -.. automodule:: olm.utility - :members: - :undoc-members: - :show-inheritance: diff --git a/python/dummy/README b/python/dummy/README deleted file mode 100644 index 61039f3..0000000 --- a/python/dummy/README +++ /dev/null @@ -1,2 +0,0 @@ -Dummy header files, so that we can generate the function list for cffi from the -olm header files.
\ No newline at end of file diff --git a/python/dummy/stddef.h b/python/dummy/stddef.h deleted file mode 100644 index e69de29..0000000 --- a/python/dummy/stddef.h +++ /dev/null diff --git a/python/dummy/stdint.h b/python/dummy/stdint.h deleted file mode 100644 index e69de29..0000000 --- a/python/dummy/stdint.h +++ /dev/null diff --git a/python/olm/__init__.py b/python/olm/__init__.py deleted file mode 100644 index 26257a5..0000000 --- a/python/olm/__init__.py +++ /dev/null @@ -1,48 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2015-2017 OpenMarket Ltd -# Copyright © 2018 Damir Jelić <poljar@termina.org.uk> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -Olm Python bindings -~~~~~~~~~~~~~~~~~~~~~ -| This package implements python bindings for the libolm C library. -| © Copyright 2015-2017 by OpenMarket Ltd -| © Copyright 2018 by Damir Jelić -""" -from .utility import ed25519_verify, OlmVerifyError, OlmHashError, sha256 -from .account import Account, OlmAccountError -from .session import ( - Session, - InboundSession, - OutboundSession, - OlmSessionError, - OlmMessage, - OlmPreKeyMessage -) -from .group_session import ( - InboundGroupSession, - OutboundGroupSession, - OlmGroupSessionError -) -from .pk import ( - PkMessage, - PkEncryption, - PkDecryption, - PkSigning, - PkEncryptionError, - PkDecryptionError, - PkSigningError -) -from .sas import Sas, OlmSasError diff --git a/python/olm/__version__.py b/python/olm/__version__.py deleted file mode 100644 index 498f89f..0000000 --- a/python/olm/__version__.py +++ /dev/null @@ -1,9 +0,0 @@ -__title__ = "python-olm" -__description__ = ("python CFFI bindings for the olm " - "cryptographic ratchet library") -__url__ = "https://github.com/poljar/python-olm" -__version__ = "3.2.1" -__author__ = "Damir Jelić" -__author_email__ = "poljar@termina.org.uk" -__license__ = "Apache 2.0" -__copyright__ = "Copyright 2018-2019 Damir Jelić" diff --git a/python/olm/_compat.py b/python/olm/_compat.py deleted file mode 100644 index 2ceaa33..0000000 --- a/python/olm/_compat.py +++ /dev/null @@ -1,67 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2015-2017 OpenMarket Ltd -# Copyright © 2018 Damir Jelić <poljar@termina.org.uk> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from builtins import bytes, str -from typing import AnyStr - -try: - import secrets - URANDOM = secrets.token_bytes # pragma: no cover -except ImportError: # pragma: no cover - from os import urandom - URANDOM = urandom # type: ignore - - -def to_bytearray(string): - # type: (AnyStr) -> bytes - if isinstance(string, bytes): - return bytearray(string) - elif isinstance(string, str): - return bytearray(string, "utf-8") - - raise TypeError("Invalid type {}".format(type(string))) - - -def to_bytes(string): - # type: (AnyStr) -> bytes - if isinstance(string, bytes): - return string - elif isinstance(string, str): - return bytes(string, "utf-8") - - raise TypeError("Invalid type {}".format(type(string))) - - -def to_unicode_str(byte_string, errors="replace"): - """Turn a byte string into a unicode string. - - Should be used everywhere where the input byte string might not be trusted - and may contain invalid unicode values. - - Args: - byte_string (bytes): The bytestring that will be converted to a native - string. - errors (str, optional): The error handling scheme that should be used - to handle unicode decode errors. Can be one of "strict" (raise an - UnicodeDecodeError exception, "ignore" (remove the offending - characters), "replace" (replace the offending character with - U+FFFD), "xmlcharrefreplace" as well as any other name registered - with codecs.register_error that can handle UnicodeEncodeErrors. - - Returns the decoded native string. - """ - return byte_string.decode(encoding="utf-8", errors=errors) diff --git a/python/olm/_finalize.py b/python/olm/_finalize.py deleted file mode 100644 index 9f467bc..0000000 --- a/python/olm/_finalize.py +++ /dev/null @@ -1,65 +0,0 @@ -# The MIT License (MIT) -# Copyright (c) 2010 Benjamin Peterson <benjamin@python.org> - -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: - -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. - -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. -# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR -# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE -# OR OTHER DEALINGS IN THE SOFTWARE. - -"""Finalization with weakrefs - -This is designed for avoiding __del__. -""" -from __future__ import print_function - -import sys -import traceback -import weakref - -__author__ = "Benjamin Peterson <benjamin@python.org>" - - -class OwnerRef(weakref.ref): - """A simple weakref.ref subclass, so attributes can be added.""" - pass - - -def _run_finalizer(ref): - """Internal weakref callback to run finalizers""" - del _finalize_refs[id(ref)] - finalizer = ref.finalizer - item = ref.item - try: - finalizer(item) - except Exception: # pragma: no cover - print("Exception running {}:".format(finalizer), file=sys.stderr) - traceback.print_exc() - - -_finalize_refs = {} - - -def track_for_finalization(owner, item, finalizer): - """Register an object for finalization. - - ``owner`` is the the object which is responsible for ``item``. - ``finalizer`` will be called with ``item`` as its only argument when - ``owner`` is destroyed by the garbage collector. - """ - ref = OwnerRef(owner, _run_finalizer) - ref.item = item - ref.finalizer = finalizer - _finalize_refs[id(ref)] = ref diff --git a/python/olm/account.py b/python/olm/account.py deleted file mode 100644 index 8455655..0000000 --- a/python/olm/account.py +++ /dev/null @@ -1,271 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2015-2017 OpenMarket Ltd -# Copyright © 2018 Damir Jelić <poljar@termina.org.uk> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""libolm Account module. - -This module contains the account part of the Olm library. It contains a single -Account class which handles the creation of new accounts as well as the storing -and restoring of them. - -Examples: - >>> acc = Account() - >>> account.identity_keys() - >>> account.generate_one_time_keys(1) - -""" - -import json -# pylint: disable=redefined-builtin,unused-import -from builtins import bytes, super -from typing import AnyStr, Dict, Optional, Type - -from future.utils import bytes_to_native_str - -# pylint: disable=no-name-in-module -from _libolm import ffi, lib # type: ignore - -from ._compat import URANDOM, to_bytearray -from ._finalize import track_for_finalization - -# This is imported only for type checking purposes -if False: - from .session import Session # pragma: no cover - - -def _clear_account(account): - # type: (ffi.cdata) -> None - lib.olm_clear_account(account) - - -class OlmAccountError(Exception): - """libolm Account error exception.""" - - -class Account(object): - """libolm Account class.""" - - def __new__(cls): - # type: (Type[Account]) -> Account - obj = super().__new__(cls) - obj._buf = ffi.new("char[]", lib.olm_account_size()) - obj._account = lib.olm_account(obj._buf) - track_for_finalization(obj, obj._account, _clear_account) - return obj - - def __init__(self): - # type: () -> None - """Create a new Olm account. - - Creates a new account and its matching identity key pair. - - Raises OlmAccountError on failure. If there weren't enough random bytes - for the account creation the error message for the exception will be - NOT_ENOUGH_RANDOM. - """ - # This is needed to silence mypy not knowing the type of _account. - # There has to be a better way for this. - if False: # pragma: no cover - self._account = self._account # type: ffi.cdata - - random_length = lib.olm_create_account_random_length(self._account) - random = URANDOM(random_length) - - self._check_error( - lib.olm_create_account(self._account, ffi.from_buffer(random), - random_length)) - - - def _check_error(self, ret): - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str( - ffi.string((lib.olm_account_last_error(self._account)))) - - raise OlmAccountError(last_error) - - def pickle(self, passphrase=""): - # type: (Optional[str]) -> bytes - """Store an Olm account. - - Stores an account as a base64 string. Encrypts the account using the - supplied passphrase. Returns a byte object containing the base64 - encoded string of the pickled account. Raises OlmAccountError on - failure. - - Args: - passphrase(str, optional): The passphrase to be used to encrypt - the account. - """ - byte_key = bytearray(passphrase, "utf-8") if passphrase else b"" - - pickle_length = lib.olm_pickle_account_length(self._account) - pickle_buffer = ffi.new("char[]", pickle_length) - - try: - self._check_error( - lib.olm_pickle_account(self._account, - ffi.from_buffer(byte_key), - len(byte_key), - pickle_buffer, - pickle_length)) - finally: - # zero out copies of the passphrase - for i in range(0, len(byte_key)): - byte_key[i] = 0 - - return ffi.unpack(pickle_buffer, pickle_length) - - @classmethod - def from_pickle(cls, pickle, passphrase=""): - # type: (bytes, Optional[str]) -> Account - """Load a previously stored olm account. - - Loads an account from a pickled base64-encoded string and returns an - Account object. Decrypts the account using the supplied passphrase. - Raises OlmAccountError on failure. If the passphrase doesn't match the - one used to encrypt the account then the error message for the - exception will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded - then the error message will be "INVALID_BASE64". - - Args: - pickle(bytes): Base64 encoded byte string containing the pickled - account - passphrase(str, optional): The passphrase used to encrypt the - account. - """ - if not pickle: - raise ValueError("Pickle can't be empty") - - byte_key = bytearray(passphrase, "utf-8") if passphrase else b"" - # copy because unpickle will destroy the buffer - pickle_buffer = ffi.new("char[]", pickle) - - obj = cls.__new__(cls) - - try: - ret = lib.olm_unpickle_account(obj._account, - ffi.from_buffer(byte_key), - len(byte_key), - pickle_buffer, - len(pickle)) - obj._check_error(ret) - finally: - for i in range(0, len(byte_key)): - byte_key[i] = 0 - - return obj - - @property - def identity_keys(self): - # type: () -> Dict[str, str] - """dict: Public part of the identity keys of the account.""" - out_length = lib.olm_account_identity_keys_length(self._account) - out_buffer = ffi.new("char[]", out_length) - - self._check_error( - lib.olm_account_identity_keys(self._account, out_buffer, - out_length)) - return json.loads(ffi.unpack(out_buffer, out_length).decode("utf-8")) - - def sign(self, message): - # type: (AnyStr) -> str - """Signs a message with this account. - - Signs a message with the private ed25519 identity key of this account. - Returns the signature. - Raises OlmAccountError on failure. - - Args: - message(str): The message to sign. - """ - bytes_message = to_bytearray(message) - out_length = lib.olm_account_signature_length(self._account) - out_buffer = ffi.new("char[]", out_length) - - try: - self._check_error( - lib.olm_account_sign(self._account, - ffi.from_buffer(bytes_message), - len(bytes_message), out_buffer, - out_length)) - finally: - # clear out copies of the message, which may be plaintext - if bytes_message is not message: - for i in range(0, len(bytes_message)): - bytes_message[i] = 0 - - return bytes_to_native_str(ffi.unpack(out_buffer, out_length)) - - @property - def max_one_time_keys(self): - # type: () -> int - """int: The maximum number of one-time keys the account can store.""" - return lib.olm_account_max_number_of_one_time_keys(self._account) - - def mark_keys_as_published(self): - # type: () -> None - """Mark the current set of one-time keys as being published.""" - lib.olm_account_mark_keys_as_published(self._account) - - def generate_one_time_keys(self, count): - # type: (int) -> None - """Generate a number of new one-time keys. - - If the total number of keys stored by this account exceeds - max_one_time_keys() then the old keys are discarded. - Raises OlmAccountError on error. - - Args: - count(int): The number of keys to generate. - """ - random_length = lib.olm_account_generate_one_time_keys_random_length( - self._account, count) - random = URANDOM(random_length) - - self._check_error( - lib.olm_account_generate_one_time_keys( - self._account, count, ffi.from_buffer(random), random_length)) - - @property - def one_time_keys(self): - # type: () -> Dict[str, Dict[str, str]] - """dict: The public part of the one-time keys for this account.""" - out_length = lib.olm_account_one_time_keys_length(self._account) - out_buffer = ffi.new("char[]", out_length) - - self._check_error( - lib.olm_account_one_time_keys(self._account, out_buffer, - out_length)) - - return json.loads(ffi.unpack(out_buffer, out_length).decode("utf-8")) - - def remove_one_time_keys(self, session): - # type: (Session) -> None - """Remove used one-time keys. - - Removes the one-time keys that the session used from the account. - Raises OlmAccountError on failure. If the account doesn't have any - matching one-time keys then the error message of the exception will be - "BAD_MESSAGE_KEY_ID". - - Args: - session(Session): An Olm Session object that was created with this - account. - """ - self._check_error(lib.olm_remove_one_time_keys(self._account, - session._session)) diff --git a/python/olm/group_session.py b/python/olm/group_session.py deleted file mode 100644 index 5068192..0000000 --- a/python/olm/group_session.py +++ /dev/null @@ -1,532 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2015-2017 OpenMarket Ltd -# Copyright © 2018 Damir Jelić <poljar@termina.org.uk> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""libolm Group session module. - -This module contains the group session part of the Olm library. It contains two -classes for creating inbound and outbound group sessions. - -Examples: - >>> outbound = OutboundGroupSession() - >>> InboundGroupSession(outbound.session_key) -""" - -# pylint: disable=redefined-builtin,unused-import -from builtins import bytes, super -from typing import AnyStr, Optional, Tuple, Type - -from future.utils import bytes_to_native_str - -# pylint: disable=no-name-in-module -from _libolm import ffi, lib # type: ignore - -from ._compat import URANDOM, to_bytearray, to_bytes, to_unicode_str -from ._finalize import track_for_finalization - - -def _clear_inbound_group_session(session): - # type: (ffi.cdata) -> None - lib.olm_clear_inbound_group_session(session) - - -def _clear_outbound_group_session(session): - # type: (ffi.cdata) -> None - lib.olm_clear_outbound_group_session(session) - - -class OlmGroupSessionError(Exception): - """libolm Group session error exception.""" - - -class InboundGroupSession(object): - """Inbound group session for encrypted multiuser communication.""" - - def __new__( - cls, # type: Type[InboundGroupSession] - session_key=None # type: Optional[str] - ): - # type: (...) -> InboundGroupSession - obj = super().__new__(cls) - obj._buf = ffi.new("char[]", lib.olm_inbound_group_session_size()) - obj._session = lib.olm_inbound_group_session(obj._buf) - track_for_finalization(obj, obj._session, _clear_inbound_group_session) - return obj - - def __init__(self, session_key): - # type: (AnyStr) -> None - """Create a new inbound group session. - Start a new inbound group session, from a key exported from - an outbound group session. - - Raises OlmGroupSessionError on failure. The error message of the - exception will be "OLM_INVALID_BASE64" if the session key is not valid - base64 and "OLM_BAD_SESSION_KEY" if the session key is invalid. - """ - if False: # pragma: no cover - self._session = self._session # type: ffi.cdata - - byte_session_key = to_bytearray(session_key) - - try: - ret = lib.olm_init_inbound_group_session( - self._session, - ffi.from_buffer(byte_session_key), len(byte_session_key) - ) - finally: - if byte_session_key is not session_key: - for i in range(0, len(byte_session_key)): - byte_session_key[i] = 0 - self._check_error(ret) - - def pickle(self, passphrase=""): - # type: (Optional[str]) -> bytes - """Store an inbound group session. - - Stores a group session as a base64 string. Encrypts the session using - the supplied passphrase. Returns a byte object containing the base64 - encoded string of the pickled session. - - Args: - passphrase(str, optional): The passphrase to be used to encrypt - the session. - """ - byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b"" - - pickle_length = lib.olm_pickle_inbound_group_session_length( - self._session) - pickle_buffer = ffi.new("char[]", pickle_length) - - try: - ret = lib.olm_pickle_inbound_group_session( - self._session, - ffi.from_buffer(byte_passphrase), len(byte_passphrase), - pickle_buffer, pickle_length - ) - self._check_error(ret) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_passphrase)): - byte_passphrase[i] = 0 - - return ffi.unpack(pickle_buffer, pickle_length) - - @classmethod - def from_pickle(cls, pickle, passphrase=""): - # type: (bytes, Optional[str]) -> InboundGroupSession - """Load a previously stored inbound group session. - - Loads an inbound group session from a pickled base64 string and returns - an InboundGroupSession object. Decrypts the session using the supplied - passphrase. Raises OlmSessionError on failure. If the passphrase - doesn't match the one used to encrypt the session then the error - message for the exception will be "BAD_ACCOUNT_KEY". If the base64 - couldn't be decoded then the error message will be "INVALID_BASE64". - - Args: - pickle(bytes): Base64 encoded byte string containing the pickled - session - passphrase(str, optional): The passphrase used to encrypt the - session - """ - if not pickle: - raise ValueError("Pickle can't be empty") - - byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b"" - # copy because unpickle will destroy the buffer - pickle_buffer = ffi.new("char[]", pickle) - - obj = cls.__new__(cls) - - try: - ret = lib.olm_unpickle_inbound_group_session( - obj._session, - ffi.from_buffer(byte_passphrase), - len(byte_passphrase), - pickle_buffer, - len(pickle) - ) - obj._check_error(ret) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_passphrase)): - byte_passphrase[i] = 0 - - return obj - - def _check_error(self, ret): - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str(ffi.string( - lib.olm_inbound_group_session_last_error(self._session))) - - raise OlmGroupSessionError(last_error) - - def decrypt(self, ciphertext, unicode_errors="replace"): - # type: (AnyStr, str) -> Tuple[str, int] - """Decrypt a message - - Returns a tuple of the decrypted plain-text and the message index of - the decrypted message or raises OlmGroupSessionError on failure. - On failure the error message of the exception will be: - - * OLM_INVALID_BASE64 if the message is not valid base64 - * OLM_BAD_MESSAGE_VERSION if the message was encrypted with an - unsupported version of the protocol - * OLM_BAD_MESSAGE_FORMAT if the message headers could not be - decoded - * OLM_BAD_MESSAGE_MAC if the message could not be verified - * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key - corresponding to the message's index (i.e., it was sent before - the session key was shared with us) - - Args: - ciphertext(str): Base64 encoded ciphertext containing the encrypted - message - unicode_errors(str, optional): The error handling scheme to use for - unicode decoding errors. The default is "replace" meaning that - the character that was unable to decode will be replaced with - the unicode replacement character (U+FFFD). Other possible - values are "strict", "ignore" and "xmlcharrefreplace" as well - as any other name registered with codecs.register_error that - can handle UnicodeEncodeErrors. - """ - if not ciphertext: - raise ValueError("Ciphertext can't be empty.") - - byte_ciphertext = to_bytes(ciphertext) - - # copy because max_plaintext_length will destroy the buffer - ciphertext_buffer = ffi.new("char[]", byte_ciphertext) - - max_plaintext_length = lib.olm_group_decrypt_max_plaintext_length( - self._session, ciphertext_buffer, len(byte_ciphertext) - ) - self._check_error(max_plaintext_length) - plaintext_buffer = ffi.new("char[]", max_plaintext_length) - # copy because max_plaintext_length will destroy the buffer - ciphertext_buffer = ffi.new("char[]", byte_ciphertext) - - message_index = ffi.new("uint32_t*") - plaintext_length = lib.olm_group_decrypt( - self._session, ciphertext_buffer, len(byte_ciphertext), - plaintext_buffer, max_plaintext_length, - message_index - ) - - self._check_error(plaintext_length) - - plaintext = to_unicode_str( - ffi.unpack(plaintext_buffer, plaintext_length), - errors=unicode_errors - ) - - # clear out copies of the plaintext - lib.memset(plaintext_buffer, 0, max_plaintext_length) - - return plaintext, message_index[0] - - @property - def id(self): - # type: () -> str - """str: A base64 encoded identifier for this session.""" - id_length = lib.olm_inbound_group_session_id_length(self._session) - id_buffer = ffi.new("char[]", id_length) - ret = lib.olm_inbound_group_session_id( - self._session, - id_buffer, - id_length - ) - self._check_error(ret) - return bytes_to_native_str(ffi.unpack(id_buffer, id_length)) - - @property - def first_known_index(self): - # type: () -> int - """int: The first message index we know how to decrypt.""" - return lib.olm_inbound_group_session_first_known_index(self._session) - - def export_session(self, message_index): - # type: (int) -> str - """Export an inbound group session - - Export the base64-encoded ratchet key for this session, at the given - index, in a format which can be used by import_session(). - - Raises OlmGroupSessionError on failure. The error message for the - exception will be: - - * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key - corresponding to the given index (ie, it was sent before the - session key was shared with us) - - Args: - message_index(int): The message index at which the session should - be exported. - """ - - export_length = lib.olm_export_inbound_group_session_length( - self._session) - - export_buffer = ffi.new("char[]", export_length) - ret = lib.olm_export_inbound_group_session( - self._session, - export_buffer, - export_length, - message_index - ) - self._check_error(ret) - export_str = bytes_to_native_str(ffi.unpack(export_buffer, export_length)) - - # clear out copies of the key - lib.memset(export_buffer, 0, export_length) - - return export_str - - @classmethod - def import_session(cls, session_key): - # type: (AnyStr) -> InboundGroupSession - """Create an InboundGroupSession from an exported session key. - - Creates an InboundGroupSession with an previously exported session key, - raises OlmGroupSessionError on failure. The error message for the - exception will be: - - * OLM_INVALID_BASE64 if the session_key is not valid base64 - * OLM_BAD_SESSION_KEY if the session_key is invalid - - Args: - session_key(str): The exported session key with which the inbound - group session will be created - """ - obj = cls.__new__(cls) - - byte_session_key = to_bytearray(session_key) - - try: - ret = lib.olm_import_inbound_group_session( - obj._session, - ffi.from_buffer(byte_session_key), - len(byte_session_key) - ) - obj._check_error(ret) - finally: - # clear out copies of the key - if byte_session_key is not session_key: - for i in range(0, len(byte_session_key)): - byte_session_key[i] = 0 - - return obj - - -class OutboundGroupSession(object): - """Outbound group session for encrypted multiuser communication.""" - - def __new__(cls): - # type: (Type[OutboundGroupSession]) -> OutboundGroupSession - obj = super().__new__(cls) - obj._buf = ffi.new("char[]", lib.olm_outbound_group_session_size()) - obj._session = lib.olm_outbound_group_session(obj._buf) - track_for_finalization( - obj, - obj._session, - _clear_outbound_group_session - ) - return obj - - def __init__(self): - # type: () -> None - """Create a new outbound group session. - - Start a new outbound group session. Raises OlmGroupSessionError on - failure. - """ - if False: # pragma: no cover - self._session = self._session # type: ffi.cdata - - random_length = lib.olm_init_outbound_group_session_random_length( - self._session - ) - random = URANDOM(random_length) - - ret = lib.olm_init_outbound_group_session( - self._session, ffi.from_buffer(random), random_length - ) - self._check_error(ret) - - def _check_error(self, ret): - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str(ffi.string( - lib.olm_outbound_group_session_last_error(self._session) - )) - - raise OlmGroupSessionError(last_error) - - def pickle(self, passphrase=""): - # type: (Optional[str]) -> bytes - """Store an outbound group session. - - Stores a group session as a base64 string. Encrypts the session using - the supplied passphrase. Returns a byte object containing the base64 - encoded string of the pickled session. - - Args: - passphrase(str, optional): The passphrase to be used to encrypt - the session. - """ - byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b"" - pickle_length = lib.olm_pickle_outbound_group_session_length( - self._session) - pickle_buffer = ffi.new("char[]", pickle_length) - - try: - ret = lib.olm_pickle_outbound_group_session( - self._session, - ffi.from_buffer(byte_passphrase), len(byte_passphrase), - pickle_buffer, pickle_length - ) - self._check_error(ret) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_passphrase)): - byte_passphrase[i] = 0 - - return ffi.unpack(pickle_buffer, pickle_length) - - @classmethod - def from_pickle(cls, pickle, passphrase=""): - # type: (bytes, Optional[str]) -> OutboundGroupSession - """Load a previously stored outbound group session. - - Loads an outbound group session from a pickled base64 string and - returns an OutboundGroupSession object. Decrypts the session using the - supplied passphrase. Raises OlmSessionError on failure. If the - passphrase doesn't match the one used to encrypt the session then the - error message for the exception will be "BAD_ACCOUNT_KEY". If the - base64 couldn't be decoded then the error message will be - "INVALID_BASE64". - - Args: - pickle(bytes): Base64 encoded byte string containing the pickled - session - passphrase(str, optional): The passphrase used to encrypt the - """ - if not pickle: - raise ValueError("Pickle can't be empty") - - byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b"" - # copy because unpickle will destroy the buffer - pickle_buffer = ffi.new("char[]", pickle) - - obj = cls.__new__(cls) - - try: - ret = lib.olm_unpickle_outbound_group_session( - obj._session, - ffi.from_buffer(byte_passphrase), - len(byte_passphrase), - pickle_buffer, - len(pickle) - ) - obj._check_error(ret) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_passphrase)): - byte_passphrase[i] = 0 - - return obj - - def encrypt(self, plaintext): - # type: (AnyStr) -> str - """Encrypt a message. - - Returns the encrypted ciphertext. - - Args: - plaintext(str): A string that will be encrypted using the group - session. - """ - byte_plaintext = to_bytearray(plaintext) - message_length = lib.olm_group_encrypt_message_length( - self._session, len(byte_plaintext) - ) - - message_buffer = ffi.new("char[]", message_length) - - try: - ret = lib.olm_group_encrypt( - self._session, - ffi.from_buffer(byte_plaintext), len(byte_plaintext), - message_buffer, message_length, - ) - self._check_error(ret) - finally: - # clear out copies of plaintext - if byte_plaintext is not plaintext: - for i in range(0, len(byte_plaintext)): - byte_plaintext[i] = 0 - - return bytes_to_native_str(ffi.unpack(message_buffer, message_length)) - - @property - def id(self): - # type: () -> str - """str: A base64 encoded identifier for this session.""" - id_length = lib.olm_outbound_group_session_id_length(self._session) - id_buffer = ffi.new("char[]", id_length) - - ret = lib.olm_outbound_group_session_id( - self._session, - id_buffer, - id_length - ) - self._check_error(ret) - - return bytes_to_native_str(ffi.unpack(id_buffer, id_length)) - - @property - def message_index(self): - # type: () -> int - """int: The current message index of the session. - - Each message is encrypted with an increasing index. This is the index - for the next message. - """ - return lib.olm_outbound_group_session_message_index(self._session) - - @property - def session_key(self): - # type: () -> str - """The base64-encoded current ratchet key for this session. - - Each message is encrypted with a different ratchet key. This function - returns the ratchet key that will be used for the next message. - """ - key_length = lib.olm_outbound_group_session_key_length(self._session) - key_buffer = ffi.new("char[]", key_length) - - ret = lib.olm_outbound_group_session_key( - self._session, - key_buffer, - key_length - ) - self._check_error(ret) - - return bytes_to_native_str(ffi.unpack(key_buffer, key_length)) diff --git a/python/olm/pk.py b/python/olm/pk.py deleted file mode 100644 index 4352359..0000000 --- a/python/olm/pk.py +++ /dev/null @@ -1,461 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2018 Damir Jelić <poljar@termina.org.uk> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""libolm PK module. - -This module contains bindings to the PK part of the Olm library. -It contains two classes PkDecryption and PkEncryption that are used to -establish an encrypted communication channel using public key encryption, -as well as a class PkSigning that is used to sign a message. - -Examples: - >>> decryption = PkDecryption() - >>> encryption = PkEncryption(decryption.public_key) - >>> plaintext = "It's a secret to everybody." - >>> message = encryption.encrypt(plaintext) - >>> decrypted_plaintext = decryption.decrypt(message) - >>> seed = PkSigning.generate_seed() - >>> signing = PkSigning(seed) - >>> signature = signing.sign(plaintext) - >>> ed25519_verify(signing.public_key, plaintext, signature) - -""" - -from builtins import super -from typing import AnyStr, Type - -from future.utils import bytes_to_native_str - -from _libolm import ffi, lib # type: ignore - -from ._compat import URANDOM, to_bytearray, to_unicode_str -from ._finalize import track_for_finalization - - -class PkEncryptionError(Exception): - """libolm Pk encryption exception.""" - - -class PkDecryptionError(Exception): - """libolm Pk decryption exception.""" - - -class PkSigningError(Exception): - """libolm Pk signing exception.""" - - -def _clear_pk_encryption(pk_struct): - lib.olm_clear_pk_encryption(pk_struct) - - -class PkMessage(object): - """A PK encrypted message.""" - - def __init__(self, ephemeral_key, mac, ciphertext): - # type: (str, str, str) -> None - """Create a new PK encrypted message. - - Args: - ephemeral_key(str): the public part of the ephemeral key - used (together with the recipient's key) to generate a symmetric - encryption key. - mac(str): Message Authentication Code of the encrypted message - ciphertext(str): The cipher text of the encrypted message - """ - self.ephemeral_key = ephemeral_key - self.mac = mac - self.ciphertext = ciphertext - - -class PkEncryption(object): - """PkEncryption class. - - Represents the decryption part of a PK encrypted channel. - """ - - def __init__(self, recipient_key): - # type: (AnyStr) -> None - """Create a new PK encryption object. - - Args: - recipient_key(str): a public key that will be used for encryption - """ - if not recipient_key: - raise ValueError("Recipient key can't be empty") - - self._buf = ffi.new("char[]", lib.olm_pk_encryption_size()) - self._pk_encryption = lib.olm_pk_encryption(self._buf) - track_for_finalization(self, self._pk_encryption, _clear_pk_encryption) - - byte_key = to_bytearray(recipient_key) - lib.olm_pk_encryption_set_recipient_key( - self._pk_encryption, - ffi.from_buffer(byte_key), - len(byte_key) - ) - - # clear out copies of the key - if byte_key is not recipient_key: # pragma: no cover - for i in range(0, len(byte_key)): - byte_key[i] = 0 - - def _check_error(self, ret): # pragma: no cover - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str( - ffi.string(lib.olm_pk_encryption_last_error(self._pk_encryption))) - - raise PkEncryptionError(last_error) - - def encrypt(self, plaintext): - # type: (AnyStr) -> PkMessage - """Encrypt a message. - - Returns the encrypted PkMessage. - - Args: - plaintext(str): A string that will be encrypted using the - PkEncryption object. - """ - byte_plaintext = to_bytearray(plaintext) - - r_length = lib.olm_pk_encrypt_random_length(self._pk_encryption) - random = URANDOM(r_length) - random_buffer = ffi.new("char[]", random) - - ciphertext_length = lib.olm_pk_ciphertext_length( - self._pk_encryption, len(byte_plaintext) - ) - ciphertext = ffi.new("char[]", ciphertext_length) - - mac_length = lib.olm_pk_mac_length(self._pk_encryption) - mac = ffi.new("char[]", mac_length) - - ephemeral_key_size = lib.olm_pk_key_length() - ephemeral_key = ffi.new("char[]", ephemeral_key_size) - - ret = lib.olm_pk_encrypt( - self._pk_encryption, - ffi.from_buffer(byte_plaintext), len(byte_plaintext), - ciphertext, ciphertext_length, - mac, mac_length, - ephemeral_key, ephemeral_key_size, - random_buffer, r_length - ) - - try: - self._check_error(ret) - finally: # pragma: no cover - # clear out copies of plaintext - if byte_plaintext is not plaintext: - for i in range(0, len(byte_plaintext)): - byte_plaintext[i] = 0 - - message = PkMessage( - bytes_to_native_str( - ffi.unpack(ephemeral_key, ephemeral_key_size)), - bytes_to_native_str( - ffi.unpack(mac, mac_length)), - bytes_to_native_str( - ffi.unpack(ciphertext, ciphertext_length)) - ) - return message - - -def _clear_pk_decryption(pk_struct): - lib.olm_clear_pk_decryption(pk_struct) - - -class PkDecryption(object): - """PkDecryption class. - - Represents the decryption part of a PK encrypted channel. - - Attributes: - public_key (str): The public key of the PkDecryption object, can be - shared and used to create a PkEncryption object. - - """ - - def __new__(cls): - # type: (Type[PkDecryption]) -> PkDecryption - obj = super().__new__(cls) - obj._buf = ffi.new("char[]", lib.olm_pk_decryption_size()) - obj._pk_decryption = lib.olm_pk_decryption(obj._buf) - obj.public_key = None - track_for_finalization(obj, obj._pk_decryption, _clear_pk_decryption) - return obj - - def __init__(self): - if False: # pragma: no cover - self._pk_decryption = self._pk_decryption # type: ffi.cdata - - random_length = lib.olm_pk_private_key_length() - random = URANDOM(random_length) - random_buffer = ffi.new("char[]", random) - - key_length = lib.olm_pk_key_length() - key_buffer = ffi.new("char[]", key_length) - - ret = lib.olm_pk_key_from_private( - self._pk_decryption, - key_buffer, key_length, - random_buffer, random_length - ) - self._check_error(ret) - self.public_key = bytes_to_native_str(ffi.unpack( - key_buffer, - key_length - )) - - def _check_error(self, ret): - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str( - ffi.string(lib.olm_pk_decryption_last_error(self._pk_decryption))) - - raise PkDecryptionError(last_error) - - def pickle(self, passphrase=""): - # type: (str) -> bytes - """Store a PkDecryption object. - - Stores a PkDecryption object as a base64 string. Encrypts the object - using the supplied passphrase. Returns a byte object containing the - base64 encoded string of the pickled session. - - Args: - passphrase(str, optional): The passphrase to be used to encrypt - the object. - """ - byte_key = to_bytearray(passphrase) - - pickle_length = lib.olm_pickle_pk_decryption_length( - self._pk_decryption - ) - pickle_buffer = ffi.new("char[]", pickle_length) - - ret = lib.olm_pickle_pk_decryption( - self._pk_decryption, - ffi.from_buffer(byte_key), len(byte_key), - pickle_buffer, pickle_length - ) - try: - self._check_error(ret) - finally: - # zero out copies of the passphrase - for i in range(0, len(byte_key)): - byte_key[i] = 0 - - return ffi.unpack(pickle_buffer, pickle_length) - - @classmethod - def from_pickle(cls, pickle, passphrase=""): - # types: (bytes, str) -> PkDecryption - """Restore a previously stored PkDecryption object. - - Creates a PkDecryption object from a pickled base64 string. Decrypts - the pickled object using the supplied passphrase. - Raises PkDecryptionError on failure. If the passphrase - doesn't match the one used to encrypt the session then the error - message for the exception will be "BAD_ACCOUNT_KEY". If the base64 - couldn't be decoded then the error message will be "INVALID_BASE64". - - Args: - pickle(bytes): Base64 encoded byte string containing the pickled - PkDecryption object - passphrase(str, optional): The passphrase used to encrypt the - object - """ - if not pickle: - raise ValueError("Pickle can't be empty") - - byte_key = to_bytearray(passphrase) - pickle_buffer = ffi.new("char[]", pickle) - - pubkey_length = lib.olm_pk_key_length() - pubkey_buffer = ffi.new("char[]", pubkey_length) - - obj = cls.__new__(cls) - - ret = lib.olm_unpickle_pk_decryption( - obj._pk_decryption, - ffi.from_buffer(byte_key), len(byte_key), - pickle_buffer, len(pickle), - pubkey_buffer, pubkey_length) - - try: - obj._check_error(ret) - finally: - for i in range(0, len(byte_key)): - byte_key[i] = 0 - - obj.public_key = bytes_to_native_str(ffi.unpack( - pubkey_buffer, - pubkey_length - )) - - return obj - - def decrypt(self, message, unicode_errors="replace"): - # type (PkMessage, str) -> str - """Decrypt a previously encrypted Pk message. - - Returns the decrypted plaintext. - Raises PkDecryptionError on failure. - - Args: - message(PkMessage): the pk message to decrypt. - unicode_errors(str, optional): The error handling scheme to use for - unicode decoding errors. The default is "replace" meaning that - the character that was unable to decode will be replaced with - the unicode replacement character (U+FFFD). Other possible - values are "strict", "ignore" and "xmlcharrefreplace" as well - as any other name registered with codecs.register_error that - can handle UnicodeEncodeErrors. - """ - ephemeral_key = to_bytearray(message.ephemeral_key) - ephemeral_key_size = len(ephemeral_key) - - mac = to_bytearray(message.mac) - mac_length = len(mac) - - ciphertext = to_bytearray(message.ciphertext) - ciphertext_length = len(ciphertext) - - max_plaintext_length = lib.olm_pk_max_plaintext_length( - self._pk_decryption, - ciphertext_length - ) - plaintext_buffer = ffi.new("char[]", max_plaintext_length) - - ret = lib.olm_pk_decrypt( - self._pk_decryption, - ffi.from_buffer(ephemeral_key), ephemeral_key_size, - ffi.from_buffer(mac), mac_length, - ffi.from_buffer(ciphertext), ciphertext_length, - plaintext_buffer, max_plaintext_length) - self._check_error(ret) - - plaintext = (ffi.unpack( - plaintext_buffer, - ret - )) - - # clear out copies of the plaintext - lib.memset(plaintext_buffer, 0, max_plaintext_length) - - return to_unicode_str(plaintext, errors=unicode_errors) - - -def _clear_pk_signing(pk_struct): - lib.olm_clear_pk_signing(pk_struct) - - -class PkSigning(object): - """PkSigning class. - - Signs messages using public key cryptography. - - Attributes: - public_key (str): The public key of the PkSigning object, can be - shared and used to verify using Utility.ed25519_verify. - - """ - - def __init__(self, seed): - # type: (bytes) -> None - """Create a new signing object. - - Args: - seed(bytes): the seed to use as the private key for signing. The - seed must have the same length as the seeds generated by - PkSigning.generate_seed(). - """ - if not seed: - raise ValueError("seed can't be empty") - - self._buf = ffi.new("char[]", lib.olm_pk_signing_size()) - self._pk_signing = lib.olm_pk_signing(self._buf) - track_for_finalization(self, self._pk_signing, _clear_pk_signing) - - seed_buffer = ffi.new("char[]", seed) - - pubkey_length = lib.olm_pk_signing_public_key_length() - pubkey_buffer = ffi.new("char[]", pubkey_length) - - ret = lib.olm_pk_signing_key_from_seed( - self._pk_signing, - pubkey_buffer, pubkey_length, - seed_buffer, len(seed) - ) - - # zero out copies of the seed - lib.memset(seed_buffer, 0, len(seed)) - - self._check_error(ret) - - self.public_key = bytes_to_native_str( - ffi.unpack(pubkey_buffer, pubkey_length) - ) - - def _check_error(self, ret): - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str( - ffi.string(lib.olm_pk_signing_last_error(self._pk_signing))) - - raise PkSigningError(last_error) - - @classmethod - def generate_seed(cls): - # type: () -> bytes - """Generate a random seed. - """ - random_length = lib.olm_pk_signing_seed_length() - random = URANDOM(random_length) - - return random - - def sign(self, message): - # type: (AnyStr) -> str - """Sign a message - - Returns the signature. - Raises PkSigningError on failure. - - Args: - message(str): the message to sign. - """ - bytes_message = to_bytearray(message) - - signature_length = lib.olm_pk_signature_length() - signature_buffer = ffi.new("char[]", signature_length) - - ret = lib.olm_pk_sign( - self._pk_signing, - ffi.from_buffer(bytes_message), len(bytes_message), - signature_buffer, signature_length) - self._check_error(ret) - - return bytes_to_native_str( - ffi.unpack(signature_buffer, signature_length) - ) diff --git a/python/olm/sas.py b/python/olm/sas.py deleted file mode 100644 index cf2a443..0000000 --- a/python/olm/sas.py +++ /dev/null @@ -1,245 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2019 Damir Jelić <poljar@termina.org.uk> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""libolm SAS module. - -This module contains functions to perform key verification using the Short -Authentication String (SAS) method. - -Examples: - >>> sas = Sas() - >>> bob_key = "3p7bfXt9wbTTW2HC7OQ1Nz+DQ8hbeGdNrfx+FG+IK08" - >>> message = "Hello world!" - >>> extra_info = "MAC" - >>> sas_alice.set_their_pubkey(bob_key) - >>> sas_alice.calculate_mac(message, extra_info) - >>> sas_alice.generate_bytes(extra_info, 5) - -""" - -from builtins import bytes -from functools import wraps -from typing import Optional - -from future.utils import bytes_to_native_str - -from _libolm import ffi, lib - -from ._compat import URANDOM, to_bytearray, to_bytes -from ._finalize import track_for_finalization - - -def _clear_sas(sas): - # type: (ffi.cdata) -> None - lib.olm_clear_sas(sas) - - -class OlmSasError(Exception): - """libolm Sas error exception.""" - - -class Sas(object): - """libolm Short Authenticaton String (SAS) class.""" - - def __init__(self, other_users_pubkey=None): - # type: (Optional[str]) -> None - """Create a new SAS object. - - Args: - other_users_pubkey(str, optional): The other users public key, this - key is necesary to generate bytes for the authentication string - as well as to calculate the MAC. - - Raises OlmSasError on failure. - - """ - self._buf = ffi.new("char[]", lib.olm_sas_size()) - self._sas = lib.olm_sas(self._buf) - track_for_finalization(self, self._sas, _clear_sas) - - random_length = lib.olm_create_sas_random_length(self._sas) - random = URANDOM(random_length) - - self._create_sas(random, random_length) - - if other_users_pubkey: - self.set_their_pubkey(other_users_pubkey) - - def _create_sas(self, buffer, buffer_length): - self._check_error( - lib.olm_create_sas( - self._sas, - ffi.from_buffer(buffer), - buffer_length - ) - ) - - def _check_error(self, ret): - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str( - ffi.string((lib.olm_sas_last_error(self._sas)))) - - raise OlmSasError(last_error) - - @property - def pubkey(self): - # type: () -> str - """Get the public key for the SAS object. - - This returns the public key of the SAS object that can then be shared - with another user to perform the authentication process. - - Raises OlmSasError on failure. - - """ - pubkey_length = lib.olm_sas_pubkey_length(self._sas) - pubkey_buffer = ffi.new("char[]", pubkey_length) - - self._check_error( - lib.olm_sas_get_pubkey(self._sas, pubkey_buffer, pubkey_length) - ) - - return bytes_to_native_str(ffi.unpack(pubkey_buffer, pubkey_length)) - - @property - def other_key_set(self): - # type: () -> bool - """Check if the other user's pubkey has been set. - """ - return lib.olm_sas_is_their_key_set(self._sas) == 1 - - def set_their_pubkey(self, key): - # type: (str) -> None - """Set the public key of the other user. - - This sets the public key of the other user, it needs to be set before - bytes can be generated for the authentication string and a MAC can be - calculated. - - Args: - key (str): The other users public key. - - Raises OlmSasError on failure. - - """ - byte_key = to_bytearray(key) - - self._check_error( - lib.olm_sas_set_their_key( - self._sas, - ffi.from_buffer(byte_key), - len(byte_key) - ) - ) - - def generate_bytes(self, extra_info, length): - # type: (str, int) -> bytes - """Generate bytes to use for the short authentication string. - - Args: - extra_info (str): Extra information to mix in when generating the - bytes. - length (int): The number of bytes to generate. - - Raises OlmSasError if the other users persons public key isn't set or - an internal Olm error happens. - - """ - if length < 1: - raise ValueError("The length needs to be a positive integer value") - - byte_info = to_bytearray(extra_info) - out_buffer = ffi.new("char[]", length) - - self._check_error( - lib.olm_sas_generate_bytes( - self._sas, - ffi.from_buffer(byte_info), - len(byte_info), - out_buffer, - length - ) - ) - - return ffi.unpack(out_buffer, length) - - def calculate_mac(self, message, extra_info): - # type: (str, str) -> str - """Generate a message authentication code based on the shared secret. - - Args: - message (str): The message to produce the authentication code for. - extra_info (str): Extra information to mix in when generating the - MAC - - Raises OlmSasError on failure. - - """ - byte_message = to_bytes(message) - byte_info = to_bytes(extra_info) - - mac_length = lib.olm_sas_mac_length(self._sas) - mac_buffer = ffi.new("char[]", mac_length) - - self._check_error( - lib.olm_sas_calculate_mac( - self._sas, - ffi.from_buffer(byte_message), - len(byte_message), - ffi.from_buffer(byte_info), - len(byte_info), - mac_buffer, - mac_length - ) - ) - return bytes_to_native_str(ffi.unpack(mac_buffer, mac_length)) - - def calculate_mac_long_kdf(self, message, extra_info): - # type: (str, str) -> str - """Generate a message authentication code based on the shared secret. - - This function should not be used unless compatibility with an older - non-tagged Olm version is required. - - Args: - message (str): The message to produce the authentication code for. - extra_info (str): Extra information to mix in when generating the - MAC - - Raises OlmSasError on failure. - - """ - byte_message = to_bytes(message) - byte_info = to_bytes(extra_info) - - mac_length = lib.olm_sas_mac_length(self._sas) - mac_buffer = ffi.new("char[]", mac_length) - - self._check_error( - lib.olm_sas_calculate_mac_long_kdf( - self._sas, - ffi.from_buffer(byte_message), - len(byte_message), - ffi.from_buffer(byte_info), - len(byte_info), - mac_buffer, - mac_length - ) - ) - return bytes_to_native_str(ffi.unpack(mac_buffer, mac_length)) diff --git a/python/olm/session.py b/python/olm/session.py deleted file mode 100644 index 636eb3d..0000000 --- a/python/olm/session.py +++ /dev/null @@ -1,495 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2015-2017 OpenMarket Ltd -# Copyright © 2018 Damir Jelić <poljar@termina.org.uk> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""libolm Session module. - -This module contains the Olm Session part of the Olm library. - -It is used to establish a peer-to-peer encrypted communication channel between -two Olm accounts. - -Examples: - >>> alice = Account() - >>> bob = Account() - >>> bob.generate_one_time_keys(1) - >>> id_key = bob.identity_keys['curve25519'] - >>> one_time = list(bob.one_time_keys["curve25519"].values())[0] - >>> session = OutboundSession(alice, id_key, one_time) - -""" - -# pylint: disable=redefined-builtin,unused-import -from builtins import bytes, super -from typing import AnyStr, Optional, Type - -from future.utils import bytes_to_native_str - -# pylint: disable=no-name-in-module -from _libolm import ffi, lib # type: ignore - -from ._compat import URANDOM, to_bytearray, to_bytes, to_unicode_str -from ._finalize import track_for_finalization - -# This is imported only for type checking purposes -if False: - from .account import Account # pragma: no cover - - -class OlmSessionError(Exception): - """libolm Session exception.""" - - -class _OlmMessage(object): - def __init__(self, ciphertext, message_type): - # type: (AnyStr, ffi.cdata) -> None - if not ciphertext: - raise ValueError("Ciphertext can't be empty") - - # I don't know why mypy wants a type annotation here nor why AnyStr - # doesn't work - self.ciphertext = ciphertext # type: ignore - self.message_type = message_type - - def __str__(self): - # type: () -> str - type_to_prefix = { - lib.OLM_MESSAGE_TYPE_PRE_KEY: "PRE_KEY", - lib.OLM_MESSAGE_TYPE_MESSAGE: "MESSAGE" - } - - prefix = type_to_prefix[self.message_type] - return "{} {}".format(prefix, self.ciphertext) - - -class OlmPreKeyMessage(_OlmMessage): - """Olm prekey message class - - Prekey messages are used to establish an Olm session. After the first - message exchange the session switches to normal messages - """ - - def __init__(self, ciphertext): - # type: (AnyStr) -> None - """Create a new Olm prekey message with the supplied ciphertext - - Args: - ciphertext(str): The ciphertext of the prekey message. - """ - _OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_PRE_KEY) - - def __repr__(self): - # type: () -> str - return "OlmPreKeyMessage({})".format(self.ciphertext) - - -class OlmMessage(_OlmMessage): - """Olm message class""" - - def __init__(self, ciphertext): - # type: (AnyStr) -> None - """Create a new Olm message with the supplied ciphertext - - Args: - ciphertext(str): The ciphertext of the message. - """ - _OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_MESSAGE) - - def __repr__(self): - # type: () -> str - return "OlmMessage({})".format(self.ciphertext) - - -def _clear_session(session): - # type: (ffi.cdata) -> None - lib.olm_clear_session(session) - - -class Session(object): - """libolm Session class. - This is an abstract class that can't be instantiated except when unpickling - a previously pickled InboundSession or OutboundSession object with - from_pickle. - """ - - def __new__(cls): - # type: (Type[Session]) -> Session - - obj = super().__new__(cls) - obj._buf = ffi.new("char[]", lib.olm_session_size()) - obj._session = lib.olm_session(obj._buf) - track_for_finalization(obj, obj._session, _clear_session) - return obj - - def __init__(self): - # type: () -> None - if type(self) is Session: - raise TypeError("Session class may not be instantiated.") - - if False: - self._session = self._session # type: ffi.cdata - - def _check_error(self, ret): - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str( - ffi.string(lib.olm_session_last_error(self._session))) - - raise OlmSessionError(last_error) - - def pickle(self, passphrase=""): - # type: (Optional[str]) -> bytes - """Store an Olm session. - - Stores a session as a base64 string. Encrypts the session using the - supplied passphrase. Returns a byte object containing the base64 - encoded string of the pickled session. Raises OlmSessionError on - failure. - - Args: - passphrase(str, optional): The passphrase to be used to encrypt - the session. - """ - byte_key = bytearray(passphrase, "utf-8") if passphrase else b"" - - pickle_length = lib.olm_pickle_session_length(self._session) - pickle_buffer = ffi.new("char[]", pickle_length) - - try: - self._check_error( - lib.olm_pickle_session(self._session, - ffi.from_buffer(byte_key), - len(byte_key), - pickle_buffer, pickle_length)) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_key)): - byte_key[i] = 0 - - return ffi.unpack(pickle_buffer, pickle_length) - - @classmethod - def from_pickle(cls, pickle, passphrase=""): - # type: (bytes, Optional[str]) -> Session - """Load a previously stored Olm session. - - Loads a session from a pickled base64 string and returns a Session - object. Decrypts the session using the supplied passphrase. Raises - OlmSessionError on failure. If the passphrase doesn't match the one - used to encrypt the session then the error message for the - exception will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded - then the error message will be "INVALID_BASE64". - - Args: - pickle(bytes): Base64 encoded byte string containing the pickled - session - passphrase(str, optional): The passphrase used to encrypt the - session. - """ - if not pickle: - raise ValueError("Pickle can't be empty") - - byte_key = bytearray(passphrase, "utf-8") if passphrase else b"" - # copy because unpickle will destroy the buffer - pickle_buffer = ffi.new("char[]", pickle) - - session = cls.__new__(cls) - - try: - ret = lib.olm_unpickle_session(session._session, - ffi.from_buffer(byte_key), - len(byte_key), - pickle_buffer, - len(pickle)) - session._check_error(ret) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_key)): - byte_key[i] = 0 - - return session - - def encrypt(self, plaintext): - # type: (AnyStr) -> _OlmMessage - """Encrypts a message using the session. Returns the ciphertext as a - base64 encoded string on success. Raises OlmSessionError on failure. - - Args: - plaintext(str): The plaintext message that will be encrypted. - """ - byte_plaintext = to_bytearray(plaintext) - - r_length = lib.olm_encrypt_random_length(self._session) - random = URANDOM(r_length) - - try: - message_type = lib.olm_encrypt_message_type(self._session) - - self._check_error(message_type) - - ciphertext_length = lib.olm_encrypt_message_length( - self._session, len(byte_plaintext) - ) - ciphertext_buffer = ffi.new("char[]", ciphertext_length) - - self._check_error(lib.olm_encrypt( - self._session, - ffi.from_buffer(byte_plaintext), len(byte_plaintext), - ffi.from_buffer(random), r_length, - ciphertext_buffer, ciphertext_length, - )) - finally: - # clear out copies of plaintext - if byte_plaintext is not plaintext: - for i in range(0, len(byte_plaintext)): - byte_plaintext[i] = 0 - - if message_type == lib.OLM_MESSAGE_TYPE_PRE_KEY: - return OlmPreKeyMessage( - bytes_to_native_str(ffi.unpack( - ciphertext_buffer, - ciphertext_length - ))) - elif message_type == lib.OLM_MESSAGE_TYPE_MESSAGE: - return OlmMessage( - bytes_to_native_str(ffi.unpack( - ciphertext_buffer, - ciphertext_length - ))) - else: # pragma: no cover - raise ValueError("Unknown message type") - - def decrypt(self, message, unicode_errors="replace"): - # type: (_OlmMessage, str) -> str - """Decrypts a message using the session. Returns the plaintext string - on success. Raises OlmSessionError on failure. If the base64 couldn't - be decoded then the error message will be "INVALID_BASE64". If the - message is for an unsupported version of the protocol the error message - will be "BAD_MESSAGE_VERSION". If the message couldn't be decoded then - the error message will be "BAD_MESSAGE_FORMAT". If the MAC on the - message was invalid then the error message will be "BAD_MESSAGE_MAC". - - Args: - message(OlmMessage): The Olm message that will be decrypted. It can - be either an OlmPreKeyMessage or an OlmMessage. - unicode_errors(str, optional): The error handling scheme to use for - unicode decoding errors. The default is "replace" meaning that - the character that was unable to decode will be replaced with - the unicode replacement character (U+FFFD). Other possible - values are "strict", "ignore" and "xmlcharrefreplace" as well - as any other name registered with codecs.register_error that - can handle UnicodeEncodeErrors. - """ - if not message.ciphertext: - raise ValueError("Ciphertext can't be empty") - - byte_ciphertext = to_bytes(message.ciphertext) - # make a copy the ciphertext buffer, because - # olm_decrypt_max_plaintext_length wants to destroy something - ciphertext_buffer = ffi.new("char[]", byte_ciphertext) - - max_plaintext_length = lib.olm_decrypt_max_plaintext_length( - self._session, message.message_type, ciphertext_buffer, - len(byte_ciphertext) - ) - self._check_error(max_plaintext_length) - plaintext_buffer = ffi.new("char[]", max_plaintext_length) - - # make a copy the ciphertext buffer, because - # olm_decrypt_max_plaintext_length wants to destroy something - ciphertext_buffer = ffi.new("char[]", byte_ciphertext) - plaintext_length = lib.olm_decrypt( - self._session, message.message_type, - ciphertext_buffer, len(byte_ciphertext), - plaintext_buffer, max_plaintext_length - ) - self._check_error(plaintext_length) - plaintext = to_unicode_str( - ffi.unpack(plaintext_buffer, plaintext_length), - errors=unicode_errors - ) - - # clear out copies of the plaintext - lib.memset(plaintext_buffer, 0, max_plaintext_length) - - return plaintext - - @property - def id(self): - # type: () -> str - """str: An identifier for this session. Will be the same for both - ends of the conversation. - """ - id_length = lib.olm_session_id_length(self._session) - id_buffer = ffi.new("char[]", id_length) - - self._check_error( - lib.olm_session_id(self._session, id_buffer, id_length) - ) - return bytes_to_native_str(ffi.unpack(id_buffer, id_length)) - - def matches(self, message, identity_key=None): - # type: (OlmPreKeyMessage, Optional[AnyStr]) -> bool - """Checks if the PRE_KEY message is for this in-bound session. - This can happen if multiple messages are sent to this session before - this session sends a message in reply. Returns True if the session - matches. Returns False if the session does not match. Raises - OlmSessionError on failure. If the base64 couldn't be decoded then the - error message will be "INVALID_BASE64". If the message was for an - unsupported protocol version then the error message will be - "BAD_MESSAGE_VERSION". If the message couldn't be decoded then then the - error message will be * "BAD_MESSAGE_FORMAT". - - Args: - message(OlmPreKeyMessage): The Olm prekey message that will checked - if it is intended for this session. - identity_key(str, optional): The identity key of the sender. To - check if the message was also sent using this identity key. - """ - if not isinstance(message, OlmPreKeyMessage): - raise TypeError("Matches can only be called with prekey messages.") - - if not message.ciphertext: - raise ValueError("Ciphertext can't be empty") - - ret = None - - byte_ciphertext = to_bytes(message.ciphertext) - # make a copy, because olm_matches_inbound_session(_from) will distroy - # it - message_buffer = ffi.new("char[]", byte_ciphertext) - - if identity_key: - byte_id_key = to_bytes(identity_key) - - ret = lib.olm_matches_inbound_session_from( - self._session, - ffi.from_buffer(byte_id_key), len(byte_id_key), - message_buffer, len(byte_ciphertext) - ) - - else: - ret = lib.olm_matches_inbound_session( - self._session, - message_buffer, len(byte_ciphertext)) - - self._check_error(ret) - - return bool(ret) - - -class InboundSession(Session): - """Inbound Olm session for p2p encrypted communication. - """ - - def __new__(cls, account, message, identity_key=None): - # type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> Session - return super().__new__(cls) - - def __init__(self, account, message, identity_key=None): - # type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> None - """Create a new inbound Olm session. - - Create a new in-bound session for sending/receiving messages from an - incoming prekey message. Raises OlmSessionError on failure. If the - base64 couldn't be decoded then error message will be "INVALID_BASE64". - If the message was for an unsupported protocol version then - the errror message will be "BAD_MESSAGE_VERSION". If the message - couldn't be decoded then then the error message will be - "BAD_MESSAGE_FORMAT". If the message refers to an unknown one-time - key then the error message will be "BAD_MESSAGE_KEY_ID". - - Args: - account(Account): The Olm Account that will be used to create this - session. - message(OlmPreKeyMessage): The Olm prekey message that will checked - that will be used to create this session. - identity_key(str, optional): The identity key of the sender. To - check if the message was also sent using this identity key. - """ - if not message.ciphertext: - raise ValueError("Ciphertext can't be empty") - - super().__init__() - byte_ciphertext = to_bytes(message.ciphertext) - message_buffer = ffi.new("char[]", byte_ciphertext) - - if identity_key: - byte_id_key = to_bytes(identity_key) - identity_key_buffer = ffi.new("char[]", byte_id_key) - self._check_error(lib.olm_create_inbound_session_from( - self._session, - account._account, - identity_key_buffer, len(byte_id_key), - message_buffer, len(byte_ciphertext) - )) - else: - self._check_error(lib.olm_create_inbound_session( - self._session, - account._account, - message_buffer, len(byte_ciphertext) - )) - - -class OutboundSession(Session): - """Outbound Olm session for p2p encrypted communication.""" - - def __new__(cls, account, identity_key, one_time_key): - # type: (Account, AnyStr, AnyStr) -> Session - return super().__new__(cls) - - def __init__(self, account, identity_key, one_time_key): - # type: (Account, AnyStr, AnyStr) -> None - """Create a new outbound Olm session. - - Creates a new outbound session for sending messages to a given - identity key and one-time key. - - Raises OlmSessionError on failure. If the keys couldn't be decoded as - base64 then the error message will be "INVALID_BASE64". - - Args: - account(Account): The Olm Account that will be used to create this - session. - identity_key(str): The identity key of the person with whom we want - to start the session. - one_time_key(str): A one-time key from the person with whom we want - to start the session. - """ - if not identity_key: - raise ValueError("Identity key can't be empty") - - if not one_time_key: - raise ValueError("One-time key can't be empty") - - super().__init__() - - byte_id_key = to_bytes(identity_key) - byte_one_time = to_bytes(one_time_key) - - session_random_length = lib.olm_create_outbound_session_random_length( - self._session) - - random = URANDOM(session_random_length) - - self._check_error(lib.olm_create_outbound_session( - self._session, - account._account, - ffi.from_buffer(byte_id_key), len(byte_id_key), - ffi.from_buffer(byte_one_time), len(byte_one_time), - ffi.from_buffer(random), session_random_length - )) diff --git a/python/olm/utility.py b/python/olm/utility.py deleted file mode 100644 index bddef38..0000000 --- a/python/olm/utility.py +++ /dev/null @@ -1,151 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2015-2017 OpenMarket Ltd -# Copyright © 2018 Damir Jelić <poljar@termina.org.uk> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""libolm Utility module. - -This module contains utilities for olm. -It only contains the ed25519_verify function for signature verification. - -Examples: - >>> alice = Account() - - >>> message = "Test" - >>> signature = alice.sign(message) - >>> signing_key = alice.identity_keys["ed25519"] - - >>> ed25519_verify(signing_key, message, signature) - -""" - -# pylint: disable=redefined-builtin,unused-import -from typing import AnyStr, Type - -from future.utils import bytes_to_native_str - -# pylint: disable=no-name-in-module -from _libolm import ffi, lib # type: ignore - -from ._compat import to_bytearray, to_bytes -from ._finalize import track_for_finalization - - -def _clear_utility(utility): # pragma: no cover - # type: (ffi.cdata) -> None - lib.olm_clear_utility(utility) - - -class OlmVerifyError(Exception): - """libolm signature verification exception.""" - - -class OlmHashError(Exception): - """libolm hash calculation exception.""" - - -class _Utility(object): - # pylint: disable=too-few-public-methods - """libolm Utility class.""" - - _buf = None - _utility = None - - @classmethod - def _allocate(cls): - # type: (Type[_Utility]) -> None - cls._buf = ffi.new("char[]", lib.olm_utility_size()) - cls._utility = lib.olm_utility(cls._buf) - track_for_finalization(cls, cls._utility, _clear_utility) - - @classmethod - def _check_error(cls, ret, error_class): - # type: (int, Type) -> None - if ret != lib.olm_error(): - return - - raise error_class("{}".format( - ffi.string(lib.olm_utility_last_error( - cls._utility)).decode("utf-8"))) - - @classmethod - def _ed25519_verify(cls, key, message, signature): - # type: (Type[_Utility], AnyStr, AnyStr, AnyStr) -> None - if not cls._utility: - cls._allocate() - - byte_key = to_bytes(key) - byte_message = to_bytearray(message) - byte_signature = to_bytearray(signature) - - try: - ret = lib.olm_ed25519_verify( - cls._utility, - byte_key, - len(byte_key), - ffi.from_buffer(byte_message), - len(byte_message), - ffi.from_buffer(byte_signature), - len(byte_signature) - ) - - cls._check_error(ret, OlmVerifyError) - - finally: - # clear out copies of the message, which may be a plaintext - if byte_message is not message: - for i in range(0, len(byte_message)): - byte_message[i] = 0 - - @classmethod - def _sha256(cls, input): - # type: (Type[_Utility], AnyStr) -> str - if not cls._utility: - cls._allocate() - - byte_input = to_bytes(input) - hash_length = lib.olm_sha256_length(cls._utility) - hash = ffi.new("char[]", hash_length) - - ret = lib.olm_sha256(cls._utility, byte_input, len(byte_input), - hash, hash_length) - - cls._check_error(ret, OlmHashError) - - return bytes_to_native_str(ffi.unpack(hash, hash_length)) - - -def ed25519_verify(key, message, signature): - # type: (AnyStr, AnyStr, AnyStr) -> None - """Verify an ed25519 signature. - - Raises an OlmVerifyError if verification fails. - - Args: - key(str): The ed25519 public key used for signing. - message(str): The signed message. - signature(bytes): The message signature. - """ - return _Utility._ed25519_verify(key, message, signature) - - -def sha256(input_string): - # type: (AnyStr) -> str - """Calculate the SHA-256 hash of the input and encodes it as base64. - - Args: - input_string(str): The input for which the hash will be calculated. - - """ - return _Utility._sha256(input_string) diff --git a/python/olm_build.py b/python/olm_build.py deleted file mode 100644 index 0606337..0000000 --- a/python/olm_build.py +++ /dev/null @@ -1,62 +0,0 @@ -# -*- coding: utf-8 -*- - -# libolm python bindings -# Copyright © 2018 Damir Jelić <poljar@termina.org.uk> -# -# Permission to use, copy, modify, and/or distribute this software for -# any purpose with or without fee is hereby granted, provided that the -# above copyright notice and this permission notice appear in all copies. -# -# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES -# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF -# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY -# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER -# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF -# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN -# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -from __future__ import unicode_literals - -import os -import subprocess - -from cffi import FFI - -ffibuilder = FFI() -PATH = os.path.dirname(__file__) - -DEVELOP = os.environ.get("DEVELOP") - -compile_args = ["-I../include"] -link_args = ["-L../build"] - -if DEVELOP and DEVELOP.lower() in ["yes", "true", "1"]: - link_args.append('-Wl,-rpath=../build') - -headers_build = subprocess.Popen("make headers", shell=True) -headers_build.wait() - -ffibuilder.set_source( - "_libolm", - r""" - #include <olm/olm.h> - #include <olm/inbound_group_session.h> - #include <olm/outbound_group_session.h> - #include <olm/pk.h> - #include <olm/sas.h> - """, - libraries=["olm"], - extra_compile_args=compile_args, - extra_link_args=link_args) - -with open(os.path.join(PATH, "include/olm/olm.h")) as f: - ffibuilder.cdef(f.read(), override=True) - -with open(os.path.join(PATH, "include/olm/pk.h")) as f: - ffibuilder.cdef(f.read(), override=True) - -with open(os.path.join(PATH, "include/olm/sas.h")) as f: - ffibuilder.cdef(f.read(), override=True) - -if __name__ == "__main__": - ffibuilder.compile(verbose=True) diff --git a/python/requirements.txt b/python/requirements.txt deleted file mode 100644 index c627b85..0000000 --- a/python/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -future -cffi -typing diff --git a/python/setup.cfg b/python/setup.cfg deleted file mode 100644 index d10b7e4..0000000 --- a/python/setup.cfg +++ /dev/null @@ -1,8 +0,0 @@ -[tool:pytest] -testpaths = tests -flake8-ignore = - olm/*.py F401 - tests/*.py W503 - -[coverage:run] -omit=olm/__version__.py diff --git a/python/setup.py b/python/setup.py deleted file mode 100644 index 5742fd9..0000000 --- a/python/setup.py +++ /dev/null @@ -1,31 +0,0 @@ -# -*- coding: utf-8 -*- - -import os -from codecs import open - -from setuptools import setup - -here = os.path.abspath(os.path.dirname(__file__)) - -about = {} -with open(os.path.join(here, "olm", "__version__.py"), "r", "utf-8") as f: - exec(f.read(), about) - -setup( - name=about["__title__"], - version=about["__version__"], - description=about["__description__"], - author=about["__author__"], - author_email=about["__author_email__"], - url=about["__url__"], - license=about["__license__"], - packages=["olm"], - setup_requires=["cffi>=1.0.0"], - cffi_modules=["olm_build.py:ffibuilder"], - install_requires=[ - "cffi>=1.0.0", - "future", - "typing;python_version<'3.5'" - ], - zip_safe=False -) diff --git a/python/test-requirements.txt b/python/test-requirements.txt deleted file mode 100644 index b18f27f..0000000 --- a/python/test-requirements.txt +++ /dev/null @@ -1,6 +0,0 @@ -pytest -pytest-flake8 -pytest-isort -pytest-cov -pytest-benchmark -aspectlib diff --git a/python/tests/account_test.py b/python/tests/account_test.py deleted file mode 100644 index f3b71eb..0000000 --- a/python/tests/account_test.py +++ /dev/null @@ -1,114 +0,0 @@ -from builtins import int - -import pytest - -from olm import Account, OlmAccountError, OlmVerifyError, ed25519_verify -from olm._compat import to_bytes - - -class TestClass(object): - def test_to_bytes(self): - assert isinstance(to_bytes("a"), bytes) - assert isinstance(to_bytes(u"a"), bytes) - assert isinstance(to_bytes(b"a"), bytes) - assert isinstance(to_bytes(r"a"), bytes) - with pytest.raises(TypeError): - to_bytes(0) - - def test_account_creation(self): - alice = Account() - assert alice.identity_keys - assert len(alice.identity_keys) == 2 - - def test_account_pickle(self): - alice = Account() - pickle = alice.pickle() - assert (alice.identity_keys == Account.from_pickle(pickle) - .identity_keys) - - def test_invalid_unpickle(self): - with pytest.raises(ValueError): - Account.from_pickle(b"") - - def test_passphrase_pickle(self): - alice = Account() - passphrase = "It's a secret to everybody" - pickle = alice.pickle(passphrase) - assert (alice.identity_keys == Account.from_pickle( - pickle, passphrase).identity_keys) - - def test_wrong_passphrase_pickle(self): - alice = Account() - passphrase = "It's a secret to everybody" - pickle = alice.pickle(passphrase) - - with pytest.raises(OlmAccountError): - Account.from_pickle(pickle, "") - - def test_one_time_keys(self): - alice = Account() - alice.generate_one_time_keys(10) - one_time_keys = alice.one_time_keys - assert one_time_keys - assert len(one_time_keys["curve25519"]) == 10 - - def test_max_one_time_keys(self): - alice = Account() - assert isinstance(alice.max_one_time_keys, int) - - def test_publish_one_time_keys(self): - alice = Account() - alice.generate_one_time_keys(10) - one_time_keys = alice.one_time_keys - - assert one_time_keys - assert len(one_time_keys["curve25519"]) == 10 - - alice.mark_keys_as_published() - assert not alice.one_time_keys["curve25519"] - - def test_clear(self): - alice = Account() - del alice - - def test_valid_signature(self): - message = "It's a secret to everybody" - alice = Account() - - signature = alice.sign(message) - signing_key = alice.identity_keys["ed25519"] - - assert signature - assert signing_key - - ed25519_verify(signing_key, message, signature) - - def test_invalid_signature(self): - message = "It's a secret to everybody" - alice = Account() - bob = Account() - - signature = alice.sign(message) - signing_key = bob.identity_keys["ed25519"] - - assert signature - assert signing_key - - with pytest.raises(OlmVerifyError): - ed25519_verify(signing_key, message, signature) - - def test_signature_verification_twice(self): - message = "It's a secret to everybody" - alice = Account() - - signature = alice.sign(message) - signing_key = alice.identity_keys["ed25519"] - - assert signature - assert signing_key - - ed25519_verify(signing_key, message, signature) - assert signature == alice.sign(message) - - ed25519_verify(signing_key, message, signature) - assert signature == alice.sign(message) diff --git a/python/tests/group_session_test.py b/python/tests/group_session_test.py deleted file mode 100644 index 4632a60..0000000 --- a/python/tests/group_session_test.py +++ /dev/null @@ -1,128 +0,0 @@ -# -*- coding: utf-8 -*- -import pytest - -from olm import InboundGroupSession, OlmGroupSessionError, OutboundGroupSession - - -class TestClass(object): - def test_session_create(self): - OutboundGroupSession() - - def test_session_id(self): - session = OutboundGroupSession() - assert isinstance(session.id, str) - - def test_session_index(self): - session = OutboundGroupSession() - assert isinstance(session.message_index, int) - assert session.message_index == 0 - - def test_outbound_pickle(self): - session = OutboundGroupSession() - pickle = session.pickle() - - assert (session.id == OutboundGroupSession.from_pickle( - pickle).id) - - def test_invalid_unpickle(self): - with pytest.raises(ValueError): - OutboundGroupSession.from_pickle(b"") - - with pytest.raises(ValueError): - InboundGroupSession.from_pickle(b"") - - def test_inbound_create(self): - outbound = OutboundGroupSession() - InboundGroupSession(outbound.session_key) - - def test_invalid_decrypt(self): - outbound = OutboundGroupSession() - inbound = InboundGroupSession(outbound.session_key) - - with pytest.raises(ValueError): - inbound.decrypt("") - - def test_inbound_pickle(self): - outbound = OutboundGroupSession() - inbound = InboundGroupSession(outbound.session_key) - pickle = inbound.pickle() - InboundGroupSession.from_pickle(pickle) - - def test_inbound_export(self): - outbound = OutboundGroupSession() - inbound = InboundGroupSession(outbound.session_key) - imported = InboundGroupSession.import_session( - inbound.export_session(inbound.first_known_index) - ) - assert "Test", 0 == imported.decrypt(outbound.encrypt("Test")) - - def test_first_index(self): - outbound = OutboundGroupSession() - inbound = InboundGroupSession(outbound.session_key) - index = inbound.first_known_index - assert isinstance(index, int) - - def test_encrypt(self, benchmark): - benchmark.weave(OutboundGroupSession.encrypt, lazy=True) - outbound = OutboundGroupSession() - inbound = InboundGroupSession(outbound.session_key) - assert "Test", 0 == inbound.decrypt(outbound.encrypt("Test")) - - def test_decrypt(self, benchmark): - benchmark.weave(InboundGroupSession.decrypt, lazy=True) - outbound = OutboundGroupSession() - inbound = InboundGroupSession(outbound.session_key) - assert "Test", 0 == inbound.decrypt(outbound.encrypt("Test")) - - def test_decrypt_twice(self): - outbound = OutboundGroupSession() - inbound = InboundGroupSession(outbound.session_key) - outbound.encrypt("Test 1") - message, index = inbound.decrypt(outbound.encrypt("Test 2")) - assert isinstance(index, int) - assert ("Test 2", 1) == (message, index) - - def test_decrypt_failure(self): - outbound = OutboundGroupSession() - inbound = InboundGroupSession(outbound.session_key) - eve_outbound = OutboundGroupSession() - with pytest.raises(OlmGroupSessionError): - inbound.decrypt(eve_outbound.encrypt("Test")) - - def test_id(self): - outbound = OutboundGroupSession() - inbound = InboundGroupSession(outbound.session_key) - assert outbound.id == inbound.id - - def test_inbound_fail(self): - with pytest.raises(TypeError): - InboundGroupSession() - - def test_oubtound_pickle_fail(self): - outbound = OutboundGroupSession() - pickle = outbound.pickle("Test") - - with pytest.raises(OlmGroupSessionError): - OutboundGroupSession.from_pickle(pickle) - - def test_outbound_clear(self): - session = OutboundGroupSession() - del session - - def test_inbound_clear(self): - outbound = OutboundGroupSession() - inbound = InboundGroupSession(outbound.session_key) - del inbound - - def test_invalid_unicode_decrypt(self): - outbound = OutboundGroupSession() - inbound = InboundGroupSession(outbound.session_key) - - text = outbound.encrypt(b"\xed") - plaintext, _ = inbound.decrypt(text) - - print(plaintext) - assert plaintext == u"�" - - plaintext, _ = inbound.decrypt(text, "ignore") - assert plaintext == "" diff --git a/python/tests/pk_test.py b/python/tests/pk_test.py deleted file mode 100644 index ef87465..0000000 --- a/python/tests/pk_test.py +++ /dev/null @@ -1,65 +0,0 @@ -# -*- coding: utf-8 -*- -import pytest - -from olm import (PkDecryption, PkDecryptionError, PkEncryption, PkSigning, - ed25519_verify) - - -class TestClass(object): - def test_invalid_encryption(self): - with pytest.raises(ValueError): - PkEncryption("") - - def test_decrytion(self): - decryption = PkDecryption() - encryption = PkEncryption(decryption.public_key) - plaintext = "It's a secret to everybody." - message = encryption.encrypt(plaintext) - decrypted_plaintext = decryption.decrypt(message) - isinstance(decrypted_plaintext, str) - assert plaintext == decrypted_plaintext - - def test_invalid_decrytion(self): - decryption = PkDecryption() - encryption = PkEncryption(decryption.public_key) - plaintext = "It's a secret to everybody." - message = encryption.encrypt(plaintext) - message.ephemeral_key = "?" - with pytest.raises(PkDecryptionError): - decryption.decrypt(message) - - def test_pickling(self): - decryption = PkDecryption() - encryption = PkEncryption(decryption.public_key) - plaintext = "It's a secret to everybody." - message = encryption.encrypt(plaintext) - - pickle = decryption.pickle() - unpickled = PkDecryption.from_pickle(pickle) - decrypted_plaintext = unpickled.decrypt(message) - assert plaintext == decrypted_plaintext - - def test_invalid_unpickling(self): - with pytest.raises(ValueError): - PkDecryption.from_pickle("") - - def test_invalid_pass_pickling(self): - decryption = PkDecryption() - pickle = decryption.pickle("Secret") - - with pytest.raises(PkDecryptionError): - PkDecryption.from_pickle(pickle, "Not secret") - - def test_signing(self): - seed = PkSigning.generate_seed() - signing = PkSigning(seed) - message = "This statement is true" - signature = signing.sign(message) - ed25519_verify(signing.public_key, message, signature) - - def test_invalid_unicode_decrypt(self): - decryption = PkDecryption() - encryption = PkEncryption(decryption.public_key) - message = encryption.encrypt(b"\xed") - plaintext = decryption.decrypt(message) - assert plaintext == u"�" diff --git a/python/tests/sas_test.py b/python/tests/sas_test.py deleted file mode 100644 index 9001e67..0000000 --- a/python/tests/sas_test.py +++ /dev/null @@ -1,99 +0,0 @@ -from builtins import bytes - -import pytest - -from olm import OlmSasError, Sas - -MESSAGE = "Test message" -EXTRA_INFO = "extra_info" - - -class TestClass(object): - def test_sas_creation(self): - sas = Sas() - assert sas.pubkey - - def test_other_key_setting(self): - sas_alice = Sas() - sas_bob = Sas() - - assert not sas_alice.other_key_set - sas_alice.set_their_pubkey(sas_bob.pubkey) - assert sas_alice.other_key_set - - def test_bytes_generating(self): - sas_alice = Sas() - sas_bob = Sas(sas_alice.pubkey) - - assert sas_bob.other_key_set - - with pytest.raises(OlmSasError): - sas_alice.generate_bytes(EXTRA_INFO, 5) - - sas_alice.set_their_pubkey(sas_bob.pubkey) - - with pytest.raises(ValueError): - sas_alice.generate_bytes(EXTRA_INFO, 0) - - alice_bytes = sas_alice.generate_bytes(EXTRA_INFO, 5) - bob_bytes = sas_bob.generate_bytes(EXTRA_INFO, 5) - - assert alice_bytes == bob_bytes - - def test_mac_generating(self): - sas_alice = Sas() - sas_bob = Sas() - - with pytest.raises(OlmSasError): - sas_alice.calculate_mac(MESSAGE, EXTRA_INFO) - - sas_alice.set_their_pubkey(sas_bob.pubkey) - sas_bob.set_their_pubkey(sas_alice.pubkey) - - alice_mac = sas_alice.calculate_mac(MESSAGE, EXTRA_INFO) - bob_mac = sas_bob.calculate_mac(MESSAGE, EXTRA_INFO) - - assert alice_mac == bob_mac - - def test_cross_language_mac(self): - """Test MAC generating with a predefined key pair. - - This test imports a private and public key from the C test and checks - if we are getting the same MAC that the C code calculated. - """ - alice_private = [ - 0x77, 0x07, 0x6D, 0x0A, 0x73, 0x18, 0xA5, 0x7D, - 0x3C, 0x16, 0xC1, 0x72, 0x51, 0xB2, 0x66, 0x45, - 0xDF, 0x4C, 0x2F, 0x87, 0xEB, 0xC0, 0x99, 0x2A, - 0xB1, 0x77, 0xFB, 0xA5, 0x1D, 0xB9, 0x2C, 0x2A - ] - - bob_key = "3p7bfXt9wbTTW2HC7OQ1Nz+DQ8hbeGdNrfx+FG+IK08" - message = "Hello world!" - extra_info = "MAC" - expected_mac = "2nSMTXM+TStTU3RUVTNSVVZUTlNWVlpVVGxOV1ZscFY" - - sas_alice = Sas() - sas_alice._create_sas(bytes(alice_private), 32) - sas_alice.set_their_pubkey(bob_key) - - alice_mac = sas_alice.calculate_mac(message, extra_info) - - assert alice_mac == expected_mac - - def test_long_mac_generating(self): - sas_alice = Sas() - sas_bob = Sas() - - with pytest.raises(OlmSasError): - sas_alice.calculate_mac_long_kdf(MESSAGE, EXTRA_INFO) - - sas_alice.set_their_pubkey(sas_bob.pubkey) - sas_bob.set_their_pubkey(sas_alice.pubkey) - - alice_mac = sas_alice.calculate_mac_long_kdf(MESSAGE, EXTRA_INFO) - bob_mac = sas_bob.calculate_mac_long_kdf(MESSAGE, EXTRA_INFO) - bob_short_mac = sas_bob.calculate_mac(MESSAGE, EXTRA_INFO) - - assert alice_mac == bob_mac - assert alice_mac != bob_short_mac diff --git a/python/tests/session_test.py b/python/tests/session_test.py deleted file mode 100644 index b856585..0000000 --- a/python/tests/session_test.py +++ /dev/null @@ -1,152 +0,0 @@ -# -*- coding: utf-8 -*- -import pytest - -from olm import (Account, InboundSession, OlmMessage, OlmPreKeyMessage, - OlmSessionError, OutboundSession, Session) - - -class TestClass(object): - def _create_session(self): - alice = Account() - bob = Account() - bob.generate_one_time_keys(1) - id_key = bob.identity_keys["curve25519"] - one_time = list(bob.one_time_keys["curve25519"].values())[0] - session = OutboundSession(alice, id_key, one_time) - return alice, bob, session - - def test_session_create(self): - _, _, session_1 = self._create_session() - _, _, session_2 = self._create_session() - assert session_1 - assert session_2 - assert session_1.id != session_2.id - assert isinstance(session_1.id, str) - - def test_session_clear(self): - _, _, session = self._create_session() - del session - - def test_invalid_session_create(self): - with pytest.raises(TypeError): - Session() - - def test_session_pickle(self): - alice, bob, session = self._create_session() - Session.from_pickle(session.pickle()).id == session.id - - def test_session_invalid_pickle(self): - with pytest.raises(ValueError): - Session.from_pickle(b"") - - def test_wrong_passphrase_pickle(self): - alice, bob, session = self._create_session() - passphrase = "It's a secret to everybody" - pickle = alice.pickle(passphrase) - - with pytest.raises(OlmSessionError): - Session.from_pickle(pickle, "") - - def test_encrypt(self): - plaintext = "It's a secret to everybody" - alice, bob, session = self._create_session() - message = session.encrypt(plaintext) - - assert (repr(message) - == "OlmPreKeyMessage({})".format(message.ciphertext)) - - assert (str(message) - == "PRE_KEY {}".format(message.ciphertext)) - - bob_session = InboundSession(bob, message) - assert plaintext == bob_session.decrypt(message) - - def test_empty_message(self): - with pytest.raises(ValueError): - OlmPreKeyMessage("") - empty = OlmPreKeyMessage("x") - empty.ciphertext = "" - alice, bob, session = self._create_session() - - with pytest.raises(ValueError): - session.decrypt(empty) - - def test_inbound_with_id(self): - plaintext = "It's a secret to everybody" - alice, bob, session = self._create_session() - message = session.encrypt(plaintext) - alice_id = alice.identity_keys["curve25519"] - bob_session = InboundSession(bob, message, alice_id) - assert plaintext == bob_session.decrypt(message) - - def test_two_messages(self): - plaintext = "It's a secret to everybody" - alice, bob, session = self._create_session() - message = session.encrypt(plaintext) - alice_id = alice.identity_keys["curve25519"] - bob_session = InboundSession(bob, message, alice_id) - bob.remove_one_time_keys(bob_session) - assert plaintext == bob_session.decrypt(message) - - bob_plaintext = "Grumble, Grumble" - bob_message = bob_session.encrypt(bob_plaintext) - - assert (repr(bob_message) - == "OlmMessage({})".format(bob_message.ciphertext)) - - assert bob_plaintext == session.decrypt(bob_message) - - def test_matches(self): - plaintext = "It's a secret to everybody" - alice, bob, session = self._create_session() - message = session.encrypt(plaintext) - alice_id = alice.identity_keys["curve25519"] - bob_session = InboundSession(bob, message, alice_id) - assert plaintext == bob_session.decrypt(message) - - message_2nd = session.encrypt("Hey! Listen!") - - assert bob_session.matches(message_2nd) is True - assert bob_session.matches(message_2nd, alice_id) is True - - def test_invalid(self): - alice, bob, session = self._create_session() - message = OlmMessage("x") - - with pytest.raises(TypeError): - session.matches(message) - - message = OlmPreKeyMessage("x") - message.ciphertext = "" - - with pytest.raises(ValueError): - session.matches(message) - - with pytest.raises(ValueError): - InboundSession(bob, message) - - with pytest.raises(ValueError): - OutboundSession(alice, "", "x") - - with pytest.raises(ValueError): - OutboundSession(alice, "x", "") - - def test_doesnt_match(self): - plaintext = "It's a secret to everybody" - alice, bob, session = self._create_session() - message = session.encrypt(plaintext) - alice_id = alice.identity_keys["curve25519"] - bob_session = InboundSession(bob, message, alice_id) - - _, _, new_session = self._create_session() - - new_message = new_session.encrypt(plaintext) - assert bob_session.matches(new_message) is False - - def test_invalid_unicode_decrypt(self): - alice, bob, session = self._create_session() - message = session.encrypt(b"\xed") - - bob_session = InboundSession(bob, message) - plaintext = bob_session.decrypt(message) - assert plaintext == u"�" diff --git a/python/tests/utils_test.py b/python/tests/utils_test.py deleted file mode 100644 index 86fb34f..0000000 --- a/python/tests/utils_test.py +++ /dev/null @@ -1,25 +0,0 @@ -import base64 -import hashlib - -from future.utils import bytes_to_native_str - -from olm import sha256 -from olm._compat import to_bytes - - -class TestClass(object): - def test_sha256(self): - input1 = "It's a secret to everybody" - input2 = "It's a secret to nobody" - - first_hash = sha256(input1) - second_hash = sha256(input2) - - hashlib_hash = base64.b64encode( - hashlib.sha256(to_bytes(input1)).digest() - ) - - hashlib_hash = bytes_to_native_str(hashlib_hash[:-1]) - - assert first_hash != second_hash - assert hashlib_hash == first_hash diff --git a/python/tox.ini b/python/tox.ini deleted file mode 100644 index e3a0188..0000000 --- a/python/tox.ini +++ /dev/null @@ -1,43 +0,0 @@ -# content of: tox.ini , put in same dir as setup.py -[tox] -envlist = py27,py36,pypy,{py2,py3}-cov,coverage -[testenv] -basepython = - py27: python2.7 - py36: python3.6 - pypy: pypy - py2: python2.7 - py3: python3.6 - -deps = -rrequirements.txt - -rtest-requirements.txt - -passenv = TOXENV CI TRAVIS TRAVIS_* -commands = pytest --benchmark-disable -usedevelop = True - -[testenv:py2-cov] -commands = - pytest --cov-report term-missing --cov=olm --benchmark-disable --cov-branch -setenv = - COVERAGE_FILE=.coverage.py2 - -[testenv:py3-cov] -commands = - py.test --cov=olm --cov-report term-missing --benchmark-disable --cov-branch -setenv = - COVERAGE_FILE=.coverage.py3 - -[testenv:coverage] -basepython = python3.6 -commands = - coverage erase - coverage combine - coverage xml - coverage report --show-missing - codecov -e TOXENV -deps = - coverage - codecov>=1.4.0 -setenv = - COVERAGE_FILE=.coverage |