-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgitguardian_mappers.py
More file actions
319 lines (270 loc) · 16.4 KB
/
gitguardian_mappers.py
File metadata and controls
319 lines (270 loc) · 16.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
# Copyright 2025 Cisco Systems, Inc. and its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
"""
Mappers for converting GitHub Advanced Security secret types to GitGuardian detector formats.
"""
import base64
import json
import logging
from abc import ABC, abstractmethod
from typing import Any, cast
from urllib.parse import urlparse
import crypto_utils
from crypto_utils import strip_key_metadata, parse_base64_header
# Module-wide logger, named after this module.
LOGGER = logging.getLogger(__name__)
# Strings that could not be base64-decoded. Entries are added by
# Base64BasicAuthenticationGitGuardianMapper; being a set, each distinct string is kept once.
# NOTE: the entries are the raw, still-encoded values taken from GitGuardian rows.
base64_errors: set[str] = set()
class AbstractGitGuardianMapper(ABC):
    """
    Contract shared by every mapper that relates a GitHub Advanced Security
    secret to a GitGuardian incident.

    Concrete mappers must implement both methods below; the class itself
    cannot be instantiated.
    """

    @abstractmethod
    def get_detector_name(self) -> str:
        """Return the detector name this mapper is associated with."""

    @abstractmethod
    def secret_present_in_row(self, ghas_secret: str, incident_row):
        """Return True when ``ghas_secret`` is present in ``incident_row`` of the GitGuardian export."""
class SimpleGitGuardianMapper(AbstractGitGuardianMapper):
    """
    Mapper for secrets that can be compared verbatim: the GHAS value must be
    equal to one field of the GitGuardian row's matches.
    """

    def __init__(self, detector_name: str, json_key: str, debug_log_secrets: bool):
        # detector_name: GitGuardian detector this mapper answers for
        # json_key: key inside the row's matches that holds the secret value
        # debug_log_secrets: when truthy, the compared values are written to the debug log
        self.debug_log_secrets = debug_log_secrets
        self.json_key = json_key
        self.detector_name = detector_name

    def get_detector_name(self) -> str:
        return self.detector_name

    def secret_present_in_row(self, ghas_secret: str, incident_row):
        """
        Return True when the row belongs to this mapper's detector and the
        value stored under ``json_key`` equals ``ghas_secret``.
        """
        # Rows produced by a different detector can never match.
        if self.detector_name != incident_row.detector_name:
            return False

        candidate = incident_row.matches.get(self.json_key)
        if self.debug_log_secrets:
            # Deliberately logs both values in clear text; only active when requested.
            LOGGER.debug(f'Checking if {ghas_secret} == {candidate}')
        return ghas_secret == candidate
class SlackWebhookGitGuardianMapper(SimpleGitGuardianMapper):
    """
    Mapper for Slack webhook URLs, which lack the leading 'https://' in GitGuardian.

    Construction is inherited unchanged from SimpleGitGuardianMapper
    (detector_name, json_key, debug_log_secrets).
    """

    def secret_present_in_row(self, ghas_secret: str, incident_row):
        """
        Drop a leading 'https://' from the GHAS value, then compare it the
        same way the parent class does.
        """
        return super().secret_present_in_row(ghas_secret.removeprefix('https://'), incident_row)
class Base64KeyGitGuardianMapper(AbstractGitGuardianMapper):
    """
    Mapper for private-key secrets ('rsa_private_key', 'openssh_private_key'
    and 'pgp_private_key'), where the base64 encoded key has to be normalized
    on both sides before the values can be compared.
    """

    def __init__(self, detector_name: str, json_key: str, debug_log_secrets: bool):
        # detector_name: GitGuardian detector this mapper answers for
        # json_key: key inside the row's matches that holds the key material
        # debug_log_secrets: when truthy, the compared values are written to the debug log
        self.debug_log_secrets = debug_log_secrets
        self.json_key = json_key
        self.detector_name = detector_name

    def get_detector_name(self) -> str:
        return self.detector_name

    def secret_present_in_row(self, ghas_secret: str, incident_row):
        """
        Return True when the row belongs to this mapper's detector and its
        normalized key equals the normalized GHAS key.

        The row's normalized key is read from the row's own cache when one is
        available, and stored there after it has been computed.
        """
        # Normalized ahead of the detector check, so this runs for every row.
        normalized_alert_key = strip_key_metadata(ghas_secret)
        if self.detector_name != incident_row.detector_name:
            return False

        normalized_row_key = incident_row.get_cached_secret()
        if not normalized_row_key:
            # Nothing usable cached on the row yet: normalize now and remember the result.
            raw_row_key = cast(str, incident_row.matches.get(self.json_key))
            normalized_row_key = strip_key_metadata(raw_row_key)
            incident_row.store_cached_secret(normalized_row_key)

        if self.debug_log_secrets:
            LOGGER.debug(f'Checking if {normalized_alert_key} == {normalized_row_key}')
        return normalized_alert_key == normalized_row_key
