Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions packages/gooddata-sdk/src/gooddata_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@
import logging

from gooddata_sdk._version import __version__
from gooddata_sdk.catalog.ai_lake.entity_model.database_instance import (
CatalogDatabaseInstance,
CatalogProvisionDatabaseInstanceRequest,
)
from gooddata_sdk.catalog.ai_lake.entity_model.pipe_table import (
CatalogCreatePipeTableRequest,
CatalogPipeTable,
CatalogPipeTableSummary,
)
from gooddata_sdk.catalog.ai_lake.entity_model.service_info import CatalogServiceInfo
from gooddata_sdk.catalog.ai_lake.service import CatalogAiLakeService
from gooddata_sdk.catalog.appearance.entity_model.color_palette import (
CatalogColorPalette,
CatalogColorPaletteAttributes,
Expand Down Expand Up @@ -36,6 +47,7 @@
)
from gooddata_sdk.catalog.data_source.entity_model.data_source import (
CatalogDataSource,
CatalogDataSourceAiLakehouse,
CatalogDataSourceBigQuery,
CatalogDataSourceDatabricks,
CatalogDataSourceGdStorage,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# (C) 2026 GoodData Corporation
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# (C) 2026 GoodData Corporation
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# (C) 2026 GoodData Corporation
from __future__ import annotations

from typing import Any

import attrs
from gooddata_api_client.model.provision_database_instance_request import ProvisionDatabaseInstanceRequest


@attrs.define(kw_only=True)
class CatalogDatabaseInstance:
"""Represents an AI Lake database instance."""

id: str
name: str
storage_ids: list[str] = attrs.field(factory=list)

@classmethod
def from_api(cls, entity: dict[str, Any]) -> CatalogDatabaseInstance:
return cls(
id=entity["id"],
name=entity["name"],
storage_ids=entity.get("storage_ids") or [],
)


@attrs.define(kw_only=True)
class CatalogProvisionDatabaseInstanceRequest:
"""Request to provision a new AI Lake database instance."""

name: str
storage_ids: list[str] = attrs.field(factory=list)

def as_api_model(self) -> ProvisionDatabaseInstanceRequest:
return ProvisionDatabaseInstanceRequest(
name=self.name,
storage_ids=self.storage_ids,
_check_type=False,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# (C) 2026 GoodData Corporation
from __future__ import annotations

from typing import Any

import attrs
from gooddata_api_client.model.create_pipe_table_request import CreatePipeTableRequest


@attrs.define(kw_only=True)
class CatalogPipeTableSummary:
"""Summary of an AI Lake pipe table as returned by the list endpoint."""

pipe_table_id: str
table_name: str
path_prefix: str
columns: list[dict[str, Any]] = attrs.field(factory=list)

@classmethod
def from_api(cls, entity: dict[str, Any]) -> CatalogPipeTableSummary:
return cls(
pipe_table_id=entity["pipe_table_id"],
table_name=entity["table_name"],
path_prefix=entity["path_prefix"],
columns=entity.get("columns") or [],
)


@attrs.define(kw_only=True)
class CatalogPipeTable:
"""Full representation of an AI Lake pipe table."""

pipe_table_id: str
table_name: str
source_storage_name: str
path_prefix: str
database_name: str
polling_interval_seconds: int
partition_columns: list[str] = attrs.field(factory=list)
table_properties: dict[str, str] = attrs.field(factory=dict)
columns: list[dict[str, Any]] = attrs.field(factory=list)

@classmethod
def from_api(cls, entity: dict[str, Any]) -> CatalogPipeTable:
return cls(
pipe_table_id=entity["pipe_table_id"],
table_name=entity["table_name"],
source_storage_name=entity["source_storage_name"],
path_prefix=entity["path_prefix"],
database_name=entity["database_name"],
polling_interval_seconds=entity["polling_interval_seconds"],
partition_columns=entity.get("partition_columns") or [],
table_properties=entity.get("table_properties") or {},
columns=entity.get("columns") or [],
)


@attrs.define(kw_only=True)
class CatalogCreatePipeTableRequest:
"""Request to create a new AI Lake pipe table."""

path_prefix: str
source_storage_name: str
table_name: str
column_overrides: dict[str, str] | None = None
max_varchar_length: int | None = None
polling_interval_seconds: int | None = None
table_properties: dict[str, str] | None = None

def as_api_model(self) -> CreatePipeTableRequest:
kwargs: dict[str, Any] = {}
if self.column_overrides is not None:
kwargs["column_overrides"] = self.column_overrides
if self.max_varchar_length is not None:
kwargs["max_varchar_length"] = self.max_varchar_length
if self.polling_interval_seconds is not None:
kwargs["polling_interval_seconds"] = self.polling_interval_seconds
if self.table_properties is not None:
kwargs["table_properties"] = self.table_properties
return CreatePipeTableRequest(
path_prefix=self.path_prefix,
source_storage_name=self.source_storage_name,
table_name=self.table_name,
_check_type=False,
**kwargs,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# (C) 2026 GoodData Corporation
from __future__ import annotations

from typing import Any

import attrs


@attrs.define(kw_only=True)
class CatalogServiceInfo:
"""Information about an AI Lake service."""

name: str
service_id: str

@classmethod
def from_api(cls, entity: dict[str, Any]) -> CatalogServiceInfo:
return cls(
name=entity["name"],
service_id=entity["service_id"],
)
183 changes: 183 additions & 0 deletions packages/gooddata-sdk/src/gooddata_sdk/catalog/ai_lake/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# (C) 2026 GoodData Corporation
from __future__ import annotations

from gooddata_sdk.catalog.ai_lake.entity_model.database_instance import (
CatalogDatabaseInstance,
CatalogProvisionDatabaseInstanceRequest,
)
from gooddata_sdk.catalog.ai_lake.entity_model.pipe_table import (
CatalogCreatePipeTableRequest,
CatalogPipeTable,
CatalogPipeTableSummary,
)
from gooddata_sdk.catalog.ai_lake.entity_model.service_info import CatalogServiceInfo
from gooddata_sdk.client import GoodDataApiClient


class CatalogAiLakeService:
"""Service for managing AI Lake database instances, pipe tables, and StarRocks services."""

def __init__(self, api_client: GoodDataApiClient) -> None:
self._client = api_client
self._ai_lake_api = api_client.ai_lake_api
self._ai_lake_pipe_tables_api = api_client.ai_lake_pipe_tables_api

# Database instance methods

def provision_database_instance(
self,
request: CatalogProvisionDatabaseInstanceRequest,
) -> None:
"""Provision a new AI Lake database instance.

Args:
request (CatalogProvisionDatabaseInstanceRequest):
The provision request containing name and storage IDs.

Returns:
None
"""
self._ai_lake_api.provision_ai_lake_database_instance(
request.as_api_model(),
_check_return_type=False,
)

def get_database_instance(self, instance_id: str) -> CatalogDatabaseInstance:
"""Retrieve an AI Lake database instance by ID or name.

Args:
instance_id (str):
Database instance identifier (name preferred, or UUID).

Returns:
CatalogDatabaseInstance:
The database instance.
"""
response = self._ai_lake_api.get_ai_lake_database_instance(
instance_id,
_check_return_type=False,
)
return CatalogDatabaseInstance.from_api(response.to_dict(camel_case=False))

def list_database_instances(self) -> list[CatalogDatabaseInstance]:
"""List all AI Lake database instances.

Returns:
list[CatalogDatabaseInstance]:
All database instances in the organization.
"""
response = self._ai_lake_api.list_ai_lake_database_instances(
_check_return_type=False,
)
data = response.to_dict(camel_case=False)
return [CatalogDatabaseInstance.from_api(db) for db in data.get("databases") or []]

def deprovision_database_instance(self, instance_id: str) -> None:
"""Delete an existing AI Lake database instance.

Args:
instance_id (str):
Database instance identifier (name preferred, or UUID).

Returns:
None
"""
self._ai_lake_api.deprovision_ai_lake_database_instance(
instance_id,
_check_return_type=False,
)

# Pipe table methods

def create_pipe_table(
self,
instance_id: str,
request: CatalogCreatePipeTableRequest,
) -> None:
"""Create a new AI Lake pipe table in the given database instance.

Args:
instance_id (str):
Database instance identifier.
request (CatalogCreatePipeTableRequest):
The create request with path prefix, source storage name, and table name.

Returns:
None
"""
self._ai_lake_pipe_tables_api.create_ai_lake_pipe_table(
instance_id,
request.as_api_model(),
_check_return_type=False,
)

def get_pipe_table(self, instance_id: str, table_name: str) -> CatalogPipeTable:
"""Retrieve a specific AI Lake pipe table.

Args:
instance_id (str):
Database instance identifier.
table_name (str):
OLAP table name.

Returns:
CatalogPipeTable:
The full pipe table details.
"""
response = self._ai_lake_pipe_tables_api.get_ai_lake_pipe_table(
instance_id,
table_name,
_check_return_type=False,
)
return CatalogPipeTable.from_api(response.to_dict(camel_case=False))

def list_pipe_tables(self, instance_id: str) -> list[CatalogPipeTableSummary]:
"""List all AI Lake pipe tables for a database instance.

Args:
instance_id (str):
Database instance identifier.

Returns:
list[CatalogPipeTableSummary]:
All pipe tables in the given database instance.
"""
response = self._ai_lake_pipe_tables_api.list_ai_lake_pipe_tables(
instance_id,
_check_return_type=False,
)
data = response.to_dict(camel_case=False)
return [CatalogPipeTableSummary.from_api(pt) for pt in data.get("pipe_tables") or []]

def delete_pipe_table(self, instance_id: str, table_name: str) -> None:
"""Delete an AI Lake pipe table.

Args:
instance_id (str):
Database instance identifier.
table_name (str):
OLAP table name.

Returns:
None
"""
self._ai_lake_pipe_tables_api.delete_ai_lake_pipe_table(
instance_id,
table_name,
_check_return_type=False,
)

# Service methods

def list_services(self) -> list[CatalogServiceInfo]:
"""List all AI Lake services configured for the organization.

Returns:
list[CatalogServiceInfo]:
All services configured in the organization's AI Lake.
"""
response = self._ai_lake_api.list_ai_lake_services(
_check_return_type=False,
)
data = response.to_dict(camel_case=False)
return [CatalogServiceInfo.from_api(svc) for svc in data.get("services") or []]
Original file line number Diff line number Diff line change
Expand Up @@ -318,3 +318,12 @@ class CatalogDataSourceGdStorage(CatalogDataSource):
type: str = "GDSTORAGE"
schema: str = ""
credentials: Credentials = field(factory=_NoCredentials, repr=False)


@define(kw_only=True, eq=False)
class CatalogDataSourceAiLakehouse(CatalogDataSource):
"""Data source backed by an AI Lake (StarRocks) database instance."""

type: str = "AILAKEHOUSE"
schema: str = ""
credentials: Credentials = field(factory=_NoCredentials, repr=False)
Loading
Loading