Skip to content

Commit e5b0991

Browse files
committed
Implement ReadStateBytes + WriteStateBytes
1 parent f8b0051 commit e5b0991

File tree

16 files changed

+1433
-469
lines changed

16 files changed

+1433
-469
lines changed

docs/plugin-protocol/tfplugin6.proto

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,11 @@ service Provider {
390390
// ConfigureStateStore configures the state store, such as S3 connection in the context of already configured provider
391391
rpc ConfigureStateStore(ConfigureStateStore.Request) returns (ConfigureStateStore.Response);
392392

393+
// ReadStateBytes streams byte chunks of a given state file from a state store
394+
rpc ReadStateBytes(ReadStateBytes.Request) returns (stream ReadStateBytes.ResponseChunk);
395+
// WriteStateBytes streams byte chunks of a given state file into a state store
396+
rpc WriteStateBytes(stream WriteStateBytes.RequestChunk) returns (WriteStateBytes.Response);
397+
393398
// GetStates returns a list of all states (i.e. CE workspaces) managed by a given state store
394399
rpc GetStates(GetStates.Request) returns (GetStates.Response);
395400
// DeleteState instructs a given state store to delete a specific state (i.e. a CE workspace)
@@ -917,6 +922,37 @@ message ConfigureStateStore {
917922
}
918923
}
919924

