Skip to content

Commit 28244e7

Browse files
committed
YT-26425: Distributed API http proxy light requests
* Changelog entry Type: feature Component: cpp-sdk Support distributed API in C\+\+ SDK <Message for release notes> commit_hash:689a3c978864fa4623f3b38ce031faa96532b3fe
1 parent 4a9fcc0 commit 28244e7

File tree

18 files changed

+997
-17
lines changed

18 files changed

+997
-17
lines changed

yt/cpp/mapreduce/client/client.cpp

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "client_writer.h"
66
#include "file_reader.h"
77
#include "file_writer.h"
8+
#include "file_fragment_writer.h"
89
#include "format_hints.h"
910
#include "init.h"
1011
#include "lock.h"
@@ -401,6 +402,13 @@ IFileWriterPtr TClientBase::CreateFileWriter(
401402
return new TFileWriter(realPath, RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options);
402403
}
403404

405+
IFileFragmentWriterPtr TClientBase::CreateFileFragmentWriter(
406+
const TDistributedWriteFileCookie& cookie,
407+
const TFileFragmentWriterOptions& options)
408+
{
409+
return NDetail::CreateFileFragmentWriter(RawClient_, ClientRetryPolicy_->CreatePolicyForGenericRequest(), cookie, options);
410+
}
411+
404412
TTableWriterPtr<::google::protobuf::Message> TClientBase::CreateTableWriter(
405413
const TRichYPath& path, const ::google::protobuf::Descriptor& descriptor, const TTableWriterOptions& options)
406414
{
@@ -978,6 +986,23 @@ ::TIntrusivePtr<IProtoWriterImpl> TClientBase::CreateProtoWriter(
978986
}
979987
}
980988

989+
::TIntrusivePtr<ITableFragmentWriter<TNode>> TClientBase::CreateNodeFragmentWriter(
990+
const TDistributedWriteTableCookie& cookie,
991+
const TTableFragmentWriterOptions& options)
992+
{
993+
auto format = TFormat::YsonBinary();
994+
995+
// TODO(achains): Make proper wrapper class with retries and auto ping.
996+
auto stream = NDetail::RequestWithRetry<std::unique_ptr<IOutputStreamWithResponse>>(
997+
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
998+
[&] (TMutationId /*mutationId*/) {
999+
return RawClient_->WriteTableFragment(cookie, format, options);
1000+
}
1001+
);
1002+
1003+
return ::MakeIntrusive<TNodeTableFragmentWriter>(std::move(stream));
1004+
}
1005+
9811006
TBatchRequestPtr TClientBase::CreateBatchRequest()
9821007
{
9831008
return MakeIntrusive<TBatchRequest>(TransactionId_, GetParentClientImpl());
@@ -1497,6 +1522,82 @@ void TClient::ResumeOperation(
14971522
});
14981523
}
14991524

