Hi all,
I am trying to get queries working when using libevent with libcouchbase. It works for queries that return only a few rows, but fails when many rows are selected. Below are the details, using example code.
I used the example code libcouchbase/example/libeventdirect/main.c and added code for querying.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <iostream>
#include <event2/event.h>
#include <libcouchbase/couchbase.h>
/* How many more times schedule_timer() may arm the timer; it decrements this
 * after each arm and does nothing once it reaches zero. */
int nreq{1};
/* GET responses still expected; get_callback() decrements it per response. */
int nresp{1};
/* Delay, in whole seconds, used by schedule_timer() when arming the timer. */
int interval{0};
/* Timer event: created in store_callback(), armed in schedule_timer(). */
struct event *timer{nullptr};
/*
 * Bootstrap callback registered via lcb_set_bootstrap_callback().
 *
 * instance: the handle being bootstrapped (not used here).
 * err:      bootstrap status; anything other than LCB_SUCCESS is reported on
 *           stderr and terminates the process.
 */
static void bootstrap_callback(lcb_INSTANCE *instance, lcb_STATUS err) {
    if (err == LCB_SUCCESS) {
        std::cout << ("successfully bootstrapped\n") << std::endl;
        return;
    }

    std::cerr << "ERROR: " << lcb_strerror_short(err) << std::endl;
    exit(EXIT_FAILURE);
}
static void query_callback(lcb_INSTANCE *instance, int type,
const lcb_RESPQUERY *resp) {
std::cout << "Entering query_callback" << std::endl;
const char *row;
size_t nrow;
lcb_STATUS rc = lcb_respquery_status(resp);
if (rc != LCB_SUCCESS) {
std::cout << (lcb_strerror_short(rc)) << std::endl;
return;
} else {
lcb_respquery_row(resp, &row, &nrow);
const std::string row_string(row, nrow);
std::cout << "Row string : " << (row_string) << std::endl;
if (lcb_respquery_is_final(resp)) {
std::cout << "Final" << std::endl;
}
}
}
static void get_callback(lcb_INSTANCE *instance, int cbtype,
const lcb_RESPGET *rg) {
const char *value;
size_t nvalue;
lcb_STATUS rc = lcb_respget_status(rg);
if (rc != LCB_SUCCESS) {
fprintf(stderr, "Failed to get key: %s\n", lcb_strerror_short(rc));
exit(EXIT_FAILURE);
}
lcb_respget_value(rg, &value, &nvalue);
std::cout << "Retrieved the value: " << std::string(value, (int)nvalue)
<< std::endl;
fflush(stdout);
nresp--;
if (nresp == 0) {
std::cout << ("Stopping the loop") << std::endl;
event_base_loopbreak(reinterpret_cast<event_base *>(
const_cast<void *>(lcb_get_cookie(instance))));
}
(void)cbtype;
}
static void schedule_timer();
static void timer_callback(int fd, short event, void *arg) {
lcb_INSTANCE *instance = reinterpret_cast<lcb_INSTANCE *>(arg);
{
// Get
lcb_STATUS rc;
lcb_CMDGET *gcmd;
const std::string &key = "Some Key 0";
lcb_cmdget_create(&gcmd);
lcb_cmdget_key(gcmd, key.c_str(), key.size());
rc = lcb_get(instance, NULL, gcmd);
lcb_cmdget_destroy(gcmd);
if (rc != LCB_SUCCESS) {
fprintf(stderr, "Failed to schedule get request: %s\n",
lcb_strerror_short(rc));
exit(EXIT_FAILURE);
}
}
(void)fd;
(void)event;
schedule_timer();
}
/*
 * Arms the global timer to fire after `interval` seconds, as long as the
 * request budget `nreq` is not exhausted; each arm consumes one unit of it.
 */
static void schedule_timer() {
    if (nreq == 0) {
        return;
    }

    struct timeval delay;
    delay.tv_usec = 0;
    delay.tv_sec = interval;
    evtimer_add(timer, &delay);
    --nreq;
}
/*
 * Store callback installed for LCB_CALLBACK_STORE; runs once per completed
 * store operation.
 *
 * instance: handle the store was scheduled on; its cookie is the event base.
 * cbtype:   callback type (unused).
 * resp:     store response; a failed status terminates the process.
 *
 * The timer event is created only on the first invocation. Creating a new
 * event on every call would overwrite `timer` and leak the previous event
 * each time, since many stores are issued.
 */
