aboutsummaryrefslogtreecommitdiff
path: root/python/olm
diff options
context:
space:
mode:
Diffstat (limited to 'python/olm')
-rw-r--r--python/olm/__init__.py31
-rwxr-xr-xpython/olm/__main__.py468
-rw-r--r--python/olm/__version__.py9
-rw-r--r--python/olm/_base.py17
-rw-r--r--python/olm/_compat.py24
-rw-r--r--python/olm/_finalize.py65
-rw-r--r--python/olm/account.py343
-rw-r--r--python/olm/group_session.py467
-rw-r--r--python/olm/inbound_group_session.py138
-rw-r--r--python/olm/outbound_group_session.py134
-rw-r--r--python/olm/session.py594
-rw-r--r--python/olm/utility.py113
12 files changed, 1309 insertions, 1094 deletions
diff --git a/python/olm/__init__.py b/python/olm/__init__.py
index f74cbcb..b52c88b 100644
--- a/python/olm/__init__.py
+++ b/python/olm/__init__.py
@@ -1,5 +1,26 @@
-from .account import Account
-from .session import Session
-from .outbound_group_session import OutboundGroupSession
-from .inbound_group_session import InboundGroupSession
-from .utility import ed25519_verify
+# -*- coding: utf-8 -*-
+# libolm python bindings
+# Copyright © 2015-2017 OpenMarket Ltd
+# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
+"""
+Olm Python bindings
+~~~~~~~~~~~~~~~~~~~~~
+| This package implements python bindings for the libolm C library.
+| © Copyright 2015-2017 by OpenMarket Ltd
+| © Copyright 2018 by Damir Jelić
+"""
+from .utility import ed25519_verify, OlmVerifyError
+from .account import Account, OlmAccountError
+from .session import (
+ Session,
+ InboundSession,
+ OutboundSession,
+ OlmSessionError,
+ OlmMessage,
+ OlmPreKeyMessage
+)
+from .group_session import (
+ InboundGroupSession,
+ OutboundGroupSession,
+ OlmGroupSessionError
+)
diff --git a/python/olm/__main__.py b/python/olm/__main__.py
deleted file mode 100755
index d062459..0000000
--- a/python/olm/__main__.py
+++ /dev/null
@@ -1,468 +0,0 @@
-#!/usr/bin/env python
-
-from __future__ import print_function
-
-import argparse
-import json
-import os
-import sys
-import yaml
-
-from . import *
-
-
-def read_base64_file(filename):
- """Read a base64 file, dropping any CR/LF characters"""
- with open(filename, "rb") as f:
- return f.read().translate(None, "\r\n")
-
-
-def build_arg_parser():
- parser = argparse.ArgumentParser()
- parser.add_argument("--key", help="Account encryption key", default="")
- commands = parser.add_subparsers()
-
- create_account = commands.add_parser("create_account",
- help="Create a new account")
- create_account.add_argument("account_file", help="Local account file")
-
- def do_create_account(args):
- if os.path.exists(args.account_file):
- sys.stderr.write("Account %r file already exists" % (
- args.account_file,
- ))
- sys.exit(1)
- account = Account()
- account.create()
- with open(args.account_file, "wb") as f:
- f.write(account.pickle(args.key))
-
- create_account.set_defaults(func=do_create_account)
-
- keys = commands.add_parser("keys", help="List public keys for an account")
- keys.add_argument("account_file", help="Local account file")
- keys.add_argument("--json", action="store_true", help="Output as JSON")
-
- def do_keys(args):
- account = Account()
- account.unpickle(args.key, read_base64_file(args.account_file))
- result = {
- "account_keys": account.identity_keys(),
- "one_time_keys": account.one_time_keys(),
- }
- try:
- if args.json:
- json.dump(result, sys.stdout, indent=4)
- else:
- yaml.safe_dump(result, sys.stdout, default_flow_style=False)
- except:
- pass
-
- keys.set_defaults(func=do_keys)
-
- def do_id_key(args):
- account = Account()
- account.unpickle(args.key, read_base64_file(args.account_file))
- print(account.identity_keys()['curve25519'])
-
- id_key = commands.add_parser(
- "identity_key",
- help="Get the public part of the identity key for an account",
- )
- id_key.add_argument("account_file", help="Local account file")
- id_key.set_defaults(func=do_id_key)
-
- def do_signing_key(args):
- account = Account()
- account.unpickle(args.key, read_base64_file(args.account_file))
- print(account.identity_keys()['ed25519'])
-
- signing_key = commands.add_parser(
- "signing_key",
- help="Get the public part of the signing key for an account",
- )
- signing_key.add_argument("account_file", help="Local account file")
- signing_key.set_defaults(func=do_signing_key)
-
- def do_one_time_key(args):
- account = Account()
- account.unpickle(args.key, read_base64_file(args.account_file))
- keys = account.one_time_keys()['curve25519'].values()
- key_num = args.key_num
- if key_num < 1 or key_num > len(keys):
- print(
- "Invalid key number %i: %i keys available" % (
- key_num, len(keys),
- ), file=sys.stderr,
- )
- sys.exit(1)
- print(keys[key_num-1])
-
- one_time_key = commands.add_parser(
- "one_time_key",
- help="Get a one-time key for the account",
- )
- one_time_key.add_argument("account_file", help="Local account file")
- one_time_key.add_argument("--key-num", "-n", type=int, default=1,
- help="Index of key to retrieve (default: 1)")
- one_time_key.set_defaults(func=do_one_time_key)
-
- sign = commands.add_parser("sign", help="Sign a message")
- sign.add_argument("account_file", help="Local account file")
- sign.add_argument("message_file", help="Message to sign")
- sign.add_argument("signature_file", help="Signature to output")
-
- def do_sign(args):
- account = Account()
- account.unpickle(args.key, read_base64_file(args.account_file))
- with open_in(args.message_file) as f:
- message = f.read()
- signature = account.sign(message)
- with open_out(args.signature_file) as f:
- f.write(signature)
-
- sign.set_defaults(func=do_sign)
-
- generate_keys = commands.add_parser("generate_keys",
- help="Generate one time keys")
- generate_keys.add_argument("account_file", help="Local account file")
- generate_keys.add_argument("count", type=int,
- help="Number of keys to generate")
-
- def do_generate_keys(args):
- account = Account()
- account.unpickle(args.key, read_base64_file(args.account_file))
- account.generate_one_time_keys(args.count)
- with open(args.account_file, "wb") as f:
- f.write(account.pickle(args.key))
-
- generate_keys.set_defaults(func=do_generate_keys)
-
- outbound = commands.add_parser("outbound",
- help="Create an outbound session")
- outbound.add_argument("account_file", help="Local account file")
- outbound.add_argument("session_file", help="Local session file")
- outbound.add_argument("identity_key", help="Remote identity key")
- outbound.add_argument("one_time_key", help="Remote one time key")
-
- def do_outbound(args):
- if os.path.exists(args.session_file):
- sys.stderr.write("Session %r file already exists" % (
- args.session_file,
- ))
- sys.exit(1)
- account = Account()
- account.unpickle(args.key, read_base64_file(args.account_file))
- session = Session()
- session.create_outbound(
- account, args.identity_key, args.one_time_key
- )
- with open(args.session_file, "wb") as f:
- f.write(session.pickle(args.key))
-
- outbound.set_defaults(func=do_outbound)
-
- def open_in(path):
- if path == "-":
- return sys.stdin
- else:
- return open(path, "rb")
-
- def open_out(path):
- if path == "-":
- return sys.stdout
- else:
- return open(path, "wb")
-
- inbound = commands.add_parser("inbound", help="Create an inbound session")
- inbound.add_argument("account_file", help="Local account file")
- inbound.add_argument("session_file", help="Local session file")
- inbound.add_argument("message_file", help="Message", default="-")
- inbound.add_argument("plaintext_file", help="Plaintext", default="-")
-
- def do_inbound(args):
- if os.path.exists(args.session_file):
- sys.stderr.write("Session %r file already exists" % (
- args.session_file,
- ))
- sys.exit(1)
- account = Account()
- account.unpickle(args.key, read_base64_file(args.account_file))
- with open_in(args.message_file) as f:
- message_type = f.read(8)
- message = f.read()
- if message_type != "PRE_KEY ":
- sys.stderr.write("Expecting a PRE_KEY message")
- sys.exit(1)
- session = Session()
- session.create_inbound(account, message)
- plaintext = session.decrypt(0, message)
- with open(args.session_file, "wb") as f:
- f.write(session.pickle(args.key))
- with open_out(args.plaintext_file) as f:
- f.write(plaintext)
-
- inbound.set_defaults(func=do_inbound)
-
- session_id = commands.add_parser("session_id", help="Session ID")
- session_id.add_argument("session_file", help="Local session file")
-
- def do_session_id(args):
- session = Session()
- session.unpickle(args.key, read_base64_file(args.session_file))
- sys.stdout.write(session.session_id() + "\n")
-
- session_id.set_defaults(func=do_session_id)
-
- encrypt = commands.add_parser("encrypt", help="Encrypt a message")
- encrypt.add_argument("session_file", help="Local session file")
- encrypt.add_argument("plaintext_file", help="Plaintext", default="-")
- encrypt.add_argument("message_file", help="Message", default="-")
-
- def do_encrypt(args):
- session = Session()
- session.unpickle(args.key, read_base64_file(args.session_file))
- with open_in(args.plaintext_file) as f:
- plaintext = f.read()
- message_type, message = session.encrypt(plaintext)
- with open(args.session_file, "wb") as f:
- f.write(session.pickle(args.key))
- with open_out(args.message_file) as f:
- f.write(["PRE_KEY ", "MESSAGE "][message_type])
- f.write(message)
-
- encrypt.set_defaults(func=do_encrypt)
-
- decrypt = commands.add_parser("decrypt", help="Decrypt a message")
- decrypt.add_argument("session_file", help="Local session file")
- decrypt.add_argument("message_file", help="Message", default="-")
- decrypt.add_argument("plaintext_file", help="Plaintext", default="-")
-
- def do_decrypt(args):
- session = Session()
- session.unpickle(args.key, read_base64_file(args.session_file))
- with open_in(args.message_file) as f:
- message_type = f.read(8)
- message = f.read()
- if message_type not in {"PRE_KEY ", "MESSAGE "}:
- sys.stderr.write("Expecting a PRE_KEY or MESSAGE message")
- sys.exit(1)
- message_type = 1 if message_type == "MESSAGE " else 0
- plaintext = session.decrypt(message_type, message)
- with open(args.session_file, "wb") as f:
- f.write(session.pickle(args.key))
- with open_out(args.plaintext_file) as f:
- f.write(plaintext)
-
- decrypt.set_defaults(func=do_decrypt)
-
- outbound_group = commands.add_parser(
- "outbound_group",
- help="Create an outbound group session",
- )
- outbound_group.add_argument("session_file",
- help="Local group session file")
- outbound_group.set_defaults(func=do_outbound_group)
-
- group_credentials = commands.add_parser(
- "group_credentials",
- help="Export the current outbound group session credentials",
- )
- group_credentials.add_argument(
- "session_file",
- help="Local outbound group session file",
- )
- group_credentials.add_argument(
- "credentials_file",
- help="File to write credentials to (default stdout)",
- type=argparse.FileType('w'), nargs='?',
- default=sys.stdout,
- )
- group_credentials.set_defaults(func=do_group_credentials)
-
- group_encrypt = commands.add_parser(
- "group_encrypt",
- help="Encrypt a group message",
- )
- group_encrypt.add_argument("session_file",
- help="Local outbound group session file")
- group_encrypt.add_argument("plaintext_file",
- help="Plaintext file (default stdin)",
- type=argparse.FileType('rb'), nargs='?',
- default=sys.stdin)
- group_encrypt.add_argument("message_file",
- help="Message file (default stdout)",
- type=argparse.FileType('w'), nargs='?',
- default=sys.stdout)
- group_encrypt.set_defaults(func=do_group_encrypt)
-
- inbound_group = commands.add_parser(
- "inbound_group",
- help=("Create an inbound group session based on credentials from an " +
- "outbound group session"))
- inbound_group.add_argument("session_file",
- help="Local inbound group session file")
- inbound_group.add_argument(
- "credentials_file",
- help="File to read credentials from (default stdin)",
- type=argparse.FileType('r'), nargs='?',
- default=sys.stdin,
- )
- inbound_group.set_defaults(func=do_inbound_group)
-
- import_inbound_group = commands.add_parser(
- "import_inbound_group",
- help="Create an inbound group session based an exported inbound group"
- )
- import_inbound_group.add_argument("session_file",
- help="Local inbound group session file")
- import_inbound_group.add_argument(
- "export_file",
- help="File to read credentials from (default stdin)",
- type=argparse.FileType('r'), nargs='?',
- default=sys.stdin,
- )
- import_inbound_group.set_defaults(func=do_import_inbound_group)
-
- group_decrypt = commands.add_parser("group_decrypt",
- help="Decrypt a group message")
- group_decrypt.add_argument("session_file",
- help="Local inbound group session file")
- group_decrypt.add_argument("message_file",
- help="Message file (default stdin)",
- type=argparse.FileType('r'), nargs='?',
- default=sys.stdin)
- group_decrypt.add_argument("plaintext_file",
- help="Plaintext file (default stdout)",
- type=argparse.FileType('wb'), nargs='?',
- default=sys.stdout)
- group_decrypt.set_defaults(func=do_group_decrypt)
-
- export_inbound_group = commands.add_parser(
- "export_inbound_group",
- help="Export the keys for an inbound group session",
- )
- export_inbound_group.add_argument(
- "session_file", help="Local inbound group session file",
- )
- export_inbound_group.add_argument(
- "export_file", help="File to export to (default stdout)",
- type=argparse.FileType('w'), nargs='?',
- default=sys.stdout,
- )
- export_inbound_group.add_argument(
- "--message_index",
- help=("Index to export session at. Defaults to the earliest known " +
- "index"),
- type=int,
- )
- export_inbound_group.set_defaults(func=do_export_inbound_group)
-
- ed25519_verify = commands.add_parser("ed25519_verify",
- help="Verify an ed25519 signature")
- ed25519_verify.add_argument(
- "signing_key",
- help="Public signing key used to create the signature"
- )
- ed25519_verify.add_argument("signature",
- help="Signature to be verified")
- ed25519_verify.add_argument("message_file",
- help="Message file (default stdin)",
- type=argparse.FileType('r'), nargs='?',
- default=sys.stdin)
- ed25519_verify.set_defaults(func=do_verify_ed25519_signature)
- return parser
-
-
-def do_outbound_group(args):
- if os.path.exists(args.session_file):
- sys.stderr.write("Session %r file already exists" % (
- args.session_file,
- ))
- sys.exit(1)
- session = OutboundGroupSession()
- with open(args.session_file, "wb") as f:
- f.write(session.pickle(args.key))
-
-
-def do_group_encrypt(args):
- session = OutboundGroupSession()
- session.unpickle(args.key, read_base64_file(args.session_file))
- plaintext = args.plaintext_file.read()
- message = session.encrypt(plaintext)
- with open(args.session_file, "wb") as f:
- f.write(session.pickle(args.key))
- args.message_file.write(message)
-
-
-def do_group_credentials(args):
- session = OutboundGroupSession()
- session.unpickle(args.key, read_base64_file(args.session_file))
- result = {
- 'message_index': session.message_index(),
- 'session_key': session.session_key(),
- }
- json.dump(result, args.credentials_file, indent=4)
-
-
-def do_inbound_group(args):
- if os.path.exists(args.session_file):
- sys.stderr.write("Session %r file already exists\n" % (
- args.session_file,
- ))
- sys.exit(1)
- credentials = json.load(args.credentials_file)
- for k in ('session_key', ):
- if k not in credentials:
- sys.stderr.write("Credentials file is missing %s\n" % k)
- sys.exit(1)
-
- session = InboundGroupSession()
- session.init(credentials['session_key'])
- with open(args.session_file, "wb") as f:
- f.write(session.pickle(args.key))
-
-
-def do_import_inbound_group(args):
- if os.path.exists(args.session_file):
- sys.stderr.write("Session %r file already exists\n" % (
- args.session_file,
- ))
- sys.exit(1)
- data = args.export_file.read().translate(None, "\r\n")
-
- session = InboundGroupSession()
- session.import_session(data)
- with open(args.session_file, "wb") as f:
- f.write(session.pickle(args.key))
-
-
-def do_group_decrypt(args):
- session = InboundGroupSession()
- session.unpickle(args.key, read_base64_file(args.session_file))
- message = args.message_file.read()
- plaintext, message_index = session.decrypt(message)
- with open(args.session_file, "wb") as f:
- f.write(session.pickle(args.key))
- args.plaintext_file.write(plaintext)
-
-
-def do_export_inbound_group(args):
- session = InboundGroupSession()
- session.unpickle(args.key, read_base64_file(args.session_file))
- index = args.message_index
- if index is None:
- # default to first known index
- index = session.first_known_index()
- args.export_file.write(session.export_session(index))
-
-
-def do_verify_ed25519_signature(args):
- message = args.message_file.read()
- ed25519_verify(args.signing_key, message, args.signature)
-
-
-if __name__ == '__main__':
- parser = build_arg_parser()
- args = parser.parse_args()
- args.func(args)
diff --git a/python/olm/__version__.py b/python/olm/__version__.py
new file mode 100644
index 0000000..dccfdd0
--- /dev/null
+++ b/python/olm/__version__.py
@@ -0,0 +1,9 @@
+__title__ = "python-olm"
+__description__ = ("python CFFI bindings for the olm "
+ "cryptographic ratchet library")
+__url__ = "https://github.com/poljar/python-olm"
+__version__ = "0.1"
+__author__ = "Damir Jelić"
+__author_email__ = "poljar@termina.org.uk"
+__license__ = "Apache 2.0"
+__copyright__ = "Copyright 2018 Damir Jelić"
diff --git a/python/olm/_base.py b/python/olm/_base.py
deleted file mode 100644
index ad21d6f..0000000
--- a/python/olm/_base.py
+++ /dev/null
@@ -1,17 +0,0 @@
-import os.path
-
-from ctypes import *
-
-
-lib = cdll.LoadLibrary(os.path.join(
- os.path.dirname(__file__), "..", "..", "build", "libolm.so.2")
-)
-
-lib.olm_error.argtypes = []
-lib.olm_error.restypes = c_size_t
-
-ERR = lib.olm_error()
-
-
-class OlmError(Exception):
- pass
diff --git a/python/olm/_compat.py b/python/olm/_compat.py
new file mode 100644
index 0000000..67d312b
--- /dev/null
+++ b/python/olm/_compat.py
@@ -0,0 +1,24 @@
+# -*- coding: utf-8 -*-
+# libolm python bindings
+# Copyright © 2015-2017 OpenMarket Ltd
+# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
+
+from builtins import bytes, str
+from typing import AnyStr
+
+try:
+ import secrets
+ URANDOM = secrets.token_bytes # pragma: no cover
+except ImportError: # pragma: no cover
+ from os import urandom
+ URANDOM = urandom # type: ignore
+
+
+def to_bytes(string):
+ # type: (AnyStr) -> bytes
+ if isinstance(string, bytes):
+ return string
+ elif isinstance(string, str):
+ return bytes(string, "utf-8")
+
+ raise TypeError("Invalid type {}".format(type(string)))
diff --git a/python/olm/_finalize.py b/python/olm/_finalize.py
new file mode 100644
index 0000000..9f467bc
--- /dev/null
+++ b/python/olm/_finalize.py
@@ -0,0 +1,65 @@
+# The MIT License (MIT)
+# Copyright (c) 2010 Benjamin Peterson <benjamin@python.org>
+
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
+# OR OTHER DEALINGS IN THE SOFTWARE.
+
+"""Finalization with weakrefs
+
+This is designed for avoiding __del__.
+"""
+from __future__ import print_function
+
+import sys
+import traceback
+import weakref
+
+__author__ = "Benjamin Peterson <benjamin@python.org>"
+
+
+class OwnerRef(weakref.ref):
+ """A simple weakref.ref subclass, so attributes can be added."""
+ pass
+
+
+def _run_finalizer(ref):
+ """Internal weakref callback to run finalizers"""
+ del _finalize_refs[id(ref)]
+ finalizer = ref.finalizer
+ item = ref.item
+ try:
+ finalizer(item)
+ except Exception: # pragma: no cover
+ print("Exception running {}:".format(finalizer), file=sys.stderr)
+ traceback.print_exc()
+
+
+_finalize_refs = {}
+
+
+def track_for_finalization(owner, item, finalizer):
+ """Register an object for finalization.
+
+ ``owner`` is the the object which is responsible for ``item``.
+ ``finalizer`` will be called with ``item`` as its only argument when
+ ``owner`` is destroyed by the garbage collector.
+ """
+ ref = OwnerRef(owner, _run_finalizer)
+ ref.item = item
+ ref.finalizer = finalizer
+ _finalize_refs[id(ref)] = ref
diff --git a/python/olm/account.py b/python/olm/account.py
index b103a51..5705fe3 100644
--- a/python/olm/account.py
+++ b/python/olm/account.py
@@ -1,136 +1,239 @@
+# -*- coding: utf-8 -*-
+# libolm python bindings
+# Copyright © 2015-2017 OpenMarket Ltd
+# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
+"""libolm Account module.
+
+This module contains the account part of the Olm library. It contains a single
+Account class which handles the creation of new accounts as well as the storing
+and restoring of them.
+
+Examples:
+ >>> acc = Account()
+ >>> account.identity_keys()
+ >>> account.generate_one_time_keys(1)
+
+"""
+
import json
-from os import urandom
-
-from ._base import *
-
-lib.olm_account_size.argtypes = []
-lib.olm_account_size.restype = c_size_t
-
-lib.olm_account.argtypes = [c_void_p]
-lib.olm_account.restype = c_void_p
-
-lib.olm_account_last_error.argtypes = [c_void_p]
-lib.olm_account_last_error.restype = c_char_p
-
-
-def account_errcheck(res, func, args):
- if res == ERR:
- raise OlmError("%s: %s" % (
- func.__name__, lib.olm_account_last_error(args[0])
- ))
- return res
-
-
-def account_function(func, *types):
- func.argtypes = (c_void_p,) + types
- func.restypes = c_size_t
- func.errcheck = account_errcheck
-
-
-account_function(
- lib.olm_pickle_account, c_void_p, c_size_t, c_void_p, c_size_t
-)
-account_function(
- lib.olm_unpickle_account, c_void_p, c_size_t, c_void_p, c_size_t
-)
-account_function(lib.olm_create_account_random_length)
-account_function(lib.olm_create_account, c_void_p, c_size_t)
-account_function(lib.olm_account_identity_keys_length)
-account_function(lib.olm_account_identity_keys, c_void_p, c_size_t)
-account_function(lib.olm_account_signature_length)
-account_function(lib.olm_account_sign, c_void_p, c_size_t, c_void_p, c_size_t)
-account_function(lib.olm_account_one_time_keys_length)
-account_function(lib.olm_account_one_time_keys, c_void_p, c_size_t)
-account_function(lib.olm_account_mark_keys_as_published)
-account_function(lib.olm_account_max_number_of_one_time_keys)
-account_function(lib.olm_pickle_account_length)
-account_function(
- lib.olm_account_generate_one_time_keys_random_length,
- c_size_t
-)
-account_function(
- lib.olm_account_generate_one_time_keys,
- c_size_t,
- c_void_p, c_size_t
-)
-account_function(
- lib.olm_remove_one_time_keys,
- c_void_p # Session
-)
+# pylint: disable=redefined-builtin,unused-import
+from builtins import bytes, super
+from typing import AnyStr, Dict, Optional, Type
+
+from future.utils import bytes_to_native_str
+
+# pylint: disable=no-name-in-module
+from _libolm import ffi, lib # type: ignore
+
+from ._compat import URANDOM, to_bytes
+from ._finalize import track_for_finalization
+
+# This is imported only for type checking purposes
+if False:
+ from .session import Session # pragma: no cover
+
+
+def _clear_account(account):
+ # type: (ffi.cdata) -> None
+ lib.olm_clear_account(account)
+
+
+class OlmAccountError(Exception):
+ """libolm Account error exception."""
class Account(object):
- def __init__(self):
- self.buf = create_string_buffer(lib.olm_account_size())
- self.ptr = lib.olm_account(self.buf)
-
- def create(self):
- random_length = lib.olm_create_account_random_length(self.ptr)
- random = urandom(random_length)
- random_buffer = create_string_buffer(random)
- lib.olm_create_account(self.ptr, random_buffer, random_length)
-
- def pickle(self, key):
- key_buffer = create_string_buffer(key)
- pickle_length = lib.olm_pickle_account_length(self.ptr)
- pickle_buffer = create_string_buffer(pickle_length)
- lib.olm_pickle_account(
- self.ptr, key_buffer, len(key), pickle_buffer, pickle_length
- )
- return pickle_buffer.raw
-
- def unpickle(self, key, pickle):
- key_buffer = create_string_buffer(key)
- pickle_buffer = create_string_buffer(pickle)
- lib.olm_unpickle_account(
- self.ptr, key_buffer, len(key), pickle_buffer, len(pickle)
- )
+ """libolm Account class."""
+ def __new__(cls):
+ # type: (Type[Account]) -> Account
+ obj = super().__new__(cls)
+ obj._buf = ffi.new("char[]", lib.olm_account_size())
+ obj._account = lib.olm_account(obj._buf)
+ track_for_finalization(obj, obj._account, _clear_account)
+ return obj
+
+ def __init__(self):
+ # type: () -> None
+ """Create a new Olm account.
+
+ Creates a new account and its matching identity key pair.
+
+ Raises OlmAccountError on failure. If there weren't enough random bytes
+ for the account creation the error message for the exception will be
+ NOT_ENOUGH_RANDOM.
+ """
+ # This is needed to silence mypy not knowing the type of _account.
+ # There has to be a better way for this.
+ if False: # pragma: no cover
+ self._account = self._account # type: ffi.cdata
+
+ random_length = lib.olm_create_account_random_length(self._account)
+ random = URANDOM(random_length)
+ random_buffer = ffi.new("char[]", random)
+
+ self._check_error(
+ lib.olm_create_account(self._account, random_buffer,
+ random_length))
+
+ def _check_error(self, ret):
+ # type: (int) -> None
+ if ret != lib.olm_error():
+ return
+
+ last_error = bytes_to_native_str(
+ ffi.string((lib.olm_account_last_error(self._account))))
+
+ raise OlmAccountError(last_error)
+
+ def pickle(self, passphrase=""):
+ # type: (Optional[str]) -> bytes
+ """Store an Olm account.
+
+ Stores an account as a base64 string. Encrypts the account using the
+ supplied passphrase. Returns a byte object containing the base64
+ encoded string of the pickled account. Raises OlmAccountError on
+ failure.
+
+ Args:
+ passphrase(str, optional): The passphrase to be used to encrypt
+ the account.
+ """
+ byte_key = bytes(passphrase, "utf-8") if passphrase else b""
+ key_buffer = ffi.new("char[]", byte_key)
+
+ pickle_length = lib.olm_pickle_account_length(self._account)
+ pickle_buffer = ffi.new("char[]", pickle_length)
+
+ self._check_error(
+ lib.olm_pickle_account(self._account, key_buffer, len(byte_key),
+ pickle_buffer, pickle_length))
+ return ffi.unpack(pickle_buffer, pickle_length)
+
+ @classmethod
+ def from_pickle(cls, pickle, passphrase=""):
+ # type: (bytes, Optional[str]) -> Account
+ """Load a previously stored olm account.
+
+ Loads an account from a pickled base64-encoded string and returns an
+ Account object. Decrypts the account using the supplied passphrase.
+ Raises OlmAccountError on failure. If the passphrase doesn't match the
+ one used to encrypt the account then the error message for the
+ exception will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded
+ then the error message will be "INVALID_BASE64".
+
+ Args:
+ pickle(bytes): Base64 encoded byte string containing the pickled
+ account
+ passphrase(str, optional): The passphrase used to encrypt the
+ account.
+ """
+ if not pickle:
+ raise ValueError("Pickle can't be empty")
+
+ byte_key = bytes(passphrase, "utf-8") if passphrase else b""
+ key_buffer = ffi.new("char[]", byte_key)
+ pickle_buffer = ffi.new("char[]", pickle)
+
+ obj = cls.__new__(cls)
+
+ ret = lib.olm_unpickle_account(obj._account, key_buffer, len(byte_key),
+ pickle_buffer, len(pickle))
+ obj._check_error(ret)
+
+ return obj
+
+ @property
def identity_keys(self):
- out_length = lib.olm_account_identity_keys_length(self.ptr)
- out_buffer = create_string_buffer(out_length)
- lib.olm_account_identity_keys(
- self.ptr,
- out_buffer, out_length
- )
- return json.loads(out_buffer.raw)
+ # type: () -> Dict[str, str]
+ """dict: Public part of the identity keys of the account."""
+ out_length = lib.olm_account_identity_keys_length(self._account)
+ out_buffer = ffi.new("char[]", out_length)
+
+ self._check_error(
+ lib.olm_account_identity_keys(self._account, out_buffer,
+ out_length))
+ return json.loads(ffi.unpack(out_buffer, out_length).decode("utf-8"))
def sign(self, message):
- out_length = lib.olm_account_signature_length(self.ptr)
- message_buffer = create_string_buffer(message)
- out_buffer = create_string_buffer(out_length)
- lib.olm_account_sign(
- self.ptr, message_buffer, len(message), out_buffer, out_length
- )
- return out_buffer.raw
+ # type: (AnyStr) -> str
+ """Signs a message with this account.
- def one_time_keys(self):
- out_length = lib.olm_account_one_time_keys_length(self.ptr)
- out_buffer = create_string_buffer(out_length)
- lib.olm_account_one_time_keys(self.ptr, out_buffer, out_length)
- return json.loads(out_buffer.raw)
+ Signs a message with the private ed25519 identity key of this account.
+ Returns the signature.
+ Raises OlmAccountError on failure.
- def mark_keys_as_published(self):
- lib.olm_account_mark_keys_as_published(self.ptr)
+ Args:
+ message(str): The message to sign.
+ """
+ bytes_message = to_bytes(message)
+ out_length = lib.olm_account_signature_length(self._account)
+ message_buffer = ffi.new("char[]", bytes_message)
+ out_buffer = ffi.new("char[]", out_length)
+
+ self._check_error(
+ lib.olm_account_sign(self._account, message_buffer,
+ len(bytes_message), out_buffer, out_length))
+
+ return bytes_to_native_str(ffi.unpack(out_buffer, out_length))
- def max_number_of_one_time_keys(self):
- return lib.olm_account_max_number_of_one_time_keys(self.ptr)
+ @property
+ def max_one_time_keys(self):
+ # type: () -> int
+ """int: The maximum number of one-time keys the account can store."""
+ return lib.olm_account_max_number_of_one_time_keys(self._account)
+
+ def mark_keys_as_published(self):
+ # type: () -> None
+ """Mark the current set of one-time keys as being published."""
+ lib.olm_account_mark_keys_as_published(self._account)
def generate_one_time_keys(self, count):
+ # type: (int) -> None
+ """Generate a number of new one-time keys.
+
+ If the total number of keys stored by this account exceeds
+ max_one_time_keys() then the old keys are discarded.
+ Raises OlmAccountError on error. If the number of random bytes is
+ too small then the error message of the exception will be
+ NOT_ENOUGH_RANDOM.
+
+ Args:
+ count(int): The number of keys to generate.
+ """
random_length = lib.olm_account_generate_one_time_keys_random_length(
- self.ptr, count
- )
- random = urandom(random_length)
- random_buffer = create_string_buffer(random)
- lib.olm_account_generate_one_time_keys(
- self.ptr, count, random_buffer, random_length
- )
+ self._account, count)
+ random = URANDOM(random_length)
+ random_buffer = ffi.new("char[]", random)
+ self._check_error(
+ lib.olm_account_generate_one_time_keys(
+ self._account, count, random_buffer, random_length))
+
+ @property
+ def one_time_keys(self):
+ # type: () -> Dict[str, Dict[str, str]]
+ """dict: The public part of the one-time keys for this account."""
+ out_length = lib.olm_account_one_time_keys_length(self._account)
+ out_buffer = ffi.new("char[]", out_length)
- def remove_one_time_keys(self, session):
- lib.olm_remove_one_time_keys(
- self.ptr,
- session.ptr
- )
+ self._check_error(
+ lib.olm_account_one_time_keys(self._account, out_buffer,
+ out_length))
+
+ return json.loads(ffi.unpack(out_buffer, out_length).decode("utf-8"))
- def clear(self):
- pass
+ def remove_one_time_keys(self, session):
+ # type: (Session) -> None
+ """Remove used one-time keys.
+
+ Removes the one-time keys that the session used from the account.
+ Raises OlmAccountError on failure. If the account doesn't have any
+ matching one-time keys then the error message of the exception will be
+ "BAD_MESSAGE_KEY_ID".
+
+ Args:
+ session(Session): An Olm Session object that was created with this
+ account.
+ """
+ self._check_error(lib.olm_remove_one_time_keys(self._account,
+ session._session))
diff --git a/python/olm/group_session.py b/python/olm/group_session.py
new file mode 100644
index 0000000..6b13c60
--- /dev/null
+++ b/python/olm/group_session.py
@@ -0,0 +1,467 @@
+# -*- coding: utf-8 -*-
+# libolm python bindings
+# Copyright © 2015-2017 OpenMarket Ltd
+# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
+"""libolm Group session module.
+
+This module contains the group session part of the Olm library. It contains two
+classes for creating inbound and outbound group sessions.
+
+Examples:
+ >>> outbound = OutboundGroupSession()
+ >>> InboundGroupSession(outbound.session_key)
+"""
+
+# pylint: disable=redefined-builtin,unused-import
+from builtins import bytes, super
+from typing import AnyStr, Optional, Tuple, Type
+
+from future.utils import bytes_to_native_str
+
+# pylint: disable=no-name-in-module
+from _libolm import ffi, lib # type: ignore
+
+from ._compat import URANDOM, to_bytes
+from ._finalize import track_for_finalization
+
+
+def _clear_inbound_group_session(session):
+ # type: (ffi.cdata) -> None
+ lib.olm_clear_inbound_group_session(session)
+
+
+def _clear_outbound_group_session(session):
+ # type: (ffi.cdata) -> None
+ lib.olm_clear_outbound_group_session(session)
+
+
+class OlmGroupSessionError(Exception):
+ """libolm Group session error exception."""
+
+
+class InboundGroupSession(object):
+ """Inbound group session for encrypted multiuser communication."""
+
+ def __new__(
+ cls, # type: Type[InboundGroupSession]
+ session_key=None # type: Optional[str]
+ ):
+ # type: (...) -> InboundGroupSession
+ obj = super().__new__(cls)
+ obj._buf = ffi.new("char[]", lib.olm_inbound_group_session_size())
+ obj._session = lib.olm_inbound_group_session(obj._buf)
+ track_for_finalization(obj, obj._session, _clear_inbound_group_session)
+ return obj
+
+ def __init__(self, session_key):
+ # type: (AnyStr) -> None
+ """Create a new inbound group session.
+ Start a new inbound group session, from a key exported from
+ an outbound group session.
+
+ Raises OlmGroupSessionError on failure. The error message of the
+ exception will be "OLM_INVALID_BASE64" if the session key is not valid
+ base64 and "OLM_BAD_SESSION_KEY" if the session key is invalid.
+ """
+ if False: # pragma: no cover
+ self._session = self._session # type: ffi.cdata
+
+ byte_session_key = to_bytes(session_key)
+
+ key_buffer = ffi.new("char[]", byte_session_key)
+ ret = lib.olm_init_inbound_group_session(
+ self._session, key_buffer, len(byte_session_key)
+ )
+ self._check_error(ret)
+
+ def pickle(self, passphrase=""):
+ # type: (Optional[str]) -> bytes
+ """Store an inbound group session.
+
+ Stores a group session as a base64 string. Encrypts the session using
+ the supplied passphrase. Returns a byte object containing the base64
+ encoded string of the pickled session.
+
+ Args:
+ passphrase(str, optional): The passphrase to be used to encrypt
+ the session.
+ """
+ byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b""
+
+ passphrase_buffer = ffi.new("char[]", byte_passphrase)
+ pickle_length = lib.olm_pickle_inbound_group_session_length(
+ self._session)
+ pickle_buffer = ffi.new("char[]", pickle_length)
+
+ ret = lib.olm_pickle_inbound_group_session(
+ self._session, passphrase_buffer, len(byte_passphrase),
+ pickle_buffer, pickle_length
+ )
+
+ self._check_error(ret)
+
+ return ffi.unpack(pickle_buffer, pickle_length)
+
+ @classmethod
+ def from_pickle(cls, pickle, passphrase=""):
+ # type: (bytes, Optional[str]) -> InboundGroupSession
+ """Load a previously stored inbound group session.
+
+ Loads an inbound group session from a pickled base64 string and returns
+ an InboundGroupSession object. Decrypts the session using the supplied
+ passphrase. Raises OlmSessionError on failure. If the passphrase
+ doesn't match the one used to encrypt the session then the error
+ message for the exception will be "BAD_ACCOUNT_KEY". If the base64
+ couldn't be decoded then the error message will be "INVALID_BASE64".
+
+ Args:
+ pickle(bytes): Base64 encoded byte string containing the pickled
+ session
+ passphrase(str, optional): The passphrase used to encrypt the
+ session
+ """
+ if not pickle:
+ raise ValueError("Pickle can't be empty")
+
+ byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b""
+ passphrase_buffer = ffi.new("char[]", byte_passphrase)
+ pickle_buffer = ffi.new("char[]", pickle)
+
+ obj = cls.__new__(cls)
+
+ ret = lib.olm_unpickle_inbound_group_session(
+ obj._session,
+ passphrase_buffer,
+ len(byte_passphrase),
+ pickle_buffer,
+ len(pickle)
+ )
+ obj._check_error(ret)
+
+ return obj
+
+ def _check_error(self, ret):
+ # type: (int) -> None
+ if ret != lib.olm_error():
+ return
+
+ last_error = bytes_to_native_str(ffi.string(
+ lib.olm_inbound_group_session_last_error(self._session)))
+
+ raise OlmGroupSessionError(last_error)
+
+ def decrypt(self, ciphertext):
+ # type: (AnyStr) -> Tuple[str, int]
+ """Decrypt a message
+
+ Returns a tuple of the decrypted plain-text and the message index of
+ the decrypted message or raises OlmGroupSessionError on failure.
+ On failure the error message of the exception will be:
+
+ * OLM_INVALID_BASE64 if the message is not valid base64
+ * OLM_BAD_MESSAGE_VERSION if the message was encrypted with an
+ unsupported version of the protocol
+ * OLM_BAD_MESSAGE_FORMAT if the message headers could not be
+ decoded
+ * OLM_BAD_MESSAGE_MAC if the message could not be verified
+ * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key
+ corresponding to the message's index (i.e., it was sent before
+ the session key was shared with us)
+
+ Args:
+ ciphertext(str): Base64 encoded ciphertext containing the encrypted
+ message
+ """
+ if not ciphertext:
+ raise ValueError("Ciphertext can't be empty.")
+
+ byte_ciphertext = to_bytes(ciphertext)
+
+ ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
+
+ max_plaintext_length = lib.olm_group_decrypt_max_plaintext_length(
+ self._session, ciphertext_buffer, len(byte_ciphertext)
+ )
+ plaintext_buffer = ffi.new("char[]", max_plaintext_length)
+ ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
+
+ message_index = ffi.new("uint32_t*")
+ plaintext_length = lib.olm_group_decrypt(
+ self._session, ciphertext_buffer, len(byte_ciphertext),
+ plaintext_buffer, max_plaintext_length,
+ message_index
+ )
+
+ self._check_error(plaintext_length)
+
+ return bytes_to_native_str(ffi.unpack(
+ plaintext_buffer,
+ plaintext_length
+ )), message_index[0]
+
+ @property
+ def id(self):
+ # type: () -> str
+ """str: A base64 encoded identifier for this session."""
+ id_length = lib.olm_inbound_group_session_id_length(self._session)
+ id_buffer = ffi.new("char[]", id_length)
+ ret = lib.olm_inbound_group_session_id(
+ self._session,
+ id_buffer,
+ id_length
+ )
+ self._check_error(ret)
+ return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
+
+ @property
+ def first_known_index(self):
+ # type: () -> int
+ """int: The first message index we know how to decrypt."""
+ return lib.olm_inbound_group_session_first_known_index(self._session)
+
+ def export_session(self, message_index):
+ # type: (int) -> str
+ """Export an inbound group session
+
+ Export the base64-encoded ratchet key for this session, at the given
+ index, in a format which can be used by import_session().
+
+ Raises OlmGroupSessionError on failure. The error message for the
+ exception will be:
+
+ * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key
+ corresponding to the given index (ie, it was sent before the
+ session key was shared with us)
+
+ Args:
+ message_index(int): The message index at which the session should
+ be exported.
+ """
+
+ export_length = lib.olm_export_inbound_group_session_length(
+ self._session)
+
+ export_buffer = ffi.new("char[]", export_length)
+ ret = lib.olm_export_inbound_group_session(
+ self._session,
+ export_buffer,
+ export_length,
+ message_index
+ )
+ self._check_error(ret)
+ return bytes_to_native_str(ffi.unpack(export_buffer, export_length))
+
+ @classmethod
+ def import_session(cls, session_key):
+ # type: (AnyStr) -> InboundGroupSession
+ """Create an InboundGroupSession from an exported session key.
+
+ Creates an InboundGroupSession with an previously exported session key,
+ raises OlmGroupSessionError on failure. The error message for the
+ exception will be:
+
+ * OLM_INVALID_BASE64 if the session_key is not valid base64
+ * OLM_BAD_SESSION_KEY if the session_key is invalid
+
+ Args:
+ session_key(str): The exported session key with which the inbound
+ group session will be created
+ """
+ obj = cls.__new__(cls)
+
+ byte_session_key = to_bytes(session_key)
+
+ key_buffer = ffi.new("char[]", byte_session_key)
+ ret = lib.olm_import_inbound_group_session(
+ obj._session,
+ key_buffer,
+ len(byte_session_key)
+ )
+ obj._check_error(ret)
+
+ return obj
+
+
+class OutboundGroupSession(object):
+ """Outbound group session for encrypted multiuser communication."""
+
+ def __new__(cls):
+ # type: (Type[OutboundGroupSession]) -> OutboundGroupSession
+ obj = super().__new__(cls)
+ obj._buf = ffi.new("char[]", lib.olm_outbound_group_session_size())
+ obj._session = lib.olm_outbound_group_session(obj._buf)
+ track_for_finalization(
+ obj,
+ obj._session,
+ _clear_outbound_group_session
+ )
+ return obj
+
+ def __init__(self):
+ # type: () -> None
+ """Create a new outbound group session.
+
+ Start a new outbound group session. Raises OlmGroupSessionError on
+ failure. If there weren't enough random bytes for the session creation
+ the error message for the exception will be NOT_ENOUGH_RANDOM.
+ """
+ if False: # pragma: no cover
+ self._session = self._session # type: ffi.cdata
+
+ random_length = lib.olm_init_outbound_group_session_random_length(
+ self._session
+ )
+ random = URANDOM(random_length)
+ random_buffer = ffi.new("char[]", random)
+
+ ret = lib.olm_init_outbound_group_session(
+ self._session, random_buffer, random_length
+ )
+ self._check_error(ret)
+
+ def _check_error(self, ret):
+ # type: (int) -> None
+ if ret != lib.olm_error():
+ return
+
+ last_error = bytes_to_native_str(ffi.string(
+ lib.olm_outbound_group_session_last_error(self._session)
+ ))
+
+ raise OlmGroupSessionError(last_error)
+
+ def pickle(self, passphrase=""):
+ # type: (Optional[str]) -> bytes
+ """Store an outbound group session.
+
+ Stores a group session as a base64 string. Encrypts the session using
+ the supplied passphrase. Returns a byte object containing the base64
+ encoded string of the pickled session.
+
+ Args:
+ passphrase(str, optional): The passphrase to be used to encrypt
+ the session.
+ """
+ byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b""
+ passphrase_buffer = ffi.new("char[]", byte_passphrase)
+ pickle_length = lib.olm_pickle_outbound_group_session_length(
+ self._session)
+ pickle_buffer = ffi.new("char[]", pickle_length)
+
+ ret = lib.olm_pickle_outbound_group_session(
+ self._session, passphrase_buffer, len(byte_passphrase),
+ pickle_buffer, pickle_length
+ )
+ self._check_error(ret)
+ return ffi.unpack(pickle_buffer, pickle_length)
+
+ @classmethod
+ def from_pickle(cls, pickle, passphrase=""):
+ # type: (bytes, Optional[str]) -> OutboundGroupSession
+ """Load a previously stored outbound group session.
+
+ Loads an outbound group session from a pickled base64 string and
+ returns an OutboundGroupSession object. Decrypts the session using the
+ supplied passphrase. Raises OlmSessionError on failure. If the
+ passphrase doesn't match the one used to encrypt the session then the
+ error message for the exception will be "BAD_ACCOUNT_KEY". If the
+ base64 couldn't be decoded then the error message will be
+ "INVALID_BASE64".
+
+ Args:
+ pickle(bytes): Base64 encoded byte string containing the pickled
+ session
+ passphrase(str, optional): The passphrase used to encrypt the
+ """
+ if not pickle:
+ raise ValueError("Pickle can't be empty")
+
+ byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b""
+ passphrase_buffer = ffi.new("char[]", byte_passphrase)
+ pickle_buffer = ffi.new("char[]", pickle)
+
+ obj = cls.__new__(cls)
+
+ ret = lib.olm_unpickle_outbound_group_session(
+ obj._session,
+ passphrase_buffer,
+ len(byte_passphrase),
+ pickle_buffer,
+ len(pickle)
+ )
+ obj._check_error(ret)
+
+ return obj
+
+ def encrypt(self, plaintext):
+ # type: (AnyStr) -> str
+ """Encrypt a message.
+
+ Returns the encrypted ciphertext.
+
+ Args:
+ plaintext(str): A string that will be encrypted using the group
+ session.
+ """
+ byte_plaintext = to_bytes(plaintext)
+ message_length = lib.olm_group_encrypt_message_length(
+ self._session, len(byte_plaintext)
+ )
+
+ message_buffer = ffi.new("char[]", message_length)
+
+ plaintext_buffer = ffi.new("char[]", byte_plaintext)
+
+ ret = lib.olm_group_encrypt(
+ self._session,
+ plaintext_buffer, len(byte_plaintext),
+ message_buffer, message_length,
+ )
+ self._check_error(ret)
+ return bytes_to_native_str(ffi.unpack(message_buffer, message_length))
+
+ @property
+ def id(self):
+ # type: () -> str
+ """str: A base64 encoded identifier for this session."""
+ id_length = lib.olm_outbound_group_session_id_length(self._session)
+ id_buffer = ffi.new("char[]", id_length)
+
+ ret = lib.olm_outbound_group_session_id(
+ self._session,
+ id_buffer,
+ id_length
+ )
+ self._check_error(ret)
+
+ return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
+
+ @property
+ def message_index(self):
+ # type: () -> int
+ """int: The current message index of the session.
+
+ Each message is encrypted with an increasing index. This is the index
+ for the next message.
+ """
+ return lib.olm_outbound_group_session_message_index(self._session)
+
+ @property
+ def session_key(self):
+ # type: () -> str
+ """The base64-encoded current ratchet key for this session.
+
+ Each message is encrypted with a different ratchet key. This function
+ returns the ratchet key that will be used for the next message.
+ """
+ key_length = lib.olm_outbound_group_session_key_length(self._session)
+ key_buffer = ffi.new("char[]", key_length)
+
+ ret = lib.olm_outbound_group_session_key(
+ self._session,
+ key_buffer,
+ key_length
+ )
+ self._check_error(ret)
+
+ return bytes_to_native_str(ffi.unpack(key_buffer, key_length))
diff --git a/python/olm/inbound_group_session.py b/python/olm/inbound_group_session.py
deleted file mode 100644
index 286aedb..0000000
--- a/python/olm/inbound_group_session.py
+++ /dev/null
@@ -1,138 +0,0 @@
-import json
-
-from ._base import *
-
-lib.olm_inbound_group_session_size.argtypes = []
-lib.olm_inbound_group_session_size.restype = c_size_t
-
-lib.olm_inbound_group_session.argtypes = [c_void_p]
-lib.olm_inbound_group_session.restype = c_void_p
-
-lib.olm_inbound_group_session_last_error.argtypes = [c_void_p]
-lib.olm_inbound_group_session_last_error.restype = c_char_p
-
-
-def inbound_group_session_errcheck(res, func, args):
- if res == ERR:
- raise OlmError("%s: %s" % (
- func.__name__, lib.olm_inbound_group_session_last_error(args[0])
- ))
- return res
-
-
-def inbound_group_session_function(func, *types):
- func.argtypes = (c_void_p,) + types
- func.restypes = c_size_t
- func.errcheck = inbound_group_session_errcheck
-
-
-inbound_group_session_function(
- lib.olm_pickle_inbound_group_session,
- c_void_p, c_size_t, c_void_p, c_size_t,
-)
-inbound_group_session_function(
- lib.olm_unpickle_inbound_group_session,
- c_void_p, c_size_t, c_void_p, c_size_t,
-)
-
-inbound_group_session_function(
- lib.olm_init_inbound_group_session, c_void_p, c_size_t
-)
-
-inbound_group_session_function(
- lib.olm_import_inbound_group_session, c_void_p, c_size_t
-)
-
-inbound_group_session_function(
- lib.olm_group_decrypt_max_plaintext_length, c_void_p, c_size_t
-)
-inbound_group_session_function(
- lib.olm_group_decrypt,
- c_void_p, c_size_t, # message
- c_void_p, c_size_t, # plaintext
- POINTER(c_uint32), # message_index
-)
-
-inbound_group_session_function(
- lib.olm_inbound_group_session_id_length,
-)
-inbound_group_session_function(
- lib.olm_inbound_group_session_id,
- c_void_p, c_size_t,
-)
-
-lib.olm_inbound_group_session_first_known_index.argtypes = (c_void_p,)
-lib.olm_inbound_group_session_first_known_index.restypes = c_uint32
-
-inbound_group_session_function(
- lib.olm_export_inbound_group_session_length,
-)
-inbound_group_session_function(
- lib.olm_export_inbound_group_session, c_void_p, c_size_t, c_uint32,
-)
-
-
-class InboundGroupSession(object):
- def __init__(self):
- self.buf = create_string_buffer(lib.olm_inbound_group_session_size())
- self.ptr = lib.olm_inbound_group_session(self.buf)
-
- def pickle(self, key):
- key_buffer = create_string_buffer(key)
- pickle_length = lib.olm_pickle_inbound_group_session_length(self.ptr)
- pickle_buffer = create_string_buffer(pickle_length)
- lib.olm_pickle_inbound_group_session(
- self.ptr, key_buffer, len(key), pickle_buffer, pickle_length
- )
- return pickle_buffer.raw
-
- def unpickle(self, key, pickle):
- key_buffer = create_string_buffer(key)
- pickle_buffer = create_string_buffer(pickle)
- lib.olm_unpickle_inbound_group_session(
- self.ptr, key_buffer, len(key), pickle_buffer, len(pickle)
- )
-
- def init(self, session_key):
- key_buffer = create_string_buffer(session_key)
- lib.olm_init_inbound_group_session(
- self.ptr, key_buffer, len(session_key)
- )
-
- def import_session(self, session_key):
- key_buffer = create_string_buffer(session_key)
- lib.olm_import_inbound_group_session(
- self.ptr, key_buffer, len(session_key)
- )
-
- def decrypt(self, message):
- message_buffer = create_string_buffer(message)
- max_plaintext_length = lib.olm_group_decrypt_max_plaintext_length(
- self.ptr, message_buffer, len(message)
- )
- plaintext_buffer = create_string_buffer(max_plaintext_length)
- message_buffer = create_string_buffer(message)
-
- message_index = c_uint32()
- plaintext_length = lib.olm_group_decrypt(
- self.ptr, message_buffer, len(message),
- plaintext_buffer, max_plaintext_length,
- byref(message_index)
- )
- return plaintext_buffer.raw[:plaintext_length], message_index.value
-
- def session_id(self):
- id_length = lib.olm_inbound_group_session_id_length(self.ptr)
- id_buffer = create_string_buffer(id_length)
- lib.olm_inbound_group_session_id(self.ptr, id_buffer, id_length)
- return id_buffer.raw
-
- def first_known_index(self):
- return lib.olm_inbound_group_session_first_known_index(self.ptr)
-
- def export_session(self, message_index):
- length = lib.olm_export_inbound_group_session_length(self.ptr)
- buffer = create_string_buffer(length)
- lib.olm_export_inbound_group_session(self.ptr, buffer, length,
- message_index)
- return buffer.raw
diff --git a/python/olm/outbound_group_session.py b/python/olm/outbound_group_session.py
deleted file mode 100644
index 5032b34..0000000
--- a/python/olm/outbound_group_session.py
+++ /dev/null
@@ -1,134 +0,0 @@
-import json
-from os import urandom
-
-from ._base import *
-
-lib.olm_outbound_group_session_size.argtypes = []
-lib.olm_outbound_group_session_size.restype = c_size_t
-
-lib.olm_outbound_group_session.argtypes = [c_void_p]
-lib.olm_outbound_group_session.restype = c_void_p
-
-lib.olm_outbound_group_session_last_error.argtypes = [c_void_p]
-lib.olm_outbound_group_session_last_error.restype = c_char_p
-
-
-def outbound_group_session_errcheck(res, func, args):
- if res == ERR:
- raise OlmError("%s: %s" % (
- func.__name__, lib.olm_outbound_group_session_last_error(args[0])
- ))
- return res
-
-
-def outbound_group_session_function(func, *types):
- func.argtypes = (c_void_p,) + types
- func.restypes = c_size_t
- func.errcheck = outbound_group_session_errcheck
-
-
-outbound_group_session_function(
- lib.olm_pickle_outbound_group_session,
- c_void_p, c_size_t, c_void_p, c_size_t,
-)
-outbound_group_session_function(
- lib.olm_unpickle_outbound_group_session,
- c_void_p, c_size_t, c_void_p, c_size_t,
-)
-
-outbound_group_session_function(
- lib.olm_init_outbound_group_session_random_length,
-)
-outbound_group_session_function(
- lib.olm_init_outbound_group_session,
- c_void_p, c_size_t,
-)
-
-lib.olm_outbound_group_session_message_index.argtypes = [c_void_p]
-lib.olm_outbound_group_session_message_index.restype = c_uint32
-
-outbound_group_session_function(
- lib.olm_group_encrypt_message_length,
- c_size_t,
-)
-outbound_group_session_function(
- lib.olm_group_encrypt,
- c_void_p, c_size_t, # Plaintext
- c_void_p, c_size_t, # Message
-)
-
-outbound_group_session_function(
- lib.olm_outbound_group_session_id_length,
-)
-outbound_group_session_function(
- lib.olm_outbound_group_session_id,
- c_void_p, c_size_t,
-)
-outbound_group_session_function(
- lib.olm_outbound_group_session_key_length,
-)
-outbound_group_session_function(
- lib.olm_outbound_group_session_key,
- c_void_p, c_size_t,
-)
-
-
-class OutboundGroupSession(object):
- def __init__(self):
- self.buf = create_string_buffer(lib.olm_outbound_group_session_size())
- self.ptr = lib.olm_outbound_group_session(self.buf)
-
- random_length = lib.olm_init_outbound_group_session_random_length(
- self.ptr
- )
- random = urandom(random_length)
- random_buffer = create_string_buffer(random)
- lib.olm_init_outbound_group_session(
- self.ptr, random_buffer, random_length
- )
-
- def pickle(self, key):
- key_buffer = create_string_buffer(key)
- pickle_length = lib.olm_pickle_outbound_group_session_length(self.ptr)
- pickle_buffer = create_string_buffer(pickle_length)
- lib.olm_pickle_outbound_group_session(
- self.ptr, key_buffer, len(key), pickle_buffer, pickle_length
- )
- return pickle_buffer.raw
-
- def unpickle(self, key, pickle):
- key_buffer = create_string_buffer(key)
- pickle_buffer = create_string_buffer(pickle)
- lib.olm_unpickle_outbound_group_session(
- self.ptr, key_buffer, len(key), pickle_buffer, len(pickle)
- )
-
- def encrypt(self, plaintext):
- message_length = lib.olm_group_encrypt_message_length(
- self.ptr, len(plaintext)
- )
- message_buffer = create_string_buffer(message_length)
-
- plaintext_buffer = create_string_buffer(plaintext)
-
- lib.olm_group_encrypt(
- self.ptr,
- plaintext_buffer, len(plaintext),
- message_buffer, message_length,
- )
- return message_buffer.raw
-
- def session_id(self):
- id_length = lib.olm_outbound_group_session_id_length(self.ptr)
- id_buffer = create_string_buffer(id_length)
- lib.olm_outbound_group_session_id(self.ptr, id_buffer, id_length)
- return id_buffer.raw
-
- def message_index(self):
- return lib.olm_outbound_group_session_message_index(self.ptr)
-
- def session_key(self):
- key_length = lib.olm_outbound_group_session_key_length(self.ptr)
- key_buffer = create_string_buffer(key_length)
- lib.olm_outbound_group_session_key(self.ptr, key_buffer, key_length)
- return key_buffer.raw
diff --git a/python/olm/session.py b/python/olm/session.py
index 019ea9e..b7b2c4b 100644
--- a/python/olm/session.py
+++ b/python/olm/session.py
@@ -1,204 +1,452 @@
-from os import urandom
+# -*- coding: utf-8 -*-
+# libolm python bindings
+# Copyright © 2015-2017 OpenMarket Ltd
+# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
+"""libolm Session module.
-from ._base import *
+This module contains the Olm Session part of the Olm library.
+It is used to establish a peer-to-peer encrypted communication channel between
+two Olm accounts.
-lib.olm_session_size.argtypes = []
-lib.olm_session_size.restype = c_size_t
+Examples:
+ >>> alice = Account()
+ >>> bob = Account()
+ >>> bob.generate_one_time_keys(1)
+ >>> id_key = bob.identity_keys['curve25519']
+ >>> one_time = list(bob.one_time_keys["curve25519"].values())[0]
+ >>> session = OutboundSession(alice, id_key, one_time)
-lib.olm_session.argtypes = [c_void_p]
-lib.olm_session.restype = c_void_p
+"""
-lib.olm_session_last_error.argtypes = [c_void_p]
-lib.olm_session_last_error.restype = c_char_p
+# pylint: disable=redefined-builtin,unused-import
+from builtins import bytes, super
+from typing import AnyStr, Optional, Type
+from future.utils import bytes_to_native_str
-def session_errcheck(res, func, args):
- if res == ERR:
- raise OlmError("%s: %s" % (
- func.__name__, lib.olm_session_last_error(args[0])
- ))
- return res
-
-
-def session_function(func, *types):
- func.argtypes = (c_void_p,) + types
- func.restypes = c_size_t
- func.errcheck = session_errcheck
-
-session_function(lib.olm_session_last_error)
-session_function(
- lib.olm_pickle_session, c_void_p, c_size_t, c_void_p, c_size_t
-)
-session_function(
- lib.olm_unpickle_session, c_void_p, c_size_t, c_void_p, c_size_t
-)
-session_function(lib.olm_create_outbound_session_random_length)
-session_function(
- lib.olm_create_outbound_session,
- c_void_p, # Account
- c_void_p, c_size_t, # Identity Key
- c_void_p, c_size_t, # One Time Key
- c_void_p, c_size_t, # Random
-)
-session_function(
- lib.olm_create_inbound_session,
- c_void_p, # Account
- c_void_p, c_size_t, # Pre Key Message
-)
-session_function(
- lib.olm_create_inbound_session_from,
- c_void_p, # Account
- c_void_p, c_size_t, # Identity Key
- c_void_p, c_size_t, # Pre Key Message
-)
-session_function(lib.olm_session_id_length)
-session_function(lib.olm_session_id, c_void_p, c_size_t)
-session_function(lib.olm_matches_inbound_session, c_void_p, c_size_t)
-session_function(
- lib.olm_matches_inbound_session_from,
- c_void_p, c_size_t, # Identity Key
- c_void_p, c_size_t, # Pre Key Message
-)
-session_function(lib.olm_pickle_session_length)
-session_function(lib.olm_encrypt_message_type)
-session_function(lib.olm_encrypt_random_length)
-session_function(lib.olm_encrypt_message_length, c_size_t)
-session_function(
- lib.olm_encrypt,
- c_void_p, c_size_t, # Plaintext
- c_void_p, c_size_t, # Random
- c_void_p, c_size_t, # Message
-)
-session_function(
- lib.olm_decrypt_max_plaintext_length,
- c_size_t, # Message Type
- c_void_p, c_size_t, # Message
-)
-session_function(
- lib.olm_decrypt,
- c_size_t, # Message Type
- c_void_p, c_size_t, # Message
- c_void_p, c_size_t, # Plaintext
-)
+# pylint: disable=no-name-in-module
+from _libolm import ffi, lib # type: ignore
+
+from ._compat import URANDOM, to_bytes
+from ._finalize import track_for_finalization
+
+# This is imported only for type checking purposes
+if False:
+ from .account import Account # pragma: no cover
+
+
+class OlmSessionError(Exception):
+ """libolm Session exception."""
+
+
+class _OlmMessage(object):
+ def __init__(self, ciphertext, message_type):
+ # type: (AnyStr, ffi.cdata) -> None
+ if not ciphertext:
+ raise ValueError("Ciphertext can't be empty")
+
+ # I don't know why mypy wants a type annotation here nor why AnyStr
+ # doesn't work
+ self.ciphertext = ciphertext # type: ignore
+ self.message_type = message_type
+
+ def __str__(self):
+ # type: () -> str
+ type_to_prefix = {
+ lib.OLM_MESSAGE_TYPE_PRE_KEY: "PRE_KEY",
+ lib.OLM_MESSAGE_TYPE_MESSAGE: "MESSAGE"
+ }
+
+ prefix = type_to_prefix[self.message_type]
+ return "{} {}".format(prefix, self.ciphertext)
+
+
+class OlmPreKeyMessage(_OlmMessage):
+ """Olm prekey message class
+
+ Prekey messages are used to establish an Olm session. After the first
+ message exchange the session switches to normal messages
+ """
+
+ def __init__(self, ciphertext):
+ # type: (AnyStr) -> None
+ """Create a new Olm prekey message with the supplied ciphertext
+
+ Args:
+ ciphertext(str): The ciphertext of the prekey message.
+ """
+ _OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_PRE_KEY)
+
+ def __repr__(self):
+ # type: () -> str
+ return "OlmPreKeyMessage({})".format(self.ciphertext)
+
+
+class OlmMessage(_OlmMessage):
+ """Olm message class"""
+
+ def __init__(self, ciphertext):
+ # type: (AnyStr) -> None
+ """Create a new Olm message with the supplied ciphertext
+
+ Args:
+ ciphertext(str): The ciphertext of the message.
+ """
+ _OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_MESSAGE)
+
+ def __repr__(self):
+ # type: () -> str
+ return "OlmMessage({})".format(self.ciphertext)
+
+
+def _clear_session(session):
+ # type: (ffi.cdata) -> None
+ lib.olm_clear_session(session)
class Session(object):
+ """libolm Session class.
+ This is an abstract class that can't be instantiated except when unpickling
+ a previously pickled InboundSession or OutboundSession object with
+ from_pickle.
+ """
+
+ def __new__(cls):
+ # type: (Type[Session]) -> Session
+
+ obj = super().__new__(cls)
+ obj._buf = ffi.new("char[]", lib.olm_session_size())
+ obj._session = lib.olm_session(obj._buf)
+ track_for_finalization(obj, obj._session, _clear_session)
+ return obj
+
def __init__(self):
- self.buf = create_string_buffer(lib.olm_session_size())
- self.ptr = lib.olm_session(self.buf)
-
- def pickle(self, key):
- key_buffer = create_string_buffer(key)
- pickle_length = lib.olm_pickle_session_length(self.ptr)
- pickle_buffer = create_string_buffer(pickle_length)
- lib.olm_pickle_session(
- self.ptr, key_buffer, len(key), pickle_buffer, pickle_length
- )
- return pickle_buffer.raw
+ # type: () -> None
+ if type(self) is Session:
+ raise TypeError("Session class may not be instantiated.")
- def unpickle(self, key, pickle):
- key_buffer = create_string_buffer(key)
- pickle_buffer = create_string_buffer(pickle)
- lib.olm_unpickle_session(
- self.ptr, key_buffer, len(key), pickle_buffer, len(pickle)
- )
+ if False:
+ self._session = self._session # type: ffi.cdata
- def create_outbound(self, account, identity_key, one_time_key):
- r_length = lib.olm_create_outbound_session_random_length(self.ptr)
- random = urandom(r_length)
- random_buffer = create_string_buffer(random)
- identity_key_buffer = create_string_buffer(identity_key)
- one_time_key_buffer = create_string_buffer(one_time_key)
- lib.olm_create_outbound_session(
- self.ptr,
- account.ptr,
- identity_key_buffer, len(identity_key),
- one_time_key_buffer, len(one_time_key),
- random_buffer, r_length
- )
+ def _check_error(self, ret):
+ # type: (int) -> None
+ if ret != lib.olm_error():
+ return
- def create_inbound(self, account, one_time_key_message):
- one_time_key_message_buffer = create_string_buffer(
- one_time_key_message
- )
- lib.olm_create_inbound_session(
- self.ptr,
- account.ptr,
- one_time_key_message_buffer, len(one_time_key_message)
- )
+ last_error = bytes_to_native_str(
+ ffi.string(lib.olm_session_last_error(self._session)))
- def create_inbound_from(self, account, identity_key, one_time_key_message):
- identity_key_buffer = create_string_buffer(identity_key)
- one_time_key_message_buffer = create_string_buffer(
- one_time_key_message
- )
- lib.olm_create_inbound_session_from(
- self.ptr,
- account.ptr,
- identity_key_buffer, len(identity_key),
- one_time_key_message_buffer, len(one_time_key_message)
- )
+ raise OlmSessionError(last_error)
- def session_id(self):
- id_length = lib.olm_session_id_length(self.ptr)
- id_buffer = create_string_buffer(id_length)
- lib.olm_session_id(self.ptr, id_buffer, id_length)
- return id_buffer.raw
+ def pickle(self, passphrase=""):
+ # type: (Optional[str]) -> bytes
+ """Store an Olm session.
- def matches_inbound(self, one_time_key_message):
- one_time_key_message_buffer = create_string_buffer(
- one_time_key_message,
- )
- return bool(lib.olm_matches_inbound_session(
- self.ptr,
- one_time_key_message_buffer, len(one_time_key_message)
- ))
+ Stores a session as a base64 string. Encrypts the session using the
+ supplied passphrase. Returns a byte object containing the base64
+ encoded string of the pickled session. Raises OlmSessionError on
+ failure.
- def matches_inbound_from(self, identity_key, one_time_key_message):
- identity_key_buffer = create_string_buffer(identity_key)
- one_time_key_message_buffer = create_string_buffer(
- one_time_key_message,
- )
- return bool(lib.olm_matches_inbound_session(
- self.ptr,
- identity_key_buffer, len(identity_key),
- one_time_key_message_buffer, len(one_time_key_message)
- ))
+ Args:
+ passphrase(str, optional): The passphrase to be used to encrypt
+ the session.
+ """
+ byte_key = bytes(passphrase, "utf-8") if passphrase else b""
+ key_buffer = ffi.new("char[]", byte_key)
+
+ pickle_length = lib.olm_pickle_session_length(self._session)
+ pickle_buffer = ffi.new("char[]", pickle_length)
+
+ self._check_error(
+ lib.olm_pickle_session(self._session, key_buffer, len(byte_key),
+ pickle_buffer, pickle_length))
+ return ffi.unpack(pickle_buffer, pickle_length)
+
+ @classmethod
+ def from_pickle(cls, pickle, passphrase=""):
+ # type: (bytes, Optional[str]) -> Session
+ """Load a previously stored Olm session.
+
+ Loads a session from a pickled base64 string and returns a Session
+ object. Decrypts the session using the supplied passphrase. Raises
+ OlmSessionError on failure. If the passphrase doesn't match the one
+ used to encrypt the session then the error message for the
+ exception will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded
+ then the error message will be "INVALID_BASE64".
+
+ Args:
+ pickle(bytes): Base64 encoded byte string containing the pickled
+ session
+ passphrase(str, optional): The passphrase used to encrypt the
+ session.
+ """
+ if not pickle:
+ raise ValueError("Pickle can't be empty")
+
+ byte_key = bytes(passphrase, "utf-8") if passphrase else b""
+ key_buffer = ffi.new("char[]", byte_key)
+ pickle_buffer = ffi.new("char[]", pickle)
+
+ session = cls.__new__(cls)
+
+ ret = lib.olm_unpickle_session(session._session, key_buffer,
+ len(byte_key), pickle_buffer,
+ len(pickle))
+ session._check_error(ret)
+
+ return session
def encrypt(self, plaintext):
- r_length = lib.olm_encrypt_random_length(self.ptr)
- random = urandom(r_length)
- random_buffer = create_string_buffer(random)
+ # type: (AnyStr) -> _OlmMessage
+ """Encrypts a message using the session. Returns the ciphertext as a
+ base64 encoded string on success. Raises OlmSessionError on failure. If
+ there weren't enough random bytes to encrypt the message the error
+ message for the exception will be NOT_ENOUGH_RANDOM.
- message_type = lib.olm_encrypt_message_type(self.ptr)
- message_length = lib.olm_encrypt_message_length(
- self.ptr, len(plaintext)
+ Args:
+ plaintext(str): The plaintext message that will be encrypted.
+ """
+ byte_plaintext = to_bytes(plaintext)
+
+ r_length = lib.olm_encrypt_random_length(self._session)
+ random = URANDOM(r_length)
+ random_buffer = ffi.new("char[]", random)
+
+ message_type = lib.olm_encrypt_message_type(self._session)
+
+ self._check_error(message_type)
+
+ ciphertext_length = lib.olm_encrypt_message_length(
+ self._session, len(plaintext)
)
- message_buffer = create_string_buffer(message_length)
+ ciphertext_buffer = ffi.new("char[]", ciphertext_length)
- plaintext_buffer = create_string_buffer(plaintext)
+ plaintext_buffer = ffi.new("char[]", byte_plaintext)
- lib.olm_encrypt(
- self.ptr,
- plaintext_buffer, len(plaintext),
+ self._check_error(lib.olm_encrypt(
+ self._session,
+ plaintext_buffer, len(byte_plaintext),
random_buffer, r_length,
- message_buffer, message_length,
- )
- return message_type, message_buffer.raw
+ ciphertext_buffer, ciphertext_length,
+ ))
+
+ if message_type == lib.OLM_MESSAGE_TYPE_PRE_KEY:
+ return OlmPreKeyMessage(
+ bytes_to_native_str(ffi.unpack(
+ ciphertext_buffer,
+ ciphertext_length
+ )))
+ elif message_type == lib.OLM_MESSAGE_TYPE_MESSAGE:
+ return OlmMessage(
+ bytes_to_native_str(ffi.unpack(
+ ciphertext_buffer,
+ ciphertext_length
+ )))
+ else: # pragma: no cover
+ raise ValueError("Unknown message type")
+
+ def decrypt(self, message):
+ # type: (_OlmMessage) -> str
+ """Decrypts a message using the session. Returns the plaintext string
+ on success. Raises OlmSessionError on failure. If the base64 couldn't
+ be decoded then the error message will be "INVALID_BASE64". If the
+ message is for an unsupported version of the protocol the error message
+ will be "BAD_MESSAGE_VERSION". If the message couldn't be decoded then
+ the error message will be "BAD_MESSAGE_FORMAT". If the MAC on the
+ message was invalid then the error message will be "BAD_MESSAGE_MAC".
+
+ Args:
+ message(OlmMessage): The Olm message that will be decrypted. It can
+ be either an OlmPreKeyMessage or an OlmMessage.
+ """
+ if not message.ciphertext:
+ raise ValueError("Ciphertext can't be empty")
+
+ byte_ciphertext = to_bytes(message.ciphertext)
+ ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
- def decrypt(self, message_type, message):
- message_buffer = create_string_buffer(message)
max_plaintext_length = lib.olm_decrypt_max_plaintext_length(
- self.ptr, message_type, message_buffer, len(message)
+ self._session, message.message_type, ciphertext_buffer,
+ len(byte_ciphertext)
)
- plaintext_buffer = create_string_buffer(max_plaintext_length)
- message_buffer = create_string_buffer(message)
+ plaintext_buffer = ffi.new("char[]", max_plaintext_length)
+ ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
plaintext_length = lib.olm_decrypt(
- self.ptr, message_type, message_buffer, len(message),
- plaintext_buffer, max_plaintext_length
+ self._session, message.message_type, ciphertext_buffer,
+ len(byte_ciphertext), plaintext_buffer, max_plaintext_length
)
- return plaintext_buffer.raw[:plaintext_length]
+ self._check_error(plaintext_length)
+ return bytes_to_native_str(
+ ffi.unpack(plaintext_buffer, plaintext_length))
+
+ @property
+ def id(self):
+ # type: () -> str
+ """str: An identifier for this session. Will be the same for both
+ ends of the conversation.
+ """
+ id_length = lib.olm_session_id_length(self._session)
+ id_buffer = ffi.new("char[]", id_length)
+
+ self._check_error(
+ lib.olm_session_id(self._session, id_buffer, id_length)
+ )
+ return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
+
+ def matches(self, message, identity_key=None):
+ # type: (OlmPreKeyMessage, Optional[AnyStr]) -> bool
+ """Checks if the PRE_KEY message is for this in-bound session.
+ This can happen if multiple messages are sent to this session before
+ this session sends a message in reply. Returns True if the session
+ matches. Returns False if the session does not match. Raises
+ OlmSessionError on failure. If the base64 couldn't be decoded then the
+ error message will be "INVALID_BASE64". If the message was for an
+ unsupported protocol version then the error message will be
+ "BAD_MESSAGE_VERSION". If the message couldn't be decoded then then the
+ error message will be * "BAD_MESSAGE_FORMAT".
+
+ Args:
+ message(OlmPreKeyMessage): The Olm prekey message that will checked
+ if it is intended for this session.
+ identity_key(str, optional): The identity key of the sender. To
+ check if the message was also sent using this identity key.
+ """
+ if not isinstance(message, OlmPreKeyMessage):
+ raise TypeError("Matches can only be called with prekey messages.")
+
+ if not message.ciphertext:
+ raise ValueError("Ciphertext can't be empty")
+
+ ret = None
+
+ byte_ciphertext = to_bytes(message.ciphertext)
+
+ message_buffer = ffi.new("char[]", byte_ciphertext)
+
+ if identity_key:
+ byte_id_key = to_bytes(identity_key)
+ identity_key_buffer = ffi.new("char[]", byte_id_key)
+
+ ret = lib.olm_matches_inbound_session_from(
+ self._session,
+ identity_key_buffer, len(byte_id_key),
+ message_buffer, len(byte_ciphertext)
+ )
+
+ else:
+ ret = lib.olm_matches_inbound_session(
+ self._session,
+ message_buffer, len(byte_ciphertext))
+
+ self._check_error(ret)
- def clear(self):
- pass
+ return bool(ret)
+
+
+class InboundSession(Session):
+ """Inbound Olm session for p2p encrypted communication.
+ """
+
+ def __new__(cls, account, message, identity_key=None):
+ # type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> Session
+ return super().__new__(cls)
+
+ def __init__(self, account, message, identity_key=None):
+ # type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> None
+ """Create a new inbound Olm session.
+
+ Create a new in-bound session for sending/receiving messages from an
+ incoming prekey message. Raises OlmSessionError on failure. If the
+ base64 couldn't be decoded then error message will be "INVALID_BASE64".
+ If the message was for an unsupported protocol version then
+ the errror message will be "BAD_MESSAGE_VERSION". If the message
+ couldn't be decoded then then the error message will be
+ "BAD_MESSAGE_FORMAT". If the message refers to an unknown one-time
+ key then the error message will be "BAD_MESSAGE_KEY_ID".
+
+ Args:
+ account(Account): The Olm Account that will be used to create this
+ session.
+ message(OlmPreKeyMessage): The Olm prekey message that will checked
+ that will be used to create this session.
+ identity_key(str, optional): The identity key of the sender. To
+ check if the message was also sent using this identity key.
+ """
+ if not message.ciphertext:
+ raise ValueError("Ciphertext can't be empty")
+
+ super().__init__()
+ byte_ciphertext = to_bytes(message.ciphertext)
+ message_buffer = ffi.new("char[]", byte_ciphertext)
+
+ if identity_key:
+ byte_id_key = to_bytes(identity_key)
+ identity_key_buffer = ffi.new("char[]", byte_id_key)
+ self._check_error(lib.olm_create_inbound_session_from(
+ self._session,
+ account._account,
+ identity_key_buffer, len(byte_id_key),
+ message_buffer, len(byte_ciphertext)
+ ))
+ else:
+ self._check_error(lib.olm_create_inbound_session(
+ self._session,
+ account._account,
+ message_buffer, len(byte_ciphertext)
+ ))
+
+
+class OutboundSession(Session):
+ """Outbound Olm session for p2p encrypted communication."""
+
+ def __new__(cls, account, identity_key, one_time_key):
+ # type: (Account, AnyStr, AnyStr) -> Session
+ return super().__new__(cls)
+
+ def __init__(self, account, identity_key, one_time_key):
+ # type: (Account, AnyStr, AnyStr) -> None
+ """Create a new outbound Olm session.
+
+ Creates a new outbound session for sending messages to a given
+ identity key and one-time key.
+
+ Raises OlmSessionError on failure. If the keys couldn't be decoded as
+ base64 then the error message will be "INVALID_BASE64". If there
+ weren't enough random bytes for the session creation the error message
+ for the exception will be NOT_ENOUGH_RANDOM.
+
+ Args:
+ account(Account): The Olm Account that will be used to create this
+ session.
+ identity_key(str): The identity key of the person with whom we want
+ to start the session.
+ one_time_key(str): A one-time key from the person with whom we want
+ to start the session.
+ """
+ if not identity_key:
+ raise ValueError("Identity key can't be empty")
+
+ if not one_time_key:
+ raise ValueError("One-time key can't be empty")
+
+ super().__init__()
+
+ byte_id_key = to_bytes(identity_key)
+ byte_one_time = to_bytes(one_time_key)
+
+ session_random_length = lib.olm_create_outbound_session_random_length(
+ self._session)
+
+ random = URANDOM(session_random_length)
+ random_buffer = ffi.new("char[]", random)
+ identity_key_buffer = ffi.new("char[]", byte_id_key)
+ one_time_key_buffer = ffi.new("char[]", byte_one_time)
+
+ self._check_error(lib.olm_create_outbound_session(
+ self._session,
+ account._account,
+ identity_key_buffer, len(byte_id_key),
+ one_time_key_buffer, len(byte_one_time),
+ random_buffer, session_random_length
+ ))
diff --git a/python/olm/utility.py b/python/olm/utility.py
index dac0225..838cf3f 100644
--- a/python/olm/utility.py
+++ b/python/olm/utility.py
@@ -1,56 +1,91 @@
-from ._base import lib, c_void_p, c_size_t, c_char_p, \
- create_string_buffer, ERR, OlmError
+# -*- coding: utf-8 -*-
+# libolm python bindings
+# Copyright © 2015-2017 OpenMarket Ltd
+# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
+"""libolm Utility module.
-lib.olm_utility_size.argtypes = []
-lib.olm_utility_size.restype = c_size_t
+This module contains utilities for olm.
+It only contains the ed25519_verify function for signature verification.
-lib.olm_utility.argtypes = [c_void_p]
-lib.olm_utility.restype = c_void_p
+Examples:
+ >>> alice = Account()
-lib.olm_utility_last_error.argtypes = [c_void_p]
-lib.olm_utility_last_error.restype = c_char_p
+ >>> message = "Test"
+ >>> signature = alice.sign(message)
+ >>> signing_key = alice.identity_keys["ed25519"]
+ >>> ed25519_verify(signing_key, message, signature)
-def utility_errcheck(res, func, args):
- if res == ERR:
- raise OlmError("%s: %s" % (
- func.__name__, lib.olm_utility_last_error(args[0])
- ))
- return res
+"""
+# pylint: disable=redefined-builtin,unused-import
+from typing import AnyStr, Type
-def utility_function(func, *types):
- func.argtypes = (c_void_p,) + types
- func.restypes = c_size_t
- func.errcheck = utility_errcheck
+# pylint: disable=no-name-in-module
+from _libolm import ffi, lib # type: ignore
-utility_function(
- lib.olm_ed25519_verify,
- c_void_p, c_size_t, # key, key_length
- c_void_p, c_size_t, # message, message_length
- c_void_p, c_size_t, # signature, signature_length
-)
+from ._compat import to_bytes
+from ._finalize import track_for_finalization
-class Utility(object):
- def __init__(self):
- self.buf = create_string_buffer(lib.olm_utility_size())
- self.ptr = lib.olm_utility(self.buf)
+def _clear_utility(utility): # pragma: no cover
+ # type: (ffi.cdata) -> None
+ lib.olm_clear_utility(utility)
-_utility = None
+
+class OlmVerifyError(Exception):
+ """libolm signature verification exception."""
+
+
+class _Utility(object):
+ # pylint: disable=too-few-public-methods
+ """libolm Utility class."""
+
+ _buf = None
+ _utility = None
+
+ @classmethod
+ def _allocate(cls):
+ # type: (Type[_Utility]) -> None
+ cls._buf = ffi.new("char[]", lib.olm_utility_size())
+ cls._utility = lib.olm_utility(cls._buf)
+ track_for_finalization(cls, cls._utility, _clear_utility)
+
+ @classmethod
+ def _check_error(cls, ret):
+ # type: (int) -> None
+ if ret != lib.olm_error():
+ return
+
+ raise OlmVerifyError("{}".format(
+ ffi.string(lib.olm_utility_last_error(
+ cls._utility)).decode("utf-8")))
+
+ @classmethod
+ def _ed25519_verify(cls, key, message, signature):
+ # type: (Type[_Utility], AnyStr, AnyStr, AnyStr) -> None
+ if not cls._utility:
+ cls._allocate()
+
+ byte_key = to_bytes(key)
+ byte_message = to_bytes(message)
+ byte_signature = to_bytes(signature)
+
+ cls._check_error(
+ lib.olm_ed25519_verify(cls._utility, byte_key, len(byte_key),
+ byte_message, len(byte_message),
+ byte_signature, len(byte_signature)))
def ed25519_verify(key, message, signature):
- """ Verify an ed25519 signature. Raises an OlmError if verification fails.
+ # type: (AnyStr, AnyStr, AnyStr) -> None
+ """Verify an ed25519 signature.
+
+ Raises an OlmVerifyError if verification fails.
+
Args:
- key(bytes): The ed25519 public key used for signing.
- message(bytes): The signed message.
+ key(str): The ed25519 public key used for signing.
+ message(str): The signed message.
signature(bytes): The message signature.
"""
- global _utility
- if not _utility:
- _utility = Utility()
- lib.olm_ed25519_verify(_utility.ptr,
- key, len(key),
- message, len(message),
- signature, len(signature))
+ return _Utility._ed25519_verify(key, message, signature)