1525+
TDistributedWriteTableSessionWithCookies TClient::StartDistributedWriteTableSession(
1526+
const TRichYPath& richPath,
1527+
i64 cookieCount,
1528+
const TStartDistributedWriteTableOptions& options)
1529+
{
1530+
CheckShutdown();
1531+
return RequestWithRetry<TDistributedWriteTableSessionWithCookies>(
1532+
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
1533+
[this, cookieCount, &richPath, &options] (TMutationId& mutationId) {
1534+
return RawClient_->StartDistributedWriteTableSession(mutationId, richPath, cookieCount, options);
1535+
});
1536+
}
1537+
1538+
void TClient::PingDistributedWriteTableSession(
1539+
const TDistributedWriteTableSession& session,
1540+
const TPingDistributedWriteTableOptions& options)
1541+
{
1542+
CheckShutdown();
1543+
RequestWithRetry<void>(
1544+
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
1545+
[this, &session, &options] (TMutationId& /*mutationId*/) {
1546+
RawClient_->PingDistributedWriteTableSession(session, options);
1547+
});
1548+
}
1549+
1550+
void TClient::FinishDistributedWriteTableSession(
1551+
const TDistributedWriteTableSession& session,
1552+
const TVector<TWriteTableFragmentResult>& results,
1553+
const TFinishDistributedWriteTableOptions& options)
1554+
{
1555+
CheckShutdown();
1556+
RequestWithRetry<void>(
1557+
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
1558+
[this, &session, &results, &options] (TMutationId& mutationId) {
1559+
RawClient_->FinishDistributedWriteTableSession(mutationId, session, results, options);
1560+
});
1561+
}
1562+
1563+
TDistributedWriteFileSessionWithCookies TClient::StartDistributedWriteFileSession(
1564+
const TRichYPath& richPath,
1565+
i64 cookieCount,
1566+
const TStartDistributedWriteFileOptions& options)
1567+
{
1568+
CheckShutdown();
1569+
return RequestWithRetry<TDistributedWriteFileSessionWithCookies>(
1570+
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
1571+
[this, cookieCount, &richPath, &options] (TMutationId& mutationId) {
1572+
return RawClient_->StartDistributedWriteFileSession(mutationId, richPath, cookieCount, options);
1573+
});
1574+
}
1575+
1576+
void TClient::PingDistributedWriteFileSession(
1577+
const TDistributedWriteFileSession& session,
1578+
const TPingDistributedWriteFileOptions& options)
1579+
{
1580+
CheckShutdown();
1581+
RequestWithRetry<void>(
1582+
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
1583+
[this, &session, &options] (TMutationId& /*mutationId*/) {
1584+
RawClient_->PingDistributedWriteFileSession(session, options);
1585+
});
1586+
}
1587+
1588+
void TClient::FinishDistributedWriteFileSession(
1589+
const TDistributedWriteFileSession& session,
1590+
const TVector<TWriteFileFragmentResult>& results,
1591+
const TFinishDistributedWriteFileOptions& options)
1592+
{
1593+
CheckShutdown();
1594+
RequestWithRetry<void>(
1595+
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
1596+
[this, &session, &results, &options] (TMutationId& mutationId) {
1597+
RawClient_->FinishDistributedWriteFileSession(mutationId, session, results, options);
1598+
});
1599+
}
1600+
15001601
TYtPoller& TClient::GetYtPoller()
15011602
{
15021603
auto g = Guard(Lock_);

yt/cpp/mapreduce/client/client.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,10 @@ class TClientBase
120120
const TRichYPath& path,
121121
const TFileWriterOptions& options) override;
122122

123+
IFileFragmentWriterPtr CreateFileFragmentWriter(
124+
const TDistributedWriteFileCookie& cookie,
125+
const TFileFragmentWriterOptions& options = {}) override;
126+
123127
TTableWriterPtr<::google::protobuf::Message> CreateTableWriter(
124128
const TRichYPath& path,
125129
const ::google::protobuf::Descriptor& descriptor,
@@ -300,6 +304,10 @@ class TClientBase
300304
const TRichYPath& path,
301305
const TTableWriterOptions& options,
302306
const Message* prototype) override;
307+
308+
::TIntrusivePtr<ITableFragmentWriter<TNode>> CreateNodeFragmentWriter(
309+
const TDistributedWriteTableCookie& cookie,
310+
const TTableFragmentWriterOptions& options) override;
303311
};
304312

305313
////////////////////////////////////////////////////////////////////////////////
@@ -509,6 +517,34 @@ class TClient
509517
const TOperationId& operationId,
510518
const TResumeOperationOptions& options) override;
511519

