It seems in trying to “fix” my connection problem I’ve made it worse. Now even trying to connect to the CB instance is failing.
This is what I changed the app CB account permissions to:
As for versions:
couchbase python SDK : 3.2.7
Python : 3.7.8
FastAPI: 0.75.1
OS ubuntu : 20.04
import sys
import os
import asyncio
import json
import numbers
import socket
import time
import re # regex package
import uvicorn # fastAPI ingress service
from fastapi import FastAPI, Header, HTTPException, Request, status, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel
# local libraries
from dal import DAL
from logstash_logger import send_log_message
from data_classes import bvMeta, IncentiveProgramResponse, Message
#-----------------------
# Declarations |
#-----------------------
# CONSTANTS
dbHostUri = "http://dev-couchbase.bluevolt.io"
DbServiceUser = os.getenv('DB_SERVICE_USER')
DbServiceKey = os.getenv('DB_SERVICE_PASSWORD')
POOL_SIZE = 4
MAX_RETRIES = 10
def get_available_connection():
# this routine finds the next available free connection in our database connection pool, reserves it, and
# passes it back to the requester. If no connection is available after MAX_RETRIES, None is returned
global cache_pool
connection = None
retries = 0
while not connection and retries < MAX_RETRIES:
for item in range(POOL_SIZE):
if not cache_pool[item]['busy']:
connection = cache_pool[item]
cache_pool[item]['busy'] = True
break
if not connection:
time.sleep(0.1)
retries += 1
if retries == MAX_RETRIES: send_log_message("WARNING", None, "No available connections in pool", SERVICE_NAME)
return connection
def free_connection(pool_id):
# this routine frees a connection up in our cache connection pool
global cache_pool
cache_pool[pool_id]['busy'] = False
return
#
# initilize db connection
#
# create cache connection. Since our kafka listener is asynchronous, we only need
# a single synchronous cache connection. Per couchbase doc, it is best to create this globally and allow it to
# persist, for performance reasons. If no connection could be established, set cache_status accordingly
cache_pool = []
try:
for i in range(POOL_SIZE):
cache_pool.append({'id': i, 'cache': DAL(DbServiceUser, DbServiceKey, dbHostUri), 'busy': False})
cache_status = 'connected'
except Exception as e:
cache_status = 'unable to connect'
# initialize (FAST) API framework.
app = FastAPI()
# configure Cross-Origin Resource Sharing configuration
# TODO : Have to enter all the allowed origins
origins = ["*"]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# GET available incentive programs for a given uni
# The uniId selector is pulled from the BVMeta header object
@app.get("/incentives/uniPrograms", responses = {
200: {"model": IncentiveProgramResponse},
400: {"model": Message},
404: {"model": Message, "description": "No incentive programs found."},
403: {"model": Message, "description": "BAD_REQUEST"},
422: {"model": Message, "message": "Missing or Invalid BVMeta.", "description": "UNPROCESSABLE_ENTITY"}})
async def getUniIncentivePrograms(request: Request, bvmeta: str = Header(None)):
# validate ourBVMeta header contents
bv_header = validate_bvmeta_header(bvmeta)
uniId = 0
if bv_header["universityId"] != {}:
if bv_header["universityId"] == None: return JSONResponse(status_code=403, content={"message": "Missing or invalid parameters."})
uniId = bv_header["universityId"]
# various validations - may well be moved to the DAL or BR layer
# validate user is a member of uniAdmin for the university or a BTAdmin with proper permissions.
# validate Uni is enabled for 3rd party incentive programs
# validate incentive program meta data exists.
try:
if cache_status == "connected":
cache_instance = get_available_connection()
if cache_instance:
DAL = cache_instance["cache"]
results = DAL.fetchUniIncentivePrograms(uniId)
return JSONResponse(status_code=200, content={"message": results})
else:
return JSONResponse(status_code=404)
except: # catch *all* exceptions
e = sys.exc_info()[0]
return JSONResponse(status_code=400, content={"message": "<p>Error: %s</p>" % e})
else: return JSONResponse(status_code=422, content={"message": "Missing or invalid bvMeta."})
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8000)
In the DAL.py; the data access layer:
class DAL:
def __init__(self, user, passwd, host, bucket_name='BvUniversity',
collection_name="_default"):
self.cluster = None
self.bucket = None
self.scope = None
self.collection = None
self.bucket_name = bucket_name
self.scope_name = '_default'
self.collection_name = collection_name
self.user = user
self.passwd = passwd
self.host = host
self.connect()
#
def connect(self):
self.cluster = Cluster(self.host, ClusterOptions(PasswordAuthenticator(self.user, self.passwd)))
self.bucket = self.cluster.bucket(self.bucket_name)
self.collection = self.bucket.scope(self.scope_name).collection(self.collection_name)
#
def query(self, n1ql, params=[], autocommit=False):
#if self.conn.closed: # first off, lets confirm that we are still connected to the database. Reconnect if not.
# self.connect()
# self.open_cursor()
try:
if params:
result = self.cluster.query(n1ql, QueryOptions(positional_parameters=params))
else:
result = self.cluster.query(n1ql)
except:
result = {}
#if autocommit and result:sql
# self.conn.commit()
return result
#
def getUsersAvailableIncentivePrograms(self, userId, universityId):
rows = self.query("SELECT im.userId, \
im.incentivePrograms[*].incentiveProgramId, \
ip.programName \
FROM default:`incentiveMemberships` im \
UNNEST im.incentivePrograms \
JOIN default:`incentivePrograms` ip \
ON im.incentivePrograms[*].incentiveProgramId = META(ip).id \
WHERE im.userId = $1", [userId])
incentivePrograms = []
for row in rows:
incentivePrograms.append(row)
return incentivePrograms
In the schema classes:
import uuid
from pydantic import BaseModel
from enum import Enum
class IncentiveProgramResponse(BaseModel):
incentiveProgramId : str
Name : str
# StatusResponseCodes - http codes used in this service
# TODO: add statuses.
class StatusResponseCodes(Enum):
Approved = 200
BadRequest = 400
Unauthorized = 401
Forbidden = 403
NotFound = 404
Conflict = 409
InvalidAttributes = 422
TooManyRequests = 429
InternalServerError = 500
ServiceUnavailable = 503
class bvMeta (BaseModel):
transactionId: int
universityId: int
userId: int
class Message (BaseModel):
message: str
I’m working on setting up and finding the log files.