Showing posts with label pymongo. Show all posts

Friday, August 17, 2018

Looping over Pymongo cursor returns bson.errors.InvalidBSON error after some iterations


I'm trying to make a simple query with pymongo and loop over the results.

This is the code I'm using:

data = []
tam = db.my_collection.find({'timestamp': {'$gte': start, '$lte': end}}).count()
for i, d in enumerate(table.find({'timestamp': {'$gte': start, '$lte': end}})):
    print('%s of %s' % (i, tam))
    data.append(d)

start and end variables are datetime python objects. Everything runs fine until I get the following output:

2987 of 12848
2988 of 12848
2989 of 12848
2990 of 12848
2991 of 12848
2992 of 12848
Traceback (most recent call last):
  File "db_extraction\extract_data.py", line 68, in <module>
    data = extract_data(yesterday, days=1)
  File "db_extraction\extract_data.py", line 24, in extract_data
    for i,d in enumerate(table.find({'timestamp': {'$gte': start, '$lte':end}}).limit(100000)):
  File "\venv\lib\site-packages\pymongo\cursor.py", line 1169, in next
    if len(self.__data) or self._refresh():
  File "\venv\lib\site-packages\pymongo\cursor.py", line 1106, in _refresh
    self.__send_message(g)
  File "\venv\lib\site-packages\pymongo\cursor.py", line 971, in __send_message
    codec_options=self.__codec_options)
  File "\venv\lib\site-packages\pymongo\cursor.py", line 1055, in _unpack_response
    return response.unpack_response(cursor_id, codec_options)
  File "\venv\lib\site-packages\pymongo\message.py", line 945, in unpack_response
    return bson.decode_all(self.documents, codec_options)
bson.errors.InvalidBSON

The first thing I tried was changing the range of the query to check whether the problem is data-related, and it's not. Another range stops at 1615 of 6360 with the same error.

I've also tried list(table.find({'timestamp': {'$gte': start, '$lte': end}})) and got the same error.

Another possibly relevant detail: the first queries are really fast, then the loop freezes on the last number for a while before raising the error.

So I need some help. Am I hitting limits here? Any clue as to what's going on?

This might be related to this 2013 question, but that author says he gets no error output.

Thanks!

2 Answers

Answers 1

Your code runs fine on my computer. Since it works for your first 2992 records, I think the documents themselves may have some inconsistency. Does every document in your collection follow the same schema and format? And is your pymongo up to date?

Here is my suggestion if you want to loop through every record:

data = []
all_posts = db.my_collection.find({'timestamp': {'$gte': start, '$lte': end}})
tam = all_posts.count()
i = 0
for post in all_posts:
    i += 1
    print('%s of %s' % (i, tam))
    data.append(post)

Regards,

Answers 2

Could it be related to specific documents in the DB? Have you checked the document that might cause the error (e.g., the 2992nd result of your query above, counting from 0)?

You could also execute some queries against the DB directly (e.g., via the mongo shell) without using pymongo to see whether expected results are returned. For example, you could try db.my_collection.find({...}).skip(2992) to see the result. You could also use cursor.forEach() to print all the retrieved documents.
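To narrow the failure down without the mongo shell, you can probe the range one document at a time and record which offsets fail to decode. This is a sketch: `fetch` is a hypothetical stand-in for running the same query with `.skip(offset).limit(1)` and decoding the single returned document, so one corrupt document raises without killing the whole batch.

```python
def find_bad_offsets(fetch, total):
    """Probe one document at a time; collect offsets whose fetch raises.

    `fetch(offset)` stands in for cursor.skip(offset).limit(1) plus BSON
    decoding; any decoding error surfaces as an exception for that offset.
    """
    bad = []
    for offset in range(total):
        try:
            fetch(offset)
        except Exception:
            bad.append(offset)
    return bad

# toy demo: 10 "documents", the one at offset 7 is corrupt
def toy_fetch(offset):
    if offset == 7:
        raise ValueError('InvalidBSON')
    return {'offset': offset}

print(find_bad_offsets(toy_fetch, 10))  # [7]
```

With the real collection you would pass a small wrapper around the timestamp query in place of `toy_fetch`.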


Friday, June 8, 2018

PyMongo: How Do I Update A Collection Using Aggregate?


This is a continuation of this question.

I'm using the following code to find all documents from collection C_a whose text contains the word StackOverflow and store them in another collection called C_b:

import pymongo
from pymongo import MongoClient

client = MongoClient('127.0.0.1')  # mongodb running locally
dbRead = client['C_a']             # using the C_a database in mongo

# create the pipeline; all attributes and operators need to be quoted in pymongo
pipeline = [{"$match": {"$text": {"$search": "StackOverflow"}}}, {"$out": "C_b"}]

dbRead.C_a.aggregate(pipeline)  # execution
print(dbRead.C_b.count())       # verify the count of the new collection

This works great. However, if I run the same snippet for multiple keywords, the results get overwritten. For example, I want the collection C_b to contain all documents that contain the keywords StackOverflow, StackExchange, and Programming. To do so, I simply run the snippet once per keyword. Unfortunately, each iteration overwrites the previous one.

Question: How do I update the output collection instead of overwriting it?

Plus: Is there a clever way to avoid duplicates, or do I have to check for duplicates afterwards?

1 Answers

Answers 1

If you look at the documentation, $out doesn't support updating an existing collection:

https://docs.mongodb.com/manual/reference/operator/aggregation/out/#pipe._S_out

So you need to do a two-stage operation:

# all attributes and operators need to be quoted in pymongo
pipeline = [{"$match": {"$text": {"$search": "StackOverflow"}}}, {"$out": "temp"}]
dbRead.C_a.aggregate(pipeline)

and then use the approach discussed in

https://stackoverflow.com/a/37433640/2830850

dbRead.C_b.insert_many(list(dbRead.temp.aggregate([])))

And before starting the run you will need to drop the C_b collection.
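To accumulate several keyword searches into C_b without duplicates, one option (an assumption, not stated in the answer) is to copy each temp batch across with `dbRead.C_b.replace_one({'_id': doc['_id']}, doc, upsert=True)`, since $out preserves each document's _id. The deduplication logic itself is just "last write wins, keyed on _id", sketched here without a live server:

```python
def merge_batches(batches):
    """Merge several keyword-search result batches, deduplicating on _id.

    Each write models pymongo's
    dbRead.C_b.replace_one({'_id': doc['_id']}, doc, upsert=True):
    a new _id is inserted, an existing one is replaced in place.
    """
    merged = {}
    for batch in batches:
        for doc in batch:
            merged[doc['_id']] = doc
    return merged

# toy demo: the document with _id 1 matches both keywords but is stored once
batch_a = [{'_id': 1, 'text': 'StackOverflow rocks'}]
batch_b = [{'_id': 1, 'text': 'StackOverflow rocks'},
           {'_id': 2, 'text': 'Programming tips'}]
merged = merge_batches([batch_a, batch_b])
print(sorted(merged))  # [1, 2]
```

This also answers the duplicates question: upserting by _id makes the check automatic, so no separate pass afterwards is needed.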


Friday, March 3, 2017

Do bulk inserts/update in MongoDB with PyMongo


How do I bulk update/insert in MongoDB with pymongo/pandas? The error I get is batch op errors occurred. The reason, I believe, is that I set the "_id" myself, which I want to do. The code runs fine on the first run, but fails on the second. I want to use pandas in my workflow. The data does contain a datetime object.

The syntax for update with upsert=True is completely different. An efficient solution using update would be helpful, where "_id" or "qid" could be set. But there are Python datetime objects to deal with!

InSQL  = 'SELECT * from database2.table2'
sqlOut = pd.read_sql(InSQL, cxn)
sqlOut['_id'] = "20170101" + ":" + sqlOut['Var']
dfOut = sqlOut.to_json(orient='records', date_format='iso')
try:
    db["test"].insert_many(json.loads(dfOut))
except Exception as e:
    print(e)

0 Answers
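A pattern that typically resolves the re-run failure is to replace insert_many with bulk_write over per-row upserts, e.g. pymongo's ReplaceOne({'_id': r['_id']}, r, upsert=True). The op-building step is sketched below with plain dicts standing in for ReplaceOne, so it runs without a server; none of this is the poster's actual code.

```python
def build_upsert_ops(records):
    """One replace-with-upsert operation per record, keyed on _id.

    With pymongo, each dict here would instead be
    ReplaceOne({'_id': r['_id']}, r, upsert=True), and the list would go to
    db['test'].bulk_write(ops, ordered=False). Rerunning is then safe:
    an existing _id is replaced rather than re-inserted.
    """
    return [{'filter': {'_id': r['_id']}, 'replacement': r, 'upsert': True}
            for r in records]

rows = [{'_id': '20170101:A', 'Var': 'A'},
        {'_id': '20170101:B', 'Var': 'B'}]
ops = build_upsert_ops(rows)
print(len(ops), ops[0]['filter'])  # 2 {'_id': '20170101:A'}
```

The json.loads(df.to_json(...)) round-trip from the question still works for producing `records`, since ISO date strings survive the upsert unchanged.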


Wednesday, April 6, 2016

limit() and sort() order pymongo and mongodb


Despite reading people's answers stating that the sort is done first, my evidence shows something different: that the limit is applied before the sort. Is there a way to force the sort to always come first?

views = mongo.db.view_logging.find().sort([('count', 1)]).limit(10) 

Whether I use .sort().limit() or .limit().sort(), the limit takes precedence. I wonder if this is something to do with pymongo...

4 Answers

Answers 1

According to the documentation, regardless of which comes first in your chain of commands, sort() is always applied before limit().

You can also study the .explain() results of your query and look at the execution stages: you will find that the sort input stage examines all of the filtered documents (in your case, all documents in the collection), and only then is the limit applied.
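The stage nesting in explain() output can also be read programmatically. A small sketch (the helper name is made up) that walks winningPlan from the outermost stage inward, using a trimmed dict shaped like the plans shown below:

```python
def stage_chain(plan):
    """Walk a winningPlan dict from the outermost stage inward.

    Each stage wraps its input via the 'inputStage' key; the innermost
    stage executes first, so the collected list is reversed to give
    execution order.
    """
    stages = []
    while plan:
        stages.append(plan['stage'])
        plan = plan.get('inputStage')
    return list(reversed(stages))

# trimmed winningPlan: a SORT stage (carrying limitAmount) over a COLLSCAN
winning = {'stage': 'SORT', 'limitAmount': 3,
           'inputStage': {'stage': 'COLLSCAN'}}
print(stage_chain(winning))  # ['COLLSCAN', 'SORT']
```

Note that the limit shows up only as a limitAmount attribute on the SORT stage, which is exactly why it cannot run before the sort.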


Let's go through an example.

Imagine there is a foo database with a test collection having 6 documents:

>>> col = db.foo.test
>>> for doc in col.find():
...     print(doc)
{'time': '2016-03-28 12:12:00', '_id': ObjectId('56f9716ce4b05e6b92be87f2'), 'value': 90}
{'time': '2016-03-28 12:13:00', '_id': ObjectId('56f971a3e4b05e6b92be87fc'), 'value': 82}
{'time': '2016-03-28 12:14:00', '_id': ObjectId('56f971afe4b05e6b92be87fd'), 'value': 75}
{'time': '2016-03-28 12:15:00', '_id': ObjectId('56f971b7e4b05e6b92be87ff'), 'value': 72}
{'time': '2016-03-28 12:16:00', '_id': ObjectId('56f971c0e4b05e6b92be8803'), 'value': 81}
{'time': '2016-03-28 12:17:00', '_id': ObjectId('56f971c8e4b05e6b92be8806'), 'value': 90}

Now, let's execute queries with different order of sort() and limit() and check the results and the explain plan.

Sort and then limit:

>>> from pprint import pprint
>>> cursor = col.find().sort([('time', 1)]).limit(3)
>>> sort_limit_plan = cursor.explain()
>>> pprint(sort_limit_plan)
{u'executionStats': {u'allPlansExecution': [],
                     u'executionStages': {u'advanced': 3,
                                          u'executionTimeMillisEstimate': 0,
                                          u'inputStage': {u'advanced': 6,
                                                          u'direction': u'forward',
                                                          u'docsExamined': 6,
                                                          u'executionTimeMillisEstimate': 0,
                                                          u'filter': {u'$and': []},
                                                          u'invalidates': 0,
                                                          u'isEOF': 1,
                                                          u'nReturned': 6,
                                                          u'needFetch': 0,
                                                          u'needTime': 1,
                                                          u'restoreState': 0,
                                                          u'saveState': 0,
                                                          u'stage': u'COLLSCAN',
                                                          u'works': 8},
                                          u'invalidates': 0,
                                          u'isEOF': 1,
                                          u'limitAmount': 3,
                                          u'memLimit': 33554432,
                                          u'memUsage': 213,
                                          u'nReturned': 3,
                                          u'needFetch': 0,
                                          u'needTime': 8,
                                          u'restoreState': 0,
                                          u'saveState': 0,
                                          u'sortPattern': {u'time': 1},
                                          u'stage': u'SORT',
                                          u'works': 13},
                     u'executionSuccess': True,
                     u'executionTimeMillis': 0,
                     u'nReturned': 3,
                     u'totalDocsExamined': 6,
                     u'totalKeysExamined': 0},
 u'queryPlanner': {u'indexFilterSet': False,
                   u'namespace': u'foo.test',
                   u'parsedQuery': {u'$and': []},
                   u'plannerVersion': 1,
                   u'rejectedPlans': [],
                   u'winningPlan': {u'inputStage': {u'direction': u'forward',
                                                    u'filter': {u'$and': []},
                                                    u'stage': u'COLLSCAN'},
                                    u'limitAmount': 3,
                                    u'sortPattern': {u'time': 1},
                                    u'stage': u'SORT'}},
 u'serverInfo': {u'gitVersion': u'6ce7cbe8c6b899552dadd907604559806aa2e9bd',
                 u'host': u'h008742.mongolab.com',
                 u'port': 53439,
                 u'version': u'3.0.7'}}

Limit and then sort:

>>> cursor = col.find().limit(3).sort([('time', 1)])
>>> limit_sort_plan = cursor.explain()
>>> pprint(limit_sort_plan)
{u'executionStats': {u'allPlansExecution': [],
                     u'executionStages': {u'advanced': 3,
                                          u'executionTimeMillisEstimate': 0,
                                          u'inputStage': {u'advanced': 6,
                                                          u'direction': u'forward',
                                                          u'docsExamined': 6,
                                                          u'executionTimeMillisEstimate': 0,
                                                          u'filter': {u'$and': []},
                                                          u'invalidates': 0,
                                                          u'isEOF': 1,
                                                          u'nReturned': 6,
                                                          u'needFetch': 0,
                                                          u'needTime': 1,
                                                          u'restoreState': 0,
                                                          u'saveState': 0,
                                                          u'stage': u'COLLSCAN',
                                                          u'works': 8},
                                          u'invalidates': 0,
                                          u'isEOF': 1,
                                          u'limitAmount': 3,
                                          u'memLimit': 33554432,
                                          u'memUsage': 213,
                                          u'nReturned': 3,
                                          u'needFetch': 0,
                                          u'needTime': 8,
                                          u'restoreState': 0,
                                          u'saveState': 0,
                                          u'sortPattern': {u'time': 1},
                                          u'stage': u'SORT',
                                          u'works': 13},
                     u'executionSuccess': True,
                     u'executionTimeMillis': 0,
                     u'nReturned': 3,
                     u'totalDocsExamined': 6,
                     u'totalKeysExamined': 0},
 u'queryPlanner': {u'indexFilterSet': False,
                   u'namespace': u'foo.test',
                   u'parsedQuery': {u'$and': []},
                   u'plannerVersion': 1,
                   u'rejectedPlans': [],
                   u'winningPlan': {u'inputStage': {u'direction': u'forward',
                                                    u'filter': {u'$and': []},
                                                    u'stage': u'COLLSCAN'},
                                    u'limitAmount': 3,
                                    u'sortPattern': {u'time': 1},
                                    u'stage': u'SORT'}},
 u'serverInfo': {u'gitVersion': u'6ce7cbe8c6b899552dadd907604559806aa2e9bd',
                 u'host': u'h008742.mongolab.com',
                 u'port': 53439,
                 u'version': u'3.0.7'}}

As you can see, in both cases the sort is applied first and affects all the 6 documents and then the limit limits the results to 3.

And, the execution plans are exactly the same:

>>> from copy import deepcopy  # just in case
>>> cursor = col.find().sort([('time', 1)]).limit(3)
>>> sort_limit_plan = deepcopy(cursor.explain())
>>> cursor = col.find().limit(3).sort([('time', 1)])
>>> limit_sort_plan = deepcopy(cursor.explain())
>>> sort_limit_plan == limit_sort_plan
True


Answers 2

I suspect you're passing the wrong key in the sort parameter: something like "$key_name" instead of just "key_name".

Refer to "How do you tell Mongo to sort a collection before limiting the results?", a solution for the same problem as yours.

Answers 3

Logically it should be whatever comes first in the chain, but MongoDB always sorts before limiting.

In my test, the sort operation takes precedence regardless of whether it comes before the limit or after it. However, this appears to be very strange behavior to me.

My sample dataset is:

[
    {"_id": ObjectId("56f845fea524b4d098e0ef81"), "number": 48.98052410874508},
    {"_id": ObjectId("56f845fea524b4d098e0ef82"), "number": 50.98747461471063},
    {"_id": ObjectId("56f845fea524b4d098e0ef83"), "number": 81.32911244349772},
    {"_id": ObjectId("56f845fea524b4d098e0ef84"), "number": 87.95549919039071},
    {"_id": ObjectId("56f845fea524b4d098e0ef85"), "number": 81.63582683594402},
    {"_id": ObjectId("56f845fea524b4d098e0ef86"), "number": 43.25696270026136},
    {"_id": ObjectId("56f845fea524b4d098e0ef87"), "number": 88.22046335409453},
    {"_id": ObjectId("56f845fea524b4d098e0ef88"), "number": 64.00556739160076},
    {"_id": ObjectId("56f845fea524b4d098e0ef89"), "number": 16.09353150244296},
    {"_id": ObjectId("56f845fea524b4d098e0ef8a"), "number": 17.46667776660574}
]

Python test code:

import pymongo

client = pymongo.MongoClient("mongodb://localhost:27017")
database = client.get_database("test")
collection = database.get_collection("collection")

print("----------------[limit -> sort]--------------------------")
result = collection.find().limit(5).sort([("number", pymongo.ASCENDING)])
for r in result:
    print(r)

print("----------------[sort -> limit]--------------------------")
result = collection.find().sort([("number", pymongo.ASCENDING)]).limit(5)
for r in result:
    print(r)

Result:

----------------[limit -> sort]--------------------------
{u'_id': ObjectId('56f845fea524b4d098e0ef89'), u'number': 16.09353150244296}
{u'_id': ObjectId('56f845fea524b4d098e0ef8a'), u'number': 17.46667776660574}
{u'_id': ObjectId('56f845fea524b4d098e0ef86'), u'number': 43.25696270026136}
{u'_id': ObjectId('56f845fea524b4d098e0ef81'), u'number': 48.98052410874508}
{u'_id': ObjectId('56f845fea524b4d098e0ef82'), u'number': 50.98747461471063}
----------------[sort -> limit]--------------------------
{u'_id': ObjectId('56f845fea524b4d098e0ef89'), u'number': 16.09353150244296}
{u'_id': ObjectId('56f845fea524b4d098e0ef8a'), u'number': 17.46667776660574}
{u'_id': ObjectId('56f845fea524b4d098e0ef86'), u'number': 43.25696270026136}
{u'_id': ObjectId('56f845fea524b4d098e0ef81'), u'number': 48.98052410874508}
{u'_id': ObjectId('56f845fea524b4d098e0ef82'), u'number': 50.98747461471063}

Answers 4

The MongoDB documentation states that sort() is applied first, then skip() sets the starting point in the sorted result set, and limit() caps how many documents are returned.

This is regardless of the order in your code. The reason is that mongo collects all the methods attached to the query, applies them in that fixed sort-skip-limit order, and then runs the query.
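That fixed server-side ordering can be modeled in a few lines of plain Python (a sketch of the semantics only, not pymongo itself): however the calls are chained, the server applies sort, then skip, then limit.

```python
def run_query(docs, sort_key=None, skip=0, limit=None):
    """Mimic MongoDB's fixed cursor evaluation order: sort -> skip -> limit."""
    out = sorted(docs, key=lambda d: d[sort_key]) if sort_key else list(docs)
    out = out[skip:]                       # skip runs on the sorted set
    return out if limit is None else out[:limit]  # limit runs last

docs = [{'n': 5}, {'n': 1}, {'n': 4}, {'n': 2}, {'n': 3}]
# .sort().skip().limit() and .limit().skip().sort() describe the same query,
# so both chains reduce to the same call here -- and the same result:
print(run_query(docs, sort_key='n', skip=1, limit=2))  # [{'n': 2}, {'n': 3}]
```

This is why the two explain plans above compare equal: the chain order is just syntax, not an execution order.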
