import couchdb import requests import time import re import json class CouchDBInterface: def __init__(self, couchdb_url, username, password, database): self.connect_couch(couchdb_url, username, password, database) def connect_couch(self, couchdb_url, username, password, database): self.connect_to_couch(username, password, database, couchdb_url) def connect_to_couch(self, username, password, databasename, url): if re.search('cloudant\.com', url): self.HTTP = 'https' couch = couchdb.Server(self.HTTP+'://%s:%s@%s/' % (username, password, url)) else: self.HTTP = 'http' couch = couchdb.Server('http://%s:%s@%s/' % (username, password, url)) # Create if not exists self.create_couch_db(couch, databasename) self.couch_db = couch[databasename] def create_couch_db(self, server, database): if database not in server: server.create(database) def save_document(self, doc): save_doc = self.couch_db.save(doc) if 'error' in save_doc: time.sleep(1) save_doc = self.couch_db.save(doc) elif 'too_many_requests' in save_doc: time.sleep(10) save_doc = self.couch_db.save(doc) return save_doc def update_document(self, doc): save_doc = self.couch_db.update(doc) return save_doc def delete_document(self, doc): self.couch_db.delete(doc) def get_document(self, doc_id): ret = self.couch_db.get(doc_id) if 'error' in ret: ret = self.couch_db.get(doc_id) return ret def get_attachment(self, doc_id, file_name): ret = self.couch_db.get_attachment(doc_id, file_name) return ret def get_document_filter(self, couch_url, username, password, database, design, view, start_key=None, end_key=None, is_array=None, descending='true'): # protocol = "https" protocol = self.HTTP if not start_key and not end_key: url = '%s://%s:%s@%s/%s/_design/%s/_view/%s?&descending=%s' % (protocol, username, password, couch_url, database, design, view, descending) elif start_key and not end_key: url = '%s://%s:%s@%s/%s/_design/%s/_view/%s?key="%s"&descending=%s' % (protocol, username, password, couch_url, database, design, view, start_key, descending) elif start_key and end_key and not is_array: url = '%s://%s:%s@%s/%s/_design/%s/_view/%s?startkey=["%s"]&endkey=["%s",{}]&descending=%s' % (protocol, username, password, couch_url, database, design, view, start_key, end_key, descending) elif start_key and end_key and is_array: url = '%s://%s:%s@%s/%s/_design/%s/_view/%s?startkey=["%s"]&endkey=["%s"]&descending=%s' % (protocol, username, password, couch_url, database, design, view, start_key, end_key, descending) print(url) result = requests.get(url) json_data = result.json() rows = [] if json_data and 'rows' in json_data: rows = json_data["rows"] return rows