Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Orleans.Runtime;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Threading.Tasks;

namespace Orleans.Reminders.DynamoDB
Expand All @@ -20,6 +21,11 @@ internal sealed partial class DynamoDBReminderTable : IReminderTable
private const string SERVICE_ID_PROPERTY_NAME = "ServiceId";
private const string START_TIME_PROPERTY_NAME = "StartTime";
private const string PERIOD_PROPERTY_NAME = "Period";
private const string CRON_EXPRESSION_PROPERTY_NAME = "CronExpression";
private const string NEXT_DUE_UTC_PROPERTY_NAME = "NextDueUtc";
private const string LAST_FIRE_UTC_PROPERTY_NAME = "LastFireUtc";
private const string PRIORITY_PROPERTY_NAME = "Priority";
private const string ACTION_PROPERTY_NAME = "Action";
private const string GRAIN_HASH_PROPERTY_NAME = "GrainHash";
private const string REMINDER_ID_PROPERTY_NAME = "ReminderId";
private const string ETAG_PROPERTY_NAME = "ETag";
Expand Down Expand Up @@ -219,12 +225,73 @@ private ReminderEntry Resolve(Dictionary<string, AttributeValue> item)
{
ETag = item[ETAG_PROPERTY_NAME].N,
GrainId = GrainId.Parse(item[GRAIN_REFERENCE_PROPERTY_NAME].S),
Period = TimeSpan.Parse(item[PERIOD_PROPERTY_NAME].S),
Period = TimeSpan.Parse(item[PERIOD_PROPERTY_NAME].S, CultureInfo.InvariantCulture),
CronExpression = ReadOptionalString(item, CRON_EXPRESSION_PROPERTY_NAME),
NextDueUtc = ReadOptionalDateTime(item, NEXT_DUE_UTC_PROPERTY_NAME),
LastFireUtc = ReadOptionalDateTime(item, LAST_FIRE_UTC_PROPERTY_NAME),
Priority = ReadPriority(item),
Action = ReadAction(item),
ReminderName = item[REMINDER_NAME_PROPERTY_NAME].S,
StartAt = DateTime.Parse(item[START_TIME_PROPERTY_NAME].S)
StartAt = DateTime.Parse(item[START_TIME_PROPERTY_NAME].S, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind)
};
}

private static string ReadOptionalString(Dictionary<string, AttributeValue> item, string propertyName)
=> item.TryGetValue(propertyName, out var value) ? value.S : null;

private static DateTime? ReadOptionalDateTime(Dictionary<string, AttributeValue> item, string propertyName)
{
if (!item.TryGetValue(propertyName, out var value) || string.IsNullOrWhiteSpace(value.S))
{
return null;
}

return DateTime.Parse(value.S, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind);
}

private static ReminderPriority ReadPriority(Dictionary<string, AttributeValue> item)
{
if (!TryReadInt32(item, PRIORITY_PROPERTY_NAME, out var value))
{
return ReminderPriority.Normal;
}

return ParsePriority(value);
}

private static MissedReminderAction ReadAction(Dictionary<string, AttributeValue> item)
{
if (!TryReadInt32(item, ACTION_PROPERTY_NAME, out var value))
{
return MissedReminderAction.Skip;
}

return ParseAction(value);
}

private static bool TryReadInt32(Dictionary<string, AttributeValue> item, string propertyName, out int value)
{
value = default;
return item.TryGetValue(propertyName, out var attributeValue)
&& !string.IsNullOrWhiteSpace(attributeValue.N)
&& int.TryParse(attributeValue.N, NumberStyles.Integer, CultureInfo.InvariantCulture, out value);
}

private static ReminderPriority ParsePriority(int value) => value switch
{
(int)ReminderPriority.High => ReminderPriority.High,
(int)ReminderPriority.Normal => ReminderPriority.Normal,
_ => ReminderPriority.Normal,
};

private static MissedReminderAction ParseAction(int value) => value switch
{
(int)MissedReminderAction.FireImmediately => MissedReminderAction.FireImmediately,
(int)MissedReminderAction.Skip => MissedReminderAction.Skip,
(int)MissedReminderAction.Notify => MissedReminderAction.Notify,
_ => MissedReminderAction.Skip,
};

/// <summary>
/// Remove one row from the reminder table
/// </summary>
Expand Down Expand Up @@ -313,12 +380,29 @@ public async Task<string> UpsertRow(ReminderEntry entry)
{ GRAIN_HASH_PROPERTY_NAME, new AttributeValue { N = entry.GrainId.GetUniformHashCode().ToString() } },
{ SERVICE_ID_PROPERTY_NAME, new AttributeValue(this.serviceId) },
{ GRAIN_REFERENCE_PROPERTY_NAME, new AttributeValue( entry.GrainId.ToString()) },
{ PERIOD_PROPERTY_NAME, new AttributeValue(entry.Period.ToString()) },
{ START_TIME_PROPERTY_NAME, new AttributeValue(entry.StartAt.ToString()) },
{ PERIOD_PROPERTY_NAME, new AttributeValue(entry.Period.ToString("c", CultureInfo.InvariantCulture)) },
{ START_TIME_PROPERTY_NAME, new AttributeValue(entry.StartAt.ToString("O", CultureInfo.InvariantCulture)) },
{ REMINDER_NAME_PROPERTY_NAME, new AttributeValue(entry.ReminderName) },
{ ETAG_PROPERTY_NAME, new AttributeValue { N = Random.Shared.Next().ToString() } }
{ PRIORITY_PROPERTY_NAME, new AttributeValue { N = ((int)entry.Priority).ToString(CultureInfo.InvariantCulture) } },
{ ACTION_PROPERTY_NAME, new AttributeValue { N = ((int)entry.Action).ToString(CultureInfo.InvariantCulture) } },
{ ETAG_PROPERTY_NAME, new AttributeValue { N = Random.Shared.Next().ToString(CultureInfo.InvariantCulture) } }
};

