-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathautogitbundle
More file actions
executable file
·296 lines (239 loc) · 8.15 KB
/
autogitbundle
File metadata and controls
executable file
·296 lines (239 loc) · 8.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
#!/usr/bin/env uv --quiet run --no-project --script --isolated --refresh --
# /// script
# # Docopt issues SyntaxWarning in Python 3.12
# requires-python = ">=3.11,<3.12"
# dependencies = [
# "docopt >=0.6.2",
# ]
# ///
"""
Usage:
{prog}
"""
import sys, locale, asyncio, subprocess, os, pathlib, re, typing
import docopt
async def main(*, args, prog):
locale.setlocale(locale.LC_ALL, "")
params = docopt.docopt(
__doc__.replace("\t", " " * 4).format(prog=os.path.basename(prog)),
argv=args,
help=True,
version=True,
options_first=False
)
assert not params, params
log = asyncio.Queue()
pending_tasks = []
printer_task = asyncio.create_task(
printer(queue=log, fo=sys.stdout)
)
await create_bundles(log=log)
await asyncio.gather(*pending_tasks)
current_task = asyncio.current_task()
all_tasks = asyncio.all_tasks()
assert {current_task, printer_task} == all_tasks, (current_task, all_tasks)
await log.put(None)
await printer_task
async def create_bundles(*, log):
"""
Configurations:
All configuration keys below should be in sections: autogitbundle.<bundles_root_dir>.
If bundles_root_dir is empty, current folder is used.
prefix
Current repo path must start with specified prefix.
suffix
Current repo path must end with specified suffix.
suffixregex
Current repo path must end with a string that matches specified regex.
The suffix value is the matching part.
bundle
Generate a bundle.
If the bundle config key doesn't have value, the suffix value is used.
The bundle will be created at (bundles_root_dir + value) path.
Example configuration:
[autogitbundle "~/Sync/zzz-git-bundles"]
prefix = ~/Sync/zzz-git-vruyr-
suffix = software/scripts.git
suffix = software/repos.git
suffix = software/metasnap.git
Will generate:
~/Sync/zzz-git-bundles/software/scripts.gitbundle
~/Sync/zzz-git-bundles/software/repos.gitbundle
~/Sync/zzz-git-bundles/software/metasnap.gitbundle
"""
git = Git(git_path="git", output_queue=log)
repo_path = pathlib.Path.cwd()
repo_path_prefix: str | None = None
repo_path_suffix: str | re.Pattern[str] | None = None
# prefix
# Always a string unless misconfigured.
# There should always be a prefix before suffix or bundle.
#
# suffix
# str
# The configured suffix. A bundle will be created if repo is at (prefix + suffix) path.
# re.Pattern
# The suffix regex. A bundle will be created with regex matches what's left after removing a matching prefix.
# None
# A suffix was not configured. The "prefix" was followed by "bundle".
async for bundle_dir, key, value in git.config_sections("autogitbundle"):
if bundle_dir is None:
bundle_dir = "."
bundle_dir = repo_path.parent / os.path.expanduser(bundle_dir)
if key == "prefix":
assert value is not None
repo_path_prefix = os.path.expanduser(value)
repo_path_suffix = None
continue
if key == "suffix":
repo_path_suffix = value
continue
if key == "suffixregex":
assert value is not None
repo_path_suffix = re.compile(value)
continue
if key == "bundle":
prefix, suffix, bundle = repo_path_prefix, repo_path_suffix, value
repo_path_suffix = None
if isinstance(suffix, re.Pattern):
s = strip_prefix(repo_path.as_posix(), prefix, None)
if s is None:
# prefix didn't match
continue
if not suffix.fullmatch(s):
# suffix regex didn't match
continue
suffix = s
del s
if suffix is None:
suffix = bundle
if prefix is None or suffix is None:
continue
p = pathlib.Path(prefix + suffix)
if p.exists() and p.samefile(repo_path):
bundle_path = bundle or suffix
assert bundle_path, (bundle_path,)
bundle_path = bundle_dir / (strip_suffix(bundle_path, ".git", bundle_path) + "")
if await git.is_bundle_out_of_date(bundle_path):
bundle_path.parent.mkdir(parents=True, exist_ok=True)
await log.put(f"Creating bundle {bundle_path}")
await git.cmd("bundle", "create", str(bundle_path), "--all", stdout=False)
else:
await log.put(f"Bundle is up to date: {bundle_path}")
continue
await log.put(f"Unknown config key {key}")
class Git(object):
def __init__(self, *, output_queue, git_path, encoding="UTF-8"):
self._output = output_queue
self._git_path = git_path
self._encoding = encoding
async def get_all_refs(self):
result = {
"HEAD": (await self.cmd("rev-parse", "--verify", "HEAD", decode=True)).strip(),
}
refs = await self.cmd("for-each-ref", "--format", "%(objectname) %(refname)", decode=True)
for ref0 in refs.splitlines(keepends=False):
oid, sep, ref = ref0.partition(" ")
assert sep == " ", (ref0,)
result[ref] = oid
return result
async def bundle_list_refs(self, bundle_path, /):
result = {}
refs = await self.cmd("bundle", "list-heads", str(bundle_path), decode=True)
for ref0 in refs.splitlines(keepends=False):
oid, sep, ref = ref0.partition(" ")
result[ref] = oid
return result
async def is_bundle_out_of_date(self, bundle_path, /):
if not bundle_path.exists():
return True
heads_repo = await self.get_all_refs()
heads_bundle = await self.bundle_list_refs(bundle_path)
return heads_repo != heads_bundle
async def cmd(self, *args, stdin_data=None, stdout=True, stderr=True, decode=False):
if isinstance(stdin_data, str):
stdin_data = stdin_data.encode(self._encoding)
elif stdin_data is None:
stdin_data = b""
assert isinstance(stdin_data, bytes)
env = dict(os.environ)
env["GIT_TERMINAL_PROMPT"] = "0"
env["GIT_ASKPASS"] = "true"
p = await asyncio.create_subprocess_exec(
self._git_path, *args,
stdin=subprocess.PIPE,
stdout=(subprocess.PIPE if stdout else None),
stderr=(subprocess.PIPE if stderr else None),
env=env,
)
stdout_bytes, stderr_bytes = await p.communicate(input=stdin_data)
assert stdout_bytes is None or isinstance(stdout_bytes, bytes), (stdout_bytes,)
assert stderr_bytes is None or isinstance(stderr_bytes, bytes), (stderr_bytes)
if stderr_bytes:
raise RuntimeError(f"git command {args} in {os.getcwd()} produced stderr output - {stderr_bytes}")
return stdout_bytes.decode(self._encoding) if decode else stdout_bytes
async def config(self, *args, decode=True, single=True):
out = (await self.cmd("config", "--null", *args, decode=False)).split(b"\0")
assert out[-1] == b""
del out[-1]
out = [c.decode(self._encoding) for c in out] if decode else out
if single:
assert len(out) == 1, (out,)
out = out[0]
return out
async def config_sections(self, section_name) -> typing.Generator[typing.Tuple[str | None, str, str | None], None, None]:
out = await self.config("--get-regexp", re.escape(section_name) + r"\..*", single=False)
for c in out:
(section, middle, key), value = self._split_config_entry(c)
assert section == section_name, section
yield (middle, key, value)
_config_entry_key_pattern = re.compile(r"^([^.]+)\.(?:(.*)\.)?([^.]+)$")
def _split_config_entry(self, entry: str) -> typing.Tuple[str, str | None, str, str | None]:
# Use None if the config entry doesn't have a value.
key: str
key, value = (entry.split("\n", maxsplit=1) + [None])[:2]
m: re.Match[str] | None
m = self._config_entry_key_pattern.match(key)
if m is None:
raise ValueError(f"Got an invalid config key - {repr(key)}")
return (m.groups(), value)
def strip_prefix(s, prefix, default=None):
"""
Returns the suffix if prefix matches and value of the default argument otherwise.
"""
if s.startswith(prefix):
return s[len(prefix):]
return default
def strip_suffix(s, suffix, default=None):
"""
Returns the prefix if suffix matches and value of the default argument otherwise.
"""
if s.endswith(suffix):
return s[:-len(suffix)]
return default
async def printer(*, queue, fo):
while True:
item = await queue.get()
if item is None:
break
if isinstance(item, (list, tuple)):
print(*item, file=fo)
else:
print(item, file=fo)
def smain(argv=None):
if argv is None:
argv = sys.argv
try:
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
return asyncio.run(
main(
args=argv[1:],
prog=argv[0]
),
debug=False,
)
except KeyboardInterrupt:
print(file=sys.stderr)
if __name__ == "__main__":
sys.exit(smain())