Skip to content

Commit 6322501

Browse files
committed
Implement ReadStateBytes + WriteStateBytes
1 parent a4cc769 commit 6322501

File tree

16 files changed

+1433
-469
lines changed

16 files changed

+1433
-469
lines changed

docs/plugin-protocol/tfplugin6.proto

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,11 @@ service Provider {
390390
// ConfigureStateStore configures the state store, such as S3 connection in the context of already configured provider
391391
rpc ConfigureStateStore(ConfigureStateStore.Request) returns (ConfigureStateStore.Response);
392392

393+
// ReadStateBytes streams byte chunks of a given state file from a state store
394+
rpc ReadStateBytes(ReadStateBytes.Request) returns (stream ReadStateBytes.ResponseChunk);
395+
// WriteStateBytes streams byte chunks of a given state file into a state store
396+
rpc WriteStateBytes(stream WriteStateBytes.RequestChunk) returns (WriteStateBytes.Response);
397+
393398
// GetStates returns a list of all states (i.e. CE workspaces) managed by a given state store
394399
rpc GetStates(GetStates.Request) returns (GetStates.Response);
395400
// DeleteState instructs a given state store to delete a specific state (i.e. a CE workspace)
@@ -917,6 +922,37 @@ message ConfigureStateStore {
917922
}
918923
}
919924

