Examples
A few examples using various fsspec-supported/compatible Python packages are given below.
pandas
Example of reading/writing a CSV file from/to CDF Files using the pandas package. Environment variables are used to authenticate.
"""Example script for pandas package."""
import os
import numpy as np
import pandas as pd
from cognite.client import global_config
from cognite.client.data_classes.files import FileMetadata
from cognite import cdffs # noqa
# Disable the cognite-sdk check for newer package versions on PyPI.
global_config.disable_pypi_version_check = True
# FileMetadata.data_set_id expects an integer, but environment variables are
# always strings — convert when present, keep None when unset.
DATASET_ID = int(os.environ["DATASET_ID"]) if "DATASET_ID" in os.environ else None
def main():
    """Round-trip a small dataframe through CDF Files as CSV."""
    # Build a toy dataframe to upload.
    frame = pd.DataFrame({"x": np.arange(1000), "y": np.arange(1000)})
    metadata = FileMetadata(source="pandas_test", mime_type="text/csv", data_set_id=DATASET_ID)

    # Upload: pandas resolves the "cdffs://" scheme through fsspec, and the
    # file metadata reaches cdffs via storage_options.
    frame.to_csv(
        "cdffs://pandas_test/out/pandas_df.csv",
        index=False,
        storage_options={"file_metadata": metadata},
    )

    # Download the same file back and compare shapes.
    round_tripped = pd.read_csv("cdffs://pandas_test/out/pandas_df.csv")
    print(frame.shape, round_tripped.shape)


if __name__ == "__main__":
    main()
xarray
Example of reading/writing zarr files from/to CDF Files using the xarray package.
"""Example script for xarray package."""
import os
import numpy as np
import pandas as pd
import xarray
from cognite.client import ClientConfig, global_config
from cognite.client.credentials import OAuthClientCredentials
from cognite.client.data_classes.files import FileMetadata
from cognite import cdffs # noqa
# Disable the cognite-sdk check for newer package versions on PyPI.
global_config.disable_pypi_version_check = True
# Credentials and project configuration are read from the environment so the
# example never hard-codes secrets.
CLIENT_ID = os.environ.get("CLIENT_ID")
CLIENT_SECRET = os.environ.get("CLIENT_SECRET")
COGNITE_PROJECT = os.environ.get("COGNITE_PROJECT")
TENANT_ID = os.environ.get("TENANT_ID")
CDF_CLUSTER = os.environ.get("CDF_CLUSTER")
# FileMetadata.data_set_id expects an integer, but environment variables are
# always strings — convert when present, keep None when unset.
DATASET_ID = int(os.environ["DATASET_ID"]) if "DATASET_ID" in os.environ else None
# OAuth scope and token endpoint for the Azure AD tenant backing the CDF project.
SCOPES = [f"https://{CDF_CLUSTER}.cognitedata.com/.default"]
TOKEN_URL = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"
def main():
    """Round-trip an xarray dataset through CDF Files as a zarr store."""
    # Authenticate with Azure AD client credentials.
    credentials = OAuthClientCredentials(
        token_url=TOKEN_URL, client_id=CLIENT_ID, client_secret=CLIENT_SECRET, scopes=SCOPES
    )
    config = ClientConfig(
        client_name="cdf-client",
        base_url=f"https://{CDF_CLUSTER}.cognitedata.com",
        project=COGNITE_PROJECT,
        credentials=credentials,
        timeout=60,
    )

    # Build a small dataset to upload.
    source_ds = pd.DataFrame({"x": np.arange(1000), "y": np.arange(1000)}).to_xarray()
    metadata = FileMetadata(source="sample_zarr", mime_type="application/octet-stream", data_set_id=DATASET_ID)

    # Upload: the client config and file metadata reach cdffs via storage_options.
    source_ds.to_zarr(
        "cdffs://sample/test.zarr", storage_options={"connection_config": config, "file_metadata": metadata}
    )

    # Download the zarr store back and compare.
    result_ds = xarray.open_zarr("cdffs://sample/test.zarr", storage_options={"connection_config": config})
    print(source_ds.info, result_ds.info)