class Base64BasicAuthenticationGitGuardianMapper(AbstractGitGuardianMapper):
    """
    For 'http_basic_authentication_header' where the value is a base64 encoded string
    of the form 'username:password'.
    """

    def __init__(self, detector_name: str, debug_log_secrets: bool):
        # detector_name: GitGuardian detector this mapper answers for
        # debug_log_secrets: when truthy, the compared values are written to the debug log
        self.detector_name = detector_name
        self.debug_log_secrets = debug_log_secrets

    def get_detector_name(self) -> str:
        return self.detector_name

    def _decode_poorly_encoded_gitguardian_base64(self, base64_string: str) -> str:
        """
        Decode a base64 string whose '=' padding may be missing or incorrect.

        Any existing padding is removed and the amount required for a length
        that is a multiple of four is added back before decoding. Bytes that
        are not valid UTF-8 are dropped from the result.

        Returns the decoded text. When decoding fails, the input is added to the
        module-level ``base64_errors`` set and returned unchanged.
        """
        unpadded = base64_string.rstrip('=')
        # -len % 4 is the number of '=' needed to reach the next multiple of four (0 when aligned).
        padded = unpadded + '=' * (-len(unpadded) % 4)
        try:
            return base64.b64decode(padded).decode('utf-8', errors='ignore')
        except ValueError:
            # b64decode signals malformed input with binascii.Error, a ValueError subclass,
            # and rejects non-ASCII text with a plain ValueError.
            if self.debug_log_secrets:
                LOGGER.debug(f"Failed to decode base64 string: {padded} orig {base64_string}")
            # Remember the offending string so callers can inspect the failures afterwards.
            base64_errors.add(base64_string)
            return base64_string

    def secret_present_in_row(self, ghas_secret: str, incident_row) -> bool:
        """
        Returns true if the secret is present in the GitGuardian dismissed CSV row.

        Returns False when the row belongs to another detector, or when the row
        has no string 'username' and 'password' entries to compare against.
        """
        if self.detector_name != incident_row.detector_name:
            return False

        # Ok so GitGuardian does some rather unusual things here
        # Imagine a secret: Authorization: Basic bmd4X3Rlc3Q6bmd4X3Rlc3Q=
        # That's ngx_test:ngx_test base64 encoded
        # GHAS stores this just as the base64 encoded string
        # In a GitGuardian CSV export though it's represented in a split form
        # but is not padded correctly
        # {'password': 'bmd4X3Rlc3Q', 'username': 'bmd4X3Rlc3Q'}
        username_encoded = incident_row.matches.get('username')
        password_encoded = incident_row.matches.get('password')
        if not isinstance(username_encoded, str) or not isinstance(password_encoded, str):
            # A row missing either half cannot match, and the decoder needs real strings.
            return False

        # Parsed only once the row is known to be comparable.
        # NOTE(review): assumes parse_base64_header returns a (username, password) pair - confirm in crypto_utils.
        alert_secret_username, alert_secret_password = parse_base64_header(ghas_secret)
        row_username_decoded = self._decode_poorly_encoded_gitguardian_base64(username_encoded)
        row_password_decoded = self._decode_poorly_encoded_gitguardian_base64(password_encoded)
        if self.debug_log_secrets:
            LOGGER.debug(f"incident_row['detector_name'] {incident_row.detector_name}")
            LOGGER.debug(f'Checking if {alert_secret_username} == {row_username_decoded}')
            LOGGER.debug(f'Checking if {alert_secret_password} == {row_password_decoded}')
        return (alert_secret_username == row_username_decoded
                and alert_secret_password == row_password_decoded)