if (!string.IsNullOrWhiteSpace(entry.CronExpression))
{
fields[CRON_EXPRESSION_PROPERTY_NAME] = new AttributeValue(entry.CronExpression);
}

if (entry.NextDueUtc is { } nextDueUtc)
{
fields[NEXT_DUE_UTC_PROPERTY_NAME] = new AttributeValue(nextDueUtc.ToString("O"));
}

if (entry.LastFireUtc is { } lastFireUtc)
{
fields[LAST_FIRE_UTC_PROPERTY_NAME] = new AttributeValue(lastFireUtc.ToString("O"));
}

try
{
LogDebugUpsertRow(logger, entry, entry.ETag);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
-- Run this migration for upgrading MySQL reminder tables created before 10.0.0.

ALTER TABLE OrleansRemindersTable
ADD COLUMN IF NOT EXISTS CronExpression NVARCHAR(200) NULL,
ADD COLUMN IF NOT EXISTS NextDueUtc DATETIME NULL,
ADD COLUMN IF NOT EXISTS LastFireUtc DATETIME NULL,
ADD COLUMN IF NOT EXISTS Priority TINYINT NOT NULL DEFAULT 0,
ADD COLUMN IF NOT EXISTS Action TINYINT NOT NULL DEFAULT 0;

CREATE INDEX IF NOT EXISTS IX_RemindersTable_NextDueUtc_Priority
ON OrleansRemindersTable(ServiceId, NextDueUtc, Priority);

UPDATE OrleansQuery
SET QueryText = '
INSERT INTO OrleansRemindersTable
(
ServiceId,
GrainId,
ReminderName,
StartTime,
Period,
CronExpression,
NextDueUtc,
LastFireUtc,
Priority,
Action,
GrainHash,
Version
)
VALUES
(
@ServiceId,
@GrainId,
@ReminderName,
@StartTime,
@Period,
@CronExpression,
@NextDueUtc,
@LastFireUtc,
@Priority,
@Action,
@GrainHash,
last_insert_id(0)
)
ON DUPLICATE KEY
UPDATE
StartTime = @StartTime,
Period = @Period,
CronExpression = @CronExpression,
NextDueUtc = @NextDueUtc,
LastFireUtc = @LastFireUtc,
Priority = @Priority,
Action = @Action,
GrainHash = @GrainHash,
Version = last_insert_id(Version+1);


SELECT last_insert_id() AS Version;
'
WHERE QueryKey = 'UpsertReminderRowKey';

UPDATE OrleansQuery
SET QueryText = '
SELECT
GrainId,
ReminderName,
StartTime,
Period,
CronExpression,
NextDueUtc,
LastFireUtc,
Priority,
Action,
Version
FROM OrleansRemindersTable
WHERE
ServiceId = @ServiceId AND @ServiceId IS NOT NULL
AND GrainId = @GrainId AND @GrainId IS NOT NULL;
'
WHERE QueryKey = 'ReadReminderRowsKey';

UPDATE OrleansQuery
SET QueryText = '
SELECT
GrainId,
ReminderName,
StartTime,
Period,
CronExpression,
NextDueUtc,
LastFireUtc,
Priority,
Action,
Version
FROM OrleansRemindersTable
WHERE
ServiceId = @ServiceId AND @ServiceId IS NOT NULL
AND GrainId = @GrainId AND @GrainId IS NOT NULL
AND ReminderName = @ReminderName AND @ReminderName IS NOT NULL;
'
WHERE QueryKey = 'ReadReminderRowKey';

UPDATE OrleansQuery
SET QueryText = '
SELECT
GrainId,
ReminderName,
StartTime,
Period,
CronExpression,
NextDueUtc,
LastFireUtc,
Priority,
Action,
Version
FROM OrleansRemindersTable
WHERE
ServiceId = @ServiceId AND @ServiceId IS NOT NULL
AND GrainHash > @BeginHash AND @BeginHash IS NOT NULL
AND GrainHash <= @EndHash AND @EndHash IS NOT NULL;
'
WHERE QueryKey = 'ReadRangeRows1Key';

UPDATE OrleansQuery
SET QueryText = '
SELECT
GrainId,
ReminderName,
StartTime,
Period,
CronExpression,
NextDueUtc,
LastFireUtc,
Priority,
Action,
Version
FROM OrleansRemindersTable
WHERE
ServiceId = @ServiceId AND @ServiceId IS NOT NULL
AND ((GrainHash > @BeginHash AND @BeginHash IS NOT NULL)
OR (GrainHash <= @EndHash AND @EndHash IS NOT NULL));
'
WHERE QueryKey = 'ReadRangeRows2Key';
Loading
Loading