520+
TDistributedWriteTableSessionWithCookies StartDistributedWriteTableSession(
521+
const TRichYPath& richPath,
522+
i64 cookieCount,
523+
const TStartDistributedWriteTableOptions& options = {}) override;
524+
525+
void PingDistributedWriteTableSession(
526+
const TDistributedWriteTableSession& session,
527+
const TPingDistributedWriteTableOptions& options = {}) override;
528+
529+
void FinishDistributedWriteTableSession(
530+
const TDistributedWriteTableSession& session,
531+
const TVector<TWriteTableFragmentResult>& results,
532+
const TFinishDistributedWriteTableOptions& options = {}) override;
533+
534+
TDistributedWriteFileSessionWithCookies StartDistributedWriteFileSession(
535+
const TRichYPath& richPath,
536+
i64 cookieCount,
537+
const TStartDistributedWriteFileOptions& options = {}) override;
538+
539+
void PingDistributedWriteFileSession(
540+
const TDistributedWriteFileSession& session,
541+
const TPingDistributedWriteFileOptions& options = {}) override;
542+
543+
void FinishDistributedWriteFileSession(
544+
const TDistributedWriteFileSession& session,
545+
const TVector<TWriteFileFragmentResult>& results,
546+
const TFinishDistributedWriteFileOptions& options = {}) override;
547+
512548
void Shutdown() override;
513549

514550
ITransactionPingerPtr GetTransactionPinger() override;
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
#include "file_fragment_writer.h"
2+
3+
#include <yt/cpp/mapreduce/common/retry_request.h>
4+
5+
#include <yt/cpp/mapreduce/interface/raw_client.h>
6+
7+
#include <library/cpp/yson/node/node_io.h>
8+
9+
namespace NYT::NDetail {
10+
11+
////////////////////////////////////////////////////////////////////////////////
12+
13+
class TFileFragmentWriter
14+
: public IFileFragmentWriter
15+
{
16+
public:
17+
explicit TFileFragmentWriter(std::unique_ptr<IOutputStreamWithResponse> stream)
18+
: Underlying_(std::move(stream))
19+
{ }
20+
21+
TWriteFileFragmentResult GetWriteFragmentResult() const override
22+
{
23+
return TWriteFileFragmentResult(NodeFromYsonString(Underlying_->GetResponse()));
24+
}
25+
26+
private:
27+
std::unique_ptr<IOutputStreamWithResponse> Underlying_;
28+
29+
void DoWrite(const void* buf, size_t len) override
30+
{
31+
Underlying_->Write(buf, len);
32+
}
33+
34+
void DoFinish() override
35+
{
36+
Underlying_->Finish();
37+
}
38+
};
39+
40+
////////////////////////////////////////////////////////////////////////////////
41+
42+
IFileFragmentWriterPtr CreateFileFragmentWriter(
43+
const IRawClientPtr& rawClient,
44+
const IRequestRetryPolicyPtr& retryPolicy,
45+
const TDistributedWriteFileCookie& cookie,
46+
const TFileFragmentWriterOptions& options)
47+
{
48+
auto stream = NDetail::RequestWithRetry<std::unique_ptr<IOutputStreamWithResponse>>(
49+
retryPolicy,
50+
[&] (TMutationId /*mutationId*/) {
51+
return rawClient->WriteFileFragment(cookie, options);
52+
}
53+
);
54+
55+
return MakeIntrusive<TFileFragmentWriter>(std::move(stream));
56+
}
57+
58+
////////////////////////////////////////////////////////////////////////////////
59+
60+
} // namespace NYT::NDetail
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#pragma once
2+
3+
#include <yt/cpp/mapreduce/common/fwd.h>
4+
5+
#include <yt/cpp/mapreduce/interface/io.h>
6+
7+
namespace NYT::NDetail {
8+
9+
////////////////////////////////////////////////////////////////////////////////
10+
11+
IFileFragmentWriterPtr CreateFileFragmentWriter(
12+
const IRawClientPtr& rawClient,
13+
const IRequestRetryPolicyPtr& retryPolicy,
14+
const TDistributedWriteFileCookie& cookie,
15+
const TFileFragmentWriterOptions& options);
16+
17+
////////////////////////////////////////////////////////////////////////////////
18+
19+
} // namespace NYT::NDetail

yt/cpp/mapreduce/client/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ SRCS(
1010
client.cpp
1111
file_reader.cpp
1212
file_writer.cpp
13+
file_fragment_writer.cpp
1314
format_hints.cpp
1415
init.cpp
1516
lock.cpp

0 commit comments

Comments
 (0)