925+
message ReadStateBytes {
926+
message Request {
927+
string type_name = 1;
928+
string state_id = 2;
929+
}
930+
message ResponseChunk {
931+
bytes bytes = 1;
932+
int64 total_length = 2;
933+
StateRange range = 3;
934+
repeated Diagnostic diagnostics = 4;
935+
}
936+
}
937+
938+
message WriteStateBytes {
939+
message RequestChunk {
940+
string type_name = 1;
941+
bytes bytes = 2;
942+
string state_id = 3;
943+
int64 total_length = 4;
944+
StateRange range = 5;
945+
}
946+
message Response {
947+
repeated Diagnostic diagnostics = 1;
948+
}
949+
}
950+
951+
message StateRange {
952+
int64 start = 1;
953+
int64 end = 2;
954+
}
955+
920956
message GetStates {
921957
message Request {
922958
string type_name = 1;

internal/builtin/providers/terraform/provider.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,18 @@ func (p *Provider) ConfigureStateStore(req providers.ConfigureStateStoreRequest)
295295
return resp
296296
}
297297

298+
func (p *Provider) ReadStateBytes(req providers.ReadStateBytesRequest) providers.ReadStateBytesResponse {
299+
var resp providers.ReadStateBytesResponse
300+
resp.Diagnostics.Append(fmt.Errorf("unsupported state store type %q", req.TypeName))
301+
return resp
302+
}
303+
304+
func (p *Provider) WriteStateBytes(req providers.WriteStateBytesRequest) providers.WriteStateBytesResponse {
305+
var resp providers.WriteStateBytesResponse
306+
resp.Diagnostics.Append(fmt.Errorf("unsupported state store type %q", req.TypeName))
307+
return resp
308+
}
309+
298310
func (p *Provider) GetStates(req providers.GetStatesRequest) providers.GetStatesResponse {
299311
var resp providers.GetStatesResponse
300312
resp.Diagnostics.Append(fmt.Errorf("unsupported state store type %q", req.TypeName))

internal/grpcwrap/provider6.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -907,6 +907,14 @@ func (p *provider6) ConfigureStateStore(ctx context.Context, req *tfplugin6.Conf
907907
panic("not implemented")
908908
}
909909

910+
func (p *provider6) ReadStateBytes(req *tfplugin6.ReadStateBytes_Request, srv tfplugin6.Provider_ReadStateBytesServer) error {
911+
panic("not implemented")
912+
}
913+
914+
func (p *provider6) WriteStateBytes(srv tfplugin6.Provider_WriteStateBytesServer) error {
915+
panic("not implemented")
916+
}
917+
910918
func (p *provider6) GetStates(ctx context.Context, req *tfplugin6.GetStates_Request) (*tfplugin6.GetStates_Response, error) {
911919
panic("not implemented")
912920
}

internal/plugin/grpc_provider.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1458,6 +1458,14 @@ func (p *GRPCProvider) ConfigureStateStore(r providers.ConfigureStateStoreReques
14581458
panic("not implemented")
14591459
}
14601460

1461+
func (p *GRPCProvider) ReadStateBytes(r providers.ReadStateBytesRequest) providers.ReadStateBytesResponse {
1462+
panic("not implemented")
1463+
}
1464+
1465+
func (p *GRPCProvider) WriteStateBytes(r providers.WriteStateBytesRequest) providers.WriteStateBytesResponse {
1466+
panic("not implemented")
1467+
}
1468+
14611469
func (p *GRPCProvider) GetStates(r providers.GetStatesRequest) providers.GetStatesResponse {
14621470
panic("not implemented")
14631471
}

internal/plugin6/grpc_provider.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package plugin6
55

66
import (
7+
"bytes"
78
"context"
89
"errors"
910
"fmt"
@@ -1518,6 +1519,140 @@ func (p *GRPCProvider) ConfigureStateStore(r providers.ConfigureStateStoreReques
15181519
return resp
15191520
}
15201521

1522+
func (p *GRPCProvider) ReadStateBytes(r providers.ReadStateBytesRequest) (resp providers.ReadStateBytesResponse) {
1523+
logger.Trace("GRPCProvider.v6: ReadStateBytes")
1524+
1525+
schema := p.GetProviderSchema()
1526+
if schema.Diagnostics.HasErrors() {
1527+
resp.Diagnostics = schema.Diagnostics
1528+
return resp
1529+
}
1530+
1531+
if _, ok := schema.StateStores[r.TypeName]; !ok {
1532+
resp.Diagnostics = resp.Diagnostics.Append(fmt.Errorf("unknown state store type %q", r.TypeName))
1533+
return resp
1534+
}
1535+
1536+
protoReq := &proto6.ReadStateBytes_Request{
1537+
TypeName: r.TypeName,
1538+
StateId: r.StateId,
1539+
}
1540+
1541+
// Start the streaming RPC with a context. The context will be cancelled
1542+
// when this function returns, which will stop the stream if it is still
1543+
// running.
1544+
ctx, cancel := context.WithCancel(p.ctx)
1545+
defer cancel()
1546+
1547+
client, err := p.client.ReadStateBytes(ctx, protoReq)
1548+
if err != nil {
1549+
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
1550+
return resp
1551+
}
1552+
1553+
var buf *bytes.Buffer
1554+
var expectedTotalLength int
1555+
for {
1556+
chunk, err := client.Recv()
1557+
if err == io.EOF {
1558+
// End of stream, we're done
1559+
break
1560+
}
1561+
if err != nil {
1562+
resp.Diagnostics = resp.Diagnostics.Append(err)
1563+
break
1564+
}
1565+
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(chunk.Diagnostics))
1566+
if resp.Diagnostics.HasErrors() {
1567+
// If we have errors, we stop processing and return early
1568+
break
1569+
}
1570+
1571+
if expectedTotalLength == 0 {
1572+
expectedTotalLength = int(chunk.TotalLength)
1573+
}
1574+
logger.Trace("GRPCProvider.v6: ReadStateBytes: received chunk for range", chunk.Range)
1575+
1576+
n, err := buf.Write(chunk.Bytes)
1577+
if err != nil {
1578+
resp.Diagnostics = resp.Diagnostics.Append(err)
1579+
break
1580+
}
1581+
logger.Trace("GRPCProvider.v6: ReadStateBytes: read bytes of a chunk", n)
1582+
}
1583+
1584+
logger.Trace("GRPCProvider.v6: ReadStateBytes: received all chunks", buf.Len())
1585+
if buf.Len() != expectedTotalLength {
1586+
err = fmt.Errorf("expected state file of total %d bytes, received %d bytes",
1587+
expectedTotalLength, buf.Len())
1588+
resp.Diagnostics = resp.Diagnostics.Append(err)
1589+
return resp
1590+
}
1591+
resp.Bytes = buf.Bytes()
1592+
1593+
return resp
1594+
}
1595+
1596+
func (p *GRPCProvider) WriteStateBytes(r providers.WriteStateBytesRequest) (resp providers.WriteStateBytesResponse) {
1597+
logger.Trace("GRPCProvider.v6: WriteStateBytes")
1598+
1599+
schema := p.GetProviderSchema()
1600+
if schema.Diagnostics.HasErrors() {
1601+
resp.Diagnostics = schema.Diagnostics
1602+
return resp
1603+
}
1604+
1605+
if _, ok := schema.StateStores[r.TypeName]; !ok {
1606+
resp.Diagnostics = resp.Diagnostics.Append(fmt.Errorf("unknown state store type %q", r.TypeName))
1607+
return resp
1608+
}
1609+
1610+
// Start the streaming RPC with a context. The context will be cancelled
1611+
// when this function returns, which will stop the stream if it is still
1612+
// running.
1613+
ctx, cancel := context.WithCancel(p.ctx)
1614+
defer cancel()
1615+
1616+
// TODO: Configurable chunk size
1617+
chunkSize := 4 * 1_000_000 // 4MB
1618+
1619+
if len(r.Bytes) < chunkSize {
1620+
protoReq := &proto6.WriteStateBytes_RequestChunk{
1621+
TypeName: r.TypeName,
1622+
StateId: r.StateId,
1623+
Bytes: r.Bytes,
1624+
TotalLength: int64(len(r.Bytes)),
1625+
Range: &proto6.StateRange{
1626+
Start: 0,
1627+
End: int64(len(r.Bytes)),
1628+
},
1629+
}
1630+
client, err := p.client.WriteStateBytes(ctx)
1631+
if err != nil {
1632+
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
1633+
return resp
1634+
}
1635+
err = client.Send(protoReq)
1636+
if err != nil {
1637+
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
1638+
return resp
1639+
}
1640+
protoResp, err := client.CloseAndRecv()
1641+
if err != nil {
1642+
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
1643+
return resp
1644+
}
1645+
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))
1646+
if resp.Diagnostics.HasErrors() {
1647+
return resp
1648+
}
1649+
}
1650+
1651+
// TODO: implement chunking for state files larger than chunkSize
1652+
1653+
return resp
1654+
}
1655+
15211656
func (p *GRPCProvider) GetStates(r providers.GetStatesRequest) (resp providers.GetStatesResponse) {
15221657
logger.Trace("GRPCProvider.v6: GetStates")
15231658

internal/plugin6/mock_proto/mock.go

Lines changed: 40 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/provider-simple-v6/provider.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,14 @@ func (s simple) ConfigureStateStore(req providers.ConfigureStateStoreRequest) pr
315315
panic("not implemented")
316316
}
317317