static void store_callback(lcb_INSTANCE *instance, int cbtype,
                           const lcb_RESPSTORE *resp) {
    const lcb_STATUS rc = lcb_respstore_status(resp);
    if (rc != LCB_SUCCESS) {
        fprintf(stderr, "Failed to store key: %s\n", lcb_strerror_short(rc));
        exit(EXIT_FAILURE);
    }
    fflush(stdout);

    if (timer == NULL) {
        struct event_base *base = static_cast<struct event_base *>(
            const_cast<void *>(lcb_get_cookie(instance)));
        timer = evtimer_new(base, timer_callback, instance);
        if (timer == NULL) {
            fprintf(stderr, "Failed to create the timer event\n");
            exit(EXIT_FAILURE);
        }
    }
    /* No-op once the request budget (nreq) has been used up. */
    schedule_timer();
    (void)cbtype;
}
/*
 * Creates a libcouchbase IO-ops structure of type LCB_IO_OPS_LIBEVENT, with
 * the given event base passed as the plugin cookie.
 *
 * evbase: the libevent base to hand to the IO plugin.
 * Returns the created IO-ops handle; exits the process on failure.
 */
static lcb_io_opt_t create_libevent_io_ops(struct event_base *evbase) {
    struct lcb_create_io_ops_st io_config;
    memset(&io_config, 0, sizeof(io_config));
    io_config.v.v0.type = LCB_IO_OPS_LIBEVENT;
    io_config.v.v0.cookie = evbase;

    lcb_io_opt_t created;
    const lcb_STATUS status = lcb_create_io_ops(&created, &io_config);
    if (status == LCB_SUCCESS) {
        return created;
    }

    fprintf(stderr, "Failed to create an IOOPS structure for libevent: %s\n",
            lcb_strerror_short(status));
    exit(EXIT_FAILURE);
}
/*
 * Creates a bucket-type libcouchbase instance on top of the given IO-ops
 * structure, installs the bootstrap, GET and STORE callbacks, and starts
 * connecting.
 *
 * ioops:       IO-ops handle the instance should use.
 * conn_string: connection string handed to the create options.
 * uname:       user name for the credentials.
 * password:    password for the credentials.
 *
 * Returns the new instance. Exits the process if the instance cannot be
 * created or the connect call fails (the instance is destroyed first in the
 * latter case).
 */
static lcb_INSTANCE *create_libcouchbase_handle(lcb_io_opt_t ioops,
                                                const std::string &conn_string,
                                                const std::string &uname,
                                                const std::string &password) {
    /* Assemble the creation options */
    lcb_CREATEOPTS *create_opts = NULL;
    lcb_createopts_create(&create_opts, LCB_TYPE_BUCKET);
    lcb_createopts_connstr(create_opts, conn_string.c_str(),
                           conn_string.size());
    lcb_createopts_credentials(create_opts, uname.c_str(), uname.size(),
                               password.c_str(), password.size());
    lcb_createopts_io(create_opts, ioops);

    /* The options are released right after use, whether or not it worked */
    lcb_INSTANCE *handle;
    lcb_STATUS status = lcb_create(&handle, create_opts);
    lcb_createopts_destroy(create_opts);
    if (status != LCB_SUCCESS) {
        fprintf(stderr, "Failed to create a libcouchbase instance: %s\n",
                lcb_strerror_short(status));
        exit(EXIT_FAILURE);
    }

    /* Register the callbacks before connecting */
    lcb_set_bootstrap_callback(handle, bootstrap_callback);
    lcb_install_callback(handle, LCB_CALLBACK_GET,
                         reinterpret_cast<lcb_RESPCALLBACK>(get_callback));
    lcb_install_callback(handle, LCB_CALLBACK_STORE,
                         reinterpret_cast<lcb_RESPCALLBACK>(store_callback));

    status = lcb_connect(handle);
    if (status != LCB_SUCCESS) {
        fprintf(stderr, "Failed to connect libcouchbase instance: %s\n",
                lcb_strerror_short(status));
        lcb_destroy(handle);
        exit(EXIT_FAILURE);
    }
    return handle;
}
/* Event base driving all I/O; set by create_instance(). */
event_base *evbase;
/* libcouchbase IO-ops structure bound to evbase; set by create_instance(). */
lcb_io_opt_t ioops;

/*
 * Tears down everything create_instance() set up: the libcouchbase instance,
 * the timer event, the IO-ops structure and the event base.
 *
 * instance: handle returned by create_instance(). Nothing happens if it is
 *           null or if no event base exists.
 *
 * The timer is released with event_free() rather than only evtimer_del(), so
 * the event object itself is not leaked. The globals are reset afterwards so
 * that a repeated call does not free anything twice.
 */
