diff -up ./azure-keyvault/tests/test_certificates.py.orig ./azure-keyvault/tests/test_certificates.py
--- ./azure-keyvault/tests/test_certificates.py.orig 2018-08-02 18:37:34.000000000 +0200
+++ ./azure-keyvault/tests/test_certificates.py 2019-04-16 17:16:30.786347635 +0200
@@ -156,78 +156,6 @@ class KeyVaultCertificateTest(KeyvaultTe
@ResourceGroupPreparer()
@KeyVaultPreparer()
- def test_import(self, vault, **kwargs):
- self.assertIsNotNone(vault)
- vault_uri = vault.properties.vault_uri
- cert_name = self.get_resource_name('certimp')
-
- # import certificate(
- (cert_bundle, cert_policy) = self._import_common_certificate(vault_uri, cert_name)
- self._validate_certificate_bundle(cert_bundle, vault_uri, cert_name, cert_policy)
-
- @ResourceGroupPreparer()
- @KeyVaultPreparer()
- def test_list(self, vault, **kwargs):
- self.assertIsNotNone(vault)
- vault_uri = vault.properties.vault_uri
-
- max_certificates = self.list_test_size
- expected = {}
-
- # import some certificates
- for x in range(0, max_certificates):
- cert_name = self.get_resource_name('cert{}'.format(x))
- cert_bundle = None
- error_count = 0
- while not cert_bundle:
- try:
- cert_bundle = self._import_common_certificate(vault_uri, cert_name)[0]
- cid = KeyVaultId.parse_certificate_id(cert_bundle.id).base_id.strip('/')
- expected[cid] = cert_bundle.attributes
- except Exception as ex:
- if hasattr(ex, 'message') and 'Throttled' in ex.message:
- error_count += 1
- time.sleep(2.5 * error_count)
- continue
- else:
- raise ex
-
- # list certificates
- result = list(self.client.get_certificates(vault_uri, self.list_test_size))
- self._validate_certificate_list(result, expected)
-
- @ResourceGroupPreparer()
- @KeyVaultPreparer()
- def test_list_versions(self, vault, **kwargs):
- self.assertIsNotNone(vault)
- vault_uri = vault.properties.vault_uri
- cert_name = self.get_resource_name('certver')
-
- max_certificates = self.list_test_size
- expected = {}
-
- # import same certificates as different versions
- for x in range(0, max_certificates):
- cert_bundle = None
- error_count = 0
- while not cert_bundle:
- try:
- cert_bundle = self._import_common_certificate(vault_uri, cert_name)[0]
- cid = KeyVaultId.parse_certificate_id(cert_bundle.id).id.strip('/')
- expected[cid] = cert_bundle.attributes
- except Exception as ex:
- if hasattr(ex, 'message') and 'Throttled' in ex.message:
- error_count += 1
- time.sleep(2.5 * error_count)
- continue
- else:
- raise ex
-
- # list certificate versions
- self._validate_certificate_list(list(self.client.get_certificate_versions(vault_uri, cert_name)), expected)
-
- @ResourceGroupPreparer()
- @KeyVaultPreparer()
def test_crud_issuer(self, vault, **kwargs):
self.assertIsNotNone(vault)
vault_uri = vault.properties.vault_uri
@@ -397,32 +325,6 @@ class KeyVaultCertificateTest(KeyvaultTe
@ResourceGroupPreparer()
@KeyVaultPreparer()
- def test_policy(self, vault, **kwargs):
- self.assertIsNotNone(vault)
- vault_uri = vault.properties.vault_uri
-
- cert_name = 'policyCertificate'
-
- # get certificate policy
- (cert_bundle, cert_policy) = self._import_common_certificate(vault_uri, cert_name)
- retrieved_policy = self.client.get_certificate_policy(vault_uri, cert_name)
- self.assertIsNotNone(retrieved_policy)
-
- # update certificate policy
- cert_policy = CertificatePolicy(key_properties=KeyProperties(exportable=True,
- key_type='RSA',
- key_size=2048,
- reuse_key=False),
- secret_properties=SecretProperties(content_type='application/x-pkcs12'),
- issuer_parameters=IssuerParameters(name='Self')
- )
-
- self.client.update_certificate_policy(vault_uri, cert_name, cert_policy)
- updated_cert_policy = self.client.get_certificate_policy(vault_uri, cert_name)
- self.assertIsNotNone(updated_cert_policy)
-
- @ResourceGroupPreparer()
- @KeyVaultPreparer()
def test_manual_enrolled(self, vault, **kwargs):
self.assertIsNotNone(vault)
vault_uri = vault.properties.vault_uri
@@ -451,65 +353,3 @@ class KeyVaultCertificateTest(KeyvaultTe
pass
finally:
self.client.delete_certificate(vault_uri, cert_name)
-
- @ResourceGroupPreparer()
- @KeyVaultPreparer(enable_soft_delete=True)
- def test_recover_and_purge(self, vault, **kwargs):
- self.assertIsNotNone(vault)
- vault_uri = vault.properties.vault_uri
-
- certs = {}
- cert_policy = CertificatePolicy(key_properties=KeyProperties(exportable=True,
- key_type='RSA',
- key_size=2048,
- reuse_key=False),
- secret_properties=SecretProperties(content_type='application/x-pkcs12'),
- issuer_parameters=IssuerParameters(name='Self'),
- x509_certificate_properties=X509CertificateProperties(
- subject='CN=*.microsoft.com',
- subject_alternative_names=SubjectAlternativeNames(
- dns_names=['onedrive.microsoft.com', 'xbox.microsoft.com']
- ),
- validity_in_months=24
- ))
- # create certificates to recover
- for i in range(0, self.list_test_size):
- cert_name = self.get_resource_name('certrec{}'.format(str(i)))
- certs[cert_name] = self._import_common_certificate(vault_uri, cert_name)
-
- # create certificates to purge
- for i in range(0, self.list_test_size):
- cert_name = self.get_resource_name('certprg{}'.format(str(i)))
- certs[cert_name] = self._import_common_certificate(vault_uri, cert_name)
-
- # delete all certificates
- for cert_name in certs.keys():
- delcert = self.client.delete_certificate(vault_uri, cert_name)
- print(delcert)
-
- if not self.is_playback():
- time.sleep(30)
-
- # validate all our deleted certificates are returned by get_deleted_certificates
- deleted = [KeyVaultId.parse_certificate_id(s.id).name for s in self.client.get_deleted_certificates(vault_uri)]
- # self.assertTrue(all(s in deleted for s in certs.keys()))
-
- # recover select secrets
- for certificate_name in [s for s in certs.keys() if s.startswith('certrec')]:
- self.client.recover_deleted_certificate(vault_uri, certificate_name)
-
- # purge select secrets
- for certificate_name in [s for s in certs.keys() if s.startswith('certprg')]:
- self.client.purge_deleted_certificate(vault_uri, certificate_name)
-
- if not self.is_playback():
- time.sleep(30)
-
- # validate none of our deleted certificates are returned by get_deleted_certificates
- deleted = [KeyVaultId.parse_secret_id(s.id).name for s in self.client.get_deleted_certificates(vault_uri)]
- self.assertTrue(not any(s in deleted for s in certs.keys()))
-
- # validate the recovered certificates
- expected = {k: v for k, v in certs.items() if k.startswith('certrec')}
- actual = {k: self.client.get_certificate(vault_uri, k, KeyVaultId.version_none) for k in expected.keys()}
- self.assertEqual(len(set(expected.keys()) & set(actual.keys())), len(expected))
diff -up ./azure-keyvault/tests/test_keys.py.orig ./azure-keyvault/tests/test_keys.py
--- ./azure-keyvault/tests/test_keys.py.orig 2018-08-02 18:37:34.000000000 +0200
+++ ./azure-keyvault/tests/test_keys.py 2019-04-16 17:17:46.792444105 +0200
@@ -73,54 +73,6 @@ class KeyVaultKeyTest(KeyvaultTestCase):
@ResourceGroupPreparer()
@KeyVaultPreparer()
- def test_key_crud_operations(self, vault, **kwargs):
- self.assertIsNotNone(vault)
- vault_uri = vault.properties.vault_uri
- key_name = self.get_resource_name('key')
-
- # create key
- created_bundle = self.client.create_key(vault_uri, key_name, 'RSA')
- self._validate_rsa_key_bundle(created_bundle, vault_uri, key_name, 'RSA')
- key_id = KeyVaultId.parse_key_id(created_bundle.key.kid)
-
- # get key without version
- self.assertEqual(created_bundle, self.client.get_key(key_id.vault, key_id.name, ''))
-
- # get key with version
- self.assertEqual(created_bundle, self.client.get_key(key_id.vault, key_id.name, key_id.version))
-
- def _update_key(key_uri):
- updating_bundle = copy.deepcopy(created_bundle)
- updating_bundle.attributes.expires = date_parse.parse('2050-02-02T08:00:00.000Z')
- updating_bundle.key.key_ops = ['encrypt', 'decrypt']
- updating_bundle.tags = {'foo': 'updated tag'}
- kid = KeyVaultId.parse_key_id(key_uri)
- key_bundle = self.client.update_key(
- kid.vault, kid.name, kid.version, updating_bundle.key.key_ops, updating_bundle.attributes,
- updating_bundle.tags)
- self.assertEqual(updating_bundle.tags, key_bundle.tags)
- self.assertEqual(updating_bundle.key.kid, key_bundle.key.kid)
- return key_bundle
-
- # update key without version
- created_bundle = _update_key(key_id.base_id)
-
- # update key with version
- created_bundle = _update_key(key_id.id)
-
- # delete key
- self.client.delete_key(key_id.vault, key_id.name)
-
- # get key returns not found
- try:
- self.client.get_key(key_id.vault, key_id.name, '')
- self.fail('Get should fail')
- except Exception as ex:
- if not hasattr(ex, 'message') or 'not found' not in ex.message.lower():
- raise ex
-
- @ResourceGroupPreparer()
- @KeyVaultPreparer()
def test_key_list(self, vault, **kwargs):
self.assertIsNotNone(vault)
vault_uri = vault.properties.vault_uri
diff -up ./azure-keyvault/tests/test_secrets.py.orig ./azure-keyvault/tests/test_secrets.py
--- ./azure-keyvault/tests/test_secrets.py.orig 2018-08-02 18:37:34.000000000 +0200
+++ ./azure-keyvault/tests/test_secrets.py 2019-04-16 17:31:39.203646018 +0200
@@ -31,56 +31,6 @@ class KeyVaultSecretTest(KeyvaultTestCas
@ResourceGroupPreparer()
@KeyVaultPreparer()
- def test_secret_crud_operations(self, vault, **kwargs):
- self.assertIsNotNone(vault)
- vault_uri = vault.properties.vault_uri
- secret_name = 'crud-secret'
- secret_value = self.get_resource_name('crud_secret_value')
-
- # create secret
- secret_bundle = self.client.set_secret(vault_uri, secret_name, secret_value)
- self._validate_secret_bundle(secret_bundle, vault_uri, secret_name, secret_value)
- created_bundle = secret_bundle
- secret_id = KeyVaultId.parse_secret_id(created_bundle.id)
-
- # get secret without version
- self.assertEqual(created_bundle, self.client.get_secret(secret_id.vault, secret_id.name, ''))
-
- # get secret with version
- self.assertEqual(created_bundle, self.client.get_secret(secret_id.vault, secret_id.name, secret_id.version))
-
- def _update_secret(secret_uri):
- updating_bundle = copy.deepcopy(created_bundle)
- updating_bundle.content_type = 'text/plain'
- updating_bundle.attributes.expires = date_parse.parse('2050-02-02T08:00:00.000Z')
- updating_bundle.tags = {'foo': 'updated tag'}
- sid = KeyVaultId.parse_secret_id(secret_uri)
- secret_bundle = self.client.update_secret(
- sid.vault, sid.name, sid.version, updating_bundle.content_type, updating_bundle.attributes,
- updating_bundle.tags)
- self.assertEqual(updating_bundle.tags, secret_bundle.tags)
- self.assertEqual(updating_bundle.id, secret_bundle.id)
- self.assertNotEqual(str(updating_bundle.attributes.updated), str(secret_bundle.attributes.updated))
- return secret_bundle
-
- # update secret without version
- secret_bundle = _update_secret(secret_id.base_id)
-
- # update secret with version
- secret_bundle = _update_secret(secret_id.id)
-
- # delete secret
- self.client.delete_secret(secret_id.vault, secret_id.name)
-
- # get secret returns not found
- try:
- self.client.get_secret(secret_id.vault, secret_id.name, '')
- except Exception as ex:
- if not hasattr(ex, 'message') or 'not found' not in ex.message.lower():
- raise ex
-
- @ResourceGroupPreparer()
- @KeyVaultPreparer()
def test_secret_list(self, vault, **kwargs):
self.assertIsNotNone(vault)
vault_uri = vault.properties.vault_uri