feat: Add HDFS as a feature registry (#5655) · feast-dev/feast@4c65872
1+import json
2+import uuid
3+from pathlib import Path, PurePosixPath
4+from typing import Optional
5+from urllib.parse import urlparse
6+7+from pyarrow import fs
8+9+from feast.infra.registry.registry_store import RegistryStore
10+from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
11+from feast.repo_config import RegistryConfig
12+from feast.utils import _utc_now
class HDFSRegistryStore(RegistryStore):
    """HDFS implementation of RegistryStore.

    The whole registry is stored as one serialized ``RegistryProto`` file.
    ``registry_config.path`` should be an HDFS URI such as
    ``hdfs://namenode:8020/path/to/registry.db``.
    """

    def __init__(self, registry_config: RegistryConfig, repo_path: Path):
        """Connect to HDFS and remember where the registry file lives.

        Args:
            registry_config: Registry configuration; only ``path`` is read here.
            repo_path: Accepted for interface compatibility; not used by this store.

        Raises:
            FeastExtrasDependencyImportError: If ``pyarrow.fs.HadoopFileSystem``
                cannot be imported.
            ValueError: If the URI scheme of ``registry_config.path`` is not ``hdfs``.
        """
        try:
            from pyarrow.fs import HadoopFileSystem
        except ImportError as e:
            from feast.errors import FeastExtrasDependencyImportError

            # Chain explicitly so the original import failure stays visible
            # as the direct cause in tracebacks.
            raise FeastExtrasDependencyImportError(
                "pyarrow.fs.HadoopFileSystem", str(e)
            ) from e

        uri = registry_config.path
        self._uri = urlparse(uri)
        if self._uri.scheme != "hdfs":
            raise ValueError(
                f"Unsupported scheme {self._uri.scheme} in HDFS path {uri}"
            )

        host = self._uri.hostname
        if host:
            port = self._uri.port or 8020
        else:
            # A URI without an authority (e.g. hdfs:///path) has no hostname.
            # pyarrow documents host="default" with port=0 as "use fs.defaultFS
            # from core-site.xml", so fall back to that instead of passing None.
            host, port = "default", 0

        self._hdfs = HadoopFileSystem(host, port)
        self._path = PurePosixPath(self._uri.path)

    def get_registry_proto(self) -> RegistryProto:
        """Read and parse the registry file from HDFS.

        Returns:
            The parsed ``RegistryProto``.

        Raises:
            FileNotFoundError: If no file exists at the configured path.
        """
        path = str(self._path)
        if not _check_hdfs_path_exists(self._hdfs, path):
            raise FileNotFoundError(
                f'Registry not found at path "{self._uri.geturl()}". Have you run "feast apply"?'
            )

        registry_proto = RegistryProto()
        with self._hdfs.open_input_file(path) as f:
            registry_proto.ParseFromString(f.read())
        return registry_proto

    def update_registry_proto(self, registry_proto: RegistryProto) -> None:
        """Overwrite the registry file on HDFS with ``registry_proto``."""
        self._write_registry(registry_proto)

    def teardown(self) -> None:
        """Delete the registry file from HDFS; do nothing if it does not exist."""
        path = str(self._path)
        if _check_hdfs_path_exists(self._hdfs, path):
            self._hdfs.delete_file(path)

    def _write_registry(self, registry_proto: RegistryProto) -> None:
        """Write registry protobuf to HDFS.

        ``registry_proto`` is modified in place before serialization: it gets a
        fresh ``version_id`` and ``last_updated`` is set to the current UTC time.
        """
        registry_proto.version_id = str(uuid.uuid4())
        registry_proto.last_updated.FromDatetime(_utc_now())

        # Make sure the parent directory exists before opening the output stream.
        dir_path = str(self._path.parent)
        if not _check_hdfs_path_exists(self._hdfs, dir_path):
            self._hdfs.create_dir(dir_path, recursive=True)

        with self._hdfs.open_output_stream(str(self._path)) as f:
            f.write(registry_proto.SerializeToString())

    def set_project_metadata(self, project: str, key: str, value: str) -> None:
        """Set a custom project metadata key-value pair in the registry (HDFS backend).

        The key-value pairs are kept as a JSON object inside the
        ``project_uuid`` field of the project's metadata entry. If that field
        currently holds anything other than a JSON object, the old content is
        discarded and replaced by ``{key: value}``.

        Args:
            project: Name of the project whose metadata is updated.
            key: Metadata key to set.
            value: Value to store under ``key``.

        Raises:
            FileNotFoundError: If the registry file does not exist yet.
        """
        registry_proto = self.get_registry_proto()

        for pm in registry_proto.project_metadata:
            if pm.project == project:
                meta = _load_project_meta(pm.project_uuid)
                meta[key] = value
                pm.project_uuid = json.dumps(meta)
                break
        else:
            # No entry for this project yet: create a new ProjectMetadata entry.
            from feast.project_metadata import ProjectMetadata

            new_pm = ProjectMetadata(project_name=project)
            new_pm.project_uuid = json.dumps({key: value})
            registry_proto.project_metadata.append(new_pm.to_proto())

        # Write back
        self.update_registry_proto(registry_proto)

    def get_project_metadata(self, project: str, key: str) -> Optional[str]:
        """Get custom project metadata key from registry (HDFS backend).

        Args:
            project: Name of the project to look up.
            key: Metadata key to read.

        Returns:
            The value stored under ``key``, or ``None`` when the project has no
            metadata entry, the stored metadata is not a JSON object, or the
            key is absent.

        Raises:
            FileNotFoundError: If the registry file does not exist yet.
        """
        registry_proto = self.get_registry_proto()

        for pm in registry_proto.project_metadata:
            if pm.project == project:
                # Only the first matching entry is consulted.
                return _load_project_meta(pm.project_uuid).get(key)
        return None


def _load_project_meta(raw: str) -> dict:
    """Decode the JSON object stored in a ``project_uuid`` field.

    Returns an empty dict when ``raw`` is empty, is not valid JSON, or decodes
    to something other than a JSON object.
    """
    if not raw:
        return {}
    try:
        meta = json.loads(raw)
    except ValueError:
        # json.JSONDecodeError subclasses ValueError; treat malformed content
        # as "no metadata" rather than failing.
        return {}
    return meta if isinstance(meta, dict) else {}


def _check_hdfs_path_exists(hdfs, path: str) -> bool:
    """Return True if ``path`` exists (as any file type) on the given filesystem."""
    info = hdfs.get_file_info([path])[0]
    return info.type != fs.FileType.NotFound
121+return info.type != fs.FileType.NotFound