void destroy_instance(lcb_INSTANCE *instance) {
    if (!instance || !evbase) {
        return;
    }

    /* Cleanup */
    lcb_destroy(instance);
    if (timer) {
        /* event_free() also removes the event if it is still pending. */
        event_free(timer);
        timer = NULL;
    }
    lcb_destroy_io_ops(ioops);
    ioops = NULL;
    event_base_free(evbase);
    evbase = NULL;
}
/*
 * Creates the event base, IO-ops structure and libcouchbase instance, then
 * schedules 1000 upserts ("Some Key <i>") and one query over demo_bucket, and
 * finally runs the event loop until one of the callbacks breaks it.
 *
 * conn_string: connection string for the instance.
 * uname:       user name.
 * password:    password.
 *
 * Returns the instance; the caller releases it with destroy_instance().
 * Exits the process if the event base cannot be created or if any operation
 * cannot be scheduled.
 */
lcb_INSTANCE *create_instance(const std::string &conn_string,
                              const std::string &uname,
                              const std::string &password) {
    uint32_t expiration = 0;

    evbase = event_base_new();
    if (evbase == NULL) {
        fprintf(stderr, "Failed to create the libevent base\n");
        exit(EXIT_FAILURE);
    }
    ioops = create_libevent_io_ops(evbase);
    lcb_INSTANCE *instance =
        create_libcouchbase_handle(ioops, conn_string, uname, password);

    /* Store the event base as the user cookie in our instance so that
     * the callbacks can stop the loop when we're done */
    lcb_set_cookie(instance, evbase);

    /* Schedule the stores */
    for (int i = 0; i < 1000; ++i) {
        const std::string key = "Some Key " + std::to_string(i);
        const std::string value =
            "{\"Some\": \"Value : " + std::to_string(i) + "\"}";

        lcb_CMDSTORE *scmd;
        lcb_cmdstore_create(&scmd, LCB_STORE_UPSERT);
        lcb_cmdstore_key(scmd, key.c_str(), key.length());
        lcb_cmdstore_value(scmd, value.c_str(), value.length());
        if (expiration > 0) {
            lcb_cmdstore_expiry(scmd, expiration);
        }
        const lcb_STATUS rc = lcb_store(instance, NULL, scmd);
        /* The command object must be released after scheduling; skipping
         * this leaks one command per key. */
        lcb_cmdstore_destroy(scmd);
        if (rc != LCB_SUCCESS) {
            fprintf(stderr, "Failed to schedule store request: %s\n",
                    lcb_strerror_short(rc));
            exit(EXIT_FAILURE);
        }
    }

    {
        /* Schedule the query before the get */
        const std::string query =
            "SELECT meta().id AS _k, _v FROM demo_bucket _v";
        lcb_CMDQUERY *cmd;
        lcb_cmdquery_create(&cmd);
        lcb_cmdquery_statement(cmd, query.c_str(), query.length());
        lcb_cmdquery_callback(cmd, query_callback);
        lcb_cmdquery_consistency(cmd, LCB_QUERY_CONSISTENCY_REQUEST);
        const lcb_STATUS rc = lcb_query(instance, NULL, cmd);
        lcb_cmdquery_destroy(cmd);
        if (rc != LCB_SUCCESS) {
            fprintf(stderr, "Failed to schedule query: %s\n",
                    lcb_strerror_short(rc));
            exit(EXIT_FAILURE);
        }
    }

    /* Run the event loop */
    event_base_loop(evbase, 0);
    return instance;
}
/*
 * Entry point.
 *
 * Usage: <program> <user> <password> [bucket]
 * The optional third argument replaces the default connection string
 * "couchbase://couchbase/demo_bucket".
 *
 * Creates the instance (which runs the event loop), then destroys it.
 */
int main(int argc, char *argv[]) {
    std::string bucket("couchbase://couchbase/demo_bucket");
    if (argc < 3) {
        /* A separator after the program name keeps the usage line readable. */
        std::cerr << "Usage: " << argv[0] << " <user> <password> [bucket]"
                  << std::endl;
        exit(1);
    }

    const std::string user(argv[1]);
    const std::string password(argv[2]);
    /* Honour the optional argument even when extra arguments follow it. */
    if (argc >= 4) {
        bucket = std::string(argv[3]);
    }

    auto instance = create_instance(bucket, user, password);
    destroy_instance(instance);
    return 0;
}
I am inserting 1000 values using lcb_store() and then querying them. But it fails with LCB_ERR_REQUEST_CANCELED (202) after entering query_callback(). If the query is quick (like select count(*)) it displays the query result and works fine.
I feel that I am doing something wrong here and that this is not the correct way to run a query with the libevent plugin. I couldn’t find any other examples on the web where a query is used with libevent.
Can anyone check why the query is not waiting for all rows here? It would be really helpful if I could get an example of the correct usage.
Please reply if you need more details. Happy to provide them.
Thanks,
Anoop