Hi @Kratos – see the updated sample below. Please update host, username, etc. to match your environment, and make sure the path to the JSON file is correct. Note that the JSON file is assumed to have the following format:
[
{...},
{...},
{...}
]
Sample code:
import os
import time
import json
import asyncio
from uuid import uuid4
from acouchbase.cluster import Cluster, get_event_loop, close_event_loop
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import ClusterOptions
def create_key():
    """Generate a fresh document key.

    Returns:
        str: the string form of a newly generated random UUID (uuid4)
    """
    return str(uuid4())
def load_json_data(json_file_path):
    """Read and parse a JSON file.

    The file is expected to hold a JSON array of objects:
        [
            {...},
            {...},
            {...}
        ]

    Args:
        json_file_path (str): path to JSON file

    Returns:
        Any: the parsed JSON content (a list of dicts for the format above),
            or None if the file content could not be decoded

    Raises:
        OSError: if the file cannot be opened
    """
    # Decode explicitly instead of relying on the platform's default
    # encoding, which garbles non-ASCII documents on non-UTF-8 locales.
    # "utf-8-sig" reads plain UTF-8 and also skips a leading BOM, which
    # json.load would otherwise reject.
    with open(json_file_path, encoding="utf-8-sig") as f:
        try:
            return json.load(f)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError are both
            # ValueError subclasses, so malformed JSON and undecodable
            # bytes end up here alike.
            print("Decoding JSON has failed")
            return None
async def load_doc_async(collection, key, doc):
    """Insert a single document and hand back the insert's result.

    Args:
        collection: object exposing an awaitable ``insert(key, doc)``
        key: key to store the document under
        doc: the document body

    Returns:
        whatever the awaited ``collection.insert`` call produces
    """
    insert_result = await collection.insert(key, doc)
    return insert_result
async def bulk_load_data(collection, data):
    """Insert every document in ``data`` concurrently and report the results.

    Each document is stored under a freshly generated key (``create_key``)
    and inserted through ``load_doc_async``; all inserts are awaited together.

    Args:
        collection: collection to insert into; passed through unchanged to
            ``load_doc_async``
        data (list): documents to insert; ``None`` or an empty list is
            treated as "nothing to do"

    Returns:
        None
    """
    if not data:
        # A failed JSON load hands us None; without this guard the
        # len(data) call below raises TypeError.
        print('No documents to load.')
        return

    print(f'Loading {len(data)} documents...')
    docs = {create_key(): d for d in data}
    start = time.perf_counter()
    # gather returns results in the order the awaitables were passed, so
    # they line up one-to-one with the dict's insertion-ordered keys
    loaded_docs = await asyncio.gather(
        *(load_doc_async(collection, k, v) for k, v in docs.items()))
    elapsed = time.perf_counter() - start
    print(f'Loaded {len(loaded_docs)} documents in {elapsed:0.2f} seconds.')
    for key, result in zip(docs, loaded_docs):
        print("Loaded doc w/ key: {} has CAS: {}".format(key, result.cas))
async def main(host, username, pw, bucket_name, json_data=None):
    """Connect to the cluster and bulk-insert documents into the bucket's
    default collection.

    Args:
        host (str): cluster connection string
        username (str): user name passed to ``PasswordAuthenticator``
        pw (str): password passed to ``PasswordAuthenticator``
        bucket_name (str): name of the bucket to open
        json_data (str, optional): path to a JSON file holding the documents
            to insert. When omitted (or empty), seven built-in fake
            documents are inserted instead.

    Returns:
        None
    """
    cluster = Cluster(host, ClusterOptions(
        PasswordAuthenticator(username, pw)))
    await cluster.on_connect()
    collection = cluster.bucket(bucket_name).default_collection()

    if json_data:
        data = load_json_data(json_data)
        if data is None:
            # load_json_data has already reported the decode failure;
            # stop here instead of handing None to the bulk loader.
            return
    else:
        data = [{"id": i, "info": f"fake data {i}", "type": "fake_data"}
                for i in range(7)]

    await bulk_load_data(collection, data)
def run_sample_code(host, username, pw, bucket_name, json_data=None):
    """Drive ``main`` to completion on the loop from ``get_event_loop()``.

    Any ``Exception`` raised while running is printed as a traceback instead
    of being propagated. ``close_event_loop()`` is called in every case.

    Args:
        host, username, pw, bucket_name, json_data: forwarded to ``main``
    """
    import traceback

    try:
        event_loop = get_event_loop()
        sample = main(host, username, pw, bucket_name, json_data=json_data)
        event_loop.run_until_complete(sample)
    except Exception:
        traceback.print_exc()
    finally:
        close_event_loop()
if __name__ == '__main__':
    # Connection settings -- change these to match your cluster
    username = "Administrator"
    password = "password"
    bucket_name = "default"
    host = "couchbase://{}".format("localhost")

    # Point this at your own JSON file; as written it resolves to
    # <current working dir>/json_file.json
    json_file_name = "json_file.json"
    path_to_json_data = os.path.join(os.getcwd(), json_file_name)

    run_sample_code(host, username, password, bucket_name, path_to_json_data)
I hope this helps.