925+
message ReadStateBytes {
926+
message Request {
927+
string type_name = 1;
928+
string state_id = 2;
929+
}
930+
message ResponseChunk {
931+
bytes bytes = 1;
932+
int64 total_length = 2;
933+
StateRange range = 3;
934+
repeated Diagnostic diagnostics = 4;
935+
}
936+
}
937+
938+
message WriteStateBytes {
939+
message RequestChunk {
940+
string type_name = 1;
941+
bytes bytes = 2;
942+
string state_id = 3;
943+
int64 total_length = 4;
944+
StateRange range = 5;
945+
}
946+
message Response {
947+
repeated Diagnostic diagnostics = 1;
948+
}
949+
}
950+
951+
message StateRange {
952+
int64 start = 1;
953+
int64 end = 2;
954+
}
955+
920956
message GetStates {
921957
message Request {
922958
string type_name = 1;

internal/builtin/providers/terraform/provider.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,18 @@ func (p *Provider) ConfigureStateStore(req providers.ConfigureStateStoreRequest)
295295
return resp
296296
}
297297

298+
func (p *Provider) ReadStateBytes(req providers.ReadStateBytesRequest) providers.ReadStateBytesResponse {
299+
var resp providers.ReadStateBytesResponse
300+
resp.Diagnostics.Append(fmt.Errorf("unsupported state store type %q", req.TypeName))
301+
return resp
302+
}
303+
304+
func (p *Provider) WriteStateBytes(req providers.WriteStateBytesRequest) providers.WriteStateBytesResponse {
305+
var resp providers.WriteStateBytesResponse
306+
resp.Diagnostics.Append(fmt.Errorf("unsupported state store type %q", req.TypeName))
307+
return resp
308+
}
309+
298310
func (p *Provider) GetStates(req providers.GetStatesRequest) providers.GetStatesResponse {
299311
var resp providers.GetStatesResponse
300312
resp.Diagnostics.Append(fmt.Errorf("unsupported state store type %q", req.TypeName))

internal/grpcwrap/provider6.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -907,6 +907,14 @@ func (p *provider6) ConfigureStateStore(ctx context.Context, req *tfplugin6.Conf
907907
panic("not implemented")
908908
}
909909

910+
func (p *provider6) ReadStateBytes(req *tfplugin6.ReadStateBytes_Request, srv tfplugin6.Provider_ReadStateBytesServer) error {
911+
panic("not implemented")
912+
}
913+
914+
func (p *provider6) WriteStateBytes(srv tfplugin6.Provider_WriteStateBytesServer) error {
915+
panic("not implemented")
916+
}
917+
910918
func (p *provider6) GetStates(ctx context.Context, req *tfplugin6.GetStates_Request) (*tfplugin6.GetStates_Response, error) {
911919
panic("not implemented")
912920
}

internal/plugin/grpc_provider.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1444,6 +1444,14 @@ func (p *GRPCProvider) ConfigureStateStore(r providers.ConfigureStateStoreReques
14441444
panic("not implemented")
14451445
}
14461446

1447+
func (p *GRPCProvider) ReadStateBytes(r providers.ReadStateBytesRequest) providers.ReadStateBytesResponse {
1448+
panic("not implemented")
1449+
}
1450+
1451+
func (p *GRPCProvider) WriteStateBytes(r providers.WriteStateBytesRequest) providers.WriteStateBytesResponse {
1452+
panic("not implemented")
1453+
}
1454+
14471455
func (p *GRPCProvider) GetStates(r providers.GetStatesRequest) providers.GetStatesResponse {
14481456
panic("not implemented")
14491457
}

internal/plugin6/grpc_provider.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package plugin6
55

66
import (
7+
"bytes"
78
"context"
89
"errors"
910
"fmt"
@@ -1505,6 +1506,140 @@ func (p *GRPCProvider) ConfigureStateStore(r providers.ConfigureStateStoreReques
15051506
return resp
15061507
}
15071508

1509+
func (p *GRPCProvider) ReadStateBytes(r providers.ReadStateBytesRequest) (resp providers.ReadStateBytesResponse) {
1510+
logger.Trace("GRPCProvider.v6: ReadStateBytes")
1511+
1512+
schema := p.GetProviderSchema()
1513+
if schema.Diagnostics.HasErrors() {
1514+
resp.Diagnostics = schema.Diagnostics
1515+
return resp
1516+
}
1517+
1518+
if _, ok := schema.StateStores[r.TypeName]; !ok {
1519+
resp.Diagnostics = resp.Diagnostics.Append(fmt.Errorf("unknown state store type %q", r.TypeName))
1520+
return resp
1521+
}
1522+
1523+
protoReq := &proto6.ReadStateBytes_Request{
1524+
TypeName: r.TypeName,
1525+
StateId: r.StateId,
1526+
}
1527+
1528+
// Start the streaming RPC with a context. The context will be cancelled
1529+
// when this function returns, which will stop the stream if it is still
1530+
// running.
1531+
ctx, cancel := context.WithCancel(p.ctx)
1532+
defer cancel()
1533+
1534+
client, err := p.client.ReadStateBytes(ctx, protoReq)
1535+
if err != nil {
1536+
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
1537+
return resp
1538+
}
1539+
1540+
var buf *bytes.Buffer
1541+
var expectedTotalLength int
1542+
for {
1543+
chunk, err := client.Recv()
1544+
if err == io.EOF {
1545+
// End of stream, we're done
1546+
break
1547+
}
1548+
if err != nil {
1549+
resp.Diagnostics = resp.Diagnostics.Append(err)
1550+
break
1551+
}
1552+
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(chunk.Diagnostics))
1553+
if resp.Diagnostics.HasErrors() {
1554+
// If we have errors, we stop processing and return early
1555+
break
1556+
}
1557+
1558+
if expectedTotalLength == 0 {
1559+
expectedTotalLength = int(chunk.TotalLength)
1560+
}
1561+
logger.Trace("GRPCProvider.v6: ReadStateBytes: received chunk for range", chunk.Range)
1562+
1563+
n, err := buf.Write(chunk.Bytes)
1564+
if err != nil {
1565+
resp.Diagnostics = resp.Diagnostics.Append(err)
1566+
break
1567+
}
1568+
logger.Trace("GRPCProvider.v6: ReadStateBytes: read bytes of a chunk", n)
1569+
}
1570+
1571+
logger.Trace("GRPCProvider.v6: ReadStateBytes: received all chunks", buf.Len())
1572+
if buf.Len() != expectedTotalLength {
1573+
err = fmt.Errorf("expected state file of total %d bytes, received %d bytes",
1574+
expectedTotalLength, buf.Len())
1575+
resp.Diagnostics = resp.Diagnostics.Append(err)
1576+
return resp
1577+
}
1578+
resp.Bytes = buf.Bytes()
1579+
1580+
return resp
1581+
}
1582+
1583+
func (p *GRPCProvider) WriteStateBytes(r providers.WriteStateBytesRequest) (resp providers.WriteStateBytesResponse) {
1584+
logger.Trace("GRPCProvider.v6: WriteStateBytes")
1585+
1586+
schema := p.GetProviderSchema()
1587+
if schema.Diagnostics.HasErrors() {
1588+
resp.Diagnostics = schema.Diagnostics
1589+
return resp
1590+
}
1591+
1592+
if _, ok := schema.StateStores[r.TypeName]; !ok {
1593+
resp.Diagnostics = resp.Diagnostics.Append(fmt.Errorf("unknown state store type %q", r.TypeName))
1594+
return resp
1595+
}
1596+
1597+
// Start the streaming RPC with a context. The context will be cancelled
1598+
// when this function returns, which will stop the stream if it is still
1599+
// running.
1600+
ctx, cancel := context.WithCancel(p.ctx)
1601+
defer cancel()
1602+
1603+
// TODO: Configurable chunk size
1604+
chunkSize := 4 * 1_000_000 // 4MB
1605+
1606+
if len(r.Bytes) < chunkSize {
1607+
protoReq := &proto6.WriteStateBytes_RequestChunk{
1608+
TypeName: r.TypeName,
1609+
StateId: r.StateId,
1610+
Bytes: r.Bytes,
1611+
TotalLength: int64(len(r.Bytes)),
1612+
Range: &proto6.StateRange{
1613+
Start: 0,
1614+
End: int64(len(r.Bytes)),
1615+
},
1616+
}
1617+
client, err := p.client.WriteStateBytes(ctx)
1618+
if err != nil {
1619+
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
1620+
return resp
1621+
}
1622+
err = client.Send(protoReq)
1623+
if err != nil {
1624+
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
1625+
return resp
1626+
}
1627+
protoResp, err := client.CloseAndRecv()
1628+
if err != nil {
1629+
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
1630+
return resp
1631+
}
1632+
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))
1633+
if resp.Diagnostics.HasErrors() {
1634+
return resp
1635+
}
1636+
}
1637+
1638+
// TODO: implement chunking for state files larger than chunkSize
1639+
1640+
return resp
1641+
}
1642+
15081643
func (p *GRPCProvider) GetStates(r providers.GetStatesRequest) (resp providers.GetStatesResponse) {
15091644
logger.Trace("GRPCProvider.v6: GetStates")
15101645

internal/plugin6/mock_proto/mock.go

Lines changed: 40 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/provider-simple-v6/provider.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,14 @@ func (s simple) ConfigureStateStore(req providers.ConfigureStateStoreRequest) pr
317317
panic("not implemented")
318318
}
319319

