Operations per second are very low (5-50 ops/sec at max) while loading JSON data into a Couchbase bucket

Hi Everyone,

While loading large JSON data into a Couchbase bucket using the Python SDK, the operations/sec is very low (5-50 ops/sec max). I use the script below to load the data:

import uuid

from couchbase.cluster import Cluster, PasswordAuthenticator

# Connection establishment and accessing the Couchbase bucket
cluster = Cluster('couchbase://localhost')
authenticator = PasswordAuthenticator('username', 'password')
cluster.authenticate(authenticator)
cb = cluster.open_bucket('bucket_name')

# Inserting data into the Couchbase bucket
for doc in data_list:
    key = str(uuid.uuid4())
    cb.insert(key, doc)

Please suggest a way to get a higher operations-per-second rate; anything above 500 ops/sec would be appreciated.

@vsr1, any solution for this?
Sorry for the tag.

Instead of one synchronous thread, try running the inserts in parallel (multiple threads) and async.
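For example, a minimal sketch of the threaded route, assuming the same 2.x-style connection as the script above (the worker count and stand-in data are illustrative, and older 2.x buckets may need to be opened with a lockmode that permits sharing across threads, or one connection per thread):

import uuid
from concurrent.futures import ThreadPoolExecutor

from couchbase.cluster import Cluster, PasswordAuthenticator

# same 2.x-style connection setup as the original script
cluster = Cluster('couchbase://localhost')
cluster.authenticate(PasswordAuthenticator('username', 'password'))
cb = cluster.open_bucket('bucket_name')

# stand-in for your documents
data_list = [{"id": i, "type": "fake_data"} for i in range(1000)]

def insert_doc(doc):
    # one generated key per document, as in the original loop
    cb.insert(str(uuid.uuid4()), doc)

# fan the inserts out over a pool of worker threads instead of
# issuing them one at a time
with ThreadPoolExecutor(max_workers=16) as pool:
    list(pool.map(insert_doc, data_list))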

Hi @Kratos - See below for some sample code I have that shows bulk loading documents via the acouchbase API (an integration with the standard library's asyncio). You can of course go down the path of implementing threads; another option is to use the multi_*() methods provided by the SDK (a sketch of that route follows the sample below). You should be able to get solid performance with the async approach, and it is the path we would recommend. Of course, it is just an example, and there could be further optimizations (and additions like error handling, etc.).

I hope this helps.

import time
import asyncio
from uuid import uuid4

from acouchbase.cluster import Cluster, get_event_loop, close_event_loop
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import ClusterOptions


def create_key():
    return "{}".format(uuid4())


async def load_doc_async(collection, key, doc):
    return await collection.insert(key, doc)


async def bulk_load_data(collection, data):
    print(f'Loading {len(data)} documents...')

    docs = {create_key(): d for d in data}
    t = time.perf_counter()
    res = await asyncio.gather(*[load_doc_async(collection, k, v) for k, v in docs.items()])
    t2 = time.perf_counter() - t
    print(f'Loaded {len(data)} documents in {t2:0.2f} seconds.')


async def main():
    cluster = Cluster("couchbase://{}".format("localhost"),
                      ClusterOptions(PasswordAuthenticator("Administrator", "password")))
    await cluster.on_connect()
    bucket = cluster.bucket("default")
    collection = bucket.default_collection()
    data = [{"id": 0, "info": "fake data 0", "type": "fake_data"},
            {"id": 1, "info": "fake data 1", "type": "fake_data"},
            {"id": 2, "info": "fake data 2", "type": "fake_data"},
            {"id": 3, "info": "fake data 3", "type": "fake_data"},
            {"id": 4, "info": "fake data 4", "type": "fake_data"},
            {"id": 5, "info": "fake data 5", "type": "fake_data"},
            {"id": 6, "info": "fake data 6", "type": "fake_data"}]

    await bulk_load_data(collection, data)


def run_sample_code():
    try:
        loop = get_event_loop()
        loop.run_until_complete(main())
    except Exception as ex:
        import traceback
        traceback.print_exc()
    finally:
        close_event_loop()


if __name__ == '__main__':
    run_sample_code()
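
As an aside, here is a minimal sketch of the multi_*() route mentioned above, using the synchronous couchbase API (this assumes a 4.x-style SDK; insert_multi() and the all_ok field on its result come from that API, but treat the details as assumptions to verify against your SDK version):

from uuid import uuid4

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster, ClusterOptions

cluster = Cluster("couchbase://localhost",
                  ClusterOptions(PasswordAuthenticator("Administrator", "password")))
collection = cluster.bucket("default").default_collection()

data = [{"id": i, "info": f"fake data {i}", "type": "fake_data"} for i in range(7)]

# insert_multi() takes a dict of key -> document and issues the mutations as a batch
docs = {str(uuid4()): d for d in data}
res = collection.insert_multi(docs)
print(f"All inserts succeeded: {res.all_ok}")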

Hi @jcasey
I was able to run this code, but the data is not loading into the bucket.

If you don't mind, could you please modify the sample code according to the condition below?

i) list_result is the JSON file that we are trying to load into the Couchbase bucket named "abc".

If possible, please help; I have been trying this for a few days but still have no progress.

Hi @Kratos - Did you update the sample to match your environment (connection string, username, password, bucket name, change the data list, etc.)? Also, you can loop through the results from the asyncio.gather() call; it will contain either exceptions or results, and that will give some information as to what might be happening.
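For instance, a small variation of the gather call in bulk_load_data() that surfaces per-document errors (the return_exceptions=True flag is my addition; by default gather() raises on the first failure):

loaded_docs = await asyncio.gather(
    *[load_doc_async(collection, k, v) for k, v in docs.items()],
    return_exceptions=True)
# failed inserts come back as exception objects instead of results
for key, res in zip(docs.keys(), loaded_docs):
    if isinstance(res, Exception):
        print(f"Insert failed for key {key}: {res}")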

I can try to modify the code, but it might be a day or so until I can get to the changes. What is the format of the JSON file (each line a new object, a big JSON array with JSON objects, etc.)? Maybe a sample of what the file contains would be helpful.

Hi @jcasey,

The JSON file is in the format of a list.

Yes sir, I updated the corresponding info to match my environment.

If it is a list, you can also consider cbimport json | Couchbase Docs.
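For reference, a typical invocation might look like the following (cluster address, credentials, bucket, and file path are placeholders; -f list matches a top-level JSON array, and -g generates a UUID key per document):

cbimport json -c couchbase://localhost -u Administrator -p password \
  -b bucket_name -d file:///path/to/data.json -f list -g key::#UUID#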

Hi sir,

We want to load the data through a Python script rather than from the command line.

Hi @Kratos – see updated sample below. Please make sure to update host, username, etc. to match your environment (and that the path to the JSON file is correct). Note the format of the JSON file is assumed to be:

[
    {...},
    {...},
    {...}
]

Sample code:

import os
import time
import json
import asyncio
from uuid import uuid4

from acouchbase.cluster import Cluster, get_event_loop, close_event_loop
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import ClusterOptions


def create_key():
    return "{}".format(uuid4())


def load_json_data(json_file_path):
    """
    Assumption here is JSON file is in format:
        [
            {...},
            {...},
            {...}
        ]

    Args:
        json_file_path (str): path to JSON file

    Returns:
        Any: a Python object representing the JSON data
    """
    with open(json_file_path) as f:
        try:
            return json.load(f)
        except ValueError:
            print("Decoding JSON has failed")

    return None


async def load_doc_async(collection, key, doc):
    return await collection.insert(key, doc)


async def bulk_load_data(collection, data):
    print(f'Loading {len(data)} documents...')

    docs = {create_key(): d for d in data}
    t = time.perf_counter()
    # gather preserves order
    loaded_docs = await asyncio.gather(*[load_doc_async(collection, k, v) for k, v in docs.items()])
    t2 = time.perf_counter() - t
    print(f'Loaded {len(loaded_docs)} documents in {t2:0.2f} seconds.')
    doc_keys = list(docs.keys())
    for idx, d in enumerate(loaded_docs):
        print("Loaded doc w/ key: {} has CAS: {}".format(doc_keys[idx], d.cas))


async def main(host, username, pw, bucket_name, json_data=None):
    cluster = Cluster(host, ClusterOptions(
        PasswordAuthenticator(username, pw)))
    await cluster.on_connect()
    bucket = cluster.bucket(bucket_name)
    collection = bucket.default_collection()
    if json_data:
        data = load_json_data(json_data)
    else:
        data = [{"id": 0, "info": "fake data 0", "type": "fake_data"},
                {"id": 1, "info": "fake data 1", "type": "fake_data"},
                {"id": 2, "info": "fake data 2", "type": "fake_data"},
                {"id": 3, "info": "fake data 3", "type": "fake_data"},
                {"id": 4, "info": "fake data 4", "type": "fake_data"},
                {"id": 5, "info": "fake data 5", "type": "fake_data"},
                {"id": 6, "info": "fake data 6", "type": "fake_data"}]

    await bulk_load_data(collection, data)


def run_sample_code(host, username, pw, bucket_name, json_data=None):
    try:
        loop = get_event_loop()
        loop.run_until_complete(
            main(host, username, pw, bucket_name, json_data=json_data))
    except Exception as ex:
        import traceback
        traceback.print_exc()
    finally:
        close_event_loop()


if __name__ == '__main__':
    # Update this to your cluster
    host = "couchbase://{}".format("localhost")
    username = "Administrator"
    password = "password"
    bucket_name = "default"
    # update the following to match the path for your specific json file
    # the current example uses <path to current working dir>/json_file.json
    path_to_json_data = os.path.join(os.getcwd(), "json_file.json")

    run_sample_code(host, username, password, bucket_name, path_to_json_data)

I hope this helps.
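
One further optimization worth noting for very large files: asyncio.gather() schedules every insert at once, so a semaphore can cap the number of in-flight operations. A sketch, reusing create_key() from the sample above (the concurrency limit is an illustrative assumption, not a tuned value):

async def bulk_load_data_bounded(collection, data, max_concurrency=100):
    sem = asyncio.Semaphore(max_concurrency)

    async def insert_one(key, doc):
        # only max_concurrency inserts are in flight at any moment
        async with sem:
            return await collection.insert(key, doc)

    docs = {create_key(): d for d in data}
    return await asyncio.gather(*[insert_one(k, v) for k, v in docs.items()])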

Hi @jcasey

Thanks a lot, sir, for taking the time to help me out.