class DbConnectionUrlGitGuardianMapper(AbstractGitGuardianMapper):
    """
    For adapting database connection strings like "postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full"
    to the GitGuardian format.

    The connection string is split into scheme, host, port, username and password,
    and those five fields are compared with the values stored in the GitGuardian row.
    """

    def __init__(self, detector_name: str, debug_log_secrets: bool):
        # detector_name: GitGuardian detector this mapper answers for
        # debug_log_secrets: when truthy, the compared values are written to the debug log
        self.detector_name = detector_name
        self.debug_log_secrets = debug_log_secrets

    def get_detector_name(self) -> str:
        return self.detector_name

    def _int_or_none(self, port: Any) -> int | None:
        """
        Convert a port to an int or None if it's not a number.
        """
        try:
            return int(port)
        except (ValueError, TypeError):
            return None

    def secret_present_in_row(self, ghas_secret: str, incident_row) -> bool:
        """
        Return True when the connection string matches the GitGuardian row.

        Returns False when the row belongs to another detector (the same
        convention the other mappers in this module follow) or when the
        connection string cannot be parsed as a URL.
        """
        if self.detector_name != incident_row.detector_name:
            return False

        try:
            url = urlparse(ghas_secret)
        except ValueError:
            # urlparse rejects unbalanced square brackets in the authority part,
            # which a password containing '[' or ']' can trigger.
            LOGGER.debug('Could not parse the connection string as a URL for detector %s', self.detector_name)
            return False

        try:
            # Reading .port validates it and raises for a non-numeric or out-of-range value.
            ghas_port = url.port
        except ValueError:
            # Same outcome _int_or_none gives for an unusable port on the GitGuardian side.
            ghas_port = None

        ghas_dict = {
            'scheme': url.scheme,
            'host': url.hostname,
            'port': ghas_port,
            'username': url.username,
            'password': url.password,
        }
        # The GitGuardian matches column also has fields like 'database' & 'connection_uri';
        # those are ignored, only the five fields below take part in the comparison.
        filtered_gitguardian_results = {
            'scheme': incident_row.matches.get('scheme'),
            'host': incident_row.matches.get('host'),
            'port': self._int_or_none(incident_row.matches.get('port')),
            'username': incident_row.matches.get('username'),
            'password': incident_row.matches.get('password'),
        }
        if self.debug_log_secrets:
            LOGGER.debug(f'Checking if {ghas_dict} == {filtered_gitguardian_results}')
        return ghas_dict == filtered_gitguardian_results
class GoogleCloudKeysMapper(AbstractGitGuardianMapper):
    """
    Mapper for Google Cloud keys, where the GHAS secret is a JSON document
    and only a few of its fields are compared with the GitGuardian row.
    """

    def __init__(self, debug_log_secrets: bool):
        # debug_log_secrets: when truthy, the compared values are written to the debug log
        self.debug_log_secrets = debug_log_secrets
        self.detector_name = 'Google Cloud Keys'

    def get_detector_name(self) -> str:
        return self.detector_name

    def _comparable_fields(self, source):
        # Reduce a mapping to the three fields that take part in the comparison,
        # with the private key passed through strip_key_metadata.
        return {
            'client_id': source.get('client_id'),
            'project_id': source.get('project_id'),
            'private_key': crypto_utils.strip_key_metadata(cast(str, source.get('private_key'))),
        }

    def secret_present_in_row(self, ghas_secret: str, incident_row):
        """
        Return True when the row belongs to this mapper's detector and its
        client_id, project_id and private key agree with the GHAS secret.
        """
        if self.detector_name != incident_row.detector_name:
            return False

        # The GHAS side is built first, then the GitGuardian side.
        alert_fields = self._comparable_fields(json.loads(ghas_secret))
        row_fields = self._comparable_fields(incident_row.matches)
        if self.debug_log_secrets:
            LOGGER.debug(f'Checking if {alert_fields} == {row_fields}')
        return alert_fields == row_fields
