|
5 | 5 | # Licensed under the MIT License. See License.txt in the project root for
|
6 | 6 | # license information.
|
7 | 7 | #--------------------------------------------------------------------------
|
| 8 | +import binascii |
| 9 | +import hashlib |
8 | 10 | import io
|
| 11 | +import json |
9 | 12 | import logging
|
10 | 13 | import time
|
11 | 14 | import unittest
|
12 | 15 |
|
13 | 16 | import requests
|
| 17 | +import six |
14 | 18 |
|
15 | 19 | import azure.mgmt.batch
|
16 | 20 | from azure.mgmt.batch import models
|
17 | 21 | import azure.mgmt.network.models
|
18 | 22 | from mgmt_batch_preparers import KeyVaultPreparer, SimpleBatchPreparer
|
19 | 23 |
|
| 24 | +from azure_devtools.scenario_tests.recording_processors import GeneralNameReplacer, RecordingProcessor |
20 | 25 | from devtools_testutils import (
|
21 | 26 | AzureMgmtTestCase,
|
22 | 27 | ResourceGroupPreparer,
|
|
32 | 37 | EXPECTED_DEDICATED_CORE_QUOTA = 500
|
33 | 38 | EXPECTED_LOW_PRIO_CORE_QUOTA = 500
|
34 | 39 | EXPECTED_POOL_QUOTA = 100
|
| 40 | +SECRET_FIELDS = ["primary", "secondary"] |
| 41 | + |
| 42 | + |
| 43 | +def get_redacted_key(key): |
| 44 | + redacted_value = "redacted" |
| 45 | + digest = hashlib.sha256(six.ensure_binary(key)).digest() |
| 46 | + redacted_value += six.ensure_str(binascii.hexlify(digest))[:6] |
| 47 | + return redacted_value |
| 48 | + |
| 49 | + |
| 50 | +class RecordingRedactor(RecordingProcessor): |
| 51 | + """Removes keys from test recordings""" |
| 52 | + |
| 53 | + def process_response(self, response): |
| 54 | + try: |
| 55 | + body = json.loads(response["body"]["string"]) |
| 56 | + except (KeyError, ValueError): |
| 57 | + return response |
| 58 | + |
| 59 | + for field in body: |
| 60 | + if field in SECRET_FIELDS: |
| 61 | + body[field] = get_redacted_key(body[field]) |
| 62 | + |
| 63 | + response["body"]["string"] = json.dumps(body) |
| 64 | + return response |
35 | 65 |
|
36 | 66 |
|
37 | 67 | class MgmtBatchTest(AzureMgmtTestCase):
|
38 | 68 |
|
| 69 | + def __init__(self, *args, **kwargs): |
| 70 | + scrubber = GeneralNameReplacer() |
| 71 | + redactor = RecordingRedactor() |
| 72 | + super(MgmtBatchTest, self).__init__(*args, recording_processors=[redactor, scrubber], **kwargs) |
| 73 | + |
39 | 74 | def setUp(self):
|
40 | 75 | super(MgmtBatchTest, self).setUp()
|
41 | 76 | self.mgmt_batch_client = self.create_mgmt_client(
|
|
0 commit comments