It seems in trying to “fix” my connection problem I’ve made it worse. Now even trying to connect to the CB instance is failing.
This is what I changed the app CB account permissions to:
As for versions:
couchbase python SDK : 3.2.7
Python : 3.7.8
FastAPI: 0.75.1
OS ubuntu : 20.04
import sys
import os
import asyncio
import json
import numbers
import socket
import time
import re            # regex package
import uvicorn      # fastAPI ingress service
from fastapi import FastAPI, Header, HTTPException, Request, status, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
# local libraries
from dal import DAL
from logstash_logger import send_log_message
from data_classes import bvMeta, IncentiveProgramResponse, Message
#-----------------------
# Declarations         |
#-----------------------
# CONSTANTS
dbHostUri = "http://dev-couchbase.bluevolt.io"
DbServiceUser = os.getenv('DB_SERVICE_USER')
DbServiceKey = os.getenv('DB_SERVICE_PASSWORD')
POOL_SIZE = 4
MAX_RETRIES = 10
def get_available_connection():
    # this routine finds the next available free connection in our database connection pool, reserves it, and
    # passes it back to the requester.  If no connection is available after MAX_RETRIES, None is returned
    global cache_pool
    connection = None
    retries = 0
    while not connection and retries < MAX_RETRIES:
        for item in range(POOL_SIZE):
            if not cache_pool[item]['busy']:
                connection = cache_pool[item]
                cache_pool[item]['busy'] = True
                break
        if not connection:
            time.sleep(0.1)
            retries += 1
            if retries == MAX_RETRIES: send_log_message("WARNING", None, "No available connections in pool", SERVICE_NAME)
    return connection
def free_connection(pool_id):
    # this routine frees a connection up in our cache connection pool
    global cache_pool
    cache_pool[pool_id]['busy'] = False
    return
#
# initilize db connection
#
# create cache connection.  Since our kafka listener is asynchronous, we only need
# a single synchronous cache connection.  Per couchbase doc, it is best to create this globally and allow it to
# persist, for performance reasons.  If no connection could be established, set cache_status accordingly
cache_pool = []
try:
    for i in range(POOL_SIZE):
        cache_pool.append({'id': i, 'cache': DAL(DbServiceUser, DbServiceKey, dbHostUri), 'busy': False})
    cache_status = 'connected'
except Exception as e:
    cache_status = 'unable to connect'
# initialize (FAST) API framework.
app = FastAPI()
# configure Cross-Origin Resource Sharing configuration
# TODO : Have to enter all the allowed origins
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)   
# GET available incentive programs for a given uni
#   The uniId selector is pulled from the BVMeta header object
@app.get("/incentives/uniPrograms", responses = {
    200: {"model": IncentiveProgramResponse},
    400: {"model": Message},
    404: {"model": Message, "description": "No incentive programs found."},
    403: {"model": Message, "description": "BAD_REQUEST"},
    422: {"model": Message, "message": "Missing or Invalid BVMeta.", "description": "UNPROCESSABLE_ENTITY"}})
async def getUniIncentivePrograms(request: Request, bvmeta: str = Header(None)):
    # validate ourBVMeta header contents
    bv_header = validate_bvmeta_header(bvmeta)
    uniId = 0
    if bv_header["universityId"] != {}: 
        if bv_header["universityId"] == None: return JSONResponse(status_code=403, content={"message": "Missing or invalid parameters."})
        uniId = bv_header["universityId"]
        # various validations - may well be moved to the DAL or BR layer
        # validate user is a member of uniAdmin for the university or a BTAdmin with proper permissions.
        # validate Uni is enabled for 3rd party incentive programs
        # validate incentive program meta data exists.
        try:
            if cache_status == "connected":
                cache_instance = get_available_connection()
                if cache_instance:
                    DAL = cache_instance["cache"]
                    results = DAL.fetchUniIncentivePrograms(uniId)
                    return JSONResponse(status_code=200, content={"message": results})
            else:
                return JSONResponse(status_code=404)    
        except: # catch *all* exceptions
            e = sys.exc_info()[0]
            return JSONResponse(status_code=400, content={"message": "<p>Error: %s</p>" % e})
    else: return JSONResponse(status_code=422, content={"message": "Missing or invalid bvMeta."})
if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)
In the DAL.py; the data access layer:
class DAL:
    def __init__(self, user, passwd, host, bucket_name='BvUniversity',
                 collection_name="_default"):
        self.cluster = None
        self.bucket = None
        self.scope = None
        self.collection = None
        self.bucket_name = bucket_name
        self.scope_name = '_default'
        self.collection_name = collection_name
        self.user = user
        self.passwd = passwd
        self.host = host
        self.connect()
    #
    def connect(self):
        self.cluster = Cluster(self.host, ClusterOptions(PasswordAuthenticator(self.user, self.passwd)))
        self.bucket = self.cluster.bucket(self.bucket_name)
        self.collection = self.bucket.scope(self.scope_name).collection(self.collection_name)
    #
    def query(self, n1ql, params=[], autocommit=False):
        #if self.conn.closed:  # first off, lets confirm that we are still connected to the database.  Reconnect if not.
        #    self.connect()
        #    self.open_cursor()
        try:
            if params:
                result = self.cluster.query(n1ql, QueryOptions(positional_parameters=params))
            else:
                result = self.cluster.query(n1ql)
        except:
            result = {}
        #if autocommit and result:sql
        #    self.conn.commit()
        return result
    #
      def getUsersAvailableIncentivePrograms(self, userId, universityId):
        rows = self.query("SELECT im.userId, \
                                im.incentivePrograms[*].incentiveProgramId, \
                                ip.programName \
                            FROM default:`incentiveMemberships` im \
                            UNNEST im.incentivePrograms \
                            JOIN default:`incentivePrograms` ip \
                                ON im.incentivePrograms[*].incentiveProgramId = META(ip).id \
                            WHERE im.userId = $1", [userId])
        incentivePrograms = []
        for row in rows:
            incentivePrograms.append(row)
        return incentivePrograms
In the schema classes:
import uuid
from pydantic import BaseModel
from enum import Enum
class IncentiveProgramResponse(BaseModel):
    incentiveProgramId : str
    Name : str
#   StatusResponseCodes  - http codes used in this service
# TODO: add  statuses. 
class StatusResponseCodes(Enum):
    Approved = 200
    BadRequest = 400
    Unauthorized = 401
    Forbidden = 403
    NotFound = 404
    Conflict = 409
    InvalidAttributes = 422
    TooManyRequests = 429
    InternalServerError = 500
    ServiceUnavailable = 503   
    
class bvMeta (BaseModel):    
    transactionId: int
    universityId: int
    userId: int
    
class Message (BaseModel):
    message: str
    
I’m working on setting up and finding the log files.