Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pymongo/asynchronous/bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ async def write_command(
)
if bwc.publish:
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
await client._process_response(reply, bwc.session) # type: ignore[arg-type]
except Exception as exc:
duration = datetime.datetime.now() - bwc.start_time
if isinstance(exc, (NotPrimaryError, OperationFailure)):
Expand Down Expand Up @@ -308,6 +309,11 @@ async def write_command(

if bwc.publish:
bwc._fail(request_id, failure, duration)
# Process the response from the server.
if isinstance(exc, OperationFailure):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isinstance(exc, (NotPrimaryError, OperationFailure))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

await client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
else:
await client._process_response({}, bwc.session) # type: ignore[arg-type]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need an else here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

raise
finally:
bwc.start_time = datetime.datetime.now()
Expand Down Expand Up @@ -449,7 +455,6 @@ async def _execute_batch(
else:
request_id, msg, to_send = bwc.batch_command(cmd, ops)
result = await self.write_command(bwc, cmd, request_id, msg, to_send, client) # type: ignore[arg-type]
await client._process_response(result, bwc.session) # type: ignore[arg-type]

return result, to_send # type: ignore[return-value]

Expand Down
8 changes: 7 additions & 1 deletion pymongo/asynchronous/client_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ async def write_command(
)
if bwc.publish:
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
# Process the response from the server.
await self.client._process_response(reply, bwc.session) # type: ignore[arg-type]
except Exception as exc:
duration = datetime.datetime.now() - bwc.start_time
if isinstance(exc, (NotPrimaryError, OperationFailure)):
Expand Down Expand Up @@ -312,6 +314,11 @@ async def write_command(
bwc._fail(request_id, failure, duration)
# Top-level error will be embedded in ClientBulkWriteException.
reply = {"error": exc}
# Process the response from the server.
if isinstance(exc, OperationFailure):
await self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
else:
await self.client._process_response({}, bwc.session) # type: ignore[arg-type]
finally:
bwc.start_time = datetime.datetime.now()
return reply # type: ignore[return-value]
Expand Down Expand Up @@ -431,7 +438,6 @@ async def _execute_batch(
result = await self.write_command(
bwc, cmd, request_id, msg, to_send_ops, to_send_ns, self.client
) # type: ignore[arg-type]
await self.client._process_response(result, bwc.session) # type: ignore[arg-type]
return result, to_send_ops, to_send_ns # type: ignore[return-value]

async def _process_results_cursor(
Expand Down
7 changes: 6 additions & 1 deletion pymongo/synchronous/bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ def write_command(
)
if bwc.publish:
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
client._process_response(reply, bwc.session) # type: ignore[arg-type]
except Exception as exc:
duration = datetime.datetime.now() - bwc.start_time
if isinstance(exc, (NotPrimaryError, OperationFailure)):
Expand Down Expand Up @@ -308,6 +309,11 @@ def write_command(

if bwc.publish:
bwc._fail(request_id, failure, duration)
# Process the response from the server.
if isinstance(exc, OperationFailure):
client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
else:
client._process_response({}, bwc.session) # type: ignore[arg-type]
raise
finally:
bwc.start_time = datetime.datetime.now()
Expand Down Expand Up @@ -449,7 +455,6 @@ def _execute_batch(
else:
request_id, msg, to_send = bwc.batch_command(cmd, ops)
result = self.write_command(bwc, cmd, request_id, msg, to_send, client) # type: ignore[arg-type]
client._process_response(result, bwc.session) # type: ignore[arg-type]

return result, to_send # type: ignore[return-value]

Expand Down
8 changes: 7 additions & 1 deletion pymongo/synchronous/client_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ def write_command(
)
if bwc.publish:
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
# Process the response from the server.
self.client._process_response(reply, bwc.session) # type: ignore[arg-type]
except Exception as exc:
duration = datetime.datetime.now() - bwc.start_time
if isinstance(exc, (NotPrimaryError, OperationFailure)):
Expand Down Expand Up @@ -312,6 +314,11 @@ def write_command(
bwc._fail(request_id, failure, duration)
# Top-level error will be embedded in ClientBulkWriteException.
reply = {"error": exc}
# Process the response from the server.
if isinstance(exc, OperationFailure):
self.client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
else:
self.client._process_response({}, bwc.session) # type: ignore[arg-type]
finally:
bwc.start_time = datetime.datetime.now()
return reply # type: ignore[return-value]
Expand Down Expand Up @@ -429,7 +436,6 @@ def _execute_batch(
"""Executes a batch of bulkWrite server commands (ack)."""
request_id, msg, to_send_ops, to_send_ns = bwc.batch_command(cmd, ops, namespaces)
result = self.write_command(bwc, cmd, request_id, msg, to_send_ops, to_send_ns, self.client) # type: ignore[arg-type]
self.client._process_response(result, bwc.session) # type: ignore[arg-type]
return result, to_send_ops, to_send_ns # type: ignore[return-value]

def _process_results_cursor(
Expand Down
Loading