feat: Add HDFS as a feature registry (#5655) · feast-dev/feast@4c65872

1+

import json

2+

import uuid

3+

from pathlib import Path, PurePosixPath

4+

from typing import Optional

5+

from urllib.parse import urlparse

6+7+

from pyarrow import fs

8+9+

from feast.infra.registry.registry_store import RegistryStore

10+

from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto

11+

from feast.repo_config import RegistryConfig

12+

from feast.utils import _utc_now

13+14+15+

class HDFSRegistryStore(RegistryStore):
    """HDFS implementation of RegistryStore.

    The registry is kept as a single serialized ``RegistryProto`` file.
    registryConfig.path should be a hdfs path like
    hdfs://namenode:8020/path/to/registry.db
    """

    def __init__(self, registry_config: RegistryConfig, repo_path: Path):
        """Open an HDFS connection for the URI in ``registry_config.path``.

        Args:
            registry_config: Registry configuration; only ``path`` is read.
            repo_path: Not used by this store.

        Raises:
            FeastExtrasDependencyImportError: If ``HadoopFileSystem`` cannot
                be imported from ``pyarrow.fs``.
            ValueError: If the URI scheme is anything other than ``hdfs``.
        """
        try:
            from pyarrow.fs import HadoopFileSystem
        except ImportError as e:
            from feast.errors import FeastExtrasDependencyImportError

            # Chain the original error so the real import failure stays visible.
            raise FeastExtrasDependencyImportError(
                "pyarrow.fs.HadoopFileSystem", str(e)
            ) from e

        uri = registry_config.path
        self._uri = urlparse(uri)
        if self._uri.scheme != "hdfs":
            raise ValueError(
                f"Unsupported scheme {self._uri.scheme} in HDFS path {uri}"
            )
        # Port 8020 is used whenever the URI does not specify one.
        self._hdfs = HadoopFileSystem(self._uri.hostname, self._uri.port or 8020)
        self._path = PurePosixPath(self._uri.path)

    def get_registry_proto(self) -> RegistryProto:
        """Read and parse the registry file.

        Returns:
            The ``RegistryProto`` parsed from the bytes stored at the
            configured path.

        Raises:
            FileNotFoundError: If nothing exists at the configured path.
        """
        if not _check_hdfs_path_exists(self._hdfs, str(self._path)):
            raise FileNotFoundError(
                f'Registry not found at path "{self._uri.geturl()}". Have you run "feast apply"?'
            )

        registry_proto = RegistryProto()
        with self._hdfs.open_input_file(str(self._path)) as f:
            registry_proto.ParseFromString(f.read())
        return registry_proto

    def update_registry_proto(self, registry_proto: RegistryProto):
        """Persist ``registry_proto`` to HDFS, replacing the stored registry."""
        self._write_registry(registry_proto)

    def teardown(self) -> None:
        """Delete the registry file if it exists; otherwise do nothing."""
        if _check_hdfs_path_exists(self._hdfs, str(self._path)):
            self._hdfs.delete_file(str(self._path))

    def _write_registry(self, registry_proto: RegistryProto):
        """Write registry protobuf to HDFS.

        The proto is modified in place before serialization: it receives a
        fresh random ``version_id`` and ``last_updated`` is set to the
        current UTC time.
        """
        registry_proto.version_id = str(uuid.uuid4())
        registry_proto.last_updated.FromDatetime(_utc_now())

        # Create the parent directory first when it does not exist yet.
        dir_path = self._path.parent
        if not _check_hdfs_path_exists(self._hdfs, str(dir_path)):
            self._hdfs.create_dir(str(dir_path), recursive=True)

        with self._hdfs.open_output_stream(str(self._path)) as f:
            f.write(registry_proto.SerializeToString())

    @staticmethod
    def _load_project_meta(raw: str) -> dict:
        """Decode the JSON object stored in a ``project_uuid`` field.

        Returns an empty dict when ``raw`` is empty, is not valid JSON, or
        decodes to something other than a JSON object.
        """
        if not raw:
            return {}
        try:
            meta = json.loads(raw)
        except ValueError:
            # json.JSONDecodeError subclasses ValueError; a field holding
            # non-JSON text is treated as carrying no metadata.
            return {}
        return meta if isinstance(meta, dict) else {}

    def set_project_metadata(self, project: str, key: str, value: str):
        """Set a custom project metadata key-value pair in the registry (HDFS backend).

        Metadata is stored as a JSON object in the ``project_uuid`` field of
        the project's ``ProjectMetadata`` entry. An entry is created when the
        project has none. The whole registry is then written back.

        Raises:
            FileNotFoundError: If the registry file does not exist yet.
        """
        registry_proto = self.get_registry_proto()

        for pm in registry_proto.project_metadata:
            if pm.project == project:
                # NOTE(review): any non-JSON content previously held in
                # project_uuid is discarded here - confirm this is intended.
                meta = self._load_project_meta(pm.project_uuid)
                meta[key] = value
                pm.project_uuid = json.dumps(meta)
                break
        else:
            # Loop finished without a match: create new ProjectMetadata entry.
            from feast.project_metadata import ProjectMetadata

            new_pm = ProjectMetadata(project_name=project)
            new_pm.project_uuid = json.dumps({key: value})
            registry_proto.project_metadata.append(new_pm.to_proto())

        # Write back
        self.update_registry_proto(registry_proto)

    def get_project_metadata(self, project: str, key: str) -> Optional[str]:
        """Get custom project metadata key from registry (HDFS backend).

        Returns:
            The value stored under ``key`` for the first entry matching
            ``project``, or ``None`` when the project, the key, or a usable
            JSON object is missing.

        Raises:
            FileNotFoundError: If the registry file does not exist yet.
        """
        registry_proto = self.get_registry_proto()

        for pm in registry_proto.project_metadata:
            if pm.project == project:
                return self._load_project_meta(pm.project_uuid).get(key)
        return None

117+118+119+

def _check_hdfs_path_exists(hdfs, path: str) -> bool:
    """Return True when ``hdfs`` reports anything other than NotFound at ``path``.

    Args:
        hdfs: Filesystem object exposing ``get_file_info``.
        path: Path to look up on that filesystem.
    """
    # get_file_info is called with a one-element list; take its only entry.
    lookup = hdfs.get_file_info([path])
    entry = lookup[0]
    missing = fs.FileType.NotFound
    return entry.type != missing