320+
func (s simple) ReadStateBytes(req providers.ReadStateBytesRequest) providers.ReadStateBytesResponse {
321+
panic("not implemented")
322+
}
323+
324+
func (s simple) WriteStateBytes(req providers.WriteStateBytesRequest) providers.WriteStateBytesResponse {
325+
panic("not implemented")
326+
}
327+
320328
func (s simple) GetStates(req providers.GetStatesRequest) providers.GetStatesResponse {
321329
panic("not implemented")
322330
}

internal/provider-simple/provider.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,14 @@ func (s simple) ConfigureStateStore(req providers.ConfigureStateStoreRequest) pr
277277
panic("not implemented")
278278
}
279279

280+
func (s simple) ReadStateBytes(req providers.ReadStateBytesRequest) providers.ReadStateBytesResponse {
281+
panic("not implemented")
282+
}
283+
284+
func (s simple) WriteStateBytes(req providers.WriteStateBytesRequest) providers.WriteStateBytesResponse {
285+
panic("not implemented")
286+
}
287+
280288
func (s simple) GetStates(req providers.GetStatesRequest) providers.GetStatesResponse {
281289
// provider-simple uses protocol version 5, which does not include the RPC that maps to this method
282290
panic("not implemented")

internal/providers/mock.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,14 @@ func (m *Mock) ConfigureStateStore(req ConfigureStateStoreRequest) ConfigureStat
436436
return m.Provider.ConfigureStateStore(req)
437437
}
438438

439+
func (m *Mock) ReadStateBytes(req ReadStateBytesRequest) ReadStateBytesResponse {
440+
return m.Provider.ReadStateBytes(req)
441+
}
442+
443+
func (m *Mock) WriteStateBytes(req WriteStateBytesRequest) WriteStateBytesResponse {
444+
return m.Provider.WriteStateBytes(req)
445+
}
446+
439447
func (m *Mock) GetStates(req GetStatesRequest) GetStatesResponse {
440448
return m.Provider.GetStates(req)
441449
}

internal/providers/provider.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ type Interface interface {
123123
// ConfigureStateStore configures the state store, such as S3 connection in the context of already configured provider
124124
ConfigureStateStore(ConfigureStateStoreRequest) ConfigureStateStoreResponse
125125

126+
// ReadStateBytes streams byte chunks of a given state file from a state store
127+
ReadStateBytes(ReadStateBytesRequest) ReadStateBytesResponse
128+
// WriteStateBytes streams byte chunks of a given state file into a state store
129+
WriteStateBytes(WriteStateBytesRequest) WriteStateBytesResponse
130+
126131
// GetStates returns a list of all states (i.e. CE workspaces) managed by a given state store
127132
GetStates(GetStatesRequest) GetStatesResponse
128133
// DeleteState instructs a given state store to delete a specific state (i.e. a CE workspace)
@@ -854,6 +859,34 @@ type ConfigureStateStoreResponse struct {
854859
Diagnostics tfdiags.Diagnostics
855860
}
856861

862+
type ReadStateBytesRequest struct {
863+
// TypeName is the name of the state store to read state from
864+
TypeName string
865+
// StateId is the ID of a state file to read
866+
StateId string
867+
}
868+
869+
type ReadStateBytesResponse struct {
870+
// Bytes represents all received bytes of the given state file
871+
Bytes []byte
872+
// Diagnostics contains any warnings or errors from the method call.
873+
Diagnostics tfdiags.Diagnostics
874+
}
875+
876+
type WriteStateBytesRequest struct {
877+
// TypeName is the name of the state store to write state to
878+
TypeName string
879+
// Bytes represents all bytes of the given state file to write
880+
Bytes []byte
881+
// StateId is the ID of a state file to write
882+
StateId string
883+
}
884+
885+
type WriteStateBytesResponse struct {
886+
// Diagnostics contains any warnings or errors from the method call.
887+
Diagnostics tfdiags.Diagnostics
888+
}
889+
857890
type GetStatesRequest struct {
858891
// TypeName is the name of the state store to request the list of states from
859892
TypeName string

0 commit comments

Comments
 (0)