-
-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Expand file tree
/
Copy pathplugin.py
More file actions
246 lines (201 loc) · 8.97 KB
/
plugin.py
File metadata and controls
246 lines (201 loc) · 8.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
"""Class for plugins in HACS."""
from __future__ import annotations
from contextlib import suppress
import re
from typing import TYPE_CHECKING
from ..enums import HacsCategory, HacsDispatchEvent
from ..exceptions import HacsException
from ..utils.decorator import concurrent
from ..utils.json import json_loads
from .base import HacsRepository
HACSTAG_REPLACER = re.compile(r"\D+")
if TYPE_CHECKING:
from homeassistant.components.lovelace.resources import ResourceStorageCollection
from ..base import HacsBase
class HacsPluginRepository(HacsRepository):
"""Plugins in HACS."""
def __init__(self, hacs: HacsBase, full_name: str):
"""Initialize."""
super().__init__(hacs=hacs)
self.data.full_name = full_name
self.data.full_name_lower = full_name.lower()
self.data.file_name = None
self.data.category = HacsCategory.PLUGIN
self.content.path.local = self.localpath
@property
def localpath(self):
"""Return localpath."""
return f"{self.hacs.core.config_path}/www/community/{self.data.full_name.split('/')[-1]}"
async def validate_repository(self):
"""Validate."""
# Run common validation steps.
await self.common_validate()
# Custom step 1: Validate content.
self.update_filenames()
if self.content.path.remote is None:
raise HacsException(
f"{self.string} Repository structure for {self.ref.replace('tags/', '')} is not compliant"
)
if self.content.path.remote == "release":
self.content.single = True
# Handle potential errors
if self.validate.errors:
for error in self.validate.errors:
if not self.hacs.status.startup:
self.logger.error("%s %s", self.string, error)
return self.validate.success
async def async_post_installation(self):
"""Run post installation steps."""
await self.hacs.async_setup_frontend_endpoint_plugin()
await self.update_dashboard_resources()
async def async_post_uninstall(self):
"""Run post uninstall steps."""
await self.remove_dashboard_resources()
@concurrent(concurrenttasks=10, backoff_time=5)
async def update_repository(self, ignore_issues=False, force=False):
"""Update."""
if not await self.common_update(ignore_issues, force) and not force:
return
# Get plugin objects.
self.update_filenames()
if self.content.path.remote is None:
self.validate.errors.append(
f"{self.string} Repository structure for {self.ref.replace('tags/', '')} is not compliant"
)
if self.content.path.remote == "release":
self.content.single = True
# Signal frontend to refresh
if self.data.installed:
self.hacs.async_dispatch(
HacsDispatchEvent.REPOSITORY,
{
"id": 1337,
"action": "update",
"repository": self.data.full_name,
"repository_id": self.data.id,
},
)
async def get_package_content(self):
"""Get package content."""
with suppress(Exception):
result = await self.hacs.async_download_file(
f"https://raw.githubusercontent.com/{self.data.full_name}/{self.ref}/package.json",
nolog=True,
)
if result is not None and (package := json_loads(result)):
self.data.authors = package["author"]
def update_filenames(self) -> None:
"""Get the filename to target."""
content_in_root = self.repository_manifest.content_in_root
if specific_filename := self.repository_manifest.filename:
valid_filenames = (specific_filename,)
else:
valid_filenames = (
f"{self.data.name.replace('lovelace-', '')}.js",
f"{self.data.name}.js",
f"{self.data.name}.umd.js",
f"{self.data.name}-bundle.js",
)
if not content_in_root:
if self.releases.objects:
release = self.releases.objects[0]
if release.assets:
if assetnames := [
filename
for filename in valid_filenames
for asset in release.assets
if filename == asset.name
]:
self.data.file_name = assetnames[0]
self.content.path.remote = "release"
return
all_paths = {x.full_path for x in self.tree}
for filename in valid_filenames:
if filename in all_paths:
self.data.file_name = filename
self.content.path.remote = ""
return
if not content_in_root and f"dist/{filename}" in all_paths:
self.data.file_name = filename.split("/")[-1]
self.content.path.remote = "dist"
return
def generate_dashboard_resource_hacstag(self) -> str:
"""Get the HACS tag used by dashboard resources."""
version = (
self.display_installed_version
or self.data.selected_tag
or self.display_available_version
)
return f"{self.data.id}{HACSTAG_REPLACER.sub('', version)}"
def generate_dashboard_resource_namespace(self) -> str:
"""Get the dashboard resource namespace."""
return f"/hacsfiles/{self.data.full_name.split('/')[1]}"
def generate_dashboard_resource_url(self) -> str:
"""Get the dashboard resource namespace."""
filename = self.data.file_name
if "/" in filename:
self.logger.warning("%s have defined an invalid file name %s", self.string, filename)
filename = filename.split("/")[-1]
return (
f"{self.generate_dashboard_resource_namespace()}/{filename}"
f"?hacstag={self.generate_dashboard_resource_hacstag()}"
)
def _get_resource_handler(self) -> ResourceStorageCollection | None:
"""Get the resource handler."""
resources: ResourceStorageCollection | None
if not (hass_data := self.hacs.hass.data):
self.logger.error("%s Can not access the hass data", self.string)
return
if (lovelace_data := hass_data.get("lovelace")) is None:
self.logger.warning("%s Can not access the lovelace integration data", self.string)
return
if self.hacs.core.ha_version > "2025.1.99":
# Changed to 2025.2.0
# Changed in https://github.com/home-assistant/core/pull/136313
resources = lovelace_data.resources
else:
resources = lovelace_data.get("resources")
if resources is None:
self.logger.warning("%s Can not access the dashboard resources", self.string)
return
if not hasattr(resources, "store") or resources.store is None:
self.logger.info("%s YAML mode detected, can not update resources", self.string)
return
if resources.store.key != "lovelace_resources" or resources.store.version != 1:
self.logger.warning("%s Can not use the dashboard resources", self.string)
return
return resources
async def update_dashboard_resources(self) -> None:
"""Update dashboard resources."""
if not (resources := self._get_resource_handler()):
return
if not resources.loaded:
await resources.async_load()
namespace = self.generate_dashboard_resource_namespace()
url = self.generate_dashboard_resource_url()
for entry in resources.async_items():
if (entry_url := entry["url"]).startswith(namespace):
if entry_url != url:
self.logger.info(
"%s Updating existing dashboard resource from %s to %s",
self.string,
entry_url,
url,
)
await resources.async_update_item(entry["id"], {"url": url})
return
# Nothing was updated, add the resource
self.logger.info("%s Adding dashboard resource %s", self.string, url)
await resources.async_create_item({"res_type": "module", "url": url})
async def remove_dashboard_resources(self) -> None:
"""Remove dashboard resources."""
if not (resources := self._get_resource_handler()):
return
if not resources.loaded:
await resources.async_load()
namespace = self.generate_dashboard_resource_namespace()
for entry in resources.async_items():
if entry["url"].startswith(namespace):
self.logger.info("%s Removing dashboard resource %s", self.string, entry["url"])
await resources.async_delete_item(entry["id"])
return