318+
func (s simple) ReadStateBytes(req providers.ReadStateBytesRequest) providers.ReadStateBytesResponse {
319+
panic("not implemented")
320+
}
321+
322+
func (s simple) WriteStateBytes(req providers.WriteStateBytesRequest) providers.WriteStateBytesResponse {
323+
panic("not implemented")
324+
}
325+
318326
func (s simple) GetStates(req providers.GetStatesRequest) providers.GetStatesResponse {
319327
panic("not implemented")
320328
}

internal/provider-simple/provider.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,14 @@ func (s simple) ConfigureStateStore(req providers.ConfigureStateStoreRequest) pr
275275
panic("not implemented")
276276
}
277277

278+
func (s simple) ReadStateBytes(req providers.ReadStateBytesRequest) providers.ReadStateBytesResponse {
279+
panic("not implemented")
280+
}
281+
282+
func (s simple) WriteStateBytes(req providers.WriteStateBytesRequest) providers.WriteStateBytesResponse {
283+
panic("not implemented")
284+
}
285+
278286
func (s simple) GetStates(req providers.GetStatesRequest) providers.GetStatesResponse {
279287
// provider-simple uses protocol version 5, which does not include the RPC that maps to this method
280288
panic("not implemented")

internal/providers/mock.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,14 @@ func (m *Mock) ConfigureStateStore(req ConfigureStateStoreRequest) ConfigureStat
436436
return m.Provider.ConfigureStateStore(req)
437437
}
438438

439+
func (m *Mock) ReadStateBytes(req ReadStateBytesRequest) ReadStateBytesResponse {
440+
return m.Provider.ReadStateBytes(req)
441+
}
442+
443+
func (m *Mock) WriteStateBytes(req WriteStateBytesRequest) WriteStateBytesResponse {
444+
return m.Provider.WriteStateBytes(req)
445+
}
446+
439447
func (m *Mock) GetStates(req GetStatesRequest) GetStatesResponse {
440448
return m.Provider.GetStates(req)
441449
}

internal/providers/provider.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ type Interface interface {
123123
// ConfigureStateStore configures the state store, such as S3 connection in the context of already configured provider
124124
ConfigureStateStore(ConfigureStateStoreRequest) ConfigureStateStoreResponse
125125

126+
// ReadStateBytes streams byte chunks of a given state file from a state store
127+
ReadStateBytes(ReadStateBytesRequest) ReadStateBytesResponse
128+
// WriteStateBytes streams byte chunks of a given state file into a state store
129+
WriteStateBytes(WriteStateBytesRequest) WriteStateBytesResponse
130+
126131
// GetStates returns a list of all states (i.e. CE workspaces) managed by a given state store
127132
GetStates(GetStatesRequest) GetStatesResponse
128133
// DeleteState instructs a given state store to delete a specific state (i.e. a CE workspace)
@@ -841,6 +846,34 @@ type ConfigureStateStoreResponse struct {
841846
Diagnostics tfdiags.Diagnostics
842847
}
843848

849+
type ReadStateBytesRequest struct {
850+
// TypeName is the name of the state store to read state from
851+
TypeName string
852+
// StateId is the ID of a state file to read
853+
StateId string
854+
}
855+
856+
type ReadStateBytesResponse struct {
857+
// Bytes represents all received bytes of the given state file
858+
Bytes []byte
859+
// Diagnostics contains any warnings or errors from the method call.
860+
Diagnostics tfdiags.Diagnostics
861+
}
862+
863+
type WriteStateBytesRequest struct {
864+
// TypeName is the name of the state store to write state to
865+
TypeName string
866+
// Bytes represents all bytes of the given state file to write
867+
Bytes []byte
868+
// StateId is the ID of a state file to write
869+
StateId string
870+
}
871+
872+
type WriteStateBytesResponse struct {
873+
// Diagnostics contains any warnings or errors from the method call.
874+
Diagnostics tfdiags.Diagnostics
875+
}
876+
844877
type GetStatesRequest struct {
845878
// TypeName is the name of the state store to request the list of states from
846879
TypeName string

0 commit comments

Comments
 (0)