Python Analytics SDK Quickstart Guide

      Install, connect, try. A quick start guide to get you up and running with Enterprise Analytics and the Python Analytics SDK.

      Enterprise Analytics is a real-time analytical database (RT-OLAP) for real-time apps and operational intelligence. Although it maintains some syntactic similarities with the operational SDKs, the Python Analytics SDK is developed from the ground up for column-based analytical use cases, and supports streaming APIs to handle large datasets.

      Before You Start

      Install and configure an Enterprise Analytics Cluster.

      Prerequisites

      Python versions 3.9 through 3.12 are currently supported. See the compatibility page for more information about platform support.
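
      If you are not sure which interpreter a given environment uses, the short check below prints its version and warns when it falls outside the supported range. This is a minimal sketch using only the standard library; it is not part of the SDK.

      import sys

      # Report the interpreter version; the SDK supports Python 3.9 through 3.12.
      print(f'Running Python {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}')
      if not ((3, 9) <= sys.version_info[:2] <= (3, 12)):
          print('Warning: this interpreter is outside the supported 3.9 - 3.12 range.')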

      Getting the SDK

      The SDK can be installed via pip:

      python -m pip install couchbase-analytics

      For other installation methods, see the installation page.
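
      To confirm the installation from Python, the snippet below imports the package and prints the installed version. This is a minimal sketch; the import name couchbase_analytics and the distribution name couchbase-analytics are taken from the examples in this guide.

      from importlib.metadata import version

      import couchbase_analytics  # the import succeeds only if the SDK is installed

      # Print the installed version of the couchbase-analytics distribution.
      print(version('couchbase-analytics'))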

      Connecting and Executing a Query

      Synchronous API

      from couchbase_analytics.cluster import Cluster
      from couchbase_analytics.credential import Credential
      from couchbase_analytics.options import QueryOptions
      
      
      def main() -> None:
          # Update this to your cluster
          endpoint = 'https://--your-instance--'
          username = 'username'
          pw = 'Password!123'
          # User Input ends here.
      
          cred = Credential.from_username_and_password(username, pw)
          cluster = Cluster.create_instance(endpoint, cred)
      
          # Execute a query and buffer all result rows in client memory.
          statement = 'SELECT * FROM `travel-sample`.inventory.airline LIMIT 10;'
          res = cluster.execute_query(statement)
          all_rows = res.get_all_rows()
          for row in all_rows:
              print(f'Found row: {row}')
          print(f'metadata={res.metadata()}')
      
          # Execute a query and process rows as they arrive from server.
          statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country="United States" LIMIT 10;'
          res = cluster.execute_query(statement)
          for row in res.rows():
              print(f'Found row: {row}')
          print(f'metadata={res.metadata()}')
      
          # Execute a streaming query with positional arguments.
          statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$1 LIMIT $2;'
          res = cluster.execute_query(statement, QueryOptions(positional_parameters=['United States', 10]))
          for row in res:
              print(f'Found row: {row}')
          print(f'metadata={res.metadata()}')
      
          # Execute a streaming query with named arguments.
          statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$country LIMIT $limit;'
          res = cluster.execute_query(statement, QueryOptions(named_parameters={'country': 'United States',
                                                                                'limit': 10}))
          for row in res.rows():
              print(f'Found row: {row}')
          print(f'metadata={res.metadata()}')
      
      
      if __name__ == '__main__':
          main()

      Asynchronous (asyncio) API

      import asyncio
      
      from acouchbase_analytics.cluster import AsyncCluster
      from acouchbase_analytics.credential import Credential
      from acouchbase_analytics.options import QueryOptions
      
      
      async def main() -> None:
          # Update this to your cluster
          endpoint = 'https://--your-instance--'
          username = 'username'
          pw = 'Password!123'
          # User Input ends here.
      
          cred = Credential.from_username_and_password(username, pw)
          cluster = AsyncCluster.create_instance(endpoint, cred)
      
          # Execute a query and buffer all result rows in client memory.
          statement = 'SELECT * FROM `travel-sample`.inventory.airline LIMIT 10;'
          res = await cluster.execute_query(statement)
          all_rows = await res.get_all_rows()
          # NOTE: all_rows is a list, _do not_ use `async for`
          for row in all_rows:
              print(f'Found row: {row}')
          print(f'metadata={res.metadata()}')
      
          # Execute a query and process rows as they arrive from server.
          statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country="United States" LIMIT 10;'
          res = await cluster.execute_query(statement)
          async for row in res.rows():
              print(f'Found row: {row}')
          print(f'metadata={res.metadata()}')
      
          # Execute a streaming query with positional arguments.
          statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$1 LIMIT $2;'
          res = await cluster.execute_query(statement, QueryOptions(positional_parameters=['United States', 10]))
          async for row in res:
              print(f'Found row: {row}')
          print(f'metadata={res.metadata()}')
      
          # Execute a streaming query with named arguments.
          statement = 'SELECT * FROM `travel-sample`.inventory.airline WHERE country=$country LIMIT $limit;'
          res = await cluster.execute_query(statement, QueryOptions(named_parameters={'country': 'United States',
                                                                                      'limit': 10}))
          async for row in res.rows():
              print(f'Found row: {row}')
          print(f'metadata={res.metadata()}')
      
      if __name__ == '__main__':
          asyncio.run(main())

      Connection String

      The endpoint in the examples above takes the form https://<your_hostname>:<port>. The default port is 443, for TLS connections. You do not need to give a port number if you are using port 443: https://<your_hostname> is effectively the same as https://<your_hostname>:443.
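
      For example, both endpoints below refer to the same TLS listener on the default port. This is a minimal sketch reusing the connection calls from the examples above; substitute your own hostname and credentials.

      from couchbase_analytics.cluster import Cluster
      from couchbase_analytics.credential import Credential

      cred = Credential.from_username_and_password('username', 'Password!123')

      # Port 443 is the default for TLS connections, so it can be omitted...
      cluster = Cluster.create_instance('https://--your-instance--', cred)

      # ...or given explicitly; both endpoints are equivalent.
      cluster = Cluster.create_instance('https://--your-instance--:443', cred)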

      If you are using a different port — for example, connecting to a cluster without a load balancer, directly to the Analytics port, 18095 — or not using TLS, then see the Connecting to Enterprise Analytics page.

      Migration from Row-Based Analytics

      If you are migrating a project from CBAS — our Analytics service on Capella Operational and Couchbase Server, using our operational SDKs — then information on migration can be found in the Enterprise Analytics docs.

      In particular, refer to the SDK section of the Enterprise Analytics migration pages.