if __name__ == "__main__":
    main()
dask
Example of reading/writing a CSV file from/to CDF Files using the dask package.
"""Example script for dask package."""
import os
import dask.dataframe as dd
import numpy as np
import pandas as pd
from cognite.client import ClientConfig, global_config
from cognite.client.credentials import OAuthClientCredentials
from cognite.client.data_classes.files import FileMetadata
from cognite import cdffs # noqa
# Disable the cognite-sdk check for newer package versions on PyPI.
global_config.disable_pypi_version_check = True
# Credentials and project configuration are read from the environment so the
# example never hard-codes secrets.
CLIENT_ID = os.environ.get("CLIENT_ID")
CLIENT_SECRET = os.environ.get("CLIENT_SECRET")
COGNITE_PROJECT = os.environ.get("COGNITE_PROJECT")
TENANT_ID = os.environ.get("TENANT_ID")
CDF_CLUSTER = os.environ.get("CDF_CLUSTER")
# FileMetadata.data_set_id expects an integer, but environment variables are
# always strings — convert when present, keep None when unset.
DATASET_ID = int(os.environ["DATASET_ID"]) if "DATASET_ID" in os.environ else None
# OAuth scope and token endpoint for the Azure AD tenant backing the CDF project.
SCOPES = [f"https://{CDF_CLUSTER}.cognitedata.com/.default"]
TOKEN_URL = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"
def main():
    """Round-trip a dask dataframe through CDF Files as a single CSV file."""
    # Authenticate with Azure AD client credentials.
    credentials = OAuthClientCredentials(
        token_url=TOKEN_URL, client_id=CLIENT_ID, client_secret=CLIENT_SECRET, scopes=SCOPES
    )
    config = ClientConfig(
        client_name="cdf-client",
        base_url=f"https://{CDF_CLUSTER}.cognitedata.com",
        project=COGNITE_PROJECT,
        credentials=credentials,
        timeout=60,
    )

    # Build a tiny one-partition dask dataframe from pandas.
    source_df = dd.from_pandas(pd.DataFrame(np.arange(10).reshape(5, 2), columns=["x", "y"]), npartitions=1).reset_index()
    metadata = FileMetadata(source="sample_csv", mime_type="text/csv", data_set_id=DATASET_ID)

    # Upload as one CSV (single_file=True collapses the partitions into one object).
    source_df.to_csv(
        "cdffs://sample/sample_csv_file.csv",
        index=False,
        single_file=True,
        storage_options={"connection_config": config, "file_metadata": metadata},
    )

    # Download the file back and compare sizes.
    result_df = dd.read_csv("cdffs://sample/sample_csv_file.csv", storage_options={"connection_config": config})
    print(source_df.size, result_df.size)


if __name__ == "__main__":
    main()
