aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authordec05eba <dec05eba@protonmail.com>2020-11-05 01:45:06 +0100
committerdec05eba <dec05eba@protonmail.com>2020-11-05 01:45:06 +0100
commit2a8202e74846d191a321cca1202175af9db6107d (patch)
treea6f455caf07da1186851f343a237a4c4e4484f46 /python
parent8efa0ec17d8c262f9c3fd7603e8074f74a053708 (diff)
Convert to sibs projectHEADmaster
Diffstat (limited to 'python')
-rw-r--r--python/.gitignore12
-rw-r--r--python/MANIFEST.in5
-rw-r--r--python/Makefile57
-rw-r--r--python/README.md161
-rw-r--r--python/docs/Makefile20
-rw-r--r--python/docs/conf.py165
-rw-r--r--python/docs/index.html1
-rw-r--r--python/docs/index.rst19
-rw-r--r--python/docs/make.bat36
-rw-r--r--python/docs/olm.rst34
-rw-r--r--python/dummy/README2
-rw-r--r--python/dummy/stddef.h0
-rw-r--r--python/dummy/stdint.h0
-rw-r--r--python/olm/__init__.py48
-rw-r--r--python/olm/__version__.py9
-rw-r--r--python/olm/_compat.py67
-rw-r--r--python/olm/_finalize.py65
-rw-r--r--python/olm/account.py271
-rw-r--r--python/olm/group_session.py532
-rw-r--r--python/olm/pk.py461
-rw-r--r--python/olm/sas.py245
-rw-r--r--python/olm/session.py495
-rw-r--r--python/olm/utility.py151
-rw-r--r--python/olm_build.py62
-rw-r--r--python/requirements.txt3
-rw-r--r--python/setup.cfg8
-rw-r--r--python/setup.py31
-rw-r--r--python/test-requirements.txt6
-rw-r--r--python/tests/account_test.py114
-rw-r--r--python/tests/group_session_test.py128
-rw-r--r--python/tests/pk_test.py65
-rw-r--r--python/tests/sas_test.py99
-rw-r--r--python/tests/session_test.py152
-rw-r--r--python/tests/utils_test.py25
-rw-r--r--python/tox.ini43
35 files changed, 0 insertions, 3592 deletions
diff --git a/python/.gitignore b/python/.gitignore
deleted file mode 100644
index 1ff9f49..0000000
--- a/python/.gitignore
+++ /dev/null
@@ -1,12 +0,0 @@
-.coverage
-.mypy_cache/
-.ropeproject/
-.pytest_cache/
-packages/
-python_olm.egg-info/
-_libolm*
-__pycache__
-*.pyc
-.hypothesis/
-.tox/
-include/
diff --git a/python/MANIFEST.in b/python/MANIFEST.in
deleted file mode 100644
index 824b377..0000000
--- a/python/MANIFEST.in
+++ /dev/null
@@ -1,5 +0,0 @@
-include include/olm/olm.h
-include include/olm/pk.h
-include include/olm/sas.h
-include Makefile
-include olm_build.py
diff --git a/python/Makefile b/python/Makefile
deleted file mode 100644
index 6bba9cd..0000000
--- a/python/Makefile
+++ /dev/null
@@ -1,57 +0,0 @@
-all: olm-python2 olm-python3
-
-OLM_HEADERS = ../include/olm/olm.h ../include/olm/inbound_group_session.h \
- ../include/olm/outbound_group_session.h \
-
-include/olm/olm.h: $(OLM_HEADERS)
- mkdir -p include/olm
- $(CPP) -I dummy -I ../include -o include/olm/olm.h ../include/olm/olm.h
-# add memset to the header so that we can use it to clear buffers
- echo 'void *memset(void *s, int c, size_t n);' >> include/olm/olm.h
-
-include/olm/pk.h: include/olm/olm.h ../include/olm/pk.h
- $(CPP) -I dummy -I ../include -o include/olm/pk.h ../include/olm/pk.h
-
-include/olm/sas.h: include/olm/olm.h ../include/olm/sas.h
- $(CPP) -I dummy -I ../include -o include/olm/sas.h ../include/olm/sas.h
-
-headers: include/olm/olm.h include/olm/pk.h include/olm/sas.h
-
-olm-python2: headers
- DEVELOP=$(DEVELOP) python2 setup.py build
-
-olm-python3: headers
- DEVELOP=$(DEVELOP) python3 setup.py build
-
-install: install-python2 install-python3
-
-install-python2: olm-python2
- python2 setup.py install --skip-build -O1 --root=$(DESTDIR)
-
-install-python3: olm-python3
- python3 setup.py install --skip-build -O1 --root=$(DESTDIR)
-
-test: olm-python2 olm-python3
- rm -rf install-temp
- mkdir -p install-temp/2 install-temp/3
- PYTHONPATH=install-temp/2 python2 setup.py install --skip-build --install-lib install-temp/2 --install-script install-temp/bin
- PYTHONPATH=install-temp/3 python3 setup.py install --skip-build --install-lib install-temp/3 --install-script install-temp/bin
- PYTHONPATH=install-temp/3 python3 -m pytest
- PYTHONPATH=install-temp/2 python2 -m pytest
- PYTHONPATH=install-temp/3 python3 -m pytest --flake8 --benchmark-disable
- PYTHONPATH=install-temp/3 python3 -m pytest --isort --benchmark-disable
- PYTHONPATH=install-temp/3 python3 -m pytest --cov --cov-branch --benchmark-disable
- rm -rf install-temp
-
-isort:
- isort -y -p olm
-
-clean:
- rm -rf python_olm.egg-info/ dist/ __pycache__/
- rm -rf *.so _libolm.o
- rm -rf packages/
- rm -rf build/
- rm -rf install-temp/
- rm -rf include/
-
-.PHONY: all olm-python2 olm-python3 install install-python2 install-python3 clean test
diff --git a/python/README.md b/python/README.md
deleted file mode 100644
index b928185..0000000
--- a/python/README.md
+++ /dev/null
@@ -1,161 +0,0 @@
-python-olm
-==========
-
-Python bindings for Olm.
-
-The specification of the Olm cryptographic ratchet which is used for peer to
-peer sessions of this library can be found [here][4].
-
-The specification of the Megolm cryptographic ratchet which is used for group
-sessions of this library can be found [here][5].
-
-An example of the implementation of the Olm and Megolm cryptographic protocol
-can be found in the Matrix protocol for which the implementation guide can be
-found [here][6].
-
-The full API reference can be found [here][7].
-
-# Accounts
-
-Accounts create and hold the central identity of the Olm protocol, they consist of a fingerprint and identity
-key pair. They also produce one time keys that are used to start peer to peer
-encrypted communication channels.
-
-## Account Creation
-
-A new account is created with the Account class, it creates a new Olm key pair.
-The public parts of the key pair are available using the identity_keys property
-of the class.
-
-```python
->>> alice = Account()
->>> alice.identity_keys
-{'curve25519': '2PytGagXercwHjzQETLcMa3JOsaU2qkPIESaqoi59zE',
- 'ed25519': 'HHpOuFYdHwoa54GxSttz9YmaTmbuVU3js92UTUjYJgM'}
-```
-
-
-## One Time keys
-
-One time keys need to be generated before people can start an encrypted peer to
-peer channel to an account.
-
-```python
->>> alice.generate_one_time_keys(1)
->>> alice.one_time_keys
-{'curve25519': {'AAAAAQ': 'KiHoW6CIy905UC4V1Frmwr3VW8bTWkBL4uWtWFFllxM'}}
-```
-
-After the one time keys are published they should be marked as such so they
-aren't reused.
-
-```python
->>> alice.mark_keys_as_published()
->>> alice.one_time_keys
-{'curve25519': {}}
-```
-
-## Pickling
-
-Accounts should be stored for later reuse, storing an account is done with the
-pickle method while the restoring step is done with the from_pickle class
-method.
-
-```python
->>> pickle = alice.pickle()
->>> restored = Account.from_pickle(pickle)
-```
-
-# Sessions
-
-Sessions are used to create an encrypted peer to peer communication channel
-between two accounts.
-
-## Session Creation
-```python
->>> alice = Account()
->>> bob = Account()
->>> bob.generate_one_time_keys(1)
->>> id_key = bob.identity_keys["curve25519"]
->>> one_time = list(bob.one_time_keys["curve25519"].values())[0]
->>> alice_session = OutboundSession(alice, id_key, one_time)
-```
-
-## Encryption
-
-After an outbound session is created an encrypted message can be exchanged:
-
-```python
->>> message = alice_session.encrypt("It's a secret to everybody")
->>> message.ciphertext
-'AwogkL7RoakT9gnjcZMra+y39WXKRmnxBPEaEp6OSueIA0cSIJxGpBoP8YZ+CGweXQ10LujbXMgK88
-xG/JZMQJ5ulK9ZGiC8TYrezNYr3qyIBLlecXr/9wnegvJaSFDmWDVOcf4XfyI/AwogqIZfAklRXGC5b
-ZJcZxVxQGgJ8Dz4OQII8k0Dp8msUXwQACIQvagY1dO55Qvnk5PZ2GF+wdKnvj6Zxl2g'
->>> message.message_type
-0
-```
-
-After the message is transfered, bob can create an InboundSession to decrypt the
-message.
-
-```python
->>> bob_session = InboundSession(bob, message)
->>> bob_session.decrypt(message)
-"It's a secret to everybody"
-```
-
-## Pickling
-
-Sessions like accounts can be stored for later use the API is the same as for
-accounts.
-
-```python
->>> pickle = session.pickle()
->>> restored = Session.from_pickle(pickle)
-```
-
-# Group Sessions
-
-Group Sessions are used to create a one-to-many encrypted communication channel.
-The group session key needs to be shared with all participants that should be able
-to decrypt the group messages. Another thing to notice is that, since the group
-session key is ratcheted every time a message is encrypted, the session key should
-be shared before any messages are encrypted.
-
-## Group Session Creation
-
-Group sessions aren't bound to an account like peer-to-peer sessions so their
-creation is straightforward.
-
-```python
->>> alice_group = OutboundGroupSession()
->>> bob_inbound_group = InboundGroupSession(alice_group.session_key)
-```
-
-## Group Encryption
-
-Group encryption is pretty simple. The important part is to share the session
-key with all participants over a secure channel (e.g. peer-to-peer Olm
-sessions).
-
-```python
->>> message = alice_group.encrypt("It's a secret to everybody")
->>> bob_inbound_group.decrypt(message)
-("It's a secret to everybody", 0)
-```
-
-## Pickling
-
-Pickling works the same way as for peer-to-peer Olm sessions.
-
-```python
->>> pickle = session.pickle()
->>> restored = InboundGroupSession.from_pickle(pickle)
-```
-[1]: https://git.matrix.org/git/olm/about/
-[2]: https://git.matrix.org/git/olm/tree/python?id=f8c61b8f8432d0b0b38d57f513c5048fb42f22ab
-[3]: https://cffi.readthedocs.io/en/latest/
-[4]: https://git.matrix.org/git/olm/about/docs/olm.rst
-[5]: https://git.matrix.org/git/olm/about/docs/megolm.rst
-[6]: https://matrix.org/docs/guides/e2e_implementation.html
-[7]: https://poljar.github.io/python-olm/html/index.html
diff --git a/python/docs/Makefile b/python/docs/Makefile
deleted file mode 100644
index a72f9d9..0000000
--- a/python/docs/Makefile
+++ /dev/null
@@ -1,20 +0,0 @@
-# Minimal makefile for Sphinx documentation
-#
-
-# You can set these variables from the command line.
-SPHINXOPTS =
-SPHINXBUILD = sphinx-build
-SPHINXPROJ = olm
-SOURCEDIR = .
-BUILDDIR = .
-
-# Put it first so that "make" without argument is like "make help".
-help:
- @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
-
-.PHONY: help Makefile
-
-# Catch-all target: route all unknown targets to Sphinx using the new
-# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
-%: Makefile
- @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
diff --git a/python/docs/conf.py b/python/docs/conf.py
deleted file mode 100644
index ce2a88d..0000000
--- a/python/docs/conf.py
+++ /dev/null
@@ -1,165 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Configuration file for the Sphinx documentation builder.
-#
-# This file does only contain a selection of the most common options. For a
-# full list see the documentation:
-# http://www.sphinx-doc.org/en/master/config
-
-# -- Path setup --------------------------------------------------------------
-
-# If extensions (or modules to document with autodoc) are in another directory,
-# add these directories to sys.path here. If the directory is relative to the
-# documentation root, use os.path.abspath to make it absolute, like shown here.
-#
-import os
-import sys
-
-sys.path.insert(0, os.path.abspath('../'))
-
-
-# -- Project information -----------------------------------------------------
-
-project = 'python-olm'
-copyright = '2018, Damir Jelić'
-author = 'Damir Jelić'
-
-# The short X.Y version
-version = ''
-# The full version, including alpha/beta/rc tags
-release = '2.2'
-
-
-# -- General configuration ---------------------------------------------------
-
-# If your documentation needs a minimal Sphinx version, state it here.
-#
-# needs_sphinx = '1.0'
-
-# Add any Sphinx extension module names here, as strings. They can be
-# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
-# ones.
-extensions = [
- 'sphinx.ext.autodoc',
- 'sphinx.ext.doctest',
- 'sphinx.ext.coverage',
- 'sphinx.ext.viewcode',
- 'sphinx.ext.githubpages',
- 'sphinx.ext.napoleon',
-]
-
-# Add any paths that contain templates here, relative to this directory.
-templates_path = ['_templates']
-
-# The suffix(es) of source filenames.
-# You can specify multiple suffix as a list of string:
-#
-# source_suffix = ['.rst', '.md']
-source_suffix = '.rst'
-
-# The master toctree document.
-master_doc = 'index'
-
-# The language for content autogenerated by Sphinx. Refer to documentation
-# for a list of supported languages.
-#
-# This is also used if you do content translation via gettext catalogs.
-# Usually you set "language" from the command line for these cases.
-language = None
-
-# List of patterns, relative to source directory, that match files and
-# directories to ignore when looking for source files.
-# This pattern also affects html_static_path and html_extra_path .
-exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
-
-# The name of the Pygments (syntax highlighting) style to use.
-pygments_style = 'sphinx'
-
-
-# -- Options for HTML output -------------------------------------------------
-
-# The theme to use for HTML and HTML Help pages. See the documentation for
-# a list of builtin themes.
-#
-html_theme = 'alabaster'
-
-# Theme options are theme-specific and customize the look and feel of a theme
-# further. For a list of options available for each theme, see the
-# documentation.
-#
-# html_theme_options = {}
-
-# Add any paths that contain custom static files (such as style sheets) here,
-# relative to this directory. They are copied after the builtin static files,
-# so a file named "default.css" will overwrite the builtin "default.css".
-html_static_path = ['_static']
-
-# Custom sidebar templates, must be a dictionary that maps document names
-# to template names.
-#
-# The default sidebars (for documents that don't match any pattern) are
-# defined by theme itself. Builtin themes are using these templates by
-# default: ``['localtoc.html', 'relations.html', 'sourcelink.html',
-# 'searchbox.html']``.
-#
-# html_sidebars = {}
-
-
-# -- Options for HTMLHelp output ---------------------------------------------
-
-# Output file base name for HTML help builder.
-htmlhelp_basename = 'olmdoc'
-
-
-# -- Options for LaTeX output ------------------------------------------------
-
-latex_elements = {
- # The paper size ('letterpaper' or 'a4paper').
- #
- # 'papersize': 'letterpaper',
-
- # The font size ('10pt', '11pt' or '12pt').
- #
- # 'pointsize': '10pt',
-
- # Additional stuff for the LaTeX preamble.
- #
- # 'preamble': '',
-
- # Latex figure (float) alignment
- #
- # 'figure_align': 'htbp',
-}
-
-# Grouping the document tree into LaTeX files. List of tuples
-# (source start file, target name, title,
-# author, documentclass [howto, manual, or own class]).
-latex_documents = [
- (master_doc, 'olm.tex', 'olm Documentation',
- 'Damir Jelić', 'manual'),
-]
-
-
-# -- Options for manual page output ------------------------------------------
-
-# One entry per manual page. List of tuples
-# (source start file, name, description, authors, manual section).
-man_pages = [
- (master_doc, 'olm', 'olm Documentation',
- [author], 1)
-]
-
-
-# -- Options for Texinfo output ----------------------------------------------
-
-# Grouping the document tree into Texinfo files. List of tuples
-# (source start file, target name, title, author,
-# dir menu entry, description, category)
-texinfo_documents = [
- (master_doc, 'olm', 'olm Documentation',
- author, 'olm', 'One line description of project.',
- 'Miscellaneous'),
-]
-
-
-# -- Extension configuration -------------------------------------------------
diff --git a/python/docs/index.html b/python/docs/index.html
deleted file mode 100644
index 9644bbb..0000000
--- a/python/docs/index.html
+++ /dev/null
@@ -1 +0,0 @@
-<meta http-equiv="refresh" content="0; url=./html/index.html" />
diff --git a/python/docs/index.rst b/python/docs/index.rst
deleted file mode 100644
index 39e6657..0000000
--- a/python/docs/index.rst
+++ /dev/null
@@ -1,19 +0,0 @@
-.. olm documentation master file, created by
- sphinx-quickstart on Sun Jun 17 15:57:08 2018.
-
-Welcome to olm's documentation!
-===============================
-
-.. toctree::
- Olm API reference <olm.rst>
- :maxdepth: 2
- :caption: Contents:
-
-
-
-Indices and tables
-==================
-
-* :ref:`genindex`
-* :ref:`modindex`
-* :ref:`search`
diff --git a/python/docs/make.bat b/python/docs/make.bat
deleted file mode 100644
index 1c5b4d8..0000000
--- a/python/docs/make.bat
+++ /dev/null
@@ -1,36 +0,0 @@
-@ECHO OFF
-
-pushd %~dp0
-
-REM Command file for Sphinx documentation
-
-if "%SPHINXBUILD%" == "" (
- set SPHINXBUILD=sphinx-build
-)
-set SOURCEDIR=.
-set BUILDDIR=_build
-set SPHINXPROJ=olm
-
-if "%1" == "" goto help
-
-%SPHINXBUILD% >NUL 2>NUL
-if errorlevel 9009 (
- echo.
- echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
- echo.installed, then set the SPHINXBUILD environment variable to point
- echo.to the full path of the 'sphinx-build' executable. Alternatively you
- echo.may add the Sphinx directory to PATH.
- echo.
- echo.If you don't have Sphinx installed, grab it from
- echo.http://sphinx-doc.org/
- exit /b 1
-)
-
-%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
-goto end
-
-:help
-%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
-
-:end
-popd
diff --git a/python/docs/olm.rst b/python/docs/olm.rst
deleted file mode 100644
index 9d8edf0..0000000
--- a/python/docs/olm.rst
+++ /dev/null
@@ -1,34 +0,0 @@
-olm package
-===========
-
-olm.account module
-------------------
-
-.. automodule:: olm.account
- :members:
- :undoc-members:
- :show-inheritance:
-
-olm.group\_session module
--------------------------
-
-.. automodule:: olm.group_session
- :members:
- :undoc-members:
- :show-inheritance:
-
-olm.session module
-------------------
-
-.. automodule:: olm.session
- :members:
- :undoc-members:
- :show-inheritance:
-
-olm.utility module
-------------------
-
-.. automodule:: olm.utility
- :members:
- :undoc-members:
- :show-inheritance:
diff --git a/python/dummy/README b/python/dummy/README
deleted file mode 100644
index 61039f3..0000000
--- a/python/dummy/README
+++ /dev/null
@@ -1,2 +0,0 @@
-Dummy header files, so that we can generate the function list for cffi from the
-olm header files. \ No newline at end of file
diff --git a/python/dummy/stddef.h b/python/dummy/stddef.h
deleted file mode 100644
index e69de29..0000000
--- a/python/dummy/stddef.h
+++ /dev/null
diff --git a/python/dummy/stdint.h b/python/dummy/stdint.h
deleted file mode 100644
index e69de29..0000000
--- a/python/dummy/stdint.h
+++ /dev/null
diff --git a/python/olm/__init__.py b/python/olm/__init__.py
deleted file mode 100644
index 26257a5..0000000
--- a/python/olm/__init__.py
+++ /dev/null
@@ -1,48 +0,0 @@
-# -*- coding: utf-8 -*-
-# libolm python bindings
-# Copyright © 2015-2017 OpenMarket Ltd
-# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-Olm Python bindings
-~~~~~~~~~~~~~~~~~~~~~
-| This package implements python bindings for the libolm C library.
-| © Copyright 2015-2017 by OpenMarket Ltd
-| © Copyright 2018 by Damir Jelić
-"""
-from .utility import ed25519_verify, OlmVerifyError, OlmHashError, sha256
-from .account import Account, OlmAccountError
-from .session import (
- Session,
- InboundSession,
- OutboundSession,
- OlmSessionError,
- OlmMessage,
- OlmPreKeyMessage
-)
-from .group_session import (
- InboundGroupSession,
- OutboundGroupSession,
- OlmGroupSessionError
-)
-from .pk import (
- PkMessage,
- PkEncryption,
- PkDecryption,
- PkSigning,
- PkEncryptionError,
- PkDecryptionError,
- PkSigningError
-)
-from .sas import Sas, OlmSasError
diff --git a/python/olm/__version__.py b/python/olm/__version__.py
deleted file mode 100644
index 498f89f..0000000
--- a/python/olm/__version__.py
+++ /dev/null
@@ -1,9 +0,0 @@
-__title__ = "python-olm"
-__description__ = ("python CFFI bindings for the olm "
- "cryptographic ratchet library")
-__url__ = "https://github.com/poljar/python-olm"
-__version__ = "3.2.1"
-__author__ = "Damir Jelić"
-__author_email__ = "poljar@termina.org.uk"
-__license__ = "Apache 2.0"
-__copyright__ = "Copyright 2018-2019 Damir Jelić"
diff --git a/python/olm/_compat.py b/python/olm/_compat.py
deleted file mode 100644
index 2ceaa33..0000000
--- a/python/olm/_compat.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# -*- coding: utf-8 -*-
-# libolm python bindings
-# Copyright © 2015-2017 OpenMarket Ltd
-# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from builtins import bytes, str
-from typing import AnyStr
-
-try:
- import secrets
- URANDOM = secrets.token_bytes # pragma: no cover
-except ImportError: # pragma: no cover
- from os import urandom
- URANDOM = urandom # type: ignore
-
-
-def to_bytearray(string):
- # type: (AnyStr) -> bytes
- if isinstance(string, bytes):
- return bytearray(string)
- elif isinstance(string, str):
- return bytearray(string, "utf-8")
-
- raise TypeError("Invalid type {}".format(type(string)))
-
-
-def to_bytes(string):
- # type: (AnyStr) -> bytes
- if isinstance(string, bytes):
- return string
- elif isinstance(string, str):
- return bytes(string, "utf-8")
-
- raise TypeError("Invalid type {}".format(type(string)))
-
-
-def to_unicode_str(byte_string, errors="replace"):
- """Turn a byte string into a unicode string.
-
- Should be used everywhere where the input byte string might not be trusted
- and may contain invalid unicode values.
-
- Args:
- byte_string (bytes): The bytestring that will be converted to a native
- string.
- errors (str, optional): The error handling scheme that should be used
- to handle unicode decode errors. Can be one of "strict" (raise an
- UnicodeDecodeError exception, "ignore" (remove the offending
- characters), "replace" (replace the offending character with
- U+FFFD), "xmlcharrefreplace" as well as any other name registered
- with codecs.register_error that can handle UnicodeEncodeErrors.
-
- Returns the decoded native string.
- """
- return byte_string.decode(encoding="utf-8", errors=errors)
diff --git a/python/olm/_finalize.py b/python/olm/_finalize.py
deleted file mode 100644
index 9f467bc..0000000
--- a/python/olm/_finalize.py
+++ /dev/null
@@ -1,65 +0,0 @@
-# The MIT License (MIT)
-# Copyright (c) 2010 Benjamin Peterson <benjamin@python.org>
-
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
-# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
-# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
-# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
-# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
-# OR OTHER DEALINGS IN THE SOFTWARE.
-
-"""Finalization with weakrefs
-
-This is designed for avoiding __del__.
-"""
-from __future__ import print_function
-
-import sys
-import traceback
-import weakref
-
-__author__ = "Benjamin Peterson <benjamin@python.org>"
-
-
-class OwnerRef(weakref.ref):
- """A simple weakref.ref subclass, so attributes can be added."""
- pass
-
-
-def _run_finalizer(ref):
- """Internal weakref callback to run finalizers"""
- del _finalize_refs[id(ref)]
- finalizer = ref.finalizer
- item = ref.item
- try:
- finalizer(item)
- except Exception: # pragma: no cover
- print("Exception running {}:".format(finalizer), file=sys.stderr)
- traceback.print_exc()
-
-
-_finalize_refs = {}
-
-
-def track_for_finalization(owner, item, finalizer):
- """Register an object for finalization.
-
- ``owner`` is the the object which is responsible for ``item``.
- ``finalizer`` will be called with ``item`` as its only argument when
- ``owner`` is destroyed by the garbage collector.
- """
- ref = OwnerRef(owner, _run_finalizer)
- ref.item = item
- ref.finalizer = finalizer
- _finalize_refs[id(ref)] = ref
diff --git a/python/olm/account.py b/python/olm/account.py
deleted file mode 100644
index 8455655..0000000
--- a/python/olm/account.py
+++ /dev/null
@@ -1,271 +0,0 @@
-# -*- coding: utf-8 -*-
-# libolm python bindings
-# Copyright © 2015-2017 OpenMarket Ltd
-# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""libolm Account module.
-
-This module contains the account part of the Olm library. It contains a single
-Account class which handles the creation of new accounts as well as the storing
-and restoring of them.
-
-Examples:
- >>> acc = Account()
- >>> account.identity_keys()
- >>> account.generate_one_time_keys(1)
-
-"""
-
-import json
-# pylint: disable=redefined-builtin,unused-import
-from builtins import bytes, super
-from typing import AnyStr, Dict, Optional, Type
-
-from future.utils import bytes_to_native_str
-
-# pylint: disable=no-name-in-module
-from _libolm import ffi, lib # type: ignore
-
-from ._compat import URANDOM, to_bytearray
-from ._finalize import track_for_finalization
-
-# This is imported only for type checking purposes
-if False:
- from .session import Session # pragma: no cover
-
-
-def _clear_account(account):
- # type: (ffi.cdata) -> None
- lib.olm_clear_account(account)
-
-
-class OlmAccountError(Exception):
- """libolm Account error exception."""
-
-
-class Account(object):
- """libolm Account class."""
-
- def __new__(cls):
- # type: (Type[Account]) -> Account
- obj = super().__new__(cls)
- obj._buf = ffi.new("char[]", lib.olm_account_size())
- obj._account = lib.olm_account(obj._buf)
- track_for_finalization(obj, obj._account, _clear_account)
- return obj
-
- def __init__(self):
- # type: () -> None
- """Create a new Olm account.
-
- Creates a new account and its matching identity key pair.
-
- Raises OlmAccountError on failure. If there weren't enough random bytes
- for the account creation the error message for the exception will be
- NOT_ENOUGH_RANDOM.
- """
- # This is needed to silence mypy not knowing the type of _account.
- # There has to be a better way for this.
- if False: # pragma: no cover
- self._account = self._account # type: ffi.cdata
-
- random_length = lib.olm_create_account_random_length(self._account)
- random = URANDOM(random_length)
-
- self._check_error(
- lib.olm_create_account(self._account, ffi.from_buffer(random),
- random_length))
-
-
- def _check_error(self, ret):
- # type: (int) -> None
- if ret != lib.olm_error():
- return
-
- last_error = bytes_to_native_str(
- ffi.string((lib.olm_account_last_error(self._account))))
-
- raise OlmAccountError(last_error)
-
- def pickle(self, passphrase=""):
- # type: (Optional[str]) -> bytes
- """Store an Olm account.
-
- Stores an account as a base64 string. Encrypts the account using the
- supplied passphrase. Returns a byte object containing the base64
- encoded string of the pickled account. Raises OlmAccountError on
- failure.
-
- Args:
- passphrase(str, optional): The passphrase to be used to encrypt
- the account.
- """
- byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
-
- pickle_length = lib.olm_pickle_account_length(self._account)
- pickle_buffer = ffi.new("char[]", pickle_length)
-
- try:
- self._check_error(
- lib.olm_pickle_account(self._account,
- ffi.from_buffer(byte_key),
- len(byte_key),
- pickle_buffer,
- pickle_length))
- finally:
- # zero out copies of the passphrase
- for i in range(0, len(byte_key)):
- byte_key[i] = 0
-
- return ffi.unpack(pickle_buffer, pickle_length)
-
- @classmethod
- def from_pickle(cls, pickle, passphrase=""):
- # type: (bytes, Optional[str]) -> Account
- """Load a previously stored olm account.
-
- Loads an account from a pickled base64-encoded string and returns an
- Account object. Decrypts the account using the supplied passphrase.
- Raises OlmAccountError on failure. If the passphrase doesn't match the
- one used to encrypt the account then the error message for the
- exception will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded
- then the error message will be "INVALID_BASE64".
-
- Args:
- pickle(bytes): Base64 encoded byte string containing the pickled
- account
- passphrase(str, optional): The passphrase used to encrypt the
- account.
- """
- if not pickle:
- raise ValueError("Pickle can't be empty")
-
- byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
- # copy because unpickle will destroy the buffer
- pickle_buffer = ffi.new("char[]", pickle)
-
- obj = cls.__new__(cls)
-
- try:
- ret = lib.olm_unpickle_account(obj._account,
- ffi.from_buffer(byte_key),
- len(byte_key),
- pickle_buffer,
- len(pickle))
- obj._check_error(ret)
- finally:
- for i in range(0, len(byte_key)):
- byte_key[i] = 0
-
- return obj
-
- @property
- def identity_keys(self):
- # type: () -> Dict[str, str]
- """dict: Public part of the identity keys of the account."""
- out_length = lib.olm_account_identity_keys_length(self._account)
- out_buffer = ffi.new("char[]", out_length)
-
- self._check_error(
- lib.olm_account_identity_keys(self._account, out_buffer,
- out_length))
- return json.loads(ffi.unpack(out_buffer, out_length).decode("utf-8"))
-
- def sign(self, message):
- # type: (AnyStr) -> str
- """Signs a message with this account.
-
- Signs a message with the private ed25519 identity key of this account.
- Returns the signature.
- Raises OlmAccountError on failure.
-
- Args:
- message(str): The message to sign.
- """
- bytes_message = to_bytearray(message)
- out_length = lib.olm_account_signature_length(self._account)
- out_buffer = ffi.new("char[]", out_length)
-
- try:
- self._check_error(
- lib.olm_account_sign(self._account,
- ffi.from_buffer(bytes_message),
- len(bytes_message), out_buffer,
- out_length))
- finally:
- # clear out copies of the message, which may be plaintext
- if bytes_message is not message:
- for i in range(0, len(bytes_message)):
- bytes_message[i] = 0
-
- return bytes_to_native_str(ffi.unpack(out_buffer, out_length))
-
- @property
- def max_one_time_keys(self):
- # type: () -> int
- """int: The maximum number of one-time keys the account can store."""
- return lib.olm_account_max_number_of_one_time_keys(self._account)
-
- def mark_keys_as_published(self):
- # type: () -> None
- """Mark the current set of one-time keys as being published."""
- lib.olm_account_mark_keys_as_published(self._account)
-
- def generate_one_time_keys(self, count):
- # type: (int) -> None
- """Generate a number of new one-time keys.
-
- If the total number of keys stored by this account exceeds
- max_one_time_keys() then the old keys are discarded.
- Raises OlmAccountError on error.
-
- Args:
- count(int): The number of keys to generate.
- """
- random_length = lib.olm_account_generate_one_time_keys_random_length(
- self._account, count)
- random = URANDOM(random_length)
-
- self._check_error(
- lib.olm_account_generate_one_time_keys(
- self._account, count, ffi.from_buffer(random), random_length))
-
- @property
- def one_time_keys(self):
- # type: () -> Dict[str, Dict[str, str]]
- """dict: The public part of the one-time keys for this account."""
- out_length = lib.olm_account_one_time_keys_length(self._account)
- out_buffer = ffi.new("char[]", out_length)
-
- self._check_error(
- lib.olm_account_one_time_keys(self._account, out_buffer,
- out_length))
-
- return json.loads(ffi.unpack(out_buffer, out_length).decode("utf-8"))
-
- def remove_one_time_keys(self, session):
- # type: (Session) -> None
- """Remove used one-time keys.
-
- Removes the one-time keys that the session used from the account.
- Raises OlmAccountError on failure. If the account doesn't have any
- matching one-time keys then the error message of the exception will be
- "BAD_MESSAGE_KEY_ID".
-
- Args:
- session(Session): An Olm Session object that was created with this
- account.
- """
- self._check_error(lib.olm_remove_one_time_keys(self._account,
- session._session))
diff --git a/python/olm/group_session.py b/python/olm/group_session.py
deleted file mode 100644
index 5068192..0000000
--- a/python/olm/group_session.py
+++ /dev/null
@@ -1,532 +0,0 @@
-# -*- coding: utf-8 -*-
-# libolm python bindings
-# Copyright © 2015-2017 OpenMarket Ltd
-# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""libolm Group session module.
-
-This module contains the group session part of the Olm library. It contains two
-classes for creating inbound and outbound group sessions.
-
-Examples:
- >>> outbound = OutboundGroupSession()
- >>> InboundGroupSession(outbound.session_key)
-"""
-
-# pylint: disable=redefined-builtin,unused-import
-from builtins import bytes, super
-from typing import AnyStr, Optional, Tuple, Type
-
-from future.utils import bytes_to_native_str
-
-# pylint: disable=no-name-in-module
-from _libolm import ffi, lib # type: ignore
-
-from ._compat import URANDOM, to_bytearray, to_bytes, to_unicode_str
-from ._finalize import track_for_finalization
-
-
-def _clear_inbound_group_session(session):
- # type: (ffi.cdata) -> None
- lib.olm_clear_inbound_group_session(session)
-
-
-def _clear_outbound_group_session(session):
- # type: (ffi.cdata) -> None
- lib.olm_clear_outbound_group_session(session)
-
-
-class OlmGroupSessionError(Exception):
- """libolm Group session error exception."""
-
-
-class InboundGroupSession(object):
- """Inbound group session for encrypted multiuser communication."""
-
- def __new__(
- cls, # type: Type[InboundGroupSession]
- session_key=None # type: Optional[str]
- ):
- # type: (...) -> InboundGroupSession
- obj = super().__new__(cls)
- obj._buf = ffi.new("char[]", lib.olm_inbound_group_session_size())
- obj._session = lib.olm_inbound_group_session(obj._buf)
- track_for_finalization(obj, obj._session, _clear_inbound_group_session)
- return obj
-
- def __init__(self, session_key):
- # type: (AnyStr) -> None
- """Create a new inbound group session.
- Start a new inbound group session, from a key exported from
- an outbound group session.
-
- Raises OlmGroupSessionError on failure. The error message of the
- exception will be "OLM_INVALID_BASE64" if the session key is not valid
- base64 and "OLM_BAD_SESSION_KEY" if the session key is invalid.
- """
- if False: # pragma: no cover
- self._session = self._session # type: ffi.cdata
-
- byte_session_key = to_bytearray(session_key)
-
- try:
- ret = lib.olm_init_inbound_group_session(
- self._session,
- ffi.from_buffer(byte_session_key), len(byte_session_key)
- )
- finally:
- if byte_session_key is not session_key:
- for i in range(0, len(byte_session_key)):
- byte_session_key[i] = 0
- self._check_error(ret)
-
- def pickle(self, passphrase=""):
- # type: (Optional[str]) -> bytes
- """Store an inbound group session.
-
- Stores a group session as a base64 string. Encrypts the session using
- the supplied passphrase. Returns a byte object containing the base64
- encoded string of the pickled session.
-
- Args:
- passphrase(str, optional): The passphrase to be used to encrypt
- the session.
- """
- byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
-
- pickle_length = lib.olm_pickle_inbound_group_session_length(
- self._session)
- pickle_buffer = ffi.new("char[]", pickle_length)
-
- try:
- ret = lib.olm_pickle_inbound_group_session(
- self._session,
- ffi.from_buffer(byte_passphrase), len(byte_passphrase),
- pickle_buffer, pickle_length
- )
- self._check_error(ret)
- finally:
- # clear out copies of the passphrase
- for i in range(0, len(byte_passphrase)):
- byte_passphrase[i] = 0
-
- return ffi.unpack(pickle_buffer, pickle_length)
-
- @classmethod
- def from_pickle(cls, pickle, passphrase=""):
- # type: (bytes, Optional[str]) -> InboundGroupSession
- """Load a previously stored inbound group session.
-
- Loads an inbound group session from a pickled base64 string and returns
- an InboundGroupSession object. Decrypts the session using the supplied
- passphrase. Raises OlmSessionError on failure. If the passphrase
- doesn't match the one used to encrypt the session then the error
- message for the exception will be "BAD_ACCOUNT_KEY". If the base64
- couldn't be decoded then the error message will be "INVALID_BASE64".
-
- Args:
- pickle(bytes): Base64 encoded byte string containing the pickled
- session
- passphrase(str, optional): The passphrase used to encrypt the
- session
- """
- if not pickle:
- raise ValueError("Pickle can't be empty")
-
- byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
- # copy because unpickle will destroy the buffer
- pickle_buffer = ffi.new("char[]", pickle)
-
- obj = cls.__new__(cls)
-
- try:
- ret = lib.olm_unpickle_inbound_group_session(
- obj._session,
- ffi.from_buffer(byte_passphrase),
- len(byte_passphrase),
- pickle_buffer,
- len(pickle)
- )
- obj._check_error(ret)
- finally:
- # clear out copies of the passphrase
- for i in range(0, len(byte_passphrase)):
- byte_passphrase[i] = 0
-
- return obj
-
- def _check_error(self, ret):
- # type: (int) -> None
- if ret != lib.olm_error():
- return
-
- last_error = bytes_to_native_str(ffi.string(
- lib.olm_inbound_group_session_last_error(self._session)))
-
- raise OlmGroupSessionError(last_error)
-
- def decrypt(self, ciphertext, unicode_errors="replace"):
- # type: (AnyStr, str) -> Tuple[str, int]
- """Decrypt a message
-
- Returns a tuple of the decrypted plain-text and the message index of
- the decrypted message or raises OlmGroupSessionError on failure.
- On failure the error message of the exception will be:
-
- * OLM_INVALID_BASE64 if the message is not valid base64
- * OLM_BAD_MESSAGE_VERSION if the message was encrypted with an
- unsupported version of the protocol
- * OLM_BAD_MESSAGE_FORMAT if the message headers could not be
- decoded
- * OLM_BAD_MESSAGE_MAC if the message could not be verified
- * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key
- corresponding to the message's index (i.e., it was sent before
- the session key was shared with us)
-
- Args:
- ciphertext(str): Base64 encoded ciphertext containing the encrypted
- message
- unicode_errors(str, optional): The error handling scheme to use for
- unicode decoding errors. The default is "replace" meaning that
- the character that was unable to decode will be replaced with
- the unicode replacement character (U+FFFD). Other possible
- values are "strict", "ignore" and "xmlcharrefreplace" as well
- as any other name registered with codecs.register_error that
- can handle UnicodeEncodeErrors.
- """
- if not ciphertext:
- raise ValueError("Ciphertext can't be empty.")
-
- byte_ciphertext = to_bytes(ciphertext)
-
- # copy because max_plaintext_length will destroy the buffer
- ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
-
- max_plaintext_length = lib.olm_group_decrypt_max_plaintext_length(
- self._session, ciphertext_buffer, len(byte_ciphertext)
- )
- self._check_error(max_plaintext_length)
- plaintext_buffer = ffi.new("char[]", max_plaintext_length)
- # copy because max_plaintext_length will destroy the buffer
- ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
-
- message_index = ffi.new("uint32_t*")
- plaintext_length = lib.olm_group_decrypt(
- self._session, ciphertext_buffer, len(byte_ciphertext),
- plaintext_buffer, max_plaintext_length,
- message_index
- )
-
- self._check_error(plaintext_length)
-
- plaintext = to_unicode_str(
- ffi.unpack(plaintext_buffer, plaintext_length),
- errors=unicode_errors
- )
-
- # clear out copies of the plaintext
- lib.memset(plaintext_buffer, 0, max_plaintext_length)
-
- return plaintext, message_index[0]
-
- @property
- def id(self):
- # type: () -> str
- """str: A base64 encoded identifier for this session."""
- id_length = lib.olm_inbound_group_session_id_length(self._session)
- id_buffer = ffi.new("char[]", id_length)
- ret = lib.olm_inbound_group_session_id(
- self._session,
- id_buffer,
- id_length
- )
- self._check_error(ret)
- return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
-
- @property
- def first_known_index(self):
- # type: () -> int
- """int: The first message index we know how to decrypt."""
- return lib.olm_inbound_group_session_first_known_index(self._session)
-
- def export_session(self, message_index):
- # type: (int) -> str
- """Export an inbound group session
-
- Export the base64-encoded ratchet key for this session, at the given
- index, in a format which can be used by import_session().
-
- Raises OlmGroupSessionError on failure. The error message for the
- exception will be:
-
- * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key
- corresponding to the given index (ie, it was sent before the
- session key was shared with us)
-
- Args:
- message_index(int): The message index at which the session should
- be exported.
- """
-
- export_length = lib.olm_export_inbound_group_session_length(
- self._session)
-
- export_buffer = ffi.new("char[]", export_length)
- ret = lib.olm_export_inbound_group_session(
- self._session,
- export_buffer,
- export_length,
- message_index
- )
- self._check_error(ret)
- export_str = bytes_to_native_str(ffi.unpack(export_buffer, export_length))
-
- # clear out copies of the key
- lib.memset(export_buffer, 0, export_length)
-
- return export_str
-
- @classmethod
- def import_session(cls, session_key):
- # type: (AnyStr) -> InboundGroupSession
- """Create an InboundGroupSession from an exported session key.
-
- Creates an InboundGroupSession with an previously exported session key,
- raises OlmGroupSessionError on failure. The error message for the
- exception will be:
-
- * OLM_INVALID_BASE64 if the session_key is not valid base64
- * OLM_BAD_SESSION_KEY if the session_key is invalid
-
- Args:
- session_key(str): The exported session key with which the inbound
- group session will be created
- """
- obj = cls.__new__(cls)
-
- byte_session_key = to_bytearray(session_key)
-
- try:
- ret = lib.olm_import_inbound_group_session(
- obj._session,
- ffi.from_buffer(byte_session_key),
- len(byte_session_key)
- )
- obj._check_error(ret)
- finally:
- # clear out copies of the key
- if byte_session_key is not session_key:
- for i in range(0, len(byte_session_key)):
- byte_session_key[i] = 0
-
- return obj
-
-
-class OutboundGroupSession(object):
- """Outbound group session for encrypted multiuser communication."""
-
- def __new__(cls):
- # type: (Type[OutboundGroupSession]) -> OutboundGroupSession
- obj = super().__new__(cls)
- obj._buf = ffi.new("char[]", lib.olm_outbound_group_session_size())
- obj._session = lib.olm_outbound_group_session(obj._buf)
- track_for_finalization(
- obj,
- obj._session,
- _clear_outbound_group_session
- )
- return obj
-
- def __init__(self):
- # type: () -> None
- """Create a new outbound group session.
-
- Start a new outbound group session. Raises OlmGroupSessionError on
- failure.
- """
- if False: # pragma: no cover
- self._session = self._session # type: ffi.cdata
-
- random_length = lib.olm_init_outbound_group_session_random_length(
- self._session
- )
- random = URANDOM(random_length)
-
- ret = lib.olm_init_outbound_group_session(
- self._session, ffi.from_buffer(random), random_length
- )
- self._check_error(ret)
-
- def _check_error(self, ret):
- # type: (int) -> None
- if ret != lib.olm_error():
- return
-
- last_error = bytes_to_native_str(ffi.string(
- lib.olm_outbound_group_session_last_error(self._session)
- ))
-
- raise OlmGroupSessionError(last_error)
-
- def pickle(self, passphrase=""):
- # type: (Optional[str]) -> bytes
- """Store an outbound group session.
-
- Stores a group session as a base64 string. Encrypts the session using
- the supplied passphrase. Returns a byte object containing the base64
- encoded string of the pickled session.
-
- Args:
- passphrase(str, optional): The passphrase to be used to encrypt
- the session.
- """
- byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
- pickle_length = lib.olm_pickle_outbound_group_session_length(
- self._session)
- pickle_buffer = ffi.new("char[]", pickle_length)
-
- try:
- ret = lib.olm_pickle_outbound_group_session(
- self._session,
- ffi.from_buffer(byte_passphrase), len(byte_passphrase),
- pickle_buffer, pickle_length
- )
- self._check_error(ret)
- finally:
- # clear out copies of the passphrase
- for i in range(0, len(byte_passphrase)):
- byte_passphrase[i] = 0
-
- return ffi.unpack(pickle_buffer, pickle_length)
-
- @classmethod
- def from_pickle(cls, pickle, passphrase=""):
- # type: (bytes, Optional[str]) -> OutboundGroupSession
- """Load a previously stored outbound group session.
-
- Loads an outbound group session from a pickled base64 string and
- returns an OutboundGroupSession object. Decrypts the session using the
- supplied passphrase. Raises OlmSessionError on failure. If the
- passphrase doesn't match the one used to encrypt the session then the
- error message for the exception will be "BAD_ACCOUNT_KEY". If the
- base64 couldn't be decoded then the error message will be
- "INVALID_BASE64".
-
- Args:
- pickle(bytes): Base64 encoded byte string containing the pickled
- session
- passphrase(str, optional): The passphrase used to encrypt the
- """
- if not pickle:
- raise ValueError("Pickle can't be empty")
-
- byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
- # copy because unpickle will destroy the buffer
- pickle_buffer = ffi.new("char[]", pickle)
-
- obj = cls.__new__(cls)
-
- try:
- ret = lib.olm_unpickle_outbound_group_session(
- obj._session,
- ffi.from_buffer(byte_passphrase),
- len(byte_passphrase),
- pickle_buffer,
- len(pickle)
- )
- obj._check_error(ret)
- finally:
- # clear out copies of the passphrase
- for i in range(0, len(byte_passphrase)):
- byte_passphrase[i] = 0
-
- return obj
-
- def encrypt(self, plaintext):
- # type: (AnyStr) -> str
- """Encrypt a message.
-
- Returns the encrypted ciphertext.
-
- Args:
- plaintext(str): A string that will be encrypted using the group
- session.
- """
- byte_plaintext = to_bytearray(plaintext)
- message_length = lib.olm_group_encrypt_message_length(
- self._session, len(byte_plaintext)
- )
-
- message_buffer = ffi.new("char[]", message_length)
-
- try:
- ret = lib.olm_group_encrypt(
- self._session,
- ffi.from_buffer(byte_plaintext), len(byte_plaintext),
- message_buffer, message_length,
- )
- self._check_error(ret)
- finally:
- # clear out copies of plaintext
- if byte_plaintext is not plaintext:
- for i in range(0, len(byte_plaintext)):
- byte_plaintext[i] = 0
-
- return bytes_to_native_str(ffi.unpack(message_buffer, message_length))
-
- @property
- def id(self):
- # type: () -> str
- """str: A base64 encoded identifier for this session."""
- id_length = lib.olm_outbound_group_session_id_length(self._session)
- id_buffer = ffi.new("char[]", id_length)
-
- ret = lib.olm_outbound_group_session_id(
- self._session,
- id_buffer,
- id_length
- )
- self._check_error(ret)
-
- return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
-
- @property
- def message_index(self):
- # type: () -> int
- """int: The current message index of the session.
-
- Each message is encrypted with an increasing index. This is the index
- for the next message.
- """
- return lib.olm_outbound_group_session_message_index(self._session)
-
- @property
- def session_key(self):
- # type: () -> str
- """The base64-encoded current ratchet key for this session.
-
- Each message is encrypted with a different ratchet key. This function
- returns the ratchet key that will be used for the next message.
- """
- key_length = lib.olm_outbound_group_session_key_length(self._session)
- key_buffer = ffi.new("char[]", key_length)
-
- ret = lib.olm_outbound_group_session_key(
- self._session,
- key_buffer,
- key_length
- )
- self._check_error(ret)
-
- return bytes_to_native_str(ffi.unpack(key_buffer, key_length))
diff --git a/python/olm/pk.py b/python/olm/pk.py
deleted file mode 100644
index 4352359..0000000
--- a/python/olm/pk.py
+++ /dev/null
@@ -1,461 +0,0 @@
-# -*- coding: utf-8 -*-
-# libolm python bindings
-# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""libolm PK module.
-
-This module contains bindings to the PK part of the Olm library.
-It contains two classes PkDecryption and PkEncryption that are used to
-establish an encrypted communication channel using public key encryption,
-as well as a class PkSigning that is used to sign a message.
-
-Examples:
- >>> decryption = PkDecryption()
- >>> encryption = PkEncryption(decryption.public_key)
- >>> plaintext = "It's a secret to everybody."
- >>> message = encryption.encrypt(plaintext)
- >>> decrypted_plaintext = decryption.decrypt(message)
- >>> seed = PkSigning.generate_seed()
- >>> signing = PkSigning(seed)
- >>> signature = signing.sign(plaintext)
- >>> ed25519_verify(signing.public_key, plaintext, signature)
-
-"""
-
-from builtins import super
-from typing import AnyStr, Type
-
-from future.utils import bytes_to_native_str
-
-from _libolm import ffi, lib # type: ignore
-
-from ._compat import URANDOM, to_bytearray, to_unicode_str
-from ._finalize import track_for_finalization
-
-
-class PkEncryptionError(Exception):
- """libolm Pk encryption exception."""
-
-
-class PkDecryptionError(Exception):
- """libolm Pk decryption exception."""
-
-
-class PkSigningError(Exception):
- """libolm Pk signing exception."""
-
-
-def _clear_pk_encryption(pk_struct):
- lib.olm_clear_pk_encryption(pk_struct)
-
-
-class PkMessage(object):
- """A PK encrypted message."""
-
- def __init__(self, ephemeral_key, mac, ciphertext):
- # type: (str, str, str) -> None
- """Create a new PK encrypted message.
-
- Args:
- ephemeral_key(str): the public part of the ephemeral key
- used (together with the recipient's key) to generate a symmetric
- encryption key.
- mac(str): Message Authentication Code of the encrypted message
- ciphertext(str): The cipher text of the encrypted message
- """
- self.ephemeral_key = ephemeral_key
- self.mac = mac
- self.ciphertext = ciphertext
-
-
-class PkEncryption(object):
- """PkEncryption class.
-
- Represents the decryption part of a PK encrypted channel.
- """
-
- def __init__(self, recipient_key):
- # type: (AnyStr) -> None
- """Create a new PK encryption object.
-
- Args:
- recipient_key(str): a public key that will be used for encryption
- """
- if not recipient_key:
- raise ValueError("Recipient key can't be empty")
-
- self._buf = ffi.new("char[]", lib.olm_pk_encryption_size())
- self._pk_encryption = lib.olm_pk_encryption(self._buf)
- track_for_finalization(self, self._pk_encryption, _clear_pk_encryption)
-
- byte_key = to_bytearray(recipient_key)
- lib.olm_pk_encryption_set_recipient_key(
- self._pk_encryption,
- ffi.from_buffer(byte_key),
- len(byte_key)
- )
-
- # clear out copies of the key
- if byte_key is not recipient_key: # pragma: no cover
- for i in range(0, len(byte_key)):
- byte_key[i] = 0
-
- def _check_error(self, ret): # pragma: no cover
- # type: (int) -> None
- if ret != lib.olm_error():
- return
-
- last_error = bytes_to_native_str(
- ffi.string(lib.olm_pk_encryption_last_error(self._pk_encryption)))
-
- raise PkEncryptionError(last_error)
-
- def encrypt(self, plaintext):
- # type: (AnyStr) -> PkMessage
- """Encrypt a message.
-
- Returns the encrypted PkMessage.
-
- Args:
- plaintext(str): A string that will be encrypted using the
- PkEncryption object.
- """
- byte_plaintext = to_bytearray(plaintext)
-
- r_length = lib.olm_pk_encrypt_random_length(self._pk_encryption)
- random = URANDOM(r_length)
- random_buffer = ffi.new("char[]", random)
-
- ciphertext_length = lib.olm_pk_ciphertext_length(
- self._pk_encryption, len(byte_plaintext)
- )
- ciphertext = ffi.new("char[]", ciphertext_length)
-
- mac_length = lib.olm_pk_mac_length(self._pk_encryption)
- mac = ffi.new("char[]", mac_length)
-
- ephemeral_key_size = lib.olm_pk_key_length()
- ephemeral_key = ffi.new("char[]", ephemeral_key_size)
-
- ret = lib.olm_pk_encrypt(
- self._pk_encryption,
- ffi.from_buffer(byte_plaintext), len(byte_plaintext),
- ciphertext, ciphertext_length,
- mac, mac_length,
- ephemeral_key, ephemeral_key_size,
- random_buffer, r_length
- )
-
- try:
- self._check_error(ret)
- finally: # pragma: no cover
- # clear out copies of plaintext
- if byte_plaintext is not plaintext:
- for i in range(0, len(byte_plaintext)):
- byte_plaintext[i] = 0
-
- message = PkMessage(
- bytes_to_native_str(
- ffi.unpack(ephemeral_key, ephemeral_key_size)),
- bytes_to_native_str(
- ffi.unpack(mac, mac_length)),
- bytes_to_native_str(
- ffi.unpack(ciphertext, ciphertext_length))
- )
- return message
-
-
-def _clear_pk_decryption(pk_struct):
- lib.olm_clear_pk_decryption(pk_struct)
-
-
-class PkDecryption(object):
- """PkDecryption class.
-
- Represents the decryption part of a PK encrypted channel.
-
- Attributes:
- public_key (str): The public key of the PkDecryption object, can be
- shared and used to create a PkEncryption object.
-
- """
-
- def __new__(cls):
- # type: (Type[PkDecryption]) -> PkDecryption
- obj = super().__new__(cls)
- obj._buf = ffi.new("char[]", lib.olm_pk_decryption_size())
- obj._pk_decryption = lib.olm_pk_decryption(obj._buf)
- obj.public_key = None
- track_for_finalization(obj, obj._pk_decryption, _clear_pk_decryption)
- return obj
-
- def __init__(self):
- if False: # pragma: no cover
- self._pk_decryption = self._pk_decryption # type: ffi.cdata
-
- random_length = lib.olm_pk_private_key_length()
- random = URANDOM(random_length)
- random_buffer = ffi.new("char[]", random)
-
- key_length = lib.olm_pk_key_length()
- key_buffer = ffi.new("char[]", key_length)
-
- ret = lib.olm_pk_key_from_private(
- self._pk_decryption,
- key_buffer, key_length,
- random_buffer, random_length
- )
- self._check_error(ret)
- self.public_key = bytes_to_native_str(ffi.unpack(
- key_buffer,
- key_length
- ))
-
- def _check_error(self, ret):
- # type: (int) -> None
- if ret != lib.olm_error():
- return
-
- last_error = bytes_to_native_str(
- ffi.string(lib.olm_pk_decryption_last_error(self._pk_decryption)))
-
- raise PkDecryptionError(last_error)
-
- def pickle(self, passphrase=""):
- # type: (str) -> bytes
- """Store a PkDecryption object.
-
- Stores a PkDecryption object as a base64 string. Encrypts the object
- using the supplied passphrase. Returns a byte object containing the
- base64 encoded string of the pickled session.
-
- Args:
- passphrase(str, optional): The passphrase to be used to encrypt
- the object.
- """
- byte_key = to_bytearray(passphrase)
-
- pickle_length = lib.olm_pickle_pk_decryption_length(
- self._pk_decryption
- )
- pickle_buffer = ffi.new("char[]", pickle_length)
-
- ret = lib.olm_pickle_pk_decryption(
- self._pk_decryption,
- ffi.from_buffer(byte_key), len(byte_key),
- pickle_buffer, pickle_length
- )
- try:
- self._check_error(ret)
- finally:
- # zero out copies of the passphrase
- for i in range(0, len(byte_key)):
- byte_key[i] = 0
-
- return ffi.unpack(pickle_buffer, pickle_length)
-
- @classmethod
- def from_pickle(cls, pickle, passphrase=""):
- # types: (bytes, str) -> PkDecryption
- """Restore a previously stored PkDecryption object.
-
- Creates a PkDecryption object from a pickled base64 string. Decrypts
- the pickled object using the supplied passphrase.
- Raises PkDecryptionError on failure. If the passphrase
- doesn't match the one used to encrypt the session then the error
- message for the exception will be "BAD_ACCOUNT_KEY". If the base64
- couldn't be decoded then the error message will be "INVALID_BASE64".
-
- Args:
- pickle(bytes): Base64 encoded byte string containing the pickled
- PkDecryption object
- passphrase(str, optional): The passphrase used to encrypt the
- object
- """
- if not pickle:
- raise ValueError("Pickle can't be empty")
-
- byte_key = to_bytearray(passphrase)
- pickle_buffer = ffi.new("char[]", pickle)
-
- pubkey_length = lib.olm_pk_key_length()
- pubkey_buffer = ffi.new("char[]", pubkey_length)
-
- obj = cls.__new__(cls)
-
- ret = lib.olm_unpickle_pk_decryption(
- obj._pk_decryption,
- ffi.from_buffer(byte_key), len(byte_key),
- pickle_buffer, len(pickle),
- pubkey_buffer, pubkey_length)
-
- try:
- obj._check_error(ret)
- finally:
- for i in range(0, len(byte_key)):
- byte_key[i] = 0
-
- obj.public_key = bytes_to_native_str(ffi.unpack(
- pubkey_buffer,
- pubkey_length
- ))
-
- return obj
-
- def decrypt(self, message, unicode_errors="replace"):
- # type (PkMessage, str) -> str
- """Decrypt a previously encrypted Pk message.
-
- Returns the decrypted plaintext.
- Raises PkDecryptionError on failure.
-
- Args:
- message(PkMessage): the pk message to decrypt.
- unicode_errors(str, optional): The error handling scheme to use for
- unicode decoding errors. The default is "replace" meaning that
- the character that was unable to decode will be replaced with
- the unicode replacement character (U+FFFD). Other possible
- values are "strict", "ignore" and "xmlcharrefreplace" as well
- as any other name registered with codecs.register_error that
- can handle UnicodeEncodeErrors.
- """
- ephemeral_key = to_bytearray(message.ephemeral_key)
- ephemeral_key_size = len(ephemeral_key)
-
- mac = to_bytearray(message.mac)
- mac_length = len(mac)
-
- ciphertext = to_bytearray(message.ciphertext)
- ciphertext_length = len(ciphertext)
-
- max_plaintext_length = lib.olm_pk_max_plaintext_length(
- self._pk_decryption,
- ciphertext_length
- )
- plaintext_buffer = ffi.new("char[]", max_plaintext_length)
-
- ret = lib.olm_pk_decrypt(
- self._pk_decryption,
- ffi.from_buffer(ephemeral_key), ephemeral_key_size,
- ffi.from_buffer(mac), mac_length,
- ffi.from_buffer(ciphertext), ciphertext_length,
- plaintext_buffer, max_plaintext_length)
- self._check_error(ret)
-
- plaintext = (ffi.unpack(
- plaintext_buffer,
- ret
- ))
-
- # clear out copies of the plaintext
- lib.memset(plaintext_buffer, 0, max_plaintext_length)
-
- return to_unicode_str(plaintext, errors=unicode_errors)
-
-
-def _clear_pk_signing(pk_struct):
- lib.olm_clear_pk_signing(pk_struct)
-
-
-class PkSigning(object):
- """PkSigning class.
-
- Signs messages using public key cryptography.
-
- Attributes:
- public_key (str): The public key of the PkSigning object, can be
- shared and used to verify using Utility.ed25519_verify.
-
- """
-
- def __init__(self, seed):
- # type: (bytes) -> None
- """Create a new signing object.
-
- Args:
- seed(bytes): the seed to use as the private key for signing. The
- seed must have the same length as the seeds generated by
- PkSigning.generate_seed().
- """
- if not seed:
- raise ValueError("seed can't be empty")
-
- self._buf = ffi.new("char[]", lib.olm_pk_signing_size())
- self._pk_signing = lib.olm_pk_signing(self._buf)
- track_for_finalization(self, self._pk_signing, _clear_pk_signing)
-
- seed_buffer = ffi.new("char[]", seed)
-
- pubkey_length = lib.olm_pk_signing_public_key_length()
- pubkey_buffer = ffi.new("char[]", pubkey_length)
-
- ret = lib.olm_pk_signing_key_from_seed(
- self._pk_signing,
- pubkey_buffer, pubkey_length,
- seed_buffer, len(seed)
- )
-
- # zero out copies of the seed
- lib.memset(seed_buffer, 0, len(seed))
-
- self._check_error(ret)
-
- self.public_key = bytes_to_native_str(
- ffi.unpack(pubkey_buffer, pubkey_length)
- )
-
- def _check_error(self, ret):
- # type: (int) -> None
- if ret != lib.olm_error():
- return
-
- last_error = bytes_to_native_str(
- ffi.string(lib.olm_pk_signing_last_error(self._pk_signing)))
-
- raise PkSigningError(last_error)
-
- @classmethod
- def generate_seed(cls):
- # type: () -> bytes
- """Generate a random seed.
- """
- random_length = lib.olm_pk_signing_seed_length()
- random = URANDOM(random_length)
-
- return random
-
- def sign(self, message):
- # type: (AnyStr) -> str
- """Sign a message
-
- Returns the signature.
- Raises PkSigningError on failure.
-
- Args:
- message(str): the message to sign.
- """
- bytes_message = to_bytearray(message)
-
- signature_length = lib.olm_pk_signature_length()
- signature_buffer = ffi.new("char[]", signature_length)
-
- ret = lib.olm_pk_sign(
- self._pk_signing,
- ffi.from_buffer(bytes_message), len(bytes_message),
- signature_buffer, signature_length)
- self._check_error(ret)
-
- return bytes_to_native_str(
- ffi.unpack(signature_buffer, signature_length)
- )
diff --git a/python/olm/sas.py b/python/olm/sas.py
deleted file mode 100644
index cf2a443..0000000
--- a/python/olm/sas.py
+++ /dev/null
@@ -1,245 +0,0 @@
-# -*- coding: utf-8 -*-
-# libolm python bindings
-# Copyright © 2019 Damir Jelić <poljar@termina.org.uk>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""libolm SAS module.
-
-This module contains functions to perform key verification using the Short
-Authentication String (SAS) method.
-
-Examples:
- >>> sas = Sas()
- >>> bob_key = "3p7bfXt9wbTTW2HC7OQ1Nz+DQ8hbeGdNrfx+FG+IK08"
- >>> message = "Hello world!"
- >>> extra_info = "MAC"
- >>> sas_alice.set_their_pubkey(bob_key)
- >>> sas_alice.calculate_mac(message, extra_info)
- >>> sas_alice.generate_bytes(extra_info, 5)
-
-"""
-
-from builtins import bytes
-from functools import wraps
-from typing import Optional
-
-from future.utils import bytes_to_native_str
-
-from _libolm import ffi, lib
-
-from ._compat import URANDOM, to_bytearray, to_bytes
-from ._finalize import track_for_finalization
-
-
-def _clear_sas(sas):
- # type: (ffi.cdata) -> None
- lib.olm_clear_sas(sas)
-
-
-class OlmSasError(Exception):
- """libolm Sas error exception."""
-
-
-class Sas(object):
- """libolm Short Authenticaton String (SAS) class."""
-
- def __init__(self, other_users_pubkey=None):
- # type: (Optional[str]) -> None
- """Create a new SAS object.
-
- Args:
- other_users_pubkey(str, optional): The other users public key, this
- key is necesary to generate bytes for the authentication string
- as well as to calculate the MAC.
-
- Raises OlmSasError on failure.
-
- """
- self._buf = ffi.new("char[]", lib.olm_sas_size())
- self._sas = lib.olm_sas(self._buf)
- track_for_finalization(self, self._sas, _clear_sas)
-
- random_length = lib.olm_create_sas_random_length(self._sas)
- random = URANDOM(random_length)
-
- self._create_sas(random, random_length)
-
- if other_users_pubkey:
- self.set_their_pubkey(other_users_pubkey)
-
- def _create_sas(self, buffer, buffer_length):
- self._check_error(
- lib.olm_create_sas(
- self._sas,
- ffi.from_buffer(buffer),
- buffer_length
- )
- )
-
- def _check_error(self, ret):
- # type: (int) -> None
- if ret != lib.olm_error():
- return
-
- last_error = bytes_to_native_str(
- ffi.string((lib.olm_sas_last_error(self._sas))))
-
- raise OlmSasError(last_error)
-
- @property
- def pubkey(self):
- # type: () -> str
- """Get the public key for the SAS object.
-
- This returns the public key of the SAS object that can then be shared
- with another user to perform the authentication process.
-
- Raises OlmSasError on failure.
-
- """
- pubkey_length = lib.olm_sas_pubkey_length(self._sas)
- pubkey_buffer = ffi.new("char[]", pubkey_length)
-
- self._check_error(
- lib.olm_sas_get_pubkey(self._sas, pubkey_buffer, pubkey_length)
- )
-
- return bytes_to_native_str(ffi.unpack(pubkey_buffer, pubkey_length))
-
- @property
- def other_key_set(self):
- # type: () -> bool
- """Check if the other user's pubkey has been set.
- """
- return lib.olm_sas_is_their_key_set(self._sas) == 1
-
- def set_their_pubkey(self, key):
- # type: (str) -> None
- """Set the public key of the other user.
-
- This sets the public key of the other user, it needs to be set before
- bytes can be generated for the authentication string and a MAC can be
- calculated.
-
- Args:
- key (str): The other users public key.
-
- Raises OlmSasError on failure.
-
- """
- byte_key = to_bytearray(key)
-
- self._check_error(
- lib.olm_sas_set_their_key(
- self._sas,
- ffi.from_buffer(byte_key),
- len(byte_key)
- )
- )
-
- def generate_bytes(self, extra_info, length):
- # type: (str, int) -> bytes
- """Generate bytes to use for the short authentication string.
-
- Args:
- extra_info (str): Extra information to mix in when generating the
- bytes.
- length (int): The number of bytes to generate.
-
- Raises OlmSasError if the other users persons public key isn't set or
- an internal Olm error happens.
-
- """
- if length < 1:
- raise ValueError("The length needs to be a positive integer value")
-
- byte_info = to_bytearray(extra_info)
- out_buffer = ffi.new("char[]", length)
-
- self._check_error(
- lib.olm_sas_generate_bytes(
- self._sas,
- ffi.from_buffer(byte_info),
- len(byte_info),
- out_buffer,
- length
- )
- )
-
- return ffi.unpack(out_buffer, length)
-
- def calculate_mac(self, message, extra_info):
- # type: (str, str) -> str
- """Generate a message authentication code based on the shared secret.
-
- Args:
- message (str): The message to produce the authentication code for.
- extra_info (str): Extra information to mix in when generating the
- MAC
-
- Raises OlmSasError on failure.
-
- """
- byte_message = to_bytes(message)
- byte_info = to_bytes(extra_info)
-
- mac_length = lib.olm_sas_mac_length(self._sas)
- mac_buffer = ffi.new("char[]", mac_length)
-
- self._check_error(
- lib.olm_sas_calculate_mac(
- self._sas,
- ffi.from_buffer(byte_message),
- len(byte_message),
- ffi.from_buffer(byte_info),
- len(byte_info),
- mac_buffer,
- mac_length
- )
- )
- return bytes_to_native_str(ffi.unpack(mac_buffer, mac_length))
-
- def calculate_mac_long_kdf(self, message, extra_info):
- # type: (str, str) -> str
- """Generate a message authentication code based on the shared secret.
-
- This function should not be used unless compatibility with an older
- non-tagged Olm version is required.
-
- Args:
- message (str): The message to produce the authentication code for.
- extra_info (str): Extra information to mix in when generating the
- MAC
-
- Raises OlmSasError on failure.
-
- """
- byte_message = to_bytes(message)
- byte_info = to_bytes(extra_info)
-
- mac_length = lib.olm_sas_mac_length(self._sas)
- mac_buffer = ffi.new("char[]", mac_length)
-
- self._check_error(
- lib.olm_sas_calculate_mac_long_kdf(
- self._sas,
- ffi.from_buffer(byte_message),
- len(byte_message),
- ffi.from_buffer(byte_info),
- len(byte_info),
- mac_buffer,
- mac_length
- )
- )
- return bytes_to_native_str(ffi.unpack(mac_buffer, mac_length))
diff --git a/python/olm/session.py b/python/olm/session.py
deleted file mode 100644
index 636eb3d..0000000
--- a/python/olm/session.py
+++ /dev/null
@@ -1,495 +0,0 @@
-# -*- coding: utf-8 -*-
-# libolm python bindings
-# Copyright © 2015-2017 OpenMarket Ltd
-# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""libolm Session module.
-
-This module contains the Olm Session part of the Olm library.
-
-It is used to establish a peer-to-peer encrypted communication channel between
-two Olm accounts.
-
-Examples:
- >>> alice = Account()
- >>> bob = Account()
- >>> bob.generate_one_time_keys(1)
- >>> id_key = bob.identity_keys['curve25519']
- >>> one_time = list(bob.one_time_keys["curve25519"].values())[0]
- >>> session = OutboundSession(alice, id_key, one_time)
-
-"""
-
-# pylint: disable=redefined-builtin,unused-import
-from builtins import bytes, super
-from typing import AnyStr, Optional, Type
-
-from future.utils import bytes_to_native_str
-
-# pylint: disable=no-name-in-module
-from _libolm import ffi, lib # type: ignore
-
-from ._compat import URANDOM, to_bytearray, to_bytes, to_unicode_str
-from ._finalize import track_for_finalization
-
-# This is imported only for type checking purposes
-if False:
- from .account import Account # pragma: no cover
-
-
-class OlmSessionError(Exception):
- """libolm Session exception."""
-
-
-class _OlmMessage(object):
- def __init__(self, ciphertext, message_type):
- # type: (AnyStr, ffi.cdata) -> None
- if not ciphertext:
- raise ValueError("Ciphertext can't be empty")
-
- # I don't know why mypy wants a type annotation here nor why AnyStr
- # doesn't work
- self.ciphertext = ciphertext # type: ignore
- self.message_type = message_type
-
- def __str__(self):
- # type: () -> str
- type_to_prefix = {
- lib.OLM_MESSAGE_TYPE_PRE_KEY: "PRE_KEY",
- lib.OLM_MESSAGE_TYPE_MESSAGE: "MESSAGE"
- }
-
- prefix = type_to_prefix[self.message_type]
- return "{} {}".format(prefix, self.ciphertext)
-
-
-class OlmPreKeyMessage(_OlmMessage):
- """Olm prekey message class
-
- Prekey messages are used to establish an Olm session. After the first
- message exchange the session switches to normal messages
- """
-
- def __init__(self, ciphertext):
- # type: (AnyStr) -> None
- """Create a new Olm prekey message with the supplied ciphertext
-
- Args:
- ciphertext(str): The ciphertext of the prekey message.
- """
- _OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_PRE_KEY)
-
- def __repr__(self):
- # type: () -> str
- return "OlmPreKeyMessage({})".format(self.ciphertext)
-
-
-class OlmMessage(_OlmMessage):
- """Olm message class"""
-
- def __init__(self, ciphertext):
- # type: (AnyStr) -> None
- """Create a new Olm message with the supplied ciphertext
-
- Args:
- ciphertext(str): The ciphertext of the message.
- """
- _OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_MESSAGE)
-
- def __repr__(self):
- # type: () -> str
- return "OlmMessage({})".format(self.ciphertext)
-
-
-def _clear_session(session):
- # type: (ffi.cdata) -> None
- lib.olm_clear_session(session)
-
-
-class Session(object):
- """libolm Session class.
- This is an abstract class that can't be instantiated except when unpickling
- a previously pickled InboundSession or OutboundSession object with
- from_pickle.
- """
-
- def __new__(cls):
- # type: (Type[Session]) -> Session
-
- obj = super().__new__(cls)
- obj._buf = ffi.new("char[]", lib.olm_session_size())
- obj._session = lib.olm_session(obj._buf)
- track_for_finalization(obj, obj._session, _clear_session)
- return obj
-
- def __init__(self):
- # type: () -> None
- if type(self) is Session:
- raise TypeError("Session class may not be instantiated.")
-
- if False:
- self._session = self._session # type: ffi.cdata
-
- def _check_error(self, ret):
- # type: (int) -> None
- if ret != lib.olm_error():
- return
-
- last_error = bytes_to_native_str(
- ffi.string(lib.olm_session_last_error(self._session)))
-
- raise OlmSessionError(last_error)
-
- def pickle(self, passphrase=""):
- # type: (Optional[str]) -> bytes
- """Store an Olm session.
-
- Stores a session as a base64 string. Encrypts the session using the
- supplied passphrase. Returns a byte object containing the base64
- encoded string of the pickled session. Raises OlmSessionError on
- failure.
-
- Args:
- passphrase(str, optional): The passphrase to be used to encrypt
- the session.
- """
- byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
-
- pickle_length = lib.olm_pickle_session_length(self._session)
- pickle_buffer = ffi.new("char[]", pickle_length)
-
- try:
- self._check_error(
- lib.olm_pickle_session(self._session,
- ffi.from_buffer(byte_key),
- len(byte_key),
- pickle_buffer, pickle_length))
- finally:
- # clear out copies of the passphrase
- for i in range(0, len(byte_key)):
- byte_key[i] = 0
-
- return ffi.unpack(pickle_buffer, pickle_length)
-
- @classmethod
- def from_pickle(cls, pickle, passphrase=""):
- # type: (bytes, Optional[str]) -> Session
- """Load a previously stored Olm session.
-
- Loads a session from a pickled base64 string and returns a Session
- object. Decrypts the session using the supplied passphrase. Raises
- OlmSessionError on failure. If the passphrase doesn't match the one
- used to encrypt the session then the error message for the
- exception will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded
- then the error message will be "INVALID_BASE64".
-
- Args:
- pickle(bytes): Base64 encoded byte string containing the pickled
- session
- passphrase(str, optional): The passphrase used to encrypt the
- session.
- """
- if not pickle:
- raise ValueError("Pickle can't be empty")
-
- byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
- # copy because unpickle will destroy the buffer
- pickle_buffer = ffi.new("char[]", pickle)
-
- session = cls.__new__(cls)
-
- try:
- ret = lib.olm_unpickle_session(session._session,
- ffi.from_buffer(byte_key),
- len(byte_key),
- pickle_buffer,
- len(pickle))
- session._check_error(ret)
- finally:
- # clear out copies of the passphrase
- for i in range(0, len(byte_key)):
- byte_key[i] = 0
-
- return session
-
- def encrypt(self, plaintext):
- # type: (AnyStr) -> _OlmMessage
- """Encrypts a message using the session. Returns the ciphertext as a
- base64 encoded string on success. Raises OlmSessionError on failure.
-
- Args:
- plaintext(str): The plaintext message that will be encrypted.
- """
- byte_plaintext = to_bytearray(plaintext)
-
- r_length = lib.olm_encrypt_random_length(self._session)
- random = URANDOM(r_length)
-
- try:
- message_type = lib.olm_encrypt_message_type(self._session)
-
- self._check_error(message_type)
-
- ciphertext_length = lib.olm_encrypt_message_length(
- self._session, len(byte_plaintext)
- )
- ciphertext_buffer = ffi.new("char[]", ciphertext_length)
-
- self._check_error(lib.olm_encrypt(
- self._session,
- ffi.from_buffer(byte_plaintext), len(byte_plaintext),
- ffi.from_buffer(random), r_length,
- ciphertext_buffer, ciphertext_length,
- ))
- finally:
- # clear out copies of plaintext
- if byte_plaintext is not plaintext:
- for i in range(0, len(byte_plaintext)):
- byte_plaintext[i] = 0
-
- if message_type == lib.OLM_MESSAGE_TYPE_PRE_KEY:
- return OlmPreKeyMessage(
- bytes_to_native_str(ffi.unpack(
- ciphertext_buffer,
- ciphertext_length
- )))
- elif message_type == lib.OLM_MESSAGE_TYPE_MESSAGE:
- return OlmMessage(
- bytes_to_native_str(ffi.unpack(
- ciphertext_buffer,
- ciphertext_length
- )))
- else: # pragma: no cover
- raise ValueError("Unknown message type")
-
- def decrypt(self, message, unicode_errors="replace"):
- # type: (_OlmMessage, str) -> str
- """Decrypts a message using the session. Returns the plaintext string
- on success. Raises OlmSessionError on failure. If the base64 couldn't
- be decoded then the error message will be "INVALID_BASE64". If the
- message is for an unsupported version of the protocol the error message
- will be "BAD_MESSAGE_VERSION". If the message couldn't be decoded then
- the error message will be "BAD_MESSAGE_FORMAT". If the MAC on the
- message was invalid then the error message will be "BAD_MESSAGE_MAC".
-
- Args:
- message(OlmMessage): The Olm message that will be decrypted. It can
- be either an OlmPreKeyMessage or an OlmMessage.
- unicode_errors(str, optional): The error handling scheme to use for
- unicode decoding errors. The default is "replace" meaning that
- the character that was unable to decode will be replaced with
- the unicode replacement character (U+FFFD). Other possible
- values are "strict", "ignore" and "xmlcharrefreplace" as well
- as any other name registered with codecs.register_error that
- can handle UnicodeEncodeErrors.
- """
- if not message.ciphertext:
- raise ValueError("Ciphertext can't be empty")
-
- byte_ciphertext = to_bytes(message.ciphertext)
- # make a copy the ciphertext buffer, because
- # olm_decrypt_max_plaintext_length wants to destroy something
- ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
-
- max_plaintext_length = lib.olm_decrypt_max_plaintext_length(
- self._session, message.message_type, ciphertext_buffer,
- len(byte_ciphertext)
- )
- self._check_error(max_plaintext_length)
- plaintext_buffer = ffi.new("char[]", max_plaintext_length)
-
- # make a copy the ciphertext buffer, because
- # olm_decrypt_max_plaintext_length wants to destroy something
- ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
- plaintext_length = lib.olm_decrypt(
- self._session, message.message_type,
- ciphertext_buffer, len(byte_ciphertext),
- plaintext_buffer, max_plaintext_length
- )
- self._check_error(plaintext_length)
- plaintext = to_unicode_str(
- ffi.unpack(plaintext_buffer, plaintext_length),
- errors=unicode_errors
- )
-
- # clear out copies of the plaintext
- lib.memset(plaintext_buffer, 0, max_plaintext_length)
-
- return plaintext
-
- @property
- def id(self):
- # type: () -> str
- """str: An identifier for this session. Will be the same for both
- ends of the conversation.
- """
- id_length = lib.olm_session_id_length(self._session)
- id_buffer = ffi.new("char[]", id_length)
-
- self._check_error(
- lib.olm_session_id(self._session, id_buffer, id_length)
- )
- return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
-
- def matches(self, message, identity_key=None):
- # type: (OlmPreKeyMessage, Optional[AnyStr]) -> bool
- """Checks if the PRE_KEY message is for this in-bound session.
- This can happen if multiple messages are sent to this session before
- this session sends a message in reply. Returns True if the session
- matches. Returns False if the session does not match. Raises
- OlmSessionError on failure. If the base64 couldn't be decoded then the
- error message will be "INVALID_BASE64". If the message was for an
- unsupported protocol version then the error message will be
- "BAD_MESSAGE_VERSION". If the message couldn't be decoded then then the
- error message will be * "BAD_MESSAGE_FORMAT".
-
- Args:
- message(OlmPreKeyMessage): The Olm prekey message that will checked
- if it is intended for this session.
- identity_key(str, optional): The identity key of the sender. To
- check if the message was also sent using this identity key.
- """
- if not isinstance(message, OlmPreKeyMessage):
- raise TypeError("Matches can only be called with prekey messages.")
-
- if not message.ciphertext:
- raise ValueError("Ciphertext can't be empty")
-
- ret = None
-
- byte_ciphertext = to_bytes(message.ciphertext)
- # make a copy, because olm_matches_inbound_session(_from) will distroy
- # it
- message_buffer = ffi.new("char[]", byte_ciphertext)
-
- if identity_key:
- byte_id_key = to_bytes(identity_key)
-
- ret = lib.olm_matches_inbound_session_from(
- self._session,
- ffi.from_buffer(byte_id_key), len(byte_id_key),
- message_buffer, len(byte_ciphertext)
- )
-
- else:
- ret = lib.olm_matches_inbound_session(
- self._session,
- message_buffer, len(byte_ciphertext))
-
- self._check_error(ret)
-
- return bool(ret)
-
-
-class InboundSession(Session):
- """Inbound Olm session for p2p encrypted communication.
- """
-
- def __new__(cls, account, message, identity_key=None):
- # type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> Session
- return super().__new__(cls)
-
- def __init__(self, account, message, identity_key=None):
- # type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> None
- """Create a new inbound Olm session.
-
- Create a new in-bound session for sending/receiving messages from an
- incoming prekey message. Raises OlmSessionError on failure. If the
- base64 couldn't be decoded then error message will be "INVALID_BASE64".
- If the message was for an unsupported protocol version then
- the errror message will be "BAD_MESSAGE_VERSION". If the message
- couldn't be decoded then then the error message will be
- "BAD_MESSAGE_FORMAT". If the message refers to an unknown one-time
- key then the error message will be "BAD_MESSAGE_KEY_ID".
-
- Args:
- account(Account): The Olm Account that will be used to create this
- session.
- message(OlmPreKeyMessage): The Olm prekey message that will checked
- that will be used to create this session.
- identity_key(str, optional): The identity key of the sender. To
- check if the message was also sent using this identity key.
- """
- if not message.ciphertext:
- raise ValueError("Ciphertext can't be empty")
-
- super().__init__()
- byte_ciphertext = to_bytes(message.ciphertext)
- message_buffer = ffi.new("char[]", byte_ciphertext)
-
- if identity_key:
- byte_id_key = to_bytes(identity_key)
- identity_key_buffer = ffi.new("char[]", byte_id_key)
- self._check_error(lib.olm_create_inbound_session_from(
- self._session,
- account._account,
- identity_key_buffer, len(byte_id_key),
- message_buffer, len(byte_ciphertext)
- ))
- else:
- self._check_error(lib.olm_create_inbound_session(
- self._session,
- account._account,
- message_buffer, len(byte_ciphertext)
- ))
-
-
-class OutboundSession(Session):
- """Outbound Olm session for p2p encrypted communication."""
-
- def __new__(cls, account, identity_key, one_time_key):
- # type: (Account, AnyStr, AnyStr) -> Session
- return super().__new__(cls)
-
- def __init__(self, account, identity_key, one_time_key):
- # type: (Account, AnyStr, AnyStr) -> None
- """Create a new outbound Olm session.
-
- Creates a new outbound session for sending messages to a given
- identity key and one-time key.
-
- Raises OlmSessionError on failure. If the keys couldn't be decoded as
- base64 then the error message will be "INVALID_BASE64".
-
- Args:
- account(Account): The Olm Account that will be used to create this
- session.
- identity_key(str): The identity key of the person with whom we want
- to start the session.
- one_time_key(str): A one-time key from the person with whom we want
- to start the session.
- """
- if not identity_key:
- raise ValueError("Identity key can't be empty")
-
- if not one_time_key:
- raise ValueError("One-time key can't be empty")
-
- super().__init__()
-
- byte_id_key = to_bytes(identity_key)
- byte_one_time = to_bytes(one_time_key)
-
- session_random_length = lib.olm_create_outbound_session_random_length(
- self._session)
-
- random = URANDOM(session_random_length)
-
- self._check_error(lib.olm_create_outbound_session(
- self._session,
- account._account,
- ffi.from_buffer(byte_id_key), len(byte_id_key),
- ffi.from_buffer(byte_one_time), len(byte_one_time),
- ffi.from_buffer(random), session_random_length
- ))
diff --git a/python/olm/utility.py b/python/olm/utility.py
deleted file mode 100644
index bddef38..0000000
--- a/python/olm/utility.py
+++ /dev/null
@@ -1,151 +0,0 @@
-# -*- coding: utf-8 -*-
-# libolm python bindings
-# Copyright © 2015-2017 OpenMarket Ltd
-# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""libolm Utility module.
-
-This module contains utilities for olm.
-It only contains the ed25519_verify function for signature verification.
-
-Examples:
- >>> alice = Account()
-
- >>> message = "Test"
- >>> signature = alice.sign(message)
- >>> signing_key = alice.identity_keys["ed25519"]
-
- >>> ed25519_verify(signing_key, message, signature)
-
-"""
-
-# pylint: disable=redefined-builtin,unused-import
-from typing import AnyStr, Type
-
-from future.utils import bytes_to_native_str
-
-# pylint: disable=no-name-in-module
-from _libolm import ffi, lib # type: ignore
-
-from ._compat import to_bytearray, to_bytes
-from ._finalize import track_for_finalization
-
-
-def _clear_utility(utility): # pragma: no cover
- # type: (ffi.cdata) -> None
- lib.olm_clear_utility(utility)
-
-
-class OlmVerifyError(Exception):
- """libolm signature verification exception."""
-
-
-class OlmHashError(Exception):
- """libolm hash calculation exception."""
-
-
-class _Utility(object):
- # pylint: disable=too-few-public-methods
- """libolm Utility class."""
-
- _buf = None
- _utility = None
-
- @classmethod
- def _allocate(cls):
- # type: (Type[_Utility]) -> None
- cls._buf = ffi.new("char[]", lib.olm_utility_size())
- cls._utility = lib.olm_utility(cls._buf)
- track_for_finalization(cls, cls._utility, _clear_utility)
-
- @classmethod
- def _check_error(cls, ret, error_class):
- # type: (int, Type) -> None
- if ret != lib.olm_error():
- return
-
- raise error_class("{}".format(
- ffi.string(lib.olm_utility_last_error(
- cls._utility)).decode("utf-8")))
-
- @classmethod
- def _ed25519_verify(cls, key, message, signature):
- # type: (Type[_Utility], AnyStr, AnyStr, AnyStr) -> None
- if not cls._utility:
- cls._allocate()
-
- byte_key = to_bytes(key)
- byte_message = to_bytearray(message)
- byte_signature = to_bytearray(signature)
-
- try:
- ret = lib.olm_ed25519_verify(
- cls._utility,
- byte_key,
- len(byte_key),
- ffi.from_buffer(byte_message),
- len(byte_message),
- ffi.from_buffer(byte_signature),
- len(byte_signature)
- )
-
- cls._check_error(ret, OlmVerifyError)
-
- finally:
- # clear out copies of the message, which may be a plaintext
- if byte_message is not message:
- for i in range(0, len(byte_message)):
- byte_message[i] = 0
-
- @classmethod
- def _sha256(cls, input):
- # type: (Type[_Utility], AnyStr) -> str
- if not cls._utility:
- cls._allocate()
-
- byte_input = to_bytes(input)
- hash_length = lib.olm_sha256_length(cls._utility)
- hash = ffi.new("char[]", hash_length)
-
- ret = lib.olm_sha256(cls._utility, byte_input, len(byte_input),
- hash, hash_length)
-
- cls._check_error(ret, OlmHashError)
-
- return bytes_to_native_str(ffi.unpack(hash, hash_length))
-
-
-def ed25519_verify(key, message, signature):
- # type: (AnyStr, AnyStr, AnyStr) -> None
- """Verify an ed25519 signature.
-
- Raises an OlmVerifyError if verification fails.
-
- Args:
- key(str): The ed25519 public key used for signing.
- message(str): The signed message.
- signature(bytes): The message signature.
- """
- return _Utility._ed25519_verify(key, message, signature)
-
-
-def sha256(input_string):
- # type: (AnyStr) -> str
- """Calculate the SHA-256 hash of the input and encodes it as base64.
-
- Args:
- input_string(str): The input for which the hash will be calculated.
-
- """
- return _Utility._sha256(input_string)
diff --git a/python/olm_build.py b/python/olm_build.py
deleted file mode 100644
index 0606337..0000000
--- a/python/olm_build.py
+++ /dev/null
@@ -1,62 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# libolm python bindings
-# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
-#
-# Permission to use, copy, modify, and/or distribute this software for
-# any purpose with or without fee is hereby granted, provided that the
-# above copyright notice and this permission notice appear in all copies.
-#
-# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
-# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
-# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
-# SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
-# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
-# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
-# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
-from __future__ import unicode_literals
-
-import os
-import subprocess
-
-from cffi import FFI
-
-ffibuilder = FFI()
-PATH = os.path.dirname(__file__)
-
-DEVELOP = os.environ.get("DEVELOP")
-
-compile_args = ["-I../include"]
-link_args = ["-L../build"]
-
-if DEVELOP and DEVELOP.lower() in ["yes", "true", "1"]:
- link_args.append('-Wl,-rpath=../build')
-
-headers_build = subprocess.Popen("make headers", shell=True)
-headers_build.wait()
-
-ffibuilder.set_source(
- "_libolm",
- r"""
- #include <olm/olm.h>
- #include <olm/inbound_group_session.h>
- #include <olm/outbound_group_session.h>
- #include <olm/pk.h>
- #include <olm/sas.h>
- """,
- libraries=["olm"],
- extra_compile_args=compile_args,
- extra_link_args=link_args)
-
-with open(os.path.join(PATH, "include/olm/olm.h")) as f:
- ffibuilder.cdef(f.read(), override=True)
-
-with open(os.path.join(PATH, "include/olm/pk.h")) as f:
- ffibuilder.cdef(f.read(), override=True)
-
-with open(os.path.join(PATH, "include/olm/sas.h")) as f:
- ffibuilder.cdef(f.read(), override=True)
-
-if __name__ == "__main__":
- ffibuilder.compile(verbose=True)
diff --git a/python/requirements.txt b/python/requirements.txt
deleted file mode 100644
index c627b85..0000000
--- a/python/requirements.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-future
-cffi
-typing
diff --git a/python/setup.cfg b/python/setup.cfg
deleted file mode 100644
index d10b7e4..0000000
--- a/python/setup.cfg
+++ /dev/null
@@ -1,8 +0,0 @@
-[tool:pytest]
-testpaths = tests
-flake8-ignore =
- olm/*.py F401
- tests/*.py W503
-
-[coverage:run]
-omit=olm/__version__.py
diff --git a/python/setup.py b/python/setup.py
deleted file mode 100644
index 5742fd9..0000000
--- a/python/setup.py
+++ /dev/null
@@ -1,31 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import os
-from codecs import open
-
-from setuptools import setup
-
-here = os.path.abspath(os.path.dirname(__file__))
-
-about = {}
-with open(os.path.join(here, "olm", "__version__.py"), "r", "utf-8") as f:
- exec(f.read(), about)
-
-setup(
- name=about["__title__"],
- version=about["__version__"],
- description=about["__description__"],
- author=about["__author__"],
- author_email=about["__author_email__"],
- url=about["__url__"],
- license=about["__license__"],
- packages=["olm"],
- setup_requires=["cffi>=1.0.0"],
- cffi_modules=["olm_build.py:ffibuilder"],
- install_requires=[
- "cffi>=1.0.0",
- "future",
- "typing;python_version<'3.5'"
- ],
- zip_safe=False
-)
diff --git a/python/test-requirements.txt b/python/test-requirements.txt
deleted file mode 100644
index b18f27f..0000000
--- a/python/test-requirements.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-pytest
-pytest-flake8
-pytest-isort
-pytest-cov
-pytest-benchmark
-aspectlib
diff --git a/python/tests/account_test.py b/python/tests/account_test.py
deleted file mode 100644
index f3b71eb..0000000
--- a/python/tests/account_test.py
+++ /dev/null
@@ -1,114 +0,0 @@
-from builtins import int
-
-import pytest
-
-from olm import Account, OlmAccountError, OlmVerifyError, ed25519_verify
-from olm._compat import to_bytes
-
-
-class TestClass(object):
- def test_to_bytes(self):
- assert isinstance(to_bytes("a"), bytes)
- assert isinstance(to_bytes(u"a"), bytes)
- assert isinstance(to_bytes(b"a"), bytes)
- assert isinstance(to_bytes(r"a"), bytes)
- with pytest.raises(TypeError):
- to_bytes(0)
-
- def test_account_creation(self):
- alice = Account()
- assert alice.identity_keys
- assert len(alice.identity_keys) == 2
-
- def test_account_pickle(self):
- alice = Account()
- pickle = alice.pickle()
- assert (alice.identity_keys == Account.from_pickle(pickle)
- .identity_keys)
-
- def test_invalid_unpickle(self):
- with pytest.raises(ValueError):
- Account.from_pickle(b"")
-
- def test_passphrase_pickle(self):
- alice = Account()
- passphrase = "It's a secret to everybody"
- pickle = alice.pickle(passphrase)
- assert (alice.identity_keys == Account.from_pickle(
- pickle, passphrase).identity_keys)
-
- def test_wrong_passphrase_pickle(self):
- alice = Account()
- passphrase = "It's a secret to everybody"
- pickle = alice.pickle(passphrase)
-
- with pytest.raises(OlmAccountError):
- Account.from_pickle(pickle, "")
-
- def test_one_time_keys(self):
- alice = Account()
- alice.generate_one_time_keys(10)
- one_time_keys = alice.one_time_keys
- assert one_time_keys
- assert len(one_time_keys["curve25519"]) == 10
-
- def test_max_one_time_keys(self):
- alice = Account()
- assert isinstance(alice.max_one_time_keys, int)
-
- def test_publish_one_time_keys(self):
- alice = Account()
- alice.generate_one_time_keys(10)
- one_time_keys = alice.one_time_keys
-
- assert one_time_keys
- assert len(one_time_keys["curve25519"]) == 10
-
- alice.mark_keys_as_published()
- assert not alice.one_time_keys["curve25519"]
-
- def test_clear(self):
- alice = Account()
- del alice
-
- def test_valid_signature(self):
- message = "It's a secret to everybody"
- alice = Account()
-
- signature = alice.sign(message)
- signing_key = alice.identity_keys["ed25519"]
-
- assert signature
- assert signing_key
-
- ed25519_verify(signing_key, message, signature)
-
- def test_invalid_signature(self):
- message = "It's a secret to everybody"
- alice = Account()
- bob = Account()
-
- signature = alice.sign(message)
- signing_key = bob.identity_keys["ed25519"]
-
- assert signature
- assert signing_key
-
- with pytest.raises(OlmVerifyError):
- ed25519_verify(signing_key, message, signature)
-
- def test_signature_verification_twice(self):
- message = "It's a secret to everybody"
- alice = Account()
-
- signature = alice.sign(message)
- signing_key = alice.identity_keys["ed25519"]
-
- assert signature
- assert signing_key
-
- ed25519_verify(signing_key, message, signature)
- assert signature == alice.sign(message)
-
- ed25519_verify(signing_key, message, signature)
- assert signature == alice.sign(message)
diff --git a/python/tests/group_session_test.py b/python/tests/group_session_test.py
deleted file mode 100644
index 4632a60..0000000
--- a/python/tests/group_session_test.py
+++ /dev/null
@@ -1,128 +0,0 @@
-# -*- coding: utf-8 -*-
-import pytest
-
-from olm import InboundGroupSession, OlmGroupSessionError, OutboundGroupSession
-
-
-class TestClass(object):
- def test_session_create(self):
- OutboundGroupSession()
-
- def test_session_id(self):
- session = OutboundGroupSession()
- assert isinstance(session.id, str)
-
- def test_session_index(self):
- session = OutboundGroupSession()
- assert isinstance(session.message_index, int)
- assert session.message_index == 0
-
- def test_outbound_pickle(self):
- session = OutboundGroupSession()
- pickle = session.pickle()
-
- assert (session.id == OutboundGroupSession.from_pickle(
- pickle).id)
-
- def test_invalid_unpickle(self):
- with pytest.raises(ValueError):
- OutboundGroupSession.from_pickle(b"")
-
- with pytest.raises(ValueError):
- InboundGroupSession.from_pickle(b"")
-
- def test_inbound_create(self):
- outbound = OutboundGroupSession()
- InboundGroupSession(outbound.session_key)
-
- def test_invalid_decrypt(self):
- outbound = OutboundGroupSession()
- inbound = InboundGroupSession(outbound.session_key)
-
- with pytest.raises(ValueError):
- inbound.decrypt("")
-
- def test_inbound_pickle(self):
- outbound = OutboundGroupSession()
- inbound = InboundGroupSession(outbound.session_key)
- pickle = inbound.pickle()
- InboundGroupSession.from_pickle(pickle)
-
- def test_inbound_export(self):
- outbound = OutboundGroupSession()
- inbound = InboundGroupSession(outbound.session_key)
- imported = InboundGroupSession.import_session(
- inbound.export_session(inbound.first_known_index)
- )
- assert "Test", 0 == imported.decrypt(outbound.encrypt("Test"))
-
- def test_first_index(self):
- outbound = OutboundGroupSession()
- inbound = InboundGroupSession(outbound.session_key)
- index = inbound.first_known_index
- assert isinstance(index, int)
-
- def test_encrypt(self, benchmark):
- benchmark.weave(OutboundGroupSession.encrypt, lazy=True)
- outbound = OutboundGroupSession()
- inbound = InboundGroupSession(outbound.session_key)
- assert "Test", 0 == inbound.decrypt(outbound.encrypt("Test"))
-
- def test_decrypt(self, benchmark):
- benchmark.weave(InboundGroupSession.decrypt, lazy=True)
- outbound = OutboundGroupSession()
- inbound = InboundGroupSession(outbound.session_key)
- assert "Test", 0 == inbound.decrypt(outbound.encrypt("Test"))
-
- def test_decrypt_twice(self):
- outbound = OutboundGroupSession()
- inbound = InboundGroupSession(outbound.session_key)
- outbound.encrypt("Test 1")
- message, index = inbound.decrypt(outbound.encrypt("Test 2"))
- assert isinstance(index, int)
- assert ("Test 2", 1) == (message, index)
-
- def test_decrypt_failure(self):
- outbound = OutboundGroupSession()
- inbound = InboundGroupSession(outbound.session_key)
- eve_outbound = OutboundGroupSession()
- with pytest.raises(OlmGroupSessionError):
- inbound.decrypt(eve_outbound.encrypt("Test"))
-
- def test_id(self):
- outbound = OutboundGroupSession()
- inbound = InboundGroupSession(outbound.session_key)
- assert outbound.id == inbound.id
-
- def test_inbound_fail(self):
- with pytest.raises(TypeError):
- InboundGroupSession()
-
- def test_oubtound_pickle_fail(self):
- outbound = OutboundGroupSession()
- pickle = outbound.pickle("Test")
-
- with pytest.raises(OlmGroupSessionError):
- OutboundGroupSession.from_pickle(pickle)
-
- def test_outbound_clear(self):
- session = OutboundGroupSession()
- del session
-
- def test_inbound_clear(self):
- outbound = OutboundGroupSession()
- inbound = InboundGroupSession(outbound.session_key)
- del inbound
-
- def test_invalid_unicode_decrypt(self):
- outbound = OutboundGroupSession()
- inbound = InboundGroupSession(outbound.session_key)
-
- text = outbound.encrypt(b"\xed")
- plaintext, _ = inbound.decrypt(text)
-
- print(plaintext)
- assert plaintext == u"�"
-
- plaintext, _ = inbound.decrypt(text, "ignore")
- assert plaintext == ""
diff --git a/python/tests/pk_test.py b/python/tests/pk_test.py
deleted file mode 100644
index ef87465..0000000
--- a/python/tests/pk_test.py
+++ /dev/null
@@ -1,65 +0,0 @@
-# -*- coding: utf-8 -*-
-import pytest
-
-from olm import (PkDecryption, PkDecryptionError, PkEncryption, PkSigning,
- ed25519_verify)
-
-
-class TestClass(object):
- def test_invalid_encryption(self):
- with pytest.raises(ValueError):
- PkEncryption("")
-
- def test_decrytion(self):
- decryption = PkDecryption()
- encryption = PkEncryption(decryption.public_key)
- plaintext = "It's a secret to everybody."
- message = encryption.encrypt(plaintext)
- decrypted_plaintext = decryption.decrypt(message)
- isinstance(decrypted_plaintext, str)
- assert plaintext == decrypted_plaintext
-
- def test_invalid_decrytion(self):
- decryption = PkDecryption()
- encryption = PkEncryption(decryption.public_key)
- plaintext = "It's a secret to everybody."
- message = encryption.encrypt(plaintext)
- message.ephemeral_key = "?"
- with pytest.raises(PkDecryptionError):
- decryption.decrypt(message)
-
- def test_pickling(self):
- decryption = PkDecryption()
- encryption = PkEncryption(decryption.public_key)
- plaintext = "It's a secret to everybody."
- message = encryption.encrypt(plaintext)
-
- pickle = decryption.pickle()
- unpickled = PkDecryption.from_pickle(pickle)
- decrypted_plaintext = unpickled.decrypt(message)
- assert plaintext == decrypted_plaintext
-
- def test_invalid_unpickling(self):
- with pytest.raises(ValueError):
- PkDecryption.from_pickle("")
-
- def test_invalid_pass_pickling(self):
- decryption = PkDecryption()
- pickle = decryption.pickle("Secret")
-
- with pytest.raises(PkDecryptionError):
- PkDecryption.from_pickle(pickle, "Not secret")
-
- def test_signing(self):
- seed = PkSigning.generate_seed()
- signing = PkSigning(seed)
- message = "This statement is true"
- signature = signing.sign(message)
- ed25519_verify(signing.public_key, message, signature)
-
- def test_invalid_unicode_decrypt(self):
- decryption = PkDecryption()
- encryption = PkEncryption(decryption.public_key)
- message = encryption.encrypt(b"\xed")
- plaintext = decryption.decrypt(message)
- assert plaintext == u"�"
diff --git a/python/tests/sas_test.py b/python/tests/sas_test.py
deleted file mode 100644
index 9001e67..0000000
--- a/python/tests/sas_test.py
+++ /dev/null
@@ -1,99 +0,0 @@
-from builtins import bytes
-
-import pytest
-
-from olm import OlmSasError, Sas
-
-MESSAGE = "Test message"
-EXTRA_INFO = "extra_info"
-
-
-class TestClass(object):
- def test_sas_creation(self):
- sas = Sas()
- assert sas.pubkey
-
- def test_other_key_setting(self):
- sas_alice = Sas()
- sas_bob = Sas()
-
- assert not sas_alice.other_key_set
- sas_alice.set_their_pubkey(sas_bob.pubkey)
- assert sas_alice.other_key_set
-
- def test_bytes_generating(self):
- sas_alice = Sas()
- sas_bob = Sas(sas_alice.pubkey)
-
- assert sas_bob.other_key_set
-
- with pytest.raises(OlmSasError):
- sas_alice.generate_bytes(EXTRA_INFO, 5)
-
- sas_alice.set_their_pubkey(sas_bob.pubkey)
-
- with pytest.raises(ValueError):
- sas_alice.generate_bytes(EXTRA_INFO, 0)
-
- alice_bytes = sas_alice.generate_bytes(EXTRA_INFO, 5)
- bob_bytes = sas_bob.generate_bytes(EXTRA_INFO, 5)
-
- assert alice_bytes == bob_bytes
-
- def test_mac_generating(self):
- sas_alice = Sas()
- sas_bob = Sas()
-
- with pytest.raises(OlmSasError):
- sas_alice.calculate_mac(MESSAGE, EXTRA_INFO)
-
- sas_alice.set_their_pubkey(sas_bob.pubkey)
- sas_bob.set_their_pubkey(sas_alice.pubkey)
-
- alice_mac = sas_alice.calculate_mac(MESSAGE, EXTRA_INFO)
- bob_mac = sas_bob.calculate_mac(MESSAGE, EXTRA_INFO)
-
- assert alice_mac == bob_mac
-
- def test_cross_language_mac(self):
- """Test MAC generating with a predefined key pair.
-
- This test imports a private and public key from the C test and checks
- if we are getting the same MAC that the C code calculated.
- """
- alice_private = [
- 0x77, 0x07, 0x6D, 0x0A, 0x73, 0x18, 0xA5, 0x7D,
- 0x3C, 0x16, 0xC1, 0x72, 0x51, 0xB2, 0x66, 0x45,
- 0xDF, 0x4C, 0x2F, 0x87, 0xEB, 0xC0, 0x99, 0x2A,
- 0xB1, 0x77, 0xFB, 0xA5, 0x1D, 0xB9, 0x2C, 0x2A
- ]
-
- bob_key = "3p7bfXt9wbTTW2HC7OQ1Nz+DQ8hbeGdNrfx+FG+IK08"
- message = "Hello world!"
- extra_info = "MAC"
- expected_mac = "2nSMTXM+TStTU3RUVTNSVVZUTlNWVlpVVGxOV1ZscFY"
-
- sas_alice = Sas()
- sas_alice._create_sas(bytes(alice_private), 32)
- sas_alice.set_their_pubkey(bob_key)
-
- alice_mac = sas_alice.calculate_mac(message, extra_info)
-
- assert alice_mac == expected_mac
-
- def test_long_mac_generating(self):
- sas_alice = Sas()
- sas_bob = Sas()
-
- with pytest.raises(OlmSasError):
- sas_alice.calculate_mac_long_kdf(MESSAGE, EXTRA_INFO)
-
- sas_alice.set_their_pubkey(sas_bob.pubkey)
- sas_bob.set_their_pubkey(sas_alice.pubkey)
-
- alice_mac = sas_alice.calculate_mac_long_kdf(MESSAGE, EXTRA_INFO)
- bob_mac = sas_bob.calculate_mac_long_kdf(MESSAGE, EXTRA_INFO)
- bob_short_mac = sas_bob.calculate_mac(MESSAGE, EXTRA_INFO)
-
- assert alice_mac == bob_mac
- assert alice_mac != bob_short_mac
diff --git a/python/tests/session_test.py b/python/tests/session_test.py
deleted file mode 100644
index b856585..0000000
--- a/python/tests/session_test.py
+++ /dev/null
@@ -1,152 +0,0 @@
-# -*- coding: utf-8 -*-
-import pytest
-
-from olm import (Account, InboundSession, OlmMessage, OlmPreKeyMessage,
- OlmSessionError, OutboundSession, Session)
-
-
-class TestClass(object):
- def _create_session(self):
- alice = Account()
- bob = Account()
- bob.generate_one_time_keys(1)
- id_key = bob.identity_keys["curve25519"]
- one_time = list(bob.one_time_keys["curve25519"].values())[0]
- session = OutboundSession(alice, id_key, one_time)
- return alice, bob, session
-
- def test_session_create(self):
- _, _, session_1 = self._create_session()
- _, _, session_2 = self._create_session()
- assert session_1
- assert session_2
- assert session_1.id != session_2.id
- assert isinstance(session_1.id, str)
-
- def test_session_clear(self):
- _, _, session = self._create_session()
- del session
-
- def test_invalid_session_create(self):
- with pytest.raises(TypeError):
- Session()
-
- def test_session_pickle(self):
- alice, bob, session = self._create_session()
- Session.from_pickle(session.pickle()).id == session.id
-
- def test_session_invalid_pickle(self):
- with pytest.raises(ValueError):
- Session.from_pickle(b"")
-
- def test_wrong_passphrase_pickle(self):
- alice, bob, session = self._create_session()
- passphrase = "It's a secret to everybody"
- pickle = alice.pickle(passphrase)
-
- with pytest.raises(OlmSessionError):
- Session.from_pickle(pickle, "")
-
- def test_encrypt(self):
- plaintext = "It's a secret to everybody"
- alice, bob, session = self._create_session()
- message = session.encrypt(plaintext)
-
- assert (repr(message)
- == "OlmPreKeyMessage({})".format(message.ciphertext))
-
- assert (str(message)
- == "PRE_KEY {}".format(message.ciphertext))
-
- bob_session = InboundSession(bob, message)
- assert plaintext == bob_session.decrypt(message)
-
- def test_empty_message(self):
- with pytest.raises(ValueError):
- OlmPreKeyMessage("")
- empty = OlmPreKeyMessage("x")
- empty.ciphertext = ""
- alice, bob, session = self._create_session()
-
- with pytest.raises(ValueError):
- session.decrypt(empty)
-
- def test_inbound_with_id(self):
- plaintext = "It's a secret to everybody"
- alice, bob, session = self._create_session()
- message = session.encrypt(plaintext)
- alice_id = alice.identity_keys["curve25519"]
- bob_session = InboundSession(bob, message, alice_id)
- assert plaintext == bob_session.decrypt(message)
-
- def test_two_messages(self):
- plaintext = "It's a secret to everybody"
- alice, bob, session = self._create_session()
- message = session.encrypt(plaintext)
- alice_id = alice.identity_keys["curve25519"]
- bob_session = InboundSession(bob, message, alice_id)
- bob.remove_one_time_keys(bob_session)
- assert plaintext == bob_session.decrypt(message)
-
- bob_plaintext = "Grumble, Grumble"
- bob_message = bob_session.encrypt(bob_plaintext)
-
- assert (repr(bob_message)
- == "OlmMessage({})".format(bob_message.ciphertext))
-
- assert bob_plaintext == session.decrypt(bob_message)
-
- def test_matches(self):
- plaintext = "It's a secret to everybody"
- alice, bob, session = self._create_session()
- message = session.encrypt(plaintext)
- alice_id = alice.identity_keys["curve25519"]
- bob_session = InboundSession(bob, message, alice_id)
- assert plaintext == bob_session.decrypt(message)
-
- message_2nd = session.encrypt("Hey! Listen!")
-
- assert bob_session.matches(message_2nd) is True
- assert bob_session.matches(message_2nd, alice_id) is True
-
- def test_invalid(self):
- alice, bob, session = self._create_session()
- message = OlmMessage("x")
-
- with pytest.raises(TypeError):
- session.matches(message)
-
- message = OlmPreKeyMessage("x")
- message.ciphertext = ""
-
- with pytest.raises(ValueError):
- session.matches(message)
-
- with pytest.raises(ValueError):
- InboundSession(bob, message)
-
- with pytest.raises(ValueError):
- OutboundSession(alice, "", "x")
-
- with pytest.raises(ValueError):
- OutboundSession(alice, "x", "")
-
- def test_doesnt_match(self):
- plaintext = "It's a secret to everybody"
- alice, bob, session = self._create_session()
- message = session.encrypt(plaintext)
- alice_id = alice.identity_keys["curve25519"]
- bob_session = InboundSession(bob, message, alice_id)
-
- _, _, new_session = self._create_session()
-
- new_message = new_session.encrypt(plaintext)
- assert bob_session.matches(new_message) is False
-
- def test_invalid_unicode_decrypt(self):
- alice, bob, session = self._create_session()
- message = session.encrypt(b"\xed")
-
- bob_session = InboundSession(bob, message)
- plaintext = bob_session.decrypt(message)
- assert plaintext == u"�"
diff --git a/python/tests/utils_test.py b/python/tests/utils_test.py
deleted file mode 100644
index 86fb34f..0000000
--- a/python/tests/utils_test.py
+++ /dev/null
@@ -1,25 +0,0 @@
-import base64
-import hashlib
-
-from future.utils import bytes_to_native_str
-
-from olm import sha256
-from olm._compat import to_bytes
-
-
-class TestClass(object):
- def test_sha256(self):
- input1 = "It's a secret to everybody"
- input2 = "It's a secret to nobody"
-
- first_hash = sha256(input1)
- second_hash = sha256(input2)
-
- hashlib_hash = base64.b64encode(
- hashlib.sha256(to_bytes(input1)).digest()
- )
-
- hashlib_hash = bytes_to_native_str(hashlib_hash[:-1])
-
- assert first_hash != second_hash
- assert hashlib_hash == first_hash
diff --git a/python/tox.ini b/python/tox.ini
deleted file mode 100644
index e3a0188..0000000
--- a/python/tox.ini
+++ /dev/null
@@ -1,43 +0,0 @@
-# content of: tox.ini , put in same dir as setup.py
-[tox]
-envlist = py27,py36,pypy,{py2,py3}-cov,coverage
-[testenv]
-basepython =
- py27: python2.7
- py36: python3.6
- pypy: pypy
- py2: python2.7
- py3: python3.6
-
-deps = -rrequirements.txt
- -rtest-requirements.txt
-
-passenv = TOXENV CI TRAVIS TRAVIS_*
-commands = pytest --benchmark-disable
-usedevelop = True
-
-[testenv:py2-cov]
-commands =
- pytest --cov-report term-missing --cov=olm --benchmark-disable --cov-branch
-setenv =
- COVERAGE_FILE=.coverage.py2
-
-[testenv:py3-cov]
-commands =
- py.test --cov=olm --cov-report term-missing --benchmark-disable --cov-branch
-setenv =
- COVERAGE_FILE=.coverage.py3
-
-[testenv:coverage]
-basepython = python3.6
-commands =
- coverage erase
- coverage combine
- coverage xml
- coverage report --show-missing
- codecov -e TOXENV
-deps =
- coverage
- codecov>=1.4.0
-setenv =
- COVERAGE_FILE=.coverage