feat: BasicCrawler.export_data helper by janbuchar · Pull Request #222 · apify/crawlee-python
Expand Up
@@ -2,7 +2,8 @@
import csv import io from typing import TYPE_CHECKING, AsyncIterator, Literal, TypedDict, cast import json from typing import TYPE_CHECKING, AsyncIterator, Literal, TextIO, TypedDict, cast
from typing_extensions import NotRequired, Required, Unpack, override
Expand Down Expand Up @@ -189,6 +190,32 @@ async def get_data(self, **kwargs: Unpack[GetDataKwargs]) -> DatasetItemsListPag # https://github.com/apify/apify-sdk-python/issues/140 return await self._resource_client.list_items(**kwargs)
async def write_to(self, content_type: Literal['json', 'csv'], destination: TextIO) -> None: """Exports the entire dataset into an arbitrary stream.
Args: content_type: Specifies the output format destination: The stream into which the dataset contents should be written """ items: list[dict] = [] limit = 1000 offset = 0
while True: list_items = await self._resource_client.list_items(limit=limit, offset=offset) items.extend(list_items.items) if list_items.total <= offset + list_items.count: break offset += list_items.count
if content_type == 'csv': writer = csv.writer(destination, quoting=csv.QUOTE_MINIMAL) writer.writerows([items[0].keys(), *[item.values() for item in items]]) elif content_type == 'json': json.dump(items, destination) else: raise ValueError(f'Unsupported content type: {content_type}')
async def export_to(self, **kwargs: Unpack[ExportToKwargs]) -> None: """Exports the entire dataset into a specified file stored under a key in a key-value store.
Expand All @@ -206,30 +233,15 @@ async def export_to(self, **kwargs: Unpack[ExportToKwargs]) -> None: to_key_value_store_name = kwargs.get('to_key_value_store_name', None)
key_value_store = await KeyValueStore.open(id=to_key_value_store_id, name=to_key_value_store_name) items: list[dict] = [] limit = 1000 offset = 0
while True: list_items = await self._resource_client.list_items(limit=limit, offset=offset) items.extend(list_items.items) if list_items.total <= offset + list_items.count: break offset += list_items.count output = io.StringIO() await self.write_to(content_type, output)
if content_type == 'csv': content_type_full = 'text/csv' output = io.StringIO() writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL) writer.writerows([items[0].keys(), *[item.values() for item in items]]) value = output.getvalue() return await key_value_store.set_value(key, value, content_type_full) await key_value_store.set_value(key, output.getvalue(), 'text/csv')
if content_type == 'json': content_type_full = 'application/json' return await key_value_store.set_value(key, items, content_type_full)
raise ValueError(f'Unsupported content type: {content_type}') await key_value_store.set_value(key, output.getvalue(), 'application/json')
async def get_info(self) -> DatasetMetadata | None: """Get an object containing general information about the dataset.""" Expand Down
import csv import io from typing import TYPE_CHECKING, AsyncIterator, Literal, TypedDict, cast import json from typing import TYPE_CHECKING, AsyncIterator, Literal, TextIO, TypedDict, cast
from typing_extensions import NotRequired, Required, Unpack, override
Expand Down Expand Up @@ -189,6 +190,32 @@ async def get_data(self, **kwargs: Unpack[GetDataKwargs]) -> DatasetItemsListPag # https://github.com/apify/apify-sdk-python/issues/140 return await self._resource_client.list_items(**kwargs)
async def write_to(self, content_type: Literal['json', 'csv'], destination: TextIO) -> None: """Exports the entire dataset into an arbitrary stream.
Args: content_type: Specifies the output format destination: The stream into which the dataset contents should be written """ items: list[dict] = [] limit = 1000 offset = 0
while True: list_items = await self._resource_client.list_items(limit=limit, offset=offset) items.extend(list_items.items) if list_items.total <= offset + list_items.count: break offset += list_items.count
if content_type == 'csv': writer = csv.writer(destination, quoting=csv.QUOTE_MINIMAL) writer.writerows([items[0].keys(), *[item.values() for item in items]]) elif content_type == 'json': json.dump(items, destination) else: raise ValueError(f'Unsupported content type: {content_type}')
async def export_to(self, **kwargs: Unpack[ExportToKwargs]) -> None: """Exports the entire dataset into a specified file stored under a key in a key-value store.
Expand All @@ -206,30 +233,15 @@ async def export_to(self, **kwargs: Unpack[ExportToKwargs]) -> None: to_key_value_store_name = kwargs.get('to_key_value_store_name', None)
key_value_store = await KeyValueStore.open(id=to_key_value_store_id, name=to_key_value_store_name) items: list[dict] = [] limit = 1000 offset = 0
while True: list_items = await self._resource_client.list_items(limit=limit, offset=offset) items.extend(list_items.items) if list_items.total <= offset + list_items.count: break offset += list_items.count output = io.StringIO() await self.write_to(content_type, output)
if content_type == 'csv': content_type_full = 'text/csv' output = io.StringIO() writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL) writer.writerows([items[0].keys(), *[item.values() for item in items]]) value = output.getvalue() return await key_value_store.set_value(key, value, content_type_full) await key_value_store.set_value(key, output.getvalue(), 'text/csv')
if content_type == 'json': content_type_full = 'application/json' return await key_value_store.set_value(key, items, content_type_full)
raise ValueError(f'Unsupported content type: {content_type}') await key_value_store.set_value(key, output.getvalue(), 'application/json')
async def get_info(self) -> DatasetMetadata | None: """Get an object containing general information about the dataset.""" Expand Down