def create_ghas_to_gitguardian_converters(debug_log_secrets: bool) -> dict[str, list[AbstractGitGuardianMapper]]:
    """
    Build the lookup table from GHAS secret types to GitGuardian mappers.

    Each key is a GHAS Secret Scanning pattern and each value is the list of
    AbstractGitGuardianMapper objects to try for it. This is how a GHAS Secret
    Scanning alert is related to entries in the GitGuardian CSV files, which are
    organised by GitGuardian detector and store the secret data in different ways.
    Every call builds a fresh table with new mapper objects.

    Helpful resources for decoding this:
    https://docs.github.com/en/code-security/secret-scanning/introduction/supported-secret-scanning-patterns
    https://docs.gitguardian.com/secrets-detection/secrets-detection-engine/detectors/supported_credentials

    NOTE: "Not tested" below means that the mapping is probably correct, but it just hasn't been
    formally verified with a test secret
    """
    # Small factories so each table entry only spells out what differs.
    def simple(detector_name: str, json_key: str) -> AbstractGitGuardianMapper:
        return SimpleGitGuardianMapper(detector_name, json_key, debug_log_secrets)

    def slack_webhook(detector_name: str, json_key: str) -> AbstractGitGuardianMapper:
        return SlackWebhookGitGuardianMapper(detector_name, json_key, debug_log_secrets)

    def base64_key(detector_name: str, json_key: str) -> AbstractGitGuardianMapper:
        return Base64KeyGitGuardianMapper(detector_name, json_key, debug_log_secrets)

    return {
        'aws_access_key_id': [simple('AWS Keys', 'client_id')],
        'aws_temporary_access_key_id': [simple('AWS Keys', 'client_id')],
        'aws_secret_access_key': [simple('AWS Keys', 'client_secret')],
        'aws_session_token': [simple('AWS Keys', 'session_token')],
        'github_personal_access_token': [simple('GitHub Access Token', 'apikey')],
        'jfrog_platform_api_key': [simple('Artifactory Token', 'apikey')],
        'databricks_access_token': [
            simple('Databricks Authentication Token', 'token'),
            simple('Databricks Authentication Token With Hostname', 'token'),  # Not tested
        ],
        'google_api_key': [simple('Google API Key', 'apikey')],
        'google_oauth_client_id': [simple('Google OAuth2 Keys', 'client_id')],  # Not tested
        'google_oauth_client_secret': [simple('Google OAuth2 Keys', 'client_secret')],  # Not tested
        'http_bearer_authentication_header': [simple('Bearer Token', 'apikey')],
        'google_cloud_service_account_credentials': [GoogleCloudKeysMapper(debug_log_secrets)],  # Not tested
        'slack_api_token': [simple('Slack API Token', 'apikey')],  # Not tested
        'slack_incoming_webhook_url': [slack_webhook('Slack Webhook URL', 'apikey')],
        'slack_workflow_webhook_url': [slack_webhook('Slack Webhook URL', 'apikey')],  # Not tested
        'postgres_connection_string': [DbConnectionUrlGitGuardianMapper('PostgreSQL Credentials', debug_log_secrets)],
        'openssh_private_key': [base64_key('OpenSSH Private Key', 'apikey')],
        # GHAS probably lacks support for "Generic Private Key", "DSA Private Key",
        # "Elliptic Curve Private Key", and "Encrypted Private Key"
        'rsa_private_key': [base64_key('RSA Private Key', 'apikey')],
        'pgp_private_key': [base64_key('PGP Private Key', 'apikey')],
        # "Base64 Generic High Entropy Secret" may relate to this somehow
        'http_basic_authentication_header': [
            Base64BasicAuthenticationGitGuardianMapper('Base64 Basic Authentication', debug_log_secrets),
        ],
        # GitHub's 'password' type could map to a variety of GitGuardian credentials.
        # This list is likely incomplete but is a good start.
        'password': [
            simple('Authentication Tuple', 'password'),
            simple('Basic Auth String', 'password'),  # Not tested
            simple('Company Email Password', 'password'),  # Not tested
            simple('FTP Credentials', 'password'),  # Not tested
            simple('Generic High Entropy Secret', 'apikey'),
            simple('Generic Database Assignment', 'password'),  # Not tested
            simple('Generic Password', 'password'),  # Not tested
            simple('Bearer Token', 'apikey'),  # Not tested
            simple('Generic CLI Secret', 'apikey'),  # Not tested
            simple('Generic Terraform Variable Secret', 'apikey'),  # Not tested
        ],
    }
def retrieve_mapper(ghas_secret_type: str, debug_log_secrets: bool) -> list[AbstractGitGuardianMapper]:
    """
    Look up the GitGuardian mappers registered for a GitHub Advanced Security secret type.

    The lookup table is rebuilt on every call, so the returned mappers are new objects.

    Raises:
        ValueError: when no mappers are registered for ``ghas_secret_type``.
    """
    mappers = create_ghas_to_gitguardian_converters(debug_log_secrets).get(ghas_secret_type)
    # Every registered value is a list, so None can only mean the type is unknown.
    if mappers is None:
        raise ValueError(f"No mapper found for secret type '{ghas_secret_type}'")
    return mappers