I have a MERGE INTO ... WHEN MATCHED THEN UPDATE SET ... WHEN NOT MATCHED THEN INSERT-style query that works when I execute it in the Couchbase web UI. However, when I execute the same query with the Python SDK, the inserted documents don't appear in the document lookup page of the UI.
My script looks like this:
from couchbase.cluster import Cluster, ClusterOptions
from couchbase_core.cluster import PasswordAuthenticator
from some.internal.lib import (
    create_my_connection_string,
    get_my_merge_into_query_statement
)
from couchbase_core import __version__
print(__version__) # 3.0.10
connection_string = create_my_connection_string()
auth = PasswordAuthenticator(self._username, self._password)
cluster = Cluster(connection_string, ClusterOptions(auth))
query_statement = get_my_merge_into_query_statement()
cluster.query(query_statement)
I can see that the usual way to insert or update documents with the Python SDK is through its key-value insert and upsert operations (exposed on a collection in SDK 3.x), but what I want to do can be expressed very naturally as a MERGE INTO statement, which AFAIK I can only execute using couchbase.cluster.Cluster.query.
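Found the solution: the query is evaluated lazily. Calling cluster.query() on its own does not send anything; the request is only posted once the results are accessed, so I just have to iterate through the query results.
Something like this: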
res = cluster.query(some_query_statement) # this doesn't do anything.
res.rows() # this line actually posts the query HTTP request and evaluates the query.
# the console log after this line:
# 6699ms [Idc6b6e608960ef0e] {93549} [TRACE] (http-io - L:390) <localhost:8093> POST http://localhost:8093/query/service. Body=279 bytes
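To illustrate the lazy behaviour without needing a running cluster, here is a minimal sketch. LazyResult, fake_send and sent_statements are hypothetical stand-ins made up for this example, not SDK classes; the sketch only mimics the pattern described above, where nothing is sent until the rows are requested.

```python
from typing import Callable, Iterator, List, Optional


class LazyResult:
    """Hypothetical stand-in for a lazily evaluated query result (not an SDK class)."""

    def __init__(self, statement: str, send: Callable[[str], List[dict]]):
        self._statement = statement
        self._send = send  # callable that would perform the HTTP POST
        self._rows: Optional[List[dict]] = None  # None means "not sent yet"

    def rows(self) -> List[dict]:
        # The request goes out on first access only; later calls reuse the rows.
        if self._rows is None:
            self._rows = self._send(self._statement)
        return self._rows

    def __iter__(self) -> Iterator[dict]:
        return iter(self.rows())

    def execute(self) -> List[dict]:
        # Same effect as rows(): forces the statement to be sent.
        return self.rows()


sent_statements: List[str] = []  # records every statement that was "posted"


def fake_send(statement: str) -> List[dict]:
    """Pretend to post the statement; a MERGE typically returns no rows."""
    sent_statements.append(statement)
    return []


res = LazyResult("MERGE INTO ...", fake_send)  # nothing has been sent yet
requests_before = len(sent_statements)
res.rows()                                     # this is what triggers the request
requests_after = len(sent_statements)
```

Creating the result leaves sent_statements empty; only the call to rows() (or iterating, or execute()) adds an entry.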
Hi @mtannerman - another approach is to call .execute() on the query result. It runs the statement and acts as a pass/fail check: it either completes or raises an exception. The following sample code should provide the necessary info. Hope this helps.
from couchbase.cluster import Cluster, ClusterOptions, QueryOptions
from couchbase.auth import PasswordAuthenticator
from couchbase.exceptions import CouchbaseException
import traceback
db_info = {
    'host': 'couchbase://localhost',
    'secure': False,
    'bucket_name': 'beer-sample',
    'username': 'Administrator',
    'password': 'password'
}

def run_sample_code():
    try:
        auth = PasswordAuthenticator(db_info['username'], db_info['password'])
        cluster = Cluster(db_info['host'], authenticator=auth)
        query_str = """
        UPDATE `beer-sample` b0
        SET beers = (
            SELECT RAW b3.name
            FROM `beer-sample` b1 USE KEYS META(b0).id
            JOIN (
                SELECT b2.name,
                       b2.brewery_id
                FROM `beer-sample` b2
                WHERE b2.type='beer') AS b3 ON b3.brewery_id = META(b1).id)
        WHERE b0.type='brewery'
        AND META(b0).id = '21st_amendment_brewery_cafe'
        """
        # .execute() forces the query to be sent instead of leaving it unevaluated
        cluster.query(query_str).execute()
    except CouchbaseException as ex:
        traceback.print_exc()
    except Exception as ex:
        traceback.print_exc()

if __name__ == '__main__':
    run_sample_code()
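If you run many DML statements (MERGE, UPDATE, INSERT), it may be convenient to wrap this pattern in a small helper so the .execute() call is never forgotten. A minimal sketch follows; run_statement is a hypothetical name, and it assumes only what the sample above already relies on, namely that cluster.query() returns an object with an .execute() method.

```python
def run_statement(cluster, statement):
    """Send a N1QL statement and force its evaluation.

    Hypothetical helper: assumes cluster.query() returns a lazy result
    object that exposes .execute(), as used in the sample above.
    Any exception raised while executing propagates to the caller.
    """
    result = cluster.query(statement)  # on its own this may not send anything
    return result.execute()            # forces the request to be posted
```

Usage would then be something like run_statement(cluster, get_my_merge_into_query_statement()), with error handling left to the caller.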