Python Analytics SDK Quickstart Guide
Install, connect, try. A quick start guide to get you up and running with Enterprise Analytics and the Python Analytics SDK.
Enterprise Analytics is a real-time analytical database (RT-OLAP) for real time apps and operational intelligence. Although maintaining some syntactic similarities with the operational SDKs, the Python Analytics SDK is developed from the ground-up for column-based analytical use cases, and supports streaming APIs to handle large datasets.
Before You Start
Install and configure an Enterprise Analytics Cluster.
Prerequisites
Currently Python 3.9 - Python 3.12 is supported. See the compatibility page for more information about platform support.
Getting the SDK
The SDK can be installed via pip
:
python -m pip install couchbase-analytics
For other installation methods, see the installation page.
Connecting and Executing a Query
Synchronous API
from couchbase_analytics.cluster import Cluster
from couchbase_analytics.credential import Credential
from couchbase_analytics.options import QueryOptions
def main() -> None:
# Update this to your cluster
endpoint = 'https://--your-instance--'
username = 'username'
pw = 'Password!123'
# User Input ends here.
cred = Credential.from_username_and_password(username, pw)
cluster = Cluster.create_instance(endpoint, cred)
# Execute a query and buffer all result rows in client memory.
statement = 'SELECT * FROM `travel-sample`.inventory.airline LIMIT 10;'
res = cluster.execute_query(statement)
all_rows = res.get_all_rows()
for row in all_rows:
print(f'Found row: {row}')
print(f'metadata={res.metadata()}')
# Execute a query and process rows as they arrive from server.
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country="United States" LIMIT 10;'
res = cluster.execute_query(statement)
for row in res.rows():
print(f'Found row: {row}')
print(f'metadata={res.metadata()}')
# Execute a streaming query with positional arguments.
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$1 LIMIT $2;'
res = cluster.execute_query(statement, QueryOptions(positional_parameters=['United States', 10]))
for row in res:
print(f'Found row: {row}')
print(f'metadata={res.metadata()}')
# Execute a streaming query with named arguments.
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$country LIMIT $limit;'
res = cluster.execute_query(statement, QueryOptions(named_parameters={'country': 'United States',
'limit': 10}))
for row in res.rows():
print(f'Found row: {row}')
print(f'metadata={res.metadata()}')
if __name__ == '__main__':
main()
Asynchronous (asyncio) API
import asyncio
from acouchbase_analytics.cluster import AsyncCluster
from acouchbase_analytics.credential import Credential
from acouchbase_analytics.options import QueryOptions
async def main() -> None:
# Update this to your cluster
endpoint = 'https://--your-instance--'
username = 'username'
pw = 'Password!123'
# User Input ends here.
cred = Credential.from_username_and_password(username, pw)
cluster = AsyncCluster.create_instance(endpoint, cred)
# Execute a query and buffer all result rows in client memory.
statement = 'SELECT * FROM `travel-sample`.inventory.airline LIMIT 10;'
res = await cluster.execute_query(statement)
all_rows = await res.get_all_rows()
# NOTE: all_rows is a list, _do not_ use `async for`
for row in all_rows:
print(f'Found row: {row}')
print(f'metadata={res.metadata()}')
# Execute a query and process rows as they arrive from server.
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country="United States" LIMIT 10;'
res = await cluster.execute_query(statement)
async for row in res.rows():
print(f'Found row: {row}')
print(f'metadata={res.metadata()}')
# Execute a streaming query with positional arguments.
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$1 LIMIT $2;'
res = await cluster.execute_query(statement, QueryOptions(positional_parameters=['United States', 10]))
async for row in res:
print(f'Found row: {row}')
print(f'metadata={res.metadata()}')
# Execute a streaming query with named arguments.
statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$country LIMIT $limit;'
res = await cluster.execute_query(statement, QueryOptions(named_parameters={'country': 'United States',
'limit': 10}))
async for row in res.rows():
print(f'Found row: {row}')
print(f'metadata={res.metadata()}')
if __name__ == '__main__':
asyncio.run(main())
Connection String
The connStr
in the above example should takes the form of "https://<your_hostname>:" + PORT.
The default port is 443, for TLS connections.
You do not need to give a port number if you are using port 443 — hostname = "https://<your_hostname>"
is effectively the same as `hostname = "https://<your_hostname>:" + "443"
If you are using a different port — for example, connecting to a cluster without a load balancer, directly to the Analytics port, 18095
— or not using TLS,
then see the Connecting to Enterprise Analytics page.
Migration from Row-Based Analytics
If you are migrating a project from CBAS — our Analytics service on Capella Operational and Couchbase Server, using our operational SDKs — then information on migration can be found in the Enterprise Analytics docs.
In particular, refer to the SDK section of the Enterprise Analytics migration pages.