Examples

A few examples using various fsspec-supported/compatible Python packages are given below.

pandas

Example of reading/writing a CSV file from/to CDF Files using the pandas package. Authentication is handled through environment variables (see the cdffs authentication documentation for the expected variable names).

"""Example script for pandas package."""
import os

import numpy as np
import pandas as pd
from cognite.client import global_config
from cognite.client.data_classes.files import FileMetadata

from cognite import cdffs  # noqa

global_config.disable_pypi_version_check = True

DATASET_ID = int(os.environ["DATASET_ID"])  # data_set_id must be an integer


def main():
    # Create a dataframe
    df = pd.DataFrame({"x": np.arange(1000), "y": np.arange(1000)})
    file_metadata = FileMetadata(source="pandas_test", mime_type="text/csv", data_set_id=DATASET_ID)

    # Write the data using pandas to CDF Files.
    df.to_csv(
        "cdffs://pandas_test/out/pandas_df.csv",
        index=False,
        storage_options={"file_metadata": file_metadata},
    )

    # Read the data using pandas from CDF Files.
    df2 = pd.read_csv("cdffs://pandas_test/out/pandas_df.csv")

    print(df.shape, df2.shape)


if __name__ == "__main__":
    main()
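
pandas uses the same storage_options mechanism as the xarray and dask examples below, so if you prefer not to rely on environment variables you can pass a ClientConfig explicitly. A minimal sketch, assuming client_cnf is a ClientConfig constructed as in those examples:

import pandas as pd

# client_cnf: a cognite ClientConfig, built as in the xarray/dask examples below
df2 = pd.read_csv(
    "cdffs://pandas_test/out/pandas_df.csv",
    storage_options={"connection_config": client_cnf},
)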

xarray

Example of reading/writing Zarr data from/to CDF Files using the xarray package.

"""Example script for xarray package."""
import os

import numpy as np
import pandas as pd
import xarray
from cognite.client import ClientConfig, global_config
from cognite.client.credentials import OAuthClientCredentials
from cognite.client.data_classes.files import FileMetadata

from cognite import cdffs  # noqa

global_config.disable_pypi_version_check = True

CLIENT_ID = os.environ.get("CLIENT_ID")
CLIENT_SECRET = os.environ.get("CLIENT_SECRET")
COGNITE_PROJECT = os.environ.get("COGNITE_PROJECT")
TENANT_ID = os.environ.get("TENANT_ID")
CDF_CLUSTER = os.environ.get("CDF_CLUSTER")
DATASET_ID = int(os.environ["DATASET_ID"])  # data_set_id must be an integer

# Create a CDF Client Config
SCOPES = [f"https://{CDF_CLUSTER}.cognitedata.com/.default"]
TOKEN_URL = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"


def main():
    oauth_creds = OAuthClientCredentials(
        token_url=TOKEN_URL, client_id=CLIENT_ID, client_secret=CLIENT_SECRET, scopes=SCOPES
    )
    client_cnf = ClientConfig(
        client_name="cdf-client",
        base_url=f"https://{CDF_CLUSTER}.cognitedata.com",
        project=COGNITE_PROJECT,
        credentials=oauth_creds,
        timeout=60,
    )

    # Create a dataset
    df = pd.DataFrame({"x": np.arange(1000), "y": np.arange(1000)})
    ds1 = df.to_xarray()

    # Write the zarr files using xarray to CDF Files.
    file_metadata = FileMetadata(source="sample_zarr", mime_type="application/octet-stream", data_set_id=DATASET_ID)
    ds1.to_zarr(
        "cdffs://sample/test.zarr", storage_options={"connection_config": client_cnf, "file_metadata": file_metadata}
    )

    # Read the zarr files using xarray from CDF Files.
    ds2 = xarray.open_zarr("cdffs://sample/test.zarr", storage_options={"connection_config": client_cnf})

    # Print a summary of both datasets
    ds1.info()
    ds2.info()


if __name__ == "__main__":
    main()

dask

Example of reading/writing a CSV file from/to CDF Files using the dask package.

"""Example script for dask package."""
import os

import dask.dataframe as dd
import numpy as np
import pandas as pd
from cognite.client import ClientConfig, global_config
from cognite.client.credentials import OAuthClientCredentials
from cognite.client.data_classes.files import FileMetadata

from cognite import cdffs  # noqa

global_config.disable_pypi_version_check = True

CLIENT_ID = os.environ.get("CLIENT_ID")
CLIENT_SECRET = os.environ.get("CLIENT_SECRET")
COGNITE_PROJECT = os.environ.get("COGNITE_PROJECT")
TENANT_ID = os.environ.get("TENANT_ID")
CDF_CLUSTER = os.environ.get("CDF_CLUSTER")
DATASET_ID = int(os.environ["DATASET_ID"])  # data_set_id must be an integer

# Create a CDF Client Config
SCOPES = [f"https://{CDF_CLUSTER}.cognitedata.com/.default"]
TOKEN_URL = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"


def main():
    oauth_creds = OAuthClientCredentials(
        token_url=TOKEN_URL, client_id=CLIENT_ID, client_secret=CLIENT_SECRET, scopes=SCOPES
    )
    client_cnf = ClientConfig(
        client_name="cdf-client",
        base_url=f"https://{CDF_CLUSTER}.cognitedata.com",
        project=COGNITE_PROJECT,
        credentials=oauth_creds,
        timeout=60,
    )

    # Create a Dask Dataframe from pandas
    inp_df = dd.from_pandas(pd.DataFrame(np.arange(10).reshape(5, 2), columns=["x", "y"]), npartitions=1).reset_index()
    file_metadata = FileMetadata(source="sample_csv", mime_type="text/csv", data_set_id=DATASET_ID)

    # Write the data using dask to CDF Files.
    inp_df.to_csv(
        "cdffs://sample/sample_csv_file.csv",
        index=False,
        single_file=True,
        storage_options={"connection_config": client_cnf, "file_metadata": file_metadata},
    )

    # Read the data using dask from CDF Files.
    res_df = dd.read_csv("cdffs://sample/sample_csv_file.csv", storage_options={"connection_config": client_cnf})

    # .size is lazy on dask dataframes, so compute it before printing
    print(inp_df.size.compute(), res_df.size.compute())


if __name__ == "__main__":
    main()
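
Because path resolution goes through fsspec, dask can also read several CDF files in a single call using a glob pattern. A minimal sketch, assuming your cdffs version supports listing/globbing over the directory prefix and that client_cnf is the ClientConfig built in the example above:

import dask.dataframe as dd

# client_cnf: the ClientConfig constructed in the example above
# (assumes multiple CSV files have already been written under the "sample" prefix)
res_df = dd.read_csv(
    "cdffs://sample/*.csv",
    storage_options={"connection_config": client_cnf},
)
print(res_df.npartitions)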

geopandas

Example of reading/writing a Parquet file from/to CDF Files using the geopandas package.

Note: The geopandas package uses pyarrow to read/write Parquet files from the underlying storage. pyarrow has its own generic filesystem specification, but it is compatible with fsspec. We can therefore still make use of cdffs; we just need to instantiate a CdfFileSystem with the client config and pass it as filesystem when reading/writing files from/to CDF Files.

"""Example script for geopandas package."""
import os

import geopandas as gpd
from cognite.client import ClientConfig, global_config
from cognite.client.credentials import OAuthClientCredentials
from cognite.client.data_classes.files import FileMetadata

from cognite.cdffs import CdfFileSystem

global_config.disable_pypi_version_check = True

CLIENT_ID = os.environ.get("CLIENT_ID")
CLIENT_SECRET = os.environ.get("CLIENT_SECRET")
COGNITE_PROJECT = os.environ.get("COGNITE_PROJECT")
TENANT_ID = os.environ.get("TENANT_ID")
CDF_CLUSTER = os.environ.get("CDF_CLUSTER")
DATASET_ID = int(os.environ["DATASET_ID"])  # data_set_id must be an integer

# Create a CDF Client Config
SCOPES = [f"https://{CDF_CLUSTER}.cognitedata.com/.default"]
TOKEN_URL = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"


def main():
    oauth_creds = OAuthClientCredentials(
        token_url=TOKEN_URL, client_id=CLIENT_ID, client_secret=CLIENT_SECRET, scopes=SCOPES
    )
    client_cnf = ClientConfig(
        client_name="cdf-client",
        base_url=f"https://{CDF_CLUSTER}.cognitedata.com",
        project=COGNITE_PROJECT,
        credentials=oauth_creds,
        timeout=60,
    )

    # Create a geopandas dataframe from the bundled "nybb" sample dataset
    # (gpd.datasets is deprecated/removed in newer geopandas versions; substitute your own dataset if needed)
    inp_df = gpd.GeoDataFrame.from_file(gpd.datasets.get_path("nybb"))
    file_metadata = FileMetadata(source="test", mime_type="application/octet-stream", data_set_id=DATASET_ID)

    # The geopandas package uses pyarrow to read/write Parquet files from the underlying storage.
    # As highlighted above, pyarrow has its own generic filesystem specification, but it is compatible with fsspec.
    # We can still make use of cdffs; we just need to instantiate a CdfFileSystem with the client config
    # and pass it as filesystem when reading/writing files from/to CDF.

    # Write the parquet file using Geopandas to CDF Files.
    inp_df.to_parquet(
        "/sample/geopandas/sample.parquet",
        filesystem=CdfFileSystem(connection_config=client_cnf, file_metadata=file_metadata),
    )

    # Read the parquet file using Geopandas from CDF Files.
    res_df = gpd.read_parquet(
        "cdffs://sample/geopandas/sample.parquet", filesystem=CdfFileSystem(connection_config=client_cnf)
    )

    print(inp_df.shape, res_df.shape)


if __name__ == "__main__":
    main()

zip file

Example of reading a folder from the local filesystem and writing a zip archive to CDF Files using Python's built-in zipfile module.

"""Example script for creating a zip archive with the zipfile module."""
import re
import zipfile
from pathlib import Path

import fsspec
from cognite.client.data_classes import FileMetadata

from cognite import cdffs  # noqa


def clean_filename(path):
    # Replace multiple slashes or colons with a single underscore
    cleaned = re.sub(r"[:/]+", "_", path)
    return cleaned


def main():
    # Local filesystem as the source, CDF Files (via cdffs) as the target
    source_fs = fsspec.filesystem("file")
    target_fs = fsspec.filesystem("cdffs", upload_strategy="azure")

    BUFFER_SIZE = 1 * 1024 * 1024  # 1MB

    # Collect every regular file under the directory above this script
    source_files = [f for f in source_fs.glob(f"{Path(__file__).parent.parent.absolute()}/**/*") if Path(f).is_file()]

    # Open the target file on CDF Files; data is uploaded in BUFFER_SIZE blocks
    with target_fs.open(
        "archive/test.zip",
        "wb",
        file_metadata=FileMetadata(
            source="test",
            data_set_id=8576667485598960,
            mime_type="application/zip",
            metadata={"test": "test"},
        ),
        block_size=BUFFER_SIZE,
    ) as target_file:
        with zipfile.ZipFile(target_file, "w") as zipf:
            for source_path in source_files:
                print(source_path)
                with source_fs.open(source_path, "rb") as source_file:
                    zip_file_name = clean_filename(source_path)
                    zipf.writestr(zip_file_name, source_file.read())

    # Copy the archive back from CDF Files to the local filesystem
    with target_fs.open("archive/test.zip", mode="rb") as source:
        with source_fs.open("test.zip", mode="wb") as destination:
            destination.write(source.read())


if __name__ == "__main__":
    main()
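
Since fsspec file objects behave like regular file objects, the archive can also be inspected directly from CDF Files without downloading it first. A minimal sketch, assuming the same archive/test.zip written above and environment-variable authentication:

import zipfile

import fsspec

from cognite import cdffs  # noqa

cdf_fs = fsspec.filesystem("cdffs")

# List the members of the archive straight from CDF Files
with cdf_fs.open("archive/test.zip", mode="rb") as remote_zip:
    with zipfile.ZipFile(remote_zip) as archive:
        print(archive.namelist())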