geopandas
Example of reading/writing a parquet file from/to CDF Files using the geopandas package.
Note: the geopandas package uses pyarrow to read/write parquet files from the underlying storage. pyarrow has its own generic file system specification, but it is compatible with fsspec. So we can still make use of cdffs; we just need to instantiate a CdfFileSystem with the client config and pass it as the filesystem when reading/writing files from/to CDF Files.
"""Example script for geopandas package."""
import os
import geopandas as gpd
from cognite.client import ClientConfig, global_config
from cognite.client.credentials import OAuthClientCredentials
from cognite.client.data_classes.files import FileMetadata
from cognite.cdffs import CdfFileSystem
# Disable the cognite-sdk check for newer package versions on PyPI.
global_config.disable_pypi_version_check = True
# Credentials and project configuration are read from the environment so the
# example never hard-codes secrets.
CLIENT_ID = os.environ.get("CLIENT_ID")
CLIENT_SECRET = os.environ.get("CLIENT_SECRET")
COGNITE_PROJECT = os.environ.get("COGNITE_PROJECT")
TENANT_ID = os.environ.get("TENANT_ID")
CDF_CLUSTER = os.environ.get("CDF_CLUSTER")
# FileMetadata.data_set_id expects an integer, but environment variables are
# always strings — convert when present, keep None when unset.
DATASET_ID = int(os.environ["DATASET_ID"]) if "DATASET_ID" in os.environ else None
# OAuth scope and token endpoint for the Azure AD tenant backing the CDF project.
SCOPES = [f"https://{CDF_CLUSTER}.cognitedata.com/.default"]
TOKEN_URL = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"
def main():
    """Round-trip a GeoDataFrame through CDF Files as parquet."""
    # Authenticate with Azure AD client credentials.
    credentials = OAuthClientCredentials(
        token_url=TOKEN_URL, client_id=CLIENT_ID, client_secret=CLIENT_SECRET, scopes=SCOPES
    )
    config = ClientConfig(
        client_name="cdf-client",
        base_url=f"https://{CDF_CLUSTER}.cognitedata.com",
        project=COGNITE_PROJECT,
        credentials=credentials,
        timeout=60,
    )

    # Load a sample GeoDataFrame shipped with geopandas.
    # NOTE(review): gpd.datasets was removed in geopandas 1.0 — confirm the
    # pinned geopandas version still provides it, or switch to the
    # geodatasets package.
    source_df = gpd.GeoDataFrame.from_file(gpd.datasets.get_path("nybb"))
    metadata = FileMetadata(source="test", mime_type="application/octet-stream", data_set_id=DATASET_ID)

    # geopandas writes parquet through pyarrow, whose file system API is
    # fsspec-compatible — so an explicit CdfFileSystem instance is passed as
    # `filesystem` instead of relying on the "cdffs://" URL scheme.
    source_df.to_parquet(
        "/sample/geopandas/sample.parquet",
        filesystem=CdfFileSystem(connection_config=config, file_metadata=metadata),
    )

    # Download the parquet file back and compare shapes.
    result_df = gpd.read_parquet(
        "cdffs://sample/geopandas/sample.parquet", filesystem=CdfFileSystem(connection_config=config)
    )
    print(source_df.shape, result_df.shape)


if __name__ == "__main__":
    main()
zip file
Example of reading a folder from the local filesystem and writing a zip archive to CDF Files using the zipfile module.
import re
import zipfile
from pathlib import Path
import fsspec
from cognite.client.data_classes import FileMetadata
from cognite import cdffs # noqa
def clean_filename(path):
    """Return *path* with every run of ':' or '/' collapsed to one underscore."""
    return re.sub(r"[:/]+", "_", path)
def main():
    """Zip every file under the grandparent directory and upload it to CDF Files."""
    local_fs = fsspec.filesystem("file")
    # upload_strategy="azure" streams the upload to CDF in blocks.
    cdf_fs = fsspec.filesystem("cdffs", upload_strategy="azure")
    BUFFER_SIZE = 1 * 1024 * 1024  # 1MB

    # Collect every regular file under the grandparent of this script.
    root = Path(__file__).parent.parent.absolute()
    sources = [f for f in local_fs.glob(f"{root}/**/*") if Path(f).is_file()]

    metadata = FileMetadata(
        source="test",
        data_set_id=8576667485598960,
        mime_type="application/zip",
        metadata={"test": "test"},
    )
    with cdf_fs.open(
        "archive/test.zip",
        "wb",
        file_metadata=metadata,
        block_size=BUFFER_SIZE,
    ) as target_file:
        # Stream each local file into the zip archive being written to CDF.
        with zipfile.ZipFile(target_file, "w") as archive:
            for source_path in sources:
                print(source_path)
                with local_fs.open(source_path, "rb") as source_file:
                    archive.writestr(clean_filename(source_path), source_file.read())

    # Download the archive back to the local filesystem.
    with cdf_fs.open("archive/test.zip", mode="rb") as source:
        with local_fs.open("test.zip", mode="wb") as destination:
            destination.write(source.read())


if __name__ == "__main__":
    main()